32
32
import org .springframework .data .redis .TooManyClusterRedirectionsException ;
33
33
import org .springframework .data .redis .connection .util .ByteArraySet ;
34
34
import org .springframework .data .redis .connection .util .ByteArrayWrapper ;
35
+ import org .springframework .lang .NonNull ;
35
36
import org .springframework .lang .Nullable ;
36
37
import org .springframework .scheduling .concurrent .ThreadPoolTaskExecutor ;
37
38
import org .springframework .util .Assert ;
48
49
*/
49
50
public class ClusterCommandExecutor implements DisposableBean {
50
51
51
- private AsyncTaskExecutor executor ;
52
- private final ClusterTopologyProvider topologyProvider ;
52
+ protected static final AsyncTaskExecutor DEFAULT_TASK_EXECUTOR = new SimpleAsyncTaskExecutor ();
53
+
54
+ private int maxRedirects = 5 ;
55
+
56
+ private final AsyncTaskExecutor executor ;
57
+
53
58
private final ClusterNodeResourceProvider resourceProvider ;
59
+
60
+ private final ClusterTopologyProvider topologyProvider ;
61
+
54
62
private final ExceptionTranslationStrategy exceptionTranslationStrategy ;
55
- private int maxRedirects = 5 ;
56
63
57
64
/**
58
65
* Create a new instance of {@link ClusterCommandExecutor}.
@@ -64,13 +71,7 @@ public class ClusterCommandExecutor implements DisposableBean {
64
71
public ClusterCommandExecutor (ClusterTopologyProvider topologyProvider , ClusterNodeResourceProvider resourceProvider ,
65
72
ExceptionTranslationStrategy exceptionTranslation ) {
66
73
67
- Assert .notNull (topologyProvider , "ClusterTopologyProvider must not be null" );
68
- Assert .notNull (resourceProvider , "ClusterNodeResourceProvider must not be null" );
69
- Assert .notNull (exceptionTranslation , "ExceptionTranslationStrategy must not be null" );
70
-
71
- this .topologyProvider = topologyProvider ;
72
- this .resourceProvider = resourceProvider ;
73
- this .exceptionTranslationStrategy = exceptionTranslation ;
74
+ this (topologyProvider , resourceProvider , exceptionTranslation , DEFAULT_TASK_EXECUTOR );
74
75
}
75
76
76
77
/**
@@ -82,36 +83,42 @@ public ClusterCommandExecutor(ClusterTopologyProvider topologyProvider, ClusterN
82
83
public ClusterCommandExecutor (ClusterTopologyProvider topologyProvider , ClusterNodeResourceProvider resourceProvider ,
83
84
ExceptionTranslationStrategy exceptionTranslation , @ Nullable AsyncTaskExecutor executor ) {
84
85
85
- this (topologyProvider , resourceProvider , exceptionTranslation );
86
- this .executor = executor ;
86
+ Assert .notNull (topologyProvider , "ClusterTopologyProvider must not be null" );
87
+ Assert .notNull (resourceProvider , "ClusterNodeResourceProvider must not be null" );
88
+ Assert .notNull (exceptionTranslation , "ExceptionTranslationStrategy must not be null" );
89
+
90
+ this .topologyProvider = topologyProvider ;
91
+ this .resourceProvider = resourceProvider ;
92
+ this .exceptionTranslationStrategy = exceptionTranslation ;
93
+ this .executor = resolveTaskExecutor (executor );
87
94
}
88
95
89
- {
90
- if (executor == null ) {
91
- this .executor = new SimpleAsyncTaskExecutor ();
92
- }
96
+ private @ NonNull AsyncTaskExecutor resolveTaskExecutor (@ Nullable AsyncTaskExecutor taskExecutor ) {
97
+ return taskExecutor != null ? taskExecutor : DEFAULT_TASK_EXECUTOR ;
93
98
}
94
99
95
100
/**
96
101
* Run {@link ClusterCommandCallback} on a random node.
97
102
*
98
- * @param cmd must not be {@literal null}.
103
+ * @param commandCallback must not be {@literal null}.
99
104
* @return never {@literal null}.
100
105
*/
101
- public <T > NodeResult <T > executeCommandOnArbitraryNode (ClusterCommandCallback <?, T > cmd ) {
106
+ public <T > NodeResult <T > executeCommandOnArbitraryNode (ClusterCommandCallback <?, T > commandCallback ) {
107
+
108
+ Assert .notNull (commandCallback , "ClusterCommandCallback must not be null" );
102
109
103
- Assert .notNull (cmd , "ClusterCommandCallback must not be null" );
104
110
List <RedisClusterNode > nodes = new ArrayList <>(getClusterTopology ().getActiveNodes ());
105
- return executeCommandOnSingleNode (cmd , nodes .get (new Random ().nextInt (nodes .size ())));
111
+
112
+ return executeCommandOnSingleNode (commandCallback , nodes .get (new Random ().nextInt (nodes .size ())));
106
113
}
107
114
108
115
/**
109
116
* Run {@link ClusterCommandCallback} on given {@link RedisClusterNode}.
110
117
*
111
118
* @param cmd must not be {@literal null}.
112
119
* @param node must not be {@literal null}.
120
+ * @return the {@link NodeResult} from the single, targeted {@link RedisClusterNode}.
113
121
* @throws IllegalArgumentException in case no resource can be acquired for given node.
114
- * @return
115
122
*/
116
123
public <S , T > NodeResult <T > executeCommandOnSingleNode (ClusterCommandCallback <S , T > cmd , RedisClusterNode node ) {
117
124
return executeCommandOnSingleNode (cmd , node , 0 );
@@ -132,19 +139,21 @@ private <S, T> NodeResult<T> executeCommandOnSingleNode(ClusterCommandCallback<S
132
139
RedisClusterNode nodeToUse = lookupNode (node );
133
140
134
141
S client = this .resourceProvider .getResourceForSpecificNode (nodeToUse );
142
+
135
143
Assert .notNull (client , "Could not acquire resource for node; Is your cluster info up to date" );
136
144
137
145
try {
138
146
return new NodeResult <>(node , cmd .doInCluster (client ));
139
- } catch (RuntimeException ex ) {
147
+ } catch (RuntimeException cause ) {
140
148
141
- RuntimeException translatedException = convertToDataAccessException (ex );
142
- if (translatedException instanceof ClusterRedirectException ) {
143
- ClusterRedirectException cre = (ClusterRedirectException ) translatedException ;
144
- return executeCommandOnSingleNode (cmd ,
145
- topologyProvider .getTopology ().lookup (cre .getTargetHost (), cre .getTargetPort ()), redirectCount + 1 );
149
+ RuntimeException translatedException = convertToDataAccessException (cause );
150
+
151
+ if (translatedException instanceof ClusterRedirectException clusterRedirectException ) {
152
+ return executeCommandOnSingleNode (cmd , topologyProvider .getTopology ()
153
+ .lookup (clusterRedirectException .getTargetHost (), clusterRedirectException .getTargetPort ()),
154
+ redirectCount + 1 );
146
155
} else {
147
- throw translatedException != null ? translatedException : ex ;
156
+ throw translatedException != null ? translatedException : cause ;
148
157
}
149
158
} finally {
150
159
this .resourceProvider .returnResourceForSpecificNode (nodeToUse , client );
@@ -159,10 +168,11 @@ private <S, T> NodeResult<T> executeCommandOnSingleNode(ClusterCommandCallback<S
159
168
* @throws IllegalArgumentException in case the node could not be resolved to a topology-known node
160
169
*/
161
170
private RedisClusterNode lookupNode (RedisClusterNode node ) {
171
+
162
172
try {
163
173
return topologyProvider .getTopology ().lookup (node );
164
- } catch (ClusterStateFailureException e ) {
165
- throw new IllegalArgumentException (String .format ("Node %s is unknown to cluster" , node ), e );
174
+ } catch (ClusterStateFailureException cause ) {
175
+ throw new IllegalArgumentException (String .format ("Node %s is unknown to cluster" , node ), cause );
166
176
}
167
177
}
168
178
@@ -171,7 +181,8 @@ private RedisClusterNode lookupNode(RedisClusterNode node) {
171
181
*
172
182
* @param cmd must not be {@literal null}.
173
183
* @return never {@literal null}.
174
- * @throws ClusterCommandExecutionFailureException
184
+ * @throws ClusterCommandExecutionFailureException if a failure occurs while executing the given
185
+ * {@link ClusterCommandCallback command} on any given {@link RedisClusterNode node}.
175
186
*/
176
187
public <S , T > MultiNodeResult <T > executeCommandOnAllNodes (final ClusterCommandCallback <S , T > cmd ) {
177
188
return executeCommandAsyncOnNodes (cmd , getClusterTopology ().getActiveMasterNodes ());
@@ -181,7 +192,8 @@ public <S, T> MultiNodeResult<T> executeCommandOnAllNodes(final ClusterCommandCa
181
192
* @param callback must not be {@literal null}.
182
193
* @param nodes must not be {@literal null}.
183
194
* @return never {@literal null}.
184
- * @throws ClusterCommandExecutionFailureException
195
+ * @throws ClusterCommandExecutionFailureException if a failure occurs while executing the given
196
+ * {@link ClusterCommandCallback command} on any given {@link RedisClusterNode node}.
185
197
* @throws IllegalArgumentException in case the node could not be resolved to a topology-known node
186
198
*/
187
199
public <S , T > MultiNodeResult <T > executeCommandAsyncOnNodes (ClusterCommandCallback <S , T > callback ,
@@ -202,6 +214,7 @@ public <S, T> MultiNodeResult<T> executeCommandAsyncOnNodes(ClusterCommandCallba
202
214
}
203
215
204
216
Map <NodeExecution , Future <NodeResult <T >>> futures = new LinkedHashMap <>();
217
+
205
218
for (RedisClusterNode node : resolvedRedisClusterNodes ) {
206
219
futures .put (new NodeExecution (node ), executor .submit (() -> executeCommandOnSingleNode (callback , node )));
207
220
}
@@ -213,10 +226,10 @@ private <T> MultiNodeResult<T> collectResults(Map<NodeExecution, Future<NodeResu
213
226
214
227
boolean done = false ;
215
228
216
- MultiNodeResult <T > result = new MultiNodeResult <>();
217
229
Map <RedisClusterNode , Throwable > exceptions = new HashMap <>();
218
-
230
+ MultiNodeResult < T > result = new MultiNodeResult <>();
219
231
Set <String > saveGuard = new HashSet <>();
232
+
220
233
while (!done ) {
221
234
222
235
done = true ;
@@ -242,6 +255,7 @@ private <T> MultiNodeResult<T> collectResults(Map<NodeExecution, Future<NodeResu
242
255
} catch (ExecutionException e ) {
243
256
244
257
RuntimeException ex = convertToDataAccessException ((Exception ) e .getCause ());
258
+
245
259
exceptions .put (execution .getNode (), ex != null ? ex : e .getCause ());
246
260
} catch (InterruptedException e ) {
247
261
@@ -253,6 +267,7 @@ private <T> MultiNodeResult<T> collectResults(Map<NodeExecution, Future<NodeResu
253
267
}
254
268
}
255
269
}
270
+
256
271
try {
257
272
Thread .sleep (10 );
258
273
} catch (InterruptedException e ) {
@@ -273,21 +288,23 @@ private <T> MultiNodeResult<T> collectResults(Map<NodeExecution, Future<NodeResu
273
288
*
274
289
* @param cmd must not be {@literal null}.
275
290
* @return never {@literal null}.
276
- * @throws ClusterCommandExecutionFailureException
291
+ * @throws ClusterCommandExecutionFailureException if a failure occurs while executing the given
292
+ * {@link MultiKeyClusterCommandCallback command}.
277
293
*/
278
294
public <S , T > MultiNodeResult <T > executeMultiKeyCommand (MultiKeyClusterCommandCallback <S , T > cmd ,
279
295
Iterable <byte []> keys ) {
280
296
281
297
Map <RedisClusterNode , PositionalKeys > nodeKeyMap = new HashMap <>();
282
-
283
298
int index = 0 ;
299
+
284
300
for (byte [] key : keys ) {
285
301
for (RedisClusterNode node : getClusterTopology ().getKeyServingNodes (key )) {
286
302
nodeKeyMap .computeIfAbsent (node , val -> PositionalKeys .empty ()).append (PositionalKey .of (key , index ++));
287
303
}
288
304
}
289
305
290
306
Map <NodeExecution , Future <NodeResult <T >>> futures = new LinkedHashMap <>();
307
+
291
308
for (Entry <RedisClusterNode , PositionalKeys > entry : nodeKeyMap .entrySet ()) {
292
309
293
310
if (entry .getKey ().isMaster ()) {
@@ -309,6 +326,7 @@ private <S, T> NodeResult<T> executeMultiKeyCommandOnSingleNode(MultiKeyClusterC
309
326
Assert .notNull (key , "Keys for execution must not be null" );
310
327
311
328
S client = this .resourceProvider .getResourceForSpecificNode (node );
329
+
312
330
Assert .notNull (client , "Could not acquire resource for node; Is your cluster info up to date" );
313
331
314
332
try {
@@ -479,7 +497,9 @@ public RedisClusterNode getNode() {
479
497
}
480
498
481
499
/**
482
- * @return
500
+ * Returns the key as an array of bytes.
501
+ *
502
+ * @return the key as an array of bytes.
483
503
*/
484
504
public byte [] getKey () {
485
505
return key .getArray ();
0 commit comments