diff --git a/.github/workflows/publish-documentation.yml b/.github/workflows/publish-documentation.yml
index be6deef8de..33baea4572 100644
--- a/.github/workflows/publish-documentation.yml
+++ b/.github/workflows/publish-documentation.yml
@@ -11,7 +11,7 @@ jobs:
- name: Set up JDK
uses: actions/setup-java@v4
with:
- distribution: 'zulu'
+ distribution: 'temurin'
java-version: '21'
cache: 'maven'
- name: Publish Documentation
diff --git a/.github/workflows/publish-snapshot.yml b/.github/workflows/publish-snapshot.yml
index d5bb53d130..8d22f67352 100644
--- a/.github/workflows/publish-snapshot.yml
+++ b/.github/workflows/publish-snapshot.yml
@@ -11,7 +11,7 @@ jobs:
- name: Set up JDK
uses: actions/setup-java@v4
with:
- distribution: 'zulu'
+ distribution: 'temurin'
java-version: '21'
cache: 'maven'
server-id: ossrh
diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml
index 0056e9487b..bb1c9f9029 100644
--- a/.github/workflows/release.yml
+++ b/.github/workflows/release.yml
@@ -15,7 +15,7 @@ jobs:
uses: actions/setup-java@v4
with:
distribution: 'temurin'
- java-version: '8'
+ java-version: '11'
cache: 'maven'
server-id: ${{ env.maven_server_id }}
server-username: MAVEN_USERNAME
diff --git a/.github/workflows/sanity-check.yml b/.github/workflows/sanity-check.yml
index 3e56579e7b..b8f9db90b6 100644
--- a/.github/workflows/sanity-check.yml
+++ b/.github/workflows/sanity-check.yml
@@ -23,8 +23,8 @@ jobs:
- name: Set up JDK
uses: actions/setup-java@v4
with:
- distribution: 'zulu'
- java-version: '21'
+ distribution: 'temurin'
+ java-version: '11'
cache: 'maven'
- name: Start broker
run: ci/start-broker.sh
diff --git a/.github/workflows/test-pr.yml b/.github/workflows/test-pr.yml
index 2019fc656a..380db0602c 100644
--- a/.github/workflows/test-pr.yml
+++ b/.github/workflows/test-pr.yml
@@ -19,7 +19,7 @@ jobs:
- name: Set up JDK
uses: actions/setup-java@v4
with:
- distribution: 'zulu'
+ distribution: 'temurin'
java-version: '21'
cache: 'maven'
- name: Start broker
diff --git a/.github/workflows/test-rabbitmq-alphas.yml b/.github/workflows/test-rabbitmq-alphas.yml
index 08b7896e3a..b707dc290a 100644
--- a/.github/workflows/test-rabbitmq-alphas.yml
+++ b/.github/workflows/test-rabbitmq-alphas.yml
@@ -25,7 +25,7 @@ jobs:
- name: Set up JDK
uses: actions/setup-java@v4
with:
- distribution: 'zulu'
+ distribution: 'temurin'
java-version: '21'
cache: 'maven'
- name: Start broker
diff --git a/.github/workflows/test-supported-java-versions.yml b/.github/workflows/test-supported-java-versions.yml
index b17b6de1f9..60774e07b4 100644
--- a/.github/workflows/test-supported-java-versions.yml
+++ b/.github/workflows/test-supported-java-versions.yml
@@ -11,7 +11,7 @@ jobs:
strategy:
matrix:
distribution: [ 'temurin' ]
- version: [ '8', '11', '17', '21', '23', '24-ea' ]
+ version: [ '11', '17', '21', '23', '24-ea' ]
include:
- distribution: 'semeru'
version: '17'
diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml
index db302fb4bd..523b05cbf9 100644
--- a/.github/workflows/test.yml
+++ b/.github/workflows/test.yml
@@ -23,7 +23,7 @@ jobs:
- name: Set up JDK
uses: actions/setup-java@v4
with:
- distribution: 'zulu'
+ distribution: 'temurin'
java-version: '21'
cache: 'maven'
server-id: ossrh
diff --git a/README.adoc b/README.adoc
index 8f9daef7d6..3d514fd899 100644
--- a/README.adoc
+++ b/README.adoc
@@ -31,7 +31,7 @@ See the https://www.rabbitmq.com/client-libraries/java-versions[RabbitMQ Java li
=== Pre-requisites
-This library requires at least Java 8, but Java 11 or more is recommended.
+This library requires at least Java 11, but Java 21 or more is recommended.
=== Dependencies
@@ -66,7 +66,7 @@ Breaking changes between releases can happen but will be kept to a minimum.
== Build Instructions
-You need JDK 8 or later installed.
+You need JDK 11 or later installed.
To build the JAR file:
diff --git a/pom.xml b/pom.xml
index d9e26e1ac0..a86a80da1e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -348,8 +348,7 @@
maven-compiler-plugin
${maven.compiler.plugin.version}
- 1.8
- 1.8
+ 11
-Xlint:deprecation
-Xlint:unchecked
@@ -678,16 +677,6 @@
-
- mockito-4-on-java-8
-
- 1.8
-
-
- 4.11.0
- -Xshare:off
-
-
jvm-test-arguments-below-java-21
@@ -707,31 +696,6 @@
-
-
-
- use-release-compiler-argument-on-java-9-or-more
-
- [9,)
-
-
-
-
- maven-compiler-plugin
- ${maven.compiler.plugin.version}
-
- 1.8
- 1.8
- 8
-
- -Xlint:deprecation
- -Xlint:unchecked
-
-
-
-
-
-
diff --git a/src/docs/asciidoc/building.adoc b/src/docs/asciidoc/building.adoc
index c5feff4f79..cf1935a308 100644
--- a/src/docs/asciidoc/building.adoc
+++ b/src/docs/asciidoc/building.adoc
@@ -1,6 +1,6 @@
== Building the Client
-You need JDK 1.8 or more installed.
+You need JDK 11 or more installed.
To build the JAR file:
diff --git a/src/docs/asciidoc/index.adoc b/src/docs/asciidoc/index.adoc
index a28aefcd91..d8f442e779 100644
--- a/src/docs/asciidoc/index.adoc
+++ b/src/docs/asciidoc/index.adoc
@@ -11,7 +11,7 @@ the https://rabbitmq.com/stream.html[RabbitMQ Stream Plugin].
It allows creating and deleting streams, as well as publishing to and consuming from
these streams. Learn more in the <>.
-This library requires at least Java 8, but Java 11 or more is recommended.
+This library requires at least Java 11, but Java 21 or more is recommended.
https://github.com/rabbitmq/rabbitmq-stream-perf-test[Stream PerfTest] is a performance testing tool based on this client library.
diff --git a/src/docs/asciidoc/overview.adoc b/src/docs/asciidoc/overview.adoc
index 3077bda287..4e1f736a02 100644
--- a/src/docs/asciidoc/overview.adoc
+++ b/src/docs/asciidoc/overview.adoc
@@ -107,4 +107,4 @@ _These SPI are susceptible to change, but this should have no impact on most app
== Pre-requisites
-This library requires at least Java 8, but Java 11 or more is recommended (CRC calculation uses methods available as of Java 9).
+This library requires at least Java 11, but Java 21 or more is recommended.
diff --git a/src/main/java/com/rabbitmq/stream/ByteCapacity.java b/src/main/java/com/rabbitmq/stream/ByteCapacity.java
index c9b3609204..d3460e3578 100644
--- a/src/main/java/com/rabbitmq/stream/ByteCapacity.java
+++ b/src/main/java/com/rabbitmq/stream/ByteCapacity.java
@@ -1,4 +1,4 @@
-// Copyright (c) 2020-2023 Broadcom. All Rights Reserved.
+// Copyright (c) 2020-2024 Broadcom. All Rights Reserved.
// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
//
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
@@ -14,8 +14,6 @@
// info@rabbitmq.com.
package com.rabbitmq.stream;
-import java.util.Collections;
-import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.function.BiFunction;
@@ -40,15 +38,11 @@ public class ByteCapacity implements Comparable {
private static final String UNIT_TB = "tb";
private static final Map> CONSTRUCTORS =
- Collections.unmodifiableMap(
- new HashMap>() {
- {
- put(UNIT_KB, (size, input) -> ByteCapacity.kB(size, input));
- put(UNIT_MB, (size, input) -> ByteCapacity.MB(size, input));
- put(UNIT_GB, (size, input) -> ByteCapacity.GB(size, input));
- put(UNIT_TB, (size, input) -> ByteCapacity.TB(size, input));
- }
- });
+ Map.of(
+ UNIT_KB, ByteCapacity::kB,
+ UNIT_MB, ByteCapacity::MB,
+ UNIT_GB, ByteCapacity::GB,
+ UNIT_TB, ByteCapacity::TB);
private final long bytes;
private final String input;
diff --git a/src/main/java/com/rabbitmq/stream/impl/Client.java b/src/main/java/com/rabbitmq/stream/impl/Client.java
index 011b784d4d..00a5931bb3 100644
--- a/src/main/java/com/rabbitmq/stream/impl/Client.java
+++ b/src/main/java/com/rabbitmq/stream/impl/Client.java
@@ -442,7 +442,7 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
private static Map clientProperties(Map fromParameters) {
fromParameters = fromParameters == null ? Collections.emptyMap() : fromParameters;
- Map clientProperties = new HashMap<>(fromParameters);
+ Map clientProperties = new LinkedHashMap<>(fromParameters);
clientProperties.putAll(ClientProperties.DEFAULT_CLIENT_PROPERTIES);
return Collections.unmodifiableMap(clientProperties);
}
@@ -506,21 +506,22 @@ void authenticate(CredentialsProvider credentialsProvider) {
} else if (saslAuthenticateResponse.isChallenge()) {
challenge = saslAuthenticateResponse.challenge;
} else if (saslAuthenticateResponse.isAuthenticationFailure()) {
- String message =
- "Unexpected response code during authentication: "
- + formatConstant(saslAuthenticateResponse.getResponseCode());
+ StringBuilder message =
+ new StringBuilder(
+ "Unexpected response code during authentication: "
+ + formatConstant(saslAuthenticateResponse.getResponseCode()));
if (saslAuthenticateResponse.getResponseCode()
== RESPONSE_CODE_AUTHENTICATION_FAILURE_LOOPBACK) {
- message +=
- ". The user is not authorized to connect from a remote host. "
- + "If the broker is running locally, make sure the '"
- + this.host
- + "' hostname is resolved to "
- + "the loopback interface (localhost, 127.0.0.1, ::1). "
- + "See https://www.rabbitmq.com/access-control.html#loopback-users.";
+ message
+ .append(". The user is not authorized to connect from a remote host. ")
+ .append("If the broker is running locally, make sure the '")
+ .append(this.host)
+ .append("' hostname is resolved to ")
+ .append("the loopback interface (localhost, 127.0.0.1, ::1). ")
+ .append("See https://www.rabbitmq.com/access-control.html#loopback-users.");
}
throw new AuthenticationFailureException(
- message, saslAuthenticateResponse.getResponseCode());
+ message.toString(), saslAuthenticateResponse.getResponseCode());
} else {
throw new StreamException(
"Unexpected response code during authentication: "
@@ -2223,7 +2224,7 @@ static class StreamStatsResponse extends Response {
StreamStatsResponse(short responseCode, Map info) {
super(responseCode);
- this.info = Collections.unmodifiableMap(new HashMap<>(info));
+ this.info = Map.copyOf(info);
}
public Map getInfo() {
@@ -2248,7 +2249,7 @@ public StreamMetadata(String stream, short responseCode, Broker leader, List getReplicas() {
- return this.replicas.isEmpty() ? Collections.emptyList() : new ArrayList<>(this.replicas);
+ return this.replicas;
}
boolean hasReplicas() {
@@ -2561,7 +2563,7 @@ int port() {
}
Map clientProperties() {
- return Collections.unmodifiableMap(this.clientProperties);
+ return Map.copyOf(this.clientProperties);
}
Codec codec() {
diff --git a/src/main/java/com/rabbitmq/stream/impl/ClientProperties.java b/src/main/java/com/rabbitmq/stream/impl/ClientProperties.java
index 89af455b0d..c557074df4 100644
--- a/src/main/java/com/rabbitmq/stream/impl/ClientProperties.java
+++ b/src/main/java/com/rabbitmq/stream/impl/ClientProperties.java
@@ -1,4 +1,4 @@
-// Copyright (c) 2020-2023 Broadcom. All Rights Reserved.
+// Copyright (c) 2020-2024 Broadcom. All Rights Reserved.
// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
//
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
@@ -15,10 +15,7 @@
package com.rabbitmq.stream.impl;
import java.io.InputStream;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Properties;
+import java.util.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -40,7 +37,7 @@ public final class ClientProperties {
public static final Map DEFAULT_CLIENT_PROPERTIES =
Collections.unmodifiableMap(
- new HashMap() {
+ new LinkedHashMap<>() {
{
put("product", "RabbitMQ Stream");
put("version", ClientProperties.VERSION);
diff --git a/src/main/java/com/rabbitmq/stream/impl/JdkChunkChecksum.java b/src/main/java/com/rabbitmq/stream/impl/JdkChunkChecksum.java
index 57160496f9..d54461a976 100644
--- a/src/main/java/com/rabbitmq/stream/impl/JdkChunkChecksum.java
+++ b/src/main/java/com/rabbitmq/stream/impl/JdkChunkChecksum.java
@@ -1,4 +1,4 @@
-// Copyright (c) 2020-2023 Broadcom. All Rights Reserved.
+// Copyright (c) 2020-2024 Broadcom. All Rights Reserved.
// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
//
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
@@ -16,54 +16,22 @@
import com.rabbitmq.stream.ChunkChecksum;
import com.rabbitmq.stream.ChunkChecksumValidationException;
-import com.rabbitmq.stream.StreamException;
import io.netty.buffer.ByteBuf;
-import io.netty.util.ByteProcessor;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.nio.ByteBuffer;
import java.util.function.Supplier;
import java.util.zip.CRC32;
import java.util.zip.Checksum;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
class JdkChunkChecksum implements ChunkChecksum {
- static final ChunkChecksum CRC32_SINGLETON;
- private static final Logger LOGGER = LoggerFactory.getLogger(JdkChunkChecksum.class);
private static final Supplier CRC32_SUPPLIER = CRC32::new;
-
- static {
- if (isChecksumUpdateByteBufferAvailable()) {
- LOGGER.debug("Checksum#update(ByteBuffer) method available, using it for direct buffers");
- CRC32_SINGLETON = new ByteBufferDirectByteBufChecksum(CRC32_SUPPLIER);
- } else {
- LOGGER.debug(
- "Checksum#update(ByteBuffer) method not available, using byte-by-byte CRC calculation for direct buffers");
- CRC32_SINGLETON = new JdkChunkChecksum(CRC32_SUPPLIER);
- }
- }
+ static final ChunkChecksum CRC32_SINGLETON = new JdkChunkChecksum(CRC32_SUPPLIER);
private final Supplier checksumSupplier;
- JdkChunkChecksum() {
- this(CRC32_SUPPLIER);
- }
-
JdkChunkChecksum(Supplier checksumSupplier) {
this.checksumSupplier = checksumSupplier;
}
- private static boolean isChecksumUpdateByteBufferAvailable() {
- try {
- Checksum.class.getDeclaredMethod("update", ByteBuffer.class);
- return true;
- } catch (Exception e) {
- return false;
- }
- }
-
@Override
public void checksum(ByteBuf byteBuf, long dataLength, long expected) {
Checksum checksum = checksumSupplier.get();
@@ -71,62 +39,10 @@ public void checksum(ByteBuf byteBuf, long dataLength, long expected) {
checksum.update(
byteBuf.array(), byteBuf.arrayOffset() + byteBuf.readerIndex(), byteBuf.readableBytes());
} else {
- byteBuf.forEachByte(
- byteBuf.readerIndex(), byteBuf.readableBytes(), new UpdateProcessor(checksum));
+ checksum.update(byteBuf.nioBuffer(byteBuf.readerIndex(), byteBuf.readableBytes()));
}
if (expected != checksum.getValue()) {
throw new ChunkChecksumValidationException(expected, checksum.getValue());
}
}
-
- private static final class ByteBufferDirectByteBufChecksum implements ChunkChecksum {
-
- private final Supplier checksumSupplier;
- private final Method updateMethod;
-
- private ByteBufferDirectByteBufChecksum(Supplier checksumSupplier) {
- this.checksumSupplier = checksumSupplier;
- try {
- this.updateMethod = Checksum.class.getDeclaredMethod("update", ByteBuffer.class);
- } catch (NoSuchMethodException e) {
- throw new StreamException("Error while looking up Checksum#update(ByteBuffer) method", e);
- }
- }
-
- @Override
- public void checksum(ByteBuf byteBuf, long dataLength, long expected) {
- Checksum checksum = checksumSupplier.get();
- if (byteBuf.hasArray()) {
- checksum.update(
- byteBuf.array(),
- byteBuf.arrayOffset() + byteBuf.readerIndex(),
- byteBuf.readableBytes());
- } else {
- try {
- this.updateMethod.invoke(
- checksum, byteBuf.nioBuffer(byteBuf.readerIndex(), byteBuf.readableBytes()));
- } catch (IllegalAccessException | InvocationTargetException e) {
- throw new StreamException("Error while calculating CRC", e);
- }
- }
- if (expected != checksum.getValue()) {
- throw new ChunkChecksumValidationException(expected, checksum.getValue());
- }
- }
- }
-
- private static class UpdateProcessor implements ByteProcessor {
-
- private final Checksum checksum;
-
- private UpdateProcessor(Checksum checksum) {
- this.checksum = checksum;
- }
-
- @Override
- public boolean process(byte value) {
- checksum.update(value);
- return true;
- }
- }
}
diff --git a/src/main/java/com/rabbitmq/stream/impl/ProducersCoordinator.java b/src/main/java/com/rabbitmq/stream/impl/ProducersCoordinator.java
index 0cd9038209..bf9dc9d60e 100644
--- a/src/main/java/com/rabbitmq/stream/impl/ProducersCoordinator.java
+++ b/src/main/java/com/rabbitmq/stream/impl/ProducersCoordinator.java
@@ -233,7 +233,7 @@ List findCandidateNodes(String stream, boolean forceLeader) {
LOGGER.debug("Candidates to publish to {}: {}", stream, candidates);
- return Collections.unmodifiableList(candidates);
+ return List.copyOf(candidates);
}
static Broker pickBroker(List candidates) {
diff --git a/src/main/java/com/rabbitmq/stream/impl/ServerFrameHandler.java b/src/main/java/com/rabbitmq/stream/impl/ServerFrameHandler.java
index c646803eae..306b1c1dfd 100644
--- a/src/main/java/com/rabbitmq/stream/impl/ServerFrameHandler.java
+++ b/src/main/java/com/rabbitmq/stream/impl/ServerFrameHandler.java
@@ -1,4 +1,4 @@
-// Copyright (c) 2020-2023 Broadcom. All Rights Reserved.
+// Copyright (c) 2020-2024 Broadcom. All Rights Reserved.
// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
//
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
@@ -771,10 +771,7 @@ int doHandle(Client client, ChannelHandlerContext ctx, ByteBuf message) {
}
OutstandingRequest