Skip to content

Commit 9b7af11

Browse files
authoredMar 21, 2020
Merge pull request #67 from ChainSafe/tuyen/message-id-function
Provide a way to override message id function
2 parents 97515f7 + d8fa6aa commit 9b7af11

File tree

5 files changed

+58
-9
lines changed

5 files changed

+58
-9
lines changed
 

‎README.md

+2
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,8 @@ Options is an optional object with the following key-value pairs:
6767

6868
* **`fallbackToFloodsub`**: boolean identifying whether the node should fallback to the floodsub protocol, if another connecting peer does not support gossipsub (defaults to **true**).
6969
* **`emitSelf`**: boolean identifying whether the node should emit to self on publish, in the event of the topic being subscribed (defaults to **false**).
70+
* **`msgIdFn`**: a function with signature `(message) => string` defining the message id given a message, used internally to deduplicate gossip (defaults to `(message) => message.from + message.seqno.toString('hex')`)
71+
* **`messageCache`**: optional, a customized `MessageCache` instance, see the implementation for the interface.
7072

7173
For the remaining API, see https://github.com/libp2p/js-libp2p-pubsub
7274

‎src/index.js

+19-1
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ class GossipSub extends BasicPubsub {
2222
* @param {bool} [options.emitSelf] if publish should emit to self, if subscribed, defaults to false
2323
* @param {bool} [options.gossipIncoming] if incoming messages on a subscribed topic should be automatically gossiped, defaults to true
2424
* @param {bool} [options.fallbackToFloodsub] if dial should fallback to floodsub, defaults to true
25+
* @param {function} [options.msgIdFn] override the default message id function
26+
* @param {Object} [options.messageCache] override the default MessageCache
2527
* @constructor
2628
*/
2729
constructor (peerInfo, registrar, options = {}) {
@@ -72,11 +74,16 @@ class GossipSub extends BasicPubsub {
7274
*/
7375
this.control = new Map()
7476

77+
/**
78+
* Use the overriden mesgIdFn or the default one.
79+
*/
80+
this._msgIdFn = options.msgIdFn || this.defaultMsgIdFn
81+
7582
/**
7683
* A message cache that contains the messages for last few hearbeat ticks
7784
*
7885
*/
79-
this.messageCache = new MessageCache(constants.GossipSubHistoryGossip, constants.GossipSubHistoryLength)
86+
this.messageCache = options.messageCache || new MessageCache(constants.GossipSubHistoryGossip, constants.GossipSubHistoryLength, this._msgIdFn)
8087

8188
/**
8289
* A heartbeat timer that maintains the mesh
@@ -374,6 +381,17 @@ class GossipSub extends BasicPubsub {
374381
})
375382
}
376383

384+
/**
385+
* Override the default implementation in BasicPubSub.
386+
* If we don't provide msgIdFn in constructor option, it's the same.
387+
* @override
388+
* @param {rpc.RPC.Message} msg the message object
389+
* @returns {string} message id as string
390+
*/
391+
getMsgId (msg) {
392+
return this._msgIdFn(msg)
393+
}
394+
377395
_publish (messages) {
378396
messages.forEach((msgObj) => {
379397
this.messageCache.put(msgObj)

‎src/messageCache.js

+17-4
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,5 @@
11
'use strict'
22

3-
const { utils } = require('libp2p-pubsub')
4-
53
class CacheEntry {
64
/**
75
* @param {String} msgID
@@ -19,10 +17,11 @@ class MessageCache {
1917
/**
2018
* @param {Number} gossip
2119
* @param {Number} history
20+
* @param {msgIdFn} msgIdFn a function that returns message id from a message
2221
*
2322
* @constructor
2423
*/
25-
constructor (gossip, history) {
24+
constructor (gossip, history, msgIdFn) {
2625
/**
2726
* @type {Map<string, RPC.Message>}
2827
*/
@@ -40,6 +39,11 @@ class MessageCache {
4039
* @type {Number}
4140
*/
4241
this.gossip = gossip
42+
43+
/**
44+
* @type {Function}
45+
*/
46+
this.msgIdFn = msgIdFn
4347
}
4448

4549
/**
@@ -49,11 +53,20 @@ class MessageCache {
4953
* @returns {void}
5054
*/
5155
put (msg) {
52-
const msgID = utils.msgId(msg.from, msg.seqno)
56+
const msgID = this.getMsgId(msg)
5357
this.msgs.set(msgID, msg)
5458
this.history[0].push(new CacheEntry(msgID, msg.topicIDs))
5559
}
5660

61+
/**
62+
* Get message id of message.
63+
* @param {rpc.RPC.Message} msg
64+
* @returns {string}
65+
*/
66+
getMsgId (msg) {
67+
return this.msgIdFn(msg)
68+
}
69+
5770
/**
5871
* Retrieves a message from the cache by its ID, if it is still present
5972
*

‎src/pubsub.js

+19-3
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,13 @@ class BasicPubSub extends Pubsub {
7171
this._options = _options
7272

7373
this._onRpc = this._onRpc.bind(this)
74+
75+
/**
76+
* The default msgID implementation
77+
* @param {rpc.RPC.Message} msg the message object
78+
* @returns {string} message id as string
79+
*/
80+
this.defaultMsgIdFn = (msg) => utils.msgId(msg.from, msg.seqno)
7481
}
7582

7683
/**
@@ -166,14 +173,14 @@ class BasicPubSub extends Pubsub {
166173
if (msgs.length) {
167174
msgs.forEach(async message => {
168175
const msg = utils.normalizeInRpcMessage(message)
169-
const seqno = utils.msgId(msg.from, msg.seqno)
176+
const msgID = this.getMsgId(msg)
170177

171178
// Ignore if we've already seen the message
172-
if (this.seenCache.has(seqno)) {
179+
if (this.seenCache.has(msgID)) {
173180
return
174181
}
175182

176-
this.seenCache.put(seqno)
183+
this.seenCache.put(msgID)
177184

178185
// Ensure the message is valid before processing it
179186
let isValid
@@ -399,6 +406,15 @@ class BasicPubSub extends Pubsub {
399406
return Array.from(this.subscriptions)
400407
}
401408

409+
/**
410+
* Child class can override this.
411+
* @param {rpc.RPC.Message} msg the message object
412+
* @returns {string} message id as string
413+
*/
414+
getMsgId (msg) {
415+
return this.defaultMsgIdFn(msg)
416+
}
417+
402418
_emitMessages (topics, messages) {
403419
topics.forEach((topic) => {
404420
if (!this.subscriptions.has(topic)) {

‎test/messageCache.spec.js

+1-1
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ const getMsgID = (msg) => {
1818
}
1919

2020
describe('Testing Message Cache Operations', () => {
21-
const messageCache = new MessageCache(3, 5)
21+
const messageCache = new MessageCache(3, 5, getMsgID)
2222
const testMessages = []
2323

2424
before(() => {

0 commit comments

Comments
 (0)
Please sign in to comment.