write.js 35 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124
  1. import {
  2. getAddress,
  3. getBufferAddress,
  4. write,
  5. compress,
  6. lmdbError,
  7. } from './native.js';
  8. import { when } from './util/when.js';
  // ---------------------------------------------------------------------------
  // Module-level state and constants shared by the write path below.
  // ---------------------------------------------------------------------------

  // Int32Array created on first use by the backpressure wait in the write path.
  var backpressureArray;

  // Status bits that are read from (or or'ed into) an instruction's flag word.
  const WAITING_OPERATION = 0x2000000;
  // Number of outstanding writes above which the write path waits before continuing.
  const BACKPRESSURE_THRESHOLD = 300000;
  const TXN_DELIMITER = 0x8000000;
  const TXN_COMMITTED = 0x10000000;
  const TXN_FLUSHED = 0x20000000;
  const TXN_FAILED = 0x40000000;
  export const FAILED_CONDITION = 0x4000000;

  // Mode bits passed to encoder.encode() when the encoder supports buffer reuse.
  const REUSE_BUFFER_MODE = 512;
  const RESET_BUFFER_MODE = 1024;

  // Values stored in a resolution's `flag` field.
  const NO_RESOLVE = 16;
  const HAS_TXN = 8;

  // NOTE(review): both conditional flags carry the same value (0x800), so setting
  // one is indistinguishable from setting the other — confirm against the native
  // writer before changing either.
  const CONDITIONAL_VERSION_LESS_THAN = 0x800;
  const CONDITIONAL_ALLOW_NOTFOUND = 0x800;

  // Pre-resolved promises tagged with `isSync`/`result` so callers can read the
  // outcome without awaiting.
  const SYNC_PROMISE_SUCCESS = Object.assign(Promise.resolve(true), {
    isSync: true,
    result: true,
  });
  const SYNC_PROMISE_FAIL = Object.assign(Promise.resolve(false), {
    isSync: true,
    result: false,
  });
  const PROMISE_SUCCESS = Promise.resolve(true);

  const arch = process.arch;
  export const ABORT = 4.452694326329068e-106; // random/unguessable numbers, which work across module/versions and native
  export const IF_EXISTS = 3.542694326329068e-103;
  // Sentinel stored in place of a transaction callback that threw.
  const CALLBACK_THREW = {};

  // Deno can't handle SharedArrayBuffer as an FFI argument due to
  // https://github.com/denoland/deno/issues/12678, and sometimes electron doesn't
  // have a SharedArrayBuffer at all; fall back to ArrayBuffer in both cases.
  const LocalSharedArrayBuffer =
    typeof Deno == 'undefined' && typeof SharedArrayBuffer != 'undefined'
      ? SharedArrayBuffer
      : ArrayBuffer;

  // Invoked with `new`, so the Buffer variant must stay a regular function.
  const ByteArray =
    typeof Buffer == 'undefined'
      ? Uint8Array
      : function (buffer) {
          return Buffer.from(buffer);
        };

  const queueTask =
    typeof setImmediate == 'undefined' ? setTimeout : setImmediate; // TODO: Or queueMicrotask?
  //let debugLog = []
  const WRITE_BUFFER_SIZE = 0x10000;
  var log = [];
  51. export function addWriteMethods(
  52. LMDBStore,
  53. {
  54. env,
  55. fixedBuffer,
  56. resetReadTxn,
  57. useWritemap,
  58. maxKeySize,
  59. eventTurnBatching,
  60. txnStartThreshold,
  61. batchStartThreshold,
  62. overlappingSync,
  63. commitDelay,
  64. separateFlushed,
  65. maxFlushDelay,
  66. },
  67. ) {
  68. // stands for write instructions
  69. var dynamicBytes;
  /**
   * Allocates a fresh instruction buffer, makes it the current `dynamicBytes`,
   * and optionally links the previous buffer to it.
   *
   * @param {number} [lastPosition] position (in 8-byte slots) in the previous
   *   buffer at which a pointer instruction to the new buffer is written; when
   *   falsy, no link is written.
   * @returns the new byte array, carrying `uint32`, `float64` and `position`.
   */
  function allocateInstructionBuffer(lastPosition) {
    const previousBytes = dynamicBytes;
    // Must use a shared buffer on older node in order to use Atomics, and it is also more correct since we are
    // indeed accessing and modifying it from another thread (in C). However, Deno can't handle it for
    // FFI so aliased above
    const rawBuffer = new LocalSharedArrayBuffer(WRITE_BUFFER_SIZE);
    const freshBytes = new ByteArray(rawBuffer);
    dynamicBytes = freshBytes;

    const words = new Uint32Array(rawBuffer, 0, WRITE_BUFFER_SIZE >> 2);
    freshBytes.uint32 = words;
    words[2] = 0;
    freshBytes.float64 = new Float64Array(rawBuffer, 0, WRITE_BUFFER_SIZE >> 3);

    rawBuffer.address = getBufferAddress(freshBytes);
    words.address = rawBuffer.address + words.byteOffset;
    // we start at position 1 to save space for writing the txn id before the txn delimiter
    freshBytes.position = 1;

    if (lastPosition) {
      // Store the address of the new buffer's first instruction, then mark the
      // slot in the old buffer as a pointer instruction.
      previousBytes.float64[lastPosition + 1] =
        words.address + (freshBytes.position << 3);
      previousBytes.uint32[lastPosition << 1] = 3; // pointer instruction
    }
    return freshBytes;
  }
  // ---- Closure state shared by the write functions below ----

  // Position (in 8-byte slots) above which a new instruction buffer is allocated;
  // need to reserve more room if we do inline values
  var newBufferThreshold = (WRITE_BUFFER_SIZE - maxKeySize - 64) >> 3;
  var outstandingWriteCount = 0;
  // Address handed to env.startWriting(); 0 while no start is pending.
  var startAddress = 0;
  var writeTxn = null;
  var committed;
  var abortedNonChildTransactionWarn;
  var nextTxnCallbacks = [];
  var commitPromise;
  var flushPromise;
  var flushResolvers = [];
  var batchFlushResolvers = [];

  // Normalise options: missing commitDelay becomes 0, and event-turn batching
  // stays on unless it was explicitly set to false.
  commitDelay = commitDelay || 0;
  eventTurnBatching = eventTurnBatching !== false;

  var enqueuedCommit;
  var afterCommitCallbacks = [];
  var beforeCommitCallbacks = [];
  var enqueuedEventTurnBatch;
  var batchDepth = 0;
  var lastWritePromise;
  var writeBatchStart;
  var outstandingBatchCount;
  var lastSyncTxnFlush;
  var lastFlushTimeout;
  var lastFlushCallback;
  var hasUnresolvedTxns;

  txnStartThreshold = txnStartThreshold || 5;
  batchStartThreshold = batchStartThreshold || 1000;
  maxFlushDelay = maxFlushDelay || 500;

  // Create the first instruction buffer and mark its initial flag word as an
  // already committed and flushed transaction delimiter.
  allocateInstructionBuffer();
  dynamicBytes.uint32[2] = TXN_DELIMITER | TXN_COMMITTED | TXN_FLUSHED;

  // Resolutions form a singly linked list (via `next`), one per instruction.
  var txnResolution;
  var nextResolution = {
    uint32: dynamicBytes.uint32,
    flagPosition: 2,
    flag: 0,
    valueBuffer: null,
    next: null,
    meta: null,
  };
  var uncommittedResolution = {
    uint32: null,
    flagPosition: 2,
    flag: 0,
    valueBuffer: null,
    next: nextResolution,
    meta: null,
  };
  var unwrittenResolution = nextResolution;
  var lastPromisedResolution = uncommittedResolution;
  var lastQueuedResolution = uncommittedResolution;
  /**
   * Encodes one write instruction (flags, dbi, key, value pointer, optional
   * condition/version) into the instruction buffer and returns a function that
   * publishes it.
   *
   * When a synchronous write transaction is active (`writeTxn`), the instruction
   * is written into `fixedBuffer` and executed immediately through `write()`.
   * Otherwise it is appended to the current `dynamicBytes` buffer, and the
   * returned function sets the instruction's flag word, schedules/starts the
   * write and produces the promise for the caller.
   *
   * @param {number} flags instruction bits; bit 2 means "has value", bit 4 means
   *   "has key". This function may additionally set 0x10 (ifVersion === null),
   *   0x100 (ifVersion supplied), 0x200 (version supplied) and 0x100000
   *   (value should be compressed).
   * @param store store whose `db.dbi`, `encoder`, `writeKey`, `compression`,
   *   `dupSort` and `cache` are consulted.
   * @param key key to encode via `store.writeKey` (only when flags & 4).
   * @param value value to encode (only when flags & 2).
   * @param {number} [version] version to record with the entry.
   * @param {number|null} [ifVersion] condition for the write; `null` means
   *   "only if the entry does not exist".
   * @returns {function} in a sync transaction, a function returning
   *   SYNC_PROMISE_SUCCESS or SYNC_PROMISE_FAIL; otherwise a function
   *   `(callback) => promise | undefined` that finishes the write.
   * @throws {Error} for values that cannot be stored, oversized dupSort values,
   *   invalid/zero-length keys, oversized keys, or a missing uint32 view.
   */
  function writeInstructions(flags, store, key, value, version, ifVersion) {
    let writeStatus;
    let targetBytes, position, encoder;
    let valueSize, valueBuffer, valueBufferStart;
    if (flags & 2) {
      // encode first in case we have to write a shared structure
      encoder = store.encoder;
      if (value && value['\x10binary-data\x02'])
        valueBuffer = value['\x10binary-data\x02'];
      else if (encoder) {
        if (encoder.copyBuffers)
          // use this as indicator for support buffer reuse for now
          valueBuffer = encoder.encode(
            value,
            REUSE_BUFFER_MODE | (writeTxn ? RESET_BUFFER_MODE : 0),
          );
        // in addition, if we are writing sync, after using, we can immediately reset the encoder's position to reuse that space, which can improve performance
        else {
          // various other encoders, including JSON.stringify, that might serialize to a string
          valueBuffer = encoder.encode(value);
          if (typeof valueBuffer == 'string')
            valueBuffer = Buffer.from(valueBuffer); // TODO: Would be nice to write strings inline in the instructions
        }
      } else if (typeof value == 'string') {
        valueBuffer = Buffer.from(value); // TODO: Would be nice to write strings inline in the instructions
      } else if (value instanceof Uint8Array) valueBuffer = value;
      else
        throw new Error(
          'Invalid value to put in database ' +
            value +
            ' (' +
            typeof value +
            '), consider using encoder',
        );
      valueBufferStart = valueBuffer.start;
      if (valueBufferStart > -1)
        // if we have buffers with start/end position
        valueSize = valueBuffer.end - valueBufferStart; // size
      else valueSize = valueBuffer.length;
      if (store.dupSort && valueSize > maxKeySize)
        throw new Error(
          'The value is larger than the maximum size (' +
            maxKeySize +
            ') for a value in a dupSort database',
        );
    } else valueSize = 0;
    if (writeTxn) {
      // synchronous transaction: a single instruction at the start of the fixed buffer
      targetBytes = fixedBuffer;
      position = 0;
    } else {
      if (eventTurnBatching && !enqueuedEventTurnBatch && batchDepth == 0) {
        // First write of this event turn: open a batch now and queue a task that
        // runs the beforecommit callbacks and closes the batch.
        enqueuedEventTurnBatch = queueTask(() => {
          try {
            for (let i = 0, l = beforeCommitCallbacks.length; i < l; i++) {
              try {
                beforeCommitCallbacks[i]();
              } catch (error) {
                console.error('In beforecommit callback', error);
              }
            }
          } catch (error) {
            console.error(error);
          }
          enqueuedEventTurnBatch = null;
          batchDepth--;
          finishBatch();
          if (writeBatchStart) writeBatchStart(); // TODO: When we support delay start of batch, optionally don't delay this
        });
        commitPromise = null; // reset the commit promise, can't know if it is really a new transaction prior to finishWrite being called
        flushPromise = null;
        // Recursive call writes the batch-start instruction; its finisher is held
        // until the batch is started.
        writeBatchStart = writeInstructions(1, store);
        outstandingBatchCount = 0;
        batchDepth++;
      }
      targetBytes = dynamicBytes;
      position = targetBytes.position;
    }
    let uint32 = targetBytes.uint32,
      float64 = targetBytes.float64;
    let flagPosition = position << 1; // flagPosition is the 32-bit word starting position
    // don't increment position until we are sure we don't have any key writing errors
    if (!uint32) {
      throw new Error('Internal buffers have been corrupted');
    }
    uint32[flagPosition + 1] = store.db.dbi;
    if (flags & 4) {
      // key bytes start 12 bytes into the instruction (after flags, dbi and key size)
      let keyStartPosition = (position << 3) + 12;
      let endPosition;
      try {
        endPosition = store.writeKey(key, targetBytes, keyStartPosition);
        if (!(keyStartPosition < endPosition) && (flags & 0xf) != 12)
          throw new Error(
            'Invalid key or zero length key is not allowed in LMDB ' + key,
          );
      } catch (error) {
        // restore zeros so a partially written key does not remain in the buffer
        targetBytes.fill(0, keyStartPosition);
        if (error.name == 'RangeError')
          error = new Error(
            'Key size is larger than the maximum key size (' + maxKeySize + ')',
          );
        throw error;
      }
      let keySize = endPosition - keyStartPosition;
      if (keySize > maxKeySize) {
        targetBytes.fill(0, keyStartPosition); // restore zeros
        throw new Error(
          'Key size is larger than the maximum key size (' + maxKeySize + ')',
        );
      }
      uint32[flagPosition + 2] = keySize;
      // advance to the next 8-byte slot after the key
      position = (endPosition + 16) >> 3;
      if (flags & 2) {
        let mustCompress;
        if (valueBufferStart > -1) {
          // if we have buffers with start/end position
          // record pointer to value buffer
          float64[position] =
            (valueBuffer.address ||
              (valueBuffer.address = getAddress(valueBuffer.buffer))) +
            valueBufferStart;
          if (store.compression) {
            let compressionFlagIndex =
              valueBufferStart + (store.compression.startingOffset || 0);
            // this is the compression indicator, so we must compress
            mustCompress =
              compressionFlagIndex < valueBuffer.end &&
              valueBuffer[compressionFlagIndex] >= 250;
          }
        } else {
          let valueArrayBuffer = valueBuffer.buffer;
          // record pointer to value buffer
          let address =
            (valueArrayBuffer.address ||
              (valueBuffer.length === 0
                ? 0 // externally allocated buffers of zero-length with the same non-null-pointer can crash node, #161
                : (valueArrayBuffer.address = getAddress(valueArrayBuffer)))) +
            valueBuffer.byteOffset;
          if (address <= 0 && valueBuffer.length > 0)
            console.error('Supplied buffer had an invalid address', address);
          float64[position] = address;
          if (store.compression) {
            let compressionFlagIndex = store.compression.startingOffset || 0;
            // this is the compression indicator, so we must compress
            mustCompress =
              compressionFlagIndex < valueBuffer.length &&
              valueBuffer[compressionFlagIndex] >= 250;
          }
        }
        // value size goes in the 32-bit word just before the pointer slot
        uint32[(position++ << 1) - 1] = valueSize;
        if (
          store.compression &&
          (valueSize >= store.compression.threshold || mustCompress)
        ) {
          flags |= 0x100000;
          float64[position] = store.compression.address;
          if (!writeTxn)
            compress(env.address, uint32.address + (position << 3), () => {
              // this is never actually called in NodeJS, just use to pin the buffer in memory until it is finished
              // and is a no-op in Deno
              if (!float64) throw new Error('No float64 available');
            });
          position++;
        }
      }
      if (ifVersion !== undefined) {
        if (ifVersion === null)
          flags |= 0x10; // if it does not exist, MDB_NOOVERWRITE
        else {
          flags |= 0x100;
          float64[position++] = ifVersion;
        }
      }
      if (version !== undefined) {
        flags |= 0x200;
        float64[position++] = version || 0;
      }
    } else position++;
    targetBytes.position = position;
    if (writeTxn) {
      // execute the single instruction right away inside the open transaction
      uint32[0] = flags;
      write(env.address, uint32.address);
      return () =>
        uint32[0] & FAILED_CONDITION ? SYNC_PROMISE_FAIL : SYNC_PROMISE_SUCCESS;
    }
    // if we ever use buffers that haven't been zero'ed, need to clear out the next slot like this:
    // uint32[position << 1] = 0 // clear out the next slot
    let nextUint32;
    if (position > newBufferThreshold) {
      // make new buffer and make pointer to it
      targetBytes = allocateInstructionBuffer(position);
      position = targetBytes.position;
      nextUint32 = targetBytes.uint32;
    } else nextUint32 = uint32;
    let resolution = nextResolution;
    // create the placeholder next resolution
    nextResolution = resolution.next = {
      // we try keep resolutions exactly the same object type
      uint32: nextUint32,
      flagPosition: position << 1,
      flag: 0, // TODO: eventually eliminate this, as we can probably signify HAS_TXN/NO_RESOLVE/FAILED_CONDITION in upper bits
      valueBuffer: fixedBuffer, // these are all just placeholders so that we have the right hidden class initially allocated
      next: null,
      meta: null,
    };
    lastQueuedResolution = resolution;
    let writtenBatchDepth = batchDepth;
    return (callback) => {
      if (writtenBatchDepth) {
        // If we are in a batch, the transaction can't close, so we do the faster,
        // but non-deterministic updates, knowing that the write thread can
        // just poll for the status change if we miss a status update.
        // That is, if we are on x64 architecture...
        if (arch === 'x64') {
          writeStatus = uint32[flagPosition];
          uint32[flagPosition] = flags;
        } else {
          // However, on ARM processors, apparently more radical memory reordering can occur
          // so we need to use the slower atomic operation to ensure that a memory barrier is set
          // and that the value pointer is actually written before the flag is updated
          writeStatus = Atomics.or(uint32, flagPosition, flags);
        }
        if (writeBatchStart && !writeStatus) {
          // count this write (weighted by value size) toward starting the batch early
          outstandingBatchCount += 1 + (valueSize >> 12);
          if (outstandingBatchCount > batchStartThreshold) {
            outstandingBatchCount = 0;
            writeBatchStart();
            writeBatchStart = null;
          }
        }
      } // otherwise the transaction could end at any time and we need to know the
      // deterministically if it is ending, so we can reset the commit promise
      // so we use the slower atomic operation
      else writeStatus = Atomics.or(uint32, flagPosition, flags);
      outstandingWriteCount++;
      if (writeStatus & TXN_DELIMITER) {
        // the previous flag word was a transaction delimiter, so this write begins a new transaction
        commitPromise = null; // TODO: Don't reset these if this comes from the batch start operation on an event turn batch
        flushPromise = null;
        flushResolvers = [];
        queueCommitResolution(resolution);
        if (!startAddress) {
          startAddress = uint32.address + (flagPosition << 2);
        }
      }
      if (!writtenBatchDepth && batchFlushResolvers.length > 0) {
        flushResolvers.push(...batchFlushResolvers);
        batchFlushResolvers = [];
      }
      if (!flushPromise && overlappingSync) {
        flushPromise = new Promise((resolve) => {
          if (writtenBatchDepth) {
            batchFlushResolvers.push(resolve);
          } else {
            flushResolvers.push(resolve);
          }
        });
      }
      if (writeStatus & WAITING_OPERATION) {
        // write thread is waiting
        write(env.address, 0);
      }
      if (outstandingWriteCount > BACKPRESSURE_THRESHOLD && !writeBatchStart) {
        // too many outstanding writes: block this thread briefly, for a timeout
        // that grows with the backlog
        if (!backpressureArray)
          backpressureArray = new Int32Array(new SharedArrayBuffer(4), 0, 1);
        Atomics.wait(
          backpressureArray,
          0,
          0,
          Math.round(outstandingWriteCount / BACKPRESSURE_THRESHOLD),
        );
      }
      if (startAddress) {
        if (eventTurnBatching)
          startWriting(); // start writing immediately because this has already been batched/queued
        else if (!enqueuedCommit && txnStartThreshold) {
          enqueuedCommit =
            commitDelay == 0 && typeof setImmediate != 'undefined'
              ? setImmediate(() => startWriting())
              : setTimeout(() => startWriting(), commitDelay);
        } else if (outstandingWriteCount > txnStartThreshold) startWriting();
      }
      // every eighth write, clean up resolutions for finished instructions
      if ((outstandingWriteCount & 7) === 0) resolveWrites();
      if (store.cache) {
        resolution.meta = {
          key,
          store,
          valueSize: valueBuffer ? valueBuffer.length : 0,
        };
      }
      resolution.valueBuffer = valueBuffer;
      if (callback) {
        if (callback === IF_EXISTS) ifVersion = IF_EXISTS;
        else {
          // callback style: nothing is returned, the callback doubles as reject
          // handler and (with a null first argument) as resolve handler
          let meta = resolution.meta || (resolution.meta = {});
          meta.reject = callback;
          meta.resolve = (value) => callback(null, value);
          return;
        }
      }
      // if it is not conditional because of ifVersion or has any flags that can make the write conditional
      if (ifVersion === undefined && !(flags & 0x22030)) {
        if (writtenBatchDepth > 1) {
          if (!resolution.flag && !store.cache) resolution.flag = NO_RESOLVE;
          return PROMISE_SUCCESS; // or return undefined?
        }
        if (commitPromise) {
          // share the existing commit promise; this resolution needs no resolver of its own
          if (!resolution.flag) resolution.flag = NO_RESOLVE;
        } else {
          commitPromise = new Promise((resolve, reject) => {
            let meta = resolution.meta || (resolution.meta = {});
            meta.resolve = resolve;
            resolve.unconditional = true;
            meta.reject = reject;
          });
          if (separateFlushed)
            commitPromise.flushed = overlappingSync
              ? flushPromise
              : commitPromise;
        }
        return commitPromise;
      }
      // conditional write: it gets its own promise
      lastWritePromise = new Promise((resolve, reject) => {
        let meta = resolution.meta || (resolution.meta = {});
        meta.resolve = resolve;
        meta.reject = reject;
      });
      if (separateFlushed)
        lastWritePromise.flushed = overlappingSync
          ? flushPromise
          : lastWritePromise;
      return lastWritePromise;
    };
  }
  let committedFlushResolvers,
    lastSync = Promise.resolve();
  /**
   * Cancels any enqueued (delayed) commit start and asks the environment to
   * start writing from `startAddress`, then resets `startAddress` to 0.
   *
   * The callback passed to `env.startWriting` resolves finished writes and then
   * handles the reported status: 0 runs the flush resolvers captured when
   * writing started, 1 does nothing further, 2 runs the queued transaction
   * callbacks and returns whether any of them is still unresolved, and any
   * other status is converted to an error through `lmdbError` and forwarded to
   * `commitRejectPromise` when one exists.
   */
  function startWriting() {
    if (enqueuedCommit) {
      // The handle was created with setImmediate only when commitDelay is 0 and
      // setImmediate exists; otherwise it is a setTimeout handle, which
      // clearImmediate would not cancel.
      if (commitDelay == 0 && typeof setImmediate != 'undefined')
        clearImmediate(enqueuedCommit);
      else clearTimeout(enqueuedCommit);
      enqueuedCommit = null;
    }
    // capture the resolvers now; flushResolvers may be replaced before the callback runs
    let resolvers = flushResolvers;
    env.startWriting(startAddress, (status) => {
      if (dynamicBytes.uint32[dynamicBytes.position << 1] & TXN_DELIMITER)
        queueCommitResolution(nextResolution);
      resolveWrites(true);
      switch (status) {
        case 0:
          for (let resolver of resolvers) {
            resolver();
          }
          break;
        case 1:
          break;
        case 2:
          hasUnresolvedTxns = false;
          executeTxnCallbacks();
          // executeTxnCallbacks runs synchronously up to its first await, so this
          // reports whether a callback returned a thenable
          return hasUnresolvedTxns;
        default:
          try {
            lmdbError(status);
          } catch (error) {
            console.error(error);
            if (commitRejectPromise) {
              commitRejectPromise.reject(error);
              commitRejectPromise = null;
            }
          }
      }
    });
    startAddress = 0;
  }
  /**
   * Marks a resolution as the start of a transaction and queues it: it becomes
   * `txnResolution` when none is pending, otherwise it is attached as the
   * pending one's `nextTxn`. Resolutions whose flag already has HAS_TXN set are
   * left untouched.
   *
   * @param resolution resolution object to queue
   */
  function queueCommitResolution(resolution) {
    if (resolution.flag & HAS_TXN) return; // already queued
    resolution.flag = HAS_TXN;
    if (!txnResolution) {
      txnResolution = resolution;
      return;
    }
    txnResolution.nextTxn = resolution;
    //outstandingWriteCount = 0
  }
  var TXN_DONE = TXN_COMMITTED | TXN_FAILED;
  /**
   * Walks the resolution list from `unwrittenResolution`, cleaning up every
   * instruction whose flag word has the 0x1000000 bit set, then settles every
   * queued transaction whose flag word reports committed or failed.
   *
   * @param {boolean} [async] forwarded to resolveCommit
   */
  function resolveWrites(async) {
    // clean up finished instructions
    for (;;) {
      const current = unwrittenResolution;
      const status = current.uint32[current.flagPosition];
      if (!(status & 0x1000000)) break;

      if (current.callbacks) {
        nextTxnCallbacks.push(current.callbacks);
        current.callbacks = null;
      }
      outstandingWriteCount--;

      if (current.flag !== HAS_TXN) {
        if (current.flag === NO_RESOLVE && !current.meta) {
          // in this case we can completely remove from the linked list, clearing more memory
          unwrittenResolution = current.next;
          lastPromisedResolution.next = unwrittenResolution;
          continue;
        }
        current.uint32 = null;
      }
      current.valueBuffer = null;
      current.flag = status;
      lastPromisedResolution = current;
      unwrittenResolution = current.next;
    }

    // settle transactions that are done; resolveCommit/rejectCommit advance txnResolution
    while (txnResolution) {
      const doneBits =
        txnResolution.uint32[txnResolution.flagPosition] & TXN_DONE;
      if (!doneBits) break;
      if (doneBits & TXN_FAILED) rejectCommit();
      else resolveCommit(async);
    }
  }
  /**
   * Settles a committed transaction: notifies aftercommit listeners with the
   * word stored just before the transaction's flag word, resets the read
   * transaction, and calls the `resolve` handler of every uncommitted
   * resolution up to (not including) the current `txnResolution`.
   *
   * A handler receives `false` when its resolution's flag has FAILED_CONDITION
   * set and the handler is not marked `unconditional`; otherwise `true`.
   *
   * @param {boolean} [async] when truthy the read transaction is reset
   *   immediately, otherwise in a microtask
   */
  function resolveCommit(async) {
    afterCommit(txnResolution.uint32[txnResolution.flagPosition - 1]);
    if (async) resetReadTxn();
    else queueMicrotask(resetReadTxn); // TODO: only do this if there are actually committed writes?

    do {
      const pending = uncommittedResolution;
      const resolve = pending.meta?.resolve;
      if (resolve) {
        const conditionFailed =
          pending.flag & FAILED_CONDITION && !resolve.unconditional;
        resolve(!conditionFailed);
      }
      // re-read the shared pointer: handlers run synchronously and may have moved it
      uncommittedResolution = uncommittedResolution.next;
    } while (uncommittedResolution && uncommittedResolution != txnResolution);

    txnResolution = txnResolution.nextTxn;
  }
  var commitRejectPromise;
  /**
   * Settles a failed transaction: notifies aftercommit listeners (without a
   * txn id), makes sure `commitRejectPromise` exists (with its `reject`
   * function exposed as a property), and rejects every uncommitted resolution
   * up to (not including) the current `txnResolution` with an error whose
   * `commitError` property is that promise.
   */
  function rejectCommit() {
    afterCommit();
    if (!commitRejectPromise) {
      let rejectFunction;
      commitRejectPromise = new Promise(
        (resolve, reject) => (rejectFunction = reject),
      );
      // expose reject so the error reported for the commit can be delivered later
      commitRejectPromise.reject = rejectFunction;
    }
    do {
      if (uncommittedResolution.meta && uncommittedResolution.meta.reject) {
        let error = new Error('Commit failed (see commitError for details)');
        error.commitError = commitRejectPromise;
        uncommittedResolution.meta.reject(error);
      }
    } while (
      (uncommittedResolution = uncommittedResolution.next) &&
      uncommittedResolution != txnResolution
    );
    txnResolution = txnResolution.nextTxn;
  }
  /**
   * Writes a status into an instruction's flag word and returns the previous
   * value of that word.
   *
   * Inside a batch (`batchDepth` non-zero) the word is read and then overwritten
   * with plain accesses; outside a batch the status is or'ed in atomically.
   *
   * @param {Uint32Array} uint32 instruction words
   * @param {number} flagPosition index of the flag word
   * @param {number} newStatus status bits to write
   * @returns {number|undefined} previous flag word, or undefined when the
   *   atomic operation threw (the error is logged)
   */
  function atomicStatus(uint32, flagPosition, newStatus) {
    if (!batchDepth) {
      // the transaction could end at any time and we need to know the
      // deterministically if it is ending, so we can reset the commit promise
      // so we use the slower atomic operation
      try {
        return Atomics.or(uint32, flagPosition, newStatus);
      } catch (error) {
        console.error(error);
        return;
      }
    }
    // if we are in a batch, the transaction can't close, so we do the faster,
    // but non-deterministic updates, knowing that the write thread can
    // just poll for the status change if we miss a status update
    const previousStatus = uint32[flagPosition];
    uint32[flagPosition] = newStatus;
    return previousStatus;
    //return Atomics.or(uint32, flagPosition, newStatus)
  }
  /**
   * Invokes every registered aftercommit callback with
   * `{ next, last, txnId }`, where `next` is the current uncommitted resolution
   * and `last` the current transaction resolution. Only the callbacks present
   * when the call starts are invoked; an error thrown by one is logged and does
   * not stop the others.
   *
   * @param {number} [txnId] id reported to the callbacks
   */
  function afterCommit(txnId) {
    const total = afterCommitCallbacks.length;
    let index = 0;
    while (index < total) {
      try {
        afterCommitCallbacks[index]({
          next: uncommittedResolution,
          last: txnResolution,
          txnId,
        });
      } catch (error) {
        console.error('In aftercommit callback', error);
      }
      index++;
    }
  }
  /**
   * Runs all queued transaction callbacks inside the write transaction.
   *
   * Each entry of `nextTxnCallbacks` is an array of callbacks. A callback with
   * an `asChild` property runs in its own abortable child transaction, which is
   * aborted when the callback yields ABORT (directly, or as the value its
   * returned thenable resolves to) or throws, and committed otherwise. Every
   * callback slot is replaced by its result, or by CALLBACK_THREW with the
   * error recorded in the array's `errors` at the same index. When any callback
   * returned a thenable, `hasUnresolvedTxns` is set and writing is resumed via
   * `env.resumeWriting()` at the end.
   */
  async function executeTxnCallbacks() {
    env.writeTxn = writeTxn = { write: true };
    nextTxnCallbacks.isExecuting = true;
    // the outer length is re-read on every pass, so callbacks queued while
    // executing are picked up as well
    for (let i = 0; i < nextTxnCallbacks.length; i++) {
      let txnCallbacks = nextTxnCallbacks[i];
      for (let j = 0, l = txnCallbacks.length; j < l; j++) {
        let userTxnCallback = txnCallbacks[j];
        let asChild = userTxnCallback.asChild;
        if (asChild) {
          env.beginTxn(1); // abortable
          let parentTxn = writeTxn;
          env.writeTxn = writeTxn = { write: true };
          try {
            let result = userTxnCallback.callback();
            // The abort decision must be based on the settled value: comparing
            // the promise itself to ABORT can never match.
            let outcome = result;
            if (result && result.then) {
              hasUnresolvedTxns = true;
              outcome = await result;
            }
            if (outcome === ABORT) env.abortTxn();
            else env.commitTxn();
            clearWriteTxn(parentTxn);
            // keep storing the original return value (possibly a promise)
            txnCallbacks[j] = result;
          } catch (error) {
            clearWriteTxn(parentTxn);
            env.abortTxn();
            txnError(error, txnCallbacks, j);
          }
        } else {
          try {
            let result = userTxnCallback();
            txnCallbacks[j] = result;
            if (result && result.then) {
              hasUnresolvedTxns = true;
              await result;
            }
          } catch (error) {
            txnError(error, txnCallbacks, j);
          }
        }
      }
    }
    nextTxnCallbacks = [];
    clearWriteTxn(null);
    if (hasUnresolvedTxns) {
      env.resumeWriting();
    }
    // Records a failure for slot i and marks the slot as having thrown.
    function txnError(error, txnCallbacks, i) {
      (txnCallbacks.errors || (txnCallbacks.errors = []))[i] = error;
      txnCallbacks[i] = CALLBACK_THREW;
    }
  }
  /**
   * Writes the end-of-block instruction (status 2) at the current position of
   * the instruction buffer and moves the pending `nextResolution` to the slot
   * after it, allocating a new buffer first when the next position would exceed
   * `newBufferThreshold`. Wakes the write thread when the previous flag word
   * showed it waiting.
   */
  function finishBatch() {
    const currentBytes = dynamicBytes;
    const words = currentBytes.uint32;
    const endFlagIndex = currentBytes.position << 1;
    const followingPosition = currentBytes.position + 1;
    let previousStatus;

    if (followingPosition > newBufferThreshold) {
      // no room left: continue in a new buffer and point the next resolution at it
      allocateInstructionBuffer(followingPosition);
      nextResolution.flagPosition = dynamicBytes.position << 1;
      nextResolution.uint32 = dynamicBytes.uint32;
      previousStatus = atomicStatus(words, endFlagIndex, 2); // atomically write the end block
    } else {
      words[followingPosition << 1] = 0; // clear out the next slot
      currentBytes.position = followingPosition;
      previousStatus = atomicStatus(words, endFlagIndex, 2); // atomically write the end block
      nextResolution.flagPosition += 2;
    }

    if (previousStatus & WAITING_OPERATION) {
      write(env.address, 0);
    }
  }
  /**
   * Ends the current write transaction object and restores the parent (or
   * null) as both `writeTxn` and `env.writeTxn`. A transaction that still has a
   * positive `refCount` is flagged `isDone`.
   *
   * @param [parentTxn] transaction to restore; any falsy value restores null
   */
  function clearWriteTxn(parentTxn) {
    // TODO: We might actually want to track cursors in a write txn and manually
    // close them.
    const finishedTxn = writeTxn;
    if (finishedTxn && finishedTxn.refCount > 0) {
      finishedTxn.isDone = true;
    }
    writeTxn = parentTxn || null;
    env.writeTxn = writeTxn;
  }
  718. Object.assign(LMDBStore.prototype, {
  719. put(key, value, versionOrOptions, ifVersion) {
  720. let callback,
  721. flags = 15,
  722. type = typeof versionOrOptions;
  723. if (type == 'object' && versionOrOptions) {
  724. if (versionOrOptions.noOverwrite) flags |= 0x10;
  725. if (versionOrOptions.noDupData) flags |= 0x20;
  726. if (versionOrOptions.instructedWrite) flags |= 0x2000;
  727. if (versionOrOptions.append) flags |= 0x20000;
  728. if (versionOrOptions.ifVersion != undefined)
  729. ifVersion = versionOrOptions.ifVersion;
  730. versionOrOptions = versionOrOptions.version;
  731. if (typeof ifVersion == 'function') callback = ifVersion;
  732. } else if (type == 'function') {
  733. callback = versionOrOptions;
  734. }
  735. return writeInstructions(
  736. flags,
  737. this,
  738. key,
  739. value,
  740. this.useVersions ? versionOrOptions || 0 : undefined,
  741. ifVersion,
  742. )(callback);
  743. },
  744. remove(key, ifVersionOrValue, callback) {
  745. let flags = 13;
  746. let ifVersion, value;
  747. if (ifVersionOrValue !== undefined) {
  748. if (typeof ifVersionOrValue == 'function') callback = ifVersionOrValue;
  749. else if (ifVersionOrValue === IF_EXISTS && !callback)
  750. // we have a handler for IF_EXISTS in the callback handler for remove
  751. callback = ifVersionOrValue;
  752. else if (this.useVersions) ifVersion = ifVersionOrValue;
  753. else {
  754. flags = 14;
  755. value = ifVersionOrValue;
  756. }
  757. }
  758. return writeInstructions(
  759. flags,
  760. this,
  761. key,
  762. value,
  763. undefined,
  764. ifVersion,
  765. )(callback);
  766. },
  767. del(key, options, callback) {
  768. return this.remove(key, options, callback);
  769. },
  770. ifNoExists(key, callback) {
  771. return this.ifVersion(key, null, callback);
  772. },
  773. ifVersion(key, version, callback, options) {
  774. if (!callback) {
  775. return new Batch((operations, callback) => {
  776. let promise = this.ifVersion(key, version, operations, options);
  777. if (callback) promise.then(callback);
  778. return promise;
  779. });
  780. }
  781. if (writeTxn) {
  782. if (version === undefined || this.doesExist(key, version)) {
  783. callback();
  784. return SYNC_PROMISE_SUCCESS;
  785. }
  786. return SYNC_PROMISE_FAIL;
  787. }
  788. let flags = key === undefined || version === undefined ? 1 : 4;
  789. if (options?.ifLessThan) flags |= CONDITIONAL_VERSION_LESS_THAN;
  790. if (options?.allowNotFound) flags |= CONDITIONAL_ALLOW_NOTFOUND;
  791. let finishStartWrite = writeInstructions(
  792. flags,
  793. this,
  794. key,
  795. undefined,
  796. undefined,
  797. version,
  798. );
  799. let promise;
  800. batchDepth += 2;
  801. if (batchDepth > 2) promise = finishStartWrite();
  802. else {
  803. writeBatchStart = () => {
  804. promise = finishStartWrite();
  805. };
  806. outstandingBatchCount = 0;
  807. }
  808. try {
  809. if (typeof callback === 'function') {
  810. callback();
  811. } else {
  812. for (let i = 0, l = callback.length; i < l; i++) {
  813. let operation = callback[i];
  814. this[operation.type](operation.key, operation.value);
  815. }
  816. }
  817. } finally {
  818. if (!promise) {
  819. finishBatch();
  820. batchDepth -= 2;
  821. promise = finishStartWrite(); // finish write once all the operations have been written (and it hasn't been written prematurely)
  822. writeBatchStart = null;
  823. } else {
  824. batchDepth -= 2;
  825. finishBatch();
  826. }
  827. }
  828. return promise;
  829. },
  830. batch(callbackOrOperations) {
  831. return this.ifVersion(undefined, undefined, callbackOrOperations);
  832. },
  833. drop(callback) {
  834. return writeInstructions(
  835. 1024 + 12,
  836. this,
  837. Buffer.from([]),
  838. undefined,
  839. undefined,
  840. undefined,
  841. )(callback);
  842. },
  843. clearAsync(callback) {
  844. if (this.encoder) {
  845. if (this.encoder.clearSharedData) this.encoder.clearSharedData();
  846. else if (this.encoder.structures) this.encoder.structures = [];
  847. }
  848. return writeInstructions(
  849. 12,
  850. this,
  851. Buffer.from([]),
  852. undefined,
  853. undefined,
  854. undefined,
  855. )(callback);
  856. },
  857. _triggerError() {
  858. finishBatch();
  859. },
  860. putSync(key, value, versionOrOptions, ifVersion) {
  861. if (writeTxn)
  862. return (
  863. this.put(key, value, versionOrOptions, ifVersion) ===
  864. SYNC_PROMISE_SUCCESS
  865. );
  866. else
  867. return this.transactionSync(
  868. () =>
  869. this.put(key, value, versionOrOptions, ifVersion) ===
  870. SYNC_PROMISE_SUCCESS,
  871. overlappingSync ? 0x10002 : 2,
  872. ); // non-abortable, async flush
  873. },
  874. removeSync(key, ifVersionOrValue) {
  875. if (writeTxn)
  876. return this.remove(key, ifVersionOrValue) === SYNC_PROMISE_SUCCESS;
  877. else
  878. return this.transactionSync(
  879. () => this.remove(key, ifVersionOrValue) === SYNC_PROMISE_SUCCESS,
  880. overlappingSync ? 0x10002 : 2,
  881. ); // non-abortable, async flush
  882. },
  883. transaction(callback) {
  884. if (writeTxn && !nextTxnCallbacks.isExecuting) {
  885. // already nested in a transaction, just execute and return
  886. return callback();
  887. }
  888. return this.transactionAsync(callback);
  889. },
  890. childTransaction(callback) {
  891. if (useWritemap)
  892. throw new Error(
  893. 'Child transactions are not supported in writemap mode',
  894. );
  895. if (writeTxn) {
  896. let parentTxn = writeTxn;
  897. let thisTxn = (env.writeTxn = writeTxn = { write: true });
  898. env.beginTxn(1); // abortable
  899. let callbackDone, finishTxn;
  900. try {
  901. return (writeTxn.childResults = when(
  902. callback(),
  903. (finishTxn = (result) => {
  904. if (writeTxn !== thisTxn)
  905. // need to wait for child txn to finish asynchronously
  906. return writeTxn.childResults.then(() => finishTxn(result));
  907. callbackDone = true;
  908. if (result === ABORT) env.abortTxn();
  909. else env.commitTxn();
  910. clearWriteTxn(parentTxn);
  911. return result;
  912. }),
  913. (error) => {
  914. env.abortTxn();
  915. clearWriteTxn(parentTxn);
  916. throw error;
  917. },
  918. ));
  919. } catch (error) {
  920. if (!callbackDone) env.abortTxn();
  921. clearWriteTxn(parentTxn);
  922. throw error;
  923. }
  924. }
  925. return this.transactionAsync(callback, true);
  926. },
  927. transactionAsync(callback, asChild) {
  928. let txnIndex;
  929. let txnCallbacks;
  930. if (lastQueuedResolution.callbacks) {
  931. txnCallbacks = lastQueuedResolution.callbacks;
  932. txnIndex =
  933. txnCallbacks.push(asChild ? { callback, asChild } : callback) - 1;
  934. } else if (nextTxnCallbacks.isExecuting) {
  935. txnCallbacks = [asChild ? { callback, asChild } : callback];
  936. txnCallbacks.results = commitPromise;
  937. nextTxnCallbacks.push(txnCallbacks);
  938. txnIndex = 0;
  939. } else {
  940. if (writeTxn)
  941. throw new Error('Can not enqueue transaction during write txn');
  942. let finishWrite = writeInstructions(
  943. 8 | (this.strictAsyncOrder ? 0x100000 : 0),
  944. this,
  945. );
  946. txnCallbacks = [asChild ? { callback, asChild } : callback];
  947. lastQueuedResolution.callbacks = txnCallbacks;
  948. lastQueuedResolution.id = Math.random();
  949. txnCallbacks.results = finishWrite();
  950. txnIndex = 0;
  951. }
  952. return txnCallbacks.results.then((results) => {
  953. let result = txnCallbacks[txnIndex];
  954. if (result === CALLBACK_THREW) throw txnCallbacks.errors[txnIndex];
  955. return result;
  956. });
  957. },
  958. transactionSync(callback, flags) {
  959. if (writeTxn) {
  960. if (!useWritemap && (flags == undefined || flags & 1))
  961. // can't use child transactions in write maps
  962. // already nested in a transaction, execute as child transaction (if possible) and return
  963. return this.childTransaction(callback);
  964. let result = callback(); // else just run in current transaction
  965. if (result == ABORT && !abortedNonChildTransactionWarn) {
  966. console.warn(
  967. 'Can not abort a transaction inside another transaction with ' +
  968. (this.cache ? 'caching enabled' : 'useWritemap enabled'),
  969. );
  970. abortedNonChildTransactionWarn = true;
  971. }
  972. return result;
  973. }
  974. let callbackDone, finishTxn;
  975. this.transactions++;
  976. if (!env.address)
  977. throw new Error(
  978. 'The database has been closed and you can not transact on it',
  979. );
  980. env.beginTxn(flags == undefined ? 3 : flags);
  981. let thisTxn = (writeTxn = env.writeTxn = { write: true });
  982. try {
  983. this.emit('begin-transaction');
  984. return (writeTxn.childResults = when(
  985. callback(),
  986. (finishTxn = (result) => {
  987. if (writeTxn !== thisTxn)
  988. // need to wait for child txn to finish asynchronously
  989. return writeTxn.childResults.then(() => finishTxn(result));
  990. try {
  991. callbackDone = true;
  992. if (result === ABORT) env.abortTxn();
  993. else {
  994. env.commitTxn();
  995. resetReadTxn();
  996. }
  997. return result;
  998. } finally {
  999. clearWriteTxn(null);
  1000. }
  1001. }),
  1002. (error) => {
  1003. try {
  1004. env.abortTxn();
  1005. } catch (e) {}
  1006. clearWriteTxn(null);
  1007. throw error;
  1008. },
  1009. ));
  1010. } catch (error) {
  1011. if (!callbackDone)
  1012. try {
  1013. env.abortTxn();
  1014. } catch (e) {}
  1015. clearWriteTxn(null);
  1016. throw error;
  1017. }
  1018. },
  1019. getWriteTxnId() {
  1020. return env.getWriteTxnId();
  1021. },
  1022. transactionSyncStart(callback) {
  1023. return this.transactionSync(callback, 0);
  1024. },
  1025. // make the db a thenable/promise-like for when the last commit is committed
  1026. committed: (committed = {
  1027. then(onfulfilled, onrejected) {
  1028. if (commitPromise) return commitPromise.then(onfulfilled, onrejected);
  1029. if (lastWritePromise)
  1030. // always resolve to true
  1031. return lastWritePromise.then(() => onfulfilled(true), onrejected);
  1032. return SYNC_PROMISE_SUCCESS.then(onfulfilled, onrejected);
  1033. },
  1034. }),
  1035. flushed: {
  1036. // make this a thenable for when the commit is flushed to disk
  1037. then(onfulfilled, onrejected) {
  1038. if (flushPromise) flushPromise.hasCallbacks = true;
  1039. return Promise.all([flushPromise || committed, lastSyncTxnFlush]).then(
  1040. onfulfilled,
  1041. onrejected,
  1042. );
  1043. },
  1044. },
  1045. _endWrites(resolvedPromise, resolvedSyncPromise) {
  1046. this.put =
  1047. this.remove =
  1048. this.del =
  1049. this.batch =
  1050. this.removeSync =
  1051. this.putSync =
  1052. this.transactionAsync =
  1053. this.drop =
  1054. this.clearAsync =
  1055. () => {
  1056. throw new Error('Database is closed');
  1057. };
  1058. // wait for all txns to finish, checking again after the current txn is done
  1059. let finalPromise = flushPromise || commitPromise || lastWritePromise;
  1060. if (flushPromise) flushPromise.hasCallbacks = true;
  1061. let finalSyncPromise = lastSyncTxnFlush;
  1062. if (
  1063. (finalPromise && resolvedPromise != finalPromise) ||
  1064. (finalSyncPromise && resolvedSyncPromise != finalSyncPromise)
  1065. ) {
  1066. return Promise.all([finalPromise, finalSyncPromise]).then(
  1067. () => this._endWrites(finalPromise, finalSyncPromise),
  1068. () => this._endWrites(finalPromise, finalSyncPromise),
  1069. );
  1070. }
  1071. Object.defineProperty(env, 'sync', { value: null });
  1072. },
  1073. on(event, callback) {
  1074. if (event == 'beforecommit') {
  1075. eventTurnBatching = true;
  1076. beforeCommitCallbacks.push(callback);
  1077. } else if (event == 'aftercommit') afterCommitCallbacks.push(callback);
  1078. else if (event == 'committed') {
  1079. this.getUserSharedBuffer('__committed__', new ArrayBuffer(0), {
  1080. envKey: true,
  1081. callback,
  1082. });
  1083. } else super.on(event, callback);
  1084. },
  1085. });
  1086. }
  /**
   * An array of pending write operations together with the function that
   * submits them. Each operation is a plain object `{ type, key, value }`
   * (`del` operations carry no value).
   */
  class Batch extends Array {
    /**
     * @param callback - called by `write` as `callback(batch, writeCallback)`
     */
    constructor(callback) {
      super();
      this.callback = callback;
    }

    /** Append a 'put' operation for `key`/`value`. */
    put(key, value) {
      const operation = { type: 'put', key, value };
      this.push(operation);
    }

    /** Append a 'del' operation for `key`. */
    del(key) {
      const operation = { type: 'del', key };
      this.push(operation);
    }

    /** Discard all collected operations. */
    clear() {
      this.length = 0;
    }

    /**
     * Submit the collected operations.
     *
     * @param [callback] - passed through as the second argument
     * @returns whatever the submit function returns
     */
    write(callback) {
      const submitted = this.callback(this, callback);
      return submitted;
    }
  }
  /**
   * Wrap `buffer` in an object whose single property, keyed
   * '\x10binary-data\x02', holds the buffer.
   * NOTE(review): presumably a marker telling the write path to store the
   * bytes as-is — confirm against the code that reads this key.
   *
   * @param buffer - the binary data to wrap
   * @returns a new wrapper object
   */
  export function asBinary(buffer) {
    const wrapper = {};
    wrapper['\x10binary-data\x02'] = buffer;
    return wrapper;
  }