"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
exports.EventualMap = exports.streamCollect = exports.streamCollectInto = void 0;
const maps_1 = require("./maps");
const support_1 = require("../support");
/**
* Insert the entries of a ReadableStream into `seed` with an optional Reconciler.
*
* @param {ReadableStream} stream The input stream.
* @param {Map} seed The Map to update with the contents of `stream`.
* @param {Reconciler} reconcileFn Function to call to resolve collisions.
* @returns A promise of the updated map, to be returned when the ReadableStream closes.
*/
async function streamCollectInto(stream, seed, reconcileFn) {
if (reconcileFn) {
await stream.forEach(entry => {
const [key, val] = entry;
const reconciled = reconcileFn(seed.get(key), val, key);
reconciled !== undefined && seed.set(key, reconciled);
});
}
else {
await stream.forEach(entry => {
const [key, val] = entry;
seed.set(key, val);
});
}
return seed;
}
exports.streamCollectInto = streamCollectInto;
/**
* Generate a new map from the ReadableStream of entries using an optional Reconciler.
*
* @param {ReadableStream} stream The input stream.
* @param {Reconciler} reconcileFn Function to call to resolve collisions.
* @returns A promise of the generated map, to be returned when the ReadableStream closes.
*/
function streamCollect(stream, reconcileFn) {
return streamCollectInto(stream, new Map(), reconcileFn);
}
exports.streamCollect = streamCollect;
const none = {
isSome: false
};
function some(val) {
return {
isSome: true,
value: val
};
}
function foldOption(opt, some, none) {
if (opt.isSome) {
return some(opt.value);
}
else {
return none();
}
}
async function getGetOrHasPromise({ finalized }, switchboard, underlyingMap, key) {
if (finalized || underlyingMap.has(key)) {
return underlyingMap.has(key) ? some(underlyingMap.get(key))
: none;
}
else if (switchboard.has(key)) {
return maps_1.getOrFail(switchboard, key)[0];
}
else {
let resolver;
const newPromise = new Promise(resolve => resolver = resolve);
switchboard.set(key, [newPromise, support_1.defined(resolver, "Resolver not properly captured from Promise, this might be due to an unexpected implementation of Promises")]);
return newPromise;
}
}
async function queryMap({ finalized }, switchboard, underlyingMap, onSome, onNone, key) {
const ret = await getGetOrHasPromise({ finalized }, switchboard, underlyingMap, key);
return foldOption(ret, onSome, onNone);
}
/**
* Initialize an EventualMap from a stream of entries. An EventualMap is a Map-like object that returns Promises which resolve as soon as possible.
*
* - If a request comes in for a key that has already been loaded in from the stream, it resolves immediately with that value.
*
* - If a request comes in before the corresponding entry arrives, it is added to a queue.
*
* - When the entry with the request key comes in, the Promise resolves with that value.
*
* - If the stream ends, and the requested key has not arrived in the stream, the Promise resolves with `undefined`.
*
* @remarks
* To ensure the correctness of early `get` calls, the eventualMap does not allow existing values to be overwritten.
* Instead, collisions can be resolved by modifying the incoming key using the `bumper` option.
* If the `bumper` returns `undefined`, the second entry to arrive is simply ignored.
*
* @param {Stream.ReadableStream} stream The input stream to draw the entries from.
* @param {object} opts
* - bumper The function to call on key collisions to get a new key for the colliding entry.
* By default, after a key arrives, subsequent entries with the same key will be discarded.
* - seed The Map to load entries into. By default, generates a new Map.
*
* @returns A Map that is in the process of being built from a Stream.
*
* @method get Return the value that will eventually be at the key.
* @method has Return `true` if the key is eventually set, `false` if it is not set before the input stream ends.
* @method getOrElse Return the value that will eventually be at the key, or the result of calling the argument function `substitute` if the key is not set before the input stream ends.
* @method getOrVal Return the value that will eventually be at the key, or `substitute` if the key is not set before the input stream ends.
* @method getOrFail Return the value that will eventually be at the key or throw an error if the key is not set before the input stream ends.
* @method foldingGet Return the result of calling `some` on the input value when the key is set, the result of calling `none` if the result is not set before the input stream ends.
* @method getNow Immediately return the value that is at the key whether the input stream has ended or not.
* @method hasNow Return `true` if the key is set now, `false` otherwise.
* @field _underlyingMap The Map that is being populated with Stream entries.
* This must be accessed with caution as mutating operations on `_underlyingMap`, like `set` and `delete`, destroy all correctness guarantees for the other methods.
* @field finalMap A Promise resolving to `underlyingMap` when the input stream ends.
*/
function EventualMap(stream, { bumper, seed } = {}) {
const _underlyingMap = seed || new Map();
const switchboard = new Map();
let resolveFinalMapPromise;
let finalizedWrapper = {
finalized: false
};
const finalMapPromise = new Promise((resolve, _) => {
resolveFinalMapPromise = (val) => {
finalizedWrapper.finalized = true;
resolve(val);
};
});
stream.forEach(([key, value]) => {
let keyToUse;
if (_underlyingMap.has(key)) {
if (bumper) {
let newKey = key;
let attempts = 0;
do {
attempts++;
const innerNewKey = bumper(newKey, attempts, key, maps_1.getOrFail(_underlyingMap, key), value);
if (innerNewKey === undefined) {
// Failed to set
break;
}
else if (!_underlyingMap.has(innerNewKey)) {
_underlyingMap.set(innerNewKey, value);
break;
}
else {
newKey = innerNewKey;
}
} while (!!newKey);
}
else {
keyToUse = undefined;
}
}
else {
keyToUse = key;
}
if (!!keyToUse) {
_underlyingMap.set(keyToUse, value);
maps_1.foldingGet(switchboard, keyToUse, ([_, resolver]) => resolver(some(value)));
}
}).then(() => {
switchboard.forEach(([_, resolver]) => resolver(none));
resolveFinalMapPromise(_underlyingMap);
});
return {
get: (key) => queryMap(finalizedWrapper, switchboard, _underlyingMap, some => some, () => undefined, key),
has: (key) => queryMap(finalizedWrapper, switchboard, _underlyingMap, () => true, () => false, key),
getOrElse: (key, substitute) => queryMap(finalizedWrapper, switchboard, _underlyingMap, (val) => val, () => substitute(key), key),
getOrVal: (key, substitute) => queryMap(finalizedWrapper, switchboard, _underlyingMap, (val) => val, () => substitute, key),
getOrFail: (key, error) => queryMap(finalizedWrapper, switchboard, _underlyingMap, (val) => val, () => {
throw new Error(typeof error === "function"
? error(key)
: typeof error === "undefined"
? `Map has no entry "${key}"`
: error);
}, key),
foldingGet(key, some, none) {
return queryMap(finalizedWrapper, switchboard, _underlyingMap, some, none, key);
},
getNow(key) {
return _underlyingMap.get(key);
},
hasNow(key) {
return _underlyingMap.has(key);
},
_underlyingMap,
finalMap: finalMapPromise
};
}
exports.EventualMap = EventualMap;