forked from elastic/elasticsearch-net
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathElasticsearchDefaultSerializer.cs
146 lines (132 loc) · 3.95 KB
/
ElasticsearchDefaultSerializer.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
using System;
using System.Collections;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading.Tasks;
namespace Elasticsearch.Net.Serialization
{
public class ElasticsearchDefaultSerializer : IElasticsearchSerializer
{
private static readonly ElasticsearchNetJsonStrategy Strategy = new ElasticsearchNetJsonStrategy();
public T Deserialize<T>(byte[] bytes) where T : class
{
return SimpleJson.DeserializeObject<T>(bytes.Utf8String(), ElasticsearchDefaultSerializer.Strategy);
}
public T Deserialize<T>(Stream stream)
{
if (stream == null)
return default(T);
using (var ms = new MemoryStream())
{
stream.CopyTo(ms);
byte[] buffer = ms.ToArray();
if (buffer.Length <= 1)
return default(T);
return SimpleJson.DeserializeObject<T>(buffer.Utf8String(), ElasticsearchDefaultSerializer.Strategy);
}
}
public Task<T> DeserializeAsync<T>(Stream stream)
{
var tcs = new TaskCompletionSource<T>();
if (stream == null)
{
tcs.SetResult(default(T));
return tcs.Task;
}
// return a task that reads the stream asynchronously
// and finally deserializes the result to T.
this.Iterate<T>(ReadStreamAsync(stream, tcs), tcs);
return tcs.Task;
}
public IEnumerable<Task> ReadStreamAsync<T>(Stream stream, TaskCompletionSource<T> tcs)
{
var ms = stream as MemoryStream;
string json = null;
if (ms != null && ms.Position > 0)
{
json = ms.ToArray().Utf8String();
ms.Close();
}
else
{
using (ms = new MemoryStream())
using (stream)
{
var buffer = new byte[BUFFER_SIZE];
while (stream != null)
{
var read = Task<int>.Factory.FromAsync(stream.BeginRead, stream.EndRead, buffer, 0, BUFFER_SIZE, null);
yield return read;
if (read.Result == 0) break;
ms.Write(buffer, 0, read.Result);
}
json = ms.ToArray().Utf8String();
}
}
var r = SimpleJson.DeserializeObject<T>(json, ElasticsearchDefaultSerializer.Strategy);
tcs.SetResult(r);
}
const int BUFFER_SIZE = 1024;
public void Iterate<T>(IEnumerable<Task> asyncIterator, TaskCompletionSource<T> tcs)
{
var enumerator = asyncIterator.GetEnumerator();
Action<Task> recursiveBody = null;
recursiveBody = completedTask =>
{
if (completedTask != null && completedTask.IsFaulted)
{
//none of the individual steps in _AsyncSteps run in parallel for 1 request
//as this would be impossible we can assume Aggregate Exception.InnerException
var exception = completedTask.Exception.InnerException;
tcs.TrySetException(exception);
enumerator.Dispose();
}
else if (enumerator.MoveNext())
{
//enumerator.Current.ContinueWith(recursiveBody, TaskContinuationOptions.ExecuteSynchronously);
enumerator.Current.ContinueWith(recursiveBody);
}
else
{
enumerator.Dispose();
}
};
recursiveBody(null);
}
public byte[] Serialize(object data, SerializationFormatting formatting = SerializationFormatting.Indented)
{
var serialized = SimpleJson.SerializeObject(data);
if (formatting == SerializationFormatting.None)
serialized = RemoveNewLinesAndTabs(serialized);
return serialized.Utf8Bytes();
}
public string Stringify(object valueType)
{
return ElasticsearchDefaultSerializer.DefaultStringify(valueType);
}
public static string DefaultStringify(object valueType)
{
var s = valueType as string;
if (s != null)
return s;
var ss = valueType as string[];
if (ss != null)
return string.Join(",", ss);
var pns = valueType as IEnumerable<object>;
if (pns != null)
return string.Join(",", pns);
var e = valueType as Enum;
if (e != null) return KnownEnums.Resolve(e);
if (valueType is bool)
return ((bool) valueType) ? "true" : "false";
return valueType.ToString();
}
public static string RemoveNewLinesAndTabs(string input)
{
return new string(input
.Where(c => c != '\r' && c != '\n')
.ToArray());
}
}
}