From 2588e6fa6a97beb49556e9bd1b60648010376ead Mon Sep 17 00:00:00 2001 From: achingbrain Date: Mon, 25 Nov 2019 16:10:14 +0000 Subject: [PATCH 1/2] perf: debounce pin flush When importing a directory of files, after every block is added we pin it and flush pins to the repo. This is really slow and if we are adding lots of files, the pinset is out of date almost immediately so this PR debounces flushing pins until after we've stopped adding things. A call to `pinManager.flushPins` will return a promise, and that promise will resolve once the pins have been flushed, but it will first wait 100ms and see if any subsequent flushes have been requested in that time - if so that promise will end early and return the promise created by the later flush request. --- src/core/components/pin/pin-manager.js | 93 ++++++++++++++++---------- 1 file changed, 58 insertions(+), 35 deletions(-) diff --git a/src/core/components/pin/pin-manager.js b/src/core/components/pin/pin-manager.js index 8cd8c1c261..53a16b2fbb 100644 --- a/src/core/components/pin/pin-manager.js +++ b/src/core/components/pin/pin-manager.js @@ -10,7 +10,8 @@ const multicodec = require('multicodec') const dagCborLinks = require('dag-cbor-links') const debug = require('debug') const { cidToString } = require('../../../utils/cid') - +const delay = require('delay') +const AbortController = require('abort-controller') const createPinSet = require('./pin-set') const { Errors } = require('interface-datastore') @@ -106,43 +107,65 @@ class PinManager { // Encode and write pin key sets to the datastore: // a DAGLink for each of the recursive and direct pinsets // a DAGNode holding those as DAGLinks, a kind of root pin - async flushPins () { - const [ - dLink, - rLink - ] = await Promise.all([ - // create a DAGLink to the node with direct pins - this.pinset.storeSet(this.directKeys()) - .then((result) => { - return new DAGLink(PinTypes.direct, result.node.size, result.cid) - }), - // create a DAGLink to the node with recursive pins - this.pinset.storeSet(this.recursiveKeys()) - .then((result) => { - return new DAGLink(PinTypes.recursive, result.node.size, result.cid) - }), - // the pin-set nodes link to a special 'empty' node, so make sure it exists - this.dag.put(new DAGNode(Buffer.alloc(0)), { - version: 0, - format: multicodec.DAG_PB, - hashAlg: multicodec.SHA2_256, - preload: false - }) - ]) + async flushPins () { // eslint-disable-line require-await + if (this._flushingPins) { + this._flushingPins.controller.abort() + } - // create a root node with DAGLinks to the direct and recursive DAGs - const rootNode = new DAGNode(Buffer.alloc(0), [dLink, rLink]) - const rootCid = await this.dag.put(rootNode, { - version: 0, - format: multicodec.DAG_PB, - hashAlg: multicodec.SHA2_256, - preload: false - }) + const controller = new AbortController() + + this._flushingPins = { + controller, + promise: delay(100) + .then(async () => { + if (controller.signal.aborted) { + return + } - // save root to datastore under a consistent key - await this.repo.datastore.put(PIN_DS_KEY, rootCid.buffer) + this._flushingPins = null + + const [ + dLink, + rLink + ] = await Promise.all([ + // create a DAGLink to the node with direct pins + this.pinset.storeSet(this.directKeys()) + .then((result) => { + return new DAGLink(PinTypes.direct, result.node.size, result.cid) + }), + // create a DAGLink to the node with recursive pins + this.pinset.storeSet(this.recursiveKeys()) + .then((result) => { + return new DAGLink(PinTypes.recursive, result.node.size, result.cid) + }), + // the pin-set nodes link to a special 'empty' node, so make sure it exists + this.dag.put(new DAGNode(Buffer.alloc(0)), { + version: 0, + format: multicodec.DAG_PB, + hashAlg: multicodec.SHA2_256, + preload: false + }) + ]) + + // create a root node with DAGLinks to the direct and recursive DAGs + const rootNode = new DAGNode(Buffer.alloc(0), [dLink, rLink]) + + const rootCid = await this.dag.put(rootNode, { + version: 0, + format: multicodec.DAG_PB, + hashAlg: multicodec.SHA2_256, + preload: false + }) + + // save root to datastore under a consistent key + await this.repo.datastore.put(PIN_DS_KEY, rootCid.buffer) + + this.log(`Flushed pins with root: ${rootCid}`) + }) + .catch(err => this.log(`Could not flush pins: ${err}`)) + } - this.log(`Flushed pins with root: ${rootCid}`) + return this._flushingPins.promise } async load () { From b96557a1f3a08127b48fc21b6ff818b1f7ba1969 Mon Sep 17 00:00:00 2001 From: achingbrain Date: Mon, 25 Nov 2019 18:45:45 +0000 Subject: [PATCH 2/2] fix: move delay to deps --- package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/package.json b/package.json index f660be2702..bc773936c2 100644 --- a/package.json +++ b/package.json @@ -85,6 +85,7 @@ "datastore-core": "~0.7.0", "datastore-pubsub": "^0.2.1", "debug": "^4.1.0", + "delay": "^4.1.0", "dlv": "^1.1.3", "err-code": "^2.0.0", "explain-error": "^1.0.4", @@ -199,7 +200,6 @@ "async": "^2.6.3", "base64url": "^3.0.1", "clear-module": "^4.0.0", - "delay": "^4.1.0", "detect-node": "^2.0.4", "dir-compare": "^1.7.3", "execa": "^3.0.0",