Skip to content

Commit d1f0f42

Browse files
Gabriel MoskoviczGabriel Moskovicz
Gabriel Moskovicz
authored and
Gabriel Moskovicz
committed
Small Refactor
1 parent 54434c3 commit d1f0f42

File tree

1 file changed

+119
-119
lines changed

1 file changed

+119
-119
lines changed

Diff for: src/Nest/Domain/Observers/ReindexObservable.cs

+119-119
Original file line numberDiff line numberDiff line change
@@ -1,125 +1,125 @@
1-
using System;
2-
using Elasticsearch.Net;
3-
4-
namespace Nest
5-
{
6-
public class ReindexObservable<T> : IDisposable, IObservable<IReindexResponse<T>> where T : class
7-
{
8-
private ReindexDescriptor<T> _reindexDescriptor;
9-
private readonly IConnectionSettingsValues _connectionSettings;
10-
internal IElasticClient CurrentClient { get; set; }
11-
internal ReindexDescriptor<T> ReindexDescriptor { get; set; }
12-
13-
public ReindexObservable(IElasticClient client, IConnectionSettingsValues connectionSettings, ReindexDescriptor<T> reindexDescriptor)
14-
{
15-
this._connectionSettings = connectionSettings;
16-
this._reindexDescriptor = reindexDescriptor;
17-
this.CurrentClient = client;
18-
}
19-
20-
public IDisposable Subscribe(IObserver<IReindexResponse<T>> observer)
21-
{
22-
observer.ThrowIfNull("observer");
23-
try
24-
{
25-
this.Reindex(observer);
26-
}
27-
catch (Exception e)
28-
{
29-
observer.OnError(e);
30-
}
31-
return this;
32-
33-
}
34-
35-
private void Reindex(IObserver<IReindexResponse<T>> observer)
36-
{
37-
var fromIndex = this._reindexDescriptor._FromIndexName;
38-
var toIndex = this._reindexDescriptor._ToIndexName;
1+
using System;
2+
using Elasticsearch.Net;
3+
4+
namespace Nest
5+
{
6+
public class ReindexObservable<T> : IDisposable, IObservable<IReindexResponse<T>> where T : class
7+
{
8+
private ReindexDescriptor<T> _reindexDescriptor;
9+
private readonly IConnectionSettingsValues _connectionSettings;
10+
internal IElasticClient CurrentClient { get; set; }
11+
internal ReindexDescriptor<T> ReindexDescriptor { get; set; }
12+
13+
public ReindexObservable(IElasticClient client, IConnectionSettingsValues connectionSettings, ReindexDescriptor<T> reindexDescriptor)
14+
{
15+
this._connectionSettings = connectionSettings;
16+
this._reindexDescriptor = reindexDescriptor;
17+
this.CurrentClient = client;
18+
}
19+
20+
public IDisposable Subscribe(IObserver<IReindexResponse<T>> observer)
21+
{
22+
observer.ThrowIfNull("observer");
23+
try
24+
{
25+
this.Reindex(observer);
26+
}
27+
catch (Exception e)
28+
{
29+
observer.OnError(e);
30+
}
31+
return this;
32+
33+
}
34+
35+
private void Reindex(IObserver<IReindexResponse<T>> observer)
36+
{
37+
var fromIndex = this._reindexDescriptor._FromIndexName;
38+
var toIndex = this._reindexDescriptor._ToIndexName;
3939
var scroll = this._reindexDescriptor._Scroll ?? "2m";
40-
var size = this._reindexDescriptor._Size ?? 100;
41-
42-
fromIndex.ThrowIfNullOrEmpty("fromIndex");
43-
toIndex.ThrowIfNullOrEmpty("toIndex");
44-
45-
var indexSettings = this.CurrentClient.GetIndexSettings(i=>i.Index(this._reindexDescriptor._FromIndexName));
46-
Func<CreateIndexDescriptor, CreateIndexDescriptor> settings =
47-
this._reindexDescriptor._CreateIndexSelector ?? ((ci) => ci);
48-
49-
var createIndexResponse = this.CurrentClient.CreateIndex(
50-
toIndex, (c) => settings(c.InitializeUsing(indexSettings.IndexSettings)));
51-
if (!createIndexResponse.IsValid)
52-
throw new ReindexException(createIndexResponse.ConnectionStatus);
53-
40+
var size = this._reindexDescriptor._Size ?? 100;
41+
42+
fromIndex.ThrowIfNullOrEmpty("fromIndex");
43+
toIndex.ThrowIfNullOrEmpty("toIndex");
44+
45+
var indexSettings = this.CurrentClient.GetIndexSettings(i => i.Index(this._reindexDescriptor._FromIndexName));
46+
Func<CreateIndexDescriptor, CreateIndexDescriptor> settings =
47+
this._reindexDescriptor._CreateIndexSelector ?? ((ci) => ci);
48+
49+
var createIndexResponse = this.CurrentClient.CreateIndex(
50+
toIndex, (c) => settings(c.InitializeUsing(indexSettings.IndexSettings)));
51+
if (!createIndexResponse.IsValid)
52+
throw new ReindexException(createIndexResponse.ConnectionStatus);
53+
5454
var page = 0;
55-
Func<SearchDescriptor<T>, SearchDescriptor<T>> searchDescriptor = s => s.Index(fromIndex);
56-
57-
if (typeof (T).Name.Equals(typeof (object).Name))
58-
{
59-
searchDescriptor = s => searchDescriptor(s).AllTypes();
60-
}
55+
Func<SearchDescriptor<T>, SearchDescriptor<T>> searchDescriptor = s => s.Index(fromIndex);
56+
57+
if (typeof(T).Name.Equals(typeof(object).Name))
58+
{
59+
searchDescriptor = s => searchDescriptor(s).AllTypes();
60+
}
6161
else
6262
{
63-
searchDescriptor = s => searchDescriptor(s).Type<T>();
64-
}
65-
66-
63+
searchDescriptor = s => searchDescriptor(s).Type<T>();
64+
}
65+
66+
6767
var searchResult = this.CurrentClient.Search<T>(
68-
s => searchDescriptor(s)
69-
.From(0)
70-
.Size(size)
71-
.Query(this._reindexDescriptor._QuerySelector ?? (q=>q.MatchAll()))
72-
.SearchType(SearchType.Scan)
73-
.Scroll(scroll));
74-
75-
if (searchResult.Total <= 0)
76-
throw new ReindexException(searchResult.ConnectionStatus, "index " + fromIndex + " has no documents!");
77-
IBulkResponse indexResult = null;
78-
do
79-
{
80-
var result = searchResult;
81-
searchResult = this.CurrentClient.Scroll<T>(s => s
82-
.Scroll(scroll)
83-
.ScrollId(result.ScrollId)
84-
);
85-
if (searchResult.Documents.HasAny())
86-
indexResult = this.IndexSearchResults(searchResult, observer, toIndex, page);
87-
page++;
88-
} while (searchResult.IsValid && indexResult != null && indexResult.IsValid && searchResult.Documents.HasAny());
89-
90-
91-
observer.OnCompleted();
92-
}
93-
94-
public IBulkResponse IndexSearchResults(ISearchResponse<T> searchResult,IObserver<IReindexResponse<T>> observer, string toIndex, int page)
95-
{
96-
if (!searchResult.IsValid)
97-
throw new ReindexException(searchResult.ConnectionStatus, "reindex failed on scroll #" + page);
98-
99-
var bb = new BulkDescriptor();
100-
foreach (var d in searchResult.Hits)
101-
{
102-
IHit<T> d1 = d;
103-
bb.Index<T>(bi => bi.Document(d1.Source).Type(d1.Type).Index(toIndex).Id(d.Id));
104-
}
105-
106-
var indexResult = this.CurrentClient.Bulk(b=>bb);
107-
if (!indexResult.IsValid)
108-
throw new ReindexException(indexResult.ConnectionStatus, "reindex failed when indexing page " + page);
109-
110-
observer.OnNext(new ReindexResponse<T>()
111-
{
112-
BulkResponse = indexResult,
113-
SearchResponse = searchResult,
114-
Scroll = page
115-
});
116-
return indexResult;
117-
}
118-
119-
120-
public void Dispose()
121-
{
122-
123-
}
124-
}
68+
s => searchDescriptor(s)
69+
.From(0)
70+
.Size(size)
71+
.Query(this._reindexDescriptor._QuerySelector ?? (q => q.MatchAll()))
72+
.SearchType(SearchType.Scan)
73+
.Scroll(scroll));
74+
75+
if (searchResult.Total <= 0)
76+
throw new ReindexException(searchResult.ConnectionStatus, string.Format("index {0} has no documents!", fromIndex));
77+
IBulkResponse indexResult = null;
78+
do
79+
{
80+
var result = searchResult;
81+
searchResult = this.CurrentClient.Scroll<T>(s => s
82+
.Scroll(scroll)
83+
.ScrollId(result.ScrollId)
84+
);
85+
if (searchResult.Documents.HasAny())
86+
indexResult = this.IndexSearchResults(searchResult, observer, toIndex, page);
87+
page++;
88+
} while (searchResult.IsValid && indexResult != null && indexResult.IsValid && searchResult.Documents.HasAny());
89+
90+
91+
observer.OnCompleted();
92+
}
93+
94+
public IBulkResponse IndexSearchResults(ISearchResponse<T> searchResult, IObserver<IReindexResponse<T>> observer, string toIndex, int page)
95+
{
96+
if (!searchResult.IsValid)
97+
throw new ReindexException(searchResult.ConnectionStatus, "reindex failed on scroll #" + page);
98+
99+
var bb = new BulkDescriptor();
100+
foreach (var d in searchResult.Hits)
101+
{
102+
IHit<T> d1 = d;
103+
bb.Index<T>(bi => bi.Document(d1.Source).Type(d1.Type).Index(toIndex).Id(d.Id));
104+
}
105+
106+
var indexResult = this.CurrentClient.Bulk(b => bb);
107+
if (!indexResult.IsValid)
108+
throw new ReindexException(indexResult.ConnectionStatus, "reindex failed when indexing page " + page);
109+
110+
observer.OnNext(new ReindexResponse<T>
111+
{
112+
BulkResponse = indexResult,
113+
SearchResponse = searchResult,
114+
Scroll = page
115+
});
116+
return indexResult;
117+
}
118+
119+
120+
public void Dispose()
121+
{
122+
123+
}
124+
}
125125
}

0 commit comments

Comments
 (0)