loggie's People

Contributors

cnxiafc, dependabot[bot], dongjiang1989, ethfoo, hansedong, iamyeka, jinglanghe, kytool, leizhang-hunter, linshiyx, listencx, loggie-robot, lyp256, machine3, max-cheng, mercurius-lei, michael754267513, mmaxiaolei, oliverwoo, owynwang, shaoyun, snowsi, sunbin0530, testwill, wchy1001, yenchangchan, yuzhichang, zanderme, zhu733756, ziyu-zhao


loggie's Issues

Requirement: configuration hot reload

1. The Loggie agent natively supports reading static file configuration for sources, interceptors, and sinks, implementing log collection input, processing, and output.
2. Could Loggie support periodically pulling source/interceptor/sink configuration from a remote server, so that configuration is loaded dynamically? This would let an enterprise log-service platform configure Loggie through a UI and hot-reload the configuration.

interceptors json_decode configuration does not match the source code

Documentation:
jsonDecode.target string optional body the target field for JSON decoding
Code:


type Config struct {
	BodyKey    string   `yaml:"bodyKey,omitempty"`
	DropFields []string `yaml:"drop,omitempty"`
}

The target option is not actually implemented in the code.

k8s stdout logs: multi-line merging in the source

A pod's stdout log is in JSON format, for example: {"log":"01-Dec-2021 03:13:58.298 INFO [main] Starting service","stream":"stdout","time":"2021-06-02T03:20:54.260571439Z"}. For Java stack-trace errors, the lines need to be merged into a single log entry by regex. Is there a way to do this?
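Loggie's file source supports regex-based multi-line merging via a multi block. A fragment like the following should merge continuation lines (field names as documented for the file source; verify against your Loggie version, and adjust the pattern to match your log's first-line format):

```yaml
sources:
- type: file
  name: mylog
  paths:
  - stdout
  multi:
    active: true
    # Lines NOT matching this first-line pattern are appended
    # to the previous event, merging stack traces into one log.
    pattern: '^\d{2}-[A-Za-z]{3}-\d{4}'
```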

Kafka source with auto-commit disabled causes re-consumption

Minimal reproduce steps

1. Use Kafka to create a topic.
2. Create a Loggie instance that consumes this topic with auto-commit disabled.
3. Create another Loggie instance using the same config.

Suggestion

Use franz-go to consume the Kafka consumer group. When a rebalance happens, Loggie can use kgo.OnPartitionsRevoked to handle the event of a new client joining or leaving the group.

For example, use CommitUncommittedOffsets to commit the whole buffered offset to Kafka. In this case we won't consume twice, and offsets are committed when the sink acks.

logconfig Elasticsearch error

{"level":"warn","time":"2022-03-21T17:26:04Z","caller":"/go/src/loggie.io/loggie/pkg/interceptor/retry/interceptor.go:175","message":"interceptor/retry retry buffer size(2) too large"}
{"level":"error","time":"2022-03-21T17:26:04Z","caller":"/go/src/loggie.io/loggie/pkg/pipeline/pipeline.go:267","message":"consumer batch fail,err: elasticsearch client not initialized yet"}

ES version: 7.x

Could common values be added as built-in variables?

Log file names are usually related to the date and time; collecting only a fixed file name misses some files. Some logs are non-standard and contain only a time with no date, which needs to be filled in at collection time.
Could frequently used values, such as today's date, the current host's hostname, and the current hour, be built into the agent as parameters computed at collection time?

fatal error: concurrent map iteration and map write

What version of Loggie?

docker image built by main branch

Expected Behavior

not panic

Actual Behavior

fatal error: concurrent map iteration and map write

goroutine 168 [running]:
runtime.throw(0x21ac606, 0x26)
	/usr/local/go/src/runtime/panic.go:1117 +0x72 fp=0xc000569578 sp=0xc000569548 pc=0x440b52
runtime.mapiternext(0xc00094d3e0)
	/usr/local/go/src/runtime/map.go:858 +0x54c fp=0xc0005695f8 sp=0xc000569578 pc=0x4196cc
runtime.mapiterinit(0x1e752c0, 0xc0005f2450, 0xc00094d3e0)
	/usr/local/go/src/runtime/map.go:848 +0x1c5 fp=0xc000569618 sp=0xc0005695f8 pc=0x419085
reflect.mapiterinit(0x1e752c0, 0xc0005f2450, 0x8)
	/usr/local/go/src/runtime/map.go:1340 +0x54 fp=0xc000569648 sp=0xc000569618 pc=0x4741d4
github.com/modern-go/reflect2.(*UnsafeMapType).UnsafeIterate(...)
	/go/src/loggie.io/loggie/vendor/github.com/modern-go/reflect2/go_below_118.go:17
github.com/json-iterator/go.(*mapEncoder).Encode(0xc00065aa80, 0xc0005b95d0, 0xc000b14000)
	/go/src/loggie.io/loggie/vendor/github.com/json-iterator/go/reflect_map.go:257 +0x1c5 fp=0xc000569700 sp=0xc000569648 pc=0x94fca5
github.com/json-iterator/go.(*onePtrEncoder).Encode(0xc000848990, 0xc0005f2450, 0xc000b14000)
	/go/src/loggie.io/loggie/vendor/github.com/json-iterator/go/reflect.go:219 +0x68 fp=0xc000569728 sp=0xc000569700 pc=0x946d48
github.com/json-iterator/go.(*Stream).WriteVal(0xc000b14000, 0x1e752c0, 0xc0005f2450)
	/go/src/loggie.io/loggie/vendor/github.com/json-iterator/go/reflect.go:98 +0x150 fp=0xc000569790 sp=0xc000569728 pc=0x945b90
github.com/json-iterator/go.(*dynamicEncoder).Encode(0xc000848970, 0xc000dfb168, 0xc000b14000)
	/go/src/loggie.io/loggie/vendor/github.com/json-iterator/go/reflect_dynamic.go:15 +0x65 fp=0xc0005697c0 sp=0xc000569790 pc=0x948dc5
github.com/json-iterator/go.(*mapEncoder).Encode(0xc00065aa80, 0xc0005b95c8, 0xc000b14000)
	/go/src/loggie.io/loggie/vendor/github.com/json-iterator/go/reflect_map.go:269 +0x371 fp=0xc000569878 sp=0xc0005697c0 pc=0x94fe51
github.com/json-iterator/go.(*onePtrEncoder).Encode(0xc000848990, 0xc000448000, 0xc000b14000)
	/go/src/loggie.io/loggie/vendor/github.com/json-iterator/go/reflect.go:219 +0x68 fp=0xc0005698a0 sp=0xc000569878 pc=0x946d48
github.com/json-iterator/go.(*Stream).WriteVal(0xc000b14000, 0x1e752c0, 0xc000448000)
	/go/src/loggie.io/loggie/vendor/github.com/json-iterator/go/reflect.go:98 +0x150 fp=0xc000569908 sp=0xc0005698a0 pc=0x945b90
github.com/json-iterator/go.(*frozenConfig).Marshal(0xc00047b040, 0x1e752c0, 0xc000448000, 0x0, 0x0, 0x0, 0x0, 0x0)
	/go/src/loggie.io/loggie/vendor/github.com/json-iterator/go/config.go:299 +0xaf fp=0xc000569978 sp=0xc000569908 pc=0x93ad4f
github.com/loggie-io/loggie/pkg/sink/codec/json.(*Json).Encode(0xc0003a6168, 0x24caaa0, 0xc000446300, 0xc000db7650, 0x17, 0x0, 0x0, 0x0)
	/go/src/loggie.io/loggie/pkg/sink/codec/json/json.go:79 +0x15d fp=0xc0005699f8 sp=0xc000569978 pc=0x175cc9d
github.com/loggie-io/loggie/pkg/sink/kafka.(*Sink).Consume(0xc0008c03c0, 0x24afe40, 0xc000d04180, 0x33196e0, 0x0)
	/go/src/loggie.io/loggie/pkg/sink/kafka/sink.go:134 +0x169 fp=0xc000569b88 sp=0xc0005699f8 pc=0x1a9e7c9
github.com/loggie-io/loggie/pkg/core/sink.(*SubscribeInvoker).Invoke(0x3317968, 0x24afe40, 0xc000d04180, 0x7eff4c045e10, 0xc0008c03c0, 0x203000, 0x0)
	/go/src/loggie.io/loggie/pkg/core/sink/invoke.go:42 +0x45 fp=0xc000569bc0 sp=0xc000569b88 pc=0x7f8a45
github.com/loggie-io/loggie/pkg/interceptor/retry.(*Interceptor).Intercept(0xc0003d7b20, 0x247d580, 0x3317968, 0x24afe40, 0xc000d04180, 0x7eff4c045e10, 0xc0008c03c0, 0xaa, 0x0)
	/go/src/loggie.io/loggie/pkg/interceptor/retry/interceptor.go:132 +0xea fp=0xc000569cb8 sp=0xc000569bc0 pc=0x164024a
github.com/loggie-io/loggie/pkg/pipeline.buildSinkInvokerChain.func1(0x24afe40, 0xc000d04180, 0x7eff4c045e10, 0xc0008c03c0, 0x1, 0x0)
	/go/src/loggie.io/loggie/pkg/pipeline/pipeline.go:508 +0x69 fp=0xc000569d10 sp=0xc000569cb8 pc=0x808ac9
github.com/loggie-io/loggie/pkg/core/sink.(*AbstractInvoker).Invoke(0xc0003a6210, 0x24afe40, 0xc000d04180, 0x7eff4c045e10, 0xc0008c03c0, 0x2, 0x1010323a280)
	/go/src/loggie.io/loggie/pkg/core/sink/invoke.go:35 +0x51 fp=0xc000569d50 sp=0xc000569d10 pc=0x7f89d1
github.com/loggie-io/loggie/pkg/interceptor/metric.(*Interceptor).Intercept(0xc0004483f0, 0x247d560, 0xc0003a6210, 0x24afe40, 0xc000d04180, 0x7eff4c045e10, 0xc0008c03c0, 0x0, 0xc00020f6c0)
	/go/src/loggie.io/loggie/pkg/interceptor/metric/interceptor.go:79 +0x63 fp=0xc000569da8 sp=0xc000569d50 pc=0x163f163
github.com/loggie-io/loggie/pkg/pipeline.buildSinkInvokerChain.func1(0x24afe40, 0xc000d04180, 0x7eff4c045e10, 0xc0008c03c0, 0xc000583588, 0xc00070eeec)
	/go/src/loggie.io/loggie/pkg/pipeline/pipeline.go:508 +0x69 fp=0xc000569e00 sp=0xc000569da8 pc=0x808ac9
github.com/loggie-io/loggie/pkg/core/sink.(*AbstractInvoker).Invoke(0xc0003a6228, 0x24afe40, 0xc000d04180, 0x7eff4c045e10, 0xc0008c03c0, 0x2, 0x2)
	/go/src/loggie.io/loggie/pkg/core/sink/invoke.go:35 +0x51 fp=0xc000569e40 sp=0xc000569e00 pc=0x7f89d1
github.com/loggie-io/loggie/pkg/pipeline.(*Pipeline).startSinkConsumer.func1(0x24afe40, 0xc000d04180, 0x0, 0x0)
	/go/src/loggie.io/loggie/pkg/pipeline/pipeline.go:444 +0x55 fp=0xc000569e88 sp=0xc000569e40 pc=0x808895
github.com/loggie-io/loggie/pkg/pipeline.(*Pipeline).sinkInvokeLoop(0xc00039b400, 0x0, 0x7eff4c045e10, 0xc0008c03c0, 0x7eff4c045c48, 0xc0004dea10, 0xc0003f2680, 0x2, 0x2, 0xc00018c1e0)
	/go/src/loggie.io/loggie/pkg/pipeline/pipeline.go:480 +0x1f8 fp=0xc000569f90 sp=0xc000569e88 pc=0x805898
runtime.goexit()
	/usr/local/go/src/runtime/asm_amd64.s:1371 +0x1 fp=0xc000569f98 sp=0xc000569f90 pc=0x47a961
created by github.com/loggie-io/loggie/pkg/pipeline.(*Pipeline).startSinkConsumer
	/go/src/loggie.io/loggie/pkg/pipeline/pipeline.go:460 +0x666

More panics appear, similar to the one above:

fatal error: concurrent map read and map write

goroutine 61 [running]:
runtime.throw(0x21a3b9c, 0x21)
	/usr/local/go/src/runtime/panic.go:1117 +0x72 fp=0xc0008b9738 sp=0xc0008b9708 pc=0x440b52
runtime.mapaccess2_faststr(0x1e752c0, 0xc0005784b0, 0xc000674b69, 0x5, 0xc00101fb88, 0xc0008b9801)
	/usr/local/go/src/runtime/map_faststr.go:116 +0x4a5 fp=0xc0008b97a8 sp=0xc0008b9738 pc=0x41ce05
github.com/loggie-io/loggie/pkg/util/runtime.(*Object).Get(0xc001020010, 0xc000674b69, 0x5, 0xc001020010)
	/go/src/loggie.io/loggie/pkg/util/runtime/object.go:47 +0x7f fp=0xc0008b97f8 sp=0xc0008b97a8 pc=0x1749adf
github.com/loggie-io/loggie/pkg/util/runtime.(*Object).GetPaths(0xc0008b99d8, 0xc000c581a0, 0x2, 0x2, 0x0)
	/go/src/loggie.io/loggie/pkg/util/runtime/object.go:64 +0x6f fp=0xc0008b9838 sp=0xc0008b97f8 pc=0x1749c8f
github.com/loggie-io/loggie/pkg/util/runtime.(*Object).GetPath(0xc0008b99d8, 0xc000674b62, 0xc, 0xc0008b98d0)
	/go/src/loggie.io/loggie/pkg/util/runtime/object.go:58 +0x91 fp=0xc0008b9890 sp=0xc0008b9838 pc=0x1749bf1
github.com/loggie-io/loggie/pkg/util/runtime.getNew(0xc0008b99d8, 0xc000674b62, 0xc, 0x2130b00, 0xc000800000, 0xc0008b9968, 0x93ae4f)
	/go/src/loggie.io/loggie/pkg/util/runtime/select.go:80 +0x56 fp=0xc0008b98d0 sp=0xc0008b9890 pc=0x174b1b6
github.com/loggie-io/loggie/pkg/util/runtime.PatternFormat(0xc0008b99d8, 0xc000674b60, 0xf, 0xc00040e4b0, 0x1, 0xa, 0xc000a31500, 0x17, 0x0, 0x0)
	/go/src/loggie.io/loggie/pkg/util/runtime/select.go:57 +0xfe fp=0xc0008b9988 sp=0xc0008b98d0 pc=0x174ad3e
github.com/loggie-io/loggie/pkg/sink/kafka.(*Sink).selectTopic(0xc000676c80, 0x24caaa0, 0xc0006cc6f0, 0xc001578f00, 0x2fb, 0x2fb, 0x0)
	/go/src/loggie.io/loggie/pkg/sink/kafka/sink.go:159 +0x99 fp=0xc0008b99f8 sp=0xc0008b9988 pc=0x1a9ee99
github.com/loggie-io/loggie/pkg/sink/kafka.(*Sink).Consume(0xc000676c80, 0x24afe40, 0xc000d6a000, 0x33196e0, 0x0)
	/go/src/loggie.io/loggie/pkg/sink/kafka/sink.go:128 +0x106 fp=0xc0008b9b88 sp=0xc0008b99f8 pc=0x1a9e766
github.com/loggie-io/loggie/pkg/core/sink.(*SubscribeInvoker).Invoke(0x3317968, 0x24afe40, 0xc000d6a000, 0x7f0904fb1750, 0xc000676c80, 0x203000, 0x0)
	/go/src/loggie.io/loggie/pkg/core/sink/invoke.go:42 +0x45 fp=0xc0008b9bc0 sp=0xc0008b9b88 pc=0x7f8a45
github.com/loggie-io/loggie/pkg/interceptor/retry.(*Interceptor).Intercept(0xc0001f36c0, 0x247d580, 0x3317968, 0x24afe40, 0xc000d6a000, 0x7f0904fb1750, 0xc000676c80, 0xaa, 0x0)
	/go/src/loggie.io/loggie/pkg/interceptor/retry/interceptor.go:132 +0xea fp=0xc0008b9cb8 sp=0xc0008b9bc0 pc=0x164024a
github.com/loggie-io/loggie/pkg/pipeline.buildSinkInvokerChain.func1(0x24afe40, 0xc000d6a000, 0x7f0904fb1750, 0xc000676c80, 0x1, 0x0)
	/go/src/loggie.io/loggie/pkg/pipeline/pipeline.go:508 +0x69 fp=0xc0008b9d10 sp=0xc0008b9cb8 pc=0x808ac9
github.com/loggie-io/loggie/pkg/core/sink.(*AbstractInvoker).Invoke(0xc0002141a0, 0x24afe40, 0xc000d6a000, 0x7f0904fb1750, 0xc000676c80, 0x2, 0x1010323a280)
	/go/src/loggie.io/loggie/pkg/core/sink/invoke.go:35 +0x51 fp=0xc0008b9d50 sp=0xc0008b9d10 pc=0x7f89d1
github.com/loggie-io/loggie/pkg/interceptor/metric.(*Interceptor).Intercept(0xc0006d73e0, 0x247d560, 0xc0002141a0, 0x24afe40, 0xc000d6a000, 0x7f0904fb1750, 0xc000676c80, 0x0, 0xc0004c41c0)
	/go/src/loggie.io/loggie/pkg/interceptor/metric/interceptor.go:79 +0x63 fp=0xc0008b9da8 sp=0xc0008b9d50 pc=0x163f163
github.com/loggie-io/loggie/pkg/pipeline.buildSinkInvokerChain.func1(0x24afe40, 0xc000d6a000, 0x7f0904fb1750, 0xc000676c80, 0xc000682c88, 0xc00076beec)
	/go/src/loggie.io/loggie/pkg/pipeline/pipeline.go:508 +0x69 fp=0xc0008b9e00 sp=0xc0008b9da8 pc=0x808ac9
github.com/loggie-io/loggie/pkg/core/sink.(*AbstractInvoker).Invoke(0xc0002141b8, 0x24afe40, 0xc000d6a000, 0x7f0904fb1750, 0xc000676c80, 0x2, 0x2)
	/go/src/loggie.io/loggie/pkg/core/sink/invoke.go:35 +0x51 fp=0xc0008b9e40 sp=0xc0008b9e00 pc=0x7f89d1
github.com/loggie-io/loggie/pkg/pipeline.(*Pipeline).startSinkConsumer.func1(0x24afe40, 0xc000d6a000, 0x0, 0x0)
	/go/src/loggie.io/loggie/pkg/pipeline/pipeline.go:444 +0x55 fp=0xc0008b9e88 sp=0xc0008b9e40 pc=0x808895
github.com/loggie-io/loggie/pkg/pipeline.(*Pipeline).sinkInvokeLoop(0xc00031f540, 0x0, 0x7f0904fb1750, 0xc000676c80, 0x7f0904fb1588, 0xc0002ffe30, 0xc000868500, 0x2, 0x2, 0xc000716140)
	/go/src/loggie.io/loggie/pkg/pipeline/pipeline.go:480 +0x1f8 fp=0xc0008b9f90 sp=0xc0008b9e88 pc=0x805898
runtime.goexit()
	/usr/local/go/src/runtime/asm_amd64.s:1371 +0x1 fp=0xc0008b9f98 sp=0xc0008b9f90 pc=0x47a961
created by github.com/loggie-io/loggie/pkg/pipeline.(*Pipeline).startSinkConsumer
	/go/src/loggie.io/loggie/pkg/pipeline/pipeline.go:460 +0x666

goroutine 1 [chan receive]:
main.main()
	/go/src/loggie.io/loggie/cmd/loggie/main.go:130 +0x7e7

goroutine 6 [chan receive]:
k8s.io/klog/v2.(*loggingT).flushDaemon(0x32e5d20)
	/go/src/loggie.io/loggie/vendor/k8s.io/klog/v2/klog.go:1169 +0x8b
created by k8s.io/klog/v2.init.0
	/go/src/loggie.io/loggie/vendor/k8s.io/klog/v2/klog.go:420 +0xdf

goroutine 131 [chan receive]:

Steps to Reproduce the Problem

  1. Create a normal LogConfig collecting stdout logs from pods:
apiVersion: v1
items:
- apiVersion: loggie.io/v1beta1
  kind: LogConfig
  metadata:
    name: horizon
    namespace: horizon
  spec:
    pipeline:
      sources: |
        - type: file
          name: mylog
          fields:
            logType: horizon
            topic: filebeat-perftest-temp1
          paths:
          - stdout
    selector:
      labelSelector:
        cloudnative.music.netease.com/cluster: horizon-backend-core
      type: pod
kind: List
metadata:
  resourceVersion: ""
  selfLink: ""
  2. The problem happens during a normal restart of the pods.

Loggie panics in the JSON library under Go 1.18

Operating system:

ubuntu 20.04

Go version:

1.18

Panic trace:

fatal error: fault
[signal SIGSEGV: segmentation violation code=0x80 addr=0x0 pc=0x477a5f]

goroutine 49 [running]:
runtime.throw({0x2042115?, 0x0?})
        /usr/local/go/src/runtime/panic.go:992 +0x71 fp=0xc00068b748 sp=0xc00068b718 pc=0x447851
runtime.sigpanic()
        /usr/local/go/src/runtime/signal_unix.go:825 +0x305 fp=0xc00068b798 sp=0xc00068b748 pc=0x45daa5
aeshashbody()
        /usr/local/go/src/runtime/asm_amd64.s:1343 +0x39f fp=0xc00068b7a0 sp=0xc00068b798 pc=0x477a5f
runtime.mapiternext(0xc0004be6c0)
        /usr/local/go/src/runtime/map.go:934 +0x2cb fp=0xc00068b810 sp=0xc00068b7a0 pc=0x4216ab
runtime.mapiterinit(0xc000079400?, 0xc0005aa000?, 0x0?)
        /usr/local/go/src/runtime/map.go:861 +0x228 fp=0xc00068b830 sp=0xc00068b810 pc=0x421388
reflect.mapiterinit(0x420c9d?, 0x1d0f040?, 0xc0005aa008?)
        /usr/local/go/src/runtime/map.go:1373 +0x19 fp=0xc00068b858 sp=0xc00068b830 pc=0x4741f9
github.com/modern-go/reflect2.(*UnsafeMapType).UnsafeIterate(...)
        /home/zhanglei/data/loggie-main/vendor/github.com/modern-go/reflect2/unsafe_map.go:112
github.com/json-iterator/go.(*sortKeysMapEncoder).Encode(0xc0007fe240, 0xc000788e40, 0xc0005ee4e0)
        /home/zhanglei/data/loggie-main/vendor/github.com/json-iterator/go/reflect_map.go:291 +0x225 fp=0xc00068b9c8 sp=0xc00068b858 pc=0x8e19e5
github.com/json-iterator/go.(*onePtrEncoder).Encode(0xc0007fc190, 0xc0005ac660, 0xc0004be6c0?)
        /home/zhanglei/data/loggie-main/vendor/github.com/json-iterator/go/reflect.go:219 +0x82 fp=0xc00068ba00 sp=0xc00068b9c8 pc=0x8d9c02
github.com/json-iterator/go.(*Stream).WriteVal(0xc0005ee4e0, {0x1d405a0, 0xc0005ac660})
        /home/zhanglei/data/loggie-main/vendor/github.com/json-iterator/go/reflect.go:98 +0x158 fp=0xc00068ba70 sp=0xc00068ba00 pc=0x8d8f18
github.com/json-iterator/go.(*frozenConfig).Marshal(0xc000388c80, {0x1d405a0, 0xc0005ac660})
        /home/zhanglei/data/loggie-main/vendor/github.com/json-iterator/go/config.go:299 +0xc9 fp=0xc00068bb08 sp=0xc00068ba70 pc=0x8d0049
github.com/loggie-io/loggie/pkg/sink/codec/json.(*Json).Encode(0xc000114608, {0x23b6b68, 0xc000587320})
        /home/zhanglei/data/loggie-main/pkg/sink/codec/json/json.go:80 +0x152 fp=0xc00068bb60 sp=0xc00068bb08 pc=0x1472ed2
github.com/loggie-io/loggie/pkg/sink/dev.(*Sink).Consume(0xc0005878c0, {0x23a8948?, 0xc0004be680?})
        /home/zhanglei/data/loggie-main/pkg/sink/dev/sink.go:93 +0xbd fp=0xc00068bbd8 sp=0xc00068bb60 pc=0x147363d
github.com/loggie-io/loggie/pkg/core/sink.(*SubscribeInvoker).Invoke(0x43?, {{0x23a8948?, 0xc0004be680?}, {0x7faec044c0b8?, 0xc0005878c0?}})
        /home/zhanglei/data/loggie-main/pkg/core/sink/invoke.go:42 +0x31 fp=0xc00068bc00 sp=0xc00068bbd8 pc=0x635631
github.com/loggie-io/loggie/pkg/interceptor/metric.(*Interceptor).Intercept(0x7faec0de4580?, {0x23996c0?, 0x33fc378?}, {{0x23a8948?, 0xc0004be680?}, {0x7faec044c0b8?, 0xc0005878c0?}})
        /home/zhanglei/data/loggie-main/pkg/interceptor/metric/interceptor.go:76 +0x63 fp=0xc00068bc60 sp=0xc00068bc00 pc=0x1398ea3
github.com/loggie-io/loggie/pkg/pipeline.buildSinkInvokerChain.func1({{0x23a8948?, 0xc0004be680?}, {0x7faec044c0b8?, 0xc0005878c0?}})
        /home/zhanglei/data/loggie-main/pkg/pipeline/pipeline.go:464 +0x54 fp=0xc00068bca8 sp=0xc00068bc60 pc=0x644514
github.com/loggie-io/loggie/pkg/core/sink.(*AbstractInvoker).Invoke(0x20290c0?, {{0x23a8948?, 0xc0004be680?}, {0x7faec044c0b8?, 0xc0005878c0?}})
        /home/zhanglei/data/loggie-main/pkg/core/sink/invoke.go:35 +0x3d fp=0xc00068bcd8 sp=0xc00068bca8 pc=0x63559d
github.com/loggie-io/loggie/pkg/interceptor/retry.(*Interceptor).Intercept(0xc0001ae150, {0x23996a0, 0xc000114698}, {{0x23a8948?, 0xc0004be680?}, {0x7faec044c0b8?, 0xc0005878c0?}})
        /home/zhanglei/data/loggie-main/pkg/interceptor/retry/interceptor.go:129 +0x15e fp=0xc00068bdd8 sp=0xc00068bcd8 pc=0x1399e9e
github.com/loggie-io/loggie/pkg/pipeline.buildSinkInvokerChain.func1({{0x23a8948?, 0xc0004be680?}, {0x7faec044c0b8?, 0xc0005878c0?}})
        /home/zhanglei/data/loggie-main/pkg/pipeline/pipeline.go:464 +0x54 fp=0xc00068be20 sp=0xc00068bdd8 pc=0x644514
github.com/loggie-io/loggie/pkg/core/sink.(*AbstractInvoker).Invoke(0x0?, {{0x23a8948?, 0xc0004be680?}, {0x7faec044c0b8?, 0xc0005878c0?}})
        /home/zhanglei/data/loggie-main/pkg/core/sink/invoke.go:35 +0x3d fp=0xc00068be50 sp=0xc00068be20 pc=0x63559d
github.com/loggie-io/loggie/pkg/pipeline.(*Pipeline).startSinkConsumer.func1({0x23a8948?, 0xc0004be680?})
        /home/zhanglei/data/loggie-main/pkg/pipeline/pipeline.go:400 +0x3e fp=0xc00068be88 sp=0xc00068be50 pc=0x643a1e
github.com/loggie-io/loggie/pkg/pipeline.(*Pipeline).sinkInvokeLoop(0xc0004527e0, 0x0, {{0x7faec044c0b8, 0xc0005878c0}, {0x7faec044bf48, 0xc0001d2380}, {0xc0004e5980, 0x2, 0x2}}, 0xc0002ad220)
        /home/zhanglei/data/loggie-main/pkg/pipeline/pipeline.go:436 +0x1ea fp=0xc00068bf80 sp=0xc00068be88 pc=0x643c4a
github.com/loggie-io/loggie/pkg/pipeline.(*Pipeline).startSinkConsumer.func3()
        /home/zhanglei/data/loggie-main/pkg/pipeline/pipeline.go:416 +0x4e fp=0xc00068bfe0 sp=0xc00068bf80 pc=0x64392e
runtime.goexit()
        /usr/local/go/src/runtime/asm_amd64.s:1571 +0x1 fp=0xc00068bfe8 sp=0xc00068bfe0 pc=0x47a661
created by github.com/loggie-io/loggie/pkg/pipeline.(*Pipeline).startSinkConsumer
        /home/zhanglei/data/loggie-main/pkg/pipeline/pipeline.go:416 +0x5bd

goroutine 1 [chan receive]:
main.main()
        /home/zhanglei/data/loggie-main/cmd/loggie/main.go:126 +0x99b

goroutine 6 [chan receive]:
k8s.io/klog/v2.(*loggingT).flushDaemon(0x0?)
        /home/zhanglei/data/loggie-main/vendor/k8s.io/klog/v2/klog.go:1169 +0x6a
created by k8s.io/klog/v2.init.0
        /home/zhanglei/data/loggie-main/vendor/k8s.io/klog/v2/klog.go:420 +0xf6

goroutine 25 [chan receive]:
github.com/panjf2000/ants/v2.(*Pool).purgePeriodically(0xc0001f8d90)
        /home/zhanglei/data/loggie-main/vendor/github.com/panjf2000/ants/v2/pool.go:69 +0x8b
created by github.com/panjf2000/ants/v2.NewPool
        /home/zhanglei/data/loggie-main/vendor/github.com/panjf2000/ants/v2/pool.go:137 +0x34a

goroutine 9 [syscall]:
os/signal.signal_recv()
        /usr/local/go/src/runtime/sigqueue.go:151 +0x2f
os/signal.loop()
        /usr/local/go/src/os/signal/signal_unix.go:23 +0x19
created by os/signal.Notify.func1.1
        /usr/local/go/src/os/signal/signal.go:151 +0x2a

goroutine 32 [chan receive]:
github.com/loggie-io/loggie/pkg/core/signals.SetupSignalHandler.func1()
        /home/zhanglei/data/loggie-main/pkg/core/signals/signal.go:39 +0x2d
created by github.com/loggie-io/loggie/pkg/core/signals.SetupSignalHandler
        /home/zhanglei/data/loggie-main/pkg/core/signals/signal.go:38 +0xbc

goroutine 33 [select]:
github.com/loggie-io/loggie/pkg/eventbus/export/logger.(*logger).run(0xc000300000, {0x1, 0x6fc23ac00, 0x0, 0x0, {{0x0, 0x0}, 0x0, 0x0, 0x0, ...}})
        /home/zhanglei/data/loggie-main/pkg/eventbus/export/logger/logger.go:107 +0x19c
created by github.com/loggie-io/loggie/pkg/eventbus/export/logger.Run
        /home/zhanglei/data/loggie-main/pkg/eventbus/export/logger/logger.go:94 +0xff

goroutine 34 [select]:
github.com/loggie-io/loggie/pkg/eventbus/listener/pipeline.(*Listener).run(0xc00036be00)
        /home/zhanglei/data/loggie-main/pkg/eventbus/listener/pipeline/listener.go:84 +0x114
created by github.com/loggie-io/loggie/pkg/eventbus/listener/pipeline.(*Listener).Start
        /home/zhanglei/data/loggie-main/pkg/eventbus/listener/pipeline/listener.go:65 +0x56

goroutine 35 [select]:
github.com/loggie-io/loggie/pkg/eventbus/listener/filesource.(*Listener).run(0xc0004e48a0)
        /home/zhanglei/data/loggie-main/pkg/eventbus/listener/filesource/listener.go:189 +0x13d
created by github.com/loggie-io/loggie/pkg/eventbus/listener/filesource.(*Listener).Start
        /home/zhanglei/data/loggie-main/pkg/eventbus/listener/filesource/listener.go:85 +0x56

goroutine 36 [select]:
github.com/loggie-io/loggie/pkg/eventbus/listener/filewatcher.(*Listener).run(0xc0004e48e0)
        /home/zhanglei/data/loggie-main/pkg/eventbus/listener/filewatcher/listener.go:198 +0x13a
created by github.com/loggie-io/loggie/pkg/eventbus/listener/filewatcher.(*Listener).Start
        /home/zhanglei/data/loggie-main/pkg/eventbus/listener/filewatcher/listener.go:88 +0x56

goroutine 37 [select]:
github.com/loggie-io/loggie/pkg/eventbus/listener/reload.(*Listener).export(0xc0002a3968)
        /home/zhanglei/data/loggie-main/pkg/eventbus/listener/reload/listener.go:93 +0xf4
created by github.com/loggie-io/loggie/pkg/eventbus/listener/reload.(*Listener).Start
        /home/zhanglei/data/loggie-main/pkg/eventbus/listener/reload/listener.go:69 +0x56

goroutine 38 [select]:
github.com/loggie-io/loggie/pkg/eventbus/listener/sink.(*Listener).run(0xc0004e4920)
        /home/zhanglei/data/loggie-main/pkg/eventbus/listener/sink/listener.go:99 +0x126
created by github.com/loggie-io/loggie/pkg/eventbus/listener/sink.(*Listener).Start
        /home/zhanglei/data/loggie-main/pkg/eventbus/listener/sink/listener.go:76 +0x56

goroutine 39 [select]:
github.com/loggie-io/loggie/pkg/eventbus/listener/queue.(*Listener).run(0xc0004e4940)
        /home/zhanglei/data/loggie-main/pkg/eventbus/listener/queue/listener.go:100 +0x126
created by github.com/loggie-io/loggie/pkg/eventbus/listener/queue.(*Listener).Start
        /home/zhanglei/data/loggie-main/pkg/eventbus/listener/queue/listener.go:77 +0x56

goroutine 40 [select]:
github.com/loggie-io/loggie/pkg/eventbus.(*EventCenter).run(0xc0002e0600)
        /home/zhanglei/data/loggie-main/pkg/eventbus/center.go:144 +0xb9
created by github.com/loggie-io/loggie/pkg/eventbus.StartAndRun
        /home/zhanglei/data/loggie-main/pkg/eventbus/center.go:76 +0x65

goroutine 41 [select]:
github.com/loggie-io/loggie/pkg/interceptor/retry.(*Interceptor).run(0xc0001ae150)
        /home/zhanglei/data/loggie-main/pkg/interceptor/retry/interceptor.go:161 +0x1f2
created by github.com/loggie-io/loggie/pkg/interceptor/retry.(*Interceptor).Start
        /home/zhanglei/data/loggie-main/pkg/interceptor/retry/interceptor.go:111 +0x5a

goroutine 42 [select]:
database/sql.(*DB).connectionOpener(0xc00039d520, {0x23b4470, 0xc0004be8c0})
        /usr/local/go/src/database/sql/sql.go:1226 +0x8d
created by database/sql.OpenDB
        /usr/local/go/src/database/sql/sql.go:794 +0x18d

goroutine 43 [select]:
github.com/loggie-io/loggie/pkg/source/file.(*dbHandler).run(0xc000117a00)
        /home/zhanglei/data/loggie-main/pkg/source/file/persistence.go:190 +0x265
created by github.com/loggie-io/loggie/pkg/source/file.newDbHandler
        /home/zhanglei/data/loggie-main/pkg/source/file/persistence.go:110 +0x20c

goroutine 44 [runnable]:
github.com/loggie-io/loggie/pkg/source/file.(*AckChainHandler).run(0xc0002ad180)
        /home/zhanglei/data/loggie-main/pkg/source/file/ack.go:295 +0x1d2
created by github.com/loggie-io/loggie/pkg/source/file.NewAckChainHandler
        /home/zhanglei/data/loggie-main/pkg/source/file/ack.go:264 +0x1c5

goroutine 45 [syscall]:
syscall.Syscall6(0xe8, 0x8, 0xc000613c14, 0x7, 0xffffffffffffffff, 0x0, 0x0)
        /usr/local/go/src/syscall/asm_linux_amd64.s:43 +0x5
golang.org/x/sys/unix.EpollWait(0x0?, {0xc000613c14?, 0x0?, 0x0?}, 0x0?)
        /home/zhanglei/data/loggie-main/vendor/golang.org/x/sys/unix/zsyscall_linux_amd64.go:77 +0x58
github.com/fsnotify/fsnotify.(*fdPoller).wait(0xc000118c00)
        /home/zhanglei/data/loggie-main/vendor/github.com/fsnotify/fsnotify/inotify_poller.go:86 +0x7d
github.com/fsnotify/fsnotify.(*Watcher).readEvents(0xc0002ad1d0)
        /home/zhanglei/data/loggie-main/vendor/github.com/fsnotify/fsnotify/inotify.go:192 +0x26e
created by github.com/fsnotify/fsnotify.NewWatcher
        /home/zhanglei/data/loggie-main/vendor/github.com/fsnotify/fsnotify/inotify.go:59 +0x1c5

goroutine 46 [select]:
github.com/loggie-io/loggie/pkg/source/file.(*Watcher).run(0xc0003b1200)
        /home/zhanglei/data/loggie-main/pkg/source/file/watch.go:613 +0x20b
created by github.com/loggie-io/loggie/pkg/source/file.newWatcher
        /home/zhanglei/data/loggie-main/pkg/source/file/watch.go:79 +0x28c

goroutine 47 [runnable]:
github.com/loggie-io/loggie/pkg/queue/channel.(*Queue).In(0x33c8940?, {0x23b6b68?, 0xc000297230?})
        /home/zhanglei/data/loggie-main/pkg/queue/channel/queue.go:167 +0x36
github.com/loggie-io/loggie/pkg/core/source.(*PublishInvoker).Invoke(0xc0004eb6b0?, {{0x23b6b68?, 0xc000297230?}, {0x7faec044bf48?, 0xc0001d2380?}})
        /home/zhanglei/data/loggie-main/pkg/core/source/invoke.go:55 +0x31
github.com/loggie-io/loggie/pkg/interceptor/maxbytes.(*Interceptor).Intercept(0xc0001145f0, {0x2399700, 0x33fc378}, {{0x23b6b68?, 0xc000297230?}, {0x7faec044bf48?, 0xc0001d2380?}})
        /home/zhanglei/data/loggie-main/pkg/interceptor/maxbytes/interceptor.go:77 +0x11e
github.com/loggie-io/loggie/pkg/pipeline.buildSourceInvokerChain.func1({{0x23b6b68?, 0xc000297230?}, {0x7faec044bf48?, 0xc0001d2380?}})
        /home/zhanglei/data/loggie-main/pkg/pipeline/pipeline.go:609 +0x54
github.com/loggie-io/loggie/pkg/core/source.(*AbstractInvoker).Invoke(0xc0004e8b48?, {{0x23b6b68?, 0xc000297230?}, {0x7faec044bf48?, 0xc0001d2380?}})
        /home/zhanglei/data/loggie-main/pkg/core/source/invoke.go:38 +0x3d
github.com/loggie-io/loggie/pkg/pipeline.(*Pipeline).startSourceProduct.func1({0x23b6b68, 0xc000297230})
        /home/zhanglei/data/loggie-main/pkg/pipeline/pipeline.go:511 +0xb6
github.com/loggie-io/loggie/pkg/source/file.(*Job).ProductEvent(0xc000690300, 0xe5fff, {0x1c68?, 0xc00004e00a?, 0x33c8940?}, {0xc0005dc398, 0x5b, 0x0?})
        /home/zhanglei/data/loggie-main/pkg/source/file/job.go:327 +0x746
github.com/loggie-io/loggie/pkg/source/file.(*Reader).work(0xc00048e750, 0x0)
        /home/zhanglei/data/loggie-main/pkg/source/file/read.go:183 +0xf3e
created by github.com/loggie-io/loggie/pkg/source/file.(*Reader).Start.func1
        /home/zhanglei/data/loggie-main/pkg/source/file/read.go:100 +0x33

goroutine 48 [select]:
github.com/loggie-io/loggie/pkg/queue/channel.(*Queue).worker(0xc0001d2380)
        /home/zhanglei/data/loggie-main/pkg/queue/channel/queue.go:124 +0x293
created by github.com/loggie-io/loggie/pkg/queue/channel.(*Queue).Start
        /home/zhanglei/data/loggie-main/pkg/queue/channel/queue.go:97 +0xed

goroutine 51 [select]:
github.com/loggie-io/loggie/pkg/pipeline.(*Pipeline).survive(0xc0004527e0)
        /home/zhanglei/data/loggie-main/pkg/pipeline/pipeline.go:645 +0xab
created by github.com/loggie-io/loggie/pkg/pipeline.(*Pipeline).Start
        /home/zhanglei/data/loggie-main/pkg/pipeline/pipeline.go:201 +0x1a5

goroutine 52 [select]:
github.com/loggie-io/loggie/pkg/core/reloader.(*Reloader).Run(0xc000111140, 0xc000058300)
        /home/zhanglei/data/loggie-main/pkg/core/reloader/reload.go:61 +0x115
created by main.main
        /home/zhanglei/data/loggie-main/cmd/loggie/main.go:105 +0x77b

goroutine 53 [IO wait]:
internal/poll.runtime_pollWait(0x7faec0545098, 0x72)
        /usr/local/go/src/runtime/netpoll.go:302 +0x89
internal/poll.(*pollDesc).wait(0xc000117b00?, 0x0?, 0x0)
        /usr/local/go/src/internal/poll/fd_poll_runtime.go:83 +0x32
internal/poll.(*pollDesc).waitRead(...)
        /usr/local/go/src/internal/poll/fd_poll_runtime.go:88
internal/poll.(*FD).Accept(0xc000117b00)
        /usr/local/go/src/internal/poll/fd_unix.go:614 +0x22c
net.(*netFD).accept(0xc000117b00)
        /usr/local/go/src/net/fd_unix.go:172 +0x35
net.(*TCPListener).accept(0xc0001111b8)
        /usr/local/go/src/net/tcpsock_posix.go:139 +0x28
net.(*TCPListener).Accept(0xc0001111b8)
        /usr/local/go/src/net/tcpsock.go:288 +0x3d
net/http.(*Server).Serve(0xc000390460, {0x23b32f8, 0xc0001111b8})
        /usr/local/go/src/net/http/server.go:3039 +0x385
net/http.(*Server).ListenAndServe(0xc000390460)
        /usr/local/go/src/net/http/server.go:2968 +0x7d
net/http.ListenAndServe(...)
        /usr/local/go/src/net/http/server.go:3222
main.main.func2()
        /home/zhanglei/data/loggie-main/cmd/loggie/main.go:119 +0xf1
created by main.main
        /home/zhanglei/data/loggie-main/cmd/loggie/main.go:118 +0x96c


Attempted fix

//json = jsoniter.ConfigFastest
	json = jsoniter.ConfigCompatibleWithStandardLibrary

Finally solved by switching to the official, standard-library-compatible JSON configuration. Fix location:

sink/codec/json.go

Support Aliyun SLS storage

Add a new sink that can send data to Aliyun's SLS storage, for example:

    sink:
      type: sls
      endpoint: xxx.cn-hangzhou.log.aliyuncs.com
      accessKeyId: xxxx
      accessKeySecret: xxxx
      project: loggie
      logStore: test1
      topic: ${fields.topic}

Failed to parse field

Error log: "stacktrace": ["org.elasticsearch.index.mapper.MapperParsingException: failed to parse field [message] of type [text] in document with id 'Je5_QYABuC1itGwEeHC-'. Preview of field's value: ''",
The ES version is 7.8.0.

Add spec.interceptors and spec.sink params in ClusterLogConfig/LogConfig CRD

logConfig example below:

apiVersion: loggie.io/v1beta1
kind: LogConfig
metadata:
  name: tomcat
  namespace: default
spec:
  selector:
    type: pod
    labelSelector:
      app: tomcat
  pipeline:
    sources: |
      - type: file
        name: common
        paths:
          - stdout
          - /usr/local/tomcat/logs/*.log
    sink: |
      type: dev
      printEvents: false
    interceptors: |
      - type: rateLimit
        qps: 90000

So we can add sink/interceptors in ClusterLogConfig/LogConfig directly.

note:

  • sink/interceptors in logConfig have a higher priority than sinkRef/interceptorRef, so when both sink and sinkRef are present in the logConfig, only sink takes effect.
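The priority rule in the note above can be sketched as a small merge step (the struct and field names here are hypothetical simplifications, not loggie's actual CRD types):

```go
package main

import "fmt"

// PipelineSpec is a hypothetical, simplified view of the pipeline spec;
// the real LogConfig CRD structs in loggie differ.
type PipelineSpec struct {
	Sink    string // inline sink YAML from spec.pipeline.sink
	SinkRef string // name of a referenced Sink CR
}

// resolveSink applies the documented priority: an inline sink
// wins over sinkRef when both are set.
func resolveSink(p PipelineSpec) string {
	if p.Sink != "" {
		return "inline:" + p.Sink
	}
	if p.SinkRef != "" {
		return "ref:" + p.SinkRef
	}
	return ""
}

func main() {
	fmt.Println(resolveSink(PipelineSpec{Sink: "type: dev", SinkRef: "nginx-sink"}))
	fmt.Println(resolveSink(PipelineSpec{SinkRef: "nginx-sink"}))
}
```

The same precedence would apply to interceptors vs interceptorRef.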

How to collect logs inside a container

Does the pod's YAML need to mount an emptyDir or hostPath volume before the logs can be collected? Below is one of our demo YAMLs:

volumeMounts:
        - mountPath: /home/admin/logs
          name: 01606d0d6354456aa546dfbb36d8a764-datadir
          subPath: logs

  volumes:
    - name: 01606d0d6354456aa546dfbb36d8a764-datadir
      persistentVolumeClaim:
        claimName: >-
          01606d0d6354456aa54

On why each pipeline cannot have multiple sinks

The problems with supporting multiple sinks per pipeline are:
1. With multiple sinks, how do we handle retries when a single sink fails to write downstream?
2. With multiple sinks, how do we configure different interceptors per sink? Users may want different interceptor configurations for the data going to each sink's downstream.

From the code, the feature of associating a sink.Interceptor with a specific sink is already implemented.
Could that feature be used to solve both problems?

Solution to problem 1
Associate the retry interceptor with a specific sink, so that retries happen independently per sink.
In that case, if one of two sinks has a blocked downstream, should the retry behavior block all sinks of the same pipeline?
Is that the effect planned in the roadmap?

Solution to problem 2
Specify different interceptors for different sinks, which solves the need for different processing flows per downstream. However, it still cannot split different source data across different downstreams.

Perhaps only opentelemetry-collector's approach of multiple pipelines sharing a source can solve this?
https://github.com/open-telemetry/opentelemetry-collector/blob/main/docs/design.md
The same receiver can feed data to multiple Pipelines and multiple pipelines can feed data into the same Exporter.

The interceptors cannot remove fields such as stream and time

Environment: Kubernetes 1.21 + Docker
LogConfig:

apiVersion: loggie.io/v1beta1
kind: LogConfig
metadata:
  name: nginx
  namespace: default
spec:
  pipeline:
    interceptorsRef: nginx-interce
    sinkRef: nginx-sink
    sources: |
      - type: file
        name: mylog
        containerName: nginx
        fields:
          topic: "nginx-access"
        matchFields:
          labelKey: [app]
        paths:
          - stdout
  selector:
    labelSelector:
      app: nginx
    type: pod
Interceptor:

apiVersion: loggie.io/v1beta1
kind: Interceptor
metadata:
  name: nginx-interce
spec:
  interceptors: |
    - type: normalize
      name: stdproc
      belongTo: ["mylog"]
      processors:
      - jsonDecode:
          target: body
      - drop:
          targets: ["stream", "time", "body"]
      - rename:
          convert:
          - from: "log"
            to: "message"
Sink:

apiVersion: loggie.io/v1beta1
kind: Sink
metadata:
  name: nginx-sink
spec:
  sink: |
    type: dev
    printEvents: true

After deployment, a request was sent to nginx; the loggie pod log shows:
{ "fields": { "namespace": "default", "nodename": "10.0.20.28", "podname": "nginx-6799fc88d8-td4sc", "containername": "nginx", "logconfig": "nginx", "topic": "nginx-access" }, "body": "{\"log\":\"10.203.2.0 - - [21/Mar/2022:14:47:44 +0000] \\\"GET / HTTP/1.1\\\" 200 615 \\\"-\\\" \\\"curl/7.29.0\\\" \\\"-\\\"\\n\",\"stream\":\"stdout\",\"time\":\"2022-03-21T14:47:44.246358969Z\"}" }
The log field was not renamed, and the stream and time fields were not removed.

Disk space not released

containerd automatically deletes historical logs, but loggie keeps the deleted log files open, so their disk space is not reclaimed until the loggie service is restarted.

Adding raw sink codec

Support a sink codec that does nothing but return the event body bytes.

example:

    sink:
      type: dev
      printEvents: true
      codec:
        type: raw
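A sketch of what such a pass-through codec could look like; the Event type and Codec interface below are assumptions for illustration, not loggie's actual interfaces:

```go
package main

import "fmt"

// Event is a minimal stand-in for loggie's event type (assumption).
type Event struct {
	Body []byte
}

// Codec is a hypothetical sink codec interface (assumption).
type Codec interface {
	Encode(e Event) ([]byte, error)
}

// RawCodec does nothing but return the event body bytes unchanged.
type RawCodec struct{}

func (RawCodec) Encode(e Event) ([]byte, error) {
	return e.Body, nil
}

func main() {
	var c Codec = RawCodec{}
	out, _ := c.Encode(Event{Body: []byte("hello")})
	fmt.Println(string(out))
}
```

Unlike a json codec, no field wrapping or escaping happens, which is useful when the body is already in the desired wire format.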

support any data content and lasting period in dev source

Describe the feature

Loggie Dev Source is primarily used to simulate data generation. Currently, it supports qps and byteSize, and we hope to add:

  • Configurable data format and content
  • Supports data generation only for a period of time

Single pipeline bottleneck

A single pipeline has a performance bottleneck that limits the collection rate to about 40MiB/s.
However, a single HDD can actually sustain a read rate of about 180-200MiB/s.
Will you tune performance so that a single pipeline can reach a collection rate of 200MiB/s?

One Pipeline can only have one Sink?

According to the docs and source code, it seems one pipeline can only have one sink.
How can more than one sink be configured in one pipeline?
Can sources in different pipelines be reused for different sinks?

support loki sink

loki sink configuration example, see below

sink:
- type: loki
  url: "http://localhost:3100/loki/api/v1/push"
  encodeJson: true

Glob extension support

  1. Brace expansion: {alt1,...} matches a sequence of characters if one of the comma-separated alternatives matches
  2. Glob star: /**/ matches zero or more directories

example:

collect target files:

  1. /tmp/loggie/service/order/access.log
  2. /tmp/loggie/service/order/access.log.2022-04-11
  3. /tmp/loggie/service/pay/access.log
  4. /tmp/loggie/service/pay/access.log.2022-04-11

config path: /tmp/loggie/**/access.log{,.[2-9][0-9][0-9][0-9]-[01][0-9]-[0123][0-9]}
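The brace-expansion half can be illustrated with a self-contained sketch; Go's standard path/filepath.Glob supports neither {...} nor **, so an implementation would likely expand braces into plain glob alternatives first (non-nested case only here):

```go
package main

import (
	"fmt"
	"strings"
)

// expandBraces expands a single non-nested {alt1,alt2,...} group into
// one pattern per alternative. Real implementations handle nesting and
// multiple groups; this sketch covers the simple case only.
func expandBraces(pattern string) []string {
	open := strings.Index(pattern, "{")
	end := strings.Index(pattern, "}")
	if open < 0 || end < open {
		return []string{pattern} // no brace group: pattern unchanged
	}
	var out []string
	for _, alt := range strings.Split(pattern[open+1:end], ",") {
		out = append(out, pattern[:open]+alt+pattern[end+1:])
	}
	return out
}

func main() {
	for _, p := range expandBraces("access.log{,.2022-04-11}") {
		fmt.Println(p)
	}
}
```

Each expanded pattern can then be fed to a glob matcher that understands **.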

A lightweight stream process interceptor

In this stream interceptor, we could do:

  • filter events
  • aggregate events, including operators: COUNT/COUNT-DISTINCT/SUM/AVG/MAX/MIN...
  • simple calculate expression

and more?
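As an illustration of the filter and aggregate stages, a windowed COUNT/SUM/MAX over a batch of events might look like this (purely a sketch, not loggie code; the Event fields are assumptions):

```go
package main

import "fmt"

// Event is a minimal stand-in for a parsed log event (assumption).
type Event struct {
	Status int
	Bytes  int
}

// Agg holds simple aggregates over one window of events.
type Agg struct {
	Count    int
	SumBytes int
	Max      int
}

// aggregate folds a window of events into COUNT/SUM/MAX, keeping only
// events that pass the filter — the two stages the proposal describes.
func aggregate(events []Event, keep func(Event) bool) Agg {
	var a Agg
	for _, e := range events {
		if !keep(e) {
			continue
		}
		a.Count++
		a.SumBytes += e.Bytes
		if e.Bytes > a.Max {
			a.Max = e.Bytes
		}
	}
	return a
}

func main() {
	events := []Event{{200, 100}, {500, 10}, {200, 300}}
	// filter: only keep non-error responses
	a := aggregate(events, func(e Event) bool { return e.Status < 400 })
	fmt.Printf("count=%d sum=%d max=%d\n", a.Count, a.SumBytes, a.Max)
}
```

COUNT-DISTINCT and AVG would follow the same fold pattern, with a set or a running mean in the accumulator.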

The matchFields parameter does not work

The logConfig is as follows:

apiVersion: loggie.io/v1beta1
kind: LogConfig
metadata:
  name: nginx
  namespace: default
spec:
  pipeline:
    sinkRef: nginx-sink
    sources: |
      - type: file
        name: mylog
        containerName: nginx
        matchFields:
          labelKey: app
        paths:
          - stdout
  selector:
    labelSelector:
      app: nginx
    type: pod

The loggie pod logs this error:

{"level":"warn","time":"2022-03-20T06:21:42Z","caller":"/go/src/loggie.io/loggie/pkg/discovery/kubernetes/controller/controller.go:406","message":"reconcile logConfig default/nginx err: yaml: unmarshal errors:\n line 5: cannot unmarshal !!str app into []string\nunpack logConfig default sources failed"}

Analysis of problems with retry

The current retry implementation is fairly complex. After analysis, two issues deserve special attention:
1. Retries may reorder events
Consider this scenario: the sink consumer goroutine consumes a batch from the queue and the write fails, so the retry interceptor's logic drops the batch into the retry chan.
If the goroutine inside the retry interceptor has not been scheduled yet, the sink consumer goroutine can still consume the next batch from the queue and write it downstream. This time window can cause data from a single file to be collected out of order.
The root cause of the reordering is a race condition between the retry interceptor's goroutine and the sink consumer goroutine.

2. Interceptors on the retry invoker chain must be idempotent, otherwise there are problems.
There are currently two sink.Invoker chains: a normal one (run by the sink consumer goroutine) and a retry one (run by the pipeline's survive goroutine).
From the code, a retried batch is executed on both chains, so if interceptor logic is not idempotent, a retried batch may end up with duplicated field values.

Could you please take a look at the two issues above and confirm whether this is the case?
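The first concern (reordering through the retry channel) can be reproduced deterministically with a single-threaded simplification of the described scheduling window (not loggie's actual code):

```go
package main

import "fmt"

// simulate reproduces the scheduling window described above: batch 1
// fails and is parked in the retry channel; before the retry goroutine
// runs, the sink consumer already ships batch 2, so the downstream sees
// the batches out of order.
func simulate() []int {
	retry := make(chan int, 1)
	var shipped []int

	send := func(batch int, fail bool) {
		if fail {
			retry <- batch // parked; retry goroutine not yet scheduled
			return
		}
		shipped = append(shipped, batch)
	}

	send(1, true)                      // batch 1 fails
	send(2, false)                     // batch 2 is shipped meanwhile
	shipped = append(shipped, <-retry) // retry finally delivers batch 1
	return shipped
}

func main() {
	fmt.Println(simulate()) // [2 1]
}
```

With real goroutines the window is nondeterministic, but the possible interleaving is exactly this one.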

Could not reproduce collecting container stdout and in-container log files at the same time, as documented

Configured as the documentation shows, only stdout logs are collected; log files inside the container are not.
The collection target is a simple program I wrote that writes log content to both stdout and a file:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: log2file
spec:
  selector:
    matchLabels:
      app: log2file
  template:
    metadata:
      labels:
        app: log2file
    spec:
      containers:
        - name: log2file
          image: reg.harbor.com/logging/log2file:v1.0.0
          env:
            - name: LOG_PATH
              value: /var/log/tw/
          volumeMounts:
          - mountPath: /var/log/tw
            name: log
      volumes:
        - emptyDir: {}
          name: log

Sink file:

apiVersion: loggie.io/v1beta1
kind: Sink
metadata:
  name: log2file
spec:
  sink: |
    type: dev
    printEvents: true

Interceptor file:

apiVersion: loggie.io/v1beta1
kind: Interceptor
metadata:
  name: log2file
spec:
  interceptors: |
    - type: normalize
      name: stdproc
      belongTo: ["stdoutlog"]
      processors:
      - jsonDecode: ~
      - drop:
          targets: ["stream", "time", "body"]
      - regex:
          target: "log"
          pattern: 'time=\"(?<time>.*?)\" level=(?<level>\S+) msg=\"(?<msg>.*?)\" No=(?<No>\S+)'
      - drop:
          targets: ["log"]
    - type: normalize
      name: fileproc
      belongTo: ["infolog"]
      processors:
      - jsonDecode: ~

LogConfig file:

apiVersion: loggie.io/v1beta1
kind: LogConfig
metadata:
  name: log2file
  namespace: default
spec:
  selector:
    type: pod
    labelSelector:
      app: log2file
  pipeline:
    sources: |
      - type: file
        name: stdoutlog
        paths:
        - stdout
        fields:
          from: stdout
      - type: file
        name: infolog
        paths:
        - /var/log/tw/info.*.log
        fields:
          from: filelog
    sinkRef: log2file
    interceptorRef: log2file

How is resource isolation between pipelines handled?

Hello, developers~
After loggie is deployed as a sidecar or DaemonSet with resource limits on the loggie container, different pipelines compete for those resources; if the data volume of one pipeline grows, it will inevitably affect the collection of the other pipelines. How was this considered in the design? Thanks~
