Skip to content

Commit 2d7a4ed

Browse files
authored
KAFKA-12709; Add Admin API for ListTransactions (#10616)
This patch adds `Admin` support for the `listTransactions` API, which was added by [KIP-664](https://cwiki.apache.org/confluence/display/KAFKA/KIP-664%3A+Provide+tooling+to+detect+and+abort+hanging+transactions). Similar to `listConsumerGroups`, the new `listTransactions` API is intended to be sent to all brokers. Reviewers: David Jacot <[email protected]>
1 parent 21532a7 commit 2d7a4ed

29 files changed

+1929
-293
lines changed

clients/src/main/java/org/apache/kafka/clients/admin/AbortTransactionOptions.java

+8
Original file line numberDiff line numberDiff line change
@@ -20,4 +20,12 @@
2020

2121
@InterfaceStability.Evolving
2222
public class AbortTransactionOptions extends AbstractOptions<AbortTransactionOptions> {
23+
24+
@Override
25+
public String toString() {
26+
return "AbortTransactionOptions(" +
27+
"timeoutMs=" + timeoutMs +
28+
')';
29+
}
30+
2331
}

clients/src/main/java/org/apache/kafka/clients/admin/Admin.java

+22
Original file line numberDiff line numberDiff line change
@@ -1533,6 +1533,28 @@ default AbortTransactionResult abortTransaction(AbortTransactionSpec spec) {
15331533
*/
15341534
AbortTransactionResult abortTransaction(AbortTransactionSpec spec, AbortTransactionOptions options);
15351535

1536+
/**
1537+
* List active transactions in the cluster. See
1538+
* {@link #listTransactions(ListTransactionsOptions)} for more details.
1539+
*
1540+
* @return The result
1541+
*/
1542+
default ListTransactionsResult listTransactions() {
1543+
return listTransactions(new ListTransactionsOptions());
1544+
}
1545+
1546+
/**
1547+
* List active transactions in the cluster. This will query all potential transaction
1548+
* coordinators in the cluster and collect the state of all transactions. Users
1549+
* should typically attempt to reduce the size of the result set using
1550+
* {@link ListTransactionsOptions#filterProducerIds(Collection)} or
1551+
* {@link ListTransactionsOptions#filterStates(Collection)}
1552+
*
1553+
* @param options Options to control the method behavior (including filters)
1554+
* @return The result
1555+
*/
1556+
ListTransactionsResult listTransactions(ListTransactionsOptions options);
1557+
15361558
/**
15371559
* Get the metrics kept by the adminClient
15381560
*/

clients/src/main/java/org/apache/kafka/clients/admin/DescribeProducersResult.java

+7
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,13 @@ public PartitionProducerState(List<ProducerState> activeProducers) {
7171
public List<ProducerState> activeProducers() {
7272
return activeProducers;
7373
}
74+
75+
@Override
76+
public String toString() {
77+
return "PartitionProducerState(" +
78+
"activeProducers=" + activeProducers +
79+
')';
80+
}
7481
}
7582

7683
}

clients/src/main/java/org/apache/kafka/clients/admin/DescribeTransactionsOptions.java

+7
Original file line numberDiff line numberDiff line change
@@ -28,4 +28,11 @@
2828
@InterfaceStability.Evolving
2929
public class DescribeTransactionsOptions extends AbstractOptions<DescribeTransactionsOptions> {
3030

31+
@Override
32+
public String toString() {
33+
return "DescribeTransactionsOptions(" +
34+
"timeoutMs=" + timeoutMs +
35+
')';
36+
}
37+
3138
}

clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java

+31-18
Original file line numberDiff line numberDiff line change
@@ -34,10 +34,14 @@
3434
import org.apache.kafka.clients.admin.internals.AbortTransactionHandler;
3535
import org.apache.kafka.clients.admin.internals.AdminApiDriver;
3636
import org.apache.kafka.clients.admin.internals.AdminApiHandler;
37+
import org.apache.kafka.clients.admin.internals.AdminApiFuture;
3738
import org.apache.kafka.clients.admin.internals.AdminMetadataManager;
39+
import org.apache.kafka.clients.admin.internals.AllBrokersStrategy;
3840
import org.apache.kafka.clients.admin.internals.ConsumerGroupOperationContext;
41+
import org.apache.kafka.clients.admin.internals.CoordinatorKey;
3942
import org.apache.kafka.clients.admin.internals.DescribeProducersHandler;
4043
import org.apache.kafka.clients.admin.internals.DescribeTransactionsHandler;
44+
import org.apache.kafka.clients.admin.internals.ListTransactionsHandler;
4145
import org.apache.kafka.clients.admin.internals.MetadataOperationContext;
4246
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Assignment;
4347
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
@@ -4729,48 +4733,57 @@ void handleFailure(Throwable throwable) {
47294733

47304734
@Override
47314735
public DescribeProducersResult describeProducers(Collection<TopicPartition> topicPartitions, DescribeProducersOptions options) {
4732-
DescribeProducersHandler handler = new DescribeProducersHandler(
4733-
new HashSet<>(topicPartitions),
4734-
options,
4735-
logContext
4736-
);
4737-
return new DescribeProducersResult(invokeDriver(handler, options.timeoutMs));
4736+
AdminApiFuture.SimpleAdminApiFuture<TopicPartition, DescribeProducersResult.PartitionProducerState> future =
4737+
DescribeProducersHandler.newFuture(topicPartitions);
4738+
DescribeProducersHandler handler = new DescribeProducersHandler(options, logContext);
4739+
invokeDriver(handler, future, options.timeoutMs);
4740+
return new DescribeProducersResult(future.all());
47384741
}
47394742

47404743
@Override
47414744
public DescribeTransactionsResult describeTransactions(Collection<String> transactionalIds, DescribeTransactionsOptions options) {
4742-
DescribeTransactionsHandler handler = new DescribeTransactionsHandler(
4743-
transactionalIds,
4744-
logContext
4745-
);
4746-
return new DescribeTransactionsResult(invokeDriver(handler, options.timeoutMs));
4745+
AdminApiFuture.SimpleAdminApiFuture<CoordinatorKey, TransactionDescription> future =
4746+
DescribeTransactionsHandler.newFuture(transactionalIds);
4747+
DescribeTransactionsHandler handler = new DescribeTransactionsHandler(logContext);
4748+
invokeDriver(handler, future, options.timeoutMs);
4749+
return new DescribeTransactionsResult(future.all());
47474750
}
47484751

47494752
@Override
47504753
public AbortTransactionResult abortTransaction(AbortTransactionSpec spec, AbortTransactionOptions options) {
4751-
AbortTransactionHandler handler = new AbortTransactionHandler(
4752-
spec,
4753-
logContext
4754-
);
4755-
return new AbortTransactionResult(invokeDriver(handler, options.timeoutMs));
4754+
AdminApiFuture.SimpleAdminApiFuture<TopicPartition, Void> future =
4755+
AbortTransactionHandler.newFuture(Collections.singleton(spec.topicPartition()));
4756+
AbortTransactionHandler handler = new AbortTransactionHandler(spec, logContext);
4757+
invokeDriver(handler, future, options.timeoutMs);
4758+
return new AbortTransactionResult(future.all());
4759+
}
4760+
4761+
@Override
4762+
public ListTransactionsResult listTransactions(ListTransactionsOptions options) {
4763+
AllBrokersStrategy.AllBrokersFuture<Collection<TransactionListing>> future =
4764+
ListTransactionsHandler.newFuture();
4765+
ListTransactionsHandler handler = new ListTransactionsHandler(options, logContext);
4766+
invokeDriver(handler, future, options.timeoutMs);
4767+
return new ListTransactionsResult(future.all());
47564768
}
47574769

4758-
private <K, V> Map<K, KafkaFutureImpl<V>> invokeDriver(
4770+
private <K, V> void invokeDriver(
47594771
AdminApiHandler<K, V> handler,
4772+
AdminApiFuture<K, V> future,
47604773
Integer timeoutMs
47614774
) {
47624775
long currentTimeMs = time.milliseconds();
47634776
long deadlineMs = calcDeadlineMs(currentTimeMs, timeoutMs);
47644777

47654778
AdminApiDriver<K, V> driver = new AdminApiDriver<>(
47664779
handler,
4780+
future,
47674781
deadlineMs,
47684782
retryBackoffMs,
47694783
logContext
47704784
);
47714785

47724786
maybeSendRequests(driver, currentTimeMs);
4773-
return driver.futures();
47744787
}
47754788

47764789
private <K, V> void maybeSendRequests(AdminApiDriver<K, V> driver, long currentTimeMs) {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.kafka.clients.admin;
19+
20+
import org.apache.kafka.common.annotation.InterfaceStability;
21+
22+
import java.util.Collection;
23+
import java.util.Collections;
24+
import java.util.HashSet;
25+
import java.util.Set;
26+
27+
/**
28+
* Options for {@link Admin#listTransactions()}.
29+
*
30+
* The API of this class is evolving, see {@link Admin} for details.
31+
*/
32+
@InterfaceStability.Evolving
33+
public class ListTransactionsOptions extends AbstractOptions<ListTransactionsOptions> {
34+
private Set<TransactionState> filteredStates = Collections.emptySet();
35+
private Set<Long> filteredProducerIds = Collections.emptySet();
36+
37+
/**
38+
* Filter only the transactions that are in a specific set of states. If no filter
39+
* is specified or if the passed set of states is empty, then transactions in all
40+
* states will be returned.
41+
*
42+
* @param states the set of states to filter by
43+
* @return this object
44+
*/
45+
public ListTransactionsOptions filterStates(Collection<TransactionState> states) {
46+
this.filteredStates = new HashSet<>(states);
47+
return this;
48+
}
49+
50+
/**
51+
* Filter only the transactions from producers in a specific set of producerIds.
52+
* If no filter is specified or if the passed collection of producerIds is empty,
53+
* then the transactions of all producerIds will be returned.
54+
*
55+
* @param producerIdFilters the set of producerIds to filter by
56+
* @return this object
57+
*/
58+
public ListTransactionsOptions filterProducerIds(Collection<Long> producerIdFilters) {
59+
this.filteredProducerIds = new HashSet<>(producerIdFilters);
60+
return this;
61+
}
62+
63+
/**
64+
* Returns the set of states to be filtered or empty if no states have been specified.
65+
*
66+
* @return the current set of filtered states (empty means that no states are filtered and all
67+
* all transactions will be returned)
68+
*/
69+
public Set<TransactionState> filteredStates() {
70+
return filteredStates;
71+
}
72+
73+
/**
74+
* Returns the set of producerIds that are being filtered or empty if none have been specified.
75+
*
76+
* @return the current set of filtered states (empty means that no producerIds are filtered and
77+
* all transactions will be returned)
78+
*/
79+
public Set<Long> filteredProducerIds() {
80+
return filteredProducerIds;
81+
}
82+
83+
@Override
84+
public String toString() {
85+
return "ListTransactionsOptions(" +
86+
"filteredStates=" + filteredStates +
87+
", filteredProducerIds=" + filteredProducerIds +
88+
", timeoutMs=" + timeoutMs +
89+
')';
90+
}
91+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.kafka.clients.admin;
18+
19+
import org.apache.kafka.common.KafkaFuture;
20+
import org.apache.kafka.common.annotation.InterfaceStability;
21+
import org.apache.kafka.common.internals.KafkaFutureImpl;
22+
23+
import java.util.ArrayList;
24+
import java.util.Collection;
25+
import java.util.HashMap;
26+
import java.util.HashSet;
27+
import java.util.List;
28+
import java.util.Map;
29+
import java.util.Set;
30+
31+
/**
32+
* The result of the {@link Admin#listTransactions()} call.
33+
* <p>
34+
* The API of this class is evolving, see {@link Admin} for details.
35+
*/
36+
@InterfaceStability.Evolving
37+
public class ListTransactionsResult {
38+
private final KafkaFutureImpl<Map<Integer, KafkaFutureImpl<Collection<TransactionListing>>>> future;
39+
40+
ListTransactionsResult(KafkaFutureImpl<Map<Integer, KafkaFutureImpl<Collection<TransactionListing>>>> future) {
41+
this.future = future;
42+
}
43+
44+
/**
45+
* Get all transaction listings. If any of the underlying requests fail, then the future
46+
* returned from this method will also fail with the first encountered error.
47+
*
48+
* @return A future containing the collection of transaction listings. The future completes
49+
* when all transaction listings are available and fails after any non-retriable error.
50+
*/
51+
public KafkaFuture<Collection<TransactionListing>> all() {
52+
return allByBrokerId().thenApply(map -> {
53+
List<TransactionListing> allListings = new ArrayList<>();
54+
for (Collection<TransactionListing> listings : map.values()) {
55+
allListings.addAll(listings);
56+
}
57+
return allListings;
58+
});
59+
}
60+
61+
/**
62+
* Get a future which returns a map containing the underlying listing future for each broker
63+
* in the cluster. This is useful, for example, if a partial listing of transactions is
64+
* sufficient, or if you want more granular error details.
65+
*
66+
* @return A future containing a map of futures by broker which complete individually when
67+
* their respective transaction listings are available. The top-level future returned
68+
* from this method may fail if the admin client is unable to lookup the available
69+
* brokers in the cluster.
70+
*/
71+
public KafkaFuture<Map<Integer, KafkaFuture<Collection<TransactionListing>>>> byBrokerId() {
72+
KafkaFutureImpl<Map<Integer, KafkaFuture<Collection<TransactionListing>>>> result = new KafkaFutureImpl<>();
73+
future.whenComplete((brokerFutures, exception) -> {
74+
if (brokerFutures != null) {
75+
Map<Integer, KafkaFuture<Collection<TransactionListing>>> brokerFuturesCopy =
76+
new HashMap<>(brokerFutures.size());
77+
brokerFuturesCopy.putAll(brokerFutures);
78+
result.complete(brokerFuturesCopy);
79+
} else {
80+
result.completeExceptionally(exception);
81+
}
82+
});
83+
return result;
84+
}
85+
86+
/**
87+
* Get all transaction listings in a map which is keyed by the ID of respective broker
88+
* that is currently managing them. If any of the underlying requests fail, then the future
89+
* returned from this method will also fail with the first encountered error.
90+
*
91+
* @return A future containing a map from the broker ID to the transactions hosted by that
92+
* broker respectively. This future completes when all transaction listings are
93+
* available and fails after any non-retriable error.
94+
*/
95+
public KafkaFuture<Map<Integer, Collection<TransactionListing>>> allByBrokerId() {
96+
KafkaFutureImpl<Map<Integer, Collection<TransactionListing>>> allFuture = new KafkaFutureImpl<>();
97+
Map<Integer, Collection<TransactionListing>> allListingsMap = new HashMap<>();
98+
99+
future.whenComplete((map, topLevelException) -> {
100+
if (topLevelException != null) {
101+
allFuture.completeExceptionally(topLevelException);
102+
return;
103+
}
104+
105+
Set<Integer> remainingResponses = new HashSet<>(map.keySet());
106+
map.forEach((brokerId, future) -> {
107+
future.whenComplete((listings, brokerException) -> {
108+
if (brokerException != null) {
109+
allFuture.completeExceptionally(brokerException);
110+
} else if (!allFuture.isDone()) {
111+
allListingsMap.put(brokerId, listings);
112+
remainingResponses.remove(brokerId);
113+
114+
if (remainingResponses.isEmpty()) {
115+
allFuture.complete(allListingsMap);
116+
}
117+
}
118+
});
119+
});
120+
});
121+
122+
return allFuture;
123+
}
124+
125+
}

0 commit comments

Comments
 (0)