Skip to content

Commit b3ecea8

Browse files
Migrated Workflows From In Memory to Db (#33)
* Migrated workflows from in memory to db * Convert row-related defer statements to manual resource management
1 parent 1eef609 commit b3ecea8

File tree

10 files changed

+273
-158
lines changed

10 files changed

+273
-158
lines changed

data/scripts/def.sql

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,3 +33,15 @@ CREATE TABLE IF NOT EXISTS api_keys (
3333
FOREIGN KEY(user_ID) REFERENCES users(id)
3434
);
3535

36+
CREATE TABLE IF NOT EXISTS workflows (
37+
id INTEGER NOT NULL PRIMARY KEY,
38+
topic_name TEXT NOT NULL,
39+
offset INTEGER NOT NULL,
40+
function_name TEXT NOT NULL,
41+
enabled BOOLEAN NOT NULL DEFAULT TRUE,
42+
sink_url TEXT NOT NULL,
43+
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
44+
last_modified TIMESTAMP DEFAULT CURRENT_TIMESTAMP
45+
);
46+
47+

persistence/apiKeys.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,16 +66,16 @@ func (dstore *Datastore) GetAPIKeysByUserID(userID int) ([]ApiKey, error) {
6666
if err != nil {
6767
return nil, err
6868
}
69-
defer rows.Close()
70-
7169
var apiKeys []ApiKey
7270
for rows.Next() {
7371
var apiKey ApiKey
7472
err := rows.Scan(&apiKey.Id, &apiKey.Key, &apiKey.CreatedAt)
7573
if err != nil {
74+
rows.Close()
7675
return nil, err
7776
}
7877
apiKeys = append(apiKeys, apiKey)
7978
}
79+
rows.Close()
8080
return apiKeys, nil
8181
}

persistence/database.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ func setupConnection(isDevMode bool) (*Datastore, error) {
5454
var db_file string
5555

5656
if isDevMode {
57-
db_file = ":memory:"
57+
db_file = "file::memory:?cache=shared"
5858
} else {
5959
db_file = os.Getenv("DB_FILE_LOCATION")
6060
}

persistence/session.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -216,14 +216,15 @@ func (dstore *Datastore) GcSessions(maxlifetime int64) error {
216216
deleteQuery := "DELETE FROM sessions ID=?;"
217217
dstore.RWMutex.Lock()
218218
rows, err := dstore.db.Query(selectQuery)
219+
dstore.RWMutex.Unlock()
219220
if err != nil {
220221
return err
221222
}
222-
defer rows.Close()
223223
for rows.Next() {
224224
var sdb SessionDB
225225
err = rows.Scan(&sdb.ID, &sdb.TimeAccessed)
226226
if err != nil {
227+
rows.Close()
227228
return err
228229
}
229230
if (sdb.TimeAccessed.Unix() + maxlifetime) < time.Now().Unix() {
@@ -233,7 +234,7 @@ func (dstore *Datastore) GcSessions(maxlifetime int64) error {
233234
}
234235
}
235236
}
236-
dstore.RWMutex.Unlock()
237+
rows.Close()
237238
err = rows.Err()
238239
if err != nil {
239240
return err

persistence/stickyConnection.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,16 +52,16 @@ func (dstore *Datastore) GetAllEndpoints(userId int) ([]Endpoint, error) {
5252
if err != nil {
5353
return stickey_connections, err
5454
}
55-
defer rows.Close()
56-
5755
for rows.Next() {
5856
var stickey_connection Endpoint
5957
err := rows.Scan(&stickey_connection.RouteID, &stickey_connection.Security_key, &stickey_connection.TopicName, &stickey_connection.LastModified)
6058
if err != nil {
59+
rows.Close()
6160
return stickey_connections, err
6261
}
6362
stickey_connections = append(stickey_connections, stickey_connection)
6463
}
64+
rows.Close()
6565
return stickey_connections, nil
6666
}
6767

persistence/user.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,15 +32,16 @@ func (dstore *Datastore) getAllUsers() ([]User, error) {
3232
if err != nil {
3333
return users, err
3434
}
35-
defer rows.Close()
3635
for rows.Next() {
3736
var user User
3837
err := rows.Scan(&user.ID, &user.Email, &user.AuthType, &user.CreatedAt, &user.LastModified)
3938
if err != nil {
39+
rows.Close()
4040
return users, err
4141
}
4242
users = append(users, user)
4343
}
44+
rows.Close()
4445
return users, nil
4546
}
4647

persistence/workflow.go

Lines changed: 155 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,155 @@
1+
package persistence
2+
3+
import (
4+
"errors"
5+
"reflect"
6+
"time"
7+
)
8+
9+
type Workflow struct {
10+
Id int `json:"id"`
11+
TopicName string `json:"topicName"`
12+
Offset int `json:"offset"`
13+
FunctionName string `json:"functionName"`
14+
Enabled bool `json:"enabled"`
15+
SinkURL string `json:"sinkURL"`
16+
LastModified time.Time `json:"lastModified,omitempty"`
17+
}
18+
19+
type funcMap map[string]interface{}
20+
21+
var FUNC_MAP = funcMap{}
22+
23+
func (workflow Workflow) Call(params ...interface{}) (result interface{}, err error) {
24+
f := reflect.ValueOf(FUNC_MAP[workflow.FunctionName])
25+
if len(params) != f.Type().NumIn() {
26+
err = errors.New("the number of params is out of index")
27+
return
28+
}
29+
in := make([]reflect.Value, len(params))
30+
for k, param := range params {
31+
in[k] = reflect.ValueOf(param)
32+
}
33+
res := f.Call(in)
34+
result = res[0].Interface()
35+
return
36+
}
37+
38+
func (dstore *Datastore) GetWorkflow(id int) (Workflow, error) {
39+
selectQuery := "SELECT id, topic_name, offset, function_name, enabled, sink_url, last_modified FROM workflows WHERE id=?;"
40+
dstore.RWMutex.RLock()
41+
row := dstore.db.QueryRow(selectQuery, id)
42+
dstore.RWMutex.RUnlock()
43+
44+
var workflow Workflow
45+
46+
err := row.Scan(&workflow.Id, &workflow.TopicName, &workflow.Offset, &workflow.FunctionName, &workflow.Enabled, &workflow.SinkURL, &workflow.LastModified)
47+
if err != nil {
48+
return workflow, err
49+
}
50+
return workflow, nil
51+
}
52+
53+
func (dstore *Datastore) GetWorkflows() ([]Workflow, error) {
54+
workflows := []Workflow{}
55+
selectQuery := "SELECT id, topic_name, offset, function_name, enabled, sink_url, last_modified FROM workflows;"
56+
dstore.RWMutex.RLock()
57+
rows, err := dstore.db.Query(selectQuery)
58+
dstore.RWMutex.RUnlock()
59+
if err != nil {
60+
return workflows, err
61+
}
62+
for rows.Next() {
63+
var workflow Workflow
64+
err := rows.Scan(&workflow.Id, &workflow.TopicName, &workflow.Offset, &workflow.FunctionName, &workflow.Enabled, &workflow.SinkURL, &workflow.LastModified)
65+
if err != nil {
66+
rows.Close()
67+
return workflows, err
68+
}
69+
workflows = append(workflows, workflow)
70+
}
71+
rows.Close()
72+
err = rows.Err()
73+
if err != nil {
74+
return workflows, err
75+
}
76+
return workflows, nil
77+
}
78+
79+
func (dstore *Datastore) GetEnabledWorkflows() ([]Workflow, error) {
80+
workflows := []Workflow{}
81+
selectQuery := "SELECT id, topic_name, offset, function_name, enabled, sink_url, last_modified FROM workflows WHERE enabled=true;"
82+
dstore.RWMutex.RLock()
83+
rows, err := dstore.db.Query(selectQuery)
84+
dstore.RWMutex.RUnlock()
85+
if err != nil {
86+
return workflows, err
87+
}
88+
for rows.Next() {
89+
var workflow Workflow
90+
err := rows.Scan(&workflow.Id, &workflow.TopicName, &workflow.Offset, &workflow.FunctionName, &workflow.Enabled, &workflow.SinkURL, &workflow.LastModified)
91+
if err != nil {
92+
rows.Close()
93+
return workflows, err
94+
}
95+
workflows = append(workflows, workflow)
96+
}
97+
rows.Close()
98+
return workflows, nil
99+
}
100+
101+
func (dstore *Datastore) InsertWorkflow(workflow Workflow) (time.Time, error) {
102+
insertQuery := `
103+
INSERT INTO workflows(id, topic_name, offset, function_name, sink_url, last_modified)
104+
VALUES (?, ?, ?, ?, ?, CURRENT_TIMESTAMP)
105+
RETURNING id, last_modified;
106+
`
107+
108+
dstore.RWMutex.Lock()
109+
row := dstore.db.QueryRow(insertQuery, workflow.Id, workflow.TopicName, workflow.Offset, workflow.FunctionName, workflow.SinkURL)
110+
dstore.RWMutex.Unlock()
111+
112+
var insertedID int
113+
var lastModified time.Time
114+
115+
err := row.Scan(&insertedID, &lastModified)
116+
if err != nil {
117+
return time.Time{}, err
118+
}
119+
120+
return lastModified, nil
121+
}
122+
123+
func (dstore *Datastore) DeleteWorkflow(id int) (int, error) {
124+
deleteQuery := "DELETE FROM workflows WHERE id=?"
125+
dstore.RWMutex.Lock()
126+
res, err := dstore.db.Exec(deleteQuery, id)
127+
dstore.RWMutex.Unlock()
128+
if err != nil {
129+
return 0, err
130+
}
131+
rowsDeleted, err := res.RowsAffected()
132+
if err != nil {
133+
return 0, err
134+
}
135+
return int(rowsDeleted), nil
136+
}
137+
138+
func (dstore *Datastore) UpdateWorkflow(id int) (Workflow, error) {
139+
updateQuery := `
140+
UPDATE workflows
141+
SET enabled = NOT enabled, last_modified = CURRENT_TIMESTAMP
142+
WHERE id = ?
143+
RETURNING id, topic_name, offset, function_name, enabled, sink_url, last_modified;
144+
`
145+
146+
dstore.RWMutex.Lock()
147+
row := dstore.db.QueryRow(updateQuery, id)
148+
dstore.RWMutex.Unlock()
149+
var workflow Workflow
150+
err := row.Scan(&workflow.Id, &workflow.TopicName, &workflow.Offset, &workflow.FunctionName, &workflow.Enabled, &workflow.SinkURL, &workflow.LastModified)
151+
if err != nil {
152+
return workflow, err
153+
}
154+
return workflow, err
155+
}

0 commit comments

Comments
 (0)