read.js 34 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184
  1. import { RangeIterable } from './util/RangeIterable.js';
  2. import {
  3. getAddress,
  4. Cursor,
  5. Txn,
  6. orderedBinary,
  7. lmdbError,
  8. getByBinary,
  9. setGlobalBuffer,
  10. prefetch,
  11. iterate,
  12. position as doPosition,
  13. resetTxn,
  14. getCurrentValue,
  15. getCurrentShared,
  16. getStringByBinary,
  17. globalBuffer,
  18. getSharedBuffer,
  19. startRead,
  20. setReadCallback,
  21. directWrite,
  22. getUserSharedBuffer,
  23. notifyUserCallbacks,
  24. attemptLock,
  25. unlock,
  26. } from './native.js';
  27. import { saveKey } from './keys.js';
  // Sentinel "version" that doesExist() treats as "any version, as long as the entry exists".
  const IF_EXISTS = 3.542694326329068e-103;
  // the default starting key for iteration, which excludes symbols/metadata
  const DEFAULT_BEGINNING_KEY = Buffer.from([5]);
  // Shared terminal result handed back by range iterators.
  const ITERATOR_DONE = { done: true, value: undefined };
  const Uint8ArraySlice = Uint8Array.prototype.slice;
  // Byte offset in the key buffer where position() stores the start-key address.
  const START_ADDRESS_POSITION = 4064;
  // Values larger than this get a dedicated buffer when a safe (non-shared) buffer is requested.
  const NEW_BUFFER_THRESHOLD = 0x8000;
  const SOURCE_SYMBOL = Symbol.for('source');
  // Returned by getBinaryFast() when the native layer reports that the txn id matched.
  export const UNMODIFIED = {};
  // Cache of shared buffers, indexed by buffer id.
  let mmaps = [];
  // The reusable buffer that native gets copy values into. Its `length` is
  // redefined as a writable own property so it can be narrowed to the size of
  // the last value, while `maxLength` remembers the real capacity.
  let getValueBytes = globalBuffer;
  if (!getValueBytes.maxLength) {
    const capacity = getValueBytes.length;
    getValueBytes.maxLength = capacity;
    getValueBytes.isGlobal = true;
    Object.defineProperty(getValueBytes, 'length', {
      value: capacity,
      writable: true,
      configurable: true,
    });
  }
  47. export function addReadMethods(
  48. LMDBStore,
  49. { maxKeySize, env, keyBytes, keyBytesView, getLastVersion, getLastTxnId },
  50. ) {
  51. let readTxn,
  52. readTxnRenewed,
  53. asSafeBuffer = false;
  54. let renewId = 1;
  55. let outstandingReads = 0;
  56. Object.assign(LMDBStore.prototype, {
  57. getString(id, options) {
  58. let txn =
  59. env.writeTxn ||
  60. (options && options.transaction) ||
  61. (readTxnRenewed ? readTxn : renewReadTxn(this));
  62. let string = getStringByBinary(
  63. this.dbAddress,
  64. this.writeKey(id, keyBytes, 0),
  65. txn.address || 0,
  66. );
  67. if (typeof string === 'number') {
  68. // indicates the buffer wasn't large enough
  69. this._allocateGetBuffer(string);
  70. // and then try again
  71. string = getStringByBinary(
  72. this.dbAddress,
  73. this.writeKey(id, keyBytes, 0),
  74. txn.address || 0,
  75. );
  76. }
  77. if (string) this.lastSize = string.length;
  78. return string;
  79. },
  80. getBinaryFast(id, options) {
  81. let rc;
  82. let txn =
  83. env.writeTxn ||
  84. (options && options.transaction) ||
  85. (readTxnRenewed ? readTxn : renewReadTxn(this));
  86. rc = this.lastSize = getByBinary(
  87. this.dbAddress,
  88. this.writeKey(id, keyBytes, 0),
  89. (options && options.ifNotTxnId) || 0,
  90. txn.address || 0,
  91. );
  92. if (rc < 0) {
  93. if (rc == -30798)
  94. // MDB_NOTFOUND
  95. return; // undefined
  96. if (rc == -30004)
  97. // txn id matched
  98. return UNMODIFIED;
  99. if (
  100. rc == -30781 /*MDB_BAD_VALSIZE*/ &&
  101. this.writeKey(id, keyBytes, 0) == 0
  102. )
  103. throw new Error(
  104. id === undefined
  105. ? 'A key is required for get, but is undefined'
  106. : 'Zero length key is not allowed in LMDB',
  107. );
  108. if (rc == -30000)
  109. // int32 overflow, read uint32
  110. rc = this.lastSize = keyBytesView.getUint32(0, true);
  111. else if (rc == -30001) {
  112. // shared buffer
  113. this.lastSize = keyBytesView.getUint32(0, true);
  114. let bufferId = keyBytesView.getUint32(4, true);
  115. let bytes = getMMapBuffer(bufferId, this.lastSize);
  116. return asSafeBuffer ? Buffer.from(bytes) : bytes;
  117. } else throw lmdbError(rc);
  118. }
  119. let compression = this.compression;
  120. let bytes = compression ? compression.getValueBytes : getValueBytes;
  121. if (rc > bytes.maxLength) {
  122. // this means the target buffer wasn't big enough, so the get failed to copy all the data from the database, need to either grow or use special buffer
  123. return this._returnLargeBuffer(() =>
  124. getByBinary(
  125. this.dbAddress,
  126. this.writeKey(id, keyBytes, 0),
  127. 0,
  128. txn.address || 0,
  129. ),
  130. );
  131. }
  132. bytes.length = this.lastSize;
  133. return bytes;
  134. },
  135. getBFAsync(id, options, callback) {
  136. let txn =
  137. env.writeTxn ||
  138. (options && options.transaction) ||
  139. (readTxnRenewed ? readTxn : renewReadTxn(this));
  140. txn.refCount = (txn.refCount || 0) + 1;
  141. outstandingReads++;
  142. if (!txn.address) {
  143. throw new Error('Invalid transaction, it has no address');
  144. }
  145. let address = recordReadInstruction(
  146. txn.address,
  147. this.db.dbi,
  148. id,
  149. this.writeKey,
  150. maxKeySize,
  151. (rc, bufferId, offset, size) => {
  152. if (rc && rc !== 1) callback(lmdbError(rc));
  153. outstandingReads--;
  154. let buffer = mmaps[bufferId];
  155. if (!buffer) {
  156. buffer = mmaps[bufferId] = getSharedBuffer(bufferId, env.address);
  157. }
  158. //console.log({bufferId, offset, size})
  159. if (buffer.isSharedMap) {
  160. // using LMDB shared memory
  161. // TODO: We may want explicit support for clearing aborting the transaction on the next event turn,
  162. // but for now we are relying on the GC to cleanup transaction for larger blocks of memory
  163. let bytes = new Uint8Array(buffer, offset, size);
  164. bytes.txn = txn;
  165. callback(bytes, 0, size);
  166. } else {
  167. // using copied memory
  168. txn.done(); // decrement and possibly abort
  169. callback(buffer, offset, size);
  170. }
  171. },
  172. );
  173. if (address) {
  174. startRead(address, () => {
  175. resolveReads();
  176. });
  177. }
  178. },
  179. getAsync(id, options, callback) {
  180. let promise;
  181. if (!callback) promise = new Promise((resolve) => (callback = resolve));
  182. this.getBFAsync(id, options, (buffer, offset, size) => {
  183. if (this.useVersions) {
  184. // TODO: And get the version
  185. offset += 8;
  186. size -= 8;
  187. }
  188. let bytes = new Uint8Array(buffer, offset, size);
  189. let value;
  190. if (this.decoder) {
  191. // the decoder potentially uses the data from the buffer in the future and needs a stable buffer
  192. value = bytes && this.decoder.decode(bytes);
  193. } else if (this.encoding == 'binary') {
  194. value = bytes;
  195. } else {
  196. value = Buffer.prototype.utf8Slice.call(bytes, 0, size);
  197. if (this.encoding == 'json' && value) value = JSON.parse(value);
  198. }
  199. callback(value);
  200. });
  201. return promise;
  202. },
  203. retain(data, options) {
  204. if (!data) return;
  205. let source = data[SOURCE_SYMBOL];
  206. let buffer = source ? source.bytes : data;
  207. if (!buffer.isGlobal && !env.writeTxn) {
  208. let txn =
  209. options?.transaction ||
  210. (readTxnRenewed ? readTxn : renewReadTxn(this));
  211. buffer.txn = txn;
  212. txn.refCount = (txn.refCount || 0) + 1;
  213. return data;
  214. } else {
  215. buffer = Uint8ArraySlice.call(buffer, 0, this.lastSize);
  216. if (source) {
  217. source.bytes = buffer;
  218. return data;
  219. } else return buffer;
  220. }
  221. },
  222. _returnLargeBuffer(getFast) {
  223. let bytes;
  224. let compression = this.compression;
  225. if (asSafeBuffer && this.lastSize > NEW_BUFFER_THRESHOLD) {
  226. // used by getBinary to indicate it should create a dedicated buffer to receive this
  227. let bytesToRestore;
  228. try {
  229. if (compression) {
  230. bytesToRestore = compression.getValueBytes;
  231. let dictionary = compression.dictionary || [];
  232. let dictLength = (dictionary.length >> 3) << 3; // make sure it is word-aligned
  233. bytes = makeReusableBuffer(this.lastSize);
  234. compression.setBuffer(
  235. bytes.buffer,
  236. bytes.byteOffset,
  237. this.lastSize,
  238. dictionary,
  239. dictLength,
  240. );
  241. compression.getValueBytes = bytes;
  242. } else {
  243. bytesToRestore = getValueBytes;
  244. setGlobalBuffer(
  245. (bytes = getValueBytes = makeReusableBuffer(this.lastSize)),
  246. );
  247. }
  248. getFast();
  249. } finally {
  250. if (compression) {
  251. let dictLength = (compression.dictionary.length >> 3) << 3;
  252. compression.setBuffer(
  253. bytesToRestore.buffer,
  254. bytesToRestore.byteOffset,
  255. bytesToRestore.maxLength,
  256. compression.dictionary,
  257. dictLength,
  258. );
  259. compression.getValueBytes = bytesToRestore;
  260. } else {
  261. setGlobalBuffer(bytesToRestore);
  262. getValueBytes = bytesToRestore;
  263. }
  264. }
  265. return bytes;
  266. }
  267. // grow our shared/static buffer to accomodate the size of the data
  268. bytes = this._allocateGetBuffer(this.lastSize);
  269. // and try again
  270. getFast();
  271. bytes.length = this.lastSize;
  272. return bytes;
  273. },
  274. _allocateGetBuffer(lastSize) {
  275. let newLength = Math.min(Math.max(lastSize * 2, 0x1000), 0xfffffff8);
  276. let bytes;
  277. if (this.compression) {
  278. let dictionary =
  279. this.compression.dictionary || Buffer.allocUnsafeSlow(0);
  280. let dictLength = (dictionary.length >> 3) << 3; // make sure it is word-aligned
  281. bytes = Buffer.allocUnsafeSlow(newLength + dictLength);
  282. bytes.set(dictionary); // copy dictionary into start
  283. // the section after the dictionary is the target area for get values
  284. bytes = bytes.subarray(dictLength);
  285. this.compression.setBuffer(
  286. bytes.buffer,
  287. bytes.byteOffset,
  288. newLength,
  289. dictionary,
  290. dictLength,
  291. );
  292. bytes.maxLength = newLength;
  293. Object.defineProperty(bytes, 'length', {
  294. value: newLength,
  295. writable: true,
  296. configurable: true,
  297. });
  298. this.compression.getValueBytes = bytes;
  299. } else {
  300. bytes = makeReusableBuffer(newLength);
  301. setGlobalBuffer((getValueBytes = bytes));
  302. }
  303. bytes.isGlobal = true;
  304. return bytes;
  305. },
  306. getBinary(id, options) {
  307. try {
  308. asSafeBuffer = true;
  309. let fastBuffer = this.getBinaryFast(id, options);
  310. return (
  311. fastBuffer &&
  312. (fastBuffer.isGlobal
  313. ? Uint8ArraySlice.call(fastBuffer, 0, this.lastSize)
  314. : fastBuffer)
  315. );
  316. } finally {
  317. asSafeBuffer = false;
  318. }
  319. },
  320. getSharedBinary(id, options) {
  321. let fastBuffer = this.getBinaryFast(id, options);
  322. if (fastBuffer) {
  323. if (fastBuffer.isGlobal || writeTxn)
  324. return Uint8ArraySlice.call(fastBuffer, 0, this.lastSize);
  325. fastBuffer.txn = options && options.transaction;
  326. options.transaction.refCount = (options.transaction.refCount || 0) + 1;
  327. return fastBuffer;
  328. }
  329. },
  330. get(id, options) {
  331. if (this.decoderCopies) {
  332. // the decoder copies any data, so we can use the fast binary retrieval that overwrites the same buffer space
  333. let bytes = this.getBinaryFast(id, options);
  334. return (
  335. bytes &&
  336. (bytes == UNMODIFIED
  337. ? UNMODIFIED
  338. : this.decoder.decode(bytes, options))
  339. );
  340. }
  341. if (this.encoding == 'binary') return this.getBinary(id, options);
  342. if (this.decoder) {
  343. // the decoder potentially uses the data from the buffer in the future and needs a stable buffer
  344. let bytes = this.getBinary(id, options);
  345. return (
  346. bytes &&
  347. (bytes == UNMODIFIED ? UNMODIFIED : this.decoder.decode(bytes))
  348. );
  349. }
  350. let result = this.getString(id, options);
  351. if (result) {
  352. if (this.encoding == 'json') return JSON.parse(result);
  353. }
  354. return result;
  355. },
  356. getEntry(id, options) {
  357. let value = this.get(id, options);
  358. if (value !== undefined) {
  359. if (this.useVersions)
  360. return {
  361. value,
  362. version: getLastVersion(),
  363. //size: this.lastSize
  364. };
  365. else
  366. return {
  367. value,
  368. //size: this.lastSize
  369. };
  370. }
  371. },
  372. directWrite(id, options) {
  373. let rc;
  374. let txn =
  375. env.writeTxn ||
  376. (options && options.transaction) ||
  377. (readTxnRenewed ? readTxn : renewReadTxn(this));
  378. let keySize = this.writeKey(id, keyBytes, 0);
  379. let dataOffset = ((keySize >> 3) + 1) << 3;
  380. keyBytes.set(options.bytes, dataOffset);
  381. rc = directWrite(
  382. this.dbAddress,
  383. keySize,
  384. options.offset,
  385. options.bytes.length,
  386. txn.address || 0,
  387. );
  388. if (rc < 0) lmdbError(rc);
  389. },
  390. getUserSharedBuffer(id, defaultBuffer, options) {
  391. let keySize;
  392. const setKeyBytes = () => {
  393. if (options?.envKey) keySize = this.writeKey(id, keyBytes, 0);
  394. else {
  395. keyBytes.dataView.setUint32(0, this.db.dbi);
  396. keySize = this.writeKey(id, keyBytes, 4);
  397. }
  398. };
  399. setKeyBytes();
  400. let sharedBuffer = getUserSharedBuffer(
  401. env.address,
  402. keySize,
  403. defaultBuffer,
  404. options?.callback,
  405. );
  406. sharedBuffer.notify = () => {
  407. setKeyBytes();
  408. return notifyUserCallbacks(env.address, keySize);
  409. };
  410. return sharedBuffer;
  411. },
  412. attemptLock(id, version, callback) {
  413. if (!env.address) throw new Error('Can not operate on a closed database');
  414. keyBytes.dataView.setUint32(0, this.db.dbi);
  415. keyBytes.dataView.setFloat64(4, version);
  416. let keySize = this.writeKey(id, keyBytes, 12);
  417. return attemptLock(env.address, keySize, callback);
  418. },
  419. unlock(id, version, onlyCheck) {
  420. if (!env.address) throw new Error('Can not operate on a closed database');
  421. keyBytes.dataView.setUint32(0, this.db.dbi);
  422. keyBytes.dataView.setFloat64(4, version);
  423. let keySize = this.writeKey(id, keyBytes, 12);
  424. return unlock(env.address, keySize, onlyCheck);
  425. },
  426. hasLock(id, version) {
  427. return this.unlock(id, version, true);
  428. },
  429. resetReadTxn() {
  430. resetReadTxn();
  431. },
  432. _commitReadTxn() {
  433. if (readTxn) {
  434. readTxn.isCommitted = true;
  435. readTxn.commit();
  436. }
  437. lastReadTxnRef = null;
  438. readTxnRenewed = null;
  439. readTxn = null;
  440. },
  441. ensureReadTxn() {
  442. if (!env.writeTxn && !readTxnRenewed) renewReadTxn(this);
  443. },
  444. doesExist(key, versionOrValue, options) {
  445. if (versionOrValue == null) {
  446. // undefined means the entry exists, null is used specifically to check for the entry *not* existing
  447. return (
  448. (this.getBinaryFast(key, options) === undefined) ==
  449. (versionOrValue === null)
  450. );
  451. } else if (this.useVersions) {
  452. return (
  453. this.getBinaryFast(key, options) !== undefined &&
  454. (versionOrValue === IF_EXISTS || getLastVersion() === versionOrValue)
  455. );
  456. } else {
  457. if (versionOrValue && versionOrValue['\x10binary-data\x02'])
  458. versionOrValue = versionOrValue['\x10binary-data\x02'];
  459. else if (this.encoder)
  460. versionOrValue = this.encoder.encode(versionOrValue);
  461. if (typeof versionOrValue == 'string')
  462. versionOrValue = Buffer.from(versionOrValue);
  463. let defaultOptions = { start: versionOrValue, exactMatch: true };
  464. return (
  465. this.getValuesCount(
  466. key,
  467. options ? Object.assign(defaultOptions, options) : defaultOptions,
  468. ) > 0
  469. );
  470. }
  471. },
  472. getValues(key, options) {
  473. let defaultOptions = {
  474. key,
  475. valuesForKey: true,
  476. };
  477. if (options && options.snapshot === false)
  478. throw new Error('Can not disable snapshots for getValues');
  479. return this.getRange(
  480. options ? Object.assign(defaultOptions, options) : defaultOptions,
  481. );
  482. },
  483. getKeys(options) {
  484. if (!options) options = {};
  485. options.values = false;
  486. return this.getRange(options);
  487. },
  488. getCount(options) {
  489. if (!options) options = {};
  490. options.onlyCount = true;
  491. return this.getRange(options).iterate();
  492. },
  493. getKeysCount(options) {
  494. if (!options) options = {};
  495. options.onlyCount = true;
  496. options.values = false;
  497. return this.getRange(options).iterate();
  498. },
  499. getValuesCount(key, options) {
  500. if (!options) options = {};
  501. options.key = key;
  502. options.valuesForKey = true;
  503. options.onlyCount = true;
  504. return this.getRange(options).iterate();
  505. },
  506. getRange(options) {
  507. let iterable = new RangeIterable();
  508. let textDecoder = new TextDecoder();
  509. if (!options) options = {};
  510. let includeValues = options.values !== false;
  511. let includeVersions = options.versions;
  512. let valuesForKey = options.valuesForKey;
  513. let limit = options.limit;
  514. let db = this.db;
  515. let snapshot = options.snapshot;
  516. if (snapshot === false && this.dupSort && includeValues)
  517. throw new Error(
  518. 'Can not disable snapshot on a' + ' dupSort data store',
  519. );
  520. let compression = this.compression;
  521. iterable.iterate = () => {
  522. const reverse = options.reverse;
  523. let currentKey = valuesForKey
  524. ? options.key
  525. : reverse || 'start' in options
  526. ? options.start
  527. : DEFAULT_BEGINNING_KEY;
  528. let count = 0;
  529. let cursor, cursorRenewId, cursorAddress;
  530. let txn;
  531. let flags =
  532. (includeValues ? 0x100 : 0) |
  533. (reverse ? 0x400 : 0) |
  534. (valuesForKey ? 0x800 : 0) |
  535. (options.exactMatch ? 0x4000 : 0) |
  536. (options.inclusiveEnd ? 0x8000 : 0) |
  537. (options.exclusiveStart ? 0x10000 : 0);
  538. let store = this;
  539. function resetCursor() {
  540. try {
  541. if (cursor) finishCursor();
  542. let txnAddress;
  543. txn = options.transaction;
  544. if (txn) {
  545. if (txn.isDone)
  546. throw new Error(
  547. 'Can not iterate on range with transaction that is already' +
  548. ' done',
  549. );
  550. txnAddress = txn.address;
  551. if (!txnAddress) {
  552. throw new Error('Invalid transaction, it has no address');
  553. }
  554. cursor = null;
  555. } else {
  556. let writeTxn = env.writeTxn;
  557. if (writeTxn) snapshot = false;
  558. txn =
  559. env.writeTxn ||
  560. options.transaction ||
  561. (readTxnRenewed ? readTxn : renewReadTxn(store));
  562. cursor = !writeTxn && db.availableCursor;
  563. }
  564. if (cursor) {
  565. db.availableCursor = null;
  566. flags |= 0x2000;
  567. } else {
  568. cursor = new Cursor(db, txnAddress || 0);
  569. }
  570. cursorAddress = cursor.address;
  571. if (txn.use)
  572. txn.use(); // track transaction so we always use the same one
  573. else txn.refCount = (txn.refCount || 0) + 1;
  574. if (snapshot === false) {
  575. cursorRenewId = renewId; // use shared read transaction
  576. txn.renewingRefCount = (txn.renewingRefCount || 0) + 1; // need to know how many are renewing cursors
  577. }
  578. } catch (error) {
  579. if (cursor) {
  580. try {
  581. cursor.close();
  582. } catch (error) {}
  583. }
  584. throw error;
  585. }
  586. }
  587. resetCursor();
  588. if (options.onlyCount) {
  589. flags |= 0x1000;
  590. let count = position(options.offset);
  591. if (count < 0) lmdbError(count);
  592. finishCursor();
  593. return count;
  594. }
  595. function position(offset) {
  596. if (!env.address) {
  597. throw new Error('Can not iterate on a closed database');
  598. }
  599. let keySize =
  600. currentKey === undefined
  601. ? 0
  602. : store.writeKey(currentKey, keyBytes, 0);
  603. let endAddress;
  604. if (valuesForKey) {
  605. if (options.start === undefined && options.end === undefined)
  606. endAddress = 0;
  607. else {
  608. let startAddress;
  609. if (store.encoder.writeKey) {
  610. startAddress = saveKey(
  611. options.start,
  612. store.encoder.writeKey,
  613. iterable,
  614. maxKeySize,
  615. );
  616. keyBytesView.setFloat64(
  617. START_ADDRESS_POSITION,
  618. startAddress,
  619. true,
  620. );
  621. endAddress = saveKey(
  622. options.end,
  623. store.encoder.writeKey,
  624. iterable,
  625. maxKeySize,
  626. );
  627. } else if (
  628. (!options.start || options.start instanceof Uint8Array) &&
  629. (!options.end || options.end instanceof Uint8Array)
  630. ) {
  631. startAddress = saveKey(
  632. options.start,
  633. orderedBinary.writeKey,
  634. iterable,
  635. maxKeySize,
  636. );
  637. keyBytesView.setFloat64(
  638. START_ADDRESS_POSITION,
  639. startAddress,
  640. true,
  641. );
  642. endAddress = saveKey(
  643. options.end,
  644. orderedBinary.writeKey,
  645. iterable,
  646. maxKeySize,
  647. );
  648. } else {
  649. throw new Error(
  650. 'Only key-based encoding is supported for start/end values',
  651. );
  652. let encoded = store.encoder.encode(options.start);
  653. let bufferAddress =
  654. encoded.buffer.address ||
  655. (encoded.buffer.address =
  656. getAddress(encoded.buffer) - encoded.byteOffset);
  657. startAddress = bufferAddress + encoded.byteOffset;
  658. }
  659. }
  660. } else
  661. endAddress = saveKey(
  662. reverse && !('end' in options)
  663. ? DEFAULT_BEGINNING_KEY
  664. : options.end,
  665. store.writeKey,
  666. iterable,
  667. maxKeySize,
  668. );
  669. return doPosition(
  670. cursorAddress,
  671. flags,
  672. offset || 0,
  673. keySize,
  674. endAddress,
  675. );
  676. }
  677. function finishCursor() {
  678. if (!cursor || txn.isDone) return;
  679. if (iterable.onDone) iterable.onDone();
  680. if (cursorRenewId) txn.renewingRefCount--;
  681. if (txn.refCount <= 1 && txn.notCurrent) {
  682. cursor.close(); // this must be closed before the transaction is aborted or it can cause a
  683. // segmentation fault
  684. }
  685. if (txn.done) txn.done();
  686. else if (--txn.refCount <= 0 && txn.notCurrent) {
  687. txn.abort();
  688. txn.isDone = true;
  689. }
  690. if (!txn.isDone) {
  691. if (db.availableCursor || txn != readTxn) {
  692. cursor.close();
  693. } else {
  694. // try to reuse it
  695. db.availableCursor = cursor;
  696. db.cursorTxn = txn;
  697. }
  698. }
  699. cursor = null;
  700. }
  701. return {
  702. next() {
  703. let keySize, lastSize;
  704. if (cursorRenewId && (cursorRenewId != renewId || txn.isDone)) {
  705. if (flags & 0x10000) flags = flags & ~0x10000; // turn off exclusive start when repositioning
  706. resetCursor();
  707. keySize = position(0);
  708. }
  709. if (!cursor) {
  710. return ITERATOR_DONE;
  711. }
  712. if (count === 0) {
  713. // && includeValues) // on first entry, get current value if we need to
  714. keySize = position(options.offset);
  715. } else keySize = iterate(cursorAddress);
  716. if (keySize <= 0 || count++ >= limit) {
  717. if (keySize < -30700 && keySize !== -30798) lmdbError(keySize);
  718. finishCursor();
  719. return ITERATOR_DONE;
  720. }
  721. if (!valuesForKey || snapshot === false) {
  722. if (keySize > 20000) {
  723. if (keySize > 0x1000000) lmdbError(keySize - 0x100000000);
  724. throw new Error('Invalid key size ' + keySize.toString(16));
  725. }
  726. currentKey = store.readKey(keyBytes, 32, keySize + 32);
  727. }
  728. if (includeValues) {
  729. let value;
  730. lastSize = keyBytesView.getUint32(0, true);
  731. let bufferId = keyBytesView.getUint32(4, true);
  732. let bytes;
  733. if (bufferId) {
  734. bytes = getMMapBuffer(bufferId, lastSize);
  735. if (store.encoding === 'binary') bytes = Buffer.from(bytes);
  736. } else {
  737. bytes = compression ? compression.getValueBytes : getValueBytes;
  738. if (lastSize > bytes.maxLength) {
  739. store.lastSize = lastSize;
  740. asSafeBuffer = store.encoding === 'binary';
  741. try {
  742. bytes = store._returnLargeBuffer(() =>
  743. getCurrentValue(cursorAddress),
  744. );
  745. } finally {
  746. asSafeBuffer = false;
  747. }
  748. } else bytes.length = lastSize;
  749. }
  750. if (store.decoder) {
  751. value = store.decoder.decode(bytes, lastSize);
  752. } else if (store.encoding == 'binary')
  753. value = bytes.isGlobal
  754. ? Uint8ArraySlice.call(bytes, 0, lastSize)
  755. : bytes;
  756. else {
  757. // use the faster utf8Slice if available, otherwise fall back to TextDecoder (a little slower)
  758. // note applying Buffer's utf8Slice to a Uint8Array works in Node, but not in Bun.
  759. value = bytes.utf8Slice
  760. ? bytes.utf8Slice(0, lastSize)
  761. : textDecoder.decode(
  762. Uint8ArraySlice.call(bytes, 0, lastSize),
  763. );
  764. if (store.encoding == 'json' && value)
  765. value = JSON.parse(value);
  766. }
  767. if (includeVersions)
  768. return {
  769. value: {
  770. key: currentKey,
  771. value,
  772. version: getLastVersion(),
  773. },
  774. };
  775. else if (valuesForKey)
  776. return {
  777. value,
  778. };
  779. else
  780. return {
  781. value: {
  782. key: currentKey,
  783. value,
  784. },
  785. };
  786. } else if (includeVersions) {
  787. return {
  788. value: {
  789. key: currentKey,
  790. version: getLastVersion(),
  791. },
  792. };
  793. } else {
  794. return {
  795. value: currentKey,
  796. };
  797. }
  798. },
  799. return() {
  800. finishCursor();
  801. return ITERATOR_DONE;
  802. },
  803. throw() {
  804. finishCursor();
  805. return ITERATOR_DONE;
  806. },
  807. };
  808. };
  809. return iterable;
  810. },
  811. getMany(keys, callback) {
  812. // this is an asynchronous get for multiple keys. It actually works by prefetching asynchronously,
  813. // allowing a separate thread/task to absorb the potentially largest cost: hard page faults (and disk I/O).
  814. // And then we just do standard sync gets (to deserialized data) to fulfil the callback/promise
  815. // once the prefetch occurs
  816. let promise = callback
  817. ? undefined
  818. : new Promise(
  819. (resolve) => (callback = (error, results) => resolve(results)),
  820. );
  821. this.prefetch(keys, () => {
  822. let results = new Array(keys.length);
  823. for (let i = 0, l = keys.length; i < l; i++) {
  824. results[i] = get.call(this, keys[i]);
  825. }
  826. callback(null, results);
  827. });
  828. return promise;
  829. },
  830. getSharedBufferForGet(id, options) {
  831. let txn =
  832. env.writeTxn ||
  833. (options && options.transaction) ||
  834. (readTxnRenewed ? readTxn : renewReadTxn(this));
  835. this.lastSize = this.keyIsCompatibility
  836. ? txn.getBinaryShared(id)
  837. : this.db.get(this.writeKey(id, keyBytes, 0));
  838. if (this.lastSize === -30798) {
  839. // not found code
  840. return; //undefined
  841. }
  842. return this.lastSize;
  843. this.lastSize = keyBytesView.getUint32(0, true);
  844. let bufferIndex = keyBytesView.getUint32(12, true);
  845. lastOffset = keyBytesView.getUint32(8, true);
  846. let buffer = buffers[bufferIndex];
  847. let startOffset;
  848. if (
  849. !buffer ||
  850. lastOffset < (startOffset = buffer.startOffset) ||
  851. lastOffset + this.lastSize > startOffset + 0x100000000
  852. ) {
  853. if (buffer) env.detachBuffer(buffer.buffer);
  854. startOffset = (lastOffset >>> 16) * 0x10000;
  855. console.log(
  856. 'make buffer for address',
  857. bufferIndex * 0x100000000 + startOffset,
  858. );
  859. buffer = buffers[bufferIndex] = Buffer.from(
  860. getBufferForAddress(bufferIndex * 0x100000000 + startOffset),
  861. );
  862. buffer.startOffset = startOffset;
  863. }
  864. lastOffset -= startOffset;
  865. return buffer;
  866. return buffer.slice(
  867. lastOffset,
  868. lastOffset + this.lastSize,
  869. ); /*Uint8ArraySlice.call(buffer, lastOffset, lastOffset + this.lastSize)*/
  870. },
  871. prefetch(keys, callback) {
  872. if (!keys) throw new Error('An array of keys must be provided');
  873. if (!keys.length) {
  874. if (callback) {
  875. callback(null);
  876. return;
  877. } else return Promise.resolve();
  878. }
  879. let buffers = [];
  880. let startPosition;
  881. let bufferHolder = {};
  882. let lastBuffer;
  883. for (let key of keys) {
  884. let position;
  885. if (key && key.key !== undefined && key.value !== undefined) {
  886. position = saveKey(
  887. key.value,
  888. this.writeKey,
  889. bufferHolder,
  890. maxKeySize,
  891. 0x80000000,
  892. );
  893. saveReferenceToBuffer();
  894. saveKey(key.key, this.writeKey, bufferHolder, maxKeySize);
  895. } else {
  896. position = saveKey(key, this.writeKey, bufferHolder, maxKeySize);
  897. }
  898. if (!startPosition) startPosition = position;
  899. saveReferenceToBuffer();
  900. }
  901. function saveReferenceToBuffer() {
  902. if (bufferHolder.saveBuffer != lastBuffer) {
  903. buffers.push(bufferHolder.saveBuffer);
  904. lastBuffer = bufferHolder.saveBuffer;
  905. }
  906. }
  907. saveKey(undefined, this.writeKey, bufferHolder, maxKeySize);
  908. saveReferenceToBuffer();
  909. outstandingReads++;
  910. prefetch(this.dbAddress, startPosition, (error) => {
  911. outstandingReads--;
  912. if (error)
  913. console.error('Error with prefetch', buffers); // partly exists to keep the buffers pinned in memory
  914. else callback(null);
  915. });
  916. if (!callback) return new Promise((resolve) => (callback = resolve));
  917. },
  918. useReadTransaction() {
  919. let txn = readTxnRenewed ? readTxn : renewReadTxn(this);
  920. if (!txn.use) {
  921. throw new Error('Can not use read transaction from a closed database');
  922. }
  923. // because the renew actually happens lazily in read operations, renew needs to be explicit
  924. // here in order to actually secure a real read transaction. Try to only do it if necessary;
  925. // once it has a refCount, it should be good to go
  926. if (!(readTxn.refCount - (readTxn.renewingRefCount || 0) > 0))
  927. txn.renew();
  928. txn.use();
  929. return txn;
  930. },
  931. close(callback) {
  932. this.status = 'closing';
  933. let txnPromise;
  934. if (this.isRoot) {
  935. // if it is root, we need to abort and/or wait for transactions to finish
  936. if (readTxn) {
  937. try {
  938. readTxn.abort();
  939. } catch (error) {}
  940. } else readTxn = {};
  941. readTxn.isDone = true;
  942. Object.defineProperty(readTxn, 'renew', {
  943. value: () => {
  944. throw new Error('Can not read from a closed database');
  945. },
  946. configurable: true,
  947. });
  948. Object.defineProperty(readTxn, 'use', {
  949. value: () => {
  950. throw new Error('Can not read from a closed database');
  951. },
  952. configurable: true,
  953. });
  954. readTxnRenewed = null;
  955. txnPromise = this._endWrites && this._endWrites();
  956. }
  957. const doClose = () => {
  958. if (this.isRoot) {
  959. if (outstandingReads > 0) {
  960. return new Promise((resolve) =>
  961. setTimeout(() => resolve(doClose()), 1),
  962. );
  963. }
  964. env.address = 0;
  965. try {
  966. env.close();
  967. } catch (error) {}
  968. } else this.db.close();
  969. this.status = 'closed';
  970. if (callback) callback();
  971. };
  972. if (txnPromise) return txnPromise.then(doClose);
  973. else {
  974. doClose();
  975. return Promise.resolve();
  976. }
  977. },
  978. getStats() {
  979. let txn = env.writeTxn || (readTxnRenewed ? readTxn : renewReadTxn(this));
  980. let dbStats = this.db.stat();
  981. dbStats.root = env.stat();
  982. Object.assign(dbStats, env.info());
  983. dbStats.free = env.freeStat();
  984. return dbStats;
  985. },
  986. });
  // The `get` method as currently installed on the LMDBStore prototype.
  let { get } = LMDBStore.prototype;
  // Holds a WeakRef to a read transaction retired by resetReadTxn while it
  // still had users; renewReadTxn dereferences it when creating a new one.
  let lastReadTxnRef;
  /**
   * Create a byte view of `size` bytes over the shared buffer `bufferId`.
   *
   * The shared buffer is fetched with `getSharedBuffer` on first use and
   * cached in `mmaps`. The start of the view is the little-endian uint32
   * stored at byte 8 of `keyBytesView`.
   *
   * @param bufferId - identifier of the shared buffer.
   * @param {number} size - length of the view in bytes.
   * @returns {Uint8Array} view over the shared buffer.
   */
  function getMMapBuffer(bufferId, size) {
    const shared =
      mmaps[bufferId] ||
      (mmaps[bufferId] = getSharedBuffer(bufferId, env.address));
    const start = keyBytesView.getUint32(8, true);
    return new Uint8Array(shared, start, size);
  }
  /**
   * Make sure a shared read transaction exists and mark it as renewed for
   * the current event turn.
   *
   * When there is no current read transaction a new `Txn` is created,
   * passing along the previously retired transaction (if it is still
   * referenced and not done). Errors whose message contains 'temporarily'
   * are retried, blocking this thread for `retries * 2` ms between attempts.
   *
   * @param store - store on which the 'begin-transaction' event is emitted.
   * @returns the current read transaction.
   * @throws {Error} if the database is closed, if creating the transaction
   *   fails with a non-temporary error, or if every retry failed.
   */
  function renewReadTxn(store) {
    if (!env.address) {
      throw new Error('Can not renew a transaction from a closed database');
    }
    if (!readTxn) {
      let retries = 0;
      let waitArray;
      let lastError;
      do {
        try {
          let lastReadTxn = lastReadTxnRef && lastReadTxnRef.deref();
          readTxn = new Txn(
            env,
            0x20000, // NOTE(review): presumably the read-only flag - confirm
            lastReadTxn && !lastReadTxn.isDone && lastReadTxn,
          );
          if (readTxn.address == 0) {
            // a zero address means the previous transaction is used again
            readTxn = lastReadTxn;
            if (readTxn.notCurrent) readTxn.notCurrent = false;
          }
          break;
        } catch (error) {
          if (error.message.includes('temporarily')) {
            lastError = error;
            if (!waitArray)
              waitArray = new Int32Array(new SharedArrayBuffer(4), 0, 1);
            // synchronous back-off that grows with the number of retries
            Atomics.wait(waitArray, 0, 0, retries * 2);
          } else throw error;
        }
      } while (retries++ < 100);
      if (!readTxn) {
        // Every attempt failed. Report it now instead of scheduling a reset
        // for, and returning, a transaction that does not exist.
        throw (
          lastError || new Error('Unable to start a read transaction')
        );
      }
    }
    // we actually don't renew here, we let the renew take place in the next
    // lmdb native read/call so as to avoid an extra native call
    readTxnRenewed = setTimeout(resetReadTxn, 0);
    store.emit('begin-transaction');
    return readTxn;
  }
  /**
   * Timer callback scheduled by renewReadTxn: ends the "renewed" state of
   * the shared read transaction.
   *
   * A transaction that still has users is flagged `notCurrent`, remembered
   * through a WeakRef and detached from `readTxn`; an unused, valid one is
   * reset through `resetTxn`.
   *
   * @throws {Error} if the transaction is neither in use nor valid.
   */
  function resetReadTxn() {
    renewId++;
    if (!readTxnRenewed) return;
    readTxnRenewed = null;
    const activeRefs = readTxn.refCount - (readTxn.renewingRefCount || 0);
    if (activeRefs > 0) {
      // still in use: retire it and let the next renew create a new one
      readTxn.notCurrent = true;
      lastReadTxnRef = new WeakRef(readTxn);
      readTxn = null;
      return;
    }
    if (readTxn.address && !readTxn.isDone) {
      resetTxn(readTxn.address);
      return;
    }
    console.warn('Attempt to reset an invalid read txn', readTxn);
    throw new Error('Attempt to reset an invalid read txn');
  }
  1048. }
  /**
   * Allocate a zero-filled byte buffer whose `length` can be overwritten.
   *
   * A Node `Buffer` is used when available, otherwise a `Uint8Array`. The
   * allocated size is kept in `maxLength`, and `length` is redefined as a
   * writable, configurable own data property that starts at `size`.
   *
   * @param {number} size - number of bytes to allocate.
   * @returns {Uint8Array} the buffer.
   */
  export function makeReusableBuffer(size) {
    let bytes;
    if (typeof Buffer != 'undefined') {
      bytes = Buffer.alloc(size);
    } else {
      bytes = new Uint8Array(size);
    }
    bytes.maxLength = size;
    const lengthDescriptor = {
      value: size,
      writable: true,
      configurable: true,
    };
    Object.defineProperty(bytes, 'length', lengthDescriptor);
    return bytes;
  }
  /**
   * Release one use of this transaction.
   *
   * When the last use is released and the transaction has been flagged
   * `notCurrent`, it is aborted and marked `isDone`.
   *
   * @throws {Error} if called more often than `use()`.
   */
  Txn.prototype.done = function () {
    const remaining = --this.refCount;
    if (remaining < 0) {
      throw new Error('Can not finish a transaction more times than it was used');
    }
    if (remaining === 0 && this.notCurrent) {
      this.abort();
      this.isDone = true;
    }
  };
  /**
   * Register one more user of this transaction by incrementing `refCount`
   * (a missing or zero count is treated as 0).
   */
  Txn.prototype.use = function () {
    const current = this.refCount || 0;
    this.refCount = current + 1;
  };
  // Byte buffer that read instructions are written into; created by
  // allocateInstructionsBuffer().
  let readInstructions;
  // Pending read callbacks, keyed by callback id.
  let readCallbacks = new Map();
  // 32-bit integer view over the instruction buffer.
  let uint32Instructions;
  // DataView over the instruction buffer; a no-op stand-in until the first
  // buffer has been allocated.
  let instructionsDataView = {
    setFloat64() {},
    setUint32() {},
  };
  // Address of the instruction buffer as returned by getAddress().
  let instructionsAddress;
  // Write position of the next instruction. The initial value is above the
  // 7800 threshold checked by recordReadInstruction, so the first call
  // allocates a buffer.
  let savePosition = 8000;
  // Size in bytes of each instruction buffer.
  let DYNAMIC_KEY_BUFFER_SIZE = 8192;
  /**
   * Allocate a new instruction buffer of DYNAMIC_KEY_BUFFER_SIZE bytes and
   * point all module-level instruction state at it: the byte buffer, the
   * 32-bit view, the DataView, the buffer address, and a write position
   * of 0.
   */
  function allocateInstructionsBuffer() {
    if (typeof Buffer != 'undefined') {
      readInstructions = Buffer.alloc(DYNAMIC_KEY_BUFFER_SIZE);
    } else {
      readInstructions = new Uint8Array(DYNAMIC_KEY_BUFFER_SIZE);
    }
    const backing = readInstructions.buffer;
    uint32Instructions = new Int32Array(backing, 0, backing.byteLength >> 2);
    uint32Instructions[2] = 0xf0000000; // indicates a new read task must be started
    // the address is stored on the ArrayBuffer as well as in module state
    const address = getAddress(backing);
    backing.address = address;
    instructionsAddress = address;
    instructionsDataView = new DataView(
      backing,
      readInstructions.byteOffset,
      readInstructions.byteLength,
    );
    readInstructions.dataView = instructionsDataView;
    savePosition = 0;
  }
  /**
   * Write a read instruction into the shared instruction buffer and start
   * the read.
   *
   * Record layout, relative to the record start: the transaction address as
   * a little-endian float64 at byte 0, `dbi` at byte 8, the key length at
   * byte 12, and the key bytes from byte 16 on. A new instruction buffer is
   * allocated when the write position is past 7800, or when writing the key
   * raised a RangeError and less than `maxKeySize` bytes were left.
   *
   * @param txnAddress - address of the transaction to read with.
   * @param dbi - database identifier stored in the record.
   * @param key - key to read; `undefined` writes no key bytes.
   * @param {Function} writeKey - `(key, target, position)` writer that
   *   returns the position following the written key.
   * @param {number} maxKeySize - largest permitted key length in bytes.
   * @param {Function} callback - called with the four 32-bit words found at
   *   the record start when the read callback fires (presumably filled in
   *   by the native read - confirm).
   * @returns the result of `startRead`.
   * @throws {Error} if the key is larger than `maxKeySize`.
   */
  export function recordReadInstruction(
    txnAddress,
    dbi,
    key,
    writeKey,
    maxKeySize,
    callback,
  ) {
    if (savePosition > 7800) allocateInstructionsBuffer();
    const recordStart = savePosition;
    const keyStart = recordStart + 16;
    try {
      if (key === undefined) savePosition = keyStart;
      else savePosition = writeKey(key, readInstructions, keyStart);
    } catch (error) {
      if (error.name != 'RangeError') throw error;
      if (8180 - recordStart < maxKeySize) {
        // the key may simply not have fit in what was left of this buffer;
        // try again with a new one
        allocateInstructionsBuffer();
        return recordReadInstruction(
          txnAddress,
          dbi,
          key,
          writeKey,
          maxKeySize,
          callback,
        );
      }
      throw new Error('Key was too large, max key size is ' + maxKeySize);
    }
    const keyLength = savePosition - keyStart;
    if (keyLength > maxKeySize) {
      // give the space back before reporting the error
      savePosition = recordStart;
      throw new Error(
        'Key of size ' +
          keyLength +
          ' was too large, max key size is ' +
          maxKeySize,
      );
    }
    const slot = recordStart >> 2; // index of the record in the 32-bit view
    uint32Instructions[slot + 3] = keyLength; // save the length
    uint32Instructions[slot + 2] = dbi;
    // move past the key and round to a multiple of 4 for the next record
    savePosition = (savePosition + 12) & 0xfffffc;
    instructionsDataView.setFloat64(recordStart, txnAddress, true);
    const callbackId = addReadCallback(() => {
      callback(
        results[slot],
        results[slot + 1],
        results[slot + 2],
        results[slot + 3],
      );
    });
    // Keep hold of the view this record was written to: the module-level
    // view may point at a newer buffer by the time the callback runs.
    const results = uint32Instructions;
    return startRead(instructionsAddress + recordStart, callbackId, {}, 'read');
  }
  // Id handed to the next registered read callback.
  let nextCallbackId = 0;
  // The registration function is shared through globalThis, so an already
  // installed one is reused rather than registering a second dispatcher.
  let addReadCallback = globalThis.__lmdb_read_callback;
  if (!addReadCallback) {
    // Store a callback and return the id under which it can be invoked.
    addReadCallback = globalThis.__lmdb_read_callback = function (callback) {
      let callbackId = nextCallbackId++;
      readCallbacks.set(callbackId, callback);
      return callbackId;
    };
    // Dispatcher: run the callback registered under the given id exactly once.
    setReadCallback(function (callbackId) {
      try {
        readCallbacks.get(callbackId)();
      } finally {
        // remove the entry even when the callback throws, so that failed
        // callbacks do not accumulate in the map
        readCallbacks.delete(callbackId);
      }
    });
  }