Skip to content

Update ray support functions to handle ray job api operation using tls check verification for insecure cluster #61

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 87 additions & 12 deletions support/ray_cluster_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package support

import (
"bytes"
"crypto/tls"
"encoding/json"
"fmt"
"io"
Expand Down Expand Up @@ -45,20 +46,41 @@ type RayJobLogsResponse struct {
Logs string `json:"logs"`
}

type RayClusterClientConfig struct {
Address string
Client *http.Client
InsecureSkipVerify bool
}

var _ RayClusterClient = (*rayClusterClient)(nil)

type rayClusterClient struct {
endpoint url.URL
endpoint url.URL
httpClient *http.Client
bearerToken string
}

type RayClusterClient interface {
CreateJob(job *RayJobSetup) (*RayJobResponse, error)
GetJobDetails(jobID string) (*RayJobDetailsResponse, error)
GetJobLogs(jobID string) (string, error)
GetJobs() (*[]RayJobDetailsResponse, error)
}

func NewRayClusterClient(dashboardEndpoint url.URL) RayClusterClient {
return &rayClusterClient{endpoint: dashboardEndpoint}
func NewRayClusterClient(config RayClusterClientConfig, bearerToken string) (RayClusterClient, error) {
tr := &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: config.InsecureSkipVerify},
Proxy: http.ProxyFromEnvironment,
}
config.Client = &http.Client{Transport: tr}
endpoint, err := url.Parse(config.Address)
if err != nil {
return nil, fmt.Errorf("invalid dashboard endpoint address")
}
rayClusterApiClient := &rayClusterClient{
endpoint: *endpoint, httpClient: config.Client, bearerToken: bearerToken,
}
return rayClusterApiClient, nil
}

func (client *rayClusterClient) CreateJob(job *RayJobSetup) (response *RayJobResponse, err error) {
Expand All @@ -68,7 +90,8 @@ func (client *rayClusterClient) CreateJob(job *RayJobSetup) (response *RayJobRes
}

createJobURL := client.endpoint.String() + "/api/jobs/"
resp, err := http.Post(createJobURL, "application/json", bytes.NewReader(marshalled))

resp, err := client.httpClient.Post(createJobURL, "application/json", bytes.NewReader(marshalled))
if err != nil {
return
}
Expand All @@ -86,36 +109,88 @@ func (client *rayClusterClient) CreateJob(job *RayJobSetup) (response *RayJobRes
return
}

func (client *rayClusterClient) GetJobs() (response *[]RayJobDetailsResponse, err error) {
getAllJobsDetailsURL := client.endpoint.String() + "/api/jobs/"

req, err := http.NewRequest(http.MethodGet, getAllJobsDetailsURL, nil)
if err != nil {
return nil, err
}
if client.bearerToken != "" {
req.Header.Set("Authorization", "Bearer "+client.bearerToken)
}
resp, err := client.httpClient.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()
if resp.StatusCode == 503 {
return nil, fmt.Errorf("service unavailable")
}
respData, err := io.ReadAll(resp.Body)
if err != nil {
return nil, err
}
if resp.StatusCode != 200 {
return nil, fmt.Errorf("incorrect response code: %d for retrieving Ray Job details, response body: %s", resp.StatusCode, respData)
}
err = json.Unmarshal(respData, &response)
if err != nil {
return nil, err
}
return response, nil
}

func (client *rayClusterClient) GetJobDetails(jobID string) (response *RayJobDetailsResponse, err error) {
getJobDetailsURL := client.endpoint.String() + "/api/jobs/" + jobID
resp, err := http.Get(getJobDetailsURL)

req, err := http.NewRequest(http.MethodGet, getJobDetailsURL, nil)
if err != nil {
return
return nil, err
}
if client.bearerToken != "" {
req.Header.Set("Authorization", "Bearer "+client.bearerToken)
}

resp, err := client.httpClient.Do(req)
if err != nil {
return nil, err
}
if resp.StatusCode == 503 {
return nil, fmt.Errorf("service unavailable")
}

respData, err := io.ReadAll(resp.Body)
if err != nil {
return
}

if resp.StatusCode != 200 {
return nil, fmt.Errorf("incorrect response code: %d for retrieving Ray Job details, response body: %s", resp.StatusCode, respData)
}

err = json.Unmarshal(respData, &response)
return
if err != nil {
return nil, err
}
return response, nil
}

func (client *rayClusterClient) GetJobLogs(jobID string) (logs string, err error) {
getJobLogsURL := client.endpoint.String() + "/api/jobs/" + jobID + "/logs"
resp, err := http.Get(getJobLogsURL)
req, err := http.NewRequest(http.MethodGet, getJobLogsURL, nil)
if err != nil {
return
return "", err
}
if client.bearerToken != "" {
req.Header.Set("Authorization", "Bearer "+client.bearerToken)
}
resp, err := client.httpClient.Do(req)
if err != nil {
return "", err
}

respData, err := io.ReadAll(resp.Body)
if err != nil {
return
return "", err
}

if resp.StatusCode != 200 {
Expand Down
1 change: 1 addition & 0 deletions support/support.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ var (
TestTimeoutShort = 1 * time.Minute
TestTimeoutMedium = 2 * time.Minute
TestTimeoutLong = 5 * time.Minute
TestTimeoutDouble = 10 * time.Minute
TestTimeoutGpuProvisioning = 30 * time.Minute
)

Expand Down