Skip to content

Commit 0502e44

Browse files
authored
Fix AWS QueryPages backoff logic (#523)
1 parent af84ccf commit 0502e44

File tree

2 files changed

+43
-18
lines changed

2 files changed

+43
-18
lines changed

pkg/chunk/aws_storage_client.go

Lines changed: 23 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -272,13 +272,32 @@ func (a awsStorageClient) QueryPages(ctx context.Context, query IndexQuery, call
272272
}
273273

274274
request := a.queryRequestFn(ctx, input)
275-
backoff := minBackoff
276275
pageCount := 0
277276
defer func() {
278277
dynamoQueryPagesCount.Observe(float64(pageCount))
279278
}()
279+
280280
for page := request; page != nil; page = page.NextPage() {
281281
pageCount++
282+
283+
response, err := a.queryPage(ctx, input, page)
284+
if err != nil {
285+
return err
286+
}
287+
288+
if getNextPage := callback(response, !page.HasNextPage()); !getNextPage {
289+
if err != nil {
290+
return fmt.Errorf("QueryPages error: table=%v, err=%v", *input.TableName, page.Error())
291+
}
292+
return nil
293+
}
294+
}
295+
return nil
296+
}
297+
298+
func (a awsStorageClient) queryPage(ctx context.Context, input *dynamodb.QueryInput, page dynamoDBRequest) (dynamoDBReadResponse, error) {
299+
backoff := minBackoff
300+
for i := 0; i < maxRetries; i++ {
282301
err := instrument.TimeRequestHistogram(ctx, "DynamoDB.QueryPages", dynamoRequestDuration, func(_ context.Context) error {
283302
return page.Send()
284303
})
@@ -290,28 +309,18 @@ func (a awsStorageClient) QueryPages(ctx context.Context, query IndexQuery, call
290309

291310
if err != nil {
292311
recordDynamoError(*input.TableName, err, "DynamoDB.QueryPages")
293-
294312
if awsErr, ok := err.(awserr.Error); ok && ((awsErr.Code() == dynamodb.ErrCodeProvisionedThroughputExceededException) || page.Retryable()) {
295313
time.Sleep(backoff)
296314
backoff = nextBackoff(backoff)
297315
continue
298316
}
299-
300-
return fmt.Errorf("QueryPages error: table=%v, err=%v", *input.TableName, err)
317+
return nil, fmt.Errorf("QueryPage error: table=%v, err=%v", *input.TableName, err)
301318
}
302319

303320
queryOutput := page.Data().(*dynamodb.QueryOutput)
304-
if getNextPage := callback(dynamoDBReadResponse(queryOutput.Items), !page.HasNextPage()); !getNextPage {
305-
if err != nil {
306-
return fmt.Errorf("QueryPages error: table=%v, err=%v", *input.TableName, page.Error())
307-
}
308-
return nil
309-
}
310-
311-
backoff = minBackoff
321+
return dynamoDBReadResponse(queryOutput.Items), nil
312322
}
313-
314-
return nil
323+
return nil, fmt.Errorf("QueryPage error: maxRetries exceeded for table %v", *input.TableName)
315324
}
316325

317326
type dynamoDBRequest interface {

pkg/chunk/aws_storage_client_test.go

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -605,16 +605,18 @@ func TestAWSStorageClientQueryPages(t *testing.T) {
605605
}
606606

607607
tests := []struct {
608-
name string
609-
query IndexQuery
610-
want []IndexEntry
608+
name string
609+
query IndexQuery
610+
provisionedErr int
611+
want []IndexEntry
611612
}{
612613
{
613614
"check HashValue only",
614615
IndexQuery{
615616
TableName: "table",
616617
HashValue: "flip",
617618
},
619+
0,
618620
[]IndexEntry{entries[5], entries[6], entries[7]},
619621
},
620622
{
@@ -624,6 +626,7 @@ func TestAWSStorageClientQueryPages(t *testing.T) {
624626
HashValue: "foo",
625627
RangeValueStart: []byte("bar:2"),
626628
},
629+
0,
627630
[]IndexEntry{entries[1], entries[2], entries[3], entries[4]},
628631
},
629632
{
@@ -633,6 +636,7 @@ func TestAWSStorageClientQueryPages(t *testing.T) {
633636
HashValue: "foo",
634637
RangeValuePrefix: []byte("baz:"),
635638
},
639+
0,
636640
[]IndexEntry{entries[3], entries[4]},
637641
},
638642
{
@@ -643,13 +647,25 @@ func TestAWSStorageClientQueryPages(t *testing.T) {
643647
RangeValuePrefix: []byte("bar"),
644648
ValueEqual: []byte("20"),
645649
},
650+
0,
651+
[]IndexEntry{entries[1]},
652+
},
653+
{
654+
"check retry logic",
655+
IndexQuery{
656+
TableName: "table",
657+
HashValue: "foo",
658+
RangeValuePrefix: []byte("bar"),
659+
ValueEqual: []byte("20"),
660+
},
661+
2,
646662
[]IndexEntry{entries[1]},
647663
},
648664
}
649665

650666
for _, tt := range tests {
651667
t.Run(tt.name, func(t *testing.T) {
652-
dynamoDB := newMockDynamoDB(0, 0)
668+
dynamoDB := newMockDynamoDB(0, tt.provisionedErr)
653669
client := awsStorageClient{
654670
DynamoDB: dynamoDB,
655671
queryRequestFn: dynamoDB.queryRequest,

0 commit comments

Comments
 (0)