Skip to content

Commit

Permalink
feat(gatsby): restart worker pool after query running in workers (#32365
Browse files Browse the repository at this point in the history
)

* feat(gatsby): restart worker pool after query running in workers

* test that old processes are killed after restart
  • Loading branch information
vladar committed Jul 14, 2021
1 parent d65de41 commit b9236e1
Show file tree
Hide file tree
Showing 5 changed files with 130 additions and 3 deletions.
4 changes: 4 additions & 0 deletions packages/gatsby-worker/src/__tests__/fixtures/test-child.ts
Expand Up @@ -15,6 +15,10 @@ export async function async(
}`
}

export function pid(): number {
return process.pid
}

export function neverEnding(): Promise<string> {
return new Promise<string>(() => {})
}
Expand Down
97 changes: 96 additions & 1 deletion packages/gatsby-worker/src/__tests__/integration.ts
@@ -1,6 +1,6 @@
import "jest-extended"
import { WorkerPool } from "../"
import { isPromise } from "../utils"
import { isPromise, isRunning } from "../utils"
import { MessagesFromChild, MessagesFromParent } from "./fixtures/test-child"

describe(`gatsby-worker`, () => {
Expand Down Expand Up @@ -44,6 +44,7 @@ describe(`gatsby-worker`, () => {
Array [
"sync",
"async",
"pid",
"neverEnding",
"syncThrow",
"asyncThrow",
Expand Down Expand Up @@ -241,6 +242,100 @@ describe(`gatsby-worker`, () => {
})
})

describe(`.restart`, () => {
it(`spawns new processes on restart`, async () => {
if (!workerPool) {
fail(`worker pool not created`)
}

const initialPids = await Promise.all(workerPool.all.pid())

// sanity checks:
expect(initialPids).toBeArrayOfSize(numWorkers)
expect(initialPids).toSatisfyAll(value => typeof value === `number`)

await workerPool.restart()
const newPids = await Promise.all(workerPool.all.pid())
expect(newPids).toBeArrayOfSize(numWorkers)
expect(newPids).toSatisfyAll(value => !initialPids.includes(value))
})

it(`kills old processes on restart`, async () => {
if (!workerPool) {
fail(`worker pool not created`)
}

const initialPids = await Promise.all(workerPool.all.pid())

// sanity checks:
expect(initialPids).toBeArrayOfSize(numWorkers)
expect(initialPids).toSatisfyAll(pid => isRunning(pid))

await workerPool.restart()
expect(initialPids).toSatisfyAll(pid => !isRunning(pid))
})

it(`.single works after restart`, async () => {
if (!workerPool) {
fail(`worker pool not created`)
}

const returnValue = workerPool.single.async(`.single async`)
// promise is preserved
expect(isPromise(returnValue)).toEqual(true)

const resolvedValue = await returnValue
expect(resolvedValue).toMatchInlineSnapshot(`"foo .single async"`)
})

it(`.all works after restart`, async () => {
if (!workerPool) {
fail(`worker pool not created`)
}

const returnValue = workerPool.all.async(`.single async`, {
addWorkerId: true,
})

expect(returnValue).toBeArrayOfSize(numWorkers)
// promise is preserved
expect(returnValue).toSatisfyAll(isPromise)

const resolvedValue = await Promise.all(returnValue)
expect(resolvedValue).toMatchInlineSnapshot(`
Array [
"foo .single async (worker #1)",
"foo .single async (worker #2)",
]
`)
})

it(`fails currently executed and pending tasks when worker is restarted`, async () => {
if (!workerPool) {
fail(`worker pool not created`)
}

expect.assertions(numWorkers * 2)

// queueing 2 * numWorkers task, so that currently executed tasks reject
// as well as pending tasks
for (let i = 0; i < numWorkers * 2; i++) {
workerPool.single
.neverEnding()
.then(() => {
fail(`promise should reject`)
})
.catch(e => {
expect(e).toMatchInlineSnapshot(
`[Error: Worker exited before finishing task]`
)
})
}

await workerPool.restart()
})
})

describe(`task queue`, () => {
it(`distributes task between workers when using .single`, async () => {
if (!workerPool) {
Expand Down
18 changes: 16 additions & 2 deletions packages/gatsby-worker/src/index.ts
Expand Up @@ -124,7 +124,7 @@ export class WorkerPool<
(msg: MessagesFromChild, workerId: number) => void
> = []

constructor(workerPath: string, options?: IWorkerOptions) {
constructor(private workerPath: string, private options?: IWorkerOptions) {
const single: Partial<WorkerPool<WorkerModuleExports>["single"]> = {}
const all: Partial<WorkerPool<WorkerModuleExports>["all"]> = {}

Expand Down Expand Up @@ -160,15 +160,19 @@ export class WorkerPool<

this.single = single as WorkerPool<WorkerModuleExports>["single"]
this.all = all as WorkerPool<WorkerModuleExports>["all"]
this.startAll()
}

private startAll(): void {
const options = this.options
for (let workerId = 1; workerId <= (options?.numWorkers ?? 1); workerId++) {
const worker = fork(childWrapperPath, {
cwd: process.cwd(),
env: {
...process.env,
...(options?.env ?? {}),
GATSBY_WORKER_ID: workerId.toString(),
GATSBY_WORKER_MODULE_PATH: workerPath,
GATSBY_WORKER_MODULE_PATH: this.workerPath,
},
// Suppress --debug / --inspect flags while preserving others (like --harmony).
execArgv: process.execArgv.filter(v => !/^--(debug|inspect)/.test(v)),
Expand Down Expand Up @@ -274,11 +278,21 @@ export class WorkerPool<
for (const taskNode of this.taskQueue) {
taskNode.value.reject(new Error(`Worker exited before finishing task`))
}
this.workers = []
this.idleWorkers = new Set()
})

return results
}

/**
* Kills all running worker processes and spawns a new pool of processes
*/
async restart(): Promise<void> {
await Promise.all(this.end())
this.startAll()
}

private checkForWork<T extends keyof WorkerModuleExports>(
workerInfo: IWorkerInfo<T>
): void {
Expand Down
11 changes: 11 additions & 0 deletions packages/gatsby-worker/src/utils.ts
Expand Up @@ -2,3 +2,14 @@ export const isPromise = (obj: any): obj is PromiseLike<unknown> =>
!!obj &&
(typeof obj === `object` || typeof obj === `function`) &&
typeof obj.then === `function`

export const isRunning = (pid: number): boolean => {
try {
// "As a special case, a signal of 0 can be used to test for the existence of a process."
// See https://nodejs.org/api/process.html#process_process_kill_pid_signal
process.kill(pid, 0)
return true
} catch (e) {
return false
}
}
3 changes: 3 additions & 0 deletions packages/gatsby/src/commands/build.ts
Expand Up @@ -93,8 +93,10 @@ module.exports = async function build(program: IBuildArgs): Promise<void> {

const { queryIds } = await calculateDirtyQueries({ store })

let waitForWorkerPoolRestart = Promise.resolve()
if (process.env.GATSBY_EXPERIMENTAL_PARALLEL_QUERY_RUNNING) {
await runQueriesInWorkersQueue(workerPool, queryIds)
waitForWorkerPoolRestart = workerPool.restart()
} else {
await runStaticQueries({
queryIds,
Expand Down Expand Up @@ -214,6 +216,7 @@ module.exports = async function build(program: IBuildArgs): Promise<void> {
buildSSRBundleActivityProgress.end()
}

await waitForWorkerPoolRestart
const {
toRegenerate,
toDelete,
Expand Down

0 comments on commit b9236e1

Please sign in to comment.