Skip to content

Commit

Permalink
Use thread IDs
Browse files Browse the repository at this point in the history
  • Loading branch information
novemberborn committed Nov 1, 2021
1 parent af30e73 commit c4f6723
Show file tree
Hide file tree
Showing 4 changed files with 8 additions and 20 deletions.
8 changes: 1 addition & 7 deletions lib/fork.js
Expand Up @@ -14,8 +14,6 @@ export function _testOnlyReplaceWorkerPath(replacement) {
workerPath = replacement;
}

let forkCounter = 0;

const additionalExecArgv = ['--enable-source-maps'];

const createWorker = (options, execArgv) => {
Expand Down Expand Up @@ -68,9 +66,6 @@ const createWorker = (options, execArgv) => {
};

export default function loadFork(file, options, execArgv = process.execArgv) {
// TODO: this can be changed to use `threadId` when using worker_threads
const forkId = `fork/${++forkCounter}`;

let finished = false;

const emitter = new Emittery();
Expand All @@ -83,7 +78,6 @@ export default function loadFork(file, options, execArgv = process.execArgv) {
options = {
baseDir: process.cwd(),
file,
forkId,
...options,
};

Expand Down Expand Up @@ -161,7 +155,7 @@ export default function loadFork(file, options, execArgv = process.execArgv) {

return {
file,
forkId,
threadId: worker.threadId,
promise,

exit() {
Expand Down
4 changes: 2 additions & 2 deletions lib/plugin-support/shared-worker-loader.js
@@ -1,6 +1,6 @@
import {EventEmitter, on} from 'node:events';
import process from 'node:process';
import {workerData, parentPort} from 'node:worker_threads';
import {workerData, parentPort, threadId} from 'node:worker_threads';

import pkg from '../pkg.cjs';

Expand Down Expand Up @@ -117,7 +117,7 @@ async function * receiveMessages(fromTestWorker, replyTo) {
}

let messageCounter = 0;
const messageIdPrefix = `${workerData.id}/message`;
const messageIdPrefix = `${threadId}/message`;
const nextMessageId = () => `${messageIdPrefix}/${++messageCounter}`;

function publishMessage(testWorker, data, replyTo) {
Expand Down
10 changes: 3 additions & 7 deletions lib/plugin-support/shared-workers.js
Expand Up @@ -6,7 +6,6 @@ import serializeError from '../serialize-error.js';

const LOADER = new URL('shared-worker-loader.js', import.meta.url);

let sharedWorkerCounter = 0;
const launchedWorkers = new Map();

const waitForAvailable = async worker => {
Expand All @@ -22,14 +21,11 @@ function launchWorker(filename, initialData) {
return launchedWorkers.get(filename);
}

// TODO: remove the custom id and use the built-in thread id.
const id = `shared-worker/${++sharedWorkerCounter}`;
const worker = new Worker(LOADER, {
// Ensure the worker crashes for unhandled rejections, rather than allowing undefined behavior.
execArgv: ['--unhandled-rejections=strict'],
workerData: {
filename,
id,
initialData,
},
});
Expand Down Expand Up @@ -69,7 +65,7 @@ export async function observeWorkerProcess(fork, runStatus) {
const launched = launchWorker(filename, initialData);

const handleWorkerMessage = async message => {
if (message.type === 'deregistered-test-worker' && message.id === fork.forkId) {
if (message.type === 'deregistered-test-worker' && message.id === fork.threadId) {
launched.worker.off('message', handleWorkerMessage);

registrationCount--;
Expand All @@ -95,15 +91,15 @@ export async function observeWorkerProcess(fork, runStatus) {

launched.worker.postMessage({
type: 'register-test-worker',
id: fork.forkId,
id: fork.threadId,
file: pathToFileURL(fork.file).toString(),
port,
}, [port]);

fork.promise.finally(() => {
launched.worker.postMessage({
type: 'deregister-test-worker',
id: fork.forkId,
id: fork.threadId,
});
});

Expand Down
6 changes: 2 additions & 4 deletions lib/worker/channel.cjs
@@ -1,13 +1,12 @@
'use strict';
const events = require('events');
const process = require('process');
const {MessageChannel} = require('worker_threads');
const {MessageChannel, threadId} = require('worker_threads');

const pEvent = require('p-event');

const timers = require('../now-and-timers.cjs');

const {get: getOptions} = require('./options.cjs');
const {isRunningInChildProcess, isRunningInThread} = require('./utils.cjs');

const selectAvaMessage = type => message => message.ava && message.ava.type === type;
Expand Down Expand Up @@ -150,8 +149,7 @@ function createChannelEmitter(channelId) {
}

function registerSharedWorker(filename, initialData) {
const {forkId} = getOptions();
const channelId = `${forkId}/channel/${++channelCounter}`;
const channelId = `${threadId}/channel/${++channelCounter}`;

const {port1: ourPort, port2: theirPort} = new MessageChannel();
const sharedWorkerHandle = new MessagePortHandle(ourPort);
Expand Down

0 comments on commit c4f6723

Please sign in to comment.