Skip to content

Commit 1da4cd5

Browse files
committed
Property throw Duplicate Doc exception + WIP pipelines
1 parent 4f25f7c commit 1da4cd5

File tree

18 files changed

+329
-71
lines changed

18 files changed

+329
-71
lines changed

src/Foundatio.Repositories.Elasticsearch/Configuration/DailyIndex.cs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,8 +67,9 @@ public void AddAlias(string name, TimeSpan? maxAge = null) {
6767
});
6868
}
6969

70-
public override Task ConfigureAsync() {
71-
return Task.CompletedTask;
70+
public override async Task ConfigureAsync() {
71+
foreach (var t in IndexTypes)
72+
await t.ConfigureAsync().AnyContext();
7273
}
7374

7475
protected override async Task CreateAliasAsync(string index, string name) {

src/Foundatio.Repositories.Elasticsearch/Configuration/IIndexType.cs

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
using System;
22
using System.Collections.Generic;
3+
using System.Threading.Tasks;
34
using Foundatio.Repositories.Elasticsearch.Extensions;
45
using Foundatio.Repositories.Elasticsearch.Queries.Builders;
56
using Foundatio.Repositories.Models;
@@ -15,7 +16,8 @@ public interface IIndexType : IDisposable {
1516
int DefaultCacheExpirationSeconds { get; set; }
1617
int BulkBatchSize { get; set; }
1718
ISet<string> AllowedAggregationFields { get; }
18-
CreateIndexDescriptor Configure(CreateIndexDescriptor idx);
19+
Task ConfigureAsync();
20+
CreateIndexDescriptor ConfigureIndex(CreateIndexDescriptor idx);
1921
void ConfigureSettings(ConnectionSettings settings);
2022
IElasticQueryBuilder QueryBuilder { get; }
2123
}
@@ -79,7 +81,11 @@ public virtual string CreateDocumentId(T document) {
7981
return ObjectId.GenerateNewId().ToString();
8082
}
8183

82-
public virtual CreateIndexDescriptor Configure(CreateIndexDescriptor idx) {
84+
public virtual Task ConfigureAsync() {
85+
return Task.CompletedTask;
86+
}
87+
88+
public virtual CreateIndexDescriptor ConfigureIndex(CreateIndexDescriptor idx) {
8389
return idx.Mappings(m => m.Map<T>(Name, BuildMapping));
8490
}
8591

@@ -96,4 +102,8 @@ public virtual void Dispose() {}
96102
public int DefaultCacheExpirationSeconds { get; set; } = RepositoryConstants.DEFAULT_CACHE_EXPIRATION_SECONDS;
97103
public int BulkBatchSize { get; set; } = 1000;
98104
}
105+
106+
public interface IHavePipelinedIndexType {
107+
string Pipeline { get; }
108+
}
99109
}

src/Foundatio.Repositories.Elasticsearch/Configuration/Index.cs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,14 @@
11
using System;
22
using System.Threading.Tasks;
3+
using Foundatio.Repositories.Extensions;
34

45
namespace Foundatio.Repositories.Elasticsearch.Configuration {
56
public class Index : IndexBase {
67
public Index(IElasticConfiguration configuration, string name) : base(configuration, name) {}
78

8-
public override Task ConfigureAsync() {
9-
return CreateIndexAsync(Name, ConfigureDescriptor);
9+
public override async Task ConfigureAsync() {
10+
await base.ConfigureAsync().AnyContext();
11+
await CreateIndexAsync(Name, ConfigureDescriptor).AnyContext();
1012
}
1113
}
1214

src/Foundatio.Repositories.Elasticsearch/Configuration/IndexBase.cs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,10 @@ public IIndexType<T> AddDynamicType<T>(string name) where T : class {
4444
return indexType;
4545
}
4646

47-
public abstract Task ConfigureAsync();
47+
public virtual async Task ConfigureAsync() {
48+
foreach (var t in IndexTypes)
49+
await t.ConfigureAsync().AnyContext();
50+
}
4851

4952
public virtual Task DeleteAsync() {
5053
return DeleteIndexAsync(Name);
@@ -85,7 +88,7 @@ protected virtual async Task CreateIndexAsync(string name, Func<CreateIndexDescr
8588

8689
public virtual CreateIndexDescriptor ConfigureDescriptor(CreateIndexDescriptor idx) {
8790
foreach (var t in IndexTypes)
88-
idx = t.Configure(idx);
91+
idx = t.ConfigureIndex(idx);
8992

9093
return idx;
9194
}

src/Foundatio.Repositories.Elasticsearch/Configuration/VersionedIndex.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ public VersionedIndex(IElasticConfiguration configuration, string name, int vers
2323
public bool DiscardIndexesOnReindex { get; set; } = true;
2424

2525
public override async Task ConfigureAsync() {
26+
await base.ConfigureAsync().AnyContext();
2627
if (!await IndexExistsAsync(VersionedName).AnyContext()) {
2728
if (!await AliasExistsAsync(Name).AnyContext())
2829
await CreateIndexAsync(VersionedName, d => ConfigureDescriptor(d).Aliases(ad => ad.Alias(Name))).AnyContext();

src/Foundatio.Repositories.Elasticsearch/Extensions/ElasticIndexExtensions.cs

Lines changed: 0 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,6 @@
44
using System.Threading.Tasks;
55
using Foundatio.Repositories.Models;
66
using Nest;
7-
using Foundatio.Repositories.Elasticsearch.Queries.Builders;
8-
using Foundatio.Repositories.Extensions;
97
using Foundatio.Utility;
108

119
namespace Foundatio.Repositories.Elasticsearch.Extensions {
@@ -103,38 +101,6 @@ public static IDictionary<string, AggregationResult> ToAggregationResult<T>(this
103101
return res.Aggregations.ToAggregationResult();
104102
}
105103

106-
public static Task<IBulkResponse> IndexManyAsync<T>(this IElasticClient client, IEnumerable<T> objects, Func<T, string> getParent, Func<T, string> getIndex = null, string type = null, bool isCreateOperation = false) where T : class {
107-
if (objects == null)
108-
throw new ArgumentNullException(nameof(objects));
109-
110-
var indexBulkRequest = CreateIndexBulkRequest(objects, getIndex, type, getParent, isCreateOperation);
111-
return client.BulkAsync(indexBulkRequest);
112-
}
113-
114-
private static BulkRequest CreateIndexBulkRequest<T>(IEnumerable<T> objects, Func<T, string> getIndex, string type, Func<T, string> getParent, bool isCreateOperation) where T : class {
115-
var bulkRequest = new BulkRequest();
116-
var list = objects.Select(o => {
117-
IBulkOperation doc = isCreateOperation ? (IBulkOperation)new BulkCreateOperation<T>(o) : new BulkIndexOperation<T>(o);
118-
doc.Type = type;
119-
if (getParent != null)
120-
doc.Parent = getParent(o);
121-
122-
if (getIndex != null)
123-
doc.Index = getIndex(o);
124-
125-
if (!isCreateOperation) {
126-
var versionedDoc = o as IVersioned;
127-
if (versionedDoc != null)
128-
doc.Version = versionedDoc.Version;
129-
}
130-
131-
return doc;
132-
}).ToList();
133-
bulkRequest.Operations = list;
134-
135-
return bulkRequest;
136-
}
137-
138104
public static PropertiesDescriptor<T> SetupDefaults<T>(this PropertiesDescriptor<T> pd) where T : class {
139105
var hasIdentity = typeof(IIdentity).IsAssignableFrom(typeof(T));
140106
var hasDates = typeof(IHaveDates).IsAssignableFrom(typeof(T));

src/Foundatio.Repositories.Elasticsearch/Repositories/ElasticRepositoryBase.cs

Lines changed: 47 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
using Foundatio.Repositories.Elasticsearch.Configuration;
1414
using Foundatio.Repositories.Elasticsearch.Models;
1515
using Foundatio.Repositories.Elasticsearch.Queries.Builders;
16+
using Foundatio.Repositories.Exceptions;
1617
using Foundatio.Repositories.Extensions;
1718
using Foundatio.Repositories.JsonPatch;
1819
using Foundatio.Repositories.Models;
@@ -108,10 +109,11 @@ public async Task PatchAsync(string id, object update, bool sendNotification = t
108109
if (update == null)
109110
throw new ArgumentNullException(nameof(update));
110111

112+
string pipeline = ElasticType is IHavePipelinedIndexType ? ((IHavePipelinedIndexType)ElasticType).Pipeline : null;
111113
string script = update as string;
112114
var patch = update as PatchDocument;
113-
114115
if (script != null) {
116+
// TODO: Figure out how to specify a pipeline here.
115117
var request = new UpdateRequest<T, T>(GetIndexById(id), ElasticType.Name, id) {
116118
Script = script,
117119
RetryOnConflict = 10
@@ -140,7 +142,7 @@ public async Task PatchAsync(string id, object update, bool sendNotification = t
140142
var target = response.Source as JToken;
141143
new JsonPatcher().Patch(ref target, patch);
142144

143-
var updateResponse = await _client.LowLevel.IndexPutAsync<object>(response.Index, response.Type, id, new PostData<object>(target.ToString())).AnyContext();
145+
var updateResponse = await _client.LowLevel.IndexPutAsync<object>(response.Index, response.Type, id, new PostData<object>(target.ToString()), p => p.Pipeline(pipeline)).AnyContext();
144146
_logger.Trace(() => updateResponse.GetRequest());
145147

146148
if (!updateResponse.Success) {
@@ -149,6 +151,7 @@ public async Task PatchAsync(string id, object update, bool sendNotification = t
149151
throw new ApplicationException(message, updateResponse.OriginalException);
150152
}
151153
} else {
154+
// TODO: Figure out how to specify a pipeline here.
152155
var request = new UpdateRequest<T, object>(GetIndexById(id), ElasticType.Name, id) {
153156
Doc = update,
154157
RetryOnConflict = 10
@@ -195,9 +198,12 @@ public async Task PatchAsync(IEnumerable<string> ids, object update, bool sendNo
195198
return;
196199
}
197200

201+
string pipeline = ElasticType is IHavePipelinedIndexType ? ((IHavePipelinedIndexType)ElasticType).Pipeline : null;
198202
var script = update as string;
199203
var bulkResponse = await _client.BulkAsync(b => {
200204
foreach (var id in idList) {
205+
b.Pipeline(pipeline);
206+
201207
if (script != null)
202208
b.Update<T>(u => u
203209
.Id(id)
@@ -242,6 +248,7 @@ protected async Task<long> PatchAllAsync<TQuery>(TQuery query, object update, bo
242248
throw new ArgumentNullException(nameof(update));
243249

244250
long affectedRecords = 0;
251+
string pipeline = ElasticType is IHavePipelinedIndexType ? ((IHavePipelinedIndexType)ElasticType).Pipeline : null;
245252
var patch = update as PatchDocument;
246253
if (patch != null) {
247254
var patcher = new JsonPatcher();
@@ -256,6 +263,7 @@ protected async Task<long> PatchAllAsync<TQuery>(TQuery query, object update, bo
256263
.Id(h.Id)
257264
.Index(h.GetIndex())
258265
.Type(h.GetIndexType())
266+
.Pipeline(pipeline)
259267
.Version(h.Version));
260268
}
261269

@@ -293,6 +301,7 @@ protected async Task<long> PatchAllAsync<TQuery>(TQuery query, object update, bo
293301
Query = ElasticType.QueryBuilder.BuildQuery(query, GetQueryOptions(), new SearchDescriptor<T>()),
294302
Conflicts = Conflicts.Proceed,
295303
Script = new InlineScript(script),
304+
Pipeline = pipeline,
296305
Version = HasVersion
297306
};
298307

@@ -313,6 +322,8 @@ protected async Task<long> PatchAllAsync<TQuery>(TQuery query, object update, bo
313322

314323
affectedRecords += await BatchProcessAsync(query, async results => {
315324
var bulkResult = await _client.BulkAsync(b => {
325+
b.Pipeline(pipeline);
326+
316327
foreach (var h in results.Hits) {
317328
if (script != null)
318329
b.Update<T>(u => u
@@ -674,11 +685,13 @@ private async Task IndexDocumentsAsync(IReadOnlyCollection<T> documents, bool is
674685
await TimeSeriesType.EnsureIndexAsync(documentGroup.First()).AnyContext();
675686
}
676687

688+
string pipeline = ElasticType is IHavePipelinedIndexType ? ((IHavePipelinedIndexType)ElasticType).Pipeline : null;
677689
if (documents.Count == 1) {
678690
var document = documents.Single();
679691
var response = await _client.IndexAsync(document, i => {
680692
i.OpType(isCreateOperation ? OpType.Create : OpType.Index);
681693
i.Type(ElasticType.Name);
694+
i.Pipeline(pipeline);
682695

683696
if (GetParentIdFunc != null)
684697
i.Parent(GetParentIdFunc(document));
@@ -687,8 +700,8 @@ private async Task IndexDocumentsAsync(IReadOnlyCollection<T> documents, bool is
687700
i.Index(GetDocumentIndexFunc(document));
688701

689702
if (HasVersion && isCreateOperation) {
690-
var versionDoc = (IVersioned)document;
691-
i.Version(versionDoc.Version);
703+
var versionedDoc = (IVersioned)document;
704+
i.Version(versionedDoc.Version);
692705
}
693706

694707
return i;
@@ -698,6 +711,9 @@ private async Task IndexDocumentsAsync(IReadOnlyCollection<T> documents, bool is
698711
if (!response.IsValid) {
699712
string message = response.GetErrorMessage();
700713
_logger.Error().Exception(response.OriginalException).Message(message).Property("request", response.GetRequest()).Write();
714+
if (response.ServerError.Status == 409)
715+
throw new DuplicateDocumentException(message, response.OriginalException);
716+
701717
throw new ApplicationException(message, response.OriginalException);
702718
}
703719

@@ -706,7 +722,30 @@ private async Task IndexDocumentsAsync(IReadOnlyCollection<T> documents, bool is
706722
versionDoc.Version = response.Version;
707723
}
708724
} else {
709-
var response = await _client.IndexManyAsync(documents, GetParentIdFunc, GetDocumentIndexFunc, ElasticType.Name, isCreateOperation).AnyContext();
725+
var bulkRequest = new BulkRequest();
726+
var list = documents.Select(d => {
727+
IBulkOperation o = isCreateOperation
728+
? (IBulkOperation)new BulkCreateOperation<T>(d) { Pipeline = pipeline }
729+
: new BulkIndexOperation<T>(d) { Pipeline = pipeline };
730+
731+
o.Type = ElasticType.Name;
732+
if (GetParentIdFunc != null)
733+
o.Parent = GetParentIdFunc(d);
734+
735+
if (GetDocumentIndexFunc != null)
736+
o.Index = GetDocumentIndexFunc(d);
737+
738+
if (HasVersion && !isCreateOperation) {
739+
var versionedDoc = (IVersioned)d;
740+
if (versionedDoc != null)
741+
o.Version = versionedDoc.Version;
742+
}
743+
744+
return o;
745+
}).ToList();
746+
bulkRequest.Operations = list;
747+
748+
var response = await _client.BulkAsync(bulkRequest).AnyContext();
710749
_logger.Trace(() => response.GetRequest());
711750

712751
if (HasVersion) {
@@ -739,6 +778,9 @@ private async Task IndexDocumentsAsync(IReadOnlyCollection<T> documents, bool is
739778
if (!response.IsValid) {
740779
string message = response.GetErrorMessage();
741780
_logger.Error().Exception(response.OriginalException).Message(message).Property("request", response.GetRequest()).Write();
781+
if (allErrors.Any(e => e.Status == 409))
782+
throw new DuplicateDocumentException(message, response.OriginalException);
783+
742784
throw new ApplicationException(message, response.OriginalException);
743785
}
744786
}

src/Foundatio.Repositories/Exceptions/DuplicateDocumentException.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,5 +2,6 @@
22

33
namespace Foundatio.Repositories.Exceptions {
44
public class DuplicateDocumentException : ApplicationException {
5+
public DuplicateDocumentException(string message, Exception innerException) : base(message, innerException) {}
56
}
6-
}
7+
}

src/Foundatio.Repositories/ISearchableReadOnlyRepository.cs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
1-
using System.Collections.Generic;
2-
using System.Threading.Tasks;
1+
using System.Threading.Tasks;
32
using Foundatio.Repositories.Models;
43
using Foundatio.Repositories.Queries;
54

tests/Foundatio.Repositories.Elasticsearch.Tests/Foundatio.Repositories.Elasticsearch.Tests.csproj

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,7 @@
155155
<Compile Include="Repositories\Models\Employee.cs" />
156156
<Compile Include="Repositories\Queries\AgeQuery.cs" />
157157
<Compile Include="Repositories\Queries\CompanyQuery.cs" />
158+
<Compile Include="PipelineTests.cs" />
158159
<Compile Include="VersionedTests.cs" />
159160
<Compile Include="RepositoryTests.cs" />
160161
<Compile Include="SearchableRepositoryTests.cs" />

0 commit comments

Comments
 (0)