Skip to content

Commit d20b42f

Browse files
author
Tuyen
committedMar 16, 2020
Provide a way to override message id function
1 parent 97515f7 commit d20b42f

File tree

4 files changed

+39
-9
lines changed

4 files changed

+39
-9
lines changed
 

‎src/index.js

+11-1
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ class GossipSub extends BasicPubsub {
2222
* @param {bool} [options.emitSelf] if publish should emit to self, if subscribed, defaults to false
2323
* @param {bool} [options.gossipIncoming] if incoming messages on a subscribed topic should be automatically gossiped, defaults to true
2424
* @param {bool} [options.fallbackToFloodsub] if dial should fallback to floodsub, defaults to true
25+
* @param {function} [options.msgIdFn] override the default message id function
2526
* @constructor
2627
*/
2728
constructor (peerInfo, registrar, options = {}) {
@@ -72,11 +73,16 @@ class GossipSub extends BasicPubsub {
7273
*/
7374
this.control = new Map()
7475

76+
/**
77+
* Use the overriden mesgIdFn or the default one.
78+
*/
79+
this._msgIdFn = options.msgIdFn || this.defaultMsgIdFn
80+
7581
/**
7682
* A message cache that contains the messages for last few hearbeat ticks
7783
*
7884
*/
79-
this.messageCache = new MessageCache(constants.GossipSubHistoryGossip, constants.GossipSubHistoryLength)
85+
this.messageCache = new MessageCache(constants.GossipSubHistoryGossip, constants.GossipSubHistoryLength, this._msgIdFn)
8086

8187
/**
8288
* A heartbeat timer that maintains the mesh
@@ -374,6 +380,10 @@ class GossipSub extends BasicPubsub {
374380
})
375381
}
376382

383+
getMsgId (msg) {
384+
return this._msgIdFn(msg)
385+
}
386+
377387
_publish (messages) {
378388
messages.forEach((msgObj) => {
379389
this.messageCache.put(msgObj)

‎src/messageCache.js

+8-4
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,5 @@
11
'use strict'
22

3-
const { utils } = require('libp2p-pubsub')
4-
53
class CacheEntry {
64
/**
75
* @param {String} msgID
@@ -19,10 +17,11 @@ class MessageCache {
1917
/**
2018
* @param {Number} gossip
2119
* @param {Number} history
20+
* @param {msgIdFn} msgIdFn a function that returns message id from a message
2221
*
2322
* @constructor
2423
*/
25-
constructor (gossip, history) {
24+
constructor (gossip, history, msgIdFn) {
2625
/**
2726
* @type {Map<string, RPC.Message>}
2827
*/
@@ -40,6 +39,11 @@ class MessageCache {
4039
* @type {Number}
4140
*/
4241
this.gossip = gossip
42+
43+
/**
44+
* @type {Function}
45+
*/
46+
this.msgIdFn = msgIdFn
4347
}
4448

4549
/**
@@ -49,7 +53,7 @@ class MessageCache {
4953
* @returns {void}
5054
*/
5155
put (msg) {
52-
const msgID = utils.msgId(msg.from, msg.seqno)
56+
const msgID = this.msgIdFn(msg)
5357
this.msgs.set(msgID, msg)
5458
this.history[0].push(new CacheEntry(msgID, msg.topicIDs))
5559
}

‎src/pubsub.js

+19-3
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,13 @@ class BasicPubSub extends Pubsub {
7171
this._options = _options
7272

7373
this._onRpc = this._onRpc.bind(this)
74+
75+
/**
76+
* The default msgID implementation
77+
* @param {rpc.RPC.Message} msg the message object
78+
* @returns {string} message id as string
79+
*/
80+
this.defaultMsgIdFn = (msg) => utils.msgId(msg.from, msg.seqno)
7481
}
7582

7683
/**
@@ -166,14 +173,14 @@ class BasicPubSub extends Pubsub {
166173
if (msgs.length) {
167174
msgs.forEach(async message => {
168175
const msg = utils.normalizeInRpcMessage(message)
169-
const seqno = utils.msgId(msg.from, msg.seqno)
176+
const msgID = this.getMsgId(msg)
170177

171178
// Ignore if we've already seen the message
172-
if (this.seenCache.has(seqno)) {
179+
if (this.seenCache.has(msgID)) {
173180
return
174181
}
175182

176-
this.seenCache.put(seqno)
183+
this.seenCache.put(msgID)
177184

178185
// Ensure the message is valid before processing it
179186
let isValid
@@ -399,6 +406,15 @@ class BasicPubSub extends Pubsub {
399406
return Array.from(this.subscriptions)
400407
}
401408

409+
/**
410+
* Child class can override this.
411+
* @param {rpc.RPC.Message} msg the message object
412+
* @returns {string} message id as string
413+
*/
414+
getMsgId (msg) {
415+
return this.defaultMsgIdFn(msg)
416+
}
417+
402418
_emitMessages (topics, messages) {
403419
topics.forEach((topic) => {
404420
if (!this.subscriptions.has(topic)) {

‎test/messageCache.spec.js

+1-1
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ const getMsgID = (msg) => {
1818
}
1919

2020
describe('Testing Message Cache Operations', () => {
21-
const messageCache = new MessageCache(3, 5)
21+
const messageCache = new MessageCache(3, 5, getMsgID)
2222
const testMessages = []
2323

2424
before(() => {

0 commit comments

Comments
 (0)
Please sign in to comment.