12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184 |
- import { RangeIterable } from './util/RangeIterable.js';
- import {
- getAddress,
- Cursor,
- Txn,
- orderedBinary,
- lmdbError,
- getByBinary,
- setGlobalBuffer,
- prefetch,
- iterate,
- position as doPosition,
- resetTxn,
- getCurrentValue,
- getCurrentShared,
- getStringByBinary,
- globalBuffer,
- getSharedBuffer,
- startRead,
- setReadCallback,
- directWrite,
- getUserSharedBuffer,
- notifyUserCallbacks,
- attemptLock,
- unlock,
- } from './native.js';
- import { saveKey } from './keys.js';
// Sentinel compared against the version argument in doesExist(); when it matches,
// the stored version is not compared.
const IF_EXISTS = 3.542694326329068e-103;
// the default starting key for iteration, which excludes symbols/metadata
const DEFAULT_BEGINNING_KEY = Buffer.from([5]);
// Shared "iteration finished" result returned by the range iterator.
const ITERATOR_DONE = { done: true, value: undefined };
const Uint8ArraySlice = Uint8Array.prototype.slice;
// Byte offset in the key buffer where position() stores the start-key address.
const START_ADDRESS_POSITION = 4064;
// Values larger than this get a dedicated buffer when a safe (non-reused) buffer is requested.
const NEW_BUFFER_THRESHOLD = 0x8000;
const SOURCE_SYMBOL = Symbol.for('source');
// Returned by getBinaryFast()/get() when the native read reports that the txn id matched.
export const UNMODIFIED = {};
// Shared buffers indexed by buffer id, filled lazily.
let mmaps = [];
// The reusable buffer that receives value bytes from native reads.
let getValueBytes = globalBuffer;
if (!getValueBytes.maxLength) {
  // First initialization: remember the real capacity and make `length` writable
  // so the logical size of the last value can be recorded on the buffer itself.
  const capacity = getValueBytes.length;
  getValueBytes.maxLength = capacity;
  getValueBytes.isGlobal = true;
  Object.defineProperty(getValueBytes, 'length', {
    value: capacity,
    writable: true,
    configurable: true,
  });
}
- export function addReadMethods(
- LMDBStore,
- { maxKeySize, env, keyBytes, keyBytesView, getLastVersion, getLastTxnId },
- ) {
- let readTxn,
- readTxnRenewed,
- asSafeBuffer = false;
- let renewId = 1;
- let outstandingReads = 0;
- Object.assign(LMDBStore.prototype, {
- getString(id, options) {
- let txn =
- env.writeTxn ||
- (options && options.transaction) ||
- (readTxnRenewed ? readTxn : renewReadTxn(this));
- let string = getStringByBinary(
- this.dbAddress,
- this.writeKey(id, keyBytes, 0),
- txn.address || 0,
- );
- if (typeof string === 'number') {
- // indicates the buffer wasn't large enough
- this._allocateGetBuffer(string);
- // and then try again
- string = getStringByBinary(
- this.dbAddress,
- this.writeKey(id, keyBytes, 0),
- txn.address || 0,
- );
- }
- if (string) this.lastSize = string.length;
- return string;
- },
- getBinaryFast(id, options) {
- let rc;
- let txn =
- env.writeTxn ||
- (options && options.transaction) ||
- (readTxnRenewed ? readTxn : renewReadTxn(this));
- rc = this.lastSize = getByBinary(
- this.dbAddress,
- this.writeKey(id, keyBytes, 0),
- (options && options.ifNotTxnId) || 0,
- txn.address || 0,
- );
- if (rc < 0) {
- if (rc == -30798)
- // MDB_NOTFOUND
- return; // undefined
- if (rc == -30004)
- // txn id matched
- return UNMODIFIED;
- if (
- rc == -30781 /*MDB_BAD_VALSIZE*/ &&
- this.writeKey(id, keyBytes, 0) == 0
- )
- throw new Error(
- id === undefined
- ? 'A key is required for get, but is undefined'
- : 'Zero length key is not allowed in LMDB',
- );
- if (rc == -30000)
- // int32 overflow, read uint32
- rc = this.lastSize = keyBytesView.getUint32(0, true);
- else if (rc == -30001) {
- // shared buffer
- this.lastSize = keyBytesView.getUint32(0, true);
- let bufferId = keyBytesView.getUint32(4, true);
- let bytes = getMMapBuffer(bufferId, this.lastSize);
- return asSafeBuffer ? Buffer.from(bytes) : bytes;
- } else throw lmdbError(rc);
- }
- let compression = this.compression;
- let bytes = compression ? compression.getValueBytes : getValueBytes;
- if (rc > bytes.maxLength) {
- // this means the target buffer wasn't big enough, so the get failed to copy all the data from the database, need to either grow or use special buffer
- return this._returnLargeBuffer(() =>
- getByBinary(
- this.dbAddress,
- this.writeKey(id, keyBytes, 0),
- 0,
- txn.address || 0,
- ),
- );
- }
- bytes.length = this.lastSize;
- return bytes;
- },
- getBFAsync(id, options, callback) {
- let txn =
- env.writeTxn ||
- (options && options.transaction) ||
- (readTxnRenewed ? readTxn : renewReadTxn(this));
- txn.refCount = (txn.refCount || 0) + 1;
- outstandingReads++;
- if (!txn.address) {
- throw new Error('Invalid transaction, it has no address');
- }
- let address = recordReadInstruction(
- txn.address,
- this.db.dbi,
- id,
- this.writeKey,
- maxKeySize,
- (rc, bufferId, offset, size) => {
- if (rc && rc !== 1) callback(lmdbError(rc));
- outstandingReads--;
- let buffer = mmaps[bufferId];
- if (!buffer) {
- buffer = mmaps[bufferId] = getSharedBuffer(bufferId, env.address);
- }
- //console.log({bufferId, offset, size})
- if (buffer.isSharedMap) {
- // using LMDB shared memory
- // TODO: We may want explicit support for clearing aborting the transaction on the next event turn,
- // but for now we are relying on the GC to cleanup transaction for larger blocks of memory
- let bytes = new Uint8Array(buffer, offset, size);
- bytes.txn = txn;
- callback(bytes, 0, size);
- } else {
- // using copied memory
- txn.done(); // decrement and possibly abort
- callback(buffer, offset, size);
- }
- },
- );
- if (address) {
- startRead(address, () => {
- resolveReads();
- });
- }
- },
- getAsync(id, options, callback) {
- let promise;
- if (!callback) promise = new Promise((resolve) => (callback = resolve));
- this.getBFAsync(id, options, (buffer, offset, size) => {
- if (this.useVersions) {
- // TODO: And get the version
- offset += 8;
- size -= 8;
- }
- let bytes = new Uint8Array(buffer, offset, size);
- let value;
- if (this.decoder) {
- // the decoder potentially uses the data from the buffer in the future and needs a stable buffer
- value = bytes && this.decoder.decode(bytes);
- } else if (this.encoding == 'binary') {
- value = bytes;
- } else {
- value = Buffer.prototype.utf8Slice.call(bytes, 0, size);
- if (this.encoding == 'json' && value) value = JSON.parse(value);
- }
- callback(value);
- });
- return promise;
- },
- retain(data, options) {
- if (!data) return;
- let source = data[SOURCE_SYMBOL];
- let buffer = source ? source.bytes : data;
- if (!buffer.isGlobal && !env.writeTxn) {
- let txn =
- options?.transaction ||
- (readTxnRenewed ? readTxn : renewReadTxn(this));
- buffer.txn = txn;
- txn.refCount = (txn.refCount || 0) + 1;
- return data;
- } else {
- buffer = Uint8ArraySlice.call(buffer, 0, this.lastSize);
- if (source) {
- source.bytes = buffer;
- return data;
- } else return buffer;
- }
- },
- _returnLargeBuffer(getFast) {
- let bytes;
- let compression = this.compression;
- if (asSafeBuffer && this.lastSize > NEW_BUFFER_THRESHOLD) {
- // used by getBinary to indicate it should create a dedicated buffer to receive this
- let bytesToRestore;
- try {
- if (compression) {
- bytesToRestore = compression.getValueBytes;
- let dictionary = compression.dictionary || [];
- let dictLength = (dictionary.length >> 3) << 3; // make sure it is word-aligned
- bytes = makeReusableBuffer(this.lastSize);
- compression.setBuffer(
- bytes.buffer,
- bytes.byteOffset,
- this.lastSize,
- dictionary,
- dictLength,
- );
- compression.getValueBytes = bytes;
- } else {
- bytesToRestore = getValueBytes;
- setGlobalBuffer(
- (bytes = getValueBytes = makeReusableBuffer(this.lastSize)),
- );
- }
- getFast();
- } finally {
- if (compression) {
- let dictLength = (compression.dictionary.length >> 3) << 3;
- compression.setBuffer(
- bytesToRestore.buffer,
- bytesToRestore.byteOffset,
- bytesToRestore.maxLength,
- compression.dictionary,
- dictLength,
- );
- compression.getValueBytes = bytesToRestore;
- } else {
- setGlobalBuffer(bytesToRestore);
- getValueBytes = bytesToRestore;
- }
- }
- return bytes;
- }
- // grow our shared/static buffer to accomodate the size of the data
- bytes = this._allocateGetBuffer(this.lastSize);
- // and try again
- getFast();
- bytes.length = this.lastSize;
- return bytes;
- },
- _allocateGetBuffer(lastSize) {
- let newLength = Math.min(Math.max(lastSize * 2, 0x1000), 0xfffffff8);
- let bytes;
- if (this.compression) {
- let dictionary =
- this.compression.dictionary || Buffer.allocUnsafeSlow(0);
- let dictLength = (dictionary.length >> 3) << 3; // make sure it is word-aligned
- bytes = Buffer.allocUnsafeSlow(newLength + dictLength);
- bytes.set(dictionary); // copy dictionary into start
- // the section after the dictionary is the target area for get values
- bytes = bytes.subarray(dictLength);
- this.compression.setBuffer(
- bytes.buffer,
- bytes.byteOffset,
- newLength,
- dictionary,
- dictLength,
- );
- bytes.maxLength = newLength;
- Object.defineProperty(bytes, 'length', {
- value: newLength,
- writable: true,
- configurable: true,
- });
- this.compression.getValueBytes = bytes;
- } else {
- bytes = makeReusableBuffer(newLength);
- setGlobalBuffer((getValueBytes = bytes));
- }
- bytes.isGlobal = true;
- return bytes;
- },
- getBinary(id, options) {
- try {
- asSafeBuffer = true;
- let fastBuffer = this.getBinaryFast(id, options);
- return (
- fastBuffer &&
- (fastBuffer.isGlobal
- ? Uint8ArraySlice.call(fastBuffer, 0, this.lastSize)
- : fastBuffer)
- );
- } finally {
- asSafeBuffer = false;
- }
- },
- getSharedBinary(id, options) {
- let fastBuffer = this.getBinaryFast(id, options);
- if (fastBuffer) {
- if (fastBuffer.isGlobal || writeTxn)
- return Uint8ArraySlice.call(fastBuffer, 0, this.lastSize);
- fastBuffer.txn = options && options.transaction;
- options.transaction.refCount = (options.transaction.refCount || 0) + 1;
- return fastBuffer;
- }
- },
- get(id, options) {
- if (this.decoderCopies) {
- // the decoder copies any data, so we can use the fast binary retrieval that overwrites the same buffer space
- let bytes = this.getBinaryFast(id, options);
- return (
- bytes &&
- (bytes == UNMODIFIED
- ? UNMODIFIED
- : this.decoder.decode(bytes, options))
- );
- }
- if (this.encoding == 'binary') return this.getBinary(id, options);
- if (this.decoder) {
- // the decoder potentially uses the data from the buffer in the future and needs a stable buffer
- let bytes = this.getBinary(id, options);
- return (
- bytes &&
- (bytes == UNMODIFIED ? UNMODIFIED : this.decoder.decode(bytes))
- );
- }
- let result = this.getString(id, options);
- if (result) {
- if (this.encoding == 'json') return JSON.parse(result);
- }
- return result;
- },
- getEntry(id, options) {
- let value = this.get(id, options);
- if (value !== undefined) {
- if (this.useVersions)
- return {
- value,
- version: getLastVersion(),
- //size: this.lastSize
- };
- else
- return {
- value,
- //size: this.lastSize
- };
- }
- },
- directWrite(id, options) {
- let rc;
- let txn =
- env.writeTxn ||
- (options && options.transaction) ||
- (readTxnRenewed ? readTxn : renewReadTxn(this));
- let keySize = this.writeKey(id, keyBytes, 0);
- let dataOffset = ((keySize >> 3) + 1) << 3;
- keyBytes.set(options.bytes, dataOffset);
- rc = directWrite(
- this.dbAddress,
- keySize,
- options.offset,
- options.bytes.length,
- txn.address || 0,
- );
- if (rc < 0) lmdbError(rc);
- },
- getUserSharedBuffer(id, defaultBuffer, options) {
- let keySize;
- const setKeyBytes = () => {
- if (options?.envKey) keySize = this.writeKey(id, keyBytes, 0);
- else {
- keyBytes.dataView.setUint32(0, this.db.dbi);
- keySize = this.writeKey(id, keyBytes, 4);
- }
- };
- setKeyBytes();
- let sharedBuffer = getUserSharedBuffer(
- env.address,
- keySize,
- defaultBuffer,
- options?.callback,
- );
- sharedBuffer.notify = () => {
- setKeyBytes();
- return notifyUserCallbacks(env.address, keySize);
- };
- return sharedBuffer;
- },
- attemptLock(id, version, callback) {
- if (!env.address) throw new Error('Can not operate on a closed database');
- keyBytes.dataView.setUint32(0, this.db.dbi);
- keyBytes.dataView.setFloat64(4, version);
- let keySize = this.writeKey(id, keyBytes, 12);
- return attemptLock(env.address, keySize, callback);
- },
- unlock(id, version, onlyCheck) {
- if (!env.address) throw new Error('Can not operate on a closed database');
- keyBytes.dataView.setUint32(0, this.db.dbi);
- keyBytes.dataView.setFloat64(4, version);
- let keySize = this.writeKey(id, keyBytes, 12);
- return unlock(env.address, keySize, onlyCheck);
- },
- hasLock(id, version) {
- return this.unlock(id, version, true);
- },
- resetReadTxn() {
- resetReadTxn();
- },
- _commitReadTxn() {
- if (readTxn) {
- readTxn.isCommitted = true;
- readTxn.commit();
- }
- lastReadTxnRef = null;
- readTxnRenewed = null;
- readTxn = null;
- },
- ensureReadTxn() {
- if (!env.writeTxn && !readTxnRenewed) renewReadTxn(this);
- },
- doesExist(key, versionOrValue, options) {
- if (versionOrValue == null) {
- // undefined means the entry exists, null is used specifically to check for the entry *not* existing
- return (
- (this.getBinaryFast(key, options) === undefined) ==
- (versionOrValue === null)
- );
- } else if (this.useVersions) {
- return (
- this.getBinaryFast(key, options) !== undefined &&
- (versionOrValue === IF_EXISTS || getLastVersion() === versionOrValue)
- );
- } else {
- if (versionOrValue && versionOrValue['\x10binary-data\x02'])
- versionOrValue = versionOrValue['\x10binary-data\x02'];
- else if (this.encoder)
- versionOrValue = this.encoder.encode(versionOrValue);
- if (typeof versionOrValue == 'string')
- versionOrValue = Buffer.from(versionOrValue);
- let defaultOptions = { start: versionOrValue, exactMatch: true };
- return (
- this.getValuesCount(
- key,
- options ? Object.assign(defaultOptions, options) : defaultOptions,
- ) > 0
- );
- }
- },
- getValues(key, options) {
- let defaultOptions = {
- key,
- valuesForKey: true,
- };
- if (options && options.snapshot === false)
- throw new Error('Can not disable snapshots for getValues');
- return this.getRange(
- options ? Object.assign(defaultOptions, options) : defaultOptions,
- );
- },
- getKeys(options) {
- if (!options) options = {};
- options.values = false;
- return this.getRange(options);
- },
- getCount(options) {
- if (!options) options = {};
- options.onlyCount = true;
- return this.getRange(options).iterate();
- },
- getKeysCount(options) {
- if (!options) options = {};
- options.onlyCount = true;
- options.values = false;
- return this.getRange(options).iterate();
- },
- getValuesCount(key, options) {
- if (!options) options = {};
- options.key = key;
- options.valuesForKey = true;
- options.onlyCount = true;
- return this.getRange(options).iterate();
- },
- getRange(options) {
- let iterable = new RangeIterable();
- let textDecoder = new TextDecoder();
- if (!options) options = {};
- let includeValues = options.values !== false;
- let includeVersions = options.versions;
- let valuesForKey = options.valuesForKey;
- let limit = options.limit;
- let db = this.db;
- let snapshot = options.snapshot;
- if (snapshot === false && this.dupSort && includeValues)
- throw new Error(
- 'Can not disable snapshot on a' + ' dupSort data store',
- );
- let compression = this.compression;
- iterable.iterate = () => {
- const reverse = options.reverse;
- let currentKey = valuesForKey
- ? options.key
- : reverse || 'start' in options
- ? options.start
- : DEFAULT_BEGINNING_KEY;
- let count = 0;
- let cursor, cursorRenewId, cursorAddress;
- let txn;
- let flags =
- (includeValues ? 0x100 : 0) |
- (reverse ? 0x400 : 0) |
- (valuesForKey ? 0x800 : 0) |
- (options.exactMatch ? 0x4000 : 0) |
- (options.inclusiveEnd ? 0x8000 : 0) |
- (options.exclusiveStart ? 0x10000 : 0);
- let store = this;
- function resetCursor() {
- try {
- if (cursor) finishCursor();
- let txnAddress;
- txn = options.transaction;
- if (txn) {
- if (txn.isDone)
- throw new Error(
- 'Can not iterate on range with transaction that is already' +
- ' done',
- );
- txnAddress = txn.address;
- if (!txnAddress) {
- throw new Error('Invalid transaction, it has no address');
- }
- cursor = null;
- } else {
- let writeTxn = env.writeTxn;
- if (writeTxn) snapshot = false;
- txn =
- env.writeTxn ||
- options.transaction ||
- (readTxnRenewed ? readTxn : renewReadTxn(store));
- cursor = !writeTxn && db.availableCursor;
- }
- if (cursor) {
- db.availableCursor = null;
- flags |= 0x2000;
- } else {
- cursor = new Cursor(db, txnAddress || 0);
- }
- cursorAddress = cursor.address;
- if (txn.use)
- txn.use(); // track transaction so we always use the same one
- else txn.refCount = (txn.refCount || 0) + 1;
- if (snapshot === false) {
- cursorRenewId = renewId; // use shared read transaction
- txn.renewingRefCount = (txn.renewingRefCount || 0) + 1; // need to know how many are renewing cursors
- }
- } catch (error) {
- if (cursor) {
- try {
- cursor.close();
- } catch (error) {}
- }
- throw error;
- }
- }
- resetCursor();
- if (options.onlyCount) {
- flags |= 0x1000;
- let count = position(options.offset);
- if (count < 0) lmdbError(count);
- finishCursor();
- return count;
- }
- function position(offset) {
- if (!env.address) {
- throw new Error('Can not iterate on a closed database');
- }
- let keySize =
- currentKey === undefined
- ? 0
- : store.writeKey(currentKey, keyBytes, 0);
- let endAddress;
- if (valuesForKey) {
- if (options.start === undefined && options.end === undefined)
- endAddress = 0;
- else {
- let startAddress;
- if (store.encoder.writeKey) {
- startAddress = saveKey(
- options.start,
- store.encoder.writeKey,
- iterable,
- maxKeySize,
- );
- keyBytesView.setFloat64(
- START_ADDRESS_POSITION,
- startAddress,
- true,
- );
- endAddress = saveKey(
- options.end,
- store.encoder.writeKey,
- iterable,
- maxKeySize,
- );
- } else if (
- (!options.start || options.start instanceof Uint8Array) &&
- (!options.end || options.end instanceof Uint8Array)
- ) {
- startAddress = saveKey(
- options.start,
- orderedBinary.writeKey,
- iterable,
- maxKeySize,
- );
- keyBytesView.setFloat64(
- START_ADDRESS_POSITION,
- startAddress,
- true,
- );
- endAddress = saveKey(
- options.end,
- orderedBinary.writeKey,
- iterable,
- maxKeySize,
- );
- } else {
- throw new Error(
- 'Only key-based encoding is supported for start/end values',
- );
- let encoded = store.encoder.encode(options.start);
- let bufferAddress =
- encoded.buffer.address ||
- (encoded.buffer.address =
- getAddress(encoded.buffer) - encoded.byteOffset);
- startAddress = bufferAddress + encoded.byteOffset;
- }
- }
- } else
- endAddress = saveKey(
- reverse && !('end' in options)
- ? DEFAULT_BEGINNING_KEY
- : options.end,
- store.writeKey,
- iterable,
- maxKeySize,
- );
- return doPosition(
- cursorAddress,
- flags,
- offset || 0,
- keySize,
- endAddress,
- );
- }
- function finishCursor() {
- if (!cursor || txn.isDone) return;
- if (iterable.onDone) iterable.onDone();
- if (cursorRenewId) txn.renewingRefCount--;
- if (txn.refCount <= 1 && txn.notCurrent) {
- cursor.close(); // this must be closed before the transaction is aborted or it can cause a
- // segmentation fault
- }
- if (txn.done) txn.done();
- else if (--txn.refCount <= 0 && txn.notCurrent) {
- txn.abort();
- txn.isDone = true;
- }
- if (!txn.isDone) {
- if (db.availableCursor || txn != readTxn) {
- cursor.close();
- } else {
- // try to reuse it
- db.availableCursor = cursor;
- db.cursorTxn = txn;
- }
- }
- cursor = null;
- }
- return {
- next() {
- let keySize, lastSize;
- if (cursorRenewId && (cursorRenewId != renewId || txn.isDone)) {
- if (flags & 0x10000) flags = flags & ~0x10000; // turn off exclusive start when repositioning
- resetCursor();
- keySize = position(0);
- }
- if (!cursor) {
- return ITERATOR_DONE;
- }
- if (count === 0) {
- // && includeValues) // on first entry, get current value if we need to
- keySize = position(options.offset);
- } else keySize = iterate(cursorAddress);
- if (keySize <= 0 || count++ >= limit) {
- if (keySize < -30700 && keySize !== -30798) lmdbError(keySize);
- finishCursor();
- return ITERATOR_DONE;
- }
- if (!valuesForKey || snapshot === false) {
- if (keySize > 20000) {
- if (keySize > 0x1000000) lmdbError(keySize - 0x100000000);
- throw new Error('Invalid key size ' + keySize.toString(16));
- }
- currentKey = store.readKey(keyBytes, 32, keySize + 32);
- }
- if (includeValues) {
- let value;
- lastSize = keyBytesView.getUint32(0, true);
- let bufferId = keyBytesView.getUint32(4, true);
- let bytes;
- if (bufferId) {
- bytes = getMMapBuffer(bufferId, lastSize);
- if (store.encoding === 'binary') bytes = Buffer.from(bytes);
- } else {
- bytes = compression ? compression.getValueBytes : getValueBytes;
- if (lastSize > bytes.maxLength) {
- store.lastSize = lastSize;
- asSafeBuffer = store.encoding === 'binary';
- try {
- bytes = store._returnLargeBuffer(() =>
- getCurrentValue(cursorAddress),
- );
- } finally {
- asSafeBuffer = false;
- }
- } else bytes.length = lastSize;
- }
- if (store.decoder) {
- value = store.decoder.decode(bytes, lastSize);
- } else if (store.encoding == 'binary')
- value = bytes.isGlobal
- ? Uint8ArraySlice.call(bytes, 0, lastSize)
- : bytes;
- else {
- // use the faster utf8Slice if available, otherwise fall back to TextDecoder (a little slower)
- // note applying Buffer's utf8Slice to a Uint8Array works in Node, but not in Bun.
- value = bytes.utf8Slice
- ? bytes.utf8Slice(0, lastSize)
- : textDecoder.decode(
- Uint8ArraySlice.call(bytes, 0, lastSize),
- );
- if (store.encoding == 'json' && value)
- value = JSON.parse(value);
- }
- if (includeVersions)
- return {
- value: {
- key: currentKey,
- value,
- version: getLastVersion(),
- },
- };
- else if (valuesForKey)
- return {
- value,
- };
- else
- return {
- value: {
- key: currentKey,
- value,
- },
- };
- } else if (includeVersions) {
- return {
- value: {
- key: currentKey,
- version: getLastVersion(),
- },
- };
- } else {
- return {
- value: currentKey,
- };
- }
- },
- return() {
- finishCursor();
- return ITERATOR_DONE;
- },
- throw() {
- finishCursor();
- return ITERATOR_DONE;
- },
- };
- };
- return iterable;
- },
- getMany(keys, callback) {
- // this is an asynchronous get for multiple keys. It actually works by prefetching asynchronously,
- // allowing a separate thread/task to absorb the potentially largest cost: hard page faults (and disk I/O).
- // And then we just do standard sync gets (to deserialized data) to fulfil the callback/promise
- // once the prefetch occurs
- let promise = callback
- ? undefined
- : new Promise(
- (resolve) => (callback = (error, results) => resolve(results)),
- );
- this.prefetch(keys, () => {
- let results = new Array(keys.length);
- for (let i = 0, l = keys.length; i < l; i++) {
- results[i] = get.call(this, keys[i]);
- }
- callback(null, results);
- });
- return promise;
- },
- getSharedBufferForGet(id, options) {
- let txn =
- env.writeTxn ||
- (options && options.transaction) ||
- (readTxnRenewed ? readTxn : renewReadTxn(this));
- this.lastSize = this.keyIsCompatibility
- ? txn.getBinaryShared(id)
- : this.db.get(this.writeKey(id, keyBytes, 0));
- if (this.lastSize === -30798) {
- // not found code
- return; //undefined
- }
- return this.lastSize;
- this.lastSize = keyBytesView.getUint32(0, true);
- let bufferIndex = keyBytesView.getUint32(12, true);
- lastOffset = keyBytesView.getUint32(8, true);
- let buffer = buffers[bufferIndex];
- let startOffset;
- if (
- !buffer ||
- lastOffset < (startOffset = buffer.startOffset) ||
- lastOffset + this.lastSize > startOffset + 0x100000000
- ) {
- if (buffer) env.detachBuffer(buffer.buffer);
- startOffset = (lastOffset >>> 16) * 0x10000;
- console.log(
- 'make buffer for address',
- bufferIndex * 0x100000000 + startOffset,
- );
- buffer = buffers[bufferIndex] = Buffer.from(
- getBufferForAddress(bufferIndex * 0x100000000 + startOffset),
- );
- buffer.startOffset = startOffset;
- }
- lastOffset -= startOffset;
- return buffer;
- return buffer.slice(
- lastOffset,
- lastOffset + this.lastSize,
- ); /*Uint8ArraySlice.call(buffer, lastOffset, lastOffset + this.lastSize)*/
- },
- prefetch(keys, callback) {
- if (!keys) throw new Error('An array of keys must be provided');
- if (!keys.length) {
- if (callback) {
- callback(null);
- return;
- } else return Promise.resolve();
- }
- let buffers = [];
- let startPosition;
- let bufferHolder = {};
- let lastBuffer;
- for (let key of keys) {
- let position;
- if (key && key.key !== undefined && key.value !== undefined) {
- position = saveKey(
- key.value,
- this.writeKey,
- bufferHolder,
- maxKeySize,
- 0x80000000,
- );
- saveReferenceToBuffer();
- saveKey(key.key, this.writeKey, bufferHolder, maxKeySize);
- } else {
- position = saveKey(key, this.writeKey, bufferHolder, maxKeySize);
- }
- if (!startPosition) startPosition = position;
- saveReferenceToBuffer();
- }
- function saveReferenceToBuffer() {
- if (bufferHolder.saveBuffer != lastBuffer) {
- buffers.push(bufferHolder.saveBuffer);
- lastBuffer = bufferHolder.saveBuffer;
- }
- }
- saveKey(undefined, this.writeKey, bufferHolder, maxKeySize);
- saveReferenceToBuffer();
- outstandingReads++;
- prefetch(this.dbAddress, startPosition, (error) => {
- outstandingReads--;
- if (error)
- console.error('Error with prefetch', buffers); // partly exists to keep the buffers pinned in memory
- else callback(null);
- });
- if (!callback) return new Promise((resolve) => (callback = resolve));
- },
- useReadTransaction() {
- let txn = readTxnRenewed ? readTxn : renewReadTxn(this);
- if (!txn.use) {
- throw new Error('Can not use read transaction from a closed database');
- }
- // because the renew actually happens lazily in read operations, renew needs to be explicit
- // here in order to actually secure a real read transaction. Try to only do it if necessary;
- // once it has a refCount, it should be good to go
- if (!(readTxn.refCount - (readTxn.renewingRefCount || 0) > 0))
- txn.renew();
- txn.use();
- return txn;
- },
- close(callback) {
- this.status = 'closing';
- let txnPromise;
- if (this.isRoot) {
- // if it is root, we need to abort and/or wait for transactions to finish
- if (readTxn) {
- try {
- readTxn.abort();
- } catch (error) {}
- } else readTxn = {};
- readTxn.isDone = true;
- Object.defineProperty(readTxn, 'renew', {
- value: () => {
- throw new Error('Can not read from a closed database');
- },
- configurable: true,
- });
- Object.defineProperty(readTxn, 'use', {
- value: () => {
- throw new Error('Can not read from a closed database');
- },
- configurable: true,
- });
- readTxnRenewed = null;
- txnPromise = this._endWrites && this._endWrites();
- }
- const doClose = () => {
- if (this.isRoot) {
- if (outstandingReads > 0) {
- return new Promise((resolve) =>
- setTimeout(() => resolve(doClose()), 1),
- );
- }
- env.address = 0;
- try {
- env.close();
- } catch (error) {}
- } else this.db.close();
- this.status = 'closed';
- if (callback) callback();
- };
- if (txnPromise) return txnPromise.then(doClose);
- else {
- doClose();
- return Promise.resolve();
- }
- },
- getStats() {
- let txn = env.writeTxn || (readTxnRenewed ? readTxn : renewReadTxn(this));
- let dbStats = this.db.stat();
- dbStats.root = env.stat();
- Object.assign(dbStats, env.info());
- dbStats.free = env.freeStat();
- return dbStats;
- },
- });
- let get = LMDBStore.prototype.get;
- let lastReadTxnRef;
// Returns a view of `size` bytes in the shared buffer `bufferId`, starting at the
// offset the native call left at byte 8 of the key buffer. Buffers are cached in `mmaps`.
function getMMapBuffer(bufferId, size) {
  const buffer = (mmaps[bufferId] ||= getSharedBuffer(bufferId, env.address));
  const offset = keyBytesView.getUint32(8, true);
  return new Uint8Array(buffer, offset, size);
}
// Ensures a shared read transaction exists and marks it as renewed for the
// current event turn (a reset is scheduled with setTimeout). Creation is retried
// while the error message contains 'temporarily'.
// Throws when the environment is closed or no transaction could be created.
function renewReadTxn(store) {
  if (!env.address) {
    throw new Error('Can not renew a transaction from a closed database');
  }
  if (!readTxn) {
    let retries = 0;
    let waitArray;
    let lastError;
    do {
      try {
        let lastReadTxn = lastReadTxnRef && lastReadTxnRef.deref();
        readTxn = new Txn(
          env,
          0x20000,
          lastReadTxn && !lastReadTxn.isDone && lastReadTxn,
        );
        if (readTxn.address == 0) {
          // no new transaction was created: keep using the previous one
          readTxn = lastReadTxn;
          if (readTxn.notCurrent) readTxn.notCurrent = false;
        }
        break;
      } catch (error) {
        if (!(error.message && error.message.includes('temporarily')))
          throw error;
        lastError = error;
        if (!waitArray)
          waitArray = new Int32Array(new SharedArrayBuffer(4), 0, 1);
        // block briefly, backing off a little more on each retry
        Atomics.wait(waitArray, 0, 0, retries * 2);
      }
    } while (retries++ < 100);
    if (!readTxn) {
      // all retries failed: report it here instead of returning undefined to callers
      throw new Error('Unable to start a read transaction after retrying', {
        cause: lastError,
      });
    }
  }
  // we actually don't renew here, we let the renew take place in the next
  // lmdb native read/call so as to avoid an extra native call
  readTxnRenewed = setTimeout(resetReadTxn, 0);
  store.emit('begin-transaction');
  return readTxn;
}
// Ends the current renewal period of the shared read transaction.
// If the transaction is still referenced by non-renewing users it is detached
// (marked notCurrent and kept only through a WeakRef); otherwise it is reset natively.
function resetReadTxn() {
  renewId++;
  if (!readTxnRenewed) return;
  readTxnRenewed = null;
  const pinnedRefs = readTxn.refCount - (readTxn.renewingRefCount || 0);
  if (pinnedRefs > 0) {
    readTxn.notCurrent = true;
    lastReadTxnRef = new WeakRef(readTxn);
    readTxn = null;
    return;
  }
  if (readTxn.address && !readTxn.isDone) {
    resetTxn(readTxn.address);
    return;
  }
  console.warn('Attempt to reset an invalid read txn', readTxn);
  throw new Error('Attempt to reset an invalid read txn');
}
- }
/**
 * Creates a zero-filled buffer whose `length` can be reassigned.
 * `maxLength` keeps the real capacity while `length` is redefined as a writable
 * own property so callers can record the size of the value currently held.
 * @param {number} size capacity in bytes
 * @returns a Buffer (or a Uint8Array when Buffer is unavailable)
 */
export function makeReusableBuffer(size) {
  const hasNodeBuffer = typeof Buffer != 'undefined';
  const bytes = hasNodeBuffer ? Buffer.alloc(size) : new Uint8Array(size);
  Object.defineProperty(bytes, 'length', {
    value: size,
    writable: true,
    configurable: true,
  });
  bytes.maxLength = size;
  return bytes;
}
// Releases one use of the transaction. When the last use of a transaction that
// is no longer current is released, the transaction is aborted and marked done.
// Throws when released more often than it was used.
Txn.prototype.done = function () {
  const remaining = --this.refCount;
  if (remaining === 0 && this.notCurrent) {
    this.abort();
    this.isDone = true;
    return;
  }
  if (remaining < 0)
    throw new Error('Can not finish a transaction more times than it was used');
};
// Registers one more use of the transaction (a missing refCount counts as 0).
Txn.prototype.use = function () {
  const current = this.refCount || 0;
  this.refCount = current + 1;
};
// Module state for recording asynchronous read instructions.
let readInstructions;
let uint32Instructions;
let instructionsAddress;
// no-op stand-in until the first instructions buffer is allocated
let instructionsDataView = { setFloat64() {}, setUint32() {} };
// pending read callbacks, keyed by callback id
let readCallbacks = new Map();
// starts above the 7800 threshold checked in recordReadInstruction, so the first call allocates
let savePosition = 8000;
let DYNAMIC_KEY_BUFFER_SIZE = 8192;
// Allocates a fresh instructions buffer, builds the Int32 and DataView views
// over it, records its native address, and rewinds the save position to 0.
function allocateInstructionsBuffer() {
  const hasNodeBuffer = typeof Buffer != 'undefined';
  readInstructions = hasNodeBuffer
    ? Buffer.alloc(DYNAMIC_KEY_BUFFER_SIZE)
    : new Uint8Array(DYNAMIC_KEY_BUFFER_SIZE);
  const backing = readInstructions.buffer;
  uint32Instructions = new Int32Array(backing, 0, backing.byteLength >> 2);
  uint32Instructions[2] = 0xf0000000; // indicates a new read task must be started
  instructionsAddress = backing.address = getAddress(backing);
  instructionsDataView = new DataView(
    backing,
    readInstructions.byteOffset,
    readInstructions.byteLength,
  );
  readInstructions.dataView = instructionsDataView;
  savePosition = 0;
}
/**
 * Writes one read instruction into the instructions buffer and starts the native read.
 * Layout relative to the instruction start: float64 txn address at +0, dbi at
 * +8, key length at +12, key bytes from +16. When the read completes, `callback`
 * receives the four int32 words found at the instruction start.
 * @returns the result of the native startRead call
 * @throws Error when the key exceeds `maxKeySize`
 */
export function recordReadInstruction(
  txnAddress,
  dbi,
  key,
  writeKey,
  maxKeySize,
  callback,
) {
  if (savePosition > 7800) {
    allocateInstructionsBuffer();
  }
  const start = savePosition;
  const keyPosition = start + 16;
  try {
    savePosition =
      key === undefined
        ? keyPosition
        : writeKey(key, readInstructions, keyPosition);
  } catch (error) {
    if (error.name != 'RangeError') throw error;
    if (8180 - start >= maxKeySize) {
      throw new Error('Key was too large, max key size is ' + maxKeySize);
    }
    // the key may simply not have fit in the remaining space: use a fresh buffer and try again
    allocateInstructionsBuffer();
    return recordReadInstruction(
      txnAddress,
      dbi,
      key,
      writeKey,
      maxKeySize,
      callback,
    );
  }
  const length = savePosition - keyPosition;
  if (length > maxKeySize) {
    savePosition = start;
    throw new Error(
      'Key of size ' + length + ' was too large, max key size is ' + maxKeySize,
    );
  }
  const wordIndex = start >> 2;
  uint32Instructions[wordIndex + 3] = length; // save the length
  uint32Instructions[wordIndex + 2] = dbi;
  savePosition = (savePosition + 12) & 0xfffffc;
  instructionsDataView.setFloat64(start, txnAddress, true);
  // capture the current view: a later allocation replaces the module-level one
  const instructions = uint32Instructions;
  const callbackId = addReadCallback(() => {
    callback(
      instructions[wordIndex],
      instructions[wordIndex + 1],
      instructions[wordIndex + 2],
      instructions[wordIndex + 3],
    );
  });
  return startRead(instructionsAddress + start, callbackId, {}, 'read');
}
// Registry of pending read callbacks. The registration function is stored on
// globalThis and reused when it already exists there.
let nextCallbackId = 0;
let addReadCallback = globalThis.__lmdb_read_callback;
if (!addReadCallback) {
  addReadCallback = globalThis.__lmdb_read_callback = function (callback) {
    let callbackId = nextCallbackId++;
    readCallbacks.set(callbackId, callback);
    return callbackId;
  };
  setReadCallback(function (callbackId) {
    const callback = readCallbacks.get(callbackId);
    // remove the entry before invoking so a throwing callback cannot leak it
    readCallbacks.delete(callbackId);
    callback();
  });
}
|