Skip to content

Add/enhance MySQL support #864

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Dec 12, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
597 changes: 593 additions & 4 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ e2e-tests = []
outbound-redis-tests = []
config-provider-tests = []
outbound-pg-tests = []
outbound-mysql-tests = []

[workspace]
members = [
Expand Down
4 changes: 4 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ test-config-provider:
test-outbound-pg:
RUST_LOG=$(LOG_LEVEL) cargo test --test integration --features outbound-pg-tests --no-fail-fast -- --nocapture

.PHONY: test-outbound-mysql
test-outbound-mysql:
RUST_LOG=$(LOG_LEVEL) cargo test --test integration --features outbound-mysql-tests --no-fail-fast -- --nocapture

.PHONY: test-sdk-go
test-sdk-go:
$(MAKE) -C sdk/go test
Expand Down
17 changes: 17 additions & 0 deletions crates/outbound-mysql/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
[package]
name = "outbound-mysql"
version = { workspace = true }
authors = { workspace = true }
edition = { workspace = true }

[lib]
doctest = false

[dependencies]
anyhow = "1.0"
mysql_async = "0.30.0"
mysql_common = "0.29.1"
spin-core = { path = "../core" }
tokio = { version = "1", features = [ "rt-multi-thread" ] }
tracing = { version = "0.1", features = [ "log" ] }
wit-bindgen-wasmtime = { workspace = true }
254 changes: 254 additions & 0 deletions crates/outbound-mysql/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,254 @@
use mysql_async::consts::ColumnType;
use mysql_async::{from_value_opt, prelude::*};
pub use outbound_mysql::add_to_linker;
use spin_core::HostComponent;
use std::collections::HashMap;
use std::sync::Arc;
use wit_bindgen_wasmtime::async_trait;

wit_bindgen_wasmtime::export!({paths: ["../../wit/ephemeral/outbound-mysql.wit"], async: *});
use outbound_mysql::*;

/// A simple implementation to support outbound mysql connection
#[derive(Default)]
pub struct OutboundMysql {
pub connections: HashMap<String, mysql_async::Conn>,
}

impl HostComponent for OutboundMysql {
type Data = Self;

fn add_to_linker<T: Send>(
linker: &mut spin_core::Linker<T>,
get: impl Fn(&mut spin_core::Data<T>) -> &mut Self::Data + Send + Sync + Copy + 'static,
) -> anyhow::Result<()> {
outbound_mysql::add_to_linker(linker, get)
}

fn build_data(&self) -> Self::Data {
Default::default()
}
}

#[async_trait]
impl outbound_mysql::OutboundMysql for OutboundMysql {
async fn execute(
&mut self,
address: &str,
statement: &str,
params: Vec<ParameterValue<'_>>,
) -> Result<(), MysqlError> {
let db_params = params
.iter()
.map(to_sql_parameter)
.collect::<anyhow::Result<Vec<_>>>()
.map_err(|e| MysqlError::QueryFailed(format!("{:?}", e)))?;

let parameters = mysql_async::Params::Positional(db_params);

self.get_conn(address)
.await
.map_err(|e| MysqlError::ConnectionFailed(format!("{:?}", e)))?
.exec_batch(statement, &[parameters])
.await
.map_err(|e| MysqlError::QueryFailed(format!("{:?}", e)))?;

Ok(())
}

async fn query(
&mut self,
address: &str,
statement: &str,
params: Vec<ParameterValue<'_>>,
) -> Result<RowSet, MysqlError> {
let db_params = params
.iter()
.map(to_sql_parameter)
.collect::<anyhow::Result<Vec<_>>>()
.map_err(|e| MysqlError::QueryFailed(format!("{:?}", e)))?;

let parameters = mysql_async::Params::Positional(db_params);

let mut query_result = self
.get_conn(address)
.await
.map_err(|e| MysqlError::ConnectionFailed(format!("{:?}", e)))?
.exec_iter(statement, parameters)
.await
.map_err(|e| MysqlError::QueryFailed(format!("{:?}", e)))?;

// We have to get these before collect() destroys them
let columns = convert_columns(query_result.columns());

match query_result.collect::<mysql_async::Row>().await {
Err(e) => Err(MysqlError::OtherError(format!("{:?}", e))),
Ok(result_set) => {
let rows = result_set
.into_iter()
.map(|row| convert_row(row, &columns))
.collect::<Result<Vec<_>, _>>()
.map_err(|e| MysqlError::QueryFailed(format!("{:?}", e)))?;

Ok(RowSet { columns, rows })
}
}
}
}

fn to_sql_parameter(value: &ParameterValue) -> anyhow::Result<mysql_async::Value> {
match value {
ParameterValue::Boolean(v) => Ok(mysql_async::Value::from(v)),
ParameterValue::Int32(v) => Ok(mysql_async::Value::from(v)),
ParameterValue::Int64(v) => Ok(mysql_async::Value::from(v)),
ParameterValue::Int8(v) => Ok(mysql_async::Value::from(v)),
ParameterValue::Int16(v) => Ok(mysql_async::Value::from(v)),
ParameterValue::Floating32(v) => Ok(mysql_async::Value::from(v)),
ParameterValue::Floating64(v) => Ok(mysql_async::Value::from(v)),
ParameterValue::Uint8(v) => Ok(mysql_async::Value::from(v)),
ParameterValue::Uint16(v) => Ok(mysql_async::Value::from(v)),
ParameterValue::Uint32(v) => Ok(mysql_async::Value::from(v)),
ParameterValue::Uint64(v) => Ok(mysql_async::Value::from(v)),
ParameterValue::Str(v) => Ok(mysql_async::Value::from(v)),
ParameterValue::Binary(v) => Ok(mysql_async::Value::from(v)),
ParameterValue::DbNull => Ok(mysql_async::Value::NULL),
}
}

fn convert_columns(columns: Option<Arc<[mysql_async::Column]>>) -> Vec<Column> {
match columns {
Some(columns) => columns.iter().map(convert_column).collect(),
None => vec![],
}
}

fn convert_column(column: &mysql_async::Column) -> Column {
let name = column.name_str().to_string();
let data_type = convert_data_type(column);

Column { name, data_type }
}

fn convert_data_type(column: &mysql_async::Column) -> DbDataType {
let column_type = column.column_type();

if column_type.is_numeric_type() {
convert_numeric_type(column)
} else if column_type.is_character_type() {
convert_character_type(column)
} else {
DbDataType::Other
}
}

fn convert_character_type(column: &mysql_async::Column) -> DbDataType {
match (column.column_type(), is_binary(column)) {
(ColumnType::MYSQL_TYPE_BLOB, false) => DbDataType::Str, // TEXT type
(ColumnType::MYSQL_TYPE_BLOB, _) => DbDataType::Binary,
(ColumnType::MYSQL_TYPE_LONG_BLOB, _) => DbDataType::Binary,
(ColumnType::MYSQL_TYPE_MEDIUM_BLOB, _) => DbDataType::Binary,
(ColumnType::MYSQL_TYPE_STRING, true) => DbDataType::Binary, // BINARY type
(ColumnType::MYSQL_TYPE_STRING, _) => DbDataType::Str,
(ColumnType::MYSQL_TYPE_VAR_STRING, true) => DbDataType::Binary, // VARBINARY type
(ColumnType::MYSQL_TYPE_VAR_STRING, _) => DbDataType::Str,
(_, _) => DbDataType::Other,
}
}

fn convert_numeric_type(column: &mysql_async::Column) -> DbDataType {
match (column.column_type(), is_signed(column)) {
(ColumnType::MYSQL_TYPE_DOUBLE, _) => DbDataType::Floating64,
(ColumnType::MYSQL_TYPE_FLOAT, _) => DbDataType::Floating32,
(ColumnType::MYSQL_TYPE_INT24, true) => DbDataType::Int32,
(ColumnType::MYSQL_TYPE_INT24, false) => DbDataType::Uint32,
(ColumnType::MYSQL_TYPE_LONG, true) => DbDataType::Int32,
(ColumnType::MYSQL_TYPE_LONG, false) => DbDataType::Uint32,
(ColumnType::MYSQL_TYPE_LONGLONG, true) => DbDataType::Int64,
(ColumnType::MYSQL_TYPE_LONGLONG, false) => DbDataType::Uint64,
(ColumnType::MYSQL_TYPE_SHORT, true) => DbDataType::Int16,
(ColumnType::MYSQL_TYPE_SHORT, false) => DbDataType::Uint16,
(ColumnType::MYSQL_TYPE_TINY, true) => DbDataType::Int8,
(ColumnType::MYSQL_TYPE_TINY, false) => DbDataType::Uint8,
(_, _) => DbDataType::Other,
}
}

fn is_signed(column: &mysql_async::Column) -> bool {
!column
.flags()
.contains(mysql_async::consts::ColumnFlags::UNSIGNED_FLAG)
}

fn is_binary(column: &mysql_async::Column) -> bool {
column
.flags()
.contains(mysql_async::consts::ColumnFlags::BINARY_FLAG)
}

fn convert_row(mut row: mysql_async::Row, columns: &[Column]) -> Result<Vec<DbValue>, MysqlError> {
let mut result = Vec::with_capacity(row.len());
for index in 0..row.len() {
result.push(convert_entry(&mut row, index, columns)?);
}
Ok(result)
}

fn convert_entry(
row: &mut mysql_async::Row,
index: usize,
columns: &[Column],
) -> Result<DbValue, MysqlError> {
match (row.take(index), columns.get(index)) {
(None, _) => Ok(DbValue::DbNull), // TODO: is this right or is this an "index out of range" thing
(_, None) => Err(MysqlError::OtherError(format!(
"Can't get column at index {}",
index
))),
(Some(mysql_async::Value::NULL), _) => Ok(DbValue::DbNull),
(Some(value), Some(column)) => convert_value(value, column),
}
}

fn convert_value(value: mysql_async::Value, column: &Column) -> Result<DbValue, MysqlError> {
match column.data_type {
DbDataType::Binary => convert_value_to::<Vec<u8>>(value).map(DbValue::Binary),
DbDataType::Boolean => convert_value_to::<bool>(value).map(DbValue::Boolean),
DbDataType::Floating32 => convert_value_to::<f32>(value).map(DbValue::Floating32),
DbDataType::Floating64 => convert_value_to::<f64>(value).map(DbValue::Floating64),
DbDataType::Int8 => convert_value_to::<i8>(value).map(DbValue::Int8),
DbDataType::Int16 => convert_value_to::<i16>(value).map(DbValue::Int16),
DbDataType::Int32 => convert_value_to::<i32>(value).map(DbValue::Int32),
DbDataType::Int64 => convert_value_to::<i64>(value).map(DbValue::Int64),
DbDataType::Str => convert_value_to::<String>(value).map(DbValue::Str),
DbDataType::Uint8 => convert_value_to::<u8>(value).map(DbValue::Uint8),
DbDataType::Uint16 => convert_value_to::<u16>(value).map(DbValue::Uint16),
DbDataType::Uint32 => convert_value_to::<u32>(value).map(DbValue::Uint32),
DbDataType::Uint64 => convert_value_to::<u64>(value).map(DbValue::Uint64),
DbDataType::Other => Err(MysqlError::ValueConversionFailed(format!(
"Cannot convert value {:?} in column {} data type {:?}",
value, column.name, column.data_type
))),
}
}

impl OutboundMysql {
async fn get_conn(&mut self, address: &str) -> anyhow::Result<&mut mysql_async::Conn> {
let client = match self.connections.entry(address.to_owned()) {
std::collections::hash_map::Entry::Occupied(o) => o.into_mut(),
std::collections::hash_map::Entry::Vacant(v) => v.insert(build_conn(address).await?),
};
Ok(client)
}
}

async fn build_conn(address: &str) -> Result<mysql_async::Conn, mysql_async::Error> {
tracing::log::debug!("Build new connection: {}", address);

let connection_pool = mysql_async::Pool::new(address);

connection_pool.get_conn().await
}

fn convert_value_to<T: FromValue>(value: mysql_async::Value) -> Result<T, MysqlError> {
from_value_opt::<T>(value).map_err(|e| MysqlError::ValueConversionFailed(format!("{}", e)))
}
3 changes: 2 additions & 1 deletion crates/trigger/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,10 @@ clap = { version = "3.1.15", features = ["derive", "env"] }
ctrlc = { version = "3.2", features = ["termination"] }
dirs = "4"
futures = "0.3"
outbound-http = { path = "../outbound-http" }
outbound-http = { path = "../outbound-http" }
outbound-redis = { path = "../outbound-redis" }
outbound-pg = { path = "../outbound-pg" }
outbound-mysql = { path = "../outbound-mysql" }
sanitize-filename = "0.4"
serde = "1.0"
serde_json = "1.0"
Expand Down
1 change: 1 addition & 0 deletions crates/trigger/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ impl<Executor: TriggerExecutor> TriggerExecutorBuilder<Executor> {
if !self.disable_default_host_components {
builder.add_host_component(outbound_redis::OutboundRedisComponent)?;
builder.add_host_component(outbound_pg::OutboundPg::default())?;
builder.add_host_component(outbound_mysql::OutboundMysql::default())?;
self.loader.add_dynamic_host_component(
&mut builder,
outbound_http::OutboundHttpComponent,
Expand Down
2 changes: 2 additions & 0 deletions examples/rust-outbound-mysql/.cargo/config.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
[build]
target = "wasm32-wasi"
1 change: 1 addition & 0 deletions examples/rust-outbound-mysql/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
target/
25 changes: 25 additions & 0 deletions examples/rust-outbound-mysql/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
[package]
name = "rust-outbound-mysql"
authors = ["itowlson <[email protected]>"]
description = "Demo of calling MySQL from a Spin application"
version = "0.1.0"
edition = "2021"

[lib]
crate-type = [ "cdylib" ]

[dependencies]
# Useful crate to handle errors.
anyhow = "1"
# Crate to simplify working with bytes.
bytes = "1"
# General-purpose crate with common HTTP types.
http = "0.2"
serde = "1.0.144"
serde_json = "1.0.85"
# The Spin SDK.
spin-sdk = { path = "../../sdk/rust" }
# Crate that generates Rust Wasm bindings from a WebAssembly interface.
wit-bindgen-rust = { git = "https://github.com/bytecodealliance/wit-bindgen", rev = "cb871cfa1ee460b51eb1d144b175b9aab9c50aba" }

[workspace]
4 changes: 4 additions & 0 deletions examples/rust-outbound-mysql/db/pets.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
CREATE TABLE pets (id INT PRIMARY KEY, name VARCHAR(100) NOT NULL, prey VARCHAR(100), is_finicky BOOL NOT NULL);
INSERT INTO pets VALUES (1, 'Splodge', NULL, false);
INSERT INTO pets VALUES (2, 'Kiki', 'Cicadas', false);
INSERT INTO pets VALUES (3, 'Slats', 'Temptations', true);
15 changes: 15 additions & 0 deletions examples/rust-outbound-mysql/spin.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
spin_version = "1"
authors = ["itowlson <[email protected]>"]
description = "Demo of calling MySQL from a Spin application"
name = "rust-outbound-mysql"
trigger = { type = "http", base = "/" }
version = "0.1.0"

[[component]]
environment = { DB_URL = "mysql://spin:[email protected]/spin_dev" }
id = "rust-outbound-mysql"
source = "target/wasm32-wasi/release/rust_outbound_mysql.wasm"
[component.trigger]
route = "/..."
[component.build]
command = "cargo build --target wasm32-wasi --release"
Loading