Skip to content

Commit 619a5d9

Browse files
committed
Add VSS Store Implementation
A KVStore implementation that writes to and reads from a [VSS](https://github.com/lightningdevkit/vss-server/blob/main/README.md) backend.
1 parent d78cab0 commit 619a5d9

File tree

6 files changed

+233
-0
lines changed

6 files changed

+233
-0
lines changed

.github/workflows/build.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ jobs:
4848
cargo update -p regex --precise "1.9.6" --verbose # regex 1.10.0 requires rustc 1.65.0
4949
cargo update -p jobserver --precise "0.1.26" --verbose # jobserver 0.1.27 requires rustc 1.66.0
5050
cargo update -p zstd-sys --precise "2.0.8+zstd.1.5.5" --verbose # zstd-sys 2.0.9+zstd.1.5.5 requires rustc 1.64.0
51+
cargo update -p petgraph --precise "0.6.3" --verbose # petgraph v0.6.4, requires rustc 1.64 or newer
5152
- name: Build on Rust ${{ matrix.toolchain }}
5253
run: cargo build --verbose --color always
5354
- name: Build with UniFFI support on Rust ${{ matrix.toolchain }}

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ tokio = { version = "1", default-features = false, features = [ "rt-multi-thread
6666
esplora-client = { version = "0.4", default-features = false }
6767
libc = "0.2"
6868
uniffi = { version = "0.23.0", features = ["build"], optional = true }
69+
vss-client = "0.1"
6970

7071
[target.'cfg(windows)'.dependencies]
7172
winapi = { version = "0.3", features = ["winbase"] }

src/builder.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,8 @@ use bip39::Mnemonic;
4848

4949
use bitcoin::BlockHash;
5050

51+
#[cfg(any(vss, vss_test))]
52+
use crate::io::vss_store::VssStore;
5153
use std::convert::TryInto;
5254
use std::default::Default;
5355
use std::fmt;
@@ -269,6 +271,16 @@ impl NodeBuilder {
269271
self.build_with_store(kv_store)
270272
}
271273

274+
/// Builds a [`Node`] instance with a [`VssStore`] backend and according to the options
275+
/// previously configured.
276+
#[cfg(any(vss, vss_test))]
277+
pub fn build_with_vss_store(
278+
&self, url: &str, store_id: String,
279+
) -> Result<Node<VssStore>, BuildError> {
280+
let vss = Arc::new(VssStore::new(url, store_id));
281+
self.build_with_store(vss)
282+
}
283+
272284
/// Builds a [`Node`] instance according to the options previously configured.
273285
pub fn build_with_store<K: KVStore + Sync + Send + 'static>(
274286
&self, kv_store: Arc<K>,

src/io/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ pub mod sqlite_store;
44
#[cfg(test)]
55
pub(crate) mod test_utils;
66
pub(crate) mod utils;
7+
pub(crate) mod vss_store;
78

89
/// The event queue will be persisted under this key.
910
pub(crate) const EVENT_QUEUE_PERSISTENCE_PRIMARY_NAMESPACE: &str = "";

src/io/vss_store.rs

Lines changed: 195 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,195 @@
1+
use io::Error;
2+
use std::io;
3+
use std::io::ErrorKind;
4+
#[cfg(test)]
5+
use std::panic::RefUnwindSafe;
6+
7+
use crate::io::utils::check_namespace_key_validity;
8+
use lightning::util::persist::KVStore;
9+
use tokio::runtime::Runtime;
10+
use vss_client::client::VssClient;
11+
use vss_client::error::VssError;
12+
use vss_client::types::{
13+
DeleteObjectRequest, GetObjectRequest, KeyValue, ListKeyVersionsRequest, PutObjectRequest,
14+
};
15+
16+
/// A [`KVStore`] implementation that writes to and reads from a [VSS](https://github.com/lightningdevkit/vss-server/blob/main/README.md) backend.
17+
pub struct VssStore {
18+
client: VssClient,
19+
store_id: String,
20+
runtime: Runtime,
21+
}
22+
23+
impl VssStore {
24+
#[cfg(any(vss, vss_test))]
25+
pub(crate) fn new(base_url: &str, store_id: String) -> Self {
26+
let client = VssClient::new(base_url);
27+
let runtime = tokio::runtime::Builder::new_multi_thread().enable_all().build().unwrap();
28+
Self { client, store_id, runtime }
29+
}
30+
31+
fn build_key(
32+
&self, primary_namespace: &str, secondary_namespace: &str, key: &str,
33+
) -> io::Result<String> {
34+
if primary_namespace.is_empty() {
35+
Ok(key.to_string())
36+
} else {
37+
Ok(format!("{}#{}#{}", primary_namespace, secondary_namespace, key))
38+
}
39+
}
40+
41+
fn extract_key(&self, unified_key: &str) -> io::Result<String> {
42+
let mut parts = unified_key.splitn(3, '#');
43+
let (_primary_namespace, _secondary_namespace) = (parts.next(), parts.next());
44+
match parts.next() {
45+
Some(actual_key) => Ok(actual_key.to_string()),
46+
None => Err(Error::new(ErrorKind::InvalidData, "Invalid key format")),
47+
}
48+
}
49+
50+
async fn list_all_keys(
51+
&self, primary_namespace: &str, secondary_namespace: &str,
52+
) -> io::Result<Vec<String>> {
53+
let mut page_token = None;
54+
let mut keys = vec![];
55+
let key_prefix = format!("{}#{}", primary_namespace, secondary_namespace);
56+
while page_token != Some("".to_string()) {
57+
let request = ListKeyVersionsRequest {
58+
store_id: self.store_id.clone(),
59+
key_prefix: Some(key_prefix.clone()),
60+
page_token,
61+
page_size: None,
62+
};
63+
64+
let response = self.client.list_key_versions(&request).await.map_err(|e| {
65+
let msg = format!(
66+
"Failed to list keys in {}/{}: {}",
67+
primary_namespace, secondary_namespace, e
68+
);
69+
Error::new(ErrorKind::Other, msg)
70+
})?;
71+
72+
for kv in response.key_versions {
73+
keys.push(self.extract_key(&kv.key)?);
74+
}
75+
page_token = response.next_page_token;
76+
}
77+
Ok(keys)
78+
}
79+
}
80+
81+
impl KVStore for VssStore {
82+
fn read(
83+
&self, primary_namespace: &str, secondary_namespace: &str, key: &str,
84+
) -> io::Result<Vec<u8>> {
85+
check_namespace_key_validity(primary_namespace, secondary_namespace, Some(key), "read")?;
86+
let request = GetObjectRequest {
87+
store_id: self.store_id.clone(),
88+
key: self.build_key(primary_namespace, secondary_namespace, key)?,
89+
};
90+
91+
let resp =
92+
tokio::task::block_in_place(|| self.runtime.block_on(self.client.get_object(&request)))
93+
.map_err(|e| {
94+
let msg = format!(
95+
"Failed to read from key {}/{}/{}: {}",
96+
primary_namespace, secondary_namespace, key, e
97+
);
98+
match e {
99+
VssError::NoSuchKeyError(..) => Error::new(ErrorKind::NotFound, msg),
100+
_ => Error::new(ErrorKind::Other, msg),
101+
}
102+
})?;
103+
Ok(resp.value.unwrap().value)
104+
}
105+
106+
fn write(
107+
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8],
108+
) -> io::Result<()> {
109+
check_namespace_key_validity(primary_namespace, secondary_namespace, Some(key), "write")?;
110+
let request = PutObjectRequest {
111+
store_id: self.store_id.clone(),
112+
global_version: None,
113+
transaction_items: vec![KeyValue {
114+
key: self.build_key(primary_namespace, secondary_namespace, key)?,
115+
version: -1,
116+
value: buf.to_vec(),
117+
}],
118+
delete_items: vec![],
119+
};
120+
121+
tokio::task::block_in_place(|| self.runtime.block_on(self.client.put_object(&request)))
122+
.map_err(|e| {
123+
let msg = format!(
124+
"Failed to write to key {}/{}/{}: {}",
125+
primary_namespace, secondary_namespace, key, e
126+
);
127+
Error::new(ErrorKind::Other, msg)
128+
})?;
129+
130+
Ok(())
131+
}
132+
133+
fn remove(
134+
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, _lazy: bool,
135+
) -> io::Result<()> {
136+
check_namespace_key_validity(primary_namespace, secondary_namespace, Some(key), "remove")?;
137+
let request = DeleteObjectRequest {
138+
store_id: self.store_id.clone(),
139+
key_value: Some(KeyValue {
140+
key: self.build_key(primary_namespace, secondary_namespace, key)?,
141+
version: -1,
142+
value: vec![],
143+
}),
144+
};
145+
146+
tokio::task::block_in_place(|| self.runtime.block_on(self.client.delete_object(&request)))
147+
.map_err(|e| {
148+
let msg = format!(
149+
"Failed to delete key {}/{}/{}: {}",
150+
primary_namespace, secondary_namespace, key, e
151+
);
152+
Error::new(ErrorKind::Other, msg)
153+
})?;
154+
Ok(())
155+
}
156+
157+
fn list(&self, primary_namespace: &str, secondary_namespace: &str) -> io::Result<Vec<String>> {
158+
check_namespace_key_validity(primary_namespace, secondary_namespace, None, "list")?;
159+
160+
let keys = tokio::task::block_in_place(|| {
161+
self.runtime.block_on(self.list_all_keys(primary_namespace, secondary_namespace))
162+
})
163+
.map_err(|e| {
164+
let msg = format!(
165+
"Failed to retrieve keys in namespace: {}/{} : {}",
166+
primary_namespace, secondary_namespace, e
167+
);
168+
Error::new(ErrorKind::Other, msg)
169+
})?;
170+
171+
Ok(keys)
172+
}
173+
}
174+
175+
#[cfg(test)]
176+
impl RefUnwindSafe for VssStore {}
177+
178+
#[cfg(test)]
179+
#[cfg(vss_test)]
180+
mod tests {
181+
use super::*;
182+
use crate::io::test_utils::do_read_write_remove_list_persist;
183+
use rand::distributions::Alphanumeric;
184+
use rand::{thread_rng, Rng};
185+
186+
#[test]
187+
fn read_write_remove_list_persist() {
188+
let vss_base_url = std::env::var("TEST_VSS_BASE_URL").unwrap();
189+
let mut rng = thread_rng();
190+
let rand_store_id: String = (0..7).map(|_| rng.sample(Alphanumeric) as char).collect();
191+
let vss_store = VssStore::new(&vss_base_url, rand_store_id);
192+
193+
do_read_write_remove_list_persist(&vss_store);
194+
}
195+
}

src/test/functional_tests.rs

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,29 @@ fn channel_full_cycle() {
1818
do_channel_full_cycle(node_a, node_b, &bitcoind, &electrsd, false);
1919
}
2020

21+
#[test]
22+
#[cfg(vss_test)]
23+
fn channel_full_cycle_with_vss_store() {
24+
let (bitcoind, electrsd) = setup_bitcoind_and_electrsd();
25+
println!("== Node A ==");
26+
let esplora_url = format!("http://{}", electrsd.esplora_url.as_ref().unwrap());
27+
let config_a = random_config();
28+
let mut builder_a = NodeBuilder::from_config(config_a);
29+
builder_a.set_esplora_server(esplora_url.clone());
30+
let vss_base_url = std::env::var("TEST_VSS_BASE_URL").unwrap();
31+
let node_a = builder_a.build_with_vss_store(&vss_base_url, "node_1_store".to_string()).unwrap();
32+
node_a.start().unwrap();
33+
34+
println!("\n== Node B ==");
35+
let config_b = random_config();
36+
let mut builder_b = NodeBuilder::from_config(config_b);
37+
builder_b.set_esplora_server(esplora_url);
38+
let node_b = builder_b.build_with_vss_store(&vss_base_url, "node_2_store".to_string()).unwrap();
39+
node_b.start().unwrap();
40+
41+
do_channel_full_cycle(node_a, node_b, &bitcoind, &electrsd, false);
42+
}
43+
2144
#[test]
2245
fn channel_full_cycle_0conf() {
2346
let (bitcoind, electrsd) = setup_bitcoind_and_electrsd();

0 commit comments

Comments
 (0)