Skip to content

Commit

Permalink
refactor: Await all promises to resolve in queue
Browse files Browse the repository at this point in the history
  • Loading branch information
Ilianna Papastefanou committed Jan 12, 2021
1 parent 898a208 commit 48ee642
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 59 deletions.
48 changes: 25 additions & 23 deletions src/lib/iac/detect-iac.ts
Expand Up @@ -22,7 +22,6 @@ import {
} from '../errors/invalid-iac-file';
import { Options, TestOptions, IacFileInDirectory } from '../types';
import * as Queue from 'promise-queue';
import * as util from 'util';

const debug = debugLib('snyk-detect-iac');

Expand Down Expand Up @@ -100,8 +99,10 @@ async function getDirectoryFiles(
});

const iacFiles: IacFileInDirectory[] = [];
const queue = new Queue(50);
const maxConcurrent = 50;
const queue = new Queue(maxConcurrent);

const promises: Array<Promise<IacFileInDirectory>> = [];
for (const filePath of directoryPaths) {
const fileType = pathLib
.extname(filePath)
Expand All @@ -110,30 +111,31 @@ async function getDirectoryFiles(
if (!fileType || !supportedExtensions.has(fileType)) {
continue;
}

queue.add(async () => {
try {
const projectType = await getProjectTypeForIacFile(filePath);
iacFiles.push({
filePath,
projectType,
fileType,
});
} catch (err) {
iacFiles.push({
filePath,
fileType,
failureReason:
err instanceof CustomError ? err.userMessage : 'Unhandled Error',
});
}
});
promises.push(
queue.add(async () => {
try {
const projectType = await getProjectTypeForIacFile(filePath);
iacFiles.push({
filePath,
projectType,
fileType,
});
} catch (err) {
iacFiles.push({
filePath,
fileType,
failureReason:
err instanceof CustomError ? err.userMessage : 'Unhandled Error',
});
}
}),
);
}

const setTimeoutAsync = util.promisify(setTimeout);
while (queue.getQueueLength() + queue.getPendingLength() > 0) {
await setTimeoutAsync(100);
if (promises.length > 0) {
await Promise.all(promises);
}

if (iacFiles.length === 0) {
throw IacDirectoryWithoutAnyIacFileError();
}
Expand Down
78 changes: 42 additions & 36 deletions src/lib/snyk-test/run-test.ts
@@ -1,60 +1,56 @@
import * as fs from 'fs';
import * as _ from 'lodash';
import * as path from 'path';
import * as pathUtil from 'path';
import * as debugModule from 'debug';
import chalk from 'chalk';
import * as pathUtil from 'path';
import { parsePackageString as moduleToObject } from 'snyk-module';
import * as depGraphLib from '@snyk/dep-graph';
import { IacScan } from './payload-schema';
import * as Queue from 'promise-queue';
import * as util from 'util';

import {
TestResult,
DockerIssue,
AffectedPackages,
AnnotatedIssue,
TestDepGraphResponse,
convertTestDepGraphResultToLegacy,
DockerIssue,
LegacyVulnApiResult,
TestDependenciesResponse,
AffectedPackages,
TestDepGraphResponse,
TestResult,
} from './legacy';
import { IacTestResponse } from './iac-test-result';
import {
AuthFailedError,
InternalServerError,
NoSupportedManifestsFoundError,
DockerImageNotFoundError,
FailedToGetVulnerabilitiesError,
FailedToGetVulnsFromUnavailableResource,
FailedToRunTestError,
InternalServerError,
NoSupportedManifestsFoundError,
UnsupportedFeatureFlagError,
DockerImageNotFoundError,
} from '../errors';
import * as snyk from '../';
import { isCI } from '../is-ci';
import * as common from './common';
import * as config from '../config';
import * as analytics from '../analytics';
import { maybePrintDepTree, maybePrintDepGraph } from '../print-deps';
import { GitTarget, ContainerTarget } from '../project-metadata/types';
import { maybePrintDepGraph, maybePrintDepTree } from '../print-deps';
import { ContainerTarget, GitTarget } from '../project-metadata/types';
import * as projectMetadata from '../project-metadata';
import {
DepTree,
Options,
TestOptions,
SupportedProjectTypes,
PolicyOptions,
SupportedProjectTypes,
TestOptions,
} from '../types';
import { pruneGraph } from '../prune';
import { getDepsFromPlugin } from '../plugins/get-deps-from-plugin';
import {
ScannedProjectCustom,
MultiProjectResultCustom,
ScannedProjectCustom,
} from '../plugins/get-multi-plugin-result';

import request = require('../request');
import spinner = require('../spinner');
import { extractPackageManager } from '../plugins/extract-package-manager';
import { getExtraProjectCount } from '../plugins/get-extra-project-count';
import { serializeCallGraphWithMetrics } from '../reachable-vulns';
Expand All @@ -75,6 +71,8 @@ import { getEcosystem } from '../ecosystems';
import { Issue } from '../ecosystems/types';
import { assembleEcosystemPayloads } from './assemble-payloads';
import { NonExistingPackageError } from '../errors/non-existing-package-error';
import request = require('../request');
import spinner = require('../spinner');

const debug = debugModule('snyk:run-test');

Expand Down Expand Up @@ -228,27 +226,36 @@ async function sendAndParseResults(
options: Options & TestOptions,
): Promise<TestResult[]> {
const results: TestResult[] = [];
const promises: Array<Promise<TestResult>> = [];

// There is a RATE_LIMIT setup for network requests within a time period.
// To support this limit and avoid getting back 502 errors from registry,
// we introduced a concurrent requests limit of 25. With that, we will only process MAX 25 requests at the same time.
// In the future, we would probably want to introduce a RATE_LIMIT specific for IaC
const maxConcurrent = 25;
const queue = new Queue(maxConcurrent);

const queue = new Queue(25, 2000);
for (const payload of payloads) {
await spinner.clear<void>(spinnerLbl)();
await spinner(spinnerLbl);
if (options.iac) {
queue.add(async () => {
const iacScan: IacScan = payload.body as IacScan;
analytics.add('iac type', !!iacScan.type);
const res = (await sendTestPayload(payload)) as IacTestResponse;

const projectName =
iacScan.projectNameOverride || iacScan.originalProjectName;
const result = await parseIacTestResult(
res,
iacScan.targetFile,
projectName,
options.severityThreshold,
);
results.push(result);
});
promises.push(
queue.add(async () => {
const iacScan: IacScan = payload.body as IacScan;
analytics.add('iac type', !!iacScan.type);
const res = (await sendTestPayload(payload)) as IacTestResponse;

const projectName =
iacScan.projectNameOverride || iacScan.originalProjectName;
const result = await parseIacTestResult(
res,
iacScan.targetFile,
projectName,
options.severityThreshold,
);
results.push(result);
}),
);
} else {
/** sendTestPayload() deletes the request.body from the payload once completed. */
const payloadCopy = Object.assign({}, payload);
Expand Down Expand Up @@ -299,9 +306,8 @@ async function sendAndParseResults(
}
}

const setTimeoutAsync = util.promisify(setTimeout);
while (queue.getQueueLength() + queue.getPendingLength() > 0) {
await setTimeoutAsync(100);
if (promises.length > 0) {
await Promise.all(promises);
}
return results;
}
Expand Down

0 comments on commit 48ee642

Please sign in to comment.