2
2
3
3
const debug = require ( 'debug' )
4
4
const log = debug ( 'libp2p:webrtc-star' )
5
- const multiaddr = require ( 'multiaddr' )
6
- const mafmt = require ( 'mafmt' )
5
+ log . error = debug ( 'libp2p:webrtc-star:error' )
6
+
7
+ const assert = require ( 'assert' )
8
+ const { EventEmitter } = require ( 'events' )
9
+ const errcode = require ( 'err-code' )
7
10
const withIs = require ( 'class-is' )
8
- const io = require ( 'socket.io-client' )
9
- const EE = require ( 'events' ) . EventEmitter
11
+
12
+ const { AbortError } = require ( 'abortable-iterator' )
10
13
const SimplePeer = require ( 'simple-peer' )
14
+ const webrtcSupport = require ( 'webrtcsupport' )
15
+
16
+ const multiaddr = require ( 'multiaddr' )
17
+ const mafmt = require ( 'mafmt' )
11
18
const PeerId = require ( 'peer-id' )
12
19
const PeerInfo = require ( 'peer-info' )
13
- const Connection = require ( 'interface-connection' ) . Connection
14
- const toPull = require ( 'stream-to-pull-stream' )
15
- const once = require ( 'once' )
16
- const setImmediate = require ( 'async/setImmediate' )
17
- const webrtcSupport = require ( 'webrtcsupport' )
18
- const utils = require ( './utils' )
19
- const cleanUrlSIO = utils . cleanUrlSIO
20
- const cleanMultiaddr = utils . cleanMultiaddr
21
20
22
- const noop = once ( ( ) => { } )
21
+ const { CODE_CIRCUIT } = require ( './constants' )
22
+ const createListener = require ( './listener' )
23
+ const toConnection = require ( './socket-to-conn' )
24
+ const { cleanMultiaddr } = require ( './utils' )
23
25
24
- const sioOptions = {
25
- transports : [ 'websocket' ] ,
26
- 'force new connection' : true
27
- }
26
+ function noop ( ) { }
28
27
28
+ /**
29
+ * @class WebRTCStar
30
+ */
29
31
class WebRTCStar {
30
- constructor ( options ) {
31
- options = options || { }
32
+ /**
33
+ * @constructor
34
+ * @param {object } options
35
+ * @param {Upgrader } options.upgrader
36
+ */
37
+ constructor ( options = { } ) {
38
+ assert ( options . upgrader , 'An upgrader must be provided. See https://github.com/libp2p/interface-transport#upgrader.' )
39
+ this . _upgrader = options . upgrader
32
40
33
41
this . maSelf = undefined
34
42
@@ -41,201 +49,173 @@ class WebRTCStar {
41
49
this . wrtc = options . wrtc
42
50
}
43
51
44
- this . discovery = new EE ( )
52
+ this . listenersRefs = { }
53
+
54
+ // Discovery
55
+ this . discovery = new EventEmitter ( )
45
56
this . discovery . tag = 'webRTCStar'
46
57
this . discovery . _isStarted = false
47
- this . discovery . start = ( callback ) => {
58
+ this . discovery . start = ( ) => {
48
59
this . discovery . _isStarted = true
49
- setImmediate ( callback )
50
60
}
51
- this . discovery . stop = ( callback ) => {
61
+ this . discovery . stop = ( ) => {
52
62
this . discovery . _isStarted = false
53
- setImmediate ( callback )
54
63
}
55
-
56
- this . listenersRefs = { }
57
64
this . _peerDiscovered = this . _peerDiscovered . bind ( this )
58
65
}
59
66
60
- dial ( ma , options , callback ) {
61
- if ( typeof options === 'function' ) {
62
- callback = options
63
- options = { }
64
- }
65
-
66
- callback = callback ? once ( callback ) : noop
67
-
68
- const intentId = ( ~ ~ ( Math . random ( ) * 1e9 ) ) . toString ( 36 ) + Date . now ( )
67
+ /**
68
+ * @async
69
+ * @param {Multiaddr } ma
70
+ * @param {object } options
71
+ * @param {AbortSignal } options.signal Used to abort dial requests
72
+ * @returns {Connection } An upgraded Connection
73
+ */
74
+ async dial ( ma , options = { } ) {
75
+ const rawConn = await this . _connect ( ma , options )
76
+ const maConn = toConnection ( rawConn , { remoteAddr : ma , signal : options . signal } )
77
+ log ( 'new outbound connection %s' , maConn . remoteAddr )
78
+ const conn = await this . _upgrader . upgradeOutbound ( maConn )
79
+ log ( 'outbound connection %s upgraded' , maConn . remoteAddr )
80
+ return conn
81
+ }
69
82
70
- const sioClient = this
71
- . listenersRefs [ Object . keys ( this . listenersRefs ) [ 0 ] ] . io
83
+ /**
84
+ * @private
85
+ * @param {Multiaddr } ma
86
+ * @param {object } options
87
+ * @param {AbortSignal } options.signal Used to abort dial requests
88
+ * @returns {Promise<SimplePeer> } Resolves a SimplePeer Webrtc channel
89
+ */
90
+ _connect ( ma , options = { } ) {
91
+ if ( options . signal && options . signal . aborted ) {
92
+ throw new AbortError ( )
93
+ }
72
94
73
- const spOptions = { initiator : true , trickle : false }
95
+ const spOptions = {
96
+ initiator : true ,
97
+ trickle : false
98
+ }
74
99
75
100
// Use custom WebRTC implementation
76
101
if ( this . wrtc ) { spOptions . wrtc = this . wrtc }
77
102
78
- let channel
79
- try {
80
- channel = new SimplePeer ( spOptions )
81
- } catch ( err ) {
82
- log ( 'Could not create connection:' , err )
83
- return callback ( err )
84
- }
103
+ const cOpts = ma . toOptions ( )
85
104
86
- const conn = new Connection ( toPull . duplex ( channel ) )
87
- let connected = false
105
+ const intentId = ( ~ ~ ( Math . random ( ) * 1e9 ) ) . toString ( 36 ) + Date . now ( )
106
+ const sioClient = this
107
+ . listenersRefs [ Object . keys ( this . listenersRefs ) [ 0 ] ] . io
88
108
89
- channel . on ( 'signal' , ( signal ) => {
90
- sioClient . emit ( 'ss-handshake' , {
91
- intentId : intentId ,
92
- srcMultiaddr : this . maSelf . toString ( ) ,
93
- dstMultiaddr : ma . toString ( ) ,
94
- signal : signal
95
- } )
96
- } )
109
+ return new Promise ( ( resolve , reject ) => {
110
+ const start = Date . now ( )
111
+ let connected
97
112
98
- channel . once ( 'timeout' , ( ) => callback ( new Error ( 'timeout' ) ) )
113
+ log ( 'dialing %s:%s' , cOpts . host , cOpts . port )
114
+ const channel = new SimplePeer ( spOptions )
99
115
100
- channel . once ( 'error' , ( err ) => {
101
- if ( ! connected ) { callback ( err ) }
102
- } )
116
+ const onError = ( err ) => {
117
+ if ( ! connected ) {
118
+ const msg = `connection error ${ cOpts . host } : ${ cOpts . port } : ${ err . message } `
103
119
104
- // NOTE: aegir segfaults if we do .once on the socket.io event emitter and we
105
- // are clueless as to why.
106
- sioClient . on ( 'ws-handshake' , ( offer ) => {
107
- if ( offer . intentId === intentId && offer . err ) {
108
- return callback ( new Error ( offer . err ) )
120
+ err . message = msg
121
+ log . error ( msg )
122
+ done ( err )
123
+ }
109
124
}
110
125
111
- if ( offer . intentId !== intentId || ! offer . answer ) {
112
- return
126
+ const onTimeout = ( ) => {
127
+ log ( 'connnection timeout %s:%s' , cOpts . host , cOpts . port )
128
+ const err = errcode ( new Error ( `connection timeout after ${ Date . now ( ) - start } ms` ) , 'ERR_CONNECT_TIMEOUT' )
129
+ // Note: this will result in onError() being called
130
+ channel . emit ( 'error' , err )
113
131
}
114
132
115
- channel . once ( 'connect' , ( ) => {
133
+ const onConnect = ( ) => {
116
134
connected = true
117
- conn . destroy = channel . destroy . bind ( channel )
118
-
119
- channel . once ( 'close' , ( ) => conn . destroy ( ) )
120
-
121
- conn . getObservedAddrs = ( callback ) => callback ( null , [ ma ] )
122
-
123
- callback ( null , conn )
124
- } )
125
-
126
- channel . signal ( offer . signal )
127
- } )
128
-
129
- return conn
130
- }
131
135
132
- createListener ( options , handler ) {
133
- if ( typeof options === 'function' ) {
134
- handler = options
135
- options = { }
136
- }
137
-
138
- const listener = new EE ( )
139
-
140
- listener . listen = ( ma , callback ) => {
141
- callback = callback ? once ( callback ) : noop
142
-
143
- if ( ! webrtcSupport . support && ! this . wrtc ) {
144
- return setImmediate ( ( ) => callback ( new Error ( 'no WebRTC support' ) ) )
136
+ log ( 'connection opened %s:%s' , cOpts . host , cOpts . port )
137
+ done ( null )
145
138
}
146
139
147
- this . maSelf = ma
148
-
149
- const sioUrl = cleanUrlSIO ( ma )
150
-
151
- log ( 'Dialing to Signalling Server on: ' + sioUrl )
152
-
153
- listener . io = io . connect ( sioUrl , sioOptions )
154
-
155
- listener . io . once ( 'connect_error' , callback )
156
- listener . io . once ( 'error' , ( err ) => {
157
- listener . emit ( 'error' , err )
158
- listener . emit ( 'close' )
159
- } )
140
+ const onAbort = ( ) => {
141
+ log . error ( 'connection aborted %s:%s' , cOpts . host , cOpts . port )
142
+ channel . destroy ( )
143
+ done ( new AbortError ( ) )
144
+ }
160
145
161
- listener . io . on ( 'ws-handshake' , incommingDial )
162
- listener . io . on ( 'ws-peer' , this . _peerDiscovered )
146
+ const done = ( err ) => {
147
+ channel . removeListener ( 'error' , onError )
148
+ channel . removeListener ( 'timeout' , onTimeout )
149
+ channel . removeListener ( 'connect' , onConnect )
150
+ options . signal && options . signal . removeEventListener ( 'abort' , onAbort )
163
151
164
- listener . io . on ( 'connect' , ( ) => {
165
- listener . io . emit ( 'ss-join' , ma . toString ( ) )
166
- } )
152
+ err ? reject ( err ) : resolve ( channel )
153
+ }
167
154
168
- listener . io . once ( 'connect' , ( ) => {
169
- listener . emit ( 'listening' )
170
- callback ( )
155
+ channel . once ( 'error' , onError )
156
+ channel . once ( 'timeout' , onTimeout )
157
+ channel . once ( 'connect' , onConnect )
158
+ channel . on ( 'close' , ( ) => channel . destroy ( ) )
159
+ options . signal && options . signal . addEventListener ( 'abort' , onAbort )
160
+
161
+ channel . on ( 'signal' , ( signal ) => {
162
+ sioClient . emit ( 'ss-handshake' , {
163
+ intentId : intentId ,
164
+ srcMultiaddr : this . maSelf . toString ( ) ,
165
+ dstMultiaddr : ma . toString ( ) ,
166
+ signal : signal
167
+ } )
171
168
} )
172
169
173
- const self = this
174
- function incommingDial ( offer ) {
175
- if ( offer . answer || offer . err ) {
176
- return
170
+ // NOTE: aegir segfaults if we do .once on the socket.io event emitter and we
171
+ // are clueless as to why.
172
+ sioClient . on ( 'ws-handshake' , ( offer ) => {
173
+ if ( offer . intentId === intentId && offer . err ) {
174
+ reject ( offer . err )
177
175
}
178
176
179
- const spOptions = { trickle : false }
180
-
181
- // Use custom WebRTC implementation
182
- if ( self . wrtc ) { spOptions . wrtc = self . wrtc }
183
-
184
- let channel
185
- try {
186
- channel = new SimplePeer ( spOptions )
187
- } catch ( err ) {
188
- log ( 'Could not create incoming connection:' , err )
189
- return callback ( err )
177
+ if ( offer . intentId !== intentId || ! offer . answer ) {
178
+ return
190
179
}
191
180
192
- const conn = new Connection ( toPull . duplex ( channel ) )
193
-
194
- channel . once ( 'connect' , ( ) => {
195
- conn . getObservedAddrs = ( callback ) => {
196
- return callback ( null , [ offer . srcMultiaddr ] )
197
- }
198
-
199
- listener . emit ( 'connection' , conn )
200
- handler ( conn )
201
- } )
202
-
203
- channel . once ( 'signal' , ( signal ) => {
204
- offer . signal = signal
205
- offer . answer = true
206
- listener . io . emit ( 'ss-handshake' , offer )
207
- } )
208
-
209
181
channel . signal ( offer . signal )
210
- }
211
- }
212
-
213
- listener . close = ( callback ) => {
214
- callback = callback ? once ( callback ) : noop
215
-
216
- listener . io . emit ( 'ss-leave' )
217
-
218
- setImmediate ( ( ) => {
219
- listener . emit ( 'close' )
220
- callback ( )
221
182
} )
183
+ } )
184
+ }
185
+
186
+ /**
187
+ * Creates a WebrtcStar listener. The provided `handler` function will be called
188
+ * anytime a new incoming Connection has been successfully upgraded via
189
+ * `upgrader.upgradeInbound`.
190
+ * @param {object } [options] simple-peer options for listener
191
+ * @param {function (Connection) } handler
192
+ * @returns {Listener } A WebrtcStar listener
193
+ */
194
+ createListener ( options = { } , handler ) {
195
+ if ( ! webrtcSupport . support && ! this . wrtc ) {
196
+ throw errcode ( new Error ( 'no WebRTC support' ) , 'ERR_NO_WEBRTC_SUPPORT' )
222
197
}
223
198
224
- listener . getAddrs = ( callback ) => {
225
- setImmediate ( ( ) => callback ( null , [ this . maSelf ] ) )
199
+ if ( typeof options === 'function' ) {
200
+ handler = options
201
+ options = { }
226
202
}
227
203
228
- this . listenersRefs [ multiaddr . toString ( ) ] = listener
229
- return listener
204
+ handler = handler || noop
205
+
206
+ return createListener ( { handler, upgrader : this . _upgrader } , this , options )
230
207
}
231
208
209
+ /**
210
+ * Takes a list of `Multiaddr`s and returns only valid TCP addresses
211
+ * @param {Multiaddr[] } multiaddrs
212
+ * @returns {Multiaddr[] } Valid TCP multiaddrs
213
+ */
232
214
filter ( multiaddrs ) {
233
- if ( ! Array . isArray ( multiaddrs ) ) {
234
- multiaddrs = [ multiaddrs ]
235
- }
215
+ multiaddrs = Array . isArray ( multiaddrs ) ? multiaddrs : [ multiaddrs ]
236
216
237
217
return multiaddrs . filter ( ( ma ) => {
238
- if ( ma . protoNames ( ) . indexOf ( 'p2p-circuit' ) > - 1 ) {
218
+ if ( ma . protoCodes ( ) . includes ( CODE_CIRCUIT ) ) {
239
219
return false
240
220
}
241
221
@@ -249,11 +229,10 @@ class WebRTCStar {
249
229
log ( 'Peer Discovered:' , maStr )
250
230
maStr = cleanMultiaddr ( maStr )
251
231
252
- const split = maStr . split ( '/ipfs/' )
253
- const peerIdStr = split [ split . length - 1 ]
254
- const peerId = PeerId . createFromB58String ( peerIdStr )
232
+ const ma = multiaddr ( maStr )
233
+ const peerId = PeerId . createFromB58String ( ma . getPeerId ( ) )
255
234
const peerInfo = new PeerInfo ( peerId )
256
- peerInfo . multiaddrs . add ( multiaddr ( maStr ) )
235
+ peerInfo . multiaddrs . add ( ma )
257
236
this . discovery . emit ( 'peer' , peerInfo )
258
237
}
259
238
}
0 commit comments