Skip to content

Commit 996c281

Browse files
committed
Added first proto file layout. Added first implementation of golang sdk layout. Implemented plugin system
1 parent c1059c5 commit 996c281

File tree

8 files changed

+734
-1
lines changed

8 files changed

+734
-1
lines changed

README.md

-1
Original file line numberDiff line numberDiff line change
@@ -1,2 +1 @@
11
# Gaia
2-
Build powerful pipelines with pure Go

plugin/grpc.go

+51
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
package plugin
2+
3+
import (
4+
"context"
5+
6+
plugin "github.com/hashicorp/go-plugin"
7+
"github.com/michelvocks/gaia/proto"
8+
"google.golang.org/grpc"
9+
)
10+
11+
// PluginGRPC is the Gaia plugin interface used for communication
12+
// with the plugin.
13+
type PluginGRPC interface {
14+
GetJobs() (*proto.Plugin_GetJobsClient, error)
15+
ExecuteJob(job *proto.Job) (*proto.Empty, error)
16+
}
17+
18+
// GRPCClient represents gRPC client
19+
type GRPCClient struct {
20+
client proto.PluginClient
21+
}
22+
23+
// PluginGRPCImpl represents the plugin implementation on client side.
24+
type PluginGRPCImpl struct {
25+
Impl PluginGRPC
26+
plugin.NetRPCUnsupportedPlugin
27+
}
28+
29+
// GRPCServer is needed here to implement hashicorp
30+
// plugin.Plugin interface. Real implementation is
31+
// in the plugin(s).
32+
func (p *PluginGRPCImpl) GRPCServer(s *grpc.Server) error {
33+
// Real implementation defined in plugin
34+
return nil
35+
}
36+
37+
// GRPCClient is the passing method for the gRPC client.
38+
func (p *PluginGRPCImpl) GRPCClient(c *grpc.ClientConn) (interface{}, error) {
39+
return &GRPCClient{client: proto.NewPluginClient(c)}, nil
40+
}
41+
42+
// GetJobs requests all jobs from the plugin.
43+
// We get a stream of proto.Job back.
44+
func (m *GRPCClient) GetJobs() (proto.Plugin_GetJobsClient, error) {
45+
return m.client.GetJobs(context.Background(), &proto.Empty{})
46+
}
47+
48+
// ExecuteJob triggers the execution of the given job in the plugin.
49+
func (m *GRPCClient) ExecuteJob(job *proto.Job) (*proto.JobResult, error) {
50+
return m.client.ExecuteJob(context.Background(), job)
51+
}

plugin/plugin.go

+94
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
package plugin
2+
3+
import (
4+
"errors"
5+
"os/exec"
6+
7+
plugin "github.com/hashicorp/go-plugin"
8+
"github.com/michelvocks/gaia/proto"
9+
)
10+
11+
const (
12+
pluginMapKey = "plugin"
13+
)
14+
15+
var handshake = plugin.HandshakeConfig{
16+
ProtocolVersion: 1,
17+
MagicCookieKey: "GAIA_PLUGIN",
18+
// This cookie should never be changed again
19+
MagicCookieValue: "FdXjW27mN6XuG2zDBP4LixXUwDAGCEkidxwqBGYpUhxiWHzctATYZvpz4ZJdALmh",
20+
}
21+
22+
var pluginMap = map[string]plugin.Plugin{
23+
pluginMapKey: &PluginGRPCImpl{},
24+
}
25+
26+
// Plugin represents a single plugin instance which uses gRPC
27+
// to connect to exactly one plugin.
28+
type Plugin struct {
29+
// Client instance used to open gRPC connections.
30+
client *plugin.Client
31+
32+
// Interface to the connected plugin.
33+
pluginConn PluginGRPC
34+
}
35+
36+
// NewPlugin creates a new instance of Plugin.
37+
// One Plugin instance represents one connection to a plugin.
38+
func NewPlugin(c *exec.Cmd) *Plugin {
39+
// Allocate
40+
p := &Plugin{}
41+
42+
// Get new client
43+
p.client = plugin.NewClient(&plugin.ClientConfig{
44+
HandshakeConfig: handshake,
45+
Plugins: pluginMap,
46+
Cmd: c,
47+
AllowedProtocols: []plugin.Protocol{plugin.ProtocolGRPC},
48+
})
49+
50+
return p
51+
}
52+
53+
// Connect starts the plugin, initiates the gRPC connection and looks up the plugin.
54+
// It's up to the caller to call plugin.Close to shutdown the plugin
55+
// and close the gRPC connection.
56+
func (p *Plugin) Connect() error {
57+
// Connect via gRPC
58+
gRPCClient, err := p.client.Client()
59+
if err != nil {
60+
return err
61+
}
62+
63+
// Request the plugin
64+
raw, err := gRPCClient.Dispense(pluginMapKey)
65+
if err != nil {
66+
return err
67+
}
68+
69+
// Convert plugin to interface
70+
if pC, ok := raw.(PluginGRPC); ok {
71+
p.pluginConn = pC
72+
return nil
73+
}
74+
75+
return errors.New("plugin is not compatible with Gaia plugin interface")
76+
}
77+
78+
// Execute triggers the execution of one single job
79+
// for the given plugin.
80+
func (p *Plugin) Execute(j *proto.Job) error {
81+
_, err := p.pluginConn.ExecuteJob(j)
82+
return err
83+
}
84+
85+
// Close shutdown the plugin and kills the gRPC connection.
86+
// Remember to call this when you call plugin.Connect.
87+
func (p *Plugin) Close() {
88+
// We start the kill command in a goroutine because kill
89+
// is blocking until the subprocess successfully exits.
90+
// The user should not notice the stopping process.
91+
go func() {
92+
p.client.Kill()
93+
}()
94+
}

proto/README.md

+5
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
# proto
2+
This folder contains gRPC proto files and their generated language defintions.
3+
4+
You can use protoc to compile these on your own:
5+
`protoc -I ./ ./plugin.proto --go_out=plugins=grpc:./`

0 commit comments

Comments
 (0)