Skip to content
Permalink

Comparing changes

Choose two branches to see what’s changed or to start a new pull request. If you need to, you can also or learn more about diff comparisons.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also . Learn more about diff comparisons here.
base repository: moscajs/aedes-persistence-mongodb
Failed to load repositories. Confirm that selected base ref is valid, then try again.
Loading
base: b09f8bc2d85036fc1dc450fa76f5a0db75328a07
Choose a base ref
...
head repository: moscajs/aedes-persistence-mongodb
Failed to load repositories. Confirm that selected head ref is valid, then try again.
Loading
compare: 610878b3c34f655fa28f9d54e91ccf7b734fef9c
Choose a head ref
  • 1 commit
  • 4 files changed
  • 1 contributor

Commits on Apr 7, 2022

  1. Verified

    This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
    Copy the full SHA
    610878b View commit details
Showing with 44 additions and 30 deletions.
  1. +1 −1 .github/workflows/ci.yml
  2. +2 −3 package.json
  3. +14 −9 persistence.js
  4. +27 −17 test.js
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
@@ -9,7 +9,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
node-version: [10.x, 12.x, 13.x]
node-version: [12.x, 14.x, 16.x]

steps:
- uses: actions/checkout@v1
5 changes: 2 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
@@ -54,8 +54,7 @@
"concat-stream": "^2.0.0",
"faucet": "0.0.1",
"license-checker": "^25.0.1",
"mongo-clean": "^2.0.0",
"mqemitter-mongodb": "^7.0.1",
"mqemitter-mongodb": "^8.1.0",
"nyc": "^15.0.1",
"pre-commit": "^1.2.2",
"release-it": "^14.0.3",
@@ -67,7 +66,7 @@
"aedes-cached-persistence": "^8.1.1",
"escape-string-regexp": "^4.0.0",
"fastparallel": "^2.3.0",
"mongodb": "^3.6.0",
"mongodb": "^4.5.0",
"native-url": "^0.3.1",
"pump": "^3.0.0",
"qlobber": "^5.0.0",
23 changes: 14 additions & 9 deletions persistence.js
Original file line number Diff line number Diff line change
@@ -17,6 +17,10 @@ const qlobberOpts = {
match_empty_levels: true
}

function toStream (op) {
return op.stream ? op.stream() : op
}

function MongoPersistence (opts) {
if (!(this instanceof MongoPersistence)) {
return new MongoPersistence(opts)
@@ -102,9 +106,9 @@ MongoPersistence.prototype._setup = function () {

function initCollections () {
function finishInit () {
subscriptions.find({
toStream(subscriptions.find({
qos: { $gte: 0 }
}).on('data', function (chunk) {
})).on('data', function (chunk) {
that._trie.add(chunk.topic, chunk)
}).on('end', function () {
that.emit('ready')
@@ -312,9 +316,9 @@ MongoPersistence.prototype.createRetainedStreamCombi = function (patterns) {
regex = regex.join('|')

return pump(
this._cl.retained.find({
toStream(this._cl.retained.find({
topic: new RegExp(regex)
}),
})),
instance
)
}
@@ -357,6 +361,7 @@ MongoPersistence.prototype.addSubscriptions = function (client, subs, cb) {
function finish (err) {
errored = err
published++
console.log('published', published)
if (published === 2) {
cb(errored, client)
}
@@ -428,11 +433,11 @@ MongoPersistence.prototype.subscriptionsByClient = function (client, cb) {
MongoPersistence.prototype.countOffline = function (cb) {
var clientsCount = 0
var that = this
this._cl.subscriptions.aggregate([{
toStream(this._cl.subscriptions.aggregate([{
$group: {
_id: '$clientId'
}
}]).on('data', function () {
}])).on('data', function () {
clientsCount++
}).on('end', function () {
cb(null, that._trie.subscriptionsCount, clientsCount)
@@ -506,7 +511,7 @@ function asPacket (obj, enc, cb) {

MongoPersistence.prototype.outgoingStream = function (client) {
return pump(
this._cl.outgoing.find({ clientId: client.id }),
toStream(this._cl.outgoing.find({ clientId: client.id })),
through.obj(asPacket))
}

@@ -697,7 +702,7 @@ MongoPersistence.prototype.streamWill = function (brokers) {
if (brokers) {
query['packet.brokerId'] = { $nin: Object.keys(brokers) }
}
return pump(this._cl.will.find(query), through.obj(asPacket))
return pump(toStream(this._cl.will.find(query)), through.obj(asPacket))
}

MongoPersistence.prototype.getClientList = function (topic) {
@@ -707,7 +712,7 @@ MongoPersistence.prototype.getClientList = function (topic) {
query.topic = topic
}

return pump(this._cl.subscriptions.find(query), through.obj(function asPacket (obj, enc, cb) {
return pump(toStream(this._cl.subscriptions.find(query)), through.obj(function asPacket (obj, enc, cb) {
this.push(obj.clientId)
cb()
}))
44 changes: 27 additions & 17 deletions test.js
Original file line number Diff line number Diff line change
@@ -5,12 +5,9 @@ var persistence = require('./')
var MongoClient = require('mongodb').MongoClient
var abs = require('aedes-cached-persistence/abstract')
var mqemitterMongo = require('mqemitter-mongodb')
var clean = require('mongo-clean')
var dbname = 'aedes-test'
var mongourl = 'mongodb://127.0.0.1/' + dbname
var cleanopts = {
action: 'deleteMany'
}
let clean = null

MongoClient.connect(mongourl, { useNewUrlParser: true, useUnifiedTopology: true, w: 1 }, function (err, client) {
if (err) {
@@ -19,13 +16,26 @@ MongoClient.connect(mongourl, { useNewUrlParser: true, useUnifiedTopology: true,

var db = client.db(dbname)

const collections = [
db.collection('subscriptions'),
db.collection('retained'),
db.collection('will'),
db.collection('outgoing'),
db.collection('incoming')
]

clean = async (cb) => {
await Promise.all(collections.map((c) => c.deleteMany({})))
cb()
}

// set ttl task to run every 2 seconds
db.executeDbAdminCommand({ setParameter: 1, ttlMonitorSleepSecs: 2 }, function (err) {
db.admin().command({ setParameter: 1, ttlMonitorSleepSecs: 2 }, function (err) {
if (err) {
throw err
}

clean(db, cleanopts, function (err, db) {
clean(function (err) {
if (err) {
throw err
}
@@ -53,7 +63,7 @@ function runTest (client, db) {
return emitter
},
persistence: function build (cb) {
clean(db, cleanopts, function (err) {
clean(function (err) {
if (err) {
return cb(err)
}
@@ -88,7 +98,7 @@ function runTest (client, db) {
test('multiple persistences', function (t) {
t.plan(12)

clean(db, cleanopts, function (err) {
clean(function (err) {
t.error(err)

var emitter = mqemitterMongo(dbopts)
@@ -167,7 +177,7 @@ function runTest (client, db) {
test('multiple persistences with passed db object and url', function (t) {
t.plan(12)

clean(db, cleanopts, function (err) {
clean(function (err) {
t.error(err)

var emitter = mqemitterMongo(dboptsWithDbObjectAndUrl)
@@ -245,7 +255,7 @@ function runTest (client, db) {
test('multiple persistences with passed only db object', function (t) {
t.plan(12)

clean(db, cleanopts, function (err) {
clean(function (err) {
t.error(err)

var emitter = mqemitterMongo(dboptsWithOnlyDbObject)
@@ -319,7 +329,7 @@ function runTest (client, db) {
test('qos 0 subs restoration', function (t) {
t.plan(10)

clean(db, cleanopts, function (err) {
clean(function (err) {
t.error(err)

var emitter = mqemitterMongo(dbopts)
@@ -365,7 +375,7 @@ function runTest (client, db) {
})

test('look up for expire after seconds index', function (t) {
clean(db, cleanopts, function (err) {
clean(function (err) {
t.error(err)

dbopts.ttl = {
@@ -416,7 +426,7 @@ function runTest (client, db) {
})

test('look up for query indexes', function (t) {
clean(db, cleanopts, function (err) {
clean(function (err) {
t.error(err)

dbopts.ttl = {
@@ -471,7 +481,7 @@ function runTest (client, db) {
})

test('look up for packet with added property', function (t) {
clean(db, cleanopts, function (err) {
clean(function (err) {
t.error(err)

dbopts.ttl = {
@@ -541,7 +551,7 @@ function runTest (client, db) {
})
}

clean(db, cleanopts, function (err) {
clean(function (err) {
t.notOk(err, 'no error')

dbopts.ttl = {
@@ -591,7 +601,7 @@ function runTest (client, db) {
test('look up for expired packets', function (t) {
t.plan(17)

clean(db, cleanopts, function (err) {
clean(function (err) {
t.error(err)

dbopts.ttl = {
@@ -669,7 +679,7 @@ function runTest (client, db) {
var dboptsWithUrlMongoOptions = {
url: mongourl,
mongoOptions: {
appname: 'TEST'
raw: true // must be a valid mongo option
}
}