Skip to content

Commit 2dc182d

Browse files
committed
Review handling of status code in HTTP requests
1 parent 208501b commit 2dc182d

File tree

13 files changed

+222
-99
lines changed

13 files changed

+222
-99
lines changed

internal/benchrunner/runners/rally/metrics.go

+19-15
Original file line numberDiff line numberDiff line change
@@ -112,9 +112,14 @@ func (c *collector) stop() {
112112
}
113113

114114
func (c *collector) collectMetricsBeforeRallyRun() {
115-
_, err := c.esAPI.Indices.Refresh(c.esAPI.Indices.Refresh.WithIndex(c.datastream))
115+
resp, err := c.esAPI.Indices.Refresh(c.esAPI.Indices.Refresh.WithIndex(c.datastream))
116116
if err != nil {
117-
logger.Errorf("unable to refresh data stream at the beginning of rally run")
117+
logger.Errorf("unable to refresh data stream at the beginning of rally run: %s", err)
118+
return
119+
}
120+
defer resp.Body.Close()
121+
if resp.IsError() {
122+
logger.Errorf("unable to refresh data stream at the beginning of rally run: %s", resp)
118123
return
119124
}
120125

@@ -157,19 +162,13 @@ func (c *collector) publish(events [][]byte) {
157162
logger.Errorf("error indexing event in metricstore: %w", err)
158163
return
159164
}
160-
161-
if resp.Body == nil {
162-
logger.Errorf("empty index response body from metricstore: %w", err)
163-
return
164-
}
165+
defer resp.Body.Close()
165166

166167
body, err := io.ReadAll(resp.Body)
167168
if err != nil {
168169
logger.Errorf("failed to read index response body from metricstore: %w", err)
169170
}
170171

171-
resp.Body.Close()
172-
173172
if resp.StatusCode != 201 {
174173
logger.Errorf("error indexing event in metricstore (%d): %s: %v", resp.StatusCode, resp.Status(), elasticsearch.NewError(body))
175174
}
@@ -187,18 +186,18 @@ func (c *collector) createMetricsIndex() {
187186

188187
logger.Debugf("creating %s index in metricstore...", c.indexName())
189188

190-
createRes, err := c.metricsAPI.Indices.Create(
189+
resp, err := c.metricsAPI.Indices.Create(
191190
c.indexName(),
192191
c.metricsAPI.Indices.Create.WithBody(reader),
193192
)
194193
if err != nil {
195194
logger.Errorf("could not create index: %w", err)
196195
return
197196
}
198-
createRes.Body.Close()
197+
defer resp.Body.Close()
199198

200-
if createRes.IsError() {
201-
logger.Errorf("got a response error while creating index")
199+
if resp.IsError() {
200+
logger.Errorf("got a response error while creating index: %s", resp)
202201
}
203202
}
204203

@@ -287,9 +286,14 @@ func (c *collector) collectDiskUsage() map[string]ingest.DiskUsage {
287286
}
288287

289288
func (c *collector) collectMetricsAfterRallyRun() {
290-
_, err := c.esAPI.Indices.Refresh(c.esAPI.Indices.Refresh.WithIndex(c.datastream))
289+
resp, err := c.esAPI.Indices.Refresh(c.esAPI.Indices.Refresh.WithIndex(c.datastream))
291290
if err != nil {
292-
logger.Errorf("unable to refresh data stream at the end of rally run")
291+
logger.Errorf("unable to refresh data stream at the end of rally run: %s", err)
292+
return
293+
}
294+
defer resp.Body.Close()
295+
if resp.IsError() {
296+
logger.Errorf("unable to refresh data stream at the end of rally run: %s", resp)
293297
return
294298
}
295299

internal/benchrunner/runners/rally/runner.go

+68-39
Original file line numberDiff line numberDiff line change
@@ -336,9 +336,13 @@ func (r *runner) collectAndSummarizeMetrics() (*metricsSummary, error) {
336336

337337
func (r *runner) deleteDataStreamDocs(dataStream string) error {
338338
body := strings.NewReader(`{ "query": { "match_all": {} } }`)
339-
_, err := r.options.ESAPI.DeleteByQuery([]string{dataStream}, body)
339+
resp, err := r.options.ESAPI.DeleteByQuery([]string{dataStream}, body)
340340
if err != nil {
341-
return err
341+
return fmt.Errorf("failed to delete data stream docs for data stream %s: %w", dataStream, err)
342+
}
343+
defer resp.Body.Close()
344+
if resp.IsError() {
345+
return fmt.Errorf("failed to delete data stream docs for data stream %s: %s", dataStream, resp.String())
342346
}
343347
return nil
344348
}
@@ -665,6 +669,9 @@ func (r *runner) reindexData() error {
665669
return fmt.Errorf("error getting mapping: %w", err)
666670
}
667671
defer mappingRes.Body.Close()
672+
if mappingRes.IsError() {
673+
return fmt.Errorf("error getting mapping: %s", mappingRes)
674+
}
668675

669676
body, err := io.ReadAll(mappingRes.Body)
670677
if err != nil {
@@ -709,7 +716,7 @@ func (r *runner) reindexData() error {
709716
defer createRes.Body.Close()
710717

711718
if createRes.IsError() {
712-
return errors.New("got a response error while creating index")
719+
return fmt.Errorf("got a response error while creating index: %s", createRes)
713720
}
714721

715722
bodyReader := strings.NewReader(`{"query":{"match_all":{}}}`)
@@ -725,21 +732,13 @@ func (r *runner) reindexData() error {
725732
return fmt.Errorf("error executing search: %w", err)
726733
}
727734
defer res.Body.Close()
728-
729-
type searchRes struct {
730-
Error *struct {
731-
Reason string `json:"reson"`
732-
} `json:"error"`
733-
ScrollID string `json:"_scroll_id"`
734-
Hits []struct {
735-
ID string `json:"_id"`
736-
Source map[string]interface{} `json:"_source"`
737-
} `json:"hits"`
735+
if res.IsError() {
736+
return fmt.Errorf("error executing search: %s", res)
738737
}
739738

740739
// Iterate through the search results using the Scroll API
741740
for {
742-
var sr searchRes
741+
var sr searchResponse
743742
if err := json.NewDecoder(res.Body).Decode(&sr); err != nil {
744743
return fmt.Errorf("error decoding search response: %w", err)
745744
}
@@ -752,40 +751,66 @@ func (r *runner) reindexData() error {
752751
break
753752
}
754753

755-
var bulkBodyBuilder strings.Builder
756-
for _, hit := range sr.Hits {
757-
bulkBodyBuilder.WriteString(fmt.Sprintf("{\"index\":{\"_index\":\"%s\",\"_id\":\"%s\"}}\n", indexName, hit.ID))
758-
enriched := r.enrichEventWithBenchmarkMetadata(hit.Source)
759-
src, err := json.Marshal(enriched)
760-
if err != nil {
761-
return fmt.Errorf("error decoding _source: %w", err)
762-
}
763-
bulkBodyBuilder.WriteString(fmt.Sprintf("%s\n", string(src)))
754+
err := r.bulkMetrics(indexName, sr)
755+
if err != nil {
756+
return err
764757
}
758+
}
759+
760+
logger.Debug("reindexing operation finished")
761+
return nil
762+
}
765763

766-
logger.Debugf("bulk request of %d events...", len(sr.Hits))
764+
type searchResponse struct {
765+
Error *struct {
766+
Reason string `json:"reson"`
767+
} `json:"error"`
768+
ScrollID string `json:"_scroll_id"`
769+
Hits []struct {
770+
ID string `json:"_id"`
771+
Source map[string]interface{} `json:"_source"`
772+
} `json:"hits"`
773+
}
767774

768-
bulkRes, err := r.options.ESMetricsAPI.Bulk(strings.NewReader(bulkBodyBuilder.String()))
775+
func (r *runner) bulkMetrics(indexName string, sr searchResponse) error {
776+
var bulkBodyBuilder strings.Builder
777+
for _, hit := range sr.Hits {
778+
bulkBodyBuilder.WriteString(fmt.Sprintf("{\"index\":{\"_index\":\"%s\",\"_id\":\"%s\"}}\n", indexName, hit.ID))
779+
enriched := r.enrichEventWithBenchmarkMetadata(hit.Source)
780+
src, err := json.Marshal(enriched)
769781
if err != nil {
770-
return fmt.Errorf("error performing the bulk index request: %w", err)
782+
return fmt.Errorf("error decoding _source: %w", err)
771783
}
772-
bulkRes.Body.Close()
784+
bulkBodyBuilder.WriteString(fmt.Sprintf("%s\n", string(src)))
785+
}
773786

774-
if sr.ScrollID == "" {
775-
return errors.New("error getting scroll ID")
776-
}
787+
logger.Debugf("bulk request of %d events...", len(sr.Hits))
777788

778-
res, err = r.options.ESAPI.Scroll(
779-
r.options.ESAPI.Scroll.WithScrollID(sr.ScrollID),
780-
r.options.ESAPI.Scroll.WithScroll(time.Minute),
781-
)
782-
if err != nil {
783-
return fmt.Errorf("error executing scroll: %s", err)
784-
}
785-
res.Body.Close()
789+
resp, err := r.options.ESMetricsAPI.Bulk(strings.NewReader(bulkBodyBuilder.String()))
790+
if err != nil {
791+
return fmt.Errorf("error performing the bulk index request: %w", err)
792+
}
793+
defer resp.Body.Close()
794+
if resp.IsError() {
795+
return fmt.Errorf("error performing the bulk index request: %s", resp)
786796
}
787797

788-
logger.Debug("reindexing operation finished")
798+
if sr.ScrollID == "" {
799+
return errors.New("error getting scroll ID")
800+
}
801+
802+
resp, err = r.options.ESAPI.Scroll(
803+
r.options.ESAPI.Scroll.WithScrollID(sr.ScrollID),
804+
r.options.ESAPI.Scroll.WithScroll(time.Minute),
805+
)
806+
if err != nil {
807+
return fmt.Errorf("error executing scroll: %s", err)
808+
}
809+
if resp.IsError() {
810+
return fmt.Errorf("error executing scroll: %s", resp)
811+
}
812+
resp.Body.Close()
813+
789814
return nil
790815
}
791816

@@ -815,6 +840,10 @@ func getTotalHits(esapi *elasticsearch.API, dataStream string) (int, error) {
815840
}
816841
defer resp.Body.Close()
817842

843+
if resp.IsError() {
844+
return 0, fmt.Errorf("failed to get hits count: %s", resp.String())
845+
}
846+
818847
var results struct {
819848
Count int
820849
Error *struct {

internal/benchrunner/runners/system/metrics.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -196,7 +196,7 @@ func (c *collector) createMetricsIndex() {
196196
createRes.Body.Close()
197197

198198
if createRes.IsError() {
199-
logger.Debug("got a response error while creating index")
199+
logger.Debug("got a response error while creating index: %s", createRes)
200200
}
201201
}
202202

0 commit comments

Comments
 (0)