Skip to content

Commit 247ad58

Browse files
authored
Use p-map instead of promise-pool (#946)
* Use p-map instead of promise-pool * Add an integration test to prevent regression * Make parallel-matrix.test.ts concurrent * Make test concurrent * Fix open handles by passing jobs down the stack, instead of relying on parser to be completely finished * Use fetch-depth: 0 * Stop using strategy.matrix * Fix module import * Add --verbose option to jest * cleanupJobResources in handler's last line
1 parent f791b77 commit 247ad58

12 files changed

+121
-108
lines changed

.github/workflows/build.yml

+4-23
Original file line numberDiff line numberDiff line change
@@ -23,14 +23,10 @@ jobs:
2323

2424
pkg:
2525
runs-on: ubuntu-latest
26-
strategy:
27-
matrix:
28-
node-version: [18]
2926
steps:
3027
- uses: actions/checkout@v3
3128
- uses: actions/setup-node@v3
3229
with:
33-
node-version: ${{ matrix.node-version }}
3430
cache: 'npm'
3531
- uses: actions/cache@v3
3632
with:
@@ -45,46 +41,36 @@ jobs:
4541

4642
eslint:
4743
runs-on: ubuntu-latest
48-
strategy:
49-
matrix:
50-
node-version: [18]
5144
steps:
5245
- uses: actions/checkout@v3
5346
- uses: actions/setup-node@v3
5447
with:
55-
node-version: ${{ matrix.node-version }}
5648
cache: 'npm'
5749
- run: npm ci
5850
- run: npm run lint
5951

6052
unused-deps:
6153
runs-on: ubuntu-latest
62-
strategy:
63-
matrix:
64-
node-version: [18]
6554
steps:
6655
- uses: actions/checkout@v3
6756
- uses: actions/setup-node@v3
6857
with:
69-
node-version: ${{ matrix.node-version }}
7058
cache: 'npm'
7159
- run: npm ci
7260
- run: npx depcheck --ignores depcheck
7361

7462
jest:
7563
runs-on: ubuntu-latest
76-
strategy:
77-
matrix:
78-
node-version: [18]
7964
steps:
8065
- uses: actions/checkout@v3
66+
with:
67+
fetch-depth: 0
8168
- uses: actions/setup-node@v3
8269
with:
83-
node-version: ${{ matrix.node-version }}
8470
cache: 'npm'
8571
- run: npm ci
8672
- name: Run Tests
87-
run: FORCE_COLOR=1 npx jest --coverage --testTimeout 90000
73+
run: FORCE_COLOR=1 npx jest --coverage --testTimeout 90000 --verbose
8874
- uses: sonarsource/[email protected]
8975
env:
9076
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
@@ -99,19 +85,14 @@ jobs:
9985
contents: read
10086
security-events: write
10187

102-
strategy:
103-
fail-fast: false
104-
matrix:
105-
language: [ 'typescript' ]
106-
10788
steps:
10889
- name: Checkout repository
10990
uses: actions/checkout@v3
11091

11192
- name: Initialize CodeQL
11293
uses: github/codeql-action/init@v2
11394
with:
114-
languages: ${{ matrix.language }}
95+
languages: 'typescript'
11596

11697
- name: Autobuild
11798
uses: github/codeql-action/autobuild@v2

package-lock.json

+43-9
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

+1-1
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
"start": "ts-node --log-error src/index.ts --cwd examples/docker-compose-nodejs"
2121
},
2222
"dependencies": {
23-
"@supercharge/promise-pool": "3.x.x",
2423
"axios": "0.27.x",
2524
"base64url": "3.x.x",
2625
"camelcase": "6.x.x",
@@ -33,6 +32,7 @@
3332
"globby": "11.x.x",
3433
"js-yaml": "4.x.x",
3534
"object-traversal": "1.x.x",
35+
"p-map": "4.x.x",
3636
"pretty-hrtime": "1.x.x",
3737
"source-map-support": "0.5.x",
3838
"split2": "4.x.x",

src/executor.ts

+3-7
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ import chalk from "chalk";
22
import {Job} from "./job";
33
import assert, {AssertionError} from "assert";
44
import {Argv} from "./argv";
5-
import {PromisePool} from "@supercharge/promise-pool";
5+
import pMap from "p-map";
66

77
export class Executor {
88

@@ -12,12 +12,8 @@ export class Executor {
1212
do {
1313
startCandidates = Executor.getStartCandidates(jobs, stages, potentialStarters, argv.manual);
1414
if (startCandidates.length > 0) {
15-
await PromisePool
16-
.withConcurrency(argv.concurrency ?? startCandidates.length)
17-
.for(startCandidates)
18-
.process(async (job: Job) => {
19-
return job.start();
20-
});
15+
const mapper = async (startCandidate: Job) => startCandidate.start();
16+
await pMap(startCandidates, mapper, {concurrency: argv.concurrency ?? startCandidates.length});
2117
}
2218
} while (startCandidates.length > 0);
2319
}

src/handler.ts

+12-47
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,10 @@ import {Parser} from "./parser";
77
import * as state from "./state";
88
import prettyHrtime from "pretty-hrtime";
99
import {WriteStreams} from "./write-streams";
10-
import {Job} from "./job";
10+
import {cleanupJobResources, Job} from "./job";
1111
import {Utils} from "./utils";
1212
import {Argv} from "./argv";
13-
import assert, {AssertionError} from "assert";
13+
import assert from "assert";
1414

1515
const generateGitIgnore = (cwd: string, stateDir: string) => {
1616
const gitIgnoreFilePath = `${cwd}/${stateDir}/.gitignore`;
@@ -20,47 +20,13 @@ const generateGitIgnore = (cwd: string, stateDir: string) => {
2020
}
2121
};
2222

23-
const cleanupResources = async (parser: Parser | null) => {
24-
if (!parser) {
25-
return;
26-
}
27-
const promises = [];
28-
for (const job of parser.jobs.values()) {
29-
promises.push(job.cleanupResources());
30-
}
31-
await Promise.all(promises);
32-
};
33-
34-
export async function handler (args: any, writeStreams: WriteStreams): Promise<ReadonlyArray<Job>> {
23+
export async function handler (args: any, writeStreams: WriteStreams, jobs: Job[] = []) {
3524
const argv = new Argv(args);
3625
const cwd = argv.cwd;
3726
const stateDir = argv.stateDir;
3827
const file = argv.file;
3928
let parser: Parser | null = null;
4029

41-
process.on("unhandledRejection", (e) => {
42-
if (e instanceof AssertionError) {
43-
process.stderr.write(chalk`{red ${e.message.trim()}}\n`);
44-
} else if (e instanceof Error) {
45-
process.stderr.write(chalk`{red ${e.stack?.trim() ?? e.message.trim()}}\n`);
46-
} else if (e) {
47-
process.stderr.write(chalk`{red ${e.toString().trim()}}\n`);
48-
}
49-
if (parser) {
50-
cleanupResources(parser).finally(process.exit(1));
51-
} else {
52-
process.exit(1);
53-
}
54-
});
55-
56-
process.on("exit", (_: string, code: number) => {
57-
cleanupResources(parser).finally(process.exit(code));
58-
});
59-
60-
process.on("SIGINT", (_: string, code: number) => {
61-
cleanupResources(parser).finally(process.exit(code));
62-
});
63-
6430
if (argv.completion) {
6531
yargs.showCompletionScript();
6632
return [];
@@ -69,13 +35,13 @@ export async function handler (args: any, writeStreams: WriteStreams): Promise<R
6935
assert(fs.existsSync(`${cwd}/${file}`), `${cwd}/${file} could not be found`);
7036

7137
if (argv.fetchIncludes) {
72-
parser = await Parser.create(argv, writeStreams, 0);
38+
await Parser.create(argv, writeStreams, 0, jobs);
7339
return [];
7440
}
7541

7642
if (argv.preview) {
7743
const pipelineIid = await state.getPipelineIid(cwd, stateDir);
78-
parser = await Parser.create(argv, writeStreams, pipelineIid);
44+
parser = await Parser.create(argv, writeStreams, pipelineIid, jobs);
7945
const gitlabData = parser.gitlabData;
8046
for (const jobName of Object.keys(gitlabData)) {
8147
if (jobName === "stages") {
@@ -88,15 +54,15 @@ export async function handler (args: any, writeStreams: WriteStreams): Promise<R
8854
writeStreams.stdout(`---\n${yaml.dump(gitlabData, {lineWidth: 160})}`);
8955
} else if (argv.list || argv.listAll) {
9056
const pipelineIid = await state.getPipelineIid(cwd, stateDir);
91-
parser = await Parser.create(argv, writeStreams, pipelineIid);
57+
parser = await Parser.create(argv, writeStreams, pipelineIid, jobs);
9258
Commander.runList(parser, writeStreams, argv.listAll);
9359
} else if (argv.listJson) {
9460
const pipelineIid = await state.getPipelineIid(cwd, stateDir);
95-
parser = await Parser.create(argv, writeStreams, pipelineIid);
61+
parser = await Parser.create(argv, writeStreams, pipelineIid, jobs);
9662
Commander.runJson(parser, writeStreams);
9763
} else if (argv.listCsv || argv.listCsvAll) {
9864
const pipelineIid = await state.getPipelineIid(cwd, stateDir);
99-
parser = await Parser.create(argv, writeStreams, pipelineIid);
65+
parser = await Parser.create(argv, writeStreams, pipelineIid, jobs);
10066
Commander.runCsv(parser, writeStreams, argv.listCsvAll);
10167
} else if (argv.job.length > 0) {
10268
assert(argv.stage == null, "You cannot use --stage when starting individual jobs");
@@ -107,7 +73,7 @@ export async function handler (args: any, writeStreams: WriteStreams): Promise<R
10773
await state.incrementPipelineIid(cwd, stateDir);
10874
}
10975
const pipelineIid = await state.getPipelineIid(cwd, stateDir);
110-
parser = await Parser.create(argv, writeStreams, pipelineIid);
76+
parser = await Parser.create(argv, writeStreams, pipelineIid, jobs);
11177
await Utils.rsyncTrackedFiles(cwd, stateDir, ".docker");
11278
await Commander.runJobs(argv, parser, writeStreams);
11379
if (argv.needs || argv.onlyNeeds) {
@@ -117,7 +83,7 @@ export async function handler (args: any, writeStreams: WriteStreams): Promise<R
11783
generateGitIgnore(cwd, stateDir);
11884
const time = process.hrtime();
11985
const pipelineIid = await state.getPipelineIid(cwd, stateDir);
120-
parser = await Parser.create(argv, writeStreams, pipelineIid);
86+
parser = await Parser.create(argv, writeStreams, pipelineIid, jobs);
12187
await Utils.rsyncTrackedFiles(cwd, stateDir, ".docker");
12288
await Commander.runJobsInStage(argv, parser, writeStreams);
12389
writeStreams.stderr(chalk`{grey pipeline finished} in {grey ${prettyHrtime(process.hrtime(time))}}\n`);
@@ -127,14 +93,13 @@ export async function handler (args: any, writeStreams: WriteStreams): Promise<R
12793
await fs.remove(`${cwd}/${stateDir}/artifacts`);
12894
await state.incrementPipelineIid(cwd, stateDir);
12995
const pipelineIid = await state.getPipelineIid(cwd, stateDir);
130-
parser = await Parser.create(argv, writeStreams, pipelineIid);
96+
parser = await Parser.create(argv, writeStreams, pipelineIid, jobs);
13197
await Utils.rsyncTrackedFiles(cwd, stateDir, ".docker");
13298
await Commander.runPipeline(argv, parser, writeStreams);
13399
writeStreams.stderr(chalk`{grey pipeline finished} in {grey ${prettyHrtime(process.hrtime(time))}}\n`);
134100
}
135101
writeStreams.flush();
136102

137-
await cleanupResources(parser);
138-
return parser.jobs;
103+
return cleanupJobResources(jobs);
139104
}
140105

0 commit comments

Comments (0)