Skip to content

Commit d5c2d1d

Browse files
shacharPashchayimdvora-h
authored
Adding support for triggered functions (TFUNCTION) (redis#2861)
Co-authored-by: Chayim I. Kirshen <[email protected]> Co-authored-by: dvora-h <[email protected]> Co-authored-by: Chayim <[email protected]>
1 parent b0abd55 commit d5c2d1d

File tree

6 files changed

+246
-0
lines changed

6 files changed

+246
-0
lines changed

dockers/cluster.redis.conf

+1
Original file line numberDiff line numberDiff line change
@@ -5,3 +5,4 @@ loadmodule /opt/redis-stack/lib/redisgraph.so
55
loadmodule /opt/redis-stack/lib/redistimeseries.so
66
loadmodule /opt/redis-stack/lib/rejson.so
77
loadmodule /opt/redis-stack/lib/redisbloom.so
8+
loadmodule /opt/redis-stack/lib/redisgears.so v8-plugin-path /opt/redis-stack/lib/libredisgears_v8_plugin.so

redis/cluster.py

+6
Original file line numberDiff line numberDiff line change
@@ -282,6 +282,11 @@ class AbstractRedisCluster:
282282
"READONLY",
283283
"READWRITE",
284284
"TIME",
285+
"TFUNCTION LOAD",
286+
"TFUNCTION DELETE",
287+
"TFUNCTION LIST",
288+
"TFCALL",
289+
"TFCALLASYNC",
285290
"GRAPH.CONFIG",
286291
"LATENCY HISTORY",
287292
"LATENCY LATEST",
@@ -298,6 +303,7 @@ class AbstractRedisCluster:
298303
"FUNCTION LIST",
299304
"FUNCTION LOAD",
300305
"FUNCTION RESTORE",
306+
"REDISGEARS_2.REFRESHCLUSTER",
301307
"SCAN",
302308
"SCRIPT EXISTS",
303309
"SCRIPT FLUSH",

redis/commands/cluster.py

+10
Original file line numberDiff line numberDiff line change
@@ -30,10 +30,12 @@
3030
AsyncACLCommands,
3131
AsyncDataAccessCommands,
3232
AsyncFunctionCommands,
33+
AsyncGearsCommands,
3334
AsyncManagementCommands,
3435
AsyncScriptCommands,
3536
DataAccessCommands,
3637
FunctionCommands,
38+
GearsCommands,
3739
ManagementCommands,
3840
PubSubCommands,
3941
ResponseT,
@@ -689,6 +691,12 @@ def readwrite(self, target_nodes: Optional["TargetNodesT"] = None) -> ResponseT:
689691
self.read_from_replicas = False
690692
return self.execute_command("READWRITE", target_nodes=target_nodes)
691693

694+
def gears_refresh_cluster(self, **kwargs) -> ResponseT:
695+
"""
696+
On an OSS cluster, before executing any gears function, you must call this command. # noqa
697+
"""
698+
return self.execute_command("REDISGEARS_2.REFRESHCLUSTER", **kwargs)
699+
692700

693701
class AsyncClusterManagementCommands(
694702
ClusterManagementCommands, AsyncManagementCommands
@@ -864,6 +872,7 @@ class RedisClusterCommands(
864872
ClusterDataAccessCommands,
865873
ScriptCommands,
866874
FunctionCommands,
875+
GearsCommands,
867876
RedisModuleCommands,
868877
):
869878
"""
@@ -893,6 +902,7 @@ class AsyncRedisClusterCommands(
893902
AsyncClusterDataAccessCommands,
894903
AsyncScriptCommands,
895904
AsyncFunctionCommands,
905+
AsyncGearsCommands,
896906
):
897907
"""
898908
A class for all Redis Cluster commands

redis/commands/core.py

+127
Original file line numberDiff line numberDiff line change
@@ -6105,6 +6105,131 @@ def function_stats(self) -> Union[Awaitable[List], List]:
61056105
AsyncFunctionCommands = FunctionCommands
61066106

61076107

6108+
class GearsCommands:
6109+
def tfunction_load(
6110+
self, lib_code: str, replace: bool = False, config: Union[str, None] = None
6111+
) -> ResponseT:
6112+
"""
6113+
Load a new library to RedisGears.
6114+
6115+
``lib_code`` - the library code.
6116+
``config`` - a string representation of a JSON object
6117+
that will be provided to the library on load time,
6118+
for more information refer to
6119+
https://github.com/RedisGears/RedisGears/blob/master/docs/function_advance_topics.md#library-configuration
6120+
``replace`` - an optional argument, instructs RedisGears to replace the
6121+
function if its already exists
6122+
6123+
For more information see https://redis.io/commands/tfunction-load/
6124+
"""
6125+
pieces = []
6126+
if replace:
6127+
pieces.append("REPLACE")
6128+
if config is not None:
6129+
pieces.extend(["CONFIG", config])
6130+
pieces.append(lib_code)
6131+
return self.execute_command("TFUNCTION LOAD", *pieces)
6132+
6133+
def tfunction_delete(self, lib_name: str) -> ResponseT:
6134+
"""
6135+
Delete a library from RedisGears.
6136+
6137+
``lib_name`` the library name to delete.
6138+
6139+
For more information see https://redis.io/commands/tfunction-delete/
6140+
"""
6141+
return self.execute_command("TFUNCTION DELETE", lib_name)
6142+
6143+
def tfunction_list(
6144+
self,
6145+
with_code: bool = False,
6146+
verbose: int = 0,
6147+
lib_name: Union[str, None] = None,
6148+
) -> ResponseT:
6149+
"""
6150+
List the functions with additional information about each function.
6151+
6152+
``with_code`` Show libraries code.
6153+
``verbose`` output verbosity level, higher number will increase verbosity level
6154+
``lib_name`` specifying a library name (can be used multiple times to show multiple libraries in a single command) # noqa
6155+
6156+
For more information see https://redis.io/commands/tfunction-list/
6157+
"""
6158+
pieces = []
6159+
if with_code:
6160+
pieces.append("WITHCODE")
6161+
if verbose >= 1 and verbose <= 3:
6162+
pieces.append("v" * verbose)
6163+
else:
6164+
raise DataError("verbose can be 1, 2 or 3")
6165+
if lib_name is not None:
6166+
pieces.append("LIBRARY")
6167+
pieces.append(lib_name)
6168+
6169+
return self.execute_command("TFUNCTION LIST", *pieces)
6170+
6171+
def _tfcall(
6172+
self,
6173+
lib_name: str,
6174+
func_name: str,
6175+
keys: KeysT = None,
6176+
_async: bool = False,
6177+
*args: List,
6178+
) -> ResponseT:
6179+
pieces = [f"{lib_name}.{func_name}"]
6180+
if keys is not None:
6181+
pieces.append(len(keys))
6182+
pieces.extend(keys)
6183+
else:
6184+
pieces.append(0)
6185+
if args is not None:
6186+
pieces.extend(args)
6187+
if _async:
6188+
return self.execute_command("TFCALLASYNC", *pieces)
6189+
return self.execute_command("TFCALL", *pieces)
6190+
6191+
def tfcall(
6192+
self,
6193+
lib_name: str,
6194+
func_name: str,
6195+
keys: KeysT = None,
6196+
*args: List,
6197+
) -> ResponseT:
6198+
"""
6199+
Invoke a function.
6200+
6201+
``lib_name`` - the library name contains the function.
6202+
``func_name`` - the function name to run.
6203+
``keys`` - the keys that will be touched by the function.
6204+
``args`` - Additional argument to pass to the function.
6205+
6206+
For more information see https://redis.io/commands/tfcall/
6207+
"""
6208+
return self._tfcall(lib_name, func_name, keys, False, *args)
6209+
6210+
def tfcall_async(
6211+
self,
6212+
lib_name: str,
6213+
func_name: str,
6214+
keys: KeysT = None,
6215+
*args: List,
6216+
) -> ResponseT:
6217+
"""
6218+
Invoke an async function (coroutine).
6219+
6220+
``lib_name`` - the library name contains the function.
6221+
``func_name`` - the function name to run.
6222+
``keys`` - the keys that will be touched by the function.
6223+
``args`` - Additional argument to pass to the function.
6224+
6225+
For more information see https://redis.io/commands/tfcall/
6226+
"""
6227+
return self._tfcall(lib_name, func_name, keys, True, *args)
6228+
6229+
6230+
AsyncGearsCommands = GearsCommands
6231+
6232+
61086233
class DataAccessCommands(
61096234
BasicKeyCommands,
61106235
HyperlogCommands,
@@ -6148,6 +6273,7 @@ class CoreCommands(
61486273
PubSubCommands,
61496274
ScriptCommands,
61506275
FunctionCommands,
6276+
GearsCommands,
61516277
):
61526278
"""
61536279
A class containing all of the implemented redis commands. This class is
@@ -6164,6 +6290,7 @@ class AsyncCoreCommands(
61646290
AsyncPubSubCommands,
61656291
AsyncScriptCommands,
61666292
AsyncFunctionCommands,
6293+
AsyncGearsCommands,
61676294
):
61686295
"""
61696296
A class containing all of the implemented redis commands. This class is

tests/test_cluster.py

+51
Original file line numberDiff line numberDiff line change
@@ -2431,6 +2431,57 @@ def teardown():
24312431
assert "client-info" in r.acl_log(count=1, target_nodes=node)[0]
24322432
assert r.acl_log_reset(target_nodes=node)
24332433

2434+
def generate_lib_code(self, lib_name):
2435+
return f"""#!js api_version=1.0 name={lib_name}\n redis.registerFunction('foo', ()=>{{return 'bar'}})""" # noqa
2436+
2437+
def try_delete_libs(self, r, *lib_names):
2438+
for lib_name in lib_names:
2439+
try:
2440+
r.tfunction_delete(lib_name)
2441+
except Exception:
2442+
pass
2443+
2444+
@skip_if_server_version_lt("7.1.140")
2445+
def test_tfunction_load_delete(self, r):
2446+
r.gears_refresh_cluster()
2447+
self.try_delete_libs(r, "lib1")
2448+
lib_code = self.generate_lib_code("lib1")
2449+
assert r.tfunction_load(lib_code)
2450+
assert r.tfunction_delete("lib1")
2451+
2452+
@skip_if_server_version_lt("7.1.140")
2453+
def test_tfunction_list(self, r):
2454+
r.gears_refresh_cluster()
2455+
self.try_delete_libs(r, "lib1", "lib2", "lib3")
2456+
assert r.tfunction_load(self.generate_lib_code("lib1"))
2457+
assert r.tfunction_load(self.generate_lib_code("lib2"))
2458+
assert r.tfunction_load(self.generate_lib_code("lib3"))
2459+
2460+
# test error thrown when verbose > 4
2461+
with pytest.raises(DataError):
2462+
assert r.tfunction_list(verbose=8)
2463+
2464+
functions = r.tfunction_list(verbose=1)
2465+
assert len(functions) == 3
2466+
2467+
expected_names = [b"lib1", b"lib2", b"lib3"]
2468+
actual_names = [functions[0][13], functions[1][13], functions[2][13]]
2469+
2470+
assert sorted(expected_names) == sorted(actual_names)
2471+
assert r.tfunction_delete("lib1")
2472+
assert r.tfunction_delete("lib2")
2473+
assert r.tfunction_delete("lib3")
2474+
2475+
@skip_if_server_version_lt("7.1.140")
2476+
def test_tfcall(self, r):
2477+
r.gears_refresh_cluster()
2478+
self.try_delete_libs(r, "lib1")
2479+
assert r.tfunction_load(self.generate_lib_code("lib1"))
2480+
assert r.tfcall("lib1", "foo") == b"bar"
2481+
assert r.tfcall_async("lib1", "foo") == b"bar"
2482+
2483+
assert r.tfunction_delete("lib1")
2484+
24342485

24352486
@pytest.mark.onlycluster
24362487
class TestNodesManager:

tests/test_commands.py

+51
Original file line numberDiff line numberDiff line change
@@ -1791,6 +1791,57 @@ def test_substr(self, r):
17911791
assert r.substr("a", 3, 5) == b"345"
17921792
assert r.substr("a", 3, -2) == b"345678"
17931793

1794+
def generate_lib_code(self, lib_name):
1795+
return f"""#!js api_version=1.0 name={lib_name}\n redis.registerFunction('foo', ()=>{{return 'bar'}})""" # noqa
1796+
1797+
def try_delete_libs(self, r, *lib_names):
1798+
for lib_name in lib_names:
1799+
try:
1800+
r.tfunction_delete(lib_name)
1801+
except Exception:
1802+
pass
1803+
1804+
@pytest.mark.onlynoncluster
1805+
@skip_if_server_version_lt("7.1.140")
1806+
def test_tfunction_load_delete(self, r):
1807+
self.try_delete_libs(r, "lib1")
1808+
lib_code = self.generate_lib_code("lib1")
1809+
assert r.tfunction_load(lib_code)
1810+
assert r.tfunction_delete("lib1")
1811+
1812+
@pytest.mark.onlynoncluster
1813+
@skip_if_server_version_lt("7.1.140")
1814+
def test_tfunction_list(self, r):
1815+
self.try_delete_libs(r, "lib1", "lib2", "lib3")
1816+
assert r.tfunction_load(self.generate_lib_code("lib1"))
1817+
assert r.tfunction_load(self.generate_lib_code("lib2"))
1818+
assert r.tfunction_load(self.generate_lib_code("lib3"))
1819+
1820+
# test error thrown when verbose > 4
1821+
with pytest.raises(redis.exceptions.DataError):
1822+
assert r.tfunction_list(verbose=8)
1823+
1824+
functions = r.tfunction_list(verbose=1)
1825+
assert len(functions) == 3
1826+
1827+
expected_names = [b"lib1", b"lib2", b"lib3"]
1828+
actual_names = [functions[0][13], functions[1][13], functions[2][13]]
1829+
1830+
assert sorted(expected_names) == sorted(actual_names)
1831+
assert r.tfunction_delete("lib1")
1832+
assert r.tfunction_delete("lib2")
1833+
assert r.tfunction_delete("lib3")
1834+
1835+
@pytest.mark.onlynoncluster
1836+
@skip_if_server_version_lt("7.1.140")
1837+
def test_tfcall(self, r):
1838+
self.try_delete_libs(r, "lib1")
1839+
assert r.tfunction_load(self.generate_lib_code("lib1"))
1840+
assert r.tfcall("lib1", "foo") == b"bar"
1841+
assert r.tfcall_async("lib1", "foo") == b"bar"
1842+
1843+
assert r.tfunction_delete("lib1")
1844+
17941845
def test_ttl(self, r):
17951846
r["a"] = "1"
17961847
assert r.expire("a", 10)

0 commit comments

Comments
 (0)