Skip to content

Commit

Permalink
feat: Web Stream API (#5286)
Browse files Browse the repository at this point in the history
* feat: Web Stream API

* chore: update error PR number

* test: skip test when not available

* test: exit when skip test

* test: use Readable.toWeb instead

* docs: update ToC and onSend hook

* refactor: toString

* refactor: direct for-of payload.headers

* test: ensure compatibility of third-party Response

* refactor: reduce toString call
  • Loading branch information
climba03003 committed Jan 29, 2024
1 parent bdd647d commit 101ba57
Show file tree
Hide file tree
Showing 7 changed files with 336 additions and 10 deletions.
3 changes: 3 additions & 0 deletions docs/Reference/Errors.md
Expand Up @@ -45,6 +45,7 @@
- [FST_ERR_LOG_INVALID_DESTINATION](#fst_err_log_invalid_destination)
- [FST_ERR_LOG_INVALID_LOGGER](#fst_err_log_invalid_logger)
- [FST_ERR_REP_INVALID_PAYLOAD_TYPE](#fst_err_rep_invalid_payload_type)
- [FST_ERR_REP_RESPONSE_BODY_CONSUMED](#fst_err_rep_response_body_consumed)
- [FST_ERR_REP_ALREADY_SENT](#fst_err_rep_already_sent)
- [FST_ERR_REP_SENT_VALUE](#fst_err_rep_sent_value)
- [FST_ERR_SEND_INSIDE_ONERR](#fst_err_send_inside_onerr)
Expand Down Expand Up @@ -312,6 +313,8 @@ Below is a table with all the error codes that Fastify uses.
| <a id="fst_err_log_invalid_destination">FST_ERR_LOG_INVALID_DESTINATION</a> | The logger does not accept the specified destination. | Use a `'stream'` or a `'file'` as the destination. | [#1168](https://github.com/fastify/fastify/pull/1168) |
| <a id="fst_err_log_invalid_logger">FST_ERR_LOG_INVALID_LOGGER</a> | The logger should have all these methods: `'info'`, `'error'`, `'debug'`, `'fatal'`, `'warn'`, `'trace'`, `'child'`. | Use a logger with all the required methods. | [#4520](https://github.com/fastify/fastify/pull/4520) |
| <a id="fst_err_rep_invalid_payload_type">FST_ERR_REP_INVALID_PAYLOAD_TYPE</a> | Reply payload can be either a `string` or a `Buffer`. | Use a `string` or a `Buffer` for the payload. | [#1168](https://github.com/fastify/fastify/pull/1168) |
| <a id="fst_err_rep_response_body_consumed">FST_ERR_REP_RESPONSE_BODY_CONSUMED</a> | Using `Response` as reply payload
but the body is being consumed. | Make sure you don't consume the `Response.body` | [#5286](https://github.com/fastify/fastify/pull/5286) |
| <a id="fst_err_rep_already_sent">FST_ERR_REP_ALREADY_SENT</a> | A response was already sent. | - | [#1336](https://github.com/fastify/fastify/pull/1336) |
| <a id="fst_err_rep_sent_value">FST_ERR_REP_SENT_VALUE</a> | The only possible value for `reply.sent` is `true`. | - | [#1336](https://github.com/fastify/fastify/pull/1336) |
| <a id="fst_err_send_inside_onerr">FST_ERR_SEND_INSIDE_ONERR</a> | You cannot use `send` inside the `onError` hook. | - | [#1348](https://github.com/fastify/fastify/pull/1348) |
Expand Down
2 changes: 1 addition & 1 deletion docs/Reference/Hooks.md
Expand Up @@ -232,7 +232,7 @@ fastify.addHook('onSend', (request, reply, payload, done) => {
> `null`.
Note: If you change the payload, you may only change it to a `string`, a
`Buffer`, a `stream`, or `null`.
`Buffer`, a `stream`, a `ReadableStream`, a `Response`, or `null`.


### onResponse
Expand Down
48 changes: 48 additions & 0 deletions docs/Reference/Reply.md
Expand Up @@ -33,6 +33,8 @@
- [Strings](#strings)
- [Streams](#streams)
- [Buffers](#buffers)
- [ReadableStream](#send-readablestream)
- [Response](#send-response)
- [Errors](#errors)
- [Type of the final payload](#type-of-the-final-payload)
- [Async-Await and Promises](#async-await-and-promises)
Expand Down Expand Up @@ -756,6 +758,52 @@ fastify.get('/streams', function (request, reply) {
})
```

#### ReadableStream
<a id="send-readablestream"></a>

`ReadableStream` will be treated as a node stream mentioned above,
the content is considered to be pre-serialized, so they will be
sent unmodified without response validation.

```js
const fs = require('node:fs')
const { ReadableStream } = require('node:stream/web')
fastify.get('/streams', function (request, reply) {
const stream = fs.createReadStream('some-file')
reply.header('Content-Type', 'application/octet-stream')
reply.send(ReadableStream.from(stream))
})
```

#### Response
<a id="send-response"></a>

`Response` allows to manage the reply payload, status code and
headers in one place. The payload provided inside `Response` is
considered to be pre-serialized, so they will be sent unmodified
without response validation.

Plese be aware when using `Response`, the status code and headers
will not directly reflect to `reply.statusCode` and `reply.getHeaders()`.
Such behavior is based on `Response` only allow `readonly` status
code and headers. The data is not allow to be bi-direction editing,
and may confuse when checking the `payload` in `onSend` hooks.

```js
const fs = require('node:fs')
const { ReadableStream } = require('node:stream/web')
fastify.get('/streams', function (request, reply) {
const stream = fs.createReadStream('some-file')
const readableStream = ReadableStream.from(stream)
const response = new Response(readableStream, {
status: 200,
headers: { 'content-type': 'application/octet-stream' }
})
reply.send(response)
})
```


#### Errors
<a id="errors"></a>

Expand Down
4 changes: 4 additions & 0 deletions lib/errors.js
Expand Up @@ -212,6 +212,10 @@ const codes = {
500,
TypeError
),
FST_ERR_REP_RESPONSE_BODY_CONSUMED: createError(
'FST_ERR_REP_RESPONSE_BODY_CONSUMED',
'Response.body is already consumed.'
),
FST_ERR_REP_ALREADY_SENT: createError(
'FST_ERR_REP_ALREADY_SENT',
'Reply was already sent, did you forget to "return reply" in "%s" (%s)?'
Expand Down
57 changes: 55 additions & 2 deletions lib/reply.js
@@ -1,6 +1,7 @@
'use strict'

const eos = require('node:stream').finished
const Readable = require('node:stream').Readable

const {
kFourOhFourContext,
Expand Down Expand Up @@ -44,6 +45,7 @@ const CONTENT_TYPE = {
}
const {
FST_ERR_REP_INVALID_PAYLOAD_TYPE,
FST_ERR_REP_RESPONSE_BODY_CONSUMED,
FST_ERR_REP_ALREADY_SENT,
FST_ERR_REP_SENT_VALUE,
FST_ERR_SEND_INSIDE_ONERR,
Expand All @@ -55,6 +57,8 @@ const {
} = require('./errors')
const { FSTDEP010, FSTDEP013, FSTDEP019, FSTDEP020 } = require('./warnings')

const toString = Object.prototype.toString

function Reply (res, request, log) {
this.raw = res
this[kReplySerializer] = null
Expand Down Expand Up @@ -163,7 +167,14 @@ Reply.prototype.send = function (payload) {
const hasContentType = contentType !== undefined

if (payload !== null) {
if (typeof payload.pipe === 'function') {
if (
// node:stream
typeof payload.pipe === 'function' ||
// node:stream/web
typeof payload.getReader === 'function' ||
// Response
toString.call(payload) === '[object Response]'
) {
onSendHook(this, payload)
return this
}
Expand Down Expand Up @@ -570,7 +581,6 @@ function safeWriteHead (reply, statusCode) {
function onSendEnd (reply, payload) {
const res = reply.raw
const req = reply.request
const statusCode = res.statusCode

// we check if we need to update the trailers header and set it
if (reply[kReplyTrailers] !== null) {
Expand All @@ -586,6 +596,17 @@ function onSendEnd (reply, payload) {
reply.header('Trailer', header.trim())
}

// since Response contain status code, we need to update before
// any action that used statusCode
const isResponse = toString.call(payload) === '[object Response]'
if (isResponse) {
// https://developer.mozilla.org/en-US/docs/Web/API/Response/status
if (typeof payload.status === 'number') {
reply.code(payload.status)
}
}
const statusCode = res.statusCode

if (payload === undefined || payload === null) {
// according to https://datatracker.ietf.org/doc/html/rfc7230#section-3.3.2
// we cannot send a content-length for 304 and 204, and all status code
Expand Down Expand Up @@ -617,11 +638,38 @@ function onSendEnd (reply, payload) {
return
}

// node:stream
if (typeof payload.pipe === 'function') {
sendStream(payload, res, reply)
return
}

// node:stream/web
if (typeof payload.getReader === 'function') {
sendWebStream(payload, res, reply)
return
}

// Response
if (isResponse) {
// https://developer.mozilla.org/en-US/docs/Web/API/Response/headers
if (typeof payload.headers === 'object' && typeof payload.headers.forEach === 'function') {
for (const [headerName, headerValue] of payload.headers) {
reply.header(headerName, headerValue)
}
}

// https://developer.mozilla.org/en-US/docs/Web/API/Response/body
if (payload.body != null) {
if (payload.bodyUsed) {
throw new FST_ERR_REP_RESPONSE_BODY_CONSUMED()
}
// Response.body always a ReadableStream
sendWebStream(payload.body, res, reply)
}
return
}

if (typeof payload !== 'string' && !Buffer.isBuffer(payload)) {
throw new FST_ERR_REP_INVALID_PAYLOAD_TYPE(typeof payload)
}
Expand Down Expand Up @@ -654,6 +702,11 @@ function logStreamError (logger, err, res) {
}
}

function sendWebStream (payload, res, reply) {
const nodeStream = Readable.fromWeb(payload)
sendStream(nodeStream, res, reply)
}

function sendStream (payload, res, reply) {
let sourceOpen = true
let errorLogged = false
Expand Down
24 changes: 17 additions & 7 deletions test/internals/errors.test.js
Expand Up @@ -5,7 +5,7 @@ const errors = require('../../lib/errors')
const { readFileSync } = require('node:fs')
const { resolve } = require('node:path')

test('should expose 78 errors', t => {
test('should expose 79 errors', t => {
t.plan(1)
const exportedKeys = Object.keys(errors)
let counter = 0
Expand All @@ -14,11 +14,11 @@ test('should expose 78 errors', t => {
counter++
}
}
t.equal(counter, 78)
t.equal(counter, 79)
})

test('ensure name and codes of Errors are identical', t => {
t.plan(78)
t.plan(79)
const exportedKeys = Object.keys(errors)
for (const key of exportedKeys) {
if (errors[key].name === 'FastifyError') {
Expand Down Expand Up @@ -337,6 +337,16 @@ test('FST_ERR_REP_INVALID_PAYLOAD_TYPE', t => {
t.ok(error instanceof TypeError)
})

test('FST_ERR_REP_RESPONSE_BODY_CONSUMED', t => {
t.plan(5)
const error = new errors.FST_ERR_REP_RESPONSE_BODY_CONSUMED()
t.equal(error.name, 'FastifyError')
t.equal(error.code, 'FST_ERR_REP_RESPONSE_BODY_CONSUMED')
t.equal(error.message, 'Response.body is already consumed.')
t.equal(error.statusCode, 500)
t.ok(error instanceof Error)
})

test('FST_ERR_REP_ALREADY_SENT', t => {
t.plan(5)
const error = new errors.FST_ERR_REP_ALREADY_SENT('/hello', 'GET')
Expand Down Expand Up @@ -818,7 +828,7 @@ test('FST_ERR_LISTEN_OPTIONS_INVALID', t => {
})

test('Ensure that all errors are in Errors.md TOC', t => {
t.plan(78)
t.plan(79)
const errorsMd = readFileSync(resolve(__dirname, '../../docs/Reference/Errors.md'), 'utf8')

const exportedKeys = Object.keys(errors)
Expand All @@ -830,7 +840,7 @@ test('Ensure that all errors are in Errors.md TOC', t => {
})

test('Ensure that non-existing errors are not in Errors.md TOC', t => {
t.plan(78)
t.plan(79)
const errorsMd = readFileSync(resolve(__dirname, '../../docs/Reference/Errors.md'), 'utf8')

const matchRE = / {4}- \[([A-Z0-9_]+)\]\(#[a-z0-9_]+\)/g
Expand All @@ -843,7 +853,7 @@ test('Ensure that non-existing errors are not in Errors.md TOC', t => {
})

test('Ensure that all errors are in Errors.md documented', t => {
t.plan(78)
t.plan(79)
const errorsMd = readFileSync(resolve(__dirname, '../../docs/Reference/Errors.md'), 'utf8')

const exportedKeys = Object.keys(errors)
Expand All @@ -855,7 +865,7 @@ test('Ensure that all errors are in Errors.md documented', t => {
})

test('Ensure that non-existing errors are not in Errors.md documented', t => {
t.plan(78)
t.plan(79)
const errorsMd = readFileSync(resolve(__dirname, '../../docs/Reference/Errors.md'), 'utf8')

const matchRE = /<a id="[0-9a-zA-Z_]+">([0-9a-zA-Z_]+)<\/a>/g
Expand Down

0 comments on commit 101ba57

Please sign in to comment.