Skip to content

Commit

Permalink
De-register all shared workers
Browse files Browse the repository at this point in the history
Co-authored-by: Mark Wubben <mark@novemberborn.net>
  • Loading branch information
codetheweb and novemberborn committed Jan 15, 2023
1 parent 639b905 commit ffed948
Show file tree
Hide file tree
Showing 5 changed files with 88 additions and 20 deletions.
52 changes: 32 additions & 20 deletions lib/plugin-support/shared-workers.js
Expand Up @@ -49,49 +49,61 @@ function launchWorker(filename, initialData) {
}

export async function observeWorkerProcess(fork, runStatus) {
let registrationCount = 0;
let signalDeregistered;
let launched;
const deregistered = new Promise(resolve => {
signalDeregistered = () => {
// Only unref the worker once all test workers have been deregistered, otherwise the worker may exit before test workers are deregistered
launched?.worker.unref();
let signalDone;

const done = new Promise(resolve => {
signalDone = () => {
resolve();
};
});

fork.promise.finally(() => {
if (registrationCount === 0) {
signalDeregistered();
const activeInstances = new Set();

const removeInstance = instance => {
instance.worker.unref();
activeInstances.delete(instance);

if (activeInstances.size === 0) {
signalDone();
}
};

const removeAllInstances = () => {
if (activeInstances.size === 0) {
signalDone();
return;
}

for (const instance of activeInstances) {
removeInstance(instance);
}
};

fork.promise.finally(() => {
removeAllInstances();
});

fork.onConnectSharedWorker(async ({filename, initialData, port, signalError}) => {
launched = launchWorker(filename, initialData);
const launched = launchWorker(filename, initialData);
activeInstances.add(launched);

const handleWorkerMessage = async message => {
if (message.type === 'deregistered-test-worker' && message.id === fork.threadId) {
launched.worker.off('message', handleWorkerMessage);

registrationCount--;
if (registrationCount === 0) {
signalDeregistered();
}
removeInstance(launched);
}
};

launched.statePromises.error.then(error => {
signalDeregistered();
launched.worker.off('message', handleWorkerMessage);
removeAllInstances();
runStatus.emitStateChange({type: 'shared-worker-error', err: serializeError('Shared worker error', true, error)});
signalError();
});

try {
await launched.statePromises.available;

registrationCount++;

port.postMessage({type: 'ready'});

launched.worker.postMessage({
Expand All @@ -112,5 +124,5 @@ export async function observeWorkerProcess(fork, runStatus) {
} catch {}
});

return deregistered;
return done;
}
@@ -0,0 +1,8 @@
{
"type": "module",
"ava": {
"files": [
"*.js"
]
}
}
30 changes: 30 additions & 0 deletions test/shared-workers/multiple-workers-are-loaded/fixtures/test.js
@@ -0,0 +1,30 @@
import test from 'ava';
import {registerSharedWorker} from 'ava/plugin';

const worker1 = registerSharedWorker({
filename: new URL('worker.mjs#1', import.meta.url),
supportedProtocols: ['ava-4'],
initialData: {
id: '1',
},
});

const worker2 = registerSharedWorker({
filename: new URL('worker.mjs#2', import.meta.url),
supportedProtocols: ['ava-4'],
initialData: {
id: '2',
},
});

const messageFromWorker1 = worker1.subscribe().next();
const messageFromWorker2 = worker2.subscribe().next();

test('can load multiple workers', async t => {
const {value: {data: dataFromWorker1}} = await messageFromWorker1;
const {value: {data: dataFromWorker2}} = await messageFromWorker2;

t.deepEqual(dataFromWorker1, {id: '1'});
t.deepEqual(dataFromWorker2, {id: '2'});
t.pass();
});
@@ -0,0 +1,9 @@
export default async ({negotiateProtocol}) => {
const protocol = negotiateProtocol(['ava-4']);

await protocol.ready();

for await (const testWorker of protocol.testWorkers()) {
testWorker.publish(protocol.initialData);
}
};
9 changes: 9 additions & 0 deletions test/shared-workers/multiple-workers-are-loaded/test.js
@@ -0,0 +1,9 @@
import test from '@ava/test';

import {fixture} from '../../helpers/exec.js';

test('can load multiple workers', async t => {
await fixture();

t.pass();
});

0 comments on commit ffed948

Please sign in to comment.