Skip to content

Commit dd015dd

Browse files
authored
Merge pull request #2735 from drmingdrmer/leader
[metasrv] refactor: move write_to_local_leader() to MetaLeader
2 parents d22a1ae + d7407fc commit dd015dd

File tree

6 files changed

+96
-97
lines changed

6 files changed

+96
-97
lines changed

metasrv/src/meta_service/message.rs

+3
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ pub struct JoinRequest {
3535
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, derive_more::TryInto)]
3636
pub enum AdminRequestInner {
3737
Join(JoinRequest),
38+
Write(LogEntry),
3839
}
3940

4041
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
@@ -52,8 +53,10 @@ impl AdminRequest {
5253
}
5354

5455
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, derive_more::TryInto)]
56+
#[allow(clippy::large_enum_variant)]
5557
pub enum AdminResponse {
5658
Join(()),
59+
AppliedState(AppliedState),
5760
}
5861

5962
impl tonic::IntoRequest<RaftRequest> for AdminRequest {

metasrv/src/meta_service/meta_leader.rs

+38-18
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,10 @@
1515
use std::collections::BTreeSet;
1616

1717
use async_raft::error::ResponseError;
18+
use async_raft::raft::ClientWriteRequest;
1819
use async_raft::ChangeConfigError;
20+
use async_raft::ClientWriteError;
21+
use common_meta_raft_store::state_machine::AppliedState;
1922
use common_meta_types::Cmd;
2023
use common_meta_types::LogEntry;
2124
use common_meta_types::Node;
@@ -25,7 +28,6 @@ use common_tracing::tracing;
2528
use crate::errors::ForwardToLeader;
2629
use crate::errors::InvalidMembership;
2730
use crate::errors::MetaError;
28-
use crate::errors::RetryableError;
2931
use crate::meta_service::message::AdminRequest;
3032
use crate::meta_service::message::AdminResponse;
3133
use crate::meta_service::AdminRequestInner;
@@ -52,6 +54,10 @@ impl<'a> MetaLeader<'a> {
5254
self.join(join_req).await?;
5355
Ok(AdminResponse::Join(()))
5456
}
57+
AdminRequestInner::Write(entry) => {
58+
let res = self.write(entry).await?;
59+
Ok(AdminResponse::AppliedState(res))
60+
}
5561
}
5662
}
5763

@@ -85,23 +91,7 @@ impl<'a> MetaLeader<'a> {
8591
},
8692
};
8793

88-
let res = self
89-
.meta_node
90-
.write_to_local_leader(ent.clone())
91-
.await
92-
.map_err(|e| MetaError::UnknownError(e.to_string()))?;
93-
match res {
94-
Ok(_applied_state) => {}
95-
Err(retryable_error) => {
96-
// TODO(xp): remove retryable error.
97-
let leader_id = match retryable_error {
98-
RetryableError::ForwardToLeader { leader } => leader,
99-
};
100-
return Err(MetaError::ForwardToLeader(ForwardToLeader {
101-
leader: Some(leader_id),
102-
}));
103-
}
104-
}
94+
self.write(ent.clone()).await?;
10595

10696
self.change_membership(membership).await
10797
}
@@ -138,4 +128,34 @@ impl<'a> MetaLeader<'a> {
138128
_ => Err(MetaError::UnknownError("uncovered error".to_string())),
139129
}
140130
}
131+
132+
/// Write a log through local raft node and return the states before and after applying the log.
133+
///
134+
/// If the raft node is not a leader, it returns MetaError::ForwardToLeader.
135+
/// If the leadership is lost during writing the log, it returns an UnknownError.
136+
/// TODO(xp): elaborate the UnknownError, e.g. LeaderLostError
137+
#[tracing::instrument(level = "debug", skip(self))]
138+
pub async fn write(&self, entry: LogEntry) -> Result<AppliedState, MetaError> {
139+
let write_rst = self
140+
.meta_node
141+
.raft
142+
.client_write(ClientWriteRequest::new(entry))
143+
.await;
144+
145+
tracing::debug!("raft.client_write rst: {:?}", write_rst);
146+
147+
match write_rst {
148+
Ok(resp) => Ok(resp.data),
149+
Err(cli_write_err) => match cli_write_err {
150+
// fatal error
151+
ClientWriteError::RaftError(raft_err) => {
152+
Err(MetaError::UnknownError(raft_err.to_string()))
153+
}
154+
// retryable error
155+
ClientWriteError::ForwardToLeader(_, leader) => {
156+
Err(MetaError::ForwardToLeader(ForwardToLeader { leader }))
157+
}
158+
},
159+
}
160+
}
141161
}

metasrv/src/meta_service/meta_service_impl.rs

+15-7
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ use std::sync::Arc;
2121
use common_meta_types::LogEntry;
2222
use common_tracing::tracing;
2323

24+
use crate::errors::MetaError;
2425
use crate::meta_service::message::AdminRequest;
2526
use crate::meta_service::MetaNode;
2627
use crate::proto::meta_service_server::MetaService;
@@ -53,14 +54,21 @@ impl MetaService for MetaServiceImpl {
5354
let mes = request.into_inner();
5455
let req: LogEntry = mes.try_into()?;
5556

56-
let rst = self
57-
.meta_node
58-
.write_to_local_leader(req)
59-
.await
60-
.map_err(|e| tonic::Status::internal(e.to_string()))?;
57+
let leader = self.meta_node.as_leader().await;
6158

62-
let raft_mes = rst.into();
63-
Ok(tonic::Response::new(raft_mes))
59+
let leader = match leader {
60+
Ok(x) => x,
61+
Err(err) => {
62+
let err: MetaError = err.into();
63+
let raft_reply = Err::<(), _>(err).into();
64+
return Ok(tonic::Response::new(raft_reply));
65+
}
66+
};
67+
68+
let rst = leader.write(req).await;
69+
70+
let raft_reply = rst.into();
71+
Ok(tonic::Response::new(raft_reply))
6472
}
6573

6674
#[tracing::instrument(level = "info", skip(self))]

metasrv/src/meta_service/meta_service_impl_test.rs

+10-3
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@ use common_tracing::tracing;
2525
use log::info;
2626
use pretty_assertions::assert_eq;
2727

28+
use crate::errors::ForwardToLeader;
29+
use crate::errors::MetaError;
2830
use crate::errors::RetryableError;
2931
use crate::meta_service::MetaNode;
3032
use crate::proto::meta_service_client::MetaServiceClient;
@@ -165,12 +167,17 @@ async fn test_meta_cluster_write_on_non_leader() -> anyhow::Result<()> {
165167
};
166168
let raft_mes = client.write(req).await?.into_inner();
167169

168-
let rst: Result<AppliedState, RetryableError> = raft_mes.into();
170+
let rst: Result<AppliedState, MetaError> = raft_mes.into();
171+
println!("{:?}", rst);
172+
169173
assert!(rst.is_err());
170174
let err = rst.unwrap_err();
171175
match err {
172-
RetryableError::ForwardToLeader { leader } => {
173-
assert_eq!(leader, 0);
176+
MetaError::ForwardToLeader(ForwardToLeader { leader }) => {
177+
assert_eq!(leader, Some(0));
178+
}
179+
_ => {
180+
panic!("expect ForwardToLeader")
174181
}
175182
}
176183

metasrv/src/meta_service/raftmeta.rs

+11-58
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,6 @@ use std::collections::BTreeSet;
1616
use std::sync::Arc;
1717

1818
use async_raft::config::Config;
19-
use async_raft::raft::ClientWriteRequest;
20-
use async_raft::ClientWriteError;
2119
use async_raft::Raft;
2220
use async_raft::RaftMetrics;
2321
use async_raft::SnapshotPolicy;
@@ -46,10 +44,10 @@ use common_tracing::tracing::Instrument;
4644
use crate::errors::ConnectionError;
4745
use crate::errors::ForwardToLeader;
4846
use crate::errors::MetaError;
49-
use crate::errors::RetryableError;
5047
use crate::meta_service::message::AdminRequest;
5148
use crate::meta_service::message::AdminResponse;
5249
use crate::meta_service::meta_leader::MetaLeader;
50+
use crate::meta_service::AdminRequestInner;
5351
use crate::meta_service::MetaServiceImpl;
5452
use crate::meta_service::Network;
5553
use crate::proto::meta_service_client::MetaServiceClient;
@@ -532,31 +530,16 @@ impl MetaNode {
532530
/// Submit a write request to the known leader. Returns the response after applying the request.
533531
#[tracing::instrument(level = "info", skip(self))]
534532
pub async fn write(&self, req: LogEntry) -> common_exception::Result<AppliedState> {
535-
let mut curr_leader = self.get_leader().await;
536-
loop {
537-
let rst = if curr_leader == self.sto.id {
538-
self.write_to_local_leader(req.clone()).await?
539-
} else {
540-
// forward to leader
541-
542-
let addr = self.sto.get_node_addr(&curr_leader).await?;
543-
544-
// TODO: retry
545-
let mut client = MetaServiceClient::connect(format!("http://{}", addr))
546-
.await
547-
.map_err(|e| ErrorCode::CannotConnectNode(e.to_string()))?;
548-
let resp = client.write(req.clone()).await?;
549-
let rst: Result<AppliedState, RetryableError> = resp.into_inner().into();
550-
rst
551-
};
552-
553-
match rst {
554-
Ok(resp) => return Ok(resp),
555-
Err(write_err) => match write_err {
556-
RetryableError::ForwardToLeader { leader } => curr_leader = leader,
557-
},
558-
}
559-
}
533+
let res = self
534+
.handle_admin_req(AdminRequest {
535+
forward_to_leader: true,
536+
req: AdminRequestInner::Write(req.clone()),
537+
})
538+
.await?;
539+
540+
let res: AppliedState = res.try_into().expect("expect AppliedState");
541+
542+
Ok(res)
560543
}
561544

562545
/// Try to get the leader from the latest metrics of the local raft node.
@@ -616,34 +599,4 @@ impl MetaNode {
616599
let res: Result<AdminResponse, MetaError> = raft_mes.into();
617600
res
618601
}
619-
620-
/// Write a meta log through local raft node.
621-
/// It works only when this node is the leader,
622-
/// otherwise it returns ClientWriteError::ForwardToLeader error indicating the latest leader.
623-
#[tracing::instrument(level = "info", skip(self))]
624-
pub async fn write_to_local_leader(
625-
&self,
626-
req: LogEntry,
627-
) -> common_exception::Result<Result<AppliedState, RetryableError>> {
628-
let write_rst = self.raft.client_write(ClientWriteRequest::new(req)).await;
629-
630-
tracing::debug!("raft.client_write rst: {:?}", write_rst);
631-
632-
match write_rst {
633-
Ok(resp) => Ok(Ok(resp.data)),
634-
Err(cli_write_err) => match cli_write_err {
635-
// fatal error
636-
ClientWriteError::RaftError(raft_err) => {
637-
Err(ErrorCode::MetaServiceError(raft_err.to_string()))
638-
}
639-
// retryable error
640-
ClientWriteError::ForwardToLeader(_, leader) => match leader {
641-
Some(id) => Ok(Err(RetryableError::ForwardToLeader { leader: id })),
642-
None => Err(ErrorCode::MetaServiceUnavailable(
643-
"no leader to write".to_string(),
644-
)),
645-
},
646-
},
647-
}
648-
}
649602
}

metasrv/src/meta_service/raftmeta_test.rs

+19-11
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,10 @@ use maplit::btreeset;
3232
use pretty_assertions::assert_eq;
3333

3434
use crate::configs;
35-
use crate::errors::RetryableError;
35+
use crate::errors::ForwardToLeader;
36+
use crate::errors::MetaError;
3637
use crate::meta_service::message::AdminRequest;
38+
use crate::meta_service::meta_leader::MetaLeader;
3739
use crate::meta_service::AdminRequestInner;
3840
use crate::meta_service::JoinRequest;
3941
use crate::meta_service::MetaNode;
@@ -136,8 +138,9 @@ async fn test_meta_node_write_to_local_leader() -> anyhow::Result<()> {
136138
let key = "t-non-leader-write";
137139
for id in 0u64..4 {
138140
let mn = &all[id as usize];
139-
let rst = mn
140-
.write_to_local_leader(LogEntry {
141+
let maybe_leader = MetaLeader::new(mn);
142+
let rst = maybe_leader
143+
.write(LogEntry {
141144
txid: None,
142145
cmd: Cmd::UpsertKV {
143146
key: key.to_string(),
@@ -148,16 +151,17 @@ async fn test_meta_node_write_to_local_leader() -> anyhow::Result<()> {
148151
})
149152
.await;
150153

151-
let rst = rst?;
152-
153154
if id == leader_id {
154155
assert!(rst.is_ok());
155156
} else {
156157
assert!(rst.is_err());
157158
let e = rst.unwrap_err();
158159
match e {
159-
RetryableError::ForwardToLeader { leader } => {
160-
assert_eq!(leader_id, leader);
160+
MetaError::ForwardToLeader(ForwardToLeader { leader }) => {
161+
assert_eq!(Some(leader_id), leader);
162+
}
163+
_ => {
164+
panic!("expect ForwardToLeader")
161165
}
162166
}
163167
}
@@ -559,7 +563,9 @@ async fn test_meta_node_restart_single_node() -> anyhow::Result<()> {
559563
let leader = tc.meta_nodes.pop().unwrap();
560564

561565
leader
562-
.write_to_local_leader(LogEntry {
566+
.as_leader()
567+
.await?
568+
.write(LogEntry {
563569
txid: None,
564570
cmd: Cmd::UpsertKV {
565571
key: "foo".to_string(),
@@ -568,7 +574,7 @@ async fn test_meta_node_restart_single_node() -> anyhow::Result<()> {
568574
value_meta: None,
569575
},
570576
})
571-
.await??;
577+
.await?;
572578
log_cnt += 1;
573579

574580
want_hs = leader.sto.raft_state.read_hard_state()?;
@@ -752,7 +758,9 @@ async fn assert_upsert_kv_synced(meta_nodes: Vec<Arc<MetaNode>>, key: &str) -> a
752758
tracing::info!("leader: last_applied={}", last_applied);
753759
{
754760
leader
755-
.write_to_local_leader(LogEntry {
761+
.as_leader()
762+
.await?
763+
.write(LogEntry {
756764
txid: None,
757765
cmd: Cmd::UpsertKV {
758766
key: key.to_string(),
@@ -761,7 +769,7 @@ async fn assert_upsert_kv_synced(meta_nodes: Vec<Arc<MetaNode>>, key: &str) -> a
761769
value_meta: None,
762770
},
763771
})
764-
.await??;
772+
.await?;
765773
}
766774

767775
assert_applied_index(meta_nodes.clone(), last_applied + 1).await?;

0 commit comments

Comments
 (0)