Skip to content

Commit d7d4336

Browse files
commands/cluster: use pipeline to execute split commands (#2230)
- allow passing target_nodes to pipeline commands - move READ_COMMANDS to commands/cluster to avoid import cycle - add types to list_or_args
1 parent 11cf66a commit d7d4336

File tree

5 files changed

+151
-153
lines changed

5 files changed

+151
-153
lines changed

redis/asyncio/cluster.py

+11-6
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
from redis.cluster import (
2424
PIPELINE_BLOCKED_COMMANDS,
2525
PRIMARY,
26-
READ_COMMANDS,
2726
REPLICA,
2827
SLOT_ID,
2928
AbstractRedisCluster,
@@ -32,7 +31,7 @@
3231
get_node_name,
3332
parse_cluster_slots,
3433
)
35-
from redis.commands import AsyncRedisClusterCommands
34+
from redis.commands import READ_COMMANDS, AsyncRedisClusterCommands
3635
from redis.crc import REDIS_CLUSTER_HASH_SLOTS, key_slot
3736
from redis.exceptions import (
3837
AskError,
@@ -1350,11 +1349,17 @@ async def _execute(
13501349

13511350
nodes = {}
13521351
for cmd in todo:
1353-
target_nodes = await client._determine_nodes(*cmd.args)
1354-
if not target_nodes:
1355-
raise RedisClusterException(
1356-
f"No targets were found to execute {cmd.args} command on"
1352+
passed_targets = cmd.kwargs.pop("target_nodes", None)
1353+
if passed_targets and not client._is_node_flag(passed_targets):
1354+
target_nodes = client._parse_target_nodes(passed_targets)
1355+
else:
1356+
target_nodes = await client._determine_nodes(
1357+
*cmd.args, node_flag=passed_targets
13571358
)
1359+
if not target_nodes:
1360+
raise RedisClusterException(
1361+
f"No targets were found to execute {cmd.args} command on"
1362+
)
13581363
if len(target_nodes) > 1:
13591364
raise RedisClusterException(f"Too many targets for command {cmd.args}")
13601365

redis/cluster.py

+15-50
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
from typing import Any, Callable, Dict, Tuple
1010

1111
from redis.client import CaseInsensitiveDict, PubSub, Redis, parse_scan
12-
from redis.commands import CommandsParser, RedisClusterCommands
12+
from redis.commands import READ_COMMANDS, CommandsParser, RedisClusterCommands
1313
from redis.connection import ConnectionPool, DefaultParser, Encoder, parse_url
1414
from redis.crc import REDIS_CLUSTER_HASH_SLOTS, key_slot
1515
from redis.exceptions import (
@@ -154,52 +154,6 @@ def parse_cluster_shards(resp, **options):
154154
)
155155
KWARGS_DISABLED_KEYS = ("host", "port")
156156

157-
# Not complete, but covers the major ones
158-
# https://redis.io/commands
159-
READ_COMMANDS = frozenset(
160-
[
161-
"BITCOUNT",
162-
"BITPOS",
163-
"EXISTS",
164-
"GEODIST",
165-
"GEOHASH",
166-
"GEOPOS",
167-
"GEORADIUS",
168-
"GEORADIUSBYMEMBER",
169-
"GET",
170-
"GETBIT",
171-
"GETRANGE",
172-
"HEXISTS",
173-
"HGET",
174-
"HGETALL",
175-
"HKEYS",
176-
"HLEN",
177-
"HMGET",
178-
"HSTRLEN",
179-
"HVALS",
180-
"KEYS",
181-
"LINDEX",
182-
"LLEN",
183-
"LRANGE",
184-
"MGET",
185-
"PTTL",
186-
"RANDOMKEY",
187-
"SCARD",
188-
"SDIFF",
189-
"SINTER",
190-
"SISMEMBER",
191-
"SMEMBERS",
192-
"SRANDMEMBER",
193-
"STRLEN",
194-
"SUNION",
195-
"TTL",
196-
"ZCARD",
197-
"ZCOUNT",
198-
"ZRANGE",
199-
"ZSCORE",
200-
]
201-
)
202-
203157

204158
def cleanup_kwargs(**kwargs):
205159
"""
@@ -1993,14 +1947,25 @@ def _send_cluster_commands(
19931947
# refer to our internal node -> slot table that
19941948
# tells us where a given
19951949
# command should route to.
1996-
node = self._determine_nodes(*c.args)
1950+
passed_targets = c.options.pop("target_nodes", None)
1951+
if passed_targets and not self._is_nodes_flag(passed_targets):
1952+
target_nodes = self._parse_target_nodes(passed_targets)
1953+
else:
1954+
target_nodes = self._determine_nodes(*c.args, node_flag=passed_targets)
1955+
if not target_nodes:
1956+
raise RedisClusterException(
1957+
f"No targets were found to execute {c.args} command on"
1958+
)
1959+
if len(target_nodes) > 1:
1960+
raise RedisClusterException(f"Too many targets for command {c.args}")
19971961

1962+
node = target_nodes[0]
19981963
# now that we know the name of the node
19991964
# ( it's just a string in the form of host:port )
20001965
# we can build a list of commands for each node.
2001-
node_name = node[0].name
1966+
node_name = node.name
20021967
if node_name not in nodes:
2003-
redis_node = self.get_redis_connection(node[0])
1968+
redis_node = self.get_redis_connection(node)
20041969
connection = get_connection(redis_node, c.args)
20051970
nodes[node_name] = NodeCommands(
20061971
redis_node.parse_response, redis_node.connection_pool, connection

redis/commands/__init__.py

+7-6
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,20 @@
1-
from .cluster import AsyncRedisClusterCommands, RedisClusterCommands
1+
from .cluster import READ_COMMANDS, AsyncRedisClusterCommands, RedisClusterCommands
22
from .core import AsyncCoreCommands, CoreCommands
33
from .helpers import list_or_args
44
from .parser import CommandsParser
55
from .redismodules import AsyncRedisModuleCommands, RedisModuleCommands
66
from .sentinel import AsyncSentinelCommands, SentinelCommands
77

88
__all__ = [
9+
"AsyncCoreCommands",
910
"AsyncRedisClusterCommands",
10-
"RedisClusterCommands",
11+
"AsyncRedisModuleCommands",
12+
"AsyncSentinelCommands",
1113
"CommandsParser",
12-
"AsyncCoreCommands",
1314
"CoreCommands",
14-
"list_or_args",
15-
"AsyncRedisModuleCommands",
15+
"READ_COMMANDS",
16+
"RedisClusterCommands",
1617
"RedisModuleCommands",
17-
"AsyncSentinelCommands",
1818
"SentinelCommands",
19+
"list_or_args",
1920
]

0 commit comments

Comments
 (0)