Skip to content

feat: Implement concurrency and relations resolving #91

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Aug 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions lib/src/main/java/io/cloudquery/memdb/MemDB.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,27 @@ public void resolve(
}
})
.transform(TransformWithClass.builder(Table2Data.class).pkField("id").build())
.relations(
List.of(
Table.builder()
.name("table2_child")
.resolver(
new TableResolver() {

@Override
public void resolve(
ClientMeta clientMeta,
Resource parent,
TableOutputStream stream) {
String parentName = parent.get("name").toString();
stream.write(
Table2ChildData.builder().name(parentName + "_name1").build());
stream.write(
Table2ChildData.builder().name(parentName + "_name2").build());
}
})
.transform(TransformWithClass.builder(Table2ChildData.class).build())
.build()))
.build());
}

Expand Down
2 changes: 1 addition & 1 deletion lib/src/main/java/io/cloudquery/memdb/Spec.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ public static Spec fromJSON(String json) throws JsonMappingException, JsonProces
ObjectMapper objectMapper = new ObjectMapper();
Spec spec = objectMapper.readValue(json, Spec.class);
if (spec.getConcurrency() == 0) {
spec.setConcurrency(10000);
spec.setConcurrency(100);
}
return spec;
}
Expand Down
10 changes: 10 additions & 0 deletions lib/src/main/java/io/cloudquery/memdb/Table2ChildData.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package io.cloudquery.memdb;

import lombok.Builder;
import lombok.Getter;

/**
 * Row type for the {@code table2_child} relation table in the in-memory test source.
 *
 * <p>Instances are produced by the child-table resolver in {@code MemDB}, which writes rows whose
 * {@code name} is derived from the parent row's name (e.g. {@code "<parentName>_name1"}).
 */
@Builder
@Getter
public class Table2ChildData {
  // Child row name; built from the parent resource's "name" column by the resolver.
  private String name;
}

This file was deleted.

83 changes: 62 additions & 21 deletions lib/src/main/java/io/cloudquery/scheduler/Scheduler.java
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
package io.cloudquery.scheduler;

import com.google.protobuf.ByteString;
import io.cloudquery.helper.ArrowHelper;
import io.cloudquery.plugin.v3.Sync;
import io.cloudquery.schema.ClientMeta;
import io.cloudquery.schema.Resource;
import io.cloudquery.schema.Table;
import io.grpc.stub.StreamObserver;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import lombok.Builder;
import lombok.NonNull;
import org.apache.logging.log4j.Logger;
Expand All @@ -20,8 +25,58 @@ public class Scheduler {
private int concurrency;
private boolean deterministicCqId;

public void sync() {
/**
 * Resolves the given tables concurrently, then recursively resolves each table's relations using
 * the resolved resources as parents.
 *
 * <p>Each level of the table tree gets its own fixed thread pool, sized by the number of tables
 * capped at the remaining concurrency budget; the budget is halved at each nesting level so deep
 * relation trees do not explode the total thread count.
 *
 * @param tables tables to resolve; {@code null} or empty is a no-op
 * @param parent parent resource for relation tables, or {@code null} for top-level tables
 * @param concurrency maximum number of tables resolved in parallel at this level
 * @throws InterruptedException if interrupted while waiting for this level's tasks to finish
 */
private void resolveTables(List<Table> tables, Resource parent, int concurrency)
    throws InterruptedException {
  if (tables == null || tables.isEmpty()) {
    return;
  }
  ExecutorService executor = Executors.newFixedThreadPool(Math.min(tables.size(), concurrency));
  for (Table table : tables) {
    final int nextLevelConcurrency = Math.max(1, concurrency / 2);
    executor.submit(
        new Runnable() {
          @Override
          public void run() {
            // Fix: original concatenation read "of parentX" — missing trailing space.
            String tableMessage =
                parent != null
                    ? "table " + table.getName() + " of parent " + parent.getTable().getName()
                    : "table " + table.getName();
            try {
              logger.info("resolving {}", tableMessage);
              if (!table.getResolver().isPresent()) {
                logger.error("no resolver for {}", tableMessage);
                return;
              }

              SchedulerTableOutputStream schedulerTableOutputStream =
                  new SchedulerTableOutputStream(table, parent, client, logger);
              table.getResolver().get().resolve(client, parent, schedulerTableOutputStream);

              for (Resource resource : schedulerTableOutputStream.getResources()) {
                ByteString record = resource.encode();
                Sync.MessageInsert insert =
                    Sync.MessageInsert.newBuilder().setRecord(record).build();
                Sync.Response response = Sync.Response.newBuilder().setInsert(insert).build();
                // NOTE(review): onNext may be reached from several worker threads at once;
                // gRPC StreamObserver implementations are not thread-safe — confirm access
                // is serialized (or wrap syncStream in a synchronized adapter).
                syncStream.onNext(response);
                resolveTables(table.getRelations(), resource, nextLevelConcurrency);
              }

              logger.info("resolved {}", tableMessage);
            } catch (InterruptedException e) {
              // Restore interrupt status so the owning executor/thread can observe it;
              // previously this was swallowed by the generic Exception handler.
              Thread.currentThread().interrupt();
              logger.error("Interrupted while resolving table: {}", table.getName(), e);
              syncStream.onError(e);
            } catch (Exception e) {
              logger.error("Failed to resolve table: {}", table.getName(), e);
              syncStream.onError(e);
            }
          }
        });
  }
  executor.shutdown();
  // Effectively wait forever: each level blocks until all of its tables (and, transitively,
  // their relations) are fully resolved.
  executor.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
}

public void sync() {
for (Table table : Table.flattenTables(tables)) {
try {
logger.info("sending migrate message for table: {}", table.getName());
Sync.MessageMigrateTable migrateTable =
Expand All @@ -34,26 +89,12 @@ public void sync() {
}
}

for (Table table : tables) {
try {
logger.info("resolving table: {}", table.getName());
if (!table.getResolver().isPresent()) {
logger.error("no resolver for table: {}", table.getName());
continue;
}
SchedulerTableOutputStream schedulerTableOutputStream =
SchedulerTableOutputStream.builder()
.table(table)
.client(client)
.logger(logger)
.syncStream(syncStream)
.build();
table.getResolver().get().resolve(client, null, schedulerTableOutputStream);
logger.info("resolved table: {}", table.getName());
} catch (Exception e) {
syncStream.onError(e);
return;
}
try {
resolveTables(this.tables, null, this.concurrency);
} catch (InterruptedException e) {
logger.error("Failed to resolve tables", e);
syncStream.onError(e);
return;
}

syncStream.onCompleted();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,52 +1,71 @@
package io.cloudquery.scheduler;

import com.google.protobuf.ByteString;
import io.cloudquery.plugin.TableOutputStream;
import io.cloudquery.plugin.v3.Sync;
import io.cloudquery.schema.ClientMeta;
import io.cloudquery.schema.Column;
import io.cloudquery.schema.Resource;
import io.cloudquery.schema.Table;
import io.cloudquery.transformers.TransformerException;
import io.grpc.stub.StreamObserver;
import lombok.Builder;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import lombok.NonNull;
import org.apache.logging.log4j.Logger;

@Builder
public class SchedulerTableOutputStream implements TableOutputStream {
private static final int RESOURCE_RESOLVE_CONCURRENCY = 100;
private static final int RESOURCE_RESOLVE_TIMEOUT_MINUTES = 10;
@NonNull private final Table table;
private final Resource parent;
@NonNull private final ClientMeta client;
@NonNull private final Logger logger;
@NonNull private final StreamObserver<io.cloudquery.plugin.v3.Sync.Response> syncStream;

private List<Resource> resources = new ArrayList<Resource>();

private ExecutorService executor;

/**
 * Creates an output stream that collects resolved resources for {@code table}.
 *
 * <p>Column resolvers submitted via {@code write} run on a dedicated fixed-size pool of
 * {@code RESOURCE_RESOLVE_CONCURRENCY} threads; callers must invoke {@code getResources()} to
 * shut the pool down and await completion.
 *
 * @param table the table whose rows are written to this stream
 * @param parent parent resource for relation tables, or {@code null} for top-level tables
 * @param client plugin client handed to column resolvers
 * @param logger logger used for resolution progress and errors
 */
public SchedulerTableOutputStream(
    @NonNull Table table, Resource parent, @NonNull ClientMeta client, @NonNull Logger logger) {
  this.table = table;
  this.parent = parent;
  this.client = client;
  this.logger = logger;
  this.executor = Executors.newFixedThreadPool(RESOURCE_RESOLVE_CONCURRENCY);
}

@Override
public void write(Object data) {
Resource resource = Resource.builder().table(table).parent(parent).item(data).build();
for (Column column : table.getColumns()) {
try {
logger.info("resolving column: {}", column.getName());
if (column.getResolver() == null) {
logger.error("no resolver for column: {}", column.getName());
continue;
}
column.getResolver().resolve(client, resource, column);
logger.info("resolved column: {}", column.getName());
} catch (TransformerException e) {
logger.error("Failed to resolve column: {}", column.getName(), e);
return;
}
executor.submit(
new Runnable() {
@Override
public void run() {
try {
logger.debug("resolving column: {}", column.getName());
if (column.getResolver() == null) {
logger.error("no resolver for column: {}", column.getName());
return;
}
column.getResolver().resolve(client, resource, column);
logger.debug("resolved column: {}", column.getName());
return;
} catch (TransformerException e) {
logger.error("Failed to resolve column: {}", column.getName(), e);
return;
}
}
});
}
resources.add(resource);
}

try {
ByteString record = resource.encode();
Sync.MessageInsert insert = Sync.MessageInsert.newBuilder().setRecord(record).build();
Sync.Response response = Sync.Response.newBuilder().setInsert(insert).build();
syncStream.onNext(response);
} catch (Exception e) {
logger.error("Failed to encode resource: {}", resource, e);
return;
}
/**
 * Shuts down the column-resolution pool, waits for pending tasks to finish, and returns the
 * resources written to this stream.
 *
 * @return the resources collected by {@code write(Object)}; columns may be unresolved if the
 *     wait timed out (a warning is logged in that case)
 * @throws InterruptedException if interrupted while waiting for column resolution to complete
 */
public List<Resource> getResources() throws InterruptedException {
  // TODO: Optimize this to not wait for all resources to complete
  executor.shutdown();
  // Fix: the boolean result of awaitTermination was ignored, so a timeout silently returned
  // resources with unresolved columns; surface it to the operator.
  if (!executor.awaitTermination(RESOURCE_RESOLVE_TIMEOUT_MINUTES, TimeUnit.MINUTES)) {
    logger.warn(
        "Timed out after {} minutes waiting for column resolution on table {}",
        RESOURCE_RESOLVE_TIMEOUT_MINUTES,
        table.getName());
  }
  return this.resources;
}
}
4 changes: 2 additions & 2 deletions lib/src/main/java/io/cloudquery/schema/Table.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import io.cloudquery.transformers.TransformerException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand All @@ -28,7 +28,7 @@ public interface Transform {
}

public static List<Table> flattenTables(List<Table> tables) {
Map<String, Table> flattenMap = new HashMap<>();
Map<String, Table> flattenMap = new LinkedHashMap<>();
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is important to preserve the order of the tables as defined in the Spec.

for (Table table : tables) {
Table newTable = table.toBuilder().relations(Collections.emptyList()).build();
flattenMap.put(newTable.name, newTable);
Expand Down