Skip to content

Commit f799bb8

Browse files
committed
Message format v2 support, first draft
1 parent c931712 commit f799bb8

File tree

7 files changed

+998
-182
lines changed

7 files changed

+998
-182
lines changed

src/Makefile

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ SRCS= rdkafka.c rdkafka_broker.c rdkafka_msg.c rdkafka_topic.c \
3939
rdlog.c rdstring.c rdkafka_event.c rdkafka_metadata.c \
4040
rdregex.c rdports.c rdkafka_metadata_cache.c rdavl.c \
4141
rdkafka_sasl.c rdkafka_sasl_plain.c rdkafka_interceptor.c \
42+
rdkafka_msgfmt.c rdvarint.c \
4243
$(SRCS_y)
4344

4445
HDRS= rdkafka.h

src/rdkafka_broker.c

Lines changed: 26 additions & 182 deletions
Original file line numberDiff line numberDiff line change
@@ -2690,192 +2690,36 @@ static int rd_kafka_compress_MessageSet_buf (rd_kafka_broker_t *rkb,
26902690
* Produce messages from 'rktp's queue.
26912691
*/
26922692
static int rd_kafka_broker_produce_toppar (rd_kafka_broker_t *rkb,
2693-
rd_kafka_toppar_t *rktp) {
2694-
int cnt;
2695-
rd_kafka_msg_t *rkm;
2696-
int msgcnt = 0, msgcntmax;
2697-
rd_kafka_buf_t *rkbuf;
2698-
rd_kafka_itopic_t *rkt = rktp->rktp_rkt;
2699-
int iovcnt;
2700-
size_t iov_firstmsg;
2701-
size_t of_firstmsg;
2702-
size_t of_init_firstmsg;
2703-
size_t of_MessageSetSize;
2704-
int32_t MessageSetSize = 0;
2705-
int outlen;
2706-
int MsgVersion = 0;
2707-
int use_relative_offsets = 0;
2708-
int64_t timestamp_firstmsg = 0;
2709-
int queued_cnt;
2710-
size_t queued_bytes;
2711-
size_t buffer_space;
2712-
rd_ts_t now;
2713-
2714-
queued_cnt = rd_kafka_msgq_len(&rktp->rktp_xmit_msgq);
2715-
if (queued_cnt == 0)
2716-
return 0;
2717-
2718-
queued_bytes = rd_kafka_msgq_size(&rktp->rktp_xmit_msgq);
2719-
2720-
now = rd_clock();
2721-
2722-
if (rkb->rkb_features & RD_KAFKA_FEATURE_MSGVER1) {
2723-
MsgVersion = 1;
2724-
if (rktp->rktp_rkt->rkt_conf.compression_codec)
2725-
use_relative_offsets = 1;
2726-
}
2727-
2728-
/* iovs:
2729-
* 1 * (RequiredAcks + Timeout + Topic + Partition + MessageSetSize)
2730-
* msgcntmax * messagehdr
2731-
* msgcntmax * Value_len
2732-
* msgcntmax * messagepayload (ext memory)
2733-
* = 1 + (4 * msgcntmax)
2734-
*
2735-
* We are bound by configuration.
2736-
*/
2737-
msgcntmax = RD_MIN(queued_cnt, rkb->rkb_rk->rk_conf.batch_num_messages);
2738-
rd_kafka_assert(rkb->rkb_rk, msgcntmax > 0);
2739-
iovcnt = 1 + (3 * msgcntmax);
2740-
2741-
/* Allocate iovecs to hold all headers and messages,
2742-
* and allocate enough to allow copies of small messages.
2743-
* The allocated size is the minimum of message.max.bytes
2744-
* or queued_bytes + queued_cnt * per_msg_overhead */
2745-
2746-
buffer_space =
2747-
/* RequiredAcks + Timeout + TopicCnt */
2748-
2 + 4 + 4 +
2749-
/* Topic */
2750-
RD_KAFKAP_STR_SIZE(rkt->rkt_topic) +
2751-
/* PartitionCnt + Partition + MessageSetSize */
2752-
4 + 4 + 4;
2753-
2754-
if (rkb->rkb_rk->rk_conf.msg_copy_max_size > 0)
2755-
buffer_space += queued_bytes +
2756-
msgcntmax * RD_KAFKAP_MESSAGE_OVERHEAD;
2757-
else
2758-
buffer_space += msgcntmax * RD_KAFKAP_MESSAGE_OVERHEAD;
2759-
2760-
2761-
rkbuf = rd_kafka_buf_new(rkb->rkb_rk, RD_KAFKAP_Produce, iovcnt,
2762-
RD_MIN((size_t)rkb->rkb_rk->rk_conf.
2763-
max_msg_size,
2764-
buffer_space));
2765-
2766-
/*
2767-
* Insert first part of Produce header
2768-
*/
2769-
/* RequiredAcks */
2770-
rd_kafka_buf_write_i16(rkbuf, rkt->rkt_conf.required_acks);
2771-
/* Timeout */
2772-
rd_kafka_buf_write_i32(rkbuf, rkt->rkt_conf.request_timeout_ms);
2773-
/* TopicArrayCnt */
2774-
rd_kafka_buf_write_i32(rkbuf, 1);
2693+
rd_kafka_toppar_t *rktp) {
2694+
rd_kafka_buf_t *rkbuf;
2695+
rd_kafka_itopic_t *rkt = rktp->rktp_rkt;
2696+
size_t MessageSetSize = 0;
2697+
rd_kafka_msgset_writer_t msetw;
2698+
2699+
/**
2700+
* Create ProduceRequest with as many messages from the toppar
2701+
* transmit queue as possible.
2702+
*/
2703+
rkbuf = rd_kafka_msgset_create_ProduceRequest(rkb, rktp,
2704+
&MessageSetSize);
27752705

2776-
/* Insert topic */
2777-
rd_kafka_buf_write_kstr(rkbuf, rkt->rkt_topic);
2706+
cnt = rd_atomic32_get(&rkbuf->rkbuf_msgq.rkmq_msg_cnt);
27782707

2779-
/*
2780-
* Insert second part of Produce header
2781-
*/
2782-
/* PartitionArrayCnt */
2783-
rd_kafka_buf_write_i32(rkbuf, 1);
2784-
/* Partition */
2785-
rd_kafka_buf_write_i32(rkbuf, rktp->rktp_partition);
2786-
/* MessageSetSize: Will be finalized later*/
2787-
of_MessageSetSize = rd_kafka_buf_write_i32(rkbuf, 0);
2788-
2789-
/* Push write-buffer onto iovec stack to create a clean iovec boundary
2790-
* for the compression codecs. */
2791-
rd_kafka_buf_autopush(rkbuf);
2708+
rd_atomic64_add(&rktp->rktp_c.tx_msgs, cnt);
2709+
rd_atomic64_add(&rktp->rktp_c.tx_bytes, MessageSetSize);
27922710

2793-
iov_firstmsg = rkbuf->rkbuf_msg.msg_iovlen;
2794-
of_firstmsg = rkbuf->rkbuf_wof;
2795-
of_init_firstmsg = rkbuf->rkbuf_wof_init;
2796-
2797-
while (msgcnt < msgcntmax &&
2798-
(rkm = TAILQ_FIRST(&rktp->rktp_xmit_msgq.rkmq_msgs))) {
2799-
2800-
if (of_firstmsg + MessageSetSize + rd_kafka_msg_wire_size(rkm) >
2801-
(size_t)rkb->rkb_rk->rk_conf.max_msg_size) {
2802-
rd_rkb_dbg(rkb, MSG, "PRODUCE",
2803-
"No more space in current MessageSet "
2804-
"(%i messages)",
2805-
rd_atomic32_get(&rkbuf->rkbuf_msgq.
2806-
rkmq_msg_cnt));
2807-
/* Not enough remaining size. */
2808-
break;
2809-
}
2711+
rd_rkb_dbg(rkb, MSG, "PRODUCE",
2712+
"produce messageset with %i messages "
2713+
"(%"PRId32" bytes)",
2714+
rd_atomic32_get(&rkbuf->rkbuf_msgq.rkmq_msg_cnt),
2715+
MessageSetSize);
28102716

2811-
rd_kafka_msgq_deq(&rktp->rktp_xmit_msgq, rkm, 1);
2812-
rd_kafka_msgq_enq(&rkbuf->rkbuf_msgq, rkm);
2813-
2814-
rd_avg_add(&rkb->rkb_avg_int_latency, now - rkm->rkm_ts_enq);
2815-
2816-
if (unlikely(msgcnt == 0 && MsgVersion == 1))
2817-
timestamp_firstmsg = rkm->rkm_timestamp;
2818-
2819-
/* Write message to buffer */
2820-
rd_kafka_assert(rkb->rkb_rk, rkm->rkm_len < INT32_MAX);
2821-
rd_kafka_buf_write_Message(rkb, rkbuf,
2822-
use_relative_offsets ? msgcnt : 0,
2823-
MsgVersion,
2824-
RD_KAFKA_COMPRESSION_NONE,
2825-
rkm->rkm_timestamp,
2826-
rkm->rkm_key, (int32_t)rkm->rkm_key_len,
2827-
rkm->rkm_payload,
2828-
(int32_t)rkm->rkm_len,
2829-
&outlen);
2830-
2831-
msgcnt++;
2832-
MessageSetSize += outlen;
2833-
rd_dassert(outlen <= rd_kafka_msg_wire_size(rkm));
2834-
}
2717+
if (!rkt->rkt_conf.required_acks)
2718+
rkbuf->rkbuf_flags |= RD_KAFKA_OP_F_NO_RESPONSE;
28352719

2836-
/* No messages added, bail out early. */
2837-
if (unlikely(rd_atomic32_get(&rkbuf->rkbuf_msgq.rkmq_msg_cnt) == 0)) {
2838-
rd_kafka_buf_destroy(rkbuf);
2839-
return -1;
2840-
}
2841-
2842-
/* Push final (copied) message, if any. */
2843-
rd_kafka_buf_autopush(rkbuf);
2844-
2845-
/* Compress the message(s) */
2846-
if (rktp->rktp_rkt->rkt_conf.compression_codec)
2847-
rd_kafka_compress_MessageSet_buf(rkb, rktp, rkbuf,
2848-
(int)iov_firstmsg, (int)of_firstmsg,
2849-
of_init_firstmsg,
2850-
MsgVersion,
2851-
timestamp_firstmsg,
2852-
&MessageSetSize);
2853-
2854-
/* Update MessageSetSize */
2855-
rd_kafka_buf_update_i32(rkbuf, of_MessageSetSize, MessageSetSize);
2856-
2857-
rd_atomic64_add(&rktp->rktp_c.tx_msgs,
2858-
rd_atomic32_get(&rkbuf->rkbuf_msgq.rkmq_msg_cnt));
2859-
rd_atomic64_add(&rktp->rktp_c.tx_bytes, MessageSetSize);
2860-
2861-
rd_rkb_dbg(rkb, MSG, "PRODUCE",
2862-
"produce messageset with %i messages "
2863-
"(%"PRId32" bytes)",
2864-
rd_atomic32_get(&rkbuf->rkbuf_msgq.rkmq_msg_cnt),
2865-
MessageSetSize);
2866-
2867-
cnt = rd_atomic32_get(&rkbuf->rkbuf_msgq.rkmq_msg_cnt);
2868-
2869-
if (!rkt->rkt_conf.required_acks)
2870-
rkbuf->rkbuf_flags |= RD_KAFKA_OP_F_NO_RESPONSE;
2871-
2872-
/* Use timeout from first message. */
2873-
rkbuf->rkbuf_ts_timeout =
2874-
TAILQ_FIRST(&rkbuf->rkbuf_msgq.rkmq_msgs)->rkm_ts_timeout;
2875-
2876-
if (rkb->rkb_features & RD_KAFKA_FEATURE_THROTTLETIME)
2877-
rd_kafka_buf_ApiVersion_set(rkbuf, 1,
2878-
RD_KAFKA_FEATURE_THROTTLETIME);
2720+
/* Use timeout from first message. */
2721+
rkbuf->rkbuf_ts_timeout =
2722+
TAILQ_FIRST(&rkbuf->rkbuf_msgq.rkmq_msgs)->rkm_ts_timeout;
28792723

28802724
rd_kafka_broker_buf_enq_replyq(rkb, rkbuf,
28812725
RD_KAFKA_NO_REPLYQ,
@@ -2884,7 +2728,7 @@ static int rd_kafka_broker_produce_toppar (rd_kafka_broker_t *rkb,
28842728
rd_kafka_toppar_keep(rktp));
28852729

28862730

2887-
return cnt;
2731+
return cnt;
28882732
}
28892733

28902734

src/rdkafka_buf.h

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -413,6 +413,18 @@ struct rd_kafka_buf_s { /* rd_kafka_buf_t */
413413
};
414414

415415

416+
/**
417+
* @brief rkbuf buffer position
418+
* Points to a position state of the buffer, can be used for rewinding
419+
* or seeking.
420+
*/
421+
typedef struct rd_kafka_bufpos_s {
422+
rd_kafka_buf_t *rkbufp_rkbuf; /* refcounted back-pointer */
423+
int rkbufp_iov_idx; /* iov index */
424+
size_t rkbufp_of; /* rkbuf_wof or rkbuf_of */
425+
size_t rkbufp_wof_init; /* rkbuf_wof_init */
426+
} rd_kafka_bufpos_t;
427+
416428
typedef struct rd_kafka_bufq_s {
417429
TAILQ_HEAD(, rd_kafka_buf_s) rkbq_bufs;
418430
rd_atomic32_t rkbq_cnt;

src/rdkafka_int.h

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -183,6 +183,15 @@ struct rd_kafka_s {
183183
* <0: Running in High level consumer mode */
184184
rd_atomic32_t rk_simple_cnt;
185185

186+
/**
187+
* Exactly Once Semantics
188+
*/
189+
struct {
190+
rd_kafkap_str_t *rk_TransactionalId;
191+
int64_t rk_PID;
192+
int16_t rk_ProducerEpoch;
193+
} rk_eos;
194+
186195
const rd_kafkap_bytes_t *rk_null_bytes;
187196

188197
struct {

0 commit comments

Comments
 (0)