Skip to content

Commit 2cb7362

Browse files
committed
Add DataClient struct, allowing users to query Grafana for data over gRPC
WIP, not even tested, feels like it might just work though? TODO: - [ ] Finish off `Frame::from_arrow` so it actually populates the chunks - [ ] Test it out Depends on grafana/grafana#55781.
1 parent ce2dc43 commit 2cb7362

File tree

8 files changed

+333
-56
lines changed

8 files changed

+333
-56
lines changed

Diff for: crates/grafana-plugin-sdk/Cargo.toml

+2-1
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ description = "SDK for building Grafana backend plugins."
1010

1111
[dependencies]
1212
arrow2 = { version = "0.14.0", features = ["io_ipc"] }
13+
bytes = "1.2.1"
1314
chrono = "0.4.19"
1415
futures-core = "0.3.17"
1516
futures-util = "0.3.17"
@@ -40,7 +41,7 @@ tracing-subscriber = { version = "0.3.1", features = [
4041

4142
[dev-dependencies]
4243
async-stream = "0.3.2"
43-
bytes = "1.1.0"
44+
bytes = "1.2.1"
4445
futures = "0.3.17"
4546
paste = "1.0.6"
4647
pretty_assertions = "1.0.0"

Diff for: crates/grafana-plugin-sdk/examples/main.rs

+64-30
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use bytes::Bytes;
1010
use chrono::prelude::*;
1111
use futures_util::stream::FuturesOrdered;
1212
use http::Response;
13-
use serde::Deserialize;
13+
use serde::{Deserialize, Serialize};
1414
use thiserror::Error;
1515
use tokio_stream::StreamExt;
1616
use tracing::{debug, info};
@@ -21,28 +21,42 @@ use grafana_plugin_sdk::{
2121
prelude::*,
2222
};
2323

24-
#[derive(Clone, Debug, Default)]
25-
struct MyPluginService(Arc<AtomicUsize>);
24+
#[derive(Clone, Debug)]
25+
struct MyPluginService {
26+
counter: Arc<AtomicUsize>,
27+
data_client: backend::DataClient,
28+
}
2629

2730
impl MyPluginService {
2831
fn new() -> Self {
29-
Self(Arc::new(AtomicUsize::new(0)))
32+
Self {
33+
counter: Arc::new(AtomicUsize::new(0)),
34+
data_client: backend::DataClient::new("localhost:10000").expect("valid URL"),
35+
}
3036
}
3137
}
3238

3339
// Data service implementation.
3440

35-
#[derive(Debug, Deserialize)]
41+
#[derive(Clone, Debug, Deserialize, Serialize)]
3642
#[serde(rename_all = "camelCase")]
3743
struct Query {
3844
pub expression: String,
3945
pub other_user_input: u64,
4046
}
4147

48+
#[derive(Debug, Error)]
49+
enum SDKError {
50+
#[error("SDK error: {0}")]
51+
Data(data::Error),
52+
#[error("Query data error: {0}")]
53+
QueryData(backend::QueryDataError),
54+
}
55+
4256
#[derive(Debug, Error)]
4357
#[error("Error querying backend for query {ref_id}: {source}")]
4458
struct QueryError {
45-
source: data::Error,
59+
source: SDKError,
4660
ref_id: String,
4761
}
4862

@@ -58,40 +72,60 @@ impl backend::DataService for MyPluginService {
5872
type QueryError = QueryError;
5973
type Stream = backend::BoxDataResponseStream<Self::QueryError>;
6074
async fn query_data(&self, request: backend::QueryDataRequest<Self::Query>) -> Self::Stream {
75+
let client = self.data_client.clone();
76+
let transport_metadata = request.transport_metadata().clone();
6177
Box::pin(
6278
request
6379
.queries
6480
.into_iter()
65-
.map(|x: DataQuery<Self::Query>| async move {
81+
.map(|x: DataQuery<Self::Query>| {
6682
// We can see the user's query in `x.query`:
6783
debug!(
6884
expression = x.query.expression,
6985
other_user_input = x.query.other_user_input,
7086
"Got backend query",
7187
);
72-
// Here we create a single response Frame for each query.
73-
// Frames can be created from iterators of fields using [`IntoFrame`].
74-
Ok(backend::DataResponse::new(
75-
x.ref_id.clone(),
76-
vec![[
77-
// Fields can be created from iterators of a variety of
78-
// relevant datatypes.
79-
[
80-
Utc.ymd(2021, 1, 1).and_hms(12, 0, 0),
81-
Utc.ymd(2021, 1, 1).and_hms(12, 0, 1),
82-
Utc.ymd(2021, 1, 1).and_hms(12, 0, 2),
88+
89+
let ref_id = x.ref_id.clone();
90+
let transport_metadata = transport_metadata.clone();
91+
let mut client = client.clone();
92+
93+
async move {
94+
// We can proxy this request to a different datasource if we like.
95+
// Here we do that, but ignore the response.
96+
let proxied = client
97+
.query_data(vec![x.clone()], &transport_metadata)
98+
.await
99+
.map_err(|source| QueryError {
100+
source: SDKError::QueryData(source),
101+
ref_id: ref_id.clone(),
102+
})?;
103+
info!("Got proxied response: {:?}", proxied);
104+
105+
// Here we create a single response Frame for each query.
106+
// Frames can be created from iterators of fields using [`IntoFrame`].
107+
Ok(backend::DataResponse::new(
108+
ref_id.clone(),
109+
vec![[
110+
// Fields can be created from iterators of a variety of
111+
// relevant datatypes.
112+
[
113+
Utc.ymd(2021, 1, 1).and_hms(12, 0, 0),
114+
Utc.ymd(2021, 1, 1).and_hms(12, 0, 1),
115+
Utc.ymd(2021, 1, 1).and_hms(12, 0, 2),
116+
]
117+
.into_field("time"),
118+
[1_u32, 2, 3].into_field("x"),
119+
["a", "b", "c"].into_field("y"),
83120
]
84-
.into_field("time"),
85-
[1_u32, 2, 3].into_field("x"),
86-
["a", "b", "c"].into_field("y"),
87-
]
88-
.into_frame("foo")
89-
.check()
90-
.map_err(|source| QueryError {
91-
ref_id: x.ref_id,
92-
source,
93-
})?],
94-
))
121+
.into_frame("foo")
122+
.check()
123+
.map_err(|source| QueryError {
124+
source: SDKError::Data(source),
125+
ref_id,
126+
})?],
127+
))
128+
}
95129
})
96130
.collect::<FuturesOrdered<_>>(),
97131
)
@@ -195,7 +229,7 @@ impl backend::ResourceService for MyPluginService {
195229
&self,
196230
r: backend::CallResourceRequest,
197231
) -> Result<(Self::InitialResponse, Self::Stream), Self::Error> {
198-
let count = Arc::clone(&self.0);
232+
let count = Arc::clone(&self.counter);
199233
let response_and_stream = match r.request.uri().path() {
200234
// Just send back a single response.
201235
"/echo" => Ok((

0 commit comments

Comments
 (0)