Skip to content
This repository was archived by the owner on Jul 21, 2023. It is now read-only.

Commit c9bede5

Browse files
Alan Shawvasco-santos
Alan Shaw
authored andcommittedSep 18, 2019
refactor: async iterators (#94)
BREAKING CHANGE: All places in the API that used callbacks are now replaced with async/await while pull-streams are replaced with async iterators. The API has also been updated according to the latest `interface-stream-muxer` version, https://github.com/libp2p/interface-stream-muxer/tree/v0.7.0. License: MIT Signed-off-by: Alan Shaw <alan.shaw@protocol.ai>
1 parent 567bddf commit c9bede5

28 files changed

+1384
-1627
lines changed
 

‎.aegir.js

-35
This file was deleted.

‎.gitignore

+3-9
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,7 @@
1+
node_modules
2+
coverage
3+
.nyc_output
14
package-lock.json
25
yarn.lock
36
docs
4-
5-
**/node_modules
6-
**/*.log
7-
test/setup/tmp-disposable-nodes-addrs.json
87
dist
9-
coverage
10-
**/*.swp
11-
examples/sub-module/**/bundle.js
12-
examples/sub-module/**/*-minified.js
13-
examples/sub-module/*-bundle.js

‎README.md

+116-71
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
1-
js-libp2p-mplex
2-
===================
1+
# js-libp2p-mplex
32

43
[![](https://img.shields.io/badge/made%20by-Protocol%20Labs-blue.svg?style=flat-square)](http://protocol.ai)
54
[![](https://img.shields.io/badge/project-libp2p-yellow.svg?style=flat-square)](http://libp2p.io/)
@@ -12,100 +11,146 @@ js-libp2p-mplex
1211
![](https://img.shields.io/badge/npm-%3E%3D6.0.0-orange.svg?style=flat-square)
1312
![](https://img.shields.io/badge/Node.js-%3E%3D10.0.0-orange.svg?style=flat-square)
1413

15-
> JavaScript implementation of https://github.com/libp2p/mplex
14+
> JavaScript implementation of [mplex](https://github.com/libp2p/specs/tree/master/mplex).
1615
1716
[![](https://github.com/libp2p/interface-stream-muxer/raw/master/img/badge.png)](https://github.com/libp2p/interface-stream-muxer)
1817

1918
## Lead Maintainer
2019

21-
[Vasco Santos](https://github.com/vasco-santos).
20+
[Vasco Santos](https://github.com/vasco-santos)
21+
22+
## Install
23+
24+
```sh
25+
npm install libp2p-mplex
26+
```
2227

2328
## Usage
2429

25-
Let's define a `listener.js`, which starts a TCP server on port 9999 and waits for a connection. Once we get a connection, we wait for a stream. And finally, once we have the stream, we pull the data from that stream, and printing it to the console.
26-
27-
```JavaScript
28-
const mplex = require('libp2p-mplex')
29-
const tcp = require('net')
30-
const pull = require('pull-stream')
31-
const toPull = require('stream-to-pull-stream')
32-
33-
const listener = tcp.createServer((socket) => {
34-
console.log('[listener] Got connection!')
35-
36-
const muxer = mplex.listener(toPull(socket))
37-
38-
muxer.on('stream', (stream) => {
39-
console.log('[listener] Got stream!')
40-
pull(
41-
stream,
42-
pull.drain((data) => {
43-
console.log('[listener] Received:')
44-
console.log(data.toString())
45-
})
46-
)
47-
})
48-
})
30+
```js
31+
const Mplex = require('libp2p-mplex')
32+
const pipe = require('it-pipe')
4933

50-
listener.listen(9999, () => {
51-
console.log('[listener] listening on 9999')
34+
const muxer = new Mplex({
35+
onStream: stream => { // Receive a duplex stream from the remote
36+
// ...receive data from the remote and optionally send data back
37+
}
5238
})
39+
40+
pipe(conn, muxer, conn) // conn is duplex connection to another peer
41+
42+
const stream = muxer.newStream() // Create a new duplex stream to the remote
43+
44+
// Use the duplex stream to send some data to the remote...
45+
pipe([1, 2, 3], stream)
5346
```
5447

55-
Now, let's define `dialer.js` who will connect to our `listener` over a TCP socket. Once we have that, we'll put a message in the stream for our `listener`.
48+
## API
5649

57-
```JavaScript
58-
const mplex = require('libp2p-mplex')
59-
const tcp = require('net')
60-
const pull = require('pull-stream')
61-
const toPull = require('stream-to-pull-stream')
50+
### `const muxer = new Mplex([options])`
6251

63-
const socket = tcp.connect(9999)
52+
Create a new _duplex_ stream that can be piped together with a connection in order to allow multiplexed communications.
6453

65-
const muxer = mplex.dialer(toPull(socket))
54+
e.g.
6655

67-
console.log('[dialer] opening stream')
68-
const stream = muxer.newStream((err) => {
69-
console.log('[dialer] opened stream')
70-
if (err) throw err
71-
})
56+
```js
57+
const Mplex = require('libp2p-mplex')
58+
const pipe = require('it-pipe')
7259

73-
pull(
74-
pull.values(['hey, how is it going. I am the dialer']),
75-
stream
76-
)
60+
// Create a duplex muxer
61+
const muxer = new Mplex()
62+
63+
// Use the muxer in a pipeline
64+
pipe(conn, muxer, conn) // conn is duplex connection to another peer
7765
```
7866

79-
Now we can first run `listener.js` and then `dialer.js` to see the
80-
following output:
67+
`options` is an optional `Object` that may have the following properties:
68+
69+
* `onStream` - A function called when receiving a new stream from the remote. e.g.
70+
```js
71+
// Receive a new stream on the muxed connection
72+
const onStream = stream => {
73+
// Read from this stream and write back to it (echo server)
74+
pipe(
75+
stream,
76+
source => (async function * () {
77+
for await (const data of source) yield data
78+
})(),
79+
stream
80+
)
81+
}
82+
const muxer = new Mplex({ onStream })
83+
// ...
84+
```
85+
**Note:** The `onStream` function can be passed in place of the `options` object. i.e.
86+
```js
87+
new Mplex(stream => { /* ... */ })
88+
```
89+
* `signal` - An [`AbortSignal`](https://developer.mozilla.org/en-US/docs/Web/API/AbortSignal) which can be used to abort the muxer, _including_ all of it's multiplexed connections. e.g.
90+
```js
91+
const controller = new AbortController()
92+
const muxer = new Mplex({ signal: controller.signal })
93+
94+
pipe(conn, muxer, conn)
95+
96+
controller.abort()
97+
```
98+
* `maxMsgSize` - The maximum size in bytes the data field of multiplexed messages may contain (default 1MB)
99+
100+
### `muxer.onStream`
101+
102+
Use this property as an alternative to passing `onStream` as an option to the `Mplex` constructor.
103+
104+
### `const stream = muxer.newStream([options])`
105+
106+
Initiate a new stream with the remote. Returns a [duplex stream](https://gist.github.com/alanshaw/591dc7dd54e4f99338a347ef568d6ee9#duplex-it).
107+
108+
e.g.
81109

82-
*listener.js*
110+
```js
111+
// Create a new stream on the muxed connection
112+
const stream = muxer.newStream()
83113

84-
```
85-
$ node listener.js
86-
[listener] listening on 9999
87-
[listener] Got connection!
88-
[listener] Got stream!
89-
[listener] Received:
90-
hey, how is it going. I am the dialer
114+
// Use this new stream like any other duplex stream:
115+
pipe([1, 2, 3], stream, consume)
91116
```
92117

93-
*dialer.js*
118+
In addition to `sink` and `source` properties, this stream also has the following API, that will **normally _not_ be used by stream consumers**.
94119

95-
```
96-
$ node dialer.js
97-
[dialer] opening stream
98-
[dialer] opened stream
99-
```
120+
#### `stream.close()`
100121

101-
## Install
122+
Closes the stream for **reading**. If iterating over the source of this stream in a `for await of` loop, it will return (exit the loop) after any buffered data has been consumed.
102123

103-
```sh
104-
> npm install libp2p-mplex
105-
```
124+
This function is called automatically by the muxer when it receives a `CLOSE` message from the remote.
106125

107-
## API
126+
The source will return normally, the sink will continue to consume.
108127

109-
```js
110-
const mplex = require('libp2p-mplex')
111-
```
128+
#### `stream.abort([err])`
129+
130+
Closes the stream for **reading** _and_ **writing**. This should be called when a _local error_ has occurred.
131+
132+
Note, if called without an error any buffered data in the source can still be consumed and the stream will end normally.
133+
134+
This will cause a `RESET` message to be sent to the remote, _unless_ the sink has already ended.
135+
136+
The sink will return and the source will throw if an error is passed or return normally if not.
137+
138+
#### `stream.reset()`
139+
140+
Closes the stream _immediately_ for **reading** _and_ **writing**. This should be called when a _remote error_ has occurred.
141+
142+
This function is called automatically by the muxer when it receives a `RESET` message from the remote.
143+
144+
The sink will return and the source will throw.
145+
146+
## Contribute
147+
148+
The libp2p implementation in JavaScript is a work in progress. As such, there are a few things you can do right now to help out:
149+
150+
- Go through the modules and **check out existing issues**. This is especially useful for modules in active development. Some knowledge of IPFS/libp2p may be required, as well as the infrastructure behind it - for instance, you may need to read up on p2p and more complex operations like muxing to be able to help technically.
151+
- **Perform code reviews**. More eyes will help a) speed the project along b) ensure quality and c) reduce possible future bugs.
152+
- **Add tests**. There can never be enough tests.
153+
154+
## License
155+
156+
[MIT](LICENSE) © Protocol Labs

‎examples/dialer.js

+35-14
Original file line numberDiff line numberDiff line change
@@ -2,21 +2,42 @@
22
'use strict'
33

44
const tcp = require('net')
5-
const pull = require('pull-stream')
6-
const toPull = require('stream-to-pull-stream')
7-
const multiplex = require('../src')
5+
const pipe = require('it-pipe')
6+
const AbortController = require('abort-controller')
7+
const { toIterable } = require('./util')
8+
const Mplex = require('../src')
89

9-
const socket = tcp.connect(9999)
10+
const socket = toIterable(tcp.connect(9999))
11+
console.log('[dialer] socket stream opened')
1012

11-
const muxer = multiplex.dialer(toPull(socket))
13+
const controller = new AbortController()
1214

13-
console.log('[dialer] opening stream')
14-
const stream = muxer.newStream((err) => {
15-
console.log('[dialer] opened stream')
16-
if (err) throw err
17-
})
15+
const muxer = new Mplex({ signal: controller.signal })
1816

19-
pull(
20-
pull.values(['hey, how is it going. I am the dialer']),
21-
stream
22-
)
17+
const pipeMuxerToSocket = async () => {
18+
await pipe(muxer, socket, muxer)
19+
console.log('[dialer] socket stream closed')
20+
}
21+
22+
const sendAndReceive = async () => {
23+
const muxedStream = muxer.newStream()
24+
console.log('[dialer] muxed stream opened')
25+
26+
await pipe(
27+
['hey, how is it going. I am the dialer'],
28+
muxedStream,
29+
async source => {
30+
for await (const chunk of source) {
31+
console.log('[dialer] received:')
32+
console.log(chunk.toString())
33+
}
34+
}
35+
)
36+
console.log('[dialer] muxed stream closed')
37+
38+
// Close the socket stream after 1s
39+
setTimeout(() => controller.abort(), 1000)
40+
}
41+
42+
pipeMuxerToSocket()
43+
sendAndReceive()

‎examples/listener.js

+25-18
Original file line numberDiff line numberDiff line change
@@ -2,27 +2,34 @@
22
'use strict'
33

44
const tcp = require('net')
5-
const pull = require('pull-stream')
6-
const toPull = require('stream-to-pull-stream')
7-
const multiplex = require('../src')
5+
const pipe = require('it-pipe')
6+
const { toIterable } = require('./util')
7+
const Mplex = require('../src')
88

9-
const listener = tcp.createServer((socket) => {
9+
const listener = tcp.createServer(async socket => {
1010
console.log('[listener] Got connection!')
1111

12-
const muxer = multiplex.listener(toPull(socket))
13-
14-
muxer.on('stream', (stream) => {
15-
console.log('[listener] Got stream!')
16-
pull(
17-
stream,
18-
pull.drain((data) => {
19-
console.log('[listener] Received:')
20-
console.log(data.toString())
21-
})
22-
)
12+
const muxer = new Mplex({
13+
async onStream (stream) {
14+
console.log('[listener] muxed stream opened')
15+
await pipe(
16+
stream,
17+
source => (async function * () {
18+
for await (const chunk of source) {
19+
console.log('[listener] received:')
20+
console.log(chunk.toString())
21+
yield 'thanks for the message, I am the listener'
22+
}
23+
})(),
24+
stream
25+
)
26+
console.log('[listener] muxed stream closed')
27+
}
2328
})
24-
})
2529

26-
listener.listen(9999, () => {
27-
console.log('[listener] listening on 9999')
30+
socket = toIterable(socket)
31+
await pipe(socket, muxer, socket)
32+
console.log('[listener] socket stream closed')
2833
})
34+
35+
listener.listen(9999, () => console.log('[listener] listening on 9999'))

0 commit comments

Comments
 (0)
This repository has been archived.