Skip to content

Commit 50ec9d9

Browse files
authored
feat: Add Plugin Proto V3 (#21)
Instead of #15. Plugin V3 Protocol client/server regenerated from cloudquery/plugin-pb#1
1 parent ba7d275 commit 50ec9d9

File tree

11 files changed

+2523
-15
lines changed

11 files changed

+2523
-15
lines changed

Makefile

+2-1
Original file line numberDiff line numberDiff line change
@@ -18,4 +18,5 @@ gen-proto:
1818
protoc --proto_path=. --go_out . --go_opt=module="github.com/cloudquery/plugin-pb-go" --go-grpc_out=. --go-grpc_opt=module="github.com/cloudquery/plugin-pb-go" plugin-pb/source/v1/source.proto
1919
protoc --proto_path=. --go_out . --go_opt=module="github.com/cloudquery/plugin-pb-go" --go-grpc_out=. --go-grpc_opt=module="github.com/cloudquery/plugin-pb-go" plugin-pb/source/v2/source.proto
2020
protoc --proto_path=. --go_out . --go_opt=module="github.com/cloudquery/plugin-pb-go" --go-grpc_out=. --go-grpc_opt=module="github.com/cloudquery/plugin-pb-go" plugin-pb/destination/v1/destination.proto
21-
protoc --proto_path=. --go_out . --go_opt=module="github.com/cloudquery/plugin-pb-go" --go-grpc_out=. --go-grpc_opt=module="github.com/cloudquery/plugin-pb-go" plugin-pb/discovery/v1/discovery.proto
21+
protoc --proto_path=. --go_out . --go_opt=module="github.com/cloudquery/plugin-pb-go" --go-grpc_out=. --go-grpc_opt=module="github.com/cloudquery/plugin-pb-go" plugin-pb/discovery/v1/discovery.proto
22+
protoc --proto_path=. --go_out . --go_opt=module="github.com/cloudquery/plugin-pb-go" --go-grpc_out=. --go-grpc_opt=module="github.com/cloudquery/plugin-pb-go" plugin-pb/plugin/v3/plugin.proto

go.mod

+12-4
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,12 @@ module github.com/cloudquery/plugin-pb-go
33
go 1.19
44

55
require (
6+
github.com/apache/arrow/go/v13 v13.0.0-20230622042343-ec413b7763fe
67
github.com/avast/retry-go/v4 v4.3.4
78
github.com/ghodss/yaml v1.0.0
89
github.com/google/go-cmp v0.5.9
910
github.com/rs/zerolog v1.29.1
1011
github.com/schollz/progressbar/v3 v3.13.1
11-
github.com/stretchr/testify v1.8.4
1212
github.com/thoas/go-funk v0.9.3
1313
golang.org/x/exp v0.0.0-20230522175609-2e198f4a06a1
1414
google.golang.org/grpc v1.55.0
@@ -17,18 +17,26 @@ require (
1717
)
1818

1919
require (
20-
github.com/davecgh/go-spew v1.1.1 // indirect
20+
github.com/goccy/go-json v0.10.0 // indirect
2121
github.com/golang/protobuf v1.5.3 // indirect
22-
github.com/mattn/go-colorable v0.1.12 // indirect
22+
github.com/google/flatbuffers v23.1.21+incompatible // indirect
23+
github.com/klauspost/compress v1.15.15 // indirect
24+
github.com/klauspost/cpuid/v2 v2.2.3 // indirect
25+
github.com/mattn/go-colorable v0.1.13 // indirect
2326
github.com/mattn/go-isatty v0.0.17 // indirect
2427
github.com/mattn/go-runewidth v0.0.14 // indirect
2528
github.com/mitchellh/colorstring v0.0.0-20190213212951-d06e56a500db // indirect
26-
github.com/pmezard/go-difflib v1.0.0 // indirect
29+
github.com/pierrec/lz4/v4 v4.1.17 // indirect
2730
github.com/rivo/uniseg v0.2.0 // indirect
31+
github.com/stretchr/testify v1.8.4 // indirect
32+
github.com/zeebo/xxh3 v1.0.2 // indirect
33+
golang.org/x/mod v0.8.0 // indirect
2834
golang.org/x/net v0.8.0 // indirect
2935
golang.org/x/sys v0.6.0 // indirect
3036
golang.org/x/term v0.6.0 // indirect
3137
golang.org/x/text v0.8.0 // indirect
38+
golang.org/x/tools v0.6.0 // indirect
39+
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
3240
google.golang.org/genproto/googleapis/rpc v0.0.0-20230530153820-e85fd2cbaebc // indirect
3341
gopkg.in/yaml.v2 v2.4.0 // indirect
3442
)

go.sum

+28-1
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
github.com/apache/arrow/go/v13 v13.0.0-20230622042343-ec413b7763fe h1:pvUEAu13JR1Apj8dYI73iMNEvBNfKWiCFaR0UAUUDtA=
2+
github.com/apache/arrow/go/v13 v13.0.0-20230622042343-ec413b7763fe/go.mod h1:W69eByFNO0ZR30q1/7Sr9d83zcVZmF2MiP3fFYAWJOc=
13
github.com/avast/retry-go/v4 v4.3.4 h1:pHLkL7jvCvP317I8Ge+Km2Yhntv3SdkJm7uekkqbKhM=
24
github.com/avast/retry-go/v4 v4.3.4/go.mod h1:rv+Nla6Vk3/ilU0H51VHddWHiwimzX66yZ0JT6T+UvE=
35
github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc=
@@ -6,23 +8,36 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c
68
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
79
github.com/ghodss/yaml v1.0.0 h1:wQHKEahhL6wmXdzwWG11gIVCkOv05bNOh+Rxn0yngAk=
810
github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
11+
github.com/goccy/go-json v0.10.0 h1:mXKd9Qw4NuzShiRlOXKews24ufknHO7gx30lsDyokKA=
12+
github.com/goccy/go-json v0.10.0/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I=
913
github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
1014
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
1115
github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg=
1216
github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
17+
github.com/google/flatbuffers v23.1.21+incompatible h1:bUqzx/MXCDxuS0hRJL2EfjyZL3uQrPbMocUa8zGqsTA=
18+
github.com/google/flatbuffers v23.1.21+incompatible/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8=
1319
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
1420
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
1521
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
22+
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
1623
github.com/k0kubun/go-ansi v0.0.0-20180517002512-3bf9e2903213/go.mod h1:vNUNkEQ1e29fT/6vq2aBdFsgNPmy8qMdSay1npru+Sw=
17-
github.com/mattn/go-colorable v0.1.12 h1:jF+Du6AlPIjs2BiUiQlKOX0rt3SujHxPnksPKZbaA40=
24+
github.com/klauspost/compress v1.15.15 h1:EF27CXIuDsYJ6mmvtBRlEuB2UVOqHG1tAXgZ7yIO+lw=
25+
github.com/klauspost/compress v1.15.15/go.mod h1:ZcK2JAFqKOpnBlxcLsJzYfrS9X1akm9fHZNnD9+Vo/4=
26+
github.com/klauspost/cpuid/v2 v2.2.3 h1:sxCkb+qR91z4vsqw4vGGZlDgPz3G7gjaLyK3V8y70BU=
27+
github.com/klauspost/cpuid/v2 v2.2.3/go.mod h1:RVVoqg1df56z8g3pUjL/3lE5UfnlrJX8tyFgg4nqhuY=
1828
github.com/mattn/go-colorable v0.1.12/go.mod h1:u5H1YNBxpqRaxsYJYSkiCWKzEfiAb1Gb520KVy5xxl4=
29+
github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA=
30+
github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg=
1931
github.com/mattn/go-isatty v0.0.14/go.mod h1:7GGIvUiUoEMVVmxf/4nioHXj79iQHKdU27kJ6hsGG94=
32+
github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM=
2033
github.com/mattn/go-isatty v0.0.17 h1:BTarxUcIeDqL27Mc+vyvdWYSL28zpIhv3RoTdsLMPng=
2134
github.com/mattn/go-isatty v0.0.17/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM=
2235
github.com/mattn/go-runewidth v0.0.14 h1:+xnbZSEeDbOIg5/mE6JF0w6n9duR1l3/WmbinWVwUuU=
2336
github.com/mattn/go-runewidth v0.0.14/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w=
2437
github.com/mitchellh/colorstring v0.0.0-20190213212951-d06e56a500db h1:62I3jR2EmQ4l5rM/4FEfDWcRD+abF5XlKShorW5LRoQ=
2538
github.com/mitchellh/colorstring v0.0.0-20190213212951-d06e56a500db/go.mod h1:l0dey0ia/Uv7NcFFVbCLtqEBQbrT4OCwCSKTEv6enCw=
39+
github.com/pierrec/lz4/v4 v4.1.17 h1:kV4Ip+/hUBC+8T6+2EgburRtkE9ef4nbY3f4dFhGjMc=
40+
github.com/pierrec/lz4/v4 v4.1.17/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
2641
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
2742
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
2843
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
@@ -45,20 +60,32 @@ github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcU
4560
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
4661
github.com/thoas/go-funk v0.9.3 h1:7+nAEx3kn5ZJcnDm2Bh23N2yOtweO14bi//dvRtgLpw=
4762
github.com/thoas/go-funk v0.9.3/go.mod h1:+IWnUfUmFO1+WVYQWQtIJHeRRdaIyyYglZN7xzUPe4Q=
63+
github.com/zeebo/assert v1.3.0 h1:g7C04CbJuIDKNPFHmsk4hwZDO5O+kntRxzaUoNXj+IQ=
64+
github.com/zeebo/xxh3 v1.0.2 h1:xZmwmqxHZA8AI603jOQ0tMqmBr9lPeFwGg6d+xy9DC0=
65+
github.com/zeebo/xxh3 v1.0.2/go.mod h1:5NWz9Sef7zIDm2JHfFlcQvNekmcEl9ekUZQQKCYaDcA=
4866
golang.org/x/exp v0.0.0-20230522175609-2e198f4a06a1 h1:k/i9J1pBpvlfR+9QsetwPyERsqu1GIbi967PQMq3Ivc=
4967
golang.org/x/exp v0.0.0-20230522175609-2e198f4a06a1/go.mod h1:V1LtkGg67GoY2N1AnLN78QLrzxkLyJw7RJb1gzOOz9w=
68+
golang.org/x/mod v0.8.0 h1:LUYupSeNrTNCGzR/hVBk2NHZO4hXcVaW1k4Qx7rjPx8=
69+
golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
5070
golang.org/x/net v0.8.0 h1:Zrh2ngAOFYneWTAIAPethzeaQLuHwhuBkuV6ZiRnUaQ=
5171
golang.org/x/net v0.8.0/go.mod h1:QVkue5JL9kW//ek3r6jTKnTFis1tRmNAW2P1shuFdJc=
72+
golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o=
5273
golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
5374
golang.org/x/sys v0.0.0-20210927094055-39ccf1dd6fa6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
75+
golang.org/x/sys v0.0.0-20220704084225-05e143d24a9e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
5476
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
5577
golang.org/x/sys v0.6.0 h1:MVltZSvRTcU2ljQOhs94SXPftV6DCNnZViHeQps87pQ=
5678
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
5779
golang.org/x/term v0.6.0 h1:clScbb1cHjoCkyRbWwBEUZ5H/tIFu5TAXIqaZD0Gcjw=
5880
golang.org/x/term v0.6.0/go.mod h1:m6U89DPEgQRMq3DNkDClhWw02AUbt2daBVO4cn4Hv9U=
5981
golang.org/x/text v0.8.0 h1:57P1ETyNKtuIjB4SRd15iJxuhj8Gc416Y78H3qgMh68=
6082
golang.org/x/text v0.8.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8=
83+
golang.org/x/tools v0.6.0 h1:BOw41kyTf3PuCW1pVQf8+Cyg8pMlkYB1oo9iJ6D/lKM=
84+
golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU=
6185
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
86+
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 h1:H2TDz8ibqkAF6YGhCdN3jS9O0/s90v0rJh3X/OLHEUk=
87+
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2/go.mod h1:K8+ghG5WaK9qNqU5K3HdILfMLy1f3aNYFI/wnl100a8=
88+
gonum.org/v1/gonum v0.12.0 h1:xKuo6hzt+gMav00meVPUlXwSdoEJP46BR+wdxQEFK2o=
6289
google.golang.org/genproto/googleapis/rpc v0.0.0-20230530153820-e85fd2cbaebc h1:XSJ8Vk1SWuNr8S18z1NZSziL0CPIXLCCMDOEFtHBOFc=
6390
google.golang.org/genproto/googleapis/rpc v0.0.0-20230530153820-e85fd2cbaebc/go.mod h1:66JfowdXAEgad5O9NnYcsNPLCPZJD++2L9X0PCMODrA=
6491
google.golang.org/grpc v1.55.0 h1:3Oj82/tFSCeUrRTg/5E/7d/W5A1tj6Ky1ABAuZuv5ag=

managedplugin/logging.go

+3-1
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ package managedplugin
22

33
import "github.com/rs/zerolog"
44

5-
func JSONToLog(l zerolog.Logger, msg map[string]any) {
5+
func (c *Client) jsonToLog(l zerolog.Logger, msg map[string]any) {
66
level := msg["level"]
77
delete(msg, "level")
88
switch level {
@@ -14,8 +14,10 @@ func JSONToLog(l zerolog.Logger, msg map[string]any) {
1414
l.Info().Fields(msg).Msg("")
1515
case "warn":
1616
l.Warn().Fields(msg).Msg("")
17+
c.metrics.incrementWarnings()
1718
case "error":
1819
l.Error().Fields(msg).Msg("")
20+
c.metrics.incrementErrors()
1921
default:
2022
l.Error().Fields(msg).Msg("unknown level")
2123
}

managedplugin/metrics.go

+16
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
package managedplugin
2+
3+
import "sync/atomic"
4+
5+
type Metrics struct {
6+
Errors uint64
7+
Warnings uint64
8+
}
9+
10+
func (m *Metrics) incrementErrors() {
11+
atomic.AddUint64(&m.Errors, 1)
12+
}
13+
14+
func (m *Metrics) incrementWarnings() {
15+
atomic.AddUint64(&m.Warnings, 1)
16+
}

managedplugin/plugin.go

+11-5
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ type Client struct {
6262
Conn *grpc.ClientConn
6363
config Config
6464
noSentry bool
65+
metrics *Metrics
6566
}
6667

6768
type Option func(*Client)
@@ -124,6 +125,7 @@ func NewClient(ctx context.Context, typ PluginType, config Config, opts ...Optio
124125
directory: defaultDownloadDir,
125126
wg: &sync.WaitGroup{},
126127
config: config,
128+
metrics: &Metrics{},
127129
}
128130
for _, opt := range opts {
129131
opt(&c)
@@ -164,6 +166,10 @@ func NewClient(ctx context.Context, typ PluginType, config Config, opts ...Optio
164166
return &c, nil
165167
}
166168

169+
func (c *Client) Metrics() Metrics {
170+
return *c.metrics
171+
}
172+
167173
func (c *Client) startLocal(ctx context.Context, path string) error {
168174
c.grpcSocketName = GenerateRandomUnixSocketName()
169175
// spawn the plugin first and then connect
@@ -180,7 +186,7 @@ func (c *Client) startLocal(ctx context.Context, path string) error {
180186
cmd.Stderr = os.Stderr
181187
cmd.SysProcAttr = getSysProcAttr()
182188
if err := cmd.Start(); err != nil {
183-
return fmt.Errorf("failed to start destination plugin %s: %w", path, err)
189+
return fmt.Errorf("failed to start plugin %s: %w", path, err)
184190
}
185191

186192
c.cmd = cmd
@@ -195,18 +201,18 @@ func (c *Client) startLocal(ctx context.Context, path string) error {
195201
break
196202
}
197203
if errors.Is(err, ErrLogLineToLong) {
198-
c.logger.Info().Str("line", string(line)).Msg("truncated destination plugin log line")
204+
c.logger.Info().Str("line", string(line)).Msg("truncated plugin log line")
199205
continue
200206
}
201207
if err != nil {
202-
c.logger.Err(err).Msg("failed to read log line from destination plugin")
208+
c.logger.Err(err).Msg("failed to read log line from plugin")
203209
break
204210
}
205211
var structuredLogLine map[string]any
206212
if err := json.Unmarshal(line, &structuredLogLine); err != nil {
207-
c.logger.Err(err).Str("line", string(line)).Msg("failed to unmarshal log line from destination plugin")
213+
c.logger.Err(err).Str("line", string(line)).Msg("failed to unmarshal log line from plugin")
208214
} else {
209-
JSONToLog(c.logger, structuredLogLine)
215+
c.jsonToLog(c.logger, structuredLogLine)
210216
}
211217
}
212218
}()

pb/discovery/v1/discovery.pb.go

+3-3
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pb/plugin/v3/arrow.go

+50
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
package plugin
2+
3+
import (
4+
"bytes"
5+
6+
"github.com/apache/arrow/go/v13/arrow"
7+
"github.com/apache/arrow/go/v13/arrow/ipc"
8+
)
9+
10+
func SchemaToBytes(sc *arrow.Schema) ([]byte, error) {
11+
var buf bytes.Buffer
12+
wr := ipc.NewWriter(&buf, ipc.WithSchema(sc))
13+
if err := wr.Close(); err != nil {
14+
return nil, err
15+
}
16+
return buf.Bytes(), nil
17+
}
18+
19+
func NewSchemaFromBytes(b []byte) (*arrow.Schema, error) {
20+
rdr, err := ipc.NewReader(bytes.NewReader(b))
21+
if err != nil {
22+
return nil, err
23+
}
24+
return rdr.Schema(), nil
25+
}
26+
27+
func RecordToBytes(record arrow.Record) ([]byte, error) {
28+
var buf bytes.Buffer
29+
wr := ipc.NewWriter(&buf, ipc.WithSchema(record.Schema()))
30+
if err := wr.Write(record); err != nil {
31+
return nil, err
32+
}
33+
if err := wr.Close(); err != nil {
34+
return nil, err
35+
}
36+
return buf.Bytes(), nil
37+
}
38+
39+
func NewRecordFromBytes(b []byte) (arrow.Record, error) {
40+
rdr, err := ipc.NewReader(bytes.NewReader(b))
41+
if err != nil {
42+
return nil, err
43+
}
44+
for rdr.Next() {
45+
rec := rdr.Record()
46+
rec.Retain()
47+
return rec, nil
48+
}
49+
return nil, nil
50+
}

pb/plugin/v3/arrow_test.go

+53
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
package plugin
2+
3+
import (
4+
"testing"
5+
6+
"github.com/apache/arrow/go/v13/arrow"
7+
"github.com/apache/arrow/go/v13/arrow/array"
8+
"github.com/apache/arrow/go/v13/arrow/memory"
9+
)
10+
11+
func TestSchemaRoundTrip(t *testing.T) {
12+
md := arrow.NewMetadata([]string{"foo", "bar"}, []string{"baz", "quux"})
13+
sc := arrow.NewSchema([]arrow.Field{
14+
{Name: "a", Type: arrow.PrimitiveTypes.Int64},
15+
{Name: "b", Type: arrow.PrimitiveTypes.Float64},
16+
}, &md)
17+
b, err := SchemaToBytes(sc)
18+
if err != nil {
19+
t.Fatal(err)
20+
}
21+
sc2, err := NewSchemaFromBytes(b)
22+
if err != nil {
23+
t.Fatal(err)
24+
}
25+
if !sc.Equal(sc2) {
26+
t.Errorf("expected %v, got %v", sc, sc2)
27+
}
28+
}
29+
30+
func TestRecordRoundTrip(t *testing.T) {
31+
md := arrow.NewMetadata([]string{"foo", "bar"}, []string{"baz", "quux"})
32+
sc := arrow.NewSchema([]arrow.Field{
33+
{Name: "a", Type: arrow.PrimitiveTypes.Int64},
34+
{Name: "b", Type: arrow.PrimitiveTypes.Float64},
35+
}, &md)
36+
bldr := array.NewRecordBuilder(memory.DefaultAllocator, sc)
37+
bldr.Field(0).(*array.Int64Builder).AppendValues([]int64{1, 2, 3}, nil)
38+
bldr.Field(1).(*array.Float64Builder).AppendValues([]float64{1.1, 2.2, 3.3}, nil)
39+
rec := bldr.NewRecord()
40+
defer rec.Release()
41+
42+
b, err := RecordToBytes(rec)
43+
if err != nil {
44+
t.Fatal(err)
45+
}
46+
rec2, err := NewRecordFromBytes(b)
47+
if err != nil {
48+
t.Fatal(err)
49+
}
50+
if !array.RecordEqual(rec, rec2) {
51+
t.Errorf("expected %v, got %v", rec, rec2)
52+
}
53+
}

0 commit comments

Comments
 (0)