Giter VIP home page Giter VIP logo

aeron-go's Introduction

Build Status Go Report Card Join the chat at https://gitter.im/aeron-go/Lobby

aeron-go

Implementation of Aeron messaging client in Go.

Architecture, design, and protocol of Aeron can be found here

Usage

Example subscriber can be found here.

Example publication can be found here.

Common

Instantiate Aeron with Context:

ctx := aeron.NewContext().MediaDriverTimeout(time.Second * 10)

a := aeron.Connect(ctx)

Subscribers

Create subscription:

subscription := <-a.AddSubscription("aeron:ipc", 10)

defer subscription.Close()

aeron.AddSubscription() returns a channel, so that the user has the choice of blocking waiting for subscription to register with the driver or do async select poll.

Define callback for message processing:

handler := func(buffer *buffers.Atomic, offset int32, length int32, header *logbuffer.Header) {
    bytes := buffer.GetBytesArray(offset, length)

    fmt.Printf("Received a fragment with payload: %s\n", string(bytes))
}

Poll for messages:

idleStrategy := idlestrategy.Sleeping{time.Millisecond}

for {
    fragmentsRead := subscription.Poll(handler, 10)
    idleStrategy.Idle(fragmentsRead)
}

Publications

Create publication:

publication := <-a.AddPublication("aeron:ipc", 10)

defer publication.Close()

aeron.AddPublication() returns a channel, so that the user has the choice of blocking waiting for publication to register with the driver or do async select poll.

Create Aeron buffer to send the message:

message := fmt.Sprintf("this is a message %d", counter)

srcBuffer := buffers.MakeAtomic(([]byte)(message))

Optionally make sure that there are connected subscriptions:

for !publication.IsConnected() {
    time.Sleep(time.Millisecond * 10)
}

Send the message, by calling publication.Offer

ret := publication.Offer(srcBuffer, 0, int32(len(message)), nil)
switch ret {
case aeron.NotConnected:
    log.Print("not connected yet")
case aeron.BackPressured:
    log.Print("back pressured")
default:
    if ret < 0 {
        log.Print("Unrecognized code: %d", ret)
    } else {
        log.Print("success!")
    }
}

aeron-go's People

Contributors

aaronong avatar aayushduwadi avatar b-gupta avatar billsegall avatar ccnlui avatar corymonroe-coinbase avatar ethanf avatar gitter-badger avatar grddev avatar jting-cb avatar lirm avatar magno-cb avatar mattyulrich avatar mjpt777 avatar neomantra avatar ntrifunovic avatar petertalos avatar sashahilton00 avatar steven-stern avatar talos-rob avatar vladopajic avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

aeron-go's Issues

Failed to connect to media driver

First of all, I apologise if this is the wrong place to raise this issue, but any help would be greatly appreciated.

I have the following docker compose file:

version: "3.8"

services:
  media-driver:
    container_name: media-driver
    restart: always
    shm_size: 128M
    network_mode: host
    command: >
      "-Daeron.sample.ping.channel=aeron:udp?endpoint=service:40124 \
       -Daeron.sample.pong.channel=aeron:udp?endpoint=media-driver:40123 \
       io.aeron.samples.Ping"
    environment:
      - USER=ewanvalentine
    image: registry.gitlab.com/neomantra/oss/aeron/cpp-debian:master
    volumes:
      - ./dev/:/dev/shm/

  service:
    container_name: service
    restart: always
    build: .
    network_mode: host
    environment:
      - USER=ewanvalentine
    volumes:
      - ./dev/:/dev/shm/

And I'm using one of the examples from this repo, but I'm getting the error:
Failed to connect to media driver: aeron cnc file version not understood: version=512

Just wondered if you'd seen this previously? Thanks!

Accept outside logger, do not panic

Hi, I've started using this project. I'm thinking about rewriting the parts that instantiate zap loggers and instead get them from outside, also, as a library, it should return errors instead of panicking.

What do you guys think about this?
Best,
Vincent

Segmentation violation error dereferencing EndOfStreamPosOff in Close() in image.go

In Close() of aeron/image.go, lines 264-265, are setting the isEos of image:
image.isEos = image.finalPosition >=
image.logBuffers.Meta().EndOfStreamPosOff.Get()
However, image.logBuffers.Meta().EndOfStreamPosOff could be a nil pointer, and calling image.logBuffers.Meta().EndOfStreamPosOff.Get() resulted in a nil pointer dereferencing segmentation violation error:

fatal error: fault
[signal SIGSEGV: segmentation violation code=0x1 addr=0x7f182d29a080 pc=0x15f8983]
goroutine 131 [running, locked to thread]:
runtime.throw({0x1aaba05?, 0x1d7ec00?})
GOROOT/src/runtime/panic.go:992 +0x71 fp=0xc00194d910 sp=0xc00194d8e0 pc=0x4361b1
runtime.sigpanic()
GOROOT/src/runtime/signal_unix.go:825 +0x2ec fp=0xc00194d960 sp=0xc00194d910 pc=0x44c52c
github.com/lirm/aeron-go/aeron/flyweight.(*Int64Field).Get(0xc0003bd290)
external/com_github_lirm_aeron_go/aeron/flyweight/fields.go:72 +0x23 fp=0xc00194d978 sp=0xc00194d960 pc=0x15f8983
github.com/lirm/aeron-go/aeron.(*Image).Close(0xc004e72280)
external/com_github_lirm_aeron_go/aeron/image.go:265 +0xc8 fp=0xc00194da50

CNC file corruption on Windows

Starting a go-based aeron client corrupts the CNC.dat file on Windows. It appears that something is mistakenly setting the first int in the file to 1. The client then reports this error:

aeron cnc file version not understood: version=36503552

Upon restarting the driver, it throws the following:

Exception in thread "main" java.lang.IllegalStateException: aeron cnc file version not understood: version=1
        at io.aeron.CommonContext.isDriverActive(CommonContext.java:283)
        at io.aeron.driver.MediaDriver.ensureDirectoryIsRecreated(MediaDriver.java:402)
        at io.aeron.driver.MediaDriver.<init>(MediaDriver.java:162)

To replicate:

  • clone AdaptiveConsulting/Aeron.NET and build the Ping.exe and Pong.exe samples
  • start the media driver
  • verify ping/pong, then shut them down so they don't interfere with aeron-go
  • start the aeron-go Ping example, which should report the above version error
  • stop the driver and check the file header of the CNC file. Its first int should be the value 1
  • restart the driver, it should report the stack trace above

Deadlock quitting pong

Stack:

PC=0x58abb m=0

goroutine 0 [idle]:
runtime.mach_semaphore_wait(0x1103, 0xc40000000e, 0x7fff5fbff580, 0x1dace0, 0x7fff5fbff430, 0x1dace0, 0x7fff5fbff440, 0x4ef13, 0xffffffffffffffff, 0x7fff5fbff470, ...)
/usr/local/Cellar/go/1.7/libexec/src/runtime/sys_darwin_amd64.s:418 +0xb
runtime.semasleep1(0xffffffffffffffff, 0x7fff5fbff470)
/usr/local/Cellar/go/1.7/libexec/src/runtime/os_darwin.go:435 +0x4b
runtime.semasleep.func1()
/usr/local/Cellar/go/1.7/libexec/src/runtime/os_darwin.go:451 +0x33
runtime.systemstack(0x7fff5fbff468)
/usr/local/Cellar/go/1.7/libexec/src/runtime/asm_amd64.s:314 +0xab
runtime.semasleep(0xffffffffffffffff, 0x0)
/usr/local/Cellar/go/1.7/libexec/src/runtime/os_darwin.go:452 +0x44
runtime.notesleep(0x1db370)
/usr/local/Cellar/go/1.7/libexec/src/runtime/lock_sema.go:166 +0x9f
runtime.stopm()
/usr/local/Cellar/go/1.7/libexec/src/runtime/proc.go:1594 +0xad
runtime.gcstopm()
/usr/local/Cellar/go/1.7/libexec/src/runtime/proc.go:1798 +0x9a
runtime.findrunnable(0xc420020a00, 0x0)
/usr/local/Cellar/go/1.7/libexec/src/runtime/proc.go:1848 +0x9a8
runtime.schedule()
/usr/local/Cellar/go/1.7/libexec/src/runtime/proc.go:2120 +0x14c
runtime.park_m(0xc4200c24e0)
/usr/local/Cellar/go/1.7/libexec/src/runtime/proc.go:2183 +0x123
runtime.mcall(0x7fff5fbff690)
/usr/local/Cellar/go/1.7/libexec/src/runtime/asm_amd64.s:240 +0x5b

goroutine 1 [semacquire, 1070 minutes]:
sync.runtime_Semacquire(0xc420096194)
/usr/local/Cellar/go/1.7/libexec/src/runtime/sema.go:47 +0x30
sync.(_Mutex).Lock(0xc420096190)
/usr/local/Cellar/go/1.7/libexec/src/sync/mutex.go:85 +0xd0
github.com/lirm/aeron-go/aeron.(_ClientConductor).releasePublication(0xc420096148, 0xa, 0x0)
/Users/e11778/gohome/src/github.com/lirm/aeron-go/aeron/clientconductor.go:292 +0x112
github.com/lirm/aeron-go/aeron.(*Publication).Close(0xc42001cbd0, 0xc40000000b, 0xc4200fa060)
/Users/e11778/gohome/src/github.com/lirm/aeron-go/aeron/publication.go:92 +0x85
main.main()
/Users/e11778/gohome/src/github.com/lirm/aeron-go/examples/pong/pong.go:112 +0x5e6

goroutine 17 [syscall, 1376 minutes, locked to thread]:
runtime.goexit()
/usr/local/Cellar/go/1.7/libexec/src/runtime/asm_amd64.s:2086 +0x1

goroutine 5 [syscall]:
os/signal.signal_recv(0x1c5140)
/usr/local/Cellar/go/1.7/libexec/src/runtime/sigqueue.go:116 +0x157
os/signal.loop()
/usr/local/Cellar/go/1.7/libexec/src/os/signal/signal_unix.go:22 +0x22
created by os/signal.init.1
/usr/local/Cellar/go/1.7/libexec/src/os/signal/signal_unix.go:28 +0x41

goroutine 6 [semacquire, 1306 minutes, locked to thread]:
sync.runtime_Semacquire(0xc420096194)
/usr/local/Cellar/go/1.7/libexec/src/runtime/sema.go:47 +0x30
sync.(_Mutex).Lock(0xc420096190)
/usr/local/Cellar/go/1.7/libexec/src/sync/mutex.go:85 +0xd0
github.com/lirm/aeron-go/aeron.(_ClientConductor).releaseSubscription(0xc420096148, 0x8, 0xc4200ea090, 0x1, 0x1, 0x0)
/Users/e11778/gohome/src/github.com/lirm/aeron-go/aeron/clientconductor.go:377 +0x11e
github.com/lirm/aeron-go/aeron.(_Subscription).Close(0xc4200ce500, 0xc4200e23e0, 0xc4200e23e0)
/Users/e11778/gohome/src/github.com/lirm/aeron-go/aeron/subscription.go:62 +0xab
github.com/lirm/aeron-go/aeron.(_ClientConductor).Close(0xc420096148, 0x147e38, 0xc420096190)
/Users/e11778/gohome/src/github.com/lirm/aeron-go/aeron/clientconductor.go:171 +0x1b5
github.com/lirm/aeron-go/aeron.(_ClientConductor).onInterServiceTimeout(0xc420096148, 0x146f592464b84688)
/Users/e11778/gohome/src/github.com/lirm/aeron-go/aeron/clientconductor.go:550 +0xfd
github.com/lirm/aeron-go/aeron.(_ClientConductor).onHeartbeatCheckTimeouts(0xc420096148, 0x0)
/Users/e11778/gohome/src/github.com/lirm/aeron-go/aeron/clientconductor.go:563 +0x254
github.com/lirm/aeron-go/aeron.(*ClientConductor).Run(0xc420096148, 0x1c4a40, 0xc42000cc70)
/Users/e11778/gohome/src/github.com/lirm/aeron-go/aeron/clientconductor.go:207 +0x13c
created by github.com/lirm/aeron-go/aeron.Connect
/Users/e11778/gohome/src/github.com/lirm/aeron-go/aeron/aeron.go:82 +0x319

goroutine 9 [select, 1376 minutes, locked to thread]:
runtime.gopark(0x147d80, 0x0, 0x13816e, 0x6, 0x18, 0x2)
/usr/local/Cellar/go/1.7/libexec/src/runtime/proc.go:259 +0x13a
runtime.selectgoImpl(0xc420030f30, 0x0, 0x18)
/usr/local/Cellar/go/1.7/libexec/src/runtime/select.go:423 +0x11d9
runtime.selectgo(0xc420030f30)
/usr/local/Cellar/go/1.7/libexec/src/runtime/select.go:238 +0x1c
runtime.ensureSigM.func1()
/usr/local/Cellar/go/1.7/libexec/src/runtime/signal1_unix.go:304 +0x2d1
runtime.goexit()
/usr/local/Cellar/go/1.7/libexec/src/runtime/asm_amd64.s:2086 +0x1

goroutine 20 [runnable]:
github.com/lirm/aeron-go/aeron.(*Subscription).Poll(0xc4200ce500, 0xc4200ee0c0, 0xa, 0x0)
/Users/e11778/gohome/src/github.com/lirm/aeron-go/aeron/subscription.go:69
main.main.func3(0xc4200ce500, 0xc4200ee0c0)
/Users/e11778/gohome/src/github.com/lirm/aeron-go/examples/pong/pong.go:105 +0x3e
created by main.main
/Users/e11778/gohome/src/github.com/lirm/aeron-go/examples/pong/pong.go:108 +0x57f

rax 0xe
rbx 0x1db260
rcx 0x7fff5fbff3e0
rdx 0x7fff5fbff468
rdi 0x1103
rsi 0x1db261
rbp 0x7fff5fbff418
rsp 0x7fff5fbff3e0
r8 0x7fff5fbff448
r9 0x0
r10 0x0
r11 0x246
r12 0x1ee194258af1d
r13 0x1ef8484969fc0
r14 0x146f66e328b4fa00
r15 0x17c5d8
rip 0x58abb
rflags 0x246
cs 0x7
fs 0x0
gs 0x0

logger caller stack issue

If zap log print with caller code line, it will always in aeron/logging/logging.go.
It could be fixed by replace logging.go line 62 from:
z.logger = logger.Named(name)
to
z.logger = logger.Named(name).WithOptions(zap.AddCallerSkip(1))

MemCpy Pointer Arithmetics Triggers Race Condition(?)

Hi @lirm , I've been trying to debug my application with -race tag, and it always triggered an error related to the unsafe pointer arithmetic on:

func Memcpy(dest uintptr, src uintptr, length int32) {

To be more precise, it threw the following error:

fatal error: checkptr: pointer arithmetic result points to invalid allocation

Which can be traced to the following line:

destPtr := unsafe.Pointer(dest + uintptr(i))

Do you have any workaround for this particular issue? I mean if somehow I need to debug the race condition on another parts of my code.

`Context.AeronDir` does not allow fully specified directory

Context.AeronDir currently specifies the base path that has /aeron-$USER/cnc.dat appended to it.

This is different than the Java and C++ Aeron clients' AeronDir that specify the base directory with /cnc.dat appended.

The drawback of the current approach is that one can never fully specify the path, it will always have aeron-$USER appended.

Changing this will break existing applications but will make operations among Java and C++ clients more consistent.

Is this repository dead?

As in topic. I would like to use it and try it out but I don't want to be locked in something that is dying. Is this repository dead? If so, can I ask why?

I can also consider contributing if the reason is a shortage of manpower.

Add missing available image APIs

We're missing an API that allows an image handler to be specified at the time of creating a subscription. We have image handler support at the context level, but this is hard to use.

    inline std::int64_t addSubscription(
        const std::string &channel,
        std::int32_t streamId,
        const on_available_image_t &onAvailableImageHandler,
        const on_unavailable_image_t &onUnavailableImageHandler)

missing code?

Trying to compile I get these errors:

util\memmap\memmap.go:91: undefined: syscall.Mmap
util\memmap\memmap.go:91: undefined: syscall.PROT_READ
util\memmap\memmap.go:91: undefined: syscall.PROT_WRITE
util\memmap\memmap.go:91: undefined: syscall.MAP_SHARED
util\memmap\memmap.go:123: undefined: syscall.Mmap
util\memmap\memmap.go:123: undefined: syscall.PROT_READ
util\memmap\memmap.go:123: undefined: syscall.PROT_WRITE
util\memmap\memmap.go:123: undefined: syscall.MAP_SHARED
util\memmap\memmap.go:137: undefined: syscall.Munmap

Is there some missing file in the repo?

How can a publication detect if there are connected subscribers?

The example shows the following method:

if !publication.IsConnected() {
    log.Print("no subscribers detected")
}

But then there is also this comment:

// IsConnected returns whether this publication is connected to the driver (not whether it has any Subscriptions)
func (pub *Publication) IsConnected() bool {
	return !pub.IsClosed() && pub.metaData.IsConnected.Get() == 1
}

I'm now very confused. How would a publication detect if there are active subscribers?

Thank you.

Support for subscription by session-id

This is partly a note to self as I'm about to take an extended holiday before implementing and partly an opportunity for comment.

For aeron-archive's ReplayMerge feature the aeron code uses subscriptions by session-id by establishing and looking up a publication.

To achieve this this, the Java implementation maintains a map of registrationIDs to the client conductors internal subscription objects. My intention is to rework the AddSubscription()/OnSubscriptionReady() to support this.

How to write /tmp/aeron-root/cnc.dat file

hi
when i use aeron.NewContext().AeronDir("/tmp").MediaDriverTimeout(time.Second * 10)
i got an error: Failed to map the file ./tmp/aeron-root/cnc.dat with zero size for existing file.
but i dont know how write a cnc.dat file。
can u give a particular cnc.dat file for example?
thanks.

codahale/hdrhistogram repo url has been transferred under the github HdrHstogram umbrella

Problem

The codahale/hdrhistogram repo has been transferred under the github HdrHstogram umbrella with the help from the original author in Sept 2020 (new repo url https://github.com/HdrHistogram/hdrhistogram-go). The main reasons are to group all implementations under the same roof and to provide more active contribution from the community as the original repository was archived several years ago.

The dependency URL should be modified to point to the new repository URL. The tag "v0.9.0" was applied at the point of transfer and will reflect the exact code that was frozen in the original repository.

If you are using Go modules, you can update to the exact point of transfer using the @v0.9.0 tag in your go get command.

go mod edit -replace github.com/codahale/hdrhistogram=github.com/HdrHistogram/[email protected]

Performance Improvements

From the point of transfer, up until now (mon 16 aug 2021), we've released 3 versions that aim support the standard HdrHistogram serialization/exposition formats, and deeply improve READ performance.
We recommend to update to the latest version.

Incorrect location for CNC file on Windows

aeron/context.go constructs the cnc file by combining "aeron-" with os.Getenv("USER"), but it seems Windows stores the desired value in USERNAME, not USER. Changing to os.Getenv("USERNAME") seems to locate the file correctly.

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.