32
32
import org .springframework .data .redis .TooManyClusterRedirectionsException ;
33
33
import org .springframework .data .redis .connection .util .ByteArraySet ;
34
34
import org .springframework .data .redis .connection .util .ByteArrayWrapper ;
35
- import org .springframework .lang .NonNull ;
36
35
import org .springframework .lang .Nullable ;
37
- import org .springframework .scheduling .concurrent .ThreadPoolTaskExecutor ;
38
36
import org .springframework .util .Assert ;
39
37
import org .springframework .util .CollectionUtils ;
40
38
import org .springframework .util .ObjectUtils ;
@@ -78,7 +76,7 @@ public ClusterCommandExecutor(ClusterTopologyProvider topologyProvider, ClusterN
78
76
* @param topologyProvider must not be {@literal null}.
79
77
* @param resourceProvider must not be {@literal null}.
80
78
* @param exceptionTranslation must not be {@literal null}.
81
- * @param executor can be {@literal null}. Defaulted to {@link ThreadPoolTaskExecutor }.
79
+ * @param executor the task executor to null, defaults to {@link SimpleAsyncTaskExecutor} if {@literal null }.
82
80
*/
83
81
public ClusterCommandExecutor (ClusterTopologyProvider topologyProvider , ClusterNodeResourceProvider resourceProvider ,
84
82
ExceptionTranslationStrategy exceptionTranslation , @ Nullable AsyncTaskExecutor executor ) {
@@ -90,11 +88,7 @@ public ClusterCommandExecutor(ClusterTopologyProvider topologyProvider, ClusterN
90
88
this .topologyProvider = topologyProvider ;
91
89
this .resourceProvider = resourceProvider ;
92
90
this .exceptionTranslationStrategy = exceptionTranslation ;
93
- this .executor = resolveTaskExecutor (executor );
94
- }
95
-
96
- private @ NonNull AsyncTaskExecutor resolveTaskExecutor (@ Nullable AsyncTaskExecutor taskExecutor ) {
97
- return taskExecutor != null ? taskExecutor : DEFAULT_TASK_EXECUTOR ;
91
+ this .executor = executor != null ? executor : DEFAULT_TASK_EXECUTOR ;
98
92
}
99
93
100
94
/**
@@ -149,9 +143,8 @@ private <S, T> NodeResult<T> executeCommandOnSingleNode(ClusterCommandCallback<S
149
143
RuntimeException translatedException = convertToDataAccessException (cause );
150
144
151
145
if (translatedException instanceof ClusterRedirectException clusterRedirectException ) {
152
- return executeCommandOnSingleNode (cmd , topologyProvider .getTopology ()
153
- .lookup (clusterRedirectException .getTargetHost (), clusterRedirectException .getTargetPort ()),
154
- redirectCount + 1 );
146
+ return executeCommandOnSingleNode (cmd , topologyProvider .getTopology ().lookup (
147
+ clusterRedirectException .getTargetHost (), clusterRedirectException .getTargetPort ()), redirectCount + 1 );
155
148
} else {
156
149
throw translatedException != null ? translatedException : cause ;
157
150
}
@@ -182,7 +175,7 @@ private RedisClusterNode lookupNode(RedisClusterNode node) {
182
175
* @param cmd must not be {@literal null}.
183
176
* @return never {@literal null}.
184
177
* @throws ClusterCommandExecutionFailureException if a failure occurs while executing the given
185
- * {@link ClusterCommandCallback command} on any given {@link RedisClusterNode node}.
178
+ * {@link ClusterCommandCallback command} on any given {@link RedisClusterNode node}.
186
179
*/
187
180
public <S , T > MultiNodeResult <T > executeCommandOnAllNodes (final ClusterCommandCallback <S , T > cmd ) {
188
181
return executeCommandAsyncOnNodes (cmd , getClusterTopology ().getActiveMasterNodes ());
@@ -193,7 +186,7 @@ public <S, T> MultiNodeResult<T> executeCommandOnAllNodes(final ClusterCommandCa
193
186
* @param nodes must not be {@literal null}.
194
187
* @return never {@literal null}.
195
188
* @throws ClusterCommandExecutionFailureException if a failure occurs while executing the given
196
- * {@link ClusterCommandCallback command} on any given {@link RedisClusterNode node}.
189
+ * {@link ClusterCommandCallback command} on any given {@link RedisClusterNode node}.
197
190
* @throws IllegalArgumentException in case the node could not be resolved to a topology-known node
198
191
*/
199
192
public <S , T > MultiNodeResult <T > executeCommandAsyncOnNodes (ClusterCommandCallback <S , T > callback ,
@@ -295,7 +288,7 @@ private <T> MultiNodeResult<T> collectResults(Map<NodeExecution, Future<NodeResu
295
288
* @param commandCallback must not be {@literal null}.
296
289
* @return never {@literal null}.
297
290
* @throws ClusterCommandExecutionFailureException if a failure occurs while executing the given
298
- * {@link MultiKeyClusterCommandCallback command}.
291
+ * {@link MultiKeyClusterCommandCallback command}.
299
292
*/
300
293
public <S , T > MultiNodeResult <T > executeMultiKeyCommand (MultiKeyClusterCommandCallback <S , T > commandCallback ,
301
294
Iterable <byte []> keys ) {
@@ -315,8 +308,8 @@ public <S, T> MultiNodeResult<T> executeMultiKeyCommand(MultiKeyClusterCommandCa
315
308
316
309
if (entry .getKey ().isMaster ()) {
317
310
for (PositionalKey key : entry .getValue ()) {
318
- futures .put (new NodeExecution (entry .getKey (), key ), this .executor . submit (() ->
319
- executeMultiKeyCommandOnSingleNode (commandCallback , entry .getKey (), key .getBytes ())));
311
+ futures .put (new NodeExecution (entry .getKey (), key ), this .executor
312
+ . submit (() -> executeMultiKeyCommandOnSingleNode (commandCallback , entry .getKey (), key .getBytes ())));
320
313
}
321
314
}
322
315
}
0 commit comments