Giter VIP home page Giter VIP logo

secor's People

Contributors

achad4 avatar ahsan-wego avatar ahsandar avatar andykram avatar ankon avatar ashm-4 avatar brndnmtthws avatar dangoldin avatar dependabot-preview[bot] avatar dependabot-support avatar dependabot[bot] avatar domwos avatar fluxx avatar glasser avatar henrycaihaiying avatar igorcalabria avatar jaimess avatar keithdotpower avatar lefthandmagic avatar lukess avatar marcin-kamionowski avatar mthssdrbrg avatar norrs avatar pdambrauskas avatar pgarbacki avatar prasincs avatar robmccoll avatar snyk-bot avatar suminb avatar zackdever avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

secor's Issues

Offsets are kept outside of zk chroot

Any particular reason the offsets are stored in /consumers rather than inside zk chroot {kafka.zookeeper.path}/consumers/{group}/offsets/?

This also might be the reason kafka.tools.ConsumerOffsetChecker is not working with this layout.

Support IAM temporary credentials from ec2 instance profile / role

We're looking at deploying secor on ec2 to push data to our S3 buckets but we're currently using ec2 instance profiles to allow instances to get temporary credentials to access AWS resources rather than hard-coding keys in config.

We've had a look at modifying secor to support this but it doesn't seem simple due to using the Hadoop S3 abstractions as they use an old version of jets3t that doesn't support passing the additional token needed to use the temporary credentials.

So it seems we can wait for Hadoop to finally work this this:

Or have an uploader that uses the native AWS Java SDK which supports this already (including token refreshing, etc..)

Support archiving of Kafka tombstones

A kafka tombstone is used with topics having hash keys to indicate data for the key has been removed from the log stream. The log cleaner/compacter can be configured to eventually remove these tombstones. Tombstones are kafka log messages with a null payload.

Currently, it appears that Secor has no support for tracking tombstones through to the archive, as it does not even retain the key of the message when reading from Kafka. It will simply write out the offset of the tombstone, with no information about what was actually deleted (assuming it doesn't run into one of the other reported bugs about null payloads). This information is critical if the downstream consumers are to make sense of deletes in the archived stream.

I believe that this may require an output file format change, as a triple of key/offset/payload must be stored.

Support Avro Message Parser

Secor only supports Thrift and JSON messages out of the box right now. Secor should provide a configurable Avro Message Parser which uses Avros GenericRecord parser and a configurable Timestamp field.

Secor does not handle topic deletion gracefully

When a topic gets deleted, the consumer offset for secor stays at whatever it was before, and causes these sorts of ERRORs:

2015-08-27 21:53:50,911 ConsumerFetcherThread-secor_consumer1231423_e25fd1c7e266_1_12-0-1 ERROR [ConsumerFetcherThread-secor_consumer1231423_e25fd1c7e266_1_12-0-1], Current offset 15 for partition [test.endtoendtest.category01,0] out of range; reset offset to 0

However, it doesn't resolve itself. In the mean time, a lot of errors get produced on the kafka end, since secor is requesting out of range offsets or non-existent topics.

I was hoping auto.offset.reset=min would help but looks like it has no effect.

Guava upgrade causes RateLimitUtil.acquire error

I tried running 9937d7d, and it fails with the following stack trace.

May 04, 2015 8:11:43 PM com.twitter.logging.Logger log
INFO: Starting LatchedStatsListener
May 04, 2015 8:11:43 PM com.twitter.logging.Logger log
INFO: Admin HTTP interface started on port 9990.
2015-05-04 20:11:45,862 [Thread-4] (kafka.utils.Logging$class:83) WARN  [secor_mobile_ip-X_26621_13], No broker partitions consumed by consumer thread secor_mobile_ip-X_26621_13-0 for topic mobile
2015-05-04 20:11:45,864 [Thread-5] (kafka.utils.Logging$class:83) WARN  [secor_mobile_ip-X_26621_14], No broker partitions consumed by consumer thread secor_mobile_ip-X_26621_14-0 for topic mobile
2015-05-04 20:11:45,873 [Thread-7] (kafka.utils.Logging$class:83) WARN  [secor_mobile_ip-X_26621_16], No broker partitions consumed by consumer thread secor_mobile_ip-X_26621_16-0 for topic mobile
2015-05-04 20:11:45,877 [Thread-3] (kafka.utils.Logging$class:83) WARN  [secor_mobile_ip-X_26621_12], No broker partitions consumed by consumer thread secor_mobile_ip-X_26621_12-0 for topic mobile
2015-05-04 20:11:45,888 [Thread-6] (kafka.utils.Logging$class:83) WARN  [secor_mobile_ip-X_26621_15], No broker partitions consumed by consumer thread secor_mobile_ip-X_26621_15-0 for topic mobile
2015-05-04 20:11:46,912 [secor_mobile_ip-X_26621_14_watcher_executor] (kafka.utils.Logging$class:83) WARN  [secor_mobile_ip-X_26621_14], No broker partitions consumed by consumer thread secor_mobile_ip-X_26621_14-0 for topic mobile
2015-05-04 20:11:46,921 [secor_mobile_ip-X_26621_15_watcher_executor] (kafka.utils.Logging$class:83) WARN  [secor_mobile_ip-X_26621_15], No broker partitions consumed by consumer thread secor_mobile_ip-X_26621_15-0 for topic mobile
2015-05-04 20:11:46,928 [secor_mobile_ip-X_26621_16_watcher_executor] (kafka.utils.Logging$class:83) WARN  [secor_mobile_ip-X_26621_16], No broker partitions consumed by consumer thread secor_mobile_ip-X_26621_16-0 for topic mobile
2015-05-04 20:11:47,143 [Thread-3] (com.pinterest.secor.main.ConsumerMain$1:65) ERROR Thread Thread[Thread-3,5,main] failed
java.lang.NoSuchMethodError: com.google.common.util.concurrent.RateLimiter.acquire()D
        at com.pinterest.secor.util.RateLimitUtil.acquire(RateLimitUtil.java:39)
        at com.pinterest.secor.reader.MessageReader.read(MessageReader.java:144)
        at com.pinterest.secor.consumer.Consumer.consumeNextMessage(Consumer.java:121)
        at com.pinterest.secor.consumer.Consumer.run(Consumer.java:90)

Reverting to ac688f5 fixes the issue. I am running OpenJDK 1.7.0_65

java version "1.7.0_65"
OpenJDK Runtime Environment (IcedTea 2.5.3) (7u71-2.5.3-0ubuntu0.14.04.1)
OpenJDK 64-Bit Server VM (build 24.65-b04, mixed mode)

Strange behavior around folders (broken exactly-once guarantee)

Hi,

I've been fighting to get secor working smoothly on our side but still keep hitting further issues. At first the line-delimited format was missing data (previous issue) so I switched over to sequence files which seem to be more reliable.

I have an issue as the files move to a new folder however, where the filename is not the starting offset, but rather the same filename as the final file in the previous bucket. I've been renaming them successfully for a while, but today I hit upon a set of files that looked like this:

/seq/counter/offset=4000000/1_0_00000000000004978500 contains: 4978500 => 4992667 (a)
/seq/counter/offset=4000000/1_0_00000000000004992668 contains: 4992668 => 4999999 (b)
/seq/counter/offset=5000000/1_0_00000000000004920756 contains: 5000000 => 5059705 (c)
/seq/counter/offset=5000000/1_0_00000000000004992668 contains: 5000000 => 5014068 (d)
/seq/counter/offset=5000000/1_0_00000000000005014069 contains: 5014069 => 5027951 (e)

To me that's a fundamental break in the guarentee's secor provides (exactly once), which seems to be related to a bug in the filenames. The set of files a/b/d/e would be correct, and I imagine (d) would have overwritten (c) if they were correctly named 1_0_00000000000005000000, but because of the naming both files were present.

Secor failure upload policy

If max file age set to more than 24 hours Secor fails to upload files when using DateMessageParser or other parsers that partition message based on date. Probably because every new day there will be new file created in each topic partition that renews the result of getModificationAgeSec method in FileRegistry.

Support persistence to HDFS instead of S3

For use cases where kafka runs entirely behind corp firewalls, and AWS isn't an option, it would be wonderful to use Secor. Would it be possible to persist things onto hdfs instead of s3 as an option?

Thanks!

Have trouble running run_tests.sh

Hi,

I know this has been reported before. Yes I read the run_tests header but I am kind of confuse on what are the "Hadoop native libs" refer to? Those in lib/native (The share libraries) or something else? I am running this on mac os Yosemite and my thrift version is 0.9.3


scripts/run_tests.sh: line 55: declare: -A: invalid option
declare: usage: declare [-afFirtx] [-p] [name[=value] ...]


Running tests for Message Type: 0 and ReaderWriter:com.pinterest.secor.io.impl.SequenceFileReaderWriterFactory using filesystem: s3n


running post_and_verify_test
running pkill -f 'com.pinterest.secor.main.ConsumerMain' || true
running pkill -f 'kafka.Kafka' || true
running pkill -f 'org.apache.zookeeper.server.quorum.QuorumPeerMain' || true
running rm -r -f /tmp/secor_dev
running s3cmd del --recursive s3://test-bucket/secor_dev
scripts/run_tests.sh: line 77: s3cmd: command not found
running s3cmd ls -r s3://test-bucket/secor_dev
scripts/run_tests.sh: line 77: s3cmd: command not found
running mkdir -p /tmp/secor_dev/logs
running scripts/run_kafka_class.sh org.apache.zookeeper.server.quorum.QuorumPeerMain scripts/../zookeeper.test.properties > /tmp/secor_dev/logs/zookeeper.log 2>&1 &
running scripts/run_kafka_class.sh kafka.Kafka scripts/../kafka.test.properties > /tmp/secor_dev/logs/kafka_server.log 2>&1 &
running scripts/run_kafka_class.sh kafka.admin.TopicCommand --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic test > /tmp/secor_dev/logs/create_topic.log 2>&1
running /Library/Java/JavaVirtualMachines/jdk1.8.0_66.jdk/Contents/Home//bin/java -server -ea -Dlog4j.configuration=log4j.dev.properties -Dconfig=secor.test.backup.properties -Dsecor.s3.filesystem=s3n -Dsecor.file.reader.writer.factory=com.pinterest.secor.io.impl.SequenceFileReaderWriterFactory -cp :lib/ com.pinterest.secor.main.ConsumerMain > /tmp/secor_dev/logs/secor_backup.log 2>&1 &
running /Library/Java/JavaVirtualMachines/jdk1.8.0_66.jdk/Contents/Home//bin/java -server -ea -Dlog4j.configuration=log4j.dev.properties -Dconfig=secor.test.backup.properties -cp :lib/ com.pinterest.secor.main.TestLogMessageProducerMain -t test -m 100 -p 1 -type 0 -timeshift 0 > /tmp/secor_dev/logs/test_log_message_producer.log 2>&1
Waiting 40 sec for Secor to upload logs to s3
Verifying 100 0
running /Library/Java/JavaVirtualMachines/jdk1.8.0_66.jdk/Contents/Home//bin/java -server -ea -Dlog4j.configuration=log4j.dev.properties -Dconfig=secor.test.backup.properties -Dsecor.s3.filesystem=s3n -Dsecor.file.reader.writer.factory=com.pinterest.secor.io.impl.SequenceFileReaderWriterFactory -cp :lib/ com.pinterest.secor.main.LogFileVerifierMain -t test -m 100 -q > /tmp/secor_dev/logs/log_verifier_backup.log 2>&1
\e[1;41;97mVerification FAILED\e[0m
See log /tmp/secor_dev/logs/log_verifier_backup.log for more details
2015-12-11 12:52:33,216 main WARN Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2015-12-11 12:52:34,439 main ERROR Log file verifier failed
org.apache.hadoop.security.AccessControlException: Permission denied: s3n://test-bucket/secor_dev/backup/test
at org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.processException(Jets3tNativeFileSystemStore.java:449)
at org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.processException(Jets3tNativeFileSystemStore.java:427)
at org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.handleException(Jets3tNativeFileSystemStore.java:411)
at org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.retrieveMetadata(Jets3tNativeFileSystemStore.java:181)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
at org.apache.hadoop.fs.s3native.$Proxy4.retrieveMetadata(Unknown Source)
at org.apache.hadoop.fs.s3native.NativeS3FileSystem.listStatus(NativeS3FileSystem.java:526)
at com.pinterest.secor.util.FileUtil.list(FileUtil.java:102)
at com.pinterest.secor.util.FileUtil.listRecursively(FileUtil.java:119)
at com.pinterest.secor.tools.LogFileVerifier.populateTopicPartitionToOffsetToFiles(LogFileVerifier.java:58)
at com.pinterest.secor.tools.LogFileVerifier.verifyCounts(LogFileVerifier.java:116)
at com.pinterest.secor.main.LogFileVerifierMain.main(LogFileVerifierMain.java:93)
Caused by: org.jets3t.service.impl.rest.HttpException: 403 Forbidden
at org.jets3t.service.impl.rest.httpclient.RestStorageService.performRequest(RestStorageService.java:425)
at org.jets3t.service.impl.rest.httpclient.RestStorageService.performRequest(RestStorageService.java:279)
at org.jets3t.service.impl.rest.httpclient.RestStorageService.performRestHead(RestStorageService.java:1052)
at org.jets3t.service.impl.rest.httpclient.RestStorageService.getObjectImpl(RestStorageService.java:2264)
at org.jets3t.service.impl.rest.httpclient.RestStorageService.getObjectDetailsImpl(RestStorageService.java:2193)
at org.jets3t.service.StorageService.getObjectDetails(StorageService.java:1120)
at org.jets3t.service.StorageService.getObjectDetails(StorageService.java:575)
at org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.retrieveMetadata(Jets3tNativeFileSystemStore.java:174)
... 13 more
See log /tmp/secor_dev/logs/secor_backup.log for more details

2015-12-11 12:51:52,714 Thread-0 INFO Property serializer.class is overridden to kafka.serializer.DefaultEncoder
Exception in thread "Thread-0" java.lang.RuntimeException: Undefined message encoding type: 0
at com.pinterest.secor.tools.TestLogMessageProducer.run(TestLogMessageProducer.java:76)


TIA

Kim

Zookeeper quorum and Kafka broker host configs both required

Mainly just wondering if there's a reason why Secor needs to know both the Zookeeper hosts and a Kafka broker in order to function. As far as I know, for older versions of kafka (pre 0.82), talking to zookeeper will be able to provide info of kafka brokers as needed, while for newer versions (0.82 and beyond) clients can manage zookeeper offsets through kafka itself.

But looking at Secor's config files, it seems like both are required. Is there a particular reason?

S3 files corruption

Secor is leaving the end of messages in the output of smaller messages.

If you load these 2 messages into kafka.
{"array":[1,2,3],"boolean":true,"null":null,"dog":"brown","dog2":"brown","dog3":"brown","dog4":"brown","dog5":"brown","dog6":"brown","dog7":"brown","dog8":"brown","dog9":"brown","dog10":"brown","dog11":"brown","number":123,"object":{"a":"b","c":"d","e":"f"},"string":"Hello World"}

{"array":[1,2,3],"boolean":true,"null":null,"number":123,"object":{"a":"b","c":"d","e":"f"},"string":"Hello World"}

The secor file in S3 will look like this. I put the extra data in bold on line 14
13: {"array":[1,2,3],"boolean":true,"null":null,"dog":"brown","dog2":"brown","dog3":"brown","dog4":"brown","dog5":"brown","dog6":"brown","dog7":"brown","dog8":"brown","dog9":"brown","dog10":"brown","dog11":"brown","number":123,"object":{"a":"b","c":"d","e":"f"},"string":"Hello World"}

14: {"array":[1,2,3],"boolean":true,"null":null,"number":123,"object":{"a":"b","c":"d","e":"f"},"string":"Hello World"}n","dog6":"brown","dog7":"brown","dog8":"brown","dog9":"brown","dog10":"brown","dog11":"brown","number":123,"object":{"a":"b","c":"d","e":"f"},"string":"Hello World"}

Looks like this is an issue with BytesWritable. Here is some code testing that.
package secor;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;

public class TestWriter {

private  SequenceFile.Writer mWriter;


public static void main(String[] args) {

    TestWriter testWriter = new TestWriter();

    try {
        testWriter.init();

        testWriter.write(10, "This is the shit");
        testWriter.write(11, "Leo");
        testWriter.mWriter.close();

        testWriter.read();
    } catch (IOException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    } catch (InstantiationException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    } catch (IllegalAccessException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }


}

public void init() throws IOException{
    Configuration config = new Configuration();
    FileSystem fs = FileSystem.get(config);
    Path fsPath = new Path("/tmp/testfile");

    this.mWriter = SequenceFile.createWriter(fs, config, fsPath,
        LongWritable.class, BytesWritable.class);
}


public void write(long key, String value) throws IOException {

    LongWritable writeableKey = new LongWritable(key);
    BytesWritable writeableValue = new BytesWritable(value.getBytes());
    System.out.println(new String(writeableValue.getBytes()));
    this.mWriter.append(writeableKey, writeableValue);

}

public void read() throws IOException, InstantiationException, IllegalAccessException{
    Configuration config = new Configuration();
    FileSystem fs = FileSystem.get(config);
    Path fsPath = new Path("/tmp/testfile");

    SequenceFile.Reader reader = new SequenceFile.Reader(fs, fsPath, new Configuration());
    LongWritable key = new LongWritable();
    BytesWritable value = new BytesWritable();
    System.out.println("reading file " + fsPath);
    while (reader.next(key, value)) {
        System.out.println(new String(value.getBytes()));
    }
}

}

Message level processing

Hi,

I started looking into Secor from last 2-3 days and found it really useful. Now, I've a requirement to process individual message before dumping it into Amazon S3. Processing could be kind of transformation, projection etc. After exploring the source code I didn't find any place where we can plug our custom transformation logic.
My first query is: Am I missing something in the source code? If Yes, then please let me know where can I do it? Otherwise I've created another package (In my local repo) which can be used for the same purpose and I would love to contribute my extension in the Secor repo.

Thanks,
Ashish

SimpleDateFormat threading issue throws ArrayIndexOutOfBoundsException in TimestampedMessageParser

Hi all,

I think there's a problem with using SimpleDateFormat in the TimestampedMessageParser on java 1.8.0:

$ java -version
java version "1.8.0_20"
Java(TM) SE Runtime Environment (build 1.8.0_20-b26)
Java HotSpot(TM) 64-Bit Server VM (build 25.20-b23, mixed mode)

The error is:

java.lang.ArrayIndexOutOfBoundsException: 13
at sun.util.calendar.BaseCalendar.getCalendarDateFromFixedDate(BaseCalendar.java:453)
at java.util.GregorianCalendar.computeFields(GregorianCalendar.java:2397)
at java.util.GregorianCalendar.computeFields(GregorianCalendar.java:2312)
at java.util.Calendar.complete(Calendar.java:2236)
at java.util.Calendar.get(Calendar.java:1826)
at java.text.SimpleDateFormat.subFormat(SimpleDateFormat.java:1119)
at java.text.SimpleDateFormat.format(SimpleDateFormat.java:966)
at java.text.SimpleDateFormat.format(SimpleDateFormat.java:936)
at java.text.DateFormat.format(DateFormat.java:345)
at com.pinterest.secor.parser.TimestampedMessageParser.generatePartitions(TimestampedMessageParser.java:78)
at com.pinterest.secor.parser.TimestampedMessageParser.extractPartitions(TimestampedMessageParser.java:99)
at com.pinterest.secor.parser.MessageParser.parse(MessageParser.java:40)
at com.pinterest.secor.consumer.Consumer.consumeNextMessage(Consumer.java:139)
at com.pinterest.secor.consumer.Consumer.run(Consumer.java:93)

The relevant code at TimestampedMessageParser.java:78 is:

    String dt = "dt=" + mDtFormatter.format(date);

This line will probably be a problem as well:

    String hr = "hr=" + mHrFormatter.format(date);

For the first cut, I'm going to try to synchronize on mDtFormatter. But supposedly, it's not as fast as using a ThreadLocal variable.

What's the best way of fixing this?

Thanks

Data loss during rebalance

We observed a data loss during rebalance. This happens very rarely, but look at code path, this definitely can happen.

The current consumer deleteWriters, then another thread committed offset.
https://github.com/pinterest/secor/blob/master/src/main/java/com/pinterest/secor/uploader/Uploader.java#L96

https://github.com/pinterest/secor/blob/master/src/main/java/com/pinterest/secor/uploader/Uploader.java#L102

This if statement return false, nothing happens, All data in this writer will be lost.

Since this is a very rare case, I think maybe the best solution will be throw a runtime exception if the if statement return false?

Partition Finalizer appears thrift specific

Was looking at possibility of adding extra partitioning support for a list of fields and was browsing around

PartitionFinalizer.java. It appears that this class is thrift specific:
private ThriftMessageParser mThriftMessageParser;

Not really a problem for me at the moment, but figured I'd let you know.

configure secor logging

When invoking com.pinterest.secor.main.ConsumerMain
I include the suggested logging configuration paramater:
-Dlog4j.configuration=log4j.prod.properties

strace tells me that the app never tries to read log4j.prod.properties

$ strace -f -e trace=file java -ea -Dsecor_group=secor_backup -Dlog4j.configuration=log4j.prod.properties -Dconfig=secor.prod.backup.properties -cp secor-0.1-SNAPSHOT.jar:lib/* com.pinterest.secor.main.ConsumerMain 2> out.log
$ grep properties out.log | grep log
[pid 6469] open("/usr/lib/jvm/java-7-openjdk-amd64/jre/lib/logging.properties", O_RDONLY) = 95

The strace is successful and other properties files related to logging are being read.
Unfortunately, the specified log4j.prod.properties is never read.

$ java -version
java version "1.7.0_65"
OpenJDK Runtime Environment (IcedTea 2.5.3) (7u71-2.5.3-0ubuntu0.14.04.1)
OpenJDK 64-Bit Server VM (build 24.65-b04, mixed mode)

Flaw in handling un-parsable messages causing logs to be lost?

Hello there, I haven't been working with Secor too long and I apologise if I have misunderstood. (If so feel free to close this issue.) However, I think there is a flaw in the way un-parsable (malformed) messages are handled.

I noticed during testing that when I fed an un-parsable message into a topic, consumption of it by secor would cause the files for that given topic-partition to be deleted.

I believe the problem to be in the implementation of the logic described here,:

writer.write(message) {
  t = message.topic;
  p = message.partition;
  if message.offset != last_seen_offset[t, p] + 1 {
    delete_local_files_for_topic_partition(t, p);
  }
  write_to_local_file(message);
  last_seen_offset[t, p] = message.offset;
}

The problem is that although Secor will gracefully skip over the malformed message, the last_seen_offset will not get updated. This means that when reading the next message message.offset != last_seen_offset[t, p] + 1 will evaluate to true and the files for that topic-partition will be deleted even though no rebalancing event occurred!

I have a modified fork of Secor but as far as I can tell this is logic from before I forked. Would be really great if someone else confirm/disprove this behaviour.

automatic deleting local files

Does Secor delete local files that have been uploaded to s3? I saw this setting:

# If greater than 0, upon starup Secor will clean up directories and files under secor.local.path
# that are older than this value.
secor.local.log.delete.age.hours=-1

But the documentation seems to say it only does the check on startup. Is/will there be a setting for doing cleanup by age during runtime?

Thx

Issue completing /scripts/run_tests.sh

Hey Team Pinterest!

Awesome project and even more awesome you guys released it as OSS.

I'm having an issue completing the run_tests.sh, even with modifying the s3:// bucket to my own with full privileges. This might be the same issue as this ticket

Here's the error output:

WARNING: Remote list is empty.
running mkdir -p /tmp/secor_dev/logs
running ./bin/scripts/run_kafka_class.sh         org.apache.zookeeper.server.quorum.QuorumPeerMain zookeeper.test.properties >         /tmp/secor_dev/logs/zookeeper.log 2>&1 &
running ./bin/scripts/run_kafka_class.sh kafka.Kafka kafka.test.properties >         /tmp/secor_dev/logs/kafka_server.log 2>&1 &
running ./bin/scripts/run_kafka_class.sh kafka.admin.TopicCommand --create --zookeeper         localhost:2181 --replication-factor 1 --partitions 2 --topic test >         /tmp/secor_dev/logs/create_topic.log 2>&1
running java -server -ea -Dlog4j.configuration=log4j.dev.properties         -Dconfig=secor.dev.backup.properties -Dsecor.file.reader.writer=com.pinterest.secor.io.impl.DelimitedTextFileReaderWriter -cp secor-0.1-SNAPSHOT.jar:lib/*         com.pinterest.secor.main.ConsumerMain > /tmp/secor_dev/logs/secor_backup.log 2>&1 &
running java -server -ea -Dlog4j.configuration=log4j.dev.properties         -Dconfig=secor.dev.backup.properties -cp secor-0.1-SNAPSHOT.jar:lib/*         com.pinterest.secor.main.TestLogMessageProducerMain -t test -m 1000 -p 1 -type json >         /tmp/secor_dev/logs/test_log_message_producer.log 2>&1
Waiting 120 sec for Secor to upload logs to s3
running java -server -ea -Dlog4j.configuration=log4j.dev.properties           -Dconfig=secor.dev.backup.properties -Dsecor.file.reader.writer=com.pinterest.secor.io.impl.DelimitedTextFileReaderWriter -cp secor-0.1-SNAPSHOT.jar:lib/*           com.pinterest.secor.main.LogFileVerifierMain -t test -m 1000 -q >           /tmp/secor_dev/logs/log_verifier_backup.log 2>&1
Verification FAILED
See log /tmp/secor_dev/logs/log_verifier_backup.log for more details
Error: Could not find or load main class com.pinterest.secor.main.LogFileVerifierMain

Encrypting data in S3 using secor

I'm trying to configure secor to use AES256 encryption in the target S3 bucket. Does anyone have past experience in getting this working?

Support other output formats

The message committed to S3 is restricted to format (Long, Bytes) which is most frequently used. Anyway sometimes is necessary to use other formats like 'Text' for instance.

underscore in property makes it hard to configure via bash interpolation

This is kind of crazy but the underscore present in https://github.com/pinterest/secor/blob/master/src/main/config/secor.common.properties#L21
and the corresponding https://github.com/pinterest/secor/blob/master/src/main/java/com/pinterest/secor/common/SecorConfig.java#L172
makes it really hard to interpolate Bash environment variables into the typesafe config format, as one might be inspired to do in the case of Docker's environment files and a tool like https://github.com/ralph-tice/offspring.sh

So, is it crazy to ask that configuration keys don't include _, which are the only reasonably delimiter to use for POSIX environment variables?

For example, I would use MYORG_SECOR_KAFKA_TOPIC_FILTER=mytopicprefix.*

perf_topic_prefix has the same problem but I don't see its usage outside of tests...

S3 sync warnings

I get the following warning each time secor writes to S3. I think that the requests just make sure the file does not exist before actually uploading to S3, is that the case or should I worry about them?

I don't think the local "WARN No writer found for path" at the end is related, would that be a problem?

2014-10-27 16:05:25,225 [Thread-3] (org.jets3t.service.impl.rest.httpclient.RestS3Service:393) WARN  Response '/raw_logs%2Fsecor_backup%2Fmat_postbacks%2Foffset%3D0%2F1_0_00000000000001039472' - Unexpected response code 404, expected 200
2014-10-27 16:05:25,238 [Thread-3] (org.jets3t.service.impl.rest.httpclient.RestS3Service:393) WARN  Response '/raw_logs%2Fsecor_backup%2Fmat_postbacks%2Foffset%3D0%2F1_0_00000000000001039472_%24folder%24' - Unexpected response code 404, expected 200
2014-10-27 16:05:25,267 [Thread-3] (org.jets3t.service.impl.rest.httpclient.RestS3Service:393) WARN  Response '/raw_logs%2Fsecor_backup%2Fmat_postbacks%2Foffset%3D0%2F1_0_00000000000001039472' - Unexpected response code 404, expected 200
2014-10-27 16:05:25,282 [Thread-3] (org.jets3t.service.impl.rest.httpclient.RestS3Service:393) WARN  Response '/raw_logs%2Fsecor_backup%2Fmat_postbacks%2Foffset%3D0%2F1_0_00000000000001039472_%24folder%24' - Unexpected response code 404, expected 200
2014-10-27 16:05:25,350 [Thread-3] (com.pinterest.secor.common.FileRegistry:157) WARN  No writer found for path /var/log/secor_data/message_logs/backup/18704_12/mat_postbacks/offset=0/1_0_00000000000001039472
Oct 27, 2014 4:05:25 PM com.twitter.common.zookeeper.DistributedLockImpl unlock
INFO: Cleaning up this locks ephemeral node.
Oct 27, 2014 4:05:25 PM com.twitter.common.zookeeper.DistributedLockImpl cleanup

DelimitedTextFile trim file incorrectly around rebalance

DelimitedTextFile assumed LogFilePath's offset is the starting offset, and entire file is continuous stream.

First problem will be a rebalance happened after datetime partition switch, but this doesn't happen very often at same time with a rebalance event.

Second situation is an old thread picked a topic again after a period time, it will still use previous committed offset as filename which can be days ago.

"Transform" topic name before uploading to S3

Currently the topic name is used as-is, which isn't necessarily a bad thing, but we'd like to have some more control over the final structure on S3 and thus would like to "transform" the directory path where things are stored on S3.

Basically what we want to do is replace . (dots) with / (slashes) in the topic name (as one is not allowed to use slashes in topic names (for obvious reasons)).

@pgarbacki any thoughts about this? What would be the best way to approach this? Anyone else feel like they would benefit from this?

Class clash between AWS libraries on the classpath.

I've upgrade Secor to 0.9 and I've switched to s3a file system.

After upgrade I've started to receive the following errors (stacktrace can be in a little mess, because I've retrieved from our temporary log system) :

INFO: Cleaning up!
INFO: Cleaning up this locks ephemeral node.
Dec 17, 2015 10:30:29 AM com.twitter.common.zookeeper.DistributedLockImpl unlock
Dec 17, 2015 10:30:29 AM com.twitter.common.zookeeper.DistributedLockImpl cleanup
2015-12-17 10:30:29,299 [Thread-3] (com.pinterest.secor.main.ConsumerMain$1:65) ERROR Thread Thread[Thread-3,5,main] failed
java.lang.RuntimeException: Failed to apply upload policy
    at com.pinterest.secor.uploader.HadoopS3UploadManager$1.run(HadoopS3UploadManager.java:57)
    at com.pinterest.secor.util.FileUtil.moveToCloud(FileUtil.java:156)
    at com.pinterest.secor.util.FileUtil.getFileSystem(FileUtil.java:71)
    at com.pinterest.secor.uploader.Uploader.uploadFiles(Uploader.java:95)
    at com.pinterest.secor.uploader.Uploader.checkTopicPartition(Uploader.java:186)
    at com.pinterest.secor.uploader.Uploader.applyPolicy(Uploader.java:207)
    at com.pinterest.secor.consumer.Consumer.checkUploadPolicy(Consumer.java:112)
    at com.pinterest.secor.consumer.Consumer.checkUploadPolicy(Consumer.java:110)
    at com.pinterest.secor.consumer.Consumer.run(Consumer.java:102)
    at com.pinterest.secor.uploader.FutureHandle.get(FutureHandle.java:34)
Caused by: java.lang.NoSuchMethodError: com.amazonaws.services.s3.transfer.TransferManagerConfiguration.setMultipartUploadThreshold(I)V
    at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2653)
    at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:92)
    at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2687)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:285)
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:371)
    at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2669)
Caused by: java.util.concurrent.ExecutionException: java.lang.NoSuchMethodError: com.amazonaws.services.s3.transfer.TransferManagerConfiguration.setMultipartUploadThreshold(I)V
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.FutureTask.report(FutureTask.java:122)
    at java.lang.Thread.run(Thread.java:745)
    at java.util.concurrent.FutureTask.get(FutureTask.java:192)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)

Problems is with AWS libs:

  • Secor requires com.amazonaws:aws-java-sdk-s3:1.10.2 (com.amazonaws.services.s3.transfer.TransferManagerConfiguration.setMultipartUploadThreshold method has Long parameter)
  • Hadoop lib org.apache.hadoop:hadoop-aws:2.7.0 requires com.amazonaws:aws-java-sdk:1.7.4 (com.amazonaws.services.s3.transfer.TransferManagerConfiguration.setMultipartUploadThreshold method has int parameter)

How could I parse created_at field in Twitter's stream

I use JSON date parser but in Twitter's streams haven't timestamp. I got the error like this

java.lang.NumberFormatException: For input string: "Mon Dec 14 16:28:09 +0000 2015"
    at sun.misc.FloatingDecimal.readJavaFormatString(FloatingDecimal.java:1250)
    at java.lang.Double.valueOf(Double.java:504)
    at com.pinterest.secor.parser.JsonMessageParser.extractTimestampMillis(JsonMessageParser.java:39)
    at com.pinterest.secor.parser.TimestampedMessageParser.extractPartitions(TimestampedMessageParser.java:104)
    at com.pinterest.secor.parser.MessageParser.parse(MessageParser.java:40)
    at com.pinterest.secor.consumer.Consumer.consumeNextMessage(Consumer.java:139)
    at com.pinterest.secor.consumer.Consumer.run(Consumer.java:93)

Could you please give me any suggestion?

Question: High Level Consumer API dependency

I'd like to look at secor support for IBM Message Hub (kafka) on Bluemix, however I have just seen:

You cannot use the Kafka high-level consumer API with Message Hub. Instead, use the new Kafka 0.9 consumer API.

Source: https://www.ng.bluemix.net/docs/services/MessageHub/index.html

I see from the secor README.md that it relies on the high level consumer API's.

  1. Is the README.md still accurate in that statement?
  2. How prolific is the dependency on the high-level API's? I.e. how much effort would it take to port secor to use the new APIs?

issue with s3 and new bucketname

I have setup secor but when I run test_run, i get the following error. note that I was successfully able to set it up with another bucket and it works find for that. but when I change the s3 bucket to a different name(I have also created the bucket manually) it provides such error.

See log /tmp/secor_dev/logs/log_verifier_backup.log for more details
2015-08-14 14:06:46,037 main ERROR Log file verifier failed
org.apache.hadoop.fs.s3.S3Exception: org.jets3t.service.S3ServiceException: S3 HEAD request failed for '/sector-test%2Fbackup%2Ftest' - ResponseCode=400, ResponseMessage=Bad Request
at org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.handleServiceException(Jets3tNativeFileSystemStore.java:229)
at org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.retrieveMetadata(Jets3tNativeFileSystemStore.java:111)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:85)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:62)
at org.apache.hadoop.fs.s3native.$Proxy1.retrieveMetadata(Unknown Source)
at org.apache.hadoop.fs.s3native.NativeS3FileSystem.listStatus(NativeS3FileSystem.java:407)
at com.pinterest.secor.util.FileUtil.list(FileUtil.java:55)
at com.pinterest.secor.util.FileUtil.listRecursively(FileUtil.java:71)
at com.pinterest.secor.tools.LogFileVerifier.populateTopicPartitionToOffsetToFiles(LogFileVerifier.java:62)
at com.pinterest.secor.tools.LogFileVerifier.verifyCounts(LogFileVerifier.java:120)
at com.pinterest.secor.main.LogFileVerifierMain.main(LogFileVerifierMain.java:93)

Secor not safe to null payloads

com.pinterest.secor.message.Message.fieldsToString not null safe.
Somehow we got a null payload in kafka, and secor throws a NullPointerException.

15/05/11 19:12:24 ERROR main.ConsumerMain: Thread Thread[Thread-4,5,main] failed
java.lang.NullPointerException
at java.lang.String.(String.java:554)
at com.pinterest.secor.message.Message.fieldsToString(Message.java:35)
at com.pinterest.secor.message.Message.toString(Message.java:44)
at java.lang.String.valueOf(String.java:2981)
at java.lang.StringBuilder.append(StringBuilder.java:131)
at com.pinterest.secor.reader.MessageReader.read(MessageReader.java:141)
at com.pinterest.secor.consumer.Consumer.run(Consumer.java:92)

I would fix this myself but I can no longer build secor due to twitter dependencies that are no longer available.

[ERROR] Failed to execute goal com.twitter:maven-finagle-thrift-plugin:0.0.9:compile (thrift-sources) on project secor: Execution thrift-sources of goal com.twitter:maven-finagle-thrift-plugin:0.0.9:compile failed: A required class was missing while executing com.twitter:maven-finagle-thrift-plugin:0.0.9:compile: com/google/common/collect/ImmutableSet

I tried lots of ways to find the dependencies but nothing would fix it completely.

I am a committer on this project and really would like to contribute some more, if someone could help with the dependency problems

Support fror s3a hadoop file system is broken

I've tried use s3a file system in my configuration, but I've get error:

java.lang.IllegalArgumentException: AWS Access Key ID and Secret Access Key 
must be specified as the username or password (respectively) of a s3n URL, 
or by setting the fs.s3n.awsAccessKeyId or fs.s3n.awsSecretAccessKey 
properties (respectively).

After short investigation I've noticed that support for s3a introduced in commit Support s3a hadoop file system. was accidentally destroyed by next commit Openstack Swift integration.

In the class com.pinterest.secor.uploaderHadoopS3Uploader the method mConfig.getS3Prefix() was replaced with com.pinterest.secor.util.FileUtil.getPrefix() which doesn't support "s3a". Probably only change which have to be made to restore s3a support is change in the com.pinterest.secor.util.FileUtil.getPrefix() method.

Line 90:

prefix = "s3n://" + config.getS3Bucket() + "/" + config.getS3Path();

should be changed to:

prefix = config.getS3Prefix();

PartitionFinalizer failing for custom MessageParser

I have a message parser that generates partitions in S3 of the form "'y'=yyyy/'m'=MM/'d'=dd/'H'=HH". When I run the partition finalizer I get this error:

2015-11-18 23:21:42,824 main ERROR Partition finalizer failed
java.lang.AssertionError: wrong partition format: y=2015/m=11/d=18
at com.pinterest.secor.parser.PartitionFinalizer.finalizePartitionsUpTo(PartitionFinalizer.java:143)
at com.pinterest.secor.parser.PartitionFinalizer.finalizePartitions(PartitionFinalizer.java:190)
at com.pinterest.secor.main.PartitionFinalizerMain.main(PartitionFinalizerMain.java:45)

I understand if having the '=' sign might cause a conflict when storing in Hive, but since I don't use Hive, it would be nice if this didn't fail.

Fresh build fails on CentOS 7.1

There appears to be something wrong wit the pom.xml file. A number of warnings are emitted, and then the build fails unable to find the com.twitter.finagle.thrift package.

[WARNING] 
[WARNING] Some problems were encountered while building the effective model for com.pinterest:secor:jar:0.2-SNAPSHOT
[WARNING] 'build.plugins.plugin.(groupId:artifactId)' must be unique but found duplicate declaration of plugin org.codehaus.mojo:exec-maven-plugin @ line 269, column 21
[WARNING] 
[WARNING] It is highly recommended to fix these problems because they threaten the stability of your build.
[WARNING] 
[WARNING] For this reason, future Maven versions might no longer support building such malformed projects.
[WARNING] 

...

[WARNING] Checksum validation failed, no checksums available from the repository for http://maven.twttr.com/org/apache/thrift/libthrift/0.5.0/libthrift-0.5.0.pom

...

[WARNING] Checksum validation failed, no checksums available from the repository for http://maven.twttr.com/thrift/libthrift/0.5.0/libthrift-0.5.0.jar

...

[ERROR] COMPILATION ERROR : 
[ERROR] /home/build/jenkins_tempdir/workspace/secor/target/generated-sources/thrift/gen-java/com/pinterest/secor/thrift/TestMessage.java:[35,33] error: package com.twitter.finagle.thrift does not exist

Oddly, it appears to build without incident on OSX 10.10.5

Locale-dependent test

Hello! This DateMessageParser test is locale-dependent, and fails in non-English locales (which is how I discovered it!).

What would the best solution for this issue? Adding a new locale parameter (e.g. message.timestamp.locale) or just removing the test?

Restoring Data

The default behaviour of Secor is to backup data to Amazon S3.
What about restoring this data? I mean, what if a cluster or a broker breaks, you loose all the log data or it is deleted and you need it again?

Does this project also serves for this??

Thanks in advance

LogFilePrinter displaying extra characters

When using the LogFilePrinter secor is not using length to trim the byte array from the BytesWritable.

If you load these 2 messages into kafka.
{"array":[1,2,3],"boolean":true,"null":null,"dog":"brown","dog2":"brown","dog3":"brown","dog4":"brown","dog5":"brown","dog6":"brown","dog7":"brown","dog8":"brown","dog9":"brown","dog10":"brown","dog11":"brown","number":123,"object":{"a":"b","c":"d","e":"f"},"string":"Hello World"}

{"array":[1,2,3],"boolean":true,"null":null,"number":123,"object":{"a":"b","c":"d","e":"f"},"string":"Hello World"}

The LogFilePrinter will display this. I put the extra data in bold on line 14
13: {"array":[1,2,3],"boolean":true,"null":null,"dog":"brown","dog2":"brown","dog3":"brown","dog4":"brown","dog5":"brown","dog6":"brown","dog7":"brown","dog8":"brown","dog9":"brown","dog10":"brown","dog11":"brown","number":123,"object":{"a":"b","c":"d","e":"f"},"string":"Hello World"}

14: {"array":[1,2,3],"boolean":true,"null":null,"number":123,"object":{"a":"b","c":"d","e":"f"},"string":"Hello World"}n","dog6":"brown","dog7":"brown","dog8":"brown","dog9":"brown","dog10":"brown","dog11":"brown","number":123,"object":{"a":"b","c":"d","e":"f"},"string":"Hello World"}

This is the bug in Hadoop https://issues.apache.org/jira/browse/HADOOP-6298
The bold shows the fix for this
package com.pinterest.secor.tools;

import com.pinterest.secor.util.FileUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;

import java.io.IOException;

/**

  • Log file printer displays the content of a log file.
    *

  • @author Pawel Garbacki ([email protected])
    */
    public class LogFilePrinter {
    private boolean mPrintOffsetsOnly;

    public LogFilePrinter(boolean printOffsetsOnly) throws IOException {
    mPrintOffsetsOnly = printOffsetsOnly;
    }

    public void printFile(String path) throws Exception {
    FileSystem fileSystem = FileUtil.getFileSystem(path);
    Path fsPath = new Path(path);
    SequenceFile.Reader reader = new SequenceFile.Reader(fileSystem, fsPath,
    new Configuration());
    LongWritable key = (LongWritable) reader.getKeyClass().newInstance();
    BytesWritable value = (BytesWritable) reader.getValueClass().newInstance();
    System.out.println("reading file " + path);
    while (reader.next(key, value)) {
    if (mPrintOffsetsOnly) {
    System.out.println(Long.toString(key.get()));
    } else {
    byte[] nonPaddedBytes = new byte[value.getLength()];
    System.arraycopy(value.getBytes(), 0, nonPaddedBytes, 0, value.getLength());
    System.out.println(Long.toString(key.get()) + ": " + new String(nonPaddedBytes));

    }

    }
    

    }
    }

issue when I start runing secor

Hi

I am trying to run secor using the following command but I am getting the following error. The test run works fine for me.
I set zookeeper and kafka to localhost.

java -ea -Dsecor_group=secor_backup -Dlog4j.configuration=log4j.prod.properties -Dconfig=secor.prod.backup.properties -cp secor-0.2-SNAPSHOT.jar:lib/* com.pinterest.secor.main.ConsumerMain
Aug 11, 2015 12:57:51 PM com.twitter.logging.Logger log
INFO: Starting LatchedStatsListener
Aug 11, 2015 12:57:51 PM com.twitter.logging.Logger log
INFO: Admin HTTP interface started on port 9999.
2015-08-11 12:57:53,320 Thread-7 ERROR Thread Thread[Thread-7,5,main] failed
java.lang.AssertionError: 1 == 2
at com.pinterest.secor.common.ZookeeperConnector.getZookeeperAddresses(ZookeeperConnector.java:67)
at com.pinterest.secor.common.ZookeeperConnector.(ZookeeperConnector.java:57)
at com.pinterest.secor.uploader.Uploader.(Uploader.java:57)
at com.pinterest.secor.consumer.Consumer.init(Consumer.java:71)
at com.pinterest.secor.consumer.Consumer.run(Consumer.java:80)
2015-08-11 12:57:53,321 Thread-5 ERROR Thread Thread[Thread-5,5,main] failed
java.lang.AssertionError: 1 == 2
at com.pinterest.secor.common.ZookeeperConnector.getZookeeperAddresses(ZookeeperConnector.java:67)
at com.pinterest.secor.common.ZookeeperConnector.(ZookeeperConnector.java:57)
at com.pinterest.secor.uploader.Uploader.(Uploader.java:57)
at com.pinterest.secor.consumer.Consumer.init(Consumer.java:71)
at com.pinterest.secor.consumer.Consumer.run(Consumer.java:80)
2015-08-11 12:57:53,321 Thread-9 ERROR Thread Thread[Thread-9,5,main] failed
java.lang.AssertionError: 1 == 2
at com.pinterest.secor.common.ZookeeperConnector.getZookeeperAddresses(ZookeeperConnector.java:67)
at com.pinterest.secor.common.ZookeeperConnector.(ZookeeperConnector.java:57)
at com.pinterest.secor.uploader.Uploader.(Uploader.java:57)
at com.pinterest.secor.consumer.Consumer.init(Consumer.java:71)
at com.pinterest.secor.consumer.Consumer.run(Consumer.java:80)
2015-08-11 12:57:53,322 Thread-8 ERROR Thread Thread[Thread-8,5,main] failed
java.lang.AssertionError: 1 == 2
at com.pinterest.secor.common.ZookeeperConnector.getZookeeperAddresses(ZookeeperConnector.java:67)
at com.pinterest.secor.common.ZookeeperConnector.(ZookeeperConnector.java:57)
at com.pinterest.secor.uploader.Uploader.(Uploader.java:57)
at com.pinterest.secor.consumer.Consumer.init(Consumer.java:71)
at com.pinterest.secor.consumer.Consumer.run(Consumer.java:80)
2015-08-11 12:57:53,322 Thread-4 ERROR Thread Thread[Thread-4,5,main] failed
java.lang.AssertionError: 1 == 2
at com.pinterest.secor.common.ZookeeperConnector.getZookeeperAddresses(ZookeeperConnector.java:67)
at com.pinterest.secor.common.ZookeeperConnector.(ZookeeperConnector.java:57)
at com.pinterest.secor.uploader.Uploader.(Uploader.java:57)
at com.pinterest.secor.consumer.Consumer.init(Consumer.java:71)
at com.pinterest.secor.consumer.Consumer.run(Consumer.java:80)
2015-08-11 12:57:53,322 Thread-6 ERROR Thread Thread[Thread-6,5,main] failed
java.lang.AssertionError: 1 == 2
at com.pinterest.secor.common.ZookeeperConnector.getZookeeperAddresses(ZookeeperConnector.java:67)
at com.pinterest.secor.common.ZookeeperConnector.(ZookeeperConnector.java:57)
at com.pinterest.secor.uploader.Uploader.(Uploader.java:57)
at com.pinterest.secor.consumer.Consumer.init(Consumer.java:71)
at com.pinterest.secor.consumer.Consumer.run(Consumer.java:80)
2015-08-11 12:57:53,323 Thread-3 ERROR Thread Thread[Thread-3,5,main] failed
java.lang.AssertionError: 1 == 2
at com.pinterest.secor.common.ZookeeperConnector.getZookeeperAddresses(ZookeeperConnector.java:67)
at com.pinterest.secor.common.ZookeeperConnector.(ZookeeperConnector.java:57)
at com.pinterest.secor.uploader.Uploader.(Uploader.java:57)
at com.pinterest.secor.consumer.Consumer.init(Consumer.java:71)
at com.pinterest.secor.consumer.Consumer.run(Consumer.java:80)

Secor continuously fetching metadata from Kafka cluster after killing the seed Kafka broker

I have a Kafka cluster with two nodes. I supplied node A to Secor to be the seed broker. After killing node A, Secor keep fetching metadata from node B without doing any data persistence to S3.

2015-03-27 14:32:32,194 [secor_backup_secor-staging-a-1.bezurk.org_30798_12-leader-finder-thread] (kafka.utils.Logging$class:68) INFO  Property request.timeout.ms is overridden to 30000
2015-03-27 14:32:32,194 [secor_backup_secor-staging-a-1.bezurk.org_30798_12-leader-finder-thread] (kafka.utils.Logging$class:68) INFO  Fetching metadata from broker id:2,host:ip-10-0-21-156.ap-southeast-1.compute.internal,port:9092 with correlation id 2326 for 3 topic(s) Set(flight-fares, fares, test2)
2015-03-27 14:32:32,199 [secor_backup_secor-staging-a-1.bezurk.org_30798_12-leader-finder-thread] (kafka.utils.Logging$class:68) INFO  Connected to ip-10-0-21-156.ap-southeast-1.compute.internal:9092 for producing
2015-03-27 14:32:32,202 [secor_backup_secor-staging-a-1.bezurk.org_30798_12-leader-finder-thread] (kafka.utils.Logging$class:68) INFO  Disconnecting from ip-10-0-21-156.ap-southeast-1.compute.internal:9092
2015-03-27 14:32:32,203 [secor_backup_secor-staging-a-1.bezurk.org_30798_12-leader-finder-thread] (kafka.utils.Logging$class:68) INFO  [ConsumerFetcherManager-1427437182052] Added fetcher for partitions ArrayBuffer()
2015-03-27 14:32:32,410 [secor_backup_secor-staging-a-1.bezurk.org_30798_12-leader-finder-thread] (kafka.utils.Logging$class:68) INFO  Verifying properties
2015-03-27 14:32:32,413 [secor_backup_secor-staging-a-1.bezurk.org_30798_12-leader-finder-thread] (kafka.utils.Logging$class:68) INFO  Property client.id is overridden to secor_backup
2015-03-27 14:32:32,413 [secor_backup_secor-staging-a-1.bezurk.org_30798_12-leader-finder-thread] (kafka.utils.Logging$class:68) INFO  Property metadata.broker.list is overridden to ip-10-0-21-156.ap-southeast-1.compute.internal:9092
2015-03-27 14:32:32,414 [secor_backup_secor-staging-a-1.bezurk.org_30798_12-leader-finder-thread] (kafka.utils.Logging$class:68) INFO  Property request.timeout.ms is overridden to 30000
2015-03-27 14:32:32,414 [secor_backup_secor-staging-a-1.bezurk.org_30798_12-leader-finder-thread] (kafka.utils.Logging$class:68) INFO  Fetching metadata from broker id:2,host:ip-10-0-21-156.ap-southeast-1.compute.internal,port:9092 with correlation id 2327 for 3 topic(s) Set(flight-fares, fares, test2)
2015-03-27 14:32:32,416 [secor_backup_secor-staging-a-1.bezurk.org_30798_12-leader-finder-thread] (kafka.utils.Logging$class:68) INFO  Connected to ip-10-0-21-156.ap-southeast-1.compute.internal:9092 for producing
2015-03-27 14:32:32,418 [secor_backup_secor-staging-a-1.bezurk.org_30798_12-leader-finder-thread] (kafka.utils.Logging$class:68) INFO  Disconnecting from ip-10-0-21-156.ap-southeast-1.compute.internal:9092
2015-03-27 14:32:32,420 [secor_backup_secor-staging-a-1.bezurk.org_30798_12-leader-finder-thread] (kafka.utils.Logging$class:68) INFO  [ConsumerFetcherManager-1427437182052] Added fetcher for partitions ArrayBuffer()
2015-03-27 14:32:32,624 [secor_backup_secor-staging-a-1.bezurk.org_30798_12-leader-finder-thread] (kafka.utils.Logging$class:68) INFO  Verifying properties
2015-03-27 14:32:32,625 [secor_backup_secor-staging-a-1.bezurk.org_30798_12-leader-finder-thread] (kafka.utils.Logging$class:68) INFO  Property client.id is overridden to secor_backup
2015-03-27 14:32:32,625 [secor_backup_secor-staging-a-1.bezurk.org_30798_12-leader-finder-thread] (kafka.utils.Logging$class:68) INFO  Property metadata.broker.list is overridden to ip-10-0-21-156.ap-southeast-1.compute.internal:9092
2015-03-27 14:32:32,625 [secor_backup_secor-staging-a-1.bezurk.org_30798_12-leader-finder-thread] (kafka.utils.Logging$class:68) INFO  Property request.timeout.ms is overridden to 30000
2015-03-27 14:32:32,626 [secor_backup_secor-staging-a-1.bezurk.org_30798_12-leader-finder-thread] (kafka.utils.Logging$class:68) INFO  Fetching metadata from broker id:2,host:ip-10-0-21-156.ap-southeast-1.compute.internal,port:9092 with correlation id 2328 for 3 topic(s) Set(flight-fares, fares, test2)
2015-03-27 14:32:32,627 [secor_backup_secor-staging-a-1.bezurk.org_30798_12-leader-finder-thread] (kafka.utils.Logging$class:68) INFO  Connected to ip-10-0-21-156.ap-southeast-1.compute.internal:9092 for producing
2015-03-27 14:32:32,630 [secor_backup_secor-staging-a-1.bezurk.org_30798_12-leader-finder-thread] (kafka.utils.Logging$class:68) INFO  Disconnecting from ip-10-0-21-156.ap-southeast-1.compute.internal:9092
2015-03-27 14:32:32,631 [secor_backup_secor-staging-a-1.bezurk.org_30798_12-leader-finder-thread] (kafka.utils.Logging$class:68) INFO  [ConsumerFetcherManager-1427437182052] Added fetcher for partitions ArrayBuffer()
2015-03-27 14:32:32,835 [secor_backup_secor-staging-a-1.bezurk.org_30798_12-leader-finder-thread] (kafka.utils.Logging$class:68) INFO  Verifying properties
2015-03-27 14:32:32,836 [secor_backup_secor-staging-a-1.bezurk.org_30798_12-leader-finder-thread] (kafka.utils.Logging$class:68) INFO  Property client.id is overridden to secor_backup
2015-03-27 14:32:32,836 [secor_backup_secor-staging-a-1.bezurk.org_30798_12-leader-finder-thread] (kafka.utils.Logging$class:68) INFO  Property metadata.broker.list is overridden to ip-10-0-21-156.ap-southeast-1.compute.internal:9092
2015-03-27 14:32:32,837 [secor_backup_secor-staging-a-1.bezurk.org_30798_12-leader-finder-thread] (kafka.utils.Logging$class:68) INFO  Property request.timeout.ms is overridden to 30000
2015-03-27 14:32:32,837 [secor_backup_secor-staging-a-1.bezurk.org_30798_12-leader-finder-thread] (kafka.utils.Logging$class:68) INFO  Fetching metadata from broker id:2,host:ip-10-0-21-156.ap-southeast-1.compute.internal,port:9092 with correlation id 2329 for 3 topic(s) Set(flight-fares, fares, test2)
2015-03-27 14:32:32,839 [secor_backup_secor-staging-a-1.bezurk.org_30798_12-leader-finder-thread] (kafka.utils.Logging$class:68) INFO  Connected to ip-10-0-21-156.ap-southeast-1.compute.internal:9092 for producing
2015-03-27 14:32:32,842 [secor_backup_secor-staging-a-1.bezurk.org_30798_12-leader-finder-thread] (kafka.utils.Logging$class:68) INFO  Disconnecting from ip-10-0-21-156.ap-southeast-1.compute.internal:9092
2015-03-27 14:32:32,843 [secor_backup_secor-staging-a-1.bezurk.org_30798_12-leader-finder-thread] (kafka.utils.Logging$class:68) INFO  [ConsumerFetcherManager-1427437182052] Added fetcher for partitions ArrayBuffer()
2015-03-27 14:32:33,050 [secor_backup_secor-staging-a-1.bezurk.org_30798_12-leader-finder-thread] (kafka.utils.Logging$class:68) INFO  Verifying properties
2015-03-27 14:32:33,051 [secor_backup_secor-staging-a-1.bezurk.org_30798_12-leader-finder-thread] (kafka.utils.Logging$class:68) INFO  Property client.id is overridden to secor_backup
2015-03-27 14:32:33,051 [secor_backup_secor-staging-a-1.bezurk.org_30798_12-leader-finder-thread] (kafka.utils.Logging$class:68) INFO  Property metadata.broker.list is overridden to ip-10-0-21-156.ap-southeast-1.compute.internal:9092
2015-03-27 14:32:33,051 [secor_backup_secor-staging-a-1.bezurk.org_30798_12-leader-finder-thread] (kafka.utils.Logging$class:68) INFO  Property request.timeout.ms is overridden to 30000
2015-03-27 14:32:33,052 [secor_backup_secor-staging-a-1.bezurk.org_30798_12-leader-finder-thread] (kafka.utils.Logging$class:68) INFO  Fetching metadata from broker id:2,host:ip-10-0-21-156.ap-southeast-1.compute.internal,port:9092 with correlation id 2330 for 3 topic(s) Set(flight-fares, fares, test2)
2015-03-27 14:32:33,054 [secor_backup_secor-staging-a-1.bezurk.org_30798_12-leader-finder-thread] (kafka.utils.Logging$class:68) INFO  Connected to ip-10-0-21-156.ap-southeast-1.compute.internal:9092 for producing
2015-03-27 14:32:33,056 [secor_backup_secor-staging-a-1.bezurk.org_30798_12-leader-finder-thread] (kafka.utils.Logging$class:68) INFO  Disconnecting from ip-10-0-21-156.ap-southeast-1.compute.internal:9092
2015-03-27 14:32:33,057 [secor_backup_secor-staging-a-1.bezurk.org_30798_12-leader-finder-thread] (kafka.utils.Logging$class:68) INFO  [ConsumerFetcherManager-1427437182052] Added fetcher for partitions ArrayBuffer()

Is this a feature or a bug?

MessageParser support other timestamp formats

The data that I have worked with there is no timestamp in a 'Long' format, instead it used 'Date' in a format like "2014-10-10 00:00:00", so I created a new parser which supports any kind of date pattern used by SimpleDateFormat ( http://docs.oracle.com/javase/7/docs/api/java/text/SimpleDateFormat.html )
This kind of format is very common and keeps compatibility with joda-time.

Since it is a new parser, the code is totally isolated from the rest.

#21

Partitioning support based on values from specific fields

I have a use case where I have a few different schemas flowing into the same Kafka topic. I debated if it made sense to use a topic per schema which would fit fine with the Secor model, but if I do this, I can lose ordering for stream processing consumers from Kafka.

I wanted to know if this was a feature the Secor devs would like to see and had any input on how it should be structured.

The ordering of partitions needs to be taken into account, so was thinking of having a new configuration option "message.partition.keys" that defines both the field names and ordering. This would end up plugging into the existing timestamp partitioners. If "message.partition.keys" is empty, you get the same default behavior, otherwise it defines the order of the partitions, where the time field must be included in the list.

This would allow for paths like:
/schema=test/dt=2014-11-01

or

/dt=2014-11-01/schema=test

Let me know if something like this would be accepted by the project and if there is any feedback where things might get complex. I'd like to implement when I have a chance.

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.