Skip to content

feat: add QueryAsync overload with Type parameter (#230) #232

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Aug 20, 2021
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
## 3.0.0 [unreleased]
### Breaking Changes
1. [#232](https://github.com/influxdata/influxdb-client-csharp/pull/232): Adds a `Type` overload for POCOs to `QueryAsync`. This will add `object ConvertToEntity(FluxRecord, Type)` to `IFluxResultMapper`


## 2.1.0 [unreleased]

### Bug Fixes
Expand Down
2 changes: 1 addition & 1 deletion Client.Core/Client.Core.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
<Description>InfluxDB Client Core - exceptions, validations, REST client.</Description>
<Authors>influxdb-client-csharp Contributors</Authors>
<AssemblyName>InfluxDB.Client.Core</AssemblyName>
<VersionPrefix>2.1.0</VersionPrefix>
<VersionPrefix>3.0.0</VersionPrefix>
<VersionSuffix>dev</VersionSuffix>

<PackageId>InfluxDB.Client.Core</PackageId>
Expand Down
28 changes: 22 additions & 6 deletions Client.Core/Flux/Internal/FluxResultMapper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,21 +38,26 @@ public T ConvertToEntity<T>(FluxRecord fluxRecord)
return ToPoco<T>(fluxRecord);
}

public object ConvertToEntity(FluxRecord fluxRecord, Type type)
{
return ToPoco(fluxRecord, type);
}


/// <summary>
/// Maps FluxRecord into custom POCO class.
/// </summary>
/// <param name="record">the Flux record</param>
/// <typeparam name="T">the POCO type</typeparam>
/// <returns></returns>
/// <param name="type">the POCO type</param>
/// <returns>An POCO object</returns>
/// <exception cref="InfluxException"></exception>
internal T ToPoco<T>(FluxRecord record)
internal object ToPoco(FluxRecord record, Type type)
{
Arguments.CheckNotNull(record, "Record is required");

try
{
var type = typeof(T);
var poco = (T) Activator.CreateInstance(type);
var poco = Activator.CreateInstance(type);

// copy record to case insensitive dictionary (do this once)
var recordValues =
Expand Down Expand Up @@ -100,6 +105,17 @@ internal T ToPoco<T>(FluxRecord record)
}
}


/// <summary>
/// Maps FluxRecord into custom POCO class.
/// </summary>
/// <param name="record">the Flux record</param>
/// <typeparam name="T">the POCO type</typeparam>
/// <returns></returns>
/// <exception cref="InfluxException"></exception>
internal T ToPoco<T>(FluxRecord record)
=> (T)ToPoco(record, typeof(T));

private void SetFieldValue<T>(T poco, PropertyInfo property, object value)
{
if (property == null || value == null || !property.CanWrite)
Expand Down Expand Up @@ -166,7 +182,7 @@ private DateTime ToDateTimeValue(object value)

if (value is IConvertible)
{
return (DateTime) Convert.ChangeType(value, typeof(DateTime));
return (DateTime)Convert.ChangeType(value, typeof(DateTime));
}

throw new InvalidCastException(
Expand Down
9 changes: 9 additions & 0 deletions Client.Core/Flux/Internal/IFluxResultMapper.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
using System;
using InfluxDB.Client.Core.Flux.Domain;

namespace InfluxDB.Client.Core.Flux.Internal
Expand All @@ -14,5 +15,13 @@ public interface IFluxResultMapper
/// <typeparam name="T">Type of DomainObject</typeparam>
/// <returns>Converted DomainObject</returns>
T ConvertToEntity<T>(FluxRecord fluxRecord);

/// <summary>
/// Converts FluxRecord to DomainObject specified by Type.
/// </summary>
/// <param name="fluxRecord">Flux record</param>
/// <param name="type">Type of DomainObject</param>
/// <returns>Converted DomainObject</returns>
object ConvertToEntity(FluxRecord fluxRecord, Type type);
}
}
23 changes: 23 additions & 0 deletions Client.Core/Internal/AbstractQueryClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,29 @@ protected void ParseFluxResponseToLines(Action<String> onResponse,
}
}
}

public class FluxResponseConsumerPoco : FluxCsvParser.IFluxResponseConsumer
{
private readonly Action<ICancellable, object> _onNext;
private readonly IFluxResultMapper _converter;
private readonly Type _type;

public FluxResponseConsumerPoco(Action<ICancellable, object> onNext, IFluxResultMapper converter, Type type)
{
_onNext = onNext;
_converter = converter;
_type = type;
}

public void Accept(int index, ICancellable cancellable, FluxTable table)
{
}

public void Accept(int index, ICancellable cancellable, FluxRecord record)
{
_onNext(cancellable, _converter.ConvertToEntity(record,_type));
}
}

public class FluxResponseConsumerPoco<T> : FluxCsvParser.IFluxResponseConsumer
{
Expand Down
2 changes: 1 addition & 1 deletion Client.Test/AssemblyHelperTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ public class AssemblyHelperTest
public void GetAssemblyVersion()
{
var version = AssemblyHelper.GetVersion(typeof(InfluxDBClient));
Assert.AreEqual(2, Version.Parse(version).Major);
Assert.AreEqual(3, Version.Parse(version).Major);
Assert.GreaterOrEqual(Version.Parse(version).Minor, 0);
Assert.AreEqual(0, Version.Parse(version).Build);
Assert.AreEqual(0, Version.Parse(version).Revision);
Expand Down
2 changes: 1 addition & 1 deletion Client.Test/InfluxDbClientTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ public async Task UserAgentHeader()
await _client.GetAuthorizationsApi().FindAuthorizationByIdAsync("id");

var request= MockServer.LogEntries.Last();
StringAssert.StartsWith("influxdb-client-csharp/2.", request.RequestMessage.Headers["User-Agent"].First());
StringAssert.StartsWith("influxdb-client-csharp/3.", request.RequestMessage.Headers["User-Agent"].First());
StringAssert.EndsWith(".0.0", request.RequestMessage.Headers["User-Agent"].First());
}

Expand Down
34 changes: 34 additions & 0 deletions Client.Test/QueryApiTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -60,5 +60,39 @@ public async Task ParallelRequest()
var ts = stopWatch.Elapsed;
Assert.LessOrEqual(ts.TotalSeconds, 10, $"Elapsed time: {ts}");
}

[Test]
public async Task GenericAndTypeofCalls()
{
MockServer
.Given(Request.Create().WithPath("/api/v2/query").UsingPost())
.RespondWith(CreateResponse(Data));


var measurements = await _queryApi.QueryAsync<SyncPoco>("from(...");
var measurementsTypeof = await _queryApi.QueryAsync("from(...",typeof(SyncPoco));

Assert.AreEqual(2, measurements.Count);
Assert.AreEqual(2, measurementsTypeof.Count);
Assert.AreEqual(12.25, measurements[0].Value);
Assert.AreEqual(13.00, measurements[1].Value);
Assert.IsAssignableFrom<SyncPoco>(measurementsTypeof[0]);
var cast = measurementsTypeof.Cast<SyncPoco>().ToList();
Assert.AreEqual(measurements[0].Timestamp, cast[0].Timestamp);
Assert.AreEqual(12.25, cast[0].Value);
Assert.AreEqual(13.00, cast[1].Value);
}




private class SyncPoco
{
[Column("id", IsTag = true)] public string Tag { get; set; }

[Column("_value")] public double Value { get; set; }

[Column(IsTimestamp = true)] public Object Timestamp { get; set; }
}
}
}
2 changes: 1 addition & 1 deletion Client.Test/WriteApiTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,7 @@ public void UserAgentHeader()
listener.Get<WriteSuccessEvent>();

var request= MockServer.LogEntries.Last();
StringAssert.StartsWith("influxdb-client-csharp/2.", request.RequestMessage.Headers["User-Agent"].First());
StringAssert.StartsWith("influxdb-client-csharp/3.", request.RequestMessage.Headers["User-Agent"].First());
StringAssert.EndsWith(".0.0", request.RequestMessage.Headers["User-Agent"].First());
}

Expand Down
2 changes: 1 addition & 1 deletion Client/Client.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
<Description>The reference client that allows query, write and management (bucket, organization, users) for the InfluxDB 2.0.</Description>
<Authors>influxdb-client-csharp Contributors</Authors>
<AssemblyName>InfluxDB.Client</AssemblyName>
<VersionPrefix>2.1.0</VersionPrefix>
<VersionPrefix>3.0.0</VersionPrefix>
<VersionSuffix>dev</VersionSuffix>

<PackageId>InfluxDB.Client</PackageId>
Expand Down
6 changes: 6 additions & 0 deletions Client/Internal/DefaultDomainObjectMapper.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
using System;
using InfluxDB.Client.Api.Domain;
using InfluxDB.Client.Core.Flux.Domain;
using InfluxDB.Client.Core.Flux.Internal;
Expand All @@ -18,6 +19,11 @@ public T ConvertToEntity<T>(FluxRecord fluxRecord)
return _resultMapper.ToPoco<T>(fluxRecord);
}

public object ConvertToEntity(FluxRecord fluxRecord, Type type)
{
return _resultMapper.ToPoco(fluxRecord, type);
}

public PointData ConvertToPointData<T>(T entity, WritePrecision precision)
{
return _measurementMapper.ToPoint(entity, precision);
Expand Down
110 changes: 110 additions & 0 deletions Client/QueryApi.cs
Original file line number Diff line number Diff line change
Expand Up @@ -735,6 +735,116 @@ public Task QueryAsync<T>(Query query, string org, Action<ICancellable, T> onNex
return QueryAsync(query, org, consumer, onError, onComplete);
}

/// <summary>
/// Executes the Flux query against the InfluxDB 2.0 and synchronously map whole response
/// to list of object with given type.
///
/// <para>
/// NOTE: This method is not intended for large query results.
/// Use <see cref="QueryAsync(string,string,System.Action{InfluxDB.Client.Core.ICancellable,object},System.Action{System.Exception},System.Action,System.Type)"/>
/// for large data streaming.
/// </para>
/// </summary>
/// <param name="query">the flux query to execute</param>
/// <param name="pocoType">the type of measurement</param>
/// <returns>Measurements which are matched the query</returns>
public Task<List<object>> QueryAsync(string query, Type pocoType)
{
return QueryAsync(query, _options.Org, pocoType);
}

/// <summary>
/// Executes the Flux query against the InfluxDB 2.0 and synchronously map whole response
/// to list of object with given type.
///
/// <para>
/// NOTE: This method is not intended for large query results.
/// Use <see cref="QueryAsync(string,string,System.Action{InfluxDB.Client.Core.ICancellable,object},System.Action{System.Exception},System.Action,System.Type)"/>
/// for large data streaming.
/// </para>
/// </summary>
/// <param name="query">the flux query to execute</param>
/// <param name="org">the organization</param>
/// <param name="pocoType">the type of measurement</param>
/// <returns>Measurements which are matched the query</returns>
public Task<List<object>> QueryAsync(string query, string org, Type pocoType)
{
return QueryAsync(CreateQuery(query),org, pocoType);
}

/// <summary>
/// Executes the Flux query against the InfluxDB 2.0 and synchronously map whole response
/// to list of object with given type.
///
/// <para>
/// NOTE: This method is not intended for large query results.
/// Use <see cref="QueryAsync(string,string,System.Action{InfluxDB.Client.Core.ICancellable,object},System.Action{System.Exception},System.Action,System.Type)"/>
/// for large data streaming.
/// </para>
/// </summary>
/// <param name="query">the flux query to execute</param>
/// <param name="pocoType">the type of measurement</param>
/// <returns>Measurements which are matched the query</returns>
public Task<List<object>> QueryAsync(Query query, Type pocoType)
{
return QueryAsync(query, _options.Org, pocoType);
}

/// <summary>
/// Executes the Flux query against the InfluxDB 2.0 and synchronously map whole response
/// to list of object with given type.
///
/// <para>
/// NOTE: This method is not intended for large query results.
/// Use <see cref="QueryAsync(string,string,System.Action{InfluxDB.Client.Core.ICancellable,object},System.Action{System.Exception},System.Action,System.Type)"/>
/// for large data streaming.
/// </para>
/// </summary>
/// <param name="query">the flux query to execute</param>
/// <param name="org">the organization</param>
/// <param name="pocoType">the type of measurement</param>
/// <returns>Measurements which are matched the query</returns>
public async Task<List<object>> QueryAsync(Query query, string org, Type pocoType)
{
var measurements = new List<object>();
var consumer = new FluxResponseConsumerPoco((cancellable, poco) => { measurements.Add(poco); }, Mapper, pocoType);
await QueryAsync(query, org, consumer, ErrorConsumer, EmptyAction).ConfigureAwait(false);
return measurements;
}


/// <summary>
/// Executes the Flux query against the InfluxDB 2.0 and asynchronously stream Measurements
/// to a <see cref="onNext"/> consumer.
/// </summary>
/// <param name="query">the flux query to execute</param>
/// <param name="org">specifies the source organization</param>
/// <param name="onNext">the callback to consume the mapped Measurements with capability
/// to discontinue a streaming query</param>
/// <param name="onError">the callback to consume any error notification</param>
/// <param name="onComplete">the callback to consume a notification about successfully end of stream</param>
/// <param name="pocoType">the type of measurement</param>
/// <returns>async task</returns>
public Task QueryAsync(string query, string org, Action<ICancellable, object> onNext, Action<Exception> onError,
Action onComplete, Type pocoType)
{
return QueryAsync(CreateQuery(query, DefaultDialect), org, onNext, onError, onComplete, pocoType);
}

public Task QueryAsync(Query query, string org, Action<ICancellable, object> onNext, Action<Exception> onError,
Action onComplete, Type pocoType)
{
Arguments.CheckNotNull(query, nameof(query));
Arguments.CheckNotNull(onNext, nameof(onNext));
Arguments.CheckNotNull(onError, nameof(onError));
Arguments.CheckNotNull(onComplete, nameof(onComplete));

var consumer = new FluxResponseConsumerPoco(onNext, Mapper, pocoType);

return QueryAsync(query, org, consumer, onError, onComplete);

}

/// <summary>
/// Executes the Flux query against the InfluxDB and synchronously map whole response to <see cref="string"/> result.
///
Expand Down
10 changes: 6 additions & 4 deletions Examples/CustomDomainMapping.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,13 @@ private class DomainEntityConverter : IDomainObjectMapper
/// Convert to DomainObject.
/// </summary>
public T ConvertToEntity<T>(FluxRecord fluxRecord)
=> (T)ConvertToEntity(fluxRecord, typeof(T));

public object ConvertToEntity(FluxRecord fluxRecord, Type type)
{
if (typeof(T) != typeof(Sensor))
if (type != typeof(Sensor))
{
throw new NotSupportedException($"This converter doesn't supports: {typeof(T)}");
throw new NotSupportedException($"This converter doesn't supports: {type}");
}

var customEntity = new Sensor
Expand All @@ -59,8 +62,7 @@ public T ConvertToEntity<T>(FluxRecord fluxRecord)
Value = Convert.ToDouble(fluxRecord.GetValueByKey("data")),
Timestamp = fluxRecord.GetTime().GetValueOrDefault().ToDateTimeUtc(),
};

return (T) Convert.ChangeType(customEntity, typeof(T));
return Convert.ChangeType(customEntity, type);
}

/// <summary>
Expand Down
10 changes: 7 additions & 3 deletions Examples/CustomDomainMappingAndLinq.cs
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,13 @@ private class DomainEntityConverter : IDomainObjectMapper, IMemberNameResolver
/// <summary>
/// Convert to DomainObject.
/// </summary>
public T ConvertToEntity<T>(FluxRecord fluxRecord)
public T ConvertToEntity<T>(FluxRecord fluxRecord)
=> (T)ConvertToEntity(fluxRecord, typeof(T));


public object ConvertToEntity(FluxRecord fluxRecord, Type type)
{
if (typeof(T) != typeof(DomainEntity))
if (type != typeof(DomainEntity))
{
throw new NotSupportedException($"This converter doesn't supports: {typeof(DomainEntity)}");
}
Expand All @@ -84,7 +88,7 @@ public T ConvertToEntity<T>(FluxRecord fluxRecord)
}
}

return (T) Convert.ChangeType(customEntity, typeof(T));
return Convert.ChangeType(customEntity, type);
}

/// <summary>
Expand Down
2 changes: 2 additions & 0 deletions Examples/PocoQueryWriteExample.cs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ await client.GetWriteApiAsync()
"|> pivot(rowKey:[\"_time\"], columnKey: [\"_field\"], valueColumn: \"_value\")";

var list = await client.GetQueryApi().QueryAsync<Sensor>(query);
//or as an alternative:
// var list = await client.GetQueryApi().QueryAsync(query, typeof(Sensor));

//
// Print result
Expand Down
Loading