From 3db9eedd74b5b2bf35ddf1ba30ca99c661ee35ed Mon Sep 17 00:00:00 2001 From: rmorshea Date: Sat, 25 Sep 2021 14:59:17 -0700 Subject: [PATCH] clean up patch queues after exit also does some internal code cleanup and adds a dedicated Stop exception. --- src/idom/__init__.py | 2 + src/idom/core/dispatcher.py | 86 +++++++++++++++++++----------- tests/test_core/test_dispatcher.py | 48 ++++++++++++++++- 3 files changed, 105 insertions(+), 31 deletions(-) diff --git a/src/idom/__init__.py b/src/idom/__init__.py index 6768ede16..022d92cb0 100644 --- a/src/idom/__init__.py +++ b/src/idom/__init__.py @@ -1,6 +1,7 @@ from . import config, html, log, web from .core import hooks from .core.component import Component, component +from .core.dispatcher import Stop from .core.events import EventHandler, event from .core.layout import Layout from .core.vdom import vdom @@ -27,6 +28,7 @@ "multiview", "Ref", "run", + "Stop", "vdom", "web", ] diff --git a/src/idom/core/dispatcher.py b/src/idom/core/dispatcher.py index e8b03d211..10a3acf6b 100644 --- a/src/idom/core/dispatcher.py +++ b/src/idom/core/dispatcher.py @@ -36,7 +36,21 @@ SendCoroutine = Callable[["VdomJsonPatch"], Awaitable[None]] +"""Send model patches given by a dispatcher""" + RecvCoroutine = Callable[[], Awaitable[LayoutEvent]] +"""Called by a dispatcher to return a :class:`idom.core.layout.LayoutEvent` + +The event will then trigger an :class:`idom.core.proto.EventHandlerType` in a layout. +""" + + +class Stop(BaseException): + """Stop dispatching changes and events + + Raising this error will tell dispatchers to gracefully exit. Typically this is + called by code running inside a layout to tell it to stop rendering. + """ async def dispatch_single_view( @@ -46,9 +60,12 @@ async def dispatch_single_view( ) -> None: """Run a dispatch loop for a single view instance""" with layout: - async with create_task_group() as task_group: - task_group.start_soon(_single_outgoing_loop, layout, send) - task_group.start_soon(_single_incoming_loop, layout, recv) + try: + async with create_task_group() as task_group: + task_group.start_soon(_single_outgoing_loop, layout, send) + task_group.start_soon(_single_incoming_loop, layout, recv) + except Stop: + logger.info("Stopped dispatch task") SharedViewDispatcher = Callable[[SendCoroutine, RecvCoroutine], Awaitable[None]] @@ -63,9 +80,8 @@ async def create_shared_view_dispatcher( with layout: ( dispatch_shared_view, - model_state, - all_patch_queues, - ) = await _make_shared_view_dispatcher(layout) + send_patch, + ) = await _create_shared_view_dispatcher(layout) dispatch_tasks: List[Future[None]] = [] @@ -95,34 +111,35 @@ def dispatch_shared_view_soon( else: patch = VdomJsonPatch.create_from(update_future.result()) - model_state.current = patch.apply_to(model_state.current) - # push updates to all dispatcher callbacks - for queue in all_patch_queues: - queue.put_nowait(patch) + send_patch(patch) def ensure_shared_view_dispatcher_future( layout: LayoutType[LayoutUpdate, LayoutEvent], ) -> Tuple[Future[None], SharedViewDispatcher]: - """Ensure the future of a dispatcher created by :func:`create_shared_view_dispatcher`""" + """Ensure the future of a dispatcher made by :func:`create_shared_view_dispatcher` + + This returns a future that can be awaited to block until all dispatch tasks have + completed as well as the dispatcher coroutine itself which is used to start dispatch + tasks. + + This is required in situations where usage of the async context manager from + :func:`create_shared_view_dispatcher` is not possible. Typically this happens when + integrating IDOM with other frameworks, servers, or applications. + """ dispatcher_future: Future[SharedViewDispatcher] = Future() async def dispatch_shared_view_forever() -> None: with layout: ( dispatch_shared_view, - model_state, - all_patch_queues, - ) = await _make_shared_view_dispatcher(layout) + send_patch, + ) = await _create_shared_view_dispatcher(layout) dispatcher_future.set_result(dispatch_shared_view) while True: - patch = await render_json_patch(layout) - model_state.current = patch.apply_to(model_state.current) - # push updates to all dispatcher callbacks - for queue in all_patch_queues: - queue.put_nowait(patch) + send_patch(await render_json_patch(layout)) async def dispatch(send: SendCoroutine, recv: RecvCoroutine) -> None: await (await dispatcher_future)(send, recv) @@ -159,28 +176,37 @@ def create_from(cls, update: LayoutUpdate) -> VdomJsonPatch: return cls(update.path, make_patch(update.old or {}, update.new).patch) -async def _make_shared_view_dispatcher( +async def _create_shared_view_dispatcher( layout: LayoutType[LayoutUpdate, LayoutEvent], -) -> Tuple[SharedViewDispatcher, Ref[Any], WeakSet[Queue[VdomJsonPatch]]]: +) -> Tuple[SharedViewDispatcher, Callable[[VdomJsonPatch], None]]: update = await layout.render() model_state = Ref(update.new) # We push updates to queues instead of pushing directly to send() callbacks in - # order to isolate the render loop from any errors dispatch callbacks might - # raise. + # order to isolate send_patch() from any errors send() callbacks might raise. all_patch_queues: WeakSet[Queue[VdomJsonPatch]] = WeakSet() async def dispatch_shared_view(send: SendCoroutine, recv: RecvCoroutine) -> None: patch_queue: Queue[VdomJsonPatch] = Queue() - async with create_task_group() as inner_task_group: - all_patch_queues.add(patch_queue) - effective_update = LayoutUpdate("", None, model_state.current) - await send(VdomJsonPatch.create_from(effective_update)) - inner_task_group.start_soon(_single_incoming_loop, layout, recv) - inner_task_group.start_soon(_shared_outgoing_loop, send, patch_queue) + try: + async with create_task_group() as inner_task_group: + all_patch_queues.add(patch_queue) + effective_update = LayoutUpdate("", None, model_state.current) + await send(VdomJsonPatch.create_from(effective_update)) + inner_task_group.start_soon(_single_incoming_loop, layout, recv) + inner_task_group.start_soon(_shared_outgoing_loop, send, patch_queue) + except Stop: + logger.info("Stopped dispatch task") + finally: + all_patch_queues.remove(patch_queue) return None - return dispatch_shared_view, model_state, all_patch_queues + def send_patch(patch: VdomJsonPatch) -> None: + model_state.current = patch.apply_to(model_state.current) + for queue in all_patch_queues: + queue.put_nowait(patch) + + return dispatch_shared_view, send_patch async def _single_outgoing_loop( diff --git a/tests/test_core/test_dispatcher.py b/tests/test_core/test_dispatcher.py index 35a6171c6..0d613dfa3 100644 --- a/tests/test_core/test_dispatcher.py +++ b/tests/test_core/test_dispatcher.py @@ -1,4 +1,5 @@ import asyncio +import sys from typing import Any, Sequence import pytest @@ -6,6 +7,7 @@ import idom from idom.core.dispatcher import ( VdomJsonPatch, + _create_shared_view_dispatcher, create_shared_view_dispatcher, dispatch_single_view, ensure_shared_view_dispatcher_future, @@ -39,7 +41,7 @@ async def send(patch): changes.append(patch) sem.release() if not events_to_inject: - raise asyncio.CancelledError() + raise idom.Stop() async def recv(): await sem.acquire() @@ -130,3 +132,47 @@ async def test_ensure_shared_view_dispatcher_future(): assert_changes_produce_expected_model(changes_1, model) assert_changes_produce_expected_model(changes_2, model) + + +async def test_private_create_shared_view_dispatcher_cleans_up_patch_queues(): + """Report an issue if this test breaks + + Some internals of idom.core.dispatcher may need to be changed in order to make some + internal state easier to introspect. + + Ideally we would just check if patch queues are getting cleaned up more directly, + but without having access to that, we use some side effects to try and infer whether + it happens. + """ + + @idom.component + def SomeComponent(): + return idom.html.div() + + async def send(patch): + raise idom.Stop() + + async def recv(): + return LayoutEvent("something", []) + + with idom.Layout(SomeComponent()) as layout: + dispatch_shared_view, push_patch = await _create_shared_view_dispatcher(layout) + + # Dispatch a view that should exit. After exiting its patch queue should be + # cleaned up and removed. Since we only dispatched one view there should be + # no patch queues. + await dispatch_shared_view(send, recv) # this should stop immediately + + # We create a patch and check its ref count. We will check this after attempting + # to push out the change. + patch = VdomJsonPatch("anything", []) + patch_ref_count = sys.getrefcount(patch) + + # We push out this change. + push_patch(patch) + + # Because there should be no patch queues, we expect that the ref count remains + # the same. If the ref count had increased, then we would know that the patch + # queue has not been cleaned up and that the patch we just pushed was added to + # it. + assert not sys.getrefcount(patch) > patch_ref_count