
jackdaw's People

Contributors

alexvpopov, andreacrotti, andrewkeedle, apanourgiasfc, apmaros, aprobus, arrdem, bobby, cddr, chrisdevo, danburton, davewm, funderbar, gphilipp, jbropho, kidpollo, lamp, louiseklodt, mamolli, marcoccchan, mattford63, matthias-margush, minimal, morganastra, noisesmith, prakashkillada-fc, rads, sbrauer, sgerrand, xiongtx

jackdaw's Issues

Occasional build failures due to `gen/any-printable` producing ##NaN sometimes in the EDN test

Occasional build failures occur because gen/any-printable sometimes produces ##NaN in the EDN round-trip test. This comes from the double generator used by any-printable. Since ##NaN is never equal to itself, the round-trip equality check can fail even when serialization is correct.

(prop/for-all [x gen/any-printable]

Spotted by @r0man

Example failing build: https://circleci.com/gh/FundingCircle/jackdaw/65?utm_campaign=workflow-failed&utm_medium=email&utm_source=notification
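
One possible fix, assuming the project can move to test.check 0.10+: switch the property to gen/any-printable-equatable, which never generates values (like ##NaN) that are unequal to themselves:

(require '[clojure.edn :as edn]
         '[clojure.test.check.generators :as gen]
         '[clojure.test.check.properties :as prop])

;; any-printable-equatable excludes ##NaN, so a round-trip equality
;; property cannot fail spuriously
(prop/for-all [x gen/any-printable-equatable]
  (= x (edn/read-string (pr-str x))))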

Race-condition in rest-proxy transport

Astute followers may have noticed some flakiness in the rest-proxy transport test. On closer inspection, this does indeed seem like a bug, although I'm not quite sure yet how it's happening. When the transport is created, it is supposed to wait until the consumer has performed at least one poll before it "starts" the producer and allows any messages to be submitted by the test machine. This is intended to ensure that the journal captures all output initiated by the current test.

https://github.com/FundingCircle/jackdaw/blob/master/src/jackdaw/test/transports/rest_proxy.clj#L289

However, careful inspection of the log for a failing run reveals that when the test intermittently fails, the producer is indeed being "started" before the consumer has polled.

Schema registry basic auth issues

I'm not sure if it's how jackdaw creates its default client or not, but I couldn't find a way to set (or get it to pick up) the piece of config that tells it to extract the basic auth creds from the URL. A simple workaround is to create the client yourself:

(import '(io.confluent.kafka.schemaregistry.client CachedSchemaRegistryClient)
        '(io.confluent.kafka.schemaregistry.client.rest RestService))

(defn cached-schema-registry-cli []
  (let [rest-service (RestService. "https://user:[email protected]:13584")]
    (CachedSchemaRegistryClient. rest-service
                                 10
                                 (doto (java.util.HashMap.)
                                   (.put "basic.auth.credentials.source" "URL")))))

(def serde-resolver
  (partial resolver/serde-resolver :schema-registry-url "" :schema-registry-client (cached-schema-registry-cli)))

Minor points, but it would also be nice not to require a URL for the resolver; also, the topic-metadata map insists on a default schema even when the schema comes from the registry.

Thanks.

Review the return types used for operations in the admin ns

Many of the operations in the admin namespace block on some upstream result before transforming it into something a bit more idiomatic for Clojure programs. If the operation is unable to complete (e.g. it cannot connect to the configured broker), it will never return, potentially blocking your REPL.

Returning a manifold deferred instead allows jackdaw to asynchronously perform the same transformation but instead of returning the transformed object after blocking, immediately return a deferred that eventually resolves to the transformed result.

In addition, functions like create-topics! currently return a boolean indicating whether all operations completed. The underlying Kafka operations allow us to infer which operations failed and provide a representation of this to the caller.
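
For illustration, a deferred-returning operation might be built on a helper like this (a minimal sketch assuming manifold; kafka-future->deferred is a hypothetical helper, not existing jackdaw API):

(require '[manifold.deferred :as d])

(defn kafka-future->deferred
  "Adapts a KafkaFuture into a manifold deferred that resolves to
  (f result) without blocking the calling thread."
  [^org.apache.kafka.common.KafkaFuture kfuture f]
  (let [result (d/deferred)]
    (.whenComplete kfuture
                   (reify org.apache.kafka.common.KafkaFuture$BiConsumer
                     (accept [_ value error]
                       (if error
                         (d/error! result error)
                         (d/success! result (f value))))))
    result))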

Current jackdaw.admin

| Operation | Blocking Operation | Returns |
|---|---|---|
| list-topics | ListTopicsResult.names | [Str] |
| topic-exists? | ListTopicsResult.names | boolean |
| retry-exists? | specified by caller | boolean |
| create-topics! | CreateTopicsResult.all | boolean |
| describe-topics | DescribeTopicsResult.all | {Str topic-metadata?} |
| describe-topics-configs | DescribeConfigsResult.all | {Str topic-config?} |
| topics-ready? | DescribeTopicsResult.all | boolean |
| alter-topic-config! | AlterConfigsResult.all | boolean |
| delete-topics! | DeleteTopicsResult.all | boolean |
| partition-ids-of-topics | DescribeTopicsResult.all | {Str partition-info?} |
| describe-cluster | DescribeClusterResult.all | cluster-result? |
| get-broker-config | DescribeConfigsResult.all | config-result? |

Proposed jackdaw.admin

| Operation | Blocking Operation | Deferred? | Returns | Comments |
|---|---|---|---|---|
| list-topics | ListTopicsResult.names | Yes | [Str] | |
| topic-exists? | ListTopicsResult.names | Yes | boolean | |
| retry-exists? | specified by caller | No | boolean | Unchanged |
| create-topics! | CreateTopicsResult.all | Yes | {Str boolean} | boolean -> {Str boolean} |
| describe-topics | DescribeTopicsResult.all | Yes | {Str topic-metadata?} | |
| describe-topics-configs | DescribeConfigsResult.all | Yes | {Str topic-config?} | |
| topics-ready? | DescribeTopicsResult.all | No | boolean | Unchanged |
| alter-topic-config! | AlterConfigsResult.all | Yes | {Str boolean} | boolean -> {Str boolean} |
| delete-topics! | DeleteTopicsResult.all | Yes | {Str boolean} | boolean -> {Str boolean} |
| partition-ids-of-topics | DescribeTopicsResult.all | Yes | {Str partition-info?} | |
| describe-cluster | DescribeClusterResult.all | Yes | cluster-result? | |
| get-broker-config | DescribeConfigsResult.all | Yes | config-result? | |

Cannot create record with ConsumerRecordFactory when the key is of type String

When trying to test my topology I stumbled upon a bug. I'm trying to test a topology that groups a stream in time windows. When the topic's key is a String, the wrong ConsumerRecordFactory method is used when an explicit time-ms is given.

This happens in jackdaw.streams.mock/producer: (.pipeInput test-driver (.create record-factory k v time-ms)). The intention is to use public ConsumerRecord<byte[], byte[]> create(final K key, final V value, final long timestampMs) at line 227, but it instead resolves to public ConsumerRecord<byte[], byte[]> create(final String topicName, final K key, final V value) at line 265.
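
A possible workaround (a sketch, not necessarily the eventual fix): hint the arguments so reflection resolves the (K, V, long) overload even when the key is a String:

;; With an untyped call the reflector can match
;; create(String topicName, K key, V value) when k is a String.
;; Hinting forces create(Object, Object, long) instead:
(.pipeInput test-driver
            (.create record-factory ^Object k ^Object v (long time-ms)))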

I was able to create another test case for streams-test that reproduces the issue:

  (testing "windowed-by-time with string keys"
    (let [topic-a (assoc (mock/topic "topic-a") :key-serde (Serdes/String))
          topic-b (assoc (mock/topic "topic-b") :key-serde (Serdes/String))
          driver (mock/build-driver (fn [builder]
                                      (-> builder
                                          (k/kstream topic-a)
                                          (k/group-by-key)
                                          (k/window-by-time (TimeWindows/of 1000))
                                          (k/reduce + topic-a)
                                          (k/to-kstream)
                                          (k/map (fn [[k v]] [(.key k) v]))
                                          (k/to topic-b))))
          publish (partial mock/publish driver topic-a)]

      (publish 1000 "a" 1)
      (publish 1500 "a" 2)
      (publish 5000 "a" 4)

      (let [keyvals (mock/get-keyvals driver topic-b)]
        (is (= 3 (count keyvals)))
        (is (= [0 1] (first keyvals)))
        (is (= [0 3] (second keyvals)))
        (is (= [0 4] (nth keyvals 2))))))

Support for naming windowed join topics and other internal topics

The current stable version of Kafka Streams (2.3) supports naming some internal topics per the following KIPs:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-372%3A+Naming+Repartition+Topics+for+Joins+and+Grouping
https://cwiki.apache.org/confluence/display/KAFKA/KIP+230%3A+Name+Windowing+Joins

It seems that it is possible (in 2.3) to name some internal topics now but not windowed join topics. We actually discovered that in trunk it is implemented but not released yet. https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java#L1109

Being able to name joins is important for long-lived JOIN windows, so that changes in the topology don't change the internal topic name and discard the join history upon topology restart.

In the meantime we are getting by with a custom build of Kafka 2.2 that supports named joins.
https://github.com/FundingCircle/kafka/pull/5/files#diff-5142e1d4a6410459d6bf6df98828e5afR920-R921

And a patch to join impl functions:

(defn join-windowed
  "Combines the values of two streams that share the same key using a windowed
  inner join. Adds the `join-name` parameter, which is used to name the internal
  storage topics. Requires patched version of the kafka streams jar."
  [this-kstream other-kstream value-joiner-fn windows
   {key-serde :key-serde this-value-serde :value-serde}
   {other-value-serde :value-serde}
   join-name]
  (clj-kstream
   (.join (kstream* this-kstream)
          (kstream* other-kstream)
          (value-joiner value-joiner-fn)
          windows
          (Joined/with key-serde this-value-serde other-value-serde join-name))))

(defn left-join-windowed
  "Combines the values of two streams that share the same key using a windowed
  left join. Adds the `join-name` parameter, which is used to name the internal
  storage topics. Requires patched version of the kafka streams jar."
  [this-kstream other-kstream value-joiner-fn windows
   {key-serde :key-serde this-value-serde :value-serde}
   {other-value-serde :value-serde}
   join-name]
  (clj-kstream
   (.leftJoin (kstream* this-kstream)
              (kstream* other-kstream)
              (value-joiner value-joiner-fn)
              windows
              (Joined/with key-serde this-value-serde other-value-serde join-name))))

Supporting the naming of windowed joins seems critical, as explained above, but support for custom naming of other internal topics should also be looked at.

Expose "reader schema" feature

Avro has a very interesting ability to use a different schema for reading a message than the one used to write it. Both schemas must be made available to the reader (this is the raison d'être for the schema registry), but as long as the schemas are compatible, it means that consumers and producers are not forced to share their schemas via some centralized github repo.

At the moment jackdaw doesn't support this feature. Even though it requires a :value-serde in many consuming contexts, it does not actually use it to deserialize the message.

It should be possible to add a :reader-schema key to the avro serde multi-method, which would allow users to take advantage of this feature without breaking any existing uses.
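
For reference, the underlying Avro mechanism this would expose looks like the following (a plain-Avro sketch, independent of jackdaw; the schema file names are hypothetical):

(import '(org.apache.avro Schema$Parser)
        '(org.apache.avro.generic GenericDatumReader))

(let [writer-schema (.parse (Schema$Parser.) (slurp "writer.avsc"))
      reader-schema (.parse (Schema$Parser.) (slurp "reader.avsc"))]
  ;; the two-argument constructor resolves records written with
  ;; writer-schema into the shape described by reader-schema
  (GenericDatumReader. writer-schema reader-schema))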

describe-topics-configs throws ClassCastException

(ja/describe-topics-configs client [{:topic-name "connect-offsets"}]) throws a ClassCastException when invoked. I think the fix is relatively simple: just add `topics` to (-> client (.describeConfigs (map #(-> % :topic-name jd/->topic-resource) topics)) .all deref vals first jd/datafy)

Using Schema Registry with basic auth

Does jackdaw provide a convenient way to use schema registry with basic auth? I think it's a good feature, considering it's how Confluent Cloud secures its Schema Registry instances.

After lots of source reading I was able to produce messages by instantiating a custom CachedSchemaRegistryClient object and passing it as :schema-registry-client, for instance:

(def rs (RestService. "https://xxxxx.confluent.cloud"))

(def cli
  (CachedSchemaRegistryClient. rs
                               200
                               (select-keys (basic-config)
                                            ["basic.auth.credentials.source"
                                             "schema.registry.basic.auth.user.info"])))

(def serde-resolver
  (partial resolver/serde-resolver
           :schema-registry-url (get (basic-config) "schema.registry.url")
           :schema-registry-client cli))

Deploy Fails for Pushed Tag

$ #!/bin/bash -eo pipefail
lein deploy clojars

No credentials found for clojars
See `lein help deploying` for how to configure credentials to avoid prompts.
Username: Password: 
Compiling jackdaw.serdes.fn-impl
Created /home/circleci/jackdaw/target/jackdaw-0.6.0.jar
Wrote /home/circleci/jackdaw/pom.xml
Need to sign 2 files with GPG
[1/2] Signing /home/circleci/jackdaw/target/jackdaw-0.6.0.jar with GPG
gpg: directory '/home/circleci/.gnupg' created
gpg: keybox '/home/circleci/.gnupg/pubring.kbx' created
gpg: no default secret key: No secret key
gpg: signing failed: No secret key
Could not sign /home/circleci/jackdaw/target/jackdaw-0.6.0.jar
gpg: directory '/home/circleci/.gnupg' created
gpg: keybox '/home/circleci/.gnupg/pubring.kbx' created
gpg: no default secret key: No secret key
gpg: signing failed: No secret key


See `lein help gpg` for how to set up gpg.
If you don't expect people to need to verify the authorship of your jar, you
can add `:sign-releases false` to the relevant `:deploy-repositories` entry.
Exited with code 1

Windowed KStream missing some reduce

Hello and thx for the great lib.

I have a window-by-time aggregation that works with reduce when it only has an adder. But I also need a subtractor, and that fails with:

java.lang.AbstractMethodError: jackdaw.streams.interop.reduce

I believe the reduce signature for this case is missing from CljTimeWindowedKStream.

Thx

Use of datafy forces a Clojure 1.10 dependency

With Clojure 1.9 or lower, this causes a problem:

Caused by: java.lang.IllegalArgumentException: Unbound: #'clojure.core.protocols/Datafiable is not a protocol
 at clojure.core$extend.invokeStatic(core_deftype.clj:784)
 at clojure.core$extend.doInvoke(core_deftype.clj:746)
 at clojure.lang.RestFn.invoke(RestFn.java:439)
 at jackdaw.data__init.load(Unknown Source)
 at jackdaw.data__init.<clinit>(Unknown Source)

The code in and around the following needs to be reworked:

(require '[clojure.core.protocols :refer [Datafiable]])
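
One possible approach (a sketch, assuming the Datafiable extension can be made optional): resolve the protocol at load time and skip the extension when it doesn't exist, so the namespace still loads on Clojure 1.9:

(require 'clojure.core.protocols)

;; On Clojure < 1.10, clojure.core.protocols/Datafiable doesn't exist and
;; resolve returns nil, so the extension is skipped instead of throwing.
(when-let [datafiable (resolve 'clojure.core.protocols/Datafiable)]
  (extend org.apache.kafka.clients.producer.RecordMetadata
    @datafiable
    {:datafy (fn [record-metadata]
               ;; placeholder body; jackdaw.data defines the real mapping
               record-metadata)}))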

Remove dependencies on private infrastructure

At current, the CircleCI configuration uses the org-global context. This brings private configuration data into scope. As a matter of policy, open source projects must not make use of the same private infrastructure and secrets as private projects.

jackdaw.client.send! delays the entire operation, rather than doing it asynchronously

The intent of send! is to write the data but not block for the result. It uses a delay around the actual result to delay the datafy of the response when it arrives.

Unfortunately, the use of delay also wraps the .send call to Kafka, so calling send! does nothing until you deref it, at which point you block until the write succeeds. The use of the threading macro also obfuscates things here (for me at least ...):

(-> (.send ^Producer producer ^ProducerRecord record)
    deref
    jd/datafy
    delay)

This can be fixed by moving the .send call outside of the delay, so the Kafka write is started immediately and we delay only the datafying of the eventual result; writing it without the threading macro makes this more explicit, i.e.

(let [send-future (.send ^Producer producer ^ProducerRecord record)]
  (delay (jd/datafy @send-future)))

I'll raise a PR tomorrow.

Word count test fails after first run

This happens consistently for me with the word-count example:

  • Open word_count_test.clj
  • M-x cider-jack-in
  • M-x cider-load-buffer
  • M-x cider-test-run-ns-tests works (first time)
  • M-x cider-test-run-ns-tests fails - every time after the first
Error in test-word-count-example
Uncaught exception, not in assertion
   error: org.apache.kafka.streams.errors.ProcessorStateException: Error opening store KSTREAM-AGGREGATE-STATE-STORE-0000000004 at location /tmp/kafka-streams/word-count/0_0/rocksdb/KSTREAM-AGGREGATE-STATE-STORE-0000000004

(Related: #114)
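
A common workaround between runs (a sketch; the state directory path is taken from the error above) is to delete the local streams state directory:

(require '[clojure.java.io :as io])

(defn delete-state-dir!
  "Recursively deletes the local streams state between test runs."
  []
  (let [dir (io/file "/tmp/kafka-streams/word-count")]
    ;; file-seq is parents-first, so reverse to delete children first
    (doseq [f (reverse (file-seq dir))]
      (io/delete-file f true))))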

:key? in topic config

At the moment, the avro serde requires a key of :key? to be present in the topic config (see the code here). If the topic config doesn't include it, you get a NPE. AFAICT, this key determines whether the serde is for a key or value - so the topic config doesn't seem like the right place for it, as there can be an avro serde for both the key and the value.

Am I missing something here? Or would it be best to pass key? as a separate parameter to avro-serde?

Give `:delete-first?` option to topic-fixture

As a test-author,
I'd like to ensure a topic does not exist before re-creating the topic according to the fixture before my test runs
So that I can be sure the topic used is the one my test expects

One option for this is a :delete-first? option on the existing topic-fixture, but there are a few things to consider around giving the caller a chance to abort the fixture if the topics were not created to its satisfaction within the expected time.
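
A rough sketch of what that might look like (hypothetical option; assumes jackdaw.admin's create-topics!/delete-topics! and an AdminClient constructor named ->AdminClient):

(defn topic-fixture
  "Sketch: optionally delete the topics before recreating them."
  [kafka-config topic-metadata {:keys [delete-first?]}]
  (fn [test-fn]
    (with-open [client (ja/->AdminClient kafka-config)]
      (when delete-first?
        ;; best-effort delete; a real version would wait for deletion to
        ;; complete and let the caller abort on timeout
        (ja/delete-topics! client (vals topic-metadata)))
      (ja/create-topics! client (vals topic-metadata)))
    (test-fn)))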

Avro serde requires a local schema even for Consumers

The avro serde resolver requires the specification of a local schema even though the schema is only actually used for producing messages. When consuming messages, we just let the KafkaAvroDeserializer look up the schema id in the registry, so when "resolving" a topic-metadata definition we should not require the presence of a schema.
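
In other words, for a consume-only topic the resolved metadata could plausibly omit the schema (a sketch of the proposed behaviour, not the current API):

{:topic-name "example_avro"
 :value-serde {:serde-keyword :jackdaw.serdes.avro.confluent/serde
               ;; no :schema-filename: the deserializer fetches the writer
               ;; schema from the registry by the id embedded in each message
               :key? false}}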

Strange names

Given that the test machine is just called test we end up with some interesting file names, like

  • src/jackdaw/test.clj
  • test/jackdaw/test_test.clj

I'm not sure what a better name would be tbh, but I think the whole machinery under src/test is not really just useful for writing tests; it's something that can help with simply setting up Kafka topics, so it might be worth renaming it to something else.

Document versions of Clojure supported

At the moment only Clojure 1.10 works, so we should either fix that or at least clearly document which versions of Clojure are currently supported, to avoid surprises.
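
One way to keep such a claim honest in CI (a sketch using standard Leiningen profiles; the profile names are arbitrary):

;; project.clj
:profiles {:1.9  {:dependencies [[org.clojure/clojure "1.9.0"]]}
           :1.10 {:dependencies [[org.clojure/clojure "1.10.1"]]}}

;; then run the suite against each version, e.g.
;;   lein with-profile +1.9 test
;;   lein with-profile +1.10 test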

key not used in jackdaw.client/produce!

produce! accepts key as an argument and does nothing with it.

(defn produce!
  "Helper wrapping `#'send!`.

  Builds and sends a `ProducerRecord` so you don't have to. Returns
  a future which will produce datafied record metadata when forced."
  ([producer topic value]
   (send! producer
          (jd/->ProducerRecord topic value)))
  ([producer topic key value]
   (send! producer
          ;; BUG: `key` is dropped here
          (jd/->ProducerRecord topic value)))
  ([producer topic partition key value]
   (send! producer
          ;; BUG: `topic` is passed where `key` should be
          (jd/->ProducerRecord topic partition topic value)))
  ([producer topic partition timestamp key value]
   (send! producer
          ;; BUG: `topic` is passed where `key` should be
          (jd/->ProducerRecord topic partition timestamp topic value)))
  ([producer topic partition timestamp key value headers]
   (send! producer
          ;; BUG: `topic` is passed where `key` should be
          (jd/->ProducerRecord topic partition timestamp topic value headers))))

Unexpected semantics of seek-to-timestamp

I'm writing a utility which has to process messages within a time period and my implementation uses seek-to-timestamp. Documentation for seek-to-timestamp says the following:

After seeking, the first message read from each partition will be
the EARLIEST message whose timestamp is greater than or equal to the
timestamp sought.

This behavior is exactly what I need and what I'd expect. However, it is not what I actually get. Calling seek-to-timestamp with the current time as argument (e.g. Instant/now) positions consumer at the beginning instead of the end of partitions. Consequently, consumer reads all messages in a topic instead of none.

The following (existing) test case from client_test.clj demonstrates the problem:

   (testing "seek to ts-next=1000"
          (let [ts-next 1000]
            (as-> consumer $
              (client/assign-all $ (map :topic-name [topic-config]))
              (client/seek-to-timestamp $ ts-next [topic-config])
              (client/position-all $)
              (is (= [0]
                     (vals $))))))

Contrary to the documentation and expectation, the first message read would not be the one whose timestamp is greater than or equal to the timestamp sought, but the message with timestamp 1.

I suppose an implementation more aligned with the documentation/expectation would handle nils returned by KafkaConsumer/offsetsForTimes and, instead of returning position 0, seek to the last offset in the partition. Unfortunately, trying to maintain the expectation for timestamps in the future is probably infeasible, so that case would not be covered.

I'd like to hear your thoughts on this.
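
A rough sketch of that proposal against the raw consumer API (assumed shape, not jackdaw's implementation):

(import '(org.apache.kafka.clients.consumer Consumer))

(defn seek-to-timestamp!
  "Seeks each partition to the earliest offset whose timestamp is >= ts.
  Where no such message exists, seeks to the end instead of offset 0."
  [^Consumer consumer ts topic-partitions]
  (let [query (into {} (map (fn [tp] [tp (long ts)])) topic-partitions)]
    (doseq [[tp offset+ts] (.offsetsForTimes consumer query)]
      (if offset+ts
        (.seek consumer tp (.offset offset+ts))
        ;; no message at/after ts: go to the end, not offset 0
        (.seekToEnd consumer [tp])))
    consumer))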

Inconsistent assign-all api

All other client fns that expect a topic expect topic metadata. This fn expects a topic name string. I think it should extract the name from the metadata as the other fns do.

As assign-all might already be in use, be sure to add a deprecation warning.
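
A sketch of the suggested change (assumed shape; the deprecation handling is illustrative only):

(import '(org.apache.kafka.clients.consumer Consumer)
        '(org.apache.kafka.common TopicPartition))

(defn assign-all
  "Assigns all partitions of the given topics. Accepts topic metadata
  maps; plain name strings still work but should warn as deprecated."
  [^Consumer consumer topics]
  (let [names (map #(if (map? %) (:topic-name %) %) topics)]
    (.assign consumer
             (vec (for [n names
                        pi (.partitionsFor consumer n)]
                    (TopicPartition. (.topic pi) (.partition pi)))))))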

Function topics-ready? seems broken

Function jackdaw.admin/topics-ready? seems broken (at least from 0.6.8 to the latest commit a68f3d7, probably even earlier). I think it uses the return value of describe-topics* without datafying it, and it throws an exception.

`reset-application-fixture` hardcodes bootstrap.servers value

Attempting to use reset-application-fixture or, by extension, integration-fixture with a Kafka bootstrap server other than localhost:9092 results in a connection timeout. Examining the source, it looks like this value has been hardcoded as an argument to StreamsResetter.
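
A sketch of the fix (assumed internals; StreamsResetter is Kafka's kafka.tools.StreamsResetter, and the flag names reflect the 2.x CLI):

(import '(kafka.tools StreamsResetter))

(defn reset-application-fixture
  "Sketch: take bootstrap.servers from the app config instead of
  hardcoding localhost:9092."
  [app-config]
  (fn [test-fn]
    (.run (StreamsResetter.)
          (into-array String
                      ["--application-id" (get app-config "application.id")
                       "--bootstrap-servers" (get app-config "bootstrap.servers")]))
    (test-fn)))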

Exception while trying to use avro serdes: "No implementation of method: :clj->avro of protocol:"

Hi! I'm trying to use various jackdaw serdes, but unfortunately I'm not managing to get avro serdes to work. The following exception is thrown:

Execution error (IllegalArgumentException) at jackdaw.serdes.avro/eval4495$fn$G (avro.clj:123).
No implementation of method: :clj->avro of protocol: #'jackdaw.serdes.avro/SchemaCoercion found for class: nil

I'm using Clojure 1.10.0 and jackdaw 0.6.6. The Kafka cluster and schema registry are managed by Confluent Cloud, using Confluent Platform 5.2.1. The key and value schemas are already in the registry. I'd really appreciate any help or suggestions! This code is mostly the same as examples/serdes. String and JSON serdes are working fine.

(ns trying-jackdaw.core
  (:require [clojure.algo.generic.functor :refer [fmap]]
            [clojure.java.shell :refer [sh]]
            [jackdaw.client :as jc]
            [jackdaw.client.log :as jcl]
            [jackdaw.admin :as ja]
            [jackdaw.serdes.edn :as jse]
            [jackdaw.streams :as j]
            [jackdaw.serdes]
            [jackdaw.serdes.json :as json]
            [jackdaw.serdes.avro.confluent :as jsac]
            [jackdaw.serdes.resolver :as resolver]))

;; of course not pasting my keys here
#_(defn basic-config
  []
  {"ssl.endpoint.identification.algorithm" "https"
   "sasl.mechanism"                        "PLAIN"
   "request.timeout.ms"                    (int 20000)
   "bootstrap.servers"                     "xxxxx.confluent.cloud:9092"
   "retry.backoff.ms"                      (int 500)
   "sasl.jaas.config"                      (str "org.apache.kafka.common.security.plain.PlainLoginModule required username=\""
                                                ccloud-key "\" password=\"" ccloud-secret "\";")
   "security.protocol"                     "SASL_SSL"

   ; Schema Registry specific settings
   "basic.auth.credentials.source"         "USER_INFO"
   "schema.registry.basic.auth.user.info"  (str sr-key ":" sr-secret)
   "schema.registry.url"                   "https://xxxxxxx.confluent.cloud"

   ; Enable Avro serializer with Schema Registry
   "key.serializer"                        "io.confluent.kafka.serializers.KafkaAvroSerializer"
   "value.serializer"                      "io.confluent.kafka.serializers.KafkaAvroSerializer"
   })
(defn basic-config
  "Returns the application config."
  []
  {"application.id" "serdes"
   "bootstrap.servers" "localhost:9092"
   "cache.max.bytes.buffering" "0"})

(defn producer-config [] (basic-config))
(defn admin-config [] (basic-config))

;;; ------------------------------------------------------------
;;;
;;; Configure topics
;;;

(def +topic-metadata+
  {"example_avro"
   {:topic-name  "example_avro"
    :key-serde   {:serde-keyword   :jackdaw.serdes.avro.confluent/serde
                  :schema-filename "key_schema.json"
                  :key?            true}
    :value-serde {:serde-keyword   :jackdaw.serdes.avro.confluent/serde
                  :schema-filename "value_schema.json"
                  :key?            false}}

   "example_json"
   {:topic-name  "example_json"
    :key-serde   (json/serde)
    :value-serde (json/serde)}

   "example_str"
   {:topic-name  "example_str"
    :key-serde   {:serde-keyword :jackdaw.serdes/string-serde}
    :value-serde {:serde-keyword :jackdaw.serdes/string-serde}}})

#_(def serde-resolver
  (partial resolver/serde-resolver :schema-registry-url (get (basic-config) "schema.registry.url")))
(def serde-resolver
  (partial resolver/serde-resolver :schema-registry-url "http://localhost:8081"))

(def topic-metadata
  (memoize (fn []
             (fmap #(if (= (:topic-name %) "example_json")
                      %
                      (assoc % :key-serde ((serde-resolver) (:key-serde %))
                               :value-serde ((serde-resolver) (:value-serde %))))
                   +topic-metadata+))))

(defn publish
  "Takes a topic config and record value, and (optionally) a key and
  partition number, and produces to a Kafka topic."
  ([topic-config value]
   (with-open [client (jc/producer (producer-config) topic-config)]
     @(jc/produce! client topic-config value))
   nil)

  ([topic-config key value]
   (with-open [client (jc/producer (producer-config) topic-config)]
     @(jc/produce! client topic-config key value))
   nil)

  ([topic-config partition key value]
   (with-open [client (jc/producer (producer-config) topic-config)]
     @(jc/produce! client topic-config partition key value))
   nil))

(def example-key {"timestamp" 1562266979037
                  "name"      "example-name"})

(def example-value {"int_field"    1
                    "string_field" ""})

(defn -main
  [& args]
  (publish (get (topic-metadata) "example_str") "example-key" "example-value")
  (publish (get (topic-metadata) "example_json") example-key example-value)
  (publish (get (topic-metadata) "example_avro") example-key example-value))

resources/key_schema.json

{
  "type" : "record",
  "name" : "key_example",
  "namespace" : "avro",
  "fields" : [
    {
      "name" : "timestamp",
      "type" : "long"
    },
    {
      "name" : "name",
      "type" : "string"
    }
  ]
}

resources/value_schema.json

{
  "type" : "record",
  "name" : "value_example",
  "namespace" : "avro",
  "fields" : [
    {
      "name" : "int_field",
      "type" : "int"
    },
    {
      "name" : "string_field",
      "type" : "string"
    }
  ]
}

Update: Using Confluent Platform, since the issue doesn't depend on Confluent Cloud.

Review use of Logging in Test Machine

It hides a lot of output in the logs (well, it's not strictly hidden, but it's not "in your face" when there are errors). It also uses stack-trace pretty formatting, which may not work with the 1.10 errors, and means control sequences appear in the logs, making the errors harder to read.
