Skip to content

fix(plugin): fix NPE in DefaultFileSystemMonitor (#465) #487

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jun 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package io.streamthoughts.kafka.connect.filepulse.storage;

import java.util.Collections;
import java.util.Map;
import java.util.Objects;

Expand All @@ -26,6 +27,10 @@
*/
public class StateSnapshot<T> {

/**
 * Creates a snapshot that holds no state at all.
 *
 * @param <T> the type of the state values.
 * @return a new {@link StateSnapshot} created with the offset {@code -1} and an empty map of states.
 */
public static <T> StateSnapshot<T> empty() {
    // -1 is the offset used for a snapshot that was never populated from the store.
    final long noOffset = -1;
    final Map<String, T> noStates = Collections.emptyMap();
    return new StateSnapshot<>(noOffset, noStates);
}

private final long offset;

private final Map<String, T> states;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import io.streamthoughts.kafka.connect.filepulse.clean.FileCleanupPolicy;
import io.streamthoughts.kafka.connect.filepulse.source.FileObjectStatus;
import java.time.Duration;
import java.util.Arrays;
import java.util.Map;
import java.util.function.Predicate;
Expand All @@ -39,21 +40,35 @@ public class SourceConnectorConfig extends CommonSourceConfig {
private static final String FS_CLEANUP_POLICY_EXECUTE_DOC = "Specify the status when a file get cleanup. Valid values are: " + Arrays.toString(FS_CLEANUP_POLICY_EXECUTE_VALID_VALUES);

/* Settings for FileSystemMonitorThread */
public static final String FS_LISTING_INTERVAL_MS_CONFIG = "fs.listing.interval.ms";
private static final String FS_LISTING_INTERVAL_MS_DOC = "The time interval, in milliseconds, in which the connector invokes the scan of the filesystem.";
private static final long FS_LISTING_INTERVAL_MS_DEFAULT = 10000L;
public static final String FS_LISTING_INTERVAL_MS_CONFIG = "fs.listing.interval.ms";
private static final String FS_LISTING_INTERVAL_MS_DOC = "The time interval, in milliseconds, in which the connector invokes the scan of the filesystem.";
private static final long FS_LISTING_INTERVAL_MS_DEFAULT = 10000L;

/* Settings for DefaultFileSystemMonitor */
public static final String STATE_INITIAL_READ_TIMEOUT_MS_CONFIG = "state.initial.read.timeout.ms";
public static final String STATE_INITIAL_READ_TIMEOUT_MS_DOC = "The maximum amount of time in milliseconds " +
"the filesystem monitor thread waits to read all the file processing states before timing out. " +
"This property is used only on connector startup.";
public static final long STATE_INITIAL_READ_TIMEOUT_MS_DEFAULT = 300000L;

public static final String STATE_DEFAULT_READ_TIMEOUT_MS_CONFIG = "state.default.read.timeout.ms";
public static final String STATE_DEFAULT_READ_TIMEOUT_MS_DOC = "The maximum amount of time in milliseconds " +
"the filesystem monitor thread waits to read all the file processing states before timing out.";
public static final long STATE_DEFAULT_READ_TIMEOUT_MS_DEFAULT = 5000L;

/* Settings for FilePulseSourceConnector */
public static final String MAX_SCHEDULED_FILES_CONFIG = "max.scheduled.files";
private static final String MAX_SCHEDULED_FILES_DOC = "Maximum number of files that can be schedules to tasks.";
private static final int MAX_SCHEDULED_FILES_DEFAULT = 1000;
public static final String MAX_SCHEDULED_FILES_CONFIG = "max.scheduled.files";
private static final String MAX_SCHEDULED_FILES_DOC = "Maximum number of files that can be schedules to tasks.";
private static final int MAX_SCHEDULED_FILES_DEFAULT = 1000;

public static final String FS_LISTING_TASK_DELEGATION_ENABLED_CONFIG = "fs.listing.task.delegation.enabled";
private static final String FS_LISTING_TASK_DELEGATION_ENABLED_DOC = "Boolean indicating whether the file listing process should be delegated to tasks.";


/**
* Creates a new {@link SourceConnectorConfig} instance.
* @param originals the originals configuration.
*
* @param originals the original configuration.
*/
public SourceConnectorConfig(final Map<?, ?> originals) {
super(getConf(), originals);
Expand Down Expand Up @@ -106,6 +121,20 @@ public static ConfigDef getConf() {
ConfigDef.ValidString.in(FS_CLEANUP_POLICY_EXECUTE_VALID_VALUES),
ConfigDef.Importance.MEDIUM,
FS_CLEANUP_POLICY_EXECUTE_DOC
)
.define(
STATE_INITIAL_READ_TIMEOUT_MS_CONFIG,
ConfigDef.Type.LONG,
STATE_INITIAL_READ_TIMEOUT_MS_DEFAULT,
ConfigDef.Importance.MEDIUM,
STATE_INITIAL_READ_TIMEOUT_MS_DOC
)
.define(
STATE_DEFAULT_READ_TIMEOUT_MS_CONFIG,
ConfigDef.Type.LONG,
STATE_DEFAULT_READ_TIMEOUT_MS_DEFAULT,
ConfigDef.Importance.MEDIUM,
STATE_DEFAULT_READ_TIMEOUT_MS_DOC
);
}

Expand Down Expand Up @@ -136,4 +165,12 @@ public long getListingInterval() {
/**
 * Reads the boolean configured under {@link #FS_LISTING_TASK_DELEGATION_ENABLED_CONFIG}.
 *
 * @return {@code true} if the file listing process should be delegated to tasks.
 */
public boolean isFileListingTaskDelegationEnabled() {
    final boolean delegatedToTasks = getBoolean(FS_LISTING_TASK_DELEGATION_ENABLED_CONFIG);
    return delegatedToTasks;
}

/**
 * Reads the value configured under {@link #STATE_DEFAULT_READ_TIMEOUT_MS_CONFIG}.
 *
 * @return the configured number of milliseconds, converted into a {@link Duration}.
 */
public Duration getStateDefaultReadTimeoutMs() {
    final long timeoutMs = getLong(STATE_DEFAULT_READ_TIMEOUT_MS_CONFIG);
    return Duration.ofMillis(timeoutMs);
}

/**
 * Reads the value configured under {@link #STATE_INITIAL_READ_TIMEOUT_MS_CONFIG}.
 *
 * @return the configured number of milliseconds, converted into a {@link Duration}.
 */
public Duration getStateInitialReadTimeoutMs() {
    final long timeoutMs = getLong(STATE_INITIAL_READ_TIMEOUT_MS_CONFIG);
    return Duration.ofMillis(timeoutMs);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,6 @@ public class DefaultFileSystemMonitor implements FileSystemMonitor {

private static final Logger LOG = LoggerFactory.getLogger(DefaultFileSystemMonitor.class);

private static final Duration ON_START_READ_END_LOG_TIMEOUT = Duration.ofSeconds(30);
private static final Duration DEFAULT_READ_END_LOG_TIMEOUT = Duration.ofSeconds(5);
private static final int MAX_SCHEDULE_ATTEMPTS = 3;

private final FileSystemListing<?> fsListing;
Expand Down Expand Up @@ -99,12 +97,16 @@ public class DefaultFileSystemMonitor implements FileSystemMonitor {

private final TaskFileOrder taskFileOrder;

private Duration stateInitialReadTimeout = Duration.ofMinutes(5);

private Duration stateDefaultReadTimeout = Duration.ofSeconds(5);

/**
* Creates a new {@link DefaultFileSystemMonitor} instance.
*
* @param allowTasksReconfigurationAfterTimeoutMs {@code true} to allow tasks reconfiguration after a timeout.
* @param fsListening the {@link FileSystemListing} to be used for listing object files.
* @param cleanPolicy the {@link GenericFileCleanupPolicy} to be used for cleaning object files.
* @param cleanPolicy the {@link GenericFileCleanupPolicy} to be used for cleaning object files.
* @param offsetPolicy the {@link SourceOffsetPolicy} to be used for computing offsets for object files.
* @param store the {@link StateBackingStore} used for storing object file cursor.
*/
Expand Down Expand Up @@ -162,9 +164,9 @@ public void onStateUpdate(final String key, final FileObject object) {
final FileObjectMeta removed = scheduled.remove(objectId);
if (removed == null && status.isOneOf(FileObjectStatus.CLEANED)) {
LOG.debug(
"Received cleaned status but no object-file currently scheduled for: '{}'. " +
"This warn should only occurred during recovering step",
key
"Received cleaned status but no object-file currently scheduled for: '{}'. " +
"This warn should only occurred during recovering step",
key
);
}
}
Expand All @@ -177,23 +179,29 @@ public void onStateUpdate(final String key, final FileObject object) {
"with tasks processing files is already started. You can ignore that warning if the connector " +
" is recovering from a crash or resuming after being paused.");
}
readStatesToEnd(ON_START_READ_END_LOG_TIMEOUT);
recoverPreviouslyCompletedSources();
// Trigger a cleanup during initialization to ensure that all cleanable
// object-files are eventually removed before scheduling any tasks.
cleanUpCompletedFiles();

if (readStatesToEnd(stateInitialReadTimeout)) {
recoverPreviouslyCompletedSources();
// Trigger a cleanup during initialization to ensure that all cleanable
// object-files are eventually removed before scheduling any tasks.
cleanUpCompletedFiles();
} else {
LOG.warn("Cannot recover completed files from previous execution. State is empty.");
}
LOG.info("Initialized FileSystemMonitor");
}

private void recoverPreviouslyCompletedSources() {
LOG.info("Recovering completed files from a previous execution");
fileState.states()
.entrySet()
.stream()
.map(it -> it.getValue().withKey(FileObjectKey.of(it.getKey())))
.filter(it -> cleanablePredicate.test(it.status()))
.forEach(cleanable::add);
LOG.info("Finished recovering previously completed files : {}", cleanable);
if (fileState != null && !fileState.states().isEmpty()) {
LOG.info("Recovering completed files from a previous execution");
fileState.states()
.entrySet()
.stream()
.map(it -> it.getValue().withKey(FileObjectKey.of(it.getKey())))
.filter(it -> cleanablePredicate.test(it.status()))
.forEach(cleanable::add);
LOG.info("Finished recovering completed files from previous execution: {}", cleanable);
}
}

private boolean readStatesToEnd(final Duration timeout) {
Expand All @@ -202,14 +210,23 @@ private boolean readStatesToEnd(final Duration timeout) {
fileState = store.snapshot();
LOG.debug(
"Finished reading to end of log and updated states snapshot, new states log position: {}",
fileState.offset());
fileState.offset()
);
return true;
} catch (TimeoutException e) {
LOG.warn("Failed to reach end of states log quickly enough", e);
return false;
}
}

/**
 * Sets the timeout passed to the states read performed when the monitor is initialized.
 *
 * @param stateInitialReadTimeout the timeout to be used; must not be {@code null}.
 * @throws NullPointerException if {@code stateInitialReadTimeout} is {@code null}.
 */
public void setStateInitialReadTimeout(final Duration stateInitialReadTimeout) {
    // The field is initialized with a non-null default; reject null here so that
    // a bad argument fails at the call site instead of replacing that default.
    this.stateInitialReadTimeout = java.util.Objects.requireNonNull(
        stateInitialReadTimeout,
        "stateInitialReadTimeout should not be null"
    );
}

/**
 * Sets the timeout passed to the states reads performed after the monitor is initialized.
 *
 * @param stateDefaultReadTimeout the timeout to be used; must not be {@code null}.
 * @throws NullPointerException if {@code stateDefaultReadTimeout} is {@code null}.
 */
public void setStateDefaultReadTimeout(final Duration stateDefaultReadTimeout) {
    // The field is initialized with a non-null default; reject null here so that
    // a bad argument fails at the call site instead of replacing that default.
    this.stateDefaultReadTimeout = java.util.Objects.requireNonNull(
        stateDefaultReadTimeout,
        "stateDefaultReadTimeout should not be null"
    );
}

/**
* {@inheritDoc}
*/
Expand Down Expand Up @@ -267,13 +284,13 @@ private synchronized boolean updateFiles() {
final boolean noScheduledFiles = scheduled.isEmpty();
if (!noScheduledFiles && allowTasksReconfigurationAfterTimeoutMs == Long.MAX_VALUE) {
LOG.info(
"Scheduled files still being processed: {}. Skip filesystem listing while waiting for tasks completion",
scheduled.size()
"Scheduled files still being processed: {}. Skip filesystem listing while waiting for tasks completion",
scheduled.size()
);
return false;
}

boolean toEnd = readStatesToEnd(DEFAULT_READ_END_LOG_TIMEOUT);
boolean toEnd = readStatesToEnd(stateDefaultReadTimeout);
if (noScheduledFiles && !toEnd) {
LOG.warn("Failed to read state changelog. Skip filesystem listing due to timeout");
return false;
Expand Down Expand Up @@ -315,7 +332,7 @@ private synchronized boolean updateFiles() {
if (timeout > 0) {
LOG.info(
"Scheduled files still being processed ({}) but new files detected. " +
"Waiting for {} ms before allowing task reconfiguration",
"Waiting for {} ms before allowing task reconfiguration",
scheduled.size(),
timeout
);
Expand Down Expand Up @@ -372,13 +389,13 @@ public List<FileObjectMeta> listFilesToSchedule(final int maxFilesToSchedule) {
do {
changed.set(false);
LOG.info(
"Preparing next scheduling using the object files found during last iteration (attempt={}/{}).",
attempts + 1,
MAX_SCHEDULE_ATTEMPTS
"Preparing next scheduling using the object files found during last iteration (attempt={}/{}).",
attempts + 1,
MAX_SCHEDULE_ATTEMPTS
);
// Try to read the states to the end to make sure we do not attempt
// to schedule an object file that has already been cleaned up.
final boolean toEnd = readStatesToEnd(DEFAULT_READ_END_LOG_TIMEOUT);
final boolean toEnd = readStatesToEnd(stateDefaultReadTimeout);
if (!toEnd) {
LOG.warn("Failed to read state changelog while scheduling object files. Timeout.");
}
Expand All @@ -400,8 +417,8 @@ public List<FileObjectMeta> listFilesToSchedule(final int maxFilesToSchedule) {
if (changed.get()) {
if (attempts == MAX_SCHEDULE_ATTEMPTS) {
LOG.warn(
"Failed to prepare the object files after attempts: {}.",
MAX_SCHEDULE_ATTEMPTS
"Failed to prepare the object files after attempts: {}.",
MAX_SCHEDULE_ATTEMPTS
);
// Make sure to clear the schedule list before returning.
scheduled.clear();
Expand All @@ -415,8 +432,8 @@ public List<FileObjectMeta> listFilesToSchedule(final int maxFilesToSchedule) {

if (partitions.isEmpty()) {
LOG.warn(
"Filesystem could not be scanned quickly enough, " +
"or no object file was detected after starting the connector."
"Filesystem could not be scanned quickly enough, " +
"or no object file was detected after starting the connector."
);
}
return taskFileOrder.sort(partitions);
Expand All @@ -434,7 +451,7 @@ public void close() {
if (running.compareAndSet(true, false)) {
try {
LOG.info("Closing FileSystemMonitor resources");
readStatesToEnd(DEFAULT_READ_END_LOG_TIMEOUT);
readStatesToEnd(stateDefaultReadTimeout);
cleanUpCompletedFiles();
LOG.info("Closed FileSystemMonitor resources");
} catch (final Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import io.streamthoughts.kafka.connect.filepulse.fs.FileSystemListing;
import io.streamthoughts.kafka.connect.filepulse.fs.FileSystemMonitor;
import io.streamthoughts.kafka.connect.filepulse.state.StateBackingStoreAccess;
import io.streamthoughts.kafka.connect.filepulse.storage.StateBackingStore;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -67,7 +68,7 @@ public class FilePulseSourceConnector extends SourceConnector {

private SourceConnectorConfig connectorConfig;

private FileSystemMonitor monitor;
private FileSystemMonitor fsMonitor;

private String connectorGroupName;

Expand Down Expand Up @@ -110,22 +111,8 @@ public void start(final Map<String, String> props) {
);

partitioner = connectorConfig.getTaskPartitioner();

final FileSystemListing<?> fileSystemListing = connectorConfig.getFileSystemListing();
fileSystemListing.setFilter(new CompositeFileListFilter(connectorConfig.getFileSystemListingFilter()));

monitor = new DefaultFileSystemMonitor(
connectorConfig.allowTasksReconfigurationAfterTimeoutMs(),
fileSystemListing,
connectorConfig.getFsCleanupPolicy(),
connectorConfig.getFsCleanupPolicyPredicate(),
connectorConfig.getSourceOffsetPolicy(),
sharedStore.get().getResource(),
connectorConfig.getTaskFilerOrder()
);

monitor.setFileSystemListingEnabled(!connectorConfig.isFileListingTaskDelegationEnabled());
fsMonitorThread = new FileSystemMonitorThread(context, monitor, connectorConfig.getListingInterval());
fsMonitor = createFileSystemMonitor(connectorConfig, sharedStore.get().getResource());
fsMonitorThread = new FileSystemMonitorThread(context, fsMonitor, connectorConfig.getListingInterval());
fsMonitorThread.setUncaughtExceptionHandler((t, e) -> {
LOG.info("Uncaught error from file system monitoring thread [{}]", t.getName(), e);
context.raiseError(new ConnectException("Unexpected error from FileSystemMonitorThread", e));
Expand All @@ -138,6 +125,29 @@ public void start(final Map<String, String> props) {
}
}

/**
 * Builds the {@link FileSystemMonitor} used by this connector.
 *
 * <p>The file-system listing obtained from the configuration is given a composite filter,
 * then a {@link DefaultFileSystemMonitor} is created and configured with the state read
 * timeouts and the listing-enabled flag taken from the configuration.
 *
 * @param connectorConfig the connector configuration.
 * @param store           the state store passed to the monitor.
 * @return the configured monitor.
 */
private FileSystemMonitor createFileSystemMonitor(final SourceConnectorConfig connectorConfig,
                                                  final StateBackingStore<FileObject> store) {
    final FileSystemListing<?> listing = connectorConfig.getFileSystemListing();
    final CompositeFileListFilter listingFilter =
        new CompositeFileListFilter(connectorConfig.getFileSystemListingFilter());
    listing.setFilter(listingFilter);

    final DefaultFileSystemMonitor fileSystemMonitor = new DefaultFileSystemMonitor(
        connectorConfig.allowTasksReconfigurationAfterTimeoutMs(),
        listing,
        connectorConfig.getFsCleanupPolicy(),
        connectorConfig.getFsCleanupPolicyPredicate(),
        connectorConfig.getSourceOffsetPolicy(),
        store,
        connectorConfig.getTaskFilerOrder()
    );

    // Apply the user-configurable timeouts for reading the file states.
    fileSystemMonitor.setStateDefaultReadTimeout(connectorConfig.getStateDefaultReadTimeoutMs());
    fileSystemMonitor.setStateInitialReadTimeout(connectorConfig.getStateInitialReadTimeoutMs());
    // Listing is performed by the monitor only when it is not delegated to the tasks.
    fileSystemMonitor.setFileSystemListingEnabled(!connectorConfig.isFileListingTaskDelegationEnabled());
    return fileSystemMonitor;
}

/**
* {@inheritDoc}
*/
Expand Down Expand Up @@ -194,7 +204,7 @@ public List<Map<String, String>> taskConfigs(int maxTasks) {
}

private List<List<String>> partitionAndGet(int maxTasks) {
final List<FileObjectMeta> files = monitor.listFilesToSchedule(connectorConfig.getMaxScheduledFiles());
final List<FileObjectMeta> files = fsMonitor.listFilesToSchedule(connectorConfig.getMaxScheduledFiles());
return partitioner.partition(files, maxTasks)
.stream()
.map(it -> it.stream().map(Object::toString).collect(Collectors.toList()))
Expand Down
Loading