adyliu / jafka
A fast and simple distributed publish-subscribe messaging system (MQ).
Home Page: https://github.com/adyliu/jafka/wiki
License: Apache License 2.0
Currently only topic-style messaging appears to be supported. Is there any plan to support queue-style (point-to-point) messaging?
What should the producer do if it cannot resolve the broker's hostname (without editing the hosts file)?
Support configuring the number of partitions per broker when using config mode.
The delete operation should be able to remove a topic that is no longer used.
The messages of that topic will be deleted as well.
Which version of Kafka is jafka interface-compatible with? Thanks.
From the configuration, only multiple brokers (round-robin routing via ZooKeeper) and multiple partition files seem to be supported; I could not find any master/standby (replication) configuration.
Which parameter sets the IP address jafka listens on? Is this supported yet?
I am using ZooKeeper 3.5 with jafka 1.4. When sending messages with the demo test code, a parsing error occurs while reading the brokerList node, apparently related to the "partitions" field.
After opening a new server port or starting a consumer with a new groupid, previously published historical messages are delivered again. Is there a parameter to avoid receiving old messages?
If the in-thread cache is full, the FetcherRunnable thread pauses and never recovers.
See com.sohu.jafka.consumer.FetcherRunnable.fetchOnce().
Fixed to:

    try {
        read += processMessages(messages, info);
    } catch (IOException e) {
        throw e;
    } catch (InterruptedException e) {
        if (!stopped) {
            logger.error("error in FetcherRunnable for " + info, e);
            info.enqueueError(e, info.getFetchedOffset());
        }
        throw e;
    } catch (RuntimeException e) {
        if (!stopped) {
            logger.error("error in FetcherRunnable for " + info, e);
            info.enqueueError(e, info.getFetchedOffset());
        }
        throw e;
    }
The client and server must handle bad topic names.
When a producer sends messages with a new topic (not yet registered at the broker), the producer handles the partition change incorrectly if the broker creates that topic with more than one partition.
See the code (https://github.com/adyliu/jafka/blob/v1.0/src/main/java/com/sohu/jafka/producer/ZKBrokerPartitionInfo.java):
    class BrokerTopicsListener implements IZkChildListener {
        private Map<String, SortedSet<Partition>> originalBrokerTopicsParitions;
        private Map<Integer, Broker> originBrokerIds;

        public BrokerTopicsListener(Map<String, SortedSet<Partition>> originalBrokerTopicsParitions,
                                    Map<Integer, Broker> originBrokerIds) {
            super();
            this.originalBrokerTopicsParitions = originalBrokerTopicsParitions;
            this.originBrokerIds = originBrokerIds;
        }
        // ...
    }
We need a fresh copy of each map before the producer creates a default partition for a new topic:
    public BrokerTopicsListener(Map<String, SortedSet<Partition>> originalBrokerTopicsParitions,
                                Map<Integer, Broker> originBrokerIds) {
        super();
        this.originalBrokerTopicsParitions = new HashMap<String, SortedSet<Partition>>(originalBrokerTopicsParitions);
        this.originBrokerIds = new HashMap<Integer, Broker>(originBrokerIds);
    }
The method is as follows:
    public void start(Properties mainProperties, Properties consumerProperties, Properties producerProperties) {
        final ServerConfig config = new ServerConfig(mainProperties);
        final ConsumerConfig consumerConfig = consumerProperties == null ? null : new ConsumerConfig(consumerProperties);
        final ProducerConfig producerConfig = consumerConfig == null ? null : new ProducerConfig(producerProperties);
        start(config, consumerConfig, producerConfig);
    }
The condition on the ProducerConfig line looks wrong: it should test producerProperties == null rather than consumerConfig == null, i.e.:

    final ProducerConfig producerConfig = producerProperties == null ? null : new ProducerConfig(producerProperties);
We produce messages in string format, so a string producer would be useful.
add and poll are non-blocking; would a blocking put/take be more appropriate for adding and fetching?
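For comparison, java.util.concurrent's BlockingQueue offers both styles. A minimal sketch (plain JDK, not jafka code) contrasting non-blocking add/poll with blocking and timed alternatives:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class BlockingQueueDemo {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> chunks = new ArrayBlockingQueue<>(2);

        // Non-blocking: add() throws when full; offer() returns false instead.
        chunks.add("m1");
        chunks.add("m2");
        System.out.println(chunks.offer("m3"));          // queue full -> false

        // Timed: waits up to 100 ms for space instead of failing immediately.
        boolean accepted = chunks.offer("m3", 100, TimeUnit.MILLISECONDS);
        System.out.println(accepted);                    // still full -> false

        System.out.println(chunks.take());               // blocking take -> m1
        System.out.println(chunks.poll());               // non-blocking -> m2
        System.out.println(chunks.poll());               // empty -> null
    }
}
```

A blocking put/take avoids busy-waiting in the fetcher but must be interruptible on shutdown, which is why timed offer/poll is a common middle ground.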
A tool for watching consumer status would be useful.
It should show the consumer status of every topic across groups.
I keep getting a Connection refused error when trying to run the sample producer program from a remote system, as explained in the Quick Tutorial demo.
The exception is as follows:
Caused by: java.lang.RuntimeException: Connection refused: connect
at com.sohu.jafka.producer.SyncProducer.connect(SyncProducer.java:148)
at com.sohu.jafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.java:125)
at com.sohu.jafka.producer.SyncProducer.send(SyncProducer.java:97)
The following observations have been found in Server logs:
2012-09-13 12:52:48.783 INFO NIOServerCnxn$Factory Accepted socket connection from /10.94.22.50:30660
2012-09-13 12:52:48.790 INFO NIOServerCnxn Client attempting to establish new session at /10.94.22.50:30660
2012-09-13 12:52:48.795 INFO NIOServerCnxn Established session 0x139be612cff0006 with negotiated timeout 6000 for client /10.94.22.50:30660
2012-09-13 12:52:53.042 INFO PrepRequestProcessor Processed session termination for sessionid: 0x139be612cff0006
2012-09-13 12:52:53.047 INFO NIOServerCnxn Closed socket connection for client /10.94.22.50:30660 which had sessionid 0x139be612cff0006
If I run the same program locally on the server where the Jafka process is running, there are no issues and I am able to send messages as expected. But when I run it remotely, I see connection issues.
I am still debugging the issue; I would appreciate any comments or help.
Thank you,
Vish
zookeeperconsumerconnector$zkrebalancerlistener - [adyliu_tc_156_44-1340242264959-60554954] rebalanced failed. try #1, cost 12 ms
The delete operation is dangerous, so we need password checking.
Password checking will be used for any important operation.
[hadoop@ZK1 bin]$ ./consumer-console.sh --zookeeper 192.168.1.97:2181,192.168.1.96:2181,192.168.1.98:2181 --topic ly --from-beginning
log4j:WARN No appenders could be found for logger (com.sohu.jafka.consumer.ZookeeperConsumerConnector).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Exception in thread "main" java.lang.NullPointerException
at com.github.zkclient.ZkClient.deleteRecursive(ZkClient.java:442)
at com.sohu.jafka.console.ConsoleConsumer.tryCleanupZookeeper(ConsoleConsumer.java:174)
at com.sohu.jafka.console.ConsoleConsumer.main(ConsoleConsumer.java:106)
Why does the consumer report an error when receiving messages? The provided Java demo can receive them, but this cannot. Any ideas?
Is jafka still being updated?
com.sohu.jafka.common.NoBrokersForPartitionException: Partition= demo
This error occurs when I use ZooKeeper to send a message.
A simple message dumper would be useful when debugging message content.
Dumping binary messages may be difficult, but we can dump string (UTF-8) messages.
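As a sketch of the idea, the framing below is hypothetical (a plain 4-byte length prefix per message), not jafka's actual on-disk format, which also carries magic and CRC bytes:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class MessageDumper {
    // Hypothetical framing: each message is a 4-byte length followed by
    // a UTF-8 payload. A real jafka dumper would skip magic/CRC bytes too.
    static void dump(ByteBuffer log) {
        while (log.remaining() >= 4) {
            int size = log.getInt();
            if (size < 0 || size > log.remaining()) break; // truncated tail
            byte[] payload = new byte[size];
            log.get(payload);
            System.out.println(new String(payload, StandardCharsets.UTF_8));
        }
    }

    public static void main(String[] args) {
        byte[] m1 = "hello".getBytes(StandardCharsets.UTF_8);
        byte[] m2 = "jafka".getBytes(StandardCharsets.UTF_8);
        ByteBuffer buf = ByteBuffer.allocate(8 + m1.length + m2.length);
        buf.putInt(m1.length).put(m1).putInt(m2.length).put(m2);
        buf.flip();
        dump(buf); // prints each payload on its own line
    }
}
```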
If a newly started broker comes up with no files in its data directory, it cannot accept messages for previously existing topics, because it cannot register itself under /brokers/topics/[topic]/[broker_id]. As a result, it can only accept messages for new topics. One workaround is to create the topic-partition directories in the data directory manually. Or is this the intended usage?
Throw an exception when the topic directory is illegal.
In Log.read(long offset, int length), a single fetch only binary-searches for the LogSegment containing offset and reads at most to the end of that segment. How should the case be handled where offset + length spans multiple LogSegments?
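One common answer is to leave Log.read as-is and have the caller loop, issuing the next read where the previous one stopped. A self-contained sketch of that loop over hypothetical in-memory segments (not jafka's actual classes):

```java
import java.util.ArrayList;
import java.util.List;

public class SpanningRead {
    // Hypothetical segment covering [start, start + data.length) of the log.
    record Segment(long start, byte[] data) {}

    // One read stops at the containing segment's end (as Log.read does);
    // the caller keeps reading until `length` bytes are collected.
    static byte[] readSpanning(List<Segment> segments, long offset, int length) {
        byte[] out = new byte[length];
        int filled = 0;
        while (filled < length) {
            long pos = offset + filled;
            Segment seg = segments.stream()
                    .filter(s -> pos >= s.start() && pos < s.start() + s.data().length)
                    .findFirst().orElseThrow();
            int from = (int) (pos - seg.start());
            int n = Math.min(length - filled, seg.data().length - from);
            System.arraycopy(seg.data(), from, out, filled, n);
            filled += n;
        }
        return out;
    }

    public static void main(String[] args) {
        List<Segment> segs = new ArrayList<>();
        segs.add(new Segment(0, "abcd".getBytes()));
        segs.add(new Segment(4, "efgh".getBytes()));
        // Read 4 bytes starting at offset 2: crosses the segment boundary.
        System.out.println(new String(readSpanning(segs, 2, 4))); // cdef
    }
}
```

Keeping the per-segment read simple and looping in the caller avoids holding locks across segments, at the cost of one fetch round-trip per boundary crossed.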
Hello Team,
Thanks,
Siddharth
The jafka documentation makes this claim: high throughput, i.e. even on low-end hardware a single broker can sustain hundreds of thousands of messages per second.
What is the technical basis for this throughput? NIO?
On the wiki page (https://github.com/adyliu/jafka/wiki), "Future" might be a typo for "Feature"?
Update the zkclient to 2.0.
LogManager registers the wrong partition number to ZooKeeper when it starts up in some cases. For example, a user enlarges the partition number of a topic to, say, 4 with the admin script, and everything works. But when the broker restarts, the partition number in ZooKeeper turns out to be 3 again. Problem!
Hello,
I would like to ask whether the latest version supports partition replication and leader election; I did not see it in the source code.
Also, when a node goes down, can the data stored on that node still be consumed?
The size of each message must be checked on the server side.
We need a topic-creation function.
When sending messages to the broker for the first time, we need to create the topic with a given number of partitions.
Although this can be done through the broker configuration, that requires a restart.
The tool should also be able to enlarge the partitions of an existing topic.
We need a producer interface, not an implementation class, so users can do some of the work themselves.
The features of the HTTP client:
How can the jafka ZooKeeper root znode path be changed?
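I cannot confirm a dedicated setting in jafka, but jafka was ported from Kafka 0.7, whose zk.connect string accepts a chroot path suffix. If jafka keeps that convention, a custom root znode could be chosen like this (the /jafka-root path is a hypothetical example):

```properties
# Hypothetical sketch, assuming jafka follows Kafka 0.7's zk.connect convention:
# a chroot path appended to the connection string places all znodes
# under /jafka-root instead of the ZooKeeper root.
zk.connect=192.168.1.97:2181,192.168.1.96:2181/jafka-root
```

The chroot path must usually be created in ZooKeeper beforehand, and every producer, consumer, and broker must use the same suffix.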
Hello,
I am in the process of evaluating the Apache Kafka/Jafka message brokers and am looking for a simple message distribution and partitioning example. The goal is to distribute messages equally across all the brokers in a cluster.
Thank you,
Vish
Where is the jafka JVM heap size configured?
By default, a message from the Producer to the broker is guaranteed at-least-once delivery (at-most-once can be achieved by configuring the Producer to send asynchronously).
But while reading the jafka source, it looks as if only at-most-once is actually implemented.
I would like to confirm whether that is the case.
add some ignore rules for IntelliJ IDEA.
This came up in earlier development: suppose a topic has been live for a long time and has accumulated, say, one million messages, and a new consumer just comes online. There are two needs. One is to receive all messages from the beginning and process them one by one (very well supported). The other is to skip the old data and only process messages received after the consumer came online. The latter still seems unsupported, or does it require a very complicated workaround?
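One possible knob, assuming jafka exposes Kafka 0.7's consumer option autooffset.reset (unverified for jafka): it controls where a consumer group with no committed offset starts reading.

```properties
# Hypothetical sketch, assuming jafka keeps Kafka 0.7's consumer options.
# "largest" starts a brand-new group at the log tail (skip history);
# "smallest" starts from the beginning of the log.
groupid=fresh_group
autooffset.reset=largest
```

This only affects groups without a stored offset in ZooKeeper; an existing group resumes from its committed position regardless.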
The project has not been updated for a long time.
Bind the service port on a given IP address for safety.
Currently jafka binds the service on all IP addresses.
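A minimal JDK sketch of the difference (not jafka's server code): binding to one concrete address instead of the wildcard keeps the port off other interfaces.

```java
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;

public class BindDemo {
    public static void main(String[] args) throws Exception {
        // Wildcard binding exposes the port on every interface:
        //   serverSocket.bind(new InetSocketAddress(9092));
        // Binding to one address restricts which interfaces can reach it.
        ServerSocketChannel server = ServerSocketChannel.open();
        server.socket().bind(new InetSocketAddress("127.0.0.1", 0)); // port 0: any free port
        System.out.println(server.socket().getInetAddress().getHostAddress());
        server.close();
    }
}
```

The fix would be to add a bind-address option to the server config and pass it through to the socket bind call instead of the port-only constructor.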
We need a script or instructions to build our package.
After consumers consume some messages of a topic, why are the offsets in ZooKeeper empty?
ls /consumers
[test_group1, test_group123, test_group, test_group2, test_group1235]
ls /consumers/test_group
[ids, owners, offsets]
ls /consumers/test_group/offsets
[test1234, ly1234567, test123456, lin123, test, demo123, ly123456, ssss, MJ]
ls /consumers/test_group/offsets/lin123
[1-0]
ls /consumers/test_group/offsets/lin123/1-0
[]
The exception class com.sohu.jafka.common.UnavaliableProducerException is misspelled and duplicates com.sohu.jafka.common.UnavailableProducerException.
Delete the file:
/jafka/src/main/java/com/sohu/jafka/common/UnavaliableProducerException.java