karafka's Introduction


About Karafka

Karafka is a multi-threaded, efficient Kafka processing framework for Ruby and Rails:

# Define what topics you want to consume with which consumers in karafka.rb
Karafka::App.routes.draw do
  topic 'system_events' do
    consumer EventsConsumer
  end
end

# And create your consumers, within which your messages will be processed
class EventsConsumer < ApplicationConsumer
  # Example that utilizes ActiveRecord#insert_all and Karafka batch processing
  def consume
    # Store all of the incoming Kafka events locally in an efficient way
    Event.insert_all messages.payloads
  end
end

Karafka uses threads to handle many messages simultaneously in the same process. It does not require Rails but integrates tightly with any Ruby on Rails application to make event processing dead simple.

Getting started

If you're entirely new to the subject, you can start with our "Kafka on Rails" article series, which will get you up and running with the terminology and the basic ideas behind using Kafka.

If you want to get started with Kafka and Karafka as fast as possible, then the best idea is to visit our Getting started guides and the example apps repository.

We also maintain many integration specs that illustrate various use cases and features of the framework.

TL;DR (1 minute from setup to publishing and consuming messages)

Prerequisites: Kafka running. You can start it by following instructions from here.

  1. Add and install Karafka:

# Make sure to install Karafka 2.3
bundle add karafka --version ">= 2.3.0"

bundle exec karafka install

  2. Dispatch a message to the example topic using the Rails or Ruby console:

Karafka.producer.produce_sync(topic: 'example', payload: { 'ping' => 'pong' }.to_json)

  3. Run the Karafka server and watch the consumption magic happen:

bundle exec karafka server

[86d47f0b92f7] Polled 1 message in 1000ms
[3732873c8a74] Consume job for ExampleConsumer on example started
{"ping"=>"pong"}
[3732873c8a74] Consume job for ExampleConsumer on example finished in 0ms
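The payload above is dispatched as a JSON string, which is why the consumer log shows {"ping"=>"pong"}. The serialization round trip can be sketched in plain Ruby, without Karafka:

```ruby
require 'json'

# Serialize the payload the same way the produce_sync call above does
payload = { 'ping' => 'pong' }.to_json

# A consumer deserializes it back into a Hash with string keys
parsed = JSON.parse(payload)
parsed['ping'] # => "pong"
```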

Want to Upgrade? LGPL is not for you? Want to help?

I also sell Karafka Pro subscriptions. It includes a commercial-friendly license, priority support, architecture consultations, enhanced Web UI and high throughput data processing-related features (virtual partitions, long-running jobs, and more).

10% of the income will be distributed back to other OSS projects that Karafka uses under the hood.

Help me provide high-quality open-source software. Please see the Karafka homepage for more details.

Support

Karafka has Wiki pages for almost everything and a pretty decent FAQ. It covers the installation, setup, and deployment, along with other useful details on how to run Karafka.

If you have questions about using Karafka, feel free to join our Slack channel.

Karafka has priority support for technical and architectural questions that is part of the Karafka Pro subscription.


karafka's Issues

Loading application

Example of my app:

# Karafka application class
class App < Karafka::App
  Dir[App.root + 'lib/**/*.rb'].each { |file| require file }

  setup do |config|
    config.kafka_hosts = System::Settings.kafka.hosts
    config.zookeeper_hosts = System::Settings.zookeeper.hosts
    config.worker_timeout = System::Settings.sidekiq.timeout
    config.redis = System::Settings.sidekiq.redis
    config.concurrency = System::Settings.karafka.concurrency
    config.name = System::Settings.karafka.application_name
  end
end

# Load all application internal code (app/, lib/, etc)
Karafka::Loader.new.load(App.root)

I have to load /lib to get System::Settings before initializing App, and then run `Karafka::Loader.new.load` again afterwards. Doing the loading twice feels like duplication. I think loading files and configuration should be two separate phases.

Backoff configuration

Hello,

After an incident in which our Kafka producer generated a huge backlog, Karafka's Sidekiq workers used huge amounts of RAM while queueing everything in Redis. So the question is: is there a way to set a backoff configuration, so that the consumer stops fetching messages when the Sidekiq queue is maxed out?

Best
Giorgos
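The threshold check itself is simple to express; a hypothetical sketch (the limit value and the `backpressure?` helper are illustrative, not Karafka API):

```ruby
# Decide whether the consumer should pause fetching, given the
# current depth of the Sidekiq queue and a configured limit.
def backpressure?(queue_size, limit)
  queue_size >= limit
end

# In a real setup the size would come from Sidekiq's public API, e.g.:
#   require 'sidekiq/api'
#   Sidekiq::Queue.new('default').size
backpressure?(12_000, 10_000) # => true
backpressure?(500, 10_000)    # => false
```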

Sidekiq config file

Unless I have a config/sidekiq.yml file, rake karafka:sidekiq starts Sidekiq with default settings. You can verify this by the number of threads: it's always 25.

Set up monitor in config

It'd be great to be able to set a monitor in the config:

class App < Karafka::App
  ...
  setup do |config|
    config.monitor = AppMonitor.new
    ...
  end
end
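A monitor here is just a duck-typed object that records named events; a minimal hypothetical AppMonitor might look like the following (the `notice` method name and event names are illustrative, and the exact interface Karafka expects may differ between versions):

```ruby
# A minimal hypothetical monitor: an object that records named events.
class AppMonitor
  attr_reader :events

  def initialize
    @events = []
  end

  # Record an instrumentation event together with its payload
  def notice(event, payload = {})
    @events << [event, payload]
  end
end

monitor = AppMonitor.new
monitor.notice(:message_consumed, topic: 'example')
monitor.events.size # => 1
```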

Redis namespace

In the default Sidekiq config it's possible to specify a Redis namespace:

  config.redis = { url: 'redis://redis.example.com:7372/12', namespace: 'mynamespace' }

Karafka's configuration lacks this parameter.

I'm not sure how it should be resolved, since supporting every Sidekiq option really adds a maintainability cost. That's why I think that explicit workers are a great choice.

Explicit workers

There should be an ability to use explicit Sidekiq workers, since the current implicit approach creates more problems than it solves:

  • it's impossible to use Sidekiq plugins
  • what if I want to use the enterprise version of Sidekiq
  • there's no option to prioritise Sidekiq workers

and so on. It'd be 100 times better to be able to use user-defined workers.

Customize socket timeout

We should have an option to tune this parameter, and based on it we should also tune the Celluloid shutdown_timeout.

Perform method not defined error

If I define my own worker, the perform method should not be needed.

class LandingInvitationController < Karafka::BaseController
  self.topic = 'mail.landing_invitation'
  self.worker = LandingInvitationJob

  # def perform
  # end
end

class LandingInvitationJob
  include Sidekiq::Worker

  sidekiq_options :queue => :landing_invitation

  def perform(payload)
    Karafka.logger.info(payload)
  end
end

echo "Cg5mb29Aa3VlbmRlLmNvbRIGYmFyYmF6" | kafkacat -b localhost:9092 -t mail.landing_invitation -P

Karafka::Errors::PerformMethodNotDefined (Karafka::Errors::PerformMethodNotDefined)

If I uncomment the perform method, it is not called anyway. So why should I add an empty method if the worker is called instead?

Autodiscover Kafka brokers based on Zookeeper data

ZK.new('172.17.0.5:2181').children '/brokers/ids'
ZK.new('172.17.0.5:2181').get '/brokers/ids/0'
["{\"jmx_port\":7203,\"timestamp\":\"1453980499046\",\"host\":\"172.17.0.6\",\"version\":1,\"port\":9092}", #<Zookeeper::Stat:0x000000019e8828 @exists=true, @czxid=18, @mzxid=18, @ctime=1453980499065, @mtime=1453980499065, @version=0, @cversion=0, @aversion=0, @ephemeralOwner=95288065869611008, @dataLength=89, @numChildren=0, @pzxid=18>]
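The broker registration returned above is a JSON blob, so turning it into a broker address list only needs the stdlib (a sketch; fetching the raw value via the zk gem as shown above is assumed):

```ruby
require 'json'

# Example of the data stored under /brokers/ids/<id>, taken from the
# Zookeeper output above
raw = '{"jmx_port":7203,"timestamp":"1453980499046","host":"172.17.0.6","version":1,"port":9092}'

# Parse the registration and build a host:port broker address
broker = JSON.parse(raw)
address = "#{broker['host']}:#{broker['port']}"
address # => "172.17.0.6:9092"
```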

Support for zookeeper chroot

Hello,

Does consumer support chroot for zookeeper connections?
I am asking because in a multi-environment setup, each Kafka cluster should run under a different "/" node when all Kafka clusters share the same Zookeeper cluster.

I've tried adding the chroot to the config.zookeeper_hosts array, but Karafka seems to ignore the setting.
config.zookeeper_hosts = %w( 10.0.0.1:2181/kafka_prd 10.0.0.2:2181/kafka_prd 10.0.0.3:2181/kafka_prd )

Thank you.

rake karafka:run uses app.rb only

I'd like to use app.rb for a Sinatra app and leave karafka_app.rb for the Karafka app. It's not possible now, and it forces me to keep all settings together in one file.

wrong constant name

Controller code:

# app/controllers/channels/playlist_items/uploaded_controller.rb
module Channels
  module PlaylistItems
    class UploadedController < Karafka::BaseController

Exception:

    from /Users/yukke/.rvm/gems/ruby-2.2.3@playlist-processor/bundler/gems/karafka-c43362cd860f/lib/karafka/workers/builder.rb:33:in `build'
    from /Users/yukke/.rvm/gems/ruby-2.2.3@playlist-processor/bundler/gems/karafka-c43362cd860f/lib/karafka/base_controller.rb:131:in `worker'
    from /Users/yukke/.rvm/gems/ruby-2.2.3@playlist-processor/bundler/gems/karafka-c43362cd860f/lib/karafka/routing/mapper.rb:15:in `map'
    from /Users/yukke/.rvm/gems/ruby-2.2.3@playlist-processor/bundler/gems/karafka-c43362cd860f/lib/karafka/routing/mapper.rb:15:in `workers'
    from /Users/yukke/.rvm/gems/ruby-2.2.3@playlist-processor/bundler/gems/karafka-c43362cd860f/lib/karafka/app.rb:19:in `bootstrap'
    from /Users/yukke/apps/StrikeSocial/PlaylistProcessor/app.rb:27:in `<top (required)>'
    from /Users/yukke/.rvm/rubies/ruby-2.2.3/lib/ruby/2.2.0/irb/init.rb:280:in `require'
    from /Users/yukke/.rvm/rubies/ruby-2.2.3/lib/ruby/2.2.0/irb/init.rb:280:in `block in load_modules'
    from /Users/yukke/.rvm/rubies/ruby-2.2.3/lib/ruby/2.2.0/irb/init.rb:278:in `each'
    from /Users/yukke/.rvm/rubies/ruby-2.2.3/lib/ruby/2.2.0/irb/init.rb:278:in `load_modules'
    from /Users/yukke/.rvm/rubies/ruby-2.2.3/lib/ruby/2.2.0/irb/init.rb:20:in `setup'
    from /Users/yukke/.rvm/rubies/ruby-2.2.3/lib/ruby/2.2.0/irb.rb:378:in `start'
    from /Users/yukke/.rvm/rubies/ruby-2.2.3/bin/irb:15:in `<main>'

Where to define routes

Hello,

I was not able to use routing because I get the error

undefined method 'routes' for App:Class (NoMethodError)

Where are routes supposed to be defined? I added mine inside app.rb, under the App class definition.

Any ideas about what I might be doing wrong?

Also, I had to use Karafka::BaseController instead of ApplicationController because the latter was undefined. I am using v0.4.

thanks

Custom parser class

Hi,

I'm working with protobuf messages, and for each message I have a generated class:

class Proto::MyClass
  include Beefcake::Message
  required :email, :string, 1
  required :token, :string, 2
end

Is it possible to support a lambda for the parser class as well?

class MyController < Karafka::BaseController
  self.parser = ParserProxy.proxy(Proto::MyClass)
end

class ParserProxy
  class ParserError < StandardError; end

  def self.proxy(klass)
    lambda do |msg|
      begin
        klass.decode(msg)
      rescue => e
        MyNotifier.capture_error(e, :extra => { :consumer => klass.name.to_s, :message => Base64.encode64(msg) })
        raise ParserError
      end
    end
  end
end
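The lambda-parser pattern above works with any decoder. A self-contained variant, with JSON.parse standing in for the Beefcake-generated klass.decode and the error reporter stubbed out (the email value is illustrative only):

```ruby
require 'json'

# Self-contained variant of the ParserProxy above: wraps any decoder
# callable in a lambda that converts failures into ParserError.
class ParserProxy
  class ParserError < StandardError; end

  def self.proxy(decoder)
    lambda do |msg|
      begin
        decoder.call(msg)
      rescue => e
        # In the original snippet, an error reporter is notified here
        raise ParserError, e.message
      end
    end
  end
end

parser = ParserProxy.proxy(->(msg) { JSON.parse(msg) })
parser.call('{"email":"foo@example.com"}')['email'] # => "foo@example.com"
```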

Poseidon cluster incorporation

We should take the Poseidon cluster logic and incorporate it into the Karafka environment. It is really small, and it would be better to keep it as a core feature since we will work on clusterization, etc.

New Routing engine

I believe that we should have a stricter routing engine that will allow us to do the following things:

  1. Validate topic names (so invalid ones won't hit Kafka)
  2. Should allow catching typos and other programmer bugs
  3. Should allow control over outgoing messages/events so we can design a nice, strict flow
  4. Should be independent of the message source as long as it responds to .topic
  5. Should take over the responsibility of managing topics from controllers and groups
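Point 1 can be sketched with Kafka's legal-name rules, which allow ASCII letters, digits, '.', '_' and '-' up to 249 characters (a minimal sketch; the helper name is illustrative):

```ruby
# Kafka topic names may contain ASCII letters, digits, '.', '_' and '-'
# and must be between 1 and 249 characters long.
TOPIC_NAME = /\A[a-zA-Z0-9._-]{1,249}\z/

def valid_topic?(name)
  !!(name =~ TOPIC_NAME)
end

valid_topic?('system_events')   # => true
valid_topic?('bad topic name!') # => false
```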

action specification for single topics

We could add optional support for specifying an action that should be taken on a single topic, so we could more easily build RESTful controllers.
