Skip to content

Commit 282f121

Browse files
committed
Merge branch 'master' of github.com:Mpdreamz/NEST
2 parents 0ad797a + 8334a68 commit 282f121

File tree

1 file changed

+61
-46
lines changed

1 file changed

+61
-46
lines changed

Diff for: src/Nest/ElasticClient-Bulk.cs

+61-46
Original file line numberDiff line numberDiff line change
@@ -10,63 +10,78 @@
1010

1111
namespace Nest
1212
{
13-
public partial class ElasticClient
13+
using System.Threading.Tasks;
14+
15+
public partial class ElasticClient
1416
{
1517
public IBulkResponse Bulk(Func<BulkDescriptor, BulkDescriptor> bulkSelector)
1618
{
1719
bulkSelector.ThrowIfNull("bulkSelector");
1820
var bulkDescriptor = bulkSelector(new BulkDescriptor());
1921
return this.Bulk(bulkDescriptor);
2022
}
23+
24+
private void GenerateBulkPathAndJson(BulkDescriptor bulkDescriptor, out string json, out string path)
25+
{
26+
bulkDescriptor.ThrowIfNull("bulkDescriptor");
27+
bulkDescriptor._Operations.ThrowIfEmpty("Bulk descriptor does not define any operations");
28+
var sb = new StringBuilder();
29+
30+
foreach (var operation in bulkDescriptor._Operations)
31+
{
32+
var command = operation._Operation;
33+
var index = operation._Index ??
34+
bulkDescriptor._FixedIndex ??
35+
new IndexNameResolver(this._connectionSettings).GetIndexForType(operation._ClrType);
36+
var typeName = operation._Type
37+
?? bulkDescriptor._FixedType
38+
?? this.Infer.TypeName(operation._ClrType);
39+
40+
var id = operation.GetIdForObject(this.Infer);
41+
operation._Index = index;
42+
operation._Type = typeName;
43+
operation._Id = id;
44+
45+
var opJson = this.Serializer.Serialize(operation, Formatting.None);
46+
47+
var action = "{{ \"{0}\" : {1} }}\n".F(command, opJson);
48+
sb.Append(action);
49+
50+
if (command == "index" || command == "create")
51+
{
52+
string jsonCommand = this.Serializer.Serialize(operation._Object, Formatting.None);
53+
sb.Append(jsonCommand + "\n");
54+
}
55+
else if (command == "update")
56+
{
57+
string jsonCommand = this.Serializer.Serialize(operation.GetBody(), Formatting.None);
58+
sb.Append(jsonCommand + "\n");
59+
}
60+
}
61+
json = sb.ToString();
62+
path = "_bulk";
63+
if (!bulkDescriptor._FixedIndex.IsNullOrEmpty())
64+
{
65+
if (!bulkDescriptor._FixedType.IsNullOrEmpty())
66+
path = bulkDescriptor._FixedType + "/" + path;
67+
path = bulkDescriptor._FixedIndex + "/" + path;
68+
}
69+
}
2170
public IBulkResponse Bulk(BulkDescriptor bulkDescriptor)
2271
{
23-
bulkDescriptor.ThrowIfNull("bulkDescriptor");
24-
bulkDescriptor._Operations.ThrowIfEmpty("Bulk descriptor does not define any operations");
25-
var sb = new StringBuilder();
26-
27-
foreach (var operation in bulkDescriptor._Operations)
28-
{
29-
var command = operation._Operation;
30-
var index = operation._Index ??
31-
bulkDescriptor._FixedIndex ??
32-
new IndexNameResolver(this._connectionSettings).GetIndexForType(operation._ClrType);
33-
var typeName = operation._Type
34-
?? bulkDescriptor._FixedType
35-
?? this.Infer.TypeName(operation._ClrType);
36-
37-
var id = operation.GetIdForObject(this.Infer);
38-
operation._Index = index;
39-
operation._Type = typeName;
40-
operation._Id = id;
41-
42-
var opJson = this.Serializer.Serialize(operation, Formatting.None);
43-
44-
var action = "{{ \"{0}\" : {1} }}\n".F(command, opJson);
45-
sb.Append(action);
46-
47-
if (command == "index" || command == "create")
48-
{
49-
string jsonCommand = this.Serializer.Serialize(operation._Object, Formatting.None);
50-
sb.Append(jsonCommand + "\n");
51-
}
52-
else if (command == "update")
53-
{
54-
string jsonCommand = this.Serializer.Serialize(operation.GetBody(), Formatting.None);
55-
sb.Append(jsonCommand + "\n");
56-
}
57-
}
58-
var json = sb.ToString();
59-
var path = "_bulk";
60-
if (!bulkDescriptor._FixedIndex.IsNullOrEmpty())
61-
{
62-
if (!bulkDescriptor._FixedType.IsNullOrEmpty())
63-
path = bulkDescriptor._FixedType + "/" + path;
64-
path = bulkDescriptor._FixedIndex + "/" + path;
65-
}
72+
string json, path;
73+
this.GenerateBulkPathAndJson(bulkDescriptor, out json, out path);
6674
var status = this.Connection.PostSync(path, json);
6775
return this.Deserialize<BulkResponse>(status);
68-
}
69-
76+
}
77+
public Task<IBulkResponse> BulkAsync(BulkDescriptor bulkDescriptor)
78+
{
79+
string json, path;
80+
this.GenerateBulkPathAndJson(bulkDescriptor, out json, out path);
81+
var task = this.Connection.Post(path, json);
82+
return task.ContinueWith(t => (IBulkResponse)this.Deserialize<BulkResponse>(t.Result));
83+
}
84+
7085
internal string GenerateBulkIndexCommand<T>(IEnumerable<T> objects) where T : class
7186
{
7287
return this.GenerateBulkCommand<T>(@objects, "index");

0 commit comments

Comments
 (0)