Skip to content

Commit c7fb2fb

Browse files
committed
Implement retrieve(key) and retrieve(key, :Supplier<CompletableFuture<T>>) operations in RedisCache.
Closes spring-projects#2650
1 parent 624d7c6 commit c7fb2fb

File tree

6 files changed

+307
-21
lines changed

6 files changed

+307
-21
lines changed

src/main/java/org/springframework/data/redis/cache/DefaultRedisCacheWriter.java

Lines changed: 56 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,18 @@
1515
*/
1616
package org.springframework.data.redis.cache;
1717

18+
import reactor.core.publisher.Mono;
19+
20+
import java.nio.ByteBuffer;
1821
import java.nio.charset.StandardCharsets;
1922
import java.time.Duration;
2023
import java.util.concurrent.TimeUnit;
2124
import java.util.function.Consumer;
2225
import java.util.function.Function;
2326

2427
import org.springframework.dao.PessimisticLockingFailureException;
28+
import org.springframework.data.redis.connection.ReactiveRedisConnection;
29+
import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory;
2530
import org.springframework.data.redis.connection.RedisConnection;
2631
import org.springframework.data.redis.connection.RedisConnectionFactory;
2732
import org.springframework.data.redis.connection.RedisStringCommands.SetOption;
@@ -114,8 +119,8 @@ public byte[] get(String name, byte[] key, @Nullable Duration ttl) {
114119
Assert.notNull(key, "Key must not be null");
115120

116121
byte[] result = shouldExpireWithin(ttl)
117-
? execute(name, connection -> connection.stringCommands().getEx(key, Expiration.from(ttl)))
118-
: execute(name, connection -> connection.stringCommands().get(key));
122+
? execute(name, connection -> connection.stringCommands().getEx(key, Expiration.from(ttl)))
123+
: execute(name, connection -> connection.stringCommands().get(key));
119124

120125
statistics.incGets(name);
121126

@@ -128,6 +133,32 @@ public byte[] get(String name, byte[] key, @Nullable Duration ttl) {
128133
return result;
129134
}
130135

136+
@Override
137+
public Mono<ByteBuffer> retrieve(String name, byte[] key, @Nullable Duration ttl) {
138+
139+
assertReactiveRedisConnectionFactory();
140+
141+
Assert.notNull(name, "Name must not be null");
142+
Assert.notNull(key, "Key must not be null");
143+
144+
ByteBuffer wrappedKey = ByteBuffer.wrap(key);
145+
146+
Mono<ByteBuffer> result = shouldExpireWithin(ttl)
147+
? executeReactively(name, connection -> connection.stringCommands().getEx(wrappedKey, Expiration.from(ttl)))
148+
: executeReactively(name, connection -> connection.stringCommands().get(wrappedKey));
149+
150+
result = result.doOnSuccess(byteBuffer -> {
151+
if (byteBuffer != null) {
152+
statistics.incHits(name);
153+
}
154+
else {
155+
statistics.incMisses(name);
156+
}
157+
}).doFirst(() -> statistics.incGets(name));
158+
159+
return result;
160+
}
161+
131162
@Override
132163
public void put(String name, byte[] key, byte[] value, @Nullable Duration ttl) {
133164

@@ -308,6 +339,18 @@ private void executeLockFree(Consumer<RedisConnection> callback) {
308339
}
309340
}
310341

342+
private <T> T executeReactively(String name, Function<ReactiveRedisConnection, T> callback) {
343+
344+
ReactiveRedisConnection connection = getReactiveRedisConnectionFactory().getReactiveConnection();
345+
346+
try {
347+
return callback.apply(connection);
348+
}
349+
finally {
350+
connection.closeLater();
351+
}
352+
}
353+
311354
private void checkAndPotentiallyWaitUntilUnlocked(String name, RedisConnection connection) {
312355

313356
if (!isLockingCacheWriter()) {
@@ -333,11 +376,21 @@ private void checkAndPotentiallyWaitUntilUnlocked(String name, RedisConnection c
333376
}
334377
}
335378

379+
private void assertReactiveRedisConnectionFactory() {
380+
381+
Assert.state(this.connectionFactory instanceof ReactiveRedisConnectionFactory,
382+
() -> "Cache.retrieve(key) is only supported in Reactive Redis");
383+
}
384+
385+
private ReactiveRedisConnectionFactory getReactiveRedisConnectionFactory() {
386+
return (ReactiveRedisConnectionFactory) this.connectionFactory;
387+
}
388+
336389
private static byte[] createCacheLockKey(String name) {
337390
return (name + "~lock").getBytes(StandardCharsets.UTF_8);
338391
}
339392

340-
private boolean isTrue(@Nullable Boolean value) {
393+
private static boolean isTrue(@Nullable Boolean value) {
341394
return Boolean.TRUE.equals(value);
342395
}
343396

src/main/java/org/springframework/data/redis/cache/RedisCache.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,8 @@
4545
import org.springframework.util.ObjectUtils;
4646
import org.springframework.util.ReflectionUtils;
4747

48+
import reactor.core.publisher.Mono;
49+
4850
/**
4951
* {@link org.springframework.cache.Cache} implementation using for Redis as the underlying store for cache data.
5052
* <p>
@@ -293,12 +295,21 @@ protected Object preProcessCacheValue(@Nullable Object value) {
293295

294296
@Override
295297
public CompletableFuture<?> retrieve(Object key) {
296-
return super.retrieve(key);
298+
return retrieveMono(key).toFuture();
297299
}
298300

299301
@Override
302+
@SuppressWarnings("unchecked")
300303
public <T> CompletableFuture<T> retrieve(Object key, Supplier<CompletableFuture<T>> valueLoader) {
301-
return super.retrieve(key, valueLoader);
304+
305+
return retrieveMono(key)
306+
.map(value -> (T) deserializeCacheValue(ByteUtils.getBytes(value)))
307+
.or(Mono.fromCompletionStage(valueLoader))
308+
.toFuture();
309+
}
310+
311+
private Mono<ByteBuffer> retrieveMono(Object key) {
312+
return getCacheWriter().retrieve(getName(), createAndConvertCacheKey(key));
302313
}
303314

304315
/**

src/main/java/org/springframework/data/redis/cache/RedisCacheWriter.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,15 @@
1515
*/
1616
package org.springframework.data.redis.cache;
1717

18+
import java.nio.ByteBuffer;
1819
import java.time.Duration;
1920

2021
import org.springframework.data.redis.connection.RedisConnectionFactory;
2122
import org.springframework.lang.Nullable;
2223
import org.springframework.util.Assert;
2324

25+
import reactor.core.publisher.Mono;
26+
2427
/**
2528
* {@link RedisCacheWriter} provides low-level access to Redis commands ({@code SET, SETNX, GET, EXPIRE,...}) used for
2629
* caching.
@@ -135,6 +138,12 @@ default byte[] get(String name, byte[] key, @Nullable Duration ttl) {
135138
return get(name, key);
136139
}
137140

141+
default Mono<ByteBuffer> retrieve(String name, byte[] key) {
142+
return retrieve(name, key, null);
143+
}
144+
145+
Mono<ByteBuffer> retrieve(String name, byte[] key, @Nullable Duration ttl);
146+
138147
/**
139148
* Write the given key/value pair to Redis and set the expiration time if defined.
140149
*

0 commit comments

Comments
 (0)