
DefaultFileSystemMonitor produces NPE during connector start #465


Closed
madoed opened this issue May 5, 2023 · 2 comments · Fixed by #487
Labels
released Issue has been released

Comments

@madoed

madoed commented May 5, 2023

Describe the bug
We are using FilePulse (FP) to read CSV files from S3 into Kafka. After running for some time, the connector stopped picking up new files, and after restarting it we got the following stack trace:

java.lang.NullPointerException
    at io.streamthoughts.kafka.connect.filepulse.fs.DefaultFileSystemMonitor.recoverPreviouslyCompletedSources(DefaultFileSystemMonitor.java:192)
    at io.streamthoughts.kafka.connect.filepulse.fs.DefaultFileSystemMonitor.<init>(DefaultFileSystemMonitor.java:183)
    at io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector.start(FilePulseSourceConnector.java:125)
    at org.apache.kafka.connect.runtime.WorkerConnector.doStart(WorkerConnector.java:184)
    at org.apache.kafka.connect.runtime.WorkerConnector.start(WorkerConnector.java:209)
    at org.apache.kafka.connect.runtime.WorkerConnector.doTransitionTo(WorkerConnector.java:348)
    at org.apache.kafka.connect.runtime.WorkerConnector.doTransitionTo(WorkerConnector.java:331)
    at org.apache.kafka.connect.runtime.WorkerConnector.doRun(WorkerConnector.java:140)
    at org.apache.kafka.connect.runtime.WorkerConnector.run(WorkerConnector.java:117)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)

It looks like something went wrong while processing a file; my guess is that the file was deleted from S3. It could also be related to #457, because our automated tests may produce empty CSV files.
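
For context, the trace suggests the monitor dereferences a null while rebuilding state for previously completed files. The sketch below is purely illustrative (the types are stand-ins, not the actual FilePulse classes) and shows the kind of defensive skip that would avoid an NPE when a tracked file has been deleted from S3 before recovery runs:

import java.util.ArrayList;
import java.util.List;

// Illustrative only: FileState stands in for the connector's persisted
// file-object state; it is not the actual FilePulse class.
public class RecoverySketch {

    static final class FileState {
        final String uri; // may be null if the source object has vanished
        FileState(String uri) { this.uri = uri; }
    }

    // Skip states whose source no longer resolves instead of dereferencing null.
    static List<String> recoverCompletedSources(List<FileState> states) {
        List<String> uris = new ArrayList<>();
        for (FileState state : states) {
            if (state == null || state.uri == null) {
                continue; // e.g. the file was deleted from S3 mid-flight
            }
            uris.add(state.uri);
        }
        return uris;
    }

    public static void main(String[] args) {
        List<FileState> states = new ArrayList<>();
        states.add(new FileState("s3://bucket/incoming/a.csv"));
        states.add(new FileState(null)); // simulates a removed S3 object
        System.out.println(recoverCompletedSources(states)); // [s3://bucket/incoming/a.csv]
    }
}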

To Reproduce
Unfortunately, I didn't find a way to reproduce it locally.

Expected behavior
Successful recovery after a connector restart.

Screenshots
N/A

Additional context
We are using FP 2.7.0. Here is our configuration:

{
    "name": "<connector_name>",
    "connector.class": "io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector",
    "topic": "<output_topic>",
    "tasks.max": "1",
    "fs.listing.class": "io.streamthoughts.kafka.connect.filepulse.fs.AmazonS3FileSystemListing",
    "fs.listing.interval.ms": "10000",
    "fs.listing.filters": "io.streamthoughts.kafka.connect.filepulse.fs.filter.RegexFileListFilter",
    "file.filter.regex.pattern": ".*\\.csv$",
    "aws.access.key.id": "<access_key>",
    "aws.secret.access.key": "<secret_key>",
    "aws.s3.service.endpoint": "<s3_url>",
    "aws.s3.bucket.name": "<some_bucket_name>",
    "aws.s3.bucket.prefix": "incoming",
    "fs.cleanup.policy.class": "io.streamthoughts.kafka.connect.filepulse.fs.clean.AmazonS3MoveCleanupPolicy",
    "fs.cleanup.policy.move.success.aws.prefix.path": "processed",
    "fs.cleanup.policy.move.failure.aws.prefix.path": "failed",
    "tasks.reader.class": "io.streamthoughts.kafka.connect.filepulse.fs.reader.AmazonS3RowFileInputReader",
    "offset.attributes.string": "name+lastModified",
    "filters": "ParseCSVLine",
    "filters.ParseCSVLine.type": "io.streamthoughts.kafka.connect.filepulse.filter.CSVFilter",
    "filters.ParseCSVLine.columns": "offerId:LONG;clientId:STRING;clientType:INTEGER",
    "filters.ParseCSVLine.trimColumn": "true",
    "filters.ParseCSVLine.separator": ";",
    "value.connect.schema": "{\"name\":\"ru.tinkoff.target.experiments.avro.client.ClientOfferAvro\",\"type\":\"STRUCT\",\"isOptional\":false,\"fieldSchemas\":{\"clientId\":{\"type\":\"STRING\",\"isOptional\":false},\"offerId\":{\"type\":\"INT64\",\"isOptional\":false},\"clientType\":{\"type\":\"INT32\",\"isOptional\":false}},\"version\":1}",
    "tasks.file.status.storage.class": "io.streamthoughts.kafka.connect.filepulse.state.KafkaFileObjectStateBackingStore",
    "tasks.file.status.storage.topic": "<status_topic>",
    "tasks.file.status.storage.bootstrap.servers": "broker:29092",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081"
}
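
For anyone reproducing the setup, here is a minimal sketch of how a config like the one above can be submitted to a Connect worker through the standard REST API. The worker URL, connector name, and file path are placeholders, not values from this issue:

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.file.Files;
import java.nio.file.Path;

public class SubmitConnectorConfig {
    public static void main(String[] args) throws Exception {
        // Placeholders: adjust to your environment.
        String worker = "http://localhost:8083";
        String connectorName = "my-filepulse-connector";
        // JSON map of the properties above (everything except "name").
        String config = Files.readString(Path.of("connector-config.json"));

        // PUT /connectors/{name}/config creates or updates the connector idempotently.
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create(worker + "/connectors/" + connectorName + "/config"))
                .header("Content-Type", "application/json")
                .PUT(HttpRequest.BodyPublishers.ofString(config))
                .build();

        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + " " + response.body());
    }
}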

@fhussonnois
Member

🎉 This issue has been resolved in v2.13.0 (Release Notes)

@madoed
Author

madoed commented Sep 12, 2023

@fhussonnois btw, could you please also update the connector version on Confluent Hub?
Currently only version 2.10.0 is available there.
