diff --git a/package.json b/package.json index f660be2702..bc773936c2 100644 --- a/package.json +++ b/package.json @@ -85,6 +85,7 @@ "datastore-core": "~0.7.0", "datastore-pubsub": "^0.2.1", "debug": "^4.1.0", + "delay": "^4.1.0", "dlv": "^1.1.3", "err-code": "^2.0.0", "explain-error": "^1.0.4", @@ -199,7 +200,6 @@ "async": "^2.6.3", "base64url": "^3.0.1", "clear-module": "^4.0.0", - "delay": "^4.1.0", "detect-node": "^2.0.4", "dir-compare": "^1.7.3", "execa": "^3.0.0", diff --git a/src/core/components/pin/pin-manager.js b/src/core/components/pin/pin-manager.js index 8cd8c1c261..53a16b2fbb 100644 --- a/src/core/components/pin/pin-manager.js +++ b/src/core/components/pin/pin-manager.js @@ -10,7 +10,8 @@ const multicodec = require('multicodec') const dagCborLinks = require('dag-cbor-links') const debug = require('debug') const { cidToString } = require('../../../utils/cid') - +const delay = require('delay') +const AbortController = require('abort-controller') const createPinSet = require('./pin-set') const { Errors } = require('interface-datastore') @@ -106,43 +107,65 @@ class PinManager { // Encode and write pin key sets to the datastore: // a DAGLink for each of the recursive and direct pinsets // a DAGNode holding those as DAGLinks, a kind of root pin - async flushPins () { - const [ - dLink, - rLink - ] = await Promise.all([ - // create a DAGLink to the node with direct pins - this.pinset.storeSet(this.directKeys()) - .then((result) => { - return new DAGLink(PinTypes.direct, result.node.size, result.cid) - }), - // create a DAGLink to the node with recursive pins - this.pinset.storeSet(this.recursiveKeys()) - .then((result) => { - return new DAGLink(PinTypes.recursive, result.node.size, result.cid) - }), - // the pin-set nodes link to a special 'empty' node, so make sure it exists - this.dag.put(new DAGNode(Buffer.alloc(0)), { - version: 0, - format: multicodec.DAG_PB, - hashAlg: multicodec.SHA2_256, - preload: false - }) - ]) + async flushPins () { // eslint-disable-line require-await + if (this._flushingPins) { + this._flushingPins.controller.abort() + } - // create a root node with DAGLinks to the direct and recursive DAGs - const rootNode = new DAGNode(Buffer.alloc(0), [dLink, rLink]) - const rootCid = await this.dag.put(rootNode, { - version: 0, - format: multicodec.DAG_PB, - hashAlg: multicodec.SHA2_256, - preload: false - }) + const controller = new AbortController() + + this._flushingPins = { + controller, + promise: delay(100) + .then(async () => { + if (controller.signal.aborted) { + return + } - // save root to datastore under a consistent key - await this.repo.datastore.put(PIN_DS_KEY, rootCid.buffer) + this._flushingPins = null + + const [ + dLink, + rLink + ] = await Promise.all([ + // create a DAGLink to the node with direct pins + this.pinset.storeSet(this.directKeys()) + .then((result) => { + return new DAGLink(PinTypes.direct, result.node.size, result.cid) + }), + // create a DAGLink to the node with recursive pins + this.pinset.storeSet(this.recursiveKeys()) + .then((result) => { + return new DAGLink(PinTypes.recursive, result.node.size, result.cid) + }), + // the pin-set nodes link to a special 'empty' node, so make sure it exists + this.dag.put(new DAGNode(Buffer.alloc(0)), { + version: 0, + format: multicodec.DAG_PB, + hashAlg: multicodec.SHA2_256, + preload: false + }) + ]) + + // create a root node with DAGLinks to the direct and recursive DAGs + const rootNode = new DAGNode(Buffer.alloc(0), [dLink, rLink]) + + const rootCid = await this.dag.put(rootNode, { + version: 0, + format: multicodec.DAG_PB, + hashAlg: multicodec.SHA2_256, + preload: false + }) + + // save root to datastore under a consistent key + await this.repo.datastore.put(PIN_DS_KEY, rootCid.buffer) + + this.log(`Flushed pins with root: ${rootCid}`) + }) + .catch(err => this.log(`Could not flush pins: ${err}`)) + } - this.log(`Flushed pins with root: ${rootCid}`) + return this._flushingPins.promise } async load () {