@@ -25,6 +25,8 @@ import (
25
25
"net/http"
26
26
"net/url"
27
27
"time"
28
+
29
+ . "github.com/onsi/gomega"
28
30
)
29
31
30
32
type RayJobSetup struct {
@@ -54,25 +56,25 @@ type RayClusterClientConfig struct {
54
56
var _ RayClusterClient = (* rayClusterClient )(nil )
55
57
56
58
type rayClusterClient struct {
57
- endpoint url.URL
58
- httpClient * http.Client
59
- authHeader string
59
+ endpoint url.URL
60
+ httpClient * http.Client
61
+ bearerToken string
60
62
}
61
63
62
64
type RayClusterClient interface {
63
65
CreateJob (job * RayJobSetup ) (* RayJobResponse , error )
64
66
GetJobDetails (jobID string ) (* RayJobDetailsResponse , error )
65
67
GetJobLogs (jobID string ) (string , error )
66
- GetAllJobsData () ([]map [string ]interface {}, error )
67
- WaitForJobStatus (jobID string ) ( string , error )
68
+ GetJobs () ([]map [string ]interface {}, error )
69
+ WaitForJobStatus (test Test , jobID string ) string
68
70
}
69
71
70
- func NewRayClusterClient (dashboardEndpoint url.URL , config RayClusterClientConfig , authHeader string ) RayClusterClient {
72
+ func NewRayClusterClient (dashboardEndpoint url.URL , config RayClusterClientConfig , bearerToken string ) RayClusterClient {
71
73
tr := & http.Transport {
72
74
TLSClientConfig : & tls.Config {InsecureSkipVerify : config .SkipTlsVerification },
73
75
Proxy : http .ProxyFromEnvironment ,
74
76
}
75
- return & rayClusterClient {endpoint : dashboardEndpoint , httpClient : & http.Client {Transport : tr }, authHeader : authHeader }
77
+ return & rayClusterClient {endpoint : dashboardEndpoint , httpClient : & http.Client {Transport : tr }, bearerToken : bearerToken }
76
78
}
77
79
78
80
func (client * rayClusterClient ) CreateJob (job * RayJobSetup ) (response * RayJobResponse , err error ) {
@@ -101,15 +103,15 @@ func (client *rayClusterClient) CreateJob(job *RayJobSetup) (response *RayJobRes
101
103
return
102
104
}
103
105
104
- func (client * rayClusterClient ) GetAllJobsData () ([]map [string ]interface {}, error ) {
106
+ func (client * rayClusterClient ) GetJobs () ([]map [string ]interface {}, error ) {
105
107
getAllJobsDetailsURL := client .endpoint .String () + "/api/jobs/"
106
108
107
109
req , err := http .NewRequest (http .MethodGet , getAllJobsDetailsURL , nil )
108
110
if err != nil {
109
111
return nil , err
110
112
}
111
- if client .authHeader != "" {
112
- req .Header .Set ("Authorization" , "Bearer " + client .authHeader )
113
+ if client .bearerToken != "" {
114
+ req .Header .Set ("Authorization" , "Bearer " + client .bearerToken )
113
115
}
114
116
resp , err := client .httpClient .Do (req )
115
117
if err != nil {
@@ -139,8 +141,8 @@ func (client *rayClusterClient) GetJobDetails(jobID string) (response *RayJobDet
139
141
if err != nil {
140
142
return nil , err
141
143
}
142
- if client .authHeader != "" {
143
- req .Header .Set ("Authorization" , "Bearer " + client .authHeader )
144
+ if client .bearerToken != "" {
145
+ req .Header .Set ("Authorization" , "Bearer " + client .bearerToken )
144
146
}
145
147
146
148
resp , err := client .httpClient .Do (req )
@@ -167,8 +169,8 @@ func (client *rayClusterClient) GetJobLogs(jobID string) (logs string, err error
167
169
if err != nil {
168
170
return "" , err
169
171
}
170
- if client .authHeader != "" {
171
- req .Header .Set ("Authorization" , "Bearer " + client .authHeader )
172
+ if client .bearerToken != "" {
173
+ req .Header .Set ("Authorization" , "Bearer " + client .bearerToken )
172
174
}
173
175
resp , err := client .httpClient .Do (req )
174
176
if err != nil {
@@ -189,32 +191,30 @@ func (client *rayClusterClient) GetJobLogs(jobID string) (logs string, err error
189
191
return jobLogs .Logs , err
190
192
}
191
193
192
- func (client * rayClusterClient ) WaitForJobStatus (jobID string ) ( string , error ) {
194
+ func (client * rayClusterClient ) WaitForJobStatus (test Test , jobID string ) string {
193
195
var status string
194
- var prevStatus string
195
196
fmt .Printf ("Waiting for job to be Succeeded...\n " )
196
- var err error
197
- var resp * RayJobDetailsResponse
198
- for status != "SUCCEEDED" {
199
- resp , err = client .GetJobDetails (jobID )
200
- if err != nil {
201
- time .Sleep (2 * time .Second )
202
- continue
203
- }
197
+
198
+ test .Eventually (func () string {
199
+ resp , err := client .GetJobDetails (jobID )
200
+ test .Expect (err ).ToNot (HaveOccurred ())
204
201
statusVal := resp .Status
205
202
if statusVal == "SUCCEEDED" || statusVal == "FAILED" {
206
203
fmt .Printf ("JobStatus : %s\n " , statusVal )
207
- prevStatus = statusVal
208
- return prevStatus , err
204
+ status = statusVal
205
+ return status
209
206
}
210
- if prevStatus != statusVal && statusVal != "SUCCEEDED" {
207
+ if status != statusVal && statusVal != "SUCCEEDED" {
211
208
fmt .Printf ("JobStatus : %s...\n " , statusVal )
212
- prevStatus = statusVal
209
+ status = statusVal
213
210
}
214
- time .Sleep (3 * time .Second )
215
- }
216
- if prevStatus != "SUCCEEDED" {
217
- err = fmt .Errorf ("Job failed !" )
211
+ return status
212
+ }, TestTimeoutDouble , 3 * time .Second ).Should (Or (Equal ("SUCCEEDED" ), Equal ("FAILED" )), "Job did not complete within the expected time" )
213
+
214
+ if status == "SUCCEEDED" {
215
+ fmt .Printf ("Job succeeded !\n " )
216
+ } else {
217
+ fmt .Printf ("Job failed !\n " )
218
218
}
219
- return prevStatus , err
219
+ return status
220
220
}
0 commit comments