Skip to content
This repository was archived by the owner on Mar 13, 2025. It is now read-only.

Commit b001934

Browse files
committedMay 9, 2023
Re-implement R2 gateway using new storage system
Closes DEVX-592

File tree

8 files changed

+523
-249
lines changed

8 files changed

+523
-249
lines changed
 

‎packages/tre/src/plugins/r2/gateway.ts

+333-128
Large diffs are not rendered by default.
+38-80
Original file line numberDiff line numberDiff line change
@@ -1,108 +1,64 @@
1-
import assert from "assert";
2-
import crypto from "crypto";
3-
import { TextEncoder } from "util";
1+
import { Blob } from "buffer";
2+
import { ReadableStream, TransformStream } from "stream/web";
43
import type { R2StringChecksums } from "@cloudflare/workers-types/experimental";
54
import { R2Objects } from "./gateway";
65
import {
7-
BASE64_REGEXP,
86
HEX_REGEXP,
7+
ObjectRow,
98
R2HeadResponse,
109
R2HttpFields,
1110
R2Range,
1211
} from "./schemas";
1312

14-
const encoder = new TextEncoder();
15-
16-
export interface R2ObjectMetadata {
17-
// The object’s key.
18-
key: string;
19-
// Random unique string associated with a specific upload of a key.
20-
version: string;
21-
// Size of the object in bytes.
22-
size: number;
23-
// The etag associated with the object upload.
24-
etag: string;
25-
// The object's etag, in quotes to be returned as a header.
26-
httpEtag: string;
27-
// The time the object was uploaded.
28-
uploaded: number;
29-
// Various HTTP headers associated with the object. Refer to HTTP Metadata:
30-
// https://developers.cloudflare.com/r2/runtime-apis/#http-metadata.
31-
httpMetadata: R2HttpFields;
32-
// A map of custom, user-defined metadata associated with the object.
33-
customMetadata: Record<string, string>;
34-
// If a GET request was made with a range option, this will be added
35-
range?: R2Range;
36-
// Hashes used to check the received object’s integrity. At most one can be
37-
// specified.
38-
checksums?: R2StringChecksums;
39-
}
40-
4113
export interface EncodedMetadata {
4214
metadataSize: number;
43-
value: Uint8Array;
15+
value: ReadableStream<Uint8Array>;
4416
}
4517

46-
export function createVersion(): string {
47-
return crypto.randomBytes(16).toString("hex");
48-
}
49-
50-
/**
51-
* R2Object is created when you PUT an object into an R2 bucket.
52-
* R2Object represents the metadata of an object based on the information
53-
* provided by the uploader. Every object that you PUT into an R2 bucket
54-
* will have an R2Object created.
55-
*/
56-
export class R2Object implements R2ObjectMetadata {
18+
export class R2Object {
5719
readonly key: string;
5820
readonly version: string;
5921
readonly size: number;
6022
readonly etag: string;
61-
readonly httpEtag: string;
6223
readonly uploaded: number;
6324
readonly httpMetadata: R2HttpFields;
6425
readonly customMetadata: Record<string, string>;
6526
readonly range?: R2Range;
6627
readonly checksums: R2StringChecksums;
6728

68-
constructor(metadata: R2ObjectMetadata) {
69-
this.key = metadata.key;
70-
this.version = metadata.version;
71-
this.size = metadata.size;
72-
this.etag = metadata.etag;
73-
this.httpEtag = metadata.httpEtag;
74-
this.uploaded = metadata.uploaded;
75-
this.httpMetadata = metadata.httpMetadata;
76-
this.customMetadata = metadata.customMetadata;
77-
this.range = metadata.range;
29+
constructor(row: Omit<ObjectRow, "blob_id">, range?: R2Range) {
30+
this.key = row.key;
31+
this.version = row.version;
32+
this.size = row.size;
33+
this.etag = row.etag;
34+
this.uploaded = row.uploaded;
35+
this.httpMetadata = JSON.parse(row.http_metadata);
36+
this.customMetadata = JSON.parse(row.custom_metadata);
37+
this.range = range;
7838

7939
// For non-multipart uploads, we always need to store an MD5 hash in
80-
// `checksums`, but never explicitly stored one. Luckily, `R2Bucket#put()`
81-
// always makes `etag` an MD5 hash.
82-
const checksums: R2StringChecksums = { ...metadata.checksums };
83-
const etag = metadata.etag;
84-
if (etag.length === 32 && HEX_REGEXP.test(etag)) {
85-
checksums.md5 = metadata.etag;
86-
} else if (etag.length === 24 && BASE64_REGEXP.test(etag)) {
87-
// TODO: remove this when we switch underlying storage mechanisms
88-
// Previous versions of Miniflare 3 base64 encoded `etag` instead
89-
checksums.md5 = Buffer.from(etag, "base64").toString("hex");
90-
} else {
91-
assert.fail("Expected `etag` to be an MD5 hash");
40+
// `checksums`. To avoid data duplication, we just use `etag` for this.
41+
const checksums: R2StringChecksums = JSON.parse(row.checksums);
42+
if (this.etag.length === 32 && HEX_REGEXP.test(this.etag)) {
43+
checksums.md5 = row.etag;
9244
}
9345
this.checksums = checksums;
9446
}
9547

9648
// Format for return to the Workers Runtime
9749
#rawProperties(): R2HeadResponse {
9850
return {
99-
...this,
10051
name: this.key,
52+
version: this.version,
53+
size: this.size,
54+
etag: this.etag,
55+
uploaded: this.uploaded,
10156
httpFields: this.httpMetadata,
10257
customFields: Object.entries(this.customMetadata).map(([k, v]) => ({
10358
k,
10459
v,
10560
})),
61+
range: this.range,
10662
checksums: {
10763
0: this.checksums.md5,
10864
1: this.checksums.sha1,
@@ -115,36 +71,38 @@ export class R2Object implements R2ObjectMetadata {
11571

11672
encode(): EncodedMetadata {
11773
const json = JSON.stringify(this.#rawProperties());
118-
const bytes = encoder.encode(json);
119-
return { metadataSize: bytes.length, value: bytes };
74+
const blob = new Blob([json]);
75+
return { metadataSize: blob.size, value: blob.stream() };
12076
}
12177

12278
static encodeMultiple(objects: R2Objects): EncodedMetadata {
12379
const json = JSON.stringify({
12480
...objects,
12581
objects: objects.objects.map((o) => o.#rawProperties()),
12682
});
127-
const bytes = encoder.encode(json);
128-
return { metadataSize: bytes.length, value: bytes };
83+
const blob = new Blob([json]);
84+
return { metadataSize: blob.size, value: blob.stream() };
12985
}
13086
}
13187

13288
export class R2ObjectBody extends R2Object {
133-
readonly body: Uint8Array;
134-
135-
constructor(metadata: R2ObjectMetadata, body: Uint8Array) {
136-
super(metadata);
137-
this.body = body;
89+
constructor(
90+
metadata: Omit<ObjectRow, "blob_id">,
91+
readonly body: ReadableStream<Uint8Array>,
92+
range?: R2Range
93+
) {
94+
super(metadata, range);
13895
}
13996

14097
encode(): EncodedMetadata {
14198
const { metadataSize, value: metadata } = super.encode();
142-
const merged = new Uint8Array(metadataSize + this.body.length);
143-
merged.set(metadata);
144-
merged.set(this.body, metadataSize);
99+
const identity = new TransformStream();
100+
void metadata
101+
.pipeTo(identity.writable, { preventClose: true })
102+
.then(() => this.body.pipeTo(identity.writable));
145103
return {
146104
metadataSize: metadataSize,
147-
value: merged,
105+
value: identity.readable,
148106
};
149107
}
150108
}

‎packages/tre/src/plugins/r2/router.ts

+51-16
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
1-
import { TextDecoder } from "util";
1+
import assert from "assert";
2+
import { ReadableStream, TransformStream } from "stream/web";
23
import { Request, Response } from "../../http";
34
import {
45
CfHeader,
@@ -13,23 +14,46 @@ import { R2Gateway, R2Objects } from "./gateway";
1314
import { EncodedMetadata, R2Object, R2ObjectBody } from "./r2Object";
1415
import { R2BindingRequestSchema } from "./schemas";
1516

16-
const decoder = new TextDecoder();
17-
1817
async function decodeMetadata(req: Request) {
19-
const bytes = await req.arrayBuffer();
20-
2118
const metadataSize = Number(req.headers.get(CfHeader.MetadataSize));
22-
if (Number.isNaN(metadataSize)) {
23-
throw new InvalidMetadata();
19+
if (Number.isNaN(metadataSize)) throw new InvalidMetadata();
20+
21+
assert(req.body !== null);
22+
const body = req.body as ReadableStream<Uint8Array>;
23+
24+
// Read just metadata from body stream
25+
const chunks: Uint8Array[] = [];
26+
let chunksLength = 0;
27+
for await (const chunk of body.values({ preventCancel: true })) {
28+
chunks.push(chunk);
29+
chunksLength += chunk.byteLength;
30+
// Once we've read enough bytes, stop
31+
if (chunksLength >= metadataSize) break;
2432
}
33+
// If we read the entire stream without enough bytes for metadata, throw
34+
if (chunksLength < metadataSize) throw new InvalidMetadata();
35+
const atLeastMetadata = Buffer.concat(chunks, chunksLength);
36+
const metadataJson = atLeastMetadata.subarray(0, metadataSize).toString();
37+
const metadata = R2BindingRequestSchema.parse(JSON.parse(metadataJson));
2538

26-
const [metadataBytes, value] = [
27-
bytes.slice(0, metadataSize),
28-
bytes.slice(metadataSize),
29-
];
30-
const metadataText = decoder.decode(metadataBytes);
31-
const metadata = R2BindingRequestSchema.parse(JSON.parse(metadataText));
32-
return { metadata, value: new Uint8Array(value) };
39+
let value = body;
40+
// If we read some value when reading metadata (quite likely), create a new
41+
// stream, write the bit we read, then write the rest of the body stream
42+
if (chunksLength > metadataSize) {
43+
const identity = new TransformStream();
44+
const writer = identity.writable.getWriter();
45+
// The promise returned by `writer.write()` will only resolve once the chunk
46+
// is read, which won't be until after this function returns, so we can't
47+
// use `await` here
48+
void writer.write(atLeastMetadata.subarray(metadataSize)).then(() => {
49+
// Release the writer without closing the stream
50+
writer.releaseLock();
51+
return body.pipeTo(identity.writable);
52+
});
53+
value = identity.readable;
54+
}
55+
56+
return { metadata, metadataSize, value };
3357
}
3458
function decodeHeaderMetadata(req: Request) {
3559
const header = req.headers.get(CfHeader.Request);
@@ -80,7 +104,7 @@ export class R2Router extends Router<R2Gateway> {
80104

81105
@PUT("/:bucket")
82106
put: RouteHandler<R2Params> = async (req, params) => {
83-
const { metadata, value } = await decodeMetadata(req);
107+
const { metadata, metadataSize, value } = await decodeMetadata(req);
84108
const persist = decodePersist(req.headers);
85109
const gateway = this.gatewayFactory.get(params.bucket, persist);
86110

@@ -90,7 +114,18 @@ export class R2Router extends Router<R2Gateway> {
90114
);
91115
return new Response();
92116
} else if (metadata.method === "put") {
93-
const result = await gateway.put(metadata.object, value, metadata);
117+
const contentLength = Number(req.headers.get("Content-Length"));
118+
// `workerd` requires a known value size for R2 put requests:
119+
// - https://github.com/cloudflare/workerd/blob/e3479895a2ace28e4fd5f1399cea4c92291966ab/src/workerd/api/r2-rpc.c%2B%2B#L154-L156
120+
// - https://github.com/cloudflare/workerd/blob/e3479895a2ace28e4fd5f1399cea4c92291966ab/src/workerd/api/r2-rpc.c%2B%2B#L188-L189
121+
assert(!isNaN(contentLength));
122+
const valueSize = contentLength - metadataSize;
123+
const result = await gateway.put(
124+
metadata.object,
125+
value,
126+
valueSize,
127+
metadata
128+
);
94129
return encodeResult(result);
95130
} else {
96131
throw new InternalError(); // Unknown method: should never be reached

‎packages/tre/src/plugins/r2/schemas.ts

+30
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,30 @@
11
import { z } from "zod";
22

3+
export interface ObjectRow {
4+
key: string;
5+
blob_id: string | null; // null if multipart
6+
version: string;
7+
size: number; // total size of object (all parts) in bytes
8+
etag: string; // hex MD5 hash if not multipart
9+
uploaded: number; // milliseconds since unix epoch
10+
checksums: string; // JSON-serialised `R2StringChecksums` (workers-types)
11+
http_metadata: string; // JSON-serialised `R2HTTPMetadata` (workers-types)
12+
custom_metadata: string; // JSON-serialised user-defined metadata
13+
}
14+
export const SQL_SCHEMA = `
15+
CREATE TABLE IF NOT EXISTS _mf_objects (
16+
key TEXT PRIMARY KEY,
17+
blob_id TEXT,
18+
version TEXT NOT NULL,
19+
size INTEGER NOT NULL,
20+
etag TEXT NOT NULL,
21+
uploaded INTEGER NOT NULL,
22+
checksums TEXT NOT NULL,
23+
http_metadata TEXT NOT NULL,
24+
custom_metadata TEXT NOT NULL
25+
);
26+
`;
27+
328
// https://github.com/cloudflare/workerd/blob/4290f9717bc94647d9c8afd29602cdac97fdff1b/src/workerd/api/r2-api.capnp
429

530
export const HEX_REGEXP = /^[0-9a-f]*$/i;
@@ -193,6 +218,11 @@ export const R2BindingRequestSchema = z.union([
193218
R2DeleteRequestSchema,
194219
]);
195220

221+
export type OmitRequest<T> = Omit<T, "method" | "object">;
222+
export type R2GetOptions = OmitRequest<z.infer<typeof R2GetRequestSchema>>;
223+
export type R2PutOptions = OmitRequest<z.infer<typeof R2PutRequestSchema>>;
224+
export type R2ListOptions = OmitRequest<z.infer<typeof R2ListRequestSchema>>;
225+
196226
export interface R2ErrorResponse {
197227
version: number;
198228
v4code: number;

‎packages/tre/src/plugins/r2/validator.ts

+51-10
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,18 @@
1-
import crypto from "crypto";
1+
import assert from "assert";
22
import { R2StringChecksums } from "@cloudflare/workers-types/experimental";
3+
import { InclusiveRange } from "../../storage2";
4+
import { _parseRanges } from "../shared";
35
import {
46
BadDigest,
57
EntityTooLarge,
68
InvalidMaxKeys,
79
InvalidObjectName,
10+
InvalidRange,
811
MetadataTooLarge,
912
PreconditionFailed,
1013
} from "./errors";
11-
import { R2Object, R2ObjectMetadata } from "./r2Object";
12-
import { R2Conditional } from "./schemas";
14+
import { R2Object } from "./r2Object";
15+
import { R2Conditional, R2GetOptions } from "./schemas";
1316

1417
export const MAX_LIST_KEYS = 1_000;
1518
const MAX_KEY_SIZE = 1024;
@@ -28,7 +31,7 @@ function truncateToSeconds(ms: number) {
2831
/** @internal */
2932
export function _testR2Conditional(
3033
cond: R2Conditional,
31-
metadata?: Pick<R2ObjectMetadata, "etag" | "uploaded">
34+
metadata?: Pick<R2Object, "etag" | "uploaded">
3235
): boolean {
3336
// Adapted from internal R2 gateway implementation.
3437
// See also https://datatracker.ietf.org/doc/html/rfc7232#section-6.
@@ -79,12 +82,14 @@ function serialisedLength(x: string) {
7982
}
8083

8184
export class Validator {
82-
hash(value: Uint8Array, hashes: R2Hashes): R2StringChecksums {
85+
hash(digests: Map<string, Buffer>, hashes: R2Hashes): R2StringChecksums {
8386
const checksums: R2StringChecksums = {};
8487
for (const { name, field } of R2_HASH_ALGORITHMS) {
8588
const providedHash = hashes[field];
8689
if (providedHash !== undefined) {
87-
const computedHash = crypto.createHash(field).update(value).digest();
90+
const computedHash = digests.get(field);
91+
// Should've computed all required digests
92+
assert(computedHash !== undefined);
8893
if (!providedHash.equals(computedHash)) {
8994
throw new BadDigest(name, providedHash, computedHash);
9095
}
@@ -96,17 +101,53 @@ export class Validator {
96101
return checksums;
97102
}
98103

99-
condition(meta?: R2Object, onlyIf?: R2Conditional): Validator {
104+
condition(
105+
meta?: R2Object | Pick<R2Object, "etag" | "uploaded">,
106+
onlyIf?: R2Conditional
107+
): Validator {
100108
if (onlyIf !== undefined && !_testR2Conditional(onlyIf, meta)) {
101109
let error = new PreconditionFailed();
102-
if (meta !== undefined) error = error.attach(meta);
110+
if (meta instanceof R2Object) error = error.attach(meta);
103111
throw error;
104112
}
105113
return this;
106114
}
107115

108-
size(value: Uint8Array): Validator {
109-
if (value.byteLength > MAX_VALUE_SIZE) {
116+
range(
117+
options: Pick<R2GetOptions, "rangeHeader" | "range">,
118+
size: number
119+
): InclusiveRange | undefined {
120+
if (options.rangeHeader !== undefined) {
121+
const ranges = _parseRanges(options.rangeHeader, size);
122+
// If the header contained a single range, use it. Otherwise, if the
123+
// header was invalid, or contained multiple ranges, just return the full
124+
// response.
125+
if (ranges?.length === 1) {
126+
const [start, end] = ranges[0];
127+
return { start, end };
128+
}
129+
} else if (options.range !== undefined) {
130+
let { offset, length, suffix } = options.range;
131+
// Eliminate suffix is specified
132+
if (suffix !== undefined) {
133+
if (suffix <= 0) throw new InvalidRange();
134+
if (suffix > size) suffix = size;
135+
offset = size - suffix;
136+
length = suffix;
137+
}
138+
// Validate offset and length
139+
if (offset === undefined) offset = 0;
140+
if (length === undefined) length = size - offset;
141+
if (offset < 0 || offset > size || length <= 0) throw new InvalidRange();
142+
// Clamp length to maximum
143+
if (offset + length > size) length = size - offset;
144+
// Convert to inclusive range
145+
return { start: offset, end: offset + length - 1 }; // TODO: check -1
146+
}
147+
}
148+
149+
size(size: number): Validator {
150+
if (size > MAX_VALUE_SIZE) {
110151
throw new EntityTooLarge();
111152
}
112153
return this;

‎packages/tre/src/storage2/sql.ts

+5
Original file line numberDiff line numberDiff line change
@@ -19,3 +19,8 @@ export type TypedDatabase = Omit<Database, "prepare"> & {
1919
? TypedStatement<Params, SingleResult>
2020
: TypedStatement<[Params], SingleResult>;
2121
};
22+
23+
export function escapeLike(prefix: string) {
24+
// Prefix all instances of `\`, `_` and `%` with `\`
25+
return prefix.replace(/[\\_%]/g, "\\$&");
26+
}

‎packages/tre/test/plugins/r2/index.spec.ts

+13-13
Original file line numberDiff line numberDiff line change
@@ -36,12 +36,7 @@ import {
3636
} from "@miniflare/tre";
3737
import { Macro, ThrowsExpectation } from "ava";
3838
import { z } from "zod";
39-
import {
40-
MiniflareTestContext,
41-
miniflareTest,
42-
useTmp,
43-
utf8Decode,
44-
} from "../../test-shared";
39+
import { MiniflareTestContext, miniflareTest, useTmp } from "../../test-shared";
4540
import { isWithin } from "../../test-shared/asserts";
4641

4742
const WITHIN_EPSILON = 10_000;
@@ -967,9 +962,7 @@ test(
967962
{
968963
keys: ["key1", "key2", "key3", "key4"],
969964
options: { startAfter: "key1", limit: 2 },
970-
// TODO(soon): this should be `[["key2", "key3"], ["key4"]]`, see comment in
971-
// `gateway.ts` for details, we'll fix this with the new storage system
972-
pages: [["key2"], ["key3", "key4"]],
965+
pages: [["key2", "key3"], ["key4"]],
973966
}
974967
);
975968
test(
@@ -1204,16 +1197,20 @@ test.serial("operations persist stored data", async (t) => {
12041197

12051198
// Create new temporary file-system persistence directory
12061199
const tmp = await useTmp(t);
1207-
const storage = new FileStorage(path.join(tmp, "bucket"));
1200+
const legacyStorage = new FileStorage(path.join(tmp, "bucket"));
1201+
const newStorage = legacyStorage.getNewStorage();
12081202

12091203
// Set option, then reset after test
12101204
await t.context.setOptions({ ...opts, r2Persist: tmp });
12111205
t.teardown(() => t.context.setOptions(opts));
12121206

12131207
// Check put respects persist
12141208
await r2.put("key", "value");
1215-
const stored = await storage.get(`${ns}key`);
1216-
t.is(utf8Decode(stored?.value), "value");
1209+
const stmtListByNs = newStorage.db.prepare<{ ns: string }, { key: string }>(
1210+
"SELECT key FROM _mf_objects WHERE key LIKE :ns || '%'"
1211+
);
1212+
let stored = stmtListByNs.all({ ns });
1213+
t.deepEqual(stored, [{ key: `${ns}key` }]);
12171214

12181215
// Check head respects persist
12191216
const object = await r2.head("key");
@@ -1230,5 +1227,8 @@ test.serial("operations persist stored data", async (t) => {
12301227

12311228
// Check delete respects persist
12321229
await r2.delete("key");
1233-
t.false(await storage.has(`${ns}key`));
1230+
stored = stmtListByNs.all({ ns });
1231+
t.deepEqual(stored, []);
12341232
});
1233+
1234+
// TODO: add tests for empty key

‎packages/tre/test/plugins/r2/validator.spec.ts

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { R2Conditional } from "@cloudflare/workers-types/experimental";
2-
import { R2ObjectMetadata, _testR2Conditional } from "@miniflare/tre";
2+
import { R2Object, _testR2Conditional } from "@miniflare/tre";
33
import test from "ava";
44

55
test("testR2Conditional: matches various conditions", (t) => {
@@ -11,7 +11,7 @@ test("testR2Conditional: matches various conditions", (t) => {
1111
const pastDate = new Date(uploadedDate.getTime() - 30_000);
1212
const futureDate = new Date(uploadedDate.getTime() + 30_000);
1313

14-
const metadata: Pick<R2ObjectMetadata, "etag" | "uploaded"> = {
14+
const metadata: Pick<R2Object, "etag" | "uploaded"> = {
1515
etag,
1616
uploaded: uploadedDate.getTime(),
1717
};

0 commit comments

Comments
 (0)
This repository has been archived.