Skip to content

Commit 8b1bb1b

Browse files
authored
chore: adding write delete stale logic (#98)
fixes: #94
1 parent 464cca8 commit 8b1bb1b

File tree

5 files changed

+94
-3
lines changed

5 files changed

+94
-3
lines changed

Diff for: lib/src/main/java/io/cloudquery/internal/servers/plugin/v3/PluginServer.java

+15
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import com.google.protobuf.ByteString;
44
import io.cloudquery.helper.ArrowHelper;
5+
import io.cloudquery.messages.WriteDeleteStale;
56
import io.cloudquery.messages.WriteInsert;
67
import io.cloudquery.messages.WriteMessage;
78
import io.cloudquery.messages.WriteMigrateTable;
@@ -16,6 +17,7 @@
1617
import io.grpc.stub.StreamObserver;
1718
import java.io.IOException;
1819
import java.util.ArrayList;
20+
import java.util.Date;
1921
import java.util.List;
2022

2123
public class PluginServer extends PluginImplBase {
@@ -125,6 +127,10 @@ public void onNext(Write.Request request) {
125127
plugin.write(processMigrateTableRequest(request));
126128
} else if (messageCase == Write.Request.MessageCase.INSERT) {
127129
plugin.write(processInsertRequest(request));
130+
} else if (messageCase == Write.Request.MessageCase.DELETE) {
131+
plugin.write(processDeleteStaleRequest(request));
132+
} else {
133+
throw new IllegalArgumentException("Unknown message type: " + messageCase);
128134
}
129135
} catch (IOException | ValidationException ex) {
130136
onError(ex);
@@ -166,4 +172,13 @@ private WriteMessage processInsertRequest(Write.Request request)
166172
ByteString record = insert.getRecord();
167173
return new WriteInsert(ArrowHelper.decodeResource(record));
168174
}
175+
176+
private WriteMessage processDeleteStaleRequest(Write.Request request)
177+
throws IOException, ValidationException {
178+
Write.MessageDeleteStale messageDeleteStale = request.getDelete();
179+
return new WriteDeleteStale(
180+
messageDeleteStale.getTableName(),
181+
messageDeleteStale.getSourceName(),
182+
new Date(messageDeleteStale.getSyncTime().getSeconds() * 1000));
183+
}
169184
}

Diff for: lib/src/main/java/io/cloudquery/memdb/MemDBClient.java

+46-3
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,30 @@
11
package io.cloudquery.memdb;
22

3+
import io.cloudquery.messages.WriteDeleteStale;
34
import io.cloudquery.messages.WriteInsert;
45
import io.cloudquery.messages.WriteMessage;
56
import io.cloudquery.messages.WriteMigrateTable;
7+
import io.cloudquery.scalar.Timestamp;
68
import io.cloudquery.schema.ClientMeta;
9+
import io.cloudquery.schema.Column;
710
import io.cloudquery.schema.Resource;
811
import io.cloudquery.schema.Table;
912
import io.cloudquery.schema.TableColumnChange;
1013
import java.util.ArrayList;
14+
import java.util.Date;
1115
import java.util.HashMap;
1216
import java.util.List;
1317
import java.util.Map;
18+
import java.util.Objects;
19+
import java.util.Optional;
1420
import java.util.concurrent.locks.ReentrantReadWriteLock;
1521

1622
public class MemDBClient implements ClientMeta {
1723
private static final String id = "memdb";
1824

19-
private ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
20-
private Map<String, Table> tables = new HashMap<>();
21-
private Map<String, List<Resource>> memDB = new HashMap<>();
25+
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
26+
private final Map<String, Table> tables = new HashMap<>();
27+
private final Map<String, List<Resource>> memDB = new HashMap<>();
2228

2329
public MemDBClient() {}
2430

@@ -37,6 +43,9 @@ public void write(WriteMessage message) {
3743
if (message instanceof WriteInsert insert) {
3844
insert(insert);
3945
}
46+
if (message instanceof WriteDeleteStale deleteStale) {
47+
deleteStale(deleteStale);
48+
}
4049
} finally {
4150
lock.writeLock().unlock();
4251
}
@@ -74,6 +83,40 @@ private void overwrite(Table table, Resource resource) {
7483
memDB.get(tableName).add(resource);
7584
}
7685

86+
private void deleteStale(WriteDeleteStale deleteStale) {
87+
String tableName = deleteStale.getTableName();
88+
89+
List<Resource> filteredList = new ArrayList<>();
90+
91+
for (int i = 0; i < memDB.get(tableName).size(); i++) {
92+
Resource row = memDB.get(tableName).get(i);
93+
Optional<Column> sourceColumn = row.getTable().getColumn(Column.CQ_SOURCE_NAME);
94+
if (sourceColumn.isEmpty()) {
95+
continue;
96+
}
97+
Optional<Column> syncColumn = row.getTable().getColumn(Column.CQ_SYNC_TIME);
98+
if (syncColumn.isEmpty()) {
99+
continue;
100+
}
101+
102+
String sourceName = "";
103+
if (row.get(Column.CQ_SOURCE_NAME) != null) {
104+
sourceName = row.get(Column.CQ_SOURCE_NAME).toString();
105+
}
106+
107+
if (Objects.equals(sourceName, deleteStale.getSourceName())) {
108+
Date rowSyncTime = new Date(0);
109+
if (row.get(Column.CQ_SYNC_TIME) != null) {
110+
rowSyncTime = new Date(((Timestamp) row.get(Column.CQ_SYNC_TIME)).get());
111+
}
112+
if (!rowSyncTime.before(deleteStale.getTimestamp())) {
113+
filteredList.add(row);
114+
}
115+
}
116+
}
117+
memDB.put(tableName, filteredList);
118+
}
119+
77120
public void close() {
78121
// do nothing
79122
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
package io.cloudquery.messages;
2+
3+
import java.util.Date;
4+
import lombok.AllArgsConstructor;
5+
import lombok.Getter;
6+
7+
@AllArgsConstructor
8+
@Getter
9+
public class WriteDeleteStale extends WriteMessage {
10+
private String tableName;
11+
private String sourceName;
12+
private Date timestamp;
13+
}

Diff for: lib/src/main/java/io/cloudquery/schema/Column.java

+2
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88
@Builder(toBuilder = true)
99
@Getter
1010
public class Column {
11+
public static final String CQ_SOURCE_NAME = "_cq_source_name";
12+
public static final String CQ_SYNC_TIME = "_cq_sync_time";
1113
private String name;
1214
private String description;
1315
private ArrowType type;

Diff for: lib/src/test/java/io/cloudquery/internal/servers/plugin/v3/PluginServerTest.java

+18
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
import com.google.protobuf.ByteString;
77
import io.cloudquery.helper.ArrowHelper;
8+
import io.cloudquery.messages.WriteDeleteStale;
89
import io.cloudquery.messages.WriteInsert;
910
import io.cloudquery.messages.WriteMigrateTable;
1011
import io.cloudquery.plugin.Plugin;
@@ -78,6 +79,18 @@ public void shouldSendWriteInsertMessage() throws Exception {
7879
verify(plugin).write(any(WriteInsert.class));
7980
}
8081

82+
@Test
83+
public void shouldSendWriteDeleteStaleMessage() throws Exception {
84+
NullResponseStream<Write.Response> responseObserver = new NullResponseStream<>();
85+
86+
StreamObserver<Write.Request> writeService = pluginStub.write(responseObserver);
87+
writeService.onNext(generateDeleteStaleMessage());
88+
writeService.onCompleted();
89+
responseObserver.await();
90+
91+
verify(plugin).write(any(WriteDeleteStale.class));
92+
}
93+
8194
private static Write.Request generateMigrateTableMessage() throws IOException {
8295
Table table = Table.builder().name("test").build();
8396
return Write.Request.newBuilder()
@@ -96,6 +109,11 @@ private Write.Request generateInsertMessage() throws IOException, ValidationExce
96109
return Write.Request.newBuilder().setInsert(messageInsert).build();
97110
}
98111

112+
private Write.Request generateDeleteStaleMessage() {
113+
Write.MessageDeleteStale messageDeleteStale = Write.MessageDeleteStale.newBuilder().build();
114+
return Write.Request.newBuilder().setDelete(messageDeleteStale).build();
115+
}
116+
99117
private static class NullResponseStream<T> implements StreamObserver<T> {
100118
private final CountDownLatch countDownLatch = new CountDownLatch(1);
101119

0 commit comments

Comments
 (0)