Skip to content

Commit 072ca61

Browse files
committedJun 12, 2023
Improve stream example
1 parent 4993dee commit 072ca61

File tree

5 files changed

+43
-21
lines changed

5 files changed

+43
-21
lines changed
 

‎CHANGELOG.md

+4
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
# Change log for amqplib
22

3+
## Unreleased
4+
5+
- Improve stream example as per https://github.com/amqp-node/amqplib/issues/722
6+
37
## Changes in v0.10.3
48

59
git log v0.10.2..v0.10.3

‎examples/stream_queues/.gitignore

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
package-lock.json

‎examples/stream_queues/package.json

+14
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
{
2+
"name": "stream_queues",
3+
"version": "1.0.0",
4+
"description": "An example demonstrating use of stream queues",
5+
"main": "n",
6+
"dependencies": {
7+
"amqplib": "^0.10.3"
8+
},
9+
"devDependencies": {},
10+
"scripts": {
11+
"test": "echo \"Error: no test specified\" && exit 1"
12+
},
13+
"license": "ISC"
14+
}

‎examples/stream_queues/receive_stream.js

+18-12
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
1-
#!/usr/bin/env node
21
const amqp = require('amqplib');
32

4-
5-
async function receiveStream() {
3+
(async () => {
64
try {
75
const connection = await amqp.connect('amqp://localhost');
86
process.once('SIGINT', connection.close);
@@ -30,18 +28,26 @@ async function receiveStream() {
3028
}, {
3129
noAck: false,
3230
arguments: {
33-
'x-stream-offset': 'first' // Here you can specify the offset: : first, last, next, and timestamp
34-
// with first start consuming always from the beginning
31+
/*
32+
Here you can specify the offset: : first, last, next, offset and timestamp, i.e.
33+
34+
'x-stream-offset': 'first'
35+
'x-stream-offset': 'last'
36+
'x-stream-offset': 'next'
37+
'x-stream-offset': 5
38+
'x-stream-offset': { '!': 'timestamp', value: 1686519750 }
39+
40+
The timestamp must be the desired number of seconds since 00:00:00 UTC, 1970-01-01
41+
42+
*/
43+
'x-stream-offset': 'first'
3544
}
3645
});
3746

3847
console.log(' [*] Waiting for messages. To exit press CTRL+C');
3948
}
4049
// Catch and display any errors in the console
41-
catch(e) { console.log(e) }
42-
}
43-
44-
45-
module.exports = {
46-
receiveStream
47-
}
50+
catch(e) {
51+
console.log(e)
52+
}
53+
})();

‎examples/stream_queues/send_stream.js

+6-9
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,14 @@
1-
#!/usr/bin/env node
21
const amqp = require('amqplib');
32

43

5-
async function sendStream () {
4+
(async () => {
65
try {
76
const connection = await amqp.connect('amqp://localhost');
87
process.once('SIGINT', connection.close);
98

109
const channel = await connection.createChannel();
1110
const queue = 'my_first_stream';
12-
const msg = 'Hello World!';
11+
const msg = `Hello World! ${Date.now()}`;
1312

1413
// Define the queue stream
1514
// Mandatory: exclusive: false, durable: true autoDelete: false
@@ -32,10 +31,8 @@ async function sendStream () {
3231
connection.close();
3332
}
3433
// Catch and display any errors in the console
35-
catch(e) { console.log(e) }
36-
}
37-
34+
catch(e) {
35+
console.log(e)
36+
}
37+
})();
3838

39-
module.exports = {
40-
sendStream
41-
}

0 commit comments

Comments
 (0)
Please sign in to comment.