Skip to content

Commit 2c7060f

Browse files
authored
feat: Encode resources with data (#88)
1 parent 2b82e04 commit 2c7060f

23 files changed

+331
-93
lines changed

Diff for: lib/build.gradle

+2-1
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ dependencies {
6464
test {
6565
useJUnitPlatform()
6666
testLogging {
67-
events "passed", "skipped", "failed"
67+
events "skipped", "failed"
6868
}
6969
jvmArgs("--add-opens=java.base/java.nio=ALL-UNNAMED")
7070
}
@@ -100,6 +100,7 @@ task runMemDBServe(type: JavaExec) {
100100
classpath = sourceSets.main.runtimeClasspath
101101
main = javaMainClass
102102
args = ["serve"]
103+
jvmArgs = ["--add-opens=java.base/java.nio=ALL-UNNAMED"]
103104
}
104105

105106
spotless {

Diff for: lib/src/main/java/io/cloudquery/helper/ArrowHelper.java

+168-10
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,11 @@
44

55
import com.google.protobuf.ByteString;
66
import io.cloudquery.schema.Column;
7+
import io.cloudquery.schema.Resource;
78
import io.cloudquery.schema.Table;
89
import io.cloudquery.schema.Table.TableBuilder;
10+
import io.cloudquery.types.JSONType.JSONVector;
11+
import io.cloudquery.types.UUIDType.UUIDVector;
912
import java.io.ByteArrayOutputStream;
1013
import java.io.IOException;
1114
import java.nio.channels.Channels;
@@ -15,29 +18,136 @@
1518
import java.util.Map;
1619
import org.apache.arrow.memory.BufferAllocator;
1720
import org.apache.arrow.memory.RootAllocator;
21+
import org.apache.arrow.vector.BigIntVector;
22+
import org.apache.arrow.vector.BitVector;
23+
import org.apache.arrow.vector.FieldVector;
24+
import org.apache.arrow.vector.FixedSizeBinaryVector;
25+
import org.apache.arrow.vector.Float4Vector;
26+
import org.apache.arrow.vector.Float8Vector;
27+
import org.apache.arrow.vector.IntVector;
28+
import org.apache.arrow.vector.LargeVarBinaryVector;
29+
import org.apache.arrow.vector.LargeVarCharVector;
30+
import org.apache.arrow.vector.SmallIntVector;
31+
import org.apache.arrow.vector.TimeStampVector;
32+
import org.apache.arrow.vector.TinyIntVector;
33+
import org.apache.arrow.vector.UInt1Vector;
34+
import org.apache.arrow.vector.UInt2Vector;
35+
import org.apache.arrow.vector.UInt4Vector;
36+
import org.apache.arrow.vector.UInt8Vector;
37+
import org.apache.arrow.vector.VarBinaryVector;
38+
import org.apache.arrow.vector.VarCharVector;
1839
import org.apache.arrow.vector.VectorSchemaRoot;
1940
import org.apache.arrow.vector.ipc.ArrowReader;
2041
import org.apache.arrow.vector.ipc.ArrowStreamReader;
2142
import org.apache.arrow.vector.ipc.ArrowStreamWriter;
2243
import org.apache.arrow.vector.types.pojo.Field;
44+
import org.apache.arrow.vector.types.pojo.FieldType;
2345
import org.apache.arrow.vector.types.pojo.Schema;
46+
import org.apache.arrow.vector.util.Text;
2447

2548
public class ArrowHelper {
49+
public static final String CQ_EXTENSION_INCREMENTAL = "cq:extension:incremental";
50+
public static final String CQ_EXTENSION_CONSTRAINT_NAME = "cq:extension:constraint_name";
51+
public static final String CQ_EXTENSION_PRIMARY_KEY = "cq:extension:primary_key";
52+
public static final String CQ_EXTENSION_UNIQUE = "cq:extension:unique";
2653
public static final String CQ_TABLE_NAME = "cq:table_name";
2754
public static final String CQ_TABLE_TITLE = "cq:table_title";
2855
public static final String CQ_TABLE_DESCRIPTION = "cq:table_description";
2956
public static final String CQ_TABLE_DEPENDS_ON = "cq:table_depends_on";
3057

58+
private static void setVectorData(FieldVector vector, Object data) {
59+
vector.allocateNew();
60+
if (vector instanceof BigIntVector) {
61+
((BigIntVector) vector).set(0, (long) data);
62+
return;
63+
}
64+
if (vector instanceof BitVector) {
65+
((BitVector) vector).set(0, (int) data);
66+
return;
67+
}
68+
if (vector instanceof FixedSizeBinaryVector) {
69+
((FixedSizeBinaryVector) vector).set(0, (byte[]) data);
70+
return;
71+
}
72+
if (vector instanceof Float4Vector) {
73+
((Float4Vector) vector).set(0, (float) data);
74+
return;
75+
}
76+
if (vector instanceof Float8Vector) {
77+
((Float8Vector) vector).set(0, (double) data);
78+
return;
79+
}
80+
if (vector instanceof IntVector) {
81+
((IntVector) vector).set(0, (int) data);
82+
return;
83+
}
84+
if (vector instanceof LargeVarBinaryVector) {
85+
((LargeVarBinaryVector) vector).set(0, (byte[]) data);
86+
return;
87+
}
88+
if (vector instanceof LargeVarCharVector) {
89+
((LargeVarCharVector) vector).set(0, (Text) data);
90+
return;
91+
}
92+
if (vector instanceof SmallIntVector) {
93+
((SmallIntVector) vector).set(0, (short) data);
94+
return;
95+
}
96+
if (vector instanceof TimeStampVector) {
97+
((TimeStampVector) vector).set(0, (long) data);
98+
return;
99+
}
100+
if (vector instanceof TinyIntVector) {
101+
((TinyIntVector) vector).set(0, (byte) data);
102+
return;
103+
}
104+
if (vector instanceof UInt1Vector) {
105+
((UInt1Vector) vector).set(0, (byte) data);
106+
return;
107+
}
108+
if (vector instanceof UInt2Vector) {
109+
((UInt2Vector) vector).set(0, (short) data);
110+
return;
111+
}
112+
if (vector instanceof UInt4Vector) {
113+
((UInt4Vector) vector).set(0, (int) data);
114+
return;
115+
}
116+
if (vector instanceof UInt8Vector) {
117+
((UInt8Vector) vector).set(0, (long) data);
118+
return;
119+
}
120+
if (vector instanceof VarBinaryVector) {
121+
((VarBinaryVector) vector).set(0, (byte[]) data);
122+
return;
123+
}
124+
if (vector instanceof VarCharVector) {
125+
((VarCharVector) vector).set(0, (Text) data);
126+
return;
127+
}
128+
if (vector instanceof UUIDVector) {
129+
((UUIDVector) vector).set(0, (java.util.UUID) data);
130+
return;
131+
}
132+
if (vector instanceof JSONVector) {
133+
((JSONVector) vector).setSafe(0, (byte[]) data);
134+
return;
135+
}
136+
137+
throw new IllegalArgumentException("Unsupported vector type: " + vector.getClass());
138+
}
139+
31140
public static ByteString encode(Table table) throws IOException {
32141
try (BufferAllocator bufferAllocator = new RootAllocator()) {
33142
Schema schema = toArrowSchema(table);
34-
VectorSchemaRoot schemaRoot = VectorSchemaRoot.create(schema, bufferAllocator);
35-
try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
36-
try (ArrowStreamWriter writer =
37-
new ArrowStreamWriter(schemaRoot, null, Channels.newChannel(out))) {
38-
writer.start();
39-
writer.end();
40-
return ByteString.copyFrom(out.toByteArray());
143+
try (VectorSchemaRoot schemaRoot = VectorSchemaRoot.create(schema, bufferAllocator)) {
144+
try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
145+
try (ArrowStreamWriter writer =
146+
new ArrowStreamWriter(schemaRoot, null, Channels.newChannel(out))) {
147+
writer.start();
148+
writer.end();
149+
return ByteString.copyFrom(out.toByteArray());
150+
}
41151
}
42152
}
43153
}
@@ -57,7 +167,15 @@ public static Schema toArrowSchema(Table table) {
57167
Field[] fields = new Field[columns.size()];
58168
for (int i = 0; i < columns.size(); i++) {
59169
Column column = columns.get(i);
60-
Field field = Field.nullable(column.getName(), column.getType());
170+
Map<String, String> metadata = new HashMap<>();
171+
metadata.put(CQ_EXTENSION_UNIQUE, column.isUnique() ? "true" : "false");
172+
metadata.put(CQ_EXTENSION_PRIMARY_KEY, column.isPrimaryKey() ? "true" : "false");
173+
metadata.put(CQ_EXTENSION_INCREMENTAL, column.isIncrementalKey() ? "true" : "false");
174+
Field field =
175+
new Field(
176+
column.getName(),
177+
new FieldType(!column.isNotNull(), column.getType(), null, metadata),
178+
null);
61179
fields[i] = field;
62180
}
63181
Map<String, String> metadata = new HashMap<>();
@@ -71,22 +189,37 @@ public static Schema toArrowSchema(Table table) {
71189
if (table.getParent() != null) {
72190
metadata.put(CQ_TABLE_DEPENDS_ON, table.getParent().getName());
73191
}
192+
// Only record the constraint name when one is set, mirroring the null guard used for the
// parent-table entry above, so the schema metadata never carries a null value.
// NOTE(review): assumes getConstraintName() may return null when no constraint is configured.
if (table.getConstraintName() != null) {
  metadata.put(CQ_EXTENSION_CONSTRAINT_NAME, table.getConstraintName());
}
74193
return new Schema(asList(fields), metadata);
75194
}
76195

77196
public static Table fromArrowSchema(Schema schema) {
78197
List<Column> columns = new ArrayList<>();
79198
for (Field field : schema.getFields()) {
80-
columns.add(Column.builder().name(field.getName()).type(field.getType()).build());
199+
boolean isUnique = field.getMetadata().get(CQ_EXTENSION_UNIQUE) == "true";
200+
boolean isPrimaryKey = field.getMetadata().get(CQ_EXTENSION_PRIMARY_KEY) == "true";
201+
boolean isIncrementalKey = field.getMetadata().get(CQ_EXTENSION_INCREMENTAL) == "true";
202+
203+
columns.add(
204+
Column.builder()
205+
.name(field.getName())
206+
.unique(isUnique)
207+
.primaryKey(isPrimaryKey)
208+
.incrementalKey(isIncrementalKey)
209+
.type(field.getType())
210+
.build());
81211
}
82212

83213
Map<String, String> metaData = schema.getCustomMetadata();
84214
String name = metaData.get(CQ_TABLE_NAME);
85215
String title = metaData.get(CQ_TABLE_TITLE);
86216
String description = metaData.get(CQ_TABLE_DESCRIPTION);
87217
String parent = metaData.get(CQ_TABLE_DEPENDS_ON);
218+
String constraintName = metaData.get(CQ_EXTENSION_CONSTRAINT_NAME);
219+
220+
TableBuilder tableBuilder =
221+
Table.builder().name(name).constraintName(constraintName).columns(columns);
88222

89-
TableBuilder tableBuilder = Table.builder().name(name).columns(columns);
90223
if (title != null) {
91224
tableBuilder.title(title);
92225
}
@@ -99,4 +232,29 @@ public static Table fromArrowSchema(Schema schema) {
99232

100233
return tableBuilder.build();
101234
}
235+
236+
public static ByteString encode(Resource resource) throws IOException {
237+
try (BufferAllocator bufferAllocator = new RootAllocator()) {
238+
Table table = resource.getTable();
239+
Schema schema = toArrowSchema(table);
240+
try (VectorSchemaRoot vectorRoot = VectorSchemaRoot.create(schema, bufferAllocator)) {
241+
for (int i = 0; i < table.getColumns().size(); i++) {
242+
FieldVector vector = vectorRoot.getVector(i);
243+
Object data = resource.getData().get(i).get();
244+
setVectorData(vector, data);
245+
}
246+
// TODO: Support encoding multiple resources
247+
vectorRoot.setRowCount(1);
248+
try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
249+
try (ArrowStreamWriter writer =
250+
new ArrowStreamWriter(vectorRoot, null, Channels.newChannel(out))) {
251+
writer.start();
252+
writer.writeBatch();
253+
writer.end();
254+
return ByteString.copyFrom(out.toByteArray());
255+
}
256+
}
257+
}
258+
}
259+
}
102260
}

Diff for: lib/src/main/java/io/cloudquery/internal/servers/plugin/v3/PluginServer.java

+3
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ public void init(
5353
responseObserver.onNext(io.cloudquery.plugin.v3.Init.Response.newBuilder().build());
5454
responseObserver.onCompleted();
5555
} catch (Exception e) {
56+
plugin.getLogger().error("Error initializing plugin", e);
5657
responseObserver.onError(e);
5758
}
5859
}
@@ -77,6 +78,7 @@ public void getTables(
7778
.build());
7879
responseObserver.onCompleted();
7980
} catch (Exception e) {
81+
plugin.getLogger().error("Error getting tables", e);
8082
responseObserver.onError(e);
8183
}
8284
}
@@ -95,6 +97,7 @@ public void sync(
9597
request.getBackend().getTableName(), request.getBackend().getConnection()),
9698
responseObserver);
9799
} catch (Exception e) {
100+
plugin.getLogger().error("Error syncing tables", e);
98101
responseObserver.onError(e);
99102
}
100103
}

Diff for: lib/src/main/java/io/cloudquery/memdb/MemDB.java

+44-33
Original file line numberDiff line numberDiff line change
@@ -8,44 +8,56 @@
88
import io.cloudquery.plugin.TableOutputStream;
99
import io.cloudquery.scheduler.Scheduler;
1010
import io.cloudquery.schema.ClientMeta;
11-
import io.cloudquery.schema.Column;
1211
import io.cloudquery.schema.Resource;
1312
import io.cloudquery.schema.SchemaException;
1413
import io.cloudquery.schema.Table;
1514
import io.cloudquery.schema.TableResolver;
15+
import io.cloudquery.transformers.Tables;
16+
import io.cloudquery.transformers.TransformWithClass;
1617
import io.grpc.stub.StreamObserver;
1718
import java.util.List;
18-
import org.apache.arrow.vector.types.pojo.ArrowType.Utf8;
19+
import java.util.UUID;
1920

2021
public class MemDB extends Plugin {
21-
private List<Table> allTables =
22-
List.of(
23-
Table.builder()
24-
.name("table1")
25-
.resolver(
26-
new TableResolver() {
27-
@Override
28-
public void resolve(
29-
ClientMeta clientMeta, Resource parent, TableOutputStream stream) {
30-
stream.write(Table1Data.builder().name("name1").build());
31-
stream.write(Table1Data.builder().name("name2").build());
32-
}
33-
})
34-
.columns(List.of(Column.builder().name("name").type(new Utf8()).build()))
35-
.build(),
36-
Table.builder()
37-
.name("table2")
38-
.resolver(
39-
new TableResolver() {
40-
@Override
41-
public void resolve(
42-
ClientMeta clientMeta, Resource parent, TableOutputStream stream) {
43-
stream.write(Table2Data.builder().id("id1").build());
44-
stream.write(Table2Data.builder().id("id2").build());
45-
}
46-
})
47-
.columns(List.of(Column.builder().name("id").type(new Utf8()).build()))
48-
.build());
22+
private static List<Table> getTables() {
23+
return List.of(
24+
Table.builder()
25+
.name("table1")
26+
.resolver(
27+
new TableResolver() {
28+
@Override
29+
public void resolve(
30+
ClientMeta clientMeta, Resource parent, TableOutputStream stream) {
31+
stream.write(
32+
Table1Data.builder()
33+
.id(UUID.fromString("46b2b6e6-8f3e-4340-a721-4aa0786b1cc0"))
34+
.name("name1")
35+
.build());
36+
stream.write(
37+
Table1Data.builder()
38+
.id(UUID.fromString("e89f95df-a389-4f1b-9ba6-1fab565523d6"))
39+
.name("name2")
40+
.build());
41+
}
42+
})
43+
.transform(TransformWithClass.builder(Table1Data.class).pkField("id").build())
44+
.build(),
45+
Table.builder()
46+
.name("table2")
47+
.resolver(
48+
new TableResolver() {
49+
@Override
50+
public void resolve(
51+
ClientMeta clientMeta, Resource parent, TableOutputStream stream) {
52+
stream.write(Table2Data.builder().id(1).name("name1").build());
53+
stream.write(Table2Data.builder().id(2).name("name2").build());
54+
}
55+
})
56+
.transform(TransformWithClass.builder(Table2Data.class).pkField("id").build())
57+
.build());
58+
}
59+
60+
private List<Table> allTables;
4961

5062
private Spec spec;
5163

@@ -107,10 +119,9 @@ public void close() {
107119

108120
@Override
109121
public ClientMeta newClient(String spec, NewClientOptions options) throws Exception {
110-
if (options.isNoConnection()) {
111-
return null;
112-
}
113122
this.spec = Spec.fromJSON(spec);
123+
this.allTables = getTables();
124+
Tables.transformTables(allTables);
114125
return new MemDBClient();
115126
}
116127
}
+2
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
11
package io.cloudquery.memdb;
22

3+
import java.util.UUID;
34
import lombok.Builder;
45
import lombok.Getter;
56

67
/**
 * Row model written by the MemDB {@code table1} resolver. Lombok's {@code @Builder} and
 * {@code @Getter} supply the builder and the field getters.
 */
@Builder
78
@Getter
89
public class Table1Data {
10+
// Identifier of the row; MemDB's table1 transform names "id" as its primary-key field.
private UUID id;
911
// Display name of the row (e.g. "name1" in the MemDB resolver).
private String name;
1012
}

0 commit comments

Comments
 (0)