Skip to content

Commit 7a908d2

Browse files
authored
Merge pull request #18 from bisohns/feature/add-websocket-server
Feature/add websocket server
2 parents 0956734 + 41103b5 commit 7a908d2

File tree

17 files changed

+372
-50
lines changed

17 files changed

+372
-50
lines changed

cmd/api.go

Lines changed: 214 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,214 @@
1+
/*
2+
Copyright © 2021 Bisohns
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
package cmd
17+
18+
import (
19+
"encoding/json"
20+
"fmt"
21+
"net/http"
22+
"os"
23+
"strconv"
24+
"strings"
25+
"sync"
26+
"time"
27+
28+
"github.com/bisohns/saido/config"
29+
"github.com/bisohns/saido/driver"
30+
"github.com/bisohns/saido/inspector"
31+
"github.com/gorilla/handlers"
32+
"github.com/gorilla/websocket"
33+
log "github.com/sirupsen/logrus"
34+
"github.com/spf13/cobra"
35+
)
36+
37+
const (
38+
socketBufferSize = 1042
39+
messageBufferSize = 256
40+
)
41+
42+
var (
43+
port string
44+
server = http.NewServeMux()
45+
upgrader = &websocket.Upgrader{
46+
ReadBufferSize: socketBufferSize,
47+
WriteBufferSize: socketBufferSize,
48+
CheckOrigin: func(r *http.Request) bool {
49+
return true
50+
}}
51+
)
52+
53+
type FullMessage struct {
54+
Error bool
55+
Message interface{}
56+
}
57+
58+
type Message struct {
59+
Host string
60+
Name string
61+
Data interface{}
62+
}
63+
64+
type Client struct {
65+
Socket *websocket.Conn
66+
Send chan *FullMessage
67+
}
68+
69+
// Write to websocket
70+
func (client *Client) Write() {
71+
defer client.Socket.Close()
72+
var err error
73+
for msg := range client.Send {
74+
err = client.Socket.WriteJSON(msg)
75+
if err != nil {
76+
log.Error("Error inside client write ", err)
77+
}
78+
}
79+
}
80+
81+
type Hosts struct {
82+
Config *config.Config
83+
// Connections : hostname mapped to connection instances to reuse
84+
// across metrics
85+
mu sync.Mutex
86+
Drivers map[string]*driver.Driver
87+
Client chan *Client
88+
Start chan bool
89+
}
90+
91+
func (hosts *Hosts) getDriver(address string) *driver.Driver {
92+
hosts.mu.Lock()
93+
defer hosts.mu.Unlock()
94+
return hosts.Drivers[address]
95+
}
96+
97+
func (hosts *Hosts) resetDriver(host config.Host) {
98+
hosts.mu.Lock()
99+
defer hosts.mu.Unlock()
100+
hostDriver := host.Connection.ToDriver()
101+
hosts.Drivers[host.Address] = &hostDriver
102+
}
103+
104+
func (hosts *Hosts) sendMetric(host config.Host, client *Client) {
105+
if hosts.getDriver(host.Address) == nil {
106+
hosts.resetDriver(host)
107+
}
108+
for _, metric := range config.GetDashboardInfoConfig(hosts.Config).Metrics {
109+
initializedMetric, err := inspector.Init(metric, hosts.getDriver(host.Address))
110+
data, err := initializedMetric.Execute()
111+
if err == nil {
112+
var unmarsh interface{}
113+
json.Unmarshal(data, &unmarsh)
114+
message := &FullMessage{
115+
Message: Message{
116+
Host: host.Address,
117+
Name: metric,
118+
Data: unmarsh,
119+
},
120+
Error: false,
121+
}
122+
client.Send <- message
123+
} else {
124+
// check for error 127 which means command was not found
125+
var errorContent string
126+
if !strings.Contains(fmt.Sprintf("%s", err), "127") {
127+
errorContent = fmt.Sprintf("Could not retrieve metric %s from driver %s with error %s, resetting connection...", metric, host.Address, err)
128+
} else {
129+
errorContent = fmt.Sprintf("Command %s not found on driver %s", metric, host.Address)
130+
}
131+
log.Error(errorContent)
132+
hosts.resetDriver(host)
133+
message := &FullMessage{
134+
Message: errorContent,
135+
Error: true,
136+
}
137+
client.Send <- message
138+
}
139+
}
140+
}
141+
142+
func (hosts *Hosts) Run() {
143+
dashboardInfo := config.GetDashboardInfoConfig(hosts.Config)
144+
log.Debug("In Running")
145+
for {
146+
select {
147+
case client := <-hosts.Client:
148+
for {
149+
for _, host := range dashboardInfo.Hosts {
150+
go hosts.sendMetric(host, client)
151+
}
152+
log.Infof("Delaying for %d seconds", dashboardInfo.PollInterval)
153+
time.Sleep(time.Duration(dashboardInfo.PollInterval) * time.Second)
154+
}
155+
}
156+
}
157+
158+
}
159+
160+
func (hosts *Hosts) ServeHTTP(w http.ResponseWriter, req *http.Request) {
161+
socket, err := upgrader.Upgrade(w, req, nil)
162+
if err != nil {
163+
log.Fatal(err)
164+
return
165+
}
166+
client := &Client{
167+
Socket: socket,
168+
Send: make(chan *FullMessage, messageBufferSize),
169+
}
170+
hosts.Client <- client
171+
client.Write()
172+
}
173+
174+
func newHosts(cfg *config.Config) *Hosts {
175+
hosts := &Hosts{
176+
Config: cfg,
177+
Drivers: make(map[string]*driver.Driver),
178+
Client: make(chan *Client),
179+
}
180+
return hosts
181+
}
182+
183+
func setHostHandler(w http.ResponseWriter, r *http.Request) {
184+
b, _ := json.Marshal("Hello World")
185+
w.Write(b)
186+
}
187+
188+
var apiCmd = &cobra.Command{
189+
Use: "api",
190+
Short: "host saido as an API on a PORT env variable, fallback to set argument",
191+
Long: ``,
192+
Run: func(cmd *cobra.Command, args []string) {
193+
// server.HandleFunc("/set-hosts", SetHostHandler)
194+
// FIXME: set up cfg using set-hosts endpoint
195+
hosts := newHosts(cfg)
196+
server.HandleFunc("/set-hosts", setHostHandler)
197+
server.Handle("/metrics", hosts)
198+
log.Info("listening on :", port)
199+
_, err := strconv.Atoi(port)
200+
if err != nil {
201+
log.Fatal(err)
202+
}
203+
go hosts.Run()
204+
loggedRouters := handlers.LoggingHandler(os.Stdout, server)
205+
if err := http.ListenAndServe(":"+port, loggedRouters); err != nil {
206+
log.Fatal(err)
207+
}
208+
},
209+
}
210+
211+
func init() {
212+
apiCmd.Flags().StringVarP(&port, "port", "p", "3000", "Port to run application server on")
213+
rootCmd.AddCommand(apiCmd)
214+
}

config.example.yaml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,4 +29,5 @@ hosts:
2929

3030
metrics:
3131
- memory
32-
- cpu
32+
- tcp
33+
poll-interval: 30

config/config.go

Lines changed: 27 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,16 +4,18 @@ import (
44
"fmt"
55
"io/ioutil"
66

7+
"github.com/bisohns/saido/driver"
78
"github.com/mitchellh/mapstructure"
89
log "github.com/sirupsen/logrus"
910

1011
"gopkg.in/yaml.v2"
1112
)
1213

1314
type DashboardInfo struct {
14-
Hosts []Host
15-
Metrics []string
16-
Title string
15+
Hosts []Host
16+
Metrics []string
17+
Title string
18+
PollInterval int
1719
}
1820

1921
type Connection struct {
@@ -22,6 +24,22 @@ type Connection struct {
2224
Password string `mapstructure:"password"`
2325
PrivateKeyPath string `mapstructure:"private_key_path"`
2426
Port int32 `mapstructure:"port"`
27+
Host string
28+
}
29+
30+
func (conn *Connection) ToDriver() driver.Driver {
31+
switch conn.Type {
32+
case "ssh":
33+
return &driver.SSH{
34+
User: conn.Username,
35+
Host: conn.Host,
36+
Port: int(conn.Port),
37+
KeyFile: conn.PrivateKeyPath,
38+
CheckKnownHosts: false,
39+
}
40+
default:
41+
return &driver.Local{}
42+
}
2543
}
2644

2745
type Host struct {
@@ -31,9 +49,10 @@ type Host struct {
3149
}
3250

3351
type Config struct {
34-
Hosts map[interface{}]interface{} `yaml:"hosts"`
35-
Metrics []string `yaml:"metrics"`
36-
Title string `yaml:"title"`
52+
Hosts map[interface{}]interface{} `yaml:"hosts"`
53+
Metrics []string `yaml:"metrics"`
54+
Title string `yaml:"title"`
55+
PollInterval int `yaml:"poll-interval"`
3756
}
3857

3958
func LoadConfig(configPath string) *Config {
@@ -62,6 +81,7 @@ func GetDashboardInfoConfig(config *Config) *DashboardInfo {
6281
for _, host := range dashboardInfo.Hosts {
6382
log.Debugf("%s: %v", host.Address, host.Connection)
6483
}
84+
dashboardInfo.PollInterval = config.PollInterval
6585
return dashboardInfo
6686
}
6787

@@ -107,6 +127,7 @@ func parseConfig(name string, host string, group map[interface{}]interface{}, cu
107127
}
108128

109129
if !isParent {
130+
currentConn.Host = host
110131
newHost := Host{
111132
Address: host,
112133
Connection: currentConn,

go.mod

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ module github.com/bisohns/saido
33
go 1.14
44

55
require (
6+
github.com/gorilla/handlers v1.5.1 // indirect
7+
github.com/gorilla/websocket v1.5.0 // indirect
68
github.com/kr/pretty v0.2.0 // indirect
79
github.com/melbahja/goph v1.2.1
810
github.com/mitchellh/mapstructure v1.4.3

go.sum

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,8 @@ github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad/go.m
6767
github.com/envoyproxy/go-control-plane v0.9.9-0.20210217033140-668b12f5399d/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk=
6868
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
6969
github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4=
70+
github.com/felixge/httpsnoop v1.0.1 h1:lvB5Jl89CsZtGIWuTcDM1E/vkVs49/Ml7JJe07l8SPQ=
71+
github.com/felixge/httpsnoop v1.0.1/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
7072
github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ=
7173
github.com/gdamore/encoding v1.0.0 h1:+7OoQ1Bc6eTm5niUzBa0Ctsh6JbMW6Ra+YNuAtDBdko=
7274
github.com/gdamore/encoding v1.0.0/go.mod h1:alR0ol34c49FCSBLjhosxzcPHQbf2trDkoo5dl+VrEg=
@@ -140,6 +142,10 @@ github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+
140142
github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg=
141143
github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk=
142144
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY=
145+
github.com/gorilla/handlers v1.5.1 h1:9lRY6j8DEeeBT10CvO9hGW0gmky0BprnvDI5vfhUHH4=
146+
github.com/gorilla/handlers v1.5.1/go.mod h1:t8XrUpc4KVXb7HGyJ4/cEnwQiaxrX/hz1Zv/4g96P1Q=
147+
github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc=
148+
github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
143149
github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw=
144150
github.com/hashicorp/consul/api v1.1.0/go.mod h1:VmuI/Lkw1nC05EYQWNKwWGbkg+FbDBtguAZLlVdkD9Q=
145151
github.com/hashicorp/consul/sdk v0.1.1/go.mod h1:VKf9jXwCTEY1QZP2MOLRhb5i/I/ssyNV1vwHyQBF0x8=

inspector/custom.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package inspector
22

33
import (
4+
"encoding/json"
45
"errors"
56
"fmt"
67

@@ -22,7 +23,7 @@ type Custom struct {
2223

2324
// Parse : run custom parsing on output of the command
2425
func (i *Custom) Parse(output string) {
25-
log.Debug("Parsing ouput string in Custom inspector")
26+
log.Debug("Parsing output string in Custom inspector")
2627
i.Values = i.createMetric(output)
2728
}
2829

@@ -44,11 +45,13 @@ func (i Custom) driverExec() driver.Command {
4445
return (*i.Driver).RunCommand
4546
}
4647

47-
func (i *Custom) Execute() {
48+
func (i *Custom) Execute() ([]byte, error) {
4849
output, err := i.driverExec()(i.Command)
4950
if err == nil {
5051
i.Parse(output)
52+
return json.Marshal(i.Values)
5153
}
54+
return []byte(""), err
5255
}
5356

5457
// NewCustom : Initialize a new Custom instance

0 commit comments

Comments
 (0)