committerSupplier
+ ) throws IndexSizeExceededException, SegmentNotWritableException
{
return add(identifier, row, committerSupplier, true);
}
@@ -74,15 +83,13 @@ default AppenderatorAddResult add(SegmentIdWithShardSpec identifier, InputRow ro
* Committer is guaranteed to be *created* synchronously with the call to add, but will actually be used
* asynchronously.
*
- * If committer is not provided, no metadata is persisted. If it's provided, {@link #add}, {@link #clear},
- * {@link #persistAll}, and {@link #push} methods should all be called from the same thread to keep the metadata
- * committed by Committer in sync.
+ * If committer is not provided, no metadata is persisted.
*
* @param identifier the segment into which this row should be added
* @param row the row to add
- * @param committerSupplier supplier of a committer associated with all data that has been added, including this row
- * if {@param allowIncrementalPersists} is set to false then this will not be used as no
- * persist will be done automatically
+ * @param committerSupplier supplier of a committer associated with all data that has been added, including
+ * this row. If {@code allowIncrementalPersists} is set to false then this will not be
+ * used, as no persist will be done automatically.
* @param allowIncrementalPersists indicate whether automatic persist should be performed or not if required.
* If this flag is set to false then the return value should have
* {@link AppenderatorAddResult#isPersistRequired} set to true if persist was skipped
@@ -116,6 +123,7 @@ AppenderatorAddResult add(
*
* @throws IllegalStateException if the segment is unknown
*/
+ @VisibleForTesting
int getRowCount(SegmentIdWithShardSpec identifier);
/**
@@ -129,20 +137,23 @@ AppenderatorAddResult add(
* Drop all in-memory and on-disk data, and forget any previously-remembered commit metadata. This could be useful if,
* for some reason, rows have been added that we do not actually want to hand off. Blocks until all data has been
* cleared. This may take some time, since all pending persists must finish first.
- *
- * {@link #add}, {@link #clear}, {@link #persistAll}, and {@link #push} methods should all be called from the same
- * thread to keep the metadata committed by Committer in sync.
*/
+ @VisibleForTesting
void clear() throws InterruptedException;
/**
- * Drop all data associated with a particular pending segment. Unlike {@link #clear()}), any on-disk commit
- * metadata will remain unchanged. If there is no pending segment with this identifier, then this method will
+ * Schedule dropping all data associated with a particular pending segment. Unlike {@link #clear()}, any on-disk
+ * commit metadata will remain unchanged. If there is no pending segment with this identifier, then this method will
* do nothing.
*
* You should not write to the dropped segment after calling "drop". If you need to drop all your data and
* re-write it, consider {@link #clear()} instead.
*
+ * This method might be called concurrently from a thread different from the "main data appending / indexing thread",
+ * from which all other methods in this class (except those inherited from {@link QuerySegmentWalker}) are called.
+ * This typically happens when {@code drop()} is called in an async future callback. {@code drop()} itself is
+ * cheap and delegates the heavy dropping work to an internal executor of this Appenderator.
+ *
* @param identifier the pending segment to drop
*
* @return future that resolves when data is dropped
@@ -155,9 +166,7 @@ AppenderatorAddResult add(
* be used asynchronously. Any metadata returned by the committer will be associated with the data persisted to
* disk.
*
- * If committer is not provided, no metadata is persisted. If it's provided, {@link #add}, {@link #clear},
- * {@link #persistAll}, and {@link #push} methods should all be called from the same thread to keep the metadata
- * committed by Committer in sync.
+ * If committer is not provided, no metadata is persisted.
*
* @param committer a committer associated with all data that has been added so far
*
@@ -171,9 +180,7 @@ AppenderatorAddResult add(
*
* After this method is called, you cannot add new data to any segments that were previously under construction.
*
- * If committer is not provided, no metadata is persisted. If it's provided, {@link #add}, {@link #clear},
- * {@link #persistAll}, and {@link #push} methods should all be called from the same thread to keep the metadata
- * committed by Committer in sync.
+ * If committer is not provided, no metadata is persisted.
*
* @param identifiers list of segments to push
* @param committer a committer associated with all data that has been added so far
@@ -189,8 +196,9 @@ ListenableFuture push(
);
/**
- * Stop any currently-running processing and clean up after ourselves. This allows currently running persists and pushes
- * to finish. This will not remove any on-disk persisted data, but it will drop any data that has not yet been persisted.
+ * Stop any currently-running processing and clean up after ourselves. This allows currently running persists and
+ * pushes to finish. This will not remove any on-disk persisted data, but it will drop any data that has not yet been
+ * persisted.
*/
@Override
void close();
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java
index 83be7dc24310..351013474c1e 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java
@@ -94,6 +94,7 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
@@ -121,7 +122,13 @@ public class AppenderatorImpl implements Appenderator
private final IndexIO indexIO;
private final IndexMerger indexMerger;
private final Cache cache;
- private final Map