1
1
import type { Response } from 'openai/_shims/fetch' ;
2
+
2
3
import { APIResponse , Headers , createResponseHeaders } from './core' ;
3
4
5
+ type Bytes = string | ArrayBuffer | Uint8Array | Buffer | null | undefined ;
6
+
4
7
type ServerSentEvent = {
5
8
event : string | null ;
6
9
data : string ;
7
10
raw : string [ ] ;
8
11
} ;
9
12
10
- class SSEDecoder {
11
- private data : string [ ] ;
12
- private event : string | null ;
13
- private chunks : string [ ] ;
14
-
15
- constructor ( ) {
16
- this . event = null ;
17
- this . data = [ ] ;
18
- this . chunks = [ ] ;
19
- }
20
-
21
- decode ( line : string ) {
22
- if ( line . endsWith ( '\r' ) ) {
23
- line = line . substring ( 0 , line . length - 1 ) ;
24
- }
25
-
26
- if ( ! line ) {
27
- // empty line and we didn't previously encounter any messages
28
- if ( ! this . event && ! this . data . length ) return null ;
29
-
30
- const sse : ServerSentEvent = {
31
- event : this . event ,
32
- data : this . data . join ( '\n' ) ,
33
- raw : this . chunks ,
34
- } ;
35
-
36
- this . event = null ;
37
- this . data = [ ] ;
38
- this . chunks = [ ] ;
39
-
40
- return sse ;
41
- }
42
-
43
- this . chunks . push ( line ) ;
44
-
45
- if ( line . startsWith ( ':' ) ) {
46
- return null ;
47
- }
48
-
49
- let [ fieldname , _ , value ] = partition ( line , ':' ) ;
50
-
51
- if ( value . startsWith ( ' ' ) ) {
52
- value = value . substring ( 1 ) ;
53
- }
54
-
55
- if ( fieldname === 'event' ) {
56
- this . event = value ;
57
- } else if ( fieldname === 'data' ) {
58
- this . data . push ( value ) ;
59
- }
60
-
61
- return null ;
62
- }
63
- }
64
-
65
13
export class Stream < Item > implements AsyncIterable < Item > , APIResponse < Stream < Item > > {
14
+ /** @deprecated - please use the async iterator instead. We plan to add additional helper methods shortly. */
66
15
response : Response ;
16
+ /** @deprecated - we plan to add a different way to access raw response information shortly. */
67
17
responseHeaders : Headers ;
68
18
controller : AbortController ;
69
19
@@ -81,21 +31,11 @@ export class Stream<Item> implements AsyncIterable<Item>, APIResponse<Stream<Ite
81
31
this . controller . abort ( ) ;
82
32
throw new Error ( `Attempted to iterate over a response with no body` ) ;
83
33
}
84
-
85
34
const lineDecoder = new LineDecoder ( ) ;
86
35
87
- // @ts -ignore
88
- for await ( const chunk of this . response . body ) {
89
- let text ;
90
- if ( chunk instanceof Buffer ) {
91
- text = chunk . toString ( ) ;
92
- } else if ( ( chunk as any ) instanceof Uint8Array ) {
93
- text = Buffer . from ( chunk ) . toString ( ) ;
94
- } else {
95
- text = chunk ;
96
- }
97
-
98
- for ( const line of lineDecoder . decode ( text ) ) {
36
+ const iter = readableStreamAsyncIterable < Bytes > ( this . response . body ) ;
37
+ for await ( const chunk of iter ) {
38
+ for ( const line of lineDecoder . decode ( chunk ) ) {
99
39
const sse = this . decoder . decode ( line ) ;
100
40
if ( sse ) yield sse ;
101
41
}
@@ -135,7 +75,60 @@ export class Stream<Item> implements AsyncIterable<Item>, APIResponse<Stream<Ite
135
75
}
136
76
}
137
77
138
- const NEWLINE_CHARS = '\n\r\x0b\x0c\x1c\x1d\x1e\x85\u2028\u2029' ;
78
+ class SSEDecoder {
79
+ private data : string [ ] ;
80
+ private event : string | null ;
81
+ private chunks : string [ ] ;
82
+
83
+ constructor ( ) {
84
+ this . event = null ;
85
+ this . data = [ ] ;
86
+ this . chunks = [ ] ;
87
+ }
88
+
89
+ decode ( line : string ) {
90
+ if ( line . endsWith ( '\r' ) ) {
91
+ line = line . substring ( 0 , line . length - 1 ) ;
92
+ }
93
+
94
+ if ( ! line ) {
95
+ // empty line and we didn't previously encounter any messages
96
+ if ( ! this . event && ! this . data . length ) return null ;
97
+
98
+ const sse : ServerSentEvent = {
99
+ event : this . event ,
100
+ data : this . data . join ( '\n' ) ,
101
+ raw : this . chunks ,
102
+ } ;
103
+
104
+ this . event = null ;
105
+ this . data = [ ] ;
106
+ this . chunks = [ ] ;
107
+
108
+ return sse ;
109
+ }
110
+
111
+ this . chunks . push ( line ) ;
112
+
113
+ if ( line . startsWith ( ':' ) ) {
114
+ return null ;
115
+ }
116
+
117
+ let [ fieldname , _ , value ] = partition ( line , ':' ) ;
118
+
119
+ if ( value . startsWith ( ' ' ) ) {
120
+ value = value . substring ( 1 ) ;
121
+ }
122
+
123
+ if ( fieldname === 'event' ) {
124
+ this . event = value ;
125
+ } else if ( fieldname === 'data' ) {
126
+ this . data . push ( value ) ;
127
+ }
128
+
129
+ return null ;
130
+ }
131
+ }
139
132
140
133
/**
141
134
* A re-implementation of httpx's `LineDecoder` in Python that handles incrementally
@@ -144,15 +137,22 @@ const NEWLINE_CHARS = '\n\r\x0b\x0c\x1c\x1d\x1e\x85\u2028\u2029';
144
137
* https://github.com/encode/httpx/blob/920333ea98118e9cf617f246905d7b202510941c/httpx/_decoders.py#L258
145
138
*/
146
139
class LineDecoder {
140
+ // prettier-ignore
141
+ static NEWLINE_CHARS = new Set ( [ '\n' , '\r' , '\x0b' , '\x0c' , '\x1c' , '\x1d' , '\x1e' , '\x85' , '\u2028' , '\u2029' ] ) ;
142
+ static NEWLINE_REGEXP = / \r \n | [ \n \r \x0b \x0c \x1c \x1d \x1e \x85 \u2028 \u2029 ] / g;
143
+
147
144
buffer : string [ ] ;
148
145
trailingCR : boolean ;
146
+ textDecoder : any ; // TextDecoder found in browsers; not typed to avoid pulling in either "dom" or "node" types.
149
147
150
148
constructor ( ) {
151
149
this . buffer = [ ] ;
152
150
this . trailingCR = false ;
153
151
}
154
152
155
- decode ( text : string ) : string [ ] {
153
+ decode ( chunk : Bytes ) : string [ ] {
154
+ let text = this . decodeText ( chunk ) ;
155
+
156
156
if ( this . trailingCR ) {
157
157
text = '\r' + text ;
158
158
this . trailingCR = false ;
@@ -166,10 +166,10 @@ class LineDecoder {
166
166
return [ ] ;
167
167
}
168
168
169
- const trailing_newline = NEWLINE_CHARS . includes ( text . slice ( - 1 ) ) ;
170
- let lines = text . split ( / \r \n | [ \n \r \x0b \x0c \x1c \x1d \x1e \x85 \u2028 \u2029 ] / g ) ;
169
+ const trailingNewline = LineDecoder . NEWLINE_CHARS . has ( text [ text . length - 1 ] || '' ) ;
170
+ let lines = text . split ( LineDecoder . NEWLINE_REGEXP ) ;
171
171
172
- if ( lines . length === 1 && ! trailing_newline ) {
172
+ if ( lines . length === 1 && ! trailingNewline ) {
173
173
this . buffer . push ( lines [ 0 ] ! ) ;
174
174
return [ ] ;
175
175
}
@@ -179,13 +179,50 @@ class LineDecoder {
179
179
this . buffer = [ ] ;
180
180
}
181
181
182
- if ( ! trailing_newline ) {
182
+ if ( ! trailingNewline ) {
183
183
this . buffer = [ lines . pop ( ) || '' ] ;
184
184
}
185
185
186
186
return lines ;
187
187
}
188
188
189
+ decodeText ( bytes : Bytes ) : string {
190
+ if ( bytes == null ) return '' ;
191
+ if ( typeof bytes === 'string' ) return bytes ;
192
+
193
+ // Node:
194
+ if ( typeof Buffer !== 'undefined' ) {
195
+ if ( bytes instanceof Buffer ) {
196
+ return bytes . toString ( ) ;
197
+ }
198
+ if ( bytes instanceof Uint8Array ) {
199
+ return Buffer . from ( bytes ) . toString ( ) ;
200
+ }
201
+
202
+ throw new Error (
203
+ `Unexpected: received non-Uint8Array (${ bytes . constructor . name } ) stream chunk in an environment with a global "Buffer" defined, which this library assumes to be Node. Please report this error.` ,
204
+ ) ;
205
+ }
206
+
207
+ // Browser
208
+ if ( typeof TextDecoder !== 'undefined' ) {
209
+ if ( bytes instanceof Uint8Array || bytes instanceof ArrayBuffer ) {
210
+ this . textDecoder ??= new TextDecoder ( 'utf8' ) ;
211
+ return this . textDecoder . decode ( bytes ) ;
212
+ }
213
+
214
+ throw new Error (
215
+ `Unexpected: received non-Uint8Array/ArrayBuffer (${
216
+ ( bytes as any ) . constructor . name
217
+ } ) in a web platform. Please report this error.`,
218
+ ) ;
219
+ }
220
+
221
+ throw new Error (
222
+ `Unexpected: neither Buffer nor TextDecoder are available as globals. Please report this error.` ,
223
+ ) ;
224
+ }
225
+
189
226
flush ( ) : string [ ] {
190
227
if ( ! this . buffer . length && ! this . trailingCR ) {
191
228
return [ ] ;
@@ -206,3 +243,28 @@ function partition(str: string, delimiter: string): [string, string, string] {
206
243
207
244
return [ str , '' , '' ] ;
208
245
}
246
+
247
+ /**
248
+ * Most browsers don't yet have async iterable support for ReadableStream,
249
+ * and Node has a very different way of reading bytes from its "ReadableStream".
250
+ *
251
+ * This polyfill was pulled from https://github.com/MattiasBuelens/web-streams-polyfill/pull/122#issuecomment-1624185965
252
+ */
253
+ function readableStreamAsyncIterable < T > ( stream : any ) : AsyncIterableIterator < T > {
254
+ if ( stream [ Symbol . asyncIterator ] ) return stream [ Symbol . asyncIterator ] ;
255
+
256
+ const reader = stream . getReader ( ) ;
257
+ return {
258
+ next ( ) {
259
+ return reader . read ( ) ;
260
+ } ,
261
+ async return ( ) {
262
+ reader . cancel ( ) ;
263
+ reader . releaseLock ( ) ;
264
+ return { done : true , value : undefined } ;
265
+ } ,
266
+ [ Symbol . asyncIterator ] ( ) {
267
+ return this ;
268
+ } ,
269
+ } ;
270
+ }
0 commit comments