
avro_turf's People

Contributors

akihiro17, atsheehan, bacchir, bruno-b-martins, bublik, damncabbage, dasch, friendlyantz, h4nky, jestemkarol, jpslav, jturkel, julianalucena, kenforthewin, methodmissing, michaellennox, minukmarkchoi, nbibler, nicsnoektw, petergoldstein, piotaixr, rpiotaix-tc, stokarenko, theturtle32, tjwp, toy, tzzzoz, unasuke, wallin, watson1978

avro_turf's Issues

Allow encoding data to an existing stream

In order to improve latency and memory usage, we could have a method that writes the encoded data directly to an IO stream rather than to an internal buffer.

Sinatra has support for streaming responses:

get '/' do
  stream do |out|
    # This is how we could integrate Avro with Sinatra's streaming support.
    avro data, stream: out, schema_name: "something"
  end
end
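
For reference, a minimal sketch of what using such an API could look like, assuming an encode_to_stream(data, stream:, schema_name:) method (stack traces elsewhere on this page suggest the gem grew exactly such a method):

require "avro_turf"

avro = AvroTurf.new(schemas_path: "schemas/")

# Write the encoded data straight to an IO object instead of building
# an internal buffer first; any IO-like object responding to #write
# (a file, a socket, a Sinatra stream) would do.
File.open("people.avro", "wb") do |io|
  avro.encode_to_stream({ "full_name" => "Jane" }, stream: io, schema_name: "person")
end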

Schemas published to registry not available to Kafka Connect?

I am able to publish data into Kafka in the Avro format and read it back with a simple consumer. The problem I'm running into is that when I try to feed that data to Kafka Connect, it looks for {topic}-key and {topic}-value. Where do these come from, what data needs to be in them, and how do I create them when publishing data to {topic}?

Instead of raising exception it just sets field value to "no_default"

When the reader's schema has a field with no default value, and the writer's schema does not have a field with the same name, we are supposed to get an exception during deserialization. However, instead I'm getting the value "no_default" assigned to the field.

:no_default is actually a Symbol, but it is coerced to a string, so when the field is a string we do not get an error at all; we just get bad data. When the field is not a string (e.g. an int), an error does arise, as a Symbol cannot be coerced to an int.

Unexpected tLabel

/usr/share/logstash/vendor/bundle/jruby/1.9/gems/avro_turf-0.8.0/lib/avro_turf/messaging.rb:34: syntax error, unexpected tLABEL
def initialize(registry: nil, registry_url: nil, schema_store: nil, schemas_path: nil, namespace: nil, logger: nil)

Hello, I'm fairly new to Ruby and trying to write a plugin for Logstash. I got this error. After searching around, I suspect this is because Logstash uses Ruby 1.9.3, which does not support keyword arguments (they were introduced in Ruby 2.0). Can anyone confirm that my suspicion is right? And what can I do to use avro_turf with Ruby 1.9.3?
Thanks in advance.
Edit: I use avro_turf like this:
AvroTurf::Messaging.new(registry_url: registry_url, schemas_path: schemas_path)

Decoding data - Expected data to begin with a magic byte, got `"{"`

I'm encountering an issue that results in the following error when I call decode. My setup is as follows:

# config/initializer/ccpa.rb
require 'avro_turf/messaging'
$avro = AvroTurf::Messaging.new(registry_url: AppConfig.instance.ccpa_schema, schemas_path: "schemas/")
$ccpa_kafka = Kafka.new([AppConfig.instance.ccpa_broker], logger: Rails.logger)


# Consumers
$ccpa_kafka.each_message(topic: AppConfig.instance.ccpa_fan_lookup_request_topic) do |message|
  message = $avro.decode(message.value, schema_name: 'com.tm.privacy.wirefmt.FanLookupRequest')
  puts message
end

The output I get is:

New topics added to target list: star.p0000.lookup-request.v0.dev
Fetching cluster metadata from kafka://kf0003.cf.p000.dev3.foo.bar:9092
[topic_metadata] Opening connection to kf0003.cf.p000.dev3.foo.bar:9092 with client id ruby-kafka...
[topic_metadata] Sending topic_metadata API request 1 to kf0003.cf.p000.dev3.foo.bar:9092
[topic_metadata] Waiting for response 1 from kf0003.cf.p000.dev3.foo.bar:9092
[topic_metadata] Received response 1 from kf0003.cf.p000.dev3.foo.bar:9092
Discovered cluster metadata; nodes: kf0004.cf.p000.dev3.foo.bar:9092 (node_id=1005), kf0005.cf.p000.dev3.foo.bar:9092 (node_id=1003), kf0001.cf.p000.dev3.foo.bar:9092 (node_id=1007), kf0003.cf.p000.dev3.foo.bar:9092 (node_id=1001), kf0002.cf.p000.dev3.foo.bar:9092 (node_id=1006), kf0007.cf.p000.dev3.foo.bar:9092 (node_id=1008), kf0006.cf.p000.dev3.foo.bar:9092 (node_id=1002), kf0008.cf.p000.dev3.foo.bar:9092 (node_id=1009)
Closing socket to kf0003.cf.p000.dev3.foo.bar:9092
Resolving offset '-2' for star.p0000.lookup-request.v0.dev/0...
[list_offset] Opening connection to kf0003.cf.p000.dev3.foo.bar:9092 with client id ruby-kafka...

[list_offset] Sending list_offset API request 1 to kf0003.cf.p000.dev3.foo.bar:9092
[list_offset] Waiting for response 1 from kf0003.cf.p000.dev3.foo.bar:9092
[list_offset] Received response 1 from kf0003.cf.p000.dev3.foo.bar:9092
Offset for star.p0000.lookup-request.v0.dev/0 is 1261
[fetch] Sending fetch API request 2 to kf0003.cf.p000.dev3.foo.bar:9092
[fetch] Waiting for response 2 from kf0003.cf.p000.dev3.foo.bar:9092
[fetch] Received response 2 from kf0003.cf.p000.dev3.foo.bar:9092
I, [2019-09-13T11:06:47.160017 #79695]  INFO -- : Fetching schema with id 2299
{"fanLookupRequestId"=>"59501332-104c-4420-bb2b-94dd9eeb91ac", "brandId"=>"Special", "contactEmail"=>"[email protected]", "property"=>nil, "item"=>{"email"=>"[email protected]", "firstName"=>"John", "lastName"=>"Doe", "encryptionContext"=>nil}}

RuntimeError: Expected data to begin with a magic byte, got "{"
from /Users/home/Code/web/.bundle/ruby/2.3.0/gems/avro_turf-0.9.0/lib/avro_turf/messaging.rb:99:in 'decode'

My question is: why am I getting this error, and how does one go about resolving it?

Backwards Ruby versions compatibility

The following errors arise when I try to use avro_turf with some (not that old) Ruby versions:

avro_turf-0.4.1/lib/avro_turf.rb:19: syntax error, unexpected ',' (SyntaxError)
  def initialize(schemas_path:, namespace: nil, codec: nil)
                               ^
/home/embs/.rvm/gems/ruby-2.0.0-p598@me/gems/avro_turf-0.4.1/lib/avro_turf.rb:19: Can't assign to nil
  def initialize(schemas_path:, namespace: nil, codec: nil)
                                               ^
/home/embs/.rvm/gems/ruby-2.0.0-p598@me/gems/avro_turf-0.4.1/lib/avro_turf.rb:31: syntax error, unexpected ','
  def encode(data, schema_name:, namespace: @namespace)
                                ^

For this specific example, I was using

$ ruby -v
ruby 2.0.0p598 (2014-11-13 revision 48408) [x86_64-linux]

I think it would be nice to at least make the required Ruby version explicit somewhere. I don't know what the convention is for a gem, though.
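
For what it's worth, required keyword arguments like schemas_path: were only introduced in Ruby 2.1, which is why 2.0.0 fails to parse the code above. A hedged sketch of making the constraint explicit in the gemspec (the field comes from the standard Gem::Specification API):

# avro_turf.gemspec (illustrative excerpt)
Gem::Specification.new do |spec|
  spec.name = "avro_turf"
  # Required keyword arguments (e.g. `schemas_path:`) need Ruby >= 2.1,
  # so declaring this makes `gem install` fail fast on older Rubies.
  spec.required_ruby_version = ">= 2.1"
end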

avro_turf fails to work with fixed

I can't get fixed to work. How do I write it? The original "type": "fixed" didn't work because the gem passed the type through. I saw another issue that suggested writing it this way, but this still doesn't work:

  "fields": [
    {
      "name": "uuid",
      "type": { "type" : "fixed" , "size" : 16}
    },

I get the error #<Avro::SchemaParseError: Name is invalid for type fixed!
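
That error is actually expected: in the Avro spec, fixed is a named type, so the inline definition needs its own name. A hedged sketch of a field that should parse (the inner type name uuid_fixed is illustrative):

"fields": [
  {
    "name": "uuid",
    "type": { "type": "fixed", "name": "uuid_fixed", "size": 16 }
  }
]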

compatibility with Avro v1.9.0

FYI I tested the compatibility of avro_turf with the release candidate for Avro v1.9.0.

To work with the new version, the avro_turf/schema_to_avro_patch file needs to be excluded.

This can be done conditionally based on the new version of Avro. Unfortunately, Avro does not correctly report the gem's version (Avro::VERSION == "FIXME"). But we can check for something that was introduced in the new version (e.g. Avro::LogicalTypes) to indicate that it is v1.9.0 or later.
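
A minimal sketch of that conditional require, using the feature check described above:

require "avro"

# Avro::VERSION reports "FIXME", so feature-detect instead: the
# Avro::LogicalTypes constant only exists in avro >= 1.9.0.
unless defined?(Avro::LogicalTypes)
  # Pre-1.9.0 Avro: apply the compatibility patches.
  require "avro_turf/schema_to_avro_patch"
end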

Exposing schema_store and registry of Messaging

Currently, if I would like to preload schemas and register them in the schema registry when using the Messaging interface, I need to manually initialise a SchemaStore and a ConfluentSchemaRegistry (wrapped with CachedConfluentSchemaRegistry), preload/register the schemas, and pass both instances to the Messaging initialiser. If there were access to registry and schema_store, they could be used without manual initialisation. Do you see any problem with creating readers for them?
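
For context, the manual wiring described above looks roughly like this (the registry URL is a placeholder):

require "avro_turf"
require "avro_turf/messaging"
require "avro_turf/confluent_schema_registry"
require "avro_turf/cached_confluent_schema_registry"

# Build the two collaborators by hand...
store = AvroTurf::SchemaStore.new(path: "schemas/")
store.load_schemas!  # preload all schemas from disk

registry = AvroTurf::CachedConfluentSchemaRegistry.new(
  AvroTurf::ConfluentSchemaRegistry.new("http://registry.example.com:8081")
)

# ...and pass both into Messaging. Readers on these two attributes
# would make this boilerplate unnecessary.
avro = AvroTurf::Messaging.new(registry: registry, schema_store: store)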

Inter-schema references example throws exception

I've been getting Avro::SchemaParseError when using a schema similar to the person/address/person_list example given for inter-schema references. Assuming I had made a mistake, I took the example from the README and tried to load that schema, and I still got the same error.

I used the following code for debugging:

    store = AvroTurf::SchemaStore.new(path: 'schemas/')
    store.find('address')
    store.find('person')
    store.find('person_list')

This results in:

   gems/avro-1.8.1/lib/avro/schema.rb:48:in `real_parse': Unknown type: {"type"=>"array", "items"=>"person"} (Avro::SchemaParseError)
	from gems/2.3.0/gems/avro_turf-0.7.2/lib/avro_turf/schema_store.rb:20:in `find'

I am using avro_turf 0.7.2 and avro 1.8.1.

AvroTurf::Messaging question about schema files

Hey @dasch,

It seems to be an assumption of AvroTurf::Messaging that you will already have Avro schema files on disk. Is that true? My assumption is that the purpose of a schema registry is to avoid a dependency on schema files being accessible to your client code, but maybe I'm missing something. I'm not sure the workflow that makes sense in my head will mesh with this library; ideally I'd like AvroTurf::Messaging.new(...) (opt-in via an option, possibly) to load the schemas from the registry into memory, rather than registering and fetching them as a side effect of calling encode/decode.

Unable to use complex types ("map", "array", "enum", "fixed")

AvroTurf fails when trying to use complex types other than record ("map", "array", "enum", "fixed") in a schema. It appears to be due to AvroTurf attempting to look up a schema with the name of the type rather than falling through to a built-in type. The example below demonstrates this with the "map" type, but it fails the same way with any of the other types listed above.

Example schema:

{
  "name": "Person",
  "type": "record",
  "fields": [
    {
      "name": "name",
      "type": "string"
    },
    {
      "name": "attributes",
      "type": "map",
      "values": "string"
    }
  ]
}

Ruby code:

avro = AvroTurf.new(schemas_path: 'schemas/')

data = {
  name: 'Joe',
  attributes: {
    "height": "tall"
  }
}

avro.encode(data, schema_name: 'Person')

Stacktrace:

Traceback (most recent call last):
	6: from test.rb:16:in `<main>'
	5: from vendor/ruby/2.5.0/gems/avro_turf-0.8.0/lib/avro_turf.rb:37:in `encode'
	4: from vendor/ruby/2.5.0/gems/avro_turf-0.8.0/lib/avro_turf.rb:51:in `encode_to_stream'
	3: from vendor/ruby/2.5.0/gems/avro_turf-0.8.0/lib/avro_turf/schema_store.rb:12:in `find'
	2: from vendor/ruby/2.5.0/gems/avro_turf-0.8.0/lib/avro_turf/schema_store.rb:31:in `rescue in find'
	1: from vendor/ruby/2.5.0/gems/avro_turf-0.8.0/lib/avro_turf/schema_store.rb:12:in `find'
vendor/ruby/2.5.0/gems/avro_turf-0.8.0/lib/avro_turf/schema_store.rb:40:in `rescue in find': could not find Avro schema at `schemas/map.avsc' (AvroTurf::SchemaNotFoundError)
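
A side note, grounded in the Avro spec rather than in this gem's behavior: a complex type used inline in a field is normally nested under "type" rather than spliced into the field object, so the attributes field would usually be written as:

{
  "name": "attributes",
  "type": { "type": "map", "values": "string" }
}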

No release since August 2018

There has not been a release of the gem since August 2018. It would be cool if we could get a new release of the gem so we can enjoy some of the awesome goodness in master.

Support a disk-based cache of the schema registry

Greetings...

Summary

The CachedConfluentSchemaRegistry provides an in-memory cache of the registry. It would be nice if this were extended to support a disk-based cache as well, which would allow the cache to be preloaded from disk when an application restarts.

Background

In our use-case, we publish messages via a Chef recipe and/or event handler. The trouble is that the recipe is reinitialized on every Chef client convergence (every 30 minutes). The net result is millions of POSTs to the upstream schema registry (e.g. 1000 Chef nodes converging every 30 minutes => 2000 redundant POSTs/hour per schema, forever).

Proposed solution

An extension of the CachedConfluentSchemaRegistry, something like this:

require 'json'
require 'avro_turf/cached_confluent_schema_registry'

class AvroTurf::WriteThruDiskCachedConfluentSchemaRegistry < AvroTurf::CachedConfluentSchemaRegistry

  def initialize(upstream, disk_path)
    super(upstream)

    # load the write-thru cache on startup if it exists
    @schemas_by_id_path = ::File.join(disk_path, 'schemas_by_id.json')
    @schemas_by_id = ::JSON.parse(::File.read(@schemas_by_id_path)) if ::File.exist?(@schemas_by_id_path)

    @ids_by_schema_path = ::File.join(disk_path, 'ids_by_schema.json')
    @ids_by_schema = ::JSON.parse(::File.read(@ids_by_schema_path)) if ::File.exist?(@ids_by_schema_path)
  end

  # override to include write-thru cache
  def fetch(id)
    # the write-thru cache (json) does not store keys in numeric format
    # so, convert id to a string for caching purposes only
    ids = id.to_s
    if @schemas_by_id[ids].nil?
      # when fetching from upstream, be sure to use the numeric id
      @schemas_by_id[ids] = @upstream.fetch(id)
      ::File.write(@schemas_by_id_path, ::JSON.pretty_generate(@schemas_by_id))
    end
    @schemas_by_id[ids]
  end

  # override to include write-thru cache
  def register(subject, schema)
    key = subject + schema.to_s
    if @ids_by_schema[key].nil?
      @ids_by_schema[key] = @upstream.register(subject, schema)
      ::File.write(@ids_by_schema_path, ::JSON.pretty_generate(@ids_by_schema))
    end
    @ids_by_schema[key]
  end
end

AvroTypeError while using SchemaRegistry

Hi! Thanks for your library and your time <3

I'm having some trouble using the Messaging API. Everything works fine using the AvroTurf.new API, but when I move to AvroTurf::Messaging.new(registry_url: my_registry), the encode function raises Avro::IO::AvroTypeError. It does still update the registry with my local schema.

This is my schema:

{
  "type": "record",
  "name": "campaign",
  "namespace": "com.test",
  "fields": [
      {
          "name": "id",
          "type": "int"
      },
      {
          "name": "user_id",
          "type": "string"
      },
      {
          "name": "contract_id",
          "type": "string"
      },
      {
          "name": "publisher_id",
          "type": "string"
      },
      {
          "name": "name",
          "type": "string"
      },
      {
          "name": "goal",
          "type": "int"
      },
      {
          "name": "start_date",
          "type": "string"
      },
      {
          "name": "end_date",
          "type": "string"
      },
      {
          "name": "activated_at",
          "default": null,
          "type": [
              "null",
              "string"
          ]
      },
      {
          "name": "deactivated_at",
          "default": null,
          "type": [
              "null",
              "string"
          ]
      },
      {
          "name": "active",
          "type": "boolean"
      },
      {
          "name": "listings",
          "type": {
              "type": "array",
              "items": "int"
          }
      }
  ]
}

This is the actual hash I'm trying to serialize:

{
    :id=>4,
    :user_id=>"[email protected]",
    :contract_id=>"c34c728a-09e6-a593-0808-9b2343d80763",
    :publisher_id=>"be6f112b-807a-4c7b-dcad-7374ab6aef5e",
    :name=>"string",
    :goal=>50,
    :start_date=>"2019-12-01",
    :end_date=>"2019-12-30",
    :activated_at=>"teste",
    :deactivated_at=>"teste",
    :active=>false,
    :listings=>[
        1,
        2
    ]
}

The entire error message is:

Avro::IO::AvroTypeError: The datum {:id=>4, :user_id=>"[email protected]", :contract_id=>"c34c728a-09e6-a593-0808-9b2343d80763", :publisher_id=>"be6f112b-807a-4c7b-dcad-7374ab6aef5e", :name=>"string", :goal=>50, :start_date=>"2019-12-01", :end_date=>"2019-12-30", :activated_at=>"teste", :deactivated_at=>"teste", :active=>false, :listings=>[1, 2]} is not an example of schema {"type":"record","name":"campaign","namespace":"com.test","fields":[{"name":"id","type":"int"},{"name":"user_id","type":"string"},{"name":"contract_id","type":"string"},{"name":"publisher_id","type":"string"},{"name":"name","type":"string"},{"name":"goal","type":"int"},{"name":"start_date","type":"string"},{"name":"end_date","type":"string"},{"name":"activated_at","type":["null","string"],"default":null},{"name":"deactivated_at","type":["null","string"],"default":null},{"name":"active","type":"boolean"},{"name":"listings","type":{"type":"array","items":"int"}}]}

Thank you again in advance.

Excon::Error::Socket: end of file reached (EOFError)

Hi

I'm trying to use your 'avro_turf/messaging' with the schema registry, but even with very simple examples I'm getting the error: Excon::Error::Socket: end of file reached (EOFError).

Searching online suggests this could be an Excon problem, either the :idempotent => true setting or a timeout issue. Could you help me tune or correct this?

Here is my example of code:

require 'avro_turf/messaging'

avro = AvroTurf::Messaging.new(registry_url: "https://schema-registry.mycompany.com", schemas_path: "/path_to_my/avroschema")
avro_data = avro.encode({ "full_name" => "Jane", "age" => 28 }, schema_name: "person")
Excon::Error::Socket: end of file reached (EOFError)
from <internal:prelude>:76:in `__read_nonblock'
Caused by EOFError: end of file reached
from <internal:prelude>:76:in `__read_nonblock'

person.avsc:

{
  "name": "person",
  "type": "record",
  "fields": [
    {
      "name": "full_name",
      "type": "string"
    },
    {
      "name": "age",
      "type": "long"
    }
  ]
}

Just for information: I can use the schema registry via the Confluent API without any problem, so I suppose there is no problem with the registry server or the connection.

Thanks for the help.

encoding error

When I switched from using the Avro file format to the messaging format, I started getting an encoding problem I did not see before:

/usr/local/bundle/gems/avro-1.8.1/lib/avro/io.rb:206:in `write': incompatible character encodings: US-ASCII and UTF-8 (Encoding::CompatibilityError)
	from /usr/local/bundle/gems/avro-1.8.1/lib/avro/io.rb:206:in `write_bytes'
	from /usr/local/bundle/gems/avro-1.8.1/lib/avro/io.rb:213:in `write_string'

This happens with the UTF-8 symbol for degrees (\xC2\xB0).

need local schema files for reading?

Hello!
Trying out avro_turf, I was surprised that when reading using Messaging with a schema registry, it expects the schemas to be on disk, even though it also fetches them from the registry. What is the thought behind that? It would be nice if clients did not have to care about storing schema files, since the schemas are in the registry.

Disclaimer: I could be misunderstanding something fully; this is my first time using Avro with a schema registry.

Better validation error from Messaging#encode

Currently, encoding data that doesn't conform to the schema using Messaging#encode results in an exception with a not very useful message, like Avro::IO::AvroTypeError: The datum nil is not an example of schema "string", which doesn't refer to the field that was not filled. The avro gem includes a class, Avro::SchemaValidator, which can be used to produce a meaningful error. It is already used indirectly in DatumWriter through Schema#validate, which converts the presence/absence of an exception into a boolean.

A very basic way to implement validation in Messaging#encode is to add

Avro::SchemaValidator.validate!(schema, message, recursive: true, encoded: false, fail_on_extra_fields: true)

after resolving the schema and before encoding the data. Note that message is also required to have all keys deep-stringified (which is not required for encoding itself).

What do you think about adding validation to the messaging interface?
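
A minimal sketch of what this could look like, written as an external subclass for illustration; it assumes Messaging keeps its store in @schema_store and a default namespace in @namespace, which may not match the gem's internals exactly:

require "avro"
require "avro_turf/messaging"

class ValidatingMessaging < AvroTurf::Messaging
  def encode(message, schema_name: nil, namespace: @namespace, **kwargs)
    schema = @schema_store.find(schema_name, namespace)

    # Raises Avro::SchemaValidator::ValidationError with a message that
    # names the offending field(s), instead of the opaque AvroTypeError.
    Avro::SchemaValidator.validate!(
      schema, message,
      recursive: true, encoded: false, fail_on_extra_fields: true
    )

    super
  end
end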

question: does the encoding/decoding of an Avro message occur....

in the schema registry, or outside of it? Is the SR just CRUD storage for schemas?

So if another client (say Python or Java) uses the SR and consumes an event stored in Kafka, do they have to use another library to decode the message that I encoded using avro_turf? No decoding happens in the SR?

Schema registration always returns HTTP 500 from Confluent Schema Registry

Using Confluent Schema Registry Version 2.0.1

Attempting to register a new schema with the registry always, without fail, results in a 500 Internal Server Error and the following output in the Schema Registry logs:

[2016-04-30 23:01:35,007] ERROR Unhandled exception resulting in internal server error response (io.confluent.rest.exceptions.GenericExceptionMapper:37)
com.fasterxml.jackson.databind.JsonMappingException: Can not deserialize instance of java.lang.String out of START_OBJECT token
 at [Source: org.glassfish.jersey.message.internal.ReaderInterceptorExecutor$UnCloseableInputStream@4923d986; line: 1, column: 2] (through reference chain: io.confluent.kafka.schemaregistry.client.rest.entities.requests.RegisterSchemaRequest["schema"])
    at com.fasterxml.jackson.databind.JsonMappingException.from(JsonMappingException.java:148)
    at com.fasterxml.jackson.databind.DeserializationContext.mappingException(DeserializationContext.java:835)
    at com.fasterxml.jackson.databind.deser.std.StringDeserializer.deserialize(StringDeserializer.java:59)
    at com.fasterxml.jackson.databind.deser.std.StringDeserializer.deserialize(StringDeserializer.java:12)
    at com.fasterxml.jackson.databind.deser.SettableBeanProperty.deserialize(SettableBeanProperty.java:523)
    at com.fasterxml.jackson.databind.deser.impl.MethodProperty.deserializeAndSet(MethodProperty.java:95)
    at com.fasterxml.jackson.databind.deser.impl.BeanPropertyMap.findDeserializeAndSet(BeanPropertyMap.java:285)
    at com.fasterxml.jackson.databind.deser.BeanDeserializer.vanillaDeserialize(BeanDeserializer.java:248)
    at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:136)
    at com.fasterxml.jackson.databind.ObjectReader._bind(ObjectReader.java:1410)
    at com.fasterxml.jackson.databind.ObjectReader.readValue(ObjectReader.java:860)
    at com.fasterxml.jackson.jaxrs.base.ProviderBase.readFrom(ProviderBase.java:810)
    at io.confluent.rest.validation.JacksonMessageBodyProvider.readFrom(JacksonMessageBodyProvider.java:65)
    at org.glassfish.jersey.message.internal.ReaderInterceptorExecutor$TerminalReaderInterceptor.invokeReadFrom(ReaderInterceptorExecutor.java:260)
    at org.glassfish.jersey.message.internal.ReaderInterceptorExecutor$TerminalReaderInterceptor.aroundReadFrom(ReaderInterceptorExecutor.java:236)
    at org.glassfish.jersey.message.internal.ReaderInterceptorExecutor.proceed(ReaderInterceptorExecutor.java:156)
    at org.glassfish.jersey.server.internal.MappableExceptionWrapperInterceptor.aroundReadFrom(MappableExceptionWrapperInterceptor.java:74)
    at org.glassfish.jersey.message.internal.ReaderInterceptorExecutor.proceed(ReaderInterceptorExecutor.java:156)
    at org.glassfish.jersey.message.internal.MessageBodyFactory.readFrom(MessageBodyFactory.java:1085)
    at org.glassfish.jersey.message.internal.InboundMessageContext.readEntity(InboundMessageContext.java:853)
    at org.glassfish.jersey.server.ContainerRequest.readEntity(ContainerRequest.java:270)
    at org.glassfish.jersey.server.internal.inject.EntityParamValueFactoryProvider$EntityValueFactory.provide(EntityParamValueFactoryProvider.java:96)
    at org.glassfish.jersey.server.spi.internal.ParameterValueHelper.getParameterValues(ParameterValueHelper.java:81)
    at org.glassfish.jersey.server.model.internal.JavaResourceMethodDispatcherProvider$AbstractMethodParamInvoker.getParamValues(JavaResourceMethodDispatcherProvider.java:127)
    at org.glassfish.jersey.server.model.internal.JavaResourceMethodDispatcherProvider$VoidOutInvoker.doDispatch(JavaResourceMethodDispatcherProvider.java:143)
    at org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher.dispatch(AbstractJavaResourceMethodDispatcher.java:99)
    at org.glassfish.jersey.server.model.ResourceMethodInvoker.invoke(ResourceMethodInvoker.java:389)
    at org.glassfish.jersey.server.model.ResourceMethodInvoker.apply(ResourceMethodInvoker.java:347)
    at org.glassfish.jersey.server.model.ResourceMethodInvoker.apply(ResourceMethodInvoker.java:102)
    at org.glassfish.jersey.server.ServerRuntime$2.run(ServerRuntime.java:308)
    at org.glassfish.jersey.internal.Errors$1.call(Errors.java:271)
    at org.glassfish.jersey.internal.Errors$1.call(Errors.java:267)
    at org.glassfish.jersey.internal.Errors.process(Errors.java:315)
    at org.glassfish.jersey.internal.Errors.process(Errors.java:297)
    at org.glassfish.jersey.internal.Errors.process(Errors.java:267)
    at org.glassfish.jersey.process.internal.RequestScope.runInScope(RequestScope.java:317)
    at org.glassfish.jersey.server.ServerRuntime.process(ServerRuntime.java:291)
    at org.glassfish.jersey.server.ApplicationHandler.handle(ApplicationHandler.java:1140)
    at org.glassfish.jersey.servlet.WebComponent.service(WebComponent.java:403)
    at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:386)
    at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:334)
    at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:221)
    at org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:808)
    at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:587)
    at org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:221)
    at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1127)
    at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:515)
    at org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:185)
    at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1061)
    at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)
    at org.eclipse.jetty.server.handler.HandlerCollection.handle(HandlerCollection.java:110)
    at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:97)
    at org.eclipse.jetty.server.handler.StatisticsHandler.handle(StatisticsHandler.java:159)
    at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:97)
    at org.eclipse.jetty.server.Server.handle(Server.java:499)
    at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:310)
    at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:257)
    at org.eclipse.jetty.io.AbstractConnection$2.run(AbstractConnection.java:540)
    at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:635)
    at org.eclipse.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:555)
    at java.lang.Thread.run(Thread.java:745)
[2016-04-30 23:01:35,011] INFO 172.17.0.1 - - [30/Apr/2016:23:01:35 +0000] "POST /subjects/test_schema/versions HTTP/1.1" 500 52  8 (io.confluent.rest-utils.requests:77)

The reason is that the schema registry expects the actual schema to be provided as double-stringified JSON.

Correct post body:

{
  "schema": "{\"type\": \"string\"}"
}

Incorrect post body (what AvroTurf currently sends):

{
  "schema": {
    "type": {
      "type_sym":"string"
    }
  }
}

See Confluent's curl examples in their quickstart guide.
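
For comparison, a hedged sketch of a well-formed registration request made directly with Excon (host and subject are placeholders):

require "excon"
require "json"

schema = { "type" => "record", "name" => "test_schema", "fields" => [] }

# The registry expects the schema itself as a JSON *string* inside the
# JSON body, i.e. double-stringified.
Excon.post(
  "http://registry.example.com:8081/subjects/test_schema/versions",
  body: JSON.dump("schema" => JSON.dump(schema)),
  headers: { "Content-Type" => "application/vnd.schemaregistry.v1+json" }
)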

lazy schema registration is not thread safe

We are using avro_turf on JRuby 9.2.3.0.
I understand that avro_turf registers schemas in memory lazily, i.e. if a schema name is not present, it registers it. However, when multiple threads try to use the same schema, there are concurrent schema registration attempts, which throw the following exception:

<Avro::SchemaParseError: The name "precomputation_record" is already in use.>

/home/deploy/.rvm/gems/jruby-9.2.3.0/gems/avro-1.8.2/lib/avro/schema.rb:410:in `add_name'
/home/deploy/.rvm/gems/jruby-9.2.3.0/gems/avro-1.8.2/lib/avro/schema.rb:187:in `initialize'
/home/deploy/.rvm/gems/jruby-9.2.3.0/gems/avro-1.8.2/lib/avro/schema.rb:235:in `initialize'
/home/deploy/.rvm/gems/jruby-9.2.3.0/gems/avro-1.8.2/lib/avro/schema.rb:67:in `real_parse'
/home/deploy/.rvm/gems/jruby-9.2.3.0/gems/avro_turf-0.8.1/lib/avro_turf/schema_store.rb:21:in `find'
/home/deploy/.rvm/gems/jruby-9.2.3.0/gems/avro_turf-0.8.1/lib/avro_turf.rb:53:in `encode_to_stream'
/home/deploy/.rvm/gems/jruby-9.2.3.0/gems/avro_turf-0.8.1/lib/avro_turf.rb:39:in `encode'

Using schemas not in a file or registry

What if I don't want to hard-code a schema in a file path, and I don't want to set up the registry stuff? Is there any way to just define the schema in a variable and use that?
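
One hedged possibility, based on the Messaging initializer quoted earlier on this page accepting a schema_store: argument: supply your own store that parses schemas from strings, since anything responding to find(name, namespace) can stand in for the disk-backed SchemaStore:

require "avro"

# Hypothetical in-memory store: schemas are kept as JSON strings in a
# hash instead of .avsc files on disk.
class InMemorySchemaStore
  def initialize(schemas)
    @schemas = schemas # e.g. { "person" => '{"type":"record",...}' }
  end

  def find(name, _namespace = nil)
    Avro::Schema.parse(@schemas.fetch(name))
  end
end

It could then be passed as schema_store: to AvroTurf::Messaging.new.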

Logical data types?

I was looking for an example of using a logical data type. In particular, how would I pass a DateTime field via Avro?
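
For reference, a logical type in Avro annotates an underlying primitive type in the .avsc file; note that the Ruby avro gem only gained logical type support in v1.9.0 (see the compatibility issue above). A sketch of a millisecond-precision timestamp field:

{
  "name": "created_at",
  "type": { "type": "long", "logicalType": "timestamp-millis" }
}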

talking to Schema Registry needs local storage

hi @dasch ,

I don't understand why we need to maintain local copies of the schemas when we can fetch them from the schema registry. Isn't the whole point of the schema registry to maintain the latest copy of each schema across all applications trying to produce and consume?

Encoding or decoding with avro_turf requires the schema files to be present in the ./schemas directory (DEFAULT_SCHEMAS_PATH = "./schemas").
How can I just leverage the schemas present in the schema registry without storing them locally?

I also encounter this error when I try to run:

require 'avro_turf/messaging'
require 'kafka' # the ruby-kafka gem is required as "kafka"
require 'pry'

kafka = Kafka.new("kafka-0.x.net:32400,kafka-1.x.net:32401,kafka-2.x.net:32402".split(","))
avro = AvroTurf::Messaging.new(registry_url: "https://schema-registry.x.net/")

producer = kafka.producer

data = avro.encode({"org" => "foobar", "job_id" => "123abc"}, schema_name: "Source")
producer.produce(data, topic: "samples")
producer.deliver_messages
Traceback (most recent call last):
	2: from play.rb:14:in `<main>'
	1: from /Users/shashiravula/.rvm/gems/ruby-2.5.0/gems/avro_turf-0.8.0/lib/avro_turf/messaging.rb:52:in `encode'
/Users/shashiravula/.rvm/gems/ruby-2.5.0/gems/avro_turf-0.8.0/lib/avro_turf/schema_store.rb:23:in `find': expected schema `./schemas/Source.avsc' to define type `Source' (AvroTurf::SchemaError)

But then again, if I change the schema name to namespace.schemaname, I can register the schema and decode the message using the same.

 pry(main)> sample = avro.encode(obj, schema_name: "betterdoctor.Source")
I, [2018-05-01T15:33:05.213874 #21628]  INFO -- : Registered schema for subject `betterdoctor.Source`; id = 2 

But outside of pry, if I use it in the example code above:

data = avro.encode({"org" => "foobar", "job_id" => "123abc"}, schema_name: "betterdoctor.Source")
could not find Avro schema at `./schemas/betterdoctor/Source.avsc' (AvroTurf::SchemaNotFoundError)

I would really appreciate it if you could help me see where I am going wrong.

Best,
shashi

No way to specify URI path to schema registry

Not all schema registries are rooted at /. Clients based on Confluent libraries accept a URI with a path to the schema registry. AvroTurf uses Excon for HTTP handling, which unfortunately strips out any path passed in with the URI. This makes AvroTurf not fully compatible with all registry installations.

Creating consumer utilizing Schema Registry.

I am trying to create a consumer that utilizes the Schema Registry, using avro_turf on top of the Racecar gem.

I am able to get an object from this:

avro = AvroTurf::Messaging.new(registry_url: "http://my-registry:8081/")

But I am confused about what I should do next. I am just creating a consumer; I don't need a producer. And I suppose this is what I should be doing:

avro.decode(data)
result = avro.decode_message(data)
result.message   

But where would I get that data from? I saw that I could do this:

data = avro.encode({ "title" => "hello, world" }, schema_id: 2)

And I found out I can leave the first hash empty:

data = avro.encode({ }, schema_id: 2)

Is keeping it empty the right way to do it?

ConfluentSchemaRegistry does not support plain user/password authentication

Using the ConfluentSchemaRegistry with Confluent Cloud may require using a very hacky workaround for user/password auth.

  • There are currently no kwargs that get forwarded to Excon for user & password
  • Excon supports https://user:password@host, but if the password has special characters, URI parsing will fail
  • The dirty workaround requires something like this:
class CustomUriParser
  def initialize(host:, user:, password:)
    @host = host
    @user = user
    @password = password
  end

  def parse(uri)
    OpenStruct.new(
      host: @host,
      hostname: @host,
      path: '/',
      port: nil,
      query: nil,
      scheme: 'https',
      user: @user,
      password: @password,
    )
  end
end

Excon.defaults[:uri_parser] = CustomUriParser.new(
  host: 'somehost.foo.aws.confluent.cloud',
  user: 'token',
  password: 'secret_key_with_some_special_chars',
)

avro = AvroTurf::Messaging.new(registry_url: 'does not matter')

Ideally we could just do:

AvroTurf::Messaging.new(
  registry_url: 'somehost.foo.aws.confluent.cloud',
  user: 'token',
  password: 'secret_key'
)

Happy to create a PR to address this.

falsey defaults always stripped when submitting to registry

This is a critical issue. All falsey default values in a schema definition are stripped when sending the schema to the registry.

These field definitions...

{"name": "is_usable", "type": "boolean", "default": false},
{"name": "address_2", "type": ["null","string"], "default": null}

...are submitted to the registry as...

{"name": "is_usable", "type": "boolean"}
{"name": "address_2", "type": ["null","string"]}

This is caused by a bug in Apache's Avro library for Ruby. I've filed AVRO-1848 and submitted this pull request to fix it.

Obviously, this introduces unexpected behavior in downstream consumers. Additionally, the registry rejects certain changes as being incompatible only because of the missing default.

I'll submit a monkey-patch PR for avro_turf shortly to stop the bleeding in the interim.

magic_byte and schema_id

Hi,

Thanks for your lib. I'm using it with a Schema Registry, but I've hit a problem.

I was using Avro without avro_turf to encode data.
I used this code to encode my data with a specific schema:

      buffer = StringIO.new
      encoder = Avro::IO::BinaryEncoder.new(buffer)
      datum_writer = Avro::IO::DatumWriter.new(SCHEMA)
      datum_writer.write(data, encoder)

I had a producer and multiple consumers. When I started to use avro_turf, encoding and producing were fine, but my consumers could not decode the Avro data.

I checked your code for encoding the data and saw:

      # Always start with the magic byte.
      encoder.write(MAGIC_BYTE)

      # The schema id is encoded as a 4-byte big-endian integer.
      encoder.write([schema_id].pack("N"))

      # The actual message comes last.
      writer.write(message, encoder)

I removed the first 5 bytes of my buffer (magic byte + schema_id) and everything worked.

My question is: why are you adding these 5 bytes to the buffer? Is this a convention defined by Apache or Confluent?

Thanks in advance !

Getting Schemas into registry

So I'm using this gem with schema files, and it's working great. I'm thinking of using the registry, but I'm confused about a few things. How do I get the schema into the registry in the first place? Can I do that with this gem, or is that left to be done a different way?
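
A hedged sketch of pushing a local schema into the registry with this gem's registry client, using the register(subject, schema) call shown elsewhere on this page (the host is a placeholder, and the -value subject suffix follows the Confluent convention):

require "avro_turf"
require "avro_turf/confluent_schema_registry"

store = AvroTurf::SchemaStore.new(path: "schemas/")
schema = store.find("person")

registry = AvroTurf::ConfluentSchemaRegistry.new("http://registry.example.com:8081")
registry.register("person-value", schema)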

"Nil is not a schema we know about" when encoding

I'm trying an example similar to the one in the README, but I get the following errors:

Loading development environment (Rails 4.2.7.1)
[1] pry(main)> require "avro_turf"
=> true

[2] pry(main)> avro = AvroTurf.new(schemas_path: "#{Rails.root}/app/schemas/")
=> #<AvroTurf:0x000000138eb508 @codec=nil, @namespace=nil, @schema_store=#<AvroTurf::SchemaStore:0x000000138eb490 @path="/vagrant/app/schemas/", @schemas={}>>

[3] pry(main)> avro.encode({ "name" => "jaime" }, schema_name: "order")
Avro::UnknownSchemaError: nil is not a schema we know about.
from /home/vagrant/.rvm/gems/ruby-2.2.6/gems/avro-1.8.1/lib/avro/schema.rb:89:in `real_parse'

[4] pry(main)> avro.encode({ "name" => "jaime" }, schema_name: "order")
NoMethodError: undefined method `map' for nil:NilClass
from /home/vagrant/.rvm/gems/ruby-2.2.6/gems/avro-1.8.1/lib/avro/schema.rb:247:in `to_avro'

I have a single file named app/schemas/order.avsc with this content:

{
  "name": "order",
  "type": "record",
  "fields": [
    {
      "name": { "type": "string" }
    }
  ]
}

Using Ruby 2.2.6 and Rails 4.2.7.1

What am I doing wrong? Thanks!
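
For what it's worth, the field definition above nests the type under "name"; each field needs separate "name" and "type" keys, so a version that should parse is:

{
  "name": "order",
  "type": "record",
  "fields": [
    { "name": "name", "type": "string" }
  ]
}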

Pack schema metadata in #decode response.

When using a reader and performing avro.decode(data) in cases where the reader is responsible for decoding multiple schemas, it would be very useful if the returned object were a container packing schema metadata in addition to the decoded payload.

I would be happy to offer a backwards-compatible PR with this feature if it's wanted.
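
A minimal sketch of one possible container shape (the names are illustrative; the decode_message/result.message API referenced elsewhere on this page suggests a similar design):

# Pairs the decoded payload with the schema metadata it was read with.
DecodedMessage = Struct.new(:schema_id, :writer_schema, :message)

result = DecodedMessage.new(2299, nil, { "name" => "Jane" }) # writer_schema elided here
result.message # => {"name"=>"Jane"}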

docker-compose setup

Do you have an example of how to set this up with Docker and docker-compose?

I am doing this

zookeeper:
  image: 'bitnami/zookeeper:latest'
  ports:
    - '2181:2181'
  environment:
    - ALLOW_ANONYMOUS_LOGIN=yes
  networks:
    - my-app-tier
kafka:
  image: 'bitnami/kafka:latest'
  ports:
    - '9092:9092'
  environment:
    - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
    - ALLOW_PLAINTEXT_LISTENER=yes
  volumes:
    - kafka-persistence:/bitnami/kafka
  networks:
    - my-app-tier
schema-registry:
  image: 'confluent/schema-registry'
  ports:
    - '8081:8081'
  environment:
    - SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL=zookeeper:2181
    - SCHEMA_REGISTRY_HOST_NAME=localhost
    - SCHEMA_REGISTRY_DEBUG=true
  networks:
    - my-app-tier
kafka-rest-proxy:
  image: 'confluent/rest-proxy'
  ports:
    - '8082:8082'
  environment:
    - KAFKA_REST_ZOOKEEPER_CONNECT=zookeeper:2181
    - KAFKA_REST_HOST_NAME=localhost
  networks:
    - my-app-tier

But it does not seem to work:

avro = AvroTurf::Messaging.new(registry_url: "schema-registry:8081/", schemas_path: Rails.root.join('app', 'schemas/'))

[36] pry(main)> data = avro.encode({ "name" => "hello, world", data: "test"}, schema_name: "integration")
NoMethodError: undefined method `+' for nil:NilClass
from /gems/gems/excon-0.62.0/lib/excon/connection.rb:239:in `request'
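
One thing worth checking, offered as an assumption rather than a confirmed diagnosis: the registry_url above has no http:// scheme, and Excon can raise exactly this kind of NoMethodError when it cannot fully parse a URL. Something like this may behave better:

avro = AvroTurf::Messaging.new(
  registry_url: "http://schema-registry:8081", # note the explicit scheme
  schemas_path: Rails.root.join("app", "schemas")
)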

using local storage vs schema registry

Hi there, I did see #66. I'm asking if someone can offer even more explanation and reasoning for this.

Why do i need local storage for schemas when they're already being stored in the schema registry?

We have a Rails API app using Kafka and need schema validation. We had expected that schemas would be stored in the schema registry and dynamically fetched and cached from it when encoding/decoding messages. (By the way, does schema validation happen through encoding/decoding, or do we need to write code to validate an incoming message against a particular schema?)

We were surprised to see that avro_turf (and others) require the schemas to be stored locally with the code. I would love to see more explanation of this (why local?) and of how the schema validation & versioning work exactly.

Is this a design that we just have to accept, i.e. we have to store our schemas locally? What's the point of a remote schema registry (e.g. in our case, where all messages go through an API server)?

Inconsistency between AvroTurf::Messaging and default encoding

I'm using avro_turf for binary encoding for Kafka and use configuration to switch between in-message and registry schemas (in-message for simpler local testing, registry for production). The transformation from Ruby objects to Avro types for these two cases is inconsistent.

For in-message encoding (AvroTurf class), I can do:

avro.encode(serializable_hash, schema_name: schema_name)

This serializes Ruby objects in the hash (e.g. Time objects to ISO8601 strings) and creates the binary encoding.

For registry-based encoding (AvroTurf::Messaging class), I need to call as_json to serialize the Ruby objects first otherwise I get encoding errors. That is:

avro.encode(serializable_hash.as_json, schema_name: schema_name)

I'll aim to find some time in the near future to do a PR, but I'm describing the issue here to make sure it isn't lost. Please comment if the current behaviour is intentional and shouldn't be changed.

Testing / Initialize

Hi,

I'm trying to build specs to test your lib, but I'm stuck right now.

My first question is: where do you initialise AvroTurf::Messaging? I do it in an initializer, and I'm using it in producers and consumers.

But it blocks me from testing: when I initialize it in the initializers directory, I can't mock the URL before it is loaded. That's why I'm not sure I should load it there.

Thanks in advance

Cache warming

Hi there,

We're using avro turf along with the confluent schema registry and we're looking to provide some resilience to schema registry dropouts. Given enough Rails instances and enough (potentially rarely used) schemas the probability that a given registry dropout will occur on a cold cache is quite high.

We can warm the cache in a bit of a bodge by encoding a sample message for each schema on startup, but that requires us to maintain a set of sample messages and is a bit grotty.

Would you be up for a PR which adds a method to Messaging which e.g. registers all schemas in the schema store? Or something similar?
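
A rough sketch of what such a warm-up could do, assuming access to the registry client and a readable collection of schemas on the store (the store keeps a @schemas hash internally, but neither is public API today; see the "Exposing schema_store and registry" issue above):

store = AvroTurf::SchemaStore.new(path: "schemas/")
store.load_schemas!  # eagerly parse every .avsc on disk

registry = AvroTurf::ConfluentSchemaRegistry.new("http://registry.example.com:8081")

# Register every known schema up front so later encodes hit a warm
# cache. `schemas` assumes a reader over the store's internal hash,
# and the -value subject naming is an assumption.
store.schemas.each do |fullname, schema|
  registry.register("#{fullname}-value", schema)
end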

Minor syntax issue in README

The specification of the array type may be wrong:

// person_list.avsc
{
  "name": "person_list",
  "type": "array",
  "items": "person"
}

Correct syntax instead should be:

// person_list.avsc
{
  "name": "person_list",
  "type": {
    "type": "array",
    "items": "person"
  }
}

excon gem vulnerability

#112

In RubyGem excon before 0.71.0, there was a race condition around persistent connections, where a connection which is interrupted (such as by a timeout) would leave data on the socket. Subsequent requests would then read this data, returning content from the previous response. The race condition window appears to be short, and it would be difficult to purposefully exploit this.

Using explicit key for message

I successfully produce and consume Avro messages using avro_turf and Phobos. Something like this:

    data = avro.encode(
      contents,
      schema_name: 'myschema',
      namespace: 'mynamespace',
      subject: 'mynamespace.myschema-value'
    )
    producer.publish('mynamespace.myschema', data)

However, when I wanted to use a key when publishing, producer.publish('mynamespace.myschema', data, mykey), suddenly everything failed to work: Unknown magic byte, for instance. And the Kafka topic UI no longer identified the type as Avro.

What is the correct way to do this? Do I need to encode the key somehow too?
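
One assumption worth testing, based on the {topic}-key/{topic}-value subject convention mentioned elsewhere on this page: Avro-aware tooling expects the key to be Avro-encoded against the -key subject, so a raw string key would no longer look like an Avro message to it. A sketch (the key schema name is hypothetical):

key = avro.encode(
  { "id" => mykey },
  schema_name: "myschema_key",
  namespace: "mynamespace",
  subject: "mynamespace.myschema-key"
)

producer.publish("mynamespace.myschema", data, key)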

Allow configuring connection options to the SchemaRegistry

Hi,
it would be nice to have the ability to configure connection options to the Schema Registry, like connect_timeout, retry_limit, etc.

The default timeout in Excon is 60 seconds. So if I register all schemas during the app initialization step and the schema registry is unavailable for some reason, it only fails after 60 seconds, which is unacceptable in my case.

Currently I solve this with a little wrapper around AvroTurf::ConfluentSchemaRegistry that accepts Excon connection options:

class AvroSchemaRegistry < AvroTurf::ConfluentSchemaRegistry
  def initialize(url, logger: Logger.new($stdout), **connect_options)
    super(url, logger: logger)
    @connection.params.merge!(connect_options)
  end
end

Expected schema `path' to define type `name'

I just tried using avro_turf to read in a schema file to process some JSON, and I got the above error.

The problem is that the schema includes a namespace field, so when the check for schema.fullname != fullname occurs, it fails, because schema.fullname is equal to 'avro.tree.schema.red.tree'.

Your find function takes a namespace parameter, but it's unused by the load_schemas! function, so it never actually gets passed.

Filename: tree.avsc
snippet:

{
  "name": "tree",
  "type": "record",
  "namespace": "avro.tree.schema.red",
  ...
}

Am I using it wrong? Thanks!
