Skip to content

Commit a0871a1

Browse files
emasabedenhillmilindl
authored
KIP-320 : Allow fetchers to detect and handle log truncation (#4162)
Co-authored-by: Magnus Edenhill <[email protected]> Co-authored-by: Milind L <[email protected]>
1 parent ac35618 commit a0871a1

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

51 files changed

+3342
-992
lines changed

Diff for: CHANGELOG.md

+17-3
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
1-
# librdkafka v2.0.3
1+
# librdkafka v2.1.0
22

3-
librdkafka v2.0.3 is a bugfix release:
3+
librdkafka v2.1.0 is a feature release:
44

5+
* [KIP-320](https://cwiki.apache.org/confluence/display/KAFKA/KIP-320%3A+Allow+fetchers+to+detect+and+handle+log+truncation)
6+
Allow fetchers to detect and handle log truncation (#4122).
57
* Fix a reference count issue blocking the consumer from closing (#4187).
68
* Fix a protocol issue with ListGroups API, where an extra
7-
field was appended for API Versions greater than or equal to 3.
9+
field was appended for API Versions greater than or equal to 3 (#4207).
810
* Fix an issue with `max.poll.interval.ms`, where polling any queue would cause
911
the timeout to be reset (#4176).
1012
* Fix seek partition timeout, was one thousand times lower than the passed
@@ -15,6 +17,18 @@ librdkafka v2.0.3 is a bugfix release:
1517
* Upgrade OpenSSL to v3.0.8 with various security fixes,
1618
check the [release notes](https://www.openssl.org/news/cl30.txt) (#4215).
1719

20+
## Enhancements
21+
22+
* Added `rd_kafka_topic_partition_get_leader_epoch()` (and `set..()`).
23+
* Added partition leader epoch APIs:
24+
- `rd_kafka_topic_partition_get_leader_epoch()` (and `set..()`)
25+
- `rd_kafka_message_leader_epoch()`
26+
- `rd_kafka_*assign()` and `rd_kafka_seek_partitions()` now supports
27+
partitions with a leader epoch set.
28+
- `rd_kafka_offsets_for_times()` will return per-partition leader-epochs.
29+
- `leader_epoch`, `stored_leader_epoch`, and `committed_leader_epoch`
30+
added to per-partition statistics.
31+
1832

1933
## Fixes
2034

Diff for: INTRODUCTION.md

+2-2
Original file line numberDiff line numberDiff line change
@@ -1887,7 +1887,7 @@ The [Apache Kafka Implementation Proposals (KIPs)](https://cwiki.apache.org/conf
18871887
| KIP-289 - Consumer group.id default to NULL | 2.2.0 | Supported |
18881888
| KIP-294 - SSL endpoint verification | 2.0.0 | Supported |
18891889
| KIP-302 - Use all addresses for resolved broker hostname | 2.1.0 | Supported |
1890-
| KIP-320 - Consumer: handle log truncation | 2.1.0, 2.2.0 | Not supported |
1890+
| KIP-320 - Consumer: handle log truncation | 2.1.0, 2.2.0 | Supported |
18911891
| KIP-322 - DeleteTopics disabled error code | 2.1.0 | Supported |
18921892
| KIP-339 - AdminAPI: incrementalAlterConfigs | 2.3.0 | Not supported |
18931893
| KIP-341 - Update Sticky partition assignment data | 2.3.0 | Not supported (superceeded by KIP-429) |
@@ -1953,7 +1953,7 @@ release of librdkafka.
19531953
| 0 | Produce | 9 | 7 |
19541954
| 1 | Fetch | 13 | 11 |
19551955
| 2 | ListOffsets | 7 | 2 |
1956-
| 3 | Metadata | 12 | 4 |
1956+
| 3 | Metadata | 12 | 9 |
19571957
| 8 | OffsetCommit | 8 | 7 |
19581958
| 9 | OffsetFetch | 8 | 7 |
19591959
| 10 | FindCoordinator | 4 | 2 |

Diff for: STATISTICS.md

+3
Original file line numberDiff line numberDiff line change
@@ -179,13 +179,16 @@ query_offset | int gauge | | Current/Last logical offset query
179179
next_offset | int gauge | | Next offset to fetch
180180
app_offset | int gauge | | Offset of last message passed to application + 1
181181
stored_offset | int gauge | | Offset to be committed
182+
stored_leader_epoch | int | | Partition leader epoch of stored offset
182183
committed_offset | int gauge | | Last committed offset
184+
committed_leader_epoch | int | | Partition leader epoch of committed offset
183185
eof_offset | int gauge | | Last PARTITION_EOF signaled offset
184186
lo_offset | int gauge | | Partition's low watermark offset on broker
185187
hi_offset | int gauge | | Partition's high watermark offset on broker
186188
ls_offset | int gauge | | Partition's last stable offset on broker, or same as hi_offset is broker version is less than 0.11.0.0.
187189
consumer_lag | int gauge | | Difference between (hi_offset or ls_offset) and committed_offset). hi_offset is used when isolation.level=read_uncommitted, otherwise ls_offset.
188190
consumer_lag_stored | int gauge | | Difference between (hi_offset or ls_offset) and stored_offset. See consumer_lag and stored_offset.
191+
leader_epoch | int | | Last known partition leader epoch, or -1 if unknown.
189192
txmsgs | int | | Total number of messages transmitted (produced)
190193
txbytes | int | | Total number of bytes transmitted for txmsgs
191194
rxmsgs | int | | Total number of messages consumed, not including ignored messages (due to offset, etc).

Diff for: configure.self

+1-1
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ mkl_toggle_option "Development" ENABLE_VALGRIND "--enable-valgrind" "Enable in-c
3434

3535
mkl_toggle_option "Development" ENABLE_REFCNT_DEBUG "--enable-refcnt-debug" "Enable refcnt debugging" "n"
3636

37-
mkl_toggle_option "Feature" ENABLE_LZ4_EXT "--enable-lz4-ext" "Enable external LZ4 library support (builtin version 1.9.2)" "y"
37+
mkl_toggle_option "Feature" ENABLE_LZ4_EXT "--enable-lz4-ext" "Enable external LZ4 library support (builtin version 1.9.3)" "y"
3838
mkl_toggle_option "Feature" ENABLE_LZ4_EXT "--enable-lz4" "Deprecated: alias for --enable-lz4-ext" "y"
3939

4040
mkl_toggle_option "Feature" ENABLE_REGEX_EXT "--enable-regex-ext" "Enable external (libc) regex (else use builtin)" "y"

Diff for: examples/consumer.c

+3-2
Original file line numberDiff line numberDiff line change
@@ -225,9 +225,10 @@ int main(int argc, char **argv) {
225225
}
226226

227227
/* Proper message. */
228-
printf("Message on %s [%" PRId32 "] at offset %" PRId64 ":\n",
228+
printf("Message on %s [%" PRId32 "] at offset %" PRId64
229+
" (leader epoch %" PRId32 "):\n",
229230
rd_kafka_topic_name(rkm->rkt), rkm->partition,
230-
rkm->offset);
231+
rkm->offset, rd_kafka_message_leader_epoch(rkm));
231232

232233
/* Print the message key. */
233234
if (rkm->key && is_printable(rkm->key, rkm->key_len))

Diff for: src-cpp/HandleImpl.cpp

+5-2
Original file line numberDiff line numberDiff line change
@@ -391,6 +391,8 @@ rd_kafka_topic_partition_list_t *partitions_to_c_parts(
391391
rd_kafka_topic_partition_t *rktpar = rd_kafka_topic_partition_list_add(
392392
c_parts, tpi->topic_.c_str(), tpi->partition_);
393393
rktpar->offset = tpi->offset_;
394+
if (tpi->leader_epoch_ != -1)
395+
rd_kafka_topic_partition_set_leader_epoch(rktpar, tpi->leader_epoch_);
394396
}
395397

396398
return c_parts;
@@ -412,8 +414,9 @@ void update_partitions_from_c_parts(
412414
dynamic_cast<RdKafka::TopicPartitionImpl *>(partitions[j]);
413415
if (!strcmp(p->topic, pp->topic_.c_str()) &&
414416
p->partition == pp->partition_) {
415-
pp->offset_ = p->offset;
416-
pp->err_ = static_cast<RdKafka::ErrorCode>(p->err);
417+
pp->offset_ = p->offset;
418+
pp->err_ = static_cast<RdKafka::ErrorCode>(p->err);
419+
pp->leader_epoch_ = rd_kafka_topic_partition_get_leader_epoch(p);
417420
}
418421
}
419422
}

Diff for: src-cpp/rdkafkacpp.h

+42-1
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ namespace RdKafka {
111111
* @remark This value should only be used during compile time,
112112
* for runtime checks of version use RdKafka::version()
113113
*/
114-
#define RD_KAFKA_VERSION 0x020002ff
114+
#define RD_KAFKA_VERSION 0x020100ff
115115

116116
/**
117117
* @brief Returns the librdkafka version as integer.
@@ -324,6 +324,8 @@ enum ErrorCode {
324324
ERR__NOOP = -141,
325325
/** No offset to automatically reset to */
326326
ERR__AUTO_OFFSET_RESET = -140,
327+
/** Partition log truncation detected */
328+
ERR__LOG_TRUNCATION = -139,
327329

328330
/** End internal error codes */
329331
ERR__END = -100,
@@ -1978,6 +1980,12 @@ class RD_EXPORT TopicPartition {
19781980

19791981
/** @returns error code (if applicable) */
19801982
virtual ErrorCode err() const = 0;
1983+
1984+
/** @brief Get partition leader epoch, or -1 if not known or relevant. */
1985+
virtual int32_t get_leader_epoch() = 0;
1986+
1987+
/** @brief Set partition leader epoch. */
1988+
virtual void set_leader_epoch(int32_t leader_epoch) = 0;
19811989
};
19821990

19831991

@@ -2035,6 +2043,11 @@ class RD_EXPORT Topic {
20352043
* The offset will be committed (written) to the broker (or file) according
20362044
* to \p auto.commit.interval.ms or next manual offset-less commit call.
20372045
*
2046+
* @deprecated This API lacks support for partition leader epochs, which makes
2047+
* it at risk for unclean leader election log truncation issues.
2048+
* Use KafkaConsumer::offsets_store() or
2049+
* Message::offset_store() instead.
2050+
*
20382051
* @remark \c enable.auto.offset.store must be set to \c false when using
20392052
* this API.
20402053
*
@@ -2465,6 +2478,31 @@ class RD_EXPORT Message {
24652478
/** @returns the broker id of the broker the message was produced to or
24662479
* fetched from, or -1 if not known/applicable. */
24672480
virtual int32_t broker_id() const = 0;
2481+
2482+
/** @returns the message's partition leader epoch at the time the message was
2483+
* fetched and if known, else -1. */
2484+
virtual int32_t leader_epoch() const = 0;
2485+
2486+
/**
2487+
* @brief Store offset +1 for the consumed message.
2488+
*
2489+
* The message offset + 1 will be committed to broker according
2490+
* to \c `auto.commit.interval.ms` or manual offset-less commit()
2491+
*
2492+
* @warning This method may only be called for partitions that are currently
2493+
* assigned.
2494+
* Non-assigned partitions will fail with ERR__STATE.
2495+
*
2496+
* @warning Avoid storing offsets after calling seek() (et.al) as
2497+
* this may later interfere with resuming a paused partition, instead
2498+
* store offsets prior to calling seek.
2499+
*
2500+
* @remark \c `enable.auto.offset.store` must be set to "false" when using
2501+
* this API.
2502+
*
2503+
* @returns NULL on success or an error object on failure.
2504+
*/
2505+
virtual Error *offset_store() = 0;
24682506
};
24692507

24702508
/**@}*/
@@ -2865,6 +2903,9 @@ class RD_EXPORT KafkaConsumer : public virtual Handle {
28652903
* @remark \c enable.auto.offset.store must be set to \c false when using
28662904
* this API.
28672905
*
2906+
* @remark The leader epoch, if set, will be used to fence outdated partition
2907+
* leaders. See TopicPartition::set_leader_epoch().
2908+
*
28682909
* @returns RdKafka::ERR_NO_ERROR on success, or
28692910
* RdKafka::ERR___UNKNOWN_PARTITION if none of the offsets could
28702911
* be stored, or

Diff for: src-cpp/rdkafkacpp_int.h

+33-6
Original file line numberDiff line numberDiff line change
@@ -540,6 +540,21 @@ class MessageImpl : public Message {
540540
return rd_kafka_message_broker_id(rkmessage_);
541541
}
542542

543+
int32_t leader_epoch() const {
544+
return rd_kafka_message_leader_epoch(rkmessage_);
545+
}
546+
547+
548+
Error *offset_store() {
549+
rd_kafka_error_t *c_error;
550+
551+
c_error = rd_kafka_offset_store_message(rkmessage_);
552+
553+
if (c_error)
554+
return new ErrorImpl(c_error);
555+
else
556+
return NULL;
557+
}
543558

544559
RdKafka::Topic *topic_;
545560
rd_kafka_message_t *rkmessage_;
@@ -1227,21 +1242,24 @@ class TopicPartitionImpl : public TopicPartition {
12271242
topic_(topic),
12281243
partition_(partition),
12291244
offset_(RdKafka::Topic::OFFSET_INVALID),
1230-
err_(ERR_NO_ERROR) {
1245+
err_(ERR_NO_ERROR),
1246+
leader_epoch_(-1) {
12311247
}
12321248

12331249
TopicPartitionImpl(const std::string &topic, int partition, int64_t offset) :
12341250
topic_(topic),
12351251
partition_(partition),
12361252
offset_(offset),
1237-
err_(ERR_NO_ERROR) {
1253+
err_(ERR_NO_ERROR),
1254+
leader_epoch_(-1) {
12381255
}
12391256

12401257
TopicPartitionImpl(const rd_kafka_topic_partition_t *c_part) {
1241-
topic_ = std::string(c_part->topic);
1242-
partition_ = c_part->partition;
1243-
offset_ = c_part->offset;
1244-
err_ = static_cast<ErrorCode>(c_part->err);
1258+
topic_ = std::string(c_part->topic);
1259+
partition_ = c_part->partition;
1260+
offset_ = c_part->offset;
1261+
err_ = static_cast<ErrorCode>(c_part->err);
1262+
leader_epoch_ = rd_kafka_topic_partition_get_leader_epoch(c_part);
12451263
// FIXME: metadata
12461264
}
12471265

@@ -1266,6 +1284,14 @@ class TopicPartitionImpl : public TopicPartition {
12661284
offset_ = offset;
12671285
}
12681286

1287+
int32_t get_leader_epoch() {
1288+
return leader_epoch_;
1289+
}
1290+
1291+
void set_leader_epoch(int32_t leader_epoch) {
1292+
leader_epoch_ = leader_epoch_;
1293+
}
1294+
12691295
std::ostream &operator<<(std::ostream &ostrm) const {
12701296
return ostrm << topic_ << " [" << partition_ << "]";
12711297
}
@@ -1274,6 +1300,7 @@ class TopicPartitionImpl : public TopicPartition {
12741300
int partition_;
12751301
int64_t offset_;
12761302
ErrorCode err_;
1303+
int32_t leader_epoch_;
12771304
};
12781305

12791306

0 commit comments

Comments
 (0)