-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathMediatorClient.cs
138 lines (120 loc) · 4.45 KB
/
MediatorClient.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
using System;
using System.Collections.Generic;
using System.IO;
using System.Net.WebSockets;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using UnityEngine;
namespace CognitiveXR.CogStream
{
/// <summary>
/// The MediatorClient establishes a WebSocket connection to the mediator and handles the
/// request/response communication with it.
/// </summary>
public class MediatorClient : IDisposable
{
    // Size in bytes of the chunk buffer used while reading a (possibly fragmented) message.
    private const int ReceiveBufferSize = 1024;

    // Message used whenever an operation needs an open socket and there is none.
    private const string NoConnectionMessage = "No connection to the server";

    private ClientWebSocket webSocket;
    private readonly Uri uri;

    /// <summary>
    /// Creates a client for the mediator at the given address.
    /// No connection is made until <see cref="Open"/> is called.
    /// </summary>
    /// <param name="url">URL of the mediator endpoint</param>
    /// <exception cref="ArgumentNullException"><paramref name="url"/> is null</exception>
    /// <exception cref="UriFormatException"><paramref name="url"/> is not a valid URI</exception>
    public MediatorClient(string url)
    {
        uri = new Uri(url);
    }

    /// <summary>
    /// Tells whether a socket exists and is currently in the Open state
    /// </summary>
    /// <returns>true if the connection is open, false otherwise</returns>
    public bool IsOpen() => webSocket != null && webSocket.State == WebSocketState.Open;

    /// <summary>
    /// Connect to the Mediator
    /// </summary>
    public async Task Open()
    {
        await ConnectToServer(uri);
    }

    /// <summary>
    /// Close a connection if it is open. Does nothing when the client was never opened
    /// or the socket is not in the Open state.
    /// </summary>
    public async Task Close()
    {
        // Capture the field once so the null check and the call use the same instance.
        ClientWebSocket socket = webSocket;
        if (socket == null || socket.State != WebSocketState.Open)
        {
            return;
        }

        await socket.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, CancellationToken.None);
    }

    /// <summary>
    /// Releases the underlying socket. The client can be connected again with <see cref="Open"/>.
    /// </summary>
    public void Dispose()
    {
        DisposeSocket();
    }

    /// <summary>
    /// Disposes the current socket (if any) and clears the field
    /// </summary>
    private void DisposeSocket()
    {
        ClientWebSocket socket = webSocket;
        webSocket = null;
        socket?.Dispose();
    }

    /// <summary>
    /// Connects to the server proper. Any socket left over from an earlier connection is
    /// disposed first; a failed attempt is logged and rethrown.
    /// </summary>
    /// <param name="uri">address to connect to</param>
    private async Task ConnectToServer(Uri uri)
    {
        // Do not leak the socket of a previous Open() call.
        DisposeSocket();

        var socket = new ClientWebSocket();
        try
        {
            await socket.ConnectAsync(uri, CancellationToken.None);
        }
        catch (Exception e)
        {
            // The socket is unusable after a failed connect; release it before rethrowing.
            socket.Dispose();
            Debug.LogException(e);
            throw;
        }

        // Only publish the socket once it is actually connected.
        webSocket = socket;
        Debug.Log("connected");
    }

    /// <summary>
    /// Sends the services request to the mediator and returns the engines contained in the
    /// next message received.
    /// </summary>
    /// <returns>the engines listed in the mediator's reply</returns>
    public async Task<List<Engine>> GetEngines()
    {
        await SendMessage(MediatorClientMessage.GetServicesMessage());
        Message service = await Receive();
        return service.content.engines;
    }

    /// <summary>
    /// sends the launch command for an engine and returns its address
    /// </summary>
    /// <param name="engine">the engine to select</param>
    /// <returns>the engine address contained in the next message received</returns>
    public async Task<string> StartEngine(Engine engine)
    {
        Message selectedEngine = MediatorClientMessage.GetSelectEngineMessage(engine);
        await SendMessage(selectedEngine);
        Message engineAddressMessage = await Receive();
        return engineAddressMessage.content.engineAddress;
    }

    /// <summary>
    /// Serializes the message to JSON and sends it
    /// </summary>
    /// <param name="message">message to send</param>
    private Task SendMessage(Message message)
    {
        return SendMessage(message.ToJson());
    }

    /// <summary>
    /// Sends the text as a single UTF-8 encoded text frame
    /// </summary>
    /// <param name="message">text to send</param>
    /// <exception cref="InvalidOperationException">the connection is not open</exception>
    private async Task SendMessage(string message)
    {
        ClientWebSocket socket = webSocket;
        if (socket == null || socket.State != WebSocketState.Open)
        {
            throw new InvalidOperationException(NoConnectionMessage);
        }

        byte[] encoded = Encoding.UTF8.GetBytes(message);
        var buffer = new ArraySegment<byte>(encoded, 0, encoded.Length);
        await socket.SendAsync(buffer, WebSocketMessageType.Text, true, CancellationToken.None);
    }

    /// <summary>
    /// Reads the next complete message from the socket and parses it
    /// </summary>
    /// <returns>the parsed text message</returns>
    /// <exception cref="InvalidOperationException">
    /// the connection is not open, the server sent a close frame, or the message is not text
    /// </exception>
    private async Task<Message> Receive()
    {
        ClientWebSocket socket = webSocket;
        if (socket == null || socket.State != WebSocketState.Open)
        {
            throw new InvalidOperationException(NoConnectionMessage);
        }

        var buffer = new ArraySegment<byte>(new byte[ReceiveBufferSize]);
        using (var memoryStream = new MemoryStream())
        {
            // A message may arrive in several fragments; collect them until EndOfMessage.
            WebSocketReceiveResult result;
            do
            {
                result = await socket.ReceiveAsync(buffer, CancellationToken.None);
                memoryStream.Write(buffer.Array, buffer.Offset, result.Count);
            } while (!result.EndOfMessage);

            switch (result.MessageType)
            {
                case WebSocketMessageType.Text:
                    memoryStream.Seek(0, SeekOrigin.Begin);
                    using (var reader = new StreamReader(memoryStream, Encoding.UTF8))
                    {
                        string answer = reader.ReadToEnd();
                        return MediatorClientMessage.Parse(answer);
                    }

                case WebSocketMessageType.Close:
                    // Complete the close handshake before reporting that no reply will come.
                    await socket.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty,
                        CancellationToken.None);
                    throw new InvalidOperationException("The server closed the connection");

                default:
                    throw new InvalidOperationException(
                        $"Unexpected {result.MessageType} message received from the server");
            }
        }
    }
}
}