Skip to content

Commit dd2c1a5

Browse files
authored
feat(sync): Send migrate messages (#79)
First part of the sync, sending migrate table messages to the client
1 parent 4b77a2f commit dd2c1a5

File tree

7 files changed

+108
-27
lines changed

7 files changed

+108
-27
lines changed

Diff for: lib/src/main/java/io/cloudquery/internal/servers/plugin/v3/PluginServer.java

+14-22
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,14 @@
11
package io.cloudquery.internal.servers.plugin.v3;
22

33
import com.google.protobuf.ByteString;
4+
import io.cloudquery.plugin.BackendOptions;
45
import io.cloudquery.plugin.Plugin;
56
import io.cloudquery.plugin.v3.PluginGrpc.PluginImplBase;
67
import io.cloudquery.plugin.v3.Write;
78
import io.cloudquery.schema.Table;
89
import io.grpc.stub.StreamObserver;
9-
import java.io.ByteArrayOutputStream;
10-
import java.nio.channels.Channels;
1110
import java.util.ArrayList;
1211
import java.util.List;
13-
import org.apache.arrow.memory.BufferAllocator;
14-
import org.apache.arrow.memory.RootAllocator;
15-
import org.apache.arrow.vector.VectorSchemaRoot;
16-
import org.apache.arrow.vector.ipc.ArrowStreamWriter;
17-
import org.apache.arrow.vector.types.pojo.Schema;
1812

1913
public class PluginServer extends PluginImplBase {
2014
private final Plugin plugin;
@@ -64,18 +58,7 @@ public void getTables(
6458
request.getSkipDependentTables());
6559
List<ByteString> byteStrings = new ArrayList<>();
6660
for (Table table : tables) {
67-
try (BufferAllocator bufferAllocator = new RootAllocator()) {
68-
Schema schema = table.toArrowSchema();
69-
VectorSchemaRoot schemaRoot = VectorSchemaRoot.create(schema, bufferAllocator);
70-
try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
71-
try (ArrowStreamWriter writer =
72-
new ArrowStreamWriter(schemaRoot, null, Channels.newChannel(out))) {
73-
writer.start();
74-
writer.end();
75-
byteStrings.add(ByteString.copyFrom(out.toByteArray()));
76-
}
77-
}
78-
}
61+
byteStrings.add(table.encode());
7962
}
8063
responseObserver.onNext(
8164
io.cloudquery.plugin.v3.GetTables.Response.newBuilder()
@@ -91,9 +74,18 @@ public void getTables(
9174
public void sync(
9275
io.cloudquery.plugin.v3.Sync.Request request,
9376
StreamObserver<io.cloudquery.plugin.v3.Sync.Response> responseObserver) {
94-
plugin.sync();
95-
responseObserver.onNext(io.cloudquery.plugin.v3.Sync.Response.newBuilder().build());
96-
responseObserver.onCompleted();
77+
try {
78+
plugin.sync(
79+
request.getTablesList(),
80+
request.getSkipTablesList(),
81+
request.getSkipDependentTables(),
82+
request.getDeterministicCqId(),
83+
new BackendOptions(
84+
request.getBackend().getTableName(), request.getBackend().getConnection()),
85+
responseObserver);
86+
} catch (Exception e) {
87+
responseObserver.onError(e);
88+
}
9789
}
9890

9991
@Override

Diff for: lib/src/main/java/io/cloudquery/memdb/MemDB.java

+19-3
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,12 @@
11
package io.cloudquery.memdb;
22

3+
import io.cloudquery.plugin.BackendOptions;
34
import io.cloudquery.plugin.Plugin;
5+
import io.cloudquery.scheduler.Scheduler;
46
import io.cloudquery.schema.Column;
57
import io.cloudquery.schema.SchemaException;
68
import io.cloudquery.schema.Table;
9+
import io.grpc.stub.StreamObserver;
710
import java.util.List;
811
import org.apache.arrow.vector.types.pojo.ArrowType.Utf8;
912

@@ -36,9 +39,22 @@ public List<Table> tables(
3639
}
3740

3841
@Override
39-
public void sync() {
40-
// TODO Auto-generated method stub
41-
throw new UnsupportedOperationException("Unimplemented method 'Sync'");
42+
public void sync(
43+
List<String> includeList,
44+
List<String> skipList,
45+
boolean skipDependentTables,
46+
boolean deterministicCqId,
47+
BackendOptions backendOptions,
48+
StreamObserver<io.cloudquery.plugin.v3.Sync.Response> syncStream)
49+
throws SchemaException {
50+
List<Table> filtered = Table.filterDFS(allTables, includeList, skipList, skipDependentTables);
51+
Scheduler.builder()
52+
.tables(filtered)
53+
.syncStream(syncStream)
54+
.deterministicCqId(deterministicCqId)
55+
.logger(getLogger())
56+
.build()
57+
.sync();
4258
}
4359

4460
@Override
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
package io.cloudquery.plugin;
2+
3+
import lombok.AllArgsConstructor;
4+
import lombok.Getter;
5+
6+
@AllArgsConstructor
7+
@Getter
8+
public class BackendOptions {
9+
private final String tableName;
10+
private final String connection;
11+
}

Diff for: lib/src/main/java/io/cloudquery/plugin/Plugin.java

+9-1
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import io.cloudquery.schema.SchemaException;
44
import io.cloudquery.schema.Table;
5+
import io.grpc.stub.StreamObserver;
56
import java.util.List;
67
import lombok.Getter;
78
import lombok.NonNull;
@@ -22,7 +23,14 @@ public abstract List<Table> tables(
2223
List<String> includeList, List<String> skipList, boolean skipDependentTables)
2324
throws SchemaException;
2425

25-
public abstract void sync();
26+
public abstract void sync(
27+
List<String> includeList,
28+
List<String> skipList,
29+
boolean skipDependentTables,
30+
boolean deterministicCqId,
31+
BackendOptions backendOptions,
32+
StreamObserver<io.cloudquery.plugin.v3.Sync.Response> syncStream)
33+
throws SchemaException;
2634

2735
public abstract void read();
2836

+31-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,35 @@
11
package io.cloudquery.scheduler;
22

3+
import io.cloudquery.plugin.v3.Sync;
4+
import io.cloudquery.schema.Table;
5+
import io.grpc.stub.StreamObserver;
6+
import java.io.IOException;
7+
import java.util.List;
8+
import lombok.Builder;
9+
import lombok.NonNull;
10+
import org.apache.logging.log4j.Logger;
11+
12+
@Builder
313
public class Scheduler {
4-
public Scheduler() {}
14+
@NonNull private final List<Table> tables;
15+
@NonNull private final StreamObserver<io.cloudquery.plugin.v3.Sync.Response> syncStream;
16+
@NonNull private final Logger logger;
17+
18+
private boolean deterministicCqId;
19+
20+
public void sync() {
21+
for (Table table : tables) {
22+
try {
23+
logger.info("sending migrate message for table: {}", table.getName());
24+
Sync.MessageMigrateTable migrateTable =
25+
Sync.MessageMigrateTable.newBuilder().setTable(table.encode()).build();
26+
Sync.Response response = Sync.Response.newBuilder().setMigrateTable(migrateTable).build();
27+
syncStream.onNext(response);
28+
} catch (IOException e) {
29+
syncStream.onError(e);
30+
return;
31+
}
32+
}
33+
syncStream.onCompleted();
34+
}
535
}

Diff for: lib/src/main/java/io/cloudquery/schema/Table.java

+23
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,13 @@
22

33
import static java.util.Arrays.asList;
44

5+
import com.google.protobuf.ByteString;
56
import io.cloudquery.glob.Glob;
67
import io.cloudquery.schema.Column.ColumnBuilder;
78
import io.cloudquery.transformers.TransformerException;
9+
import java.io.ByteArrayOutputStream;
10+
import java.io.IOException;
11+
import java.nio.channels.Channels;
812
import java.util.ArrayList;
913
import java.util.Collections;
1014
import java.util.HashMap;
@@ -16,6 +20,10 @@
1620
import lombok.Getter;
1721
import lombok.NonNull;
1822
import lombok.Setter;
23+
import org.apache.arrow.memory.BufferAllocator;
24+
import org.apache.arrow.memory.RootAllocator;
25+
import org.apache.arrow.vector.VectorSchemaRoot;
26+
import org.apache.arrow.vector.ipc.ArrowStreamWriter;
1927
import org.apache.arrow.vector.types.pojo.Field;
2028
import org.apache.arrow.vector.types.pojo.Schema;
2129

@@ -228,4 +236,19 @@ public Schema toArrowSchema() {
228236
Schema schema = new Schema(asList(fields), metadata);
229237
return schema;
230238
}
239+
240+
public ByteString encode() throws IOException {
241+
try (BufferAllocator bufferAllocator = new RootAllocator()) {
242+
Schema schema = toArrowSchema();
243+
VectorSchemaRoot schemaRoot = VectorSchemaRoot.create(schema, bufferAllocator);
244+
try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
245+
try (ArrowStreamWriter writer =
246+
new ArrowStreamWriter(schemaRoot, null, Channels.newChannel(out))) {
247+
writer.start();
248+
writer.end();
249+
return ByteString.copyFrom(out.toByteArray());
250+
}
251+
}
252+
}
253+
}
231254
}

Diff for: lib/src/main/java/io/cloudquery/server/ServeCommand.java

+1
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@ private LoggerContext initLogger() {
8888
context.start(configuration);
8989

9090
logger = context.getLogger(ServeCommand.class.getName());
91+
this.plugin.setLogger(logger);
9192
return context;
9293
}
9394

0 commit comments

Comments
 (0)