-
Notifications
You must be signed in to change notification settings - Fork 1.2k
Refactor Jedis and Lettuce RedisConnectionFactory
to SmartLifecycle
beans
#2627
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Closed
Closed
Changes from all commits
Commits
Show all changes
4 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -26,11 +26,13 @@ | |
import java.util.concurrent.CompletableFuture; | ||
import java.util.concurrent.CompletionStage; | ||
import java.util.concurrent.TimeUnit; | ||
import java.util.concurrent.atomic.AtomicReference; | ||
import java.util.function.Consumer; | ||
import java.util.stream.Collectors; | ||
|
||
import org.springframework.beans.factory.DisposableBean; | ||
import org.springframework.beans.factory.InitializingBean; | ||
import org.springframework.context.SmartLifecycle; | ||
import org.springframework.dao.DataAccessException; | ||
import org.springframework.dao.InvalidDataAccessApiUsageException; | ||
import org.springframework.data.redis.ExceptionTranslationStrategy; | ||
|
@@ -116,8 +118,8 @@ | |
* @author Andrea Como | ||
* @author Chris Bono | ||
*/ | ||
public class LettuceConnectionFactory | ||
implements InitializingBean, DisposableBean, RedisConnectionFactory, ReactiveRedisConnectionFactory { | ||
public class LettuceConnectionFactory implements RedisConnectionFactory, ReactiveRedisConnectionFactory, | ||
InitializingBean, DisposableBean, SmartLifecycle { | ||
|
||
private static final ExceptionTranslationStrategy EXCEPTION_TRANSLATION = new PassThroughExceptionTranslationStrategy( | ||
LettuceExceptionConverter.INSTANCE); | ||
|
@@ -144,8 +146,11 @@ public class LettuceConnectionFactory | |
|
||
private @Nullable ClusterCommandExecutor clusterCommandExecutor; | ||
|
||
private boolean initialized; | ||
private boolean destroyed; | ||
enum State { | ||
CREATED, STARTING, STARTED, STOPPING, STOPPED, DESTROYED; | ||
} | ||
|
||
private AtomicReference<State> state = new AtomicReference<>(State.CREATED); | ||
|
||
/** | ||
* Constructs a new {@link LettuceConnectionFactory} instance with default settings. | ||
|
@@ -333,33 +338,78 @@ public static RedisConfiguration createRedisConfiguration(RedisURI redisUri) { | |
return LettuceConverters.createRedisStandaloneConfiguration(redisUri); | ||
} | ||
|
||
public void afterPropertiesSet() { | ||
@Override | ||
public void start() { | ||
|
||
State current = state.getAndUpdate(state -> { | ||
if (State.CREATED.equals(state) || State.STOPPED.equals(state)) { | ||
return State.STARTING; | ||
} | ||
return state; | ||
}); | ||
|
||
this.client = createClient(); | ||
if (State.CREATED.equals(current) || State.STOPPED.equals(current)) { | ||
|
||
this.connectionProvider = new ExceptionTranslatingConnectionProvider(createConnectionProvider(client, CODEC)); | ||
this.reactiveConnectionProvider = new ExceptionTranslatingConnectionProvider( | ||
createConnectionProvider(client, LettuceReactiveRedisConnection.CODEC)); | ||
this.client = createClient(); | ||
|
||
if (isClusterAware()) { | ||
this.connectionProvider = new ExceptionTranslatingConnectionProvider(createConnectionProvider(client, CODEC)); | ||
this.reactiveConnectionProvider = new ExceptionTranslatingConnectionProvider( | ||
createConnectionProvider(client, LettuceReactiveRedisConnection.CODEC)); | ||
|
||
if (isClusterAware()) { | ||
|
||
this.clusterCommandExecutor = new ClusterCommandExecutor( | ||
new LettuceClusterTopologyProvider((RedisClusterClient) client), | ||
new LettuceClusterConnection.LettuceClusterNodeResourceProvider(this.connectionProvider), | ||
EXCEPTION_TRANSLATION); | ||
} | ||
|
||
state.set(State.STARTED); | ||
|
||
this.clusterCommandExecutor = new ClusterCommandExecutor( | ||
new LettuceClusterTopologyProvider((RedisClusterClient) client), | ||
new LettuceClusterConnection.LettuceClusterNodeResourceProvider(this.connectionProvider), | ||
EXCEPTION_TRANSLATION); | ||
if (getEagerInitialization() && getShareNativeConnection()) { | ||
initConnection(); | ||
} | ||
} | ||
} | ||
|
||
@Override | ||
public void stop() { | ||
|
||
this.initialized = true; | ||
if (state.compareAndSet(State.STARTED, State.STOPPING)) { | ||
resetConnection(); | ||
dispose(connectionProvider); | ||
dispose(reactiveConnectionProvider); | ||
try { | ||
Duration quietPeriod = clientConfiguration.getShutdownQuietPeriod(); | ||
Duration timeout = clientConfiguration.getShutdownTimeout(); | ||
client.shutdown(quietPeriod.toMillis(), timeout.toMillis(), TimeUnit.MILLISECONDS); | ||
state.set(State.STOPPED); | ||
} catch (Exception e) { | ||
|
||
if (getEagerInitialization() && getShareNativeConnection()) { | ||
initConnection(); | ||
if (log.isWarnEnabled()) { | ||
log.warn((client != null ? ClassUtils.getShortName(client.getClass()) : "LettuceClient") | ||
+ " did not shut down gracefully.", e); | ||
} | ||
} | ||
state.set(State.STOPPED); | ||
} | ||
} | ||
|
||
public void destroy() { | ||
@Override | ||
public boolean isRunning() { | ||
return State.STARTED.equals(state.get()); | ||
} | ||
|
||
resetConnection(); | ||
@Override | ||
public void afterPropertiesSet() { | ||
// customization hook. initialization happens in start | ||
} | ||
|
||
@Override | ||
public void destroy() { | ||
|
||
stop(); | ||
client = null; | ||
if (clusterCommandExecutor != null) { | ||
|
||
try { | ||
|
@@ -368,23 +418,7 @@ public void destroy() { | |
log.warn("Cannot properly close cluster command executor", ex); | ||
} | ||
} | ||
|
||
dispose(connectionProvider); | ||
dispose(reactiveConnectionProvider); | ||
|
||
try { | ||
Duration quietPeriod = clientConfiguration.getShutdownQuietPeriod(); | ||
Duration timeout = clientConfiguration.getShutdownTimeout(); | ||
client.shutdown(quietPeriod.toMillis(), timeout.toMillis(), TimeUnit.MILLISECONDS); | ||
} catch (Exception e) { | ||
|
||
if (log.isWarnEnabled()) { | ||
log.warn((client != null ? ClassUtils.getShortName(client.getClass()) : "LettuceClient") | ||
+ " did not shut down gracefully.", e); | ||
} | ||
} | ||
|
||
this.destroyed = true; | ||
state.set(State.DESTROYED); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 👍 |
||
} | ||
|
||
private void dispose(LettuceConnectionProvider connectionProvider) { | ||
|
@@ -532,8 +566,6 @@ public void initConnection() { | |
*/ | ||
public void resetConnection() { | ||
|
||
assertInitialized(); | ||
|
||
Optionals.toStream(Optional.ofNullable(connection), Optional.ofNullable(reactiveConnection)) | ||
.forEach(SharedConnection::resetConnection); | ||
|
||
|
@@ -1267,8 +1299,19 @@ private RedisClient createBasicClient() { | |
} | ||
|
||
private void assertInitialized() { | ||
Assert.state(this.initialized, "LettuceConnectionFactory was not initialized through afterPropertiesSet()"); | ||
Assert.state(!this.destroyed, "LettuceConnectionFactory was destroyed and cannot be used anymore"); | ||
|
||
State current = state.get(); | ||
|
||
if (State.STARTED.equals(current)) { | ||
return; | ||
} | ||
|
||
switch (current) { | ||
case CREATED, STOPPED -> throw new IllegalStateException(String.format("LettuceConnectionFactory has been %s. Use start() to initialize it", current)); | ||
case DESTROYED -> throw new IllegalStateException( | ||
"LettuceConnectionFactory was destroyed and cannot be used anymore"); | ||
default -> throw new IllegalStateException(String.format("LettuceConnectionFactory is %s", current)); | ||
} | ||
} | ||
|
||
private static void applyToAll(RedisURI source, Consumer<RedisURI> action) { | ||
|
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That code could go into
stop
and upon destroy it would be neat to reuse whatstop
does.