Skip to content

Refactor Jedis and Lettuce RedisConnectionFactory to SmartLifecycle beans #2627

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

<groupId>org.springframework.data</groupId>
<artifactId>spring-data-redis</artifactId>
<version>3.2.0-SNAPSHOT</version>
<version>3.2.x-2503-SNAPSHOT</version>

<name>Spring Data Redis</name>
<description>Spring Data module for Redis</description>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.util.LinkedHashSet;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;

import javax.net.ssl.HostnameVerifier;
import javax.net.ssl.SSLParameters;
Expand All @@ -42,9 +43,9 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;

import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.context.SmartLifecycle;
import org.springframework.dao.DataAccessException;
import org.springframework.dao.InvalidDataAccessApiUsageException;
import org.springframework.dao.InvalidDataAccessResourceUsageException;
Expand Down Expand Up @@ -85,7 +86,7 @@
* @see JedisClientConfiguration
* @see Jedis
*/
public class JedisConnectionFactory implements InitializingBean, DisposableBean, RedisConnectionFactory {
public class JedisConnectionFactory implements RedisConnectionFactory, InitializingBean, DisposableBean, SmartLifecycle {

private final static Log log = LogFactory.getLog(JedisConnectionFactory.class);
private static final ExceptionTranslationStrategy EXCEPTION_TRANSLATION = new PassThroughExceptionTranslationStrategy(
Expand All @@ -104,8 +105,11 @@ public class JedisConnectionFactory implements InitializingBean, DisposableBean,
private @Nullable ClusterTopologyProvider topologyProvider;
private @Nullable ClusterCommandExecutor clusterCommandExecutor;

private boolean initialized;
private boolean destroyed;
enum State {
CREATED, STARTING, STARTED, STOPPING, STOPPED, DESTROYED;
}

private AtomicReference<State> state = new AtomicReference<>(State.CREATED);

/**
* Constructs a new {@link JedisConnectionFactory} instance with default settings (default connection pooling).
Expand Down Expand Up @@ -287,24 +291,80 @@ protected JedisConnection postProcessConnection(JedisConnection connection) {
return connection;
}

public void afterPropertiesSet() {
@Override
public void start() {

clientConfig = createClientConfig(getDatabase(), getRedisUsername(), getRedisPassword());
State current = state.getAndUpdate(state -> {
if (State.CREATED.equals(state) || State.STOPPED.equals(state)) {
return State.STARTING;
}
return state;
});

if (getUsePool() && !isRedisClusterAware()) {
this.pool = createPool();
if (State.CREATED.equals(current) || State.STOPPED.equals(current)) {

if (getUsePool() && !isRedisClusterAware()) {
this.pool = createPool();
}

if (isRedisClusterAware()) {

this.cluster = createCluster();
this.topologyProvider = createTopologyProvider(this.cluster);
this.clusterCommandExecutor = new ClusterCommandExecutor(this.topologyProvider,
new JedisClusterConnection.JedisClusterNodeResourceProvider(this.cluster, this.topologyProvider),
EXCEPTION_TRANSLATION);
}

state.set(State.STARTED);
}
}

if (isRedisClusterAware()) {
@Override
public void stop() {

if (state.compareAndSet(State.STARTED, State.STOPPING)) {
if (getUsePool() && !isRedisClusterAware()) {
if (pool != null) {
try {
this.pool.close();
} catch (Exception ex) {
log.warn("Cannot properly close Jedis pool", ex);
}
this.pool = null;
}
}

if(this.clusterCommandExecutor != null) {
try {
this.clusterCommandExecutor.destroy();
} catch (Exception e) {
throw new RuntimeException(e);
}
}

this.cluster = createCluster();
this.topologyProvider = createTopologyProvider(this.cluster);
this.clusterCommandExecutor = new ClusterCommandExecutor(this.topologyProvider,
new JedisClusterConnection.JedisClusterNodeResourceProvider(this.cluster, this.topologyProvider),
EXCEPTION_TRANSLATION);
if (this.cluster != null) {

this.topologyProvider = null;

try {
cluster.close();
} catch (Exception ex) {
log.warn("Cannot properly close Jedis cluster", ex);
}
}
state.set(State.STOPPED);
}
}

@Override
public boolean isRunning() {
return State.STARTED.equals(state.get());
}

this.initialized = true;
@Override
public void afterPropertiesSet() {
clientConfig = createClientConfig(getDatabase(), getRedisUsername(), getRedisPassword());
}

JedisClientConfig createSentinelClientConfig(SentinelConfiguration sentinelConfiguration) {
Expand Down Expand Up @@ -415,32 +475,8 @@ protected JedisCluster createCluster(RedisClusterConfiguration clusterConfig,

public void destroy() {

if (getUsePool() && pool != null) {

try {
pool.destroy();
} catch (Exception ex) {
log.warn("Cannot properly close Jedis pool", ex);
}
pool = null;
}

if (cluster != null) {

try {
cluster.close();
} catch (Exception ex) {
log.warn("Cannot properly close Jedis cluster", ex);
}

try {
clusterCommandExecutor.destroy();
} catch (Exception ex) {
log.warn("Cannot properly close cluster command executor", ex);
}
}

this.destroyed = true;
stop();
state.set(State.DESTROYED);
}

public RedisConnection getConnection() {
Expand Down Expand Up @@ -866,8 +902,19 @@ private MutableJedisClientConfiguration getMutableConfiguration() {
}

private void assertInitialized() {
Assert.state(this.initialized, "JedisConnectionFactory was not initialized through afterPropertiesSet()");
Assert.state(!this.destroyed, "JedisConnectionFactory was destroyed and cannot be used anymore");

State current = state.get();

if (State.STARTED.equals(current)) {
return;
}

switch (current) {
case CREATED, STOPPED -> throw new IllegalStateException(String.format("JedisConnectionFactory has been %s. Use start() to initialize it", current));
case DESTROYED -> throw new IllegalStateException(
"JedisConnectionFactory was destroyed and cannot be used anymore");
default -> throw new IllegalStateException(String.format("JedisConnectionFactory is %s", current));
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,13 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.stream.Collectors;

import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.context.SmartLifecycle;
import org.springframework.dao.DataAccessException;
import org.springframework.dao.InvalidDataAccessApiUsageException;
import org.springframework.data.redis.ExceptionTranslationStrategy;
Expand Down Expand Up @@ -116,8 +118,8 @@
* @author Andrea Como
* @author Chris Bono
*/
public class LettuceConnectionFactory
implements InitializingBean, DisposableBean, RedisConnectionFactory, ReactiveRedisConnectionFactory {
public class LettuceConnectionFactory implements RedisConnectionFactory, ReactiveRedisConnectionFactory,
InitializingBean, DisposableBean, SmartLifecycle {

private static final ExceptionTranslationStrategy EXCEPTION_TRANSLATION = new PassThroughExceptionTranslationStrategy(
LettuceExceptionConverter.INSTANCE);
Expand All @@ -144,8 +146,11 @@ public class LettuceConnectionFactory

private @Nullable ClusterCommandExecutor clusterCommandExecutor;

private boolean initialized;
private boolean destroyed;
enum State {
CREATED, STARTING, STARTED, STOPPING, STOPPED, DESTROYED;
}

private AtomicReference<State> state = new AtomicReference<>(State.CREATED);

/**
* Constructs a new {@link LettuceConnectionFactory} instance with default settings.
Expand Down Expand Up @@ -333,33 +338,78 @@ public static RedisConfiguration createRedisConfiguration(RedisURI redisUri) {
return LettuceConverters.createRedisStandaloneConfiguration(redisUri);
}

public void afterPropertiesSet() {
@Override
public void start() {

State current = state.getAndUpdate(state -> {
if (State.CREATED.equals(state) || State.STOPPED.equals(state)) {
return State.STARTING;
}
return state;
});

this.client = createClient();
if (State.CREATED.equals(current) || State.STOPPED.equals(current)) {

this.connectionProvider = new ExceptionTranslatingConnectionProvider(createConnectionProvider(client, CODEC));
this.reactiveConnectionProvider = new ExceptionTranslatingConnectionProvider(
createConnectionProvider(client, LettuceReactiveRedisConnection.CODEC));
this.client = createClient();

if (isClusterAware()) {
this.connectionProvider = new ExceptionTranslatingConnectionProvider(createConnectionProvider(client, CODEC));
this.reactiveConnectionProvider = new ExceptionTranslatingConnectionProvider(
createConnectionProvider(client, LettuceReactiveRedisConnection.CODEC));

if (isClusterAware()) {

this.clusterCommandExecutor = new ClusterCommandExecutor(
new LettuceClusterTopologyProvider((RedisClusterClient) client),
new LettuceClusterConnection.LettuceClusterNodeResourceProvider(this.connectionProvider),
EXCEPTION_TRANSLATION);
}

state.set(State.STARTED);

this.clusterCommandExecutor = new ClusterCommandExecutor(
new LettuceClusterTopologyProvider((RedisClusterClient) client),
new LettuceClusterConnection.LettuceClusterNodeResourceProvider(this.connectionProvider),
EXCEPTION_TRANSLATION);
if (getEagerInitialization() && getShareNativeConnection()) {
initConnection();
}
}
}

@Override
public void stop() {

this.initialized = true;
if (state.compareAndSet(State.STARTED, State.STOPPING)) {
resetConnection();
dispose(connectionProvider);
dispose(reactiveConnectionProvider);
try {
Duration quietPeriod = clientConfiguration.getShutdownQuietPeriod();
Duration timeout = clientConfiguration.getShutdownTimeout();
client.shutdown(quietPeriod.toMillis(), timeout.toMillis(), TimeUnit.MILLISECONDS);
state.set(State.STOPPED);
} catch (Exception e) {

if (getEagerInitialization() && getShareNativeConnection()) {
initConnection();
if (log.isWarnEnabled()) {
log.warn((client != null ? ClassUtils.getShortName(client.getClass()) : "LettuceClient")
+ " did not shut down gracefully.", e);
}
}
state.set(State.STOPPED);
}
}

public void destroy() {
@Override
public boolean isRunning() {
return State.STARTED.equals(state.get());
}

resetConnection();
@Override
public void afterPropertiesSet() {
// customization hook. initialization happens in start
}

@Override
public void destroy() {

stop();
client = null;
if (clusterCommandExecutor != null) {

try {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That code could go into stop and upon destroy it would be neat to reuse what stop does.

Expand All @@ -368,23 +418,7 @@ public void destroy() {
log.warn("Cannot properly close cluster command executor", ex);
}
}

dispose(connectionProvider);
dispose(reactiveConnectionProvider);

try {
Duration quietPeriod = clientConfiguration.getShutdownQuietPeriod();
Duration timeout = clientConfiguration.getShutdownTimeout();
client.shutdown(quietPeriod.toMillis(), timeout.toMillis(), TimeUnit.MILLISECONDS);
} catch (Exception e) {

if (log.isWarnEnabled()) {
log.warn((client != null ? ClassUtils.getShortName(client.getClass()) : "LettuceClient")
+ " did not shut down gracefully.", e);
}
}

this.destroyed = true;
state.set(State.DESTROYED);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

}

private void dispose(LettuceConnectionProvider connectionProvider) {
Expand Down Expand Up @@ -532,8 +566,6 @@ public void initConnection() {
*/
public void resetConnection() {

assertInitialized();

Optionals.toStream(Optional.ofNullable(connection), Optional.ofNullable(reactiveConnection))
.forEach(SharedConnection::resetConnection);

Expand Down Expand Up @@ -1267,8 +1299,19 @@ private RedisClient createBasicClient() {
}

private void assertInitialized() {
Assert.state(this.initialized, "LettuceConnectionFactory was not initialized through afterPropertiesSet()");
Assert.state(!this.destroyed, "LettuceConnectionFactory was destroyed and cannot be used anymore");

State current = state.get();

if (State.STARTED.equals(current)) {
return;
}

switch (current) {
case CREATED, STOPPED -> throw new IllegalStateException(String.format("LettuceConnectionFactory has been %s. Use start() to initialize it", current));
case DESTROYED -> throw new IllegalStateException(
"LettuceConnectionFactory was destroyed and cannot be used anymore");
default -> throw new IllegalStateException(String.format("LettuceConnectionFactory is %s", current));
}
}

private static void applyToAll(RedisURI source, Consumer<RedisURI> action) {
Expand Down
Loading