-
Notifications
You must be signed in to change notification settings - Fork 525
/
Copy pathSimulatedDataSource.cs
200 lines (174 loc) · 8.38 KB
/
SimulatedDataSource.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
// Copyright 2023 Esri.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License.
// You may obtain a copy of the License at: http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific
// language governing permissions and limitations under the License.
using Esri.ArcGISRuntime.Data;
using Esri.ArcGISRuntime.Geometry;
using Esri.ArcGISRuntime.RealTime;
using System.Diagnostics;
using System.Text.Json;
namespace ArcGIS.Samples.AddCustomDynamicEntityDataSource
{
#nullable enable
public class SimulatedDataSource : DynamicEntityDataSource
{
// Hold a reference to the file stream reader, the process task, and the cancellation token source.
private Task? _processTask;
private StreamReader? _streamReader;
private CancellationTokenSource? _cancellationTokenSource;
private List<Field>? _fields;
public SimulatedDataSource(string filePath, string entityIdField, TimeSpan delay)
{
FilePath = filePath;
EntityIdField = entityIdField;
Delay = delay;
}
#region Properties
// Expose the file path, entity ID field, and delay length as properties.
public string FilePath { get; }
public string EntityIdField { get; }
public TimeSpan Delay { get; }
#endregion
protected override async Task<DynamicEntityDataSourceInfo> OnLoadAsync()
{
return await Task.Run(() =>
{
// Derive schema from the first row in the custom data source.
_fields = GetSchema();
// Open the file for processing.
Stream stream = File.OpenRead(FilePath);
_streamReader = new StreamReader(stream);
// Create a new DynamicEntityDataSourceInfo using the entity ID field and the fields derived from the attributes of each observation in the custom data source.
return new DynamicEntityDataSourceInfo(EntityIdField, _fields) { SpatialReference = SpatialReferences.Wgs84 };
});
}
protected override Task OnConnectAsync(CancellationToken cancellationToken)
{
// On connecting to the custom data source begin processing the file.
_cancellationTokenSource = new();
_processTask = Task.Run(() => ObservationProcessLoopAsync(), _cancellationTokenSource.Token);
return Task.CompletedTask;
}
protected override async Task OnDisconnectAsync()
{
// On disconnecting from the custom data source, stop processing the file.
_cancellationTokenSource?.Cancel();
if (_processTask is not null) await _processTask;
_cancellationTokenSource = null;
_processTask = null;
}
private async Task ObservationProcessLoopAsync()
{
try
{
while (!_cancellationTokenSource!.IsCancellationRequested)
{
// Process the next observation.
var processed = await ProcessNextObservation();
// If the end of the file has been reached, break out of the loop.
if (_streamReader != null && _streamReader.EndOfStream) break;
// If the observation was not processed, continue to the next observation.
if (!processed) continue;
// If there is no delay, yield to the UI thread otherwise delay for the specified amount of time.
if (Delay == TimeSpan.Zero)
{
await Task.Yield();
}
else
{
await Task.Delay(Delay, _cancellationTokenSource.Token);
}
}
}
catch (Exception ex)
{
Debug.WriteLine(ex.ToString());
}
}
private async Task<bool> ProcessNextObservation()
{
_ = _streamReader ?? throw new ArgumentNullException("File stream not available.");
// Read the next observation.
var json = await _streamReader.ReadLineAsync();
// If there is no json to read or the schema is not available, return false.
if (string.IsNullOrEmpty(json) || _fields is null) return false;
try
{
JsonElement jsonElement = JsonSerializer.Deserialize<JsonElement>(json);
// Create a new MapPoint from the x and y coordinates of the observation.
MapPoint? point = null;
if (jsonElement.TryGetProperty("geometry", out JsonElement jsonGeometry))
{
point = new MapPoint(
jsonGeometry.GetProperty("x").GetDouble(),
jsonGeometry.GetProperty("y").GetDouble(),
SpatialReferences.Wgs84);
}
// Get the dictionary of attributes from the observation using the field names as keys.
Dictionary<string, object?> attributes = new();
if (jsonElement.TryGetProperty("attributes", out JsonElement jsonAttributes))
{
foreach (var field in _fields)
{
if (jsonAttributes.TryGetProperty(field.Name, out JsonElement prop))
{
object? value = null;
if (prop.ValueKind != JsonValueKind.Null)
{
if (prop.ValueKind == JsonValueKind.Number && field.FieldType == FieldType.Float64)
{
value = prop.GetDouble();
}
else if (prop.ValueKind == JsonValueKind.Number && field.FieldType == FieldType.Int32)
{
value = prop.GetInt32();
}
else if (prop.ValueKind == JsonValueKind.String)
{
value = prop.GetString();
}
}
attributes.Add(field.Name, value);
}
}
}
// Add the observation to the custom data source.
AddObservation(point, attributes);
return true;
}
catch (Exception ex)
{
Debug.WriteLine($"{ex}");
return false;
}
}
private static List<Field> GetSchema()
{
// Return a list of fields matching the attributes of each observation in the custom data source.
return new List<Field>()
{
new Field(FieldType.Text, "MMSI", string.Empty, 256),
new Field(FieldType.Float64, "BaseDateTime", string.Empty, 8),
new Field(FieldType.Float64, "LAT", string.Empty, 8),
new Field(FieldType.Float64, "LONG", string.Empty, 8),
new Field(FieldType.Float64, "SOG", string.Empty, 8),
new Field(FieldType.Float64, "COG", string.Empty, 8),
new Field(FieldType.Float64, "Heading", string.Empty, 8),
new Field(FieldType.Text, "VesselName", string.Empty, 256),
new Field(FieldType.Text, "IMO", string.Empty, 256),
new Field(FieldType.Text, "CallSign", string.Empty, 256),
new Field(FieldType.Text, "VesselType", string.Empty, 256),
new Field(FieldType.Text, "Status", string.Empty, 256),
new Field(FieldType.Float64, "Length", string.Empty, 8),
new Field(FieldType.Float64, "Width", string.Empty, 8),
new Field(FieldType.Text, "Cargo", string.Empty, 256),
new Field(FieldType.Text, "globalid", string.Empty, 256)
};
}
}
#nullable disable
}