Skip to content
This repository has been archived by the owner on Feb 12, 2024. It is now read-only.

Commit

Permalink
feat: pubsub over gRPC (#3813)
Browse files Browse the repository at this point in the history
* feat: pubsub over gRPC

Browsers can only have six concurrently open connections to a host name.

Pubsub works over HTTP by holding a connection open per subscription, which
means you can only subscribe six times before things start to hang.

gRPC runs over websockets so doesn't have this limitation.  This PR adds
pubsub support to the gRPC server and `ipfs-client` module so you can subscribe
to lots and lots of channels concurrently, working around the browser connection
limitation.

Refs: #3741
  • Loading branch information
achingbrain committed Aug 16, 2021
1 parent 5ab3ced commit e7d5509
Show file tree
Hide file tree
Showing 18 changed files with 440 additions and 25 deletions.
1 change: 1 addition & 0 deletions packages/interface-ipfs-core/package.json
Expand Up @@ -77,6 +77,7 @@
"pako": "^1.0.2",
"peer-id": "^0.15.1",
"readable-stream": "^3.4.0",
"sinon": "^11.1.1",
"uint8arrays": "^2.1.6"
},
"contributors": [
Expand Down
6 changes: 3 additions & 3 deletions packages/interface-ipfs-core/src/pubsub/peers.js
Expand Up @@ -37,10 +37,10 @@ module.exports = (factory, options) => {
let ipfs3Id

before(async () => {
ipfs1 = (await factory.spawn({ type: 'proc', ipfsOptions })).api
ipfs1 = (await factory.spawn({ ipfsOptions })).api
// webworkers are not dialable because webrtc is not available
ipfs2 = (await factory.spawn({ type: isWebWorker ? 'go' : undefined })).api
ipfs3 = (await factory.spawn({ type: isWebWorker ? 'go' : undefined })).api
ipfs2 = (await factory.spawn({ type: isWebWorker ? 'js' : undefined, ipfsOptions })).api
ipfs3 = (await factory.spawn({ type: isWebWorker ? 'js' : undefined, ipfsOptions })).api

ipfs2Id = await ipfs2.id()
ipfs3Id = await ipfs3.id()
Expand Down
141 changes: 137 additions & 4 deletions packages/interface-ipfs-core/src/pubsub/subscribe.js
Expand Up @@ -13,6 +13,7 @@ const { AbortController } = require('native-abort-controller')
const { isWebWorker, isNode } = require('ipfs-utils/src/env')
const getIpfsOptions = require('../utils/ipfs-options-websockets-filter-all')
const first = require('it-first')
const sinon = require('sinon')

/**
* @typedef {import('ipfsd-ctl').Factory} Factory
Expand Down Expand Up @@ -44,12 +45,10 @@ module.exports = (factory, options) => {
let ipfs2Id

before(async () => {
ipfs1 = (await factory.spawn({ type: 'proc', ipfsOptions })).api
// TODO 'multiple connected nodes' tests fails with go in Firefox
// and JS is flaky everywhere
ipfs1 = (await factory.spawn({ ipfsOptions })).api

// webworkers are not dialable because webrtc is not available
ipfs2 = (await factory.spawn({ type: isWebWorker ? 'go' : undefined })).api
ipfs2 = (await factory.spawn({ type: isWebWorker ? 'js' : undefined, ipfsOptions })).api

ipfs1Id = await ipfs1.id()
ipfs2Id = await ipfs2.id()
Expand Down Expand Up @@ -84,6 +83,7 @@ module.exports = (factory, options) => {
await ipfs1.pubsub.publish(topic, uint8ArrayFromString('hi'))

const msg = await first(msgStream)

expect(uint8ArrayToString(msg.data)).to.equal('hi')
expect(msg).to.have.property('seqno')
expect(msg.seqno).to.be.an.instanceof(Uint8Array)
Expand Down Expand Up @@ -410,6 +410,139 @@ module.exports = (factory, options) => {
expect(uint8ArrayToString(msg.data).startsWith(msgBase)).to.be.true()
})
})

it('should receive messages from a different node on lots of topics', async () => {
// @ts-ignore this is mocha
this.timeout(5 * 60 * 1000)

const numTopics = 20
const topics = []
const expectedStrings = []
const msgStreams = []

for (let i = 0; i < numTopics; i++) {
const topic = `pubsub-topic-${Math.random()}`
topics.push(topic)

const msgStream1 = pushable()
const msgStream2 = pushable()

msgStreams.push({
msgStream1,
msgStream2
})

/** @type {import('ipfs-core-types/src/pubsub').MessageHandlerFn} */
const sub1 = msg => {
msgStream1.push(msg)
msgStream1.end()
}
/** @type {import('ipfs-core-types/src/pubsub').MessageHandlerFn} */
const sub2 = msg => {
msgStream2.push(msg)
msgStream2.end()
}

await Promise.all([
ipfs1.pubsub.subscribe(topic, sub1),
ipfs2.pubsub.subscribe(topic, sub2)
])

await waitForPeers(ipfs2, topic, [ipfs1Id.id], 30000)
}

await delay(5000) // gossipsub needs this delay https://github.com/libp2p/go-libp2p-pubsub/issues/331

for (let i = 0; i < numTopics; i++) {
const expectedString = `hello pubsub ${Math.random()}`
expectedStrings.push(expectedString)

await ipfs2.pubsub.publish(topics[i], uint8ArrayFromString(expectedString))
}

for (let i = 0; i < numTopics; i++) {
const [sub1Msg] = await all(msgStreams[i].msgStream1)
expect(uint8ArrayToString(sub1Msg.data)).to.equal(expectedStrings[i])
expect(sub1Msg.from).to.eql(ipfs2Id.id)

const [sub2Msg] = await all(msgStreams[i].msgStream2)
expect(uint8ArrayToString(sub2Msg.data)).to.equal(expectedStrings[i])
expect(sub2Msg.from).to.eql(ipfs2Id.id)
}
})

it('should unsubscribe multiple handlers', async () => {
// @ts-ignore this is mocha
this.timeout(2 * 60 * 1000)

const topic = `topic-${Math.random()}`

const handler1 = sinon.stub()
const handler2 = sinon.stub()

await Promise.all([
ipfs1.pubsub.subscribe(topic, sinon.stub()),
ipfs2.pubsub.subscribe(topic, handler1),
ipfs2.pubsub.subscribe(topic, handler2)
])

await waitForPeers(ipfs1, topic, [ipfs2Id.id], 30000)

expect(handler1).to.have.property('callCount', 0)
expect(handler2).to.have.property('callCount', 0)

await ipfs1.pubsub.publish(topic, uint8ArrayFromString('hello world 1'))

await delay(1000)

expect(handler1).to.have.property('callCount', 1)
expect(handler2).to.have.property('callCount', 1)

await ipfs2.pubsub.unsubscribe(topic)

await ipfs1.pubsub.publish(topic, uint8ArrayFromString('hello world 2'))

await delay(1000)

expect(handler1).to.have.property('callCount', 1)
expect(handler2).to.have.property('callCount', 1)
})

it('should unsubscribe individual handlers', async () => {
// @ts-ignore this is mocha
this.timeout(2 * 60 * 1000)

const topic = `topic-${Math.random()}`

const handler1 = sinon.stub()
const handler2 = sinon.stub()

await Promise.all([
ipfs1.pubsub.subscribe(topic, sinon.stub()),
ipfs2.pubsub.subscribe(topic, handler1),
ipfs2.pubsub.subscribe(topic, handler2)
])

await waitForPeers(ipfs1, topic, [ipfs2Id.id], 30000)

expect(handler1).to.have.property('callCount', 0)
expect(handler2).to.have.property('callCount', 0)

await ipfs1.pubsub.publish(topic, uint8ArrayFromString('hello world 1'))

await delay(1000)

expect(handler1).to.have.property('callCount', 1)
expect(handler2).to.have.property('callCount', 1)

await ipfs2.pubsub.unsubscribe(topic, handler1)
await ipfs1.pubsub.publish(topic, uint8ArrayFromString('hello world 2'))

await delay(1000)

expect(handler1).to.have.property('callCount', 1)
expect(handler2).to.have.property('callCount', 2)
})
})
})
}
1 change: 1 addition & 0 deletions packages/ipfs-grpc-client/package.json
Expand Up @@ -44,6 +44,7 @@
"it-pushable": "^1.4.2",
"multiaddr": "^10.0.0",
"multiformats": "^9.4.1",
"p-defer": "^3.0.0",
"protobufjs": "^6.10.2",
"wherearewe": "1.0.0",
"ws": "^7.3.1"
Expand Down
58 changes: 58 additions & 0 deletions packages/ipfs-grpc-client/src/core-api/pubsub/subscribe.js
@@ -0,0 +1,58 @@
'use strict'

const serverStreamToIterator = require('../../utils/server-stream-to-iterator')
const withTimeoutOption = require('ipfs-core-utils/src/with-timeout-option')
const subscriptions = require('./subscriptions')
const defer = require('p-defer')

/**
* @param {import('@improbable-eng/grpc-web').grpc} grpc
* @param {*} service
* @param {import('../../types').Options} opts
*/
module.exports = function grpcPubsubSubscribe (grpc, service, opts) {
/**
* @type {import('ipfs-core-types/src/pubsub').API["subscribe"]}
*/
async function pubsubSubscribe (topic, handler, options = {}) {
const request = {
topic
}

const deferred = defer()

Promise.resolve().then(async () => {
try {
for await (const result of serverStreamToIterator(grpc, service, request, {
host: opts.url,
debug: Boolean(process.env.DEBUG),
metadata: options,
agent: opts.agent
})) {
if (result.handler) {
const subs = subscriptions.get(topic) || new Map()
subs.set(result.handler, handler)
subscriptions.set(topic, subs)

deferred.resolve()
} else {
handler({
from: result.from,
seqno: result.seqno,
data: result.data,
topicIDs: result.topicIDs
})
}
}
} catch (err) {
if (options && options.onError) {
options.onError(err)
}
}
})

await deferred.promise
}

return withTimeoutOption(pubsubSubscribe)
}
10 changes: 10 additions & 0 deletions packages/ipfs-grpc-client/src/core-api/pubsub/subscriptions.js
@@ -0,0 +1,10 @@
'use strict'

/**
* @typedef {import('ipfs-core-types/src/pubsub').MessageHandlerFn} Subscription
*/

/** @type {Map<string, Map<string, Subscription>>} */
const subs = new Map()

module.exports = subs
56 changes: 56 additions & 0 deletions packages/ipfs-grpc-client/src/core-api/pubsub/unsubscribe.js
@@ -0,0 +1,56 @@
'use strict'

const withTimeoutOption = require('ipfs-core-utils/src/with-timeout-option')
const toHeaders = require('../../utils/to-headers')
const unaryToPromise = require('../../utils/unary-to-promise')
const subscriptions = require('./subscriptions')

/**
* @param {import('@improbable-eng/grpc-web').grpc} grpc
* @param {*} service
* @param {import('../../types').Options} opts
*/
module.exports = function grpcPubsubUnsubscribe (grpc, service, opts) {
/**
* @type {import('ipfs-core-types/src/pubsub').API["unsubscribe"]}
*/
async function pubsubUnsubscribe (topic, handler, options = {}) {
const handlers = []
const subs = subscriptions.get(topic)

if (!subs) {
return
}

if (handler) {
for (const [key, value] of subs.entries()) {
if (value === handler) {
handlers.push(key)
}
}
} else {

}

const request = {
topic,
handlers
}

await unaryToPromise(grpc, service, request, {
host: opts.url,
metadata: toHeaders(options),
agent: opts.agent
})

for (const handlerId of handlers) {
subs.delete(handlerId)
}

if (!subs.size) {
subscriptions.delete(topic)
}
}

return withTimeoutOption(pubsubUnsubscribe)
}
6 changes: 6 additions & 0 deletions packages/ipfs-grpc-client/src/index.js
Expand Up @@ -49,6 +49,12 @@ function create (opts = { url: '' }) {
ls: require('./core-api/files/ls')(grpc, service.MFS.ls, options),
// @ts-ignore - TODO: fix after https://github.com/ipfs/js-ipfs/issues/3594
write: require('./core-api/files/write')(grpc, service.MFS.write, options)
},
pubsub: {
// @ts-ignore - TODO: fix after https://github.com/ipfs/js-ipfs/issues/3594
subscribe: require('./core-api/pubsub/subscribe')(grpc, service.PubSub.subscribe, options),
// @ts-ignore - TODO: fix after https://github.com/ipfs/js-ipfs/issues/3594
unsubscribe: require('./core-api/pubsub/unsubscribe')(grpc, service.PubSub.unsubscribe, options)
}
}

Expand Down
31 changes: 31 additions & 0 deletions packages/ipfs-grpc-protocol/src/pubsub.proto
@@ -0,0 +1,31 @@
syntax = "proto3";

import "common.proto";

package ipfs;

service PubSub {
rpc subscribe (SubscribeRequest) returns (stream SubscribeResponse) {}
rpc unsubscribe (UnSubscribeRequest) returns (UnSubscribeResponse) {}
}

message SubscribeRequest {
string topic = 1;
}

message SubscribeResponse {
string handler = 1;
string from = 2;
bytes seqno = 3;
bytes data = 4;
repeated string topicIDs = 5;
}

message UnSubscribeRequest {
string topic = 1;
repeated string handlers = 2;
}

message UnSubscribeResponse {

}
1 change: 1 addition & 0 deletions packages/ipfs-grpc-server/package.json
Expand Up @@ -42,6 +42,7 @@
"it-pipe": "^1.1.0",
"it-pushable": "^1.4.2",
"multiaddr": "^10.0.0",
"nanoid": "3.1.23",
"protobufjs": "^6.10.2",
"ws": "^7.3.1"
},
Expand Down

0 comments on commit e7d5509

Please sign in to comment.