Skip to content

Commit b911230

Browse files
Chris DrouillardCQ Bot
Chris Drouillard
authored and
CQ Bot
committed
[vfs] Reduce the size of idle connections
Future objects in rust can easily grow to be several KBs. If we have a thousand open connections each being handled by a multi KB future then we're using MBs of memory. Most of this memory is wasted because the connections sit idle most of the time and it's unlikely that more than a dozen connections will ever be handling a request at the same time. This change introduces a RequestListener that is optimized to reduce the memory usage of idle connections. The RequestListener dynamically heap allocates the futures for handling requests so the memory of these futures isn't consumed while the connections are idle. RequestListener manually implements Future to work around rust-lang/rust#62958 and rust-lang/rust#108906. This further reduces the size of an idle RequestListener to typically less than 80 bytes. RequestListener only reduces the idle memory overhead of the futures downstream from itself. There's still work to be done to reduce the size of futures upstream from the RequestListeners. Bug: 398886282 Change-Id: I30bd18099de05bd57639df69d88d011f21c03335 Reviewed-on: https://fuchsia-review.googlesource.com/c/fuchsia/+/1198690 Commit-Queue: Chris Drouillard <[email protected]> Reviewed-by: Chris Suter <[email protected]>
1 parent 38be7e7 commit b911230

File tree

8 files changed

+547
-152
lines changed

8 files changed

+547
-152
lines changed

src/storage/lib/vfs/rust/BUILD.gn

+1
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ common_vfs_args = {
7070
"src/pseudo_directory.rs",
7171
"src/remote.rs",
7272
"src/remote/tests.rs",
73+
"src/request_handler.rs",
7374
"src/service.rs",
7475
"src/service/tests.rs",
7576
"src/service/tests/direct_connection.rs",

src/storage/lib/vfs/rust/src/directory/immutable/connection.rs

+20-16
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,14 @@ use crate::directory::connection::{BaseConnection, ConnectionState};
99
use crate::directory::entry_container::Directory;
1010
use crate::execution_scope::ExecutionScope;
1111
use crate::node::OpenNode;
12+
use crate::request_handler::{RequestHandler, RequestListener};
1213
use crate::{ObjectRequestRef, ProtocolsExt};
1314

1415
use fidl_fuchsia_io as fio;
1516
use fio::DirectoryRequest;
16-
use futures::TryStreamExt;
1717
use std::future::Future;
18-
use std::pin::pin;
18+
use std::ops::ControlFlow;
19+
use std::pin::Pin;
1920
use std::sync::Arc;
2021
use zx_status::Status;
2122

@@ -24,19 +25,6 @@ pub struct ImmutableConnection<DirectoryType: Directory> {
2425
}
2526

2627
impl<DirectoryType: Directory> ImmutableConnection<DirectoryType> {
27-
async fn handle_requests<RS>(mut self, mut requests: RS)
28-
where
29-
RS: futures::stream::Stream<Item = Result<DirectoryRequest, fidl::Error>>,
30-
{
31-
let mut requests = pin!(requests);
32-
while let Ok(Some(request)) = requests.try_next().await {
33-
let _guard = self.base.scope.active_guard();
34-
if !matches!(self.base.handle_request(request).await, Ok(ConnectionState::Alive)) {
35-
break;
36-
}
37-
}
38-
}
39-
4028
pub fn create(
4129
scope: ExecutionScope,
4230
directory: Arc<DirectoryType>,
@@ -81,8 +69,24 @@ impl<DirectoryType: Directory> ImmutableConnection<DirectoryType> {
8169
let object_request = object_request.take();
8270
Ok(async move {
8371
if let Ok(requests) = object_request.into_request_stream(&connection.base).await {
84-
connection.handle_requests(transform(requests)).await;
72+
RequestListener::new(transform(requests), connection).await;
8573
}
8674
})
8775
}
8876
}
77+
78+
impl<DirectoryType: Directory> RequestHandler for ImmutableConnection<DirectoryType> {
79+
type Request = Result<DirectoryRequest, fidl::Error>;
80+
81+
async fn handle_request(self: Pin<&mut Self>, request: Self::Request) -> ControlFlow<()> {
82+
let this = self.get_mut();
83+
let _guard = this.base.scope.active_guard();
84+
match request {
85+
Ok(request) => match this.base.handle_request(request).await {
86+
Ok(ConnectionState::Alive) => ControlFlow::Continue(()),
87+
Ok(ConnectionState::Closed) | Err(_) => ControlFlow::Break(()),
88+
},
89+
Err(_) => ControlFlow::Break(()),
90+
}
91+
}
92+
}

src/storage/lib/vfs/rust/src/directory/mutable/connection.rs

+23-28
Original file line numberDiff line numberDiff line change
@@ -14,22 +14,21 @@ use crate::execution_scope::ExecutionScope;
1414
use crate::name::validate_name;
1515
use crate::node::OpenNode;
1616
use crate::path::Path;
17+
use crate::request_handler::{RequestHandler, RequestListener};
1718
use crate::token_registry::{TokenInterface, TokenRegistry, Tokenizable};
1819
use crate::{ObjectRequestRef, ProtocolsExt};
1920

2021
use anyhow::Error;
2122
use fidl::endpoints::ServerEnd;
2223
use fidl::Handle;
2324
use fidl_fuchsia_io as fio;
24-
use futures::{pin_mut, TryStreamExt as _};
25-
use pin_project::pin_project;
2625
use std::future::Future;
26+
use std::ops::ControlFlow;
2727
use std::pin::Pin;
2828
use std::sync::Arc;
2929
use storage_trace::{self as trace, TraceFutureExt};
3030
use zx_status::Status;
3131

32-
#[pin_project]
3332
pub struct MutableConnection<DirectoryType: MutableDirectory> {
3433
base: BaseConnection<DirectoryType>,
3534
}
@@ -51,7 +50,7 @@ impl<DirectoryType: MutableDirectory> MutableConnection<DirectoryType> {
5150
let object_request = object_request.take();
5251
Ok(async move {
5352
if let Ok(requests) = object_request.into_request_stream(&connection.base).await {
54-
connection.handle_requests(requests).await
53+
RequestListener::new(requests, Tokenizable::new(connection)).await;
5554
}
5655
})
5756
}
@@ -62,7 +61,7 @@ impl<DirectoryType: MutableDirectory> MutableConnection<DirectoryType> {
6261
) -> Result<ConnectionState, Error> {
6362
match request {
6463
fio::DirectoryRequest::Unlink { name, options, responder } => {
65-
let result = this.as_mut().handle_unlink(name, options).await;
64+
let result = this.handle_unlink(name, options).await;
6665
responder.send(result.map_err(Status::into_raw))?;
6766
}
6867
fio::DirectoryRequest::GetToken { responder } => {
@@ -78,7 +77,6 @@ impl<DirectoryType: MutableDirectory> MutableConnection<DirectoryType> {
7877
}
7978
fio::DirectoryRequest::SetAttr { flags, attributes, responder } => {
8079
let status = match this
81-
.as_mut()
8280
.handle_update_attributes(io1_to_io2_attrs(flags, attributes))
8381
.await
8482
{
@@ -99,8 +97,7 @@ impl<DirectoryType: MutableDirectory> MutableConnection<DirectoryType> {
9997
responder.send(Err(Status::INVALID_ARGS.into_raw()))?;
10098
} else {
10199
responder.send(
102-
this.as_mut()
103-
.base
100+
this.base
104101
.directory
105102
.create_symlink(name, target, connection)
106103
.await
@@ -148,10 +145,7 @@ impl<DirectoryType: MutableDirectory> MutableConnection<DirectoryType> {
148145
fio::DirectoryRequest::UpdateAttributes { payload, responder } => {
149146
async move {
150147
responder.send(
151-
this.as_mut()
152-
.handle_update_attributes(payload)
153-
.await
154-
.map_err(Status::into_raw),
148+
this.handle_update_attributes(payload).await.map_err(Status::into_raw),
155149
)
156150
}
157151
.trace(trace::trace_future_args!(c"storage", c"Directory::UpdateAttributes"))
@@ -165,7 +159,7 @@ impl<DirectoryType: MutableDirectory> MutableConnection<DirectoryType> {
165159
}
166160

167161
async fn handle_update_attributes(
168-
self: Pin<&mut Self>,
162+
&self,
169163
attributes: fio::MutableNodeAttributes,
170164
) -> Result<(), Status> {
171165
if !self.base.options.rights.contains(fio::Operations::UPDATE_ATTRIBUTES) {
@@ -176,11 +170,7 @@ impl<DirectoryType: MutableDirectory> MutableConnection<DirectoryType> {
176170
self.base.directory.update_attributes(attributes).await
177171
}
178172

179-
async fn handle_unlink(
180-
self: Pin<&mut Self>,
181-
name: String,
182-
options: fio::UnlinkOptions,
183-
) -> Result<(), Status> {
173+
async fn handle_unlink(&self, name: String, options: fio::UnlinkOptions) -> Result<(), Status> {
184174
if !self.base.options.rights.contains(fio::Rights::MODIFY_DIRECTORY) {
185175
return Err(Status::BAD_HANDLE);
186176
}
@@ -277,18 +267,23 @@ impl<DirectoryType: MutableDirectory> MutableConnection<DirectoryType> {
277267
async fn handle_remove_extended_attribute(&self, name: Vec<u8>) -> Result<(), Status> {
278268
self.base.directory.remove_extended_attribute(name).await
279269
}
270+
}
271+
272+
impl<DirectoryType: MutableDirectory> RequestHandler
273+
for Tokenizable<MutableConnection<DirectoryType>>
274+
{
275+
type Request = Result<fio::DirectoryRequest, fidl::Error>;
280276

281-
async fn handle_requests(self, mut requests: fio::DirectoryRequestStream) {
282-
let this = Tokenizable::new(self);
283-
pin_mut!(this);
284-
while let Ok(Some(request)) = requests.try_next().await {
285-
let _guard = this.base.scope.active_guard();
286-
if !matches!(
287-
Self::handle_request(Pin::as_mut(&mut this), request).await,
288-
Ok(ConnectionState::Alive)
289-
) {
290-
break;
277+
async fn handle_request(self: Pin<&mut Self>, request: Self::Request) -> ControlFlow<()> {
278+
let _guard = self.base.scope.active_guard();
279+
match request {
280+
Ok(request) => {
281+
match MutableConnection::<DirectoryType>::handle_request(self, request).await {
282+
Ok(ConnectionState::Alive) => ControlFlow::Continue(()),
283+
Ok(ConnectionState::Closed) | Err(_) => ControlFlow::Break(()),
284+
}
291285
}
286+
Err(_) => ControlFlow::Break(()),
292287
}
293288
}
294289
}

0 commit comments

Comments
 (0)