streamthoughts / kafka-connect-file-pulse

A multipurpose Kafka Connect connector that makes it easy to parse, transform and stream any file, in any format, into Apache Kafka

Home Page: https://streamthoughts.github.io/kafka-connect-file-pulse/

License: Apache License 2.0

Java 99.34% Dockerfile 0.08% Makefile 0.17% Shell 0.20% ANTLR 0.21%
kafka kafka-connect grok-filters etl file-streaming kafka-producer kafka-connector avro csv xml

kafka-connect-file-pulse's People

Contributors

alankan-finocomp, at0dd, bertrandcedric, bjarosze, dependabot[bot], fhussonnois, github-actions[bot], loquisgon, lucas-dclrcq, marky110, ogawa-takeshi, pka23, qgeffard, rmoff, romlinch, rouellet99, slachiewicz, sunxiaojian, tsironneau

kafka-connect-file-pulse's Issues

NPE when no filter is configured

java.lang.NullPointerException
    at io.streamthoughts.kafka.connect.filepulse.filter.DefaultRecordFilterPipeline.apply(DefaultRecordFilterPipeline.java:118)
    at io.streamthoughts.kafka.connect.filepulse.filter.DefaultRecordFilterPipeline.apply(DefaultRecordFilterPipeline.java:91)
    at io.streamthoughts.kafka.connect.filepulse.source.DefaultFileRecordsPollingConsumer.next(DefaultFileRecordsPollingConsumer.java:165)
    at io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceTask.poll(FilePulseSourceTask.java:125)

Incomplete documentation on DelimitedRowFilter.separator

In order to support the most popular delimited format - csv - the default delimiter (separator property) is ;

We're dealing with a pipe ('|') separated file. My initial attempt was to take your music example and change the separator character, like this: "filters.ParseDelimitedRow.separator": "|"

This doesn't work and produces unexpected results. The reason, as you probably know, is that under the hood String.split() is called on the value, and split() interprets the separator as a regular expression. In a regex, | is a special character, so the escaped form \| must be used instead.

It would be helpful to have either an extra warning line in the docs or (else) an extra example.

It's just a detail. Great product, keep up the good work!
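
The behavior described above can be reproduced with plain Java, independently of the connector. The following is a minimal sketch; the class and method names are hypothetical and only illustrate how String.split() treats its argument. Pattern.quote() is shown as one way to make any separator literal.

```java
import java.util.Arrays;
import java.util.regex.Pattern;

class PipeSeparatorDemo {

    // Splits a row the way String.split does: the separator is a regex.
    static String[] splitAsRegex(String row, String separator) {
        return row.split(separator);
    }

    // Splits a row on the separator taken literally, by quoting it first.
    static String[] splitLiteral(String row, String separator) {
        return row.split(Pattern.quote(separator));
    }

    public static void main(String[] args) {
        String row = "title|album";
        // An unescaped "|" is an empty alternation and matches between characters.
        System.out.println(Arrays.toString(splitAsRegex(row, "|")));
        // The escaped form splits on the pipe character itself.
        System.out.println(Arrays.toString(splitAsRegex(row, "\\|")));
        // Quoting the separator gives the same result without manual escaping.
        System.out.println(Arrays.toString(splitLiteral(row, "|")));
    }
}
```

Note that in a JSON connector configuration the backslash itself has to be escaped, so the value \| is written as "\\|".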

The connector fails with an IllegalStateException when it is restarted after a first crash while starting

"trace": "java.lang.IllegalStateException: Cannot init again.\n\tat io.streamthoughts.kafka.connect.filepulse.storage.KafkaStateBackingStore.start(KafkaStateBackingStore.java:94)\n\tat io.streamthoughts.kafka.connect.filepulse.scanner.LocalFileSystemScanner.<init>(LocalFileSystemScanner.java:152)\n\tat io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector.start(FilePulseSourceConnector.java:107)\n\tat org.apache.kafka.connect.runtime.WorkerConnector.doStart(WorkerConnector.java:111)\n\tat org.apache.kafka.connect.runtime.WorkerConnector.resume(WorkerConnector.java:131)\n\tat org.apache.kafka.connect.runtime.WorkerConnector.transitionTo(WorkerConnector.java:198)\n\tat org.apache.kafka.connect.runtime.Worker.transitionTo(Worker.java:828)\n\tat org.apache.kafka.connect.runtime.Worker.setTargetState(Worker.java:812)\n\tat org.apache.kafka.connect.runtime.distributed.DistributedHerder.processTargetStateChanges(DistributedHerder.java:479)\n\tat org.apache.kafka.connect.runtime.distributed.DistributedHerder.tick(DistributedHerder.java:341)\n\tat org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:245)\n\tat java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat java.lang.Thread.run(Thread.java:748)\n"

Not recognizing the new file after processing the first one

File Pulse finished processing "file1.JSON" (about 110 MB), and I then added a second file, "file2.JSON" (about 170 MB), but File Pulse is not picking up the second file.
Things I have tried:
When I copy and paste multiple files into the directory that File Pulse is watching, it works fine and they are processed with no issues.

When I use a C# app to write files into that directory, File Pulse processes the first file successfully but not the second one.
I created a new "file2.JSON" with a C# app in the directory that File Pulse is watching and started writing data into the newly created "file2.CLOG". I monitored the connect-file-pulse-status topic and nothing changed; in other words, no new status records were produced.

My guess is that File Pulse is somehow not picking up additional files that are added to or written in the watched directory after the first file has finished processing.

I am using the filepulse 1.3.1 release.

Is there anything else I can try? Any configuration I am missing?

Please help me solve this issue. I would be happy to provide more details if necessary. Thank you.

Error XML File Ingestion: org.apache.avro.SchemaParseException: Illegal initial character: #text

Hi,

I am ingesting XML Files with the following structure

<root>
  <document>
    <paragraph id="1">
      <sentence id="1">
        <content>Perfectly equipped.</content>
        <topic_gold>24</topic_gold>
      </sentence>
      <sentence id="2">
        <content>[...TEXT TO SELECT...]</content>
        <topic_gold>24</topic_gold>
        <subTopic_gold>24</subTopic_gold>
        <sentiment_gold>-1</sentiment_gold>
      </sentence>
      
      [sentence*]

    </paragraph>
    
     [paragraph*]
  </document>
</root>

querying with the following xpath

//root/document/paragraph/sentence/topic_gold[text()=24]/parent::node()/content/text()

for the text in the content element where <topic_gold> has the text value 24. This query gives me the following error:

connect            | org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
connect            |    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
connect            |    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
connect            |    at org.apache.kafka.connect.runtime.WorkerSourceTask.convertTransformedRecord(WorkerSourceTask.java:270)
connect            |    at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:294)
connect            |    at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:229)
connect            |    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
connect            |    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
connect            |    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
connect            |    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
connect            |    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
connect            |    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
connect            |    at java.lang.Thread.run(Thread.java:748)
connect            | Caused by: org.apache.avro.SchemaParseException: Illegal initial character: #text
connect            |    at org.apache.avro.Schema.validateName(Schema.java:1147)
connect            |    at org.apache.avro.Schema.access$200(Schema.java:81)
connect            |    at org.apache.avro.Schema$Name.<init>(Schema.java:489)
connect            |    at org.apache.avro.Schema.createRecord(Schema.java:161)
connect            |    at io.confluent.connect.avro.AvroData.fromConnectSchema(AvroData.java:893)
connect            |    at io.confluent.connect.avro.AvroData.fromConnectSchema(AvroData.java:732)
connect            |    at io.confluent.connect.avro.AvroData.fromConnectSchema(AvroData.java:726)
connect            |    at io.confluent.connect.avro.AvroData.fromConnectData(AvroData.java:365)
connect            |    at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:77)
connect            |    at org.apache.kafka.connect.runtime.WorkerSourceTask.lambda$convertTransformedRecord$2(WorkerSourceTask.java:270)
connect            |    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
connect            |    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
connect            |    ... 11 more

When removing /text() at the end and querying just for the element, the request works fine and the file gets processed. The problem then is that the text written to the topic contains strange characters, which seem to come from conversion/encoding problems with characters like "</>".

Could you kindly help me understand why the original XPath query is not working? The XML file is syntactically correct, which I have checked multiple times. Is this a bug or am I doing something wrong here?

Thanks
Eduard
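
One plausible explanation for the "#text" in the error message: in the DOM API, a text node selected by /text() reports "#text" as its node name, while an element reports its tag name. Avro names must start with a letter or an underscore, so "#text" cannot be used as one. Whether the connector derives the schema name from the node name is an assumption here, not something the issue confirms. The sketch below uses only the JDK's XPath support on a shortened copy of the sample document; the class name and the second sentence's values are made up for the example.

```java
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.xpath.XPathConstants;
import javax.xml.xpath.XPathFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;

class XPathTextNodeDemo {

    // Shortened version of the document structure from the issue.
    static final String XML =
        "<root><document><paragraph id=\"1\">"
      + "<sentence id=\"1\"><content>Perfectly equipped.</content><topic_gold>24</topic_gold></sentence>"
      + "<sentence id=\"2\"><content>Other text.</content><topic_gold>7</topic_gold></sentence>"
      + "</paragraph></document></root>";

    // The query from the issue, without the trailing /text() step.
    static final String BASE =
        "//root/document/paragraph/sentence/topic_gold[text()=24]/parent::node()/content";

    // Evaluates the expression against XML and returns the first matching node.
    static Node firstMatch(String expression) {
        try {
            Document doc = DocumentBuilderFactory.newInstance().newDocumentBuilder()
                .parse(new ByteArrayInputStream(XML.getBytes(StandardCharsets.UTF_8)));
            NodeList nodes = (NodeList) XPathFactory.newInstance().newXPath()
                .evaluate(expression, doc, XPathConstants.NODESET);
            return nodes.item(0);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        Node text = firstMatch(BASE + "/text()");
        Node element = firstMatch(BASE);
        // A text node has the fixed node name "#text".
        System.out.println(text.getNodeName() + " -> " + text.getNodeValue());
        // An element node is named after its tag.
        System.out.println(element.getNodeName() + " -> " + element.getTextContent());
    }
}
```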

Two errors using DelimitedRowFilter.columns

The first, minor issue is that the documentation of DelimitedRowFilter.columns states: "Define the list of column names in order they appear in each row. columns must be in the form of TYPE:NAME." This should be NAME:TYPE.

Also, with just this line of documentation, and without inspecting the source code, it is hard to discover that the column definitions must be separated by a ';'.

The second issue arises when one also specifies a separator (in our case '|'). Suddenly the columns definition is no longer valid. The culprit is this line in DelimitedRowFilterConfig:
for (String column : columns.split(delimiter())). It uses the same delimiter/separator as the file reading part.

This is probably a bug, and at the very least confusing :)

The following unit-test in DelimitedRowFilterTest passes because of this.

    @Test
    public void shouldWork() {
        final TypedStruct INNER_STRUCT = TypedStruct.create()
                .put("message", "title|album");

        configs.put(READER_FIELD_COLUMNS_CONFIG, "c1:STRING|c2:STRING");
        configs.put(READER_FIELD_SEPARATOR_CONFIG, "\\|");
        filter.configure(configs);
        RecordsIterable<TypedStruct> output = filter.apply(null, INNER_STRUCT, false);
        Assert.assertNotNull(output);
        Assert.assertEquals(1, output.size());

        final TypedStruct record = output.iterator().next();
        Assert.assertEquals("title", record.getString("c1"));
        Assert.assertEquals("album", record.getString("c2"));
    }
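
The effect of reusing the row separator for the column definitions can be shown in isolation. The sketch below does not use the connector's classes: ColumnsParsingDemo and its methods are hypothetical, and the first method only mimics the columns.split(delimiter()) line quoted above. The second method shows one possible alternative, a fixed ';' for the definitions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

class ColumnsParsingDemo {

    // Mimics the reported behavior: the column definitions are split
    // with the same regex that is used to split the data rows.
    static List<String> columnNamesSharedSeparator(String columns, String rowSeparator) {
        return names(columns.split(rowSeparator));
    }

    // Hypothetical alternative: column definitions always use a literal ';'.
    static List<String> columnNamesFixedSeparator(String columns) {
        return names(columns.split(Pattern.quote(";")));
    }

    private static List<String> names(String[] definitions) {
        List<String> result = new ArrayList<>();
        for (String definition : definitions) {
            // Each definition has the form NAME:TYPE; keep the NAME part.
            result.add(definition.split(":")[0].trim());
        }
        return result;
    }

    public static void main(String[] args) {
        // ';'-separated definitions collapse into one column once the row separator is '|'.
        System.out.println(columnNamesSharedSeparator("c1:STRING;c2:STRING", "\\|"));
        // The definitions only parse when they use the row separator too.
        System.out.println(columnNamesSharedSeparator("c1:STRING|c2:STRING", "\\|"));
        // With a fixed separator the definitions are independent of the row format.
        System.out.println(columnNamesFixedSeparator("c1:STRING;c2:STRING"));
    }
}
```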

internal.kafka.reporter.cluster.bootstrap.servers cannot connect to Confluent Cloud

We use a quite common infrastructure for our Kafka ecosystem: the Kafka brokers + schema registry are hosted in/by Confluent Cloud, and we host the Kafka Connect cluster ourselves. All this works well when the Kafka Connect cluster is configured properly: ssl, api-keys, etc.

We are investigating whether to use the kafka-connect-file-pulse connector for file reading. It seems to work well in a local Kafka environment. However, the connector tracks file status on a designated Kafka topic, and to do so it requires the setting "internal.kafka.reporter.bootstrap.servers".

The File Pulse connector uses this property to create an extra, internal connection to our Kafka broker. Connecting to Confluent Cloud most probably requires more than just bootstrap.servers, so it fails with this message:
[Producer clientId=producer-5] Connection to node -1 (aws.confluent.cloud.hostname:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient:748)

Is this a common or known issue? Is there a work-around, perhaps another way of status tracking that does not require this property be set?
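
For context, this is roughly what "more than just bootstrap.servers" means for a standard Kafka client talking to a SASL_SSL cluster. The property keys below are regular Kafka client settings; the class name, the placeholder credentials, and above all the idea that the connector would forward such settings to its internal producer are assumptions, not something confirmed in this issue.

```java
import java.util.Properties;

class ConfluentCloudClientProps {

    // Builds the client settings a producer typically needs for a SASL_SSL cluster.
    // The bootstrap address, key and secret are placeholders supplied by the caller.
    static Properties secureClientProps(String bootstrap, String apiKey, String apiSecret) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrap);
        props.setProperty("security.protocol", "SASL_SSL");
        props.setProperty("sasl.mechanism", "PLAIN");
        props.setProperty("sasl.jaas.config",
            "org.apache.kafka.common.security.plain.PlainLoginModule required "
                + "username=\"" + apiKey + "\" password=\"" + apiSecret + "\";");
        return props;
    }

    public static void main(String[] args) {
        Properties props = secureClientProps(
            "aws.confluent.cloud.hostname:9092", "<API_KEY>", "<API_SECRET>");
        // Print only the keys, so that no secret ends up in the output.
        props.stringPropertyNames().stream().sorted().forEach(System.out::println);
    }
}
```

A client that is given only the bootstrap address falls back to a plaintext connection, which would be consistent with the "could not be established" message above.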

mvn clean package - test fails

I cloned the repo, ran mvn clean package and had this error:

[ERROR] Tests run: 1, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 0.02 s <<< FAILURE! - in io.streamthoughts.kafka.connect.filepulse.scanner.local.LocalFSDirectoryWalkerTest
[ERROR] shouldExtractXZGipCompressedFilesPathWhileScanningGivenNoConfig(io.streamthoughts.kafka.connect.filepulse.scanner.local.LocalFSDirectoryWalkerTest)  Time elapsed: 0.02 s  <<< FAILURE!
org.junit.ComparisonFailure: expected:<...t-compressed/archive[/file-entry-0.txt]> but was:<...t-compressed/archive[.zip]>
	at io.streamthoughts.kafka.connect.filepulse.scanner.local.LocalFSDirectoryWalkerTest.shouldExtractXZGipCompressedFilesPathWhileScanningGivenNoConfig(LocalFSDirectoryWalkerTest.java:71)

[INFO]
[INFO] Results:
[INFO]
[ERROR] Failures:
[ERROR]   LocalFSDirectoryWalkerTest.shouldExtractXZGipCompressedFilesPathWhileScanningGivenNoConfig:71 expected:<...t-compressed/archive[/file-entry-0.txt]> but was:<...t-compressed/archive[.zip]>
[INFO]
[ERROR] Tests run: 38, Failures: 1, Errors: 0, Skipped: 0
[INFO]
[INFO] ------------------------------------------------------------------------
[INFO] Reactor Summary for Kafka Connect Source File Pulse Reactor 1.3.0-SNAPSHOT:
[INFO]
[INFO] Kafka Connect Source File Pulse Reactor ............ SUCCESS [  0.814 s]
[INFO] Kafka Connect Source File Pulse APIs ............... SUCCESS [  6.808 s]
[INFO] Kafka Connect Source File Pulse Filters ............ SUCCESS [  2.025 s]
[INFO] Kafka Connect Source File Pulse Plugin ............. FAILURE [  3.068 s]
[INFO] ------------------------------------------------------------------------
[INFO] BUILD FAILURE
[INFO] ------------------------------------------------------------------------
[INFO] Total time:  12.810 s
[INFO] Finished at: 2020-01-10T15:10:50Z
[INFO] ------------------------------------------------------------------------
[ERROR] Failed to execute goal org.apache.maven.plugins:maven-surefire-plugin:3.0.0-M3:test (default-test) on project kafka-connect-file-pulse-plugin: There are test failures.
[ERROR]
[ERROR] Please refer to /Users/jean-me/Documents/apath/kafka-connect-file-pulse/connect-file-pulse-plugin/target/surefire-reports for the individual test results.
[ERROR] Please refer to dump files (if any exist) [date].dump, [date]-jvmRun[N].dump and [date].dumpstream.
[ERROR] -> [Help 1]
[ERROR]
[ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch.
[ERROR] Re-run Maven using the -X switch to enable full debug logging.
[ERROR]
[ERROR] For more information about the errors and possible solutions, please read the following articles:
[ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/MojoFailureException
[ERROR]
[ERROR] After correcting the problems, you can resume the build with the command
[ERROR]   mvn <args> -rf :kafka-connect-file-pulse-plugin

The output says to refer to connect-file-pulse-plugin/target/surefire-reports for the individual test results, but this file doesn't exist.

Add DateFilter

The connector should provide a built-in filter to parse a date into a Unix timestamp.

KafkaException: Could not instantiate class

Hello,
I am trying to set up File Pulse for the first time. I have a distributed Kafka with three workers running, and I was successful at using the default Java FileStreamSourceConnector. But when I try to start the File Pulse connector, I keep getting the following error:
{
"name": "file-pulse-connector",
"connector": {
"state": "FAILED",
"worker_id": "my.ip.address:8085",
"trace": "org.apache.kafka.common.KafkaException: Could not instantiate class io.streamthoughts.kafka.connect.filepulse.scanner.local.LocalFSDirectoryWalker\n\tat org.apache.kafka.common.utils.Utils.newInstance(Utils.java:325)\n\tat org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:370)\n\tat io.streamthoughts.kafka.connect.filepulse.config.ConnectorConfig.directoryScanner(ConnectorConfig.java:81)\n\tat io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector.start(FilePulseSourceConnector.java:100)\n\tat org.apache.kafka.connect.runtime.WorkerConnector.doStart(WorkerConnector.java:111)\n\tat org.apache.kafka.connect.runtime.WorkerConnector.start(WorkerConnector.java:136)\n\tat org.apache.kafka.connect.runtime.WorkerConnector.transitionTo(WorkerConnector.java:196)\n\tat org.apache.kafka.connect.runtime.Worker.startConnector(Worker.java:252)\n\tat org.apache.kafka.connect.runtime.distributed.DistributedHerder.startConnector(DistributedHerder.java:1079)\n\tat org.apache.kafka.connect.runtime.distributed.DistributedHerder.access$1300(DistributedHerder.java:117)\n\tat org.apache.kafka.connect.runtime.distributed.DistributedHerder$11.call(DistributedHerder.java:797)\n\tat org.apache.kafka.connect.runtime.distributed.DistributedHerder$11.call(DistributedHerder.java:783)\n\tat org.apache.kafka.connect.runtime.distributed.DistributedHerder.tick(DistributedHerder.java:296)\n\tat org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:245)\n\tat java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat java.lang.Thread.run(Thread.java:748)\nCaused by: java.lang.reflect.InvocationTargetException\n\tat 
sun.reflect.GeneratedConstructorAccessor19.newInstance(Unknown Source)\n\tat sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)\n\tat java.lang.reflect.Constructor.newInstance(Constructor.java:423)\n\tat org.apache.kafka.common.utils.Utils.newInstance(Utils.java:321)\n\t... 18 more\nCaused by: java.lang.NoClassDefFoundError: org/apache/commons/compress/archivers/tar/TarArchiveInputStream\n\tat io.streamthoughts.kafka.connect.filepulse.scanner.local.codec.CodecManager.(CodecManager.java:44)\n\tat io.streamthoughts.kafka.connect.filepulse.scanner.local.LocalFSDirectoryWalker.(LocalFSDirectoryWalker.java:67)\n\tat io.streamthoughts.kafka.connect.filepulse.scanner.local.LocalFSDirectoryWalker.(LocalFSDirectoryWalker.java:56)\n\t... 22 more\nCaused by: java.lang.ClassNotFoundException: org.apache.commons.compress.archivers.tar.TarArchiveInputStream\n\tat java.net.URLClassLoader.findClass(URLClassLoader.java:382)\n\tat java.lang.ClassLoader.loadClass(ClassLoader.java:418)\n\tat org.apache.kafka.connect.runtime.isolation.PluginClassLoader.loadClass(PluginClassLoader.java:104)\n\tat java.lang.ClassLoader.loadClass(ClassLoader.java:351)\n\t... 25 more\n"
},
"tasks": [],
"type": "source"
}

Please help, I am not sure if this is because of the source code or because of the configuration. I would really appreciate any help.

Thank you.

Record Filtering

{ "CLOG_Objects": { "Record_Id": "74394818", "File": "TypeA", "Record_Data": "" } }
{ "CLOG_Objects": { "Record_Id": "74394818", "File": "TypeB", "Record_Data": "" } }
{ "CLOG_Objects": { "Record_Id": "74394818", "File": "TypeA", "Record_Data": "" } }

We receive the string data above and want to filter on the text ["File":"TypeA"], so that the topic ends up with the 2 matching records and the record with ["File":"TypeB"] is left out.
Would the Drop filter be the right choice for this, and can a regex be used for filtering? How can we access the input data in the "if" condition to apply the filter? Please help us.

Config file used:
{"name": "FilePulseConnector","config": {
"connector.class" : "io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector",
"key.converter" : "org.apache.kafka.connect.storage.StringConverter",
"value.converter" : "org.apache.kafka.connect.storage.StringConverter",
"topic" : "filepulsetopic",
"internal.kafka.reporter.bootstrap.servers" : "...",
"fs.scan.directory.path" : "/mnt/KafkaInput",
"fs.cleanup.policy.class" : "io.streamthoughts.kafka.connect.filepulse.clean.LogCleanupPolicy",
"filters" : "Drop",
"filters.Drop.type" : "io.streamthoughts.kafka.connect.filepulse.filter.DropFilter",
"filters.Drop.if" : "{{ contains( {{ CLOG_Objects.File }}, TypeA) }}",
"filters.Drop.invert" : "true"
}}

Error:
org.apache.kafka.connect.errors.ConnectException: io.streamthoughts.kafka.connect.filepulse.expression.ExpressionException: Invalid substitution expression : '{{ contains( {{ CLOG_Objects.File }}' (original expression = '{{ contains( {{ CLOG_Objects.File }}, TypeA) }}')
at io.streamthoughts.kafka.connect.filepulse.config.TaskConfig.filters(TaskConfig.java:155)
at io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceTask.newFileRecordsPollingConsumer(FilePulseSourceTask.java:89)
at io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceTask.start(FilePulseSourceTask.java:81)
at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:213)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:184)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: io.streamthoughts.kafka.connect.filepulse.expression.ExpressionException: Invalid substitution expression : '{{ contains( {{ CLOG_Objects.File }}' (original expression = '{{ contains( {{ CLOG_Objects.File }}, TypeA) }}')
at io.streamthoughts.kafka.connect.filepulse.expression.parser.regex.RegexExpressionParser.parseExpression(RegexExpressionParser.java:82)
at io.streamthoughts.kafka.connect.filepulse.expression.parser.regex.RegexExpressionParser.parseExpression(RegexExpressionParser.java:56)
at io.streamthoughts.kafka.connect.filepulse.filter.condition.ExpressionFilterCondition.<init>(ExpressionFilterCondition.java:46)
at io.streamthoughts.kafka.connect.filepulse.filter.config.CommonFilterConfig.condition(CommonFilterConfig.java:85)
at io.streamthoughts.kafka.connect.filepulse.filter.AbstractRecordFilter.configure(AbstractRecordFilter.java:50)
at io.streamthoughts.kafka.connect.filepulse.config.TaskConfig.filters(TaskConfig.java:152)
... 10 more
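
The truncated expression in the error message is what a lazy regular expression produces when it meets a nested "{{ ... }}": it stops at the first "}}". The stack trace shows a RegexExpressionParser, but the actual pattern it uses is not shown in this issue, so the pattern below is an assumption chosen only to reproduce the symptom. The class and method names are hypothetical.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class NestedSubstitutionDemo {

    // Assumed pattern: a lazy match between "{{" and the first following "}}".
    static final Pattern SUBSTITUTION = Pattern.compile("\\{\\{.*?\\}\\}");

    // Returns the first substring the pattern treats as a substitution, or null.
    static String firstSubstitution(String expression) {
        Matcher matcher = SUBSTITUTION.matcher(expression);
        return matcher.find() ? matcher.group() : null;
    }

    public static void main(String[] args) {
        // The nested form is cut off at the inner closing braces.
        System.out.println(firstSubstitution("{{ contains( {{ CLOG_Objects.File }}, TypeA) }}"));
        // A flat expression is matched as a whole.
        System.out.println(firstSubstitution("{{ CLOG_Objects.File }}"));
    }
}
```

Under that assumption, the condition would need to be written without nesting one "{{ ... }}" inside another.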

No records are returned until read.max.wait.ms is reached.

When the reader reaches the end of the file, the RowFileInputReader waits up to read.max.wait.ms for more records to become available. Currently, this wait is implemented in the RowFileInputIterator#hasNext() method, which is invoked in several places. Unfortunately, this blocks the processing of the most recently read records.

XML ingestion - map xml elements to apache avro classes

Hi,

I am ingesting XML files and writing them to Kafka topics. Following the documentation, I am matching elements (including sub-elements) using an XPath query.

I'd like to know whether it would be possible to map the resulting entities (elements), together with their properties (sub-elements or element attributes), to Avro classes that already exist in the Kafka schema registry, and thus use pre-defined data types. Otherwise, most (maybe all?) of the automatically created schema fields are defined using String data types.

Many thanks for your help,
Eduard

Files are not being processed when more records are added

I have a File Pulse connector running, and it processes the files when I initially start the connector. But when I manually add more data to a file after the initial processing, the new data is not processed. Is this normal or expected? If so, is there a way to change or configure it so that every time new data is added to the file, it is processed in near real time?

Thank you in advance for your help.

Cannot redefine a grok pattern twice

Currently, it is not possible to overload a pattern definition.

For example, this definition leads to the exception : java.lang.IllegalStateException: Duplicate key JAVACLASS (attempted merging values (?:[a-zA-Z$_][a-zA-Z$_0-9]*\.)*[a-zA-Z$_][a-zA-Z$_0-9]* and (?:[a-zA-Z0-9-]+\.)+[A-Za-z0-9$]+)

JAVACLASS (?:[a-zA-Z$_][a-zA-Z$_0-9]*\.)*[a-zA-Z$_][a-zA-Z$_0-9]*
#Space is an allowed character to match special cases like 'Native Method' or 'Unknown Source'
JAVAFILE (?:[A-Za-z0-9_. -]+)
#Allow special <init>, <clinit> methods
JAVAMETHOD (?:(<(?:cl)?init>)|[a-zA-Z$_][a-zA-Z$_0-9]*)
#Line number is optional in special cases 'Native method' or 'Unknown source'
JAVASTACKTRACEPART %{SPACE}at %{JAVACLASS:class}\.%{JAVAMETHOD:method}\(%{JAVAFILE:file}(?::%{NUMBER:line})?\)
# Java Logs
JAVATHREAD (?:[A-Z]{2}-Processor[\d]+)
JAVACLASS (?:[a-zA-Z0-9-]+\.)+[A-Za-z0-9$]+
#JAVAFILE (?:[A-Za-z0-9_.-]+)
JAVALOGMESSAGE (.*)
# MMM dd, yyyy HH:mm:ss eg: Jan 9, 2014 7:13:13 AM
CATALINA_DATESTAMP %{MONTH} %{MONTHDAY}, 20%{YEAR} %{HOUR}:?%{MINUTE}(?::?%{SECOND}) (?:AM|PM)
# yyyy-MM-dd HH:mm:ss,SSS ZZZ eg: 2014-01-09 17:32:25,527 -0800
TOMCAT_DATESTAMP 20%{YEAR}-%{MONTHNUM}-%{MONTHDAY} %{HOUR}:?%{MINUTE}(?::?%{SECOND}) %{ISO8601_TIMEZONE}
CATALINALOG %{CATALINA_DATESTAMP:timestamp} %{JAVACLASS:class} %{JAVALOGMESSAGE:logmessage}
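
The wording of the exception ("Duplicate key ... (attempted merging values ... and ...)") resembles what Collectors.toMap raises on recent JDKs when the same key appears twice and no merge function is supplied. Whether the connector loads pattern definitions this way is an inference from the message, not something stated in the issue. The sketch below uses hypothetical names and shows both the strict behavior and an overriding variant in which the later definition wins.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

class PatternDefinitionsDemo {

    // Splits a "NAME regex" definition line on the first space.
    private static String[] parse(String line) {
        return line.split(" ", 2);
    }

    // Strict collection: a repeated name raises IllegalStateException.
    static Map<String, String> loadStrict(List<String> lines) {
        return lines.stream()
            .map(PatternDefinitionsDemo::parse)
            .collect(Collectors.toMap(parts -> parts[0], parts -> parts[1]));
    }

    // Overriding collection: the later definition replaces the earlier one.
    static Map<String, String> loadOverriding(List<String> lines) {
        return lines.stream()
            .map(PatternDefinitionsDemo::parse)
            .collect(Collectors.toMap(parts -> parts[0], parts -> parts[1],
                (first, second) -> second));
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
            "JAVACLASS (?:[a-zA-Z$_][a-zA-Z$_0-9]*\\.)*[a-zA-Z$_][a-zA-Z$_0-9]*",
            "JAVACLASS (?:[a-zA-Z0-9-]+\\.)+[A-Za-z0-9$]+");
        System.out.println(loadOverriding(lines).get("JAVACLASS"));
        try {
            loadStrict(lines);
        } catch (IllegalStateException e) {
            System.out.println("strict loading failed: " + e.getMessage());
        }
    }
}
```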

How to specify config fields for file readers

I am using the row file reader, configured as

task.reader.class=io.streamthoughts.kafka.connect.filepulse.reader.RowFileInputReader

It has configs like skip.headers, etc.
How do I specify these configs in the properties file?

Cannot find RegexFileFilter

I wanted to filter out all files except .txt files using

fs.scan.filters=io.streamthoughts.kafka.connect.filepulse.scanner.local.filter.RegexFileFilter
file.filter.regex.pattern="\\.txt$"

But it seems there is no such class in the repo.
Do you plan to add it?

Multiple JSON file as input

I need to read JSON files and send them to a Kafka topic chosen by file name pattern. For example:

If the JSON file name contains a_ (a_sample.json), it needs to go to topic x.
If the JSON file name contains b_ (b_sample.json), it needs to go to topic y.

Does File Pulse support this? If yes, could you please guide me? Thanks.

Mohaideen S.

Build Failure

[INFO] Running io.streamthoughts.kafka.connect.filepulse.source.SourceMetadataTest
[ERROR] Tests run: 1, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 0.013 s <<< FAILURE! - in io.streamthoughts.kafka.connect.filepulse.source.SourceMetadataTest
[ERROR] shouldCreateSourceMetadataGivenFile(io.streamthoughts.kafka.connect.filepulse.source.SourceMetadataTest) Time elapsed: 0.001 s <<< FAILURE!
java.lang.AssertionError
at io.streamthoughts.kafka.connect.filepulse.source.SourceMetadataTest.shouldCreateSourceMetadataGivenFile(SourceMetadataTest.java:54)

[INFO]
[INFO] Results:
[INFO]
[ERROR] Failures:
[ERROR] SourceMetadataTest.shouldCreateSourceMetadataGivenFile:54
[INFO]
[ERROR] Tests run: 71, Failures: 1, Errors: 0, Skipped: 0
[INFO]
[INFO] ------------------------------------------------------------------------
[INFO] Reactor Summary for Kafka Connect Source File Pulse Reactor 1.3.0-SNAPSHOT:
[INFO]
[INFO] Kafka Connect Source File Pulse Reactor ............ SUCCESS [ 2.729 s]
[INFO] Kafka Connect Source File Pulse APIs ............... FAILURE [ 14.384 s]
[INFO] Kafka Connect Source File Pulse Filters ............ SKIPPED
[INFO] Kafka Connect Source File Pulse Plugin ............. SKIPPED
[INFO] ------------------------------------------------------------------------
[INFO] BUILD FAILURE
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 17.763 s
[INFO] Finished at: 2020-01-19T23:21:08+05:30
[INFO] ------------------------------------------------------------------------
[ERROR] Failed to execute goal org.apache.maven.plugins:maven-surefire-plugin:3.0.0-M3:test (default-test) on project kafka-connect-file-pulse-api: There are test failures.

NoClassDefFoundError: io/streamthoughts/kafka/connect/filepulse/offset/OffsetManager

Hello,

So after downloading and running mvn clean install -DskipTests I wanted to test your plugin locally following this configuration:

config/kafka-connect-file-pulse.conf
{
  "config": {
    "connector.class": "io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector",
    "fs.cleanup.policy.class": "io.streamthoughts.kafka.connect.filepulse.clean.LogCleanupPolicy",
    "fs.scanner.class": "io.streamthoughts.kafka.connect.filepulse.scanner.local.LocalFSDirectoryWalker",
    "fs.scan.directory.path": "/Users/me/Documents/ingestion_test",
    "fs.scan.interval.ms": "10000",
    "internal.kafka.reporter.bootstrap.servers": "localhost:9092",
    "internal.kafka.reporter.id": "connect-file-pulse-quickstart-test",
    "internal.kafka.reporter.topic": "connect-file-pulse-status",
    "offset.strategy": "name+hash",
    "skip.headers": "1",
    "task.reader.class": "io.streamthoughts.kafka.connect.filepulse.reader.BytesArrayInputReader",
    "topic": "ingestion",
    "tasks.max": 1
  },
  "name": "connect-file-pulse-quickstart-test"
}

alongside my local kafka.

As a naive first step, I ran:
bin/connect-standalone.sh config/connect-standalone.properties config/kafka-connect-file-pulse.conf

but this returns the following error:

[2020-01-10 16:09:51,314] ERROR Stopping due to error (org.apache.kafka.connect.cli.ConnectStandalone:130)
java.lang.NoClassDefFoundError: io/streamthoughts/kafka/connect/filepulse/offset/OffsetManager
	at java.lang.Class.getDeclaredConstructors0(Native Method)
	at java.lang.Class.privateGetDeclaredConstructors(Class.java:2671)
	at java.lang.Class.getConstructor0(Class.java:3075)
	at java.lang.Class.newInstance(Class.java:412)
	at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.versionFor(DelegatingClassLoader.java:380)
	at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.getPluginDesc(DelegatingClassLoader.java:350)
	at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanPluginPath(DelegatingClassLoader.java:330)
	at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanUrlsAndAddPlugins(DelegatingClassLoader.java:263)
	at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.registerPlugin(DelegatingClassLoader.java:255)
	at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initPluginLoader(DelegatingClassLoader.java:224)
	at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initLoaders(DelegatingClassLoader.java:201)
	at org.apache.kafka.connect.runtime.isolation.Plugins.<init>(Plugins.java:60)
	at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:79)
Caused by: java.lang.ClassNotFoundException: io.streamthoughts.kafka.connect.filepulse.offset.OffsetManager
	at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
	at org.apache.kafka.connect.runtime.isolation.PluginClassLoader.loadClass(PluginClassLoader.java:104)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
	... 13 more

In the folder configured as my plugin.path, I put the three JARs generated by the packaging step (api, filters, plugin).

Am I doing something wrong? What would you suggest in order to run it from the CLI, without Docker?

well-formed error

When I read an XML file using the File Pulse connector, I get the following error.

ERROR Error while processing source file '[name='Sample_Member_Enrollment_Status_File-3.xml', path='/opt/kafka/confluent/tmp', size=1681, lastModified=1590085242000, inode=5097008, hash=1839052428]' (io.streamthoughts.kafka.connect.filepulse.source.KafkaFileStateReporter:118)
io.streamthoughts.kafka.connect.filepulse.reader.ReaderException: Unexpected error happened while initializing 'XMLFileInputIterator'
at io.streamthoughts.kafka.connect.filepulse.reader.XMLFileInputReader$XMLFileInputIterator.<init>(XMLFileInputReader.java:111)
at io.streamthoughts.kafka.connect.filepulse.reader.XMLFileInputReader.newIterator(XMLFileInputReader.java:75)
at io.streamthoughts.kafka.connect.filepulse.reader.AbstractFileInputReader.newIterator(AbstractFileInputReader.java:55)
at io.streamthoughts.kafka.connect.filepulse.source.FileInputIterable.open(FileInputIterable.java:66)
at io.streamthoughts.kafka.connect.filepulse.source.DefaultFileRecordsPollingConsumer.openAndGetIteratorOrNullIfInvalid(DefaultFileRecordsPollingConsumer.java:278)
at io.streamthoughts.kafka.connect.filepulse.source.DefaultFileRecordsPollingConsumer.next(DefaultFileRecordsPollingConsumer.java:156)
at io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceTask.poll(FilePulseSourceTask.java:127)
at org.apache.kafka.connect.runtime.WorkerSourceTask.poll(WorkerSourceTask.java:270)
at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:237)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:184)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.xml.sax.SAXParseException; lineNumber: 14; columnNumber: 26; The content of elements must consist of well-formed character data or markup.
at com.sun.org.apache.xerces.internal.parsers.DOMParser.parse(DOMParser.java:257)
at com.sun.org.apache.xerces.internal.jaxp.DocumentBuilderImpl.parse(DocumentBuilderImpl.java:339)
at io.streamthoughts.kafka.connect.filepulse.reader.XMLFileInputReader$XMLFileInputIterator.<init>(XMLFileInputReader.java:96)
... 15 more
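
For context, the SAXParseException message in this trace ("The content of elements must consist of well-formed character data or markup") is what the JDK's built-in parser typically reports when element text contains a raw `<` that does not start valid markup. The sketch below is independent of the connector: it parses a string with the JDK's own DocumentBuilder and reports the position of the first fatal error. The class name, the method name, and the `Note` element are made up for illustration, and the sample XML is not the reporter's file.

```java
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import org.xml.sax.SAXParseException;
import org.xml.sax.helpers.DefaultHandler;

class WellFormedCheck {

    // Returns null if the text parses, otherwise "line:column" of the first fatal error.
    static String firstError(String xml) {
        try {
            DocumentBuilder builder = DocumentBuilderFactory.newInstance().newDocumentBuilder();
            // DefaultHandler rethrows fatal errors instead of printing them to stderr.
            builder.setErrorHandler(new DefaultHandler());
            builder.parse(new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8)));
            return null;
        } catch (SAXParseException e) {
            return e.getLineNumber() + ":" + e.getColumnNumber();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // Hypothetical documents: the first has a raw '<' in text, the second escapes it.
        String raw = "<Members>\n  <Member><Note>a < b</Note></Member>\n</Members>";
        String escaped = "<Members>\n  <Member><Note>a &lt; b</Note></Member>\n</Members>";
        System.out.println("raw:     " + firstError(raw));
        System.out.println("escaped: " + firstError(escaped));
    }
}
```

Running the same check against the real file should point at the position named in the trace (line 14, column 26), which is the place to look for an unescaped character.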

My configuration is

{
"config": {
"connector.class": "io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector",
"fs.cleanup.policy.class": "io.streamthoughts.kafka.connect.filepulse.clean.LogCleanupPolicy",
"fs.scanner.class": "io.streamthoughts.kafka.connect.filepulse.scanner.local.LocalFSDirectoryWalker",
"fs.scan.directory.path": "/opt/kafka/confluent/tmp/",
"fs.scan.interval.ms": "5000",
"offset.strategy": "name",
"read.max.wait.ms": "5000",
"task.reader.class": "io.streamthoughts.kafka.connect.filepulse.reader.XMLFileInputReader",
"topic": "sampleTopic",
"tasks.max": 1,
"internal.kafka.reporter.bootstrap.servers": "XXXXX",
"internal.kafka.reporter.id": "connect-file-pulse-xml",
"internal.kafka.reporter.topic": "connect-file-pulse-status",
"internal.kafka.reporter.consumer.security.protocol":"SASL_SSL",
"internal.kafka.reporter.consumer.sasl.jaas.config":"org.apache.kafka.common.security.plain.PlainLoginModule required username='XXXXX' password='XXXXXX",
"internal.kafka.reporter.consumer.ssl.endpoint.identification.algorithm":"https",
"internal.kafka.reporter.consumer.sasl.mechanism":"PLAIN",
"internal.kafka.reporter.producer.security.protocol":"SASL_SSL",
"internal.kafka.reporter.producer.sasl.jaas.config":"org.apache.kafka.common.security.plain.PlainLoginModule required username='XXXXXXXX' password='XXXXXX",
"internal.kafka.reporter.producer.ssl.endpoint.identification.algorithm":"https",
"internal.kafka.reporter.producer.sasl.mechanism":"PLAIN",
"xpath.expression":"/Members/Member[1]"
},
"name": "connect-file-pulse-quickstart-XML"
}

XML file content is

Null Pointer Exception when starting connector

Hi, the connector works great on another machine, but when I ported over to a new machine with the same connector configuration, I run into a NullPointerException with the following trace:

[2020-07-07 17:37:32,676] INFO Releasing access on FileStateBackingStore instance for group fp-test (remaining = null) (io.streamthoughts.kafka.connect.filepulse.state.StateBackingStoreRegistry:82)
[2020-07-07 17:37:32,676] INFO Stopping instance registered instance FileStateBackingStore for group fp-test (io.streamthoughts.kafka.connect.filepulse.state.StateBackingStoreRegistry:84)
[2020-07-07 17:37:32,676] INFO Closing FileStateBackingStore (io.streamthoughts.kafka.connect.filepulse.storage.KafkaStateBackingStore:112)
[2020-07-07 17:37:32,676] INFO Stopping KafkaBasedLog for topic connect-file-pulse-status (io.streamthoughts.kafka.connect.filepulse.storage.KafkaBasedLog:145)
[2020-07-07 17:37:32,677] ERROR WorkerConnector{id=fp-test} Error while starting connector (org.apache.kafka.connect.runtime.WorkerConnector:119)
java.lang.NullPointerException
at io.streamthoughts.kafka.connect.filepulse.storage.KafkaBasedLog.stop(KafkaBasedLog.java:153)
at io.streamthoughts.kafka.connect.filepulse.storage.KafkaStateBackingStore.stop(KafkaStateBackingStore.java:114)
at io.streamthoughts.kafka.connect.filepulse.state.StateBackingStoreRegistry.release(StateBackingStoreRegistry.java:85)
at io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector.start(FilePulseSourceConnector.java:128)
at org.apache.kafka.connect.runtime.WorkerConnector.doStart(WorkerConnector.java:111)
at org.apache.kafka.connect.runtime.WorkerConnector.start(WorkerConnector.java:136)
at org.apache.kafka.connect.runtime.WorkerConnector.transitionTo(WorkerConnector.java:196)
at org.apache.kafka.connect.runtime.Worker.startConnector(Worker.java:252)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.startConnector(DistributedHerder.java:1079)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.access$1300(DistributedHerder.java:117)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder$15.call(DistributedHerder.java:1095)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder$15.call(DistributedHerder.java:1091)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

And here is my existing connector config:

{
  "config": {
    "connector.class": "io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector",
    "fs.cleanup.policy.class": "io.streamthoughts.kafka.connect.filepulse.clean.LogCleanupPolicy",
    "fs.scanner.class": "io.streamthoughts.kafka.connect.filepulse.scanner.local.LocalFSDirectoryWalker",
    "fs.scan.directory.path": "/path/to/files/",
    "fs.scan.filters": "io.streamthoughts.kafka.connect.filepulse.scanner.local.filter.RegexFileListFilter,io.streamthoughts.kafka.connect.filepulse.scanner.local.filter.LastModifiedFileListFilter",
    "fs.scan.interval.ms": "1000",
    "fs.recursive.scan.enable": "false",
    "file.filter.regex.pattern": "status-[0-9]*.json$",
    "file.filter.minimum.age.ms": "180000",
    "internal.kafka.reporter.bootstrap.servers": "localhost:9092",
    "internal.kafka.reporter.topic": "topic_1",
    "offset.strategy": "name+hash",
    "task.reader.class": "io.streamthoughts.kafka.connect.filepulse.reader.RowFileInputReader",
    "topic": "topic_2",
    "tasks.max": 1,

    "transforms": "ExtractField",
    "transforms.ExtractField.type": "org.apache.kafka.connect.transforms.ExtractField$Value",
    "transforms.ExtractField.field": "message",

    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
    "key.converter.schemas.enable": "false",
    "value.converter.schemas.enable": "false"

  },
  "name": "filepulse-test"
}

Any advice on what might be causing the issue? Much appreciated.

Add JoinFilter

The connector should provide a built-in filter that joins the elements of an array into a single string value, using a configurable separator.
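
To make the requested behaviour concrete, here is a minimal sketch of the join itself in plain Java. It is not the connector's filter API: the class and method names are hypothetical, and how such a filter would be configured or wired into a filter chain is left open.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class JoinSketch {

    // Joins the string form of each element, placing the separator between elements.
    static String join(List<?> elements, String separator) {
        return elements.stream()
                .map(e -> String.valueOf(e))
                .collect(Collectors.joining(separator));
    }

    public static void main(String[] args) {
        System.out.println(join(Arrays.asList("a", "b", "c"), "|")); // prints a|b|c
        System.out.println(join(Arrays.asList(1, 2, 3), ", "));      // prints 1, 2, 3
    }
}
```

An empty array would yield an empty string here; whether a real filter should do that or drop the field is a design choice for the feature.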

Error while starting connector

I am trying to get the File Pulse Connector started and I am getting the following error:

[2020-05-20 18:14:30,911] INFO ConnectorConfig values:
filters = []
fs.cleanup.policy.class = class io.streamthoughts.kafka.connect.filepulse.clean.LogCleanupPolicy
fs.scan.directory.path = /mnt/kafkashared/
fs.scan.filters = []
fs.scan.interval.ms = 10000
fs.scanner.class = class io.streamthoughts.kafka.connect.filepulse.scanner.local.LocalFSDirectoryWalker
internal.kafka.reporter.bootstrap.servers = my.ip.address:9093,my.ip.address:9094,my.ip.address:9095
internal.kafka.reporter.id = connect-distributed-cluster
internal.kafka.reporter.topic = connect-distributed-status
offset.strategy = name+hash
task.reader.class = class io.streamthoughts.kafka.connect.filepulse.reader.RowFileInputReader
topic = file-pulse-topic
(io.streamthoughts.kafka.connect.filepulse.config.ConnectorConfig:347)
[2020-05-20 18:14:30,911] INFO Registering new store for name : connect-distributed-cluster (io.streamthoughts.kafka.connect.filepulse.state.StateBackingStoreRegistry:60)
[2020-05-20 18:14:30,912] INFO State store already registered for name : connect-distributed-cluster (io.streamthoughts.kafka.connect.filepulse.state.StateBackingStoreRegistry:64)
[2020-05-20 18:14:30,912] INFO Getting access on FileStateBackingStore instance for group connect-distributed-cluster (io.streamthoughts.kafka.connect.filepulse.state.StateBackingStoreRegistry:73)
[2020-05-20 18:14:30,912] INFO Added codec 'class io.streamthoughts.kafka.connect.filepulse.scanner.local.codec.ZipCodec' (io.streamthoughts.kafka.connect.filepulse.scanner.local.codec.CodecManager:48)
[2020-05-20 18:14:30,912] INFO Added codec 'class io.streamthoughts.kafka.connect.filepulse.scanner.local.codec.GZipCodec' (io.streamthoughts.kafka.connect.filepulse.scanner.local.codec.CodecManager:48)
[2020-05-20 18:14:30,912] INFO Added codec 'class io.streamthoughts.kafka.connect.filepulse.scanner.local.codec.TarballCodec' (io.streamthoughts.kafka.connect.filepulse.scanner.local.codec.CodecManager:48)
[2020-05-20 18:14:30,913] INFO Creating local filesystem scanner (io.streamthoughts.kafka.connect.filepulse.scanner.LocalFileSystemScanner:127)
[2020-05-20 18:14:30,913] ERROR WorkerConnector{id=file-pulse-connector} Error while starting connector (org.apache.kafka.connect.runtime.WorkerConnector:119)
java.lang.IllegalStateException: Cannot init again.
at io.streamthoughts.kafka.connect.filepulse.storage.KafkaStateBackingStore.start(KafkaStateBackingStore.java:94)
at io.streamthoughts.kafka.connect.filepulse.scanner.LocalFileSystemScanner.<init>(LocalFileSystemScanner.java:152)
at io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector.start(FilePulseSourceConnector.java:107)
at org.apache.kafka.connect.runtime.WorkerConnector.doStart(WorkerConnector.java:111)
at org.apache.kafka.connect.runtime.WorkerConnector.start(WorkerConnector.java:136)
at org.apache.kafka.connect.runtime.WorkerConnector.transitionTo(WorkerConnector.java:196)
at org.apache.kafka.connect.runtime.Worker.startConnector(Worker.java:252)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.startConnector(DistributedHerder.java:1079)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.access$1300(DistributedHerder.java:117)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder$11.call(DistributedHerder.java:797)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder$11.call(DistributedHerder.java:783)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.tick(DistributedHerder.java:296)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:245)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
[2020-05-20 18:14:30,915] WARN Discarding state update value with invalid key : status-connector-file-pulse-connector (io.streamthoughts.kafka.connect.filepulse.storage.KafkaStateBackingStore:285)
[2020-05-20 18:14:30,919] INFO Finished creating connector file-pulse-connector (org.apache.kafka.connect.runtime.Worker:268)
[2020-05-20 18:14:30,919] INFO [Worker clientId=connect-1, groupId=connect-distributed-cluster] Skipping reconfiguration of connector file-pulse-connector since it is not running (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1155)

I am not sure what the error is, and I am having difficulty pinpointing the issue.
Could there be something wrong with my configuration?
Thank you.

Not able to get "timestamp"

Hi,

I am using FilePulse to read text files with the following config. I want to add a timestamp using a Kafka Connect transform.

name=TextSourceConnector
connector.class=io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector
topic=text-source
tasks.max=1
transforms=InsertField
transforms.InsertField.timestamp.field=loadedAt
transforms.InsertField.type=org.apache.kafka.connect.transforms.InsertField$Value
task.reader.class=com.accern.reader.TextFileInputReader
fs.cleanup.policy.class=io.streamthoughts.kafka.connect.filepulse.clean.LogCleanupPolicy
fs.scanner.class=io.streamthoughts.kafka.connect.filepulse.scanner.local.LocalFSDirectoryWalker
fs.scan.directory.path=/tmp/data/text
fs.scan.interval.ms=10000
internal.kafka.reporter.bootstrap.servers=localhost:9092
internal.kafka.reporter.id=filepulse-text
internal.kafka.reporter.topic=filepulse-status
offset.strategy=name+hash

I get output like this

{
"message":"Some message",
"title":"my title",
"loadedAt":null
}

I searched around, and the advice is that a "timestamp" must be set on Kafka's SourceRecord when it is created. How do I do that with FilePulse?

SASL_SSL Support

Very good connector. We are using Confluent Cloud, which requires SASL_SSL, but I cannot find any reference to it in the documentation. Does this connector support SASL_SSL? If so, could you point me to an example configuration?

Thanks in advance.

xml files not recognized

Hi,
I'd like to ingest XML files using your connector; however, the XML files are not recognized/found. Here is my JSON config:

{
  "config": {
    "connector.class": "io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector",
    "fs.scanner.class": "io.streamthoughts.kafka.connect.filepulse.scanner.local.LocalFSDirectoryWalker",
    "fs.scan.directory.path": "/tmp/kafka-connect/examples/",
    "fs.scan.interval.ms": "10000",
    "fs.scan.filters": "io.streamthoughts.kafka.connect.filepulse.scanner.local.filter.RegexFileListFilter",
    "fs.cleanup.policy.class": "io.streamthoughts.kafka.connect.filepulse.clean.LogCleanupPolicy",
    "file.filter.regex.pattern": "\\.xml$",
    "internal.kafka.reporter.bootstrap.servers": "broker:29092",
    "internal.kafka.reporter.topic": "xmlfile-connect",
    "offset.strategy": "name+hash",
    "task.reader.class": "io.streamthoughts.kafka.connect.filepulse.reader.XMLFileInputReader",
    "topic": "xmlfile-connect",
    "tasks.max": 1
  },
  "name": "xmlfile-connect"
}

The output of connect log says:

connect  | 2020-07-24 09:15:13,505 INFO [FileSystemMonitorThread] Completed scanned, number of files detected '0'  (io.streamthoughts.kafka.connect.filepulse.scanner.LocalFileSystemScanner)
connect  | 2020-07-24 09:15:13,951 INFO [FileSystemMonitorThread] Finished lookup for new files : '0' files selected (io.streamthoughts.kafka.connect.filepulse.scanner.LocalFileSystemScanner)
connect  | 2020-07-24 09:15:13,952 INFO [FileSystemMonitorThread] Waiting 9553 ms to scan for new files. (io.streamthoughts.kafka.connect.filepulse.source.FileSystemMonitorThread)

although the file sample-review.xml exists in the directory /tmp/kafka-connect/examples/ of the connect container from your docker-compose example. Could you help me understand why the files are not found? Could you also provide an XML example for your reader? I haven't found one, and it would be a big help.

Many Thanks
Eduard
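
One thing that may be worth checking, stated as an assumption since nothing in this thread confirms how RegexFileListFilter applies `file.filter.regex.pattern`: in java.util.regex, a pattern such as `\.xml$` succeeds with `find()` but fails with `matches()`, because `matches()` requires the whole input to match. The sketch below only demonstrates that difference in plain Java; the class and method names are made up.

```java
import java.util.regex.Pattern;

class RegexMatchSketch {

    // True only if the whole file name matches the pattern.
    static boolean wholeName(String regex, String fileName) {
        return Pattern.compile(regex).matcher(fileName).matches();
    }

    // True if the pattern matches anywhere in the file name.
    static boolean anywhere(String regex, String fileName) {
        return Pattern.compile(regex).matcher(fileName).find();
    }

    public static void main(String[] args) {
        String name = "sample-review.xml";
        System.out.println(wholeName("\\.xml$", name));   // false
        System.out.println(anywhere("\\.xml$", name));    // true
        System.out.println(wholeName(".*\\.xml$", name)); // true
    }
}
```

If the filter turns out to use whole-name matching, a pattern like `.*\\.xml$` in the JSON config would be the equivalent of the one posted.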

File Pulse Source Connector Unable to setup

Hello Team,

I am trying to set up the FilePulseSourceConnector in my environment to test sending local files to a Kafka topic. I followed the procedure for creating the connector, but the connector is rejected.

Below is the connector configuration:

{
"name": "FilePulseSourceConnector",
"config": {
"connector.class": "io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector",
"tasks.max": "2",
"topic": "connectfilepulse.source",
"fs.scanner.class": "io.streamthoughts.kafka.connect.filepulse.scanner.local.LocalFSDirectoryWalker",
"fs.scan.directory.path": "/var/log",
"fs.scan.filters": "worker.log",
"fs.scan.interval.ms": "10",
"FileInputReader": "io.streamthoughts.kafka.connect.filepulse.reader.RowFileInputReader",
"internal.kafka.reporter.topic": "connect-file-pulse-status",
"internal.kafka.reporter.id": "test",
"internal.kafka.reporter.cluster.bootstrap.servers": "testserver:1234",
"fs.cleanup.policy.class": "io.streamthoughts.kafka.connect.filepulse.clean.LogCleanPolicy",
"task.reader.class": "io.streamthoughts.kafka.connect.filepulse.reader.RowFileReader"
}
}

The errors are:
Connector configuration is invalid and contains the following 3 error(s):
Invalid value io.streamthoughts.kafka.connect.filepulse.reader.RowFileReader for configuration task.reader.class: Class io.streamthoughts.kafka.connect.filepulse.reader.RowFileReader could not be found.
Missing required configuration "internal.kafka.reporter.bootstrap.servers" which has no default value.
Invalid value io.streamthoughts.kafka.connect.filepulse.clean.LogCleanPolicy for configuration fs.cleanup.policy.class: Class io.streamthoughts.kafka.connect.filepulse.clean.LogCleanPolicy could not be found.

Could you please look into these errors and suggest how we can move forward with setting up the connector?

Thanks
Mac
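
Reading the three errors against the posted config: the required key is `internal.kafka.reporter.bootstrap.servers`, while the config sets `internal.kafka.reporter.cluster.bootstrap.servers`, and the two class names that could not be found differ from the ones used in other configs on this page (`...reader.RowFileInputReader` and `...clean.LogCleanupPolicy`). As a rough sketch, the first of these can be caught before submitting with a plain key check. The class and method names below are hypothetical, and the list of required keys contains only the one key named in the error message.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class ConfigKeyCheck {

    // Returns the required keys that are absent from the config, in the order given.
    static List<String> missingKeys(Map<String, String> config, List<String> required) {
        List<String> missing = new ArrayList<>();
        for (String key : required) {
            if (!config.containsKey(key)) {
                missing.add(key);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        List<String> required = Arrays.asList("internal.kafka.reporter.bootstrap.servers");

        Map<String, String> posted = new LinkedHashMap<>();
        posted.put("internal.kafka.reporter.cluster.bootstrap.servers", "testserver:1234");
        System.out.println("posted config is missing: " + missingKeys(posted, required));

        // Same value under the key name the validation error asks for.
        Map<String, String> renamed = new LinkedHashMap<>();
        renamed.put("internal.kafka.reporter.bootstrap.servers", "testserver:1234");
        System.out.println("renamed config is missing: " + missingKeys(renamed, required));
    }
}
```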

Cannot Register Filters

I may have overlooked some documentation, but it looks like my standalone Kafka Connect worker is ignoring the filters setting.

name=connect-file-pulse-quickstart
connector.class=io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector

filters=RenameKeyFilter
filters.RenameKeyFilter.type=io.streamthoughts.kafka.connect.filepulse.filter.AppendFilter
filters.RenameKeyFilter.field="$key"
filters.RenameKeyFilter.value="{{ $metadata.path }}/{{ $metadata.name }}"

fs.cleanup.policy.class=io.streamthoughts.kafka.connect.filepulse.clean.MoveCleanupPolicy
fs.scanner.class=io.streamthoughts.kafka.connect.filepulse.scanner.local.LocalFSDirectoryWalker
fs.scan.directory.path=/tmp/data/
#fs.scan.filters=io.streamthoughts.kafka.connect.filepulse.scanner.local.filter.RegexFileListFilter
file.filter.regex.pattern="\\.raw_bin$"
fs.scan.interval.ms=10000


internal.kafka.reporter.bootstrap.servers=localhost:9092
internal.kafka.reporter.id=connect-file-pulse-quickstart
internal.kafka.reporter.topic=connect-file-pulse-status
offset.strategy=name+path
read.max.wait.ms=5000
task.reader.class=io.streamthoughts.kafka.connect.filepulse.reader.BytesArrayInputReader
topic=rawbins
tasks.max=1
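
A side note on the quoting in this file, under the assumption that the standalone worker reads it as a standard Java properties file: java.util.Properties does not strip double quotes, so a value written as `"$key"` is loaded with the quote characters included. The sketch below shows only that parsing behaviour; the class and method names are made up, and it does not show how the connector interprets the value afterwards.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

class PropertiesQuotesSketch {

    // Parses properties-file text and returns the loaded key/value pairs.
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
        return props;
    }

    public static void main(String[] args) {
        // First line is copied from the config above; the second is the unquoted variant.
        Properties quoted = parse("filters.RenameKeyFilter.field=\"$key\"\n");
        Properties bare = parse("filters.RenameKeyFilter.field=$key\n");
        System.out.println(quoted.getProperty("filters.RenameKeyFilter.field")); // "$key"
        System.out.println(bare.getProperty("filters.RenameKeyFilter.field"));   // $key
    }
}
```

The same applies to `file.filter.regex.pattern`, where the quotes would become part of the regular expression.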

I need to configure this filter because I have files with the same name but in different folders.
Therefore I am trying to convert the key from just the file name to {path}/{filename}.

[2020-07-01 23:54:49,061] ERROR Unexpected error while scanning file system. (io.streamthoughts.kafka.connect.filepulse.source.FileSystemMonitorThread:81)
java.lang.IllegalStateException: Duplicate key [name='Kube+ 51421338 08;15;00.raw_bin', path='/tmp/data/2020-05-15', size=671, lastModified=1589637255000, inode=443601, hash=4286515073]
        at java.util.stream.Collectors.lambda$throwingMerger$0(Collectors.java:133)
        at java.util.HashMap.merge(HashMap.java:1254)
        at java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1320)
        at java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)
        at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)
        at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
        at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
        at java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1553)
        at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
        at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
        at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
        at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
        at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
        at io.streamthoughts.kafka.connect.filepulse.scanner.LocalFileSystemScanner.toScheduled(LocalFileSystemScanner.java:251)
        at io.streamthoughts.kafka.connect.filepulse.scanner.LocalFileSystemScanner.updateFiles(LocalFileSystemScanner.java:228)
        at io.streamthoughts.kafka.connect.filepulse.scanner.LocalFileSystemScanner.scan(LocalFileSystemScanner.java:193)
        at io.streamthoughts.kafka.connect.filepulse.source.FileSystemMonitorThread.run(FileSystemMonitorThread.java:79)

I was attempting this key conversion because of the Duplicate key exception above, since I believe File Pulse uses the file name as the message key.

I went through the console output looking for the keywords "filter" or "Filter" and found nothing, so I suspect the config file is not registering the filters as I expected.

Any help is much appreciated. I have to confess that I am very new to the kafka ecosystem. Thanks.
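
For readers following the trace: it shows the exception being thrown inside a `Collectors.toMap` call reached from `LocalFileSystemScanner.toScheduled`, that is, in the scanner thread rather than in a record filter. `Collectors.toMap` without a merge function throws `IllegalStateException` as soon as two elements map to the same key. The sketch below reproduces only that JDK behaviour with hypothetical data; which key the scanner actually derives for each file is not visible in this thread.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

class DuplicateKeySketch {

    // Hypothetical stand-in for scanned files: each entry is {directory, fileName}.
    static final List<String[]> FILES = Arrays.asList(
            new String[] {"/tmp/data/2020-05-15", "a.raw_bin"},
            new String[] {"/tmp/data/2020-05-16", "a.raw_bin"});

    // Keyed by file name only: throws when two files share a name.
    static Map<String, String> byName(List<String[]> files) {
        return files.stream().collect(Collectors.toMap(f -> f[1], f -> f[0]));
    }

    // Keyed by directory plus file name: distinct for files in different folders.
    static Map<String, String> byPathAndName(List<String[]> files) {
        return files.stream().collect(Collectors.toMap(f -> f[0] + "/" + f[1], f -> f[0]));
    }

    public static void main(String[] args) {
        try {
            byName(FILES);
        } catch (IllegalStateException e) {
            System.out.println("by name: " + e.getMessage());
        }
        System.out.println("by path and name: " + byPathAndName(FILES).size() + " entries");
    }
}
```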

SSL Support

Hello

Thank you for your amazing project.

I have a Kafka cluster with SSL enabled. I tried to configure kafka-connect-file-pulse to use SSL, but I could not get it working: I get an error message about describing the metadata of a topic. I am wondering whether this is an issue related to SSL support.

I did not find anything about SSL support in the documentation. Is it possible to use the kafka-connect-file-pulse connector with SSL?
