Skip to content

Commit f7c8212

Browse files
author
Matthias Huerbe
committed
unity cogstream and cpop clients
1 parent f4cc54c commit f7c8212

37 files changed

+738
-0
lines changed

Runtime.meta

+8
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Runtime/CognitiveXR.asmdef

+13
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
{
2+
"name": "CognitiveXR",
3+
"references": [],
4+
"includePlatforms": [],
5+
"excludePlatforms": [],
6+
"allowUnsafeCode": false,
7+
"overrideReferences": false,
8+
"precompiledReferences": [],
9+
"autoReferenced": true,
10+
"defineConstraints": [],
11+
"versionDefines": [],
12+
"noEngineReferences": false
13+
}

Runtime/CognitiveXR.asmdef.meta

+7
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Runtime/Cpop.meta

+8
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Runtime/Cpop/CpopData.cs

+31
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
using System.Collections.Generic;
2+
3+
namespace CognitiveXR.Cpop
4+
{
5+
[System.Serializable]
6+
public struct CpopData
7+
{
8+
public float Timestamp;
9+
public string Type;
10+
public Coordinates Position;
11+
public List<Coordinates> Shape;
12+
13+
public override string ToString()
14+
{
15+
return $"{Timestamp}: {Type}/{Position}";
16+
}
17+
}
18+
19+
[System.Serializable]
20+
public struct Coordinates
21+
{
22+
public float X;
23+
public float Y;
24+
public float Z;
25+
26+
public override string ToString()
27+
{
28+
return $"({X},{Y},{Z})";
29+
}
30+
}
31+
}

Runtime/Cpop/CpopData.cs.meta

+11
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Runtime/Cpop/CpopSubscriber.cs

+77
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
using System;
2+
using System.Collections.Concurrent;
3+
using System.Threading;
4+
using MQTTnet;
5+
using MQTTnet.Client;
6+
using MQTTnet.Client.Options;
7+
using UnityEngine;
8+
9+
namespace CognitiveXR.Cpop
10+
{
11+
public class CpopServerOptions
12+
{
13+
public string Server { get; set; }
14+
public int? Port { get; set; }
15+
}
16+
17+
public class CpopSubscriber
18+
{
19+
public ConcurrentQueue<CpopData> Queue { get; }
20+
private CpopServerOptions _options;
21+
private CancellationTokenSource _cancellationTokenSource;
22+
private IMqttClient _client;
23+
24+
public CpopSubscriber(ConcurrentQueue<CpopData> queue, CpopServerOptions options)
25+
{
26+
Queue = queue;
27+
_options = options;
28+
_cancellationTokenSource = new CancellationTokenSource();
29+
var factory = new MqttFactory();
30+
_client = factory.CreateMqttClient();
31+
}
32+
33+
public CpopSubscriber(CpopServerOptions options) : this(new ConcurrentQueue<CpopData>(), options)
34+
{
35+
}
36+
37+
public CpopSubscriber() : this(new CpopServerOptions {Server = "localhost"})
38+
{
39+
}
40+
41+
public void Unsubscribe()
42+
{
43+
_cancellationTokenSource.Cancel();
44+
}
45+
46+
public async void Subscribe()
47+
{
48+
var options = new MqttClientOptionsBuilder()
49+
.WithClientId("CS-Client")
50+
.WithTcpServer(_options.Server, _options.Port)
51+
.WithCleanSession()
52+
.Build();
53+
_client.UseApplicationMessageReceivedHandler(DefaultCpopMessageHandlerJson);
54+
await _client.ConnectAsync(options, _cancellationTokenSource.Token);
55+
await _client.SubscribeAsync(new MqttTopicFilterBuilder().WithTopic("cpop").Build());
56+
}
57+
58+
protected void DefaultCpopMessageHandlerJson(MqttApplicationMessageReceivedEventArgs e)
59+
{
60+
try
61+
{
62+
var payload = e.ApplicationMessage.Payload;
63+
64+
String jsonText = System.Text.Encoding.UTF8.GetString(payload);
65+
66+
var cpopData = JsonUtility.FromJson<CpopData>(jsonText);
67+
68+
Queue.Enqueue(cpopData);
69+
70+
}
71+
catch (Exception exp)
72+
{
73+
Debug.LogError(exp);
74+
}
75+
}
76+
}
77+
}

Runtime/Cpop/CpopSubscriber.cs.meta

+11
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Runtime/cogstream.meta

+8
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.
+21
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+

2+
using System.Collections.Generic;
3+
4+
namespace CognitiveXR.CogStream
5+
{
6+
public class DebugReceiveChannel : ResultReceiveChannel
7+
{
8+
protected override List<EngineResult> ParseResultPacket(ResultPacket resultPacket)
9+
{
10+
EngineResult engineResult = new EngineResult()
11+
{
12+
frameId = resultPacket.frameId,
13+
seconds = resultPacket.seconds,
14+
nanoseconds = resultPacket.nanoseconds,
15+
result = System.Text.Encoding.UTF8.GetString(resultPacket.data),
16+
};
17+
18+
return new List<EngineResult>{ engineResult };
19+
}
20+
}
21+
}

Runtime/cogstream/DebugReceiveChannel.cs.meta

+11
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Runtime/cogstream/EngineClient.cs

+113
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
using System;
2+
using System.Linq;
3+
using System.Net;
4+
using System.Net.Sockets;
5+
using System.Text;
6+
7+
namespace CognitiveXR.CogStream
8+
{
9+
public class EngineClient
10+
{
11+
private readonly StreamSpec streamSpec;
12+
private NetworkStream networkStream;
13+
private NetworkStreamResultPacketScanner packetScanner;
14+
private readonly IFrameSendChannel sendChannel;
15+
private readonly ResultReceiveChannel resultReceiveChannel;
16+
private readonly uint streamId;
17+
18+
private TcpClient client;
19+
20+
public EngineClient(StreamSpec streamSpec, IFrameSendChannel sendChannel,
21+
ResultReceiveChannel resultReceiveChannel, uint streamId = 0)
22+
{
23+
this.streamSpec = streamSpec;
24+
this.sendChannel = sendChannel;
25+
this.resultReceiveChannel = resultReceiveChannel;
26+
this.streamId = streamId;
27+
this.sendChannel.SetStreamId(this.streamId);
28+
}
29+
30+
public async void Open()
31+
{
32+
// TODO: implement exception handling
33+
string[] parts = streamSpec.engineAddress.Split(':');
34+
35+
IPAddress ipAddress = IPAddress.Parse(parts[0]);
36+
IPEndPoint endPoint = new IPEndPoint(ipAddress, int.Parse(parts[1]));
37+
38+
try
39+
{
40+
client = new TcpClient();
41+
{
42+
await client.ConnectAsync(endPoint.Address, endPoint.Port);
43+
44+
string json = streamSpecToJson(streamSpec);
45+
46+
byte[] jsonAsBytes = Encoding.UTF8.GetBytes(json);
47+
byte[] lengthInByte = BitConverter.GetBytes(jsonAsBytes.Length);
48+
49+
byte[] data = new byte[4 + jsonAsBytes.Length];
50+
Buffer.BlockCopy(lengthInByte, 0, data, 0, lengthInByte.Length);
51+
Buffer.BlockCopy(jsonAsBytes, 0, data, 4, jsonAsBytes.Length);
52+
53+
networkStream = client.GetStream();
54+
await networkStream.WriteAsync(data, 0, data.Length);
55+
await networkStream.FlushAsync();
56+
57+
NetworkStreamFrameWriter frameWriter = new NetworkStreamFrameWriter(networkStream);
58+
sendChannel.SetWriter(frameWriter);
59+
packetScanner = new NetworkStreamResultPacketScanner(networkStream);
60+
resultReceiveChannel.SetResultPacketScanner(packetScanner);
61+
}
62+
}
63+
catch (Exception e)
64+
{
65+
Console.WriteLine(e);
66+
throw;
67+
}
68+
69+
}
70+
71+
public bool isConnected()
72+
{
73+
return (client != null) && (client.Connected);
74+
}
75+
76+
private string streamSpecToJson(StreamSpec streamSpec)
77+
{
78+
string attributes = streamSpec.attributes.Aggregate(
79+
"\"attributes\": {",
80+
(current, streamSpecAttribute)
81+
=> current + $"\"{streamSpecAttribute.Key}\": [\"{streamSpecAttribute.Value[0]}\"],");
82+
83+
attributes = attributes.TrimEnd(',');
84+
85+
attributes += "}";
86+
87+
return $"{{" +
88+
$"\"engineAddress\": \"{streamSpec.engineAddress}\"" +
89+
", " + attributes +
90+
$"}}";
91+
}
92+
93+
/// <summary>
94+
/// Returns the send channel of this engine
95+
/// </summary>
96+
/// <typeparam name="T"></typeparam>
97+
/// <returns></returns>
98+
public T GetSendChannel<T>() where T : IFrameSendChannel
99+
{
100+
return (T) sendChannel;
101+
}
102+
103+
/// <summary>
104+
/// Returns the receive channel of this engine
105+
/// </summary>
106+
/// <typeparam name="T">where T is of type ResultReceiveChannel</typeparam>
107+
/// <returns></returns>
108+
public T GetReceiveChannel<T>() where T : ResultReceiveChannel
109+
{
110+
return (T) resultReceiveChannel;
111+
}
112+
}
113+
}

Runtime/cogstream/EngineClient.cs.meta

+11
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Runtime/cogstream/EngineResult.cs

+11
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+

2+
namespace CognitiveXR.CogStream
3+
{
4+
public class EngineResult
5+
{
6+
public uint frameId;
7+
public uint seconds;
8+
public uint nanoseconds;
9+
public string result;
10+
}
11+
}

Runtime/cogstream/EngineResult.cs.meta

+11
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Runtime/cogstream/Frame.cs

+13
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
using System;
2+
3+
namespace CognitiveXR.CogStream
4+
{
5+
public struct Frame
6+
{
7+
public uint frameId;
8+
public DateTime timestamp;
9+
public int width;
10+
public int height;
11+
public byte[] rawFrame;
12+
}
13+
}

Runtime/cogstream/Frame.cs.meta

+11
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)