0.4: kafka.Writer #461
Conversation
I was being blocked by flaky tests, so I added fixes for those, fixed the CI config so it would run all tests, and made it run against all Kafka versions from 0.10 to 2.3.
```diff
  Topic:       topic,
- MaxAttempts: 1,
+ MaxAttempts: 3,
```
It would be nice to have a bit more thorough testing around how many attempts were actually made, and that writes actually succeed on subsequent attempts if the temporary issue resolves. In the old version, I had started going down the route of putting the `write` method (now `produce`) in an interface so that test code could substitute it with a dumb implementation which counts attempt numbers. Do you have any thoughts on that?
also, I think this test will fail if I happen to have a kafka server running on port 9999 😄
```diff
@@ -25,7 +46,7 @@ type ProduceRequest struct {
 	Partition int

 	// The level of required acknowledgements to ask the kafka broker for.
-	RequiredAcks int
+	RequiredAcks RequiredAcks
```
Interesting. I didn't even know these were codes for none/one/all; I thought I was specifying exactly how many acks I wanted.
- add protocol package
- add some documentation
- fix
- make ByteSequence more generic + add more benchmarks
- WIP: add support for record batches
- finish support for record batches
- add support for record set compression
- backward-compatible compression codec imports
- fix compress tests
- make it possible for the transport to connect to multiple clusters + enhance kafka.Client to expose methods for creating and deleting topics
- support responding to metadata requests with cached response
- manage proper shutdown of client transport in tests
- WIP: test Produce API
- WIP: massive cleanup + track down CRC32 validation issue
- functional Produce and Fetch implementations
- add metadata request/response
- add listoffsets API
- expose cluster id and controller in metadata response
- remove bufio.Writer from the protocol API
- remove bufio.Reader from the protocol API
- add back deprecated Client methods
- fixes for kafka 0.10
- cleanup comment in protocol/record.go
- add more comments
- reduce size of bufio.Reader buffer on kafka connections
- refactor transport internals to support splitting requests and dispatching them across multiple brokers
- avoid contention on connection pool mutex in most cases
- cleanup
- add kafka.(*Client).MultiFetch API
- close records in produce request
- refactor record batch APIs to fully support streaming
- remove io.Closer from protocol.RecordBatch
- never return nil record batches
- record batch fixes
- remove unused variable
- fix reading of multiple topic partitions in produce and fetch messages
- alias compress.Compression in the kafka package
- expose compression constants in the kafka package
- exposes kafka.Request and kafka.Response interfaces
- simplify the protocol.Bytes interface
- simplify error management in protocol package
- wait for topic creation to propagate + fix request dispatching in multi-broker clusters
- simplify kafka.(*Client).CreateTopics API
- improve error handling + wait for metadata propagation after topic creation
- revisit connection pool implementation to remove multiplexing
- fix panic when referencing truncated page buffer
- fix unexpected EOF errors reading kafka messages
- revisit record reader API
- fix panic type asserting nil response into *metadata.Response
- optimize allocation of broker ids in cluster metadata
- unify sync.Pool usage
- reduce memory footprint of protocol.(*RecordSet).readFromVersion2
- fix panic accessing optimized record reader with a nil headers slice
- add APIs for marshaling and unmarshaling kafka values
- [skip ci] fix README example
- investigate-multi-fetch-issues
- remove MultiFetch API
- simplify protocol tests
- add benchmarks for kafka.Marshal and kafka.Unmarshal
- fix crash on cluster layout changes
- add more error codes
- remove partial support for flexible message format
- downgrade metadata test from v9 to v8
- test against kafka 2.5.0
- Update offsetfetch.go (Co-authored-by: Jeremy Jackins <[email protected]>)
- Update offsetfetch.go (Co-authored-by: Jeremy Jackins <[email protected]>)
- Update offsetfetch.go (Co-authored-by: Jeremy Jackins <[email protected]>)
- fix typos
- fix more typos
- set pprof labels on transport goroutines (#458)
- change tests to run against 2.4.1 instead of 2.5.0
- support up to 2.3.1 (TestConn/nettest/PingPong fails with 2.4 and above)
- Update README.md (Co-authored-by: Steve van Loben Sels <[email protected]>)
- Update client.go (Co-authored-by: Steve van Loben Sels <[email protected]>)
- comment on why we divide the timeout by 2
- protocol.Reducer => protocol.Merger
- cleanup docker-compose.yml
- protocol.Mapper => protocol.Splitter
- propagate the caller's context to the dial function (#460)
- fix backward compatibility with kafka-go v0.3.x
- fix record offsets when fetching messages with version 1
- default record timestamps to current timestamp
- revert changes to docker-compose.yml
- fix tests
- fix tests (2)
- 0.4: kafka.Writer (#461)
- 0.4: kafka.Writer
- update README
- disable some parallel tests
- disable global parallelism in tests
- fix typo
- disable parallelism in sub-packages tests
- properly seed random sources + delete test topics
- cleanup build
- run all tests
- fix tests
- enable more SASL mechanisms on CI
- try to fix the CI config
- try testing the sasl package with 2.3.1 only
- inline configuration for kafka 2.3.1 in CI
- fix zookeeper hostname in CI
- cleanup CI config
- keep the kafka 0.10 configuration separate + test against more kafka versions
- fix kafka 0.11 image tag
- try caching dependencies
- support multiple broker addresses
- uncomment max attempt test
- fix typos
- guard against empty kafka.MultiAddr in kafka.Transport
- don't export new APIs for network addresses + adapt to any multi-addr implementation
- add comment about the transport caching the metadata responses
- 0.4 fix tls address panic (#478)
- 0.4: fix panic when TLS is enabled
- 0.4: fix panic when establishing TLS connections
- cleanup
- Update transport_test.go (Co-authored-by: Steve van Loben Sels <[email protected]>)
- validate that an error is returned (Co-authored-by: Steve van Loben Sels <[email protected]>)
- 0.4: fix short writes (#479)
- 0.4: modify protocol.Bytes to expose the number of remaining bytes instead of the full size of the sequence (#485)
- modify protocol.Bytes to expose the number of remaining bytes instead of the full size of the sequence
- add test for pageRef.ReadByte + fix pageRef.scan
- reuse contiguousPages.scan
- fix(writer): set correct balancer (#489). Sets the correct balancer as passed through in the config on the writer. (Co-authored-by: Steve van Loben Sels <[email protected]>, Artur <[email protected]>)
- Fix for panic when RequiredAcks is set to RequireNone (#504)
- Fix panic in async wait() method when RequiredAcks is None. When RequiredAcks is None, the producer does not wait for a response from the broker, therefore the response is nil. The async wait() method was not handling this case, leading to a panic.
- Add regression test for RequiredAcks == RequireNone. This new test is required because all the other Writer tests use NewWriter() to create Writers, which sets RequiredAcks to RequireAll when 0 (None) was specified.
- fix: writer test for RequiredAcks=None
- fix: writer tests for RequiredAcks=None (2)
- 0.4 broker resolver (#526)
- 0.4: kafka.BrokerResolver
- add kafka.Transport.Context
- inline network and address fields in conn type
- Fix sasl authentication on writer (#541). The authenticateSASL was called before getting the api version. This resulted in an incorrect apiversion (0 instead of 1) when calling the saslHandshakeRoundTrip request.
- Remove deprecated function (NewWriter) usages (#528)
- fix zstd decoder leak (#543)
- fix zstd decoder leak
- fix tests
- fix panic
- fix tests (2)
- fix tests (3)
- fix tests (4)
- move ConnWaitGroup to testing package
- fix zstd codec
- Update compress/zstd/zstd.go (Co-authored-by: Nicholas Sun <[email protected]>)
- PR feedback (Co-authored-by: Nicholas Sun <[email protected]>)
- improve custom resolver support by allowing port to be overridden (#545)
- 0.4: reduce memory footprint (#547)
- Bring over flexible message changes
- Add docker-compose config for kafka 2.4.1
- Misc. cleanups
- Add protocol tests and fix issues
- Misc. fixes; run circleci on v2.4.1
- Skip conntest for v2.4.1
- Disable nettests for kafka 2.4.1
- Revert formatting changes
- Misc. fixes
- Update comments
- Make create topics test more interesting
- feat(writer): add support for writing messages to multiple topics (#561)
- Add comments on failing nettests
- Fix spacing
- Update var int sizing
- Simplify writeVarInt implementation
- Revert encoding change
- Simplify varint encoding functions and expand tests
- Also test sizeOf functions in protocol test
- chore: merge master and resolve conflicts (#570)

Co-authored-by: Jeremy Jackins <[email protected]>
Co-authored-by: Steve van Loben Sels <[email protected]>
Co-authored-by: Artur <[email protected]>
Co-authored-by: Neil Cook <[email protected]>
Co-authored-by: Ahmy Yulrizka <[email protected]>
Co-authored-by: Turfa Auliarachman <[email protected]>
Co-authored-by: Nicholas Sun <[email protected]>
Co-authored-by: Dominic Barnes <[email protected]>
Co-authored-by: Benjamin Yolken <[email protected]>
Co-authored-by: Benjamin Yolken <[email protected]>
This PR modifies the implementation of the `kafka.Writer` type to adopt the 0.4 APIs and leverage the connection pooling of `kafka.Transport` in the writers. I am also introducing a couple of new APIs to address known flaws of the existing API: richer error reporting from `WriteMessages`, so programs can distinguish which messages have succeeded and which have failed, and a completion callback for writers configured with `Async: true`.
.A major change in the implementation is the removal of all queuing, and with it the need for blocking and synchronizing error reporting. Batches are now aggregated in a shared state instead of using message passing to aggregate batches in each partition writer.
I've also done documentation work, which hopefully is going to help address some of the open issues as well.
Here are a couple of issues that this PR will likely address:
#53
#131
#298
#342
#350
#356
#364
#391
#419
#445
#451
#459
#176
#420
#452