From 62bb4641688969b48be2faa047e7d2a73ce6bdfd Mon Sep 17 00:00:00 2001 From: Evan Thomas Date: Thu, 17 Oct 2019 15:40:21 -0400 Subject: [PATCH 01/17] Export WriterError struct type and associated fields. --- writer.go | 32 ++++++++++++++++---------------- writer_test.go | 4 ++-- 2 files changed, 18 insertions(+), 18 deletions(-) diff --git a/writer.go b/writer.go index c07a0c519..205b1bdf5 100644 --- a/writer.go +++ b/writer.go @@ -349,9 +349,9 @@ func (w *Writer) WriteMessages(ctx context.Context, msgs ...Message) error { select { case e := <-res: if e != nil { - if we, ok := e.(*writerError); ok { + if we, ok := e.(*WriterError); ok { w.stats.retries.observe(1) - retry, err = append(retry, we.msg), we.err + retry, err = append(retry, we.Msg), we.Err } else { err = e } @@ -492,7 +492,7 @@ func (w *Writer) run() { err = fmt.Errorf("failed to find any partitions for topic %s", w.config.Topic) } if wm.res != nil { - wm.res <- &writerError{msg: wm.msg, err: err} + wm.res <- &WriterError{Msg: wm.msg, Err: err} } } @@ -746,7 +746,7 @@ func (w *writer) write(conn *Conn, batch []Message, resch [](chan<- error)) (ret logger.Printf("error dialing kafka brokers for topic %s (partition %d): %s", w.topic, w.partition, err) }) for i, res := range resch { - res <- &writerError{msg: batch[i], err: err} + res <- &WriterError{Msg: batch[i], Err: err} } return } @@ -760,7 +760,7 @@ func (w *writer) write(conn *Conn, batch []Message, resch [](chan<- error)) (ret logger.Printf("error writing messages to %s (partition %d): %s", w.topic, w.partition, err) }) for i, res := range resch { - res <- &writerError{msg: batch[i], err: err} + res <- &WriterError{Msg: batch[i], Err: err} } } else { for _, m := range batch { @@ -784,25 +784,25 @@ type writerMessage struct { res chan<- error } -type writerError struct { - msg Message - err error +type WriterError struct { + Msg Message + Err error } -func (e *writerError) Cause() error { - return e.err +func (e *WriterError) Cause() error { + return e.Err } -func (e *writerError) Error() string { - return e.err.Error() +func (e *WriterError) Error() string { + return e.Err.Error() } -func (e *writerError) Temporary() bool { - return isTemporary(e.err) +func (e *WriterError) Temporary() bool { + return isTemporary(e.Err) } -func (e *writerError) Timeout() bool { - return isTimeout(e.err) +func (e *WriterError) Timeout() bool { + return isTimeout(e.Err) } func shuffledStrings(list []string) []string { diff --git a/writer_test.go b/writer_test.go index fa419b98f..4cc9a4297 100644 --- a/writer_test.go +++ b/writer_test.go @@ -148,8 +148,8 @@ func (f *fakeWriter) messages() chan<- writerMessage { go func() { for { msg := <-ch - msg.res <- &writerError{ - err: errors.New("bad attempt"), + msg.res <- &WriterError{ + Err: errors.New("bad attempt"), } } }() From 046b6b61b3143278725c2849e3ecf6cc343739a7 Mon Sep 17 00:00:00 2001 From: Evan Thomas Date: Thu, 17 Oct 2019 15:41:45 -0400 Subject: [PATCH 02/17] Relocate WriterError so it is grouped with other exported types in writer.go. --- writer.go | 42 +++++++++++++++++++++--------------------- 1 file changed, 21 insertions(+), 21 deletions(-) diff --git a/writer.go b/writer.go index 205b1bdf5..43e4ae23b 100644 --- a/writer.go +++ b/writer.go @@ -133,6 +133,27 @@ type WriterConfig struct { newPartitionWriter func(partition int, config WriterConfig, stats *writerStats) partitionWriter } +type WriterError struct { + Msg Message + Err error +} + +func (e *WriterError) Cause() error { + return e.Err +} + +func (e *WriterError) Error() string { + return e.Err.Error() +} + +func (e *WriterError) Temporary() bool { + return isTemporary(e.Err) +} + +func (e *WriterError) Timeout() bool { + return isTimeout(e.Err) +} + // WriterStats is a data structure returned by a call to Writer.Stats that // exposes details about the behavior of the writer. type WriterStats struct { @@ -784,27 +805,6 @@ type writerMessage struct { res chan<- error } -type WriterError struct { - Msg Message - Err error -} - -func (e *WriterError) Cause() error { - return e.Err -} - -func (e *WriterError) Error() string { - return e.Err.Error() -} - -func (e *WriterError) Temporary() bool { - return isTemporary(e.Err) -} - -func (e *WriterError) Timeout() bool { - return isTimeout(e.Err) -} - func shuffledStrings(list []string) []string { shuffledList := make([]string, len(list)) copy(shuffledList, list) From 46fffc34a2a46e5245a851de5f0709e3fe425c9b Mon Sep 17 00:00:00 2001 From: Evan Thomas Date: Wed, 23 Oct 2019 16:35:51 -0400 Subject: [PATCH 03/17] Fix bug in writer.write where messages were being sent to the incorrect error channel to be retried. --- writer.go | 20 +++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/writer.go b/writer.go index 43e4ae23b..8a6d273af 100644 --- a/writer.go +++ b/writer.go @@ -668,9 +668,7 @@ func (w *writer) run() { // If a lstMsg exists we need to add it to the batch so we don't lose it. if len(lastMsg.msg.Value) != 0 { batch = append(batch, lastMsg.msg) - if lastMsg.res != nil { - resch = append(resch, lastMsg.res) - } + resch = append(resch, lastMsg.res) batchSizeBytes += int(lastMsg.msg.size()) lastMsg = writerMessage{} if !batchTimerRunning { @@ -691,9 +689,7 @@ func (w *writer) run() { break } batch = append(batch, wm.msg) - if wm.res != nil { - resch = append(resch, wm.res) - } + resch = append(resch, wm.res) batchSizeBytes += int(wm.msg.size()) mustFlush = len(batch) >= w.batchSize || batchSizeBytes >= w.maxMessageBytes } @@ -767,7 +763,9 @@ func (w *writer) write(conn *Conn, batch []Message, resch [](chan<- error)) (ret logger.Printf("error dialing kafka brokers for topic %s (partition %d): %s", w.topic, w.partition, err) }) for i, res := range resch { - res <- &WriterError{Msg: batch[i], Err: err} + if res != nil { + res <- &WriterError{Msg: batch[i], Err: err} + } } return } @@ -781,7 +779,9 @@ func (w *writer) write(conn *Conn, batch []Message, resch [](chan<- error)) (ret logger.Printf("error writing messages to %s (partition %d): %s", w.topic, w.partition, err) }) for i, res := range resch { - res <- &WriterError{Msg: batch[i], Err: err} + if res != nil { + res <- &WriterError{Msg: batch[i], Err: err} + } } } else { for _, m := range batch { @@ -789,7 +789,9 @@ func (w *writer) write(conn *Conn, batch []Message, resch [](chan<- error)) (ret w.stats.bytes.observe(int64(len(m.Key) + len(m.Value))) } for _, res := range resch { - res <- nil + if res != nil { + res <- nil + } } } t1 := time.Now() From adc009d16c6072207125010305d99583dcdcd3a3 Mon Sep 17 00:00:00 2001 From: Evan Thomas Date: Wed, 23 Oct 2019 16:50:34 -0400 Subject: [PATCH 04/17] Add id field to writerMessage struct type. Add id field so that a given message can be tracked back to its index when it was originally sent from Writer.WriteMessages. id allows WriteMessages to keep track of which messages have been successfully sent therefore making it easier to do more sophisticated error handling. --- writer.go | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/writer.go b/writer.go index 8a6d273af..a7b6e6e4b 100644 --- a/writer.go +++ b/writer.go @@ -652,6 +652,7 @@ func (w *writer) run() { var done bool var batch = make([]Message, 0, w.batchSize) var resch = make([](chan<- error), 0, w.batchSize) + var ids = make([]int, 0, w.batchSize) var lastMsg writerMessage var batchSizeBytes int var idleConnDeadline time.Time @@ -669,6 +670,7 @@ func (w *writer) run() { if len(lastMsg.msg.Value) != 0 { batch = append(batch, lastMsg.msg) resch = append(resch, lastMsg.res) + ids = append(ids, lastMsg.id) batchSizeBytes += int(lastMsg.msg.size()) lastMsg = writerMessage{} if !batchTimerRunning { @@ -690,6 +692,7 @@ func (w *writer) run() { } batch = append(batch, wm.msg) resch = append(resch, wm.res) + ids = append(ids, wm.id) batchSizeBytes += int(wm.msg.size()) mustFlush = len(batch) >= w.batchSize || batchSizeBytes >= w.maxMessageBytes } @@ -719,7 +722,7 @@ func (w *writer) run() { continue } var err error - if conn, err = w.write(conn, batch, resch); err != nil { + if conn, err = w.write(conn, batch, resch, ids); err != nil { if conn != nil { conn.Close() conn = nil @@ -733,8 +736,13 @@ func (w *writer) run() { for i := range resch { resch[i] = nil } + + for i := range ids { + ids[i] = 0 + } batch = batch[:0] resch = resch[:0] + ids = ids[:0] batchSizeBytes = 0 } } @@ -754,7 +762,7 @@ func (w *writer) dial() (conn *Conn, err error) { return } -func (w *writer) write(conn *Conn, batch []Message, resch [](chan<- error)) (ret *Conn, err error) { +func (w *writer) write(conn *Conn, batch []Message, resch [](chan<- error), _ []int) (ret *Conn, err error) { w.stats.writes.observe(1) if conn == nil { if conn, err = w.dial(); err != nil { @@ -805,6 +813,7 @@ func (w *writer) write(conn *Conn, batch []Message, resch [](chan<- error)) (ret type writerMessage struct { msg Message res chan<- error + id int } func shuffledStrings(list []string) []string { From 7aa550323288ae767006f42958df362dbe75b2dd Mon Sep 17 00:00:00 2001 From: Evan Thomas Date: Thu, 24 Oct 2019 15:51:17 -0400 Subject: [PATCH 05/17] Set id when creating writerMessages struct in writer.WriteMessages. --- writer.go | 1 + 1 file changed, 1 insertion(+) diff --git a/writer.go b/writer.go index a7b6e6e4b..50e0fd368 100644 --- a/writer.go +++ b/writer.go @@ -351,6 +351,7 @@ func (w *Writer) WriteMessages(ctx context.Context, msgs ...Message) error { case w.msgs <- writerMessage{ msg: msg, res: res, + id: i, }: case <-ctx.Done(): w.mutex.RUnlock() From 426ab0165d1bc6969bf37aec0521723cc35eefe9 Mon Sep 17 00:00:00 2001 From: Evan Thomas Date: Thu, 24 Oct 2019 16:02:46 -0400 Subject: [PATCH 06/17] Add writerResponse struct type to wrap write errors with message id. Change res channel type in writerMessage to new writerResponse struct type so ids can be returned back to writer.WriteMessages. Refactor affected receivers. --- writer.go | 52 +++++++++++++++++++++++++++++++++++++++------------- 1 file changed, 39 insertions(+), 13 deletions(-) diff --git a/writer.go b/writer.go index 50e0fd368..c101d36f0 100644 --- a/writer.go +++ b/writer.go @@ -324,9 +324,9 @@ func (w *Writer) WriteMessages(ctx context.Context, msgs ...Message) error { } var err error - var res chan error + var res chan writerResponse if !w.config.Async { - res = make(chan error, len(msgs)) + res = make(chan writerResponse, len(msgs)) } t0 := time.Now() @@ -370,12 +370,12 @@ func (w *Writer) WriteMessages(ctx context.Context, msgs ...Message) error { for i := 0; i != len(msgs); i++ { select { case e := <-res: - if e != nil { - if we, ok := e.(*WriterError); ok { + if e.err != nil { + if we, ok := e.err.(*WriterError); ok { w.stats.retries.observe(1) retry, err = append(retry, we.Msg), we.Err } else { - err = e + err = e.err } } case <-ctx.Done(): @@ -514,7 +514,13 @@ func (w *Writer) run() { err = fmt.Errorf("failed to find any partitions for topic %s", w.config.Topic) } if wm.res != nil { - wm.res <- &WriterError{Msg: wm.msg, Err: err} + wm.res <- writerResponse{ + id: wm.id, + err: &WriterError{ + Msg: wm.msg, + Err: err, + }, + } } } @@ -652,7 +658,7 @@ func (w *writer) run() { var conn *Conn var done bool var batch = make([]Message, 0, w.batchSize) - var resch = make([](chan<- error), 0, w.batchSize) + var resch = make([](chan<- writerResponse), 0, w.batchSize) var ids = make([]int, 0, w.batchSize) var lastMsg writerMessage var batchSizeBytes int @@ -763,7 +769,7 @@ func (w *writer) dial() (conn *Conn, err error) { return } -func (w *writer) write(conn *Conn, batch []Message, resch [](chan<- error), _ []int) (ret *Conn, err error) { +func (w *writer) write(conn *Conn, batch []Message, resch [](chan<- writerResponse), ids []int) (ret *Conn, err error) { w.stats.writes.observe(1) if conn == nil { if conn, err = w.dial(); err != nil { @@ -773,7 +779,13 @@ func (w *writer) write(conn *Conn, batch []Message, resch [](chan<- error), _ [] }) for i, res := range resch { if res != nil { - res <- &WriterError{Msg: batch[i], Err: err} + res <- writerResponse{ + id: ids[i], + err: &WriterError{ + Msg: batch[i], + Err: err, + }, + } } } return @@ -789,7 +801,13 @@ func (w *writer) write(conn *Conn, batch []Message, resch [](chan<- error), _ [] }) for i, res := range resch { if res != nil { - res <- &WriterError{Msg: batch[i], Err: err} + res <- writerResponse{ + id: ids[i], + err: &WriterError{ + Msg: batch[i], + Err: err, + }, + } } } } else { @@ -797,9 +815,12 @@ func (w *writer) write(conn *Conn, batch []Message, resch [](chan<- error), _ [] w.stats.messages.observe(1) w.stats.bytes.observe(int64(len(m.Key) + len(m.Value))) } - for _, res := range resch { + for i, res := range resch { if res != nil { - res <- nil + res <- writerResponse{ + id: i, + err: nil, + } } } } @@ -813,8 +834,13 @@ func (w *writer) write(conn *Conn, batch []Message, resch [](chan<- error), _ [] type writerMessage struct { msg Message - res chan<- error + res chan<- writerResponse + id int +} + +type writerResponse struct { id int + err error } func shuffledStrings(list []string) []string { From d6aaf6030c48a8cac721415b51e3449af5f57eb3 Mon Sep 17 00:00:00 2001 From: Evan Thomas Date: Mon, 28 Oct 2019 18:15:52 -0400 Subject: [PATCH 07/17] Return writerResponse in fakeWriter.messages() in writer_test. --- writer_test.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/writer_test.go b/writer_test.go index 4cc9a4297..ee18dbd0c 100644 --- a/writer_test.go +++ b/writer_test.go @@ -148,8 +148,12 @@ func (f *fakeWriter) messages() chan<- writerMessage { go func() { for { msg := <-ch - msg.res <- &WriterError{ - Err: errors.New("bad attempt"), + msg.res <- writerResponse{ + id: msg.id, + err: &WriterError{ + Err: errors.New("bad attempt"), + Msg: msg.msg, + }, } } }() From 0891d299ecd8f0e6d34be82ee4408cac77f816d6 Mon Sep 17 00:00:00 2001 From: Evan Thomas Date: Wed, 6 Nov 2019 09:51:09 -0500 Subject: [PATCH 08/17] Add test for kafka 2.2.1 compatibility to circle ci config. --- .circleci/config.yml | 32 ++++++++++++++++++++++++++++++++ 1 file changed, 32 insertions(+) diff --git a/.circleci/config.yml b/.circleci/config.yml index 5ec8e2bbd..1566a5faf 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -123,6 +123,37 @@ jobs: - run: go get -v -t . ./gzip ./lz4 ./sasl ./snappy - run: go test -v -race -cover -timeout 150s $(go list ./... | grep -v examples) + kafka-221: + working_directory: /go/src/github.com/segmentio/kafka-go + environment: + KAFKA_VERSION: "2.2.1" + docker: + - image: circleci/golang + - image: wurstmeister/zookeeper + ports: ['2181:2181'] + - image: wurstmeister/kafka:2.12-2.2.1 + ports: ['9092:9092','9093:9093'] + environment: + KAFKA_BROKER_ID: '1' + KAFKA_CREATE_TOPICS: 'test-writer-0:3:1,test-writer-1:3:1' + KAFKA_DELETE_TOPIC_ENABLE: 'true' + KAFKA_ADVERTISED_HOST_NAME: 'localhost' + KAFKA_ADVERTISED_PORT: '9092' + KAFKA_ZOOKEEPER_CONNECT: 'localhost:2181' + KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true' + KAFKA_LISTENERS: 'PLAINTEXT://:9092,SASL_PLAINTEXT://:9093' + KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://localhost:9092,SASL_PLAINTEXT://localhost:9093' + KAFKA_SASL_ENABLED_MECHANISMS: SCRAM-SHA-256,SCRAM-SHA-512,PLAIN + KAFKA_OPTS: "-Djava.security.auth.login.config=/opt/kafka/config/kafka_server_jaas.conf" + CUSTOM_INIT_SCRIPT: |- + echo -e 'KafkaServer {\norg.apache.kafka.common.security.scram.ScramLoginModule required\n username="adminscram"\n password="admin-secret";\n org.apache.kafka.common.security.plain.PlainLoginModule required\n username="adminplain"\n password="admin-secret"\n user_adminplain="admin-secret";\n };' > /opt/kafka/config/kafka_server_jaas.conf; + /opt/kafka/bin/kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'SCRAM-SHA-256=[password=admin-secret-256],SCRAM-SHA-512=[password=admin-secret-512]' --entity-type users --entity-name adminscram + steps: + - checkout + - setup_remote_docker: { reusable: true, docker_layer_caching: true } + - run: go get -v -t . ./gzip ./lz4 ./sasl ./snappy + - run: go test -v -race -cover -timeout 150s $(go list ./... | grep -v examples) + workflows: version: 2 run: @@ -131,3 +162,4 @@ workflows: - kafka-011 - kafka-111 - kafka-210 + - kafka-221 \ No newline at end of file From 0bd6dc0059b0dde98bf2a4cdb106e3defae2456b Mon Sep 17 00:00:00 2001 From: Evan Thomas Date: Wed, 5 Feb 2020 13:39:52 -0500 Subject: [PATCH 09/17] Remove unused WriterError.Cause method --- writer.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/writer.go b/writer.go index c101d36f0..848e44066 100644 --- a/writer.go +++ b/writer.go @@ -138,10 +138,6 @@ type WriterError struct { Err error } -func (e *WriterError) Cause() error { - return e.Err -} - func (e *WriterError) Error() string { return e.Err.Error() } From 4963dac51f45863d9a32464ffc15a84ec1889911 Mon Sep 17 00:00:00 2001 From: Evan Thomas Date: Wed, 5 Feb 2020 13:44:29 -0500 Subject: [PATCH 10/17] Add WriterErrors type to wrap a slice of WriterError Implement WriterErrors type which implements the error interface to allow for the returning of multiple WriterErrors that may occur during a write. Provide basic Error() method which takes inspiration from https://github.com/hashicorp/go-multierror/blob/72917a1559e17f38638ade54020ab372ba848d67/format.go#L14 so the slice of WriterError is formatted in a nice fashion. --- writer.go | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/writer.go b/writer.go index 848e44066..431b2fd16 100644 --- a/writer.go +++ b/writer.go @@ -7,6 +7,7 @@ import ( "io" "math/rand" "sort" + "strings" "sync" "time" ) @@ -150,6 +151,24 @@ func (e *WriterError) Timeout() bool { return isTimeout(e.Err) } +type WriterErrors []WriterError + +func (es WriterErrors) Error() string { + if len(es) == 1 { + return fmt.Sprintf("1 WriterError occurred:\n\t* %s\n", es[0].Err) + } + + points := make([]string, len(es)) + for i, we := range es { + points[i] = fmt.Sprintf("* %s", we.Err) + } + + return fmt.Sprintf( + "%d WriterErrors occurred:\n\t%s\n", + len(es), + strings.Join(points, "\n\t")) +} + // WriterStats is a data structure returned by a call to Writer.Stats that // exposes details about the behavior of the writer. type WriterStats struct { From 67b360e535f4d3121f04aecb510fcec8f2c77313 Mon Sep 17 00:00:00 2001 From: Evan Thomas Date: Tue, 11 Feb 2020 12:01:25 -0500 Subject: [PATCH 11/17] Change err type in writerResponse to *WriterError error is a bit too general for dependent code. All err values are WriterError anyway so make it the more specific type. --- writer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/writer.go b/writer.go index 431b2fd16..ab2074098 100644 --- a/writer.go +++ b/writer.go @@ -855,7 +855,7 @@ type writerMessage struct { type writerResponse struct { id int - err error + err *WriterError } func shuffledStrings(list []string) []string { From d5ef30b4ae0b674e47d23aa995a93b7324bf8b30 Mon Sep 17 00:00:00 2001 From: Evan Thomas Date: Tue, 11 Feb 2020 12:09:42 -0500 Subject: [PATCH 12/17] Return multiple errors from Writer.WriteMessages and try entire batch before returning MessageTooLarge errors Utilize WriterErrors and WriterError types to return all errors that occur when sending a batch of messages using Writer.WriteMessages while still preserving error return type. Try to send all eligible messages in msgs before returning any MessageTooLarge errors. --- writer.go | 114 ++++++++++++++++++++++++++++++++++++------------------ 1 file changed, 76 insertions(+), 38 deletions(-) diff --git a/writer.go b/writer.go index ab2074098..2099b36cd 100644 --- a/writer.go +++ b/writer.go @@ -338,7 +338,8 @@ func (w *Writer) WriteMessages(ctx context.Context, msgs ...Message) error { return nil } - var err error + errs := make(WriterErrors, 0, len(msgs)) + tooLarge := 0 var res chan writerResponse if !w.config.Async { res = make(chan writerResponse, len(msgs)) @@ -346,83 +347,120 @@ func (w *Writer) WriteMessages(ctx context.Context, msgs ...Message) error { t0 := time.Now() for attempt := 0; attempt < w.config.MaxAttempts; attempt++ { + handled := make(map[int]bool, len(msgs)) w.mutex.RLock() if w.closed { w.mutex.RUnlock() - return io.ErrClosedPipe + for _, m := range msgs { + errs = append(errs, WriterError{ + Msg: m, + Err: io.ErrClosedPipe, + }) + } + return errs } for i, msg := range msgs { if int(msg.size()) > w.config.BatchBytes { - err := MessageTooLargeError{ - Message: msg, - Remaining: msgs[i+1:], + errs = append(errs, WriterError{ + Msg: msg, + Err: MessageTooLargeError{ + Message: msg, + }, + }) + tooLarge += 1 + handled[i] = true + } else { + select { + case w.msgs <- writerMessage{ + msg: msg, + res: res, + id: i, + }: + case <-ctx.Done(): + w.mutex.RUnlock() + // treat all messages as failed + for j, m := range msgs { + // don't double count MessageTooLargeErrors which may already be present in errs + if _, ok := handled[j]; !ok { + errs = append(errs, WriterError{ + Msg: m, + Err: ctx.Err(), + }) + } + } + return errs } - w.mutex.RUnlock() - return err - } - select { - case w.msgs <- writerMessage{ - msg: msg, - res: res, - id: i, - }: - case <-ctx.Done(): - w.mutex.RUnlock() - return ctx.Err() } } - w.mutex.RUnlock() if w.config.Async { break } - var retry []Message - - for i := 0; i != len(msgs); i++ { + sent := len(msgs) - len(handled) + for i := 0; i != sent; i++ { select { - case e := <-res: - if e.err != nil { - if we, ok := e.err.(*WriterError); ok { - w.stats.retries.observe(1) - retry, err = append(retry, we.Msg), we.Err - } else { - err = e.err - } + case r := <-res: + handled[r.id] = true + if r.err != nil { + w.stats.retries.observe(1) + errs = append(errs, *r.err) } case <-ctx.Done(): - return ctx.Err() + // all unacked msgs become errors + for x := range msgs { + if _, ok := handled[x]; !ok { + errs = append(errs, WriterError{ + Msg: msgs[x], + Err: ctx.Err(), + }) + } + } + return errs } } - if msgs = retry; len(msgs) == 0 { + retryable := len(errs) - tooLarge + if retryable == 0 { break } timer := time.NewTimer(backoff(attempt+1, 100*time.Millisecond, 1*time.Second)) + var override error select { case <-timer.C: - // Only clear the error (so we retry the loop) if we have more retries, otherwise - // we risk silencing the error. + // Only clear the failures (so we retry the loop) if we have more retries if attempt < w.config.MaxAttempts-1 { - err = nil + // populate msgs for a retry + msgs = msgs[:0] + for i := tooLarge; i < len(errs); i++ { + msgs = append(msgs, errs[i].Msg) + } + errs = errs[:tooLarge] } case <-ctx.Done(): - err = ctx.Err() + override = ctx.Err() case <-w.done: - err = io.ErrClosedPipe + override = io.ErrClosedPipe } timer.Stop() - if err != nil { + if override != nil { + for i := tooLarge; i < len(errs); i++ { + errs[i].Err = override + } break } } w.stats.writeTime.observeDuration(time.Since(t0)) - return err + + if len(errs) > 0 { + return errs + } + return nil } // Stats returns a snapshot of the writer stats since the last time the method From 427f1edcc87d36e16ea1ac8dbac230a33b9dcb8d Mon Sep 17 00:00:00 2001 From: Evan Thomas Date: Tue, 11 Feb 2020 12:15:05 -0500 Subject: [PATCH 13/17] Rename receiver in WriterErrors.Error method --- writer.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/writer.go b/writer.go index 2099b36cd..b3b8c0e9a 100644 --- a/writer.go +++ b/writer.go @@ -153,19 +153,19 @@ func (e *WriterError) Timeout() bool { type WriterErrors []WriterError -func (es WriterErrors) Error() string { - if len(es) == 1 { - return fmt.Sprintf("1 WriterError occurred:\n\t* %s\n", es[0].Err) +func (wes WriterErrors) Error() string { + if len(wes) == 1 { + return fmt.Sprintf("1 WriterError occurred:\n\t* %s\n", wes[0].Err) } - points := make([]string, len(es)) - for i, we := range es { + points := make([]string, len(wes)) + for i, we := range wes { points[i] = fmt.Sprintf("* %s", we.Err) } return fmt.Sprintf( "%d WriterErrors occurred:\n\t%s\n", - len(es), + len(wes), strings.Join(points, "\n\t")) } From 4a268bb9fbdc4f7017b24b4c9d4b66870ee08405 Mon Sep 17 00:00:00 2001 From: Evan Thomas Date: Tue, 11 Feb 2020 12:18:13 -0500 Subject: [PATCH 14/17] Add writerTestCase type which provides common methods for Writer tests --- writer_test.go | 63 ++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 63 insertions(+) diff --git a/writer_test.go b/writer_test.go index ee18dbd0c..54bd8f548 100644 --- a/writer_test.go +++ b/writer_test.go @@ -58,6 +58,69 @@ func TestWriter(t *testing.T) { } } +type writerTestCase WriterErrors + +func (wt writerTestCase) errorsEqual(wes WriterErrors) bool { + exp := make(map[string]int) + numExp := 0 + for _, t := range wt { + if t.Err != nil { + numExp += 1 + k := string(t.Msg.Value) + t.Err.Error() + if _, ok := exp[k]; ok { + exp[k] += 1 + } else { + exp[k] = 1 + } + } + } + + if len(wes) != numExp { + return false + } + + for _, e := range wes { + k := string(e.Msg.Value) + e.Err.Error() + if _, ok := exp[k]; ok { + exp[k] -= 1 + } else { + return false + } + } + + for _, e := range exp { + if e != 0 { + return false + } + } + + return true +} + +func (wt writerTestCase) msgs() []Message { + msgs := make([]Message, len(wt)) + for i, m := range wt { + msgs[i] = m.Msg + } + + return msgs +} + +func (wt writerTestCase) expected() WriterErrors { + exp := make(WriterErrors, 0, len(wt)) + for _, v := range wt { + if v.Err != nil { + exp = append(exp, v) + } + } + + if len(exp) > 0 { + return exp + } + + return nil +} + func newTestWriter(config WriterConfig) *Writer { if len(config.Brokers) == 0 { config.Brokers = []string{"localhost:9092"} From 407ea250b5d3061296caa23d8026dbab826b0901 Mon Sep 17 00:00:00 2001 From: Evan Thomas Date: Tue, 11 Feb 2020 12:24:35 -0500 Subject: [PATCH 15/17] Update tests for Writer type --- writer_test.go | 373 ++++++++++++++++++++++++++++++++++++++++++------- 1 file changed, 320 insertions(+), 53 deletions(-) diff --git a/writer_test.go b/writer_test.go index 54bd8f548..d9ba2b0e4 100644 --- a/writer_test.go +++ b/writer_test.go @@ -5,7 +5,6 @@ import ( "errors" "io" "math" - "strings" "testing" "time" ) @@ -21,12 +20,22 @@ func TestWriter(t *testing.T) { scenario: "closing a writer right after creating it returns promptly with no error", function: testWriterClose, }, - + { + scenario: "writing messages on closed writer should return error", + function: testClosedWriterErr, + }, + { + scenario: "writing empty Message slice returns promptly with no error", + function: testEmptyWrite, + }, + { + scenario: "writing messages after context is done should return an error", + function: testContextDoneErr, + }, { scenario: "writing 1 message through a writer using round-robin balancing produces 1 message to the first partition", function: testWriterRoundRobin1, }, - { scenario: "running out of max attempts should return an error", function: testWriterMaxAttemptsErr, @@ -47,6 +56,10 @@ func TestWriter(t *testing.T) { scenario: "writing messsages with a small batch byte size", function: testWriterSmallBatchBytes, }, + { + scenario: "writing messages with retries enabled", + function: testWriterRetries, + }, } for _, test := range tests { @@ -141,6 +154,120 @@ func testWriterClose(t *testing.T) { } } +func testClosedWriterErr(t *testing.T) { + tcs := []writerTestCase{ + { + { + Msg: Message{Value: []byte("Hello World!")}, + Err: io.ErrClosedPipe, + }, + }, + { + { + Msg: Message{Value: []byte("Hello")}, + Err: io.ErrClosedPipe, + }, + { + Msg: Message{Value: []byte("World!")}, + Err: io.ErrClosedPipe, + }, + }, + } + + const topic = "test-writer-0" + w := newTestWriter(WriterConfig{ + Topic: topic, + }) + + if err := w.Close(); err != nil { + t.Fatal(err) + } + + for i, tc := range tcs { + err := w.WriteMessages(context.Background(), tc.msgs()...) + if err == nil { + t.Errorf("test %d: expected error", i) + continue + } + + wes, ok := err.(WriterErrors) + if !ok { + t.Errorf("test %d: expected WriterErrors", i) + continue + } + + if !tc.errorsEqual(wes) { + t.Errorf("test %d: unexpected errors occurred.\nExpected:\n%sFound:\n%s", i, tc.expected(), wes) + } + } +} + +func testEmptyWrite(t *testing.T) { + const topic = "test-writer-0" + w := newTestWriter(WriterConfig{ + Topic: topic, + }) + + defer func() { + _ = w.Close() + }() + + if err := w.WriteMessages(context.Background(), []Message{}...); err != nil { + t.Error("unexpected error occurred", err) + } +} + +func testContextDoneErr(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + cancel() + + tcs := []writerTestCase{ + { + { + Msg: Message{Value: []byte("Hello World!")}, + Err: ctx.Err(), + }, + }, + { + { + Msg: Message{Value: []byte("Hello")}, + Err: ctx.Err(), + }, + { + Msg: Message{Value: []byte("World")}, + Err: ctx.Err(), + }, + }, + } + + const topic = "test-writer-0" + w := newTestWriter(WriterConfig{ + Topic: topic, + }) + + defer func() { + _ = w.Close() + }() + + for i, tc := range tcs { + err := w.WriteMessages(ctx, tc.msgs()...) + if err == nil { + t.Errorf("test %d: expected error", i) + continue + } + + wes, ok := err.(WriterErrors) + if !ok { + t.Errorf("test %d: expected WriterErrors", i) + continue + } + + if !tc.errorsEqual(wes) { + t.Errorf("test %d: unexpected errors occurred.\nExpected:\n%sFound:\n%s", i, tc.expected(), wes) + } + } +} + func testWriterRoundRobin1(t *testing.T) { const topic = "test-writer-1" @@ -203,9 +330,9 @@ func TestValidateWriter(t *testing.T) { } } -type fakeWriter struct{} +type errorWriter struct{} -func (f *fakeWriter) messages() chan<- writerMessage { +func (w *errorWriter) messages() chan<- writerMessage { ch := make(chan writerMessage, 1) go func() { @@ -224,46 +351,99 @@ func (f *fakeWriter) messages() chan<- writerMessage { return ch } -func (f *fakeWriter) close() { +func (w *errorWriter) close() { } func testWriterMaxAttemptsErr(t *testing.T) { + tcs := []writerTestCase{ + { + { + Msg: Message{Value: []byte("test 1 error")}, + Err: errors.New("bad attempt"), + }, + }, + { + { + Msg: Message{Value: []byte("test multi error")}, + Err: errors.New("bad attempt"), + }, + { + Msg: Message{Value: []byte("test multi error")}, + Err: errors.New("bad attempt"), + }, + }, + } + const topic = "test-writer-2" createTopic(t, topic, 1) w := newTestWriter(WriterConfig{ Topic: topic, - MaxAttempts: 1, + MaxAttempts: 2, Balancer: &RoundRobin{}, - newPartitionWriter: func(p int, config WriterConfig, stats *writerStats) partitionWriter { - return &fakeWriter{} + newPartitionWriter: func(_ int, _ WriterConfig, _ *writerStats) partitionWriter { + return &errorWriter{} }, }) - defer w.Close() + defer func() { + _ = w.Close() + }() - if err := w.WriteMessages(context.Background(), Message{ - Value: []byte("Hello World!"), - }); err == nil { - t.Error("expected error") - return - } else if err != nil { - if !strings.Contains(err.Error(), "bad attempt") { - t.Errorf("unexpected error: %s", err) - return + for i, tc := range tcs { + err := w.WriteMessages(context.Background(), tc.msgs()...) + if err == nil { + t.Errorf("test %d: expected error", i) + continue + } + + wes, ok := err.(WriterErrors) + if !ok { + t.Errorf("test %d: expected WriterErrors", i) + continue + } + + if !tc.errorsEqual(wes) { + t.Errorf("test %d: unexpected errors occurred.\nExpected:\n%sFound:\n%s", i, tc.expected(), wes) } } } func testWriterMaxBytes(t *testing.T) { - topic := makeTopic() + tcs := []writerTestCase{ + { + { + Msg: Message{Value: []byte("Hello World!")}, + Err: MessageTooLargeError{}, + }, + { + Msg: Message{Value: []byte("Hi")}, + Err: nil, + }, + }, + { + { + Msg: Message{Value: []byte("Too large!")}, + Err: MessageTooLargeError{}, + }, + { + Msg: Message{Value: []byte("Also too long!")}, + Err: MessageTooLargeError{}, + }, + }, + } + topic := makeTopic() + maxBytes := 25 createTopic(t, topic, 1) w := newTestWriter(WriterConfig{ Topic: topic, - BatchBytes: 25, + BatchBytes: maxBytes, }) - defer w.Close() + + defer func() { + _ = w.Close() + }() if err := w.WriteMessages(context.Background(), Message{ Value: []byte("Hi"), @@ -272,37 +452,21 @@ func testWriterMaxBytes(t *testing.T) { return } - firstMsg := []byte("Hello World!") - secondMsg := []byte("LeftOver!") - msgs := []Message{ - { - Value: firstMsg, - }, - { - Value: secondMsg, - }, - } - if err := w.WriteMessages(context.Background(), msgs...); err == nil { - t.Error("expected error") - return - } else if err != nil { - switch e := err.(type) { - case MessageTooLargeError: - if string(e.Message.Value) != string(firstMsg) { - t.Errorf("unxpected returned message. Expected: %s, Got %s", firstMsg, e.Message.Value) - return - } - if len(e.Remaining) != 1 { - t.Error("expected remaining errors; found none") - return - } - if string(e.Remaining[0].Value) != string(secondMsg) { - t.Errorf("unxpected returned message. Expected: %s, Got %s", secondMsg, e.Message.Value) - return - } - default: - t.Errorf("unexpected error: %s", err) - return + for i, tc := range tcs { + err := w.WriteMessages(context.Background(), tc.msgs()...) + if err == nil { + t.Errorf("test %d: expected error", i) + continue + } + + wes, ok := err.(WriterErrors) + if !ok { + t.Errorf("test %d: expected WriterErrors", i) + continue + } + + if !tc.errorsEqual(wes) { + t.Errorf("test %d: unexpected errors occurred.\nExpected:\n%sFound:\n%s", i, tc.expected(), wes) } } } @@ -498,3 +662,106 @@ func testWriterSmallBatchBytes(t *testing.T) { t.Error("bad messages in partition", msgs) } } + +type testRetryWriter struct { + errs int +} + +func (w *testRetryWriter) messages() chan<- writerMessage { + ch := make(chan writerMessage, 1) + + go func() { + for { + msg := <-ch + if w.errs > 0 { + msg.res <- writerResponse{ + id: msg.id, + err: &WriterError{ + Msg: msg.msg, + Err: errors.New("bad attempt"), + }, + } + w.errs -= 1 + } else { + msg.res <- writerResponse{ + id: msg.id, + err: nil, + } + } + } + }() + + return ch +} + +func (w *testRetryWriter) close() { + +} + +func testWriterRetries(t *testing.T) { + tcs := []writerTestCase{ + { + { + Msg: Message{Value: []byte("test message 1")}, + Err: nil, + }, + { + Msg: Message{Value: []byte("test message 2")}, + Err: nil, + }, + }, + { + { + Msg: Message{Value: []byte("these messages")}, + Err: nil, + }, + { + Msg: Message{Value: []byte("should succeed")}, + Err: nil, + }, + { + Msg: Message{Value: []byte("for this test case")}, + Err: nil, + }, + }, + { + { + Msg: Message{Value: []byte("this message should fail")}, + Err: errors.New("bad attempt"), + }, + }, + } + + const topic = "test-writer-retry" + createTopic(t, topic, 1) + + for i, tc := range tcs { + w := newTestWriter(WriterConfig{ + Topic: topic, + MaxAttempts: 2, + Balancer: &RoundRobin{}, + newPartitionWriter: func(_ int, _ WriterConfig, _ *writerStats) partitionWriter { + return &testRetryWriter{errs: 2} + }, + }) + + err := w.WriteMessages(context.Background(), tc.msgs()...) + if err == nil { + if tc.expected() != nil { + t.Errorf("test %d: expected error", i) + } + } else { + if wes, ok := err.(WriterErrors); !ok { + t.Errorf("test %d: expected WriterErrors", i) + } else { + if !tc.errorsEqual(wes) { + t.Errorf("test %d: unexpected errors occurred.\nExpected:\n%sFound:\n%s", i, tc.expected(), wes) + } + } + } + + if err = w.Close(); err != nil { + t.Fatal(err) + } + } +} From 9e8274a257ff47772e28568e546039254dd79d15 Mon Sep 17 00:00:00 2001 From: Evan Thomas Date: Tue, 11 Feb 2020 15:22:48 -0500 Subject: [PATCH 16/17] Fix race condition by making testRetryWriter type thread safe --- writer_test.go | 49 +++++++++++++++++++++++++++++++------------------ 1 file changed, 31 insertions(+), 18 deletions(-) diff --git a/writer_test.go b/writer_test.go index d9ba2b0e4..15cedc465 100644 --- a/writer_test.go +++ b/writer_test.go @@ -5,6 +5,7 @@ import ( "errors" "io" "math" + "sync" "testing" "time" ) @@ -664,16 +665,30 @@ func testWriterSmallBatchBytes(t *testing.T) { } type testRetryWriter struct { - errs int + ch chan writerMessage + join sync.WaitGroup } func (w *testRetryWriter) messages() chan<- writerMessage { - ch := make(chan writerMessage, 1) + return w.ch +} - go func() { - for { - msg := <-ch - if w.errs > 0 { +func (w *testRetryWriter) close() { + close(w.ch) + w.join.Wait() +} + +func (w *testRetryWriter) run(errs int) { + w.join.Add(1) + defer w.join.Done() + + var done bool + for !done { + msg, ok := <-w.ch + if !ok { + done = true + } else { + if errs > 0 { msg.res <- writerResponse{ id: msg.id, err: &WriterError{ @@ -681,7 +696,7 @@ func (w *testRetryWriter) messages() chan<- writerMessage { Err: errors.New("bad attempt"), }, } - w.errs -= 1 + errs -= 1 } else { msg.res <- writerResponse{ id: msg.id, @@ -689,13 +704,13 @@ func (w *testRetryWriter) messages() chan<- writerMessage { } } } - }() - - return ch + } } -func (w *testRetryWriter) close() { - +func newTestRetryWriter(_ int, _ WriterConfig, _ *writerStats) partitionWriter { + w := &testRetryWriter{ch: make(chan writerMessage, 1)} + go w.run(2) + return w } func testWriterRetries(t *testing.T) { @@ -737,12 +752,10 @@ func testWriterRetries(t *testing.T) { for i, tc := range tcs { w := newTestWriter(WriterConfig{ - Topic: topic, - MaxAttempts: 2, - Balancer: &RoundRobin{}, - newPartitionWriter: func(_ int, _ WriterConfig, _ *writerStats) partitionWriter { - return &testRetryWriter{errs: 2} - }, + Topic: topic, + MaxAttempts: 2, + Balancer: &RoundRobin{}, + newPartitionWriter: newTestRetryWriter, }) err := w.WriteMessages(context.Background(), tc.msgs()...) From 0876ebf46b53afdad9c6c55c15fd663ce14268b9 Mon Sep 17 00:00:00 2001 From: Evan Thomas Date: Mon, 24 Feb 2020 11:27:08 -0500 Subject: [PATCH 17/17] Add Unwrap method to WriterError type. Unwrap needs to be implemented in order to use the Unwrap function added in go 1.13. --- writer.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/writer.go b/writer.go index b3b8c0e9a..ff1031d45 100644 --- a/writer.go +++ b/writer.go @@ -151,6 +151,10 @@ func (e *WriterError) Timeout() bool { return isTimeout(e.Err) } +func (e *WriterError) Unwrap() error { + return e.Err +} + type WriterErrors []WriterError func (wes WriterErrors) Error() string {