Skip to content

Commit a6abb9d

Browse files
committed
feat(filesystem): allow to configure S3 client RetryPolicy with max.retries and back.off delay (#247)
Resolves: #247
1 parent ab74f57 commit a6abb9d

File tree

2 files changed

+81
-2
lines changed

2 files changed

+81
-2
lines changed

connect-file-pulse-filesystems/filepulse-amazons3-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/AmazonS3ClientConfig.java

Lines changed: 68 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import com.amazonaws.auth.AWSCredentialsProvider;
2222
import com.amazonaws.auth.EnvironmentVariableCredentialsProvider;
2323
import com.amazonaws.regions.Regions;
24+
import com.amazonaws.retry.PredefinedRetryPolicies;
2425
import io.streamthoughts.kafka.connect.filepulse.internal.StringUtils;
2526
import org.apache.kafka.common.config.AbstractConfig;
2627
import org.apache.kafka.common.config.ConfigDef;
@@ -68,6 +69,24 @@ public class AmazonS3ClientConfig extends AbstractConfig {
6869
private static final String AWS_CREDENTIALS_PROVIDER_DOC = "The AWSCredentialsProvider to use if no access key id and secret access key is configured";
6970
public static final String AWS_CREDENTIALS_PROVIDER_DEFAULT = EnvironmentVariableCredentialsProvider.class.getName();
7071

72+
public static final String AWS_S3_RETRY_BACKOFF_DELAY_MS_CONFIG = "aws.s3.backoff.delay.ms";
73+
public static final String AWS_S3_RETRY_BACKOFF_DELAY_MS_DOC = "The base back-off time (milliseconds) before retrying a request.";
74+
// Default values from AWS SDK (see: PredefinedBackoffStrategies)
75+
public static final int AWS_S3_RETRY_BACKOFF_DELAY_MS_DEFAULT = 100;
76+
public static final String AWS_S3_RETRY_BACKOFF_MAX_DELAY_MS_CONFIG = "aws.s3.backoff.max.delay.ms";
77+
public static final String AWS_S3_RETRY_BACKOFF_MAX_DELAY_MS_DOC = "The maximum back-off time (in milliseconds) before retrying a request.";
78+
// Default values from AWS SDK (see: PredefinedBackoffStrategies)
79+
public static final int AWS_S3_RETRY_BACKOFF_MAX_DELAY_MS_DEFAULT = 20_000;
80+
public static final String AWS_S3_RETRY_BACKOFF_MAX_RETRIES_CONFIG = "aws.s3.backoff.max.retries";
81+
public static final String AWS_S3_RETRY_BACKOFF_MAX_RETRIES_DOC = "The maximum number of retry attempts for failed retryable requests.";
82+
// Default values from AWS SDK (see: PredefinedBackoffStrategies)
83+
84+
// Maximum retry limit. Avoids integer overflow issues.
85+
// NOTE: If the value is greater than 30, there can be integer overflow issues during delay calculation.
86+
public static final int AWS_S3_RETRY_BACKOFF_MAX_RETRIES_MAX_VALUE = 30;
87+
88+
public static final int AWS_S3_RETRY_BACKOFF_MAX_RETRIES_DEFAULT = PredefinedRetryPolicies.DEFAULT_MAX_ERROR_RETRY;
89+
7190
/**
7291
* Creates a new {@link AmazonS3ClientConfig} instance.
7392
*
@@ -113,6 +132,18 @@ public AWSCredentialsProvider getAwsCredentialsProvider() {
113132
return getConfiguredInstance(AWS_CREDENTIALS_PROVIDER_CLASS, AWSCredentialsProvider.class);
114133
}
115134

135+
public int getAwsS3RetryBackoffDelayMs() {
136+
return getInt(AWS_S3_RETRY_BACKOFF_DELAY_MS_CONFIG);
137+
}
138+
139+
public int getAwsS3RetryBackoffMaxDelayMs() {
140+
return getInt(AWS_S3_RETRY_BACKOFF_MAX_DELAY_MS_CONFIG);
141+
}
142+
143+
public int getAwsS3RetryBackoffMaxRetries() {
144+
return getInt(AWS_S3_RETRY_BACKOFF_MAX_RETRIES_CONFIG);
145+
}
146+
116147
/**
117148
* @return the {@link ConfigDef}.
118149
*/
@@ -225,12 +256,48 @@ static ConfigDef getConf() {
225256
AWS_CREDENTIALS_PROVIDER_CLASS,
226257
ConfigDef.Type.CLASS,
227258
AWS_CREDENTIALS_PROVIDER_DEFAULT,
228-
ConfigDef.Importance.HIGH,
259+
ConfigDef.Importance.MEDIUM,
229260
AWS_CREDENTIALS_PROVIDER_DOC,
230261
GROUP_AWS,
231262
awsGroupCounter++,
232263
ConfigDef.Width.NONE,
233264
AWS_CREDENTIALS_PROVIDER_CLASS
265+
)
266+
.define(
267+
AWS_S3_RETRY_BACKOFF_DELAY_MS_CONFIG,
268+
ConfigDef.Type.INT,
269+
AWS_S3_RETRY_BACKOFF_DELAY_MS_DEFAULT,
270+
ConfigDef.Range.atLeast(1),
271+
ConfigDef.Importance.MEDIUM,
272+
AWS_S3_RETRY_BACKOFF_DELAY_MS_DOC,
273+
GROUP_AWS,
274+
awsGroupCounter++,
275+
ConfigDef.Width.NONE,
276+
AWS_S3_RETRY_BACKOFF_DELAY_MS_CONFIG
277+
)
278+
.define(
279+
AWS_S3_RETRY_BACKOFF_MAX_DELAY_MS_CONFIG,
280+
ConfigDef.Type.INT,
281+
AWS_S3_RETRY_BACKOFF_MAX_DELAY_MS_DEFAULT,
282+
ConfigDef.Range.atLeast(1),
283+
ConfigDef.Importance.MEDIUM,
284+
AWS_S3_RETRY_BACKOFF_MAX_DELAY_MS_DOC,
285+
GROUP_AWS,
286+
awsGroupCounter++,
287+
ConfigDef.Width.NONE,
288+
AWS_S3_RETRY_BACKOFF_MAX_DELAY_MS_CONFIG
289+
)
290+
.define(
291+
AWS_S3_RETRY_BACKOFF_MAX_RETRIES_CONFIG,
292+
ConfigDef.Type.INT,
293+
AWS_S3_RETRY_BACKOFF_MAX_RETRIES_DEFAULT,
294+
ConfigDef.Range.between(1, AWS_S3_RETRY_BACKOFF_MAX_RETRIES_MAX_VALUE),
295+
ConfigDef.Importance.MEDIUM,
296+
AWS_S3_RETRY_BACKOFF_MAX_RETRIES_DOC,
297+
GROUP_AWS,
298+
awsGroupCounter++,
299+
ConfigDef.Width.NONE,
300+
AWS_S3_RETRY_BACKOFF_MAX_RETRIES_CONFIG
234301
);
235302
}
236303

connect-file-pulse-filesystems/filepulse-amazons3-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/AmazonS3ClientUtils.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,9 @@
2626
import com.amazonaws.auth.BasicAWSCredentials;
2727
import com.amazonaws.auth.BasicSessionCredentials;
2828
import com.amazonaws.client.builder.AwsClientBuilder;
29+
import com.amazonaws.retry.PredefinedBackoffStrategies;
30+
import com.amazonaws.retry.PredefinedRetryPolicies;
31+
import com.amazonaws.retry.RetryPolicy;
2932
import com.amazonaws.services.s3.AmazonS3;
3033
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
3134
import io.streamthoughts.kafka.connect.filepulse.annotation.VisibleForTesting;
@@ -63,7 +66,16 @@ public static AmazonS3 createS3Client(final AmazonS3ClientConfig config) {
6366
*/
6467
public static AmazonS3 createS3Client(final AmazonS3ClientConfig config,
6568
final String url) {
66-
final ClientConfiguration clientConfiguration = PredefinedClientConfigurations.defaultConfig();
69+
final ClientConfiguration clientConfiguration = PredefinedClientConfigurations.defaultConfig()
70+
.withRetryPolicy(new RetryPolicy(
71+
PredefinedRetryPolicies.DEFAULT_RETRY_CONDITION,
72+
new PredefinedBackoffStrategies.FullJitterBackoffStrategy(
73+
config.getAwsS3RetryBackoffDelayMs(),
74+
config.getAwsS3RetryBackoffMaxDelayMs()
75+
),
76+
config.getAwsS3RetryBackoffMaxRetries(),
77+
false)
78+
);
6779

6880
AmazonS3ClientBuilder builder = AmazonS3ClientBuilder.standard()
6981
.withPathStyleAccessEnabled(config.isAwsS3PathStyleAccessEnabled())

0 commit comments

Comments
 (0)