From 2f5ec2c7e01ca7dd5375e3e3634523fac84ee4c1 Mon Sep 17 00:00:00 2001 From: Yevgeny Pats <16490766+yevgenypats@users.noreply.github.com> Date: Fri, 16 Jun 2023 14:49:00 +0300 Subject: [PATCH 1/7] feat: Add Plugin V3 --- discovery/v1/discovery.proto | 16 ++++ plugin/v3/plugin.proto | 141 +++++++++++++++++++++++++++++++++++ 2 files changed, 157 insertions(+) create mode 100644 discovery/v1/discovery.proto create mode 100644 plugin/v3/plugin.proto diff --git a/discovery/v1/discovery.proto b/discovery/v1/discovery.proto new file mode 100644 index 0000000..1e167d5 --- /dev/null +++ b/discovery/v1/discovery.proto @@ -0,0 +1,16 @@ +syntax = "proto3"; +package cloudquery.discovery.v1; + +option go_package = "github.com/cloudquery/plugin-pb-go/pb/discovery/v1;discovery"; + +service Discovery { + // Get the name of the plugin + rpc GetVersions(GetVersions.Request) returns (GetVersions.Response); +} + +message GetVersions { + message Request {} + message Response { + repeated uint64 versions = 1; + } +} diff --git a/plugin/v3/plugin.proto b/plugin/v3/plugin.proto new file mode 100644 index 0000000..7535e64 --- /dev/null +++ b/plugin/v3/plugin.proto @@ -0,0 +1,141 @@ +syntax = "proto3"; +package cloudquery.plugin.v3; + +import "google/protobuf/timestamp.proto"; + +option go_package = "github.com/cloudquery/plugin-pb-go/pb/plugin/v3;plugin"; + +service Plugin { + // Get the name of the plugin + rpc GetName(GetName.Request) returns (GetName.Response); + // Get the current version of the plugin + rpc GetVersion(GetVersion.Request) returns (GetVersion.Response); + // Configure the plugin with the given credentials and mode + rpc Init(Init.Request) returns (Init.Response); + // Get all tables the source plugin supports. Must be called after Init + rpc GetTables(GetTables.Request) returns (GetTables.Response); + // Start the sync the source plugin + rpc Sync(Sync.Request) returns (stream Sync.Response); + // Write resources + rpc Write(stream Write.Request) returns (Write.Response); + // Send signal to flush and close open connections + rpc Close(Close.Request) returns (Close.Response); +} + +enum REGISTRY { + REGISTRY_GITHUB = 0; + REGISTRY_GRPC = 1; + REGISTRY_LOCAL = 2; +} + +enum SCHEDULER { + SCHEDULER_DFS = 0; + SCHEDULER_ROUND_ROBIN = 1; +} + +enum Operator { + EQUAL = 0; + LESS_THAN = 1; +} + +enum PK_MODE { + DEFAULT = 0; + CQ_ID_ONLY = 1; +} + +message StateBackendSpec { + string name = 1; + string path = 2; + string version = 3; + REGISTRY registry = 4; + bytes spec = 5; +} + +message GetName { + message Request {} + message Response { + string name = 1; + } +} + +message GetVersion { + message Request {} + message Response { + string version = 1; + } +} + +message Init { + message Request { + bytes spec = 1; + } + message Response {} +} + +message GetTables { + message Request { + repeated string tables = 1; + repeated string skip_tables = 2; + } + message Response { + // marshalled []arrow.Schema + repeated bytes tables = 1; + } +} + +message WriteOptions { + bool migrate_force = 1; +} + +message MessageCreateTable { + // marshalled arrow.Schema + bytes table = 1; + bool migrate_force = 2; +} + +message MessageInsert { + // marshalled arrow.Record + bytes record = 1; + bool upsert = 2; +} + +message MessageDeleteStale { + // marshalled arrow.Schema + bytes table = 1; + string source_name = 2; + google.protobuf.Timestamp sync_time = 3; +} + +message Sync { + message Request { + repeated string tables = 1; + repeated string skip_tables = 2; + int64 concurrency = 3; + SCHEDULER scheduler = 4; + StateBackendSpec state_backend = 5; + } + message Response { + oneof message { + MessageCreateTable create_table = 1; + MessageInsert insert = 2; + MessageDeleteStale delete = 3; + } + } +} + +message Write { + message Request { + oneof message { + WriteOptions options = 1; + MessageCreateTable create_table = 2; + MessageInsert insert = 3; + MessageDeleteStale delete = 4; + } + } + message Response {} +} + +message Close { + message Request {} + message Response {} +} From 30cebbbd6c09f44518f3ad32f05013215b03d6d8 Mon Sep 17 00:00:00 2001 From: Herman Schaaf Date: Fri, 16 Jun 2023 13:18:14 +0100 Subject: [PATCH 2/7] Rename CreateTable to MigrateTable --- plugin/v3/plugin.proto | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/plugin/v3/plugin.proto b/plugin/v3/plugin.proto index 7535e64..a17ddad 100644 --- a/plugin/v3/plugin.proto +++ b/plugin/v3/plugin.proto @@ -87,7 +87,7 @@ message WriteOptions { bool migrate_force = 1; } -message MessageCreateTable { +message MessageMigrateTable { // marshalled arrow.Schema bytes table = 1; bool migrate_force = 2; @@ -116,7 +116,7 @@ message Sync { } message Response { oneof message { - MessageCreateTable create_table = 1; + MessageMigrateTable migrate_table = 1; MessageInsert insert = 2; MessageDeleteStale delete = 3; } @@ -127,7 +127,7 @@ message Write { message Request { oneof message { WriteOptions options = 1; - MessageCreateTable create_table = 2; + MessageMigrateTable migrate_table = 2; MessageInsert insert = 3; MessageDeleteStale delete = 4; } From c5faf202dab120c4cfaa7f9151dc6d09ea3c302a Mon Sep 17 00:00:00 2001 From: Herman Schaaf Date: Tue, 20 Jun 2023 17:10:09 +0100 Subject: [PATCH 3/7] Update plugin.proto --- plugin/v3/plugin.proto | 1 + 1 file changed, 1 insertion(+) diff --git a/plugin/v3/plugin.proto b/plugin/v3/plugin.proto index a17ddad..2dc0fc4 100644 --- a/plugin/v3/plugin.proto +++ b/plugin/v3/plugin.proto @@ -67,6 +67,7 @@ message GetVersion { message Init { message Request { + // Internal plugin-specific spec bytes spec = 1; } message Response {} From 02b0af9c5bd7cc19f9e27e2a721062e895e4b6bd Mon Sep 17 00:00:00 2001 From: Yevgeny Pats <16490766+yevgenypats@users.noreply.github.com> Date: Wed, 21 Jun 2023 12:42:20 +0300 Subject: [PATCH 4/7] remove unused proto --- plugin/v3/plugin.proto | 19 +------------------ 1 file changed, 1 insertion(+), 18 deletions(-) diff --git a/plugin/v3/plugin.proto b/plugin/v3/plugin.proto index 2dc0fc4..2561b9f 100644 --- a/plugin/v3/plugin.proto +++ b/plugin/v3/plugin.proto @@ -28,21 +28,6 @@ enum REGISTRY { REGISTRY_LOCAL = 2; } -enum SCHEDULER { - SCHEDULER_DFS = 0; - SCHEDULER_ROUND_ROBIN = 1; -} - -enum Operator { - EQUAL = 0; - LESS_THAN = 1; -} - -enum PK_MODE { - DEFAULT = 0; - CQ_ID_ONLY = 1; -} - message StateBackendSpec { string name = 1; string path = 2; @@ -111,9 +96,7 @@ message Sync { message Request { repeated string tables = 1; repeated string skip_tables = 2; - int64 concurrency = 3; - SCHEDULER scheduler = 4; - StateBackendSpec state_backend = 5; + StateBackendSpec state_backend = 3; } message Response { oneof message { From 50557453ec2c2dcb326bee277a5a564b4221738d Mon Sep 17 00:00:00 2001 From: Yevgeny Pats <16490766+yevgenypats@users.noreply.github.com> Date: Fri, 23 Jun 2023 12:03:15 +0300 Subject: [PATCH 5/7] address review --- plugin/v3/plugin.proto | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/plugin/v3/plugin.proto b/plugin/v3/plugin.proto index 2561b9f..fc3499b 100644 --- a/plugin/v3/plugin.proto +++ b/plugin/v3/plugin.proto @@ -22,17 +22,18 @@ service Plugin { rpc Close(Close.Request) returns (Close.Response); } -enum REGISTRY { - REGISTRY_GITHUB = 0; - REGISTRY_GRPC = 1; - REGISTRY_LOCAL = 2; +enum Registry { + REGISTRY_UNSPECIFIED = 0; + REGISTRY_GITHUB = 1; + REGISTRY_GRPC = 2; + REGISTRY_LOCAL = 3; } message StateBackendSpec { string name = 1; string path = 2; string version = 3; - REGISTRY registry = 4; + Registry registry = 4; bytes spec = 5; } @@ -82,7 +83,6 @@ message MessageMigrateTable { message MessageInsert { // marshalled arrow.Record bytes record = 1; - bool upsert = 2; } message MessageDeleteStale { From 67e51e7c3f05e571de167e51422806eabaede49a Mon Sep 17 00:00:00 2001 From: Yevgeny Pats <16490766+yevgenypats@users.noreply.github.com> Date: Fri, 23 Jun 2023 12:04:55 +0300 Subject: [PATCH 6/7] fix review --- plugin/v3/plugin.proto | 1 - 1 file changed, 1 deletion(-) diff --git a/plugin/v3/plugin.proto b/plugin/v3/plugin.proto index fc3499b..80cfff3 100644 --- a/plugin/v3/plugin.proto +++ b/plugin/v3/plugin.proto @@ -77,7 +77,6 @@ message WriteOptions { message MessageMigrateTable { // marshalled arrow.Schema bytes table = 1; - bool migrate_force = 2; } message MessageInsert { From a081f5f6417693a5466efee3621a9b7ac12c327c Mon Sep 17 00:00:00 2001 From: Yevgeny Pats <16490766+yevgenypats@users.noreply.github.com> Date: Fri, 23 Jun 2023 12:16:23 +0300 Subject: [PATCH 7/7] add comment --- plugin/v3/plugin.proto | 2 ++ 1 file changed, 2 insertions(+) diff --git a/plugin/v3/plugin.proto b/plugin/v3/plugin.proto index 80cfff3..ff09b9a 100644 --- a/plugin/v3/plugin.proto +++ b/plugin/v3/plugin.proto @@ -109,6 +109,8 @@ message Sync { message Write { message Request { oneof message { + // WriteOptions is only used for the first message + // to configure the write stream WriteOptions options = 1; MessageMigrateTable migrate_table = 2; MessageInsert insert = 3;