Skip to content

Commit 43b58bc

Browse files
committed
Merge pull request #1440 from elastic/1433-support-generic-reindex
Support reindex with to better syntax
2 parents 197901b + 7cd053a commit 43b58bc

File tree

5 files changed

+142
-116
lines changed

5 files changed

+142
-116
lines changed

Diff for: src/Nest/DSL/Reindex/ReindexDescriptor.cs

+2-1
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,8 @@ public class ReindexDescriptor<T> where T : class
66
internal string _ToIndexName { get; set; }
77
internal string _FromIndexName { get; set; }
88
internal string _Scroll { get; set; }
9-
internal int? _Size { get; set; }
9+
internal int? _Size { get; set; }
10+
internal bool _allTypes { get; set; }
1011

1112
internal Func<QueryDescriptor<T>, QueryContainer> _QuerySelector { get; set; }
1213

Diff for: src/Nest/Domain/Observers/ReindexObservable.cs

+120-113
Original file line numberDiff line numberDiff line change
@@ -1,115 +1,122 @@
1-
using System;
2-
using Elasticsearch.Net;
3-
4-
namespace Nest
5-
{
6-
public class ReindexObservable<T> : IDisposable, IObservable<IReindexResponse<T>> where T : class
7-
{
8-
private ReindexDescriptor<T> _reindexDescriptor;
9-
private readonly IConnectionSettingsValues _connectionSettings;
10-
internal IElasticClient CurrentClient { get; set; }
11-
internal ReindexDescriptor<T> ReindexDescriptor { get; set; }
12-
13-
public ReindexObservable(IElasticClient client, IConnectionSettingsValues connectionSettings, ReindexDescriptor<T> reindexDescriptor)
14-
{
15-
this._connectionSettings = connectionSettings;
16-
this._reindexDescriptor = reindexDescriptor;
17-
this.CurrentClient = client;
18-
}
19-
20-
public IDisposable Subscribe(IObserver<IReindexResponse<T>> observer)
21-
{
22-
observer.ThrowIfNull("observer");
23-
try
24-
{
25-
this.Reindex(observer);
26-
}
27-
catch (Exception e)
28-
{
29-
observer.OnError(e);
30-
}
31-
return this;
32-
33-
}
34-
35-
private void Reindex(IObserver<IReindexResponse<T>> observer)
36-
{
37-
var fromIndex = this._reindexDescriptor._FromIndexName;
38-
var toIndex = this._reindexDescriptor._ToIndexName;
1+
using System;
2+
using Elasticsearch.Net;
3+
4+
namespace Nest
5+
{
6+
public class ReindexObservable<T> : IDisposable, IObservable<IReindexResponse<T>> where T : class
7+
{
8+
private ReindexDescriptor<T> _reindexDescriptor;
9+
private readonly IConnectionSettingsValues _connectionSettings;
10+
internal IElasticClient CurrentClient { get; set; }
11+
internal ReindexDescriptor<T> ReindexDescriptor { get; set; }
12+
13+
public ReindexObservable(IElasticClient client, IConnectionSettingsValues connectionSettings, ReindexDescriptor<T> reindexDescriptor)
14+
{
15+
this._connectionSettings = connectionSettings;
16+
this._reindexDescriptor = reindexDescriptor;
17+
this.CurrentClient = client;
18+
}
19+
20+
public IDisposable Subscribe(IObserver<IReindexResponse<T>> observer)
21+
{
22+
observer.ThrowIfNull("observer");
23+
try
24+
{
25+
this.Reindex(observer);
26+
}
27+
catch (Exception e)
28+
{
29+
observer.OnError(e);
30+
}
31+
return this;
32+
33+
}
34+
35+
private void Reindex(IObserver<IReindexResponse<T>> observer)
36+
{
37+
var fromIndex = this._reindexDescriptor._FromIndexName;
38+
var toIndex = this._reindexDescriptor._ToIndexName;
3939
var scroll = this._reindexDescriptor._Scroll ?? "2m";
40-
var size = this._reindexDescriptor._Size ?? 100;
41-
42-
fromIndex.ThrowIfNullOrEmpty("fromIndex");
43-
toIndex.ThrowIfNullOrEmpty("toIndex");
44-
45-
var indexSettings = this.CurrentClient.GetIndexSettings(i=>i.Index(this._reindexDescriptor._FromIndexName));
46-
Func<CreateIndexDescriptor, CreateIndexDescriptor> settings =
47-
this._reindexDescriptor._CreateIndexSelector ?? ((ci) => ci);
48-
49-
var createIndexResponse = this.CurrentClient.CreateIndex(
50-
toIndex, (c) => settings(c.InitializeUsing(indexSettings.IndexSettings)));
51-
if (!createIndexResponse.IsValid)
52-
throw new ReindexException(createIndexResponse.ConnectionStatus);
53-
54-
var page = 0;
55-
var searchResult = this.CurrentClient.Search<T>(
56-
s => s
57-
.Index(fromIndex)
58-
.AllTypes()
59-
.From(0)
60-
.Size(size)
61-
.Query(this._reindexDescriptor._QuerySelector ?? (q=>q.MatchAll()))
62-
.SearchType(SearchType.Scan)
63-
.Scroll(scroll)
64-
);
65-
if (searchResult.Total <= 0)
66-
throw new ReindexException(searchResult.ConnectionStatus, "index " + fromIndex + " has no documents!");
67-
IBulkResponse indexResult = null;
68-
do
69-
{
70-
var result = searchResult;
71-
searchResult = this.CurrentClient.Scroll<T>(s => s
72-
.Scroll(scroll)
73-
.ScrollId(result.ScrollId)
74-
);
75-
if (searchResult.Documents.HasAny())
76-
indexResult = this.IndexSearchResults(searchResult, observer, toIndex, page);
77-
page++;
78-
} while (searchResult.IsValid && indexResult != null && indexResult.IsValid && searchResult.Documents.HasAny());
79-
80-
81-
observer.OnCompleted();
82-
}
83-
84-
public IBulkResponse IndexSearchResults(ISearchResponse<T> searchResult,IObserver<IReindexResponse<T>> observer, string toIndex, int page)
85-
{
86-
if (!searchResult.IsValid)
87-
throw new ReindexException(searchResult.ConnectionStatus, "reindex failed on scroll #" + page);
88-
89-
var bb = new BulkDescriptor();
90-
foreach (var d in searchResult.Hits)
91-
{
92-
IHit<T> d1 = d;
93-
bb.Index<T>(bi => bi.Document(d1.Source).Type(d1.Type).Index(toIndex).Id(d.Id));
94-
}
95-
96-
var indexResult = this.CurrentClient.Bulk(b=>bb);
97-
if (!indexResult.IsValid)
98-
throw new ReindexException(indexResult.ConnectionStatus, "reindex failed when indexing page " + page);
99-
100-
observer.OnNext(new ReindexResponse<T>()
101-
{
102-
BulkResponse = indexResult,
103-
SearchResponse = searchResult,
104-
Scroll = page
105-
});
106-
return indexResult;
107-
}
108-
109-
110-
public void Dispose()
111-
{
112-
113-
}
114-
}
40+
var size = this._reindexDescriptor._Size ?? 100;
41+
var allTypes = this._reindexDescriptor._allTypes;
42+
43+
fromIndex.ThrowIfNullOrEmpty("fromIndex");
44+
toIndex.ThrowIfNullOrEmpty("toIndex");
45+
46+
var indexSettings = this.CurrentClient.GetIndexSettings(i => i.Index(this._reindexDescriptor._FromIndexName));
47+
Func<CreateIndexDescriptor, CreateIndexDescriptor> settings =
48+
this._reindexDescriptor._CreateIndexSelector ?? ((ci) => ci);
49+
50+
var createIndexResponse = this.CurrentClient.CreateIndex(
51+
toIndex, (c) => settings(c.InitializeUsing(indexSettings.IndexSettings)));
52+
if (!createIndexResponse.IsValid)
53+
throw new ReindexException(createIndexResponse.ConnectionStatus);
54+
55+
var page = 0;
56+
Func<SearchDescriptor<T>, SearchDescriptor<T>> searchDescriptor = s => s.Index(fromIndex);
57+
58+
if (allTypes)
59+
searchDescriptor = s => searchDescriptor(s).AllTypes();
60+
else
61+
searchDescriptor = s => searchDescriptor(s).Type<T>();
62+
63+
64+
var searchResult = this.CurrentClient.Search<T>(
65+
s => searchDescriptor(s)
66+
.From(0)
67+
.Size(size)
68+
.Query(this._reindexDescriptor._QuerySelector ?? (q => q.MatchAll()))
69+
.SearchType(SearchType.Scan)
70+
.Scroll(scroll));
71+
72+
if (searchResult.Total <= 0)
73+
throw new ReindexException(searchResult.ConnectionStatus, string.Format("index {0} has no documents!", fromIndex));
74+
IBulkResponse indexResult = null;
75+
do
76+
{
77+
var result = searchResult;
78+
searchResult = this.CurrentClient.Scroll<T>(s => s
79+
.Scroll(scroll)
80+
.ScrollId(result.ScrollId)
81+
);
82+
if (searchResult.Documents.HasAny())
83+
indexResult = this.IndexSearchResults(searchResult, observer, toIndex, page);
84+
page++;
85+
} while (searchResult.IsValid && indexResult != null && indexResult.IsValid && searchResult.Documents.HasAny());
86+
87+
88+
observer.OnCompleted();
89+
}
90+
91+
public IBulkResponse IndexSearchResults(ISearchResponse<T> searchResult, IObserver<IReindexResponse<T>> observer, string toIndex, int page)
92+
{
93+
if (!searchResult.IsValid)
94+
throw new ReindexException(searchResult.ConnectionStatus, "reindex failed on scroll #" + page);
95+
96+
var bb = new BulkDescriptor();
97+
foreach (var d in searchResult.Hits)
98+
{
99+
IHit<T> d1 = d;
100+
bb.Index<T>(bi => bi.Document(d1.Source).Type(d1.Type).Index(toIndex).Id(d.Id));
101+
}
102+
103+
var indexResult = this.CurrentClient.Bulk(b => bb);
104+
if (!indexResult.IsValid)
105+
throw new ReindexException(indexResult.ConnectionStatus, "reindex failed when indexing page " + page);
106+
107+
observer.OnNext(new ReindexResponse<T>
108+
{
109+
BulkResponse = indexResult,
110+
SearchResponse = searchResult,
111+
Scroll = page
112+
});
113+
return indexResult;
114+
}
115+
116+
117+
public void Dispose()
118+
{
119+
120+
}
121+
}
115122
}

Diff for: src/Nest/ElasticClient-Reindex.cs

+12
Original file line numberDiff line numberDiff line change
@@ -14,5 +14,17 @@ public IObservable<IReindexResponse<T>> Reindex<T>(Func<ReindexDescriptor<T>, Re
1414
var observable = new ReindexObservable<T>(this, _connectionSettings, reindexDescriptor);
1515
return observable;
1616
}
17+
18+
/// <inheritdoc />
19+
public IObservable<IReindexResponse<IDocument>> Reindex(Func<ReindexDescriptor<IDocument>, ReindexDescriptor<IDocument>> reindexSelector)
20+
{
21+
reindexSelector.ThrowIfNull("reindexSelector");
22+
var reindexDescriptor = reindexSelector(new ReindexDescriptor<IDocument>());
23+
24+
reindexDescriptor._allTypes = true;
25+
26+
var observable = new ReindexObservable<IDocument>(this, _connectionSettings, reindexDescriptor);
27+
return observable;
28+
}
1729
}
1830
}

Diff for: src/Nest/IElasticClient.cs

+6
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,12 @@ public interface IElasticClient
1919
IObservable<IReindexResponse<T>> Reindex<T>(Func<ReindexDescriptor<T>, ReindexDescriptor<T>> reindexSelector)
2020
where T : class;
2121

22+
/// <summary>
23+
/// Helper method that allows you to reindex from one index into another using SCAN and SCROLL.
24+
/// </summary>
25+
/// <returns>An IObservable you can subscribe to to listen to the progress of the reindexation process</returns>
26+
IObservable<IReindexResponse<IDocument>> Reindex(Func<ReindexDescriptor<IDocument>, ReindexDescriptor<IDocument>> reindexSelector);
27+
2228
/// <summary>
2329
/// A search request can be scrolled by specifying the scroll parameter.
2430
/// <para>The scroll parameter is a time value parameter (for example: scroll=5m),

Diff for: src/Tests/Nest.Tests.Integration/Index/ReindexTests.cs

+2-2
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ public void ReindexMinimal()
4242
public void Reindex()
4343
{
4444
var toIndex = ElasticsearchConfiguration.NewUniqueIndexName();
45-
var observable = this.Client.Reindex<object>(r => r
45+
var observable = this.Client.Reindex(r => r
4646
.FromIndex(ElasticsearchConfiguration.DefaultIndex)
4747
.ToIndex(toIndex)
4848
.Query(q=>q.MatchAll())
@@ -53,7 +53,7 @@ public void Reindex()
5353
.NumberOfShards(1)
5454
)
5555
);
56-
var observer = new ReindexObserver<object>(
56+
var observer = new ReindexObserver<IDocument>(
5757
onNext: (r) =>
5858
{
5959
var scrollResults = r.SearchResponse;

0 commit comments

Comments
 (0)