Skip to content

Commit de916f6

Browse files
qmfrederikbrendandburns
authored andcommitted
Additional watcher tests (#178)
1 parent 7723604 commit de916f6

File tree

1 file changed

+207
-2
lines changed

1 file changed

+207
-2
lines changed

Diff for: tests/KubernetesClient.Tests/WatchTests.cs

+207-2
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ public class WatchTests
2626
private static readonly string MockModifiedStreamLine = BuildWatchEventStreamLine(WatchEventType.Modified);
2727
private static readonly string MockErrorStreamLine = BuildWatchEventStreamLine(WatchEventType.Error);
2828
private static readonly string MockBadStreamLine = "bad json";
29-
private static readonly TimeSpan TestTimeout = TimeSpan.FromSeconds(15);
29+
private static readonly TimeSpan TestTimeout = TimeSpan.FromSeconds(150);
3030

3131
private readonly ITestOutputHelper testOutput;
3232

@@ -241,8 +241,75 @@ public async Task WatchAllEvents()
241241
Host = server.Uri.ToString()
242242
});
243243

244-
var listTask = client.ListNamespacedPodWithHttpMessagesAsync("default", watch: true).Result;
244+
var listTask = await client.ListNamespacedPodWithHttpMessagesAsync("default", watch: true);
245+
246+
var events = new HashSet<WatchEventType>();
247+
var errors = 0;
248+
249+
var watcher = listTask.Watch<V1Pod>(
250+
(type, item) =>
251+
{
252+
testOutput.WriteLine($"Watcher received '{type}' event.");
253+
254+
events.Add(type);
255+
eventsReceived.Signal();
256+
},
257+
error =>
258+
{
259+
testOutput.WriteLine($"Watcher received '{error.GetType().FullName}' error.");
260+
261+
errors += 1;
262+
eventsReceived.Signal();
263+
}
264+
);
265+
266+
// wait server yields all events
267+
await Task.WhenAny(eventsReceived.WaitAsync(), Task.Delay(TestTimeout));
268+
269+
Assert.True(
270+
eventsReceived.CurrentCount == 0,
271+
"Timed out waiting for all events / errors to be received."
272+
);
273+
274+
Assert.Contains(WatchEventType.Added, events);
275+
Assert.Contains(WatchEventType.Deleted, events);
276+
Assert.Contains(WatchEventType.Modified, events);
277+
Assert.Contains(WatchEventType.Error, events);
278+
279+
Assert.Equal(0, errors);
280+
281+
Assert.True(watcher.Watching);
282+
283+
serverShutdown.Set();
284+
}
285+
}
286+
287+
[Fact]
288+
public async Task WatchEventsWithTimeout()
289+
{
290+
AsyncCountdownEvent eventsReceived = new AsyncCountdownEvent(4 /* first line of response is eaten by WatcherDelegatingHandler */);
291+
AsyncManualResetEvent serverShutdown = new AsyncManualResetEvent();
292+
293+
using (var server = new MockKubeApiServer(testOutput, async httpContext =>
294+
{
295+
await WriteStreamLine(httpContext, MockKubeApiServer.MockPodResponse);
296+
await Task.Delay(TimeSpan.FromSeconds(120)); // The default timeout is 100 seconds
297+
await WriteStreamLine(httpContext, MockAddedEventStreamLine);
298+
await WriteStreamLine(httpContext, MockDeletedStreamLine);
299+
await WriteStreamLine(httpContext, MockModifiedStreamLine);
300+
await WriteStreamLine(httpContext, MockErrorStreamLine);
301+
302+
// make server alive, cannot set to int.max as of it would block response
303+
await serverShutdown.WaitAsync();
304+
return false;
305+
}))
306+
{
307+
var client = new Kubernetes(new KubernetesClientConfiguration
308+
{
309+
Host = server.Uri.ToString()
310+
});
245311

312+
var listTask = await client.ListNamespacedPodWithHttpMessagesAsync("default", watch: true);
246313

247314
var events = new HashSet<WatchEventType>();
248315
var errors = 0;
@@ -395,5 +462,143 @@ public async Task TestWatchWithHandlers()
395462
serverShutdown.Set();
396463
}
397464
}
465+
466+
[Fact]
467+
public async Task DirectWatchAllEvents()
468+
{
469+
AsyncCountdownEvent eventsReceived = new AsyncCountdownEvent(4);
470+
AsyncManualResetEvent serverShutdown = new AsyncManualResetEvent();
471+
472+
using (var server = new MockKubeApiServer(testOutput, async httpContext =>
473+
{
474+
await WriteStreamLine(httpContext, MockAddedEventStreamLine);
475+
await WriteStreamLine(httpContext, MockDeletedStreamLine);
476+
await WriteStreamLine(httpContext, MockModifiedStreamLine);
477+
await WriteStreamLine(httpContext, MockErrorStreamLine);
478+
479+
// make server alive, cannot set to int.max as of it would block response
480+
await serverShutdown.WaitAsync();
481+
return false;
482+
}))
483+
{
484+
var client = new Kubernetes(new KubernetesClientConfiguration
485+
{
486+
Host = server.Uri.ToString()
487+
});
488+
489+
var events = new HashSet<WatchEventType>();
490+
var errors = 0;
491+
492+
var watcher = await client.WatchNamespacedPodAsync(
493+
name: "myPod",
494+
@namespace: "default",
495+
onEvent:
496+
(type, item) =>
497+
{
498+
testOutput.WriteLine($"Watcher received '{type}' event.");
499+
500+
events.Add(type);
501+
eventsReceived.Signal();
502+
},
503+
onError:
504+
error =>
505+
{
506+
testOutput.WriteLine($"Watcher received '{error.GetType().FullName}' error.");
507+
508+
errors += 1;
509+
eventsReceived.Signal();
510+
}
511+
);
512+
513+
// wait server yields all events
514+
await Task.WhenAny(eventsReceived.WaitAsync(), Task.Delay(TestTimeout));
515+
516+
Assert.True(
517+
eventsReceived.CurrentCount == 0,
518+
"Timed out waiting for all events / errors to be received."
519+
);
520+
521+
Assert.Contains(WatchEventType.Added, events);
522+
Assert.Contains(WatchEventType.Deleted, events);
523+
Assert.Contains(WatchEventType.Modified, events);
524+
Assert.Contains(WatchEventType.Error, events);
525+
526+
Assert.Equal(0, errors);
527+
528+
Assert.True(watcher.Watching);
529+
530+
serverShutdown.Set();
531+
}
532+
}
533+
534+
[Fact (Skip = "https://github.com/kubernetes-client/csharp/issues/165")]
535+
public async Task DirectWatchEventsWithTimeout()
536+
{
537+
AsyncCountdownEvent eventsReceived = new AsyncCountdownEvent(4);
538+
AsyncManualResetEvent serverShutdown = new AsyncManualResetEvent();
539+
540+
using (var server = new MockKubeApiServer(testOutput, async httpContext =>
541+
{
542+
await Task.Delay(TimeSpan.FromSeconds(120)); // The default timeout is 100 seconds
543+
await WriteStreamLine(httpContext, MockAddedEventStreamLine);
544+
await WriteStreamLine(httpContext, MockDeletedStreamLine);
545+
await WriteStreamLine(httpContext, MockModifiedStreamLine);
546+
await WriteStreamLine(httpContext, MockErrorStreamLine);
547+
548+
// make server alive, cannot set to int.max as of it would block response
549+
await serverShutdown.WaitAsync();
550+
return false;
551+
}))
552+
{
553+
var client = new Kubernetes(new KubernetesClientConfiguration
554+
{
555+
Host = server.Uri.ToString()
556+
});
557+
558+
var events = new HashSet<WatchEventType>();
559+
var errors = 0;
560+
561+
var watcher = await client.WatchNamespacedPodAsync(
562+
name: "myPod",
563+
@namespace: "default",
564+
onEvent:
565+
(type, item) =>
566+
{
567+
testOutput.WriteLine($"Watcher received '{type}' event.");
568+
569+
events.Add(type);
570+
eventsReceived.Signal();
571+
},
572+
onError:
573+
error =>
574+
{
575+
testOutput.WriteLine($"Watcher received '{error.GetType().FullName}' error.");
576+
577+
errors += 1;
578+
eventsReceived.Signal();
579+
}
580+
);
581+
582+
// wait server yields all events
583+
await Task.WhenAny(eventsReceived.WaitAsync(), Task.Delay(TestTimeout));
584+
585+
Assert.True(
586+
eventsReceived.CurrentCount == 0,
587+
"Timed out waiting for all events / errors to be received."
588+
);
589+
590+
Assert.Contains(WatchEventType.Added, events);
591+
Assert.Contains(WatchEventType.Deleted, events);
592+
Assert.Contains(WatchEventType.Modified, events);
593+
Assert.Contains(WatchEventType.Error, events);
594+
595+
Assert.Equal(0, errors);
596+
597+
Assert.True(watcher.Watching);
598+
599+
serverShutdown.Set();
600+
}
601+
}
602+
398603
}
399604
}

0 commit comments

Comments
 (0)