pinterest / secor
Secor is a service implementing Kafka log persistence
License: Apache License 2.0
At the core of RateLimiter lies thread synchronisation: com.google.common.util.concurrent.RateLimiter#acquire(int) blocks threads regardless of the allowed message rate.
Could the rate limiter be applied at the consumer-thread level instead?
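One possible direction is per-thread pacing, so that a slow permit on one consumer thread never blocks the others. The following is a minimal pure-Java sketch; the class and method names are my own, not Secor's or Guava's API:

```java
// Minimal per-thread rate limiter sketch: each consumer thread paces itself
// independently, so acquire() never blocks other threads on a shared monitor.
// Illustration only; names and behavior are assumptions, not Secor's code.
public class PerThreadRateLimiter {
    private final long nanosPerPermit;
    // Each thread tracks the earliest time it may issue its next permit.
    private final ThreadLocal<long[]> nextFreeNanos =
            ThreadLocal.withInitial(() -> new long[] { System.nanoTime() });

    public PerThreadRateLimiter(double permitsPerSecondPerThread) {
        this.nanosPerPermit = (long) (1_000_000_000L / permitsPerSecondPerThread);
    }

    /** Blocks only the calling thread until its own next permit is due. */
    public void acquire() {
        long[] next = nextFreeNanos.get();
        long now = System.nanoTime();
        long waitNanos = next[0] - now;
        if (waitNanos > 0) {
            try {
                Thread.sleep(waitNanos / 1_000_000L, (int) (waitNanos % 1_000_000L));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        next[0] = Math.max(now, next[0]) + nanosPerPermit;
    }

    public static void main(String[] args) {
        PerThreadRateLimiter limiter = new PerThreadRateLimiter(1000.0);
        long start = System.nanoTime();
        for (int i = 0; i < 5; i++) {
            limiter.acquire();
        }
        System.out.println("elapsedMs=" + (System.nanoTime() - start) / 1_000_000L);
    }
}
```

Guava's RateLimiter could likewise be held in a ThreadLocal, at the cost of the aggregate rate becoming the per-thread rate times the thread count.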
Hi,
I saw a strange line here:
https://github.com/pinterest/secor/blob/master/src/main/java/com/pinterest/secor/parser/OffsetMessageParser.java#L36
What is the expected behavior? It seems to perform a useless operation.
Any particular reason the offsets are stored in /consumers rather than inside the ZK chroot {kafka.zookeeper.path}/consumers/{group}/offsets/?
This might also be why kafka.tools.ConsumerOffsetChecker is not working with this layout.
We're looking at deploying Secor on EC2 to push data to our S3 buckets, but we're currently using EC2 instance profiles to let instances obtain temporary credentials for accessing AWS resources rather than hard-coding keys in config.
We've had a look at modifying Secor to support this, but it doesn't seem simple: the Hadoop S3 abstractions use an old version of jets3t that doesn't support passing the additional session token needed to use the temporary credentials.
So it seems we can either wait for Hadoop to finally fix this:
Or have an uploader that uses the native AWS Java SDK, which supports this already (including token refreshing, etc.).
A kafka tombstone is used with topics having hash keys to indicate data for the key has been removed from the log stream. The log cleaner/compacter can be configured to eventually remove these tombstones. Tombstones are kafka log messages with a null payload.
Currently, it appears that Secor has no support for tracking tombstones through to the archive, as it does not even retain the key of the message when reading from Kafka. It will simply write out the offset of the tombstone, with no information about what was actually deleted (assuming it doesn't run into one of the other reported bugs about null payloads). This information is critical if the downstream consumers are to make sense of deletes in the archived stream.
I believe that this may require an output file format change, as a triple of key/offset/payload must be stored.
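To illustrate, archiving a tombstone faithfully would mean persisting a key/offset/payload triple and recognizing the null payload. This is a hypothetical sketch, not an existing Secor type:

```java
import java.util.Arrays;

// Hypothetical archived-record triple; Secor does not currently have this
// type. It shows the minimal shape needed to track tombstones downstream.
public class ArchivedRecord {
    public final byte[] key;      // Kafka message key (may be null for unkeyed topics)
    public final long offset;     // Kafka offset within the partition
    public final byte[] payload;  // null payload marks a tombstone

    public ArchivedRecord(byte[] key, long offset, byte[] payload) {
        this.key = key;
        this.offset = offset;
        this.payload = payload;
    }

    /** A Kafka tombstone is a message with a null payload. */
    public boolean isTombstone() {
        return payload == null;
    }

    @Override
    public String toString() {
        return "ArchivedRecord{key=" + Arrays.toString(key)
                + ", offset=" + offset
                + ", tombstone=" + isTombstone() + "}";
    }
}
```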
Secor only supports Thrift and JSON messages out of the box right now. Secor should provide a configurable Avro message parser which uses Avro's GenericRecord and a configurable timestamp field.
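The configurable-timestamp part might look like the following sketch, where a Map stands in for Avro's GenericRecord (whose get(String) call has the same shape); the class name and field handling are assumptions, not an existing Secor API:

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of a timestamp extractor with a configurable field name, as an Avro
// parser would need. A Map stands in for Avro's GenericRecord here so the
// example is self-contained. Names are illustrative assumptions.
public class TimestampFieldExtractor {
    private final String timestampField;

    public TimestampFieldExtractor(String timestampField) {
        this.timestampField = timestampField;
    }

    /** Returns epoch millis from the configured field, or throws if absent. */
    public long extractTimestampMillis(Map<String, Object> record) {
        Object value = record.get(timestampField);
        if (value == null) {
            throw new IllegalArgumentException(
                    "missing timestamp field: " + timestampField);
        }
        return ((Number) value).longValue();
    }

    public static void main(String[] args) {
        Map<String, Object> record = new HashMap<>();
        record.put("ts", 1450000000000L);
        TimestampFieldExtractor extractor = new TimestampFieldExtractor("ts");
        System.out.println(extractor.extractTimestampMillis(record));
    }
}
```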
When a topic gets deleted, the consumer offset for secor stays at whatever it was before, and causes these sorts of ERRORs:
2015-08-27 21:53:50,911 ConsumerFetcherThread-secor_consumer1231423_e25fd1c7e266_1_12-0-1 ERROR [ConsumerFetcherThread-secor_consumer1231423_e25fd1c7e266_1_12-0-1], Current offset 15 for partition [test.endtoendtest.category01,0] out of range; reset offset to 0
However, it doesn't resolve itself. In the meantime, a lot of errors are produced on the Kafka end, since Secor keeps requesting out-of-range offsets or non-existent topics.
I was hoping auto.offset.reset=min would help, but it appears to have no effect.
I tried running 9937d7d, and it fails with the following stack trace.
May 04, 2015 8:11:43 PM com.twitter.logging.Logger log
INFO: Starting LatchedStatsListener
May 04, 2015 8:11:43 PM com.twitter.logging.Logger log
INFO: Admin HTTP interface started on port 9990.
2015-05-04 20:11:45,862 [Thread-4] (kafka.utils.Logging$class:83) WARN [secor_mobile_ip-X_26621_13], No broker partitions consumed by consumer thread secor_mobile_ip-X_26621_13-0 for topic mobile
2015-05-04 20:11:45,864 [Thread-5] (kafka.utils.Logging$class:83) WARN [secor_mobile_ip-X_26621_14], No broker partitions consumed by consumer thread secor_mobile_ip-X_26621_14-0 for topic mobile
2015-05-04 20:11:45,873 [Thread-7] (kafka.utils.Logging$class:83) WARN [secor_mobile_ip-X_26621_16], No broker partitions consumed by consumer thread secor_mobile_ip-X_26621_16-0 for topic mobile
2015-05-04 20:11:45,877 [Thread-3] (kafka.utils.Logging$class:83) WARN [secor_mobile_ip-X_26621_12], No broker partitions consumed by consumer thread secor_mobile_ip-X_26621_12-0 for topic mobile
2015-05-04 20:11:45,888 [Thread-6] (kafka.utils.Logging$class:83) WARN [secor_mobile_ip-X_26621_15], No broker partitions consumed by consumer thread secor_mobile_ip-X_26621_15-0 for topic mobile
2015-05-04 20:11:46,912 [secor_mobile_ip-X_26621_14_watcher_executor] (kafka.utils.Logging$class:83) WARN [secor_mobile_ip-X_26621_14], No broker partitions consumed by consumer thread secor_mobile_ip-X_26621_14-0 for topic mobile
2015-05-04 20:11:46,921 [secor_mobile_ip-X_26621_15_watcher_executor] (kafka.utils.Logging$class:83) WARN [secor_mobile_ip-X_26621_15], No broker partitions consumed by consumer thread secor_mobile_ip-X_26621_15-0 for topic mobile
2015-05-04 20:11:46,928 [secor_mobile_ip-X_26621_16_watcher_executor] (kafka.utils.Logging$class:83) WARN [secor_mobile_ip-X_26621_16], No broker partitions consumed by consumer thread secor_mobile_ip-X_26621_16-0 for topic mobile
2015-05-04 20:11:47,143 [Thread-3] (com.pinterest.secor.main.ConsumerMain$1:65) ERROR Thread Thread[Thread-3,5,main] failed
java.lang.NoSuchMethodError: com.google.common.util.concurrent.RateLimiter.acquire()D
at com.pinterest.secor.util.RateLimitUtil.acquire(RateLimitUtil.java:39)
at com.pinterest.secor.reader.MessageReader.read(MessageReader.java:144)
at com.pinterest.secor.consumer.Consumer.consumeNextMessage(Consumer.java:121)
at com.pinterest.secor.consumer.Consumer.run(Consumer.java:90)
Reverting to ac688f5 fixes the issue. I am running OpenJDK 1.7.0_65:
java version "1.7.0_65"
OpenJDK Runtime Environment (IcedTea 2.5.3) (7u71-2.5.3-0ubuntu0.14.04.1)
OpenJDK 64-Bit Server VM (build 24.65-b04, mixed mode)
Hi,
I've been fighting to get Secor working smoothly on our side, but keep hitting further issues. At first the line-delimited format was missing data (previous issue), so I switched over to sequence files, which seem to be more reliable.
However, I have an issue as the files move to a new folder: the filename is not the starting offset, but rather the same filename as the final file in the previous bucket. I've been renaming them successfully for a while, but today I hit upon a set of files that looked like this:
/seq/counter/offset=4000000/1_0_00000000000004978500 contains: 4978500 => 4992667 (a)
/seq/counter/offset=4000000/1_0_00000000000004992668 contains: 4992668 => 4999999 (b)
/seq/counter/offset=5000000/1_0_00000000000004920756 contains: 5000000 => 5059705 (c)
/seq/counter/offset=5000000/1_0_00000000000004992668 contains: 5000000 => 5014068 (d)
/seq/counter/offset=5000000/1_0_00000000000005014069 contains: 5014069 => 5027951 (e)
To me that's a fundamental break in the guarantees Secor provides (exactly once), which seems to be related to a bug in the filenames. The set of files a/b/d/e would be correct, and I imagine (d) would have overwritten (c) if they were correctly named 1_0_00000000000005000000, but because of the naming both files were present.
If the max file age is set to more than 24 hours, Secor fails to upload files when using DateMessageParser or other parsers that partition messages based on date. This is probably because every new day a new file is created in each topic partition, which renews the result of the getModificationAgeSec method in FileRegistry.
For use cases where Kafka runs entirely behind corporate firewalls and AWS isn't an option, it would be wonderful to use Secor. Would it be possible to persist things onto HDFS instead of S3 as an option?
Thanks!
Hi,
I know this has been reported before. Yes, I read the run_tests header, but I am confused about what the "Hadoop native libs" refer to: those in lib/native (the shared libraries) or something else? I am running this on Mac OS Yosemite and my Thrift version is 0.9.3.
scripts/run_tests.sh: line 55: declare: -A: invalid option
declare: usage: declare [-afFirtx] [-p] [name[=value] ...]
Running tests for Message Type: 0 and ReaderWriter:com.pinterest.secor.io.impl.SequenceFileReaderWriterFactory using filesystem: s3n
running post_and_verify_test
running pkill -f 'com.pinterest.secor.main.ConsumerMain' || true
running pkill -f 'kafka.Kafka' || true
running pkill -f 'org.apache.zookeeper.server.quorum.QuorumPeerMain' || true
running rm -r -f /tmp/secor_dev
running s3cmd del --recursive s3://test-bucket/secor_dev
scripts/run_tests.sh: line 77: s3cmd: command not found
running s3cmd ls -r s3://test-bucket/secor_dev
scripts/run_tests.sh: line 77: s3cmd: command not found
running mkdir -p /tmp/secor_dev/logs
running scripts/run_kafka_class.sh org.apache.zookeeper.server.quorum.QuorumPeerMain scripts/../zookeeper.test.properties > /tmp/secor_dev/logs/zookeeper.log 2>&1 &
running scripts/run_kafka_class.sh kafka.Kafka scripts/../kafka.test.properties > /tmp/secor_dev/logs/kafka_server.log 2>&1 &
running scripts/run_kafka_class.sh kafka.admin.TopicCommand --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic test > /tmp/secor_dev/logs/create_topic.log 2>&1
running /Library/Java/JavaVirtualMachines/jdk1.8.0_66.jdk/Contents/Home//bin/java -server -ea -Dlog4j.configuration=log4j.dev.properties -Dconfig=secor.test.backup.properties -Dsecor.s3.filesystem=s3n -Dsecor.file.reader.writer.factory=com.pinterest.secor.io.impl.SequenceFileReaderWriterFactory -cp :lib/ com.pinterest.secor.main.ConsumerMain > /tmp/secor_dev/logs/secor_backup.log 2>&1 &
running /Library/Java/JavaVirtualMachines/jdk1.8.0_66.jdk/Contents/Home//bin/java -server -ea -Dlog4j.configuration=log4j.dev.properties -Dconfig=secor.test.backup.properties -cp :lib/ com.pinterest.secor.main.TestLogMessageProducerMain -t test -m 100 -p 1 -type 0 -timeshift 0 > /tmp/secor_dev/logs/test_log_message_producer.log 2>&1
Waiting 40 sec for Secor to upload logs to s3
Verifying 100 0
running /Library/Java/JavaVirtualMachines/jdk1.8.0_66.jdk/Contents/Home//bin/java -server -ea -Dlog4j.configuration=log4j.dev.properties -Dconfig=secor.test.backup.properties -Dsecor.s3.filesystem=s3n -Dsecor.file.reader.writer.factory=com.pinterest.secor.io.impl.SequenceFileReaderWriterFactory -cp :lib/ com.pinterest.secor.main.LogFileVerifierMain -t test -m 100 -q > /tmp/secor_dev/logs/log_verifier_backup.log 2>&1
Verification FAILED
See log /tmp/secor_dev/logs/log_verifier_backup.log for more details
2015-12-11 12:52:33,216 main WARN Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2015-12-11 12:52:34,439 main ERROR Log file verifier failed
org.apache.hadoop.security.AccessControlException: Permission denied: s3n://test-bucket/secor_dev/backup/test
at org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.processException(Jets3tNativeFileSystemStore.java:449)
at org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.processException(Jets3tNativeFileSystemStore.java:427)
at org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.handleException(Jets3tNativeFileSystemStore.java:411)
at org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.retrieveMetadata(Jets3tNativeFileSystemStore.java:181)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
at org.apache.hadoop.fs.s3native.$Proxy4.retrieveMetadata(Unknown Source)
at org.apache.hadoop.fs.s3native.NativeS3FileSystem.listStatus(NativeS3FileSystem.java:526)
at com.pinterest.secor.util.FileUtil.list(FileUtil.java:102)
at com.pinterest.secor.util.FileUtil.listRecursively(FileUtil.java:119)
at com.pinterest.secor.tools.LogFileVerifier.populateTopicPartitionToOffsetToFiles(LogFileVerifier.java:58)
at com.pinterest.secor.tools.LogFileVerifier.verifyCounts(LogFileVerifier.java:116)
at com.pinterest.secor.main.LogFileVerifierMain.main(LogFileVerifierMain.java:93)
Caused by: org.jets3t.service.impl.rest.HttpException: 403 Forbidden
at org.jets3t.service.impl.rest.httpclient.RestStorageService.performRequest(RestStorageService.java:425)
at org.jets3t.service.impl.rest.httpclient.RestStorageService.performRequest(RestStorageService.java:279)
at org.jets3t.service.impl.rest.httpclient.RestStorageService.performRestHead(RestStorageService.java:1052)
at org.jets3t.service.impl.rest.httpclient.RestStorageService.getObjectImpl(RestStorageService.java:2264)
at org.jets3t.service.impl.rest.httpclient.RestStorageService.getObjectDetailsImpl(RestStorageService.java:2193)
at org.jets3t.service.StorageService.getObjectDetails(StorageService.java:1120)
at org.jets3t.service.StorageService.getObjectDetails(StorageService.java:575)
at org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.retrieveMetadata(Jets3tNativeFileSystemStore.java:174)
... 13 more
See log /tmp/secor_dev/logs/secor_backup.log for more details
2015-12-11 12:51:52,714 Thread-0 INFO Property serializer.class is overridden to kafka.serializer.DefaultEncoder
Exception in thread "Thread-0" java.lang.RuntimeException: Undefined message encoding type: 0
at com.pinterest.secor.tools.TestLogMessageProducer.run(TestLogMessageProducer.java:76)
TIA
Kim
Mainly just wondering if there's a reason why Secor needs to know both the Zookeeper hosts and a Kafka broker in order to function. As far as I know, for older versions of Kafka (pre-0.8.2), talking to Zookeeper will provide info on Kafka brokers as needed, while for newer versions (0.8.2 and beyond) clients can manage offsets through Kafka itself.
But looking at Secor's config files, it seems like both are required. Is there a particular reason?
Secor is leaving the tail end of larger messages appended to smaller messages in the output.
If you load these 2 messages into kafka.
{"array":[1,2,3],"boolean":true,"null":null,"dog":"brown","dog2":"brown","dog3":"brown","dog4":"brown","dog5":"brown","dog6":"brown","dog7":"brown","dog8":"brown","dog9":"brown","dog10":"brown","dog11":"brown","number":123,"object":{"a":"b","c":"d","e":"f"},"string":"Hello World"}
{"array":[1,2,3],"boolean":true,"null":null,"number":123,"object":{"a":"b","c":"d","e":"f"},"string":"Hello World"}
The Secor file in S3 will look like this; note the extra data at the end of line 14.
13: {"array":[1,2,3],"boolean":true,"null":null,"dog":"brown","dog2":"brown","dog3":"brown","dog4":"brown","dog5":"brown","dog6":"brown","dog7":"brown","dog8":"brown","dog9":"brown","dog10":"brown","dog11":"brown","number":123,"object":{"a":"b","c":"d","e":"f"},"string":"Hello World"}
14: {"array":[1,2,3],"boolean":true,"null":null,"number":123,"object":{"a":"b","c":"d","e":"f"},"string":"Hello World"}n","dog6":"brown","dog7":"brown","dog8":"brown","dog9":"brown","dog10":"brown","dog11":"brown","number":123,"object":{"a":"b","c":"d","e":"f"},"string":"Hello World"}
Looks like this is an issue with BytesWritable. Here is some code testing that.
package secor;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;

public class TestWriter {
    private SequenceFile.Writer mWriter;

    public static void main(String[] args) {
        TestWriter testWriter = new TestWriter();
        try {
            testWriter.init();
            testWriter.write(10, "This is the shit");
            testWriter.write(11, "Leo");
            testWriter.mWriter.close();
            testWriter.read();
        } catch (IOException | InstantiationException | IllegalAccessException e) {
            e.printStackTrace();
        }
    }

    public void init() throws IOException {
        Configuration config = new Configuration();
        FileSystem fs = FileSystem.get(config);
        Path fsPath = new Path("/tmp/testfile");
        this.mWriter = SequenceFile.createWriter(fs, config, fsPath,
                LongWritable.class, BytesWritable.class);
    }

    public void write(long key, String value) throws IOException {
        LongWritable writeableKey = new LongWritable(key);
        BytesWritable writeableValue = new BytesWritable(value.getBytes());
        System.out.println(new String(writeableValue.getBytes()));
        this.mWriter.append(writeableKey, writeableValue);
    }

    public void read() throws IOException, InstantiationException, IllegalAccessException {
        Configuration config = new Configuration();
        FileSystem fs = FileSystem.get(config);
        Path fsPath = new Path("/tmp/testfile");
        SequenceFile.Reader reader = new SequenceFile.Reader(fs, fsPath, new Configuration());
        LongWritable key = new LongWritable();
        BytesWritable value = new BytesWritable();
        System.out.println("reading file " + fsPath);
        while (reader.next(key, value)) {
            // Bug reproduction: getBytes() returns the full backing array, which
            // BytesWritable reuses across records, so the short second record is
            // printed with trailing bytes left over from the longer first record.
            System.out.println(new String(value.getBytes()));
        }
    }
}
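The behavior can be reproduced without Hadoop: BytesWritable's backing buffer grows but never shrinks, and getBytes() returns the whole backing array, so a short record read after a long one carries the old tail unless the read is bounded by getLength(). A self-contained mimic of that contract (ReusableBuffer is my own illustrative class, not Hadoop's):

```java
// Pure-Java mimic of BytesWritable's buffer reuse: set() grows the backing
// array but never shrinks it, and getBytes() returns the whole array,
// including stale bytes from a previous, longer value.
public class ReusableBuffer {
    private byte[] bytes = new byte[0];
    private int length;

    public void set(byte[] data) {
        if (data.length > bytes.length) {
            bytes = new byte[data.length]; // grow, never shrink
        }
        System.arraycopy(data, 0, bytes, 0, data.length);
        length = data.length;
    }

    public byte[] getBytes() { return bytes; }  // may include stale tail bytes
    public int getLength() { return length; }   // length of the valid prefix

    public static void main(String[] args) {
        ReusableBuffer buf = new ReusableBuffer();
        buf.set("a long first message".getBytes());
        buf.set("short".getBytes());
        // Unbounded read picks up the tail of the first message.
        System.out.println("wrong: " + new String(buf.getBytes()));
        // Bounded read returns only the valid bytes.
        System.out.println("right: " + new String(buf.getBytes(), 0, buf.getLength()));
    }
}
```

The fix on the read side is `new String(value.getBytes(), 0, value.getLength())` (or `copyBytes()` on newer Hadoop versions).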
Hi,
I started looking into Secor 2-3 days ago and found it really useful. Now I have a requirement to process each message before dumping it into Amazon S3. The processing could be a transformation, projection, etc. After exploring the source code, I didn't find any place where we can plug in custom transformation logic.
My first question is: am I missing something in the source code? If yes, please let me know where I can do it. Otherwise, I've created another package (in my local repo) that can be used for this purpose, and I would love to contribute my extension to the Secor repo.
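For discussion, a pluggable transformation point could be as small as the following hypothetical interface (Secor does not ship anything like this today, which is exactly the gap described above):

```java
// Hypothetical message-transformation hook; Secor has no such interface --
// this only illustrates what a pluggable transform step could look like.
public interface MessageTransformer {
    /** Returns the transformed payload, or null to drop the message. */
    byte[] transform(String topic, byte[] payload);
}

// Example implementation: a projection that upper-cases the payload.
class UpperCaseTransformer implements MessageTransformer {
    @Override
    public byte[] transform(String topic, byte[] payload) {
        return new String(payload).toUpperCase().getBytes();
    }
}
```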
Thanks,
Ashish
Hi all,
I think there's a problem with using SimpleDateFormat in the TimestampedMessageParser on Java 1.8.0:
$ java -version
java version "1.8.0_20"
Java(TM) SE Runtime Environment (build 1.8.0_20-b26)
Java HotSpot(TM) 64-Bit Server VM (build 25.20-b23, mixed mode)
The error is:
java.lang.ArrayIndexOutOfBoundsException: 13
at sun.util.calendar.BaseCalendar.getCalendarDateFromFixedDate(BaseCalendar.java:453)
at java.util.GregorianCalendar.computeFields(GregorianCalendar.java:2397)
at java.util.GregorianCalendar.computeFields(GregorianCalendar.java:2312)
at java.util.Calendar.complete(Calendar.java:2236)
at java.util.Calendar.get(Calendar.java:1826)
at java.text.SimpleDateFormat.subFormat(SimpleDateFormat.java:1119)
at java.text.SimpleDateFormat.format(SimpleDateFormat.java:966)
at java.text.SimpleDateFormat.format(SimpleDateFormat.java:936)
at java.text.DateFormat.format(DateFormat.java:345)
at com.pinterest.secor.parser.TimestampedMessageParser.generatePartitions(TimestampedMessageParser.java:78)
at com.pinterest.secor.parser.TimestampedMessageParser.extractPartitions(TimestampedMessageParser.java:99)
at com.pinterest.secor.parser.MessageParser.parse(MessageParser.java:40)
at com.pinterest.secor.consumer.Consumer.consumeNextMessage(Consumer.java:139)
at com.pinterest.secor.consumer.Consumer.run(Consumer.java:93)
The relevant code at TimestampedMessageParser.java:78 is:
String dt = "dt=" + mDtFormatter.format(date);
This line will probably be a problem as well:
String hr = "hr=" + mHrFormatter.format(date);
For the first cut, I'm going to try to synchronize on mDtFormatter. But supposedly that's not as fast as using a ThreadLocal variable.
What's the best way of fixing this?
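SimpleDateFormat instances are not thread-safe, which is what the ArrayIndexOutOfBoundsException above indicates. A common fix is a ThreadLocal formatter; the sketch below uses my own illustrative names, not Secor's actual fields:

```java
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.TimeZone;

// Sketch of a thread-safe partition formatter: each thread gets its own
// SimpleDateFormat instance, so no instance is ever shared across threads.
// Field and method names are illustrative, not Secor's actual code.
public class PartitionFormatter {
    private static final ThreadLocal<SimpleDateFormat> DT_FORMATTER =
            ThreadLocal.withInitial(() -> {
                SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd");
                format.setTimeZone(TimeZone.getTimeZone("UTC"));
                return format;
            });

    /** Each thread formats with its own SimpleDateFormat instance. */
    public static String dtPartition(Date date) {
        return "dt=" + DT_FORMATTER.get().format(date);
    }

    public static void main(String[] args) {
        System.out.println(dtPartition(new Date(0L))); // prints dt=1970-01-01
    }
}
```

On Java 8+, java.time.format.DateTimeFormatter is immutable and thread-safe, which sidesteps the issue without ThreadLocal.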
Thanks
We observed data loss during a rebalance. This happens very rarely, but looking at the code path, it definitely can happen.
The current consumer calls deleteWriters, then another thread commits the offset.
https://github.com/pinterest/secor/blob/master/src/main/java/com/pinterest/secor/uploader/Uploader.java#L96
If this if-statement returns false, nothing happens and all data in this writer is lost.
Since this is a very rare case, I think maybe the best solution would be to throw a runtime exception when the if-statement returns false?
Was looking at the possibility of adding extra partitioning support for a list of fields and was browsing around PartitionFinalizer.java. It appears that this class is Thrift-specific:
private ThriftMessageParser mThriftMessageParser;
Not really a problem for me at the moment, but figured I'd let you know.
When invoking com.pinterest.secor.main.ConsumerMain, I include the suggested logging configuration parameter:
-Dlog4j.configuration=log4j.prod.properties
strace tells me that the app never tries to read log4j.prod.properties:
$ strace -f -e trace=file java -ea -Dsecor_group=secor_backup -Dlog4j.configuration=log4j.prod.properties -Dconfig=secor.prod.backup.properties -cp secor-0.1-SNAPSHOT.jar:lib/* com.pinterest.secor.main.ConsumerMain 2> out.log
$ grep properties out.log | grep log
[pid 6469] open("/usr/lib/jvm/java-7-openjdk-amd64/jre/lib/logging.properties", O_RDONLY) = 95
The strace is successful and other properties files related to logging are being read.
Unfortunately, the specified log4j.prod.properties is never read.
$ java -version
java version "1.7.0_65"
OpenJDK Runtime Environment (IcedTea 2.5.3) (7u71-2.5.3-0ubuntu0.14.04.1)
OpenJDK 64-Bit Server VM (build 24.65-b04, mixed mode)
Secor currently assumes Kafka is pointed at the root ZK node.
I am thinking of using a compression codec with Secor, but I'm not sure which ones are available. It'd be great if someone could update the README with this information.
Hello there. I haven't been working with Secor for long, and I apologise if I have misunderstood (if so, feel free to close this issue). However, I think there is a flaw in the way un-parsable (malformed) messages are handled.
I noticed during testing that when I fed an un-parsable message into a topic, its consumption by Secor would cause the files for that topic-partition to be deleted.
I believe the problem is in the implementation of the logic described here:
writer.write(message) {
    t = message.topic;
    p = message.partition;
    if message.offset != last_seen_offset[t, p] + 1 {
        delete_local_files_for_topic_partition(t, p);
    }
    write_to_local_file(message);
    last_seen_offset[t, p] = message.offset;
}
The problem is that although Secor gracefully skips over the malformed message, last_seen_offset does not get updated. This means that when the next message is read, message.offset != last_seen_offset[t, p] + 1 evaluates to true and the files for that topic-partition are deleted even though no rebalancing event occurred!
I have a modified fork of Secor, but as far as I can tell this logic predates my fork. It would be really great if someone else could confirm or disprove this behaviour.
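One possible fix, sketched in the same pseudocode style (untested), is to advance last_seen_offset even when a message is skipped, so that a skipped malformed message is not mistaken for a rebalance:

```
writer.write_or_skip(message) {
    t = message.topic;
    p = message.partition;
    if message.offset != last_seen_offset[t, p] + 1 {
        delete_local_files_for_topic_partition(t, p);
    }
    if parseable(message) {
        write_to_local_file(message);
    }
    // advance even for skipped messages so the next gap check passes
    last_seen_offset[t, p] = message.offset;
}
```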
The current implementation of LogFileVerifier is tied to a sequence file format. This should be fixed to support any file format accessible through FileReaderWriter implementations.
Does Secor delete local files that have been uploaded to s3? I saw this setting:
# If greater than 0, upon startup Secor will clean up directories and files under secor.local.path
# that are older than this value.
secor.local.log.delete.age.hours=-1
But the documentation seems to say it only does the check on startup. Is there (or will there be) a setting for doing cleanup by age at runtime?
Thx
Hey Team Pinterest!
Awesome project and even more awesome you guys released it as OSS.
I'm having an issue completing run_tests.sh, even after modifying the s3:// bucket to my own with full privileges. This might be the same issue as this ticket.
Here's the error output:
WARNING: Remote list is empty.
running mkdir -p /tmp/secor_dev/logs
running ./bin/scripts/run_kafka_class.sh org.apache.zookeeper.server.quorum.QuorumPeerMain zookeeper.test.properties > /tmp/secor_dev/logs/zookeeper.log 2>&1 &
running ./bin/scripts/run_kafka_class.sh kafka.Kafka kafka.test.properties > /tmp/secor_dev/logs/kafka_server.log 2>&1 &
running ./bin/scripts/run_kafka_class.sh kafka.admin.TopicCommand --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic test > /tmp/secor_dev/logs/create_topic.log 2>&1
running java -server -ea -Dlog4j.configuration=log4j.dev.properties -Dconfig=secor.dev.backup.properties -Dsecor.file.reader.writer=com.pinterest.secor.io.impl.DelimitedTextFileReaderWriter -cp secor-0.1-SNAPSHOT.jar:lib/* com.pinterest.secor.main.ConsumerMain > /tmp/secor_dev/logs/secor_backup.log 2>&1 &
running java -server -ea -Dlog4j.configuration=log4j.dev.properties -Dconfig=secor.dev.backup.properties -cp secor-0.1-SNAPSHOT.jar:lib/* com.pinterest.secor.main.TestLogMessageProducerMain -t test -m 1000 -p 1 -type json > /tmp/secor_dev/logs/test_log_message_producer.log 2>&1
Waiting 120 sec for Secor to upload logs to s3
running java -server -ea -Dlog4j.configuration=log4j.dev.properties -Dconfig=secor.dev.backup.properties -Dsecor.file.reader.writer=com.pinterest.secor.io.impl.DelimitedTextFileReaderWriter -cp secor-0.1-SNAPSHOT.jar:lib/* com.pinterest.secor.main.LogFileVerifierMain -t test -m 1000 -q > /tmp/secor_dev/logs/log_verifier_backup.log 2>&1
Verification FAILED
See log /tmp/secor_dev/logs/log_verifier_backup.log for more details
Error: Could not find or load main class com.pinterest.secor.main.LogFileVerifierMain
I'm trying to configure secor to use AES256 encryption in the target S3 bucket. Does anyone have past experience in getting this working?
The message committed to S3 is restricted to the (Long, Bytes) format, which is the most frequently used. However, sometimes it is necessary to use other formats, like Text for instance.
This is kind of crazy but the underscore present in https://github.com/pinterest/secor/blob/master/src/main/config/secor.common.properties#L21
and the corresponding https://github.com/pinterest/secor/blob/master/src/main/java/com/pinterest/secor/common/SecorConfig.java#L172
makes it really hard to interpolate Bash environment variables into the typesafe config format, as one might be inspired to do in the case of Docker's environment files and a tool like https://github.com/ralph-tice/offspring.sh
So, is it crazy to ask that configuration keys not include _, which is the only reasonable delimiter to use for POSIX environment variables?
For example, I would use MYORG_SECOR_KAFKA_TOPIC_FILTER=mytopicprefix.*
perf_topic_prefix has the same problem but I don't see its usage outside of tests...
I get the following warnings each time Secor writes to S3. I think the requests just check that the file does not exist before actually uploading to S3; is that the case, or should I worry about them?
I don't think the local "WARN No writer found for path" at the end is related; would that be a problem?
2014-10-27 16:05:25,225 [Thread-3] (org.jets3t.service.impl.rest.httpclient.RestS3Service:393) WARN Response '/raw_logs%2Fsecor_backup%2Fmat_postbacks%2Foffset%3D0%2F1_0_00000000000001039472' - Unexpected response code 404, expected 200
2014-10-27 16:05:25,238 [Thread-3] (org.jets3t.service.impl.rest.httpclient.RestS3Service:393) WARN Response '/raw_logs%2Fsecor_backup%2Fmat_postbacks%2Foffset%3D0%2F1_0_00000000000001039472_%24folder%24' - Unexpected response code 404, expected 200
2014-10-27 16:05:25,267 [Thread-3] (org.jets3t.service.impl.rest.httpclient.RestS3Service:393) WARN Response '/raw_logs%2Fsecor_backup%2Fmat_postbacks%2Foffset%3D0%2F1_0_00000000000001039472' - Unexpected response code 404, expected 200
2014-10-27 16:05:25,282 [Thread-3] (org.jets3t.service.impl.rest.httpclient.RestS3Service:393) WARN Response '/raw_logs%2Fsecor_backup%2Fmat_postbacks%2Foffset%3D0%2F1_0_00000000000001039472_%24folder%24' - Unexpected response code 404, expected 200
2014-10-27 16:05:25,350 [Thread-3] (com.pinterest.secor.common.FileRegistry:157) WARN No writer found for path /var/log/secor_data/message_logs/backup/18704_12/mat_postbacks/offset=0/1_0_00000000000001039472
Oct 27, 2014 4:05:25 PM com.twitter.common.zookeeper.DistributedLockImpl unlock
INFO: Cleaning up this locks ephemeral node.
Oct 27, 2014 4:05:25 PM com.twitter.common.zookeeper.DistributedLockImpl cleanup
DelimitedTextFile assumes that LogFilePath's offset is the starting offset and that the entire file is a continuous stream.
The first problem is a rebalance happening right after a datetime partition switch, though this doesn't often coincide with a rebalance event.
The second situation is an old thread picking up a topic again after a period of time; it will still use the previously committed offset as the filename, which can be days old.
Currently the topic name is used as-is, which isn't necessarily a bad thing, but we'd like to have some more control over the final structure on S3 and thus would like to "transform" the directory path where things are stored on S3.
Basically what we want to do is replace . (dots) with / (slashes) in the topic name (as one is not allowed to use slashes in topic names, for obvious reasons).
@pgarbacki any thoughts about this? What would be the best way to approach this? Anyone else feel like they would benefit from this?
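For illustration, the mapping itself is trivial; the open question is where such a hook would live in Secor. TopicPathTransform below is my own sketch, not an existing option:

```java
// Sketch: mapping a dotted topic name to a slash-delimited S3 path segment.
// Only the transformation is shown; the hook point in Secor is the question.
public class TopicPathTransform {
    public static String toPath(String topic) {
        return topic.replace('.', '/');
    }

    public static void main(String[] args) {
        System.out.println(toPath("myorg.events.clicks")); // prints myorg/events/clicks
    }
}
```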
I've upgraded Secor to 0.9 and switched to the s3a file system.
After the upgrade I started to receive the following errors (the stacktrace may be a little messy, because I retrieved it from our temporary log system):
INFO: Cleaning up!
INFO: Cleaning up this locks ephemeral node.
Dec 17, 2015 10:30:29 AM com.twitter.common.zookeeper.DistributedLockImpl unlock
Dec 17, 2015 10:30:29 AM com.twitter.common.zookeeper.DistributedLockImpl cleanup
2015-12-17 10:30:29,299 [Thread-3] (com.pinterest.secor.main.ConsumerMain$1:65) ERROR Thread Thread[Thread-3,5,main] failed
java.lang.RuntimeException: Failed to apply upload policy
at com.pinterest.secor.uploader.HadoopS3UploadManager$1.run(HadoopS3UploadManager.java:57)
at com.pinterest.secor.util.FileUtil.moveToCloud(FileUtil.java:156)
at com.pinterest.secor.util.FileUtil.getFileSystem(FileUtil.java:71)
at com.pinterest.secor.uploader.Uploader.uploadFiles(Uploader.java:95)
at com.pinterest.secor.uploader.Uploader.checkTopicPartition(Uploader.java:186)
at com.pinterest.secor.uploader.Uploader.applyPolicy(Uploader.java:207)
at com.pinterest.secor.consumer.Consumer.checkUploadPolicy(Consumer.java:112)
at com.pinterest.secor.consumer.Consumer.checkUploadPolicy(Consumer.java:110)
at com.pinterest.secor.consumer.Consumer.run(Consumer.java:102)
at com.pinterest.secor.uploader.FutureHandle.get(FutureHandle.java:34)
Caused by: java.lang.NoSuchMethodError: com.amazonaws.services.s3.transfer.TransferManagerConfiguration.setMultipartUploadThreshold(I)V
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2653)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:92)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2687)
at org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:285)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:371)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2669)
Caused by: java.util.concurrent.ExecutionException: java.lang.NoSuchMethodError: com.amazonaws.services.s3.transfer.TransferManagerConfiguration.setMultipartUploadThreshold(I)V
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.lang.Thread.run(Thread.java:745)
at java.util.concurrent.FutureTask.get(FutureTask.java:192)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
The problem is with the AWS libs:
I use the JSON date parser, but Twitter's streams don't carry a numeric timestamp. I got an error like this:
java.lang.NumberFormatException: For input string: "Mon Dec 14 16:28:09 +0000 2015"
at sun.misc.FloatingDecimal.readJavaFormatString(FloatingDecimal.java:1250)
at java.lang.Double.valueOf(Double.java:504)
at com.pinterest.secor.parser.JsonMessageParser.extractTimestampMillis(JsonMessageParser.java:39)
at com.pinterest.secor.parser.TimestampedMessageParser.extractPartitions(TimestampedMessageParser.java:104)
at com.pinterest.secor.parser.MessageParser.parse(MessageParser.java:40)
at com.pinterest.secor.consumer.Consumer.consumeNextMessage(Consumer.java:139)
at com.pinterest.secor.consumer.Consumer.run(Consumer.java:93)
Could you please give me a suggestion?
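One workaround (a sketch, not part of Secor; the class name and method shape are assumptions) is to parse Twitter's created_at string explicitly instead of treating the field as a number:

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Locale;

// Sketch: convert Twitter's "created_at" strings (e.g. "Mon Dec 14 16:28:09 +0000 2015")
// to epoch millis, instead of feeding them to Double.valueOf as the JSON parser does.
public class TwitterTimestampParser {
    // Twitter's created_at pattern; Locale.ENGLISH keeps month/day names stable
    // regardless of the JVM's default locale.
    private static final String PATTERN = "EEE MMM dd HH:mm:ss Z yyyy";

    public static long extractTimestampMillis(String createdAt) throws ParseException {
        SimpleDateFormat format = new SimpleDateFormat(PATTERN, Locale.ENGLISH);
        return format.parse(createdAt).getTime();
    }

    public static void main(String[] args) throws ParseException {
        System.out.println(extractTimestampMillis("Mon Dec 14 16:28:09 +0000 2015"));
    }
}
```

Note the explicit Locale.ENGLISH; without it the same parse fails on non-English default locales.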
I'd like to look at secor support for IBM Message Hub (kafka) on Bluemix, however I have just seen:
You cannot use the Kafka high-level consumer API with Message Hub. Instead, use the new Kafka 0.9 consumer API.
Source: https://www.ng.bluemix.net/docs/services/MessageHub/index.html
I see from the secor README.md that it relies on the high-level consumer APIs.
I have set up Secor, but when I run test_run I get the following error. Note that I was able to set it up successfully with another bucket and it works fine there, but when I change the S3 bucket to a different name (I have also created the bucket manually), it produces this error.
See log /tmp/secor_dev/logs/log_verifier_backup.log for more details
2015-08-14 14:06:46,037 main ERROR Log file verifier failed
org.apache.hadoop.fs.s3.S3Exception: org.jets3t.service.S3ServiceException: S3 HEAD request failed for '/sector-test%2Fbackup%2Ftest' - ResponseCode=400, ResponseMessage=Bad Request
at org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.handleServiceException(Jets3tNativeFileSystemStore.java:229)
at org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.retrieveMetadata(Jets3tNativeFileSystemStore.java:111)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:85)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:62)
at org.apache.hadoop.fs.s3native.$Proxy1.retrieveMetadata(Unknown Source)
at org.apache.hadoop.fs.s3native.NativeS3FileSystem.listStatus(NativeS3FileSystem.java:407)
at com.pinterest.secor.util.FileUtil.list(FileUtil.java:55)
at com.pinterest.secor.util.FileUtil.listRecursively(FileUtil.java:71)
at com.pinterest.secor.tools.LogFileVerifier.populateTopicPartitionToOffsetToFiles(LogFileVerifier.java:62)
at com.pinterest.secor.tools.LogFileVerifier.verifyCounts(LogFileVerifier.java:120)
at com.pinterest.secor.main.LogFileVerifierMain.main(LogFileVerifierMain.java:93)
com.pinterest.secor.message.Message.fieldsToString is not null-safe.
Somehow we got a null payload in Kafka, and Secor throws a NullPointerException.
15/05/11 19:12:24 ERROR main.ConsumerMain: Thread Thread[Thread-4,5,main] failed
java.lang.NullPointerException
at java.lang.String.&lt;init&gt;(String.java:554)
at com.pinterest.secor.message.Message.fieldsToString(Message.java:35)
at com.pinterest.secor.message.Message.toString(Message.java:44)
at java.lang.String.valueOf(String.java:2981)
at java.lang.StringBuilder.append(StringBuilder.java:131)
at com.pinterest.secor.reader.MessageReader.read(MessageReader.java:141)
at com.pinterest.secor.consumer.Consumer.run(Consumer.java:92)
I would fix this myself, but I can no longer build Secor due to Twitter dependencies that are no longer available.
[ERROR] Failed to execute goal com.twitter:maven-finagle-thrift-plugin:0.0.9:compile (thrift-sources) on project secor: Execution thrift-sources of goal com.twitter:maven-finagle-thrift-plugin:0.0.9:compile failed: A required class was missing while executing com.twitter:maven-finagle-thrift-plugin:0.0.9:compile: com/google/common/collect/ImmutableSet
I tried lots of ways to find the dependencies but nothing would fix it completely.
I am a committer on this project and really would like to contribute some more, if someone could help with the dependency problems.
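A null-safe variant of fieldsToString might look like the sketch below. The field names and string layout are assumptions modeled on a typical Message class, not Secor's exact source:

```java
import java.nio.charset.StandardCharsets;

// Sketch: guard the payload before constructing a String, so a Kafka tombstone
// (null payload) no longer triggers new String(null) -> NullPointerException.
public class NullSafeMessage {
    private final String mTopic;
    private final int mKafkaPartition;
    private final long mOffset;
    private final byte[] mPayload;

    public NullSafeMessage(String topic, int partition, long offset, byte[] payload) {
        mTopic = topic;
        mKafkaPartition = partition;
        mOffset = offset;
        mPayload = payload;
    }

    protected String fieldsToString() {
        // Null check replaces the unconditional new String(mPayload).
        String payload = mPayload == null
                ? "<null>"
                : new String(mPayload, StandardCharsets.UTF_8);
        return "topic='" + mTopic + "', kafkaPartition=" + mKafkaPartition
                + ", offset=" + mOffset + ", payload='" + payload + "'";
    }

    public static void main(String[] args) {
        NullSafeMessage m = new NullSafeMessage("test", 0, 42L, null);
        System.out.println(m.fieldsToString());  // no NPE on a null payload
    }
}
```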
I've tried to use the s3a file system in my configuration, but I get this error:
java.lang.IllegalArgumentException: AWS Access Key ID and Secret Access Key
must be specified as the username or password (respectively) of a s3n URL,
or by setting the fs.s3n.awsAccessKeyId or fs.s3n.awsSecretAccessKey
properties (respectively).
After a short investigation I noticed that support for s3a, introduced in the commit "Support s3a hadoop file system.", was accidentally broken by the next commit, "Openstack Swift integration."
In the class com.pinterest.secor.uploader.HadoopS3Uploader, the call to mConfig.getS3Prefix() was replaced with com.pinterest.secor.util.FileUtil.getPrefix(), which doesn't support "s3a". Probably the only change needed to restore s3a support is in the com.pinterest.secor.util.FileUtil.getPrefix() method.
Line 90:
prefix = "s3n://" + config.getS3Bucket() + "/" + config.getS3Path();
should be changed to:
prefix = config.getS3Prefix();
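A sketch of what the restored prefix logic might look like. The helper below is standalone for illustration; the real method would read these values from SecorConfig:

```java
// Sketch: choose the upload prefix from configuration instead of hard-coding
// "s3n://", so fully qualified prefixes like "s3a://bucket/path" survive.
public class PrefixUtil {
    public static String getPrefix(String s3Prefix, String s3Bucket, String s3Path) {
        // Prefer the explicitly configured prefix (which carries its own scheme);
        // fall back to the legacy s3n layout only when none is set.
        if (s3Prefix != null && !s3Prefix.isEmpty()) {
            return s3Prefix;
        }
        return "s3n://" + s3Bucket + "/" + s3Path;
    }

    public static void main(String[] args) {
        System.out.println(getPrefix("s3a://my-bucket/secor", "my-bucket", "secor"));
        System.out.println(getPrefix(null, "my-bucket", "secor"));
    }
}
```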
I have a message parser that generates partitions in S3 of the form "'y'=yyyy/'m'=MM/'d'=dd/'H'=HH". When I run the partition finalizer I get this error:
2015-11-18 23:21:42,824 main ERROR Partition finalizer failed
java.lang.AssertionError: wrong partition format: y=2015/m=11/d=18
at com.pinterest.secor.parser.PartitionFinalizer.finalizePartitionsUpTo(PartitionFinalizer.java:143)
at com.pinterest.secor.parser.PartitionFinalizer.finalizePartitions(PartitionFinalizer.java:190)
at com.pinterest.secor.main.PartitionFinalizerMain.main(PartitionFinalizerMain.java:45)
I understand that the '=' sign might cause a conflict when storing in Hive, but since I don't use Hive, it would be nice if this didn't fail.
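A relaxed check could accept any sequence of "key=value" segments rather than one hard-coded layout. This is a sketch of the idea, not Secor's actual validation logic:

```java
import java.util.regex.Pattern;

// Sketch: validate partitions as "key=value" path segments, so both
// "y=2015/m=11/d=18" and "dt=2015-11-18" pass, instead of asserting a
// single expected format.
public class PartitionFormatCheck {
    private static final Pattern SEGMENT = Pattern.compile("[^/=]+=[^/=]+");

    public static boolean isValid(String partition) {
        for (String segment : partition.split("/")) {
            if (!SEGMENT.matcher(segment).matches()) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(isValid("y=2015/m=11/d=18"));  // true
        System.out.println(isValid("dt=2015-11-18"));     // true
    }
}
```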
There appears to be something wrong with the pom.xml file. A number of warnings are emitted, and then the build fails, unable to find the com.twitter.finagle.thrift package.
[WARNING]
[WARNING] Some problems were encountered while building the effective model for com.pinterest:secor:jar:0.2-SNAPSHOT
[WARNING] 'build.plugins.plugin.(groupId:artifactId)' must be unique but found duplicate declaration of plugin org.codehaus.mojo:exec-maven-plugin @ line 269, column 21
[WARNING]
[WARNING] It is highly recommended to fix these problems because they threaten the stability of your build.
[WARNING]
[WARNING] For this reason, future Maven versions might no longer support building such malformed projects.
[WARNING]
...
[WARNING] Checksum validation failed, no checksums available from the repository for http://maven.twttr.com/org/apache/thrift/libthrift/0.5.0/libthrift-0.5.0.pom
...
[WARNING] Checksum validation failed, no checksums available from the repository for http://maven.twttr.com/thrift/libthrift/0.5.0/libthrift-0.5.0.jar
...
[ERROR] COMPILATION ERROR :
[ERROR] /home/build/jenkins_tempdir/workspace/secor/target/generated-sources/thrift/gen-java/com/pinterest/secor/thrift/TestMessage.java:[35,33] error: package com.twitter.finagle.thrift does not exist
Oddly, it appears to build without incident on OS X 10.10.5.
Hello! This DateMessageParser test is locale-dependent, and fails in non-English locales (which is how I discovered it!).
What would be the best solution for this issue? Adding a new locale parameter (e.g. message.timestamp.locale) or just removing the test?
The default behaviour of Secor is to backup data to Amazon S3.
What about restoring this data? I mean, what if a cluster or a broker breaks and you lose all the log data, or it is deleted and you need it again?
Does this project also serve that purpose?
Thanks in advance
I observed an issue with the lock used here in Uploader.
I haven't had time to debug it, but I suspect it's not being cleaned up correctly when the process is not shut down cleanly. Based on my understanding of Zookeeper, this shouldn't be happening.
CC @pgarbacki
@lefthandmagic I received report from @yswu with evidence that the end-to-end test is broken for the json read-writer. Would you mind taking a look at this? Thanks!
When using the LogFilePrinter, Secor does not use the length to trim the byte array from the BytesWritable.
If you load these 2 messages into Kafka:
{"array":[1,2,3],"boolean":true,"null":null,"dog":"brown","dog2":"brown","dog3":"brown","dog4":"brown","dog5":"brown","dog6":"brown","dog7":"brown","dog8":"brown","dog9":"brown","dog10":"brown","dog11":"brown","number":123,"object":{"a":"b","c":"d","e":"f"},"string":"Hello World"}
{"array":[1,2,3],"boolean":true,"null":null,"number":123,"object":{"a":"b","c":"d","e":"f"},"string":"Hello World"}
The LogFilePrinter will display the following; note the extra data appended to the end of message 14:
13: {"array":[1,2,3],"boolean":true,"null":null,"dog":"brown","dog2":"brown","dog3":"brown","dog4":"brown","dog5":"brown","dog6":"brown","dog7":"brown","dog8":"brown","dog9":"brown","dog10":"brown","dog11":"brown","number":123,"object":{"a":"b","c":"d","e":"f"},"string":"Hello World"}
14: {"array":[1,2,3],"boolean":true,"null":null,"number":123,"object":{"a":"b","c":"d","e":"f"},"string":"Hello World"}n","dog6":"brown","dog7":"brown","dog8":"brown","dog9":"brown","dog10":"brown","dog11":"brown","number":123,"object":{"a":"b","c":"d","e":"f"},"string":"Hello World"}
This is the bug in Hadoop https://issues.apache.org/jira/browse/HADOOP-6298
The fix is to copy only value.getLength() bytes, as shown in the class below:
package com.pinterest.secor.tools;
import com.pinterest.secor.util.FileUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;
import java.io.IOException;
/**
 * Log file printer displays the content of a log file.
 *
 * @author Pawel Garbacki ([email protected])
 */
public class LogFilePrinter {
private boolean mPrintOffsetsOnly;
public LogFilePrinter(boolean printOffsetsOnly) throws IOException {
mPrintOffsetsOnly = printOffsetsOnly;
}
public void printFile(String path) throws Exception {
FileSystem fileSystem = FileUtil.getFileSystem(path);
Path fsPath = new Path(path);
SequenceFile.Reader reader = new SequenceFile.Reader(fileSystem, fsPath,
new Configuration());
LongWritable key = (LongWritable) reader.getKeyClass().newInstance();
BytesWritable value = (BytesWritable) reader.getValueClass().newInstance();
System.out.println("reading file " + path);
while (reader.next(key, value)) {
if (mPrintOffsetsOnly) {
System.out.println(Long.toString(key.get()));
} else {
byte[] nonPaddedBytes = new byte[value.getLength()];
System.arraycopy(value.getBytes(), 0, nonPaddedBytes, 0, value.getLength());
System.out.println(Long.toString(key.get()) + ": " + new String(nonPaddedBytes));
}
}
}
}
Hi
I am trying to run Secor using the following command, but I am getting the following error. The test run works fine for me.
I set zookeeper and kafka to localhost.
java -ea -Dsecor_group=secor_backup -Dlog4j.configuration=log4j.prod.properties -Dconfig=secor.prod.backup.properties -cp secor-0.2-SNAPSHOT.jar:lib/* com.pinterest.secor.main.ConsumerMain
Aug 11, 2015 12:57:51 PM com.twitter.logging.Logger log
INFO: Starting LatchedStatsListener
Aug 11, 2015 12:57:51 PM com.twitter.logging.Logger log
INFO: Admin HTTP interface started on port 9999.
2015-08-11 12:57:53,320 Thread-7 ERROR Thread Thread[Thread-7,5,main] failed
java.lang.AssertionError: 1 == 2
at com.pinterest.secor.common.ZookeeperConnector.getZookeeperAddresses(ZookeeperConnector.java:67)
at com.pinterest.secor.common.ZookeeperConnector.&lt;init&gt;(ZookeeperConnector.java:57)
at com.pinterest.secor.uploader.Uploader.&lt;init&gt;(Uploader.java:57)
at com.pinterest.secor.consumer.Consumer.init(Consumer.java:71)
at com.pinterest.secor.consumer.Consumer.run(Consumer.java:80)
(the same AssertionError and stack trace are logged for each of the remaining consumer threads: Thread-3, Thread-4, Thread-5, Thread-6, Thread-8, and Thread-9)
I have a Kafka cluster with two nodes. I supplied node A to Secor as the seed broker. After killing node A, Secor keeps fetching metadata from node B without persisting any data to S3.
2015-03-27 14:32:32,194 [secor_backup_secor-staging-a-1.bezurk.org_30798_12-leader-finder-thread] (kafka.utils.Logging$class:68) INFO Property request.timeout.ms is overridden to 30000
2015-03-27 14:32:32,194 [secor_backup_secor-staging-a-1.bezurk.org_30798_12-leader-finder-thread] (kafka.utils.Logging$class:68) INFO Fetching metadata from broker id:2,host:ip-10-0-21-156.ap-southeast-1.compute.internal,port:9092 with correlation id 2326 for 3 topic(s) Set(flight-fares, fares, test2)
2015-03-27 14:32:32,199 [secor_backup_secor-staging-a-1.bezurk.org_30798_12-leader-finder-thread] (kafka.utils.Logging$class:68) INFO Connected to ip-10-0-21-156.ap-southeast-1.compute.internal:9092 for producing
2015-03-27 14:32:32,202 [secor_backup_secor-staging-a-1.bezurk.org_30798_12-leader-finder-thread] (kafka.utils.Logging$class:68) INFO Disconnecting from ip-10-0-21-156.ap-southeast-1.compute.internal:9092
2015-03-27 14:32:32,203 [secor_backup_secor-staging-a-1.bezurk.org_30798_12-leader-finder-thread] (kafka.utils.Logging$class:68) INFO [ConsumerFetcherManager-1427437182052] Added fetcher for partitions ArrayBuffer()
2015-03-27 14:32:32,410 [secor_backup_secor-staging-a-1.bezurk.org_30798_12-leader-finder-thread] (kafka.utils.Logging$class:68) INFO Verifying properties
2015-03-27 14:32:32,413 [secor_backup_secor-staging-a-1.bezurk.org_30798_12-leader-finder-thread] (kafka.utils.Logging$class:68) INFO Property client.id is overridden to secor_backup
2015-03-27 14:32:32,413 [secor_backup_secor-staging-a-1.bezurk.org_30798_12-leader-finder-thread] (kafka.utils.Logging$class:68) INFO Property metadata.broker.list is overridden to ip-10-0-21-156.ap-southeast-1.compute.internal:9092
2015-03-27 14:32:32,414 [secor_backup_secor-staging-a-1.bezurk.org_30798_12-leader-finder-thread] (kafka.utils.Logging$class:68) INFO Property request.timeout.ms is overridden to 30000
2015-03-27 14:32:32,414 [secor_backup_secor-staging-a-1.bezurk.org_30798_12-leader-finder-thread] (kafka.utils.Logging$class:68) INFO Fetching metadata from broker id:2,host:ip-10-0-21-156.ap-southeast-1.compute.internal,port:9092 with correlation id 2327 for 3 topic(s) Set(flight-fares, fares, test2)
2015-03-27 14:32:32,416 [secor_backup_secor-staging-a-1.bezurk.org_30798_12-leader-finder-thread] (kafka.utils.Logging$class:68) INFO Connected to ip-10-0-21-156.ap-southeast-1.compute.internal:9092 for producing
2015-03-27 14:32:32,418 [secor_backup_secor-staging-a-1.bezurk.org_30798_12-leader-finder-thread] (kafka.utils.Logging$class:68) INFO Disconnecting from ip-10-0-21-156.ap-southeast-1.compute.internal:9092
2015-03-27 14:32:32,420 [secor_backup_secor-staging-a-1.bezurk.org_30798_12-leader-finder-thread] (kafka.utils.Logging$class:68) INFO [ConsumerFetcherManager-1427437182052] Added fetcher for partitions ArrayBuffer()
(the same verify-properties / fetch-metadata / connect / disconnect cycle repeats about every 200 ms with correlation ids 2328, 2329, 2330, ...)
Is this a feature or a bug?
Is it possible to use the system time of the upload to generate a s3 date structure instead of parsing messages to extract a timestamp there?
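A minimal sketch of what a system-time partitioner could look like (class and method names are assumptions, not an existing Secor parser):

```java
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.TimeZone;

// Sketch: derive the S3 date partition from the consumer's wall clock instead
// of parsing a timestamp out of the message body.
public class SystemTimePartitioner {
    public static String[] extractPartitions(long nowMillis) {
        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd");
        format.setTimeZone(TimeZone.getTimeZone("UTC"));
        return new String[] { "dt=" + format.format(new Date(nowMillis)) };
    }

    public static void main(String[] args) {
        System.out.println(extractPartitions(System.currentTimeMillis())[0]);
    }
}
```

One caveat with this approach: replayed or lagging messages land in the partition for the day they are consumed, not the day they were produced, so partitions no longer reflect event time.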
The data I have worked with has no timestamp in a 'Long' format; instead it uses a 'Date' in a format like "2014-10-10 00:00:00", so I created a new parser which supports any date pattern accepted by SimpleDateFormat ( http://docs.oracle.com/javase/7/docs/api/java/text/SimpleDateFormat.html )
This kind of format is very common and keeps compatibility with joda-time.
Since it is a new parser, the code is totally isolated from the rest.
It's straightforward, no?
I have a use case where I have a few different schemas flowing into the same Kafka topic. I debated if it made sense to use a topic per schema which would fit fine with the Secor model, but if I do this, I can lose ordering for stream processing consumers from Kafka.
I wanted to know if this was a feature the Secor devs would like to see and had any input on how it should be structured.
The ordering of partitions needs to be taken into account, so was thinking of having a new configuration option "message.partition.keys" that defines both the field names and ordering. This would end up plugging into the existing timestamp partitioners. If "message.partition.keys" is empty, you get the same default behavior, otherwise it defines the order of the partitions, where the time field must be included in the list.
This would allow for paths like:
/schema=test/dt=2014-11-01
or
/dt=2014-11-01/schema=test
Let me know if something like this would be accepted by the project and if there is any feedback where things might get complex. I'd like to implement when I have a chance.
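The proposed ordering behavior can be sketched like this; the helper name and the idea of passing the configured key list directly are assumptions from this proposal, not an existing Secor option:

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.StringJoiner;

// Sketch of the proposed "message.partition.keys" behavior: the configured
// key order determines the directory order of the output path.
public class PartitionPathBuilder {
    public static String buildPath(List<String> partitionKeys, Map<String, String> fields) {
        StringJoiner path = new StringJoiner("/", "/", "");
        for (String key : partitionKeys) {
            path.add(key + "=" + fields.get(key));
        }
        return path.toString();
    }

    public static void main(String[] args) {
        Map<String, String> fields = new HashMap<>();
        fields.put("schema", "test");
        fields.put("dt", "2014-11-01");
        // Key order controls layout: schema-first vs. date-first.
        System.out.println(buildPath(Arrays.asList("schema", "dt"), fields));
        System.out.println(buildPath(Arrays.asList("dt", "schema"), fields));
    }
}
```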