Skip to content

Use Redis locking for value retrieval synchronization #2948

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

<groupId>org.springframework.data</groupId>
<artifactId>spring-data-redis</artifactId>
<version>3.4.0-SNAPSHOT</version>
<version>3.4.0-GH-2890-SNAPSHOT</version>

<name>Spring Data Redis</name>
<description>Spring Data module for Redis</description>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.function.Supplier;

import org.springframework.dao.PessimisticLockingFailureException;
import org.springframework.data.redis.connection.ReactiveRedisConnection;
Expand Down Expand Up @@ -137,9 +138,14 @@ public byte[] get(String name, byte[] key, @Nullable Duration ttl) {
Assert.notNull(name, "Name must not be null");
Assert.notNull(key, "Key must not be null");

byte[] result = shouldExpireWithin(ttl)
? execute(name, connection -> connection.stringCommands().getEx(key, Expiration.from(ttl)))
: execute(name, connection -> connection.stringCommands().get(key));
return execute(name, connection -> doGet(connection, name, key, ttl));
}

@Nullable
private byte[] doGet(RedisConnection connection, String name, byte[] key, @Nullable Duration ttl) {

byte[] result = shouldExpireWithin(ttl) ? connection.stringCommands().getEx(key, Expiration.from(ttl))
: connection.stringCommands().get(key);

statistics.incGets(name);

Expand All @@ -152,6 +158,50 @@ public byte[] get(String name, byte[] key, @Nullable Duration ttl) {
return result;
}

@Override
public byte[] get(String name, byte[] key, Supplier<byte[]> valueLoader, @Nullable Duration ttl,
boolean timeToIdleEnabled) {

Assert.notNull(name, "Name must not be null");
Assert.notNull(key, "Key must not be null");

boolean withTtl = shouldExpireWithin(ttl);

// double-checked locking optimization
if (isLockingCacheWriter()) {
byte[] bytes = get(name, key, timeToIdleEnabled && withTtl ? ttl : null);
if (bytes != null) {
return bytes;
}
}

return execute(name, connection -> {

boolean wasLocked = false;
if (isLockingCacheWriter()) {
doLock(name, key, null, connection);
wasLocked = true;
}

try {

byte[] result = doGet(connection, name, key, timeToIdleEnabled && withTtl ? ttl : null);

if (result != null) {
return result;
}

byte[] value = valueLoader.get();
doPut(connection, name, key, value, ttl);
return value;
} finally {
if (isLockingCacheWriter() && wasLocked) {
doUnlock(name, connection);
}
}
});
}

@Override
public boolean supportsAsyncRetrieve() {
return asyncCacheWriter.isSupported();
Expand Down Expand Up @@ -186,17 +236,21 @@ public void put(String name, byte[] key, byte[] value, @Nullable Duration ttl) {
Assert.notNull(value, "Value must not be null");

execute(name, connection -> {

if (shouldExpireWithin(ttl)) {
connection.stringCommands().set(key, value, Expiration.from(ttl.toMillis(), TimeUnit.MILLISECONDS),
SetOption.upsert());
} else {
connection.stringCommands().set(key, value);
}

doPut(connection, name, key, value, ttl);
return "OK";
});

}

private void doPut(RedisConnection connection, String name, byte[] key, byte[] value, @Nullable Duration ttl) {

if (shouldExpireWithin(ttl)) {
connection.stringCommands().set(key, value, Expiration.from(ttl.toMillis(), TimeUnit.MILLISECONDS),
SetOption.upsert());
} else {
connection.stringCommands().set(key, value);
}

statistics.incPuts(name);
}

Expand Down
43 changes: 14 additions & 29 deletions src/main/java/org/springframework/data/redis/cache/RedisCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@
import java.util.StringJoiner;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;

import org.springframework.cache.Cache;
Expand Down Expand Up @@ -64,8 +62,6 @@ public class RedisCache extends AbstractValueAdaptingCache {

static final String CACHE_RETRIEVAL_UNSUPPORTED_OPERATION_EXCEPTION_MESSAGE = "The Redis driver configured with RedisCache through RedisCacheWriter does not support CompletableFuture-based retrieval";

private final Lock lock = new ReentrantLock();

private final RedisCacheConfiguration cacheConfiguration;

private final RedisCacheWriter cacheWriter;
Expand Down Expand Up @@ -154,28 +150,18 @@ public CacheStatistics getStatistics() {
@SuppressWarnings("unchecked")
public <T> T get(Object key, Callable<T> valueLoader) {

ValueWrapper result = get(key);

return result != null ? (T) result.get() : getSynchronized(key, valueLoader);
}

@Nullable
@SuppressWarnings("unchecked")
private <T> T getSynchronized(Object key, Callable<T> valueLoader) {
byte[] binaryKey = createAndConvertCacheKey(key);
byte[] binaryValue = getCacheWriter().get(getName(), binaryKey,
() -> serializeCacheValue(toStoreValue(loadCacheValue(key, valueLoader))), getTimeToLive(key),
getCacheConfiguration().isTimeToIdleEnabled());

lock.lock();
ValueWrapper result = toValueWrapper(deserializeCacheValue(binaryValue));

try {
ValueWrapper result = get(key);
return result != null ? (T) result.get() : loadCacheValue(key, valueLoader);
} finally {
lock.unlock();
}
return result != null ? (T) result.get() : null;
}

/**
* Loads the {@link Object} using the given {@link Callable valueLoader} and {@link #put(Object, Object) puts} the
* {@link Object loaded value} in the cache.
* Loads the {@link Object} using the given {@link Callable valueLoader}.
*
* @param <T> {@link Class type} of the loaded {@link Object cache value}.
* @param key {@link Object key} mapped to the loaded {@link Object cache value}.
Expand All @@ -184,17 +170,11 @@ private <T> T getSynchronized(Object key, Callable<T> valueLoader) {
*/
protected <T> T loadCacheValue(Object key, Callable<T> valueLoader) {

T value;

try {
value = valueLoader.call();
return valueLoader.call();
} catch (Exception ex) {
throw new ValueRetrievalException(key, valueLoader, ex);
}

put(key, value);

return value;
}

@Override
Expand Down Expand Up @@ -443,7 +423,12 @@ protected String convertKey(Object key) {
}

private CompletableFuture<ValueWrapper> retrieveValue(Object key) {
return getCacheWriter().retrieve(getName(), createAndConvertCacheKey(key)) //

CompletableFuture<byte[]> retrieve = getCacheConfiguration().isTimeToIdleEnabled()
? getCacheWriter().retrieve(getName(), createAndConvertCacheKey(key), getTimeToLive(key))
: getCacheWriter().retrieve(getName(), createAndConvertCacheKey(key));

return retrieve //
.thenApply(binaryValue -> binaryValue != null ? deserializeCacheValue(binaryValue) : null) //
.thenApply(this::toValueWrapper);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,14 @@
import org.springframework.util.Assert;

/**
* {@link RedisCacheWriter} provides low-level access to Redis commands ({@code SET, SETNX, GET, EXPIRE,...})
* used for caching.
* {@link RedisCacheWriter} provides low-level access to Redis commands ({@code SET, SETNX, GET, EXPIRE,...}) used for
* caching.
* <p>
* The {@link RedisCacheWriter} may be shared by multiple cache implementations and is responsible for reading/writing
* binary data from/to Redis. The implementation honors potential cache lock flags that might be set.
* <p>
* The default {@link RedisCacheWriter} implementation can be customized with {@link BatchStrategy}
* to tune performance behavior.
* The default {@link RedisCacheWriter} implementation can be customized with {@link BatchStrategy} to tune performance
* behavior.
*
* @author Christoph Strobl
* @author Mark Paluch
Expand Down Expand Up @@ -96,9 +96,8 @@ static RedisCacheWriter lockingRedisCacheWriter(RedisConnectionFactory connectio
*
* @param connectionFactory must not be {@literal null}.
* @param sleepTime sleep time between lock access attempts, must not be {@literal null}.
* @param lockTtlFunction TTL function to compute the Lock TTL. The function is called with contextual keys
* and values (such as the cache name on cleanup or the actual key/value on put requests);
* must not be {@literal null}.
* @param lockTtlFunction TTL function to compute the Lock TTL. The function is called with contextual keys and values
* (such as the cache name on cleanup or the actual key/value on put requests); must not be {@literal null}.
* @param batchStrategy must not be {@literal null}.
* @return new instance of {@link DefaultRedisCacheWriter}.
* @since 3.2
Expand All @@ -124,8 +123,8 @@ static RedisCacheWriter lockingRedisCacheWriter(RedisConnectionFactory connectio
byte[] get(String name, byte[] key);

/**
* Get the binary value representation from Redis stored for the given key and set
* the given {@link Duration TTL expiration} for the cache entry.
* Get the binary value representation from Redis stored for the given key and set the given {@link Duration TTL
* expiration} for the cache entry.
*
* @param name must not be {@literal null}.
* @param key must not be {@literal null}.
Expand All @@ -138,14 +137,41 @@ default byte[] get(String name, byte[] key, @Nullable Duration ttl) {
}

/**
* Determines whether the asynchronous {@link #retrieve(String, byte[])}
* and {@link #retrieve(String, byte[], Duration)} cache operations are supported by the implementation.
* Get the binary value representation from Redis stored for the given key and set the given {@link Duration TTL
* expiration} for the cache entry, obtaining the value from {@code valueLoader} if necessary.
* <p>
* The main factor for whether the {@literal retrieve} operation can be supported will primarily be determined
* by the Redis driver in use at runtime.
* If possible (and configured for locking), implementations should ensure that the loading operation is synchronized
* so that the specified {@code valueLoader} is only called once in case of concurrent access on the same key.
*
* @param name must not be {@literal null}.
* @param key must not be {@literal null}.
* @param valueLoader value loader that creates the value if the cache lookup has been not successful.
* @param ttl {@link Duration} specifying the {@literal expiration timeout} for the cache entry.
* @param timeToIdleEnabled {@literal true} to enable Time to Idle when retrieving the value.
* @since 3.4
*/
default byte[] get(String name, byte[] key, Supplier<byte[]> valueLoader, @Nullable Duration ttl,
boolean timeToIdleEnabled) {

byte[] bytes = timeToIdleEnabled ? get(name, key, ttl) : get(name, key);

if (bytes == null) {
bytes = valueLoader.get();
put(name, key, bytes, ttl);
}

return bytes;
}

/**
* Determines whether the asynchronous {@link #retrieve(String, byte[])} and
* {@link #retrieve(String, byte[], Duration)} cache operations are supported by the implementation.
* <p>
* The main factor for whether the {@literal retrieve} operation can be supported will primarily be determined by the
* Redis driver in use at runtime.
* <p>
* Returns {@literal false} by default. This will have an effect of {@link RedisCache#retrieve(Object)}
* and {@link RedisCache#retrieve(Object, Supplier)} throwing an {@link UnsupportedOperationException}.
* Returns {@literal false} by default. This will have an effect of {@link RedisCache#retrieve(Object)} and
* {@link RedisCache#retrieve(Object, Supplier)} throwing an {@link UnsupportedOperationException}.
*
* @return {@literal true} if asynchronous {@literal retrieve} operations are supported by the implementation.
* @since 3.2
Expand All @@ -155,8 +181,8 @@ default boolean supportsAsyncRetrieve() {
}

/**
* Asynchronously retrieves the {@link CompletableFuture value} to which the {@link RedisCache}
* maps the given {@link byte[] key}.
* Asynchronously retrieves the {@link CompletableFuture value} to which the {@link RedisCache} maps the given
* {@link byte[] key}.
* <p>
* This operation is non-blocking.
*
Expand All @@ -171,8 +197,8 @@ default CompletableFuture<byte[]> retrieve(String name, byte[] key) {
}

/**
* Asynchronously retrieves the {@link CompletableFuture value} to which the {@link RedisCache} maps
* the given {@link byte[] key} setting the {@link Duration TTL expiration} for the cache entry.
* Asynchronously retrieves the {@link CompletableFuture value} to which the {@link RedisCache} maps the given
* {@link byte[] key} setting the {@link Duration TTL expiration} for the cache entry.
* <p>
* This operation is non-blocking.
*
Expand All @@ -187,10 +213,10 @@ default CompletableFuture<byte[]> retrieve(String name, byte[] key) {
/**
* Write the given key/value pair to Redis and set the expiration time if defined.
*
* @param name The cache name must not be {@literal null}.
* @param key The key for the cache entry. Must not be {@literal null}.
* @param value The value stored for the key. Must not be {@literal null}.
* @param ttl Optional expiration time. Can be {@literal null}.
* @param name cache name must not be {@literal null}.
* @param key key for the cache entry. Must not be {@literal null}.
* @param value value stored for the key. Must not be {@literal null}.
* @param ttl optional expiration time. Can be {@literal null}.
*/
void put(String name, byte[] key, byte[] value, @Nullable Duration ttl);

Expand All @@ -199,21 +225,21 @@ default CompletableFuture<byte[]> retrieve(String name, byte[] key) {
* <p>
* This operation is non-blocking.
*
* @param name The cache name must not be {@literal null}.
* @param key The key for the cache entry. Must not be {@literal null}.
* @param value The value stored for the key. Must not be {@literal null}.
* @param ttl Optional expiration time. Can be {@literal null}.
* @param name cache name must not be {@literal null}.
* @param key key for the cache entry. Must not be {@literal null}.
* @param value value stored for the key. Must not be {@literal null}.
* @param ttl optional expiration time. Can be {@literal null}.
* @since 3.2
*/
CompletableFuture<Void> store(String name, byte[] key, byte[] value, @Nullable Duration ttl);

/**
* Write the given value to Redis if the key does not already exist.
*
* @param name The cache name must not be {@literal null}.
* @param key The key for the cache entry. Must not be {@literal null}.
* @param value The value stored for the key. Must not be {@literal null}.
* @param ttl Optional expiration time. Can be {@literal null}.
* @param name cache name must not be {@literal null}.
* @param key key for the cache entry. Must not be {@literal null}.
* @param value value stored for the key. Must not be {@literal null}.
* @param ttl optional expiration time. Can be {@literal null}.
* @return {@literal null} if the value has been written, the value stored for the key if it already exists.
*/
@Nullable
Expand All @@ -222,16 +248,16 @@ default CompletableFuture<byte[]> retrieve(String name, byte[] key) {
/**
* Remove the given key from Redis.
*
* @param name The cache name must not be {@literal null}.
* @param key The key for the cache entry. Must not be {@literal null}.
* @param name cache name must not be {@literal null}.
* @param key key for the cache entry. Must not be {@literal null}.
*/
void remove(String name, byte[] key);

/**
* Remove all keys following the given pattern.
*
* @param name The cache name must not be {@literal null}.
* @param pattern The pattern for the keys to remove. Must not be {@literal null}.
* @param name cache name must not be {@literal null}.
* @param pattern pattern for the keys to remove. Must not be {@literal null}.
*/
void clean(String name, byte[] pattern);

Expand Down Expand Up @@ -264,8 +290,8 @@ interface TtlFunction {
/**
* Creates a {@literal Singleton} {@link TtlFunction} using the given {@link Duration}.
*
* @param duration the time to live. Can be {@link Duration#ZERO} for persistent values (i.e. cache entry
* does not expire).
* @param duration the time to live. Can be {@link Duration#ZERO} for persistent values (i.e. cache entry does not
* expire).
* @return a singleton {@link TtlFunction} using {@link Duration}.
*/
static TtlFunction just(Duration duration) {
Expand Down
Loading
Loading