-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathEngineClient.cs
134 lines (115 loc) · 4.56 KB
/
EngineClient.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
using System;
using System.Globalization;
using System.Linq;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading.Tasks;
namespace CognitiveXR.CogStream
{
/// <summary>
/// Manages the TCP connection to the engine and holds the sender and receiver channel
/// components that communicate over that connection.
/// </summary>
/// <remarks>
/// The client owns the underlying <see cref="TcpClient"/>; call <see cref="Dispose"/> when the
/// connection is no longer needed so the socket is released.
/// </remarks>
public class EngineClient : IDisposable
{
    private readonly StreamSpec streamSpec;
    private NetworkStream networkStream;
    private NetworkStreamResultPacketScanner packetScanner;
    private readonly IFrameSendChannel sendChannel;
    private readonly ResultReceiveChannel resultReceiveChannel;
    private readonly uint streamId;
    private TcpClient client;

    /// <summary>
    /// Creates an Engine Client instance and assigns the stream id to the send channel.
    /// </summary>
    /// <param name="streamSpec">Stream specification; its engineAddress is used to connect
    /// and its JSON form is sent as the handshake.</param>
    /// <param name="sendChannel">Channel that receives the frame writer once connected.</param>
    /// <param name="resultReceiveChannel">Channel that receives the result packet scanner once
    /// connected.</param>
    /// <param name="streamId">Id passed to <paramref name="sendChannel"/> via SetStreamId.</param>
    /// <exception cref="ArgumentNullException"><paramref name="sendChannel"/> is null.</exception>
    public EngineClient(StreamSpec streamSpec, IFrameSendChannel sendChannel,
        ResultReceiveChannel resultReceiveChannel, uint streamId = 0)
    {
        if (sendChannel == null)
        {
            throw new ArgumentNullException(nameof(sendChannel));
        }

        this.streamSpec = streamSpec;
        this.sendChannel = sendChannel;
        this.resultReceiveChannel = resultReceiveChannel;
        this.streamId = streamId;
        this.sendChannel.SetStreamId(this.streamId);
    }

    /// <summary>
    /// Opens a connection to the engine (fire-and-forget).
    /// </summary>
    /// <remarks>
    /// Kept as <c>async void</c> so existing callers keep compiling. Because the method returns
    /// nothing, a caller cannot await completion or catch a failure; prefer
    /// <see cref="OpenAsync"/> in new code.
    /// </remarks>
    public async void Open()
    {
        await OpenAsync();
    }

    /// <summary>
    /// Opens a connection to the engine and sends the stream specification as a
    /// length-prefixed UTF-8 JSON handshake, then wires the send and receive channels to the
    /// new network stream.
    /// </summary>
    /// <returns>A task that completes once the channels have been wired up.</returns>
    /// <exception cref="FormatException">The engine address is not of the form
    /// <c>ip:port</c>.</exception>
    public async Task OpenAsync()
    {
        // Validate first so a malformed address does not tear down a working connection.
        IPEndPoint endPoint = ParseEngineAddress(streamSpec.engineAddress);

        // A repeated call replaces the connection; close the old socket instead of leaking it.
        CloseConnection();

        TcpClient pendingClient = new TcpClient();
        try
        {
            await pendingClient.ConnectAsync(endPoint.Address, endPoint.Port);

            byte[] handshake = BuildHandshake(streamSpec.ToJson());
            NetworkStream stream = pendingClient.GetStream();
            await stream.WriteAsync(handshake, 0, handshake.Length);
            await stream.FlushAsync();

            NetworkStreamFrameWriter frameWriter = new NetworkStreamFrameWriter(stream);
            sendChannel.SetWriter(frameWriter);
            NetworkStreamResultPacketScanner scanner = new NetworkStreamResultPacketScanner(stream);
            resultReceiveChannel.SetResultPacketScanner(scanner);

            // Publish the connection only after everything is wired, so isConnected() never
            // reports true for a half-initialised client.
            client = pendingClient;
            networkStream = stream;
            packetScanner = scanner;
        }
        catch (Exception e)
        {
            Console.WriteLine(e);
            // Release the socket of the failed attempt before propagating the error.
            pendingClient.Close();
            throw;
        }
    }

    /// <summary>
    /// Returns whether the client is connected or not.
    /// </summary>
    /// <returns><c>true</c> when a client exists and reports itself as connected.</returns>
    public bool isConnected()
    {
        return client != null && client.Connected;
    }

    /// <summary>
    /// Closes the connection to the engine, if one is open. Safe to call more than once.
    /// </summary>
    public void Dispose()
    {
        CloseConnection();
    }

    /// <summary>
    /// Returns the send channel of this engine.
    /// </summary>
    /// <typeparam name="T">Concrete type of the send channel.</typeparam>
    /// <returns>The send channel cast to <typeparamref name="T"/>.</returns>
    /// <exception cref="InvalidCastException">The channel is not of type
    /// <typeparamref name="T"/>.</exception>
    public T GetSendChannel<T>() where T : IFrameSendChannel
    {
        return (T) sendChannel;
    }

    /// <summary>
    /// Returns the receive channel of this engine.
    /// </summary>
    /// <typeparam name="T">where T is of type ResultReceiveChannel</typeparam>
    /// <returns>The receive channel cast to <typeparamref name="T"/>.</returns>
    /// <exception cref="InvalidCastException">The channel is not of type
    /// <typeparamref name="T"/>.</exception>
    public T GetReceiveChannel<T>() where T : ResultReceiveChannel
    {
        return (T) resultReceiveChannel;
    }

    /// <summary>
    /// Parses an <c>ip:port</c> engine address into an endpoint.
    /// </summary>
    /// <remarks>
    /// Only the first two colon-separated parts are used, so unbracketed IPv6 literals and
    /// host names are not supported.
    /// </remarks>
    private static IPEndPoint ParseEngineAddress(string engineAddress)
    {
        if (string.IsNullOrEmpty(engineAddress))
        {
            throw new FormatException("Engine address is missing; expected \"<ip>:<port>\".");
        }

        string[] parts = engineAddress.Split(':');
        if (parts.Length < 2)
        {
            throw new FormatException(
                $"Engine address \"{engineAddress}\" has no port; expected \"<ip>:<port>\".");
        }

        IPAddress ipAddress;
        if (!IPAddress.TryParse(parts[0], out ipAddress))
        {
            throw new FormatException(
                $"Engine address \"{engineAddress}\" does not start with a valid IP address.");
        }

        // The port is machine-readable data, so parse it independently of the current culture.
        int port;
        bool portIsNumber = int.TryParse(
            parts[1], NumberStyles.Integer, CultureInfo.InvariantCulture, out port);
        if (!portIsNumber || port < IPEndPoint.MinPort || port > IPEndPoint.MaxPort)
        {
            throw new FormatException(
                $"Engine address \"{engineAddress}\" does not contain a valid port.");
        }

        return new IPEndPoint(ipAddress, port);
    }

    /// <summary>
    /// Builds the handshake message: a 4-byte length prefix followed by the UTF-8 JSON bytes.
    /// </summary>
    private static byte[] BuildHandshake(string json)
    {
        byte[] payload = Encoding.UTF8.GetBytes(json);
        // NOTE(review): BitConverter emits the platform's byte order (little-endian on common
        // hardware) - confirm this matches what the engine expects.
        byte[] lengthPrefix = BitConverter.GetBytes(payload.Length);

        byte[] message = new byte[lengthPrefix.Length + payload.Length];
        Buffer.BlockCopy(lengthPrefix, 0, message, 0, lengthPrefix.Length);
        Buffer.BlockCopy(payload, 0, message, lengthPrefix.Length, payload.Length);
        return message;
    }

    /// <summary>
    /// Closes the current connection, if any, and clears the connection state.
    /// </summary>
    private void CloseConnection()
    {
        if (client != null)
        {
            // Closing the client also closes the stream obtained from GetStream().
            client.Close();
        }

        client = null;
        networkStream = null;
        packetScanner = null;
    }

    /// <summary>
    /// Converts StreamSpec to json, emitting the engine address and, for every attribute,
    /// its key with a one-element array holding the attribute's first value.
    /// </summary>
    /// <remarks>
    /// Not called anywhere in this class; the handshake uses <c>streamSpec.ToJson()</c>.
    /// Values are inserted without JSON escaping, and an attribute with no values makes the
    /// <c>Value[0]</c> access fail.
    /// </remarks>
    /// <param name="streamSpec">Specification to serialise.</param>
    /// <returns>The JSON text.</returns>
    private string streamSpecToJson(StreamSpec streamSpec)
    {
        string entries = string.Join(
            ",",
            streamSpec.attributes.Select(
                attribute => $"\"{attribute.Key}\": [\"{attribute.Value[0]}\"]"));

        return "{" +
               $"\"engineAddress\": \"{streamSpec.engineAddress}\"" +
               ", \"attributes\": {" + entries + "}" +
               "}";
    }
}
}