Skip to content

Commit 4f25f7c

Browse files
committed
Added the ability to retry a bulk request
1 parent 5b18f9a commit 4f25f7c

File tree

2 files changed

+32
-18
lines changed

2 files changed

+32
-18
lines changed

src/Foundatio.Repositories.Elasticsearch/Extensions/ElasticIndexExtensions.cs

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -103,35 +103,35 @@ public static IDictionary<string, AggregationResult> ToAggregationResult<T>(this
103103
return res.Aggregations.ToAggregationResult();
104104
}
105105

106-
public static Task<IBulkResponse> IndexManyAsync<T>(this IElasticClient client, IEnumerable<T> objects, Func<T, string> getParent, Func<T, string> getIndex = null, string type = null) where T : class {
106+
public static Task<IBulkResponse> IndexManyAsync<T>(this IElasticClient client, IEnumerable<T> objects, Func<T, string> getParent, Func<T, string> getIndex = null, string type = null, bool isCreateOperation = false) where T : class {
107107
if (objects == null)
108108
throw new ArgumentNullException(nameof(objects));
109109

110-
if (getParent == null && getIndex == null)
111-
return client.IndexManyAsync(objects, null, type);
112-
113-
var indexBulkRequest = CreateIndexBulkRequest(objects, getIndex, type, getParent);
110+
var indexBulkRequest = CreateIndexBulkRequest(objects, getIndex, type, getParent, isCreateOperation);
114111
return client.BulkAsync(indexBulkRequest);
115112
}
116113

117-
private static BulkRequest CreateIndexBulkRequest<T>(IEnumerable<T> objects, Func<T, string> getIndex, string type, Func<T, string> getParent) where T : class {
114+
private static BulkRequest CreateIndexBulkRequest<T>(IEnumerable<T> objects, Func<T, string> getIndex, string type, Func<T, string> getParent, bool isCreateOperation) where T : class {
118115
var bulkRequest = new BulkRequest();
119116
var list = objects.Select(o => {
120-
var doc = new BulkIndexOperation<T>(o) { Type = type };
117+
IBulkOperation doc = isCreateOperation ? (IBulkOperation)new BulkCreateOperation<T>(o) : new BulkIndexOperation<T>(o);
118+
doc.Type = type;
121119
if (getParent != null)
122120
doc.Parent = getParent(o);
123121

124122
if (getIndex != null)
125123
doc.Index = getIndex(o);
126124

127-
var versionedDoc = o as IVersioned;
128-
if (versionedDoc != null && versionedDoc.Version > 0)
129-
doc.Version = versionedDoc.Version;
125+
if (!isCreateOperation) {
126+
var versionedDoc = o as IVersioned;
127+
if (versionedDoc != null)
128+
doc.Version = versionedDoc.Version;
129+
}
130130

131131
return doc;
132-
}).Cast<IBulkOperation>().ToList();
132+
}).ToList();
133133
bulkRequest.Operations = list;
134-
134+
135135
return bulkRequest;
136136
}
137137

src/Foundatio.Repositories.Elasticsearch/Repositories/ElasticRepositoryBase.cs

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ public async Task AddAsync(IEnumerable<T> documents, bool addToCache = false, Ti
5858
foreach (var doc in docs)
5959
await _validator.ValidateAndThrowAsync(doc).AnyContext();
6060

61-
await IndexDocumentsAsync(docs).AnyContext();
61+
await IndexDocumentsAsync(docs, isCreateOperation: true).AnyContext();
6262

6363
if (addToCache)
6464
await AddToCacheAsync(docs, expiresIn).AnyContext();
@@ -668,7 +668,7 @@ private async Task OnDocumentsChangedAsync(ChangeType changeType, IReadOnlyColle
668668

669669
#endregion
670670

671-
private async Task IndexDocumentsAsync(IReadOnlyCollection<T> documents) {
671+
private async Task IndexDocumentsAsync(IReadOnlyCollection<T> documents, bool isCreateOperation = false) {
672672
if (HasMultipleIndexes) {
673673
foreach (var documentGroup in documents.GroupBy(TimeSeriesType.GetDocumentIndex))
674674
await TimeSeriesType.EnsureIndexAsync(documentGroup.First()).AnyContext();
@@ -677,6 +677,7 @@ private async Task IndexDocumentsAsync(IReadOnlyCollection<T> documents) {
677677
if (documents.Count == 1) {
678678
var document = documents.Single();
679679
var response = await _client.IndexAsync(document, i => {
680+
i.OpType(isCreateOperation ? OpType.Create : OpType.Index);
680681
i.Type(ElasticType.Name);
681682

682683
if (GetParentIdFunc != null)
@@ -685,10 +686,9 @@ private async Task IndexDocumentsAsync(IReadOnlyCollection<T> documents) {
685686
if (GetDocumentIndexFunc != null)
686687
i.Index(GetDocumentIndexFunc(document));
687688

688-
if (HasVersion) {
689+
if (HasVersion && isCreateOperation) {
689690
var versionDoc = (IVersioned)document;
690-
if (versionDoc.Version > 0)
691-
i.Version(versionDoc.Version);
691+
i.Version(versionDoc.Version);
692692
}
693693

694694
return i;
@@ -706,7 +706,7 @@ private async Task IndexDocumentsAsync(IReadOnlyCollection<T> documents) {
706706
versionDoc.Version = response.Version;
707707
}
708708
} else {
709-
var response = await _client.IndexManyAsync(documents, GetParentIdFunc, GetDocumentIndexFunc, ElasticType.Name).AnyContext();
709+
var response = await _client.IndexManyAsync(documents, GetParentIdFunc, GetDocumentIndexFunc, ElasticType.Name, isCreateOperation).AnyContext();
710710
_logger.Trace(() => response.GetRequest());
711711

712712
if (HasVersion) {
@@ -723,12 +723,26 @@ private async Task IndexDocumentsAsync(IReadOnlyCollection<T> documents) {
723723
}
724724
}
725725

726+
var allErrors = response.ItemsWithErrors.ToList();
727+
if (allErrors.Count > 0) {
728+
var retryableIds = allErrors.Where(e => e.Status == 429 || e.Status == 503).Select(e => e.Id).ToList();
729+
if (retryableIds.Count > 0) {
730+
var docs = documents.Where(d => retryableIds.Contains(d.Id)).ToList();
731+
await IndexDocumentsAsync(docs, isCreateOperation).AnyContext();
732+
733+
// return as all recoverable items were retried.
734+
if (allErrors.Count == retryableIds.Count)
735+
return;
736+
}
737+
}
738+
726739
if (!response.IsValid) {
727740
string message = response.GetErrorMessage();
728741
_logger.Error().Exception(response.OriginalException).Message(message).Property("request", response.GetRequest()).Write();
729742
throw new ApplicationException(message, response.OriginalException);
730743
}
731744
}
745+
// 429 // 503
732746
}
733747

734748
protected virtual async Task AddToCacheAsync(ICollection<T> documents, TimeSpan? expiresIn = null) {

0 commit comments

Comments
 (0)