import { RangeIterable } from './util/RangeIterable.js'; import { getAddress, Cursor, Txn, orderedBinary, lmdbError, getByBinary, setGlobalBuffer, prefetch, iterate, position as doPosition, resetTxn, getCurrentValue, getCurrentShared, getStringByBinary, globalBuffer, getSharedBuffer, startRead, setReadCallback, directWrite, getUserSharedBuffer, notifyUserCallbacks, attemptLock, unlock, } from './native.js'; import { saveKey } from './keys.js'; const IF_EXISTS = 3.542694326329068e-103; const DEFAULT_BEGINNING_KEY = Buffer.from([5]); // the default starting key for iteration, which excludes symbols/metadata const ITERATOR_DONE = { done: true, value: undefined }; const Uint8ArraySlice = Uint8Array.prototype.slice; let getValueBytes = globalBuffer; if (!getValueBytes.maxLength) { getValueBytes.maxLength = getValueBytes.length; getValueBytes.isGlobal = true; Object.defineProperty(getValueBytes, 'length', { value: getValueBytes.length, writable: true, configurable: true, }); } const START_ADDRESS_POSITION = 4064; const NEW_BUFFER_THRESHOLD = 0x8000; const SOURCE_SYMBOL = Symbol.for('source'); export const UNMODIFIED = {}; let mmaps = []; export function addReadMethods( LMDBStore, { maxKeySize, env, keyBytes, keyBytesView, getLastVersion, getLastTxnId }, ) { let readTxn, readTxnRenewed, asSafeBuffer = false; let renewId = 1; let outstandingReads = 0; Object.assign(LMDBStore.prototype, { getString(id, options) { let txn = env.writeTxn || (options && options.transaction) || (readTxnRenewed ? readTxn : renewReadTxn(this)); let string = getStringByBinary( this.dbAddress, this.writeKey(id, keyBytes, 0), txn.address || 0, ); if (typeof string === 'number') { // indicates the buffer wasn't large enough this._allocateGetBuffer(string); // and then try again string = getStringByBinary( this.dbAddress, this.writeKey(id, keyBytes, 0), txn.address || 0, ); } if (string) this.lastSize = string.length; return string; }, getBinaryFast(id, options) { let rc; let txn = env.writeTxn || (options && options.transaction) || (readTxnRenewed ? readTxn : renewReadTxn(this)); rc = this.lastSize = getByBinary( this.dbAddress, this.writeKey(id, keyBytes, 0), (options && options.ifNotTxnId) || 0, txn.address || 0, ); if (rc < 0) { if (rc == -30798) // MDB_NOTFOUND return; // undefined if (rc == -30004) // txn id matched return UNMODIFIED; if ( rc == -30781 /*MDB_BAD_VALSIZE*/ && this.writeKey(id, keyBytes, 0) == 0 ) throw new Error( id === undefined ? 'A key is required for get, but is undefined' : 'Zero length key is not allowed in LMDB', ); if (rc == -30000) // int32 overflow, read uint32 rc = this.lastSize = keyBytesView.getUint32(0, true); else if (rc == -30001) { // shared buffer this.lastSize = keyBytesView.getUint32(0, true); let bufferId = keyBytesView.getUint32(4, true); let bytes = getMMapBuffer(bufferId, this.lastSize); return asSafeBuffer ? Buffer.from(bytes) : bytes; } else throw lmdbError(rc); } let compression = this.compression; let bytes = compression ? compression.getValueBytes : getValueBytes; if (rc > bytes.maxLength) { // this means the target buffer wasn't big enough, so the get failed to copy all the data from the database, need to either grow or use special buffer return this._returnLargeBuffer(() => getByBinary( this.dbAddress, this.writeKey(id, keyBytes, 0), 0, txn.address || 0, ), ); } bytes.length = this.lastSize; return bytes; }, getBFAsync(id, options, callback) { let txn = env.writeTxn || (options && options.transaction) || (readTxnRenewed ? readTxn : renewReadTxn(this)); txn.refCount = (txn.refCount || 0) + 1; outstandingReads++; if (!txn.address) { throw new Error('Invalid transaction, it has no address'); } let address = recordReadInstruction( txn.address, this.db.dbi, id, this.writeKey, maxKeySize, (rc, bufferId, offset, size) => { if (rc && rc !== 1) callback(lmdbError(rc)); outstandingReads--; let buffer = mmaps[bufferId]; if (!buffer) { buffer = mmaps[bufferId] = getSharedBuffer(bufferId, env.address); } //console.log({bufferId, offset, size}) if (buffer.isSharedMap) { // using LMDB shared memory // TODO: We may want explicit support for clearing aborting the transaction on the next event turn, // but for now we are relying on the GC to cleanup transaction for larger blocks of memory let bytes = new Uint8Array(buffer, offset, size); bytes.txn = txn; callback(bytes, 0, size); } else { // using copied memory txn.done(); // decrement and possibly abort callback(buffer, offset, size); } }, ); if (address) { startRead(address, () => { resolveReads(); }); } }, getAsync(id, options, callback) { let promise; if (!callback) promise = new Promise((resolve) => (callback = resolve)); this.getBFAsync(id, options, (buffer, offset, size) => { if (this.useVersions) { // TODO: And get the version offset += 8; size -= 8; } let bytes = new Uint8Array(buffer, offset, size); let value; if (this.decoder) { // the decoder potentially uses the data from the buffer in the future and needs a stable buffer value = bytes && this.decoder.decode(bytes); } else if (this.encoding == 'binary') { value = bytes; } else { value = Buffer.prototype.utf8Slice.call(bytes, 0, size); if (this.encoding == 'json' && value) value = JSON.parse(value); } callback(value); }); return promise; }, retain(data, options) { if (!data) return; let source = data[SOURCE_SYMBOL]; let buffer = source ? source.bytes : data; if (!buffer.isGlobal && !env.writeTxn) { let txn = options?.transaction || (readTxnRenewed ? readTxn : renewReadTxn(this)); buffer.txn = txn; txn.refCount = (txn.refCount || 0) + 1; return data; } else { buffer = Uint8ArraySlice.call(buffer, 0, this.lastSize); if (source) { source.bytes = buffer; return data; } else return buffer; } }, _returnLargeBuffer(getFast) { let bytes; let compression = this.compression; if (asSafeBuffer && this.lastSize > NEW_BUFFER_THRESHOLD) { // used by getBinary to indicate it should create a dedicated buffer to receive this let bytesToRestore; try { if (compression) { bytesToRestore = compression.getValueBytes; let dictionary = compression.dictionary || []; let dictLength = (dictionary.length >> 3) << 3; // make sure it is word-aligned bytes = makeReusableBuffer(this.lastSize); compression.setBuffer( bytes.buffer, bytes.byteOffset, this.lastSize, dictionary, dictLength, ); compression.getValueBytes = bytes; } else { bytesToRestore = getValueBytes; setGlobalBuffer( (bytes = getValueBytes = makeReusableBuffer(this.lastSize)), ); } getFast(); } finally { if (compression) { let dictLength = (compression.dictionary.length >> 3) << 3; compression.setBuffer( bytesToRestore.buffer, bytesToRestore.byteOffset, bytesToRestore.maxLength, compression.dictionary, dictLength, ); compression.getValueBytes = bytesToRestore; } else { setGlobalBuffer(bytesToRestore); getValueBytes = bytesToRestore; } } return bytes; } // grow our shared/static buffer to accomodate the size of the data bytes = this._allocateGetBuffer(this.lastSize); // and try again getFast(); bytes.length = this.lastSize; return bytes; }, _allocateGetBuffer(lastSize) { let newLength = Math.min(Math.max(lastSize * 2, 0x1000), 0xfffffff8); let bytes; if (this.compression) { let dictionary = this.compression.dictionary || Buffer.allocUnsafeSlow(0); let dictLength = (dictionary.length >> 3) << 3; // make sure it is word-aligned bytes = Buffer.allocUnsafeSlow(newLength + dictLength); bytes.set(dictionary); // copy dictionary into start // the section after the dictionary is the target area for get values bytes = bytes.subarray(dictLength); this.compression.setBuffer( bytes.buffer, bytes.byteOffset, newLength, dictionary, dictLength, ); bytes.maxLength = newLength; Object.defineProperty(bytes, 'length', { value: newLength, writable: true, configurable: true, }); this.compression.getValueBytes = bytes; } else { bytes = makeReusableBuffer(newLength); setGlobalBuffer((getValueBytes = bytes)); } bytes.isGlobal = true; return bytes; }, getBinary(id, options) { try { asSafeBuffer = true; let fastBuffer = this.getBinaryFast(id, options); return ( fastBuffer && (fastBuffer.isGlobal ? Uint8ArraySlice.call(fastBuffer, 0, this.lastSize) : fastBuffer) ); } finally { asSafeBuffer = false; } }, getSharedBinary(id, options) { let fastBuffer = this.getBinaryFast(id, options); if (fastBuffer) { if (fastBuffer.isGlobal || writeTxn) return Uint8ArraySlice.call(fastBuffer, 0, this.lastSize); fastBuffer.txn = options && options.transaction; options.transaction.refCount = (options.transaction.refCount || 0) + 1; return fastBuffer; } }, get(id, options) { if (this.decoderCopies) { // the decoder copies any data, so we can use the fast binary retrieval that overwrites the same buffer space let bytes = this.getBinaryFast(id, options); return ( bytes && (bytes == UNMODIFIED ? UNMODIFIED : this.decoder.decode(bytes, options)) ); } if (this.encoding == 'binary') return this.getBinary(id, options); if (this.decoder) { // the decoder potentially uses the data from the buffer in the future and needs a stable buffer let bytes = this.getBinary(id, options); return ( bytes && (bytes == UNMODIFIED ? UNMODIFIED : this.decoder.decode(bytes)) ); } let result = this.getString(id, options); if (result) { if (this.encoding == 'json') return JSON.parse(result); } return result; }, getEntry(id, options) { let value = this.get(id, options); if (value !== undefined) { if (this.useVersions) return { value, version: getLastVersion(), //size: this.lastSize }; else return { value, //size: this.lastSize }; } }, directWrite(id, options) { let rc; let txn = env.writeTxn || (options && options.transaction) || (readTxnRenewed ? readTxn : renewReadTxn(this)); let keySize = this.writeKey(id, keyBytes, 0); let dataOffset = ((keySize >> 3) + 1) << 3; keyBytes.set(options.bytes, dataOffset); rc = directWrite( this.dbAddress, keySize, options.offset, options.bytes.length, txn.address || 0, ); if (rc < 0) lmdbError(rc); }, getUserSharedBuffer(id, defaultBuffer, options) { let keySize; const setKeyBytes = () => { if (options?.envKey) keySize = this.writeKey(id, keyBytes, 0); else { keyBytes.dataView.setUint32(0, this.db.dbi); keySize = this.writeKey(id, keyBytes, 4); } }; setKeyBytes(); let sharedBuffer = getUserSharedBuffer( env.address, keySize, defaultBuffer, options?.callback, ); sharedBuffer.notify = () => { setKeyBytes(); return notifyUserCallbacks(env.address, keySize); }; return sharedBuffer; }, attemptLock(id, version, callback) { if (!env.address) throw new Error('Can not operate on a closed database'); keyBytes.dataView.setUint32(0, this.db.dbi); keyBytes.dataView.setFloat64(4, version); let keySize = this.writeKey(id, keyBytes, 12); return attemptLock(env.address, keySize, callback); }, unlock(id, version, onlyCheck) { if (!env.address) throw new Error('Can not operate on a closed database'); keyBytes.dataView.setUint32(0, this.db.dbi); keyBytes.dataView.setFloat64(4, version); let keySize = this.writeKey(id, keyBytes, 12); return unlock(env.address, keySize, onlyCheck); }, hasLock(id, version) { return this.unlock(id, version, true); }, resetReadTxn() { resetReadTxn(); }, _commitReadTxn() { if (readTxn) { readTxn.isCommitted = true; readTxn.commit(); } lastReadTxnRef = null; readTxnRenewed = null; readTxn = null; }, ensureReadTxn() { if (!env.writeTxn && !readTxnRenewed) renewReadTxn(this); }, doesExist(key, versionOrValue, options) { if (versionOrValue == null) { // undefined means the entry exists, null is used specifically to check for the entry *not* existing return ( (this.getBinaryFast(key, options) === undefined) == (versionOrValue === null) ); } else if (this.useVersions) { return ( this.getBinaryFast(key, options) !== undefined && (versionOrValue === IF_EXISTS || getLastVersion() === versionOrValue) ); } else { if (versionOrValue && versionOrValue['\x10binary-data\x02']) versionOrValue = versionOrValue['\x10binary-data\x02']; else if (this.encoder) versionOrValue = this.encoder.encode(versionOrValue); if (typeof versionOrValue == 'string') versionOrValue = Buffer.from(versionOrValue); let defaultOptions = { start: versionOrValue, exactMatch: true }; return ( this.getValuesCount( key, options ? Object.assign(defaultOptions, options) : defaultOptions, ) > 0 ); } }, getValues(key, options) { let defaultOptions = { key, valuesForKey: true, }; if (options && options.snapshot === false) throw new Error('Can not disable snapshots for getValues'); return this.getRange( options ? Object.assign(defaultOptions, options) : defaultOptions, ); }, getKeys(options) { if (!options) options = {}; options.values = false; return this.getRange(options); }, getCount(options) { if (!options) options = {}; options.onlyCount = true; return this.getRange(options).iterate(); }, getKeysCount(options) { if (!options) options = {}; options.onlyCount = true; options.values = false; return this.getRange(options).iterate(); }, getValuesCount(key, options) { if (!options) options = {}; options.key = key; options.valuesForKey = true; options.onlyCount = true; return this.getRange(options).iterate(); }, getRange(options) { let iterable = new RangeIterable(); let textDecoder = new TextDecoder(); if (!options) options = {}; let includeValues = options.values !== false; let includeVersions = options.versions; let valuesForKey = options.valuesForKey; let limit = options.limit; let db = this.db; let snapshot = options.snapshot; if (snapshot === false && this.dupSort && includeValues) throw new Error( 'Can not disable snapshot on a' + ' dupSort data store', ); let compression = this.compression; iterable.iterate = () => { const reverse = options.reverse; let currentKey = valuesForKey ? options.key : reverse || 'start' in options ? options.start : DEFAULT_BEGINNING_KEY; let count = 0; let cursor, cursorRenewId, cursorAddress; let txn; let flags = (includeValues ? 0x100 : 0) | (reverse ? 0x400 : 0) | (valuesForKey ? 0x800 : 0) | (options.exactMatch ? 0x4000 : 0) | (options.inclusiveEnd ? 0x8000 : 0) | (options.exclusiveStart ? 0x10000 : 0); let store = this; function resetCursor() { try { if (cursor) finishCursor(); let txnAddress; txn = options.transaction; if (txn) { if (txn.isDone) throw new Error( 'Can not iterate on range with transaction that is already' + ' done', ); txnAddress = txn.address; if (!txnAddress) { throw new Error('Invalid transaction, it has no address'); } cursor = null; } else { let writeTxn = env.writeTxn; if (writeTxn) snapshot = false; txn = env.writeTxn || options.transaction || (readTxnRenewed ? readTxn : renewReadTxn(store)); cursor = !writeTxn && db.availableCursor; } if (cursor) { db.availableCursor = null; flags |= 0x2000; } else { cursor = new Cursor(db, txnAddress || 0); } cursorAddress = cursor.address; if (txn.use) txn.use(); // track transaction so we always use the same one else txn.refCount = (txn.refCount || 0) + 1; if (snapshot === false) { cursorRenewId = renewId; // use shared read transaction txn.renewingRefCount = (txn.renewingRefCount || 0) + 1; // need to know how many are renewing cursors } } catch (error) { if (cursor) { try { cursor.close(); } catch (error) {} } throw error; } } resetCursor(); if (options.onlyCount) { flags |= 0x1000; let count = position(options.offset); if (count < 0) lmdbError(count); finishCursor(); return count; } function position(offset) { if (!env.address) { throw new Error('Can not iterate on a closed database'); } let keySize = currentKey === undefined ? 0 : store.writeKey(currentKey, keyBytes, 0); let endAddress; if (valuesForKey) { if (options.start === undefined && options.end === undefined) endAddress = 0; else { let startAddress; if (store.encoder.writeKey) { startAddress = saveKey( options.start, store.encoder.writeKey, iterable, maxKeySize, ); keyBytesView.setFloat64( START_ADDRESS_POSITION, startAddress, true, ); endAddress = saveKey( options.end, store.encoder.writeKey, iterable, maxKeySize, ); } else if ( (!options.start || options.start instanceof Uint8Array) && (!options.end || options.end instanceof Uint8Array) ) { startAddress = saveKey( options.start, orderedBinary.writeKey, iterable, maxKeySize, ); keyBytesView.setFloat64( START_ADDRESS_POSITION, startAddress, true, ); endAddress = saveKey( options.end, orderedBinary.writeKey, iterable, maxKeySize, ); } else { throw new Error( 'Only key-based encoding is supported for start/end values', ); let encoded = store.encoder.encode(options.start); let bufferAddress = encoded.buffer.address || (encoded.buffer.address = getAddress(encoded.buffer) - encoded.byteOffset); startAddress = bufferAddress + encoded.byteOffset; } } } else endAddress = saveKey( reverse && !('end' in options) ? DEFAULT_BEGINNING_KEY : options.end, store.writeKey, iterable, maxKeySize, ); return doPosition( cursorAddress, flags, offset || 0, keySize, endAddress, ); } function finishCursor() { if (!cursor || txn.isDone) return; if (iterable.onDone) iterable.onDone(); if (cursorRenewId) txn.renewingRefCount--; if (txn.refCount <= 1 && txn.notCurrent) { cursor.close(); // this must be closed before the transaction is aborted or it can cause a // segmentation fault } if (txn.done) txn.done(); else if (--txn.refCount <= 0 && txn.notCurrent) { txn.abort(); txn.isDone = true; } if (!txn.isDone) { if (db.availableCursor || txn != readTxn) { cursor.close(); } else { // try to reuse it db.availableCursor = cursor; db.cursorTxn = txn; } } cursor = null; } return { next() { let keySize, lastSize; if (cursorRenewId && (cursorRenewId != renewId || txn.isDone)) { if (flags & 0x10000) flags = flags & ~0x10000; // turn off exclusive start when repositioning resetCursor(); keySize = position(0); } if (!cursor) { return ITERATOR_DONE; } if (count === 0) { // && includeValues) // on first entry, get current value if we need to keySize = position(options.offset); } else keySize = iterate(cursorAddress); if (keySize <= 0 || count++ >= limit) { if (keySize < -30700 && keySize !== -30798) lmdbError(keySize); finishCursor(); return ITERATOR_DONE; } if (!valuesForKey || snapshot === false) { if (keySize > 20000) { if (keySize > 0x1000000) lmdbError(keySize - 0x100000000); throw new Error('Invalid key size ' + keySize.toString(16)); } currentKey = store.readKey(keyBytes, 32, keySize + 32); } if (includeValues) { let value; lastSize = keyBytesView.getUint32(0, true); let bufferId = keyBytesView.getUint32(4, true); let bytes; if (bufferId) { bytes = getMMapBuffer(bufferId, lastSize); if (store.encoding === 'binary') bytes = Buffer.from(bytes); } else { bytes = compression ? compression.getValueBytes : getValueBytes; if (lastSize > bytes.maxLength) { store.lastSize = lastSize; asSafeBuffer = store.encoding === 'binary'; try { bytes = store._returnLargeBuffer(() => getCurrentValue(cursorAddress), ); } finally { asSafeBuffer = false; } } else bytes.length = lastSize; } if (store.decoder) { value = store.decoder.decode(bytes, lastSize); } else if (store.encoding == 'binary') value = bytes.isGlobal ? Uint8ArraySlice.call(bytes, 0, lastSize) : bytes; else { // use the faster utf8Slice if available, otherwise fall back to TextDecoder (a little slower) // note applying Buffer's utf8Slice to a Uint8Array works in Node, but not in Bun. value = bytes.utf8Slice ? bytes.utf8Slice(0, lastSize) : textDecoder.decode( Uint8ArraySlice.call(bytes, 0, lastSize), ); if (store.encoding == 'json' && value) value = JSON.parse(value); } if (includeVersions) return { value: { key: currentKey, value, version: getLastVersion(), }, }; else if (valuesForKey) return { value, }; else return { value: { key: currentKey, value, }, }; } else if (includeVersions) { return { value: { key: currentKey, version: getLastVersion(), }, }; } else { return { value: currentKey, }; } }, return() { finishCursor(); return ITERATOR_DONE; }, throw() { finishCursor(); return ITERATOR_DONE; }, }; }; return iterable; }, getMany(keys, callback) { // this is an asynchronous get for multiple keys. It actually works by prefetching asynchronously, // allowing a separate thread/task to absorb the potentially largest cost: hard page faults (and disk I/O). // And then we just do standard sync gets (to deserialized data) to fulfil the callback/promise // once the prefetch occurs let promise = callback ? undefined : new Promise( (resolve) => (callback = (error, results) => resolve(results)), ); this.prefetch(keys, () => { let results = new Array(keys.length); for (let i = 0, l = keys.length; i < l; i++) { results[i] = get.call(this, keys[i]); } callback(null, results); }); return promise; }, getSharedBufferForGet(id, options) { let txn = env.writeTxn || (options && options.transaction) || (readTxnRenewed ? readTxn : renewReadTxn(this)); this.lastSize = this.keyIsCompatibility ? txn.getBinaryShared(id) : this.db.get(this.writeKey(id, keyBytes, 0)); if (this.lastSize === -30798) { // not found code return; //undefined } return this.lastSize; this.lastSize = keyBytesView.getUint32(0, true); let bufferIndex = keyBytesView.getUint32(12, true); lastOffset = keyBytesView.getUint32(8, true); let buffer = buffers[bufferIndex]; let startOffset; if ( !buffer || lastOffset < (startOffset = buffer.startOffset) || lastOffset + this.lastSize > startOffset + 0x100000000 ) { if (buffer) env.detachBuffer(buffer.buffer); startOffset = (lastOffset >>> 16) * 0x10000; console.log( 'make buffer for address', bufferIndex * 0x100000000 + startOffset, ); buffer = buffers[bufferIndex] = Buffer.from( getBufferForAddress(bufferIndex * 0x100000000 + startOffset), ); buffer.startOffset = startOffset; } lastOffset -= startOffset; return buffer; return buffer.slice( lastOffset, lastOffset + this.lastSize, ); /*Uint8ArraySlice.call(buffer, lastOffset, lastOffset + this.lastSize)*/ }, prefetch(keys, callback) { if (!keys) throw new Error('An array of keys must be provided'); if (!keys.length) { if (callback) { callback(null); return; } else return Promise.resolve(); } let buffers = []; let startPosition; let bufferHolder = {}; let lastBuffer; for (let key of keys) { let position; if (key && key.key !== undefined && key.value !== undefined) { position = saveKey( key.value, this.writeKey, bufferHolder, maxKeySize, 0x80000000, ); saveReferenceToBuffer(); saveKey(key.key, this.writeKey, bufferHolder, maxKeySize); } else { position = saveKey(key, this.writeKey, bufferHolder, maxKeySize); } if (!startPosition) startPosition = position; saveReferenceToBuffer(); } function saveReferenceToBuffer() { if (bufferHolder.saveBuffer != lastBuffer) { buffers.push(bufferHolder.saveBuffer); lastBuffer = bufferHolder.saveBuffer; } } saveKey(undefined, this.writeKey, bufferHolder, maxKeySize); saveReferenceToBuffer(); outstandingReads++; prefetch(this.dbAddress, startPosition, (error) => { outstandingReads--; if (error) console.error('Error with prefetch', buffers); // partly exists to keep the buffers pinned in memory else callback(null); }); if (!callback) return new Promise((resolve) => (callback = resolve)); }, useReadTransaction() { let txn = readTxnRenewed ? readTxn : renewReadTxn(this); if (!txn.use) { throw new Error('Can not use read transaction from a closed database'); } // because the renew actually happens lazily in read operations, renew needs to be explicit // here in order to actually secure a real read transaction. Try to only do it if necessary; // once it has a refCount, it should be good to go if (!(readTxn.refCount - (readTxn.renewingRefCount || 0) > 0)) txn.renew(); txn.use(); return txn; }, close(callback) { this.status = 'closing'; let txnPromise; if (this.isRoot) { // if it is root, we need to abort and/or wait for transactions to finish if (readTxn) { try { readTxn.abort(); } catch (error) {} } else readTxn = {}; readTxn.isDone = true; Object.defineProperty(readTxn, 'renew', { value: () => { throw new Error('Can not read from a closed database'); }, configurable: true, }); Object.defineProperty(readTxn, 'use', { value: () => { throw new Error('Can not read from a closed database'); }, configurable: true, }); readTxnRenewed = null; txnPromise = this._endWrites && this._endWrites(); } const doClose = () => { if (this.isRoot) { if (outstandingReads > 0) { return new Promise((resolve) => setTimeout(() => resolve(doClose()), 1), ); } env.address = 0; try { env.close(); } catch (error) {} } else this.db.close(); this.status = 'closed'; if (callback) callback(); }; if (txnPromise) return txnPromise.then(doClose); else { doClose(); return Promise.resolve(); } }, getStats() { let txn = env.writeTxn || (readTxnRenewed ? readTxn : renewReadTxn(this)); let dbStats = this.db.stat(); dbStats.root = env.stat(); Object.assign(dbStats, env.info()); dbStats.free = env.freeStat(); return dbStats; }, }); let get = LMDBStore.prototype.get; let lastReadTxnRef; function getMMapBuffer(bufferId, size) { let buffer = mmaps[bufferId]; if (!buffer) { buffer = mmaps[bufferId] = getSharedBuffer(bufferId, env.address); } let offset = keyBytesView.getUint32(8, true); return new Uint8Array(buffer, offset, size); } function renewReadTxn(store) { if (!env.address) { throw new Error('Can not renew a transaction from a closed database'); } if (!readTxn) { let retries = 0; let waitArray; do { try { let lastReadTxn = lastReadTxnRef && lastReadTxnRef.deref(); readTxn = new Txn( env, 0x20000, lastReadTxn && !lastReadTxn.isDone && lastReadTxn, ); if (readTxn.address == 0) { readTxn = lastReadTxn; if (readTxn.notCurrent) readTxn.notCurrent = false; } break; } catch (error) { if (error.message.includes('temporarily')) { if (!waitArray) waitArray = new Int32Array(new SharedArrayBuffer(4), 0, 1); Atomics.wait(waitArray, 0, 0, retries * 2); } else throw error; } } while (retries++ < 100); } // we actually don't renew here, we let the renew take place in the next // lmdb native read/call so as to avoid an extra native call readTxnRenewed = setTimeout(resetReadTxn, 0); store.emit('begin-transaction'); return readTxn; } function resetReadTxn() { renewId++; if (readTxnRenewed) { readTxnRenewed = null; if (readTxn.refCount - (readTxn.renewingRefCount || 0) > 0) { readTxn.notCurrent = true; lastReadTxnRef = new WeakRef(readTxn); readTxn = null; } else if (readTxn.address && !readTxn.isDone) { resetTxn(readTxn.address); } else { console.warn('Attempt to reset an invalid read txn', readTxn); throw new Error('Attempt to reset an invalid read txn'); } } } } export function makeReusableBuffer(size) { let bytes = typeof Buffer != 'undefined' ? Buffer.alloc(size) : new Uint8Array(size); bytes.maxLength = size; Object.defineProperty(bytes, 'length', { value: size, writable: true, configurable: true, }); return bytes; } Txn.prototype.done = function () { this.refCount--; if (this.refCount === 0 && this.notCurrent) { this.abort(); this.isDone = true; } else if (this.refCount < 0) throw new Error('Can not finish a transaction more times than it was used'); }; Txn.prototype.use = function () { this.refCount = (this.refCount || 0) + 1; }; let readInstructions, readCallbacks = new Map(), uint32Instructions, instructionsDataView = { setFloat64() {}, setUint32() {} }, instructionsAddress; let savePosition = 8000; let DYNAMIC_KEY_BUFFER_SIZE = 8192; function allocateInstructionsBuffer() { readInstructions = typeof Buffer != 'undefined' ? Buffer.alloc(DYNAMIC_KEY_BUFFER_SIZE) : new Uint8Array(DYNAMIC_KEY_BUFFER_SIZE); uint32Instructions = new Int32Array( readInstructions.buffer, 0, readInstructions.buffer.byteLength >> 2, ); uint32Instructions[2] = 0xf0000000; // indicates a new read task must be started instructionsAddress = readInstructions.buffer.address = getAddress( readInstructions.buffer, ); readInstructions.dataView = instructionsDataView = new DataView( readInstructions.buffer, readInstructions.byteOffset, readInstructions.byteLength, ); savePosition = 0; } export function recordReadInstruction( txnAddress, dbi, key, writeKey, maxKeySize, callback, ) { if (savePosition > 7800) { allocateInstructionsBuffer(); } let start = savePosition; let keyPosition = savePosition + 16; try { savePosition = key === undefined ? keyPosition : writeKey(key, readInstructions, keyPosition); } catch (error) { if (error.name == 'RangeError') { if (8180 - start < maxKeySize) { allocateInstructionsBuffer(); // try again: return recordReadInstruction( txnAddress, dbi, key, writeKey, maxKeySize, callback, ); } throw new Error('Key was too large, max key size is ' + maxKeySize); } else throw error; } let length = savePosition - keyPosition; if (length > maxKeySize) { savePosition = start; throw new Error( 'Key of size ' + length + ' was too large, max key size is ' + maxKeySize, ); } uint32Instructions[(start >> 2) + 3] = length; // save the length uint32Instructions[(start >> 2) + 2] = dbi; savePosition = (savePosition + 12) & 0xfffffc; instructionsDataView.setFloat64(start, txnAddress, true); let callbackId = addReadCallback(() => { let position = start >> 2; let rc = thisInstructions[position]; callback( rc, thisInstructions[position + 1], thisInstructions[position + 2], thisInstructions[position + 3], ); }); let thisInstructions = uint32Instructions; //if (start === 0) return startRead(instructionsAddress + start, callbackId, {}, 'read'); //else //nextRead(start); } let nextCallbackId = 0; let addReadCallback = globalThis.__lmdb_read_callback; if (!addReadCallback) { addReadCallback = globalThis.__lmdb_read_callback = function (callback) { let callbackId = nextCallbackId++; readCallbacks.set(callbackId, callback); return callbackId; }; setReadCallback(function (callbackId) { readCallbacks.get(callbackId)(); readCallbacks.delete(callbackId); }); }