|
16 | 16 | import logging
|
17 | 17 | from typing import Dict, List, Tuple
|
18 | 18 |
|
| 19 | +import aioredis |
19 | 20 | import attr
|
20 | 21 | from aiohttp import web
|
21 | 22 |
|
@@ -53,75 +54,74 @@ def _decode_hash_key(cls, hash_key: str) -> Dict[str, str]:
|
53 | 54 | key = dict(x.split("=") for x in tmp_key.split(":"))
|
54 | 55 | return key
|
55 | 56 |
|
| 57 | + @property |
| 58 | + def client(self) -> aioredis.Redis: |
| 59 | + return get_redis_client(self.app) |
| 60 | + |
56 | 61 | async def set_resource(
|
57 | 62 | self, key: Dict[str, str], resource: Tuple[str, str]
|
58 | 63 | ) -> None:
|
59 |
| - client = get_redis_client(self.app) |
60 | 64 | hash_key = f"{self._hash_key(key)}:{RESOURCE_SUFFIX}"
|
61 |
| - await client.hmset_dict(hash_key, **{resource[0]: resource[1]}) |
| 65 | + field, value = resource |
| 66 | + await self.client.hmset_dict(hash_key, **{field: value}) |
62 | 67 |
|
63 | 68 | async def get_resources(self, key: Dict[str, str]) -> Dict[str, str]:
|
64 |
| - client = get_redis_client(self.app) |
65 | 69 | hash_key = f"{self._hash_key(key)}:{RESOURCE_SUFFIX}"
|
66 |
| - return await client.hgetall(hash_key) |
| 70 | + return await self.client.hgetall(hash_key) |
67 | 71 |
|
68 | 72 | async def remove_resource(self, key: Dict[str, str], resource_name: str) -> None:
|
69 |
| - client = get_redis_client(self.app) |
70 | 73 | hash_key = f"{self._hash_key(key)}:{RESOURCE_SUFFIX}"
|
71 |
| - await client.hdel(hash_key, resource_name) |
| 74 | + await self.client.hdel(hash_key, resource_name) |
72 | 75 |
|
73 | 76 | async def find_resources(
|
74 | 77 | self, key: Dict[str, str], resource_name: str
|
75 | 78 | ) -> List[str]:
|
76 |
| - client = get_redis_client(self.app) |
77 | 79 | resources = []
|
78 | 80 | # the key might only be partialy complete
|
79 | 81 | partial_hash_key = f"{self._hash_key(key)}:{RESOURCE_SUFFIX}"
|
80 |
| - async for key in client.iscan(match=partial_hash_key): |
81 |
| - if await client.hexists(key, resource_name): |
82 |
| - resources.append(await client.hget(key, resource_name)) |
| 82 | + async for key in self.client.iscan(match=partial_hash_key): |
| 83 | + if await self.client.hexists(key, resource_name): |
| 84 | + resources.append(await self.client.hget(key, resource_name)) |
83 | 85 | return resources
|
84 | 86 |
|
85 | 87 | async def find_keys(self, resource: Tuple[str, str]) -> List[Dict[str, str]]:
|
86 | 88 | keys = []
|
87 | 89 | if not resource:
|
88 | 90 | return keys
|
89 |
| - client = get_redis_client(self.app) |
90 |
| - async for hash_key in client.iscan(match=f"*:{RESOURCE_SUFFIX}"): |
91 |
| - if resource[1] == await client.hget(hash_key, resource[0]): |
| 91 | + |
| 92 | + field, value = resource |
| 93 | + |
| 94 | + async for hash_key in self.client.iscan(match=f"*:{RESOURCE_SUFFIX}"): |
| 95 | + if value == await self.client.hget(hash_key, field): |
92 | 96 | keys.append(self._decode_hash_key(hash_key))
|
93 | 97 | return keys
|
94 | 98 |
|
95 | 99 | async def set_key_alive(self, key: Dict[str, str], timeout: int) -> None:
|
96 | 100 | # setting the timeout to always expire, timeout > 0
|
97 | 101 | timeout = int(max(1, timeout))
|
98 |
| - client = get_redis_client(self.app) |
99 | 102 | hash_key = f"{self._hash_key(key)}:{ALIVE_SUFFIX}"
|
100 |
| - await client.set(hash_key, 1, expire=timeout) |
| 103 | + await self.client.set(hash_key, 1, expire=timeout) |
101 | 104 |
|
102 | 105 | async def is_key_alive(self, key: Dict[str, str]) -> bool:
|
103 |
| - client = get_redis_client(self.app) |
104 | 106 | hash_key = f"{self._hash_key(key)}:{ALIVE_SUFFIX}"
|
105 |
| - return await client.exists(hash_key) > 0 |
| 107 | + return await self.client.exists(hash_key) > 0 |
106 | 108 |
|
107 | 109 | async def remove_key(self, key: Dict[str, str]) -> None:
|
108 |
| - client = get_redis_client(self.app) |
109 |
| - await client.delete( |
| 110 | + await self.client.delete( |
110 | 111 | f"{self._hash_key(key)}:{RESOURCE_SUFFIX}",
|
111 | 112 | f"{self._hash_key(key)}:{ALIVE_SUFFIX}",
|
112 | 113 | )
|
113 | 114 |
|
114 | 115 | async def get_all_resource_keys(
|
115 | 116 | self,
|
116 | 117 | ) -> Tuple[List[Dict[str, str]], List[Dict[str, str]]]:
|
117 |
| - client = get_redis_client(self.app) |
118 | 118 | alive_keys = [
|
119 | 119 | self._decode_hash_key(hash_key)
|
120 |
| - async for hash_key in client.iscan(match=f"*:{ALIVE_SUFFIX}") |
| 120 | + async for hash_key in self.client.iscan(match=f"*:{ALIVE_SUFFIX}") |
121 | 121 | ]
|
122 | 122 | dead_keys = [
|
123 | 123 | self._decode_hash_key(hash_key)
|
124 |
| - async for hash_key in client.iscan(match=f"*:{RESOURCE_SUFFIX}") |
| 124 | + async for hash_key in self.client.iscan(match=f"*:{RESOURCE_SUFFIX}") |
125 | 125 | if self._decode_hash_key(hash_key) not in alive_keys
|
126 | 126 | ]
|
127 | 127 |
|
|
0 commit comments