Skip to content

Commit

Permalink
fix: memory leaks, worker and main process lifecycles (#51)
Browse files Browse the repository at this point in the history
  • Loading branch information
mistic authored and evilebottnawi committed Dec 19, 2018
1 parent 9208e3b commit f10fe55
Show file tree
Hide file tree
Showing 6 changed files with 264 additions and 45 deletions.
94 changes: 93 additions & 1 deletion package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

76 changes: 71 additions & 5 deletions src/WorkerPool.js
Expand Up @@ -12,16 +12,20 @@ let workerId = 0;

class PoolWorker {
constructor(options, onJobDone) {
this.disposed = false;
this.nextJobId = 0;
this.jobs = Object.create(null);
this.activeJobs = 0;
this.onJobDone = onJobDone;
this.id = workerId;
workerId += 1;
this.worker = childProcess.spawn(process.execPath, [].concat(options.nodeArgs || []).concat(workerPath, options.parallelJobs), {
stdio: ['ignore', 1, 2, 'pipe', 'pipe'],
detached: true,
stdio: ['ignore', 'pipe', 'pipe', 'pipe', 'pipe'],
});

this.worker.unref();

// This prevents a problem where the worker stdio can be undefined
// when the kernel hits the limit of open files.
// More info can be found on: https://github.com/webpack-contrib/thread-loader/issues/2
Expand All @@ -33,9 +37,42 @@ class PoolWorker {
const [, , , readPipe, writePipe] = this.worker.stdio;
this.readPipe = readPipe;
this.writePipe = writePipe;
this.listenStdOutAndErrFromWorker(this.worker.stdout, this.worker.stderr);
this.readNextMessage();
}

listenStdOutAndErrFromWorker(workerStdout, workerStderr) {
if (workerStdout) {
workerStdout.on('data', this.writeToStdout);
}

if (workerStderr) {
workerStderr.on('data', this.writeToStderr);
}
}

ignoreStdOutAndErrFromWorker(workerStdout, workerStderr) {
if (workerStdout) {
workerStdout.removeListener('data', this.writeToStdout);
}

if (workerStderr) {
workerStderr.removeListener('data', this.writeToStderr);
}
}

writeToStdout(data) {
if (!this.disposed) {
process.stdout.write(data);
}
}

writeToStderr(data) {
if (!this.disposed) {
process.stderr.write(data);
}
}

run(data, callback) {
const jobId = this.nextJobId;
this.nextJobId += 1;
Expand Down Expand Up @@ -64,9 +101,7 @@ class PoolWorker {
}

writeEnd() {
const lengthBuffer = new Buffer(4);
lengthBuffer.writeInt32BE(0, 0);
this.writePipe.write(lengthBuffer);
this.writePipe.write(Buffer.alloc(0));
}

readNextMessage() {
Expand All @@ -78,6 +113,7 @@ class PoolWorker {
}
this.state = 'length read';
const length = lengthBuffer.readInt32BE(0);

this.state = 'read message';
this.readBuffer(length, (messageError, messageBuffer) => {
if (messageError) {
Expand Down Expand Up @@ -201,7 +237,11 @@ class PoolWorker {
}

dispose() {
this.writeEnd();
if (!this.disposed) {
this.disposed = true;
this.ignoreStdOutAndErrFromWorker(this.worker.stdout, this.worker.stderr);
this.writeEnd();
}
}
}

Expand All @@ -216,6 +256,32 @@ export default class WorkerPool {
this.activeJobs = 0;
this.timeout = null;
this.poolQueue = asyncQueue(this.distributeJob.bind(this), options.poolParallelJobs);
this.terminated = false;

this.setupLifeCycle();
}

terminate() {
if (!this.terminated) {
this.terminated = true;

this.poolQueue.kill();
this.disposeWorkers();
}
}

setupLifeCycle() {
process.on('SIGTERM', () => {
this.terminate();
});

process.on('SIGINT', () => {
this.terminate();
});

process.on('exit', () => {
this.terminate();
});
}

run(data, callback) {
Expand Down
24 changes: 7 additions & 17 deletions src/readBuffer.js
@@ -1,16 +1,14 @@
export default function readBuffer(pipe, length, callback) {
if (length === 0) {
callback(null, new Buffer(0));
callback(null, Buffer.alloc(0));
return;
}
let terminated = false;

let remainingLength = length;
const buffers = [];

const readChunk = () => {
const onChunk = (arg) => {
if (terminated) {
return;
}
let chunk = arg;
let overflow;
if (chunk.length > remainingLength) {
Expand All @@ -22,26 +20,18 @@ export default function readBuffer(pipe, length, callback) {
}
buffers.push(chunk);
if (remainingLength === 0) {
pipe.pause();
pipe.removeListener('data', onChunk);
pipe.pause();

if (overflow) {
pipe.unshift(overflow);
}
terminated = true;

callback(null, Buffer.concat(buffers, length));
}
};
const onEnd = () => {
if (terminated) {
return;
}
terminated = true;
const err = new Error(`Stream ended ${remainingLength.toString()} bytes prematurely`);
err.name = 'EarlyEOFError';
callback(err);
};

pipe.on('data', onChunk);
pipe.on('end', onEnd);
pipe.resume();
};
readChunk();
Expand Down

0 comments on commit f10fe55

Please sign in to comment.