Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.

Commit 5458eb8

Browse files
authored
Fix 'Unhandled error in Deferred' (#12089)
* Fix 'Unhandled error in Deferred' Fixes a CRITICAL "Unhandled error in Deferred" log message which happened when a function wrapped with `@cachedList` failed * Minor optimisation to cachedListDescriptor we can avoid re-using `missing`, which saves looking up entries in `deferreds_map`, and means we don't need to copy it. * Improve type annotation on CachedListDescriptor
1 parent 9d11fee commit 5458eb8

File tree

3 files changed

+38
-37
lines changed

3 files changed

+38
-37
lines changed

changelog.d/12089.bugfix

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Fix occasional 'Unhandled error in Deferred' error message.

synapse/util/caches/descriptors.py

+32-32
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import logging
1919
from typing import (
2020
Any,
21+
Awaitable,
2122
Callable,
2223
Dict,
2324
Generic,
@@ -346,15 +347,15 @@ class DeferredCacheListDescriptor(_CacheDescriptorBase):
346347
"""Wraps an existing cache to support bulk fetching of keys.
347348
348349
Given an iterable of keys it looks in the cache to find any hits, then passes
349-
the tuple of missing keys to the wrapped function.
350+
the set of missing keys to the wrapped function.
350351
351-
Once wrapped, the function returns a Deferred which resolves to the list
352-
of results.
352+
Once wrapped, the function returns a Deferred which resolves to a Dict mapping from
353+
input key to output value.
353354
"""
354355

355356
def __init__(
356357
self,
357-
orig: Callable[..., Any],
358+
orig: Callable[..., Awaitable[Dict]],
358359
cached_method_name: str,
359360
list_name: str,
360361
num_args: Optional[int] = None,
@@ -385,13 +386,13 @@ def __init__(
385386

386387
def __get__(
387388
self, obj: Optional[Any], objtype: Optional[Type] = None
388-
) -> Callable[..., Any]:
389+
) -> Callable[..., "defer.Deferred[Dict[Hashable, Any]]"]:
389390
cached_method = getattr(obj, self.cached_method_name)
390391
cache: DeferredCache[CacheKey, Any] = cached_method.cache
391392
num_args = cached_method.num_args
392393

393394
@functools.wraps(self.orig)
394-
def wrapped(*args: Any, **kwargs: Any) -> Any:
395+
def wrapped(*args: Any, **kwargs: Any) -> "defer.Deferred[Dict]":
395396
# If we're passed a cache_context then we'll want to call its
396397
# invalidate() whenever we are invalidated
397398
invalidate_callback = kwargs.pop("on_invalidate", None)
@@ -444,39 +445,38 @@ def arg_to_cache_key(arg: Hashable) -> Hashable:
444445
deferred: "defer.Deferred[Any]" = defer.Deferred()
445446
deferreds_map[arg] = deferred
446447
key = arg_to_cache_key(arg)
447-
cache.set(key, deferred, callback=invalidate_callback)
448+
cached_defers.append(
449+
cache.set(key, deferred, callback=invalidate_callback)
450+
)
448451

449452
def complete_all(res: Dict[Hashable, Any]) -> None:
450-
# the wrapped function has completed. It returns a
451-
# a dict. We can now resolve the observable deferreds in
452-
# the cache and update our own result map.
453-
for e in missing:
453+
# the wrapped function has completed. It returns a dict.
454+
# We can now update our own result map, and then resolve the
455+
# observable deferreds in the cache.
456+
for e, d1 in deferreds_map.items():
454457
val = res.get(e, None)
455-
deferreds_map[e].callback(val)
458+
# make sure we update the results map before running the
459+
# deferreds, because as soon as we run the last deferred, the
460+
# gatherResults() below will complete and return the result
461+
# dict to our caller.
456462
results[e] = val
463+
d1.callback(val)
457464

458-
def errback(f: Failure) -> Failure:
459-
# the wrapped function has failed. Invalidate any cache
460-
# entries we're supposed to be populating, and fail
461-
# their deferreds.
462-
for e in missing:
463-
key = arg_to_cache_key(e)
464-
cache.invalidate(key)
465-
deferreds_map[e].errback(f)
466-
467-
# return the failure, to propagate to our caller.
468-
return f
465+
def errback_all(f: Failure) -> None:
466+
# the wrapped function has failed. Propagate the failure into
467+
# the cache, which will invalidate the entry, and cause the
468+
# relevant cached_deferreds to fail, which will propagate the
469+
# failure to our caller.
470+
for d1 in deferreds_map.values():
471+
d1.errback(f)
469472

470473
args_to_call = dict(arg_dict)
471-
# copy the missing set before sending it to the callee, to guard against
472-
# modification.
473-
args_to_call[self.list_name] = tuple(missing)
474-
475-
cached_defers.append(
476-
defer.maybeDeferred(
477-
preserve_fn(self.orig), **args_to_call
478-
).addCallbacks(complete_all, errback)
479-
)
474+
args_to_call[self.list_name] = missing
475+
476+
# dispatch the call, and attach the two handlers
477+
defer.maybeDeferred(
478+
preserve_fn(self.orig), **args_to_call
479+
).addCallbacks(complete_all, errback_all)
480480

481481
if cached_defers:
482482
d = defer.gatherResults(cached_defers, consumeErrors=True).addCallbacks(

tests/util/caches/test_descriptors.py

+5-5
Original file line numberDiff line numberDiff line change
@@ -673,14 +673,14 @@ async def list_fn(self, args1, arg2):
673673
self.assertEqual(current_context(), SENTINEL_CONTEXT)
674674
r = yield d1
675675
self.assertEqual(current_context(), c1)
676-
obj.mock.assert_called_once_with((10, 20), 2)
676+
obj.mock.assert_called_once_with({10, 20}, 2)
677677
self.assertEqual(r, {10: "fish", 20: "chips"})
678678
obj.mock.reset_mock()
679679

680680
# a call with different params should call the mock again
681681
obj.mock.return_value = {30: "peas"}
682682
r = yield obj.list_fn([20, 30], 2)
683-
obj.mock.assert_called_once_with((30,), 2)
683+
obj.mock.assert_called_once_with({30}, 2)
684684
self.assertEqual(r, {20: "chips", 30: "peas"})
685685
obj.mock.reset_mock()
686686

@@ -701,7 +701,7 @@ async def list_fn(self, args1, arg2):
701701
obj.mock.return_value = {40: "gravy"}
702702
iterable = (x for x in [10, 40, 40])
703703
r = yield obj.list_fn(iterable, 2)
704-
obj.mock.assert_called_once_with((40,), 2)
704+
obj.mock.assert_called_once_with({40}, 2)
705705
self.assertEqual(r, {10: "fish", 40: "gravy"})
706706

707707
def test_concurrent_lookups(self):
@@ -729,7 +729,7 @@ def list_fn(self, args1) -> "Deferred[dict]":
729729
d3 = obj.list_fn([10])
730730

731731
# the mock should have been called exactly once
732-
obj.mock.assert_called_once_with((10,))
732+
obj.mock.assert_called_once_with({10})
733733
obj.mock.reset_mock()
734734

735735
# ... and none of the calls should yet be complete
@@ -771,7 +771,7 @@ async def list_fn(self, args1, arg2):
771771
# cache miss
772772
obj.mock.return_value = {10: "fish", 20: "chips"}
773773
r1 = yield obj.list_fn([10, 20], 2, on_invalidate=invalidate0)
774-
obj.mock.assert_called_once_with((10, 20), 2)
774+
obj.mock.assert_called_once_with({10, 20}, 2)
775775
self.assertEqual(r1, {10: "fish", 20: "chips"})
776776
obj.mock.reset_mock()
777777

0 commit comments

Comments
 (0)