From 1ece080f872cb0f20ce48bfbff13720872a15337 Mon Sep 17 00:00:00 2001 From: Jakub Bednar Date: Thu, 12 Nov 2020 10:48:55 +0100 Subject: [PATCH 1/2] fix: Blocking API batches Point by precision --- .../client/internal/WriteApiBlockingImpl.java | 8 +- .../influxdb/client/WriteApiBlockingTest.java | 93 +++++++++++++++++++ 2 files changed, 100 insertions(+), 1 deletion(-) create mode 100644 client/src/test/java/com/influxdb/client/WriteApiBlockingTest.java diff --git a/client/src/main/java/com/influxdb/client/internal/WriteApiBlockingImpl.java b/client/src/main/java/com/influxdb/client/internal/WriteApiBlockingImpl.java index b6cfef9e951..f3f070b6ea0 100644 --- a/client/src/main/java/com/influxdb/client/internal/WriteApiBlockingImpl.java +++ b/client/src/main/java/com/influxdb/client/internal/WriteApiBlockingImpl.java @@ -22,6 +22,7 @@ package com.influxdb.client.internal; import java.util.Collections; +import java.util.LinkedHashMap; import java.util.List; import java.util.Objects; import java.util.logging.Level; @@ -147,7 +148,12 @@ public void writePoints(@Nonnull final String bucket, points .stream() .filter(Objects::nonNull) - .forEach(point -> write(bucket, org, point.getPrecision(), new BatchWriteDataPoint(point, options))); + .collect(Collectors.groupingBy(Point::getPrecision, LinkedHashMap::new, Collectors.toList())) + .forEach((precision, grouped) -> write( + bucket, + org, + precision, + grouped.stream().map(it -> new BatchWriteDataPoint(it, options)))); } @Override diff --git a/client/src/test/java/com/influxdb/client/WriteApiBlockingTest.java b/client/src/test/java/com/influxdb/client/WriteApiBlockingTest.java new file mode 100644 index 00000000000..9f6bf2bf22a --- /dev/null +++ b/client/src/test/java/com/influxdb/client/WriteApiBlockingTest.java @@ -0,0 +1,93 @@ +/* + * The MIT License + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ +package com.influxdb.client; + +import java.util.Arrays; + +import com.influxdb.client.domain.WritePrecision; +import com.influxdb.client.internal.AbstractInfluxDBClientTest; +import com.influxdb.client.write.Point; + +import okhttp3.mockwebserver.RecordedRequest; +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.platform.runner.JUnitPlatform; +import org.junit.runner.RunWith; + +/** + * @author Jakub Bednar (12/11/2020 10:25) + */ +@RunWith(JUnitPlatform.class) +class WriteApiBlockingTest extends AbstractInfluxDBClientTest { + + @Test + public void groupPointsByPrecision() throws InterruptedException { + mockServer.enqueue(createResponse("{}")); + + Point point1 = Point.measurement("h2o").addTag("location", "europe").addField("level", 1).time(1L, WritePrecision.NS); + Point point2 = Point.measurement("h2o").addTag("location", "europe").addField("level", 2).time(2L, WritePrecision.NS); + + influxDBClient + .getWriteApiBlocking() + .writePoints("b1", "org1", Arrays.asList(point1, point2)); + + Assertions.assertThat(mockServer.getRequestCount()).isEqualTo(1); + + RecordedRequest request = takeRequest(); + + Assertions.assertThat("h2o,location=europe level=1i 1\nh2o,location=europe level=2i 2") + .isEqualTo(request.getBody().readUtf8()); + Assertions.assertThat("ns").isEqualTo(request.getRequestUrl().queryParameter("precision")); + Assertions.assertThat("b1").isEqualTo(request.getRequestUrl().queryParameter("bucket")); + Assertions.assertThat("org1").isEqualTo(request.getRequestUrl().queryParameter("org")); + } + @Test + public void groupPointsByPrecisionDifferent() throws InterruptedException { + mockServer.enqueue(createResponse("{}")); + mockServer.enqueue(createResponse("{}")); + + Point point1 = Point.measurement("h2o").addTag("location", "europe").addField("level", 1).time(1L, WritePrecision.NS); + Point point2 = Point.measurement("h2o").addTag("location", "europe").addField("level", 2).time(2L, WritePrecision.S); + + influxDBClient + .getWriteApiBlocking() + .writePoints("b1", "org1", Arrays.asList(point1, point2)); + + Assertions.assertThat(mockServer.getRequestCount()).isEqualTo(2); + + RecordedRequest request = takeRequest(); + + Assertions.assertThat("h2o,location=europe level=1i 1") + .isEqualTo(request.getBody().readUtf8()); + Assertions.assertThat("ns").isEqualTo(request.getRequestUrl().queryParameter("precision")); + Assertions.assertThat("b1").isEqualTo(request.getRequestUrl().queryParameter("bucket")); + Assertions.assertThat("org1").isEqualTo(request.getRequestUrl().queryParameter("org")); + + request = takeRequest(); + + Assertions.assertThat("h2o,location=europe level=2i 2") + .isEqualTo(request.getBody().readUtf8()); + Assertions.assertThat("s").isEqualTo(request.getRequestUrl().queryParameter("precision")); + Assertions.assertThat("b1").isEqualTo(request.getRequestUrl().queryParameter("bucket")); + Assertions.assertThat("org1").isEqualTo(request.getRequestUrl().queryParameter("org")); + } +} From f50ec6fc1e70157c6db1cf1e50a3ed51d36beda3 Mon Sep 17 00:00:00 2001 From: Jakub Bednar Date: Thu, 12 Nov 2020 10:51:25 +0100 Subject: [PATCH 2/2] docs: updated CHANGELOG.md --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 77114a94fad..050d718a078 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,7 @@ ### Bug Fixes 1. [#173](https://github.com/influxdata/influxdb-client-java/pull/173): Query error could be after _success_ table +1. [#176](https://github.com/influxdata/influxdb-client-java/pull/176): Blocking API batches Point by precision ## 1.13.0 [2020-10-30]