Skip to content

feat: Encode resources with data #88

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Aug 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion lib/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ dependencies {
test {
useJUnitPlatform()
testLogging {
events "passed", "skipped", "failed"
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I removed this because it makes it a pain to see the failing tests

events "skipped", "failed"
}
jvmArgs("--add-opens=java.base/java.nio=ALL-UNNAMED")
}
Expand Down Expand Up @@ -100,6 +100,7 @@ task runMemDBServe(type: JavaExec) {
classpath = sourceSets.main.runtimeClasspath
main = javaMainClass
args = ["serve"]
jvmArgs = ["--add-opens=java.base/java.nio=ALL-UNNAMED"]
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Needed this for the Arrow serialization.

}

spotless {
Expand Down
178 changes: 168 additions & 10 deletions lib/src/main/java/io/cloudquery/helper/ArrowHelper.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,11 @@

import com.google.protobuf.ByteString;
import io.cloudquery.schema.Column;
import io.cloudquery.schema.Resource;
import io.cloudquery.schema.Table;
import io.cloudquery.schema.Table.TableBuilder;
import io.cloudquery.types.JSONType.JSONVector;
import io.cloudquery.types.UUIDType.UUIDVector;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.channels.Channels;
Expand All @@ -15,29 +18,136 @@
import java.util.Map;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.BigIntVector;
import org.apache.arrow.vector.BitVector;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.FixedSizeBinaryVector;
import org.apache.arrow.vector.Float4Vector;
import org.apache.arrow.vector.Float8Vector;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.LargeVarBinaryVector;
import org.apache.arrow.vector.LargeVarCharVector;
import org.apache.arrow.vector.SmallIntVector;
import org.apache.arrow.vector.TimeStampVector;
import org.apache.arrow.vector.TinyIntVector;
import org.apache.arrow.vector.UInt1Vector;
import org.apache.arrow.vector.UInt2Vector;
import org.apache.arrow.vector.UInt4Vector;
import org.apache.arrow.vector.UInt8Vector;
import org.apache.arrow.vector.VarBinaryVector;
import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.ipc.ArrowReader;
import org.apache.arrow.vector.ipc.ArrowStreamReader;
import org.apache.arrow.vector.ipc.ArrowStreamWriter;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.FieldType;
import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.arrow.vector.util.Text;

public class ArrowHelper {
public static final String CQ_EXTENSION_INCREMENTAL = "cq:extension:incremental";
public static final String CQ_EXTENSION_CONSTRAINT_NAME = "cq:extension:constraint_name";
public static final String CQ_EXTENSION_PRIMARY_KEY = "cq:extension:primary_key";
public static final String CQ_EXTENSION_UNIQUE = "cq:extension:unique";
public static final String CQ_TABLE_NAME = "cq:table_name";
public static final String CQ_TABLE_TITLE = "cq:table_title";
public static final String CQ_TABLE_DESCRIPTION = "cq:table_description";
public static final String CQ_TABLE_DEPENDS_ON = "cq:table_depends_on";

private static void setVectorData(FieldVector vector, Object data) {
vector.allocateNew();
if (vector instanceof BigIntVector) {
((BigIntVector) vector).set(0, (long) data);
return;
}
Comment on lines +60 to +63
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can follow this pattern which we do elsewhere to remove the cast on the line below:

Suggested change
if (vector instanceof BigIntVector) {
((BigIntVector) vector).set(0, (long) data);
return;
}
if (vector instanceof BigIntVector bigIntVector) {
bigIntVector.set(0, (long) data);
return;
}

if (vector instanceof BitVector) {
((BitVector) vector).set(0, (int) data);
return;
}
if (vector instanceof FixedSizeBinaryVector) {
((FixedSizeBinaryVector) vector).set(0, (byte[]) data);
return;
}
if (vector instanceof Float4Vector) {
((Float4Vector) vector).set(0, (float) data);
return;
}
if (vector instanceof Float8Vector) {
((Float8Vector) vector).set(0, (double) data);
return;
}
if (vector instanceof IntVector) {
((IntVector) vector).set(0, (int) data);
return;
}
if (vector instanceof LargeVarBinaryVector) {
((LargeVarBinaryVector) vector).set(0, (byte[]) data);
return;
}
if (vector instanceof LargeVarCharVector) {
((LargeVarCharVector) vector).set(0, (Text) data);
return;
}
if (vector instanceof SmallIntVector) {
((SmallIntVector) vector).set(0, (short) data);
return;
}
if (vector instanceof TimeStampVector) {
((TimeStampVector) vector).set(0, (long) data);
return;
}
if (vector instanceof TinyIntVector) {
((TinyIntVector) vector).set(0, (byte) data);
return;
}
if (vector instanceof UInt1Vector) {
((UInt1Vector) vector).set(0, (byte) data);
return;
}
if (vector instanceof UInt2Vector) {
((UInt2Vector) vector).set(0, (short) data);
return;
}
if (vector instanceof UInt4Vector) {
((UInt4Vector) vector).set(0, (int) data);
return;
}
if (vector instanceof UInt8Vector) {
((UInt8Vector) vector).set(0, (long) data);
return;
}
if (vector instanceof VarBinaryVector) {
((VarBinaryVector) vector).set(0, (byte[]) data);
return;
}
if (vector instanceof VarCharVector) {
((VarCharVector) vector).set(0, (Text) data);
return;
}
if (vector instanceof UUIDVector) {
((UUIDVector) vector).set(0, (java.util.UUID) data);
return;
}
if (vector instanceof JSONVector) {
((JSONVector) vector).setSafe(0, (byte[]) data);
return;
}

throw new IllegalArgumentException("Unsupported vector type: " + vector.getClass());
}

public static ByteString encode(Table table) throws IOException {
try (BufferAllocator bufferAllocator = new RootAllocator()) {
Schema schema = toArrowSchema(table);
VectorSchemaRoot schemaRoot = VectorSchemaRoot.create(schema, bufferAllocator);
try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
try (ArrowStreamWriter writer =
new ArrowStreamWriter(schemaRoot, null, Channels.newChannel(out))) {
writer.start();
writer.end();
return ByteString.copyFrom(out.toByteArray());
try (VectorSchemaRoot schemaRoot = VectorSchemaRoot.create(schema, bufferAllocator)) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Apparently we need to close VectorSchemaRoot (which closes all the underlying vectors) to avoid memory leaks. It's only an issue when we set data on the vectors so we I didn't see it on the table encode before.

try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
try (ArrowStreamWriter writer =
new ArrowStreamWriter(schemaRoot, null, Channels.newChannel(out))) {
writer.start();
writer.end();
return ByteString.copyFrom(out.toByteArray());
}
}
}
}
Expand All @@ -57,7 +167,15 @@ public static Schema toArrowSchema(Table table) {
Field[] fields = new Field[columns.size()];
for (int i = 0; i < columns.size(); i++) {
Column column = columns.get(i);
Field field = Field.nullable(column.getName(), column.getType());
Map<String, String> metadata = new HashMap<>();
metadata.put(CQ_EXTENSION_UNIQUE, column.isUnique() ? "true" : "false");
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this just be Boolean.toString(column.isUnique())?

metadata.put(CQ_EXTENSION_PRIMARY_KEY, column.isPrimaryKey() ? "true" : "false");
metadata.put(CQ_EXTENSION_INCREMENTAL, column.isIncrementalKey() ? "true" : "false");
Field field =
new Field(
column.getName(),
new FieldType(!column.isNotNull(), column.getType(), null, metadata),
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

First argument of FieldType says if the field is nullable

null);
fields[i] = field;
}
Map<String, String> metadata = new HashMap<>();
Expand All @@ -71,22 +189,37 @@ public static Schema toArrowSchema(Table table) {
if (table.getParent() != null) {
metadata.put(CQ_TABLE_DEPENDS_ON, table.getParent().getName());
}
metadata.put(CQ_EXTENSION_CONSTRAINT_NAME, table.getConstraintName());
return new Schema(asList(fields), metadata);
}

public static Table fromArrowSchema(Schema schema) {
List<Column> columns = new ArrayList<>();
for (Field field : schema.getFields()) {
columns.add(Column.builder().name(field.getName()).type(field.getType()).build());
boolean isUnique = field.getMetadata().get(CQ_EXTENSION_UNIQUE) == "true";
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
boolean isUnique = field.getMetadata().get(CQ_EXTENSION_UNIQUE) == "true";
boolean isUnique = field.getMetadata().get(CQ_EXTENSION_UNIQUE).equals("true");

Or use Objects.equals(field.getMetadata().get(CQ_EXTENSION_UNIQUE), "true") if it can be null.

boolean isPrimaryKey = field.getMetadata().get(CQ_EXTENSION_PRIMARY_KEY) == "true";
boolean isIncrementalKey = field.getMetadata().get(CQ_EXTENSION_INCREMENTAL) == "true";

columns.add(
Column.builder()
.name(field.getName())
.unique(isUnique)
.primaryKey(isPrimaryKey)
.incrementalKey(isIncrementalKey)
.type(field.getType())
.build());
}

Map<String, String> metaData = schema.getCustomMetadata();
String name = metaData.get(CQ_TABLE_NAME);
String title = metaData.get(CQ_TABLE_TITLE);
String description = metaData.get(CQ_TABLE_DESCRIPTION);
String parent = metaData.get(CQ_TABLE_DEPENDS_ON);
String constraintName = metaData.get(CQ_EXTENSION_CONSTRAINT_NAME);

TableBuilder tableBuilder =
Table.builder().name(name).constraintName(constraintName).columns(columns);

TableBuilder tableBuilder = Table.builder().name(name).columns(columns);
if (title != null) {
tableBuilder.title(title);
}
Expand All @@ -99,4 +232,29 @@ public static Table fromArrowSchema(Schema schema) {

return tableBuilder.build();
}

public static ByteString encode(Resource resource) throws IOException {
try (BufferAllocator bufferAllocator = new RootAllocator()) {
Table table = resource.getTable();
Schema schema = toArrowSchema(table);
try (VectorSchemaRoot vectorRoot = VectorSchemaRoot.create(schema, bufferAllocator)) {
for (int i = 0; i < table.getColumns().size(); i++) {
FieldVector vector = vectorRoot.getVector(i);
Object data = resource.getData().get(i).get();
setVectorData(vector, data);
}
// TODO: Support encoding multiple resources
vectorRoot.setRowCount(1);
try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
try (ArrowStreamWriter writer =
new ArrowStreamWriter(vectorRoot, null, Channels.newChannel(out))) {
writer.start();
writer.writeBatch();
writer.end();
return ByteString.copyFrom(out.toByteArray());
}
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ public void init(
responseObserver.onNext(io.cloudquery.plugin.v3.Init.Response.newBuilder().build());
responseObserver.onCompleted();
} catch (Exception e) {
plugin.getLogger().error("Error initializing plugin", e);
responseObserver.onError(e);
}
}
Expand All @@ -77,6 +78,7 @@ public void getTables(
.build());
responseObserver.onCompleted();
} catch (Exception e) {
plugin.getLogger().error("Error getting tables", e);
responseObserver.onError(e);
}
}
Expand All @@ -95,6 +97,7 @@ public void sync(
request.getBackend().getTableName(), request.getBackend().getConnection()),
responseObserver);
} catch (Exception e) {
plugin.getLogger().error("Error syncing tables", e);
responseObserver.onError(e);
}
}
Expand Down
77 changes: 44 additions & 33 deletions lib/src/main/java/io/cloudquery/memdb/MemDB.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,44 +8,56 @@
import io.cloudquery.plugin.TableOutputStream;
import io.cloudquery.scheduler.Scheduler;
import io.cloudquery.schema.ClientMeta;
import io.cloudquery.schema.Column;
import io.cloudquery.schema.Resource;
import io.cloudquery.schema.SchemaException;
import io.cloudquery.schema.Table;
import io.cloudquery.schema.TableResolver;
import io.cloudquery.transformers.Tables;
import io.cloudquery.transformers.TransformWithClass;
import io.grpc.stub.StreamObserver;
import java.util.List;
import org.apache.arrow.vector.types.pojo.ArrowType.Utf8;
import java.util.UUID;

public class MemDB extends Plugin {
private List<Table> allTables =
List.of(
Table.builder()
.name("table1")
.resolver(
new TableResolver() {
@Override
public void resolve(
ClientMeta clientMeta, Resource parent, TableOutputStream stream) {
stream.write(Table1Data.builder().name("name1").build());
stream.write(Table1Data.builder().name("name2").build());
}
})
.columns(List.of(Column.builder().name("name").type(new Utf8()).build()))
.build(),
Table.builder()
.name("table2")
.resolver(
new TableResolver() {
@Override
public void resolve(
ClientMeta clientMeta, Resource parent, TableOutputStream stream) {
stream.write(Table2Data.builder().id("id1").build());
stream.write(Table2Data.builder().id("id2").build());
}
})
.columns(List.of(Column.builder().name("id").type(new Utf8()).build()))
.build());
private static List<Table> getTables() {
return List.of(
Table.builder()
.name("table1")
.resolver(
new TableResolver() {
@Override
public void resolve(
ClientMeta clientMeta, Resource parent, TableOutputStream stream) {
stream.write(
Table1Data.builder()
.id(UUID.fromString("46b2b6e6-8f3e-4340-a721-4aa0786b1cc0"))
.name("name1")
.build());
stream.write(
Table1Data.builder()
.id(UUID.fromString("e89f95df-a389-4f1b-9ba6-1fab565523d6"))
.name("name2")
.build());
}
})
.transform(TransformWithClass.builder(Table1Data.class).pkField("id").build())
.build(),
Table.builder()
.name("table2")
.resolver(
new TableResolver() {
@Override
public void resolve(
ClientMeta clientMeta, Resource parent, TableOutputStream stream) {
stream.write(Table2Data.builder().id(1).name("name1").build());
stream.write(Table2Data.builder().id(2).name("name2").build());
}
})
.transform(TransformWithClass.builder(Table2Data.class).pkField("id").build())
.build());
}

private List<Table> allTables;

private Spec spec;

Expand Down Expand Up @@ -107,10 +119,9 @@ public void close() {

@Override
public ClientMeta newClient(String spec, NewClientOptions options) throws Exception {
if (options.isNoConnection()) {
return null;
}
this.spec = Spec.fromJSON(spec);
this.allTables = getTables();
Tables.transformTables(allTables);
return new MemDBClient();
}
}
2 changes: 2 additions & 0 deletions lib/src/main/java/io/cloudquery/memdb/Table1Data.java
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package io.cloudquery.memdb;

import java.util.UUID;
import lombok.Builder;
import lombok.Getter;

/**
 * Row type for the MemDB sample "table1".
 *
 * <p>Lombok generates the builder and getters. The "id" field is registered as the table's
 * primary key via {@code TransformWithClass.builder(Table1Data.class).pkField("id")} in MemDB.
 */
@Builder
@Getter
public class Table1Data {
  // Primary key; populated with a fixed UUID per sample row in MemDB.
  private UUID id;
  private String name;
}
Loading