From d0c1ea57cb7ef5ff08597ae238d26b245b455219 Mon Sep 17 00:00:00 2001 From: abhijeet-dhumal Date: Tue, 20 Aug 2024 22:53:42 +0530 Subject: [PATCH 1/4] Update support functions to handle ray job api operation using tls verification and added functions for asserting ray job status --- support/ray_cluster_client.go | 110 +++++++++++++++++++++++++++++++--- 1 file changed, 101 insertions(+), 9 deletions(-) diff --git a/support/ray_cluster_client.go b/support/ray_cluster_client.go index bbce9af..4b44757 100644 --- a/support/ray_cluster_client.go +++ b/support/ray_cluster_client.go @@ -18,11 +18,13 @@ package support import ( "bytes" + "crypto/tls" "encoding/json" "fmt" "io" "net/http" "net/url" + "time" ) type RayJobSetup struct { @@ -45,20 +47,32 @@ type RayJobLogsResponse struct { Logs string `json:"logs"` } +type RayClusterClientConfig struct { + SkipTlsVerification bool +} + var _ RayClusterClient = (*rayClusterClient)(nil) type rayClusterClient struct { - endpoint url.URL + endpoint url.URL + httpClient *http.Client + authHeader string } type RayClusterClient interface { CreateJob(job *RayJobSetup) (*RayJobResponse, error) GetJobDetails(jobID string) (*RayJobDetailsResponse, error) GetJobLogs(jobID string) (string, error) + GetAllJobsData() ([]map[string]interface{}, error) + WaitForJobStatus(jobID string) (string, error) } -func NewRayClusterClient(dashboardEndpoint url.URL) RayClusterClient { - return &rayClusterClient{endpoint: dashboardEndpoint} +func NewRayClusterClient(dashboardEndpoint url.URL, config RayClusterClientConfig, authHeader string) RayClusterClient { + tr := &http.Transport{ + TLSClientConfig: &tls.Config{InsecureSkipVerify: config.SkipTlsVerification}, + Proxy: http.ProxyFromEnvironment, + } + return &rayClusterClient{endpoint: dashboardEndpoint, httpClient: &http.Client{Transport: tr}, authHeader: authHeader} } func (client *rayClusterClient) CreateJob(job *RayJobSetup) (response *RayJobResponse, err error) { @@ -68,7 +82,8 @@ func (client *rayClusterClient) CreateJob(job *RayJobSetup) (response *RayJobRes } createJobURL := client.endpoint.String() + "/api/jobs/" - resp, err := http.Post(createJobURL, "application/json", bytes.NewReader(marshalled)) + + resp, err := client.httpClient.Post(createJobURL, "application/json", bytes.NewReader(marshalled)) if err != nil { return } @@ -86,11 +101,51 @@ func (client *rayClusterClient) CreateJob(job *RayJobSetup) (response *RayJobRes return } +func (client *rayClusterClient) GetAllJobsData() ([]map[string]interface{}, error) { + getAllJobsDetailsURL := client.endpoint.String() + "/api/jobs/" + + req, err := http.NewRequest(http.MethodGet, getAllJobsDetailsURL, nil) + if err != nil { + return nil, err + } + if client.authHeader != "" { + req.Header.Set("Authorization", "Bearer "+client.authHeader) + } + resp, err := client.httpClient.Do(req) + if err != nil { + return nil, err + } + defer resp.Body.Close() + if resp.StatusCode == 503 { + return nil, fmt.Errorf("service unavailable") + } + body, err := io.ReadAll(resp.Body) + if err != nil { + return nil, err + } + + var result []map[string]interface{} + err = json.Unmarshal(body, &result) + if err != nil { + return nil, err + } + return result, nil +} + func (client *rayClusterClient) GetJobDetails(jobID string) (response *RayJobDetailsResponse, err error) { getJobDetailsURL := client.endpoint.String() + "/api/jobs/" + jobID - resp, err := http.Get(getJobDetailsURL) + + req, err := http.NewRequest(http.MethodGet, getJobDetailsURL, nil) if err != nil { - return + return nil, err + } + if client.authHeader != "" { + req.Header.Set("Authorization", "Bearer "+client.authHeader) + } + + resp, err := client.httpClient.Do(req) + if err != nil { + return nil, err } respData, err := io.ReadAll(resp.Body) @@ -108,14 +163,21 @@ func (client *rayClusterClient) GetJobDetails(jobID string) (response *RayJobDet func (client *rayClusterClient) GetJobLogs(jobID string) (logs string, err error) { getJobLogsURL := client.endpoint.String() + "/api/jobs/" + jobID + "/logs" - resp, err := http.Get(getJobLogsURL) + req, err := http.NewRequest(http.MethodGet, getJobLogsURL, nil) if err != nil { - return + return "", err + } + if client.authHeader != "" { + req.Header.Set("Authorization", "Bearer "+client.authHeader) + } + resp, err := client.httpClient.Do(req) + if err != nil { + return "", err } respData, err := io.ReadAll(resp.Body) if err != nil { - return + return "", err } if resp.StatusCode != 200 { @@ -126,3 +188,33 @@ func (client *rayClusterClient) GetJobLogs(jobID string) (logs string, err error err = json.Unmarshal(respData, &jobLogs) return jobLogs.Logs, err } + +func (client *rayClusterClient) WaitForJobStatus(jobID string) (string, error) { + var status string + var prevStatus string + fmt.Printf("Waiting for job to be Succeeded...\n") + var err error + var resp *RayJobDetailsResponse + for status != "SUCCEEDED" { + resp, err = client.GetJobDetails(jobID) + if err != nil { + time.Sleep(2 * time.Second) + continue + } + statusVal := resp.Status + if statusVal == "SUCCEEDED" || statusVal == "FAILED" { + fmt.Printf("JobStatus : %s\n", statusVal) + prevStatus = statusVal + return prevStatus, err + } + if prevStatus != statusVal && statusVal != "SUCCEEDED" { + fmt.Printf("JobStatus : %s...\n", statusVal) + prevStatus = statusVal + } + time.Sleep(3 * time.Second) + } + if prevStatus != "SUCCEEDED" { + err = fmt.Errorf("Job failed !") + } + return prevStatus, err +} From e15c13214f544d0dcc8b6cac1541fd07d1b7cd1b Mon Sep 17 00:00:00 2001 From: abhijeet-dhumal Date: Wed, 21 Aug 2024 14:57:02 +0530 Subject: [PATCH 2/4] In WaitForJobStatus function, replaced for loop logic with the polling logic using gomega --- support/ray_cluster_client.go | 66 +++++++++++++++++------------------ support/support.go | 1 + 2 files changed, 34 insertions(+), 33 deletions(-) diff --git a/support/ray_cluster_client.go b/support/ray_cluster_client.go index 4b44757..9379447 100644 --- a/support/ray_cluster_client.go +++ b/support/ray_cluster_client.go @@ -25,6 +25,8 @@ import ( "net/http" "net/url" "time" + + . "github.com/onsi/gomega" ) type RayJobSetup struct { @@ -54,25 +56,25 @@ type RayClusterClientConfig struct { var _ RayClusterClient = (*rayClusterClient)(nil) type rayClusterClient struct { - endpoint url.URL - httpClient *http.Client - authHeader string + endpoint url.URL + httpClient *http.Client + bearerToken string } type RayClusterClient interface { CreateJob(job *RayJobSetup) (*RayJobResponse, error) GetJobDetails(jobID string) (*RayJobDetailsResponse, error) GetJobLogs(jobID string) (string, error) - GetAllJobsData() ([]map[string]interface{}, error) - WaitForJobStatus(jobID string) (string, error) + GetJobs() ([]map[string]interface{}, error) + WaitForJobStatus(test Test, jobID string) string } -func NewRayClusterClient(dashboardEndpoint url.URL, config RayClusterClientConfig, authHeader string) RayClusterClient { +func NewRayClusterClient(dashboardEndpoint url.URL, config RayClusterClientConfig, bearerToken string) RayClusterClient { tr := &http.Transport{ TLSClientConfig: &tls.Config{InsecureSkipVerify: config.SkipTlsVerification}, Proxy: http.ProxyFromEnvironment, } - return &rayClusterClient{endpoint: dashboardEndpoint, httpClient: &http.Client{Transport: tr}, authHeader: authHeader} + return &rayClusterClient{endpoint: dashboardEndpoint, httpClient: &http.Client{Transport: tr}, bearerToken: bearerToken} } func (client *rayClusterClient) CreateJob(job *RayJobSetup) (response *RayJobResponse, err error) { @@ -101,15 +103,15 @@ func (client *rayClusterClient) CreateJob(job *RayJobSetup) (response *RayJobRes return } -func (client *rayClusterClient) GetAllJobsData() ([]map[string]interface{}, error) { +func (client *rayClusterClient) GetJobs() ([]map[string]interface{}, error) { getAllJobsDetailsURL := client.endpoint.String() + "/api/jobs/" req, err := http.NewRequest(http.MethodGet, getAllJobsDetailsURL, nil) if err != nil { return nil, err } - if client.authHeader != "" { - req.Header.Set("Authorization", "Bearer "+client.authHeader) + if client.bearerToken != "" { + req.Header.Set("Authorization", "Bearer "+client.bearerToken) } resp, err := client.httpClient.Do(req) if err != nil { @@ -139,8 +141,8 @@ func (client *rayClusterClient) GetJobDetails(jobID string) (response *RayJobDet if err != nil { return nil, err } - if client.authHeader != "" { - req.Header.Set("Authorization", "Bearer "+client.authHeader) + if client.bearerToken != "" { + req.Header.Set("Authorization", "Bearer "+client.bearerToken) } resp, err := client.httpClient.Do(req) @@ -167,8 +169,8 @@ func (client *rayClusterClient) GetJobLogs(jobID string) (logs string, err error if err != nil { return "", err } - if client.authHeader != "" { - req.Header.Set("Authorization", "Bearer "+client.authHeader) + if client.bearerToken != "" { + req.Header.Set("Authorization", "Bearer "+client.bearerToken) } resp, err := client.httpClient.Do(req) if err != nil { @@ -189,32 +191,30 @@ func (client *rayClusterClient) GetJobLogs(jobID string) (logs string, err error return jobLogs.Logs, err } -func (client *rayClusterClient) WaitForJobStatus(jobID string) (string, error) { +func (client *rayClusterClient) WaitForJobStatus(test Test, jobID string) string { var status string - var prevStatus string fmt.Printf("Waiting for job to be Succeeded...\n") - var err error - var resp *RayJobDetailsResponse - for status != "SUCCEEDED" { - resp, err = client.GetJobDetails(jobID) - if err != nil { - time.Sleep(2 * time.Second) - continue - } + + test.Eventually(func() string { + resp, err := client.GetJobDetails(jobID) + test.Expect(err).ToNot(HaveOccurred()) statusVal := resp.Status if statusVal == "SUCCEEDED" || statusVal == "FAILED" { fmt.Printf("JobStatus : %s\n", statusVal) - prevStatus = statusVal - return prevStatus, err + status = statusVal + return status } - if prevStatus != statusVal && statusVal != "SUCCEEDED" { + if status != statusVal && statusVal != "SUCCEEDED" { fmt.Printf("JobStatus : %s...\n", statusVal) - prevStatus = statusVal + status = statusVal } - time.Sleep(3 * time.Second) - } - if prevStatus != "SUCCEEDED" { - err = fmt.Errorf("Job failed !") + return status + }, TestTimeoutDouble, 3*time.Second).Should(Or(Equal("SUCCEEDED"), Equal("FAILED")), "Job did not complete within the expected time") + + if status == "SUCCEEDED" { + fmt.Printf("Job succeeded !\n") + } else { + fmt.Printf("Job failed !\n") } - return prevStatus, err + return status } diff --git a/support/support.go b/support/support.go index 36c8c9b..a76c3a0 100644 --- a/support/support.go +++ b/support/support.go @@ -33,6 +33,7 @@ var ( TestTimeoutShort = 1 * time.Minute TestTimeoutMedium = 2 * time.Minute TestTimeoutLong = 5 * time.Minute + TestTimeoutDouble = 10 * time.Minute TestTimeoutGpuProvisioning = 30 * time.Minute ) From b834fd92509583ec42d12a6cb3dd1dfcda086467 Mon Sep 17 00:00:00 2001 From: abhijeet-dhumal Date: Thu, 22 Aug 2024 16:40:05 +0530 Subject: [PATCH 3/4] Updated NewRayClusterClient function logic to utilise RayClusterClientConfig and removed waitForJobStatus function --- support/ray_cluster_client.go | 58 +++++++++++++---------------------- 1 file changed, 21 insertions(+), 37 deletions(-) diff --git a/support/ray_cluster_client.go b/support/ray_cluster_client.go index 9379447..50bcde0 100644 --- a/support/ray_cluster_client.go +++ b/support/ray_cluster_client.go @@ -24,9 +24,6 @@ import ( "io" "net/http" "net/url" - "time" - - . "github.com/onsi/gomega" ) type RayJobSetup struct { @@ -50,6 +47,8 @@ type RayJobLogsResponse struct { } type RayClusterClientConfig struct { + Address string + Client *http.Client SkipTlsVerification bool } @@ -66,15 +65,28 @@ type RayClusterClient interface { GetJobDetails(jobID string) (*RayJobDetailsResponse, error) GetJobLogs(jobID string) (string, error) GetJobs() ([]map[string]interface{}, error) - WaitForJobStatus(test Test, jobID string) string } -func NewRayClusterClient(dashboardEndpoint url.URL, config RayClusterClientConfig, bearerToken string) RayClusterClient { - tr := &http.Transport{ - TLSClientConfig: &tls.Config{InsecureSkipVerify: config.SkipTlsVerification}, - Proxy: http.ProxyFromEnvironment, +var rayClusterApiClient RayClusterClient + +func NewRayClusterClient(config RayClusterClientConfig, bearerToken string) (RayClusterClient, error) { + if rayClusterApiClient == nil { + if config.Client == nil { + tr := &http.Transport{ + TLSClientConfig: &tls.Config{InsecureSkipVerify: config.SkipTlsVerification}, + Proxy: http.ProxyFromEnvironment, + } + config.Client = &http.Client{Transport: tr} + } + endpoint, err := url.Parse(config.Address) + if err != nil { + return nil, fmt.Errorf("invalid dashboard endpoint address") + } + rayClusterApiClient = &rayClusterClient{ + endpoint: *endpoint, httpClient: config.Client, bearerToken: bearerToken, + } } - return &rayClusterClient{endpoint: dashboardEndpoint, httpClient: &http.Client{Transport: tr}, bearerToken: bearerToken} + return rayClusterApiClient, nil } func (client *rayClusterClient) CreateJob(job *RayJobSetup) (response *RayJobResponse, err error) { @@ -190,31 +202,3 @@ func (client *rayClusterClient) GetJobLogs(jobID string) (logs string, err error err = json.Unmarshal(respData, &jobLogs) return jobLogs.Logs, err } - -func (client *rayClusterClient) WaitForJobStatus(test Test, jobID string) string { - var status string - fmt.Printf("Waiting for job to be Succeeded...\n") - - test.Eventually(func() string { - resp, err := client.GetJobDetails(jobID) - test.Expect(err).ToNot(HaveOccurred()) - statusVal := resp.Status - if statusVal == "SUCCEEDED" || statusVal == "FAILED" { - fmt.Printf("JobStatus : %s\n", statusVal) - status = statusVal - return status - } - if status != statusVal && statusVal != "SUCCEEDED" { - fmt.Printf("JobStatus : %s...\n", statusVal) - status = statusVal - } - return status - }, TestTimeoutDouble, 3*time.Second).Should(Or(Equal("SUCCEEDED"), Equal("FAILED")), "Job did not complete within the expected time") - - if status == "SUCCEEDED" { - fmt.Printf("Job succeeded !\n") - } else { - fmt.Printf("Job failed !\n") - } - return status -} From d8cc6b3d27893eee0643ddd637c6a10c6c438dd8 Mon Sep 17 00:00:00 2001 From: abhijeet-dhumal Date: Mon, 26 Aug 2024 16:54:30 +0530 Subject: [PATCH 4/4] Updated GetJobs functions to use RayJobDetails custom resource struct instead of using map --- support/ray_cluster_client.go | 59 +++++++++++++++++------------------ 1 file changed, 29 insertions(+), 30 deletions(-) diff --git a/support/ray_cluster_client.go b/support/ray_cluster_client.go index 50bcde0..ca9cd1a 100644 --- a/support/ray_cluster_client.go +++ b/support/ray_cluster_client.go @@ -47,9 +47,9 @@ type RayJobLogsResponse struct { } type RayClusterClientConfig struct { - Address string - Client *http.Client - SkipTlsVerification bool + Address string + Client *http.Client + InsecureSkipVerify bool } var _ RayClusterClient = (*rayClusterClient)(nil) @@ -64,27 +64,21 @@ type RayClusterClient interface { CreateJob(job *RayJobSetup) (*RayJobResponse, error) GetJobDetails(jobID string) (*RayJobDetailsResponse, error) GetJobLogs(jobID string) (string, error) - GetJobs() ([]map[string]interface{}, error) + GetJobs() (*[]RayJobDetailsResponse, error) } -var rayClusterApiClient RayClusterClient - func NewRayClusterClient(config RayClusterClientConfig, bearerToken string) (RayClusterClient, error) { - if rayClusterApiClient == nil { - if config.Client == nil { - tr := &http.Transport{ - TLSClientConfig: &tls.Config{InsecureSkipVerify: config.SkipTlsVerification}, - Proxy: http.ProxyFromEnvironment, - } - config.Client = &http.Client{Transport: tr} - } - endpoint, err := url.Parse(config.Address) - if err != nil { - return nil, fmt.Errorf("invalid dashboard endpoint address") - } - rayClusterApiClient = &rayClusterClient{ - endpoint: *endpoint, httpClient: config.Client, bearerToken: bearerToken, - } + tr := &http.Transport{ + TLSClientConfig: &tls.Config{InsecureSkipVerify: config.InsecureSkipVerify}, + Proxy: http.ProxyFromEnvironment, + } + config.Client = &http.Client{Transport: tr} + endpoint, err := url.Parse(config.Address) + if err != nil { + return nil, fmt.Errorf("invalid dashboard endpoint address") + } + rayClusterApiClient := &rayClusterClient{ + endpoint: *endpoint, httpClient: config.Client, bearerToken: bearerToken, } return rayClusterApiClient, nil } @@ -115,7 +109,7 @@ func (client *rayClusterClient) CreateJob(job *RayJobSetup) (response *RayJobRes return } -func (client *rayClusterClient) GetJobs() ([]map[string]interface{}, error) { +func (client *rayClusterClient) GetJobs() (response *[]RayJobDetailsResponse, err error) { getAllJobsDetailsURL := client.endpoint.String() + "/api/jobs/" req, err := http.NewRequest(http.MethodGet, getAllJobsDetailsURL, nil) @@ -133,17 +127,18 @@ func (client *rayClusterClient) GetJobs() ([]map[string]interface{}, error) { if resp.StatusCode == 503 { return nil, fmt.Errorf("service unavailable") } - body, err := io.ReadAll(resp.Body) + respData, err := io.ReadAll(resp.Body) if err != nil { return nil, err } - - var result []map[string]interface{} - err = json.Unmarshal(body, &result) + if resp.StatusCode != 200 { + return nil, fmt.Errorf("incorrect response code: %d for retrieving Ray Job details, response body: %s", resp.StatusCode, respData) + } + err = json.Unmarshal(respData, &response) if err != nil { return nil, err } - return result, nil + return response, nil } func (client *rayClusterClient) GetJobDetails(jobID string) (response *RayJobDetailsResponse, err error) { @@ -161,18 +156,22 @@ func (client *rayClusterClient) GetJobDetails(jobID string) (response *RayJobDet if err != nil { return nil, err } + if resp.StatusCode == 503 { + return nil, fmt.Errorf("service unavailable") + } respData, err := io.ReadAll(resp.Body) if err != nil { return } - if resp.StatusCode != 200 { return nil, fmt.Errorf("incorrect response code: %d for retrieving Ray Job details, response body: %s", resp.StatusCode, respData) } - err = json.Unmarshal(respData, &response) - return + if err != nil { + return nil, err + } + return response, nil } func (client *rayClusterClient) GetJobLogs(jobID string) (logs string, err error) {