@@ -18,7 +18,6 @@ package support
18
18
19
19
import (
20
20
"bytes"
21
- "crypto/tls"
22
21
"encoding/json"
23
22
"fmt"
24
23
"io"
@@ -47,39 +46,30 @@ type RayJobLogsResponse struct {
47
46
}
48
47
49
48
type RayClusterClientConfig struct {
50
- Address string
51
- Client * http.Client
52
- InsecureSkipVerify bool
49
+ Address string
50
+ Client * http.Client
53
51
}
54
52
55
53
var _ RayClusterClient = (* rayClusterClient )(nil )
56
54
57
55
type rayClusterClient struct {
58
- endpoint url.URL
59
- httpClient * http.Client
60
- bearerToken string
56
+ config RayClusterClientConfig
61
57
}
62
58
63
59
type RayClusterClient interface {
64
60
CreateJob (job * RayJobSetup ) (* RayJobResponse , error )
65
61
GetJobDetails (jobID string ) (* RayJobDetailsResponse , error )
66
- GetJobLogs (jobID string ) (string , error )
67
- GetJobs () (* []RayJobDetailsResponse , error )
62
+ GetJobLogs (jobID string ) (* RayJobLogsResponse , error )
63
+ ListJobs () ([]RayJobDetailsResponse , error )
68
64
}
69
65
70
- func NewRayClusterClient (config RayClusterClientConfig , bearerToken string ) (RayClusterClient , error ) {
71
- tr := & http.Transport {
72
- TLSClientConfig : & tls.Config {InsecureSkipVerify : config .InsecureSkipVerify },
73
- Proxy : http .ProxyFromEnvironment ,
74
- }
75
- config .Client = & http.Client {Transport : tr }
66
+ func NewRayClusterClient (config RayClusterClientConfig ) (RayClusterClient , error ) {
76
67
endpoint , err := url .Parse (config .Address )
77
68
if err != nil {
78
- return nil , fmt .Errorf ("invalid dashboard endpoint address" )
79
- }
80
- rayClusterApiClient := & rayClusterClient {
81
- endpoint : * endpoint , httpClient : config .Client , bearerToken : bearerToken ,
69
+ return nil , fmt .Errorf ("invalid dashboard endpoint address: %s" , endpoint )
82
70
}
71
+
72
+ rayClusterApiClient := & rayClusterClient {config }
83
73
return rayClusterApiClient , nil
84
74
}
85
75
@@ -89,13 +79,15 @@ func (client *rayClusterClient) CreateJob(job *RayJobSetup) (response *RayJobRes
89
79
return
90
80
}
91
81
92
- createJobURL := client .endpoint . String () + "/api/jobs/"
82
+ createJobURL := client .config . Address + "/api/jobs/"
93
83
94
- resp , err := client .httpClient .Post (createJobURL , "application/json" , bytes .NewReader (marshalled ))
84
+ resp , err := client .config . Client .Post (createJobURL , "application/json" , bytes .NewReader (marshalled ))
95
85
if err != nil {
96
86
return
97
87
}
98
88
89
+ defer resp .Body .Close ()
90
+
99
91
respData , err := io .ReadAll (resp .Body )
100
92
if err != nil {
101
93
return
@@ -109,95 +101,71 @@ func (client *rayClusterClient) CreateJob(job *RayJobSetup) (response *RayJobRes
109
101
return
110
102
}
111
103
112
- func (client * rayClusterClient ) GetJobs () (response * []RayJobDetailsResponse , err error ) {
113
- getAllJobsDetailsURL := client .endpoint . String () + "/api/jobs/"
104
+ func (client * rayClusterClient ) ListJobs () (response []RayJobDetailsResponse , err error ) {
105
+ getAllJobsDetailsURL := client .config . Address + "/api/jobs/"
114
106
115
- req , err := http . NewRequest ( http . MethodGet , getAllJobsDetailsURL , nil )
107
+ resp , err := client . config . Client . Get ( getAllJobsDetailsURL )
116
108
if err != nil {
117
- return nil , err
118
- }
119
- if client .bearerToken != "" {
120
- req .Header .Set ("Authorization" , "Bearer " + client .bearerToken )
121
- }
122
- resp , err := client .httpClient .Do (req )
123
- if err != nil {
124
- return nil , err
109
+ return
125
110
}
111
+
126
112
defer resp .Body .Close ()
127
- if resp .StatusCode == 503 {
128
- return nil , fmt .Errorf ("service unavailable" )
129
- }
113
+
130
114
respData , err := io .ReadAll (resp .Body )
131
115
if err != nil {
132
- return nil , err
116
+ return
133
117
}
118
+
134
119
if resp .StatusCode != 200 {
135
120
return nil , fmt .Errorf ("incorrect response code: %d for retrieving Ray Job details, response body: %s" , resp .StatusCode , respData )
136
121
}
122
+
137
123
err = json .Unmarshal (respData , & response )
138
- if err != nil {
139
- return nil , err
140
- }
141
- return response , nil
124
+ return
142
125
}
143
126
144
127
func (client * rayClusterClient ) GetJobDetails (jobID string ) (response * RayJobDetailsResponse , err error ) {
145
- getJobDetailsURL := client .endpoint . String () + "/api/jobs/" + jobID
128
+ getJobDetailsURL := client .config . Address + "/api/jobs/" + jobID
146
129
147
- req , err := http . NewRequest ( http . MethodGet , getJobDetailsURL , nil )
130
+ resp , err := client . config . Client . Get ( getJobDetailsURL )
148
131
if err != nil {
149
- return nil , err
150
- }
151
- if client .bearerToken != "" {
152
- req .Header .Set ("Authorization" , "Bearer " + client .bearerToken )
132
+ return
153
133
}
154
134
155
- resp , err := client .httpClient .Do (req )
156
- if err != nil {
157
- return nil , err
158
- }
159
- if resp .StatusCode == 503 {
160
- return nil , fmt .Errorf ("service unavailable" )
161
- }
135
+ defer resp .Body .Close ()
162
136
163
137
respData , err := io .ReadAll (resp .Body )
164
138
if err != nil {
165
139
return
166
140
}
141
+
167
142
if resp .StatusCode != 200 {
168
143
return nil , fmt .Errorf ("incorrect response code: %d for retrieving Ray Job details, response body: %s" , resp .StatusCode , respData )
169
144
}
145
+
170
146
err = json .Unmarshal (respData , & response )
171
- if err != nil {
172
- return nil , err
173
- }
174
- return response , nil
147
+ return
175
148
}
176
149
177
- func (client * rayClusterClient ) GetJobLogs (jobID string ) (logs string , err error ) {
178
- getJobLogsURL := client .endpoint .String () + "/api/jobs/" + jobID + "/logs"
179
- req , err := http .NewRequest (http .MethodGet , getJobLogsURL , nil )
180
- if err != nil {
181
- return "" , err
182
- }
183
- if client .bearerToken != "" {
184
- req .Header .Set ("Authorization" , "Bearer " + client .bearerToken )
185
- }
186
- resp , err := client .httpClient .Do (req )
150
+ func (client * rayClusterClient ) GetJobLogs (jobID string ) (response * RayJobLogsResponse , err error ) {
151
+ getJobLogsURL := client .config .Address + "/api/jobs/" + jobID + "/logs"
152
+
153
+ resp , err := client .config .Client .Get (getJobLogsURL )
187
154
if err != nil {
188
- return "" , err
155
+ return
189
156
}
190
157
158
+ defer resp .Body .Close ()
159
+
191
160
respData , err := io .ReadAll (resp .Body )
192
161
if err != nil {
193
- return "" , err
162
+ return
194
163
}
195
164
196
165
if resp .StatusCode != 200 {
197
- return "" , fmt .Errorf ("incorrect response code: %d for retrieving Ray Job logs, response body: %s" , resp .StatusCode , respData )
166
+ return nil , fmt .Errorf ("incorrect response code: %d for retrieving Ray Job logs, response body: %s" , resp .StatusCode , respData )
198
167
}
199
168
200
- jobLogs := RayJobLogsResponse {}
201
- err = json .Unmarshal (respData , & jobLogs )
202
- return jobLogs .Logs , err
169
+ err = json .Unmarshal (respData , & response )
170
+ return
203
171
}
0 commit comments