Skip to content

Commit

Permalink
refactor: Use promise queue for rate limiting operations
Browse files Browse the repository at this point in the history
The promise-queue library is added for helping control the concurrent requests.

CC-594
  • Loading branch information
Ilianna Papastefanou committed Jan 12, 2021
1 parent 70937e9 commit 898a208
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 45 deletions.
1 change: 1 addition & 0 deletions package.json
Expand Up @@ -76,6 +76,7 @@
"needle": "2.5.0",
"open": "^7.0.3",
"os-name": "^3.0.0",
"promise-queue": "^2.2.5",
"proxy-agent": "^3.1.1",
"proxy-from-env": "^1.0.0",
"semver": "^6.0.0",
Expand Down
48 changes: 26 additions & 22 deletions src/lib/iac/detect-iac.ts
Expand Up @@ -21,6 +21,8 @@ import {
IllegalTerraformFileError,
} from '../errors/invalid-iac-file';
import { Options, TestOptions, IacFileInDirectory } from '../types';
import * as Queue from 'promise-queue';
import * as util from 'util';

const debug = debugLib('snyk-detect-iac');

Expand Down Expand Up @@ -97,7 +99,9 @@ async function getDirectoryFiles(
maxDepth: options.maxDepth,
});

const promises: Promise<IacFileInDirectory>[] = [];
const iacFiles: IacFileInDirectory[] = [];
const queue = new Queue(50);

for (const filePath of directoryPaths) {
const fileType = pathLib
.extname(filePath)
Expand All @@ -107,31 +111,31 @@ async function getDirectoryFiles(
continue;
}

promises.push(
(async () => {
try {
const projectType = await getProjectTypeForIacFile(filePath);
return {
filePath,
projectType,
fileType,
};
} catch (err) {
return {
filePath,
fileType,
failureReason:
err instanceof CustomError ? err.userMessage : 'Unhandled Error',
};
}
})(),
);
queue.add(async () => {
try {
const projectType = await getProjectTypeForIacFile(filePath);
iacFiles.push({
filePath,
projectType,
fileType,
});
} catch (err) {
iacFiles.push({
filePath,
fileType,
failureReason:
err instanceof CustomError ? err.userMessage : 'Unhandled Error',
});
}
});
}

const iacFiles = await Promise.all(promises);
const setTimeoutAsync = util.promisify(setTimeout);
while (queue.getQueueLength() + queue.getPendingLength() > 0) {
await setTimeoutAsync(100);
}
if (iacFiles.length === 0) {
throw IacDirectoryWithoutAnyIacFileError();
}

return iacFiles;
}
4 changes: 1 addition & 3 deletions src/lib/request/request.ts
Expand Up @@ -8,7 +8,6 @@ import { getProxyForUrl } from 'proxy-from-env';
import * as ProxyAgent from 'proxy-agent';
import * as analytics from '../analytics';
import { Agent } from 'http';
import * as https from 'https';
import { Global } from '../../cli/args';
import { Payload } from './types';
import { getVersion } from '../version';
Expand Down Expand Up @@ -94,15 +93,14 @@ export = function makeRequest(
url = url + '?' + querystring.stringify(payload.qs);
delete payload.qs;
}
const agent = new https.Agent({ keepAlive: true, maxSockets: 10 });

const options: needle.NeedleOptions = {
json: payload.json,
headers: payload.headers,
timeout: payload.timeout,
// eslint-disable-next-line @typescript-eslint/camelcase
follow_max: 5,
family: payload.family,
agent,
};

const proxyUri = getProxyForUrl(url);
Expand Down
42 changes: 22 additions & 20 deletions src/lib/snyk-test/run-test.ts
Expand Up @@ -7,6 +7,8 @@ import * as pathUtil from 'path';
import { parsePackageString as moduleToObject } from 'snyk-module';
import * as depGraphLib from '@snyk/dep-graph';
import { IacScan } from './payload-schema';
import * as Queue from 'promise-queue';
import * as util from 'util';

import {
TestResult,
Expand Down Expand Up @@ -226,28 +228,27 @@ async function sendAndParseResults(
options: Options & TestOptions,
): Promise<TestResult[]> {
const results: TestResult[] = [];
const promises: Promise<TestResult>[] = [];

const queue = new Queue(25, 2000);
for (const payload of payloads) {
await spinner.clear<void>(spinnerLbl)();
await spinner(spinnerLbl);
if (options.iac) {
promises.push(
(async () => {
const iacScan: IacScan = payload.body as IacScan;
analytics.add('iac type', !!iacScan.type);
const res = (await sendTestPayload(payload)) as IacTestResponse;

const projectName =
iacScan.projectNameOverride || iacScan.originalProjectName;
const result = await parseIacTestResult(
res,
iacScan.targetFile,
projectName,
options.severityThreshold,
);
return result;
})(),
);
queue.add(async () => {
const iacScan: IacScan = payload.body as IacScan;
analytics.add('iac type', !!iacScan.type);
const res = (await sendTestPayload(payload)) as IacTestResponse;

const projectName =
iacScan.projectNameOverride || iacScan.originalProjectName;
const result = await parseIacTestResult(
res,
iacScan.targetFile,
projectName,
options.severityThreshold,
);
results.push(result);
});
} else {
/** sendTestPayload() deletes the request.body from the payload once completed. */
const payloadCopy = Object.assign({}, payload);
Expand Down Expand Up @@ -298,8 +299,9 @@ async function sendAndParseResults(
}
}

if (promises.length) {
return await Promise.all(promises);
const setTimeoutAsync = util.promisify(setTimeout);
while (queue.getQueueLength() + queue.getPendingLength() > 0) {
await setTimeoutAsync(100);
}
return results;
}
Expand Down

0 comments on commit 898a208

Please sign in to comment.