Skip to content

Commit 8454924

Browse files
authoredDec 10, 2022
Merge pull request #710 from amqp-node/examples
Examples
2 parents b98228f + cbe2f29 commit 8454924

29 files changed

+786
-661
lines changed
 

‎examples/direct_reply_to_client.js

+22
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
#!/usr/bin/env node
2+
3+
const amqp = require('../');
4+
5+
const queue = 'rpc_queue';
6+
7+
(async () => {
8+
const connection = await amqp.connect();
9+
const channel = await connection.createChannel();
10+
11+
await channel.consume('amq.rabbitmq.reply-to', async (message) => {
12+
console.log(message.content.toString());
13+
await channel.close();
14+
await connection.close();
15+
}, { noAck: true });
16+
17+
await channel.assertQueue(queue, { durable: false });
18+
19+
channel.sendToQueue(queue, Buffer.from(' [X] ping'), {
20+
replyTo: 'amq.rabbitmq.reply-to',
21+
});
22+
})();

‎examples/direct_reply_to_server.js

+25
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
#!/usr/bin/env node
2+
3+
const amqp = require('../');
4+
const { v4: uuid } = require('uuid');
5+
6+
const queue = 'rpc_queue';
7+
8+
(async () => {
9+
const connection = await amqp.connect();
10+
const channel = await connection.createChannel();
11+
12+
process.once('SIGINT', async () => {
13+
await channel.close();
14+
await connection.close();
15+
});
16+
17+
await channel.assertQueue(queue, { durable: false });
18+
await channel.consume(queue, (message) => {
19+
console.log(message.content.toString());
20+
channel.sendToQueue(message.properties.replyTo, Buffer.from(' [.] pong'));
21+
}, { noAck: true });
22+
23+
console.log(' [x] To exit press CTRL+C.');
24+
25+
})();

‎examples/headers.js

+29-31
Original file line numberDiff line numberDiff line change
@@ -1,42 +1,40 @@
11
#!/usr/bin/env node
22

3-
// Example of using a headers exchange
4-
5-
var amqp = require('../')
6-
7-
amqp.connect().then(function(conn) {
8-
return conn.createChannel().then(withChannel);
9-
}, console.error);
10-
11-
function withChannel(ch) {
12-
// NB the type of the exchange is 'headers'
13-
ch.assertExchange('matching exchange', 'headers').then(function(ex) {
14-
ch.assertQueue().then(function(q) {
15-
bindAndConsume(ch, ex, q).then(function() {
16-
send(ch, ex);
17-
});
18-
});
3+
const amqp = require('../');
4+
5+
(async () => {
6+
7+
const connection = await amqp.connect();
8+
const channel = await connection.createChannel();
9+
10+
process.once('SIGINT', async () => {
11+
await channel.close();
12+
await connection.close();
1913
});
20-
}
2114

22-
function bindAndConsume(ch, ex, q) {
15+
const { exchange } = await channel.assertExchange('matching exchange', 'headers');
16+
const { queue } = await channel.assertQueue();
17+
2318
// When using a headers exchange, the headers to be matched go in
2419
// the binding arguments. The routing key is ignore, so best left
2520
// empty.
2621

2722
// 'x-match' is 'all' or 'any', meaning "all fields must match" or
2823
// "at least one field must match", respectively. The values to be
2924
// matched go in subsequent fields.
30-
ch.bindQueue(q.queue, ex.exchange, '', {'x-match': 'any',
31-
'foo': 'bar',
32-
'baz': 'boo'});
33-
return ch.consume(q.queue, function(msg) {
34-
console.log(msg.content.toString());
35-
}, {noAck: true});
36-
}
37-
38-
function send(ch, ex) {
39-
// The headers for a message are given as an option to `publish`:
40-
ch.publish(ex.exchange, '', Buffer.from('hello'), {headers: {baz: 'boo'}});
41-
ch.publish(ex.exchange, '', Buffer.from('world'), {headers: {foo: 'bar'}});
42-
}
25+
await channel.bindQueue(queue, exchange, '', {
26+
'x-match': 'any',
27+
'foo': 'bar',
28+
'baz': 'boo'
29+
});
30+
31+
await channel.consume(queue, (message) => {
32+
console.log(message.content.toString());
33+
}, { noAck: true });
34+
35+
channel.publish(exchange, '', Buffer.from('hello'), { headers: { baz: 'boo' }});
36+
channel.publish(exchange, '', Buffer.from('hello'), { headers: { foo: 'bar' }});
37+
channel.publish(exchange, '', Buffer.from('lost'), { headers: { meh: 'nah' }});
38+
39+
console.log(' [x] To exit press CTRL+C.');
40+
})();

‎examples/ssl.js

+16-10
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,8 @@
1818
//
1919
// openssl s_client -connect localhost:5671
2020

21-
var amqp = require('../');
22-
var fs = require('fs');
21+
const amqp = require('../');
22+
const fs = require('fs');
2323

2424
// Assemble the SSL options; for verification we need at least
2525
// * a certificate to present to the server ('cert', in PEM format)
@@ -34,7 +34,7 @@ var fs = require('fs');
3434
// to use `rejectUnauthorized: false`.
3535

3636
// Options for full client and server verification:
37-
var opts = {
37+
const opts = {
3838
cert: fs.readFileSync('../etc/client/cert.pem'),
3939
key: fs.readFileSync('../etc/client/key.pem'),
4040
// cert and key or
@@ -49,16 +49,22 @@ var opts = {
4949
// {verify, verify_none},
5050
// {fail_if_no_peer_cert,false}
5151
//
52-
// var opts = { ca: [fs.readFileSync('../etc/testca/cacert.pem')] };
52+
// const opts = { ca: [fs.readFileSync('../etc/testca/cacert.pem')] };
5353

5454
// Option to use the SSL client certificate for authentication
5555
// opts.credentials = amqp.credentials.external();
5656

57-
var open = amqp.connect('amqps://localhost', opts);
57+
(async () => {
58+
const connection = await amqp.connect('amqp://localhost', opts);
59+
const channel = await connection.createChannel();
5860

59-
open.then(function(conn) {
60-
process.on('SIGINT', conn.close.bind(conn));
61-
return conn.createChannel().then(function(ch) {
62-
ch.sendToQueue('foo', Buffer.from('Hello World!'));
61+
process.on('SIGINT', async () => {
62+
await channel.close();
63+
await connection.close();
6364
});
64-
}).then(null, console.warn);
65+
66+
channel.sendToQueue('foo', Buffer.from('Hello World!'));
67+
68+
console.log(' [x] To exit press CTRL+C.');
69+
})();
70+
+22-22
Original file line numberDiff line numberDiff line change
@@ -1,28 +1,28 @@
11
#!/usr/bin/env node
22

3-
var amqp = require('amqplib/callback_api');
3+
const amqp = require('amqplib/callback_api');
44

5-
function bail(err, conn) {
6-
console.error(err);
7-
if (conn) conn.close(function() { process.exit(1); });
8-
}
9-
10-
function on_connect(err, conn) {
11-
if (err !== null) return bail(err);
5+
const exchange = 'logs';
6+
const text = process.argv.slice(2).join(' ') || 'info: Hello World!';
127

13-
var ex = 'logs';
8+
amqp.connect((err, connection) => {
9+
if (err) return bail(err);
10+
connection.createChannel((err, channel) => {
11+
if (err) return bail(err, connection);
12+
channel.assertExchange(exchange, 'fanout', { durable: false }, (err) => {
13+
if (err) return bail(err, connection);
14+
channel.publish(exchange, '', Buffer.from(text));
15+
console.log(" [x] Sent '%s'", text);
16+
channel.close(() => {
17+
connection.close();
18+
});
19+
});
20+
});
21+
});
1422

15-
function on_channel_open(err, ch) {
16-
if (err !== null) return bail(err, conn);
17-
ch.assertExchange(ex, 'fanout', {durable: false});
18-
var msg = process.argv.slice(2).join(' ') ||
19-
'info: Hello World!';
20-
ch.publish(ex, '', Buffer.from(msg));
21-
console.log(" [x] Sent '%s'", msg);
22-
ch.close(function() { conn.close(); });
23-
}
24-
25-
conn.createChannel(on_channel_open);
23+
function bail(err, connection) {
24+
console.error(err);
25+
if (connection) connection.close(() => {
26+
process.exit(1);
27+
});
2628
}
27-
28-
amqp.connect(on_connect);
Original file line numberDiff line numberDiff line change
@@ -1,30 +1,30 @@
11
#!/usr/bin/env node
22

3-
var amqp = require('amqplib/callback_api');
3+
const amqp = require('amqplib/callback_api');
44

5-
var args = process.argv.slice(2);
6-
var severity = (args.length > 0) ? args[0] : 'info';
7-
var message = args.slice(1).join(' ') || 'Hello World!';
5+
const exchange = 'direct_logs';
6+
const args = process.argv.slice(2);
7+
const routingKey = (args.length > 0) ? args[0] : 'info';
8+
const text = args.slice(1).join(' ') || 'Hello World!';
89

9-
function bail(err, conn) {
10-
console.error(err);
11-
if (conn) conn.close(function() { process.exit(1); });
12-
}
13-
14-
function on_connect(err, conn) {
15-
if (err !== null) return bail(err);
16-
17-
var ex = 'direct_logs';
18-
var exopts = {durable: false};
19-
20-
function on_channel_open(err, ch) {
21-
if (err !== null) return bail(err, conn);
22-
ch.assertExchange(ex, 'direct', exopts, function(err, ok) {
23-
ch.publish(ex, severity, Buffer.from(message));
24-
ch.close(function() { conn.close(); });
10+
amqp.connect((err, connection) => {
11+
if (err) return bail(err);
12+
connection.createChannel((err, channel) => {
13+
if (err) return bail(err, connection);
14+
channel.assertExchange(exchange, 'direct', { durable: false }, (err) => {
15+
if (err) return bail(err, connection);
16+
channel.publish(exchange, routingKey, Buffer.from(text));
17+
console.log(" [x] Sent '%s'", text);
18+
channel.close(() => {
19+
connection.close();
20+
});
2521
});
26-
}
27-
conn.createChannel(on_channel_open);
28-
}
22+
});
23+
});
2924

30-
amqp.connect(on_connect);
25+
function bail(err, connection) {
26+
console.error(err);
27+
if (connection) connection.close(() => {
28+
process.exit(1);
29+
});
30+
}
Original file line numberDiff line numberDiff line change
@@ -1,27 +1,30 @@
11
#!/usr/bin/env node
22

3-
var amqp = require('amqplib/callback_api');
3+
const amqp = require('amqplib/callback_api');
44

5-
var args = process.argv.slice(2);
6-
var key = (args.length > 0) ? args[0] : 'info';
7-
var message = args.slice(1).join(' ') || 'Hello World!';
5+
const exchange = 'topic_logs';
6+
const args = process.argv.slice(2);
7+
const routingKey = (args.length > 0) ? args[0] : 'info';
8+
const text = args.slice(1).join(' ') || 'Hello World!';
89

9-
function bail(err, conn) {
10-
console.error(err);
11-
if (conn) conn.close(function() { process.exit(1); });
12-
}
13-
14-
function on_connect(err, conn) {
15-
if (err !== null) return bail(err);
16-
var ex = 'topic_logs', exopts = {durable: false};
17-
conn.createChannel(function(err, ch) {
18-
ch.assertExchange(ex, 'topic', exopts, function(err, ok) {
19-
if (err !== null) return bail(err, conn);
20-
ch.publish(ex, key, Buffer.from(message));
21-
console.log(" [x] Sent %s:'%s'", key, message);
22-
ch.close(function() { conn.close(); });
10+
amqp.connect((err, connection) => {
11+
if (err) return bail(err);
12+
connection.createChannel((err, channel) => {
13+
if (err) return bail(err, connection);
14+
channel.assertExchange(exchange, 'topic', { durable: false }, (err) => {
15+
if (err) return bail(err, connection);
16+
channel.publish(exchange, routingKey, Buffer.from(text));
17+
console.log(" [x] Sent '%s'", text);
18+
channel.close(() => {
19+
connection.close();
20+
});
2321
});
2422
});
25-
}
23+
});
2624

27-
amqp.connect(on_connect);
25+
function bail(err, connection) {
26+
console.error(err);
27+
if (connection) connection.close(() => {
28+
process.exit(1);
29+
});
30+
}
+21-20
Original file line numberDiff line numberDiff line change
@@ -1,27 +1,28 @@
11
#!/usr/bin/env node
22

3-
var amqp = require('amqplib/callback_api');
3+
const amqp = require('amqplib/callback_api');
44

5-
function bail(err, conn) {
6-
console.error(err);
7-
if (conn) conn.close(function() { process.exit(1); });
8-
}
5+
const queue = 'task_queue';
6+
const text = process.argv.slice(2).join(' ') || "Hello World!";
97

10-
function on_connect(err, conn) {
11-
if (err !== null) return bail(err);
12-
13-
var q = 'task_queue';
14-
15-
conn.createChannel(function(err, ch) {
16-
if (err !== null) return bail(err, conn);
17-
ch.assertQueue(q, {durable: true}, function(err, _ok) {
18-
if (err !== null) return bail(err, conn);
19-
var msg = process.argv.slice(2).join(' ') || "Hello World!";
20-
ch.sendToQueue(q, Buffer.from(msg), {persistent: true});
21-
console.log(" [x] Sent '%s'", msg);
22-
ch.close(function() { conn.close(); });
8+
amqp.connect((err, connection) => {
9+
if (err) return bail(err);
10+
connection.createChannel((err, channel) => {
11+
if (err) return bail(err, connection);
12+
channel.assertQueue(queue, { durable: true }, (err) => {
13+
if (err) return bails(err, connection);
14+
channel.sendToQueue(queue, Buffer.from(text), { persistent: true });
15+
console.log(" [x] Sent '%s'", text);
16+
channel.close(() => {
17+
connection.close();
18+
});
2319
});
2420
});
25-
}
21+
});
2622

27-
amqp.connect(on_connect);
23+
function bail(err, connection) {
24+
console.error(err);
25+
if (connection) connection.close(() => {
26+
process.exit(1);
27+
});
28+
}

0 commit comments

Comments
 (0)
Please sign in to comment.