Skip to content

Health check #617

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 12 commits into from
Jun 25, 2021
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ dependencies {
errorproneJavac('com.google.errorprone:javac:9+181-r4173-1')
errorprone('com.google.errorprone:error_prone_core:2.3.4')

compile group: 'com.uber.tchannel', name: 'tchannel-core', version: '0.8.5'
compile group: 'com.uber.tchannel', name: 'tchannel-core', version: '0.8.30'
compile group: 'org.slf4j', name: 'slf4j-api', version: '1.7.25'
compile group: 'org.apache.thrift', name: 'libthrift', version: '0.9.3'
compile group: 'com.google.code.gson', name: 'gson', version: '2.8.6'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,7 @@
import java.lang.reflect.Type;
import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.*;
import java.util.function.Consumer;
import java.util.function.Function;

Expand All @@ -59,6 +56,7 @@ public class SyncWorkflowWorker
private final ScheduledExecutorService ldaHeartbeatExecutor = Executors.newScheduledThreadPool(4);
private SuspendableWorker ldaWorker;
private POJOActivityTaskHandler ldaTaskHandler;
private final IWorkflowService service;

public SyncWorkflowWorker(
IWorkflowService service,
Expand All @@ -74,6 +72,7 @@ public SyncWorkflowWorker(
ThreadPoolExecutor workflowThreadPool) {
Objects.requireNonNull(workflowThreadPool);
this.dataConverter = workflowOptions.getDataConverter();
this.service = service;

factory =
new POJOWorkflowImplementationFactory(
Expand Down Expand Up @@ -252,4 +251,8 @@ public <R> R queryWorkflowExecution(
public void accept(PollForDecisionTaskResponse pollForDecisionTaskResponse) {
workflowWorker.accept(pollForDecisionTaskResponse);
}

public CompletableFuture<Boolean> isHealthy() {
return service.isHealthy();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -317,6 +318,11 @@ private class WorkflowServiceWrapper implements IWorkflowService {

private final IWorkflowService impl;

@Override
public CompletableFuture<Boolean> isHealthy() {
return impl.isHealthy();
}

private WorkflowServiceWrapper(IWorkflowService impl) {
if (impl == null) {
// Create empty implementation that just ignores all requests.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -484,6 +484,11 @@ public void GetWorkflowExecutionHistoryWithTimeout(
impl.GetWorkflowExecutionHistoryWithTimeout(getRequest, resultHandler, timeoutInMillis);
}

@Override
public CompletableFuture<Boolean> isHealthy() {
return impl.isHealthy();
}

@Override
public void PollForDecisionTask(
PollForDecisionTaskRequest pollRequest, AsyncMethodCallback resultHandler)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -845,6 +845,13 @@ public void GetWorkflowExecutionHistoryWithTimeout(
GetWorkflowExecutionHistory(getRequest, resultHandler);
}

@Override
public CompletableFuture<Boolean> isHealthy() {
CompletableFuture<Boolean> rval = new CompletableFuture<>();
rval.complete(Boolean.TRUE);
return rval;
}

@Override
public void PollForDecisionTask(
PollForDecisionTaskRequest pollRequest, AsyncMethodCallback resultHandler) throws TException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.uber.cadence.StartWorkflowExecutionRequest;
import com.uber.cadence.WorkflowService.AsyncIface;
import com.uber.cadence.WorkflowService.Iface;
import java.util.concurrent.CompletableFuture;
import org.apache.thrift.TException;
import org.apache.thrift.async.AsyncMethodCallback;

Expand Down Expand Up @@ -70,6 +71,7 @@ void GetWorkflowExecutionHistoryWithTimeout(
AsyncMethodCallback resultHandler,
Long timeoutInMillis)
throws TException;

/**
* SignalWorkflowExecutionWithTimeout signal workflow same as SignalWorkflowExecution but with
* timeout
Expand All @@ -84,4 +86,10 @@ void SignalWorkflowExecutionWithTimeout(
AsyncMethodCallback resultHandler,
Long timeoutInMillis)
throws TException;

/**
* Checks if we have a valid connection to the Cadence cluster, and potentially resets the peer
* list
*/
CompletableFuture<Boolean> isHealthy();
}
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@
import com.uber.tchannel.errors.ErrorType;
import com.uber.tchannel.messages.ThriftRequest;
import com.uber.tchannel.messages.ThriftResponse;
import com.uber.tchannel.messages.generated.Meta;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
Expand All @@ -127,7 +128,7 @@ public class WorkflowServiceTChannel implements IWorkflowService {
private final ClientOptions options;
private final Map<String, String> thriftHeaders;
private final TChannel tChannel;
private final SubChannel subChannel;
private SubChannel subChannel;

/**
* Creates Cadence client that connects to the specified host and port using specified options.
Expand Down Expand Up @@ -159,6 +160,13 @@ public WorkflowServiceTChannel(ClientOptions options) {
+ Version.FEATURE_VERSION);
}

public void resetSubchannelPeers() throws UnknownHostException {
InetAddress address = InetAddress.getByName(options.getHost());
ArrayList<InetSocketAddress> peers = new ArrayList<>();
peers.add(new InetSocketAddress(address, options.getPort()));
this.subChannel.setPeers(peers);
}

/**
* Creates Cadence client with specified sub channel and options.
*
Expand Down Expand Up @@ -207,6 +215,49 @@ private <T> ThriftRequest<T> buildThriftRequest(String apiName, T body) {
return buildThriftRequest(apiName, body, null);
}

/**
* Checks if we have a valid connection to the Cadence cluster, and potentially resets the peer
* list
*/
@Override
public CompletableFuture<Boolean> isHealthy() {
final ThriftRequest<Meta.health_args> req =
new ThriftRequest.Builder<Meta.health_args>(options.getServiceName(), "Meta::health")
.setBody(new Meta.health_args())
.build();
final CompletableFuture<Boolean> result = new CompletableFuture<>();
try {

final TFuture<ThriftResponse<Meta.health_result>> future = this.subChannel.send(req);
future.addCallback(
response -> {
req.releaseQuietly();
if (response.isError()) {
try {
this.resetSubchannelPeers();
} catch (final Exception inner_e) {
}
result.completeExceptionally(new TException("Rpc error:" + response.getError()));
} else {
result.complete(response.getBody(Meta.health_result.class).getSuccess().isOk());
}
try {
response.release();
} catch (final Exception e) {
// ignore
}
});
} catch (final TChannelError e) {
req.releaseQuietly();
try {
this.resetSubchannelPeers();
} catch (final Exception inner_e) {
}
result.complete(Boolean.FALSE);
}
return result;
}

private <T> ThriftRequest<T> buildThriftRequest(String apiName, T body, Long rpcTimeoutOverride) {
String endpoint = getEndpoint(INTERFACE_NAME, apiName);
ThriftRequest.Builder<T> builder =
Expand Down
9 changes: 9 additions & 0 deletions src/main/java/com/uber/cadence/worker/Worker.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import java.time.Duration;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -326,4 +327,12 @@ public void resumePolling() {
public boolean isSuspended() {
return workflowWorker.isSuspended() && activityWorker.isSuspended();
}

/**
* Checks if we have a valid connection to the Cadence cluster, and potentially resets the peer
* list
*/
public CompletableFuture<Boolean> isHealthy() {
return workflowWorker.isHealthy();
}
}
16 changes: 16 additions & 0 deletions src/main/java/com/uber/cadence/worker/WorkerFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,12 @@
import java.util.List;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -287,6 +289,20 @@ public synchronized void shutdownNow() {
}
}

/**
* Checks if we have a valid connection to the Cadence cluster, and potentially resets the peer
* list
*/
public CompletableFuture<Boolean> isHealthy() {
List<CompletableFuture<Boolean>> healthyList =
workers.stream().map(Worker::isHealthy).collect(Collectors.toList());
CompletableFuture<Boolean> result = CompletableFuture.supplyAsync(() -> true);
for (CompletableFuture<Boolean> future : healthyList) {
result = result.thenCombine(future, (current, other) -> current && other);
}
return result;
}

/**
* Blocks until all tasks have completed execution after a shutdown request, or the timeout
* occurs, or the current thread is interrupted, whichever happens first.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,7 @@

package com.uber.cadence.workerFactory;

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.*;

import com.uber.cadence.client.WorkflowClient;
import com.uber.cadence.serviceclient.ClientOptions;
Expand Down Expand Up @@ -65,6 +64,11 @@ public void whenAFactoryIsStartedAllWorkersStart() {

factory.start();
assertTrue(factory.isStarted());
try {
assertTrue(factory.isHealthy().get());
} catch (Exception e) {
assertNull("Failed to check if cluster is health!", e);
}
factory.shutdown();
factory.awaitTermination(1, TimeUnit.SECONDS);
}
Expand Down
2 changes: 1 addition & 1 deletion src/test/resources/logback-test.xml
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,4 @@
<root level="INFO">
<appender-ref ref="STDOUT" />
</root>
</configuration>
</configuration>