diff --git a/lib/src/main/java/io/cloudquery/internal/servers/plugin/v3/PluginServer.java b/lib/src/main/java/io/cloudquery/internal/servers/plugin/v3/PluginServer.java index 9c5a5fd..21a8743 100644 --- a/lib/src/main/java/io/cloudquery/internal/servers/plugin/v3/PluginServer.java +++ b/lib/src/main/java/io/cloudquery/internal/servers/plugin/v3/PluginServer.java @@ -2,6 +2,7 @@ import com.google.protobuf.ByteString; import io.cloudquery.plugin.BackendOptions; +import io.cloudquery.plugin.NewClientOptions; import io.cloudquery.plugin.Plugin; import io.cloudquery.plugin.v3.PluginGrpc.PluginImplBase; import io.cloudquery.plugin.v3.Write; @@ -41,9 +42,15 @@ public void getVersion( public void init( io.cloudquery.plugin.v3.Init.Request request, StreamObserver responseObserver) { - plugin.init(); - responseObserver.onNext(io.cloudquery.plugin.v3.Init.Response.newBuilder().build()); - responseObserver.onCompleted(); + try { + plugin.init( + request.getSpec().toStringUtf8(), + NewClientOptions.builder().noConnection(request.getNoConnection()).build()); + responseObserver.onNext(io.cloudquery.plugin.v3.Init.Response.newBuilder().build()); + responseObserver.onCompleted(); + } catch (Exception e) { + responseObserver.onError(e); + } } @Override diff --git a/lib/src/main/java/io/cloudquery/memdb/MemDB.java b/lib/src/main/java/io/cloudquery/memdb/MemDB.java index a4f08f9..dbb0348 100644 --- a/lib/src/main/java/io/cloudquery/memdb/MemDB.java +++ b/lib/src/main/java/io/cloudquery/memdb/MemDB.java @@ -1,11 +1,17 @@ package io.cloudquery.memdb; import io.cloudquery.plugin.BackendOptions; +import io.cloudquery.plugin.ClientNotInitializedException; +import io.cloudquery.plugin.NewClientOptions; import io.cloudquery.plugin.Plugin; +import io.cloudquery.plugin.TableOutputStream; import io.cloudquery.scheduler.Scheduler; +import io.cloudquery.schema.ClientMeta; import io.cloudquery.schema.Column; +import io.cloudquery.schema.Resource; import io.cloudquery.schema.SchemaException; import io.cloudquery.schema.Table; +import io.cloudquery.schema.TableResolver; import io.grpc.stub.StreamObserver; import java.util.List; import org.apache.arrow.vector.types.pojo.ArrowType.Utf8; @@ -15,26 +21,44 @@ public class MemDB extends Plugin { List.of( Table.builder() .name("table1") - .columns(List.of(Column.builder().name("name1").type(new Utf8()).build())) + .resolver( + new TableResolver() { + @Override + public void resolve( + ClientMeta clientMeta, Resource parent, TableOutputStream stream) { + stream.write(Table1Data.builder().name("name1").build()); + stream.write(Table1Data.builder().name("name2").build()); + } + }) + .columns(List.of(Column.builder().name("name").type(new Utf8()).build())) .build(), Table.builder() .name("table2") - .columns(List.of(Column.builder().name("name1").type(new Utf8()).build())) + .resolver( + new TableResolver() { + @Override + public void resolve( + ClientMeta clientMeta, Resource parent, TableOutputStream stream) { + stream.write(Table2Data.builder().id("id1").build()); + stream.write(Table2Data.builder().id("id2").build()); + } + }) + .columns(List.of(Column.builder().name("id").type(new Utf8()).build())) .build()); + private Spec spec; + public MemDB() { super("memdb", "0.0.1"); } - @Override - public void init() { - // do nothing - } - @Override public List tables( List includeList, List skipList, boolean skipDependentTables) - throws SchemaException { + throws SchemaException, ClientNotInitializedException { + if (this.client == null) { + throw new ClientNotInitializedException(); + } return Table.filterDFS(allTables, includeList, skipList, skipDependentTables); } @@ -46,13 +70,19 @@ public void sync( boolean deterministicCqId, BackendOptions backendOptions, StreamObserver syncStream) - throws SchemaException { + throws SchemaException, ClientNotInitializedException { + if (this.client == null) { + throw new ClientNotInitializedException(); + } + List
filtered = Table.filterDFS(allTables, includeList, skipList, skipDependentTables); Scheduler.builder() + .client(client) .tables(filtered) .syncStream(syncStream) .deterministicCqId(deterministicCqId) .logger(getLogger()) + .concurrency(this.spec.getConcurrency()) .build() .sync(); } @@ -69,6 +99,17 @@ public void write() { @Override public void close() { - // do nothing + if (this.client != null) { + ((MemDBClient) this.client).close(); + } + } + + @Override + public ClientMeta newClient(String spec, NewClientOptions options) throws Exception { + if (options.isNoConnection()) { + return null; + } + this.spec = Spec.fromJSON(spec); + return new MemDBClient(); } } diff --git a/lib/src/main/java/io/cloudquery/memdb/MemDBClient.java b/lib/src/main/java/io/cloudquery/memdb/MemDBClient.java new file mode 100644 index 0000000..9a640b5 --- /dev/null +++ b/lib/src/main/java/io/cloudquery/memdb/MemDBClient.java @@ -0,0 +1,18 @@ +package io.cloudquery.memdb; + +import io.cloudquery.schema.ClientMeta; + +public class MemDBClient implements ClientMeta { + private static final String id = "memdb"; + + public MemDBClient() {} + + @Override + public String getId() { + return id; + } + + public void close() { + // do nothing + } +} diff --git a/lib/src/main/java/io/cloudquery/memdb/Spec.java b/lib/src/main/java/io/cloudquery/memdb/Spec.java new file mode 100644 index 0000000..316bcd2 --- /dev/null +++ b/lib/src/main/java/io/cloudquery/memdb/Spec.java @@ -0,0 +1,24 @@ +package io.cloudquery.memdb; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonMappingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import lombok.Getter; +import lombok.Setter; + +@Getter +@Setter +public class Spec { + public static Spec fromJSON(String json) throws JsonMappingException, JsonProcessingException { + ObjectMapper objectMapper = new ObjectMapper(); + Spec spec = objectMapper.readValue(json, Spec.class); + if (spec.getConcurrency() == 0) { + spec.setConcurrency(10000); + } + return spec; + } + + private int concurrency; + + public Spec() {} +} diff --git a/lib/src/main/java/io/cloudquery/memdb/Table1Data.java b/lib/src/main/java/io/cloudquery/memdb/Table1Data.java new file mode 100644 index 0000000..7eee118 --- /dev/null +++ b/lib/src/main/java/io/cloudquery/memdb/Table1Data.java @@ -0,0 +1,10 @@ +package io.cloudquery.memdb; + +import lombok.Builder; +import lombok.Getter; + +@Builder +@Getter +public class Table1Data { + private String name; +} diff --git a/lib/src/main/java/io/cloudquery/memdb/Table2Data.java b/lib/src/main/java/io/cloudquery/memdb/Table2Data.java new file mode 100644 index 0000000..7715300 --- /dev/null +++ b/lib/src/main/java/io/cloudquery/memdb/Table2Data.java @@ -0,0 +1,10 @@ +package io.cloudquery.memdb; + +import lombok.Builder; +import lombok.Getter; + +@Builder +@Getter +public class Table2Data { + private String id; +} diff --git a/lib/src/main/java/io/cloudquery/plugin/ClientNotInitializedException.java b/lib/src/main/java/io/cloudquery/plugin/ClientNotInitializedException.java new file mode 100644 index 0000000..650012b --- /dev/null +++ b/lib/src/main/java/io/cloudquery/plugin/ClientNotInitializedException.java @@ -0,0 +1,6 @@ +package io.cloudquery.plugin; + +public class ClientNotInitializedException extends Exception { + + public ClientNotInitializedException() {} +} diff --git a/lib/src/main/java/io/cloudquery/plugin/NewClientOptions.java b/lib/src/main/java/io/cloudquery/plugin/NewClientOptions.java new file mode 100644 index 0000000..051d2e6 --- /dev/null +++ b/lib/src/main/java/io/cloudquery/plugin/NewClientOptions.java @@ -0,0 +1,10 @@ +package io.cloudquery.plugin; + +import lombok.Builder; +import lombok.Getter; + +@Builder +@Getter +public class NewClientOptions { + private final boolean noConnection; +} diff --git a/lib/src/main/java/io/cloudquery/plugin/Plugin.java b/lib/src/main/java/io/cloudquery/plugin/Plugin.java index b27fc1d..f04623d 100644 --- a/lib/src/main/java/io/cloudquery/plugin/Plugin.java +++ b/lib/src/main/java/io/cloudquery/plugin/Plugin.java @@ -1,5 +1,6 @@ package io.cloudquery.plugin; +import io.cloudquery.schema.ClientMeta; import io.cloudquery.schema.SchemaException; import io.cloudquery.schema.Table; import io.grpc.stub.StreamObserver; @@ -16,12 +17,17 @@ public abstract class Plugin { @NonNull protected final String name; @NonNull protected final String version; @Setter protected Logger logger; + protected ClientMeta client; - public abstract void init(); + public void init(String spec, NewClientOptions options) throws Exception { + client = newClient(spec, options); + } + + public abstract ClientMeta newClient(String spec, NewClientOptions options) throws Exception; public abstract List
tables( List includeList, List skipList, boolean skipDependentTables) - throws SchemaException; + throws SchemaException, ClientNotInitializedException; public abstract void sync( List includeList, @@ -30,7 +36,7 @@ public abstract void sync( boolean deterministicCqId, BackendOptions backendOptions, StreamObserver syncStream) - throws SchemaException; + throws SchemaException, ClientNotInitializedException; public abstract void read(); diff --git a/lib/src/main/java/io/cloudquery/plugin/TableOutputStream.java b/lib/src/main/java/io/cloudquery/plugin/TableOutputStream.java new file mode 100644 index 0000000..328c4d0 --- /dev/null +++ b/lib/src/main/java/io/cloudquery/plugin/TableOutputStream.java @@ -0,0 +1,5 @@ +package io.cloudquery.plugin; + +public interface TableOutputStream { + public void write(Object data); +} diff --git a/lib/src/main/java/io/cloudquery/scheduler/Scheduler.java b/lib/src/main/java/io/cloudquery/scheduler/Scheduler.java index 7d484f6..d8bc475 100644 --- a/lib/src/main/java/io/cloudquery/scheduler/Scheduler.java +++ b/lib/src/main/java/io/cloudquery/scheduler/Scheduler.java @@ -1,6 +1,7 @@ package io.cloudquery.scheduler; import io.cloudquery.plugin.v3.Sync; +import io.cloudquery.schema.ClientMeta; import io.cloudquery.schema.Table; import io.grpc.stub.StreamObserver; import java.io.IOException; @@ -14,7 +15,9 @@ public class Scheduler { @NonNull private final List
tables; @NonNull private final StreamObserver syncStream; @NonNull private final Logger logger; + @NonNull private final ClientMeta client; + private int concurrency; private boolean deterministicCqId; public void sync() { @@ -30,6 +33,25 @@ public void sync() { return; } } + + for (Table table : tables) { + try { + logger.info("resolving table: {}", table.getName()); + SchedulerTableOutputStream schedulerTableOutputStream = + SchedulerTableOutputStream.builder() + .table(table) + .client(client) + .logger(logger) + .syncStream(syncStream) + .build(); + table.getResolver().resolve(client, null, schedulerTableOutputStream); + logger.info("resolved table: {}", table.getName()); + } catch (Exception e) { + syncStream.onError(e); + return; + } + } + syncStream.onCompleted(); } } diff --git a/lib/src/main/java/io/cloudquery/scheduler/SchedulerTableOutputStream.java b/lib/src/main/java/io/cloudquery/scheduler/SchedulerTableOutputStream.java new file mode 100644 index 0000000..a13bf2f --- /dev/null +++ b/lib/src/main/java/io/cloudquery/scheduler/SchedulerTableOutputStream.java @@ -0,0 +1,52 @@ +package io.cloudquery.scheduler; + +import io.cloudquery.plugin.TableOutputStream; +import io.cloudquery.plugin.v3.Sync; +import io.cloudquery.schema.ClientMeta; +import io.cloudquery.schema.Column; +import io.cloudquery.schema.Resource; +import io.cloudquery.schema.Table; +import io.cloudquery.transformers.TransformerException; +import io.grpc.stub.StreamObserver; +import java.io.IOException; +import lombok.Builder; +import lombok.NonNull; +import org.apache.logging.log4j.Logger; + +@Builder +public class SchedulerTableOutputStream implements TableOutputStream { + @NonNull private final Table table; + private final Resource parent; + @NonNull private final ClientMeta client; + @NonNull private final Logger logger; + @NonNull private final StreamObserver syncStream; + + @Override + public void write(Object data) { + Resource resource = Resource.builder().table(table).parent(parent).item(data).build(); + for (Column column : table.getColumns()) { + try { + logger.info("resolving column: {}", column.getName()); + if (column.getResolver() == null) { + // TODO: Fall back to path resolver + continue; + } + column.getResolver().resolve(client, resource, column); + logger.info("resolved column: {}", column.getName()); + } catch (TransformerException e) { + logger.error("Failed to resolve column: {}", column.getName(), e); + return; + } + } + + try { + Sync.MessageInsert insert = + Sync.MessageInsert.newBuilder().setRecord(resource.encode()).build(); + Sync.Response response = Sync.Response.newBuilder().setInsert(insert).build(); + syncStream.onNext(response); + } catch (IOException e) { + logger.error("Failed to encode resource: {}", resource, e); + return; + } + } +} diff --git a/lib/src/main/java/io/cloudquery/schema/ClientMeta.java b/lib/src/main/java/io/cloudquery/schema/ClientMeta.java index 8356cfb..cf0b64a 100644 --- a/lib/src/main/java/io/cloudquery/schema/ClientMeta.java +++ b/lib/src/main/java/io/cloudquery/schema/ClientMeta.java @@ -1,6 +1,5 @@ package io.cloudquery.schema; -import lombok.Builder; - -@Builder -public class ClientMeta {} +public interface ClientMeta { + String getId(); +} diff --git a/lib/src/main/java/io/cloudquery/schema/Resource.java b/lib/src/main/java/io/cloudquery/schema/Resource.java index 2300882..198a1d5 100644 --- a/lib/src/main/java/io/cloudquery/schema/Resource.java +++ b/lib/src/main/java/io/cloudquery/schema/Resource.java @@ -1,7 +1,9 @@ package io.cloudquery.schema; +import com.google.protobuf.ByteString; import io.cloudquery.scalar.Scalar; import io.cloudquery.scalar.ValidationException; +import java.io.IOException; import java.util.ArrayList; import java.util.List; import lombok.Builder; @@ -37,4 +39,9 @@ public Scalar get(String columnName) { int index = table.indexOfColumn(columnName); return this.data.get(index); } + + public ByteString encode() throws IOException { + // TODO: Encode data and not only schema + return table.encode(); + } } diff --git a/lib/src/main/java/io/cloudquery/schema/Table.java b/lib/src/main/java/io/cloudquery/schema/Table.java index ca47d8a..9acdc0f 100644 --- a/lib/src/main/java/io/cloudquery/schema/Table.java +++ b/lib/src/main/java/io/cloudquery/schema/Table.java @@ -133,6 +133,7 @@ public static int maxDepth(List
tables) { } @NonNull private String name; + private TableResolver resolver; private String title; private String description; @Setter private Table parent; diff --git a/lib/src/main/java/io/cloudquery/schema/TableResolver.java b/lib/src/main/java/io/cloudquery/schema/TableResolver.java new file mode 100644 index 0000000..417bac9 --- /dev/null +++ b/lib/src/main/java/io/cloudquery/schema/TableResolver.java @@ -0,0 +1,7 @@ +package io.cloudquery.schema; + +import io.cloudquery.plugin.TableOutputStream; + +public interface TableResolver { + void resolve(ClientMeta clientMeta, Resource parent, TableOutputStream stream); +}