Skip to content

Commit 91db972

Browse files
committed
[core] Support refresh toCompact files in BucketedAppendCompactManager
1 parent 654607a commit 91db972

File tree

5 files changed

+393
-28
lines changed

5 files changed

+393
-28
lines changed

paimon-core/src/main/java/org/apache/paimon/append/BucketedAppendCompactManager.java

+47
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
import java.util.PriorityQueue;
4646
import java.util.concurrent.ExecutionException;
4747
import java.util.concurrent.ExecutorService;
48+
import java.util.function.Supplier;
4849

4950
import static java.util.Collections.emptyList;
5051

@@ -65,6 +66,8 @@ public class BucketedAppendCompactManager extends CompactFutureManager {
6566

6667
private List<DataFileMeta> compacting;
6768

69+
private final Supplier<List<DataFileMeta>> dataFilesSupplier;
70+
6871
@Nullable private final CompactionMetrics.Reporter metricsReporter;
6972

7073
public BucketedAppendCompactManager(
@@ -76,6 +79,28 @@ public BucketedAppendCompactManager(
7679
long targetFileSize,
7780
CompactRewriter rewriter,
7881
@Nullable CompactionMetrics.Reporter metricsReporter) {
82+
this(
83+
executor,
84+
restored,
85+
dvMaintainer,
86+
minFileNum,
87+
maxFileNum,
88+
targetFileSize,
89+
rewriter,
90+
metricsReporter,
91+
null);
92+
}
93+
94+
public BucketedAppendCompactManager(
95+
ExecutorService executor,
96+
List<DataFileMeta> restored,
97+
@Nullable DeletionVectorsMaintainer dvMaintainer,
98+
int minFileNum,
99+
int maxFileNum,
100+
long targetFileSize,
101+
CompactRewriter rewriter,
102+
@Nullable CompactionMetrics.Reporter metricsReporter,
103+
@Nullable Supplier<List<DataFileMeta>> dataFilesSupplier) {
79104
this.executor = executor;
80105
this.dvMaintainer = dvMaintainer;
81106
this.toCompact = new PriorityQueue<>(fileComparator(false));
@@ -85,10 +110,15 @@ public BucketedAppendCompactManager(
85110
this.targetFileSize = targetFileSize;
86111
this.rewriter = rewriter;
87112
this.metricsReporter = metricsReporter;
113+
this.dataFilesSupplier = dataFilesSupplier;
88114
}
89115

90116
@Override
91117
public void triggerCompaction(boolean fullCompaction) {
118+
if (dataFilesSupplier != null) {
119+
refreshToCompact();
120+
}
121+
92122
if (fullCompaction) {
93123
triggerFullCompaction();
94124
} else {
@@ -120,6 +150,23 @@ private void triggerFullCompaction() {
120150
toCompact.clear();
121151
}
122152

153+
@VisibleForTesting
154+
protected void refreshToCompact() {
155+
List<DataFileMeta> latestDataFiles = new ArrayList<>(dataFilesSupplier.get());
156+
List<DataFileMeta> currentDataFiles = new ArrayList<>(toCompact);
157+
158+
if (LOG.isDebugEnabled()) {
159+
LOG.debug(
160+
"Current bucket latest snapshot files count {}, toCompact files count {}.",
161+
latestDataFiles.size(),
162+
currentDataFiles.size());
163+
}
164+
165+
// deduplicate files
166+
latestDataFiles.removeIf(currentDataFiles::remove);
167+
toCompact.addAll(latestDataFiles);
168+
}
169+
123170
private void recordCompactionsQueuedRequest() {
124171
if (metricsReporter != null) {
125172
metricsReporter.increaseCompactionsQueuedCount();

paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFixedBucketFileStoreWrite.java

+42-11
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.apache.paimon.operation;
2020

2121
import org.apache.paimon.CoreOptions;
22+
import org.apache.paimon.Snapshot;
2223
import org.apache.paimon.append.BucketedAppendCompactManager;
2324
import org.apache.paimon.compact.CompactManager;
2425
import org.apache.paimon.compact.NoopCompactManager;
@@ -28,15 +29,18 @@
2829
import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer;
2930
import org.apache.paimon.fs.FileIO;
3031
import org.apache.paimon.io.DataFileMeta;
32+
import org.apache.paimon.manifest.ManifestEntry;
3133
import org.apache.paimon.types.RowType;
3234
import org.apache.paimon.utils.FileStorePathFactory;
3335
import org.apache.paimon.utils.SnapshotManager;
3436

3537
import javax.annotation.Nullable;
3638

39+
import java.util.ArrayList;
3740
import java.util.List;
3841
import java.util.concurrent.ExecutorService;
3942
import java.util.function.Function;
43+
import java.util.function.Supplier;
4044

4145
/** {@link AppendOnlyFileStoreWrite} for {@link org.apache.paimon.table.BucketMode#HASH_FIXED}. */
4246
public class AppendOnlyFixedBucketFileStoreWrite extends AppendOnlyFileStoreWrite {
@@ -85,22 +89,49 @@ protected CompactManager getCompactManager(
8589
dvMaintainer != null
8690
? f -> dvMaintainer.deletionVectorOf(f).orElse(null)
8791
: null;
88-
return new BucketedAppendCompactManager(
89-
compactExecutor,
90-
restoredFiles,
91-
dvMaintainer,
92-
options.compactionMinFileNum(),
93-
options.compactionMaxFileNum().orElse(5),
94-
options.targetFileSize(false),
95-
files -> compactRewrite(partition, bucket, dvFactory, files),
96-
compactionMetrics == null
97-
? null
98-
: compactionMetrics.createReporter(partition, bucket));
92+
if (options.compactionForceRefreshFiles()) {
93+
Supplier<List<DataFileMeta>> dataFilesSupplier =
94+
() -> scanBucketFiles(snapshotManager.latestSnapshot(), partition, bucket);
95+
return new BucketedAppendCompactManager(
96+
compactExecutor,
97+
restoredFiles,
98+
dvMaintainer,
99+
options.compactionMinFileNum(),
100+
options.compactionMaxFileNum().orElse(5),
101+
options.targetFileSize(false),
102+
files -> compactRewrite(partition, bucket, dvFactory, files),
103+
compactionMetrics == null
104+
? null
105+
: compactionMetrics.createReporter(partition, bucket),
106+
dataFilesSupplier);
107+
} else {
108+
return new BucketedAppendCompactManager(
109+
compactExecutor,
110+
restoredFiles,
111+
dvMaintainer,
112+
options.compactionMinFileNum(),
113+
options.compactionMaxFileNum().orElse(5),
114+
options.targetFileSize(false),
115+
files -> compactRewrite(partition, bucket, dvFactory, files),
116+
compactionMetrics == null
117+
? null
118+
: compactionMetrics.createReporter(partition, bucket));
119+
}
99120
}
100121
}
101122

102123
@Override
103124
protected Function<WriterContainer<InternalRow>, Boolean> createWriterCleanChecker() {
104125
return createConflictAwareWriterCleanChecker(commitUser, snapshotManager);
105126
}
127+
128+
private List<DataFileMeta> scanBucketFiles(Snapshot snapshot, BinaryRow partition, int bucket) {
129+
List<DataFileMeta> dataFileMetas = new ArrayList<>();
130+
List<ManifestEntry> files =
131+
scan.withSnapshot(snapshot).withPartitionBucket(partition, bucket).plan().files();
132+
for (ManifestEntry entry : files) {
133+
dataFileMetas.add(entry.file());
134+
}
135+
return dataFileMetas;
136+
}
106137
}

paimon-core/src/test/java/org/apache/paimon/append/BucketedAppendCompactManagerTest.java

+33
Original file line numberDiff line numberDiff line change
@@ -191,6 +191,39 @@ public void testPick() {
191191
Collections.singletonList(newFile(2621L, 2630L)));
192192
}
193193

194+
@Test
195+
public void testToCompactRefresh() {
196+
int minFileNum = 4;
197+
int maxFileNum = 12;
198+
long targetFileSize = 1024;
199+
200+
List<DataFileMeta> totalFiles =
201+
Arrays.asList(
202+
newFile(1L, 1024L),
203+
newFile(1025L, 2049L),
204+
newFile(2050L, 2500L),
205+
newFile(2501L, 4096L),
206+
newFile(4097L, 6000L));
207+
208+
List<DataFileMeta> toCompact = totalFiles.subList(0, 4);
209+
List<DataFileMeta> toRefresh = totalFiles.subList(1, 5);
210+
211+
BucketedAppendCompactManager manager =
212+
new BucketedAppendCompactManager(
213+
null, // not used
214+
toCompact,
215+
null,
216+
minFileNum,
217+
maxFileNum,
218+
targetFileSize,
219+
null, // not used
220+
null,
221+
() -> toRefresh);
222+
223+
manager.refreshToCompact();
224+
assertThat(manager.getToCompact()).containsExactlyElementsOf(totalFiles);
225+
}
226+
194227
private void innerTest(
195228
List<DataFileMeta> toCompactBeforePick,
196229
boolean expectedPresent,

0 commit comments

Comments
 (0)