Skip to content
This repository was archived by the owner on Mar 13, 2025. It is now read-only.

Commit 4ada92b

Browse files
committedMay 9, 2023

28 files changed

+1840
-245
lines changed
 

‎packages/tre/src/index.ts

+18-5
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ import {
3333
HEADER_CF_BLOB,
3434
HEADER_PROBE,
3535
PLUGIN_ENTRIES,
36+
Persistence,
3637
Plugins,
3738
SERVICE_ENTRY,
3839
SOCKET_ENTRY,
@@ -63,17 +64,18 @@ import {
6364
serializeConfig,
6465
} from "./runtime";
6566
import {
66-
Clock,
6767
HttpError,
6868
Log,
6969
MiniflareCoreError,
7070
Mutex,
7171
NoOpLog,
7272
OptionalZodTypeOf,
73-
defaultClock,
73+
Timers,
74+
defaultTimers,
7475
formatResponse,
7576
} from "./shared";
7677
import { anyAbortSignal } from "./shared/signal";
78+
import { NewStorage } from "./storage2";
7779
import { waitForRequest } from "./wait";
7880

7981
// ===== `Miniflare` User Options =====
@@ -262,7 +264,7 @@ export class Miniflare {
262264
#sharedOpts: PluginSharedOptions;
263265
#workerOpts: PluginWorkerOptions[];
264266
#log: Log;
265-
readonly #clock: Clock;
267+
readonly #timers: Timers;
266268

267269
#runtime?: Runtime;
268270
#removeRuntimeExitHook?: () => void;
@@ -314,7 +316,7 @@ export class Miniflare {
314316
this.#sharedOpts = sharedOpts;
315317
this.#workerOpts = workerOpts;
316318
this.#log = this.#sharedOpts.core.log ?? new NoOpLog();
317-
this.#clock = this.#sharedOpts.core.clock ?? defaultClock;
319+
this.#timers = this.#sharedOpts.core.timers ?? defaultTimers;
318320
this.#initPlugins();
319321

320322
this.#liveReloadServer = new WebSocketServer({ noServer: true });
@@ -360,7 +362,7 @@ export class Miniflare {
360362
if (plugin.gateway !== undefined && plugin.router !== undefined) {
361363
const gatewayFactory = new GatewayFactory<any>(
362364
this.#log,
363-
this.#clock,
365+
this.#timers,
364366
this.#sharedOpts.core.cloudflareFetch,
365367
key,
366368
plugin.gateway,
@@ -852,6 +854,17 @@ export class Miniflare {
852854
return response;
853855
}
854856

857+
/** @internal */
858+
_getPluginStorage(
859+
plugin: keyof Plugins,
860+
namespace: string,
861+
persist?: Persistence
862+
): NewStorage {
863+
const factory = this.#gatewayFactories[plugin];
864+
assert(factory !== undefined);
865+
return factory.getStorage(namespace, persist).getNewStorage();
866+
}
867+
855868
async dispose(): Promise<void> {
856869
this.#disposeController.abort();
857870
try {

‎packages/tre/src/plugins/cache/gateway.ts

+11-7
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import http from "http";
44
import { ReadableStream, TransformStream } from "stream/web";
55
import CachePolicy from "http-cache-semantics";
66
import { Headers, HeadersInit, Request, Response, fetch } from "../../http";
7-
import { Clock, DeferredPromise, Log } from "../../shared";
7+
import { DeferredPromise, Log, Timers } from "../../shared";
88
import { Storage } from "../../storage";
99
import {
1010
InclusiveRange,
@@ -26,7 +26,7 @@ interface CacheMetadata {
2626
size: number;
2727
}
2828

29-
function getExpiration(clock: Clock, req: Request, res: Response) {
29+
function getExpiration(timers: Timers, req: Request, res: Response) {
3030
// Cloudflare ignores request Cache-Control
3131
const reqHeaders = normaliseHeaders(req.headers);
3232
delete reqHeaders["cache-control"];
@@ -59,7 +59,7 @@ function getExpiration(clock: Clock, req: Request, res: Response) {
5959
// @ts-expect-error `now` isn't included in CachePolicy's type definitions
6060
const originalNow = CachePolicy.prototype.now;
6161
// @ts-expect-error `now` isn't included in CachePolicy's type definitions
62-
CachePolicy.prototype.now = clock;
62+
CachePolicy.prototype.now = timers.now;
6363
try {
6464
const policy = new CachePolicy(cacheReq, cacheRes, { shared: true });
6565

@@ -230,9 +230,13 @@ class SizingStream extends TransformStream<Uint8Array, Uint8Array> {
230230
export class CacheGateway {
231231
private readonly storage: KeyValueStorage<CacheMetadata>;
232232

233-
constructor(log: Log, legacyStorage: Storage, private readonly clock: Clock) {
233+
constructor(
234+
private readonly log: Log,
235+
legacyStorage: Storage,
236+
private readonly timers: Timers
237+
) {
234238
const storage = legacyStorage.getNewStorage();
235-
this.storage = new KeyValueStorage(storage, clock);
239+
this.storage = new KeyValueStorage(storage, timers);
236240
}
237241

238242
async match(request: Request, cacheKey?: string): Promise<Response> {
@@ -291,7 +295,7 @@ export class CacheGateway {
291295
assert(body !== null);
292296

293297
const { storable, expiration, headers } = getExpiration(
294-
this.clock,
298+
this.timers,
295299
request,
296300
response
297301
);
@@ -321,7 +325,7 @@ export class CacheGateway {
321325
await this.storage.put({
322326
key: cacheKey,
323327
value: body,
324-
expiration: this.clock() + expiration,
328+
expiration: this.timers.now() + expiration,
325329
metadata,
326330
});
327331
return new Response(null, { status: 204 });

‎packages/tre/src/plugins/core/index.ts

+2-1
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import {
1616
Log,
1717
LogLevel,
1818
MiniflareCoreError,
19+
Timers,
1920
} from "../../shared";
2021
import { getCacheServiceName } from "../cache";
2122
import { DURABLE_OBJECTS_STORAGE_SERVICE_NAME } from "../do";
@@ -68,7 +69,7 @@ export const CoreSharedOptionsSchema = z.object({
6869
verbose: z.boolean().optional(),
6970

7071
log: z.instanceof(Log).optional(),
71-
clock: z.function().returns(z.number()).optional(),
72+
timers: z.custom<Timers>().optional(),
7273
cloudflareFetch: CloudflareFetchSchema.optional(),
7374

7475
// TODO: add back validation of cf object

‎packages/tre/src/plugins/do/gateway.ts

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
1-
import { Clock, Log } from "../../shared";
1+
import { Log, Timers } from "../../shared";
22
import { Storage } from "../../storage";
33

44
export class DurableObjectsStorageGateway {
55
constructor(
66
private readonly log: Log,
77
private readonly storage: Storage,
8-
private readonly clock: Clock
8+
private readonly timers: Timers
99
) {}
1010

1111
async get(_key: string) {

‎packages/tre/src/plugins/kv/gateway.ts

+4-4
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
import { ReadableStream, TransformStream } from "stream/web";
22
import {
3-
Clock,
43
HttpError,
54
Log,
5+
Timers,
66
maybeApply,
77
millisToSeconds,
88
secondsToMillis,
@@ -159,10 +159,10 @@ export class KVGateway {
159159
constructor(
160160
private readonly log: Log,
161161
legacyStorage: Storage,
162-
private readonly clock: Clock
162+
private readonly timers: Timers
163163
) {
164164
const storage = legacyStorage.getNewStorage();
165-
this.storage = new KeyValueStorage(storage, clock);
165+
this.storage = new KeyValueStorage(storage, timers);
166166
}
167167

168168
async get<Metadata = unknown>(
@@ -187,7 +187,7 @@ export class KVGateway {
187187
validateKey(key);
188188

189189
// Normalise and validate expiration
190-
const now = millisToSeconds(this.clock());
190+
const now = millisToSeconds(this.timers.now());
191191
let expiration = normaliseInt(options.expiration);
192192
const expirationTtl = normaliseInt(options.expirationTtl);
193193
if (expirationTtl !== undefined) {

‎packages/tre/src/plugins/kv/remote.ts

+15-11
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ import assert from "assert";
22
import { Blob } from "buffer";
33
import { z } from "zod";
44
import { BodyInit, FormData, Response } from "../../http";
5-
import { Clock, millisToSeconds } from "../../shared";
5+
import { Timers, millisToSeconds } from "../../shared";
66
import {
77
RemoteStorage,
88
StorageListOptions,
@@ -78,16 +78,17 @@ const DEFAULT_CACHE_TTL = 60;
7878
// Returns seconds since UNIX epoch key should expire, using the specified
7979
// expiration only if it is sooner than the cache TTL
8080
function getCacheExpiration(
81-
clock: Clock,
81+
timers: Timers,
8282
expiration?: number,
8383
cacheTtl = DEFAULT_CACHE_TTL
8484
): number {
8585
// Return minimum expiration
86-
const cacheExpiration = millisToSeconds(clock()) + cacheTtl;
86+
const cacheExpiration = millisToSeconds(timers.now()) + cacheTtl;
8787
if (expiration === undefined || isNaN(expiration)) return cacheExpiration;
8888
else return Math.min(cacheExpiration, expiration);
8989
}
9090

91+
// TODO(soon): remove all this
9192
export class KVRemoteStorage extends RemoteStorage {
9293
async get(
9394
key: string,
@@ -100,7 +101,7 @@ export class KVRemoteStorage extends RemoteStorage {
100101
// cacheTtl may have changed between the original get call that cached
101102
// this value and now, so check the cache is still fresh with the new TTL
102103
const newExpiration = cachedValue.metadata.storedAt + cacheTtl;
103-
if (newExpiration >= millisToSeconds(this.clock())) {
104+
if (newExpiration >= millisToSeconds(this.timers.now())) {
104105
// If the cache is still fresh, update the expiration and return
105106
await this.cache.put<RemoteCacheMetadata>(key, {
106107
value: cachedValue.value,
@@ -157,9 +158,9 @@ export class KVRemoteStorage extends RemoteStorage {
157158
const result: StoredValueMeta = { value, expiration, metadata };
158159
await this.cache.put<RemoteCacheMetadata>(key, {
159160
value: result.value,
160-
expiration: getCacheExpiration(this.clock, expiration, cacheTtl),
161+
expiration: getCacheExpiration(this.timers, expiration, cacheTtl),
161162
metadata: {
162-
storedAt: millisToSeconds(this.clock()),
163+
storedAt: millisToSeconds(this.timers.now()),
163164
actualExpiration: result.expiration,
164165
actualMetadata: result.metadata,
165166
},
@@ -176,7 +177,7 @@ export class KVRemoteStorage extends RemoteStorage {
176177
if (value.expiration !== undefined) {
177178
// Send expiration as TTL to avoid "expiration times must be at least 60s
178179
// in the future" issues from clock skew when setting `expirationTtl: 60`.
179-
const desiredTtl = value.expiration - millisToSeconds(this.clock());
180+
const desiredTtl = value.expiration - millisToSeconds(this.timers.now());
180181
const ttl = Math.max(desiredTtl, 60);
181182
searchParams.set("expiration_ttl", ttl.toString());
182183
}
@@ -197,9 +198,9 @@ export class KVRemoteStorage extends RemoteStorage {
197198
// Store this value in the cache
198199
await this.cache.put<RemoteCacheMetadata>(key, {
199200
value: value.value,
200-
expiration: getCacheExpiration(this.clock, value.expiration),
201+
expiration: getCacheExpiration(this.timers, value.expiration),
201202
metadata: {
202-
storedAt: millisToSeconds(this.clock()),
203+
storedAt: millisToSeconds(this.timers.now()),
203204
actualExpiration: value.expiration,
204205
actualMetadata: value.metadata,
205206
},
@@ -219,8 +220,11 @@ export class KVRemoteStorage extends RemoteStorage {
219220
// "Store" delete in cache as tombstone
220221
await this.cache.put<RemoteCacheMetadata>(key, {
221222
value: new Uint8Array(),
222-
expiration: getCacheExpiration(this.clock),
223-
metadata: { storedAt: millisToSeconds(this.clock()), tombstone: true },
223+
expiration: getCacheExpiration(this.timers),
224+
metadata: {
225+
storedAt: millisToSeconds(this.timers.now()),
226+
tombstone: true,
227+
},
224228
});
225229

226230
// Technically, it's incorrect to always say we deleted the key by returning

‎packages/tre/src/plugins/r2/errors.ts

+44
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,17 @@ enum CfCode {
1414
InternalError = 10001,
1515
NoSuchObjectKey = 10007,
1616
EntityTooLarge = 100100,
17+
EntityTooSmall = 10011,
1718
MetadataTooLarge = 10012,
1819
InvalidObjectName = 10020,
1920
InvalidMaxKeys = 10022,
21+
NoSuchUpload = 10024,
22+
InvalidPart = 10025,
2023
InvalidArgument = 10029,
2124
PreconditionFailed = 10031,
2225
BadDigest = 10037,
2326
InvalidRange = 10039,
27+
BadUpload = 10048,
2428
}
2529

2630
export class R2Error extends HttpError {
@@ -110,6 +114,16 @@ export class EntityTooLarge extends R2Error {
110114
}
111115
}
112116

117+
export class EntityTooSmall extends R2Error {
118+
constructor() {
119+
super(
120+
Status.BadRequest,
121+
"Your proposed upload is smaller than the minimum allowed object size.",
122+
CfCode.EntityTooSmall
123+
);
124+
}
125+
}
126+
113127
export class MetadataTooLarge extends R2Error {
114128
constructor() {
115129
super(
@@ -160,6 +174,26 @@ export class InvalidMaxKeys extends R2Error {
160174
}
161175
}
162176

177+
export class NoSuchUpload extends R2Error {
178+
constructor() {
179+
super(
180+
Status.BadRequest,
181+
"The specified multipart upload does not exist.",
182+
CfCode.NoSuchUpload
183+
);
184+
}
185+
}
186+
187+
export class InvalidPart extends R2Error {
188+
constructor() {
189+
super(
190+
Status.BadRequest,
191+
"One or more of the specified parts could not be found.",
192+
CfCode.InvalidPart
193+
);
194+
}
195+
}
196+
163197
export class PreconditionFailed extends R2Error {
164198
constructor() {
165199
super(
@@ -179,3 +213,13 @@ export class InvalidRange extends R2Error {
179213
);
180214
}
181215
}
216+
217+
export class BadUpload extends R2Error {
218+
constructor() {
219+
super(
220+
Status.RangeNotSatisfiable,
221+
"There was a problem with the multipart upload.",
222+
CfCode.BadUpload
223+
);
224+
}
225+
}

‎packages/tre/src/plugins/r2/gateway.ts

+607-44
Large diffs are not rendered by default.

‎packages/tre/src/plugins/r2/r2Object.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ export class R2ObjectBody extends R2Object {
9696

9797
encode(): EncodedMetadata {
9898
const { metadataSize, value: metadata } = super.encode();
99-
const identity = new TransformStream();
99+
const identity = new TransformStream<Uint8Array, Uint8Array>();
100100
void metadata
101101
.pipeTo(identity.writable, { preventClose: true })
102102
.then(() => this.body.pipeTo(identity.writable));

‎packages/tre/src/plugins/r2/router.ts

+47-2
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,13 @@ async function decodeMetadata(req: Request) {
2121
assert(req.body !== null);
2222
const body = req.body as ReadableStream<Uint8Array>;
2323

24-
// Read just metadata from body stream
24+
// Read just metadata from body stream (NOTE: we can't use a `TransformStream`
25+
// and buffer the first N chunks as we need this metadata to determine what
26+
// to do with the rest of the body. We have to *pull* the data as opposed to
27+
// passively transforming it as it's piped somewhere else. If `body` were
28+
// a byte stream, we could use BYOB reads to read just enough. Even better, if
29+
// this were running in the Workers runtime, we could use `readAtLeast()` to
30+
// read everything at once.)
2531
const chunks: Uint8Array[] = [];
2632
let chunksLength = 0;
2733
for await (const chunk of body.values({ preventCancel: true })) {
@@ -40,7 +46,7 @@ async function decodeMetadata(req: Request) {
4046
// If we read some value when reading metadata (quite likely), create a new
4147
// stream, write the bit we read, then write the rest of the body stream
4248
if (chunksLength > metadataSize) {
43-
const identity = new TransformStream();
49+
const identity = new TransformStream<Uint8Array, Uint8Array>();
4450
const writer = identity.writable.getWriter();
4551
// The promise returned by `writer.write()` will only resolve once the chunk
4652
// is read, which won't be until after this function returns, so we can't
@@ -77,6 +83,16 @@ function encodeResult(result: R2Object | R2ObjectBody | R2Objects) {
7783
});
7884
}
7985

86+
function encodeJSONResult(result: unknown) {
87+
const encoded = JSON.stringify(result);
88+
return new Response(encoded, {
89+
headers: {
90+
[CfHeader.MetadataSize]: `${Buffer.byteLength(encoded)}`,
91+
"Content-Type": "application/json",
92+
},
93+
});
94+
}
95+
8096
export interface R2Params {
8197
bucket: string;
8298
}
@@ -127,6 +143,35 @@ export class R2Router extends Router<R2Gateway> {
127143
metadata
128144
);
129145
return encodeResult(result);
146+
} else if (metadata.method === "createMultipartUpload") {
147+
const result = await gateway.createMultipartUpload(
148+
metadata.object,
149+
metadata
150+
);
151+
return encodeJSONResult(result);
152+
} else if (metadata.method === "uploadPart") {
153+
const contentLength = Number(req.headers.get("Content-Length"));
154+
// `workerd` requires a known value size for R2 put requests as above
155+
assert(!isNaN(contentLength));
156+
const valueSize = contentLength - metadataSize;
157+
const result = await gateway.uploadPart(
158+
metadata.object,
159+
metadata.uploadId,
160+
metadata.partNumber,
161+
value,
162+
valueSize
163+
);
164+
return encodeJSONResult(result);
165+
} else if (metadata.method === "completeMultipartUpload") {
166+
const result = await gateway.completeMultipartUpload(
167+
metadata.object,
168+
metadata.uploadId,
169+
metadata.parts
170+
);
171+
return encodeResult(result);
172+
} else if (metadata.method === "abortMultipartUpload") {
173+
await gateway.abortMultipartUpload(metadata.object, metadata.uploadId);
174+
return new Response();
130175
} else {
131176
throw new InternalError(); // Unknown method: should never be reached
132177
}

‎packages/tre/src/plugins/r2/schemas.ts

+62-7
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import { z } from "zod";
2+
import { ValueOf } from "../../shared";
23

34
export interface ObjectRow {
45
key: string;
@@ -11,6 +12,30 @@ export interface ObjectRow {
1112
http_metadata: string; // JSON-serialised `R2HTTPMetadata` (workers-types)
1213
custom_metadata: string; // JSON-serialised user-defined metadata
1314
}
15+
export const MultipartUploadState = {
16+
IN_PROGRESS: 0,
17+
COMPLETED: 1,
18+
ABORTED: 2,
19+
} as const;
20+
export interface MultipartUploadRow {
21+
upload_id: string;
22+
key: string;
23+
http_metadata: string; // JSON-serialised `R2HTTPMetadata` (workers-types)
24+
custom_metadata: string; // JSON-serialised user-defined metadata
25+
state: ValueOf<typeof MultipartUploadState>;
26+
// NOTE: we need to keep completed/aborted uploads around for referential
27+
// integrity, and because error messages are different when attempting to
28+
// upload parts to them
29+
}
30+
export interface MultipartPartRow {
31+
upload_id: string;
32+
part_number: number;
33+
blob_id: string;
34+
size: number; // NOTE: used to identify which parts to read for range requests
35+
etag: string; // NOTE: multipart part ETag's are not MD5 checksums
36+
checksum_md5: string; // NOTE: used in construction of final object's ETag
37+
object_key: string | null; // null if in-progress upload
38+
}
1439
export const SQL_SCHEMA = `
1540
CREATE TABLE IF NOT EXISTS _mf_objects (
1641
key TEXT PRIMARY KEY,
@@ -23,7 +48,27 @@ CREATE TABLE IF NOT EXISTS _mf_objects (
2348
http_metadata TEXT NOT NULL,
2449
custom_metadata TEXT NOT NULL
2550
);
51+
CREATE TABLE IF NOT EXISTS _mf_multipart_uploads (
52+
upload_id TEXT PRIMARY KEY,
53+
key TEXT NOT NULL,
54+
http_metadata TEXT NOT NULL,
55+
custom_metadata TEXT NOT NULL,
56+
state TINYINT DEFAULT 0 NOT NULL
57+
);
58+
CREATE TABLE IF NOT EXISTS _mf_multipart_parts (
59+
upload_id TEXT NOT NULL REFERENCES _mf_multipart_uploads(upload_id),
60+
part_number INTEGER NOT NULL,
61+
blob_id TEXT NOT NULL,
62+
size INTEGER NOT NULL,
63+
etag TEXT NOT NULL,
64+
checksum_md5 TEXT NOT NULL,
65+
object_key TEXT REFERENCES _mf_objects(key) DEFERRABLE INITIALLY DEFERRED,
66+
PRIMARY KEY (upload_id, part_number)
67+
);
2668
`;
69+
// NOTE: `object_key` foreign key constraint is deferred, meaning we can delete
70+
// the linked object row, *then* the multipart part rows in a transaction,
71+
// see https://www.sqlite.org/foreignkeys.html#fk_deferred for more details
2772

2873
// https://github.com/cloudflare/workerd/blob/4290f9717bc94647d9c8afd29602cdac97fdff1b/src/workerd/api/r2-api.capnp
2974

@@ -154,13 +199,19 @@ export const R2PutRequestSchema = z
154199
sha512: value.sha512,
155200
}));
156201

157-
// TODO: support multipart
158-
export const R2CreateMultipartUploadRequestSchema = z.object({
159-
method: z.literal("createMultipartUpload"),
160-
object: z.string(),
161-
customFields: RecordSchema.optional(),
162-
httpFields: R2HttpFieldsSchema.optional(),
163-
});
202+
export const R2CreateMultipartUploadRequestSchema = z
203+
.object({
204+
method: z.literal("createMultipartUpload"),
205+
object: z.string(),
206+
customFields: RecordSchema.optional(), // (renamed in transform)
207+
httpFields: R2HttpFieldsSchema.optional(), // (renamed in transform)
208+
})
209+
.transform((value) => ({
210+
method: value.method,
211+
object: value.object,
212+
customMetadata: value.customFields,
213+
httpMetadata: value.httpFields,
214+
}));
164215

165216
export const R2UploadPartRequestSchema = z.object({
166217
method: z.literal("uploadPart"),
@@ -206,6 +257,7 @@ export const R2DeleteRequestSchema = z.intersection(
206257

207258
// Not using `z.discriminatedUnion()` here, as that doesn't work with
208259
// intersection/transformed types.
260+
// TODO(someday): switch to proposed `z.switch()`: https://github.com/colinhacks/zod/issues/2106
209261
export const R2BindingRequestSchema = z.union([
210262
R2HeadRequestSchema,
211263
R2GetRequestSchema,
@@ -222,6 +274,9 @@ export type OmitRequest<T> = Omit<T, "method" | "object">;
222274
export type R2GetOptions = OmitRequest<z.infer<typeof R2GetRequestSchema>>;
223275
export type R2PutOptions = OmitRequest<z.infer<typeof R2PutRequestSchema>>;
224276
export type R2ListOptions = OmitRequest<z.infer<typeof R2ListRequestSchema>>;
277+
export type R2CreateMultipartUploadOptions = OmitRequest<
278+
z.infer<typeof R2CreateMultipartUploadRequestSchema>
279+
>;
225280

226281
export interface R2ErrorResponse {
227282
version: number;

‎packages/tre/src/plugins/r2/validator.ts

+7-12
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import assert from "assert";
2-
import { R2StringChecksums } from "@cloudflare/workers-types/experimental";
2+
import type { R2StringChecksums } from "@cloudflare/workers-types/experimental";
33
import { InclusiveRange } from "../../storage2";
44
import { _parseRanges } from "../shared";
55
import {
@@ -102,13 +102,11 @@ export class Validator {
102102
}
103103

104104
condition(
105-
meta?: R2Object | Pick<R2Object, "etag" | "uploaded">,
105+
meta?: Pick<R2Object, "etag" | "uploaded">,
106106
onlyIf?: R2Conditional
107107
): Validator {
108108
if (onlyIf !== undefined && !_testR2Conditional(onlyIf, meta)) {
109-
let error = new PreconditionFailed();
110-
if (meta instanceof R2Object) error = error.attach(meta);
111-
throw error;
109+
throw new PreconditionFailed();
112110
}
113111
return this;
114112
}
@@ -121,14 +119,11 @@ export class Validator {
121119
const ranges = _parseRanges(options.rangeHeader, size);
122120
// If the header contained a single range, use it. Otherwise, if the
123121
// header was invalid, or contained multiple ranges, just return the full
124-
// response.
125-
if (ranges?.length === 1) {
126-
const [start, end] = ranges[0];
127-
return { start, end };
128-
}
122+
// response (by returning undefined from this function).
123+
if (ranges?.length === 1) return ranges[0];
129124
} else if (options.range !== undefined) {
130125
let { offset, length, suffix } = options.range;
131-
// Eliminate suffix is specified
126+
// Eliminate suffix if specified
132127
if (suffix !== undefined) {
133128
if (suffix <= 0) throw new InvalidRange();
134129
if (suffix > size) suffix = size;
@@ -142,7 +137,7 @@ export class Validator {
142137
// Clamp length to maximum
143138
if (offset + length > size) length = size - offset;
144139
// Convert to inclusive range
145-
return { start: offset, end: offset + length - 1 }; // TODO: check -1
140+
return { start: offset, end: offset + length - 1 };
146141
}
147142
}
148143

‎packages/tre/src/plugins/shared/gateway.ts

+12-8
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,9 @@ import { z } from "zod";
44
import { RequestInit, Response } from "../../http";
55
import {
66
Awaitable,
7-
Clock,
87
Log,
98
MiniflareCoreError,
9+
Timers,
1010
sanitisePath,
1111
} from "../../shared";
1212
import {
@@ -34,7 +34,7 @@ export const CloudflareFetchSchema =
3434
export type CloudflareFetch = z.infer<typeof CloudflareFetchSchema>;
3535

3636
export interface GatewayConstructor<Gateway> {
37-
new (log: Log, storage: Storage, clock: Clock): Gateway;
37+
new (log: Log, storage: Storage, timers: Timers): Gateway;
3838
}
3939

4040
export interface RemoteStorageConstructor {
@@ -62,7 +62,7 @@ export class GatewayFactory<Gateway> {
6262

6363
constructor(
6464
private readonly log: Log,
65-
private readonly clock: Clock,
65+
private readonly timers: Timers,
6666
private readonly cloudflareFetch: CloudflareFetch | undefined,
6767
private readonly pluginName: string,
6868
private readonly gatewayClass: GatewayConstructor<Gateway>,
@@ -74,7 +74,7 @@ export class GatewayFactory<Gateway> {
7474
if (storage !== undefined) return storage;
7575
this.#memoryStorages.set(
7676
namespace,
77-
(storage = new MemoryStorage(undefined, this.clock))
77+
(storage = new MemoryStorage(undefined, this.timers.now))
7878
);
7979
return storage;
8080
}
@@ -98,9 +98,13 @@ export class GatewayFactory<Gateway> {
9898
const root = path.join(fileURLToPath(url), sanitisedNamespace);
9999
const unsanitise =
100100
url.searchParams.get(PARAM_FILE_UNSANITISE) === "true";
101-
return new FileStorage(root, !unsanitise, this.clock);
101+
return new FileStorage(root, !unsanitise, this.timers.now);
102102
} else if (url.protocol === "sqlite:") {
103-
return new SqliteStorage(url.pathname, sanitisedNamespace, this.clock);
103+
return new SqliteStorage(
104+
url.pathname,
105+
sanitisedNamespace,
106+
this.timers.now
107+
);
104108
}
105109
// TODO: support Redis storage?
106110
if (url.protocol === "remote:") {
@@ -132,15 +136,15 @@ export class GatewayFactory<Gateway> {
132136
persist === true
133137
? path.join(DEFAULT_PERSIST_ROOT, this.pluginName, sanitisedNamespace)
134138
: path.join(persist, sanitisedNamespace);
135-
return new FileStorage(root, undefined, this.clock);
139+
return new FileStorage(root, undefined, this.timers.now);
136140
}
137141

138142
get(namespace: string, persist: Persistence): Gateway {
139143
const cached = this.#gateways.get(namespace);
140144
if (cached !== undefined && cached[0] === persist) return cached[1];
141145

142146
const storage = this.getStorage(namespace, persist);
143-
const gateway = new this.gatewayClass(this.log, storage, this.clock);
147+
const gateway = new this.gatewayClass(this.log, storage, this.timers);
144148
this.#gateways.set(namespace, [persist, gateway]);
145149
return gateway;
146150
}

‎packages/tre/src/shared/clock.ts

-10
This file was deleted.

‎packages/tre/src/shared/deferred.ts

-25
This file was deleted.

‎packages/tre/src/shared/index.ts

+2-3
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,8 @@
1-
export * from "./clock";
1+
export * from "./timers";
22
export * from "./data";
3-
export * from "./deferred";
43
export * from "./error";
54
export * from "./event";
65
export * from "./log";
76
export * from "./matcher";
8-
export * from "./mutex";
7+
export * from "./sync";
98
export * from "./types";

‎packages/tre/src/shared/mutex.ts

-40
This file was deleted.

‎packages/tre/src/shared/sync.ts

+89
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
import assert from "assert";
2+
import { Awaitable } from "./types";
3+
4+
export type DeferredPromiseResolve<T> = (value: T | PromiseLike<T>) => void;
5+
export type DeferredPromiseReject = (reason?: any) => void;
6+
7+
export class DeferredPromise<T> extends Promise<T> {
8+
readonly resolve: DeferredPromiseResolve<T>;
9+
readonly reject: DeferredPromiseReject;
10+
11+
constructor(
12+
executor: (
13+
resolve: (value: T | PromiseLike<T>) => void,
14+
reject: (reason?: any) => void
15+
) => void = () => {}
16+
) {
17+
let promiseResolve: DeferredPromiseResolve<T>;
18+
let promiseReject: DeferredPromiseReject;
19+
super((resolve, reject) => {
20+
promiseResolve = resolve;
21+
promiseReject = reject;
22+
return executor(resolve, reject);
23+
});
24+
// Cannot access `this` until after `super`
25+
this.resolve = promiseResolve!;
26+
this.reject = promiseReject!;
27+
}
28+
}
29+
30+
export class Mutex {
31+
private locked = false;
32+
private resolveQueue: (() => void)[] = [];
33+
34+
private lock(): Awaitable<void> {
35+
if (!this.locked) {
36+
this.locked = true;
37+
return;
38+
}
39+
return new Promise((resolve) => this.resolveQueue.push(resolve));
40+
}
41+
42+
private unlock(): void {
43+
assert(this.locked);
44+
if (this.resolveQueue.length > 0) {
45+
this.resolveQueue.shift()?.();
46+
} else {
47+
this.locked = false;
48+
}
49+
}
50+
51+
get hasWaiting(): boolean {
52+
return this.resolveQueue.length > 0;
53+
}
54+
55+
async runWith<T>(closure: () => Awaitable<T>): Promise<T> {
56+
const acquireAwaitable = this.lock();
57+
if (acquireAwaitable instanceof Promise) await acquireAwaitable;
58+
try {
59+
const awaitable = closure();
60+
if (awaitable instanceof Promise) return await awaitable;
61+
return awaitable;
62+
} finally {
63+
this.unlock();
64+
}
65+
}
66+
}
67+
68+
export class WaitGroup {
69+
private counter = 0;
70+
private resolveQueue: (() => void)[] = [];
71+
72+
add(): void {
73+
this.counter++;
74+
}
75+
76+
done(): void {
77+
assert(this.counter > 0);
78+
this.counter--;
79+
if (this.counter === 0) {
80+
let resolve: (() => void) | undefined;
81+
while ((resolve = this.resolveQueue.shift()) !== undefined) resolve();
82+
}
83+
}
84+
85+
wait(): Promise<void> {
86+
if (this.counter === 0) return Promise.resolve();
87+
return new Promise((resolve) => this.resolveQueue.push(resolve));
88+
}
89+
}

‎packages/tre/src/shared/timers.ts

+24
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
import { Awaitable } from "./types";
2+
3+
export interface Timers {
4+
now(): number; // milliseconds
5+
queueMicrotask(closure: () => Awaitable<unknown>): void;
6+
// TODO(soon): `setTimeout`, for Queues batching
7+
}
8+
9+
export const defaultTimers: Timers = {
10+
now: () => Date.now(),
11+
queueMicrotask,
12+
};
13+
14+
// TODO(soon): remove once we remove the old storage system
15+
export type Clock = Timers["now"];
16+
export const defaultClock = defaultTimers.now;
17+
18+
export function millisToSeconds(millis: number): number {
19+
return Math.floor(millis / 1000);
20+
}
21+
22+
export function secondsToMillis(seconds: number): number {
23+
return seconds * 1000;
24+
}

‎packages/tre/src/storage/storage.ts

+2-2
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import {
44
Awaitable,
55
base64Decode,
66
base64Encode,
7-
defaultClock,
7+
defaultTimers,
88
lexicographicCompare,
99
nonCircularClone,
1010
} from "../shared";
@@ -219,7 +219,7 @@ export abstract class RemoteStorage extends Storage {
219219
protected readonly cache: Storage,
220220
protected readonly cloudflareFetch: CloudflareFetch,
221221
protected readonly namespace: string,
222-
protected readonly clock = defaultClock
222+
protected readonly timers = defaultTimers
223223
) {
224224
super();
225225
}

‎packages/tre/src/storage2/keyvalue.ts

+9-4
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,11 @@
11
import assert from "assert";
22
import { ReadableStream } from "stream/web";
3-
import { Awaitable, base64Decode, base64Encode, defaultClock } from "../shared";
3+
import {
4+
Awaitable,
5+
base64Decode,
6+
base64Encode,
7+
defaultTimers,
8+
} from "../shared";
49
import {
510
InclusiveRange,
611
MultipartOptions,
@@ -116,15 +121,15 @@ export class KeyValueStorage<Metadata = unknown> {
116121

117122
constructor(
118123
private readonly storage: NewStorage,
119-
private readonly clock = defaultClock
124+
private readonly timers = defaultTimers
120125
) {
121126
storage.db.pragma("case_sensitive_like = TRUE");
122127
storage.db.exec(SQL_SCHEMA);
123128
this.#stmts = sqlStmts(storage.db);
124129
}
125130

126131
#hasExpired(entry: Pick<Row, "expiration">) {
127-
return entry.expiration !== null && entry.expiration <= this.clock();
132+
return entry.expiration !== null && entry.expiration <= this.timers.now();
128133
}
129134

130135
#backgroundDelete(blobId: string) {
@@ -216,7 +221,7 @@ export class KeyValueStorage<Metadata = unknown> {
216221

217222
async list(opts: KeyEntriesQuery): Promise<KeyEntries<Metadata>> {
218223
// Find non-expired entries matching query after cursor
219-
const now = this.clock();
224+
const now = this.timers.now();
220225
const rows = this.#stmts.list.all({
221226
now,
222227
escaped_prefix: escapePrefix(opts.prefix ?? ""),

‎packages/tre/test/plugins/cache/index.spec.ts

+8-8
Original file line numberDiff line numberDiff line change
@@ -226,9 +226,9 @@ const expireMacro = test.macro({
226226
async exec(t, opts: { headers: HeadersInit; expectedTtl: number }) {
227227
// Reset clock to known time, restoring afterwards.
228228
// Note this macro must be used with `test.serial` to avoid races.
229-
const originalTimestamp = t.context.clock.timestamp;
230-
t.teardown(() => (t.context.clock.timestamp = originalTimestamp));
231-
t.context.clock.timestamp = 1_000_000; // 1000s
229+
const originalTimestamp = t.context.timers.timestamp;
230+
t.teardown(() => (t.context.timers.timestamp = originalTimestamp));
231+
t.context.timers.timestamp = 1_000_000; // 1000s
232232

233233
const key = "http://localhost/cache-expire";
234234
await t.context.mf.dispatchFetch(key, {
@@ -240,11 +240,11 @@ const expireMacro = test.macro({
240240
let res = await t.context.mf.dispatchFetch(key);
241241
t.is(res.status, 200);
242242

243-
t.context.clock.timestamp += opts.expectedTtl / 2;
243+
t.context.timers.timestamp += opts.expectedTtl / 2;
244244
res = await t.context.mf.dispatchFetch(key);
245245
t.is(res.status, 200);
246246

247-
t.context.clock.timestamp += opts.expectedTtl / 2;
247+
t.context.timers.timestamp += opts.expectedTtl / 2;
248248
res = await t.context.mf.dispatchFetch(key);
249249
t.is(res.status, 404);
250250
},
@@ -276,7 +276,7 @@ const isCachedMacro = test.macro({
276276
.digest("hex");
277277
const key = `http://localhost/cache-is-cached-${headersHash}`;
278278

279-
const expires = new Date(t.context.clock.timestamp + 2000).toUTCString();
279+
const expires = new Date(t.context.timers.timestamp + 2000).toUTCString();
280280
await t.context.mf.dispatchFetch(key, {
281281
method: "PUT",
282282
headers: {
@@ -406,15 +406,15 @@ test.serial("operations log warning on workers.dev subdomain", async (t) => {
406406
test.serial("operations persist cached data", async (t) => {
407407
// Create new temporary file-system persistence directory
408408
const tmp = await useTmp(t);
409-
const clock = () => t.context.clock.timestamp;
409+
const clock = () => t.context.timers.timestamp;
410410
// TODO(soon): clean up this mess once we've migrated all gateways
411411
const legacyStorage = new FileStorage(
412412
path.join(tmp, "default"),
413413
undefined,
414414
clock
415415
);
416416
const newStorage = legacyStorage.getNewStorage();
417-
const kvStorage = new KeyValueStorage(newStorage, clock);
417+
const kvStorage = new KeyValueStorage(newStorage, t.context.timers);
418418

419419
// Set option, then reset after test
420420
await t.context.setOptions({ cachePersist: tmp });

‎packages/tre/test/plugins/kv/gateway.spec.ts

+4-4
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ import {
1616
TIME_EXPIRING,
1717
TIME_NOW,
1818
createJunkStream,
19-
testClock,
19+
testTimers,
2020
} from "../../test-shared";
2121

2222
interface Context {
@@ -28,10 +28,10 @@ const test = anyTest as TestFn<Context>;
2828

2929
test.beforeEach((t) => {
3030
// TODO(soon): clean up this mess once we've migrated all gateways
31-
const legacyStorage = new MemoryStorage(undefined, testClock);
31+
const legacyStorage = new MemoryStorage(undefined, testTimers.now);
3232
const newStorage = legacyStorage.getNewStorage();
33-
const gateway = new KVGateway(new NoOpLog(), legacyStorage, testClock);
34-
const kvStorage = new KeyValueStorage(newStorage, testClock);
33+
const gateway = new KVGateway(new NoOpLog(), legacyStorage, testTimers);
34+
const kvStorage = new KeyValueStorage(newStorage, testTimers);
3535
t.context = { storage: kvStorage, gateway };
3636
});
3737

‎packages/tre/test/plugins/r2/index.spec.ts

+734-33
Large diffs are not rendered by default.

‎packages/tre/test/plugins/shared/router.spec.ts

+2-2
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ import {
88
Response,
99
RouteHandler,
1010
Router,
11-
defaultClock,
11+
defaultTimers,
1212
} from "@miniflare/tre";
1313
import test from "ava";
1414

@@ -21,7 +21,7 @@ class TestRouter extends Router<TestGateway> {
2121
const log = new NoOpLog();
2222
super(
2323
log,
24-
new GatewayFactory(log, defaultClock, undefined, "test", TestGateway)
24+
new GatewayFactory(log, defaultTimers, undefined, "test", TestGateway)
2525
);
2626
}
2727

‎packages/tre/test/shared/sync.spec.ts

+104
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
import { setTimeout } from "timers/promises";
2+
import { DeferredPromise, Mutex, WaitGroup } from "@miniflare/tre";
3+
import test from "ava";
4+
5+
test("DeferredPromise: waits for resolve/reject callbacks", async (t) => {
6+
// Check resolves with regular value
7+
let promise = new DeferredPromise<number>();
8+
promise.resolve(42);
9+
t.is(await promise, 42);
10+
11+
// Check resolves with another Promise
12+
promise = new DeferredPromise<number>();
13+
promise.resolve(Promise.resolve(0));
14+
t.is(await promise, 0);
15+
16+
// Check rejects with error
17+
promise = new DeferredPromise<number>();
18+
promise.reject(new Error("🤯"));
19+
await t.throwsAsync(promise, { message: "🤯" });
20+
});
21+
22+
test("Mutex: runs closures exclusively", async (t) => {
23+
const mutex = new Mutex();
24+
const events: number[] = [];
25+
await Promise.all([
26+
mutex.runWith(async () => {
27+
events.push(1);
28+
await setTimeout();
29+
events.push(2);
30+
}),
31+
mutex.runWith(async () => {
32+
events.push(3);
33+
}),
34+
]);
35+
t.deepEqual(events, events[0] === 1 ? [1, 2, 3] : [3, 1, 2]);
36+
});
37+
test("Mutex: lock can be acquired synchronously", (t) => {
38+
const mutex = new Mutex();
39+
let acquired = false;
40+
mutex.runWith(() => (acquired = true));
41+
t.true(acquired);
42+
});
43+
44+
test("WaitGroup: waits for all tasks to complete", async (t) => {
45+
const group = new WaitGroup();
46+
47+
// Check doesn't wait if no tasks added
48+
await group.wait();
49+
50+
// Check waits for single task
51+
let resolved = false;
52+
group.add(); // count -> 1
53+
group.wait().then(() => (resolved = true));
54+
await Promise.resolve();
55+
t.false(resolved);
56+
57+
group.done(); // count -> 0 (complete)
58+
await Promise.resolve();
59+
t.true(resolved);
60+
61+
// Check waits for multiple tasks, including those added whilst waiting
62+
resolved = false;
63+
group.add(); // count -> 1
64+
group.add(); // count -> 2
65+
group.wait().then(() => (resolved = true));
66+
group.add(); // count -> 3
67+
await Promise.resolve();
68+
t.false(resolved);
69+
70+
group.done(); // count -> 2
71+
await Promise.resolve();
72+
t.false(resolved);
73+
74+
group.done(); // count -> 1
75+
await Promise.resolve();
76+
t.false(resolved);
77+
78+
group.add(); // count -> 2
79+
await Promise.resolve();
80+
t.false(resolved);
81+
82+
group.done(); // count -> 1
83+
await Promise.resolve();
84+
t.false(resolved);
85+
86+
group.done(); // count -> 0 (complete)
87+
await Promise.resolve();
88+
t.true(resolved);
89+
90+
// Check allows multiple waiters
91+
resolved = false;
92+
let resolved2 = false;
93+
group.add(); // count -> 1
94+
group.wait().then(() => (resolved = true));
95+
group.wait().then(() => (resolved2 = true));
96+
await Promise.resolve();
97+
t.false(resolved);
98+
t.false(resolved2);
99+
100+
group.done(); // count -> 0 (complete)
101+
await Promise.resolve();
102+
t.true(resolved);
103+
t.true(resolved2);
104+
});

‎packages/tre/test/test-shared/miniflare.ts

+24-8
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import type {
44
Request as WorkerRequest,
55
Response as WorkerResponse,
66
} from "@cloudflare/workers-types/experimental";
7-
import { Awaitable, Miniflare, MiniflareOptions } from "@miniflare/tre";
7+
import { Awaitable, Miniflare, MiniflareOptions, Timers } from "@miniflare/tre";
88
import anyTest, { TestFn } from "ava";
99
import { getPort } from "./http";
1010
import { TestLog } from "./log";
@@ -16,8 +16,25 @@ export type TestMiniflareHandler<Env> = (
1616
ctx: ExecutionContext
1717
) => Awaitable<WorkerResponse>;
1818

19-
export interface TestClock {
20-
timestamp: number;
19+
export class TestTimers implements Timers {
20+
timestamp = 1_000_000; // 1000s
21+
#pendingMicrotasks = new Set<Promise<unknown>>();
22+
23+
now = () => {
24+
return this.timestamp;
25+
};
26+
27+
queueMicrotask(closure: () => Awaitable<unknown>) {
28+
const result = closure();
29+
if (result instanceof Promise) {
30+
this.#pendingMicrotasks.add(result);
31+
result.then(() => this.#pendingMicrotasks.delete(result));
32+
}
33+
}
34+
35+
async waitForMicrotasks() {
36+
await Promise.all(this.#pendingMicrotasks);
37+
}
2138
}
2239

2340
export interface MiniflareTestContext {
@@ -27,7 +44,7 @@ export interface MiniflareTestContext {
2744
// Warning: if mutating or calling any of the following, `test.serial` must be
2845
// used to prevent races.
2946
log: TestLog;
30-
clock: TestClock;
47+
timers: TestTimers;
3148
setOptions(opts: Partial<MiniflareOptions>): Promise<void>;
3249
}
3350

@@ -72,14 +89,13 @@ export function miniflareTest<
7289
const test = anyTest as TestFn<Context>;
7390
test.before(async (t) => {
7491
const log = new TestLog(t);
75-
const clock: TestClock = { timestamp: 1_000_000 }; // 1000s
76-
const clockFunction = () => clock.timestamp;
92+
const timers = new TestTimers();
7793

7894
const opts: Partial<MiniflareOptions> = {
7995
...scriptOpts,
8096
port: await getPort(),
8197
log,
82-
clock: clockFunction,
98+
timers,
8399
verbose: true,
84100
};
85101

@@ -88,7 +104,7 @@ export function miniflareTest<
88104
// `userOpts`, a `handler` has been provided.
89105
t.context.mf = new Miniflare({ ...userOpts, ...opts } as MiniflareOptions);
90106
t.context.log = log;
91-
t.context.clock = clock;
107+
t.context.timers = timers;
92108
t.context.setOptions = (userOpts) =>
93109
t.context.mf.setOptions({ ...userOpts, ...opts } as MiniflareOptions);
94110
t.context.url = await t.context.mf.ready;

‎packages/tre/test/test-shared/storage.ts

+6-2
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ import fs from "fs/promises";
33
import path from "path";
44
import { ReadableStream } from "stream/web";
55
import { TextDecoder, TextEncoder } from "util";
6-
import { Clock, sanitisePath, unwrapBYOBRequest } from "@miniflare/tre";
6+
import { Timers, sanitisePath, unwrapBYOBRequest } from "@miniflare/tre";
77
import { ExecutionContext } from "ava";
88

99
const encoder = new TextEncoder();
@@ -27,7 +27,11 @@ export const TIME_NOW = 750;
2727
// Tests will check the expiry is within 120s of this.
2828
export const TIME_EXPIRING = 1000;
2929

30-
export const testClock: Clock = () => TIME_NOW * 1000;
30+
// TODO(soon): remove once we remove the old storage system
31+
export const testTimers: Timers = {
32+
now: () => TIME_NOW * 1000,
33+
queueMicrotask,
34+
};
3135

3236
const tmpRoot = path.resolve(".tmp");
3337
export async function useTmp(t: ExecutionContext): Promise<string> {

0 commit comments

Comments
 (0)
This repository has been archived.