Skip to content

Commit

Permalink
Stop reading from stdin after programmatic API finishes (#253)
Browse files Browse the repository at this point in the history
  • Loading branch information
brandonchinn178 committed Aug 8, 2021
1 parent 07a7de1 commit c295062
Show file tree
Hide file tree
Showing 22 changed files with 186 additions and 83 deletions.
6 changes: 5 additions & 1 deletion README.md
Expand Up @@ -228,15 +228,19 @@ For more details, visit https://github.com/open-cli-tools/concurrently
concurrently can be used programmatically by using the API documented below:

### `concurrently(commands[, options])`

- `commands`: an array of either strings (containing the commands to run) or objects
with the shape `{ command, name, prefixColor, env, cwd }`.

- `options` (optional): an object containing any of the below:
- `cwd`: the working directory to be used by all commands. Can be overriden per command.
Default: `process.cwd()`.
- `defaultInputTarget`: the default input target when reading from `inputStream`.
Default: `0`.
- `handleInput`: when `true`, reads input from `process.stdin`.
- `inputStream`: a [`Readable` stream](https://nodejs.org/dist/latest-v10.x/docs/api/stream.html#stream_readable_streams)
to read the input from, eg `process.stdin`.
to read the input from. Should only be used in the rare instance you would like to stream anything other than `process.stdin`. Overrides `handleInput`.
- `pauseInputStreamOnFinish`: by default, pauses the input stream (`process.stdin` when `handleInput` is enabled, or `inputStream` if provided) when all of the processes have finished. If you need to read from the input stream after `concurrently` has finished, set this to `false`. ([#252](https://github.com/kimmobrunfeldt/concurrently/issues/252)).
- `killOthers`: an array of exitting conditions that will cause a process to kill others.
Can contain any of `success` or `failure`.
- `maxProcesses`: how many processes should run at once.
Expand Down
2 changes: 1 addition & 1 deletion bin/concurrently.js
Expand Up @@ -157,7 +157,7 @@ concurrently(args._.map((command, index) => {
name: names[index]
};
}), {
inputStream: args.handleInput && process.stdin,
handleInput: args.handleInput,
defaultInputTarget: args.defaultInputTarget,
killOthers: args.killOthers
? ['success', 'failure']
Expand Down
6 changes: 3 additions & 3 deletions bin/concurrently.spec.js
Expand Up @@ -193,7 +193,7 @@ describe('--names', () => {
});

describe('--prefix', () => {
it('is alised to -p', done => {
it('is aliased to -p', done => {
const child = run('-p command "echo foo" "echo bar"');
child.log.pipe(buffer(child.close)).subscribe(lines => {
expect(lines).toContainEqual(expect.stringContaining('[echo foo] foo'));
Expand All @@ -213,7 +213,7 @@ describe('--prefix', () => {
});

describe('--prefix-length', () => {
it('is alised to -l', done => {
it('is aliased to -l', done => {
const child = run('-p command -l 5 "echo foo" "echo bar"');
child.log.pipe(buffer(child.close)).subscribe(lines => {
expect(lines).toContainEqual(expect.stringContaining('[ec..o] foo'));
Expand Down Expand Up @@ -247,7 +247,7 @@ describe('--restart-tries', () => {
});

describe('--kill-others', () => {
it('is alised to -k', done => {
it('is aliased to -k', done => {
const child = run('-k "sleep 10" "exit 0"');
child.log.pipe(buffer(child.close)).subscribe(lines => {
expect(lines).toContainEqual(expect.stringContaining('[1] exit 0 exited with code 0'));
Expand Down
3 changes: 2 additions & 1 deletion index.js
Expand Up @@ -30,7 +30,8 @@ module.exports = exports = (commands, options = {}) => {
new InputHandler({
logger,
defaultInputTarget: options.defaultInputTarget,
inputStream: options.inputStream,
inputStream: options.inputStream || (options.handleInput && process.stdin),
pauseInputStreamOnFinish: options.pauseInputStreamOnFinish,
}),
new KillOnSignal({ process }),
new RestartProcess({
Expand Down
19 changes: 15 additions & 4 deletions src/concurrently.js
Expand Up @@ -49,18 +49,29 @@ module.exports = (commands, options) => {
))
.value();

commands = options.controllers.reduce(
(prevCommands, controller) => controller.handle(prevCommands),
commands
const handleResult = options.controllers.reduce(
({ commands: prevCommands, onFinishCallbacks }, controller) => {
const { commands, onFinish } = controller.handle(prevCommands);
return {
commands,
onFinishCallbacks: _.concat(onFinishCallbacks, onFinish ? [onFinish] : [])
}
},
{ commands, onFinishCallbacks: [] }
);
commands = handleResult.commands

const commandsLeft = commands.slice();
const maxProcesses = Math.max(1, Number(options.maxProcesses) || commandsLeft.length);
for (let i = 0; i < maxProcesses; i++) {
maybeRunMore(commandsLeft);
}

return new CompletionListener({ successCondition: options.successCondition }).listen(commands);
return new CompletionListener({ successCondition: options.successCondition })
.listen(commands)
.finally(() => {
handleResult.onFinishCallbacks.forEach((onFinish) => onFinish());
});
};

function mapToCommandInfo(command) {
Expand Down
23 changes: 21 additions & 2 deletions src/concurrently.spec.js
@@ -1,6 +1,7 @@
const EventEmitter = require('events');

const createFakeCommand = require('./flow-control/fixtures/fake-command');
const FakeHandler = require('./flow-control/fixtures/fake-handler');
const concurrently = require('./concurrently');

let spawn, kill, controllers, processes = [];
Expand All @@ -18,7 +19,7 @@ beforeEach(() => {
return process;
});
kill = jest.fn();
controllers = [{ handle: jest.fn(arg => arg) }, { handle: jest.fn(arg => arg) }];
controllers = [new FakeHandler(), new FakeHandler()];
});

it('fails if commands is not an array', () => {
Expand Down Expand Up @@ -85,7 +86,7 @@ it('runs commands with a name or prefix color', () => {

it('passes commands wrapped from a controller to the next one', () => {
const fakeCommand = createFakeCommand('banana', 'banana');
controllers[0].handle.mockReturnValue([fakeCommand]);
controllers[0].handle.mockReturnValue({ commands: [fakeCommand] });

create(['echo']);

Expand Down Expand Up @@ -165,3 +166,21 @@ it('uses overridden cwd option for each command if specified', () => {
cwd: 'foobar',
}));
});

it('runs onFinish hook after all commands run', async () => {
const promise = create(['foo', 'bar'], { maxProcesses: 1 });
expect(spawn).toHaveBeenCalledTimes(1);
expect(controllers[0].onFinish).not.toHaveBeenCalled();
expect(controllers[1].onFinish).not.toHaveBeenCalled();

processes[0].emit('close', 0, null);
expect(spawn).toHaveBeenCalledTimes(2);
expect(controllers[0].onFinish).not.toHaveBeenCalled();
expect(controllers[1].onFinish).not.toHaveBeenCalled();

processes[1].emit('close', 0, null);
await promise;

expect(controllers[0].onFinish).toHaveBeenCalled();
expect(controllers[1].onFinish).toHaveBeenCalled();
})
16 changes: 16 additions & 0 deletions src/flow-control/base-handler.js
@@ -0,0 +1,16 @@
module.exports = class BaseHandler {
constructor(options = {}) {
const { logger } = options;

this.logger = logger;
}

handle(commands) {
return {
commands,
// an optional callback to call when all commands have finished
// (either successful or not)
onFinish: null,
};
}
};
13 changes: 13 additions & 0 deletions src/flow-control/fixtures/fake-handler.js
@@ -0,0 +1,13 @@
const BaseHandler = require('../base-handler')

module.exports = class FakeHandler extends BaseHandler {
constructor() {
super();

this.handle = jest.fn(commands => ({
commands,
onFinish: this.onFinish,
}));
this.onFinish = jest.fn();
}
};
21 changes: 16 additions & 5 deletions src/flow-control/input-handler.js
Expand Up @@ -2,17 +2,20 @@ const Rx = require('rxjs');
const { map } = require('rxjs/operators');

const defaults = require('../defaults');
const BaseHandler = require('./base-handler');

module.exports = class InputHandler extends BaseHandler {
constructor({ defaultInputTarget, inputStream, pauseInputStreamOnFinish, logger }) {
super({ logger });

module.exports = class InputHandler {
constructor({ defaultInputTarget, inputStream, logger }) {
this.defaultInputTarget = defaultInputTarget || defaults.defaultInputTarget;
this.inputStream = inputStream;
this.logger = logger;
this.pauseInputStreamOnFinish = pauseInputStreamOnFinish !== false;
}

handle(commands) {
if (!this.inputStream) {
return commands;
return { commands };
}

Rx.fromEvent(this.inputStream, 'data')
Expand All @@ -34,6 +37,14 @@ module.exports = class InputHandler {
}
});

return commands;
return {
commands,
onFinish: () => {
if (this.pauseInputStreamOnFinish) {
// https://github.com/kimmobrunfeldt/concurrently/issues/252
this.inputStream.pause();
}
},
};
}
};
40 changes: 31 additions & 9 deletions src/flow-control/input-handler.spec.js
@@ -1,4 +1,4 @@
const EventEmitter = require('events');
const stream = require('stream');
const { createMockInstance } = require('jest-create-mock-instance');

const Logger = require('../logger');
Expand All @@ -12,7 +12,7 @@ beforeEach(() => {
createFakeCommand('foo', 'echo foo', 0),
createFakeCommand('bar', 'echo bar', 1),
];
inputStream = new EventEmitter();
inputStream = new stream.PassThrough();
logger = createMockInstance(Logger);
controller = new InputHandler({
defaultInputTarget: 0,
Expand All @@ -22,16 +22,16 @@ beforeEach(() => {
});

it('returns same commands', () => {
expect(controller.handle(commands)).toBe(commands);
expect(controller.handle(commands)).toMatchObject({ commands });

controller = new InputHandler({ logger });
expect(controller.handle(commands)).toBe(commands);
expect(controller.handle(commands)).toMatchObject({ commands });
});

it('forwards input stream to default target ID', () => {
controller.handle(commands);

inputStream.emit('data', Buffer.from('something'));
inputStream.write('something');

expect(commands[0].stdin.write).toHaveBeenCalledTimes(1);
expect(commands[0].stdin.write).toHaveBeenCalledWith('something');
Expand All @@ -41,7 +41,7 @@ it('forwards input stream to default target ID', () => {
it('forwards input stream to target index specified in input', () => {
controller.handle(commands);

inputStream.emit('data', Buffer.from('1:something'));
inputStream.write('1:something');

expect(commands[0].stdin.write).not.toHaveBeenCalled();
expect(commands[1].stdin.write).toHaveBeenCalledTimes(1);
Expand All @@ -63,7 +63,7 @@ it('forwards input stream to target index specified in input when input contains
it('forwards input stream to target name specified in input', () => {
controller.handle(commands);

inputStream.emit('data', Buffer.from('bar:something'));
inputStream.write('bar:something');

expect(commands[0].stdin.write).not.toHaveBeenCalled();
expect(commands[1].stdin.write).toHaveBeenCalledTimes(1);
Expand All @@ -74,7 +74,7 @@ it('logs error if command has no stdin open', () => {
commands[0].stdin = null;
controller.handle(commands);

inputStream.emit('data', Buffer.from('something'));
inputStream.write('something');

expect(commands[1].stdin.write).not.toHaveBeenCalled();
expect(logger.logGlobalEvent).toHaveBeenCalledWith('Unable to find command 0, or it has no stdin open\n');
Expand All @@ -83,9 +83,31 @@ it('logs error if command has no stdin open', () => {
it('logs error if command is not found', () => {
controller.handle(commands);

inputStream.emit('data', Buffer.from('foobar:something'));
inputStream.write('foobar:something');

expect(commands[0].stdin.write).not.toHaveBeenCalled();
expect(commands[1].stdin.write).not.toHaveBeenCalled();
expect(logger.logGlobalEvent).toHaveBeenCalledWith('Unable to find command foobar, or it has no stdin open\n');
});

it('pauses input stream when finished', () => {
expect(inputStream.readableFlowing).toBeNull();

const { onFinish } = controller.handle(commands);
expect(inputStream.readableFlowing).toBe(true);

onFinish();
expect(inputStream.readableFlowing).toBe(false);
});

it('does not pause input stream when pauseInputStreamOnFinish is set to false', () => {
controller = new InputHandler({ inputStream, pauseInputStreamOnFinish: false });

expect(inputStream.readableFlowing).toBeNull();

const { onFinish } = controller.handle(commands);
expect(inputStream.readableFlowing).toBe(true);

onFinish();
expect(inputStream.readableFlowing).toBe(true);
});
29 changes: 17 additions & 12 deletions src/flow-control/kill-on-signal.js
@@ -1,8 +1,11 @@
const { map } = require('rxjs/operators');

const BaseHandler = require('./base-handler');

module.exports = class KillOnSignal {
module.exports = class KillOnSignal extends BaseHandler {
constructor({ process }) {
super();

this.process = process;
}

Expand All @@ -15,16 +18,18 @@ module.exports = class KillOnSignal {
});
});

return commands.map(command => {
const closeStream = command.close.pipe(map(exitInfo => {
const exitCode = caughtSignal === 'SIGINT' ? 0 : exitInfo.exitCode;
return Object.assign({}, exitInfo, { exitCode });
}));
return new Proxy(command, {
get(target, prop) {
return prop === 'close' ? closeStream : target[prop];
}
});
});
return {
commands: commands.map(command => {
const closeStream = command.close.pipe(map(exitInfo => {
const exitCode = caughtSignal === 'SIGINT' ? 0 : exitInfo.exitCode;
return Object.assign({}, exitInfo, { exitCode });
}));
return new Proxy(command, {
get(target, prop) {
return prop === 'close' ? closeStream : target[prop];
}
});
})
};
}
};
6 changes: 3 additions & 3 deletions src/flow-control/kill-on-signal.spec.js
Expand Up @@ -14,7 +14,7 @@ beforeEach(() => {
});

it('returns commands that keep non-close streams from original commands', () => {
const newCommands = controller.handle(commands);
const { commands: newCommands } = controller.handle(commands);
newCommands.forEach((newCommand, i) => {
expect(newCommand.close).not.toBe(commands[i].close);
expect(newCommand.error).toBe(commands[i].error);
Expand All @@ -24,7 +24,7 @@ it('returns commands that keep non-close streams from original commands', () =>
});

it('returns commands that map SIGINT to exit code 0', () => {
const newCommands = controller.handle(commands);
const { commands: newCommands } = controller.handle(commands);
expect(newCommands).not.toBe(commands);
expect(newCommands).toHaveLength(commands.length);

Expand All @@ -40,7 +40,7 @@ it('returns commands that map SIGINT to exit code 0', () => {
});

it('returns commands that keep non-SIGINT exit codes', () => {
const newCommands = controller.handle(commands);
const { commands: newCommands } = controller.handle(commands);
expect(newCommands).not.toBe(commands);
expect(newCommands).toHaveLength(commands.length);

Expand Down

0 comments on commit c295062

Please sign in to comment.