Skip to content

Commit db3ce48

Browse files
committed
Pipeline crud tests, working on deserializing get pipeline response
1 parent 105b6e9 commit db3ce48

File tree

7 files changed

+292
-8
lines changed

7 files changed

+292
-8
lines changed

Diff for: src/Nest/Ingest/GetPipeline/GetPipelineResponse.cs

+15-6
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,29 @@
11
using Newtonsoft.Json;
22
using System.Collections.Generic;
3+
using System.Linq;
4+
using System;
35

46
namespace Nest
57
{
8+
[JsonObject(MemberSerialization.OptIn)]
69
public interface IGetPipelineResponse : IResponse
710
{
8-
Pipeline Source { get; }
11+
[JsonProperty("pipelines")]
12+
List<PipelineInfo> Pipelines { get; }
913
}
1014

11-
[JsonObject(MemberSerialization.OptIn)]
1215
public class GetPipelineResponse : ResponseBase, IGetPipelineResponse
1316
{
14-
[JsonProperty("_source")]
15-
public Pipeline Source { get; internal set; }
17+
public List<PipelineInfo> Pipelines { get; internal set; }
18+
}
19+
20+
[JsonObject(MemberSerialization.OptIn)]
21+
public class PipelineInfo
22+
{
23+
[JsonProperty("id")]
24+
public string Id { get; internal set; }
1625

17-
[JsonProperty("_version")]
18-
public int Version { get; internal set; }
26+
[JsonProperty("config")]
27+
public Pipeline Config { get; internal set; }
1928
}
2029
}

Diff for: src/Nest/Ingest/Pipeline.cs

+1
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
namespace Nest
99
{
1010
[JsonObject(MemberSerialization.OptIn)]
11+
[JsonConverter(typeof(PipelineJsonConverter))]
1112
public interface IPipeline
1213
{
1314
[JsonProperty("description")]

Diff for: src/Nest/Ingest/PipelineJsonConverter.cs

+93
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
using Newtonsoft.Json;
2+
using Newtonsoft.Json.Linq;
3+
using System;
4+
using System.Collections.Generic;
5+
using System.Linq;
6+
using System.Text;
7+
using System.Threading.Tasks;
8+
9+
namespace Nest
10+
{
11+
internal class PipelineJsonConverter : JsonConverter
12+
{
13+
public override bool CanConvert(Type objectType) => true;
14+
public override bool CanWrite => false;
15+
public override bool CanRead => true;
16+
17+
public override object ReadJson(JsonReader reader, Type objectType, object existingValue, JsonSerializer serializer)
18+
{
19+
var root = JObject.Load(reader);
20+
var pipeline = new Pipeline();
21+
pipeline.Description = root["description"]?.ToString();
22+
if (root["processors"] != null)
23+
pipeline.Processors = GetProcessors(root["processors"], serializer);
24+
if (root["on_failure"] != null)
25+
pipeline.OnFailure = GetProcessors(root["on_failure"], serializer);
26+
return pipeline;
27+
}
28+
29+
private List<IProcessor> GetProcessors(JToken jsonProcessors, JsonSerializer serializer)
30+
{
31+
var processors = new List<IProcessor>();
32+
foreach (var jsonProcessor in jsonProcessors.ToArray())
33+
{
34+
var processorName = jsonProcessor.ToObject<JObject>().Properties().First().Name;
35+
switch (processorName)
36+
{
37+
case "append":
38+
processors.Add(jsonProcessor.ToObject<AppendProcessor>(serializer));
39+
break;
40+
case "convert":
41+
processors.Add(jsonProcessor.ToObject<ConvertProcessor>(serializer));
42+
break;
43+
case "date":
44+
processors.Add(jsonProcessor.ToObject<DateProcessor>(serializer));
45+
break;
46+
case "fail":
47+
processors.Add(jsonProcessor.ToObject<FailProcessor>(serializer));
48+
break;
49+
case "foreach":
50+
processors.Add(jsonProcessor.ToObject<ForeachProcessor>(serializer));
51+
break;
52+
case "grok":
53+
processors.Add(jsonProcessor.ToObject<GrokProcessor>(serializer));
54+
break;
55+
case "gsub":
56+
processors.Add(jsonProcessor.ToObject<GsubProcessor>(serializer));
57+
break;
58+
case "join":
59+
processors.Add(jsonProcessor.ToObject<JoinProcessor>(serializer));
60+
break;
61+
case "lowercase":
62+
processors.Add(jsonProcessor.ToObject<LowercaseProcessor>(serializer));
63+
break;
64+
case "remove":
65+
processors.Add(jsonProcessor.ToObject<RemoveProcessor>(serializer));
66+
break;
67+
case "rename":
68+
processors.Add(jsonProcessor.ToObject<RenameProcessor>(serializer));
69+
break;
70+
case "set":
71+
processors.Add(jsonProcessor.ToObject<SetProcessor>(serializer));
72+
break;
73+
case "split":
74+
processors.Add(jsonProcessor.ToObject<SplitProcessor>(serializer));
75+
break;
76+
case "trim":
77+
processors.Add(jsonProcessor.ToObject<TrimProcessor>(serializer));
78+
break;
79+
case "uppercase":
80+
processors.Add(jsonProcessor.ToObject<UppercaseProcessor>(serializer));
81+
break;
82+
default:
83+
break;
84+
}
85+
}
86+
return processors;
87+
}
88+
public override void WriteJson(JsonWriter writer, object value, JsonSerializer serializer)
89+
{
90+
throw new NotSupportedException();
91+
}
92+
}
93+
}

Diff for: src/Nest/Nest.csproj

+1
Original file line numberDiff line numberDiff line change
@@ -753,6 +753,7 @@
753753
<Compile Include="Ingest\GetPipeline\GetPipelineRequest.cs" />
754754
<Compile Include="Ingest\GetPipeline\GetPipelineResponse.cs" />
755755
<Compile Include="Ingest\Pipeline.cs" />
756+
<Compile Include="Ingest\PipelineJsonConverter.cs" />
756757
<Compile Include="Ingest\Processor.cs" />
757758
<Compile Include="Ingest\ProcessorJsonConverter.cs" />
758759
<Compile Include="Ingest\ProcessorsDescriptor.cs" />

Diff for: src/Tests/Ingest/PipelineCrudTests.cs

+179
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,179 @@
1+
using FluentAssertions;
2+
using Nest;
3+
using System;
4+
using System.Collections.Generic;
5+
using System.Linq;
6+
using System.Text;
7+
using System.Threading.Tasks;
8+
using Tests.Framework;
9+
using Tests.Framework.Integration;
10+
using Tests.Framework.MockData;
11+
using Xunit;
12+
13+
namespace Tests.Ingest
14+
{
15+
[Collection(IntegrationContext.Indexing)]
16+
public class PipelineCrudTests
17+
: CrudTestBase<IPutPipelineResponse, IGetPipelineResponse, IPutPipelineResponse, IDeletePipelineResponse>
18+
{
19+
public PipelineCrudTests(IndexingCluster cluster, EndpointUsage usage) : base(cluster, usage) { }
20+
21+
protected override LazyResponses Create() => Calls<PutPipelineDescriptor, PutPipelineRequest, IPutPipelineRequest, IPutPipelineResponse>(
22+
CreateInitializer,
23+
CreateFluent,
24+
fluent: (s, c, f) => c.PutPipeline(s, f),
25+
fluentAsync: (s, c, f) => c.PutPipelineAsync(s, f),
26+
request: (s, c, r) => c.PutPipeline(r),
27+
requestAsync: (s, c, r) => c.PutPipelineAsync(r)
28+
);
29+
30+
protected override void ExpectAfterCreate(IGetPipelineResponse response)
31+
{
32+
response.Pipelines.Should().NotBeNull().And.HaveCount(1);
33+
34+
var pipeline = response.Pipelines.First();
35+
pipeline.Config.Should().NotBeNull();
36+
pipeline.Id.Should().NotBeNullOrEmpty();
37+
38+
var processors = pipeline.Config.Processors;
39+
processors.Should().NotBeNull().And.HaveCount(2);
40+
41+
var uppercase = processors.Where(p => p.Name == "uppercase").FirstOrDefault() as UppercaseProcessor;
42+
uppercase.Should().NotBeNull();
43+
uppercase.Field.Should().NotBeNull();
44+
45+
var set = processors.Where(p => p.Name == "set").FirstOrDefault() as SetProcessor;
46+
set.Should().NotBeNull();
47+
set.Field.Should().NotBeNull();
48+
set.Value.Should().NotBeNull();
49+
}
50+
51+
protected PutPipelineRequest CreateInitializer(string pipelineId) => new PutPipelineRequest(pipelineId)
52+
{
53+
Description = "Project Pipeline",
54+
Processors = new IProcessor[]
55+
{
56+
new UppercaseProcessor
57+
{
58+
Field = Infer.Field<Project>(p => p.State)
59+
},
60+
new SetProcessor
61+
{
62+
Field = Infer.Field<Project>(p => p.NumberOfCommits),
63+
Value = 0
64+
}
65+
}
66+
};
67+
68+
protected IPutPipelineRequest CreateFluent(string pipelineId, PutPipelineDescriptor d) => d
69+
.Description("Project Pipeline")
70+
.Processors(ps => ps
71+
.Uppercase<Project>(u => u
72+
.Field(p => p.State)
73+
)
74+
.Set<Project>(s => s
75+
.Field(p => p.NumberOfCommits)
76+
.Value(0)
77+
)
78+
);
79+
80+
protected override LazyResponses Read() => Calls<GetPipelineDescriptor, GetPipelineRequest, IGetPipelineRequest, IGetPipelineResponse>(
81+
GetInitializer,
82+
GetFluent,
83+
fluent: (s, c, f) => c.GetPipeline(s, f),
84+
fluentAsync: (s, c, f) => c.GetPipelineAsync(s, f),
85+
request: (s, c, r) => c.GetPipeline(r),
86+
requestAsync: (s, c, r) => c.GetPipelineAsync(r)
87+
);
88+
89+
protected GetPipelineRequest GetInitializer(string pipelineId) => new GetPipelineRequest(pipelineId);
90+
91+
protected IGetPipelineRequest GetFluent(string pipelineId, GetPipelineDescriptor d) => d;
92+
93+
protected override LazyResponses Update() => Calls<PutPipelineDescriptor, PutPipelineRequest, IPutPipelineRequest, IPutPipelineResponse>(
94+
UpdateInitializer,
95+
UpdateFluent,
96+
fluent: (s, c, f) => c.PutPipeline(s, f),
97+
fluentAsync: (s, c, f) => c.PutPipelineAsync(s, f),
98+
request: (s, c, r) => c.PutPipeline(r),
99+
requestAsync: (s, c, r) => c.PutPipelineAsync(r)
100+
);
101+
102+
protected PutPipelineRequest UpdateInitializer(string pipelineId) => new PutPipelineRequest(pipelineId)
103+
{
104+
Description = "Project Pipeline (updated)",
105+
Processors = new IProcessor[]
106+
{
107+
new UppercaseProcessor
108+
{
109+
Field = Infer.Field<Project>(p => p.State)
110+
},
111+
new SetProcessor
112+
{
113+
Field = Infer.Field<Project>(p => p.NumberOfCommits),
114+
Value = 500
115+
},
116+
new RenameProcessor
117+
{
118+
Field = Infer.Field<Project>(p => p.LeadDeveloper),
119+
TargetField = "techLead"
120+
}
121+
}
122+
};
123+
124+
protected IPutPipelineRequest UpdateFluent(string pipelineId, PutPipelineDescriptor d) => d
125+
.Description("Project Pipeline (updated)")
126+
.Processors(ps => ps
127+
.Uppercase<Project>(u => u
128+
.Field(p => p.State)
129+
)
130+
.Set<Project>(s => s
131+
.Field(p => p.NumberOfCommits)
132+
.Value(500)
133+
)
134+
.Rename<Project>(s => s
135+
.Field(p => p.LeadDeveloper)
136+
.TargetField("techLead")
137+
)
138+
);
139+
140+
protected override void ExpectAfterUpdate(IGetPipelineResponse response)
141+
{
142+
response.Pipelines.Should().NotBeNull().And.HaveCount(1);
143+
144+
var pipeline = response.Pipelines.First();
145+
pipeline.Config.Should().NotBeNull();
146+
pipeline.Id.Should().NotBeNullOrEmpty();
147+
148+
var processors = pipeline.Config.Processors;
149+
processors.Should().NotBeNull().And.HaveCount(3);
150+
151+
var uppercase = processors.Where(p => p.Name == "uppercase").FirstOrDefault() as UppercaseProcessor;
152+
uppercase.Should().NotBeNull();
153+
uppercase.Field.Should().NotBeNull();
154+
155+
var set = processors.Where(p => p.Name == "set").FirstOrDefault() as SetProcessor;
156+
set.Should().NotBeNull();
157+
set.Field.Should().NotBeNull();
158+
set.Value.Should().NotBeNull();
159+
160+
var rename = processors.Where(p => p.Name == "rename").FirstOrDefault() as RenameProcessor;
161+
rename.Should().NotBeNull();
162+
rename.Field.Should().NotBeNull();
163+
rename.TargetField.Should().NotBeNull();
164+
}
165+
166+
protected override LazyResponses Delete() => Calls<DeletePipelineDescriptor, DeletePipelineRequest, IDeletePipelineRequest, IDeletePipelineResponse>(
167+
DeleteInitializer,
168+
DeleteFluent,
169+
fluent: (s, c, f) => c.DeletePipeline(s, f),
170+
fluentAsync: (s, c, f) => c.DeletePipelineAsync(s, f),
171+
request: (s, c, r) => c.DeletePipeline(r),
172+
requestAsync: (s, c, r) => c.DeletePipelineAsync(r)
173+
);
174+
175+
protected DeletePipelineRequest DeleteInitializer(string pipelineId) => new DeletePipelineRequest(pipelineId);
176+
177+
protected IDeletePipelineRequest DeleteFluent(string pipelineId, DeletePipelineDescriptor d) => d;
178+
}
179+
}

Diff for: src/Tests/Tests.csproj

+1
Original file line numberDiff line numberDiff line change
@@ -336,6 +336,7 @@
336336
<Compile Include="Ingest\DeletePipeline\DeletePipelineUrlTests.cs" />
337337
<Compile Include="Ingest\GetPipeline\GetPipelineApiTests.cs" />
338338
<Compile Include="Ingest\GetPipeline\GetPipelineUrlTests.cs" />
339+
<Compile Include="Ingest\PipelineCrudTests.cs" />
339340
<Compile Include="Ingest\PutPipeline\PutPipelineApiTests.cs" />
340341
<Compile Include="Ingest\PutPipeline\PutPipelineUrlTests.cs" />
341342
<Compile Include="Ingest\SimulatePipeline\SImulatePipelineApiTests.cs" />

Diff for: src/Tests/tests.yaml

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
# mode either u (unit test), i (integration test) or m (mixed mode)
2-
mode: u
2+
mode: m
33
# the elasticsearch version that should be started
4-
elasticsearch_version: 5.0.0-alpha1
4+
elasticsearch_version: 5.0.0-alpha2-SNAPSHOT
55
# whether we want to forcefully reseed on the node, if you are starting the tests with a node already running
66
force_reseed: true
77
# do not spawn nodes as part of the test setup but rely on a manually started es node being up

0 commit comments

Comments
 (0)