Skip to content

Commit 09ed646

Browse files
committed
Implement some JANKY AF data access layers
Things I've learned today: * The [sqlx documentation](https://docs.rs/sqlx/0.6.2/sqlx/sqlite/types/index.html#uuid) is *lying* about its `uuid` support. Basically `query_as!` does not ser/deserialize `Uuid` properly in/out of Sqlite with `TEXT` _or_ `BLOB` * There are [no useful examples](launchbadge/sqlx#1014) of doing nested struct queries in sqlx at the moment
1 parent 8c63bf6 commit 09ed646

File tree

8 files changed

+322
-110
lines changed

8 files changed

+322
-110
lines changed

apidocs/api-description.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
{"openapi":"3.0.0","info":{"description":"Janky Server API defintion\n","version":"1.0.0","title":"Janky APIs","contact":{"email":"[email protected]"},"license":{"name":"AGPL v3.0","url":"https://www.gnu.org/licenses/agpl-3.0.en.html"}},"servers":[{"url":"http://localhost:8000/api/v1","description":"Local dev server (APIv1)"}],"paths":{"/projects/{name}":{"post":{"summary":"Trigger execution for this project","description":null,"parameters":[{"in":"path","name":"name","required":true,"example":"janky","schema":{"type":"string"}}],"responses":{"404":{"summary":"No project configured by that name"},"200":{"summary":"Execution has been triggered"}}}}}}
1+
{"openapi":"3.0.0","info":{"description":"Janky Agent API defintion\n","version":"1.0.0","title":"Janky APIs","contact":{"email":"[email protected]"},"license":{"name":"AGPL v3.0","url":"https://www.gnu.org/licenses/agpl-3.0.en.html"}},"servers":[{"url":"http://localhost:9000/api/v1","description":"Local dev agent (APIv1)"}],"paths":{"/capabilities":{"get":{"summary":"Retrieve a list of capabilities of this agent","description":null,"responses":{"200":{"description":"Getting capabilities","content":{"application/json":{"schema":{"$ref":"#/components/schemas/CapsResponse"}}}}}}},"/execute":{"put":{"summary":"Execute a series of commands on this agent","description":null,"requestBody":{"content":{"application/json":{"schema":{"$ref":"#/components/schemas/CommandRequest"},"example":{"commands":[{"script":"echo \"Hi\""}]}}}},"responses":{"201":{"description":"Successfully accepted the commands for execution","content":{"application/json":{"schema":{"$ref":"#/components/schemas/CommandResponse"}}}},"409":{"description":"Returned when the agent is busy with another series of commands"}}}}},"components":{"schemas":{"CapsResponse":{"type":"object","properties":{"caps":{"type":"array","items":{"$ref":"#/components/schemas/Capability"}}}},"Capability":{"type":"object","properties":{"name":{"type":"string"},"path":{"type":"string"},"data":{"type":"object"}}},"Command":{"type":"object","properties":{"script":{"type":"string","description":"A script that can be exec()'d on the agent"}}},"CommandRequest":{"type":"object","properties":{"commands":{"type":"array","items":{"$ref":"#/components/schemas/Command"}}}},"CommandResponse":{"type":"object","properties":{"uuid":{"type":"string","format":"uuid"},"stream":{"description":"URL to streaming WebSockets logs","type":"string","format":"url"},"task":{"description":"URL to the task metadata","type":"string","format":"url"},"log":{"description":"URL to the raw log of the task run","type":"string","format":"url"}}}}}}

migrations/20230128225903_agent.sql

Lines changed: 0 additions & 9 deletions
This file was deleted.
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
CREATE TABLE runs (
2+
uuid TEXT NOT NULL PRIMARY KEY,
3+
num INTEGER NOT NULL,
4+
status INTEGER NOT NULL,
5+
log_url TEXT NOT NULL,
6+
definition TEXT NOT NULL,
7+
scm_info TEXT NOT NULL,
8+
created_at DATETIME NOT NULL DEFAULT (DATETIME('now')),
9+
FOREIGN KEY(scm_info) REFERENCES scm_info(uuid),
10+
FOREIGN KEY(definition) REFERENCES run_definition(uuid)
11+
);
12+
13+
CREATE TABLE scm_info (
14+
uuid TEXT NOT NULL PRIMARY KEY,
15+
git_url TEXT NOT NULL,
16+
ref TEXT NOT NULL,
17+
created_at DATETIME NOT NULL DEFAULT (DATETIME('now'))
18+
);
19+
20+
CREATE TABLE run_definition (
21+
uuid TEXT NOT NULL PRIMARY KEY,
22+
definition TEXT NOT NULL,
23+
created_at DATETIME NOT NULL DEFAULT (DATETIME('now'))
24+
);

src/agent/main.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,10 @@ async fn worker(receiver: Receiver<Work>) {
123123
while let Ok(work) = receiver.recv().await {
124124
let log_file = std::fs::File::create(&work.log_file).unwrap();
125125
let mut bufw = std::io::BufWriter::new(log_file);
126-
debug!("Starting to execute the commands");
126+
debug!(
127+
"Starting to execute the commands, output in {:?}",
128+
&work.log_file
129+
);
127130
for command in work.command.commands.iter() {
128131
debug!("Command: {:?}", command);
129132
use os_pipe::pipe;

src/server/dao.rs

Lines changed: 189 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,189 @@
1+
/*
2+
* The DAO module contains all the necessary structs for interacting with the database
3+
*/
4+
5+
use chrono::{DateTime, NaiveDateTime, Utc};
6+
use sqlx::{FromRow, SqlitePool};
7+
use url::Url;
8+
use uuid::Uuid;
9+
10+
#[derive(Clone, Debug)]
11+
struct Run {
12+
run: RunRow,
13+
scm_info: ScmInfo,
14+
definition: RunDefinition,
15+
}
16+
17+
#[derive(Clone, Debug)]
18+
struct RunRow {
19+
// Unique identifier for the Run
20+
uuid: String,
21+
// User-identifiable number for the Run, monotonically increasing
22+
num: i64,
23+
// Unix status return code from the run, zero is success
24+
status: i64,
25+
// Globally resolvable URL for fetching raw logs
26+
log_url: String,
27+
definition: String,
28+
scm_info: String,
29+
created_at: NaiveDateTime,
30+
}
31+
32+
/* The basic implementation for Run has all the database access operations
33+
*/
34+
impl Run {
35+
/*
36+
* Create the Run in the database given the appropriate struct
37+
*/
38+
async fn create(run: &Run, pool: &SqlitePool) -> Result<(), sqlx::Error> {
39+
let mut tx = pool.begin().await?;
40+
41+
sqlx::query!(
42+
r#"INSERT INTO scm_info (uuid, git_url, ref, created_at) VALUES (?, ?, ?, ?)"#,
43+
run.scm_info.uuid,
44+
run.scm_info.git_url,
45+
run.scm_info.r#ref,
46+
run.scm_info.created_at
47+
)
48+
.execute(&mut tx)
49+
.await;
50+
51+
sqlx::query!(
52+
r#"INSERT INTO run_definition (uuid, definition, created_at) VALUES (?, ?, ?)"#,
53+
run.definition.uuid,
54+
run.definition.definition,
55+
run.definition.created_at,
56+
)
57+
.execute(&mut tx)
58+
.await?;
59+
60+
sqlx::query!(
61+
"INSERT INTO runs (uuid, num, status, log_url, definition, scm_info) VALUES ($1, $2, $3, $4, $5, $6)",
62+
run.run.uuid,
63+
run.run.num,
64+
run.run.status,
65+
run.run.log_url,
66+
run.definition.uuid,
67+
run.scm_info.uuid,
68+
)
69+
.execute(&mut tx)
70+
.await?;
71+
tx.commit().await
72+
}
73+
74+
/*
75+
* Allow finding a Run by the given Uuid
76+
*/
77+
async fn find_by(uuid: &str, pool: &SqlitePool) -> Result<Run, sqlx::Error> {
78+
let row = sqlx::query_as!(RunRow, "SELECT * FROM runs WHERE uuid = ?", uuid)
79+
.fetch_one(pool)
80+
.await?;
81+
let scm_info = sqlx::query_as!(
82+
ScmInfo,
83+
"SELECT * FROM scm_info WHERE uuid = ?",
84+
row.scm_info
85+
)
86+
.fetch_one(pool)
87+
.await?;
88+
let definition = sqlx::query_as!(
89+
RunDefinition,
90+
"SELECT * FROM run_definition WHERE uuid = ?",
91+
row.definition
92+
)
93+
.fetch_one(pool)
94+
.await?;
95+
96+
Ok(Run {
97+
run: row,
98+
scm_info,
99+
definition,
100+
})
101+
}
102+
}
103+
104+
impl Default for Run {
105+
fn default() -> Self {
106+
Self {
107+
run: RunRow::default(),
108+
scm_info: ScmInfo::default(),
109+
definition: RunDefinition::default(),
110+
}
111+
}
112+
}
113+
114+
impl Default for RunRow {
115+
fn default() -> Self {
116+
Self {
117+
uuid: Uuid::new_v4().hyphenated().to_string(),
118+
num: 42,
119+
status: 0,
120+
log_url: "https://example.com/console.log".into(),
121+
definition: Uuid::new_v4().hyphenated().to_string(),
122+
scm_info: Uuid::new_v4().hyphenated().to_string(),
123+
created_at: Utc::now().naive_utc(),
124+
}
125+
}
126+
}
127+
128+
#[derive(Clone, Debug)]
129+
struct ScmInfo {
130+
uuid: String,
131+
git_url: String,
132+
r#ref: String,
133+
created_at: NaiveDateTime,
134+
}
135+
136+
impl Default for ScmInfo {
137+
fn default() -> Self {
138+
Self {
139+
uuid: Uuid::new_v4().hyphenated().to_string(),
140+
git_url: "https://example.com/some/repo.git".into(),
141+
r#ref: "main".into(),
142+
created_at: Utc::now().naive_utc(),
143+
}
144+
}
145+
}
146+
147+
#[derive(Clone, Debug)]
148+
struct RunDefinition {
149+
uuid: String,
150+
definition: String,
151+
created_at: NaiveDateTime,
152+
}
153+
154+
impl Default for RunDefinition {
155+
fn default() -> Self {
156+
Self {
157+
uuid: Uuid::new_v4().hyphenated().to_string(),
158+
definition: String::new(),
159+
created_at: Utc::now().naive_utc(),
160+
}
161+
}
162+
}
163+
164+
#[cfg(test)]
165+
mod tests {
166+
use super::*;
167+
use sqlx::SqlitePool;
168+
169+
async fn setup_database() -> SqlitePool {
170+
let pool = SqlitePool::connect(":memory:")
171+
.await
172+
.expect("Failed to setup_database()");
173+
sqlx::migrate!()
174+
.run(&pool)
175+
.await
176+
.expect("Failed to run migrations in a test");
177+
pool
178+
}
179+
180+
#[async_std::test]
181+
async fn test_create_a_run() {
182+
pretty_env_logger::try_init();
183+
let pool = setup_database().await;
184+
let run = Run::default();
185+
let result = Run::create(&run, &pool).await.unwrap();
186+
let fetched_run = Run::find_by(&run.run.uuid, &pool).await.unwrap();
187+
assert_eq!(run.run.uuid, fetched_run.run.uuid);
188+
}
189+
}

src/server/main.rs

Lines changed: 7 additions & 99 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,9 @@ use serde::{Deserialize, Serialize};
1717
use sqlx::SqlitePool;
1818
use url::Url;
1919

20+
mod dao;
21+
mod routes;
22+
2023
#[derive(Clone, Debug)]
2124
pub struct AppState<'a> {
2225
pub db: SqlitePool,
@@ -57,104 +60,6 @@ impl AppState<'_> {
5760
}
5861
}
5962

60-
/**
61-
* The routes module contains all the tide routes and the logic to fulfill the responses for each
62-
* route.
63-
*
64-
* Modules are nested for cleaner organization here
65-
*/
66-
mod routes {
67-
use crate::AppState;
68-
use tide::{Body, Request};
69-
70-
/**
71-
* GET /
72-
*/
73-
pub async fn index(req: Request<AppState<'_>>) -> Result<Body, tide::Error> {
74-
let params = json!({
75-
"page": "home",
76-
"config" : req.state().config,
77-
});
78-
79-
let mut body = req.state().render("index", &params).await?;
80-
body.set_mime("text/html");
81-
Ok(body)
82-
}
83-
84-
pub mod api {
85-
use log::*;
86-
use crate::{AppState, JankyYml, Scm};
87-
use tide::{Request, Response, StatusCode};
88-
89-
/**
90-
* POST /projects/{name}
91-
*/
92-
pub async fn execute_project(req: Request<AppState<'_>>) -> Result<Response, tide::Error> {
93-
let name: String = req.param("name")?.into();
94-
let state = req.state();
95-
96-
if !state.config.has_project(&name) {
97-
debug!("Could not find project named: {}", name);
98-
return Ok(Response::new(StatusCode::NotFound));
99-
}
100-
101-
if let Some(project) = state.config.projects.get(&name) {
102-
match &project.scm {
103-
Scm::GitHub {
104-
owner,
105-
repo,
106-
scm_ref,
107-
} => {
108-
debug!(
109-
"Fetching the file {} from {}/{}",
110-
&project.filename, owner, repo
111-
);
112-
let res = octocrab::instance()
113-
.repos(owner, repo)
114-
.raw_file(
115-
octocrab::params::repos::Commitish(scm_ref.into()),
116-
&project.filename,
117-
)
118-
.await?;
119-
let jankyfile: JankyYml = serde_yaml::from_str(&res.text().await?)?;
120-
debug!("text: {:?}", jankyfile);
121-
122-
for agent in &state.agents {
123-
if agent.can_meet(&jankyfile.needs) {
124-
debug!("agent: {:?} can meet our needs", agent);
125-
let commands: Vec<janky::Command> = jankyfile
126-
.commands
127-
.iter()
128-
.map(|c| janky::Command::with_script(c))
129-
.collect();
130-
let commands = janky::CommandRequest { commands };
131-
let client = reqwest::Client::new();
132-
let _res = client
133-
.put(
134-
agent
135-
.url
136-
.join("/api/v1/execute")
137-
.expect("Failed to join execute URL"),
138-
)
139-
.json(&commands)
140-
.send()
141-
.await?;
142-
143-
return Ok(json!({
144-
"msg": format!("Executing on {}", &agent.url)
145-
})
146-
.into());
147-
}
148-
}
149-
}
150-
}
151-
return Ok("{}".into());
152-
}
153-
Ok(Response::new(StatusCode::InternalServerError))
154-
}
155-
}
156-
}
157-
15863
#[derive(Clone, Debug, Deserialize)]
15964
struct JankyYml {
16065
needs: Vec<String>,
@@ -278,7 +183,10 @@ async fn main() -> Result<(), tide::Error> {
278183
});
279184
}
280185

281-
state.register_templates().await.expect("Failed to register handlebars templates");
186+
state
187+
.register_templates()
188+
.await
189+
.expect("Failed to register handlebars templates");
282190
let mut app = tide::with_state(state);
283191

284192
#[cfg(not(debug_assertions))]

0 commit comments

Comments
 (0)