diff --git a/.gitignore b/.gitignore index 3ac63ef..504afef 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,2 @@ -.env -data/ node_modules/ +package-lock.json diff --git a/README.md b/README.md index f8440ca..120c128 100644 --- a/README.md +++ b/README.md @@ -4,25 +4,25 @@ An external [bitcoind](https://github.com/bitcoin/bitcoin) index management service. + ## Indexes -By default, this module maintains script, spents, transaction block, txout and block indexes. +By default, this module includes a script, spents, transaction block, txout, tx, median time past and fee indexes. The module uses `getblockheader`, `getblockhash`, `getblock` and `getbestblockhash` RPC methods for blockchain synchronization; and `getrawmempool` for mempool synchronization. `-txindex` is not required for this module; but is still useful for individual transaction lookup (aka `txHex`). See https://github.com/bitcoinjs/indexd/issues/6 if you think an independent transaction index should be added. + ## Usage -Assumes [`yajrpc`](https://github.com/dcousens/yajrpc) is used for the bitcoind RPC; and [`leveldown`](https://github.com/level/leveldown) for the database. -See the [example](#example) for usage. +Assumes [`yajrpc`](https://github.com/dcousens/yajrpc) is used for the provided bitcoind RPC object; and [`leveldown`](https://github.com/level/leveldown) for the database object. +See the [example server](https://github.com/bitcoinjs/private-bitcoin) for an example of an express HTTP API using `indexd`. + +### Conventions +When conveying block height, `-1` represents unconfirmed (in the mempool). +`null` represents unknown or missing. -## Example -The [`example/`](https://github.com/bitcoinjs/indexd/tree/master/example) is a functioning [express](https://www.npmjs.com/package/express) REST HTTP API server. +For example, the height of the transaction `ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff` in the Bitcoin blockchain is `null` (it doesn't exist!). -* Requires a running `bitcoind` node - * with `-txindex`, and - * ZMQ (`-zmqpubhashtx=tcp://127.0.0.1:30001 -zmqpubhashblock=tcp://127.0.0.1:30001`) -* Assumes `--testnet` ports/configuration, see `example/.env` for configuration. -* Change `-rpcworkqueue` from `16` to `32` for increased throughput [in some scenarios] -## License [ISC](LICENSE) +## LICENSE [ISC](LICENSE) diff --git a/adapter.js b/adapter.js deleted file mode 100644 index b4ca609..0000000 --- a/adapter.js +++ /dev/null @@ -1,158 +0,0 @@ -let dbwrapper = require('./dbwrapper') -let { EventEmitter } = require('events') -let parallel = require('run-parallel') - -let Blockchain = require('./blockchain') -let Mempool = require('./mempool') - -function Adapter (db, rpc) { - this.db = dbwrapper(db) - this.emitter = new EventEmitter() - this.emitter.setMaxListeners(Infinity) - - this.blockchain = new Blockchain(this.emitter, this.db, rpc) - this.mempool = new Mempool(this.emitter, rpc) -} - -Adapter.prototype.connect = function (blockId, height, callback) { - this.blockchain.connect(blockId, height, callback) -} - -Adapter.prototype.disconnect = function (blockId, callback) { - this.blockchain.disconnect(blockId, callback) -} - -// QUERIES -Adapter.prototype.blockIdByTransactionId = function (txId, callback) { - this.blockchain.blockIdByTransactionId(txId, callback) -} - -Adapter.prototype.fees = function (n, callback) { - this.blockchain.fees(n, callback) -} - -// returns whether (true/false) the script id (SHA256(script)) has even been seen -Adapter.prototype.seenScriptId = function (scId, callback) { - this.blockchain.seenScriptId(scId, (err, result) => { - if (err) return callback(err) - callback(null, result || this.mempool.seenScriptId(scId)) - }) -} - -// returns list of inputs that spends {txo}, array length is guaranteed to be 1 if confirmed [on the blockchain] -Adapter.prototype.spentsFromTxo = function (txo, callback) { - this.blockchain.spentFromTxo(txo, (err, spent) => { - if (err) return callback(err) - - // if in blockchain, ignore the mempool - if (spent) return callback(null, [spent]) - - // otherwise, could be multiple spents in the mempool - callback(null, this.mempool.spentsFromTxo(txo)) - }) -} - -// returns blockchain chain tip id -Adapter.prototype.tip = function (callback) { - this.blockchain.tip(callback) -} - -// returns blockchain chain tip height -Adapter.prototype.tipHeight = function (callback) { - this.blockchain.tipHeight(callback) -} - -// returns set of transactions associated with script id (SHA256(script)) -// minimum height can be provided if many transaction associations exist -Adapter.prototype.transactionIdsByScriptId = function (scId, height, callback, dbLimit) { - this.blockchain.transactionIdsByScriptId(scId, height, (err, txIds, position) => { - if (err) return callback(err) - - Object.assign(txIds, this.mempool.transactionIdsByScriptId(scId)) - callback(null, txIds, position) - }, dbLimit) -} - -Adapter.prototype.transactionIdListFromScriptId = function (scId, height, callback, dbLimit) { - this.blockchain.transactionIdsByScriptId(scId, height, (err, txIdSet, position) => { - if (err) return callback(err) - - let txIds = [] - for (let txId in txIdSet) txIds.push(txId) - - let txIdSet2 = this.mempool.transactionIdsByScriptId(scId) - for (let txId in txIdSet2) { - if (txIdSet[txId]) continue // prevent [impossible?] duplicates - txIds.push(txId) - } - - callback(null, txIds, position) - }, dbLimit) -} - -// returns a mapping of txos (`txid:vout`) for script id, mapping guarantees no duplicates -// the format `txid:vout`: { .., scId }, supports streamline merging with other queries -Adapter.prototype.txosByScriptId = function (scId, height, callback, dbLimit) { - let resultMap = {} - - this.blockchain.txosByScriptId(scId, height, (err, txosMap) => { - if (err) return callback(err) - - Object.assign(resultMap, txosMap, this.mempool.txosByScriptId(scId)) - callback(null, resultMap) - }, dbLimit) -} - -// returns a list of { txId, vout, scId, height }, height is undefined if from the mempool -// has a weak guarantee of no duplicates, enforced by mempool.clear in resync -Adapter.prototype.txosListByScriptId = function (scId, height, callback, dbLimit) { - this.blockchain.__txosListByScriptId(scId, height, (err, txos) => { - if (err) return callback(err) - - callback(null, txos.concat(this.mempool.__txosListByScriptId(scId))) - }, dbLimit) -} - -// returns extra txo information ({ txId, vout, value }) for the provided txo -// TODO: see #15 -Adapter.prototype.txoByTxo = function (txId, vout, callback) { - this.blockchain.txoByTxo(txId, vout, (err, txo) => { - if (err) return callback(err) - - // if in blockchain, ignore the mempool - if (txo) return callback(null, txo) - - callback(null, this.mempool.txoByTxo(txId, vout)) - }) -} - -// returns a list of unspent txos -Adapter.prototype.utxosByScriptId = function (scId, height, callback, limit) { - this.txosByScriptId(scId, height, (err, txos) => { - if (err) return callback(err) - - txos = Object.keys(txos).map(x => txos[x]) - let tasks = txos.map((txo) => { - return (next) => this.spentsFromTxo(txo, (err, spents) => { - if (err) return next(err) - if (spents.length !== 0) return next() - - this.txoByTxo(txo.txId, txo.vout, (err, txoExtra) => { - if (err) return next(err) - - next(null, Object.assign(txo, txoExtra)) - }) - }) - }) - - parallel(tasks, (err, txos) => { - if (err) return callback(err, txos) - // filter empty txos - callback(err, txos.filter(txo => txo !== undefined)) - }) - }, limit) -} - -module.exports = function makeAdapter (db, rpc) { - return new Adapter(db, rpc) -} diff --git a/blockchain.js b/blockchain.js deleted file mode 100644 index 620aaba..0000000 --- a/blockchain.js +++ /dev/null @@ -1,300 +0,0 @@ -let debug = require('./debug')('indexd:blockchain') -let parallel = require('run-parallel') -let types = require('./types') -let rpcUtil = require('./rpc') - -function Blockchain (emitter, db, rpc) { - this.emitter = emitter - this.db = db - this.rpc = rpc -} - -Blockchain.prototype.connect = function (blockId, height, callback) { - const events = [] - rpcUtil.block(this.rpc, blockId, (err, block) => { - if (err) return callback(err) - if (height !== block.height) return callback(new Error('Height mismatch')) // TODO: necessary? - - let atomic = this.db.atomic() - let { transactions } = block - - transactions.forEach((tx) => { - let { txId, txBuffer, ins, outs } = tx - - ins.forEach((input, vin) => { - if (input.coinbase) return - - let { prevTxId, vout } = input - atomic.put(types.spentIndex, { txId: prevTxId, vout }, { txId, vin }) - events.push(['spent', `${prevTxId}:${vout}`, txId]) - }) - - outs.forEach(({ scId, script, value, vout }) => { - atomic.put(types.scIndex, { scId, height, txId, vout }, null) - atomic.put(types.txoIndex, { txId, vout }, { value, script }) - events.push(['script', scId, txId, txBuffer]) - }) - - atomic.put(types.txIndex, { txId }, { height }) - events.push(['transaction', txId, txBuffer, blockId]) - }) - - // non-blocking, for events only - events.push(['block', blockId, height]) - - debug(`Putting ${blockId} @ ${height} - ${transactions.length} transactions`) - atomic.put(types.tip, {}, { blockId, height }) - atomic.write((err) => { - if (err) return callback(err) - - this.connect2ndOrder(blockId, block, (err) => { - callback(err, block.nextblockhash) - setTimeout(() => { - for (const args of events) this.emitter.emit(...args) - }) - }) - }) - }) -} - -function box (data) { - if (data.length === 0) return { q1: 0, median: 0, q3: 0 } - let quarter = (data.length / 4) | 0 - let midpoint = (data.length / 2) | 0 - - return { - q1: data[quarter], - median: data[midpoint], - q3: data[midpoint + quarter] - } -} - -Blockchain.prototype.connect2ndOrder = function (blockId, block, callback) { - let feeRates = [] - let tasks = [] - let { height, transactions } = block - - transactions.forEach(({ ins, outs, vsize }) => { - let inAccum = 0 - let outAccum = 0 - let subTasks = [] - let coinbase = false - - ins.forEach((input, vin) => { - if (input.coinbase) { - coinbase = true - return - } - - let { prevTxId, vout } = input - subTasks.push((next) => { - this.db.get(types.txoIndex, { txId: prevTxId, vout }, (err, txo) => { - if (err) return next(err) - if (!txo) return next(new Error(`Missing ${prevTxId}:${vout}`)) - - inAccum += txo.value - next() - }) - }) - }) - - outs.forEach(({ value }, vout) => { - outAccum += value - }) - - tasks.push((next) => { - if (coinbase) { - feeRates.push(0) - return next() - } - - parallel(subTasks, (err) => { - if (err) return next(err) - let fee = inAccum - outAccum - let feeRate = Math.floor(fee / vsize) - - feeRates.push(feeRate) - next() - }) - }) - }) - - debug(`Putting Order2 data ${blockId} @ ${height}`) - parallel(tasks, (err) => { - if (err) return callback(err) - - let atomic = this.db.atomic() - feeRates = feeRates.sort((a, b) => a - b) - - atomic.put(types.feeIndex, { height }, { fees: box(feeRates), size: block.size }) - atomic.write(callback) - }) -} - -Blockchain.prototype.disconnect = function (blockId, callback) { - rpcUtil.block(this.rpc, blockId, (err, block) => { - if (err) return callback(err) - - let atomic = this.db.atomic() - let { height, transactions } = block - - transactions.forEach(({ txId, ins, outs }) => { - ins.forEach((input) => { - if (input.coinbase) return - let { prevTxId, vout } = input - - atomic.del(types.spentIndex, { txId: prevTxId, vout }) - }) - - outs.forEach(({ scId }, vout) => { - atomic.del(types.scIndex, { scId, height, txId, vout }) - atomic.del(types.txoIndex, { txId, vout }) - }) - - atomic.del(types.txIndex, { txId }, { height }) - }) - - debug(`Deleting ${blockId} @ ${height} - ${transactions.length} transactions`) - atomic.put(types.tip, {}, { blockId: block.previousblockhash, height }) - atomic.write(callback) - }) -} - -// QUERIES -Blockchain.prototype.blockHeightByTransactionId = function (txId, callback) { - this.db.get(types.txIndex, { txId }, (err, row) => { - if (err) return callback(err) - if (!row) return callback() - - callback(null, row.height) - }) -} - -Blockchain.prototype.blockIdByTransactionId = function (txId, callback) { - this.db.get(types.txIndex, { txId }, (err, row) => { - if (err) return callback(err) - if (!row) return callback() - - rpcUtil.blockIdAtHeight(this.rpc, row.height, callback) - }) -} - -Blockchain.prototype.fees = function (n, callback) { - this.db.get(types.tip, {}, (err, result) => { - if (err) return callback(err) - - let maxHeight = result.height - let fresult = [] - - this.db.iterator(types.feeIndex, { - gte: { height: maxHeight - (n - 1) }, - limit: n - }, ({ height }, { fees, size }) => { - fresult.push({ height, fees, size }) - }, (err) => callback(err, fresult)) - }) -} - -let ZERO64 = '0000000000000000000000000000000000000000000000000000000000000000' -Blockchain.prototype.seenScriptId = function (scId, callback) { - let result = false - - this.db.iterator(types.scIndex, { - gte: { scId, height: 0, txId: ZERO64, vout: 0 }, - lt: { scId, height: 0xffffffff, txId: ZERO64, vout: 0 }, - limit: 1 - }, () => { - result = true - }, (err) => callback(err, result)) -} - -Blockchain.prototype.spentFromTxo = function (txo, callback) { - this.db.get(types.spentIndex, txo, callback) -} - -Blockchain.prototype.tip = function (callback) { - this.db.get(types.tip, {}, (err, tip) => { - callback(err, tip && tip.blockId) - }) -} - -Blockchain.prototype.tipHeight = function (callback) { - this.db.get(types.tip, {}, (err, tip) => { - callback(err, tip && tip.height) - }) -} - -Blockchain.prototype.transactionIdsByScriptId = function (scId, height, callback, limit) { - this.__txosListByScriptId(scId, height, (err, txos, position) => { - if (err) return callback(err) - - let tasks = txos.map((txo) => { - return (next) => this.spentFromTxo(txo, next) - }) - - parallel(tasks, (err, spents) => { - if (err) return callback(err) - - let txIdSet = {} - - spents.forEach((spent) => { - if (!spent) return - - txIdSet[spent.txId] = true - }) - - txos.forEach(({ txId }) => { - txIdSet[txId] = true - }) - - callback(null, txIdSet, position) - }) - }, limit) -} - -// TODO: public? -Blockchain.prototype.__txosListByScriptId = function (scId, height, callback, limit) { - let offset = 0 - if (Array.isArray(limit)) { - offset = limit[0] - limit = limit[1] - offset - } - limit = limit || 10000 - - let results = [] - let count = 0 - let maxHeight = height - - this.db.iterator(types.scIndex, { - gte: { scId, height, txId: ZERO64, vout: 0 }, - lt: { scId, height: 0xffffffff, txId: ZERO64, vout: 0 }, - limit: limit - }, (txo) => { - // XXX: O(offset) walk, not great - ++count - if (count < offset) return - - let { txId, vout, height } = txo - if (height > maxHeight) maxHeight = height - results.push({ txId, vout, scId, height }) - }, (err) => callback(err, results, { height, offset: count })) -} - -Blockchain.prototype.txosByScriptId = function (scId, height, callback, limit) { - limit = limit || 10000 - let resultMap = {} - - this.db.iterator(types.scIndex, { - gte: { scId, height, txId: ZERO64, vout: 0 }, - lt: { scId, height: 0xffffffff, txId: ZERO64, vout: 0 }, - limit: limit - }, ({ txId, vout, height }) => { - resultMap[`${txId}:${vout}`] = { txId, vout, scId, height } - }, (err) => callback(err, resultMap)) -} - -Blockchain.prototype.txoByTxo = function (txId, vout, callback) { - this.db.get(types.txoIndex, { txId, vout }, callback) -} - -module.exports = Blockchain diff --git a/dbwrapper.js b/dbwrapper.js index 87fbf12..2d90a95 100644 --- a/dbwrapper.js +++ b/dbwrapper.js @@ -1,9 +1,11 @@ +// let debug = require('debug')('db') let typeforce = require('typeforce') let NIL = Buffer.alloc(0) function atomic () { let batch = this.batch() +// debug('atomic') return { del: del.bind(batch), put: put.bind(batch), @@ -15,6 +17,7 @@ function del (type, key, callback) { typeforce(type.keyType, key) key = type.key.encode(key) +// debug('del', key.length) return this.del(key, callback) } @@ -37,6 +40,7 @@ function put (type, key, value, callback) { key = type.key.encode(key) value = type.value ? type.value.encode(value) : NIL +// debug('put', key.length, value.length) return this.put(key, value, callback) } @@ -56,6 +60,8 @@ function iterator (type, options, forEach, callback) { options.gte = options.gte && type.key.encode(options.gte) options.lt = options.lt && type.key.encode(options.lt) options.lte = options.lte && type.key.encode(options.lte) + if (!(options.gt || options.gte)) return callback(new RangeError('Missing minimum')) + if (!(options.lt || options.lte)) return callback(new RangeError('Missing maximum')) let iterator = this.iterator(options) @@ -66,8 +72,8 @@ function iterator (type, options, forEach, callback) { key = type.key.decode(key) value = type.value ? type.value.decode(value) : null + forEach(key, value, iterator) - forEach(key, value) iterator.next(loop) } diff --git a/example/.env b/example/.env deleted file mode 100644 index 8796519..0000000 --- a/example/.env +++ /dev/null @@ -1,8 +0,0 @@ -DEBUG=* -INDEXDB=/home/USER/.bitcoin/testnet3/indexd -NODE_ENV=development -RPC=http://127.0.0.1:18332 -RPCBATCHSIZE=500 -RPCCONCURRENT=16 -RPCCOOKIE=/home/USER/.bitcoin/testnet3/.cookie -ZMQ=tcp://127.0.0.1:30001 diff --git a/example/express.js b/example/express.js deleted file mode 100644 index 9f6e334..0000000 --- a/example/express.js +++ /dev/null @@ -1,145 +0,0 @@ -let adapter = require('./service').adapter -let bitcoin = require('bitcoinjs-lib') -let bodyParser = require('body-parser') -let express = require('express') -let handleSocket = require('./socket') -let parallel = require('run-parallel') -let rpc = require('./rpc') -let ws = require('ws') - -function Hex256bit (value) { - return typeof value === 'string' && /^[0-9a-fA-F]{64}$/.test(value) -} - -module.exports = function () { - let router = new express.Router() - - function respond (req, res, err, result, errMatch) { - if (err) console.error(req.path, err) - if (err) { - res.status(typeof err === 'number' ? err : 400) - if (errMatch && errMatch.test(err.message)) res.json(err.message) - return res.end() - } - - res.status(200) - if (result !== undefined) { - if (typeof result === 'string') res.send(result) - else if (Buffer.isBuffer(result)) res.send(result) - else res.json(result) - } - res.end() - } - - router.get('/a/:address/txs', (req, res) => { - let scId - try { - let script = bitcoin.address.toOutputScript(req.params.address) - scId = bitcoin.crypto.sha256(script).toString('hex') - } catch (e) { return respond(req, res, 400) } - - let height = parseInt(req.query.height) - if (!Number.isFinite(height)) height = 0 - - adapter.transactionIdsByScriptId(scId, height, (err, txIdSet) => { - if (err) return respond(req, res, err) - - let tasks = {} - for (let txId in txIdSet) { - tasks[txId] = (next) => rpc('getrawtransaction', [txId], next) - } - - parallel(tasks, (err, result) => respond(req, res, err, result)) - }) - }) - - router.get('/a/:address/txids', (req, res) => { - let scId - try { - let script = bitcoin.address.toOutputScript(req.params.address) - scId = bitcoin.crypto.sha256(script).toString('hex') - } catch (e) { return respond(req, res, 400) } - - let height = parseInt(req.query.height) - if (!Number.isFinite(height)) height = 0 - - adapter.transactionIdsByScriptId(scId, height, (err, txIdSet) => respond(req, res, err, Object.keys(txIdSet))) - }) - - router.get('/a/:address/instances', (req, res) => { - let scId - try { - let script = bitcoin.address.toOutputScript(req.params.address) - scId = bitcoin.crypto.sha256(script).toString('hex') - } catch (e) { return respond(req, res, 400) } - - adapter.seenScriptId(scId, (err, result) => respond(req, res, err, result)) - }) - - router.get('/a/:address/unspents', (req, res) => { - let scId - try { - let script = bitcoin.address.toOutputScript(req.params.address) - scId = bitcoin.crypto.sha256(script).toString('hex') - } catch (e) { return respond(req, res, 400) } - - adapter.utxosByScriptId(scId, (err, result) => respond(req, res, err, result)) - }) - - router.get('/t/:id', (req, res) => { - if (!Hex256bit(req.params.id)) return res.status(400).end() - - rpc('getrawtransaction', req.params.id, (err, result) => respond(req, res, err, result)) - }) - - router.get('/t/:id/block', (req, res) => { - if (!Hex256bit(req.params.id)) return res.status(400).end() - - adapter.blockIdByTransactionId(req.params.id, (err, result) => respond(req, res, err, result)) - }) - - router.post('/t/push', bodyParser.text(), (req, res) => { - rpc('sendrawtransaction', [req.body], (err) => respond(req, res, err, undefined, /./)) - }) - - router.get('/b/:id/header', (req, res) => { - if (!Hex256bit(req.params.id)) return res.status(400).end() - - rpc('getblockheader', [req.params.id, true], (err, header) => respond(req, res, err, header, /not found/)) - }) - - router.get('/b/:id/height', (req, res) => { - if (!Hex256bit(req.params.id)) return res.status(400).end() - - rpc('getblockheader', [req.params.id, false], (err, json) => respond(req, res, err, json && json.height, /not found/)) - }) - - router.get('/b/height', (req, res) => { - rpc('getblockcount', [], (err, result) => respond(req, res, err, result)) - }) - - router.get('/b/fees', (req, res) => { - let blocks = parseInt(req.query.blocks) - if (!Number.isFinite(blocks)) blocks = 12 - blocks = Math.min(blocks, 64) - - adapter.blockchain.fees(blocks, (err, results) => { - if (results) { - results.forEach((x) => { - x.kB = Math.floor(x.size / 1024) - }) - } - - respond(req, res, err, !err && results) - }) - }) - - let wss = new ws.Server({ noServer: true }) - router.use('/ws', function (req, _res, next) { - if (!req.ws) return next() - - wss.handleUpgrade(req, req.socket, [], (socket) => handleSocket(socket)) - }) - - return router -} diff --git a/example/index.js b/example/index.js deleted file mode 100644 index 4d6d78b..0000000 --- a/example/index.js +++ /dev/null @@ -1,14 +0,0 @@ -require('dotenv').load() - -let express = require('express') -let service = require('./service') -let api = require('./express') -var app = express() - -// initialize -service((err) => { - if (err) return console.error('error initializing', err) - - app.use(api()) - app.listen(3000) -}) diff --git a/example/rpc.js b/example/rpc.js deleted file mode 100644 index 030f705..0000000 --- a/example/rpc.js +++ /dev/null @@ -1,6 +0,0 @@ -module.exports = require('yajrpc/qup')({ - url: process.env.RPC, - auth: require('fs').readFileSync(process.env.RPCCOOKIE), - batch: process.env.RPCBATCHSIZE || 500, - concurrent: process.env.RPCCONCURRENT || 16 -}) diff --git a/example/service.js b/example/service.js deleted file mode 100644 index 3f4278b..0000000 --- a/example/service.js +++ /dev/null @@ -1,77 +0,0 @@ -let debug = require('debug')('example:service') -let debugZmq = require('debug')('service:zmq') -let debugZmqTx = require('debug')('service:zmq:tx') -// let indexd = require('indexd') // XXX: you should use npm published package typically -let indexd = require('../') -let leveldown = require('leveldown') -let rpc = require('./rpc') -let zmq = require('zmq') - -let db = leveldown(process.env.INDEXDB) -let adapter = indexd.makeAdapter(db, rpc) - -module.exports = function initialize (callback) { - function errorSink (err) { - if (err) debug(err) - } - - let syncing = false - function resync () { - // already in progress? - if (syncing) return - syncing = true - - indexd.resync(rpc, adapter, (err) => { - syncing = false - errorSink(err) - adapter.mempool.reset(errorSink) - }) - } - - debug(`Opening leveldb @ ${process.env.INDEXDB}`) - db.open({ - writeBufferSize: 1 * 1024 * 1024 * 1024 // 1 GiB - }, (err) => { - if (err) return callback(err) - debug(`Opened leveldb @ ${process.env.INDEXDB}`) - - let zmqSock = zmq.socket('sub') - zmqSock.connect(process.env.ZMQ) - zmqSock.subscribe('hashblock') - zmqSock.subscribe('hashtx') - - let lastSequence = 0 - zmqSock.on('message', (topic, message, sequence) => { - topic = topic.toString('utf8') - message = message.toString('hex') - sequence = sequence.readUInt32LE() - - // were any ZMQ messages were lost? - let expectedSequence = lastSequence + 1 - lastSequence = sequence - if (sequence !== expectedSequence) { - if (sequence < expectedSequence) debugZmq(`bitcoind may have restarted`) - else debugZmq(`${sequence - expectedSequence} messages lost`) - resync() - } - - switch (topic) { - case 'hashblock': { - debugZmq(topic, message) - return resync() - } - - case 'hashtx': { - debugZmqTx(topic, message) - return adapter.mempool.add(message, errorSink) - } - } - }) - - setInterval(resync, 60000) // attempt every minute - resync() - - callback() - }) -} -module.exports.adapter = adapter diff --git a/example/socket.js b/example/socket.js deleted file mode 100644 index a0cf5c4..0000000 --- a/example/socket.js +++ /dev/null @@ -1,188 +0,0 @@ -let adapter = require('./service').adapter -let debug = require('debug')('example:ws') -let rpc = require('./rpc') -let vs = require('varstruct') -let { EventEmitter } = require('events') - -// vs types -let vsHex256bit = vs.String(32, 'Hex') -let vsHashes = vs.VarArray(vs.UInt16LE, vsHex256bit) -let vsHeight = vs.UInt32LE - -let vsBlock = vs([ - ['type', vs.Value(vs.UInt8, 0x00)], - ['id', vsHex256bit], - ['height', vsHeight], - ['time', vs.UInt32LE] -]) -let vsTx = vs([ - ['type', vs.Value(vs.UInt8, 0x01)], - ['data', vs.VarBuffer(vs.UInt16LE)] -]) -let vsStatus = vs([ - ['type', vs.Value(vs.UInt8, 0x02)], - ['blockId', vsHex256bit], - ['txId', vsHex256bit] -]) - -let shared = new EventEmitter() -shared.setMaxListeners(Infinity) - -adapter.emitter.on('block', (blockId) => { - let listeners = shared.listenerCount('block') - if (!listeners) return - - rpc('getblockheader', [blockId, true], (err, header) => { - if (err || !header) return debug(err || `${blockId} not found`) - - shared.emit('block', vsBlock({ - id: header.hash, - height: header.height, - time: header.medianTime - })) - }) -}) - -adapter.emitter.on('script', (scId, _, txBuffer) => { - let listeners = shared.listenerCount(scId) - if (!listeners) return - - shared.emit(scId, vsTx(txBuffer)) -}) - -adapter.emitter.on('transaction', (txId, _, blockId) => { - let listeners = shared.listenerCount(txId) - if (!listeners) return - - shared.emit(txId, vsStatus(blockId, txId)) -}) - -module.exports = function handleSocket (socket) { - let load = 0 - let height = 0 - let watching = {} - - function send (buffer) { - socket.send(buffer) - } - - function sendBlocks (blockIds) { - blockIds.forEach((blockId) => { - rpc('getblockheader', [blockId, true], (err, header) => { - if (err || !header) return debug(err || `${blockId} not found`) - - debug(`sending block ${blockId}`) - send(vsBlock({ - id: header.hash, - height: header.height, - time: header.medianTime - })) - }) - }) - } - - function sendStatus (blockId, txId) { - debug(`sending status ${blockId}:${txId}`) - send(vsStatus({ blockId, txId })) - - load -= 1 - delete watching[txId] - } - - function sendTxs (txIds) { - txIds.forEach((txId) => { - rpc('getrawtransaction', [txId], (err, txHex) => { - if (err || !txHex) return debug(err || `${txId} not found`) - - debug(`sending tx ${txId}`) - let txBuffer = Buffer.from(txHex, 'hex') - send(vsTx(txBuffer)) - }) - }) - } - - function open () { - shared.on('block', send) - - adapter.tip((err, blockId) => { - if (err) return - sendBlocks([blockId]) - }) - } - - function reset () { - shared.removeListener('block', send) - - for (let hash in watching) { - shared.removeListener(hash, send) - shared.removeListener(hash, send) - } - } - - function kill (err) { - if (err) debug(err) - reset() - socket.terminate() - } - - function receive (buffer) { - if (!Buffer.isBuffer(buffer)) return kill(new TypeError('Expected buffer')) - if (buffer.length < 2 || buffer.length > 32000) return kill(new Error('Bad data')) - - let type = buffer[0] - let data = buffer.slice(1) - - if (type === 0x00) { - try { - height = vsHeight.decode(data) - } catch (e) { return kill(e) } - return - } - - let hashes - try { - hashes = vsHashes.decode(data) - } catch (e) { return kill(e) } - - // limit 192KiB (3000 * 32 bytes * 2 (hex)) hashes per socket - if (load > 3000) return kill(new Error('Too high load')) - - // idempotent, blocks / transactions - if (type === 0x01) return sendBlocks(hashes) - if (type === 0x02) return sendTxs(hashes) - - // transactions (status) - if (type === 0x03) { - load += hashes.length - hashes.forEach((txId) => { - watching[txId] = true - shared.on(txId, send) - - adapter.blockIdByTransactionId(txId, (err, blockId) => { - if (err) return - sendStatus(blockId, txId) - }) - }) - return - } - - // scripts - if (type === 0x04) { - load += hashes.length - hashes.forEach((scId) => { - watching[scId] = true - shared.on(scId, send) - - adapter.transactionIdListFromScriptId(scId, height, (err, txIds) => { - if (err) return - - sendTxs(txIds) - }) - }) - } - } - - socket.on('message', receive) - socket.on('close', reset) - open() -} diff --git a/index.js b/index.js index 5e86fda..4b7f015 100644 --- a/index.js +++ b/index.js @@ -1,4 +1,344 @@ -module.exports = { - makeAdapter: require('./adapter'), - resync: require('./resync') +let debug = require('./debug')('indexd') +let dbwrapper = require('./dbwrapper') +let { EventEmitter } = require('events') +let parallel = require('run-parallel') +let rpcUtil = require('./rpc') + +let FeeIndex = require('./indexes/fee') +let MtpIndex = require('./indexes/mediantime') +let ScriptIndex = require('./indexes/script') +let TxIndex = require('./indexes/tx') +let TxinIndex = require('./indexes/txin') +let TxoIndex = require('./indexes/txo') + +function txoToString ({ txId, vout }) { + return `${txId}:${vout}` } + +function Indexd (db, rpc) { + this.db = dbwrapper(db) + this.rpc = rpc + this.emitter = new EventEmitter() // TODO: bind to this + this.emitter.setMaxListeners(Infinity) + this.indexes = { + fee: new FeeIndex(), + mtp: new MtpIndex(), + script: new ScriptIndex(), + tx: new TxIndex(), + txin: new TxinIndex(), + txo: new TxoIndex() + } +} + +Indexd.prototype.tips = function (callback) { + let tasks = {} + + for (let indexName in this.indexes) { + let index = this.indexes[indexName] + tasks[indexName] = (next) => index.tip(this.db, next) + } + + parallel(tasks, callback) +} + +// recurses until `nextBlockId` is falsy +Indexd.prototype.connectFrom = function (prevBlockId, blockId, callback) { + this.tips((err, tips) => { + if (err) return callback(err) + + let todo = {} + for (let indexName in tips) { + let tip = tips[indexName] + if (tip && tip.blockId !== prevBlockId) continue + if (indexName === 'fee') { + if (!tips.txo) continue + if (tip && tips.fee.height > tips.txo.height) continue + } + + todo[indexName] = true + } + + let todoList = Object.keys(todo) + if (todoList.length === 0) return callback(new RangeError('Misconfiguration')) + + debug(`Downloading ${blockId} (for ${todoList})`) + + rpcUtil.block(this.rpc, blockId, (err, block) => { + if (err) return callback(err) + + let atomic = this.db.atomic() + let events // TODO + let { height } = block + debug(`Connecting ${blockId} @ ${height}`) + + // connect block to relevant chain tips + for (let indexName in todo) { + let index = this.indexes[indexName] + if (!index.connect) continue + + index.connect(atomic, block, events) + } + + atomic.write((err) => { + if (err) return callback(err) + debug(`Connected ${blockId} @ ${height}`) + + let self = this + function loop (err) { + if (err) return callback(err) + + // recurse until nextBlockId is falsy + if (!block.nextBlockId) return callback(null, true) + self.connectFrom(blockId, block.nextBlockId, callback) + } + + if (!todo.fee) return loop() + + debug(`Connecting ${blockId} (2nd Order)`) + let atomic2 = this.db.atomic() + this.indexes.fee.connect2ndOrder(this.db, this.indexes.txo, atomic2, block, (err) => { + if (err) return loop(err) + + debug(`Connected ${blockId} (2nd Order)`) + atomic2.write(loop) + }) + }) + }) + }) +} + +Indexd.prototype.disconnect = function (blockId, callback) { + debug(`Disconnecting ${blockId}`) + + function fin (err) { + if (err) return callback(err) + debug(`Disconnected ${blockId}`) + callback() + } + + this.tips((err, tips) => { + if (err) return fin(err) + + // TODO: fetch lazily + rpcUtil.block(this.rpc, blockId, (err, block) => { + if (err) return fin(err) + + let atomic = this.db.atomic() + + // disconnect block from relevant chain tips + for (let indexName in this.indexes) { + let index = this.indexes[indexName] + let tip = tips[indexName] + if (!tip) continue + if (tip.blockId !== block.blockId) continue + + index.disconnect(atomic, block) + } + + atomic.write(fin) + }) + }) +} + +// empties the mempool +Indexd.prototype.clear = function () { + for (let indexName in this.indexes) { + this.indexes[indexName].constructor() + } +} + +Indexd.prototype.__resync = function (done) { + debug('resynchronizing') + + let self = this + function lowestTip (callback) { + self.tips((err, tips) => { + if (err) return callback(err) + + let lowest + for (let key in tips) { + let tip = tips[key] + if (!tip) return callback() + if (!lowest) lowest = tip + if (lowest.height < tip.height) continue + lowest = tip + } + + callback(null, lowest) + }) + } + + parallel({ + bitcoind: (f) => rpcUtil.tip(this.rpc, f), + indexd: (f) => lowestTip(f) + }, (err, r) => { + if (err) return done(err) + + // Step 0, genesis? + if (!r.indexd) { + debug('genesis') + return rpcUtil.blockIdAtHeight(this.rpc, 0, (err, genesisId) => { + if (err) return done(err) + + this.connectFrom(null, genesisId, done) + }) + } + + // Step 1, equal? + debug('...', r) + if (r.bitcoind.blockId === r.indexd.blockId) return done() + + // Step 2, is indexd behind? [aka, does bitcoind have the indexd tip] + rpcUtil.headerJSON(this.rpc, r.indexd.blockId, (err, common) => { +// if (err && /not found/.test(err.message)) return fin(err) // uh, burn it to the ground + if (err) return done(err) + + // forked? + if (common.confirmations === -1) { + debug('forked') + return this.disconnect(r.indexd.blockId, (err) => { + if (err) return done(err) + + this.__resync(done) + }) + } + + // yes, indexd is behind + debug('bitcoind is ahead') + this.connectFrom(common.blockId, common.nextBlockId, done) + }) + }) +} + +Indexd.prototype.tryResync = function (callback) { + if (callback) { + this.emitter.once('resync', callback) + } + + if (this.syncing) return + this.syncing = true + + let self = this + function fin (err) { + self.syncing = false + self.emitter.emit('resync', err) + } + + this.__resync((err, updated) => { + if (err) return fin(err) + if (updated) return this.tryResyncMempool(fin) + fin() + }) +} + +Indexd.prototype.tryResyncMempool = function (callback) { + rpcUtil.mempool(this.rpc, (err, txIds) => { + if (err) return callback(err) + + this.clear() + parallel(txIds.map((txId) => (next) => this.notify(txId, next)), callback) + }) +} + +Indexd.prototype.notify = function (txId, callback) { + rpcUtil.transaction(this.rpc, txId, (err, tx) => { + if (err) return callback(err) + + for (let indexName in this.indexes) { + let index = this.indexes[indexName] + + if (!index.mempool) continue + index.mempool(tx) + } + + callback() + }) +} + +// QUERIES +Indexd.prototype.blockIdByTransactionId = function (txId, callback) { + this.indexes.tx.heightBy(this.db, txId, (err, height) => { + if (err) return callback(err) + if (height === -1) return callback() + + rpcUtil.blockIdAtHeight(this.rpc, height, callback) + }) +} + +Indexd.prototype.latestFeesForNBlocks = function (nBlocks, callback) { + this.indexes.fee.latestFeesFor(this.db, nBlocks, callback) +} + +// returns a txo { txId, vout, value, script }, by key { txId, vout } +Indexd.prototype.txoByTxo = function (txo, callback) { + this.indexes.txo.txoBy(this.db, txo, callback) +} + +// returns the height at scId was first-seen (-1 if unconfirmed) +Indexd.prototype.firstSeenScriptId = function (scId, callback) { + this.indexes.script.firstSeenScriptId(this.db, scId, callback) +} + +// returns a list of txIds with inputs/outputs from/to a { scId, heightRange, ?mempool } +Indexd.prototype.transactionIdsByScriptRange = function (scRange, dbLimit, callback) { + this.txosByScriptRange(scRange, dbLimit, (err, txos) => { + if (err) return callback(err) + + let txIdSet = {} + let tasks = txos.map((txo) => { + txIdSet[txo.txId] = true + return (next) => this.indexes.txin.txinBy(this.db, txo, next) + }) + + parallel(tasks, (err, txins) => { + if (err) return callback(err) + + txins.forEach((txin) => { + if (!txin) return + txIdSet[txin.txId] = true + }) + + callback(null, Object.keys(txIdSet)) + }) + }) +} + +// returns a list of txos { txId, vout, height, value } by { scId, heightRange, ?mempool } +Indexd.prototype.txosByScriptRange = function (scRange, dbLimit, callback) { + this.indexes.script.txosBy(this.db, scRange, dbLimit, callback) +} + +// returns a list of (unspent) txos { txId, vout, height, value }, by { scId, heightRange, ?mempool } +// XXX: despite txo queries being bound by heightRange, the UTXO status is up-to-date +Indexd.prototype.utxosByScriptRange = function (scRange, dbLimit, callback) { + this.txosByScriptRange(scRange, dbLimit, (err, txos) => { + if (err) return callback(err) + + let taskMap = {} + let unspentMap = {} + + txos.forEach((txo) => { + let txoId = txoToString(txo) + unspentMap[txoId] = txo + taskMap[txoId] = (next) => this.indexes.txin.txinBy(this.db, txo, next) + }) + + parallel(taskMap, (err, txinMap) => { + if (err) return callback(err) + + let unspents = [] + for (let txoId in txinMap) { + let txin = txinMap[txoId] + + // has a txin, therefore spent + if (txin) continue + + unspents.push(unspentMap[txoId]) + } + + callback(null, unspents) + }) + }) +} + +module.exports = Indexd diff --git a/indexes/fee.js b/indexes/fee.js new file mode 100644 index 0000000..3598038 --- /dev/null +++ b/indexes/fee.js @@ -0,0 +1,141 @@ +let parallel = require('run-parallel') +let typeforce = require('typeforce') +let types = require('./types') +let vstruct = require('varstruct') + +let FEEPREFIX = 0x81 +let FEETIP = types.tip(FEEPREFIX) +let FEE = { + keyType: typeforce.compile({ + height: typeforce.UInt32 + }), + key: vstruct([ + ['prefix', vstruct.Value(vstruct.UInt8, FEEPREFIX)], + ['height', vstruct.UInt32BE] // big-endian for lexicographical sort + ]), + valueType: typeforce.compile({ + iqr: { + q1: typeforce.UInt53, + median: typeforce.UInt53, + q3: typeforce.UInt53 + }, + size: typeforce.UInt32 + }), + value: vstruct([ + ['iqr', vstruct([ + ['q1', vstruct.UInt64LE], + ['median', vstruct.UInt64LE], + ['q3', vstruct.UInt64LE] + ])], + ['size', vstruct.UInt32LE] + ]) +} + +function FeeIndex () {} + +FeeIndex.prototype.tip = function (db, callback) { + db.get(FEETIP, {}, callback) +} + +function box (data) { + if (data.length === 0) return { q1: 0, median: 0, q3: 0 } + let quarter = (data.length / 4) | 0 + let midpoint = (data.length / 2) | 0 + + return { + q1: data[quarter], + median: data[midpoint], + q3: data[midpoint + quarter] + } +} + +FeeIndex.prototype.connect2ndOrder = function (db, txoIndex, atomic, block, callback) { + let { height, transactions } = block + + let txTasks = [] + transactions.forEach((tx) => { + let { ins, outs, vsize } = tx + let inAccum = 0 + let outAccum = 0 + let txoTasks = [] + let coinbase = false + + ins.forEach((input, vin) => { + if (coinbase) return + if (input.coinbase) { + coinbase = true + return + } + + let { prevTxId, vout } = input + txoTasks.push((next) => { + txoIndex.txoBy(db, { txId: prevTxId, vout }, (err, txo) => { + if (err) return next(err) + if (!txo) return next(new Error(`Missing ${prevTxId}:${vout}`)) + + inAccum += txo.value + next() + }) + }) + }) + + outs.forEach(({ value }, vout) => { + if (coinbase) return + outAccum += value + }) + + txTasks.push((next) => { + if (coinbase) return next(null, 0) + + parallel(txoTasks, (err) => { + if (err) return next(err) + let fee = inAccum - outAccum + let feeRate = Math.floor(fee / vsize) + + next(null, feeRate) + }) + }) + }) + + parallel(txTasks, (err, feeRates) => { + if (err) return callback(err) + feeRates = feeRates.sort((a, b) => a - b) + + atomic.put(FEE, { height }, { + iqr: box(feeRates), + size: block.strippedsize + }) + atomic.put(FEETIP, {}, block) + + callback() + }) +} + +FeeIndex.prototype.disconnect = function (atomic, block) { + let { height } = block + + atomic.del(FEE, { height }) + atomic.put(FEETIP, {}, { blockId: block.prevBlockId, height }) +} + +FeeIndex.prototype.latestFeesFor = function (db, nBlocks, callback) { + db.get(FEETIP, {}, (err, tip) => { + if (err) return callback(err) + if (!tip) return callback(null, []) + + let { height: maxHeight } = tip + let results = [] + + db.iterator(FEE, { + gte: { + height: maxHeight - (nBlocks - 1) + }, + limit: nBlocks + }, ({ height }, { fees, size }) => { + results.push({ height, fees, size }) + }, (err) => callback(err, results)) + }) +} + +module.exports = FeeIndex +module.exports.TYPE = FEE diff --git a/indexes/mediantime.js b/indexes/mediantime.js new file mode 100644 index 0000000..eb4afe9 --- /dev/null +++ b/indexes/mediantime.js @@ -0,0 +1,41 @@ +let typeforce = require('typeforce') +let types = require('./types') +let vstruct = require('varstruct') + +let MTPPREFIX = 0x83 +let MTPTIP = types.tip(MTPPREFIX) +let MTP = { + keyType: typeforce.compile({ + medianTime: typeforce.UInt32, + height: typeforce.UInt32 + }), + key: vstruct([ + ['prefix', vstruct.Value(vstruct.UInt8, MTPPREFIX)], + ['medianTime', vstruct.UInt32BE], // big-endian for lexicographical sort + ['height', vstruct.UInt32LE] + ]), + valueType: typeforce.Null +} + +function MtpIndex () {} + +MtpIndex.prototype.tip = function (db, callback) { + db.get(MTPTIP, {}, callback) +} + +MtpIndex.prototype.connect = function (atomic, block) { + let { height, medianTime } = block + + atomic.put(MTP, { medianTime, height }) + atomic.put(MTPTIP, {}, block) +} + +MtpIndex.prototype.disconnect = function (atomic, block) { + let { height, medianTime } = block + + atomic.del(MTP, { medianTime, height }) + atomic.put(MTPTIP, {}, { blockId: block.prevBlockId, height }) +} + +module.exports = MtpIndex +module.exports.TYPE = MTP diff --git a/indexes/script.js b/indexes/script.js new file mode 100644 index 0000000..8f3320f --- /dev/null +++ b/indexes/script.js @@ -0,0 +1,142 @@ +let crypto = require('crypto') +let types = require('./types') +let typeforce = require('typeforce') +let vstruct = require('varstruct') +let utils = require('./utils') + +let SCRIPTPREFIX = 0x33 +let SCRIPTTIP = types.tip(SCRIPTPREFIX) +let SCRIPT = { + keyType: typeforce.compile({ + scId: typeforce.HexN(64), + height: typeforce.UInt32, + txId: typeforce.HexN(64), + vout: typeforce.UInt32 + }), + key: vstruct([ + ['prefix', vstruct.Value(vstruct.UInt8, SCRIPTPREFIX)], + ['scId', vstruct.String(32, 'hex')], + ['height', vstruct.UInt32BE], // big-endian for lexicographical sort + ['txId', vstruct.String(32, 'hex')], + ['vout', vstruct.UInt32LE] + ]), + valueType: typeforce.compile({ + value: typeforce.UInt53 + }), + value: vstruct([ + ['value', vstruct.UInt64LE] + ]) +} + +function sha256 (buffer) { + return crypto.createHash('sha256') + .update(buffer) + .digest('hex') +} + +function ScriptIndex () { + this.scripts = {} +} + +ScriptIndex.prototype.tip = function (db, callback) { + db.get(SCRIPTTIP, {}, callback) +} + +ScriptIndex.prototype.mempool = function (tx, events) { + let { txId, outs } = tx + + outs.forEach(({ vout, script, value }) => { + let scId = sha256(script) + utils.getOrSetDefault(this.scripts, scId, []) + .push({ txId, vout, height: -1, value }) + + if (events) events.push(['script', scId, null, txId, vout, value]) + }) +} + +ScriptIndex.prototype.connect = function (atomic, block, events) { + let { height, transactions } = block + + transactions.forEach((tx) => { + let { txId, outs } = tx + + outs.forEach(({ vout, script, value }) => { + let scId = sha256(script) + atomic.put(SCRIPT, { scId, height, txId, vout }, { value }) + + if (events) events.push(['script', scId, height, txId, vout, value]) + }) + }) + + atomic.put(SCRIPTTIP, {}, block) +} + +ScriptIndex.prototype.disconnect = function (atomic, block) { + let { height, transactions } = block + + transactions.forEach((tx) => { + let { txId, outs } = tx + + outs.forEach(({ vout, script }) => { + let scId = sha256(script) + atomic.del(SCRIPT, { scId, height, txId, vout }) + }) + }) + + atomic.put(SCRIPTTIP, {}, { blockId: block.prevBlockId, height }) +} + +let ZERO64 = '0000000000000000000000000000000000000000000000000000000000000000' +let MAX64 = 'ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff' + +// returns the height at scId was first-seen (-1 if unconfirmed, null if unknown) +ScriptIndex.prototype.firstSeenScriptId = function (db, scId, callback) { + let result = null + db.iterator(SCRIPT, { + gte: { scId, height: 0, txId: ZERO64, vout: 0 }, + lt: { scId, height: 0xffffffff, txId: ZERO64, vout: 0 }, + limit: 1 + }, ({ height }) => { + result = height + }, (err) => { + if (err) return callback(err) + if (result !== null) return callback(null, result) + + let mem = this.scripts[scId] + if (mem) return callback(null, -1) + callback(null, null) + }) +} + +// XXX: if heightRange distance is < 2, the limit is ignored +// -- could be rectified by supporting a minimum txId value (aka, last retrieved) +// +// returns a list of { txId, vout, height, value } by { scId, heightRange: [from, to] } +ScriptIndex.prototype.txosBy = function (db, { scId, heightRange, mempool }, maxRows, callback) { + let [fromHeight, toHeight] = heightRange + let distance = toHeight - fromHeight + if (distance < 0) return callback(null, []) + if (distance < 2) maxRows = Infinity + fromHeight = Math.min(Math.max(0, fromHeight), 0xffffffff) + toHeight = Math.min(Math.max(0, toHeight), 0xffffffff) + + let results = [] + if (mempool && (scId in this.scripts)) { + results = this.scripts[scId].concat() + } + + db.iterator(SCRIPT, { + gte: { scId, height: fromHeight, txId: ZERO64, vout: 0 }, + lt: { scId, height: toHeight, txId: MAX64, vout: 0xffffffff }, + limit: maxRows + 1 + }, ({ height, txId, vout }, { value }, __iterator) => { + results.push({ + txId, vout, height, value + }) + + if (results.length > maxRows) return __iterator.end((err) => callback(err || new RangeError('Exceeded Limit'))) + }, (err) => callback(err, results)) +} + +module.exports = ScriptIndex +module.exports.TYPE = SCRIPT diff --git a/indexes/tx.js b/indexes/tx.js new file mode 100644 index 0000000..82bcdc5 --- /dev/null +++ b/indexes/tx.js @@ -0,0 +1,74 @@ +let types = require('./types') +let typeforce = require('typeforce') +let vstruct = require('varstruct') + +let TXPREFIX = 0x35 +let TXTIP = types.tip(TXPREFIX) +let TX = { + keyType: typeforce.compile({ + txId: typeforce.HexN(64) + }), + key: vstruct([ + ['prefix', vstruct.Value(vstruct.UInt8, TXPREFIX)], + ['txId', vstruct.String(32, 'hex')] + ]), + valueType: typeforce.compile({ + height: typeforce.UInt32 + }), + value: vstruct([ + ['height', vstruct.UInt32LE] + ]) +} + +function TxIndex () { + this.txs = {} +} + +TxIndex.prototype.tip = function (db, callback) { + db.get(TXTIP, {}, callback) +} + +TxIndex.prototype.mempool = function (tx, events) { + let { txId } = tx + + this.txs[txId] = true +} + +TxIndex.prototype.connect = function (atomic, block, events) { + let { height, transactions } = block + + transactions.forEach((tx) => { + let { txId } = tx + atomic.put(TX, { txId }, { height }) + }) + + atomic.put(TXTIP, {}, block) +} + +TxIndex.prototype.disconnect = function (atomic, block) { + let { height, transactions } = block + + transactions.forEach((tx) => { + let { txId } = tx + + atomic.del(TX, { txId }) + }) + + atomic.put(TXTIP, {}, { blockId: block.prevBlockId, height }) +} + +// returns the height (-1 if unconfirmed, null if unknown) of a transaction, by txId +TxIndex.prototype.heightBy = function (db, txId, callback) { + let mem = this.txs[txId] + if (mem) return callback(null, -1) + + db.get(TX, txId, (err, result) => { + if (err) return callback(err) + if (!result) return callback(null, null) + + callback(null, result.height) + }) +} + +module.exports = TxIndex +module.exports.TYPE = TX diff --git a/indexes/txin.js b/indexes/txin.js new file mode 100644 index 0000000..969098b --- /dev/null +++ b/indexes/txin.js @@ -0,0 +1,93 @@ +let types = require('./types') +let typeforce = require('typeforce') +let vstruct = require('varstruct') +let utils = require('./utils') + +let TXINPREFIX = 0x32 +let TXINTIP = types.tip(TXINPREFIX) +let TXIN = { + keyType: typeforce.compile({ + txId: typeforce.HexN(64), + vout: typeforce.UInt32 + }), + key: vstruct([ + ['prefix', vstruct.Value(vstruct.UInt8, TXINPREFIX)], + ['txId', vstruct.String(32, 'hex')], + ['vout', vstruct.UInt32LE] + ]), + valueType: typeforce.compile({ + txId: typeforce.HexN(64), + vin: typeforce.UInt32 + }), + value: vstruct([ + ['txId', vstruct.String(32, 'hex')], + ['vin', vstruct.UInt32LE] + ]) +} + +function TxinIndex () { + this.txins = {} +} + +TxinIndex.prototype.tip = function (db, callback) { + db.get(TXINTIP, {}, callback) +} + +TxinIndex.prototype.mempool = function (tx, events) { + let { txId, ins } = tx + + ins.forEach((input, vin) => { + if (input.coinbase) return + let { prevTxId, vout } = input + + utils.getOrSetDefault(this.txins, `${prevTxId}:${vout}`, []) + .push({ txId, vin }) + + if (events) events.push(['txin', `${prevTxId}:${vout}`, txId, vin]) + }) +} + +TxinIndex.prototype.connect = function (atomic, block, events) { + let { transactions } = block + + transactions.forEach((tx) => { + let { txId, ins } = tx + + ins.forEach((input, vin) => { + if (input.coinbase) return + + let { prevTxId, vout } = input + atomic.put(TXIN, { txId: prevTxId, vout }, { txId, vin }) + + if (events) events.push(['txin', `${prevTxId}:${vout}`, txId, vin]) + }) + }) + + atomic.put(TXINTIP, {}, block) +} + +TxinIndex.prototype.disconnect = function (atomic, block) { + let { height, transactions } = block + + transactions.forEach((tx) => { + let { txId, outs } = tx + + outs.forEach(({ value, vout }) => { + atomic.del(TXIN, { txId, vout }) + }) + }) + + atomic.put(TXINTIP, {}, { blockId: block.prevBlockId, height }) +} + +// returns a txin { txId, vin } by { txId, vout } +TxinIndex.prototype.txinBy = function (db, txo, callback) { + let { txId, vout } = txo + let mem = this.txins[`${txId}:${vout}`] + if (mem) return callback(null, mem[0]) // XXX: returns first-seen only + + db.get(TXIN, txo, callback) +} + +module.exports = TxinIndex +module.exports.TYPE = TXIN diff --git a/indexes/txo.js b/indexes/txo.js new file mode 100644 index 0000000..2f771e1 --- /dev/null +++ b/indexes/txo.js @@ -0,0 +1,81 @@ +let types = require('./types') +let typeforce = require('typeforce') +let vstruct = require('varstruct') + +let TXOPREFIX = 0x34 +let TXOTIP = types.tip(TXOPREFIX) +let TXO = { + keyType: typeforce.compile({ + txId: typeforce.HexN(64), + vout: typeforce.UInt32 + }), + key: vstruct([ + ['prefix', vstruct.Value(vstruct.UInt8, TXOPREFIX)], + ['txId', vstruct.String(32, 'hex')], + ['vout', vstruct.UInt32LE] + ]), + valueType: typeforce.compile({ + value: typeforce.UInt53, + script: typeforce.Buffer + }), + value: vstruct([ + ['value', vstruct.UInt64LE], + ['script', vstruct.VarBuffer(vstruct.UInt16LE)] + ]) +} + +function TxoIndex () { + this.txos = {} +} + +TxoIndex.prototype.tip = function (db, callback) { + db.get(TXOTIP, {}, callback) +} + +TxoIndex.prototype.mempool = function (tx) { + let { txId, outs } = tx + + outs.forEach(({ script, value, vout }) => { + this.txos[`${txId}:${vout}`] = { script, value } + }) +} + +TxoIndex.prototype.connect = function (atomic, block) { + let { transactions } = block + + transactions.forEach((tx) => { + let { txId, outs } = tx + + outs.forEach(({ script, value, vout }) => { + atomic.put(TXO, { txId, vout }, { value, script }) + }) + }) + + atomic.put(TXOTIP, {}, block) +} + +TxoIndex.prototype.disconnect = function (atomic, block) { + let { height, transactions } = block + + transactions.forEach((tx) => { + let { txId, outs } = tx + + outs.forEach(({ value, vout }) => { + atomic.del(TXO, { txId, vout }) + }) + }) + + atomic.put(TXOTIP, {}, { blockId: block.prevBlockId, height }) +} + +// returns a txo { txId, vout, value, script } by { txId, vout } +TxoIndex.prototype.txoBy = function (db, txo, callback) { + let { txId, vout } = txo + let mem = this.txos[`${txId}:${vout}`] + if (mem) return callback(null, mem) + + db.get(TXO, txo, callback) +} + +module.exports = TxoIndex +module.exports.TYPE = TXO diff --git a/indexes/types.js b/indexes/types.js new file mode 100644 index 0000000..c287a4d --- /dev/null +++ b/indexes/types.js @@ -0,0 +1,21 @@ +let typeforce = require('typeforce') +let vstruct = require('varstruct') + +function tip (prefix) { + return { + keyType: {}, + key: vstruct([ + ['prefix', vstruct.Value(vstruct.UInt8, prefix)] + ]), + valueType: { + blockId: typeforce.HexN(64), + height: typeforce.UInt32 + }, + value: vstruct([ + ['blockId', vstruct.String(32, 'hex')], + ['height', vstruct.UInt32LE] + ]) + } +} + +module.exports = { tip } diff --git a/indexes/utils.js b/indexes/utils.js new file mode 100644 index 0000000..52abcae --- /dev/null +++ b/indexes/utils.js @@ -0,0 +1,8 @@ +function getOrSetDefault (object, key, defaultValue) { + let existing = object[key] + if (existing !== undefined) return existing + object[key] = defaultValue + return defaultValue +} + +module.exports = { getOrSetDefault } diff --git a/mempool.js b/mempool.js deleted file mode 100644 index ad9019c..0000000 --- a/mempool.js +++ /dev/null @@ -1,142 +0,0 @@ -let debug = require('./debug')('indexd:mempool') -let parallel = require('run-parallel') -let rpcUtil = require('./rpc') - -function Mempool (emitter, rpc) { - this.emitter = emitter - this.rpc = rpc - this.scripts = {} - this.spents = {} - this.txos = {} - - this.statistics = { - transactions: 0, - inputs: 0, - outputs: 0 - } -} - -function getOrSetDefault (object, key, defaultValue) { - let existing = object[key] - if (existing !== undefined) return existing - object[key] = defaultValue - return defaultValue -} - -let waiting -Mempool.prototype.add = function (txId, callback) { - rpcUtil.transaction(this.rpc, txId, (err, tx) => { - if (err) return callback(err) - if (!tx) { - debug(`${txId} dropped`) - return callback() - } - - let { txBuffer, ins, outs } = tx - - this.statistics.transactions++ - ins.forEach((input, vin) => { - if (input.coinbase) return - let { prevTxId, vout } = input - - getOrSetDefault(this.spents, `${prevTxId}:${vout}`, []).push({ txId, vin }) - this.statistics.inputs++ - - setTimeout(() => this.emitter.emit('spent', `${prevTxId}:${vout}`, txId, txBuffer)) - }) - - outs.forEach(({ scId, script, value }, vout) => { - getOrSetDefault(this.scripts, scId, []).push({ txId, vout }) - this.txos[`${txId}:${vout}`] = { value, script } - this.statistics.outputs++ - - setTimeout(() => this.emitter.emit('script', scId, txId, txBuffer)) - }) - - if (!waiting) { - waiting = true - - debug(this.statistics) - setTimeout(() => { - waiting = false - }, 30000) - } - - setTimeout(() => this.emitter.emit('transaction', txId, txBuffer)) - callback() - }, true) -} - -Mempool.prototype.reset = function (callback) { - this.scripts = {} - this.spents = {} - this.txos = {} - this.statistics = { - transactions: 0, - inputs: 0, - outputs: 0 - } - - debug(`Cleared`) - rpcUtil.mempool(this.rpc, (err, actualTxIds) => { - if (err) return callback(err) - - debug(`Downloading ${actualTxIds.length} transactions`) - let tasks = actualTxIds.map(txId => next => this.add(txId, next)) - - parallel(tasks, (err) => { - if (err) return callback(err) - - debug(`Downloaded ${actualTxIds.length} transactions`) - callback() - }) - }) -} - -// QUERIES -Mempool.prototype.seenScriptId = function (scId) { - return Boolean(this.scripts[scId]) -} - -Mempool.prototype.spentsFromTxo = function ({ txId, vout }) { - return this.spents[`${txId}:${vout}`] || [] -} - -Mempool.prototype.transactionIdsByScriptId = function (scId) { - let txos = this.__txosListByScriptId(scId) - let txIdSet = {} - - txos.forEach((txo) => { - this.spentsFromTxo(txo).forEach(({ txId }) => { - txIdSet[txId] = true - }) - }) - - txos.forEach(({ txId }) => { - txIdSet[txId] = true - }) - - return txIdSet -} - -Mempool.prototype.__txosListByScriptId = function (scId) { - return this.scripts[scId] || [] -} - -Mempool.prototype.txosByScriptId = function (scId) { - let txos = this.scripts[scId] - if (!txos) return {} - - let resultMap = {} - txos.forEach(({ txId, vout }) => { - resultMap[`${txId}:${vout}`] = { txId, vout, scId } - }) - - return resultMap -} - -Mempool.prototype.txoByTxo = function (txId, vout) { - return this.txos[`${txId}:${vout}`] -} - -module.exports = Mempool diff --git a/package.json b/package.json index a97a9a9..71c3d4c 100644 --- a/package.json +++ b/package.json @@ -8,17 +8,6 @@ "typeforce": "^1.10.6", "varstruct": "^6.1.1" }, - "devDependencies": { - "bitcoinjs-lib": "^3.3.0", - "debug": "^3.1.0", - "dotenv": "^4.0.0", - "express": "^4.16.2", - "leveldown": "^1.6.0", - "qup": "^1.2.1", - "ws": "^3.3.1", - "yajrpc": "^1.3.0", - "zmq": "^2.15.3" - }, "optionalDependencies": { "debug": "*" }, @@ -26,9 +15,7 @@ "type": "git", "url": "https://github.com/dcousens/indexd.git" }, - "scripts": { - "start": "cd example/ && node index.js" - }, + "scripts": {}, "author": "Daniel Cousens", "license": "ISC" } diff --git a/resync.js b/resync.js deleted file mode 100644 index d395e5e..0000000 --- a/resync.js +++ /dev/null @@ -1,82 +0,0 @@ -let debug = require('./debug')('indexd:resync') -let parallel = require('run-parallel') -let rpcUtil = require('./rpc') - -// recursively calls connectBlock(id) until `bitcoind[id].next` is falsy -function connectBlock (rpc, indexd, id, height, callback) { - debug(`Connecting ${id} @ ${height}`) - - indexd.connect(id, height, (err, nextblockhash) => { - if (err) return callback(err) - - debug(`Connected ${id} @ ${height}`) - if (!nextblockhash) return callback() - - // recurse until next is falsy - connectBlock(rpc, indexd, nextblockhash, height + 1, callback) - }) -} - -function disconnectBlock (indexd, id, callback) { - debug(`Disconnecting ${id}`) - - indexd.disconnect(id, (err) => { - if (err) return callback(err) - - debug(`Disconnected ${id}`) - callback() - }) -} - -module.exports = function resync (rpc, indexd, callback) { - debug('fetching bitcoind/indexd tips') - - function trySyncFrom (id, height) { - connectBlock(rpc, indexd, id, height, (err) => { - if (err) return callback(err) - callback(null, true) - }) - } - - parallel({ - bitcoind: (f) => rpcUtil.tip(rpc, f), - indexd: (f) => indexd.tip(f) - }, (err, tips) => { - if (err) return callback(err) - - // Step 0, genesis? - if (!tips.indexd) { - debug('genesis') - return rpcUtil.blockIdAtHeight(rpc, 0, (err, genesisId) => { - if (err) return callback(err) - - trySyncFrom(genesisId, 0) - }) - } - - // Step 1, equal? - debug('...', tips) - if (tips.bitcoind === tips.indexd) return callback(null, false) - - // Step 2, is indexd behind? [aka, does bitcoind have the indexd tip] - rpcUtil.headerJSON(rpc, tips.indexd, (err, common) => { - // no? forked - if ( - (err && err.message === 'Block not found') || - (!err && common.confirmations === -1) - ) { - debug('indexd is forked') - return disconnectBlock(indexd, tips.indexd, (err) => { - if (err) return callback(err) - - resync(rpc, indexd, callback) - }) - } - if (err) return callback(err) - - // yes, indexd is behind - debug('bitcoind is ahead') - trySyncFrom(common.nextblockhash, common.height + 1) - }) - }) -} diff --git a/rpc.js b/rpc.js index 7e42059..106f6af 100644 --- a/rpc.js +++ b/rpc.js @@ -1,4 +1,3 @@ -let crypto = require('crypto') let debug = require('./debug')('indexd:rpc') function rpcd (rpc, method, params, done) { @@ -11,14 +10,7 @@ function rpcd (rpc, method, params, done) { }) } -function sha256 (hex) { - return crypto.createHash('sha256') - .update(Buffer.from(hex, 'hex')) - .digest('hex') -} - function augment (tx) { - tx.txBuffer = Buffer.from(tx.hex, 'hex') delete tx.hex tx.txId = tx.txid delete tx.txid @@ -29,7 +21,6 @@ function augment (tx) { tx.vout.forEach((output) => { output.script = Buffer.from(output.scriptPubKey.hex, 'hex') delete output.scriptPubKey - output.scId = sha256(output.script) output.value = Math.round(output.value * 1e8) output.vout = output.n delete output.n @@ -45,8 +36,18 @@ function block (rpc, blockId, done) { rpcd(rpc, 'getblock', [blockId, 2], (err, block) => { if (err) return done(err) + block.blockId = blockId + delete block.hash + block.nextBlockId = block.nextblockhash + delete block.nextblockhash + block.prevBlockId = block.previousblockhash + delete block.prevblockhash + block.medianTime = block.mediantime + delete block.mediantime + block.transactions = block.tx.map(t => augment(t)) delete block.tx + done(null, block) }) } @@ -56,7 +57,16 @@ function blockIdAtHeight (rpc, height, done) { } function headerJSON (rpc, blockId, done) { - rpcd(rpc, 'getblockheader', [blockId, true], done) + rpcd(rpc, 'getblockheader', [blockId, true], (err, header) => { + if (err) return done(err) + + header.blockId = blockId + delete header.hash + header.nextBlockId = header.nextblockhash + delete header.nextblockhash + + done(null, header) + }) } function mempool (rpc, done) { @@ -64,7 +74,16 @@ function mempool (rpc, done) { } function tip (rpc, done) { - rpcd(rpc, 'getbestblockhash', [], done) + rpcd(rpc, 'getchaintips', [], (err, tips) => { + if (err) return done(err) + + let { + hash: blockId, + height + } = tips.filter(x => x.status === 'active').pop() + + done(null, { blockId, height }) + }) } function transaction (rpc, txId, next, forgiving) { diff --git a/types.js b/types.js deleted file mode 100644 index d956027..0000000 --- a/types.js +++ /dev/null @@ -1,131 +0,0 @@ -let typeforce = require('typeforce') -let tfHex64 = typeforce.HexN(64) - -let vstruct = require('varstruct') -let Hex64 = vstruct.String(32, 'hex') -let vout = vstruct.UInt32LE -let satoshis = vstruct.UInt64LE - -let tip = { - keyType: {}, - key: vstruct([ - ['prefix', vstruct.Value(vstruct.UInt8, 0x00)] - ]), - valueType: { - blockId: tfHex64, - height: typeforce.UInt32 - }, - value: vstruct([ - ['blockId', Hex64], - ['height', vstruct.UInt32LE] - ]) -} - -let scIndex = { - keyType: typeforce.compile({ - scId: tfHex64, - height: typeforce.UInt32, - txId: tfHex64, - vout: typeforce.UInt32 - }), - key: vstruct([ - ['prefix', vstruct.Value(vstruct.UInt8, 0x01)], - ['scId', Hex64], - ['height', vstruct.UInt32BE], // big-endian for lexicographical sort - ['txId', Hex64], - ['vout', vout] - ]), - valueType: typeforce.Null, - value: null -} - -let spentIndex = { - keyType: typeforce.compile({ - txId: tfHex64, - vout: typeforce.UInt32 - }), - key: vstruct([ - ['prefix', vstruct.Value(vstruct.UInt8, 0x02)], - ['txId', Hex64], - ['vout', vout] - ]), - valueType: typeforce.compile({ - txId: tfHex64, - vin: typeforce.UInt32 - }), - value: vstruct([ - ['txId', Hex64], - ['vin', vout] - ]) -} - -let txIndex = { - keyType: typeforce.compile({ - txId: tfHex64 - }), - key: vstruct([ - ['prefix', vstruct.Value(vstruct.UInt8, 0x03)], - ['txId', Hex64] - ]), - valueType: typeforce.compile({ - height: typeforce.UInt32 - }), - value: vstruct([ - ['height', vstruct.UInt32LE] - ]) -} - -let txoIndex = { - keyType: typeforce.compile({ - txId: tfHex64, - vout: typeforce.UInt32 - }), - key: vstruct([ - ['prefix', vstruct.Value(vstruct.UInt8, 0x04)], - ['txId', Hex64], - ['vout', vout] - ]), - valueType: typeforce.compile({ - value: typeforce.UInt53, - script: typeforce.Buffer - }), - value: vstruct([ - ['value', satoshis] -// ['script', vstruct.VarBuffer(vstruct.UInt16LE)] // TODO: add in 0.9.0 - ]) -} - -let feeIndex = { - keyType: typeforce.compile({ - height: typeforce.UInt32 - }), - key: vstruct([ - ['prefix', vstruct.Value(vstruct.UInt8, 0x11)], - ['height', vstruct.UInt32BE] // big-endian for lexicographical sort - ]), - valueType: typeforce.compile({ - size: typeforce.UInt32, - fees: { - q1: typeforce.UInt53, - median: typeforce.UInt53, - q3: typeforce.UInt53 - } - }), - value: vstruct([ - ['size', vstruct.UInt32LE], - ['fees', vstruct([ - ['q1', satoshis], - ['median', satoshis], - ['q3', satoshis] - ])] - ]) -} - -module.exports = { - feeIndex, - scIndex, - spentIndex, - txIndex, - txoIndex, - tip -}