1
1
package io .cloudquery .internal .servers .plugin .v3 ;
2
2
3
3
import io .cloudquery .plugin .v3 .PluginGrpc .PluginImplBase ;
4
+ import io .cloudquery .schema .Table ;
5
+ import io .cloudquery .plugin .v3 .Write ;
6
+ import io .grpc .stub .StreamObserver ;
7
+
8
+ import java .io .ByteArrayOutputStream ;
9
+ import java .nio .channels .Channels ;
10
+ import java .util .ArrayList ;
11
+ import java .util .List ;
12
+
13
+ import org .apache .arrow .memory .BufferAllocator ;
14
+ import org .apache .arrow .memory .RootAllocator ;
15
+ import org .apache .arrow .vector .VectorSchemaRoot ;
16
+ import org .apache .arrow .vector .ipc .ArrowStreamWriter ;
17
+ import org .apache .arrow .vector .types .pojo .Schema ;
18
+ import com .google .protobuf .ByteString ;
19
+
4
20
import io .cloudquery .plugin .Plugin ;
5
21
6
22
public class PluginServer extends PluginImplBase {
@@ -9,4 +25,100 @@ public class PluginServer extends PluginImplBase {
9
25
public PluginServer (Plugin plugin ) {
10
26
this .plugin = plugin ;
11
27
}
28
+
29
+ @ Override
30
+ public void getName (io .cloudquery .plugin .v3 .GetName .Request request ,
31
+ StreamObserver <io .cloudquery .plugin .v3 .GetName .Response > responseObserver ) {
32
+ responseObserver
33
+ .onNext (io .cloudquery .plugin .v3 .GetName .Response .newBuilder ().setName (plugin .getName ()).build ());
34
+ responseObserver .onCompleted ();
35
+ }
36
+
37
+ @ Override
38
+ public void getVersion (io .cloudquery .plugin .v3 .GetVersion .Request request ,
39
+ StreamObserver <io .cloudquery .plugin .v3 .GetVersion .Response > responseObserver ) {
40
+ responseObserver .onNext (
41
+ io .cloudquery .plugin .v3 .GetVersion .Response .newBuilder ().setVersion (plugin .getVersion ()).build ());
42
+ responseObserver .onCompleted ();
43
+ }
44
+
45
+ @ Override
46
+ public void init (io .cloudquery .plugin .v3 .Init .Request request ,
47
+ StreamObserver <io .cloudquery .plugin .v3 .Init .Response > responseObserver ) {
48
+ plugin .init ();
49
+ responseObserver .onNext (io .cloudquery .plugin .v3 .Init .Response .newBuilder ().build ());
50
+ responseObserver .onCompleted ();
51
+ }
52
+
53
+ @ Override
54
+ public void getTables (io .cloudquery .plugin .v3 .GetTables .Request request ,
55
+ StreamObserver <io .cloudquery .plugin .v3 .GetTables .Response > responseObserver ) {
56
+ try {
57
+ List <Table > tables = plugin .tables ();
58
+ List <ByteString > byteStrings = new ArrayList <>();
59
+ for (Table table : tables ) {
60
+ try (BufferAllocator bufferAllocator = new RootAllocator ()) {
61
+ Schema schema = table .toArrowSchema ();
62
+ VectorSchemaRoot schemaRoot = VectorSchemaRoot .create (schema , bufferAllocator );
63
+ try (ByteArrayOutputStream out = new ByteArrayOutputStream ()) {
64
+ try (ArrowStreamWriter writer = new ArrowStreamWriter (schemaRoot , null ,
65
+ Channels .newChannel (out ))) {
66
+ writer .start ();
67
+ writer .end ();
68
+ byteStrings .add (ByteString .copyFrom (out .toByteArray ()));
69
+ }
70
+ }
71
+ }
72
+ }
73
+ responseObserver
74
+ .onNext (io .cloudquery .plugin .v3 .GetTables .Response .newBuilder ().addAllTables (byteStrings ).build ());
75
+ responseObserver .onCompleted ();
76
+ } catch (Exception e ) {
77
+ responseObserver .onError (e );
78
+ }
79
+ }
80
+
81
+ @ Override
82
+ public void sync (io .cloudquery .plugin .v3 .Sync .Request request ,
83
+ StreamObserver <io .cloudquery .plugin .v3 .Sync .Response > responseObserver ) {
84
+ plugin .sync ();
85
+ responseObserver .onNext (io .cloudquery .plugin .v3 .Sync .Response .newBuilder ().build ());
86
+ responseObserver .onCompleted ();
87
+ }
88
+
89
+ @ Override
90
+ public void read (io .cloudquery .plugin .v3 .Read .Request request ,
91
+ StreamObserver <io .cloudquery .plugin .v3 .Read .Response > responseObserver ) {
92
+ plugin .read ();
93
+ responseObserver .onNext (io .cloudquery .plugin .v3 .Read .Response .newBuilder ().build ());
94
+ responseObserver .onCompleted ();
95
+ }
96
+
97
+ @ Override
98
+ public StreamObserver <Write .Request > write (StreamObserver <Write .Response > responseObserver ) {
99
+ plugin .write ();
100
+ return new StreamObserver <>() {
101
+ @ Override
102
+ public void onNext (Write .Request request ) {
103
+ }
104
+
105
+ @ Override
106
+ public void onError (Throwable t ) {
107
+ }
108
+
109
+ @ Override
110
+ public void onCompleted () {
111
+ responseObserver .onNext (Write .Response .newBuilder ().build ());
112
+ responseObserver .onCompleted ();
113
+ }
114
+ };
115
+ }
116
+
117
+ @ Override
118
+ public void close (io .cloudquery .plugin .v3 .Close .Request request ,
119
+ StreamObserver <io .cloudquery .plugin .v3 .Close .Response > responseObserver ) {
120
+ plugin .close ();
121
+ responseObserver .onNext (io .cloudquery .plugin .v3 .Close .Response .newBuilder ().build ());
122
+ responseObserver .onCompleted ();
123
+ }
12
124
}
0 commit comments