
jlogstash's Introduction

Note:

  Parts of jlogstash's early code were adapted from the hangout project; thanks to hangout's author.

About:

jlogstash is a rewrite of logstash in Java aimed at better performance. For a comparison against Ruby logstash, see https://github.com/DTStack/jlogstash-performance-testing

Performance varies by workload: for dtstack's backend log parsing, the Java version runs about 5x faster than the Ruby version on a single 4 GB, 4-core virtual machine.

See the wiki for jlogstash's configuration options and usage. Compared with the Ruby version, the plugin set is still small; more contributors are welcome.

Inputs details:

https://github.com/DTStack/jlogstash/tree/master/pipeline/inputs/README.md

Filters details:

https://github.com/DTStack/jlogstash/tree/master/pipeline/filters/README.md

Outputs details:

https://github.com/DTStack/jlogstash/tree/master/pipeline/outputs/README.md

Metrics:

jlogstash can currently export its runtime performance metrics to Prometheus.

Details: https://github.com/DTStack/jlogstash/tree/master/core/metrics.md

Jar layout (compiled jars must carry a version number):

The jlogstash core jars go under jlogstash/lib/.

Plugin jars go into the filter, input, and output directories under jlogstash/plugin.

Startup command:

sh jlogstash.sh -f /home/admin/ysq.yaml vv

Startup options:

-name: task name

-f: path to the YAML config file (required)

-l: log file path

-i: input queue size coefficient (default 200f/1024)

-w: number of filter workers (default: CPU cores + 2)

-o: number of output workers (default: CPU cores)

-c: output queue size coefficient (default 500f/1024)

-dev: development mode; reference the plugin packages directly in pom.xml.

v: error level

vv: warn level

vvv: info level

vvvv: debug level

vvvvv: trace level

Plugin development:

1. Each plugin package is loaded by its own classloader whose parent is the AppClassLoader, so even if plugins reference different versions of the same third-party jar, there are no version conflicts.

2. Plugin code must not reference other plugins; any shared code has to be bundled into each plugin's own jar.

3. The required dependencies can be found by searching for jlogstash in Maven Central (http://search.maven.org / https://oss.sonatype.org).

4. Plugin development example: https://github.com/DTStack/jlogstash/tree/master/src/test/java/com/dtstack/jlogstash

5. Each plugin jar's name prefix must match the plugin class name (case-insensitive), otherwise the class will not be found, e.g. input.kafka-1.0.0.jar or kafka-1.0.0.jar.
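Point 1 can be illustrated with a minimal plain-Java sketch (hypothetical class names; this is not jlogstash's actual loader): each plugin gets its own URLClassLoader whose parent is the application classloader, so classes bundled in one plugin's jars never leak into another's.

```java
import java.net.URL;
import java.net.URLClassLoader;

// Minimal sketch of per-plugin classloader isolation (hypothetical, not the
// actual jlogstash implementation).
public class PluginLoaderSketch {
    public static URLClassLoader loaderFor(URL[] pluginJars) {
        // Parent is the AppClassLoader: classes not found in the plugin's own
        // jars are delegated upward, but plugin-local jars stay private to
        // this loader.
        return new URLClassLoader(pluginJars, PluginLoaderSketch.class.getClassLoader());
    }

    public static void main(String[] args) {
        URLClassLoader a = loaderFor(new URL[0]);
        URLClassLoader b = loaderFor(new URL[0]);
        // Two distinct loaders: the same class name loaded by each yields two
        // different Class objects, which is what prevents version conflicts.
        System.out.println(a != b); // prints true
    }
}
```

This is also why point 2 holds: a class loaded by plugin A's loader is not type-compatible with the "same" class loaded by plugin B's loader, so shared code must be duplicated into each jar.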

Hiring:

1. Big-data platform engineer. For details about the position, add WeChat ysqwhiletrue and mention recruiting, or send your résumé to [email protected].

jlogstash's People

Contributors

demotto, dependabot[bot], donglangdtstack, ggg1235, gnuhpc, jiemotongxue, kira8565, laoguanjie, lijiangbo, wzqiang1332, yangsishu, zhaijp, zoudaokoulife


jlogstash's Issues

Data loss when the output is es

  • Precondition: the consistency option is set to true.
  • In the ES listener's afterBulk(long arg0, BulkRequest arg1, Throwable arg2) method:
public void afterBulk(long arg0, BulkRequest arg1, Throwable arg2) {
    logger.error("bulk got exception:", arg2);
    for (ActionRequest request : arg1.requests()) {
        addFailedMsg(request);
    }
}

addFailedMsg(request) in this method is passed an ActionRequest, but addFailedMsg(Object msg):

public void addFailedMsg(Object msg) {
    if (msg instanceof Map) {
        super.addFailedMsg(msg);
        return;
    }
    throw new IllegalArgumentException("addFailedMsg only accept Map instance");
}

checks whether msg is a Map, so messages entering through this path are never added to failedMsgQueue, and the data is lost.
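The mismatch can be reproduced with a small plain-Java sketch (hypothetical names, no Elasticsearch dependency; this is not the actual jlogstash class): a guard that only accepts Map instances rejects everything else, so a request object handed to it is never re-queued.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Minimal reproduction of the type mismatch described above (hypothetical
// sketch, no ES dependency).
public class FailedMsgSketch {
    private final Queue<Object> failedMsgQueue = new ConcurrentLinkedQueue<>();

    public void addFailedMsg(Object msg) {
        if (msg instanceof Map) {
            failedMsgQueue.offer(msg);
            return;
        }
        throw new IllegalArgumentException("addFailedMsg only accept Map instance");
    }

    public int size() {
        return failedMsgQueue.size();
    }

    public static void main(String[] args) {
        FailedMsgSketch sketch = new FailedMsgSketch();
        sketch.addFailedMsg(new HashMap<String, Object>()); // accepted: it is a Map
        try {
            sketch.addFailedMsg(new Object()); // stands in for an ActionRequest
        } catch (IllegalArgumentException e) {
            // The request is rejected instead of being queued for retry,
            // which is exactly how the data gets lost.
            System.out.println("dropped: " + e.getMessage());
        }
    }
}
```

A fix would presumably convert each failed request's source back into a Map before calling addFailedMsg, so the retry path keeps working.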

Is high availability implemented? Distributed incremental reads

If I start two processes with the same config file, both reading the same file of 1000 records, how would the records be split evenly between the two processes, 500 each? Looking at the source, the threads and queues are all in-process; how do you guarantee that a cluster doesn't read duplicate data? Thanks. QQ: 907519797, WeChat: 17865621181

The documentation is too sparse

I've just started using jlogstash, and the documentation feels too thin; getting started is a struggle. Even setting up a jgrok pattern took me forever.

How do I drop events?

I want the equivalent of the following:

filter {
  if [loglevel] == "debug" {
    drop { }
  }
}

Can Remove do this? If so, please share an example. Many thanks!
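The core of such a drop filter is small. Here is a plain-Java sketch of the logic only (hypothetical names, not tied to jlogstash's actual BaseFilter API), where returning null stands for discarding the event:

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of a drop-by-field-value filter (hypothetical, not jlogstash's
// filter API): null means "discard this event".
public class DropDebugSketch {
    public static Map<String, Object> filter(Map<String, Object> event) {
        if ("debug".equals(event.get("loglevel"))) {
            return null; // drop: equivalent of `if [loglevel] == "debug" { drop { } }`
        }
        return event; // keep everything else unchanged
    }

    public static void main(String[] args) {
        Map<String, Object> dbg = new HashMap<>();
        dbg.put("loglevel", "debug");
        Map<String, Object> info = new HashMap<>();
        info.put("loglevel", "info");
        System.out.println(filter(dbg) == null);  // prints true (dropped)
        System.out.println(filter(info) != null); // prints true (kept)
    }
}
```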

Elasticsearch support

Does jlogstash support Elasticsearch 7.0 and above? And if ES has security enabled, how should authentication be configured?

Kafka10 input fails to start: jlogstash start error:null

When using the kafka10 input plugin, startup fails with the following error:
2019-06-24 17:11:04.218 [main] WARN com.dtstack.jlogstash.factory.InstanceFactory [InstanceFactory.java:64] - field: topics annotation:Required check
2019-06-24 17:11:04.221 [main] ERROR com.dtstack.jlogstash.JlogstashMain [JlogstashMain.java:53] - jlogstash start error:null
com.dtstack.jlogstash.annotation.plugin.RequiredPlugin.required(39)
com.dtstack.jlogstash.factory.InstanceFactory.checkAnnotation(139)
com.dtstack.jlogstash.factory.InstanceFactory.configInstance(65)
com.dtstack.jlogstash.factory.InputFactory.getInstance(44)
com.dtstack.jlogstash.factory.InputFactory.getBatchInstance(64)
com.dtstack.jlogstash.assembly.AssemblyPipeline.assemblyPipeline(96)
com.dtstack.jlogstash.JlogstashMain.main(51)

Debugging shows that required in com.dtstack.jlogstash.annotation.plugin.RequiredPlugin throws a NullPointerException. What could be the cause?

public void required(Field field, Object obj) throws Exception {
    Required required = field.getAnnotation(Required.class);
    if (required.required() && obj == null) {
        throw new RequiredException(field.getName());
    }
}

Config file:

inputs:
  - Kafka10:
      codec: json
      encoding: UTF8 # default UTF8
      topics: "my-slf4j-topic-test"
      groupId: "test"
      bootstrapServers: 172.18.231.8:32129,172.18.231.8:32130
      consumerSettings:
        auto.commit.interval.ms: "1000"
outputs:
  - File:
      path: E:/dockerhub/jlogstash/srsyslog-performance-%{+YYYY.MM.dd}.txt
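One plausible cause, sketched in plain Java with hypothetical names (not the actual jlogstash classes): Field.getAnnotation returns null for fields that do not carry the annotation, so calling required.required() without a null check throws a NullPointerException.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Field;

// Plain-Java sketch of the suspected NPE: getAnnotation returns null for
// fields without the annotation, so the check must be null-safe.
public class RequiredCheckSketch {
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.FIELD)
    @interface Required { boolean required() default true; }

    static class Plugin {
        @Required String topics;   // annotated: must be set
        String encoding;           // not annotated: getAnnotation(...) == null
    }

    public static boolean isMissing(Field field, Object value) {
        Required required = field.getAnnotation(Required.class);
        // Null-safe: a field without @Required can never be "missing".
        return required != null && required.required() && value == null;
    }

    public static void main(String[] args) throws Exception {
        Field topics = Plugin.class.getDeclaredField("topics");
        Field encoding = Plugin.class.getDeclaredField("encoding");
        System.out.println(isMissing(topics, null));   // prints true
        System.out.println(isMissing(encoding, null)); // prints false, no NPE
    }
}
```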

How to use??

Hi! After grabbing the repo, do I need to build the jars required under lib/ myself?

Configuring multiple tables for jdbc

Hi, I really like this project and want to build on top of it. Does a single yml config file currently support multiple tables? If so, how is that configured? Also, how do I configure a document primary key id in the config file the way logstash does? Thanks.

Can jlogstash run in a k8s container? I tried building an image and running it on k8s, but hit Java heap space errors; the same setup runs fine outside a container. Is there any documentation for containerized deployment? Thanks.

2019-07-02 13:44:10.619 [main] INFO org.elasticsearch.plugins.PluginsService [PluginsService.java:180] - loaded plugin [org.elasticsearch.transport.Netty4Plugin]
java.lang.OutOfMemoryError: Java heap space
Dumping heap to ../logs/heapdump.hprof ...
Heap dump file created [380426013 bytes in 2.713 secs]
Exception in thread "elasticsearch[client][scheduler][T#1]" java.lang.OutOfMemoryError: Java heap space
Exception in thread "elasticsearch[client][scheduler][T#1]" java.lang.IllegalMonitorStateException
2019-07-02 13:45:13.213 [elasticsearch[client][transport_client_boss][T#5]] WARN io.netty.util.ReferenceCountUtil [Netty4InternalESLogger.java:149] - Failed to release a message: PooledDirectByteBuf(freed)
java.lang.OutOfMemoryError: Java heap space
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.signal(AbstractQueuedSynchronizer.java:1939)
at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1103)
at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "elasticsearch[client][generic][T#1]"

Should the jlogstash dependency in the kafka10 pom also be excluded when packaging?

provided
If it isn't removed, reflecting on the object fails with an error;
if it is removed, the class can't be found at startup, even though I checked and the jar does contain the class:
ERROR com.dtstack.jlogstash.inputs.Kafka10 [Kafka10.java:101] - kafka emit error
org.apache.kafka.common.config.ConfigException: Invalid value org.apache.kafka.common.serialization.StringDeserializer for configuration value.deserializer: Class org.apache.kafka.common.serialization.StringDeserializer could not be found.

5–8% CPU usage when idle

With file input and Netty output, JLogStash keeps using 5–8% CPU even when there is little or no log traffic; the task manager shows 2000+ threads waiting to be woken.

Documentation

Could you update the usage documentation, ideally with examples? I can't get most of the input plugins to collect data; something always goes wrong.

The JVM does not exit after all input is consumed

The code uses blocking queues; once the input is exhausted, OutputThread stays blocked and the JVM can never exit on its own.

Part of the code in OutputThread:

if (!priorityFail()) {
    event = this.outputQueue.take();
    if (event != null) {
        for (BaseOutput bo : outputProcessors) {
            bo.process(event);
        }
    }
}
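One possible workaround, as a hedged sketch (hypothetical classes, not jlogstash's actual OutputThread): replace the unbounded take() with poll plus a timeout and check a completion flag set by the input side, so the loop can drain the queue and then exit cleanly.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Sketch of a drainable output loop (hypothetical, not jlogstash's code):
// poll-with-timeout instead of take(), plus an input-finished flag.
public class DrainableOutputSketch {
    private final BlockingQueue<Map<String, Object>> outputQueue = new LinkedBlockingQueue<>();
    private final AtomicBoolean inputFinished = new AtomicBoolean(false);
    private int processed = 0;

    public void put(Map<String, Object> event) {
        outputQueue.offer(event); // unbounded queue: offer always succeeds
    }

    public void finishInput() {
        inputFinished.set(true); // input side signals completion
    }

    public int run() {
        while (true) {
            Map<String, Object> event;
            try {
                // Wake up periodically instead of blocking forever on take().
                event = outputQueue.poll(100, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return processed;
            }
            if (event != null) {
                processed++; // stand-in for bo.process(event)
            } else if (inputFinished.get()) {
                return processed; // queue drained and input done: exit cleanly
            }
        }
    }

    public static void main(String[] args) {
        DrainableOutputSketch sketch = new DrainableOutputSketch();
        sketch.put(new HashMap<>());
        sketch.finishInput();
        System.out.println(sketch.run()); // processes the one event, then exits
    }
}
```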

ClassCastException when loading the kafka10 input plugin via reflection

At line 46 of InputFactory.java the cast fails. Has anyone run into this, and how do you solve it?
jlogstash start error:com.dtstack.jlogstash.inputs.Kafka10 cannot be cast to com.dtstack.jlogstash.inputs.BaseInput
com.dtstack.jlogstash.factory.InputFactory.getInstance(46)
com.dtstack.jlogstash.factory.InputFactory.getBatchInstance(64)
com.dtstack.jlogstash.assembly.AssemblyPipeline.assemblyPipeline(103)
com.dtstack.jlogstash.JlogstashMain.main(51)

filters config example

I read the docs but still don't understand how to configure filters. Could someone help?

Below is the config file I use with the official logstash:
input {
  beats {
    port => 5044
    client_inactivity_timeout => 300
  }
}

filter {
  if "beats_input_codec_plain_applied" in [tags] {
    mutate {
      remove_tag => ["beats_input_codec_plain_applied"]
    }
  }
  if "_geoip_lookup_failure" in [tags] {
    drop { }
  }
  if "_grokparsefailure" in [tags] {
    drop { }
  }
  if [xclientip] == "-" {
    mutate {
      replace => { "xclientip" => "0.0.0.0" }
    }
  }
  if [type] in [ "aa", "bb", "cc" ] {
    grok {
      patterns_dir => ["/usr/local/logstash/patterns"]
      match => [ "message", "%{COMBINEDAPACHELOG2}" ]
    }
    geoip {
      source => "xclientip"
      target => "geoip"
      database => "/usr/local/logstash/GeoIP/GeoLite2-City.mmdb"
      add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ]
      add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}" ]
    }
    mutate {
      convert => [ "[geoip][coordinates]", "float", "bytes", "integer", "elapsedmillis", "integer" ]
    }
    date {
      match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z", "ISO8601" ]
      target => "@timestamp"
    }
  }
}

output {
  if [type] in [ "aa", "bb", "cc" ] {
    elasticsearch {
      hosts => [ "192.168.1.111:9200", "192.168.1.110:9200" ]
      index => "%{type}-%{[@metadata][beat]}-%{+YYYY.MM.dd}"
      flush_size => 50000
      timeout => "60"
      document_type => "%{[@metadata][type]}"
      template => "/usr/local/logstash/filebeat-index-template.json"
      template_overwrite => true
      user => 'elastic'
      password => 'changeme'
    }
  }
}
