Skip to content
This repository was archived by the owner on Oct 28, 2021. It is now read-only.

Commit 871690b

Browse files
authored
Merge pull request #11 from jforsell/firehose_api
Firehose API
2 parents 8227db0 + 8dbd25c commit 871690b

File tree

4 files changed

+342
-0
lines changed

4 files changed

+342
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,195 @@
1+
// Licensed to the Symphony Software Foundation (SSF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The SSF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
namespace SymphonyOSS.RestApiClient.Api.AgentApi
19+
{
20+
using System;
21+
using System.Collections.Generic;
22+
using System.Diagnostics;
23+
using System.Threading.Tasks;
24+
using Authentication;
25+
using Generated.Json;
26+
using Generated.OpenApi.AgentApi.Client;
27+
using Generated.OpenApi.AgentApi.Model;
28+
29+
/// <summary>
30+
/// Provides an event-based firehose of a pod's incoming messages.
31+
/// Encapsulates <see cref="Generated.OpenApi.AgentApi.Api.FirehoseApi"/>,
32+
/// adding authentication token management and a custom execution strategy.
33+
/// </summary>
34+
public class FirehoseApi
35+
{
36+
private static readonly TraceSource _traceSource = new TraceSource("SymphonyOSS.RestApiClient");
37+
38+
private readonly Generated.OpenApi.AgentApi.Api.IFirehoseApi _firehoseApi;
39+
40+
private readonly IAuthTokens _authTokens;
41+
42+
private readonly IApiExecutor _apiExecutor;
43+
44+
private readonly Dictionary<EventHandler<MessageEventArgs>, Task> _tasks = new Dictionary<EventHandler<MessageEventArgs>, Task>();
45+
46+
private volatile bool _shouldStop;
47+
48+
static FirehoseApi()
49+
{
50+
JsonSubtypeConverter.Register(typeof(V2Message));
51+
}
52+
53+
/// <summary>
54+
/// Initializes a new instance of the <see cref="FirehoseApi" /> class.
55+
/// See <see cref="Factories.AgentApiFactory"/> for conveniently constructing
56+
/// an instance.
57+
/// </summary>
58+
/// <param name="authTokens">Authentication tokens.</param>
59+
/// <param name="configuration">Api configuration.</param>
60+
/// <param name="apiExecutor">Execution strategy.</param>
61+
public FirehoseApi(IAuthTokens authTokens, Configuration configuration, IApiExecutor apiExecutor)
62+
{
63+
_firehoseApi = new Generated.OpenApi.AgentApi.Api.FirehoseApi(configuration);
64+
_authTokens = authTokens;
65+
_apiExecutor = apiExecutor;
66+
}
67+
68+
private event EventHandler<MessageEventArgs> _onMessage;
69+
public event EventHandler<MessageEventArgs> OnMessage
70+
{
71+
add
72+
{
73+
_onMessage += value;
74+
}
75+
remove
76+
{
77+
_onMessage -= value;
78+
lock (_tasks)
79+
{
80+
_tasks.Remove(value);
81+
}
82+
}
83+
}
84+
85+
/// <summary>
86+
/// Starts listening, notifying event handlers about incoming messages. Blocks
87+
/// until <see cref="Stop"/> is invoked.
88+
/// </summary>
89+
public void Listen()
90+
{
91+
_shouldStop = false;
92+
var firehose = CreateFirehose();
93+
while (!_shouldStop)
94+
{
95+
var messageList = ReadFirehose(ref firehose);
96+
if (_shouldStop)
97+
{
98+
// Don't process messages if the user has stopped listening.
99+
break;
100+
}
101+
102+
if (messageList == null || _onMessage == null)
103+
{
104+
continue;
105+
}
106+
107+
foreach (var eventHandler in _onMessage.GetInvocationList())
108+
{
109+
NotifyAsync((EventHandler<MessageEventArgs>)eventHandler, messageList);
110+
}
111+
}
112+
}
113+
114+
/// <summary>
115+
/// Requests that <see cref="Listen"/> should stop blocking and return control
116+
/// to the calling thread. Calling <see cref="Stop"/> will not immediately return
117+
/// control, but wait for the current outstanding request to complete.
118+
/// </summary>
119+
public void Stop()
120+
{
121+
_shouldStop = true;
122+
}
123+
124+
private async void NotifyAsync(EventHandler<MessageEventArgs> messageEventHandler, V2MessageList messageList)
125+
{
126+
// Notify each handler in a separate task, maintaining the order of messages in the list, and
127+
// get back to reading the data feed again without waiting for listeners to process messages.
128+
Task task;
129+
if (_tasks.TryGetValue(messageEventHandler, out task))
130+
{
131+
await task;
132+
}
133+
_tasks[messageEventHandler] = Task.Run(() => Notify(messageEventHandler, messageList));
134+
}
135+
136+
private void Notify(EventHandler<MessageEventArgs> messageEventHandler, V2MessageList messageList)
137+
{
138+
foreach (var message in messageList)
139+
{
140+
try
141+
{
142+
messageEventHandler.Invoke(this, new MessageEventArgs(message));
143+
}
144+
catch (Exception e)
145+
{
146+
_traceSource.TraceEvent(
147+
TraceEventType.Error, 0,
148+
"Unhandled exception caught when notifying listener about message with ID \"{0}\": {1}",
149+
message.Id, e);
150+
}
151+
}
152+
}
153+
154+
private Firehose CreateFirehose()
155+
{
156+
return _apiExecutor.Execute(_firehoseApi.V1FirehoseCreatePost, _authTokens.SessionToken, _authTokens.KeyManagerToken);
157+
}
158+
159+
private V2MessageList ReadFirehose(string id, int? maxMessages = null)
160+
{
161+
return _apiExecutor.Execute(_firehoseApi.V2FirehoseIdReadGet, id, _authTokens.SessionToken, _authTokens.KeyManagerToken, maxMessages);
162+
}
163+
164+
private V2MessageList ReadFirehose(ref Firehose firehose, int? maxMessages = null)
165+
{
166+
var countFirehoseErrors = 0;
167+
while (true)
168+
{
169+
try
170+
{
171+
var messageList = ReadFirehose(firehose.Id, maxMessages);
172+
if (countFirehoseErrors > 0)
173+
{
174+
_traceSource.TraceEvent(
175+
TraceEventType.Information, 0,
176+
"Firehose re-established.");
177+
}
178+
return messageList;
179+
}
180+
catch (ApiException e)
181+
{
182+
++countFirehoseErrors;
183+
if (countFirehoseErrors >= 2)
184+
{
185+
throw;
186+
}
187+
_traceSource.TraceEvent(
188+
TraceEventType.Error, 0,
189+
"Unhandled API exception caught when reading firehose, retrying: {0}", e);
190+
firehose = CreateFirehose();
191+
}
192+
}
193+
}
194+
}
195+
}

src/SymphonyOSS.RestApiClient/SymphonyOSS.RestApiClient.csproj

+1
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
<Reference Include="System.Xml" />
5050
</ItemGroup>
5151
<ItemGroup>
52+
<Compile Include="Api\AgentApi\FirehoseApi.cs" />
5253
<Compile Include="Api\AgentApi\DatafeedApi.cs" />
5354
<Compile Include="Api\AgentApi\MessageEventArgs.cs" />
5455
<Compile Include="Api\AgentApi\AttachmentsApi.cs" />
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,145 @@
1+
// Licensed to the Symphony Software Foundation (SSF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The SSF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
namespace SymphonyOSS.RestApiClient.Tests
19+
{
20+
using System;
21+
using System.Threading;
22+
using System.Threading.Tasks;
23+
using Api;
24+
using Api.AgentApi;
25+
using Authentication;
26+
using Generated.OpenApi.AgentApi.Client;
27+
using Generated.OpenApi.AgentApi.Model;
28+
using Moq;
29+
using Xunit;
30+
31+
public class FirehoseApiTest
32+
{
33+
private readonly Configuration _configuration;
34+
35+
private readonly FirehoseApi _firehoseApi;
36+
37+
private readonly Mock<IApiExecutor> _apiExecutorMock;
38+
39+
public FirehoseApiTest()
40+
{
41+
var sessionManagerMock = new Mock<IAuthTokens>();
42+
sessionManagerMock.Setup(obj => obj.SessionToken).Returns("sessionToken");
43+
sessionManagerMock.Setup(obj => obj.KeyManagerToken).Returns("keyManagerToken");
44+
_configuration = new Configuration();
45+
_apiExecutorMock = new Mock<IApiExecutor>();
46+
_firehoseApi = new FirehoseApi(sessionManagerMock.Object, _configuration, _apiExecutorMock.Object);
47+
}
48+
49+
[Fact]
50+
public void EnsureHandler_receives_messages()
51+
{
52+
var semaphore = new Semaphore(0, int.MaxValue);
53+
var messageList = CreateMessageList(2);
54+
_apiExecutorMock.Setup(obj => obj.Execute(It.IsAny<Func<string, string, Firehose>>(), "sessionToken", "keyManagerToken")).Returns(new Firehose("streamId"));
55+
_apiExecutorMock.Setup(obj => obj.Execute(It.IsAny<Func<string, string, string, int?, V2MessageList>>(), "streamId", "sessionToken", "keyManagerToken", (int?)null))
56+
.Returns(messageList);
57+
var messagesReceived = 0;
58+
_firehoseApi.OnMessage += (_, messageEventArgs) =>
59+
{
60+
++messagesReceived;
61+
if ((messageEventArgs.Message as V2Message)?.Id == "msg2")
62+
{
63+
semaphore.Release(1);
64+
}
65+
};
66+
var task = Task.Run(() => _firehoseApi.Listen());
67+
semaphore.WaitOne();
68+
_firehoseApi.Stop();
69+
task.Wait();
70+
Assert.True(messagesReceived >= 2);
71+
}
72+
73+
74+
[Fact]
75+
public void EnsureRemoved_handler_does_not_receive_messages()
76+
{
77+
var sendSemaphore = new Semaphore(1, int.MaxValue);
78+
var mainSemaphore = new Semaphore(0, int.MaxValue);
79+
var messagesSent = 0;
80+
_apiExecutorMock.Setup(obj => obj.Execute(It.IsAny<Func<string, string, Firehose>>(), "sessionToken", "keyManagerToken")).Returns(new Firehose("streamId"));
81+
_apiExecutorMock.Setup(obj => obj.Execute(It.IsAny<Func<string, string, string, int?, V2MessageList>>(), "streamId", "sessionToken", "keyManagerToken", (int?)null))
82+
.Returns(() =>
83+
{
84+
if (messagesSent <= 1)
85+
{
86+
sendSemaphore.WaitOne();
87+
}
88+
if (messagesSent < 10)
89+
{
90+
var messageList = CreateMessageList(1, messagesSent);
91+
messagesSent += messageList.Count;
92+
return messageList;
93+
}
94+
else
95+
{
96+
mainSemaphore.Release();
97+
return null;
98+
}
99+
});
100+
var messagesReceived = 0;
101+
EventHandler<MessageEventArgs> handler = null;
102+
handler = (_, messageEventArgs) =>
103+
{
104+
++messagesReceived;
105+
_firehoseApi.OnMessage -= handler;
106+
sendSemaphore.Release(1);
107+
};
108+
_firehoseApi.OnMessage += handler;
109+
var task = Task.Run(() => _firehoseApi.Listen());
110+
mainSemaphore.WaitOne();
111+
_firehoseApi.Stop();
112+
task.Wait();
113+
Assert.Equal(10, messagesSent);
114+
Assert.Equal(1, messagesReceived);
115+
}
116+
117+
[Fact]
118+
public void EnsureListen_can_be_stopped()
119+
{
120+
var semaphore = new Semaphore(0, int.MaxValue);
121+
_apiExecutorMock.Setup(obj => obj.Execute(It.IsAny<Func<string, string, Firehose>>(), "sessionToken", "keyManagerToken")).Returns(new Firehose("streamId"));
122+
_apiExecutorMock.Setup(obj => obj.Execute(It.IsAny<Func<string, string, string, int?, V2MessageList>>(), "streamId", "sessionToken", "keyManagerToken", (int?)null))
123+
.Returns((V2MessageList)null)
124+
.Callback(() =>
125+
{
126+
semaphore.Release(1);
127+
});
128+
var task = Task.Run(() => _firehoseApi.Listen());
129+
semaphore.WaitOne();
130+
_firehoseApi.Stop();
131+
task.Wait();
132+
_apiExecutorMock.Verify(obj => obj.Execute(It.IsAny<Func<string, string, string, int?, V2MessageList>>(), "streamId", "sessionToken", "keyManagerToken", (int?)null));
133+
}
134+
135+
private V2MessageList CreateMessageList(int count, int startId = 1)
136+
{
137+
var result = new V2MessageList();
138+
for (var i = 0; i < count; ++i)
139+
{
140+
result.Add(new V2Message("msg" + (startId + i), "timestamp", "messageType", "streamId", "message", 1));
141+
};
142+
return result;
143+
}
144+
}
145+
}

test/SymphonyOSS.RestApiClient.Tests/SymphonyOSS.RestApiClient.Tests.csproj

+1
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@
8585
</Otherwise>
8686
</Choose>
8787
<ItemGroup>
88+
<Compile Include="FirehoseApiTest.cs" />
8889
<Compile Include="MessageParserTest.cs" />
8990
<Compile Include="RoomMembershipApiTest.cs" />
9091
<Compile Include="SecurityApiTest.cs" />

0 commit comments

Comments
 (0)