From 21e12f56ee94090577ca7f2a2c8254f56daaa73a Mon Sep 17 00:00:00 2001 From: haad Date: Thu, 24 Nov 2016 12:56:07 +0100 Subject: [PATCH 01/15] refactor(streams): Refactor response stream handling. Add dagnode-stream to transform file results to DAGNode objects. Add stream-to-value to convert a response stream to a single value. Refactor request-api so that chunked JSON objects are not buffered anymore. This touches a bunch of the files pushing the transform function down to the individual commands. --- src/api/add.js | 18 +++-- src/api/dht.js | 7 +- src/api/get.js | 15 ++-- src/api/ping.js | 32 ++++++-- src/api/refs.js | 16 +++- src/api/util/fs-add.js | 12 +-- src/api/util/url-add.js | 12 +-- src/dagnode-stream.js | 55 ++++++++++++++ src/get-dagnode.js | 6 +- src/request-api.js | 103 ++++++++------------------ src/stream-to-json-value.js | 34 +++++++++ src/stream-to-value.js | 12 +++ src/stringlist-to-array.js | 9 +++ src/tar-stream-to-objects.js | 69 +++++++++-------- test/interface-ipfs-core/ping.spec.js | 8 ++ test/interface-ipfs-core/refs.spec.js | 1 - test/ipfs-api/util.spec.js | 1 - test/setup/spawn-daemons.js | 8 +- 18 files changed, 269 insertions(+), 149 deletions(-) create mode 100644 src/dagnode-stream.js create mode 100644 src/stream-to-json-value.js create mode 100644 src/stream-to-value.js create mode 100644 src/stringlist-to-array.js diff --git a/src/api/add.js b/src/api/add.js index 1379a4296..1b41f7578 100644 --- a/src/api/add.js +++ b/src/api/add.js @@ -1,24 +1,26 @@ 'use strict' const isStream = require('isstream') -const addToDagNodesTransform = require('../add-to-dagnode-transform') const promisify = require('promisify-es6') +const DAGNodeStream = require('../dagnode-stream') module.exports = (send) => { return promisify((files, callback) => { - const good = Buffer.isBuffer(files) || + const ok = Buffer.isBuffer(files) || isStream.isReadable(files) || Array.isArray(files) - if (!good) { - callback(new Error('"files" must be a buffer, readable stream, or array of objects')) + if (!ok) { + return callback(new Error('"files" must be a buffer, readable stream, or array of objects')) } - const sendWithTransform = send.withTransform(addToDagNodesTransform) - - return sendWithTransform({ + const request = { path: 'add', files: files - }, callback) + } + + // Transform the response stream to DAGNode values + const transform = (res, callback) => DAGNodeStream.streamToValue(send, res, callback) + send.andTransform(request, transform, callback) }) } diff --git a/src/api/dht.js b/src/api/dht.js index fe42146ec..dd78e7e0e 100644 --- a/src/api/dht.js +++ b/src/api/dht.js @@ -1,6 +1,7 @@ 'use strict' const promisify = require('promisify-es6') +const streamToValue = require('../stream-to-value') module.exports = (send) => { return { @@ -19,11 +20,13 @@ module.exports = (send) => { opts = {} } - send({ + const request = { path: 'dht/findprovs', args: args, qs: opts - }, callback) + } + + send.andTransform(request, streamToValue, callback) }), get: promisify((key, opts, callback) => { if (typeof opts === 'function' && diff --git a/src/api/get.js b/src/api/get.js index fa7ba1250..9634b7235 100644 --- a/src/api/get.js +++ b/src/api/get.js @@ -1,11 +1,11 @@ 'use strict' -const tarStreamToObjects = require('../tar-stream-to-objects') -const cleanMultihash = require('../clean-multihash') const promisify = require('promisify-es6') +const cleanMultihash = require('../clean-multihash') +const TarStreamToObjects = require('../tar-stream-to-objects') module.exports = (send) => { - return promisify(function get (path, opts, callback) { + return promisify((path, opts, callback) => { if (typeof opts === 'function' && !callback) { callback = opts @@ -26,12 +26,13 @@ module.exports = (send) => { return callback(err) } - var sendWithTransform = send.withTransform(tarStreamToObjects) - - sendWithTransform({ + const request = { path: 'get', args: path, qs: opts - }, callback) + } + + // Convert the response stream to TarStream objects + send.andTransform(request, TarStreamToObjects.from, callback) }) } diff --git a/src/api/ping.js b/src/api/ping.js index 5e7c74f6f..eeaa3125a 100644 --- a/src/api/ping.js +++ b/src/api/ping.js @@ -1,18 +1,36 @@ 'use strict' const promisify = require('promisify-es6') +const streamToValue = require('../stream-to-value') module.exports = (send) => { return promisify((id, callback) => { - send({ + const request = { path: 'ping', args: id, qs: { n: 1 } - }, function (err, res) { - if (err) { - return callback(err, null) - } - callback(null, res[1]) - }) + } + + // Transform the response stream to a value: + // { Success: , Time: , Text: } + const transform = (res, callback) => { + streamToValue(res, (err, res) => { + if (err) { + return callback(err) + } + + // go-ipfs http api currently returns 3 lines for a ping. + // they're a little messed, so take the correct values from each lines. + const pingResult = { + Success: res[1].Success, + Time: res[1].Time, + Text: res[2].Text + } + + callback(null, pingResult) + }) + } + + send.andTransform(request, transform, callback) }) } diff --git a/src/api/refs.js b/src/api/refs.js index 318ccb59f..56958ca7f 100644 --- a/src/api/refs.js +++ b/src/api/refs.js @@ -1,6 +1,7 @@ 'use strict' const promisify = require('promisify-es6') +const streamToValue = require('../stream-to-value') module.exports = (send) => { const refs = promisify((args, opts, callback) => { @@ -8,21 +9,28 @@ module.exports = (send) => { callback = opts opts = {} } - return send({ + + const request = { path: 'refs', args: args, qs: opts - }, callback) + } + + send.andTransform(request, streamToValue, callback) }) + refs.local = promisify((opts, callback) => { if (typeof (opts) === 'function') { callback = opts opts = {} } - return send({ + + const request = { path: 'refs', qs: opts - }, callback) + } + + send.andTransform(request, streamToValue, callback) }) return refs diff --git a/src/api/util/fs-add.js b/src/api/util/fs-add.js index c78dda13a..6c63cf1ab 100644 --- a/src/api/util/fs-add.js +++ b/src/api/util/fs-add.js @@ -1,8 +1,8 @@ 'use strict' const isNode = require('detect-node') -const addToDagNodesTransform = require('./../../add-to-dagnode-transform') const promisify = require('promisify-es6') +const DAGNodeStream = require('../../dagnode-stream') module.exports = (send) => { return promisify((path, opts, callback) => { @@ -28,12 +28,14 @@ module.exports = (send) => { return callback(new Error('"path" must be a string')) } - const sendWithTransform = send.withTransform(addToDagNodesTransform) - - sendWithTransform({ + const request = { path: 'add', qs: opts, files: path - }, callback) + } + + // Transform the response stream to DAGNode values + const transform = (res, callback) => DAGNodeStream.streamToValue(send, res, callback) + send.andTransform(request, transform, callback) }) } diff --git a/src/api/util/url-add.js b/src/api/util/url-add.js index 7dc7694b2..7db525ad7 100644 --- a/src/api/util/url-add.js +++ b/src/api/util/url-add.js @@ -3,9 +3,8 @@ const promisify = require('promisify-es6') const once = require('once') const parseUrl = require('url').parse - const request = require('../../request') -const addToDagNodesTransform = require('./../../add-to-dagnode-transform') +const DAGNodeStream = require('../../dagnode-stream') module.exports = (send) => { return promisify((url, opts, callback) => { @@ -28,7 +27,6 @@ module.exports = (send) => { return callback(new Error('"url" param must be an http(s) url')) } - const sendWithTransform = send.withTransform(addToDagNodesTransform) callback = once(callback) request(parseUrl(url).protocol)(url, (res) => { @@ -37,11 +35,15 @@ module.exports = (send) => { return callback(new Error(`Failed to download with ${res.statusCode}`)) } - sendWithTransform({ + const params = { path: 'add', qs: opts, files: res - }, callback) + } + + // Transform the response stream to DAGNode values + const transform = (res, callback) => DAGNodeStream.streamToValue(send, res, callback) + send.andTransform(params, transform, callback) }).end() }) } diff --git a/src/dagnode-stream.js b/src/dagnode-stream.js new file mode 100644 index 000000000..35cb12c44 --- /dev/null +++ b/src/dagnode-stream.js @@ -0,0 +1,55 @@ +'use strict' + +const TransformStream = require('readable-stream').Transform +const streamToValue = require('./stream-to-value') +const getDagNode = require('./get-dagnode') + +/* + Transforms a stream of objects to DAGNodes and outputs them as objects. + + Usage: inputStream.pipe(DAGNodeStream({ send: send })) + + Input object format: + { + Name: '/path/to/file/foo.txt', + Hash: 'Qma4hjFTnCasJ8PVp3mZbZK5g2vGDT4LByLJ7m8ciyRFZP' + } + + Output object format: + { + path: '/path/to/file/foo.txt', + hash: 'Qma4hjFTnCasJ8PVp3mZbZK5g2vGDT4LByLJ7m8ciyRFZP', + size: 20 + } +*/ +class DAGNodeStream extends TransformStream { + constructor (options) { + const opts = Object.assign(options || {}, { objectMode: true }) + super(opts) + this._send = opts.send + } + + static streamToValue (send, inputStream, callback) { + const outputStream = inputStream.pipe(new DAGNodeStream({ send: send })) + streamToValue(outputStream, callback) + } + + _transform (obj, enc, callback) { + getDagNode(this._send, obj.Hash, (err, node) => { + if (err) { + return callback(err) + } + + const dag = { + path: obj.Name, + hash: obj.Hash, + size: node.size + } + + this.push(dag) + callback(null) + }) + } +} + +module.exports = DAGNodeStream diff --git a/src/get-dagnode.js b/src/get-dagnode.js index 75cd0886c..d9942d99c 100644 --- a/src/get-dagnode.js +++ b/src/get-dagnode.js @@ -1,8 +1,8 @@ 'use strict' const DAGNode = require('ipld-dag-pb').DAGNode -const bl = require('bl') const parallel = require('async/parallel') +const streamToValue = require('./stream-to-value') module.exports = function (send, hash, callback) { // Retrieve the object and its data in parallel, then produce a DAGNode @@ -36,12 +36,12 @@ module.exports = function (send, hash, callback) { if (Buffer.isBuffer(stream)) { DAGNode.create(stream, object.Links, callback) } else { - stream.pipe(bl(function (err, data) { + streamToValue(stream, (err, data) => { if (err) { return callback(err) } DAGNode.create(data, object.Links, callback) - })) + }) } }) } diff --git a/src/request-api.js b/src/request-api.js index 5eb150269..b980bc4ac 100644 --- a/src/request-api.js +++ b/src/request-api.js @@ -1,50 +1,28 @@ 'use strict' const Qs = require('qs') -const ndjson = require('ndjson') const isNode = require('detect-node') +const ndjson = require('ndjson') const once = require('once') -const concat = require('concat-stream') - const getFilesStream = require('./get-files-stream') +const streamToValue = require('./stream-to-value') +const streamToJsonValue = require('./stream-to-json-value') const request = require('./request') // -- Internal -function parseChunkedJson (res, cb) { - res - .pipe(ndjson.parse()) - .once('error', cb) - .pipe(concat((data) => cb(null, data))) -} - -function parseRaw (res, cb) { - res - .once('error', cb) - .pipe(concat((data) => cb(null, data))) -} - -function parseJson (res, cb) { - res - .once('error', cb) - .pipe(concat((data) => { - if (!data || data.length === 0) { - return cb() - } - - if (Buffer.isBuffer(data)) { - data = data.toString() - } - - let res - try { - res = JSON.parse(data) - } catch (err) { - return cb(err) - } - - cb(null, res) - })) +function parseError (res, cb) { + const error = new Error(`Server responded with ${res.statusCode}`) + streamToJsonValue(res, (err, payload) => { + if (err) { + return cb(err) + } + if (payload) { + error.code = payload.Code + error.message = payload.Message || payload.toString() + } + cb(error) + }) } function onRes (buffer, cb) { @@ -55,33 +33,26 @@ function onRes (buffer, cb) { res.headers['content-type'].indexOf('application/json') === 0 if (res.statusCode >= 400 || !res.statusCode) { - const error = new Error(`Server responded with ${res.statusCode}`) - - parseJson(res, (err, payload) => { - if (err) { - return cb(err) - } - if (payload) { - error.code = payload.Code - error.message = payload.Message || payload.toString() - } - cb(error) - }) + return parseError(res, cb) } + // Return the response stream directly if (stream && !buffer) { return cb(null, res) } + // Return a stream of JSON objects if (chunkedObjects && isJson) { - return parseChunkedJson(res, cb) + return cb(null, res.pipe(ndjson.parse())) } + // Return a JSON object if (isJson) { - return parseJson(res, cb) + return streamToJsonValue(res, cb) } - parseRaw(res, cb) + // Return a value + return streamToValue(res, cb) } } @@ -163,7 +134,7 @@ function requestAPI (config, options, callback) { // // -- Module Interface -exports = module.exports = function getRequestAPI (config) { +exports = module.exports = (config) => { /* * options: { * path: // API path (like /add or /config) - type: string @@ -173,7 +144,7 @@ exports = module.exports = function getRequestAPI (config) { * buffer: // buffer the request before sending it - type: bool * } */ - const send = function (options, callback) { + const send = (options, callback) => { if (typeof options !== 'object') { return callback(new Error('no options were passed')) } @@ -181,25 +152,13 @@ exports = module.exports = function getRequestAPI (config) { return requestAPI(config, options, callback) } - // Wraps the 'send' function such that an asynchronous - // transform may be applied to its result before - // passing it on to either its callback or promise. - send.withTransform = function (transform) { - return function (options, callback) { - if (typeof options !== 'object') { - return callback(new Error('no options were passed')) + send.andTransform = (options, transform, callback) => { + return send(options, (err, res) => { + if (err) { + return callback(err) } - - send(options, wrap(callback)) - - function wrap (func) { - if (func) { - return function (err, res) { - transform(err, res, send, func) - } - } - } - } + transform(res, callback) + }) } return send diff --git a/src/stream-to-json-value.js b/src/stream-to-json-value.js new file mode 100644 index 000000000..e42de2fc6 --- /dev/null +++ b/src/stream-to-json-value.js @@ -0,0 +1,34 @@ +'use strict' + +const streamToValue = require('./stream-to-value') + +/* + Converts a stream to a single JSON value +*/ +function streamToJsonValue (res, cb) { + streamToValue(res, (err, data) => { + if (err) { + return cb(err) + } + + if (!data || data.length === 0) { + return cb() + } + + // TODO: check if needed, afaik JSON.parse can parse Buffers + if (Buffer.isBuffer(data)) { + data = data.toString() + } + + let res + try { + res = JSON.parse(data) + } catch (err) { + return cb(err) + } + + cb(null, res) + }) +} + +module.exports = streamToJsonValue diff --git a/src/stream-to-value.js b/src/stream-to-value.js new file mode 100644 index 000000000..66e8e2ad6 --- /dev/null +++ b/src/stream-to-value.js @@ -0,0 +1,12 @@ +'use strict' + +const concat = require('concat-stream') + +/* + Concatenate a stream to a single value. +*/ +function streamToValue (res, callback) { + res.pipe(concat((data) => callback(null, data))) +} + +module.exports = streamToValue diff --git a/src/stringlist-to-array.js b/src/stringlist-to-array.js new file mode 100644 index 000000000..df28ee6df --- /dev/null +++ b/src/stringlist-to-array.js @@ -0,0 +1,9 @@ +'use strict' + +// Converts a go-ipfs "stringList" to an array +// { Strings: ['A', 'B'] } --> ['A', 'B'] +function stringlistToArray (res, cb) { + cb(null, res.Strings || []) +} + +module.exports = stringlistToArray diff --git a/src/tar-stream-to-objects.js b/src/tar-stream-to-objects.js index acab14658..aabea2745 100644 --- a/src/tar-stream-to-objects.js +++ b/src/tar-stream-to-objects.js @@ -1,38 +1,45 @@ 'use strict' const tar = require('tar-stream') -const Readable = require('readable-stream') +const ReadableStream = require('readable-stream').Readable +/* + Transform tar stream into a stream of objects: -// transform tar stream into readable stream of -// { path: 'string', content: Readable } -module.exports = (err, res, send, done) => { - if (err) { - return done(err) + Output format: + { path: 'string', content: Readable } +*/ +class TarStreamToObjects extends ReadableStream { + constructor (options) { + const opts = Object.assign(options || {}, { objectMode: true }) + super(opts) } - const objStream = new Readable({ objectMode: true }) - objStream._read = function noop () {} - - res - .pipe(tar.extract()) - .on('entry', (header, stream, next) => { - stream.on('end', next) - - if (header.type !== 'directory') { - objStream.push({ - path: header.name, - content: stream - }) - } else { - objStream.push({ - path: header.name - }) - stream.resume() - } - }) - .on('finish', () => { - objStream.push(null) - }) - - done(null, objStream) + static from (inputStream, callback) { + let outputStream = new TarStreamToObjects() + + inputStream + .pipe(tar.extract()) + .on('entry', (header, stream, next) => { + stream.on('end', next) + + if (header.type !== 'directory') { + outputStream.push({ + path: header.name, + content: stream + }) + } else { + outputStream.push({ + path: header.name + }) + stream.resume() + } + }) + .on('finish', () => outputStream.push(null)) + + callback(null, outputStream) + } + + _read () {} } + +module.exports = TarStreamToObjects diff --git a/test/interface-ipfs-core/ping.spec.js b/test/interface-ipfs-core/ping.spec.js index bf4e2e606..a7dbb4a28 100644 --- a/test/interface-ipfs-core/ping.spec.js +++ b/test/interface-ipfs-core/ping.spec.js @@ -12,6 +12,10 @@ describe('.ping', () => { apiClients.a.ping(id.id, (err, res) => { expect(err).to.not.exist expect(res).to.have.a.property('Success') + expect(res).to.have.a.property('Time') + expect(res).to.have.a.property('Text') + expect(res.Text).to.contain('Average latency') + expect(res.Time).to.be.a('number') done() }) }) @@ -25,6 +29,10 @@ describe('.ping', () => { }) .then((res) => { expect(res).to.have.a.property('Success') + expect(res).to.have.a.property('Time') + expect(res).to.have.a.property('Text') + expect(res.Text).to.contain('Average latency') + expect(res.Time).to.be.a('number') }) }) }) diff --git a/test/interface-ipfs-core/refs.spec.js b/test/interface-ipfs-core/refs.spec.js index 5b33662b0..3a38abe1f 100644 --- a/test/interface-ipfs-core/refs.spec.js +++ b/test/interface-ipfs-core/refs.spec.js @@ -65,7 +65,6 @@ describe('.refs', () => { ipfs.refs(folder, {format: ' '}, (err, objs) => { expect(err).to.not.exist expect(objs).to.eql(result) - done() }) }) diff --git a/test/ipfs-api/util.spec.js b/test/ipfs-api/util.spec.js index e2e9dede2..d103844bd 100644 --- a/test/ipfs-api/util.spec.js +++ b/test/ipfs-api/util.spec.js @@ -65,7 +65,6 @@ describe('.util', () => { ipfs.util.addFromFs(filePath, (err, result) => { expect(err).to.not.exist expect(result.length).to.be.above(5) - done() }) }) diff --git a/test/setup/spawn-daemons.js b/test/setup/spawn-daemons.js index 5a2d3b659..2131fed82 100644 --- a/test/setup/spawn-daemons.js +++ b/test/setup/spawn-daemons.js @@ -28,9 +28,11 @@ function startDisposableDaemons (callback) { const configValues = { Bootstrap: [], Discovery: {}, - 'HTTPHeaders.Access-Control-Allow-Origin': ['*'], - 'HTTPHeaders.Access-Control-Allow-Credentials': 'true', - 'HTTPHeaders.Access-Control-Allow-Methods': ['PUT', 'POST', 'GET'] + API: { + 'HTTPHeaders.Access-Control-Allow-Origin': ['*'], + 'HTTPHeaders.Access-Control-Allow-Credentials': 'true', + 'HTTPHeaders.Access-Control-Allow-Methods': ['PUT', 'POST', 'GET'] + } } eachSeries(Object.keys(configValues), (configKey, cb) => { From fb1f1358daac6bde66f20b42e30d6ebf117db6d0 Mon Sep 17 00:00:00 2001 From: haad Date: Mon, 19 Dec 2016 09:23:35 +0100 Subject: [PATCH 02/15] Improve function descriptions --- src/dagnode-stream.js | 3 ++- src/request-api.js | 4 ++++ 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/src/dagnode-stream.js b/src/dagnode-stream.js index 35cb12c44..150ddfff6 100644 --- a/src/dagnode-stream.js +++ b/src/dagnode-stream.js @@ -5,7 +5,8 @@ const streamToValue = require('./stream-to-value') const getDagNode = require('./get-dagnode') /* - Transforms a stream of objects to DAGNodes and outputs them as objects. + Transforms a stream of {Name, Hash} objects to include size + of the DAG object. Usage: inputStream.pipe(DAGNodeStream({ send: send })) diff --git a/src/request-api.js b/src/request-api.js index b980bc4ac..e7389d021 100644 --- a/src/request-api.js +++ b/src/request-api.js @@ -152,6 +152,10 @@ exports = module.exports = (config) => { return requestAPI(config, options, callback) } + // Send a HTTP request and pass via a transform function + // to convert the response data to wanted format before + // returning it to the callback. + // Eg. send.andTransform({}, (e) => JSON.parse(e), (err, res) => ...) send.andTransform = (options, transform, callback) => { return send(options, (err, res) => { if (err) { From b12abf2eed609255766ef3561288b4e26a05563e Mon Sep 17 00:00:00 2001 From: haad Date: Mon, 19 Dec 2016 09:57:42 +0100 Subject: [PATCH 03/15] Change variable name to be more descriptive in dagnode-stream --- src/dagnode-stream.js | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/dagnode-stream.js b/src/dagnode-stream.js index 150ddfff6..19b9b073e 100644 --- a/src/dagnode-stream.js +++ b/src/dagnode-stream.js @@ -41,13 +41,13 @@ class DAGNodeStream extends TransformStream { return callback(err) } - const dag = { + const result = { path: obj.Name, hash: obj.Hash, size: node.size } - this.push(dag) + this.push(result) callback(null) }) } From 220ca6d9d84f6eb44425491910b642c4b842ea29 Mon Sep 17 00:00:00 2001 From: haad Date: Mon, 19 Dec 2016 10:05:21 +0100 Subject: [PATCH 04/15] Refactor tar-stream-to-objects to better show intent --- src/api/get.js | 2 +- src/tar-stream-to-objects.js | 63 ++++++++++++++++++------------------ 2 files changed, 33 insertions(+), 32 deletions(-) diff --git a/src/api/get.js b/src/api/get.js index 9634b7235..3b4c0c983 100644 --- a/src/api/get.js +++ b/src/api/get.js @@ -33,6 +33,6 @@ module.exports = (send) => { } // Convert the response stream to TarStream objects - send.andTransform(request, TarStreamToObjects.from, callback) + send.andTransform(request, TarStreamToObjects, callback) }) } diff --git a/src/tar-stream-to-objects.js b/src/tar-stream-to-objects.js index aabea2745..b68c887fb 100644 --- a/src/tar-stream-to-objects.js +++ b/src/tar-stream-to-objects.js @@ -2,44 +2,45 @@ const tar = require('tar-stream') const ReadableStream = require('readable-stream').Readable -/* - Transform tar stream into a stream of objects: - Output format: - { path: 'string', content: Readable } -*/ -class TarStreamToObjects extends ReadableStream { +class ObjectsStreams extends ReadableStream { constructor (options) { const opts = Object.assign(options || {}, { objectMode: true }) super(opts) } - static from (inputStream, callback) { - let outputStream = new TarStreamToObjects() - - inputStream - .pipe(tar.extract()) - .on('entry', (header, stream, next) => { - stream.on('end', next) - - if (header.type !== 'directory') { - outputStream.push({ - path: header.name, - content: stream - }) - } else { - outputStream.push({ - path: header.name - }) - stream.resume() - } - }) - .on('finish', () => outputStream.push(null)) - - callback(null, outputStream) - } + _read () {} +} - _read () {} +/* + Transform a tar stream into a stream of objects: + + Output format: + { path: 'string', content: Stream } +*/ +const TarStreamToObjects = (inputStream, callback) => { + let outputStream = new ObjectsStreams() + + inputStream + .pipe(tar.extract()) + .on('entry', (header, stream, next) => { + stream.on('end', next) + + if (header.type !== 'directory') { + outputStream.push({ + path: header.name, + content: stream + }) + } else { + outputStream.push({ + path: header.name + }) + stream.resume() + } + }) + .on('finish', () => outputStream.push(null)) + + callback(null, outputStream) } module.exports = TarStreamToObjects From fd1ea278ffaddd15b04d2850f3c9a19a7df2d2b3 Mon Sep 17 00:00:00 2001 From: haad Date: Mon, 19 Dec 2016 12:31:40 +0100 Subject: [PATCH 05/15] Use stream-to-value in Block command instead of bl --- package.json | 4 +-- src/api/block.js | 55 ++++++++++++++++++++---------------- src/api/log.js | 4 ++- src/api/object.js | 4 +-- src/dagnode-stream.js | 5 +++- src/request-api.js | 4 ++- src/stream-to-value.js | 6 +++- src/tar-stream-to-objects.js | 8 ++++-- 8 files changed, 54 insertions(+), 36 deletions(-) diff --git a/package.json b/package.json index f51d41c88..f3644da6b 100644 --- a/package.json +++ b/package.json @@ -23,7 +23,6 @@ }, "dependencies": { "async": "^2.1.4", - "bl": "^1.1.2", "bs58": "^3.1.0", "concat-stream": "^1.6.0", "detect-node": "^2.0.3", @@ -43,6 +42,7 @@ "peer-id": "^0.8.1", "peer-info": "^0.8.1", "promisify-es6": "^1.0.2", + "pump": "^1.0.1", "qs": "^6.3.0", "readable-stream": "1.1.14", "stream-http": "^2.5.0", @@ -117,4 +117,4 @@ "url": "https://github.com/ipfs/js-ipfs-api/issues" }, "homepage": "https://github.com/ipfs/js-ipfs-api" -} \ No newline at end of file +} diff --git a/src/api/block.js b/src/api/block.js index 35cc8af3e..73d745063 100644 --- a/src/api/block.js +++ b/src/api/block.js @@ -1,10 +1,10 @@ 'use strict' const promisify = require('promisify-es6') -const bl = require('bl') const Block = require('ipfs-block') const multihash = require('multihashes') const CID = require('cids') +const streamToValue = require('../stream-to-value') module.exports = (send) => { return { @@ -21,25 +21,27 @@ module.exports = (send) => { opts = {} } - return send({ - path: 'block/get', - args: args, - qs: opts - }, (err, res) => { - if (err) { - return callback(err) - } + // Transform the response from Buffer or a Stream to a Block + const transform = (res, callback) => { if (Buffer.isBuffer(res)) { callback(null, new Block(res)) } else { - res.pipe(bl((err, data) => { + streamToValue(res, (err, data) => { if (err) { return callback(err) } callback(null, new Block(data)) - })) + }) } - }) + } + + const request = { + path: 'block/get', + args: args, + qs: opts + } + + send.andTransform(request, transform, callback) }), stat: promisify((args, opts, callback) => { // TODO this needs to be adjusted with the new go-ipfs http-api @@ -51,19 +53,22 @@ module.exports = (send) => { callback = opts opts = {} } - return send({ + + const request = { path: 'block/stat', args: args, qs: opts - }, (err, stats) => { - if (err) { - return callback(err) - } + } + + // Transform the response from { Key, Size } objects to { key, size } objects + const transform = (stats, callback) => { callback(null, { key: stats.Key, size: stats.Size }) - }) + } + + send.andTransform(request, transform, callback) }), put: promisify((block, cid, callback) => { // TODO this needs to be adjusted with the new go-ipfs http-api @@ -81,15 +86,15 @@ module.exports = (send) => { block = block.data } - return send({ + const request = { path: 'block/put', files: block - }, (err, blockInfo) => { - if (err) { - return callback(err) - } - callback(null, new Block(block)) - }) + } + + // Transform the response to a Block + const transform = (blockInfo, callback) => callback(null, new Block(block)) + + send.andTransform(request, transform, callback) }) } } diff --git a/src/api/log.js b/src/api/log.js index f7a77aafd..74c750d5e 100644 --- a/src/api/log.js +++ b/src/api/log.js @@ -1,5 +1,6 @@ 'use strict' +const pump = require('pump') const ndjson = require('ndjson') const promisify = require('promisify-es6') @@ -12,7 +13,8 @@ module.exports = (send) => { if (err) { return callback(err) } - callback(null, response.pipe(ndjson.parse())) + const outputStream = pump(response, ndjson.parse()) + callback(null, outputStream) }) }) } diff --git a/src/api/object.js b/src/api/object.js index 1f403bc03..f7cd675a8 100644 --- a/src/api/object.js +++ b/src/api/object.js @@ -5,7 +5,7 @@ const DAGNode = dagPB.DAGNode const DAGLink = dagPB.DAGLink const promisify = require('promisify-es6') const bs58 = require('bs58') -const bl = require('bl') +const streamToValue = require('../stream-to-value') const cleanMultihash = require('../clean-multihash') const LRU = require('lru-cache') const lruOptions = { @@ -188,7 +188,7 @@ module.exports = (send) => { } if (typeof result.pipe === 'function') { - result.pipe(bl(callback)) + streamToValue(result, callback) } else { callback(null, result) } diff --git a/src/dagnode-stream.js b/src/dagnode-stream.js index 19b9b073e..bba259cb3 100644 --- a/src/dagnode-stream.js +++ b/src/dagnode-stream.js @@ -1,5 +1,6 @@ 'use strict' +const pump = require('pump') const TransformStream = require('readable-stream').Transform const streamToValue = require('./stream-to-value') const getDagNode = require('./get-dagnode') @@ -31,7 +32,9 @@ class DAGNodeStream extends TransformStream { } static streamToValue (send, inputStream, callback) { - const outputStream = inputStream.pipe(new DAGNodeStream({ send: send })) + const outputStream = pump(inputStream, new DAGNodeStream({ send: send }), (err) => { + if (err) callback(err) + }) streamToValue(outputStream, callback) } diff --git a/src/request-api.js b/src/request-api.js index e7389d021..b12e44cdd 100644 --- a/src/request-api.js +++ b/src/request-api.js @@ -3,6 +3,7 @@ const Qs = require('qs') const isNode = require('detect-node') const ndjson = require('ndjson') +const pump = require('pump') const once = require('once') const getFilesStream = require('./get-files-stream') const streamToValue = require('./stream-to-value') @@ -43,7 +44,8 @@ function onRes (buffer, cb) { // Return a stream of JSON objects if (chunkedObjects && isJson) { - return cb(null, res.pipe(ndjson.parse())) + const outputStream = pump(res, ndjson.parse()) + return cb(null, outputStream) } // Return a JSON object diff --git a/src/stream-to-value.js b/src/stream-to-value.js index 66e8e2ad6..6cf0c1ec5 100644 --- a/src/stream-to-value.js +++ b/src/stream-to-value.js @@ -1,12 +1,16 @@ 'use strict' +const pump = require('pump') const concat = require('concat-stream') /* Concatenate a stream to a single value. */ function streamToValue (res, callback) { - res.pipe(concat((data) => callback(null, data))) + const done = (data) => callback(null, data) + pump(res, concat(done), (err) => { + if (err) callback(err) + }) } module.exports = streamToValue diff --git a/src/tar-stream-to-objects.js b/src/tar-stream-to-objects.js index b68c887fb..6d7765a03 100644 --- a/src/tar-stream-to-objects.js +++ b/src/tar-stream-to-objects.js @@ -1,5 +1,6 @@ 'use strict' +const pump = require('pump') const tar = require('tar-stream') const ReadableStream = require('readable-stream').Readable @@ -9,7 +10,7 @@ class ObjectsStreams extends ReadableStream { super(opts) } - _read () {} + _read () {} } /* @@ -20,9 +21,9 @@ class ObjectsStreams extends ReadableStream { */ const TarStreamToObjects = (inputStream, callback) => { let outputStream = new ObjectsStreams() + let extractStream = tar.extract() - inputStream - .pipe(tar.extract()) + extractStream .on('entry', (header, stream, next) => { stream.on('end', next) @@ -40,6 +41,7 @@ const TarStreamToObjects = (inputStream, callback) => { }) .on('finish', () => outputStream.push(null)) + pump(inputStream, extractStream) callback(null, outputStream) } From db91f3ccf5ae22a698bf04a4170f2245aa23f5a4 Mon Sep 17 00:00:00 2001 From: Friedel Ziegelmayer Date: Mon, 19 Dec 2016 13:15:50 +0100 Subject: [PATCH 06/15] test: fix daemon config setting --- test/setup/spawn-daemons.js | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/test/setup/spawn-daemons.js b/test/setup/spawn-daemons.js index 2131fed82..a747ffc2f 100644 --- a/test/setup/spawn-daemons.js +++ b/test/setup/spawn-daemons.js @@ -28,15 +28,13 @@ function startDisposableDaemons (callback) { const configValues = { Bootstrap: [], Discovery: {}, - API: { - 'HTTPHeaders.Access-Control-Allow-Origin': ['*'], - 'HTTPHeaders.Access-Control-Allow-Credentials': 'true', - 'HTTPHeaders.Access-Control-Allow-Methods': ['PUT', 'POST', 'GET'] - } + 'API.HTTPHeaders.Access-Control-Allow-Origin': ['*'], + 'API.HTTPHeaders.Access-Control-Allow-Credentials': 'true', + 'API.HTTPHeaders.Access-Control-Allow-Methods': ['PUT', 'POST', 'GET'] } eachSeries(Object.keys(configValues), (configKey, cb) => { - nodes[key].setConfig(`API.${configKey}`, JSON.stringify(configValues[configKey]), cb) + nodes[key].setConfig(configKey, JSON.stringify(configValues[configKey]), cb) }, (err) => { if (err) { return cb(err) From 241df6d7179a9c36190933b2df9608400667b7f7 Mon Sep 17 00:00:00 2001 From: haad Date: Wed, 23 Nov 2016 17:38:50 +0100 Subject: [PATCH 07/15] refactor(pubsub): Refactored and completed pubsub API --- package.json | 3 +- src/api/pubsub.js | 98 ++++++++++++++++++++++++++++++++++ src/load-commands.js | 1 + src/pubsub-message-stream.js | 30 +++++++++++ src/pubsub-message-utils.js | 47 ++++++++++++++++ src/pubsub-message.js | 28 ++++++++++ test/factory/daemon-spawner.js | 16 +++--- test/setup/spawn-daemons.js | 4 +- 8 files changed, 217 insertions(+), 10 deletions(-) create mode 100644 src/api/pubsub.js create mode 100644 src/pubsub-message-stream.js create mode 100644 src/pubsub-message-utils.js create mode 100644 src/pubsub-message.js diff --git a/package.json b/package.json index f3644da6b..b3b957061 100644 --- a/package.json +++ b/package.json @@ -34,6 +34,7 @@ "ipld-dag-pb": "^0.9.3", "is-ipfs": "^0.2.1", "isstream": "^0.1.2", + "js-base64": "^2.1.9", "lru-cache": "^4.0.2", "multiaddr": "^2.1.1", "multipart-stream": "^2.0.1", @@ -64,7 +65,7 @@ "gulp": "^3.9.1", "hapi": "^16.0.1", "interface-ipfs-core": "^0.22.1", - "ipfsd-ctl": "^0.17.0", + "@haad/ipfsd-ctl": "^0.18.0-beta.5", "pre-commit": "^1.2.0", "socket.io": "^1.7.1", "socket.io-client": "^1.7.1", diff --git a/src/api/pubsub.js b/src/api/pubsub.js new file mode 100644 index 000000000..b64e2970a --- /dev/null +++ b/src/api/pubsub.js @@ -0,0 +1,98 @@ +'use strict' + +const promisify = require('promisify-es6') +const PubsubMessageStream = require('../pubsub-message-stream') +const stringlistToArray = require('../stringlist-to-array') + +/* Internal subscriptions state and functions */ +let subscriptions = {} + +const addSubscription = (topic, request) => { + subscriptions[topic] = { request: request } +} + +const removeSubscription = promisify((topic, callback) => { + if (!subscriptions[topic]) { + return callback(new Error(`Not subscribed to ${topic}`)) + } + + subscriptions[topic].request.abort() + delete subscriptions[topic] + + if (callback) { + callback(null) + } +}) + +/* Public API */ +module.exports = (send) => { + return { + subscribe: promisify((topic, options, callback) => { + const defaultOptions = { + discover: false + } + + if (typeof options === 'function') { + callback = options + options = defaultOptions + } + + if (!options) { + options = defaultOptions + } + + // If we're already subscribed, return an error + if (subscriptions[topic]) { + return callback(new Error(`Already subscribed to '${topic}'`)) + } + + // Request params + const request = { + path: 'pubsub/sub', + args: [topic], + qs: { discover: options.discover } + } + + // Start the request and transform the response stream to Pubsub messages stream + const req = send.andTransform(request, PubsubMessageStream.from, (err, stream) => { + if (err) { + return callback(err) + } + // Add a cancel method to the stream so that the subscription can be cleanly cancelled + stream.cancel = promisify((cb) => removeSubscription(topic, cb)) + // Add the request to the active subscriptions and return the stream + addSubscription(topic, req) + callback(null, stream) + }) + }), + publish: promisify((topic, data, callback) => { + const buf = Buffer.isBuffer(data) ? data : new Buffer(data) + + const request = { + path: 'pubsub/pub', + args: [topic, buf] + } + + send(request, callback) + }), + ls: promisify((callback) => { + const request = { + path: 'pubsub/ls' + } + + send.andTransform(request, stringlistToArray, callback) + }), + peers: promisify((topic, callback) => { + if (!subscriptions[topic]) { + return callback(new Error(`Not subscribed to '${topic}'`)) + } + + const request = { + path: 'pubsub/peers', + args: [topic] + } + + send.andTransform(request, stringlistToArray, callback) + }) + } +} diff --git a/src/load-commands.js b/src/load-commands.js index b69c197cb..1246ef5ef 100644 --- a/src/load-commands.js +++ b/src/load-commands.js @@ -25,6 +25,7 @@ function requireCommands () { refs: require('./api/refs'), repo: require('./api/repo'), swarm: require('./api/swarm'), + pubsub: require('./api/pubsub'), update: require('./api/update'), version: require('./api/version') } diff --git a/src/pubsub-message-stream.js b/src/pubsub-message-stream.js new file mode 100644 index 000000000..570c87b48 --- /dev/null +++ b/src/pubsub-message-stream.js @@ -0,0 +1,30 @@ +'use strict' + +const TransformStream = require('readable-stream').Transform +const PubsubMessage = require('./pubsub-message-utils') + +class PubsubMessageStream extends TransformStream { + constructor (options) { + const opts = Object.assign(options || {}, { objectMode: true }) + super(opts) + } + + static from (inputStream, callback) { + let outputStream = inputStream.pipe(new PubsubMessageStream()) + inputStream.on('end', () => outputStream.emit('end')) + callback(null, outputStream) + } + + _transform (obj, enc, callback) { + try { + const message = PubsubMessage.deserialize(obj, 'base64') + this.push(message) + } catch (e) { + // Not a valid pubsub message + // go-ipfs returns '{}' as the very first object atm, we skip that + } + callback() + } +} + +module.exports = PubsubMessageStream diff --git a/src/pubsub-message-utils.js b/src/pubsub-message-utils.js new file mode 100644 index 000000000..540165889 --- /dev/null +++ b/src/pubsub-message-utils.js @@ -0,0 +1,47 @@ +'use strict' + +const Base58 = require('bs58') +const Base64 = require('js-base64').Base64 +const PubsubMessage = require('./pubsub-message') + +class PubsubMessageUtils { + static create (senderId, data, seqNo, topics) { + return new PubsubMessage(senderId, data, seqNo, topics) + } + + static deserialize (data, enc = 'json') { + enc = enc ? enc.toLowerCase() : null + + if (enc === 'json') { + return PubsubMessageUtils._deserializeFromJson(data) + } else if (enc === 'base64') { + return PubsubMessageUtils._deserializeFromBase64(data) + } + + throw new Error(`Unsupported encoding: '${enc}'`) + } + + static _deserializeFromJson (data) { + const json = JSON.parse(data) + return PubsubMessageUtils._deserializeFromBase64(json) + } + + static _deserializeFromBase64 (obj) { + if (!PubsubMessageUtils._isPubsubMessage(obj)) { + throw new Error(`Not a pubsub message`) + } + + const senderId = Base58.encode(obj.from) + const payload = Base64.decode(obj.data) + const seqno = Base64.decode(obj.seqno) + const topics = obj.topicIDs + + return PubsubMessageUtils.create(senderId, payload, seqno, topics) + } + + static _isPubsubMessage (obj) { + return obj && obj.from && obj.seqno && obj.data && obj.topicIDs + } +} + +module.exports = PubsubMessageUtils diff --git a/src/pubsub-message.js b/src/pubsub-message.js new file mode 100644 index 000000000..0d209bfa4 --- /dev/null +++ b/src/pubsub-message.js @@ -0,0 +1,28 @@ +'use strict' + +class PubsubMessage { + constructor (senderId, data, seqNo, topics) { + this._senderId = senderId + this._data = data + this._seqNo = seqNo + this._topics = topics + } + + get from () { + return this._senderId + } + + get data () { + return this._data + } + + get seqno () { + return this._seqNo + } + + get topicIDs () { + return this._topics + } +} + +module.exports = PubsubMessage diff --git a/test/factory/daemon-spawner.js b/test/factory/daemon-spawner.js index 86a2d808b..ca2ee4892 100644 --- a/test/factory/daemon-spawner.js +++ b/test/factory/daemon-spawner.js @@ -1,7 +1,7 @@ 'use strict' // const defaultConfig = require('./default-config.json') -const ipfsd = require('ipfsd-ctl') +const ipfsd = require('@haad/ipfsd-ctl') const series = require('async/series') const eachSeries = require('async/eachSeries') const once = require('once') @@ -73,17 +73,19 @@ function spawnEphemeralNode (callback) { (cb) => { const configValues = { Bootstrap: [], - Discovery: {}, - 'HTTPHeaders.Access-Control-Allow-Origin': ['*'], - 'HTTPHeaders.Access-Control-Allow-Credentials': 'true', - 'HTTPHeaders.Access-Control-Allow-Methods': ['PUT', 'POST', 'GET'] + // Discovery: {}, + API: { + 'HTTPHeaders.Access-Control-Allow-Origin': ['*'], + 'HTTPHeaders.Access-Control-Allow-Credentials': 'true', + 'HTTPHeaders.Access-Control-Allow-Methods': ['PUT', 'POST', 'GET'] + } } eachSeries(Object.keys(configValues), (configKey, cb) => { - node.setConfig(`API.${configKey}`, JSON.stringify(configValues[configKey]), cb) + node.setConfig(`${configKey}`, JSON.stringify(configValues[configKey]), cb) }, cb) }, - (cb) => node.startDaemon(cb) + (cb) => node.startDaemon(['--enable-pubsub-experiment'], cb) ], (err) => { if (err) { return callback(err) diff --git a/test/setup/spawn-daemons.js b/test/setup/spawn-daemons.js index a747ffc2f..8aa53931f 100644 --- a/test/setup/spawn-daemons.js +++ b/test/setup/spawn-daemons.js @@ -5,7 +5,7 @@ const gulp = require('gulp') const fs = require('fs') const path = require('path') -const ipfsd = require('ipfsd-ctl') +const ipfsd = require('@haad/ipfsd-ctl') const eachSeries = require('async/eachSeries') const parallel = require('async/parallel') @@ -40,7 +40,7 @@ function startDisposableDaemons (callback) { return cb(err) } - nodes[key].startDaemon(cb) + nodes[key].startDaemon(['--enable-pubsub-experiment'], cb) }) }) } From 85805544bd195ca26b07e687d31171da61c65ad9 Mon Sep 17 00:00:00 2001 From: David Dias Date: Tue, 6 Dec 2016 19:26:56 +0000 Subject: [PATCH 08/15] fix: added missing test files --- .../pubsub-message.spec.js | 26 +++++++++++++++++++ test/interface-ipfs-core/pubsub.spec.js | 20 ++++++++++++++ 2 files changed, 46 insertions(+) create mode 100644 test/interface-ipfs-core/pubsub-message.spec.js create mode 100644 test/interface-ipfs-core/pubsub.spec.js diff --git a/test/interface-ipfs-core/pubsub-message.spec.js b/test/interface-ipfs-core/pubsub-message.spec.js new file mode 100644 index 000000000..e82686b6a --- /dev/null +++ b/test/interface-ipfs-core/pubsub-message.spec.js @@ -0,0 +1,26 @@ +/* eslint-env mocha */ + +'use strict' + +const test = require('interface-ipfs-core') +const FactoryClient = require('../factory/factory-client') +const PubsubMessage = require('../../src/pubsub-message') +const PubsubMessageUtils = require('../../src/pubsub-message-utils') + +let fc + +const common = { + setup: function (callback) { + fc = new FactoryClient() + callback(null, fc) + }, + teardown: function (callback) { + fc.dismantle(callback) + } +} + +// Pass the components down to the tests +test.pubsubMessage(common, { + PubsubMessageUtils: PubsubMessageUtils, + PubsubMessage: PubsubMessage, +}) diff --git a/test/interface-ipfs-core/pubsub.spec.js b/test/interface-ipfs-core/pubsub.spec.js new file mode 100644 index 000000000..886d7db56 --- /dev/null +++ b/test/interface-ipfs-core/pubsub.spec.js @@ -0,0 +1,20 @@ +/* eslint-env mocha */ + +'use strict' + +const test = require('interface-ipfs-core') +const FactoryClient = require('../factory/factory-client') + +let fc + +const common = { + setup: function (callback) { + fc = new FactoryClient() + callback(null, fc) + }, + teardown: function (callback) { + fc.dismantle(callback) + } +} + +test.pubsub(common) From 91cbfa81ff44def7d21a3922194c2a33a06e128d Mon Sep 17 00:00:00 2001 From: David Dias Date: Tue, 6 Dec 2016 20:05:21 +0000 Subject: [PATCH 09/15] feat: move pubsub message tests to here, they are not interface-ipfs-core --- .../pubsub-message.spec.js | 26 ------ test/ipfs-api/pubsub-message.spec.js | 87 +++++++++++++++++++ 2 files changed, 87 insertions(+), 26 deletions(-) delete mode 100644 test/interface-ipfs-core/pubsub-message.spec.js create mode 100644 test/ipfs-api/pubsub-message.spec.js diff --git a/test/interface-ipfs-core/pubsub-message.spec.js b/test/interface-ipfs-core/pubsub-message.spec.js deleted file mode 100644 index e82686b6a..000000000 --- a/test/interface-ipfs-core/pubsub-message.spec.js +++ /dev/null @@ -1,26 +0,0 @@ -/* eslint-env mocha */ - -'use strict' - -const test = require('interface-ipfs-core') -const FactoryClient = require('../factory/factory-client') -const PubsubMessage = require('../../src/pubsub-message') -const PubsubMessageUtils = require('../../src/pubsub-message-utils') - -let fc - -const common = { - setup: function (callback) { - fc = new FactoryClient() - callback(null, fc) - }, - teardown: function (callback) { - fc.dismantle(callback) - } -} - -// Pass the components down to the tests -test.pubsubMessage(common, { - PubsubMessageUtils: PubsubMessageUtils, - PubsubMessage: PubsubMessage, -}) diff --git a/test/ipfs-api/pubsub-message.spec.js b/test/ipfs-api/pubsub-message.spec.js new file mode 100644 index 000000000..51c00e675 --- /dev/null +++ b/test/ipfs-api/pubsub-message.spec.js @@ -0,0 +1,87 @@ +/* eslint-env mocha */ +'use strict' + +const expect = require('chai').expect +// const PubsubMessage = require('../../src/pubsub-message') +const PubsubMessageUtils = require('../../src/pubsub-message-utils') + +// NOTE! +// (Most of) these tests are skipped for now until we figure out the +// final data types for the messages coming over the wire + +const topicName = 'js-ipfs-api-tests' + +describe('.pubsub-message', () => { + it.skip('create message', () => { + // TODO + }) + + it.skip('deserialize message from JSON object', () => { + const obj = { + from: 'BI:ۛv�m�uyѱ����tU�+��#���V', + data: 'aGk=', + seqno: 'FIlj2BpyEgI=', + topicIDs: [ topicName ] + } + try { + const message = PubsubMessageUtils.deserialize(obj) + expect(message.from).to.equal('AAA') + expect(message.data).to.equal('hi') + expect(message.seqno).to.equal('\u0014�c�\u001ar\u0012\u0002') + expect(message.topicIDs.length).to.equal(1) + expect(message.topicIDs[0]).to.equal(topicName) + } catch (e) { + expect(e).to.not.exist + } + }) + + describe('immutable properties', () => { + const sender = 'A' + const data = 'hello' + const seqno = '123' + const topicIDs = ['hello world'] + + const message = PubsubMessageUtils.create(sender, data, seqno, topicIDs) + + it('from', () => { + try { + message.from = 'not allowed' + } catch (e) { + expect(e).to.be.an('error') + expect(e.toString()).to.equal(`TypeError: Cannot set property from of # which has only a getter`) + } + expect(message.from).to.equal(sender) + }) + + it('data', () => { + try { + message.data = 'not allowed' + } catch (e) { + expect(e).to.be.an('error') + expect(e.toString()).to.equal(`TypeError: Cannot set property data of # which has only a getter`) + } + expect(message.data).to.equal(data) + }) + + it('seqno', () => { + try { + message.seqno = 'not allowed' + } catch (e) { + expect(e).to.be.an('error') + expect(e.toString()).to.equal(`TypeError: Cannot set property seqno of # which has only a getter`) + } + expect(message.seqno).to.equal(seqno) + }) + + it('topicIDs', () => { + try { + message.topicIDs = ['not allowed'] + } catch (e) { + expect(e).to.be.an('error') + expect(e.toString()).to.equal(`TypeError: Cannot set property topicIDs of # which has only a getter`) + } + expect(message.topicIDs[0]).to.equal(topicIDs[0]) + expect(message.topicIDs.length).to.equal(topicIDs.length) + }) + }) +}) From 86b18ce36ca5f78185346f7235eab1932d4dd1f0 Mon Sep 17 00:00:00 2001 From: David Dias Date: Thu, 8 Dec 2016 15:04:44 -0800 Subject: [PATCH 10/15] fix: mdns should be off --- test/factory/daemon-spawner.js | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/test/factory/daemon-spawner.js b/test/factory/daemon-spawner.js index ca2ee4892..a43249e3f 100644 --- a/test/factory/daemon-spawner.js +++ b/test/factory/daemon-spawner.js @@ -73,7 +73,9 @@ function spawnEphemeralNode (callback) { (cb) => { const configValues = { Bootstrap: [], - // Discovery: {}, + // Do not use discovery to avoid connecting to + // other nodes by mistake + Discovery: {}, API: { 'HTTPHeaders.Access-Control-Allow-Origin': ['*'], 'HTTPHeaders.Access-Control-Allow-Credentials': 'true', From cf1015a3a41f2d62051cf04361a9924e12e08dfe Mon Sep 17 00:00:00 2001 From: Friedel Ziegelmayer Date: Sun, 18 Dec 2016 22:24:59 +0100 Subject: [PATCH 11/15] pubsub:handle topicCIDs and topicIDs --- src/pubsub-message-utils.js | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/pubsub-message-utils.js b/src/pubsub-message-utils.js index 540165889..6b34f89af 100644 --- a/src/pubsub-message-utils.js +++ b/src/pubsub-message-utils.js @@ -34,13 +34,13 @@ class PubsubMessageUtils { const senderId = Base58.encode(obj.from) const payload = Base64.decode(obj.data) const seqno = Base64.decode(obj.seqno) - const topics = obj.topicIDs + const topics = obj.topicIDs || obj.topicCIDs return PubsubMessageUtils.create(senderId, payload, seqno, topics) } static _isPubsubMessage (obj) { - return obj && obj.from && obj.seqno && obj.data && obj.topicIDs + return obj && obj.from && obj.seqno && obj.data && (obj.topicIDs || obj.topicCIDs) } } From af2b0969deea63c2a09e95a52ad043d88088c809 Mon Sep 17 00:00:00 2001 From: Friedel Ziegelmayer Date: Mon, 19 Dec 2016 17:24:18 +0100 Subject: [PATCH 12/15] refactor to new event based pubsub api --- package.json | 1 - src/api/pubsub.js | 113 +++++++++++++++++---------- src/pubsub-message-stream.js | 7 +- src/pubsub-message-utils.js | 51 +++++------- src/pubsub-message.js | 28 ------- test/factory/daemon-spawner.js | 2 +- test/ipfs-api/pubsub-message.spec.js | 76 ++---------------- test/setup/spawn-daemons.js | 2 +- 8 files changed, 107 insertions(+), 173 deletions(-) delete mode 100644 src/pubsub-message.js diff --git a/package.json b/package.json index b3b957061..4fed6c3f3 100644 --- a/package.json +++ b/package.json @@ -34,7 +34,6 @@ "ipld-dag-pb": "^0.9.3", "is-ipfs": "^0.2.1", "isstream": "^0.1.2", - "js-base64": "^2.1.9", "lru-cache": "^4.0.2", "multiaddr": "^2.1.1", "multipart-stream": "^2.0.1", diff --git a/src/api/pubsub.js b/src/api/pubsub.js index b64e2970a..2a8fef472 100644 --- a/src/api/pubsub.js +++ b/src/api/pubsub.js @@ -1,39 +1,26 @@ 'use strict' const promisify = require('promisify-es6') +const EventEmitter = require('events') +const eos = require('end-of-stream') const PubsubMessageStream = require('../pubsub-message-stream') const stringlistToArray = require('../stringlist-to-array') /* Internal subscriptions state and functions */ -let subscriptions = {} - -const addSubscription = (topic, request) => { - subscriptions[topic] = { request: request } -} - -const removeSubscription = promisify((topic, callback) => { - if (!subscriptions[topic]) { - return callback(new Error(`Not subscribed to ${topic}`)) - } - - subscriptions[topic].request.abort() - delete subscriptions[topic] - - if (callback) { - callback(null) - } -}) +const ps = new EventEmitter() +const subscriptions = {} /* Public API */ module.exports = (send) => { return { - subscribe: promisify((topic, options, callback) => { + subscribe: (topic, options, handler, callback) => { const defaultOptions = { discover: false } if (typeof options === 'function') { - callback = options + callback = handler + handler = options options = defaultOptions } @@ -41,30 +28,27 @@ module.exports = (send) => { options = defaultOptions } - // If we're already subscribed, return an error - if (subscriptions[topic]) { - return callback(new Error(`Already subscribed to '${topic}'`)) + // promisify doesn't work as we always pass a + // function as last argument (`handler`) + if (!callback) { + return new Promise((resolve, reject) => { + subscribe(topic, options, handler, (err) => { + if (err) { + return reject(err) + } + resolve() + }) + }) } - // Request params - const request = { - path: 'pubsub/sub', - args: [topic], - qs: { discover: options.discover } + subscribe(topic, options, handler, callback) + }, + unsubscribe (topic, handler) { + ps.removeListener(topic, handler) + if (ps.listenerCount(topic) === 0) { + subscriptions[topic].abort() } - - // Start the request and transform the response stream to Pubsub messages stream - const req = send.andTransform(request, PubsubMessageStream.from, (err, stream) => { - if (err) { - return callback(err) - } - // Add a cancel method to the stream so that the subscription can be cleanly cancelled - stream.cancel = promisify((cb) => removeSubscription(topic, cb)) - // Add the request to the active subscriptions and return the stream - addSubscription(topic, req) - callback(null, stream) - }) - }), + }, publish: promisify((topic, data, callback) => { const buf = Buffer.isBuffer(data) ? data : new Buffer(data) @@ -93,6 +77,53 @@ module.exports = (send) => { } send.andTransform(request, stringlistToArray, callback) + }), + setMaxListeners (n) { + return ps.setMaxListeners(n) + } + } + + function subscribe (topic, options, handler, callback) { + ps.on(topic, handler) + + if (subscriptions[topic]) { + return callback() + } + + // Request params + const request = { + path: 'pubsub/sub', + args: [topic], + qs: { + discover: options.discover + } + } + + // Start the request and transform the response + // stream to Pubsub messages stream + subscriptions[topic] = send.andTransform(request, PubsubMessageStream.from, (err, stream) => { + if (err) { + subscriptions[topic] = null + return callback(err) + } + + stream.on('data', (msg) => { + ps.emit(topic, msg) + }) + + stream.on('error', (err) => { + ps.emit('error', err) + }) + + eos(stream, (err) => { + if (err) { + ps.emit('error', err) + } + + subscriptions[topic] = null + }) + + callback() }) } } diff --git a/src/pubsub-message-stream.js b/src/pubsub-message-stream.js index 570c87b48..b6631726f 100644 --- a/src/pubsub-message-stream.js +++ b/src/pubsub-message-stream.js @@ -16,13 +16,16 @@ class PubsubMessageStream extends TransformStream { } _transform (obj, enc, callback) { + let msg try { - const message = PubsubMessage.deserialize(obj, 'base64') - this.push(message) + msg = PubsubMessage.deserialize(obj, 'base64') } catch (e) { // Not a valid pubsub message // go-ipfs returns '{}' as the very first object atm, we skip that + return callback() } + + this.push(msg) callback() } } diff --git a/src/pubsub-message-utils.js b/src/pubsub-message-utils.js index 6b34f89af..c38cb7233 100644 --- a/src/pubsub-message-utils.js +++ b/src/pubsub-message-utils.js @@ -1,47 +1,38 @@ 'use strict' -const Base58 = require('bs58') -const Base64 = require('js-base64').Base64 -const PubsubMessage = require('./pubsub-message') - -class PubsubMessageUtils { - static create (senderId, data, seqNo, topics) { - return new PubsubMessage(senderId, data, seqNo, topics) - } - - static deserialize (data, enc = 'json') { +module.exports = { + deserialize (data, enc = 'json') { enc = enc ? enc.toLowerCase() : null if (enc === 'json') { - return PubsubMessageUtils._deserializeFromJson(data) + return deserializeFromJson(data) } else if (enc === 'base64') { - return PubsubMessageUtils._deserializeFromBase64(data) + return deserializeFromBase64(data) } throw new Error(`Unsupported encoding: '${enc}'`) } +} - static _deserializeFromJson (data) { - const json = JSON.parse(data) - return PubsubMessageUtils._deserializeFromBase64(json) - } - - static _deserializeFromBase64 (obj) { - if (!PubsubMessageUtils._isPubsubMessage(obj)) { - throw new Error(`Not a pubsub message`) - } - - const senderId = Base58.encode(obj.from) - const payload = Base64.decode(obj.data) - const seqno = Base64.decode(obj.seqno) - const topics = obj.topicIDs || obj.topicCIDs +function deserializeFromJson (data) { + const json = JSON.parse(data) + return deserializeFromBase64(json) +} - return PubsubMessageUtils.create(senderId, payload, seqno, topics) +function deserializeFromBase64 (obj) { + if (!isPubsubMessage(obj)) { + throw new Error(`Not a pubsub message`) } - static _isPubsubMessage (obj) { - return obj && obj.from && obj.seqno && obj.data && (obj.topicIDs || obj.topicCIDs) + return { + // TODO: broken see https://github.com/ipfs/go-ipfs/issues/3522 + from: obj.from, + seqno: new Buffer(obj.seqno, 'base64'), + data: new Buffer(obj.data, 'base64'), + topicCIDs: obj.topicIDs || obj.topicCIDs } } -module.exports = PubsubMessageUtils +function isPubsubMessage (obj) { + return obj && obj.from && obj.seqno && obj.data && (obj.topicIDs || obj.topicCIDs) +} diff --git a/src/pubsub-message.js b/src/pubsub-message.js deleted file mode 100644 index 0d209bfa4..000000000 --- a/src/pubsub-message.js +++ /dev/null @@ -1,28 +0,0 @@ -'use strict' - -class PubsubMessage { - constructor (senderId, data, seqNo, topics) { - this._senderId = senderId - this._data = data - this._seqNo = seqNo - this._topics = topics - } - - get from () { - return this._senderId - } - - get data () { - return this._data - } - - get seqno () { - return this._seqNo - } - - get topicIDs () { - return this._topics - } -} - -module.exports = PubsubMessage diff --git a/test/factory/daemon-spawner.js b/test/factory/daemon-spawner.js index a43249e3f..76b6b61f5 100644 --- a/test/factory/daemon-spawner.js +++ b/test/factory/daemon-spawner.js @@ -78,7 +78,7 @@ function spawnEphemeralNode (callback) { Discovery: {}, API: { 'HTTPHeaders.Access-Control-Allow-Origin': ['*'], - 'HTTPHeaders.Access-Control-Allow-Credentials': 'true', + 'HTTPHeaders.Access-Control-Allow-Credentials': ['true'], 'HTTPHeaders.Access-Control-Allow-Methods': ['PUT', 'POST', 'GET'] } } diff --git a/test/ipfs-api/pubsub-message.spec.js b/test/ipfs-api/pubsub-message.spec.js index 51c00e675..dbdf6b6d1 100644 --- a/test/ipfs-api/pubsub-message.spec.js +++ b/test/ipfs-api/pubsub-message.spec.js @@ -2,86 +2,24 @@ 'use strict' const expect = require('chai').expect -// const PubsubMessage = require('../../src/pubsub-message') const PubsubMessageUtils = require('../../src/pubsub-message-utils') -// NOTE! -// (Most of) these tests are skipped for now until we figure out the -// final data types for the messages coming over the wire - const topicName = 'js-ipfs-api-tests' describe('.pubsub-message', () => { - it.skip('create message', () => { - // TODO - }) - - it.skip('deserialize message from JSON object', () => { + it('deserialize message from JSON object', () => { const obj = { from: 'BI:ۛv�m�uyѱ����tU�+��#���V', data: 'aGk=', seqno: 'FIlj2BpyEgI=', topicIDs: [ topicName ] } - try { - const message = PubsubMessageUtils.deserialize(obj) - expect(message.from).to.equal('AAA') - expect(message.data).to.equal('hi') - expect(message.seqno).to.equal('\u0014�c�\u001ar\u0012\u0002') - expect(message.topicIDs.length).to.equal(1) - expect(message.topicIDs[0]).to.equal(topicName) - } catch (e) { - expect(e).to.not.exist - } - }) - - describe('immutable properties', () => { - const sender = 'A' - const data = 'hello' - const seqno = '123' - const topicIDs = ['hello world'] - - const message = PubsubMessageUtils.create(sender, data, seqno, topicIDs) - - it('from', () => { - try { - message.from = 'not allowed' - } catch (e) { - expect(e).to.be.an('error') - expect(e.toString()).to.equal(`TypeError: Cannot set property from of # which has only a getter`) - } - expect(message.from).to.equal(sender) - }) - - it('data', () => { - try { - message.data = 'not allowed' - } catch (e) { - expect(e).to.be.an('error') - expect(e.toString()).to.equal(`TypeError: Cannot set property data of # which has only a getter`) - } - expect(message.data).to.equal(data) - }) - - it('seqno', () => { - try { - message.seqno = 'not allowed' - } catch (e) { - expect(e).to.be.an('error') - expect(e.toString()).to.equal(`TypeError: Cannot set property seqno of # which has only a getter`) - } - expect(message.seqno).to.equal(seqno) - }) - it('topicIDs', () => { - try { - message.topicIDs = ['not allowed'] - } catch (e) { - expect(e).to.be.an('error') - expect(e.toString()).to.equal(`TypeError: Cannot set property topicIDs of # which has only a getter`) - } - expect(message.topicIDs[0]).to.equal(topicIDs[0]) - expect(message.topicIDs.length).to.equal(topicIDs.length) - }) + const message = PubsubMessageUtils.deserialize(obj) + expect(message.from).to.equal('AAA') + expect(message.data).to.equal('hi') + expect(message.seqno).to.equal('\u0014�c�\u001ar\u0012\u0002') + expect(message.topicIDs.length).to.equal(1) + expect(message.topicIDs[0]).to.equal(topicName) }) }) diff --git a/test/setup/spawn-daemons.js b/test/setup/spawn-daemons.js index 8aa53931f..d78f4cf95 100644 --- a/test/setup/spawn-daemons.js +++ b/test/setup/spawn-daemons.js @@ -29,7 +29,7 @@ function startDisposableDaemons (callback) { Bootstrap: [], Discovery: {}, 'API.HTTPHeaders.Access-Control-Allow-Origin': ['*'], - 'API.HTTPHeaders.Access-Control-Allow-Credentials': 'true', + 'API.HTTPHeaders.Access-Control-Allow-Credentials': ['true'], 'API.HTTPHeaders.Access-Control-Allow-Methods': ['PUT', 'POST', 'GET'] } From 5695f39982943224febf499db6d2cfaf7aa86e75 Mon Sep 17 00:00:00 2001 From: Friedel Ziegelmayer Date: Mon, 19 Dec 2016 20:41:28 +0100 Subject: [PATCH 13/15] fix issues due to the new event pubsub api --- src/api/pubsub.js | 18 +++++++++++++----- test/ipfs-api/pubsub-message.spec.js | 25 ------------------------- 2 files changed, 13 insertions(+), 30 deletions(-) delete mode 100644 test/ipfs-api/pubsub-message.spec.js diff --git a/src/api/pubsub.js b/src/api/pubsub.js index 2a8fef472..7cfd0db99 100644 --- a/src/api/pubsub.js +++ b/src/api/pubsub.js @@ -6,12 +6,12 @@ const eos = require('end-of-stream') const PubsubMessageStream = require('../pubsub-message-stream') const stringlistToArray = require('../stringlist-to-array') -/* Internal subscriptions state and functions */ -const ps = new EventEmitter() -const subscriptions = {} - /* Public API */ module.exports = (send) => { + /* Internal subscriptions state and functions */ + const ps = new EventEmitter() + const subscriptions = {} + ps.id = Math.random() return { subscribe: (topic, options, handler, callback) => { const defaultOptions = { @@ -44,9 +44,16 @@ module.exports = (send) => { subscribe(topic, options, handler, callback) }, unsubscribe (topic, handler) { + if (ps.listenerCount(topic) === 0 || !subscriptions[topic]) { + throw new Error(`Not subscribed to '${topic}'`) + } + ps.removeListener(topic, handler) + + // Drop the request once we are actualy done if (ps.listenerCount(topic) === 0) { subscriptions[topic].abort() + subscriptions[topic] = null } }, publish: promisify((topic, data, callback) => { @@ -85,7 +92,6 @@ module.exports = (send) => { function subscribe (topic, options, handler, callback) { ps.on(topic, handler) - if (subscriptions[topic]) { return callback() } @@ -104,6 +110,7 @@ module.exports = (send) => { subscriptions[topic] = send.andTransform(request, PubsubMessageStream.from, (err, stream) => { if (err) { subscriptions[topic] = null + ps.removeListener(topic, handler) return callback(err) } @@ -121,6 +128,7 @@ module.exports = (send) => { } subscriptions[topic] = null + ps.removeListener(topic, handler) }) callback() diff --git a/test/ipfs-api/pubsub-message.spec.js b/test/ipfs-api/pubsub-message.spec.js deleted file mode 100644 index dbdf6b6d1..000000000 --- a/test/ipfs-api/pubsub-message.spec.js +++ /dev/null @@ -1,25 +0,0 @@ -/* eslint-env mocha */ -'use strict' - -const expect = require('chai').expect -const PubsubMessageUtils = require('../../src/pubsub-message-utils') - -const topicName = 'js-ipfs-api-tests' - -describe('.pubsub-message', () => { - it('deserialize message from JSON object', () => { - const obj = { - from: 'BI:ۛv�m�uyѱ����tU�+��#���V', - data: 'aGk=', - seqno: 'FIlj2BpyEgI=', - topicIDs: [ topicName ] - } - - const message = PubsubMessageUtils.deserialize(obj) - expect(message.from).to.equal('AAA') - expect(message.data).to.equal('hi') - expect(message.seqno).to.equal('\u0014�c�\u001ar\u0012\u0002') - expect(message.topicIDs.length).to.equal(1) - expect(message.topicIDs[0]).to.equal(topicName) - }) -}) From 3a7fc2d3ea4d797b792a215ed4aa47b60cfc0a3d Mon Sep 17 00:00:00 2001 From: Friedel Ziegelmayer Date: Tue, 20 Dec 2016 14:56:37 +0100 Subject: [PATCH 14/15] pubsub.peers: do not throw on missing subscription --- src/api/pubsub.js | 4 ---- 1 file changed, 4 deletions(-) diff --git a/src/api/pubsub.js b/src/api/pubsub.js index 7cfd0db99..0f651f29a 100644 --- a/src/api/pubsub.js +++ b/src/api/pubsub.js @@ -74,10 +74,6 @@ module.exports = (send) => { send.andTransform(request, stringlistToArray, callback) }), peers: promisify((topic, callback) => { - if (!subscriptions[topic]) { - return callback(new Error(`Not subscribed to '${topic}'`)) - } - const request = { path: 'pubsub/peers', args: [topic] From dce52056932161235357aace075400a073b445a9 Mon Sep 17 00:00:00 2001 From: Friedel Ziegelmayer Date: Wed, 21 Dec 2016 14:51:02 +0100 Subject: [PATCH 15/15] error on non buffer arguments to pubsub.publish --- src/api/pubsub.js | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/api/pubsub.js b/src/api/pubsub.js index 0f651f29a..1c054fca2 100644 --- a/src/api/pubsub.js +++ b/src/api/pubsub.js @@ -57,11 +57,13 @@ module.exports = (send) => { } }, publish: promisify((topic, data, callback) => { - const buf = Buffer.isBuffer(data) ? data : new Buffer(data) + if (!Buffer.isBuffer(data)) { + return callback(new Error('data must be a Buffer')) + } const request = { path: 'pubsub/pub', - args: [topic, buf] + args: [topic, data] } send(request, callback)