Skip to content

feat(sync): Initial insert message support #81

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Aug 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.google.protobuf.ByteString;
import io.cloudquery.plugin.BackendOptions;
import io.cloudquery.plugin.NewClientOptions;
import io.cloudquery.plugin.Plugin;
import io.cloudquery.plugin.v3.PluginGrpc.PluginImplBase;
import io.cloudquery.plugin.v3.Write;
Expand Down Expand Up @@ -41,9 +42,15 @@ public void getVersion(
public void init(
io.cloudquery.plugin.v3.Init.Request request,
StreamObserver<io.cloudquery.plugin.v3.Init.Response> responseObserver) {
plugin.init();
responseObserver.onNext(io.cloudquery.plugin.v3.Init.Response.newBuilder().build());
responseObserver.onCompleted();
try {
plugin.init(
request.getSpec().toStringUtf8(),
NewClientOptions.builder().noConnection(request.getNoConnection()).build());
responseObserver.onNext(io.cloudquery.plugin.v3.Init.Response.newBuilder().build());
responseObserver.onCompleted();
} catch (Exception e) {
responseObserver.onError(e);
}
}

@Override
Expand Down
61 changes: 51 additions & 10 deletions lib/src/main/java/io/cloudquery/memdb/MemDB.java
Original file line number Diff line number Diff line change
@@ -1,11 +1,17 @@
package io.cloudquery.memdb;

import io.cloudquery.plugin.BackendOptions;
import io.cloudquery.plugin.ClientNotInitializedException;
import io.cloudquery.plugin.NewClientOptions;
import io.cloudquery.plugin.Plugin;
import io.cloudquery.plugin.TableOutputStream;
import io.cloudquery.scheduler.Scheduler;
import io.cloudquery.schema.ClientMeta;
import io.cloudquery.schema.Column;
import io.cloudquery.schema.Resource;
import io.cloudquery.schema.SchemaException;
import io.cloudquery.schema.Table;
import io.cloudquery.schema.TableResolver;
import io.grpc.stub.StreamObserver;
import java.util.List;
import org.apache.arrow.vector.types.pojo.ArrowType.Utf8;
Expand All @@ -15,26 +21,44 @@ public class MemDB extends Plugin {
List.of(
Table.builder()
.name("table1")
.columns(List.of(Column.builder().name("name1").type(new Utf8()).build()))
.resolver(
new TableResolver() {
@Override
public void resolve(
ClientMeta clientMeta, Resource parent, TableOutputStream stream) {
stream.write(Table1Data.builder().name("name1").build());
stream.write(Table1Data.builder().name("name2").build());
}
})
.columns(List.of(Column.builder().name("name").type(new Utf8()).build()))
.build(),
Table.builder()
.name("table2")
.columns(List.of(Column.builder().name("name1").type(new Utf8()).build()))
.resolver(
new TableResolver() {
@Override
public void resolve(
ClientMeta clientMeta, Resource parent, TableOutputStream stream) {
stream.write(Table2Data.builder().id("id1").build());
stream.write(Table2Data.builder().id("id2").build());
}
})
.columns(List.of(Column.builder().name("id").type(new Utf8()).build()))
.build());

private Spec spec;

public MemDB() {
super("memdb", "0.0.1");
}

@Override
public void init() {
// do nothing
}

@Override
public List<Table> tables(
List<String> includeList, List<String> skipList, boolean skipDependentTables)
throws SchemaException {
throws SchemaException, ClientNotInitializedException {
if (this.client == null) {
throw new ClientNotInitializedException();
}
return Table.filterDFS(allTables, includeList, skipList, skipDependentTables);
}

Expand All @@ -46,13 +70,19 @@ public void sync(
boolean deterministicCqId,
BackendOptions backendOptions,
StreamObserver<io.cloudquery.plugin.v3.Sync.Response> syncStream)
throws SchemaException {
throws SchemaException, ClientNotInitializedException {
if (this.client == null) {
throw new ClientNotInitializedException();
}

List<Table> filtered = Table.filterDFS(allTables, includeList, skipList, skipDependentTables);
Scheduler.builder()
.client(client)
.tables(filtered)
.syncStream(syncStream)
.deterministicCqId(deterministicCqId)
.logger(getLogger())
.concurrency(this.spec.getConcurrency())
.build()
.sync();
}
Expand All @@ -69,6 +99,17 @@ public void write() {

@Override
public void close() {
// do nothing
if (this.client != null) {
((MemDBClient) this.client).close();
}
}

@Override
public ClientMeta newClient(String spec, NewClientOptions options) throws Exception {
if (options.isNoConnection()) {
return null;
}
this.spec = Spec.fromJSON(spec);
return new MemDBClient();
}
}
18 changes: 18 additions & 0 deletions lib/src/main/java/io/cloudquery/memdb/MemDBClient.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package io.cloudquery.memdb;

import io.cloudquery.schema.ClientMeta;

public class MemDBClient implements ClientMeta {
private static final String id = "memdb";

public MemDBClient() {}

@Override
public String getId() {
return id;
}

public void close() {
// do nothing
}
}
24 changes: 24 additions & 0 deletions lib/src/main/java/io/cloudquery/memdb/Spec.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package io.cloudquery.memdb;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.Getter;
import lombok.Setter;

@Getter
@Setter
public class Spec {
public static Spec fromJSON(String json) throws JsonMappingException, JsonProcessingException {
ObjectMapper objectMapper = new ObjectMapper();
Spec spec = objectMapper.readValue(json, Spec.class);
if (spec.getConcurrency() == 0) {
spec.setConcurrency(10000);
}
return spec;
}

private int concurrency;

public Spec() {}
}
10 changes: 10 additions & 0 deletions lib/src/main/java/io/cloudquery/memdb/Table1Data.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package io.cloudquery.memdb;

import lombok.Builder;
import lombok.Getter;

@Builder
@Getter
public class Table1Data {
private String name;
}
10 changes: 10 additions & 0 deletions lib/src/main/java/io/cloudquery/memdb/Table2Data.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package io.cloudquery.memdb;

import lombok.Builder;
import lombok.Getter;

@Builder
@Getter
public class Table2Data {
private String id;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package io.cloudquery.plugin;

public class ClientNotInitializedException extends Exception {

public ClientNotInitializedException() {}
}
10 changes: 10 additions & 0 deletions lib/src/main/java/io/cloudquery/plugin/NewClientOptions.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package io.cloudquery.plugin;

import lombok.Builder;
import lombok.Getter;

@Builder
@Getter
public class NewClientOptions {
private final boolean noConnection;
}
12 changes: 9 additions & 3 deletions lib/src/main/java/io/cloudquery/plugin/Plugin.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.cloudquery.plugin;

import io.cloudquery.schema.ClientMeta;
import io.cloudquery.schema.SchemaException;
import io.cloudquery.schema.Table;
import io.grpc.stub.StreamObserver;
Expand All @@ -16,12 +17,17 @@ public abstract class Plugin {
@NonNull protected final String name;
@NonNull protected final String version;
@Setter protected Logger logger;
protected ClientMeta client;

public abstract void init();
public void init(String spec, NewClientOptions options) throws Exception {
client = newClient(spec, options);
}

public abstract ClientMeta newClient(String spec, NewClientOptions options) throws Exception;

public abstract List<Table> tables(
List<String> includeList, List<String> skipList, boolean skipDependentTables)
throws SchemaException;
throws SchemaException, ClientNotInitializedException;

public abstract void sync(
List<String> includeList,
Expand All @@ -30,7 +36,7 @@ public abstract void sync(
boolean deterministicCqId,
BackendOptions backendOptions,
StreamObserver<io.cloudquery.plugin.v3.Sync.Response> syncStream)
throws SchemaException;
throws SchemaException, ClientNotInitializedException;

public abstract void read();

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package io.cloudquery.plugin;

public interface TableOutputStream {
public void write(Object data);
}
22 changes: 22 additions & 0 deletions lib/src/main/java/io/cloudquery/scheduler/Scheduler.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.cloudquery.scheduler;

import io.cloudquery.plugin.v3.Sync;
import io.cloudquery.schema.ClientMeta;
import io.cloudquery.schema.Table;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
Expand All @@ -14,7 +15,9 @@ public class Scheduler {
@NonNull private final List<Table> tables;
@NonNull private final StreamObserver<io.cloudquery.plugin.v3.Sync.Response> syncStream;
@NonNull private final Logger logger;
@NonNull private final ClientMeta client;

private int concurrency;
private boolean deterministicCqId;

public void sync() {
Expand All @@ -30,6 +33,25 @@ public void sync() {
return;
}
}

for (Table table : tables) {
try {
logger.info("resolving table: {}", table.getName());
SchedulerTableOutputStream schedulerTableOutputStream =
SchedulerTableOutputStream.builder()
.table(table)
.client(client)
.logger(logger)
.syncStream(syncStream)
.build();
table.getResolver().resolve(client, null, schedulerTableOutputStream);
logger.info("resolved table: {}", table.getName());
} catch (Exception e) {
syncStream.onError(e);
return;
}
}

syncStream.onCompleted();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package io.cloudquery.scheduler;

import io.cloudquery.plugin.TableOutputStream;
import io.cloudquery.plugin.v3.Sync;
import io.cloudquery.schema.ClientMeta;
import io.cloudquery.schema.Column;
import io.cloudquery.schema.Resource;
import io.cloudquery.schema.Table;
import io.cloudquery.transformers.TransformerException;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import lombok.Builder;
import lombok.NonNull;
import org.apache.logging.log4j.Logger;

@Builder
public class SchedulerTableOutputStream implements TableOutputStream {
@NonNull private final Table table;
private final Resource parent;
@NonNull private final ClientMeta client;
@NonNull private final Logger logger;
@NonNull private final StreamObserver<io.cloudquery.plugin.v3.Sync.Response> syncStream;

@Override
public void write(Object data) {
Resource resource = Resource.builder().table(table).parent(parent).item(data).build();
for (Column column : table.getColumns()) {
try {
logger.info("resolving column: {}", column.getName());
if (column.getResolver() == null) {
// TODO: Fall back to path resolver
continue;
}
column.getResolver().resolve(client, resource, column);
logger.info("resolved column: {}", column.getName());
} catch (TransformerException e) {
logger.error("Failed to resolve column: {}", column.getName(), e);
return;
}
}

try {
Sync.MessageInsert insert =
Sync.MessageInsert.newBuilder().setRecord(resource.encode()).build();
Sync.Response response = Sync.Response.newBuilder().setInsert(insert).build();
syncStream.onNext(response);
} catch (IOException e) {
logger.error("Failed to encode resource: {}", resource, e);
return;
}
}
}
7 changes: 3 additions & 4 deletions lib/src/main/java/io/cloudquery/schema/ClientMeta.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package io.cloudquery.schema;

import lombok.Builder;

@Builder
public class ClientMeta {}
public interface ClientMeta {
String getId();
}
7 changes: 7 additions & 0 deletions lib/src/main/java/io/cloudquery/schema/Resource.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package io.cloudquery.schema;

import com.google.protobuf.ByteString;
import io.cloudquery.scalar.Scalar;
import io.cloudquery.scalar.ValidationException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import lombok.Builder;
Expand Down Expand Up @@ -37,4 +39,9 @@ public Scalar<?> get(String columnName) {
int index = table.indexOfColumn(columnName);
return this.data.get(index);
}

public ByteString encode() throws IOException {
// TODO: Encode data and not only schema
return table.encode();
}
}
1 change: 1 addition & 0 deletions lib/src/main/java/io/cloudquery/schema/Table.java
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ public static int maxDepth(List<Table> tables) {
}

@NonNull private String name;
private TableResolver resolver;
private String title;
private String description;
@Setter private Table parent;
Expand Down
Loading