Racecar

Racecar is a friendly and easy-to-approach Kafka consumer framework. It allows you to write small applications that process messages stored in Kafka topics while optionally integrating with your Rails models.

The framework is based on rdkafka-ruby, which, when used directly, can be a challenge: it's a flexible library with lots of knobs and options. Most users don't need that level of flexibility, though. Racecar provides a simple and intuitive way to build and configure Kafka consumers.

NOTE: Racecar requires Kafka 0.10 or higher.

Table of contents

  1. Installation
  2. Usage
    1. Creating consumers
    2. Running consumers
    3. Producing messages
    4. Configuration
    5. Testing consumers
    6. Deploying consumers
    7. Handling errors
    8. Logging
    9. Operations
    10. Upgrading from v1 to v2
    11. Compression
  3. Development
  4. Contributing
  5. Support and Discussion
  6. Copyright and license

Installation

Add this line to your application's Gemfile:

gem 'racecar'

And then execute:

$ bundle

Or install it yourself as:

$ gem install racecar

Then execute (if you're in a Rails application):

$ bundle exec rails generate racecar:install

This will add a config file in config/racecar.yml.

Usage

Racecar is built for simplicity of development and operation. Before diving in, here's a short introduction to the Kafka consumer concept, along with some basic background on Kafka.

Kafka stores messages in so-called partitions which are grouped into topics. Within a partition, each message gets a unique offset.

In Kafka, consumer groups are sets of processes that collaboratively process messages from one or more Kafka topics; they divide up the topic partitions amongst themselves and make sure to reassign the partitions held by any member of the group that happens to crash or otherwise becomes unavailable, thus minimizing the risk of disruption. A consumer in a group is responsible for keeping track of which messages in a partition it has processed – since messages are processed in-order within a single partition, this means keeping track of the offset into the partition that has been processed. Consumers periodically commit these offsets to the Kafka brokers, making sure that another consumer can resume from those positions if there is a crash.
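To make the idea of "dividing up partitions" concrete, here is a deliberately simplified sketch in plain Ruby. It is not how Kafka or Racecar actually assign partitions (Kafka's real assignors, such as range, round-robin, and cooperative-sticky, are more involved); it assumes a plain round-robin split and only shows how a group spreads partitions across members and redistributes them when a member disappears. The consumer names are hypothetical.

```ruby
# Simplified, hypothetical round-robin assignment of partitions to group
# members. Real Kafka assignors are more sophisticated; this only
# illustrates dividing work and reassigning it when a member leaves.
def assign_partitions(partitions, members)
  assignment = members.to_h { |member| [member, []] }
  partitions.each_with_index do |partition, index|
    assignment[members[index % members.size]] << partition
  end
  assignment
end

partitions = (0...6).to_a

# Three healthy consumers share six partitions.
before = assign_partitions(partitions, %w[consumer-a consumer-b consumer-c])
# => {"consumer-a"=>[0, 3], "consumer-b"=>[1, 4], "consumer-c"=>[2, 5]}

# consumer-b crashes; the remaining members take over its partitions.
after = assign_partitions(partitions, %w[consumer-a consumer-c])
# => {"consumer-a"=>[0, 2, 4], "consumer-c"=>[1, 3, 5]}
```

After the reassignment, each surviving consumer resumes the partitions it inherited from the last offset committed for them, which is why committing offsets matters.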

Creating consumers

A Racecar consumer is a simple Ruby class that inherits from Racecar::Consumer:

class UserBanConsumer < Racecar::Consumer
  subscribes_to "user_banned"

  def process(message)
    data = JSON.parse(message.value)
    user = User.find(data["user_id"])
    user.banned = true
    user.save!
  end
end

In order to create your own consumer, run the Rails generator racecar:consumer:

$ bundle exec rails generate racecar:consumer TapDance

This will create a file at app/consumers/tap_dance_consumer.rb which you can modify to your liking. Add one or more calls to subscribes_to in order to have the consumer subscribe to Kafka topics.

Now run your consumer with bundle exec racecar TapDanceConsumer.

Note: if you're not using Rails, you'll have to add the file yourself. No one will judge you for copy-pasting it.

Running consumers in parallel (experimental)

Warning - limited battle testing in production environments; use at your own risk!

If you want to process different partitions in parallel but don't want to deploy as many instances as the topic has partitions, you can specify the number of workers to spin up: that number of processes will be forked, and each will register its own consumer in the group. Some things to note:

  • This makes no difference on a single-partition topic: only one consumer would ever be assigned a partition. A couple of example configurations to process all partitions in parallel (assuming a 15-partition topic):
    • Parallel workers set to 3, 5 separate instances / replicas running in your container orchestrator
    • Parallel workers set to 5, 3 separate instances / replicas running in your container orchestrator
  • Since we're forking new processes, the memory demands are a little higher
    • From some initial testing, running 5 parallel workers requires no more than double the memory of running a Racecar consumer without parallelism.
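The arithmetic behind the examples above is simply parallel workers × instances = consumers in the group, and any consumers beyond the partition count sit idle. The helper below is a hypothetical back-of-the-envelope sketch in plain Ruby, not part of Racecar:

```ruby
# Hypothetical helper (not part of Racecar): given a topic's partition
# count, the per-class parallel_workers setting, and the number of
# instances/replicas, report how many consumers join the group and how
# many of them will have no partition assigned.
def group_capacity(partitions:, parallel_workers:, instances:)
  consumers = parallel_workers * instances
  {
    consumers: consumers,
    idle_consumers: [consumers - partitions, 0].max,
    # Upper bound on partitions per consumer when the load is spread evenly.
    max_partitions_per_consumer: (partitions.to_f / consumers).ceil
  }
end

group_capacity(partitions: 15, parallel_workers: 3, instances: 5)
# => {consumers: 15, idle_consumers: 0, max_partitions_per_consumer: 1}

# A single-partition topic gains nothing from parallel workers:
group_capacity(partitions: 1, parallel_workers: 5, instances: 1)[:idle_consumers]
# => 4
```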

The number of parallel workers is configured per consumer class; you may only want to take advantage of this for busier consumers:

class ParallelProcessingConsumer < Racecar::Consumer
  subscribes_to "some-topic"

  self.parallel_workers = 5

  def process(message)
    # your processing logic goes here
  end
end

Initializing consumers

You can optionally add an initialize method if you need to do any set-up work before processing messages, e.g.

class PushNotificationConsumer < Racecar::Consumer
  subscribes_to "notifications"

  def initialize
    @push_service = PushService.new # pretend this exists.
  end

  def process(message)
    data = JSON.parse(message.value)

    @push_service.notify!(
      recipient: data.fetch("recipient"),
      notification: data.fetch("notification"),
    )
  end
end

This is useful to do any one-off work that you wouldn't want to do for each and every message.

Setting the starting position

When a consumer is started for the first time, it needs to decide where in each partition to start. By default, it will start at the beginning, meaning that all past messages will be processed. If you want to instead start at the end of each partition, change your subscribes_to like this:

subscribes_to "some-topic", start_from_beginning: false

Note that once the consumer has started, it will commit the offsets it has processed up to, and on subsequent starts it will resume from those rather than from the configured starting position.

Processing messages in batches

If you want to process whole batches of messages at a time, simply rename your #process method to #process_batch. The method will now be called with an array of message objects:

class ArchiveEventsConsumer < Racecar::Consumer
  subscribes_to "events"

  def process_batch(messages)
    file_name = [
      messages.first.topic, # the topic this batch of messages came from.
      messages.first.partition, # the partition this batch of messages came from.
      messages.first.offset, # offset of the first message in the batch.
      messages.last.offset, # offset of the last message in the batch.
    ].join("-")

    File.open(file_name, "w") do |file|
      # the messages in the batch.
      messages.each do |message|
        file << message.value
      end
    end
  end
end

An important detail is that, if an exception is raised while processing a batch, the whole batch is re-processed.

Message headers

Any headers set on the message will be available when consuming the message:

message.headers #=> { "Header-A" => 42, ... }

Long-running message processing

In order to avoid your consumer being kicked out of its group during long-running message processing operations, you'll need to let Kafka regularly know that the consumer is still healthy. There are two mechanisms in place to ensure that:

Heartbeats: They are automatically sent in the background and ensure the broker can still talk to the consumer. This will detect network splits, ungraceful shutdowns, etc.

Message Fetch Interval: Kafka expects the consumer to query for new messages within this time limit. This will detect situations with slow IO or the consumer being stuck in an infinite loop without making actual progress. This limit applies to a whole batch if you do batch processing. Use max_poll_interval to increase the default 5 minute timeout, or reduce batching with fetch_messages.
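Because the whole batch must finish within max_poll_interval, a rough way to pick fetch_messages is to divide the interval by your typical per-message processing time and keep a safety margin. The helper and the 50% margin below are hypothetical, a plain-Ruby sketch rather than a Racecar API:

```ruby
# Hypothetical back-of-the-envelope helper (not a Racecar API): estimate
# the largest fetch_messages value whose batch still finishes well within
# max_poll_interval, keeping a safety margin for slow outliers.
def max_safe_fetch_messages(max_poll_interval:, seconds_per_message:, safety_factor: 0.5)
  budget = max_poll_interval * safety_factor
  # Always allow at least one message per batch.
  [(budget / seconds_per_message).floor, 1].max
end

# With the default 5 minute (300 s) max_poll_interval and ~2.5 s per message:
max_safe_fetch_messages(max_poll_interval: 300, seconds_per_message: 2.5) # => 60
```

If even a single message can exceed the budget, raising max_poll_interval is the remaining option.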

Tearing down resources when stopping

When a Racecar consumer shuts down, it gets the opportunity to tear down any resources held by the consumer instance. For example, it may make sense to close any open files or network connections. Doing so is simple: just implement a #teardown method in your consumer class and it will be called during the shutdown procedure.

class ArchiveConsumer < Racecar::Consumer
  subscribes_to "events"

  def initialize
    @file = File.open("archive", "a")
  end

  def process(message)
    @file << message.value
  end

  def teardown
    @file.close
  end
end

Running consumers

Racecar is first and foremost an executable consumer runner. The racecar executable takes as argument the name of the consumer class that should be run. Racecar automatically loads your Rails application before starting, and you can load any other library you need by passing the --require flag, e.g.

$ bundle exec racecar --require dance_moves TapDanceConsumer

The first time you execute racecar with a consumer class, a consumer group will be created with a group id derived from the class name (this can be configured). If you start racecar with the same consumer class argument multiple times, the processes will join the existing group – even if you start them on other nodes. You will typically want to have at least two consumers in each of your groups – preferably on separate nodes – in order to deal with failures.

Producing messages

Consumers can produce messages themselves, allowing for powerful stream processing applications that transform and filter message streams. The API for this is simple:

class GeoCodingConsumer < Racecar::Consumer
  subscribes_to "pageviews"

  def process(message)
    pageview = JSON.parse(message.value)
    ip_address = pageview.fetch("ip_address")

    country = GeoCode.country(ip_address)

    # Enrich the original message:
    pageview["country"] = country

    # The `produce` method enqueues a message to be delivered after #process
    # returns. It won't actually deliver the message.
    produce(JSON.dump(pageview), topic: "pageviews-with-country", key: pageview["id"])
  end
end

The deliver! method can be used to block until the broker has received all queued published messages (according to the producer acknowledgement settings). It is automatically called during a consumer's shutdown procedure.

You can set message headers by passing a headers: option with a Hash of headers.

Standalone Producer

Racecar provides a standalone producer to publish messages to Kafka directly from your Rails application:

# app/controllers/comments_controller.rb
class CommentsController < ApplicationController
  def create
    @comment = Comment.create!(params)

    # This will publish a JSON representation of the comment to the `comments` topic
    # in Kafka. Make sure to create the topic first, or this may fail.
    Racecar.produce_sync(value: @comment.to_json, topic: "comments")
  end
end

The above example will block the server process until the message has been delivered. If you want deliveries to happen in the background in order to free up your server processes more quickly, call Racecar.produce_async instead:

# app/controllers/comments_controller.rb
class CommentsController < ApplicationController
  def show
    @comment = Comment.find(params[:id])

    event = {
      name: "comment_viewed",
      data: {
        comment_id: @comment.id,
        user_id: current_user.id
      }
    }

    # By delivering messages asynchronously you free up your server processes faster.
    Racecar.produce_async(value: event.to_json, topic: "activity")
  end
end

In addition to improving response time, delivering messages asynchronously also protects your application against Kafka availability issues: if messages cannot be delivered, they'll be buffered for later and retried automatically.

A third method is to produce messages first (without delivering the messages to Kafka yet), and deliver them synchronously later:

# app/controllers/comments_controller.rb
class CommentsController < ApplicationController
  def create
    @comment = Comment.create!(params)

    event = {
      name: "comment_created",
      data: {
        comment_id: @comment.id,
        user_id: current_user.id
      }
    }

    # This will queue the two messages in the internal buffer and block the
    # server process until they are delivered.
    Racecar.wait_for_delivery do
      Racecar.produce_async(value: @comment.to_json, topic: "comments")
      Racecar.produce_async(value: event.to_json, topic: "activity")
    end
  end
end

Configuration

Racecar provides a flexible way to configure your consumer in a way that feels at home in a Rails application. If you haven't already, run bundle exec rails generate racecar:install in order to generate a config file. You'll get a separate section for each Rails environment, with the common configuration values in a shared common section.

Note: many of these configuration keys correspond directly to similarly named concepts in rdkafka-ruby; for more details on low-level operations, read that project's documentation.

It's also possible to configure Racecar using environment variables. For any given configuration key, there should be a corresponding environment variable with the prefix RACECAR_, in upper case. For instance, in order to configure the client id, set RACECAR_CLIENT_ID=some-id in the process in which the Racecar consumer is launched. You can set brokers by passing a comma-separated list, e.g. RACECAR_BROKERS=kafka1:9092,kafka2:9092,kafka3:9092.
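The naming convention can be summarized in a few lines of plain Ruby. This is only an illustration of the rule described above (prefix RACECAR_, upper-case the key, comma-separate lists), not Racecar's own implementation; the helper names are hypothetical:

```ruby
# Illustration of the environment variable naming convention (not
# Racecar's internal code): a config key maps to an upper-cased variable
# name with a RACECAR_ prefix.
def racecar_env_var(config_key)
  "RACECAR_#{config_key.to_s.upcase}"
end

# List values such as brokers are written as comma-separated strings.
def parse_list(value)
  value.split(",").map(&:strip).reject(&:empty?)
end

racecar_env_var(:client_id) # => "RACECAR_CLIENT_ID"
racecar_env_var(:brokers)   # => "RACECAR_BROKERS"
parse_list("kafka1:9092,kafka2:9092,kafka3:9092")
# => ["kafka1:9092", "kafka2:9092", "kafka3:9092"]
```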

Finally, you can configure Racecar directly in Ruby. The file config/racecar.rb will be automatically loaded if it exists; in it, you can configure Racecar using a simple API:

Racecar.configure do |config|
  # Each config variable can be set using a writer attribute.
  config.brokers = ServiceDiscovery.find("kafka-brokers")
end

Basic configuration

  • brokers – A list of Kafka brokers in the cluster that you're consuming from. Defaults to localhost on port 9092, the default Kafka port.
  • client_id – A string used to identify the client in logs and metrics.
  • group_id – The group id to use for a given group of consumers. Note that this must be different for each consumer class. If left blank a group id is generated based on the consumer class name such that (for example) a consumer with the class name BaconConsumer would default to a group id of bacon-consumer.
  • group_id_prefix – A prefix used when generating consumer group names. For instance, if you set the prefix to be kevin. and your consumer class is named BaconConsumer, the resulting consumer group will be named kevin.bacon-consumer.
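The default group id rule above can be sketched as a CamelCase-to-dashes conversion plus an optional prefix. This is a plain-Ruby approximation for illustration; Racecar's actual implementation may treat edge cases (acronyms, namespaced classes) differently, and the helper name is hypothetical:

```ruby
# Approximation of the default group id naming rule (not Racecar's own
# code): split CamelCase words with dashes, downcase, and prepend the
# optional group_id_prefix verbatim.
def default_group_id(class_name, prefix: nil)
  dashed = class_name
    .gsub(/([a-z\d])([A-Z])/, '\1-\2')
    .downcase
  "#{prefix}#{dashed}"
end

default_group_id("BaconConsumer")                   # => "bacon-consumer"
default_group_id("BaconConsumer", prefix: "kevin.") # => "kevin.bacon-consumer"
default_group_id("TapDanceConsumer")                # => "tap-dance-consumer"
```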

Batches

  • fetch_messages - The number of messages to fetch in a single batch. This can be set on a per consumer basis.

Logging

  • logfile – A filename that log messages should be written to. Default is nil, which means logs will be written to standard output.
  • log_level – The log level for the Racecar logs, one of debug, info, warn, or error. Default is info.

Consumer checkpointing

The consumers will checkpoint their positions from time to time in order to be able to recover from failures. This is called committing offsets, since it's done by tracking the offset reached in each partition being processed, and committing those offset numbers to the Kafka offset storage API. If you can tolerate more double-processing after a failure, you can increase the interval between commits in order to improve performance. You can also do the opposite if you prefer less chance of double-processing.

  • offset_commit_interval – How often to save the consumer's position in Kafka. Default is every 10 seconds.
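The trade-off is easy to quantify roughly: after a crash, everything processed since the last commit is processed again, so the worst-case replay grows linearly with the commit interval. The helper and throughput figures below are hypothetical, and the estimate ignores the batch that was in flight:

```ruby
# Rough, hypothetical estimate (not a Racecar API) of the worst-case number
# of messages re-processed after a crash: roughly everything handled since
# the last offset commit is replayed.
def worst_case_reprocessed(messages_per_second:, offset_commit_interval:)
  (messages_per_second * offset_commit_interval).ceil
end

# At 200 messages/second with the default 10 second interval:
worst_case_reprocessed(messages_per_second: 200, offset_commit_interval: 10) # => 2000
# Committing every 2 seconds shrinks the window:
worst_case_reprocessed(messages_per_second: 200, offset_commit_interval: 2)  # => 400
```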

Timeouts & intervals

All timeouts are defined in number of seconds.

  • session_timeout – The idle timeout after which a consumer is kicked out of the group. Consumers must send heartbeats with at least this frequency.
  • heartbeat_interval – How often to send a heartbeat message to Kafka.
  • max_poll_interval – The maximum time between two message fetches before the consumer is kicked out of the group. Put differently, your (batch) processing must finish earlier than this.
  • pause_timeout – How long to pause a partition for if the consumer raises an exception while processing a message. Default is to pause for 10 seconds. Set this to 0 in order to disable automatic pausing of partitions or to -1 to pause indefinitely.
  • pause_with_exponential_backoff – Set to true if you want to double the pause_timeout on each consecutive failure of a particular partition.
  • socket_timeout – How long to wait when trying to communicate with a Kafka broker. Default is 30 seconds.
  • max_wait_time – How long to allow the Kafka brokers to wait before returning messages. A higher number means larger batches, at the cost of higher latency. Default is 1 second.
  • message_timeout – How long to try to deliver a produced message before finally giving up. Default is 5 minutes. Transient errors are automatically retried. If a message delivery fails, the current read message batch is retried.
  • statistics_interval – How frequently librdkafka should publish statistics about its consumers and producers; you must also add a statistics_callback method to your processor, otherwise the stats are disabled. The default is 1 second, however this can be quite memory hungry, so you may want to tune this and monitor.
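To see how pause_timeout and pause_with_exponential_backoff interact, the sketch below computes the pause schedule for consecutive failures on one partition, assuming the first failure pauses for pause_timeout and each further failure doubles it. The cap parameter is hypothetical and only included for illustration; check Racecar's configuration for whether and how pauses are capped.

```ruby
# Sketch of the pause schedule implied by pause_timeout and
# pause_with_exponential_backoff (assumed: first pause = pause_timeout,
# then doubling). `cap` is a hypothetical parameter for illustration.
def pause_durations(pause_timeout:, failures:, exponential_backoff: true, cap: nil)
  (1..failures).map do |attempt|
    duration = exponential_backoff ? pause_timeout * 2**(attempt - 1) : pause_timeout
    cap ? [duration, cap].min : duration
  end
end

pause_durations(pause_timeout: 10, failures: 5)
# => [10, 20, 40, 80, 160]
pause_durations(pause_timeout: 10, failures: 3, exponential_backoff: false)
# => [10, 10, 10]
pause_durations(pause_timeout: 10, failures: 5, cap: 60)
# => [10, 20, 40, 60, 60]
```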

Memory & network usage

Kafka is really good at throwing data at consumers, so you may want to tune these variables in order to avoid ballooning your process' memory or saturating your network capacity.

Racecar uses rdkafka-ruby under the hood, which fetches messages from the Kafka brokers in a background thread. This thread pushes fetch responses, possibly containing messages from many partitions, into a queue that is read by the processing thread (AKA your code). The main way to control the fetcher thread is to control the size of those responses and the size of the queue.

  • max_bytes – Maximum amount of data the broker shall return for a Fetch request.
  • min_message_queue_size – The minimum number of messages in the local consumer queue.

The memory usage limit is roughly estimated as max_bytes * min_message_queue_size, plus whatever your application uses.
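As a worked example of that estimate, with hypothetical values of 10 MiB for max_bytes and a queue size of 5, the fetcher alone may hold around 50 MiB. A plain-Ruby sketch of the arithmetic (not a Racecar API):

```ruby
# Back-of-the-envelope version of the estimate above (not a Racecar API):
# fetcher memory is roughly max_bytes * min_message_queue_size, on top of
# whatever the application itself uses.
MIB = 1024 * 1024

def estimated_fetcher_memory(max_bytes:, min_message_queue_size:)
  max_bytes * min_message_queue_size
end

# Hypothetical settings: 10 MiB per fetch response, queue of 5.
bytes = estimated_fetcher_memory(max_bytes: 10 * MIB, min_message_queue_size: 5)
bytes / MIB # => 50
```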

SSL encryption, authentication & authorization

  • security_protocol – Protocol used to communicate with brokers (:ssl)
  • ssl_ca_location – File or directory path to CA certificate(s) for verifying the broker's key
  • ssl_crl_location – Path to CRL for verifying broker's certificate validity
  • ssl_keystore_location – Path to client's keystore (PKCS#12) used for authentication
  • ssl_keystore_password – Client's keystore (PKCS#12) password
  • ssl_certificate_location – Path to the client's certificate (public key) used for authentication
  • ssl_key_location – Path to the client's private key used for authentication
  • ssl_key_password – Passphrase for the client's private key

SASL encryption, authentication & authorization

Racecar supports using SASL to authenticate clients with the GSSAPI, PLAIN, or SCRAM mechanisms, over either a plaintext or an SSL connection.

  • security_protocol – Protocol used to communicate with brokers (:sasl_plaintext or :sasl_ssl)

  • sasl_mechanism – SASL mechanism to use for authentication (GSSAPI, PLAIN, SCRAM-SHA-256, or SCRAM-SHA-512)

  • sasl_kerberos_principal – This client's Kerberos principal name

  • sasl_kerberos_kinit_cmd – Full Kerberos kinit command string; %{config.prop.name} is replaced by the corresponding config object value, and %{broker.name} by the broker's hostname

  • sasl_kerberos_keytab – Path to Kerberos keytab file. Uses system default if not set

  • sasl_kerberos_min_time_before_relogin – Minimum time in milliseconds between key refresh attempts

  • sasl_username – SASL username for use with the PLAIN and SCRAM mechanisms

  • sasl_password – SASL password for use with the PLAIN and SCRAM mechanisms
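Put together, a SASL/SCRAM-over-SSL setup in config/racecar.rb might look roughly like the following. This is a config fragment using only the keys listed above; the broker hosts, CA path, and the KAFKA_SASL_USERNAME / KAFKA_SASL_PASSWORD variable names are hypothetical placeholders, so adapt them to your cluster:

```ruby
# config/racecar.rb (hypothetical example; hosts, paths, and env var
# names are placeholders)
require "racecar"

Racecar.configure do |config|
  config.brokers = ["kafka1:9093", "kafka2:9093"]
  config.security_protocol = :sasl_ssl
  config.sasl_mechanism = "SCRAM-SHA-512"
  # Read credentials from the environment instead of hard-coding them.
  config.sasl_username = ENV.fetch("KAFKA_SASL_USERNAME")
  config.sasl_password = ENV.fetch("KAFKA_SASL_PASSWORD")
  # CA certificate used to verify the brokers' certificates.
  config.ssl_ca_location = "/etc/ssl/certs/kafka-ca.pem"
end
```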

Producing messages

These settings are related to consumers that produce messages to Kafka.

  • partitioner – The strategy used to determine which topic partition a message is written to when Racecar produces a value to Kafka. The partitioner needs to be one of consistent, consistent_random, murmur2, murmur2_random, fnv1a, or fnv1a_random, either as a Symbol or a String. Defaults to consistent_random.
  • producer_compression_codec – If defined, Racecar will compress messages before writing them to Kafka. The codec needs to be one of gzip, lz4, or snappy, either as a Symbol or a String.

Datadog monitoring

Racecar supports Datadog monitoring integration. If you're running a normal Datadog agent on your host, you just need to set datadog_enabled to true, as the rest of the settings come with sane defaults.

  • datadog_enabled – Whether Datadog monitoring is enabled (defaults to false).
  • datadog_host – The host running the Datadog agent.
  • datadog_port – The port of the Datadog agent.
  • datadog_socket_path – The unix domain socket of the Datadog agent (when set takes precedence over host/port).
  • datadog_namespace – The namespace to use for Datadog metrics.
  • datadog_tags – Tags that should always be set on Datadog metrics.

Furthermore, there's a standard Datadog dashboard configuration file that you can import to get started with a Racecar dashboard for all of your consumers.

Consumers Without Rails

By default, if Rails is detected, it will be automatically started when the consumer is started. There are cases where you might not want or need Rails. You can pass the --without-rails option when starting the consumer and Rails won't be started.

Testing consumers

Since consumers are merely classes that implement a simple interface, they're dead simple to test.

Here's an example of testing a consumer class using RSpec and Rails:

# app/consumers/create_contacts_consumer.rb
#
# Creates a Contact whenever an email address is written to the
# `email-addresses` topic.
class CreateContactsConsumer < Racecar::Consumer
  subscribes_to "email-addresses"

  def process(message)
    email = message.value

    Contact.create!(email: email)
  end
end

# spec/consumers/create_contacts_consumer_spec.rb
describe CreateContactsConsumer do
  it "creates a Contact for each email address in the topic" do
    message = double("message", value: "[email protected]")
    consumer = CreateContactsConsumer.new

    consumer.process(message)

    expect(Contact.where(email: "[email protected]")).to exist
  end
end

Deploying consumers

If you're already deploying your Rails application using e.g. Capistrano, all you need to do to run your Racecar consumers in production is to have some process supervisor start the processes and manage them for you.

Foreman is a very straightforward tool for interfacing with several process supervisor systems. You define your process types in a Procfile, e.g.

racecar-process-payments: bundle exec racecar ProcessPaymentsConsumer
racecar-resize-images: bundle exec racecar ResizeImagesConsumer

If you've ever used Heroku you'll recognize the format – indeed, deploying to Heroku should just work if you add Racecar invocations to your Procfile and enable the Heroku integration.

With Foreman, you can easily run these processes locally by executing foreman run; in production you'll want to export to another process management format such as Upstart or Runit. capistrano-foreman allows you to do this with Capistrano.

Deploying to Kubernetes

If you run your applications in Kubernetes, use the following Deployment spec as a starting point:

Recreate Strategy
apiVersion: apps/v1
kind: Deployment
metadata:
  name: my-racecar-deployment
  labels:
    app: my-racecar
spec:
  replicas: 4 # <-- this is a good value if you have a multiple of 4 partitions
  selector:
    matchLabels:
      app: my-racecar
  strategy:
    type: Recreate # <-- this is the important part.
  template:
    metadata:
      labels:
        app: my-racecar
    spec:
      containers:
        - name: my-racecar
          image: my-racecar-image
          command: ["bundle", "exec", "racecar", "MyConsumer"]
          env: # <-- you can configure the consumer using environment variables!
            - name: RACECAR_BROKERS
              value: kafka1,kafka2,kafka3
            - name: RACECAR_OFFSET_COMMIT_INTERVAL
              value: "5" # <-- Kubernetes env values must be strings

This configuration uses the recreate strategy which completely terminates all consumers before starting new ones. It's simple and easy to understand but can result in significant 'downtime' where no messages are processed.

Rolling Updates and 'cooperative-sticky' Assignment

A newer alternative is to use the consumer's "cooperative-sticky" assignment strategy which allows healthy consumers to keep processing their partitions while others are terminated. This can be combined with a restricted rolling update to minimize processing downtime.

Add to your Racecar config:

Racecar.configure do |c|
  c.partition_assignment_strategy = "cooperative-sticky"
end

Replace the Kubernetes deployment strategy with:

  strategy:
    type: RollingUpdate
    rollingUpdate:
      maxSurge: 0 # <- Never boot an excess consumer
      maxUnavailable: 1 # <- The deploy 'rolls' one consumer at a time

These two configurations should be deployed together.

While maxSurge should always be 0, maxUnavailable can be increased to reduce deployment times in exchange for longer pauses in message processing.

Liveness Probe

Racecar comes with a built-in liveness probe, primarily for use with Kubernetes, but useful for any deployment environment where you can periodically run a process to check the health of your consumer.

To use this feature:

  • set the liveness_probe_enabled config option to true.
  • configure your Kubernetes deployment to run $ racecarctl liveness_probe

When enabled (see config) Racecar will touch the file at liveness_probe_file_path each time it finishes polling Kafka and processing the messages in the batch (if any).

The modified time of this file can be observed to determine when the consumer last exhibited 'liveness'.

Running racecarctl liveness_probe will return a successful exit status if the last 'liveness' event happened within an acceptable time, liveness_probe_max_interval.

liveness_probe_max_interval should be long enough to account for both the Kafka polling time of max_wait_time and the processing time of a full message batch.
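The mechanism boils down to comparing the probe file's modification time with the current time. The plain-Ruby sketch below illustrates that check; it is not racecarctl's implementation, and the live? helper and temporary file are hypothetical stand-ins for liveness_probe_file_path and liveness_probe_max_interval:

```ruby
require "tempfile"

# Illustration of the liveness check (not racecarctl's code): the consumer
# counts as live if the probe file exists and was touched within
# max_interval seconds.
def live?(path, max_interval:, now: Time.now)
  File.exist?(path) && (now - File.mtime(path)) <= max_interval
end

probe = Tempfile.new("racecar-liveness")
path = probe.path
File.utime(Time.now, Time.now, path) # simulate the consumer touching the file

live?(path, max_interval: 5)                     # => true
live?(path, max_interval: 5, now: Time.now + 60) # => false (touched too long ago)

probe.close! # deletes the file, like Racecar does on SIGTERM
live?(path, max_interval: 5)                     # => false
```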

On receiving SIGTERM, Racecar will gracefully shut down and delete this file, causing the probe to fail immediately after exit.

You may wish to tolerate more than one failed probe run to accommodate environmental variance and clock changes.

See the Configuration section for the various ways the liveness probe can be configured, environment variables being one option.

Here is an example Kubernetes liveness probe configuration:

apiVersion: apps/v1
kind: Deployment
spec:
  template:
    spec:
      containers:
      - name: consumer

        args:
        - racecar
        - SomeConsumer

        env:
        - name: RACECAR_LIVENESS_PROBE_ENABLED
          value: "true"

        livenessProbe:
          exec:
            command:
            - racecarctl
            - liveness_probe

          # Allow up to 10 consecutive failures before terminating Pod:
          failureThreshold: 10

          # Wait 30 seconds before starting the probes:
          initialDelaySeconds: 30

          # Perform the check every 10 seconds:
          periodSeconds: 10

Deploying to Heroku

If you run your applications in Heroku and/or use the Heroku Kafka add-on, your application will be provided with 4 ENV variables that allow connecting to the cluster: KAFKA_URL, KAFKA_TRUSTED_CERT, KAFKA_CLIENT_CERT, and KAFKA_CLIENT_CERT_KEY.

Racecar has a built-in helper for configuring your application based on these variables – just add require "racecar/heroku" and everything should just work.

Please note aliasing the Heroku Kafka add-on will break this integration. If you have a need to do that, please ask on the discussion board.

# This takes care of setting up your consumer based on the ENV
# variables provided by Heroku.
require "racecar/heroku"

class SomeConsumer < Racecar::Consumer
  # ...
end

Running consumers in the background

While it is recommended that you use a process supervisor to manage the Racecar consumer processes, it is possible to daemonize the Racecar processes themselves if that is more to your liking. Note that this support is currently in alpha, as it hasn't been tested extensively in production settings.

In order to daemonize Racecar, simply pass in --daemonize when executing the command:

$ bundle exec racecar --daemonize ResizeImagesConsumer

This will start the consumer process in the background. A file containing the process id (the "pidfile") will be created, with the file name being constructed from the consumer class name. If you want to specify the name of the pidfile yourself, pass in --pidfile=some-file.pid.

Since the process is daemonized, you need to know the process id (PID) in order to be able to stop it. Use the racecarctl command to do this:

$ bundle exec racecarctl stop --pidfile=some-file.pid

Again, the recommended approach is to manage the processes using process managers. Only do this if you have to.

Handling errors

When processing messages from a Kafka topic, your code may encounter an error and raise an exception. The cause is typically one of two things:

  1. The message being processed is somehow malformed or doesn't conform with the assumptions made by the processing code.
  2. You're using some external resource such as a database or a network API that is temporarily unavailable.

In the first case, you'll need to either skip the message or deploy a new version of your consumer that can correctly handle the message that caused the error. In order to skip a message, handle the relevant exception in your #process method:

def process(message)
  data = JSON.parse(message.value)
  # ...
rescue JSON::ParserError => e
  puts "Failed to process message in #{message.topic}/#{message.partition} at offset #{message.offset}: #{e}"
  # It's probably a good idea to report the exception to an exception tracker service.
end

Since the exception is handled by your #process method and is no longer raised, Racecar will consider the message successfully processed. Tracking these errors in an exception tracker or some other monitoring system is highly recommended, as you otherwise will have little insight into how many messages are being skipped this way.

If, on the other hand, the exception was caused by a temporary network or database problem, you will probably want to retry processing of the message after some time has passed. By default, if an exception is raised by the #process method, the consumer will pause all processing of the message's partition for some number of seconds, configured by setting the pause_timeout configuration variable. This allows the consumer to continue processing messages from other partitions that may not be impacted by the problem while still making sure to not drop the original message. Since messages in a single Kafka topic partition must be processed in order, it's not possible to keep processing other messages in that partition.

In addition to retrying the processing of messages, Racecar also allows defining an error handler callback that is invoked whenever an exception is raised by your #process method. This allows you to track and report errors to a monitoring system:

Racecar.config.on_error do |exception, info|
  MyErrorTracker.report(exception, {
    topic: info[:topic],
    partition: info[:partition],
    offset: info[:offset],
  })
end

It is highly recommended that you set up an error handler. Please note that the info object contains different keys and values depending on whether you are using process or process_batch. See the instrumentation_payload object in the process and process_batch methods in the Runner class for the complete list.

Errors related to Compression

A sample error might look like this:

E, [2022-10-09T11:28:29.976548 #15] ERROR -- : (try 5/10): Error for topic subscription #<struct Racecar::Consumer::Subscription topic="support.entity_incremental.views.view_ticket_ids", start_from_beginning=false, max_bytes_per_partition=104857, additional_config={}>: Local: Not implemented (not_implemented)

Please see the Compression section below.

Logging

By default, Racecar will log to STDOUT. If you're using Rails, your application code will use whatever logger you've configured there.

In order to make Racecar log its own operations to a log file, set the logfile configuration variable or pass --log filename.log to the racecar command.

Operations

In order to gracefully shut down a Racecar consumer process, send it the SIGTERM signal. Most process supervisors such as Runit and Kubernetes send this signal when shutting down a process, so using those systems will make things easier.

In order to introspect the configuration of a consumer process, send it the SIGUSR1 signal. This will make Racecar print its configuration to the standard error file descriptor associated with the consumer process, so you'll need to know where that is written to.
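
The sketch below illustrates the general pattern behind this kind of signal handling in plain Ruby: trap handlers only record which signal arrived, and the main loop acts on it between messages, so the in-flight message always finishes and nothing is logged from trap context. ToyConsumerLoop is a hypothetical class, not Racecar's runner; its TERM and USR1 behaviours simply mirror the ones described above.

```ruby
require "stringio"

# Hypothetical consumer loop illustrating "trap handlers only record the
# signal; the main loop does the work". Not Racecar's runner.
class ToyConsumerLoop
  HANDLED_SIGNALS = %w[TERM USR1].freeze

  def initialize(config, err: $stderr)
    @config = config   # any hash; dumped on USR1
    @err = err         # where the USR1 dump goes (stderr by default)
    @pending = []      # signal names recorded by trap handlers
    @stopping = false
  end

  # Yields each message to the block until the messages run out or TERM
  # arrives. Returns the messages that were fully processed.
  def run(messages)
    processed = []
    previous = HANDLED_SIGNALS.map { |sig| [sig, Signal.trap(sig) { @pending << sig }] }

    messages.each do |message|
      handle_pending_signals
      break if @stopping

      yield message # an in-flight message always finishes before stopping
      processed << message
    end
    handle_pending_signals
    processed
  ensure
    # Put back whatever handlers were installed before.
    previous&.each { |sig, handler| Signal.trap(sig, handler) }
  end

  private

  # Runs in the main loop, outside trap context, so writing to IO or a
  # Logger is safe here.
  def handle_pending_signals
    while (sig = @pending.shift)
      case sig
      when "TERM" then @stopping = true
      when "USR1" then @config.each { |key, value| @err.puts("#{key} = #{value.inspect}") }
      end
    end
  end
end
```

From a shell, the same signals can be sent to a running process with kill -TERM <pid> and kill -USR1 <pid>.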

Upgrading from v1 to v2

In order to safely upgrade from Racecar v1 to v2, you need to completely shut down your consumer group before starting it up again with the v2 Racecar dependency.

Compression

Racecar v2 requires native C compression libraries (such as zlib or zstd) to compress messages before producing them to a topic. If they are not already installed in your consumer's Docker container, install them by adding the following command to the consumer's Dockerfile:

apt-get update && apt-get install -y libzstd-dev

Development

After checking out the repo, run bin/setup to install dependencies. Then, run rspec to run the tests. You can also run bin/console for an interactive prompt that will allow you to experiment.

The integration tests run against a Kafka instance that is not automatically started from within rspec. You can set one up using the provided docker-compose.yml by running docker-compose up.

Running RSpec within Docker

Behaviour can differ between running the specs on your machine and in the CI pipeline. For this reason, the project includes a Dockerfile based on the CircleCI Ruby 2.7.8 image. It could easily be extended with more Dockerfiles to cover other Ruby versions if desired. To run the specs via Docker:

  • Uncomment the tests service from the docker-compose.yml
  • Bring up the stack with docker-compose up -d
  • Execute the entire suite with docker-compose run --rm tests bundle exec rspec
  • Execute a single spec or directory with docker-compose run --rm tests bundle exec rspec spec/integration/consumer_spec.rb

Please note: your code directory is mounted as a volume, so you can make code changes without needing to rebuild the image.

Contributing

Bug reports and pull requests are welcome on GitHub. Feel free to join our Slack team and ask how best to contribute!

Support and Discussion

If you've discovered a bug, please file a GitHub issue, and make sure to include all the relevant information, including the versions of Racecar, rdkafka-ruby, and Kafka that you're using.

If you have other questions, or would like to discuss best practises, or how to contribute to the project, join our Slack team!

Copyright and license

Copyright 2017 Daniel Schierbeck & Zendesk

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License.

You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

racecar's People

Contributors

alanjsph, andrewthauer, apeacock1991, augustocravosilva, bestie, bquorning, breunigs, codella, colindkelley, dasch, deepredsky, ellimist, filiptepper, hammadk, harijonnala, heynonster, ivoanjo, jhk, jonmagic, lairen, leonmaia, lynne-ashminov, mainameiz, malandrina, nadavk72, olleolleolle, pfemo, sparrovv, staugaard, tjwp

racecar's Issues

Parallel processing of certain topics

We have some topics which we offload to sidekiq jobs for parallel processing, such as for data loading. These jobs also produce messages as data may be transformed along the way. Any thoughts to supporting parallel processing / backgrounding natively in Racecar?

It's certainly easy to kick off the worker from the consumer today, but I'm thinking this may be a common use case, and if Racecar is also able to buffer and produce messages, it would be nice to not have to mirror that functionality in a worker class.

How to read from the beginning every time

I want to make a rake task in which I want my consumer to read from the beginning so that I can process the messages and store them in the database.

I went through the documentation at this point and got a variable to set.

But this works only once: after the consumer has read all the messages, it commits its position and reads from there the next time. In the rake task, I want the consumer to read from the beginning every time the task is executed.

I also didn't find any way to manually set the offset of a consumer.

Thanks in advance

Allow inspecting a consumer's state

It would be nice to be able to introspect a running consumer and see e.g. what partitions were assigned to it; the number of processed messages; the number of outstanding offset commits.

One option would be to allow sending the USR1 signal to the Racecar process, which would dump the data to STDERR. This is what Consul does.

Allow configuring a client id prefix

Rather than using a single client id for all consumers it would make sense to have a per-consumer client id. This would allow monitoring the low-level operations of each consumer separately.

In order to do this, we could allow users to configure a client id prefix and construct the client id based on the consumer name, e.g. my-prefix.my-consumer.
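
As an illustration of the proposal, a client id could be derived from the prefix and the consumer's class name. This is a hypothetical helper, not a Racecar API; the naming scheme (dash-separated words, namespaces joined with dots) is an assumption chosen to match the my-prefix.my-consumer example.

```ruby
# Hypothetical helper: build "prefix.consumer-name" from a consumer class name.
# Namespaces become dot-separated segments and CamelCase becomes dash-case.
def client_id_for(prefix, consumer_class_name)
  name = consumer_class_name
    .split("::")
    .map { |part| part.gsub(/([a-z\d])([A-Z])/, '\1-\2').downcase }
    .join(".")
  # Drop a missing or empty prefix instead of producing a leading dot.
  [prefix, name].reject { |segment| segment.nil? || segment.empty? }.join(".")
end
```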

log writing failed. can't be called from trap context

When a racecar consumer is running and I send it a SIGTERM, it prints the following:

Jun 11 18:55:49 ip-10-212-33-14 systemd[1]: Stopping App client consumer. Instance=1....
Jun 11 18:55:49 ip-10-212-33-14 bash[9370]: log writing failed. can't be called from trap context
Jun 11 18:55:49 ip-10-212-33-14 bash[9370]: log writing failed. can't be called from trap context
Jun 11 18:55:49 ip-10-212-33-14 bash[9370]: log writing failed. can't be called from trap context
Jun 11 18:55:49 ip-10-212-33-14 bash[9370]: log writing failed. can't be called from trap context
Jun 11 18:55:49 ip-10-212-33-14 bash[9370]: log writing failed. can't be called from trap context
Jun 11 18:55:49 ip-10-212-33-14 bash[9370]: log writing failed. can't be called from trap context
Jun 11 18:55:49 ip-10-212-33-14 systemd[1]: Stopped App client consumer. Instance=1..
It happens sometimes, not always.

I think this may be because we cannot use a logger in a signal handler (https://bugs.ruby-lang.org/issues/7917), but ruby-kafka does so. If you follow racecar's trap signal handling, you eventually land in ruby-kafka code that calls logger.info.

One solution may be to create a new thread, as karafka does, but I don't like that approach.

Ruby-kafka ticket ref: zendesk/ruby-kafka#598

Version of Ruby: 2.3.1
Version of Kafka: 0.11.0.1
Version of ruby-kafka: 0.5.2
Version of racecar: 0.3.7

Errors from Racecar::Cli#initialize are swallowed

For example if a "--require"d file raises an exception.

19451-revans:racecar revans$ git rev-parse HEAD
236a8872c406e4707a1c876d4a55d6ca61494f63

19451-revans:racecar revans$ > lib/foo.rb
19451-revans:racecar revans$ ruby -I lib ./exe/racecar --require foo bar
=> Starting Racecar consumer bar...
=> Crashed: NameError: wrong constant name bar
/Users/revans/git/github.com/rvedotrc/github-mirror/var/checkout/full_name/zendesk/racecar/lib/racecar/cli.rb:33:in `const_get'
/Users/revans/git/github.com/rvedotrc/github-mirror/var/checkout/full_name/zendesk/racecar/lib/racecar/cli.rb:33:in `run'
/Users/revans/git/github.com/rvedotrc/github-mirror/var/checkout/full_name/zendesk/racecar/lib/racecar/cli.rb:10:in `main'
./exe/racecar:9:in `<main>'

19451-revans:racecar revans$ echo 'raise' > lib/foo.rb
19451-revans:racecar revans$ ruby -I lib ./exe/racecar --require foo bar
19451-revans:racecar revans$ echo $?
1

Reading message value fails in process_batch

On 2.0.0.alpha3, accessing a message.value inside process_batch crashes with the following error:

warning: Racecar::Message#value at /usr/local/lib/ruby/2.6.0/forwardable.rb:158 forwarding to private method Rdkafka::Consumer::Message#value
undefined method `value' for #<Rdkafka::Consumer::Message:0x000055aee47b0c80>

mark_as_processed false

In case of an exception (for example, when connecting to a server fails), how can I mark the message as unprocessed?

Slow startup troubleshooting

Hi there, thanks so much for this gem!

I'm having an issue where in my staging environment, racecar takes forever to start up. I see these lines:

=> Starting Racecar consumer DataSyncConsumer...
=> Detected Rails, booting application...

and then a LONG wait (7+ minutes) before I see the VROOOM message and messages start getting handled.

Some additional factors that might play into this:

  • Behavior happens in staging where we are connecting to Kafka in confluent cloud
  • I do not see it in development mode connecting to a local Kafka broker

Any chance you might have some insight as to what's taking my consumer so long to start up?

3.8.0 doesn't have support for SASL SCRAM?

Hello,

Five days ago you released racecar 3.8.0, adding ruby-kafka 0.5 with the SASL SCRAM feature to the dependencies.

I updated racecar to version 3.8.0 via bundle update, but I still don't have the new ruby-kafka or the SCRAM feature. Is this normal?

Thanks.

Connection error while fetching messages: Connection error Errno::EBADF: Bad file descriptor

Simply start a consumer. Let's say, for the sake of simplicity, that there is nothing to consume. When I shut down the consumer, I get the following error in my logs.

D, [2018-06-13T11:01:18.380510 #23536] DEBUG -- : Closing socket to x.x.x.x:9092
D, [2018-06-13T11:01:18.380584 #23536] DEBUG -- : Closing socket to x.x.x.x:9092
E, [2018-06-13T11:01:18.380668 #23536] ERROR -- : Connection error while fetching messages: Connection error Errno::EBADF: Bad file descriptor

Version of Ruby:2.3.1
Version of Kafka:0.5.2
Version of ruby-kafka:0.3.7

Thread deadlocks?

When running under Ruby 2.4.4 (and every other Ruby I tried, 2.3.7 and others), if the application spins up threads at startup, I have noticed that after a while those threads eventually deadlock.

For example, our rails app opens a websocket. We read from Kafka and inject into the websocket using racecar. This websocket has a thread on it that does ping/pongs to the server. The connection was constantly dropping because the ping/pong thread would fire a sleep() and seize up.

This happens only when running with --daemonize. Running without it works fine. I suspect the forking is somehow interfering with the thread, but I can't prove this yet.

Any thoughts?

Configuring logger in Racecar.configure block

Hello!

Thank you for developing Racecar, it is very useful for us.
But when configuring it for one of our applications I faced the following:

Racecar.configure do |config|
  config.brokers = [...]
  config.group_id = "my_shiny_group_id"
  config.logger = MyLogger.new # here is the problem
end
Racecar.logger = MyLogger.new # that's the solution

I couldn't configure the logger object inside the configure block; I had to set it separately from everything else. I looked into the code and found that this is because of KingKonf, which does not allow "fancy structures" (like a logger instance). I was wondering whether this is by design or could be fixed (I'm ready to make a PR)?

Feature request: Connection error handling

Currently racecar does not call the error handler when it cannot connect to Kafka (DNS error, host not reachable, error opening a connection, etc.)

Temporary monkey patch:

Racecar::Runner.class_eval do
  alias_method :run_origin, :run

  def run
    run_origin
  rescue Kafka::ConnectionError => error
    config.error_handler.call(error)
    raise error
  end
end

How to commit offset manually?

Is there a way to commit offset manually from Consumer?

I don't want any automated offset committing for one particular case.

Over Reads on a Topic

Hi,

I started a consumer subscribed to an empty topic. Then I very quickly dumped 37869 messages into that clean topic, while my consumer started fetching alongside. The log shows that it over-processed: it processed up to offset 37904 and then reset back to the correct one. Why did this happen? Also, this time my count of processed messages dropped to 37756 instead of 37869.

Log:


D, [2018-02-01T19:20:55.952360 #13490] DEBUG -- : Marking users/0:37990 as processed
D, [2018-02-01T19:20:55.953482 #13490] DEBUG -- : Marking users/0:37868 as processed
D, [2018-02-01T19:20:55.956459 #13490] DEBUG -- : Marking users/0:37869 as processed
D, [2018-02-01T19:20:55.957973 #13490] DEBUG -- : Marking users/0:37870 as processed
D, [2018-02-01T19:20:55.959402 #13490] DEBUG -- : Marking users/0:37871 as processed
D, [2018-02-01T19:20:55.960880 #13490] DEBUG -- : Marking users/0:37872 as processed
D, [2018-02-01T19:20:55.962321 #13490] DEBUG -- : Marking users/0:37873 as processed
D, [2018-02-01T19:20:55.963735 #13490] DEBUG -- : Marking users/0:37874 as processed
D, [2018-02-01T19:20:55.965176 #13490] DEBUG -- : Marking users/0:37875 as processed
D, [2018-02-01T19:20:55.966594 #13490] DEBUG -- : Marking users/0:37876 as processed
D, [2018-02-01T19:20:55.968016 #13490] DEBUG -- : Marking users/0:37877 as processed
D, [2018-02-01T19:20:55.969427 #13490] DEBUG -- : Marking users/0:37878 as processed
D, [2018-02-01T19:20:55.970775 #13490] DEBUG -- : Marking users/0:37879 as processed
D, [2018-02-01T19:20:55.972231 #13490] DEBUG -- : Marking users/0:37880 as processed
D, [2018-02-01T19:20:55.975846 #13490] DEBUG -- : Marking users/0:37881 as processed
D, [2018-02-01T19:20:55.978254 #13490] DEBUG -- : Marking users/0:37882 as processed
D, [2018-02-01T19:20:55.980070 #13490] DEBUG -- : Marking users/0:37883 as processed
D, [2018-02-01T19:20:55.981492 #13490] DEBUG -- : Marking users/0:37884 as processed
D, [2018-02-01T19:20:55.982938 #13490] DEBUG -- : Marking users/0:37885 as processed
D, [2018-02-01T19:20:55.984352 #13490] DEBUG -- : Marking users/0:37886 as processed
D, [2018-02-01T19:20:55.985751 #13490] DEBUG -- : Marking users/0:37887 as processed
D, [2018-02-01T19:20:55.987158 #13490] DEBUG -- : Marking users/0:37888 as processed
D, [2018-02-01T19:20:55.988612 #13490] DEBUG -- : Marking users/0:37889 as processed
D, [2018-02-01T19:20:55.990045 #13490] DEBUG -- : Marking users/0:37890 as processed
D, [2018-02-01T19:20:55.991432 #13490] DEBUG -- : Marking users/0:37891 as processed
D, [2018-02-01T19:20:55.992823 #13490] DEBUG -- : Marking users/0:37892 as processed
D, [2018-02-01T19:20:55.994276 #13490] DEBUG -- : Marking users/0:37893 as processed
D, [2018-02-01T19:20:55.995800 #13490] DEBUG -- : Marking users/0:37894 as processed
D, [2018-02-01T19:20:56.004625 #13490] DEBUG -- : Marking users/0:37895 as processed
D, [2018-02-01T19:20:56.007385 #13490] DEBUG -- : Marking users/0:37896 as processed
D, [2018-02-01T19:20:56.008876 #13490] DEBUG -- : Marking users/0:37897 as processed
D, [2018-02-01T19:20:56.010345 #13490] DEBUG -- : Marking users/0:37898 as processed
D, [2018-02-01T19:20:56.012165 #13490] DEBUG -- : Marking users/0:37899 as processed
D, [2018-02-01T19:20:56.014249 #13490] DEBUG -- : Marking users/0:37900 as processed
D, [2018-02-01T19:20:56.016525 #13490] DEBUG -- : Marking users/0:37901 as processed
D, [2018-02-01T19:20:56.017950 #13490] DEBUG -- : Marking users/0:37902 as processed
D, [2018-02-01T19:20:56.019486 #13490] DEBUG -- : Marking users/0:37903 as processed
D, [2018-02-01T19:20:56.021068 #13490] DEBUG -- : Marking users/0:37904 as processed
D, [2018-02-01T19:20:56.021225 #13490] DEBUG -- : Fetching batch from users/0 starting at offset 37905
D, [2018-02-01T19:20:56.021392 #13490] DEBUG -- : Sending fetch API request 35 to 10.10.10.10:9092
D, [2018-02-01T19:20:56.021595 #13490] DEBUG -- : Waiting for response 35 from 10.10.10.10:9092
D, [2018-02-01T19:20:56.022878 #13490] DEBUG -- : Received response 35 from 10.10.10.10:9092
E, [2018-02-01T19:20:56.024361 #13490] ERROR -- : Invalid offset for users/0, resetting to default offset
D, [2018-02-01T19:20:56.024567 #13490] DEBUG -- : Sending list_offset API request 36 to 10.10.10.10:9092

Consumer:


class TestConsumer < Racecar::Consumer
  subscribes_to 'users', start_from_beginning: false

  def process(message)
    count = Rails.cache.read("count")

    unless count
      count = 0
    end

    Rails.cache.write("count", count + 1)
  end
end

Versions:

  • Racecar: 0.3.5, 0.3.7 [tried on both]
  • Ruby Kafka: 0.5.1
  • Ruby: 2.3.1

No longer able to use infinite pause.

Ruby Kafka

def pause(topic, partition, timeout: nil)
  @paused_partitions[topic] ||= {}
  @paused_partitions[topic][partition] = timeout && Time.now + timeout
end

def paused?(topic, partition)
  partitions = @paused_partitions.fetch(topic, {})

  if partitions.key?(partition)
    # Users can set an optional timeout, after which the partition is
    # automatically resumed. When pausing, the timeout is translated to an
    # absolute point in time.
    timeout = partitions.fetch(partition)

    if timeout.nil?
      true
    elsif Time.now < timeout
      true
    else
      @logger.info "Automatically resuming partition #{topic}/#{partition}, pause timeout expired"
      resume(topic, partition)
      false
    end
  end
end

As you can see, we can pass nil if we want an infinite pause. But racecar doesn't allow this because of the following line and KingKonf. How can we fix this?

float :pause_timeout, default: 10

ruby-kafka: 0.5.2
racecar: 0.3.7
ruby: 2.3.1

Load an optional Ruby config file on startup

Some configuration, such as an error handler, cannot be set through YAML or the ENV. In Rails, we might use an initializer, although that would mean that the code would also be executed for non-Racecar processes (sad!)

A better solution would be to have a Ruby file named by convention, e.g. config/racecar.rb that, if present, would be automatically required before startup. This can already be accomplished by running Racecar with the --require flag, e.g. racecar --require config/racecar MyConsumer, but that's tedious and unintuitive.
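
A minimal sketch of the proposed convention, assuming the file lives at config/racecar.rb relative to the working directory; load_optional_config is a hypothetical name and this is not Racecar's startup code.

```ruby
# Hypothetical helper: require config/racecar.rb only if it exists, so
# Racecar-specific setup (e.g. an error handler) never runs in other processes.
def load_optional_config(root = Dir.pwd, relative_path = "config/racecar.rb")
  path = File.expand_path(relative_path, root)
  return false unless File.file?(path)

  require path # a no-op if the file was already required
  true
end
```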

Reprocessing of complete queue in case of start_from_beginning: true

When using start_from_beginning: true there is an issue. Once the complete queue is consumed, it starts consumption from the very start. This goes on over and over.

Logs:

D, [2018-01-04T17:16:01.990400 #97361] DEBUG -- : Marking citruspay_usr_cpuser_detail/1:8409 as processed
D, [2018-01-04T17:16:01.990637 #97361] DEBUG -- : Fetching batch from citruspay_usr_cpuser_detail/1 starting at offset 8410
D, [2018-01-04T17:16:01.990790 #97361] DEBUG -- : Sending fetch API request 82 to 10.100.93.176:9092
D, [2018-01-04T17:16:01.990966 #97361] DEBUG -- : Waiting for response 82 from 10.100.93.176:9092
D, [2018-01-04T17:16:01.992103 #97361] DEBUG -- : Received response 82 from 10.100.93.176:9092
E, [2018-01-04T17:16:01.992844 #97361] ERROR -- : Invalid offset for citruspay_usr_cpuser_detail/1, resetting to default offset
D, [2018-01-04T17:16:01.993055 #97361] DEBUG -- : Sending list_offset API request 83 to 10.100.93.176:9092
D, [2018-01-04T17:16:01.993245 #97361] DEBUG -- : Waiting for response 83 from 10.100.93.176:9092
D, [2018-01-04T17:16:01.995727 #97361] DEBUG -- : Received response 83 from 10.100.93.176:9092
D, [2018-01-04T17:16:01.995975 #97361] DEBUG -- : Fetching batch from citruspay_usr_cpuser_detail/1 starting at offset 0
D, [2018-01-04T17:16:01.996267 #97361] DEBUG -- : Sending fetch API request 84 to 10.100.93.176:9092
D, [2018-01-04T17:16:01.996970 #97361] DEBUG -- : Waiting for response 84 from 10.100.93.176:9092
D, [2018-01-04T17:16:03.055945 #97361] DEBUG -- : Received response 84 from 10.100.93.176:9092
D, [2018-01-04T17:16:03.110114 #97361] DEBUG -- : Marking citruspay_usr_cpuser_detail/1:26 as processed
I, [2018-01-04T17:16:03.110237 #97361]  INFO -- : Sending heartbeat...
D, [2018-01-04T17:16:03.110406 #97361] DEBUG -- : Sending heartbeat API request 85 to 10.100.93.176:9092
D, [2018-01-04T17:16:03.110886 #97361] DEBUG -- : Waiting for response 85 from 10.100.93.176:9092
D, [2018-01-04T17:16:03.112038 #97361] DEBUG -- : Received response 85 from 10.100.93.176:9092
D, [2018-01-04T17:16:03.122607 #97361] DEBUG -- : Marking citruspay_usr_cpuser_detail/1:27 as processed

Versions:

- Racecar: 0.3.5
- Ruby Kafka: 0.5.1
- Ruby: 2.3.1

Signal handlers are not re-entrant

Hi racecar team, love your gem <3

Racecar's binstub does not properly handle signals in my experience. I get this printed in the output when I send Ctrl+C to bin/racecar MyConsumer:

log writing failed. can't be called from trap context

I've actually encountered unpredictable failure scenarios around sending signals to racecar:

  • after killing, I get a console prompt as though the process is gone but I still get output printed to my console
  • if you press Ctrl+C enough times you can just crash the process (this is likely a race condition)

This happens because the signal handlers are not re-entrant. The handlers in runner.rb call shutdown code directly from within the trap block. This code can be called at any time, including before the previous signal has finished being handled. This is prime race condition territory.

Instead, you should consider using a global queue + a "self pipe" to handle signals serially. This is what sidekiq, foreman, unicorn, and I'm sure many others do to handle this problem.
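
For readers unfamiliar with the self-pipe technique, here is a minimal, self-contained Ruby sketch of it. It is not taken from sidekiq, foreman, unicorn or Racecar; SelfPipeSignals is a hypothetical class. Trap handlers only write the signal name to a pipe with write_nonblock, and ordinary code reads the names back and handles them one at a time, outside trap context.

```ruby
# Hypothetical self-pipe helper: trap handlers only write to a pipe; signals
# are read back and handled one at a time by ordinary (non-trap) code.
class SelfPipeSignals
  def initialize(signals)
    @signals = signals
    @reader, @writer = IO.pipe
    @buffer = String.new # bytes read from the pipe but not yet returned
    @previous = {}
  end

  def install
    @signals.each do |sig|
      # write_nonblock never blocks inside the handler; if the pipe is full,
      # the signal is dropped rather than deadlocking.
      @previous[sig] = Signal.trap(sig) { @writer.write_nonblock("#{sig}\n", exception: false) }
    end
    self
  end

  def restore
    @previous.each { |sig, handler| Signal.trap(sig, handler) }
    @previous.clear
  end

  # Returns the next queued signal name, or nil if none arrives within
  # `timeout` seconds. Signals come back in the order they were written.
  def next_signal(timeout)
    loop do
      if (newline = @buffer.index("\n"))
        return @buffer.slice!(0..newline).chomp
      end
      return nil unless IO.select([@reader], nil, nil, timeout)

      chunk = @reader.read_nonblock(64, exception: false)
      return nil if chunk.nil? # writer closed
      @buffer << chunk if chunk.is_a?(String)
    end
  end
end
```

A supervising loop would typically call next_signal repeatedly and dispatch on the returned name, so a second Ctrl+C simply queues up behind the first instead of re-entering shutdown code.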

Replay from offset

Does racecar have an API to allow the consumer to seek from a specific offset? There are times where we want to replay but right now I'm not seeing a way to do it.

Integration Testing a Consumer

We are starting on integrating Kafka and racecar into an app. Looking at the documentation and the code, it's not clear to me how we can integration test a producer-consumer pair from within our test suite.

That is, while in production the produced events will come from an external service, in our test suite we would like to run a consumer, produce an event that matches what will be produced in production, and assert against the expected results of the consumption.

Is there anything that exists for this that y'all can point me to, or a discussion of the challenges of doing so?

Many thanks, both for the gem and the reply!

Horizontally Scalable for Consumers?

I see in https://github.com/zendesk/ruby-kafka#consuming-messages-from-kafka that:

Each consumer process will be assigned one or more partitions from each topic that the group subscribes to. In order to handle more messages, simply start more processes.

Is this true for racecar? Can we simply add more boxes that run the bundle exec racecar MyConsumer and assuming we have the brokers/partitions configured it will scale?

I read through https://github.com/zendesk/ruby-kafka#consumer-design but I'm not sure if that covers it.

Batch processing API

We're considering using racecar for some of our Kafka consumers, but our use case requires processing whole batches of messages.

Would you be interested in a contribution that would allow that kind of behavior, built on top of each_batch method?

Allow manual heartbeats

class LongRunningConsumer < Racecar::Consumer
  subscribes_to "slow-ass-jobs"

  def process(message)
    decode(message).each do |part|
      # Each part of the processing can be slow.
      process_part(part)

      # In order to avoid getting kicked out of the group due to a lack of
      # activity, we send a heartbeat after each part has been processed.
      heartbeat!

      # If we don't want to *force* a heartbeat, but rather just send one if
      # the heartbeat thresholds have been reached:
      heartbeat
    end
  end
end
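
To illustrate the difference between the two proposed calls, here is a hypothetical throttle in plain Ruby: heartbeat! always sends, while heartbeat sends only once interval seconds have passed since the last heartbeat. The sender lambda is a stand-in for whatever would actually contact the group coordinator; neither method is an existing Racecar API in this sketch.

```ruby
# Hypothetical sketch of the proposed heartbeat!/heartbeat pair.
class HeartbeatThrottle
  def initialize(interval:, sender:, clock: -> { Process.clock_gettime(Process::CLOCK_MONOTONIC) })
    @interval = interval # minimum seconds between non-forced heartbeats
    @sender = sender     # stand-in for the real broker call
    @clock = clock       # injectable for tests
    @last_sent_at = nil
  end

  # Always sends, and records when it did.
  def heartbeat!
    @sender.call
    @last_sent_at = @clock.call
  end

  # Sends only if the interval has elapsed; returns whether it sent.
  def heartbeat
    due = @last_sent_at.nil? || @clock.call - @last_sent_at >= @interval
    heartbeat! if due
    due
  end
end
```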

General Question: Consumers & Mutations

Hey all, this is more of a general question about best practices.

With the following consumer, is @event at risk of being mutated if consumers are working concurrently? I guess I'm asking whether racecar creates an instance per event or reuses the same one for all events.

class TestConsumer < Racecar::Consumer
  def process(event)
    @event = event

    do_something
  end

  def do_something
    service.call(@event)
  end
end

I tried asking in the ruby-kafka Slack channel listed in the README, without success 😞

Thanks!

How to run multiple consumers?

It only supports one consumer right now, is that right?
Any plans to add support for multiple consumers?

I have a few more questions:

What is the usual practice for retries? I can't find any functionality for publishing messages, or any access to the Kafka client object (it's a local variable).

An exception in teardown (defined in the consumer) makes the process exit immediately. Would it be better to call Racecar.config.error_handler?

Is it possible not to load Rails at startup, if we want to run a plain Ruby consumer inside a Rails project?

Documentation should mention configuration through Ruby files

I was having a problem where I needed one of the keys for the Racecar config to be the result of some computation (writing the CA certs to a file temporarily then reading them in that way; sort of an atypical configuration), and I saw in the code that Racecar loads configs from config/racecar.rb after loading config/racecar.yml, which allowed me to proceed.

This was super useful and I would never have known it was there if I'd only gone by the documentation. I recommend adding a mention of it to the README for others who may be in the same boat.

General Questions about Implementation

I wanted to go over some questions I have while implementing racecar for ruby kafka.

  1. Say we have one topic and we want two consumers belonging to one group to act on that topic.
    a. Do we build separate consumer classes subscribing to the same topic?
    b. I see racecar.yml has the configuration for brokers, client_id, etc., but I would think the group id should be specific to the consumer class? If so, how do we declare that in the consumer class to tie two consumers into one group?
    c. Once we tie the consumers together in a group, will racecar handle assigning them to partitions automatically?

  2. Just to confirm: we must run a separate daemon for each consumer class?

  3. Is a crash the only situation in which we would want to shut down a consumer?

  4. What process supervisor does Zendesk use? Runit, Kubernetes or God?

  5. We were looking into Docker/Mesos, do you have any concerns about using those with racecar?

False-Positive when detecting rails application

TL;DR
require 'rails' returns true for a non-Rails project with "activesupport" or "railties"

Detection if the app is a rails app happens here

This produces a false positive when the app uses activesupport: the Rails object is then defined, but it has no "logger" object, so these lines raise an exception (undefined method on a nil object :) )

It was detected in a closed-source project; I can share part of my Gemfile.lock:

$ cat Gemfile.lock G rail
      rails-dom-testing (~> 2.0)
      rails-html-sanitizer (~> 1.0, >= 1.0.2)
      rails-dom-testing (~> 2.0)
      rails-html-sanitizer (~> 1.0, >= 1.0.3)
    rails-dom-testing (2.0.3)
    rails-html-sanitizer (1.0.4)
    railties (5.1.6)
      railties (>= 4.2.7, < 5.3.0)

and

$ cat Gemfile.lock G active
      activesupport (= 5.1.6)
      activesupport (= 5.1.6)
    activemodel (5.1.6)
      activesupport (= 5.1.6)
    activerecord (5.1.6)
      activemodel (= 5.1.6)
      activesupport (= 5.1.6)
    activesupport (5.1.6)
      activesupport (>= 4.2.0)
      activesupport (= 5.1.6)
      activerecord (>= 4.2.7, < 5.3.0)
  activerecord (~> 5.1)
  activesupport (~> 5.1)
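
One possible stricter check, sketched here as a hypothetical helper rather than Racecar's actual detection code, is to require that Rails exposes an application object instead of relying on require 'rails' succeeding. The assumption is that, with only railties loaded, Rails.application stays nil until an application class has been defined, so the check returns false in the situation described above.

```ruby
# Hypothetical stricter detection: "require 'rails' succeeded" is not enough,
# so also require an actual application object.
def rails_application?
  return false unless defined?(::Rails)

  ::Rails.respond_to?(:application) && !::Rails.application.nil?
end
```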

Do not put current directory in LOAD_PATH

Hello everybody!
First of all, thanks for making such a good project. It is a great addition to such a low-level ruby-kafka :)


My question...

I have found a strange issue using racecar + bootsnap.

As you may know, Bootsnap recursively scans everything in $LOAD_PATH and caches all paths to speed up the application boot process.

Also I have found that racecar adds current directory to $LOAD_PATH - 0849ecc.

This causes bootsnap to scan every directory in my rails project :) including shared storage linked to public/uploads.

So my question: is it necessary to put current directory into $LOAD_PATH?

Wrong logging of offsets.

Hi,

I started a consumer subscribed to an empty topic. Then I very quickly dumped 37869 messages into that clean topic, while my consumer started fetching alongside. The log shows that it processed exactly 37869 messages. But when I look at the processed offsets, there are duplicates, so some offsets must be missing (to account for the duplicates). So I think there is an issue with logging.

You can see offsets 14, 15, 47, etc. being repeated. As this is not a complete log, you cannot see the missing ones. But I counted all the processed lines and there are 37869, while there are only 36213 unique offsets in the logs.

Log:

[2018-02-01T18:36:54.665162 #9229] DEBUG -- : Opening connection to 10.10.10.10:9092 with client id app...
[2018-02-01T18:36:54.666681 #9229] DEBUG -- : Sending topic_metadata API request 1 to 10.10.10.10:9092
[2018-02-01T18:36:54.667059 #9229] DEBUG -- : Waiting for response 1 from 10.10.10.10:9092
[2018-02-01T18:36:54.734866 #9229] DEBUG -- : Received response 1 from 10.10.10.10:9092
[2018-02-01T18:36:54.735114 #9229]  INFO -- : Discovered cluster metadata; nodes: 10.10.10.10:9092 (node_id=0)
[2018-02-01T18:36:54.735727 #9229] DEBUG -- : Closing socket to 10.10.10.10:9092
[2018-02-01T18:36:54.736415 #9229]  INFO -- : Joining group `app.test-consumer`
[2018-02-01T18:36:54.736644 #9229] DEBUG -- : Getting group coordinator for `app.test-consumer`
[2018-02-01T18:36:54.737095 #9229] DEBUG -- : Opening connection to 10.10.10.10:9092 with client id app...
[2018-02-01T18:36:54.738381 #9229] DEBUG -- : Sending group_coordinator API request 1 to 10.10.10.10:9092
[2018-02-01T18:36:54.738769 #9229] DEBUG -- : Waiting for response 1 from 10.10.10.10:9092
[2018-02-01T18:36:54.740335 #9229] DEBUG -- : Received response 1 from 10.10.10.10:9092
[2018-02-01T18:36:54.752906 #9229] DEBUG -- : Coordinator for group `app.test-consumer` is 0. Connecting...
[2018-02-01T18:36:54.753322 #9229] DEBUG -- : Connected to coordinator: 10.10.10.10:9092 (node_id=0) for group `app.test-consumer`
[2018-02-01T18:36:54.765412 #9229] DEBUG -- : Sending join_group API request 2 to 10.10.10.10:9092
[2018-02-01T18:36:54.795344 #9229] DEBUG -- : Waiting for response 2 from 10.10.10.10:9092
[2018-02-01T18:36:54.825464 #9229] DEBUG -- : Received response 2 from 10.10.10.10:9092
[2018-02-01T18:36:54.825875 #9229]  INFO -- : Joined group `app.test-consumer` with member id `app-6219334d-867f-4d6b-aff6-d1e841912c07`
[2018-02-01T18:36:54.826119 #9229]  INFO -- : Chosen as leader of group `app.test-consumer`
[2018-02-01T18:36:55.832149 #9229]  INFO -- : Fetching cluster metadata from kafka://10.10.10.10:9092
[2018-02-01T18:36:55.832409 #9229] DEBUG -- : Opening connection to 10.10.10.10:9092 with client id app...
[2018-02-01T18:36:55.833883 #9229] DEBUG -- : Sending topic_metadata API request 1 to 10.10.10.10:9092
[2018-02-01T18:36:55.834169 #9229] DEBUG -- : Waiting for response 1 from 10.10.10.10:9092
[2018-02-01T18:36:55.835199 #9229] DEBUG -- : Received response 1 from 10.10.10.10:9092
[2018-02-01T18:36:55.835335 #9229]  INFO -- : Discovered cluster metadata; nodes: 10.10.10.10:9092 (node_id=0)
[2018-02-01T18:36:55.835403 #9229] DEBUG -- : Closing socket to 10.10.10.10:9092
[2018-02-01T18:36:55.835788 #9229] DEBUG -- : Sending sync_group API request 3 to 10.10.10.10:9092
[2018-02-01T18:36:55.836217 #9229] DEBUG -- : Waiting for response 3 from 10.10.10.10:9092
[2018-02-01T18:36:55.840094 #9229] DEBUG -- : Received response 3 from 10.10.10.10:9092
[2018-02-01T18:36:55.840194 #9229]  INFO -- : Partitions assigned for `users`: 0
[2018-02-01T18:36:55.840358 #9229] DEBUG -- : Sending offset_fetch API request 4 to 10.10.10.10:9092
[2018-02-01T18:36:55.840597 #9229] DEBUG -- : Waiting for response 4 from 10.10.10.10:9092
[2018-02-01T18:36:55.841112 #9229] DEBUG -- : Received response 4 from 10.10.10.10:9092
[2018-02-01T18:36:55.841285 #9229] DEBUG -- : Sending list_offset API request 5 to 10.10.10.10:9092
[2018-02-01T18:36:55.841527 #9229] DEBUG -- : Waiting for response 5 from 10.10.10.10:9092
[2018-02-01T18:36:55.854307 #9229] DEBUG -- : Received response 5 from 10.10.10.10:9092
[2018-02-01T18:36:55.854537 #9229] DEBUG -- : Fetching batch from users/0 starting at offset 0
[2018-02-01T18:36:55.854915 #9229] DEBUG -- : Sending fetch API request 6 to 10.10.10.10:9092
[2018-02-01T18:36:55.855252 #9229] DEBUG -- : Waiting for response 6 from 10.10.10.10:9092
[2018-02-01T18:37:00.859910 #9229] DEBUG -- : Received response 6 from 10.10.10.10:9092
[2018-02-01T18:37:00.862207 #9229] DEBUG -- : Fetching batch from users/0 starting at offset 0
[2018-02-01T18:37:00.862523 #9229] DEBUG -- : Sending fetch API request 7 to 10.10.10.10:9092
[2018-02-01T18:37:00.862960 #9229] DEBUG -- : Waiting for response 7 from 10.10.10.10:9092
[2018-02-01T18:37:05.866017 #9229] DEBUG -- : Received response 7 from 10.10.10.10:9092
[2018-02-01T18:37:05.866268 #9229]  INFO -- : Sending heartbeat...
[2018-02-01T18:37:05.866508 #9229] DEBUG -- : Sending heartbeat API request 8 to 10.10.10.10:9092
[2018-02-01T18:37:05.866932 #9229] DEBUG -- : Waiting for response 8 from 10.10.10.10:9092
[2018-02-01T18:37:05.867952 #9229] DEBUG -- : Received response 8 from 10.10.10.10:9092
[2018-02-01T18:37:05.868115 #9229] DEBUG -- : Fetching batch from users/0 starting at offset 0
[2018-02-01T18:37:05.868393 #9229] DEBUG -- : Sending fetch API request 9 to 10.10.10.10:9092
[2018-02-01T18:37:05.868784 #9229] DEBUG -- : Waiting for response 9 from 10.10.10.10:9092
[2018-02-01T18:37:10.872356 #9229] DEBUG -- : Received response 9 from 10.10.10.10:9092
[2018-02-01T18:37:10.872620 #9229] DEBUG -- : Fetching batch from users/0 starting at offset 0
[2018-02-01T18:37:10.872828 #9229] DEBUG -- : Sending fetch API request 10 to 10.10.10.10:9092
[2018-02-01T18:37:10.873052 #9229] DEBUG -- : Waiting for response 10 from 10.10.10.10:9092
[2018-02-01T18:37:15.879963 #9229] DEBUG -- : Received response 10 from 10.10.10.10:9092
[2018-02-01T18:37:15.880153 #9229]  INFO -- : Sending heartbeat...
[2018-02-01T18:37:15.880293 #9229] DEBUG -- : Sending heartbeat API request 11 to 10.10.10.10:9092
[2018-02-01T18:37:15.880564 #9229] DEBUG -- : Waiting for response 11 from 10.10.10.10:9092
[2018-02-01T18:37:15.881873 #9229] DEBUG -- : Received response 11 from 10.10.10.10:9092
[2018-02-01T18:37:15.882035 #9229] DEBUG -- : Fetching batch from users/0 starting at offset 0
[2018-02-01T18:37:15.882276 #9229] DEBUG -- : Sending fetch API request 12 to 10.10.10.10:9092
[2018-02-01T18:37:15.882596 #9229] DEBUG -- : Waiting for response 12 from 10.10.10.10:9092
[2018-02-01T18:37:20.885767 #9229] DEBUG -- : Received response 12 from 10.10.10.10:9092
[2018-02-01T18:37:20.885968 #9229] DEBUG -- : Fetching batch from users/0 starting at offset 0
[2018-02-01T18:37:20.886338 #9229] DEBUG -- : Sending fetch API request 13 to 10.10.10.10:9092
[2018-02-01T18:37:20.886756 #9229] DEBUG -- : Waiting for response 13 from 10.10.10.10:9092
[2018-02-01T18:37:25.891438 #9229] DEBUG -- : Received response 13 from 10.10.10.10:9092
[2018-02-01T18:37:25.891691 #9229]  INFO -- : Sending heartbeat...
[2018-02-01T18:37:25.891980 #9229] DEBUG -- : Sending heartbeat API request 14 to 10.10.10.10:9092
[2018-02-01T18:37:25.892540 #9229] DEBUG -- : Waiting for response 14 from 10.10.10.10:9092
[2018-02-01T18:37:25.893998 #9229] DEBUG -- : Received response 14 from 10.10.10.10:9092
[2018-02-01T18:37:25.894296 #9229] DEBUG -- : Fetching batch from users/0 starting at offset 0
[2018-02-01T18:37:25.894606 #9229] DEBUG -- : Sending fetch API request 15 to 10.10.10.10:9092
[2018-02-01T18:37:25.894912 #9229] DEBUG -- : Waiting for response 15 from 10.10.10.10:9092
[2018-02-01T18:37:30.900586 #9229] DEBUG -- : Received response 15 from 10.10.10.10:9092
[2018-02-01T18:37:30.902817 #9229] DEBUG -- : Fetching batch from users/0 starting at offset 0
[2018-02-01T18:37:30.903222 #9229] DEBUG -- : Sending fetch API request 16 to 10.10.10.10:9092
[2018-02-01T18:37:30.903673 #9229] DEBUG -- : Waiting for response 16 from 10.10.10.10:9092
[2018-02-01T18:37:35.906216 #9229] DEBUG -- : Received response 16 from 10.10.10.10:9092
[2018-02-01T18:37:35.906349 #9229]  INFO -- : Sending heartbeat...
[2018-02-01T18:37:35.906485 #9229] DEBUG -- : Sending heartbeat API request 17 to 10.10.10.10:9092
[2018-02-01T18:37:35.906702 #9229] DEBUG -- : Waiting for response 17 from 10.10.10.10:9092
[2018-02-01T18:37:35.908049 #9229] DEBUG -- : Received response 17 from 10.10.10.10:9092
[2018-02-01T18:37:35.908200 #9229] DEBUG -- : Fetching batch from users/0 starting at offset 0
[2018-02-01T18:37:35.908439 #9229] DEBUG -- : Sending fetch API request 18 to 10.10.10.10:9092
[2018-02-01T18:37:35.908703 #9229] DEBUG -- : Waiting for response 18 from 10.10.10.10:9092
[2018-02-01T18:37:40.910913 #9229] DEBUG -- : Received response 18 from 10.10.10.10:9092
[2018-02-01T18:37:40.911282 #9229] DEBUG -- : Fetching batch from users/0 starting at offset 0
[2018-02-01T18:37:40.911484 #9229] DEBUG -- : Sending fetch API request 19 to 10.10.10.10:9092
[2018-02-01T18:37:40.911729 #9229] DEBUG -- : Waiting for response 19 from 10.10.10.10:9092
[2018-02-01T18:37:45.914329 #9229] DEBUG -- : Received response 19 from 10.10.10.10:9092
[2018-02-01T18:37:45.914544 #9229]  INFO -- : Sending heartbeat...
[2018-02-01T18:37:45.914694 #9229] DEBUG -- : Sending heartbeat API request 20 to 10.10.10.10:9092
[2018-02-01T18:37:45.914959 #9229] DEBUG -- : Waiting for response 20 from 10.10.10.10:9092
[2018-02-01T18:37:45.916428 #9229] DEBUG -- : Received response 20 from 10.10.10.10:9092
[2018-02-01T18:37:45.916642 #9229] DEBUG -- : Fetching batch from users/0 starting at offset 0
[2018-02-01T18:37:45.916982 #9229] DEBUG -- : Sending fetch API request 21 to 10.10.10.10:9092
[2018-02-01T18:37:45.917291 #9229] DEBUG -- : Waiting for response 21 from 10.10.10.10:9092
[2018-02-01T18:37:48.876384 #9229] DEBUG -- : Received response 21 from 10.10.10.10:9092
[2018-02-01T18:37:48.882302 #9229] DEBUG -- : Marking users/0:0 as processed
[2018-02-01T18:37:48.882483 #9229]  INFO -- : Committing offsets with recommit: users/0:1
[2018-02-01T18:37:48.882635 #9229] DEBUG -- : Sending offset_commit API request 22 to 10.10.10.10:9092
[2018-02-01T18:37:48.882937 #9229] DEBUG -- : Waiting for response 22 from 10.10.10.10:9092
[2018-02-01T18:37:48.885830 #9229] DEBUG -- : Received response 22 from 10.10.10.10:9092
[2018-02-01T18:37:48.885981 #9229] DEBUG -- : Fetching batch from users/0 starting at offset 1
[2018-02-01T18:37:48.886155 #9229] DEBUG -- : Sending fetch API request 23 to 10.10.10.10:9092
[2018-02-01T18:37:48.886364 #9229] DEBUG -- : Waiting for response 23 from 10.10.10.10:9092
[2018-02-01T18:37:49.062736 #9229] DEBUG -- : Received response 23 from 10.10.10.10:9092
[2018-02-01T18:37:49.065367 #9229] DEBUG -- : Marking users/0:1 as processed
[2018-02-01T18:37:49.065489 #9229] DEBUG -- : Fetching batch from users/0 starting at offset 2
[2018-02-01T18:37:49.077563 #9229] DEBUG -- : Sending fetch API request 24 to 10.10.10.10:9092
[2018-02-01T18:37:49.077726 #9229] DEBUG -- : Waiting for response 24 from 10.10.10.10:9092
[2018-02-01T18:37:49.101840 #9229] DEBUG -- : Received response 24 from 10.10.10.10:9092
[2018-02-01T18:37:49.106902 #9229] DEBUG -- : Marking users/0:2 as processed
[2018-02-01T18:37:49.107106 #9229] DEBUG -- : Fetching batch from users/0 starting at offset 3
[2018-02-01T18:37:49.107895 #9229] DEBUG -- : Sending fetch API request 25 to 10.10.10.10:9092
[2018-02-01T18:37:49.108265 #9229] DEBUG -- : Waiting for response 25 from 10.10.10.10:9092
[2018-02-01T18:37:49.117768 #9229] DEBUG -- : Received response 25 from 10.10.10.10:9092
[2018-02-01T18:37:49.122265 #9229] DEBUG -- : Marking users/0:6 as processed
[2018-02-01T18:37:49.127335 #9229] DEBUG -- : Marking users/0:7 as processed
[2018-02-01T18:37:49.130767 #9229] DEBUG -- : Marking users/0:8 as processed
[2018-02-01T18:37:49.134990 #9229] DEBUG -- : Marking users/0:9 as processed
[2018-02-01T18:37:49.139811 #9229] DEBUG -- : Marking users/0:8 as processed
[2018-02-01T18:37:49.143620 #9229] DEBUG -- : Marking users/0:9 as processed
[2018-02-01T18:37:49.148249 #9229] DEBUG -- : Marking users/0:12 as processed
[2018-02-01T18:37:49.151859 #9229] DEBUG -- : Marking users/0:13 as processed
[2018-02-01T18:37:49.155126 #9229] DEBUG -- : Marking users/0:14 as processed
[2018-02-01T18:37:49.158733 #9229] DEBUG -- : Marking users/0:15 as processed
[2018-02-01T18:37:49.162549 #9229] DEBUG -- : Marking users/0:14 as processed
[2018-02-01T18:37:49.165250 #9229] DEBUG -- : Marking users/0:15 as processed
[2018-02-01T18:37:49.170275 #9229] DEBUG -- : Marking users/0:44 as processed
[2018-02-01T18:37:49.172494 #9229] DEBUG -- : Marking users/0:45 as processed
[2018-02-01T18:37:49.175404 #9229] DEBUG -- : Marking users/0:46 as processed
[2018-02-01T18:37:49.179739 #9229] DEBUG -- : Marking users/0:47 as processed
[2018-02-01T18:37:49.184462 #9229] DEBUG -- : Marking users/0:48 as processed
[2018-02-01T18:37:49.188032 #9229] DEBUG -- : Marking users/0:49 as processed
[2018-02-01T18:37:49.191116 #9229] DEBUG -- : Marking users/0:50 as processed
[2018-02-01T18:37:49.194450 #9229] DEBUG -- : Marking users/0:51 as processed
[2018-02-01T18:37:49.197938 #9229] DEBUG -- : Marking users/0:52 as processed
[2018-02-01T18:37:49.201562 #9229] DEBUG -- : Marking users/0:53 as processed
[2018-02-01T18:37:49.205061 #9229] DEBUG -- : Marking users/0:54 as processed
[2018-02-01T18:37:49.207701 #9229] DEBUG -- : Marking users/0:55 as processed
[2018-02-01T18:37:49.210568 #9229] DEBUG -- : Marking users/0:56 as processed
[2018-02-01T18:37:49.212780 #9229] DEBUG -- : Marking users/0:57 as processed
[2018-02-01T18:37:49.216208 #9229] DEBUG -- : Marking users/0:58 as processed
[2018-02-01T18:37:49.218950 #9229] DEBUG -- : Marking users/0:59 as processed
[2018-02-01T18:37:49.220674 #9229] DEBUG -- : Marking users/0:60 as processed
[2018-02-01T18:37:49.223278 #9229] DEBUG -- : Marking users/0:61 as processed
[2018-02-01T18:37:49.227621 #9229] DEBUG -- : Marking users/0:62 as processed
[2018-02-01T18:37:49.229517 #9229] DEBUG -- : Marking users/0:63 as processed
[2018-02-01T18:37:49.232582 #9229] DEBUG -- : Marking users/0:64 as processed
[2018-02-01T18:37:49.235681 #9229] DEBUG -- : Marking users/0:65 as processed
[2018-02-01T18:37:49.240375 #9229] DEBUG -- : Marking users/0:66 as processed
[2018-02-01T18:37:49.246845 #9229] DEBUG -- : Marking users/0:67 as processed
[2018-02-01T18:37:49.251738 #9229] DEBUG -- : Marking users/0:68 as processed
[2018-02-01T18:37:49.254770 #9229] DEBUG -- : Marking users/0:69 as processed
[2018-02-01T18:37:49.259062 #9229] DEBUG -- : Marking users/0:70 as processed
[2018-02-01T18:37:49.263681 #9229] DEBUG -- : Marking users/0:71 as processed
[2018-02-01T18:37:49.265919 #9229] DEBUG -- : Marking users/0:72 as processed
[2018-02-01T18:37:49.271436 #9229] DEBUG -- : Marking users/0:73 as processed
[2018-02-01T18:37:49.282440 #9229] DEBUG -- : Marking users/0:46 as processed
[2018-02-01T18:37:49.288774 #9229] DEBUG -- : Marking users/0:47 as processed
[2018-02-01T18:37:49.289384 #9229] DEBUG -- : Fetching batch from users/0 starting at offset 48
[2018-02-01T18:37:49.289677 #9229] DEBUG -- : Sending fetch API request 26 to 10.10.10.10:9092
[2018-02-01T18:37:49.324238 #9229] DEBUG -- : Waiting for response 26 from 10.10.10.10:9092
[2018-02-01T18:37:49.410467 #9229] DEBUG -- : Received response 26 from 10.10.10.10:9092
[2018-02-01T18:37:49.417028 #9229] DEBUG -- : Marking users/0:69 as processed
[2018-02-01T18:37:49.427439 #9229] DEBUG -- : Marking users/0:70 as processed
[2018-02-01T18:37:49.429819 #9229] DEBUG -- : Marking users/0:71 as processed

Consumer:


class TestConsumer < Racecar::Consumer
  subscribes_to 'users', start_from_beginning: false

  def process(message)
    # Increment a counter of processed messages stored in the Rails cache;
    # a missing key counts as 0.
    count = Rails.cache.read("count") || 0
    Rails.cache.write("count", count + 1)
  end
end

Versions:

  • Racecar: 0.3.5
  • Ruby Kafka: 0.5.1
  • Ruby: 2.3.1
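The counting described in this issue (total "Marking ... as processed" lines versus unique offsets) can be reproduced with a short script. This is a sketch that assumes the log line format shown above; offset_stats is a hypothetical helper, and the script only reports duplicates. It cannot tell whether they come from a logging bug or from messages genuinely being processed twice. It avoids newer Ruby methods so it also runs on Ruby 2.3.

```ruby
# Sketch: count "Marking <topic>/<partition>:<offset> as processed" lines in
# a ruby-kafka debug log and report which offsets appear more than once.
MARKING = %r{Marking (\S+)/(\d+):(\d+) as processed}

def offset_stats(lines)
  counts = Hash.new(0)
  lines.each do |line|
    match = MARKING.match(line)
    next unless match
    counts[[match[1], Integer(match[2]), Integer(match[3])]] += 1
  end

  {
    total: counts.values.reduce(0, :+),                  # all "Marking" lines
    unique: counts.size,                                 # distinct topic/partition/offset
    duplicated: counts.select { |_, n| n > 1 }.keys.sort # offsets seen more than once
  }
end

# Usage: ruby offset_stats.rb path/to/consumer.log
if __FILE__ == $PROGRAM_NAME && ARGV[0]
  stats = offset_stats(File.foreach(ARGV[0]))
  puts "total=#{stats[:total]} unique=#{stats[:unique]}"
  puts "duplicated: #{stats[:duplicated].map { |t, p, o| "#{t}/#{p}:#{o}" }.join(', ')}"
end
```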

Middleware

Thanks for making racecar! I'm wondering whether the team would be open to a PR that adds middleware to racecar. I'm finding that the consumers we write share a lot of initial code, and while a prepended module is fine for now, I'd prefer an ordered middleware chain similar to how Sidekiq does it.

I'm open to any design ideas you might have but also happy to propose something like:

Racecar.configure do |config|
  config.consumer_middleware do |chain|
    chain.add(MyConsumerMiddleware)
  end
end

where a middleware just needs to respond to one method:

class MyConsumerMiddleware
  # Wrap message processing in a log tag named after the consumer class.
  def call(consumer, message)
    Rails.logger.tagged(consumer.class.name) do
      yield
    end
  end
end
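A minimal plain-Ruby sketch of how such an ordered chain could run its middleware around message processing. MiddlewareChain, add and invoke are hypothetical names for illustration only, not part of racecar's API; RecordingMiddleware is a toy middleware that records the call order. Middleware instances are created per invocation, roughly as Sidekiq does it.

```ruby
# Hypothetical ordered middleware chain (not a racecar API).
class MiddlewareChain
  def initialize
    @entries = []
  end

  # Register a middleware class plus constructor arguments; returns self.
  def add(klass, *args)
    @entries << [klass, args]
    self
  end

  # Call each middleware in registration order; the innermost yield runs
  # the block passed to invoke (e.g. the consumer's process method).
  def invoke(consumer, message, &final)
    chain = @entries.map { |klass, args| klass.new(*args) }
    traverse = lambda do
      if chain.empty?
        final.call
      else
        chain.shift.call(consumer, message, &traverse)
      end
    end
    traverse.call
  end
end

# Toy middleware that logs before and after the rest of the chain.
class RecordingMiddleware
  def initialize(name, log)
    @name = name
    @log = log
  end

  def call(consumer, message)
    @log << "#{@name}:before"
    yield
    @log << "#{@name}:after"
  end
end

log = []
chain = MiddlewareChain.new
chain.add(RecordingMiddleware, "outer", log).add(RecordingMiddleware, "inner", log)
chain.invoke(:consumer, :message) { log << "process" }
# log → ["outer:before", "inner:before", "process", "inner:after", "outer:after"]
```

The first middleware added is the outermost wrapper, so cross-cutting concerns like logging tags or error reporting would typically be registered first.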

ServiceDiscovery.find ... is this another pretend class?

I'm sorry if this has already been addressed. I started writing this issue before checking the closed issues. I'm trying to check out the current racecar gem in IRB using the code snippets in the README.md file.

The prospect of ServiceDiscovery.find functionality is very appealing to me. I was disappointed when your config example didn't work because no such class exists.

Exception: Unsupported compression type

Got this when restarting Kafka in local dev:

Crashed: Rdkafka::RdkafkaError: Unsupported compression type (unsupported_compression_type)
/bundle/gems/rdkafka-0.6.0/lib/rdkafka/consumer.rb:355:in `poll'
/bundle/gems/racecar-2.0.0.alpha1/lib/racecar/consumer_set.rb:16:in `poll'
/bundle/gems/racecar-2.0.0.alpha1/lib/racecar/runner.rb:69:in `block (2 levels) in run'
/bundle/gems/activesupport-5.1.6.2/lib/active_support/notifications.rb:168:in `instrument'
/bundle/gems/racecar-2.0.0.alpha1/lib/racecar/runner.rb:61:in `block in run'
/bundle/gems/racecar-2.0.0.alpha1/lib/racecar/runner.rb:58:in `loop'
/bundle/gems/racecar-2.0.0.alpha1/lib/racecar/runner.rb:58:in `run'
/bundle/gems/racecar-2.0.0.alpha1/lib/racecar.rb:50:in `run'
/bundle/gems/racecar-2.0.0.alpha1/lib/racecar/cli.rb:63:in `run'
/bundle/gems/racecar-2.0.0.alpha1/lib/racecar/cli.rb:10:in `main'
/bundle/gems/racecar-2.0.0.alpha1/exe/racecar:7:in `<top (required)>'
bin/racecar:17:in `load'
bin/racecar:17:in `<main>'

/cc @JHK @breunigs

SSL Failures before Error Handler

I saw issue #32. Ours is similar, but in our case the SSL certificates failed with:

Crashed: OpenSSL::SSL::SSLError: SSL_connect returned=1 errno=0 state=error: unexpected message

We have an error handler set up, but because we never got into the process method, the error was never sent to Sentry. We only noticed because K8s kept restarting the container and we had a policy set up to watch for excessive restart counts.

Is there any way to set up the error handler before the first connection is made? I noticed that our ActiveSupport::Notifications subscriber saw the event, but Racecar's error handler did not.

Consumer thread crashed

I have 3 different environments. In 2 out of 3, starting a racecar consumer works fine. In the third, I get the following error:

racecar UserEventConsumer
=> Starting Racecar consumer UserEventConsumer...
=> Detected Rails, booting application...
=> Wrooooom!
=> Ctrl-C to shutdown consumer
I, [2017-11-15T13:25:23.172617 #68337]  INFO -- : New topics added to target list: playground_2
I, [2017-11-15T13:25:23.172819 #68337]  INFO -- : Fetching cluster metadata from kafka://development-magicbus1.cloudith.com:9092
D, [2017-11-15T13:25:23.173106 #68337] DEBUG -- : Opening connection to development-magicbus1.cloudith.com:9092 with client id tms...
D, [2017-11-15T13:25:23.263129 #68337] DEBUG -- : Sending request 1 to development-magicbus1.cloudith.com:9092
D, [2017-11-15T13:25:23.263456 #68337] DEBUG -- : Waiting for response 1 from development-magicbus1.cloudith.com:9092
D, [2017-11-15T13:25:23.298736 #68337] DEBUG -- : Received response 1 from development-magicbus1.cloudith.com:9092
I, [2017-11-15T13:25:23.299062 #68337]  INFO -- : Discovered cluster metadata; nodes: development-magicbus2.cloudith.com:9092 (node_id=2), development-magicbus5.cloudith.com:9092 (node_id=5), development-magicbus4.cloudith.com:9092 (node_id=4), development-magicbus1.cloudith.com:9092 (node_id=1), development-magicbus3.cloudith.com:9092 (node_id=3)
D, [2017-11-15T13:25:23.299143 #68337] DEBUG -- : Closing socket to development-magicbus1.cloudith.com:9092
I, [2017-11-15T13:25:23.299296 #68337]  INFO -- : Joining group `test`
D, [2017-11-15T13:25:23.299390 #68337] DEBUG -- : Getting group coordinator for `test`
D, [2017-11-15T13:25:23.299592 #68337] DEBUG -- : Opening connection to development-magicbus2.cloudith.com:9092 with client id tms...
D, [2017-11-15T13:25:23.377238 #68337] DEBUG -- : Sending request 1 to development-magicbus2.cloudith.com:9092
D, [2017-11-15T13:25:23.377368 #68337] DEBUG -- : Waiting for response 1 from development-magicbus2.cloudith.com:9092
D, [2017-11-15T13:25:23.413285 #68337] DEBUG -- : Received response 1 from development-magicbus2.cloudith.com:9092
I, [2017-11-15T13:25:23.413531 #68337]  INFO -- : Leaving group `test`
D, [2017-11-15T13:25:23.413575 #68337] DEBUG -- : Getting group coordinator for `test`
D, [2017-11-15T13:25:23.413716 #68337] DEBUG -- : Sending request 2 to development-magicbus2.cloudith.com:9092
D, [2017-11-15T13:25:23.413874 #68337] DEBUG -- : Waiting for response 2 from development-magicbus2.cloudith.com:9092
D, [2017-11-15T13:25:23.458023 #68337] DEBUG -- : Received response 2 from development-magicbus2.cloudith.com:9092
E, [2017-11-15T13:25:23.458406 #68337] ERROR -- : Consumer thread crashed: Kafka::GroupAuthorizationCode: Kafka::GroupAuthorizationCode
.rvm/gems/ruby-2.3.1@tms/gems/ruby-kafka-0.4.1/lib/kafka/protocol.rb:96:in `handle_error'
.rvm/gems/ruby-2.3.1@tms/gems/ruby-kafka-0.4.1/lib/kafka/cluster.rb:93:in `block in get_group_coordinator'
.rvm/gems/ruby-2.3.1@tms/gems/ruby-kafka-0.4.1/lib/kafka/cluster.rb:88:in `each'
.rvm/gems/ruby-2.3.1@tms/gems/ruby-kafka-0.4.1/lib/kafka/cluster.rb:88:in `get_group_coordinator'
.rvm/gems/ruby-2.3.1@tms/gems/ruby-kafka-0.4.1/lib/kafka/consumer_group.rb:167:in `coordinator'
.rvm/gems/ruby-2.3.1@tms/gems/ruby-kafka-0.4.1/lib/kafka/consumer_group.rb:111:in `join_group'
.rvm/gems/ruby-2.3.1@tms/gems/ruby-kafka-0.4.1/lib/kafka/consumer_group.rb:40:in `join'
.rvm/gems/ruby-2.3.1@tms/gems/ruby-kafka-0.4.1/lib/kafka/consumer.rb:357:in `join_group'
.rvm/gems/ruby-2.3.1@tms/gems/ruby-kafka-0.4.1/lib/kafka/consumer.rb:374:in `fetch_batches'
.rvm/gems/ruby-2.3.1@tms/gems/ruby-kafka-0.4.1/lib/kafka/consumer.rb:183:in `block in each_message'
.rvm/gems/ruby-2.3.1@tms/gems/ruby-kafka-0.4.1/lib/kafka/consumer.rb:319:in `consumer_loop'
.rvm/gems/ruby-2.3.1@tms/gems/ruby-kafka-0.4.1/lib/kafka/consumer.rb:182:in `each_message'
.rvm/gems/ruby-2.3.1@tms/bundler/gems/racecar-675f42e46cee/lib/racecar/runner.rb:54:in `run'
.rvm/gems/ruby-2.3.1@tms/bundler/gems/racecar-675f42e46cee/lib/racecar.rb:27:in `run'
.rvm/gems/ruby-2.3.1@tms/bundler/gems/racecar-675f42e46cee/lib/racecar/cli.rb:52:in `main'
.rvm/gems/ruby-2.3.1@tms/bundler/gems/racecar-675f42e46cee/exe/racecar:9:in `<top (required)>'
.rvm/gems/ruby-2.3.1@tms/bin/racecar:23:in `load'
.rvm/gems/ruby-2.3.1@tms/bin/racecar:23:in `<main>'
.rvm/gems/ruby-2.3.1@tms/bin/ruby_executable_hooks:15:in `eval'
.rvm/gems/ruby-2.3.1@tms/bin/ruby_executable_hooks:15:in `<main>'
=> Crashed: Kafka::GroupAuthorizationCode

The only difference I have found is that in the two environments where the consumer starts successfully, I do not see this in the output:

Leaving group test

Versions of Kafka:

  1. 10.0.1
  2. 10.2.1
  3. 11.0.0

10.2.1 is the env with the failure.

I have tried changing session_timeout, but it made no difference.

Why is producing from consumers alpha?

1 - What problems have been seen producing from consumers?
2 - Would using DeliveryBoy from a consumer mitigate any issues with the built-in producer?

Kubernetes Readiness/Liveness Probes

I'm in the midst of rolling out some racecar workers in our Kubernetes cluster, and I'm interested in any recommendations for implementing liveness and readiness probes for K8s deployments. Readiness seems relatively easy to achieve: one could drop a file in a known location on startup and have the probe check that the file exists (this might not even be necessary, since our k8s deploys use the Recreate strategy).

I'd love some advice on implementing liveness, though. Today a container got stuck in a loop trying to connect. It was resolved quickly once the container was killed and a new one came up, but we had to intervene manually.
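One possible approach to liveness, sketched under assumptions: the consumer process refreshes a heartbeat file regularly, and the liveness probe fails once the file is older than some threshold, so a consumer stuck in a connect loop stops refreshing it and gets restarted. LivenessFile, the file name and where touch! is called are hypothetical; the key caveat is that the file must also be refreshed when no messages arrive, otherwise an idle but healthy consumer would be killed.

```ruby
require "fileutils"
require "tmpdir"

# Hypothetical file-based heartbeat for a Kubernetes liveness probe.
# The consumer refreshes the file; the probe checks how old it is.
class LivenessFile
  def initialize(path)
    @path = path
  end

  # Refresh the file's mtime. Call this regularly from the consumer loop,
  # including iterations where no messages were received.
  def touch!
    FileUtils.touch(@path)
  end

  # True if the file exists and was refreshed within the last max_age seconds.
  def fresh?(max_age, now: Time.now)
    File.exist?(@path) && (now - File.mtime(@path)) <= max_age
  end
end

# Usage example with a temporary directory standing in for e.g. /tmp.
results = Dir.mktmpdir do |dir|
  heartbeat = LivenessFile.new(File.join(dir, "racecar-alive"))
  before = heartbeat.fresh?(30)
  heartbeat.touch!
  [before, heartbeat.fresh?(30), heartbeat.fresh?(30, now: Time.now + 60)]
end
# results → [false, true, false]
```

The probe side could then be an exec probe running a short script that calls fresh? and exits non-zero when it returns false; the threshold should comfortably exceed the longest expected gap between refreshes.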

Support for producing Kafka messages

A lot of our use cases are centered around transforming streams of Kafka messages and writing them back to another topic, typically enriched with data from our Rails app. Currently, Racecar provides no direct support for producing messages. Adding direct support would allow the delivery of messages to be optimized.

Update ruby-kafka to v5

Hi,

Would you consider upgrading to ruby-kafka v5? It would also be great because of its SASL SCRAM support!

Thanks,
Jules

Exponential backoff pauses

Currently, configuring pause_timeout involves a trade-off: a short pause minimizes delay when a long pause isn't necessary, while a long pause avoids repeated attempts that are harmful, e.g. when a downstream service needs to recover before it can handle requests again.

Typically, this is balanced using exponential backoff. The first retry is done shortly after the failure, but for each subsequent failure the pause between retries increases exponentially.

Another typical improvement is to add a random jitter to the pause in order to break any synchronization between clients that would otherwise all retry at the same time and follow the same backoff pattern.
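A sketch of exponential backoff with jitter as described above. backoff_pause and its parameters (base, max, jitter, rng) are hypothetical names, with base playing the role of pause_timeout. The jitter used here shortens each pause by a random fraction of up to jitter; other schemes, such as "full jitter" (a uniform random pause between 0 and the computed value), are also common.

```ruby
# Hypothetical helper: pause length in seconds before retry number `attempt`
# (1-based). The pause doubles on each attempt and is capped at `max`.
# With jitter > 0 the pause is shortened by a random fraction in [0, jitter).
def backoff_pause(attempt, base:, max:, jitter: 0.0, rng: Random.new)
  raise ArgumentError, "attempt must be >= 1" if attempt < 1
  raise ArgumentError, "jitter must be in [0, 1]" unless (0.0..1.0).cover?(jitter)

  raw = [base * 2**(attempt - 1), max].min
  raw * (1 - jitter * rng.rand)
end

# Without jitter: 1, 2, 4, 8, 16, then capped at 30.
plain = (1..7).map { |attempt| backoff_pause(attempt, base: 1, max: 30) }

# With 20% jitter, each pause lies between 0.8 * raw and raw, so consumers
# that failed at the same moment drift apart instead of retrying in lockstep.
rng = Random.new(42)
jittered = (1..7).map { |attempt| backoff_pause(attempt, base: 1, max: 30, jitter: 0.2, rng: rng) }
```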
