Skip to content

Commit 014b1ec

Browse files
authored
Return error when unsubscribing from invalid subscription id (#619)
1 parent 135d929 commit 014b1ec

File tree

2 files changed

+92
-16
lines changed

2 files changed

+92
-16
lines changed

pubsub/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ serde = "1.0"
2121

2222
[dev-dependencies]
2323
jsonrpc-tcp-server = { version = "17.0", path = "../tcp" }
24+
serde_json = "1.0"
2425

2526
[badges]
2627
travis-ci = { repository = "paritytech/jsonrpc", branch = "master"}

pubsub/src/subscription.rs

Lines changed: 91 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -81,8 +81,11 @@ impl Session {
8181
}
8282

8383
/// Removes existing subscription.
84-
fn remove_subscription(&self, name: &str, id: &SubscriptionId) {
85-
self.active_subscriptions.lock().remove(&(id.clone(), name.into()));
84+
fn remove_subscription(&self, name: &str, id: &SubscriptionId) -> bool {
85+
self.active_subscriptions
86+
.lock()
87+
.remove(&(id.clone(), name.into()))
88+
.is_some()
8689
}
8790
}
8891

@@ -335,8 +338,11 @@ where
335338
};
336339
match (meta.session(), id) {
337340
(Some(session), Some(id)) => {
338-
session.remove_subscription(&self.notification, &id);
339-
Box::pin(self.unsubscribe.call(id, Some(meta)))
341+
if session.remove_subscription(&self.notification, &id) {
342+
Box::pin(self.unsubscribe.call(id, Some(meta)))
343+
} else {
344+
Box::pin(future::err(core::Error::invalid_params("Invalid subscription id.")))
345+
}
340346
}
341347
(Some(_), None) => Box::pin(future::err(core::Error::invalid_params("Expected subscription id."))),
342348
_ => Box::pin(future::err(subscriptions_unavailable())),
@@ -392,13 +398,36 @@ mod tests {
392398
});
393399

394400
// when
395-
session.remove_subscription("test", &id);
401+
let removed = session.remove_subscription("test", &id);
396402
drop(session);
397403

398404
// then
405+
assert_eq!(removed, true);
399406
assert_eq!(called.load(Ordering::SeqCst), false);
400407
}
401408

409+
#[test]
410+
fn should_not_remove_subscription_if_invalid() {
411+
// given
412+
let id = SubscriptionId::Number(1);
413+
let called = Arc::new(AtomicBool::new(false));
414+
let called2 = called.clone();
415+
let other_session = session().0;
416+
let session = session().0;
417+
session.add_subscription("test", &id, move |id| {
418+
assert_eq!(id, SubscriptionId::Number(1));
419+
called2.store(true, Ordering::SeqCst);
420+
});
421+
422+
// when
423+
let removed = other_session.remove_subscription("test", &id);
424+
drop(session);
425+
426+
// then
427+
assert_eq!(removed, false);
428+
assert_eq!(called.load(Ordering::SeqCst), true);
429+
}
430+
402431
#[test]
403432
fn should_unregister_in_case_of_collision() {
404433
// given
@@ -485,40 +514,86 @@ mod tests {
485514
});
486515
}
487516

488-
#[derive(Clone, Default)]
489-
struct Metadata;
517+
#[derive(Clone)]
518+
struct Metadata(Arc<Session>);
490519
impl core::Metadata for Metadata {}
491520
impl PubSubMetadata for Metadata {
492521
fn session(&self) -> Option<Arc<Session>> {
493-
Some(Arc::new(session().0))
522+
Some(self.0.clone())
523+
}
524+
}
525+
impl Default for Metadata {
526+
fn default() -> Self {
527+
Self(Arc::new(session().0))
494528
}
495529
}
496530

497531
#[test]
498532
fn should_subscribe() {
499533
// given
500-
let called = Arc::new(AtomicBool::new(false));
501-
let called2 = called.clone();
502534
let (subscribe, _) = new_subscription(
503535
"test".into(),
504-
move |params, _meta, _subscriber| {
536+
move |params, _meta, subscriber: Subscriber| {
505537
assert_eq!(params, core::Params::None);
506-
called2.store(true, Ordering::SeqCst);
538+
let _sink = subscriber.assign_id(SubscriptionId::Number(5)).unwrap();
507539
},
508540
|_id, _meta| async { Ok(core::Value::Bool(true)) },
509541
);
510-
let meta = Metadata;
511542

512543
// when
544+
let meta = Metadata::default();
513545
let result = subscribe.call(core::Params::None, meta);
514546

515547
// then
516-
assert_eq!(called.load(Ordering::SeqCst), true);
548+
assert_eq!(futures::executor::block_on(result), Ok(serde_json::json!(5)));
549+
}
550+
551+
#[test]
552+
fn should_unsubscribe() {
553+
// given
554+
const SUB_ID: u64 = 5;
555+
let (subscribe, unsubscribe) = new_subscription(
556+
"test".into(),
557+
move |params, _meta, subscriber: Subscriber| {
558+
assert_eq!(params, core::Params::None);
559+
let _sink = subscriber.assign_id(SubscriptionId::Number(SUB_ID)).unwrap();
560+
},
561+
|_id, _meta| async { Ok(core::Value::Bool(true)) },
562+
);
563+
564+
// when
565+
let meta = Metadata::default();
566+
futures::executor::block_on(subscribe.call(core::Params::None, meta.clone())).unwrap();
567+
let result = unsubscribe.call(core::Params::Array(vec![serde_json::json!(SUB_ID)]), meta);
568+
569+
// then
570+
assert_eq!(futures::executor::block_on(result), Ok(serde_json::json!(true)));
571+
}
572+
573+
#[test]
574+
fn should_not_unsubscribe_if_invalid() {
575+
// given
576+
const SUB_ID: u64 = 5;
577+
let (subscribe, unsubscribe) = new_subscription(
578+
"test".into(),
579+
move |params, _meta, subscriber: Subscriber| {
580+
assert_eq!(params, core::Params::None);
581+
let _sink = subscriber.assign_id(SubscriptionId::Number(SUB_ID)).unwrap();
582+
},
583+
|_id, _meta| async { Ok(core::Value::Bool(true)) },
584+
);
585+
586+
// when
587+
let meta = Metadata::default();
588+
futures::executor::block_on(subscribe.call(core::Params::None, meta.clone())).unwrap();
589+
let result = unsubscribe.call(core::Params::Array(vec![serde_json::json!(SUB_ID + 1)]), meta);
590+
591+
// then
517592
assert_eq!(
518593
futures::executor::block_on(result),
519594
Err(core::Error {
520-
code: core::ErrorCode::ServerError(-32091),
521-
message: "Subscription rejected".into(),
595+
code: core::ErrorCode::InvalidParams,
596+
message: "Invalid subscription id.".into(),
522597
data: None,
523598
})
524599
);

0 commit comments

Comments
 (0)