Skip to content

Commit cc49e7c

Browse files
authored
Clean up errors sent to the client during Topic Unload (#1282)
### Motivation While investigating on #1281 I have found a couple of enhancements: - we should add more logging to troubleshoot unknown errors during topic unload - we can improve some logging - in case of CursorAlreadyClosedException we return UNKNOWN_SERVER_ERROR and NOT_LEADER_FOR_PARTITION and this generates a "received an unknown error" log line on the client, that is pretty scary ### Modifications - improve the toString representation of the callback passed to asyncFindPosition - return NOT_LEADER_FOR_PARTITION in case of CursorAlreadyClosedException and ManagedLedgerFencedException - reduce log level of some parts of the code
1 parent b6be650 commit cc49e7c

File tree

5 files changed

+42
-12
lines changed

5 files changed

+42
-12
lines changed

kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaTopicConsumerManager.java

+5-3
Original file line numberDiff line numberDiff line change
@@ -229,10 +229,12 @@ private CompletableFuture<Pair<ManagedCursor, Long>> asyncGetCursorByOffset(long
229229
final ManagedLedger ledger = topic.getManagedLedger();
230230

231231
if (((ManagedLedgerImpl) ledger).getState() == ManagedLedgerImpl.State.Closed) {
232-
log.error("[{}] Async get cursor for offset {} failed, because current managedLedger has been closed",
233-
requestHandler.ctx.channel(), offset);
232+
log.error("[{}] Async get cursor for offset {} for topic {} failed, "
233+
+ "because current managedLedger has been closed",
234+
requestHandler.ctx.channel(), offset, topic.getName());
234235
CompletableFuture<Pair<ManagedCursor, Long>> future = new CompletableFuture<>();
235-
future.completeExceptionally(new Exception("Current managedLedger has been closed."));
236+
future.completeExceptionally(new Exception("Current managedLedger for "
237+
+ topic.getName() + " has been closed."));
236238
return future;
237239
}
238240

kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/MessageFetchContext.java

+11-4
Original file line numberDiff line numberDiff line change
@@ -351,8 +351,6 @@ private void handlePartitionData(final TopicPartition topicPartition,
351351
} else {
352352
cursorFuture.whenComplete((cursorLongPair, ex) -> {
353353
if (ex != null) {
354-
log.error("KafkaTopicConsumerManager.asyncGetCursorByOffset({}) failed for topic {}.",
355-
offset, topicPartition, ex.getCause());
356354
registerPrepareMetadataFailedEvent(startPrepareMetadataNanos);
357355
requestHandler.getKafkaTopicManagerSharedState()
358356
.getKafkaTopicConsumerManagerCache().removeAndCloseByTopic(fullTopicName);
@@ -375,7 +373,16 @@ private void handlePartitionData(final TopicPartition topicPartition,
375373
if (throwable != null) {
376374
tcm.deleteOneCursorAsync(cursorLongPair.getLeft(),
377375
"cursor.readEntry fail. deleteCursor");
378-
addErrorPartitionResponse(topicPartition, Errors.forException(throwable));
376+
if (throwable instanceof ManagedLedgerException.CursorAlreadyClosedException
377+
|| throwable
378+
instanceof ManagedLedgerException.ManagedLedgerFencedException) {
379+
addErrorPartitionResponse(topicPartition,
380+
Errors.NOT_LEADER_FOR_PARTITION);
381+
} else {
382+
log.error("Read entry error on {}", partitionData, throwable);
383+
addErrorPartitionResponse(topicPartition,
384+
Errors.forException(throwable));
385+
}
379386
} else if (entries == null) {
380387
addErrorPartitionResponse(topicPartition,
381388
Errors.forException(new ApiException("Cursor is null")));
@@ -605,7 +612,7 @@ public void markDeleteComplete(Object ctx) {
605612
// this is OK, since this is kind of cumulative ack, following commit will come.
606613
@Override
607614
public void markDeleteFailed(ManagedLedgerException e, Object ctx) {
608-
log.warn("Mark delete success for position: {} with error:",
615+
log.warn("Mark delete failed for position: {} with error:",
609616
currentPosition, e);
610617
}
611618
}, null);

kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/PulsarEntryFormatter.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -91,8 +91,8 @@ public EncodeResult encode(final EncodeRequest encodeRequest) {
9191
sequenceId = Commands.initBatchMessageMetadata(msgMetadata, message.getMessageBuilder());
9292
}
9393
currentBatchSizeBytes += message.getDataBuffer().readableBytes();
94-
if (log.isDebugEnabled()) {
95-
log.debug("recordsToByteBuf , sequenceId: {}, numMessagesInBatch: {}, currentBatchSizeBytes: {} ",
94+
if (log.isTraceEnabled()) {
95+
log.trace("recordsToByteBuf , sequenceId: {}, numMessagesInBatch: {}, currentBatchSizeBytes: {} ",
9696
sequenceId, numMessagesInBatch, currentBatchSizeBytes);
9797
}
9898

kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/utils/MessageMetadataUtils.java

+21-2
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,12 @@
1313
*/
1414
package io.streamnative.pulsar.handlers.kop.utils;
1515

16+
import com.google.common.base.Predicate;
1617
import io.netty.buffer.ByteBuf;
1718
import io.streamnative.pulsar.handlers.kop.exceptions.MetadataCorruptedException;
1819
import java.util.concurrent.CompletableFuture;
1920
import javax.annotation.Nullable;
21+
import lombok.AllArgsConstructor;
2022
import lombok.extern.slf4j.Slf4j;
2123
import org.apache.bookkeeper.mledger.AsyncCallbacks;
2224
import org.apache.bookkeeper.mledger.Entry;
@@ -171,7 +173,18 @@ public static long getMockOffset(long ledgerId, long entryId) {
171173
public static CompletableFuture<Position> asyncFindPosition(final ManagedLedger managedLedger,
172174
final long offset,
173175
final boolean skipMessagesWithoutIndex) {
174-
return managedLedger.asyncFindPosition(entry -> {
176+
return managedLedger.asyncFindPosition(new FindEntryByOffset(managedLedger,
177+
offset, skipMessagesWithoutIndex));
178+
}
179+
180+
@AllArgsConstructor
181+
private static class FindEntryByOffset implements Predicate<Entry> {
182+
private final ManagedLedger managedLedger;
183+
private final long offset;
184+
private final boolean skipMessagesWithoutIndex;
185+
186+
@Override
187+
public boolean apply(Entry entry) {
175188
if (entry == null) {
176189
// `entry` should not be null, add the null check here to fix the spotbugs check
177190
return false;
@@ -191,6 +204,12 @@ public static CompletableFuture<Position> asyncFindPosition(final ManagedLedger
191204
} finally {
192205
entry.release();
193206
}
194-
});
207+
}
208+
209+
@Override
210+
public String toString() {
211+
return "FindEntryByOffset{ " + offset + "}";
212+
}
195213
}
214+
196215
}

tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaTopicConsumerManagerTest.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -514,7 +514,9 @@ public void testUnloadTopic() throws Exception {
514514
topicConsumerManager.removeCursorFuture(totalMessages - 1).get();
515515
fail("should have failed");
516516
} catch (ExecutionException ex) {
517-
assertTrue(ex.getCause().getMessage().contains("Current managedLedger has been closed."));
517+
log.info("error", ex);
518+
assertTrue(ex.getCause().getMessage().contains("Current managedLedger for "
519+
+ fullTopicName + " has been closed."));
518520
}
519521

520522
}

0 commit comments

Comments
 (0)