Skip to content

Commit bd729bb

Browse files
authored
feat(sync): Initial insert message support (#81)
1 parent ead7dd9 commit bd729bb

16 files changed

+245
-20
lines changed

Diff for: lib/src/main/java/io/cloudquery/internal/servers/plugin/v3/PluginServer.java

+10-3
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import com.google.protobuf.ByteString;
44
import io.cloudquery.plugin.BackendOptions;
5+
import io.cloudquery.plugin.NewClientOptions;
56
import io.cloudquery.plugin.Plugin;
67
import io.cloudquery.plugin.v3.PluginGrpc.PluginImplBase;
78
import io.cloudquery.plugin.v3.Write;
@@ -41,9 +42,15 @@ public void getVersion(
4142
public void init(
4243
io.cloudquery.plugin.v3.Init.Request request,
4344
StreamObserver<io.cloudquery.plugin.v3.Init.Response> responseObserver) {
44-
plugin.init();
45-
responseObserver.onNext(io.cloudquery.plugin.v3.Init.Response.newBuilder().build());
46-
responseObserver.onCompleted();
45+
try {
46+
plugin.init(
47+
request.getSpec().toStringUtf8(),
48+
NewClientOptions.builder().noConnection(request.getNoConnection()).build());
49+
responseObserver.onNext(io.cloudquery.plugin.v3.Init.Response.newBuilder().build());
50+
responseObserver.onCompleted();
51+
} catch (Exception e) {
52+
responseObserver.onError(e);
53+
}
4754
}
4855

4956
@Override

Diff for: lib/src/main/java/io/cloudquery/memdb/MemDB.java

+51-10
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,17 @@
11
package io.cloudquery.memdb;
22

33
import io.cloudquery.plugin.BackendOptions;
4+
import io.cloudquery.plugin.ClientNotInitializedException;
5+
import io.cloudquery.plugin.NewClientOptions;
46
import io.cloudquery.plugin.Plugin;
7+
import io.cloudquery.plugin.TableOutputStream;
58
import io.cloudquery.scheduler.Scheduler;
9+
import io.cloudquery.schema.ClientMeta;
610
import io.cloudquery.schema.Column;
11+
import io.cloudquery.schema.Resource;
712
import io.cloudquery.schema.SchemaException;
813
import io.cloudquery.schema.Table;
14+
import io.cloudquery.schema.TableResolver;
915
import io.grpc.stub.StreamObserver;
1016
import java.util.List;
1117
import org.apache.arrow.vector.types.pojo.ArrowType.Utf8;
@@ -15,26 +21,44 @@ public class MemDB extends Plugin {
1521
List.of(
1622
Table.builder()
1723
.name("table1")
18-
.columns(List.of(Column.builder().name("name1").type(new Utf8()).build()))
24+
.resolver(
25+
new TableResolver() {
26+
@Override
27+
public void resolve(
28+
ClientMeta clientMeta, Resource parent, TableOutputStream stream) {
29+
stream.write(Table1Data.builder().name("name1").build());
30+
stream.write(Table1Data.builder().name("name2").build());
31+
}
32+
})
33+
.columns(List.of(Column.builder().name("name").type(new Utf8()).build()))
1934
.build(),
2035
Table.builder()
2136
.name("table2")
22-
.columns(List.of(Column.builder().name("name1").type(new Utf8()).build()))
37+
.resolver(
38+
new TableResolver() {
39+
@Override
40+
public void resolve(
41+
ClientMeta clientMeta, Resource parent, TableOutputStream stream) {
42+
stream.write(Table2Data.builder().id("id1").build());
43+
stream.write(Table2Data.builder().id("id2").build());
44+
}
45+
})
46+
.columns(List.of(Column.builder().name("id").type(new Utf8()).build()))
2347
.build());
2448

49+
private Spec spec;
50+
2551
public MemDB() {
2652
super("memdb", "0.0.1");
2753
}
2854

29-
@Override
30-
public void init() {
31-
// do nothing
32-
}
33-
3455
@Override
3556
public List<Table> tables(
3657
List<String> includeList, List<String> skipList, boolean skipDependentTables)
37-
throws SchemaException {
58+
throws SchemaException, ClientNotInitializedException {
59+
if (this.client == null) {
60+
throw new ClientNotInitializedException();
61+
}
3862
return Table.filterDFS(allTables, includeList, skipList, skipDependentTables);
3963
}
4064

@@ -46,13 +70,19 @@ public void sync(
4670
boolean deterministicCqId,
4771
BackendOptions backendOptions,
4872
StreamObserver<io.cloudquery.plugin.v3.Sync.Response> syncStream)
49-
throws SchemaException {
73+
throws SchemaException, ClientNotInitializedException {
74+
if (this.client == null) {
75+
throw new ClientNotInitializedException();
76+
}
77+
5078
List<Table> filtered = Table.filterDFS(allTables, includeList, skipList, skipDependentTables);
5179
Scheduler.builder()
80+
.client(client)
5281
.tables(filtered)
5382
.syncStream(syncStream)
5483
.deterministicCqId(deterministicCqId)
5584
.logger(getLogger())
85+
.concurrency(this.spec.getConcurrency())
5686
.build()
5787
.sync();
5888
}
@@ -69,6 +99,17 @@ public void write() {
6999

70100
@Override
71101
public void close() {
72-
// do nothing
102+
if (this.client != null) {
103+
((MemDBClient) this.client).close();
104+
}
105+
}
106+
107+
@Override
108+
public ClientMeta newClient(String spec, NewClientOptions options) throws Exception {
109+
if (options.isNoConnection()) {
110+
return null;
111+
}
112+
this.spec = Spec.fromJSON(spec);
113+
return new MemDBClient();
73114
}
74115
}
+18
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
package io.cloudquery.memdb;
2+
3+
import io.cloudquery.schema.ClientMeta;
4+
5+
public class MemDBClient implements ClientMeta {
6+
private static final String id = "memdb";
7+
8+
public MemDBClient() {}
9+
10+
@Override
11+
public String getId() {
12+
return id;
13+
}
14+
15+
public void close() {
16+
// do nothing
17+
}
18+
}

Diff for: lib/src/main/java/io/cloudquery/memdb/Spec.java

+24
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
package io.cloudquery.memdb;
2+
3+
import com.fasterxml.jackson.core.JsonProcessingException;
4+
import com.fasterxml.jackson.databind.JsonMappingException;
5+
import com.fasterxml.jackson.databind.ObjectMapper;
6+
import lombok.Getter;
7+
import lombok.Setter;
8+
9+
@Getter
10+
@Setter
11+
public class Spec {
12+
public static Spec fromJSON(String json) throws JsonMappingException, JsonProcessingException {
13+
ObjectMapper objectMapper = new ObjectMapper();
14+
Spec spec = objectMapper.readValue(json, Spec.class);
15+
if (spec.getConcurrency() == 0) {
16+
spec.setConcurrency(10000);
17+
}
18+
return spec;
19+
}
20+
21+
private int concurrency;
22+
23+
public Spec() {}
24+
}
+10
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
package io.cloudquery.memdb;
2+
3+
import lombok.Builder;
4+
import lombok.Getter;
5+
6+
@Builder
7+
@Getter
8+
public class Table1Data {
9+
private String name;
10+
}
+10
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
package io.cloudquery.memdb;
2+
3+
import lombok.Builder;
4+
import lombok.Getter;
5+
6+
@Builder
7+
@Getter
8+
public class Table2Data {
9+
private String id;
10+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
package io.cloudquery.plugin;
2+
3+
public class ClientNotInitializedException extends Exception {
4+
5+
public ClientNotInitializedException() {}
6+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
package io.cloudquery.plugin;
2+
3+
import lombok.Builder;
4+
import lombok.Getter;
5+
6+
@Builder
7+
@Getter
8+
public class NewClientOptions {
9+
private final boolean noConnection;
10+
}

Diff for: lib/src/main/java/io/cloudquery/plugin/Plugin.java

+9-3
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package io.cloudquery.plugin;
22

3+
import io.cloudquery.schema.ClientMeta;
34
import io.cloudquery.schema.SchemaException;
45
import io.cloudquery.schema.Table;
56
import io.grpc.stub.StreamObserver;
@@ -16,12 +17,17 @@ public abstract class Plugin {
1617
@NonNull protected final String name;
1718
@NonNull protected final String version;
1819
@Setter protected Logger logger;
20+
protected ClientMeta client;
1921

20-
public abstract void init();
22+
public void init(String spec, NewClientOptions options) throws Exception {
23+
client = newClient(spec, options);
24+
}
25+
26+
public abstract ClientMeta newClient(String spec, NewClientOptions options) throws Exception;
2127

2228
public abstract List<Table> tables(
2329
List<String> includeList, List<String> skipList, boolean skipDependentTables)
24-
throws SchemaException;
30+
throws SchemaException, ClientNotInitializedException;
2531

2632
public abstract void sync(
2733
List<String> includeList,
@@ -30,7 +36,7 @@ public abstract void sync(
3036
boolean deterministicCqId,
3137
BackendOptions backendOptions,
3238
StreamObserver<io.cloudquery.plugin.v3.Sync.Response> syncStream)
33-
throws SchemaException;
39+
throws SchemaException, ClientNotInitializedException;
3440

3541
public abstract void read();
3642

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
package io.cloudquery.plugin;
2+
3+
public interface TableOutputStream {
4+
public void write(Object data);
5+
}

Diff for: lib/src/main/java/io/cloudquery/scheduler/Scheduler.java

+22
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package io.cloudquery.scheduler;
22

33
import io.cloudquery.plugin.v3.Sync;
4+
import io.cloudquery.schema.ClientMeta;
45
import io.cloudquery.schema.Table;
56
import io.grpc.stub.StreamObserver;
67
import java.io.IOException;
@@ -14,7 +15,9 @@ public class Scheduler {
1415
@NonNull private final List<Table> tables;
1516
@NonNull private final StreamObserver<io.cloudquery.plugin.v3.Sync.Response> syncStream;
1617
@NonNull private final Logger logger;
18+
@NonNull private final ClientMeta client;
1719

20+
private int concurrency;
1821
private boolean deterministicCqId;
1922

2023
public void sync() {
@@ -30,6 +33,25 @@ public void sync() {
3033
return;
3134
}
3235
}
36+
37+
for (Table table : tables) {
38+
try {
39+
logger.info("resolving table: {}", table.getName());
40+
SchedulerTableOutputStream schedulerTableOutputStream =
41+
SchedulerTableOutputStream.builder()
42+
.table(table)
43+
.client(client)
44+
.logger(logger)
45+
.syncStream(syncStream)
46+
.build();
47+
table.getResolver().resolve(client, null, schedulerTableOutputStream);
48+
logger.info("resolved table: {}", table.getName());
49+
} catch (Exception e) {
50+
syncStream.onError(e);
51+
return;
52+
}
53+
}
54+
3355
syncStream.onCompleted();
3456
}
3557
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
package io.cloudquery.scheduler;
2+
3+
import io.cloudquery.plugin.TableOutputStream;
4+
import io.cloudquery.plugin.v3.Sync;
5+
import io.cloudquery.schema.ClientMeta;
6+
import io.cloudquery.schema.Column;
7+
import io.cloudquery.schema.Resource;
8+
import io.cloudquery.schema.Table;
9+
import io.cloudquery.transformers.TransformerException;
10+
import io.grpc.stub.StreamObserver;
11+
import java.io.IOException;
12+
import lombok.Builder;
13+
import lombok.NonNull;
14+
import org.apache.logging.log4j.Logger;
15+
16+
@Builder
17+
public class SchedulerTableOutputStream implements TableOutputStream {
18+
@NonNull private final Table table;
19+
private final Resource parent;
20+
@NonNull private final ClientMeta client;
21+
@NonNull private final Logger logger;
22+
@NonNull private final StreamObserver<io.cloudquery.plugin.v3.Sync.Response> syncStream;
23+
24+
@Override
25+
public void write(Object data) {
26+
Resource resource = Resource.builder().table(table).parent(parent).item(data).build();
27+
for (Column column : table.getColumns()) {
28+
try {
29+
logger.info("resolving column: {}", column.getName());
30+
if (column.getResolver() == null) {
31+
// TODO: Fall back to path resolver
32+
continue;
33+
}
34+
column.getResolver().resolve(client, resource, column);
35+
logger.info("resolved column: {}", column.getName());
36+
} catch (TransformerException e) {
37+
logger.error("Failed to resolve column: {}", column.getName(), e);
38+
return;
39+
}
40+
}
41+
42+
try {
43+
Sync.MessageInsert insert =
44+
Sync.MessageInsert.newBuilder().setRecord(resource.encode()).build();
45+
Sync.Response response = Sync.Response.newBuilder().setInsert(insert).build();
46+
syncStream.onNext(response);
47+
} catch (IOException e) {
48+
logger.error("Failed to encode resource: {}", resource, e);
49+
return;
50+
}
51+
}
52+
}
+3-4
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
package io.cloudquery.schema;
22

3-
import lombok.Builder;
4-
5-
@Builder
6-
public class ClientMeta {}
3+
public interface ClientMeta {
4+
String getId();
5+
}

Diff for: lib/src/main/java/io/cloudquery/schema/Resource.java

+7
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
package io.cloudquery.schema;
22

3+
import com.google.protobuf.ByteString;
34
import io.cloudquery.scalar.Scalar;
45
import io.cloudquery.scalar.ValidationException;
6+
import java.io.IOException;
57
import java.util.ArrayList;
68
import java.util.List;
79
import lombok.Builder;
@@ -37,4 +39,9 @@ public Scalar<?> get(String columnName) {
3739
int index = table.indexOfColumn(columnName);
3840
return this.data.get(index);
3941
}
42+
43+
public ByteString encode() throws IOException {
44+
// TODO: Encode data and not only schema
45+
return table.encode();
46+
}
4047
}

Diff for: lib/src/main/java/io/cloudquery/schema/Table.java

+1
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,7 @@ public static int maxDepth(List<Table> tables) {
133133
}
134134

135135
@NonNull private String name;
136+
private TableResolver resolver;
136137
private String title;
137138
private String description;
138139
@Setter private Table parent;

0 commit comments

Comments
 (0)