1
1
import byline from "byline" ;
2
2
import Dockerode from "dockerode" ;
3
- import { setTimeout } from "timers/promises" ;
4
3
import { log } from "../common" ;
5
4
import { getContainerRuntimeClient } from "../container-runtime" ;
6
5
import { BoundPorts } from "../utils/bound-ports" ;
@@ -17,37 +16,46 @@ export class LogWaitStrategy extends AbstractWaitStrategy {
17
16
}
18
17
19
18
public async waitUntilReady ( container : Dockerode . Container , boundPorts : BoundPorts , startTime ?: Date ) : Promise < void > {
20
- await Promise . race ( [ this . handleTimeout ( container . id ) , this . handleLogs ( container , startTime ) ] ) ;
21
- }
22
-
23
- async handleTimeout ( containerId : string ) : Promise < void > {
24
- await setTimeout ( this . startupTimeout ) ;
25
- this . throwError ( containerId , `Log message "${ this . message } " not received after ${ this . startupTimeout } ms` ) ;
26
- }
27
-
28
- async handleLogs ( container : Dockerode . Container , startTime ?: Date ) : Promise < void > {
29
19
log . debug ( `Waiting for log message "${ this . message } "...` , { containerId : container . id } ) ;
30
20
const client = await getContainerRuntimeClient ( ) ;
31
21
const stream = await client . container . logs ( container , { since : startTime ? startTime . getTime ( ) / 1000 : 0 } ) ;
32
-
33
- let matches = 0 ;
34
- for await ( const line of byline ( stream ) ) {
35
- if ( this . matches ( line ) ) {
36
- if ( ++ matches === this . times ) {
37
- return log . debug ( `Log wait strategy complete` , { containerId : container . id } ) ;
22
+ return new Promise ( ( resolve , reject ) => {
23
+ const timeout = setTimeout ( ( ) => {
24
+ const message = `Log message "${ this . message } " not received after ${ this . startupTimeout } ms` ;
25
+ log . error ( message , { containerId : container . id } ) ;
26
+ reject ( new Error ( message ) ) ;
27
+ } , this . startupTimeout ) ;
28
+
29
+ const comparisonFn : ( line : string ) => boolean = ( line : string ) => {
30
+ if ( this . message instanceof RegExp ) {
31
+ return this . message . test ( line ) ;
32
+ } else {
33
+ return line . includes ( this . message ) ;
38
34
}
39
- }
40
- }
41
-
42
- this . throwError ( container . id , `Log stream ended and message "${ this . message } " was not received` ) ;
43
- }
44
-
45
- matches ( line : string ) : boolean {
46
- return this . message instanceof RegExp ? this . message . test ( line ) : line . includes ( this . message ) ;
47
- }
48
-
49
- throwError ( containerId : string , message : string ) : void {
50
- log . error ( message , { containerId } ) ;
51
- throw new Error ( message ) ;
35
+ } ;
36
+
37
+ let count = 0 ;
38
+ const lineProcessor = ( line : string ) => {
39
+ if ( comparisonFn ( line ) ) {
40
+ if ( ++ count === this . times ) {
41
+ stream . destroy ( ) ;
42
+ clearTimeout ( timeout ) ;
43
+ log . debug ( `Log wait strategy complete` , { containerId : container . id } ) ;
44
+ resolve ( ) ;
45
+ }
46
+ }
47
+ } ;
48
+
49
+ byline ( stream )
50
+ . on ( "data" , lineProcessor )
51
+ . on ( "err" , lineProcessor )
52
+ . on ( "end" , ( ) => {
53
+ stream . destroy ( ) ;
54
+ clearTimeout ( timeout ) ;
55
+ const message = `Log stream ended and message "${ this . message } " was not received` ;
56
+ log . error ( message , { containerId : container . id } ) ;
57
+ reject ( new Error ( message ) ) ;
58
+ } ) ;
59
+ } ) ;
52
60
}
53
61
}
0 commit comments