Skip to content

Commit 1a14c92

Browse files
authoredJul 1, 2022
fix: time out slow senders (#455)
If no Bitswap message is received from a remote after 30s, close the stream. The timeout value is configurable.
1 parent b20e6b9 commit 1a14c92

File tree

5 files changed

+56
-2
lines changed

5 files changed

+56
-2
lines changed
 

‎package.json

+2
Original file line numberDiff line numberDiff line change
@@ -168,6 +168,7 @@
168168
"@libp2p/tracked-map": "^2.0.0",
169169
"@multiformats/multiaddr": "^10.1.8",
170170
"@vascosantos/moving-average": "^1.1.0",
171+
"abortable-iterator": "^4.0.2",
171172
"any-signal": "^3.0.0",
172173
"blockstore-core": "^1.0.2",
173174
"debug": "^4.2.0",
@@ -179,6 +180,7 @@
179180
"multiformats": "^9.0.4",
180181
"protobufjs": "^6.10.2",
181182
"readable-stream": "^4.0.0",
183+
"timeout-abort-controller": "^3.0.0",
182184
"uint8arrays": "^3.0.0",
183185
"varint": "^6.0.0",
184186
"varint-decoder": "^1.0.0"

‎src/bitswap.js

+3-1
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ export class Bitswap extends BaseBlockstore {
5151
* @param {number} [options.statsComputeThrottleMaxQueueSize=1000]
5252
* @param {number} [options.maxInboundStreams=32]
5353
* @param {number} [options.maxOutboundStreams=32]
54+
* @param {number} [options.incomingStreamTimeout=30000]
5455
* @param {MultihashHasherLoader} [options.hashLoader]
5556
*/
5657
constructor (libp2p, blockstore, options = {}) {
@@ -72,7 +73,8 @@ export class Bitswap extends BaseBlockstore {
7273
this.network = new Network(libp2p, this, this._stats, {
7374
hashLoader: options.hashLoader,
7475
maxInboundStreams: options.maxInboundStreams,
75-
maxOutboundStreams: options.maxOutboundStreams
76+
maxOutboundStreams: options.maxOutboundStreams,
77+
incomingStreamTimeout: options.incomingStreamTimeout
7678
})
7779

7880
// local database

‎src/index.js

+1
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import { Bitswap } from './bitswap.js'
1919
* @param {number} [options.statsComputeThrottleMaxQueueSize=1000]
2020
* @param {number} [options.maxInboundStreams=32]
2121
* @param {number} [options.maxOutboundStreams=128]
22+
* @param {number} [options.incomingStreamTimeout=30000]
2223
* @param {MultihashHasherLoader} [options.hashLoader]
2324
* @returns {IPFSBitswap}
2425
*/

‎src/network.js

+12-1
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ import { createTopology } from '@libp2p/topology'
44
import { BitswapMessage as Message } from './message/index.js'
55
import * as CONSTANTS from './constants.js'
66
import { logger } from './utils/index.js'
7+
import { TimeoutController } from 'timeout-abort-controller'
8+
import { abortableSource } from 'abortable-iterator'
79

810
/**
911
* @typedef {import('@libp2p/interface-peer-id').PeerId} PeerId
@@ -26,6 +28,7 @@ const BITSWAP120 = '/ipfs/bitswap/1.2.0'
2628

2729
const DEFAULT_MAX_INBOUND_STREAMS = 32
2830
const DEFAULT_MAX_OUTBOUND_STREAMS = 128
31+
const DEFAULT_INCOMING_STREAM_TIMEOUT = 30000
2932

3033
export class Network {
3134
/**
@@ -37,6 +40,7 @@ export class Network {
3740
* @param {MultihashHasherLoader} [options.hashLoader]
3841
* @param {number} [options.maxInboundStreams=32]
3942
* @param {number} [options.maxOutboundStreams=32]
43+
* @param {number} [options.incomingStreamTimeout=30000]
4044
*/
4145
constructor (libp2p, bitswap, stats, options = {}) {
4246
this._log = logger(libp2p.peerId, 'network')
@@ -60,6 +64,7 @@ export class Network {
6064
this._hashLoader = options.hashLoader
6165
this._maxInboundStreams = options.maxInboundStreams ?? DEFAULT_MAX_INBOUND_STREAMS
6266
this._maxOutboundStreams = options.maxOutboundStreams ?? DEFAULT_MAX_OUTBOUND_STREAMS
67+
this._incomingStreamTimeout = options.incomingStreamTimeout ?? DEFAULT_INCOMING_STREAM_TIMEOUT
6368
}
6469

6570
async start () {
@@ -117,11 +122,13 @@ export class Network {
117122
return
118123
}
119124

125+
const controller = new TimeoutController(this._incomingStreamTimeout)
126+
120127
Promise.resolve().then(async () => {
121128
this._log('incoming new bitswap %s connection from %p', stream.stat.protocol, connection.remotePeer)
122129

123130
await pipe(
124-
stream,
131+
abortableSource(stream.source, controller.signal),
125132
lp.decode(),
126133
async (source) => {
127134
for await (const data of source) {
@@ -138,6 +145,10 @@ export class Network {
138145
})
139146
.catch(err => {
140147
this._log(err)
148+
stream.abort(err)
149+
})
150+
.finally(() => {
151+
controller.clear()
141152
})
142153
}
143154

‎test/network/network.node.js

+38
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import { BitswapMessage as Message } from '../../src/message/index.js'
1111
import { Stats } from '../../src/stats/index.js'
1212
import sinon from 'sinon'
1313
import { CID } from 'multiformats/cid'
14+
import delay from 'delay'
1415

1516
/**
1617
* @typedef {import('libp2p').Libp2p} Libp2p
@@ -338,4 +339,41 @@ describe('network', () => {
338339
expect(mockDial.calledWith(provider1.id)).to.be.true()
339340
expect(mockDial.calledWith(provider2.id)).to.be.true()
340341
})
342+
343+
it('times out slow senders', async () => {
344+
const deferred = pDefer()
345+
346+
const libp2p = {
347+
handle: sinon.stub(),
348+
registrar: {
349+
register: sinon.stub()
350+
},
351+
getConnections: () => []
352+
}
353+
354+
// @ts-expect-error not a complete libp2p implementation
355+
const network = new Network(libp2p, {}, {}, {
356+
incomingStreamTimeout: 1
357+
})
358+
await network.start()
359+
360+
const stream = {
361+
source: (async function * () {
362+
await delay(100)
363+
yield 'hello'
364+
}()),
365+
abort: (/** @type {Error} **/ err) => {
366+
deferred.resolve(err)
367+
},
368+
stat: {
369+
protocol: 'hello'
370+
}
371+
}
372+
373+
const handler = libp2p.handle.getCall(0).args[1]
374+
handler({ stream, connection: {} })
375+
376+
const err = await deferred.promise
377+
expect(err).to.have.property('code', 'ABORT_ERR')
378+
})
341379
})

0 commit comments

Comments
 (0)
Please sign in to comment.