Skip to content

fix: issues found when integrating into js-ipfs #12

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
May 6, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 19 additions & 17 deletions src/decision/engine.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ const debug = require('debug')
const _ = require('highland')
const async = require('async')

const log = debug('engine')
log.error = debug('engine:error')
const log = debug('bitswap:engine')
log.error = debug('bitswap:engine:error')

const Message = require('../message')
const Wantlist = require('../wantlist')
Expand All @@ -29,7 +29,7 @@ module.exports = class Engine {
const msg = new Message(false)
msg.addBlock(env.block)

log('Sending block %s to %s', env.peer.toHexString(), env.block)
log('Sending block to %s', env.peer.toB58String(), env.block.data.toString())

this.network.sendMessage(env.peer, msg, (err) => {
if (err) {
Expand All @@ -55,6 +55,7 @@ module.exports = class Engine {
if (!nextTask) return push(null, _.nil)

this.datastore.get(nextTask.entry.key, (err, block) => {
log('fetched: %s', block.key.toString('hex'), block.data.toString())
if (err || !block) {
nextTask.done()
} else {
Expand All @@ -78,11 +79,11 @@ module.exports = class Engine {
}

wantlistForPeer (peerId) {
if (!this.ledgerMap.has(peerId)) {
if (!this.ledgerMap.has(peerId.toB58String())) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

return new Map()
}

return this.ledgerMap.get(peerId).wantlist.sortedEntries()
return this.ledgerMap.get(peerId.toB58String()).wantlist.sortedEntries()
}

peers () {
Expand All @@ -92,7 +93,7 @@ module.exports = class Engine {
// Handle incoming messages
messageReceived (peerId, msg, cb) {
if (msg.empty) {
log('received empty message from %s', peerId)
log('received empty message from %s', peerId.toB58String())
}

const ledger = this._findOrCreate(peerId)
Expand All @@ -103,7 +104,7 @@ module.exports = class Engine {
}

this._processBlocks(msg.blocks, ledger)

log('wantlist', Array.from(msg.wantlist.values()))
async.eachSeries(
msg.wantlist.values(),
this._processWantlist.bind(this, ledger, peerId),
Expand Down Expand Up @@ -133,19 +134,20 @@ module.exports = class Engine {

_processWantlist (ledger, peerId, entry, cb) {
if (entry.cancel) {
log('cancel %s', entry.key)
log('cancel %s', entry.key.toString('hex'))
ledger.cancelWant(entry.key)
this.peerRequestQueue.remove(entry.key, peerId)
async.setImmediate(() => cb())
} else {
log('wants %s - %s', entry.key, entry.priority)
log('wants %s - %s', entry.key.toString('hex'), entry.priority)
ledger.wants(entry.key, entry.priority)

// If we already have the block, serve it
this.datastore.has(entry.key, (err, exists) => {
if (err) {
log('failed existence check %s', entry.key)
} else {
log('failed existence check %s', entry.key.toString('hex'))
} else if (exists) {
log('has want %s', entry.key.toString('hex'))
this.peerRequestQueue.push(entry.entry, peerId)
}
cb()
Expand All @@ -155,7 +157,7 @@ module.exports = class Engine {

_processBlocks (blocks, ledger) {
for (let block of blocks.values()) {
log('got block %s %s bytes', block.key, block.data.length)
log('got block %s %s bytes', block.key.toString('hex'), block.data.length)
ledger.receivedBytes(block.data.length)

this.receivedBlock(block)
Expand All @@ -181,21 +183,21 @@ module.exports = class Engine {
}

peerDisconnected (peerId) {
// if (this.ledgerMap.has(peerId)) {
// this.ledgerMap.delete(peerId)
// if (this.ledgerMap.has(peerId.toB58String())) {
// this.ledgerMap.delete(peerId.toB58String())
// }
//
// TODO: figure out how to remove all other references
// in the peerrequestqueue
}

_findOrCreate (peerId) {
if (this.ledgerMap.has(peerId)) {
return this.ledgerMap.get(peerId)
if (this.ledgerMap.has(peerId.toB58String())) {
return this.ledgerMap.get(peerId.toB58String())
}

const l = new Ledger(peerId)
this.ledgerMap.set(peerId, l)
this.ledgerMap.set(peerId.toB58String(), l)

return l
}
Expand Down
8 changes: 4 additions & 4 deletions src/decision/peer-request-queue.js
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,12 @@ module.exports = class PeerRequestQueue {

// Add a new entry to the queue
push (entry, to) {
let partner = this.partners.get(to)
let partner = this.partners.get(to.toB58String())

if (!partner) {
partner = new ActivePartner()
this.pQueue.push(partner)
this.partners.set(to, partner)
this.partners.set(to.toB58String(), partner)
}

if (partner.activeBlocks.has(entry.key)) {
Expand Down Expand Up @@ -118,13 +118,13 @@ module.exports = class PeerRequestQueue {
t.trash = true

// having canceled a block, we now account for that in the given partner
this.partners.get(peerId).requests --
this.partners.get(peerId.toB58String()).requests --
}
}
}

function taskKey (peerId, key) {
return `${peerId.toHexString()}:${key.toString('hex')}`
return `${peerId.toB58String()}:${key.toString('hex')}`
}

function partnerCompare (a, b) {
Expand Down
52 changes: 37 additions & 15 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,12 @@ module.exports = class Bitwap {

this.notifications = new EventEmitter()
this.notifications.setMaxListeners(cs.maxListeners)

this.wm.run()
}

// handle messages received through the network
_receiveMessage (peerId, incoming, cb) {
cb = cb || (() => {})
log('receiving message from %s', peerId.toB58String())
this.engine.messageReceived(peerId, incoming, (err) => {
if (err) {
log('failed to receive message', incoming)
Expand All @@ -55,7 +55,7 @@ module.exports = class Bitwap {
for (let block of iblocks.values()) {
const found = this.wm.wl.contains(block.key)
if (!found) {
log('received un-askes-for %s from %s', block, peerId)
log('received un-askes-for %s from %s', block.key.toString('hex'), peerId.toB58String())
} else {
keys.push(block.key)
}
Expand All @@ -71,7 +71,7 @@ module.exports = class Bitwap {
return innerCb()
}

log('got block %s from %s', block, peerId)
log('got block from %s', peerId.toB58String(), block.data.toString())
innerCb()
}),
(innerCb) => this.hasBlock(block, (err) => {
Expand Down Expand Up @@ -104,14 +104,15 @@ module.exports = class Bitwap {
}

_tryPutBlock (block, times, cb) {
log('trying to put block %s', block.data.toString())
async.retry({times, interval: 400}, (done) => {
this.datastore.put(block, done)
}, cb)
}

// handle errors on the receiving channel
_receiveError (err) {
log.debug('Bitswap ReceiveError: %s', err.message)
log.error('ReceiveError: %s', err.message)
}

// handle new peers
Expand Down Expand Up @@ -155,6 +156,7 @@ module.exports = class Bitwap {
const blocks = []
const finish = (block) => {
blocks.push(block)
log('finish: %s/%s', blocks.length, keys.length)
if (blocks.length === keys.length) {
cb(null, blocks)
}
Expand All @@ -163,20 +165,28 @@ module.exports = class Bitwap {
keys.forEach((key) => {
// Sanity check, we don't want to announce looking for blocks
// when we might have them ourselves
this.datastore.get(key, (err, res) => {
if (!err && res) {
this.wm.cancelWants([key])
finish(res)
this.datastore.has(key, (err, exists) => {
if (err) {
log('error in datastore.has: ', err.message)
return
}

if (err) {
log('error in datastore.get: ', err.message)
}
if (exists) {
this.datastore.get(key, (err, res) => {
if (!err && res) {
this.wm.cancelWants([key])
finish(res)
return
}

this.notifications.once(`block:${key.toString('hex')}`, (block) => {
finish(block)
})
if (err) {
log('error in datastore.get: ', err.message)
}
})
}
})
this.notifications.once(`block:${key.toString('hex')}`, (block) => {
finish(block)
})
})

Expand All @@ -197,6 +207,7 @@ module.exports = class Bitwap {
log.error('Error writing block to datastore: %s', err.message)
return cb(err)
}
log('put block: %s', block.key.toString('hex'))
this.notifications.emit(`block:${block.key.toString('hex')}`, block)
this.engine.receivedBlock(block)
cb()
Expand All @@ -216,4 +227,15 @@ module.exports = class Bitwap {
peers: this.engine.peers()
}
}

start () {
this.wm.run()
this.network.start()
}

// Halt everything
stop () {
this.wm.stop()
this.network.start()
}
}
14 changes: 7 additions & 7 deletions src/message/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,31 +21,31 @@ class BitswapMessage {
}

addEntry (key, priority, cancel) {
const e = this.wantlist.get(key)
const e = this.wantlist.get(key.toString('hex'))

if (e) {
e.priority = priority
e.cancel = Boolean(cancel)
} else {
this.wantlist.set(key, new Entry(key, priority, cancel))
this.wantlist.set(key.toString('hex'), new Entry(key, priority, cancel))
}
}

addBlock (block) {
this.blocks.set(block.key, block)
this.blocks.set(block.key.toString('hex'), block)
}

cancel (key) {
this.wantlist.delete(key)
this.addEntry(key, 0, true)
this.wantlist.delete(key.toString('hex'))
this.addEntry(key.toString('hex'), 0, true)
}

toProto () {
return pbm.Message.encode({
wantlist: {
entries: Array.from(this.wantlist.values()).map((e) => {
return {
block: String(e.key),
block: e.key.toString('hex'),
priority: Number(e.priority),
cancel: Boolean(e.cancel)
}
Expand Down Expand Up @@ -73,7 +73,7 @@ BitswapMessage.fromProto = (raw) => {
const m = new BitswapMessage(dec.wantlist.full)

dec.wantlist.entries.forEach((e) => {
m.addEntry(e.block, e.priority, e.cancel)
m.addEntry(new Buffer(e.block, 'hex'), e.priority, e.cancel)
})
dec.blocks.forEach((b) => m.addBlock(new Block(b)))

Expand Down
32 changes: 25 additions & 7 deletions src/network/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,38 @@

const bl = require('bl')
const async = require('async')
const debug = require('debug')

const Message = require('../message')
const log = debug('bitswap:network')

const PROTOCOL_IDENTIFIER = '/ipfs/bitswap/1.0.0'

module.exports = class Network {
constructor (libp2p, peerBook, bitswap) {
this.libp2p = libp2p
this.peerBook = peerBook
this.bitswap = bitswap

this._attachSwarmListeners()
}

_attachSwarmListeners () {
this.libp2p.swarm.handle('/ipfs/bitswap/1.0.0', this._onConnection.bind(this))
start () {
// bind event listeners
this._onConnection = this._onConnection.bind(this)
this._onPeerMux = this._onPeerMux.bind(this)
this._onPeerMuxClosed = this._onPeerMuxClosed.bind(this)

this.libp2p.swarm.handle(PROTOCOL_IDENTIFIER, this._onConnection)

this.libp2p.swarm.on('peer-mux-established', this._onPeerMux)

this.libp2p.swarm.on('peer-mux-closed', this._onPeerMuxClosed)
}

this.libp2p.swarm.on('peer-mux-established', this._onPeerMux.bind(this))
stop () {
this.libp2p.swarm.unhandle(PROTOCOL_IDENTIFIER)
this.libp2p.swarm.removeEventListener('peer-mux-established', this._onPeerMux)

this.libp2p.swarm.on('peer-mux-closed', this._onPeerMuxClosed.bind(this))
this.libp2p.swarm.removeEventListener('peer-mux-closed', this._onPeerMuxClosed)
}

_onConnection (conn) {
Expand Down Expand Up @@ -47,6 +62,7 @@ module.exports = class Network {

// Connect to the given peer
connectTo (peerId, cb) {
log('connecting to %s', peerId.toB58String())
const done = (err) => async.setImmediate(() => cb(err))
// NOTE: For now, all this does is ensure that we are
// connected. Once we have Peer Routing, we will be able
Expand All @@ -60,6 +76,8 @@ module.exports = class Network {

// Send the given msg (instance of Message) to the given peer
sendMessage (peerId, msg, cb) {
log('sendMessage to %s', peerId.toB58String())
log('msg %s', msg.full, msg.wantlist, msg.blocks)
const done = (err) => async.setImmediate(() => cb(err))
let peerInfo
try {
Expand All @@ -75,7 +93,7 @@ module.exports = class Network {

conn.write(msg.toProto())
conn.once('error', (err) => done(err))
conn.once('end', done)
conn.once('finish', done)
conn.end()
})
}
Expand Down
Loading