dasch / avro_turf Goto Github PK
View Code? Open in Web Editor NEWA library that makes it easier to use the Avro serialization format from Ruby.
License: MIT License
A library that makes it easier to use the Avro serialization format from Ruby.
License: MIT License
In order to improve latency and memory usage, we could have a method that writes the encoded data directly to an IO stream rather than to an internal buffer.
Sinatra has support for streaming responses:
get '/' do
stream do |out|
# This is how we could integrate Avro with Sinatra's streaming support.
avro data, stream: out, schema_name: "something"
end
end
Maybe use Active Support Notifications when available.
I am successfully able to publish data in to Kafka in the Avro format and able to read it out with a simple consumer. The problem I'm running in to is when I try to feed that data to Kafka Connect, it tries to look for {topic}-key and {topic}-value. Where do these come from, what data needs to be in them, and how do I create them when I'm publishing data to {topic} ?
When the reader's schema has a field with no default value, and writer's schema does not have a field with the same name we are supposed to get an exception during deserialization. However instead i'm getting the value "no_default" assigned to the field.
:no_default
is actually a Symbol, but is coerced to a string and so in the case the field is a string we do not get an error at all, we just get bad data. In the case the field is not a string (i.e an int) then an error does arise as Symbol cannot be coerced to int.
/usr/share/logstash/vendor/bundle/jruby/1.9/gems/avro_turf-0.8.0/lib/avro_turf/messaging.rb:34: syntax error, unexpected tLABEL
def initialize(registry: nil, registry_url: nil, schema_store: nil, schemas_path: nil, namespace: nil, logger: nil)
Hello, i'm fairly new to ruby and trying to write a plugin for logstash. I got this error. After searching around, i suspect that this is because logstash uses ruby 1.9.3 which does not support named arguments. Can anyone confirm that my suspicion is right? And what can i do to use avro_turf with ruby 1.9.3.
Thanks in advance.
Edit: I use avro_turf like this:
AvroTurf::Messaging.new(registry_url: registry_url, schemas_path: schemas_path)
I'm encountering an issue that is resolving in the following error when I call decode
. My setup is a followed:
# config/initializer/ccpa.rb
require 'avro_turf/messaging'
$avro = AvroTurf::Messaging.new(registry_url: AppConfig.instance.ccpa_schema, schemas_path: "schemas/")
$ccpa_kafka = Kafka.new([AppConfig.instance.ccpa_broker], logger: Rails.logger)
# Consumers
$ccpa_kafka.each_message(topic: AppConfig.instance.ccpa_fan_lookup_request_topic) do |message|
message = $avro.decode(message.value, schema_name: 'com.tm.privacy.wirefmt.FanLookupRequest')
puts message
end
The output I get is:
New topics added to target list: star.p0000.lookup-request.v0.dev
Fetching cluster metadata from kafka://kf0003.cf.p000.dev3.foo.bar:9092
[topic_metadata] Opening connection to kf0003.cf.p000.dev3.foo.bar:9092 with client id ruby-kafka...
[topic_metadata] Sending topic_metadata API request 1 to kf0003.cf.p000.dev3.foo.bar:9092
[topic_metadata] Waiting for response 1 from kf0003.cf.p000.dev3.foo.bar:9092
[topic_metadata] Received response 1 from kf0003.cf.p000.dev3.foo.bar:9092
Discovered cluster metadata; nodes: kf0004.cf.p000.dev3.foo.bar:9092 (node_id=1005), kf0005.cf.p000.dev3.foo.bar:9092 (node_id=1003), kf0001.cf.p000.dev3.foo.bar:9092 (node_id=1007), kf0003.cf.p000.dev3.foo.bar:9092 (node_id=1001), kf0002.cf.p000.dev3.foo.bar:9092 (node_id=1006), kf0007.cf.p000.dev3.foo.bar:9092 (node_id=1008), kf0006.cf.p000.dev3.foo.bar:9092 (node_id=1002), kf0008.cf.p000.dev3.foo.bar:9092 (node_id=1009)
Closing socket to kf0003.cf.p000.dev3.foo.bar:9092
Resolving offset '-2' for star.p0000.lookup-request.v0.dev/0...
[list_offset] Opening connection to kf0003.cf.p000.dev3.foo.bar:9092 with client id ruby-kafka...
[list_offset] Sending list_offset API request 1 to kf0003.cf.p000.dev3.foo.bar:9092
[list_offset] Waiting for response 1 from kf0003.cf.p000.dev3.foo.bar:9092
[list_offset] Received response 1 from kf0003.cf.p000.dev3.foo.bar:9092
Offset for star.p0000.lookup-request.v0.dev/0 is 1261
[fetch] Sending fetch API request 2 to kf0003.cf.p000.dev3.foo.bar:9092
[fetch] Waiting for response 2 from kf0003.cf.p000.dev3.foo.bar:9092
[fetch] Received response 2 from kf0003.cf.p000.dev3.foo.bar:9092
I, [2019-09-13T11:06:47.160017 #79695] INFO -- : Fetching schema with id 2299
{"fanLookupRequestId"=>"59501332-104c-4420-bb2b-94dd9eeb91ac", "brandId"=>"Special", "contactEmail"=>"[email protected]", "property"=>nil, "item"=>{"email"=>"[email protected]", "firstName"=>"John", "lastName"=>"Doe", "encryptionContext"=>nil}}
RuntimeError: Expected data to begin with a magic byte, got "{"
from /Users/home/Code/web/.bundle/ruby/2.3.0/gems/avro_turf-0.9.0/lib/avro_turf/messaging.rb:99:in 'decode'
My question is, why am I getting this error, and how does one go about resolving it.
The following errors arise when I try to use avro_turf with some ─ not that old ─ ruby versions:
avro_turf-0.4.1/lib/avro_turf.rb:19: syntax error, unexpected ',' (SyntaxError)
def initialize(schemas_path:, namespace: nil, codec: nil)
^
/home/embs/.rvm/gems/ruby-2.0.0-p598@me/gems/avro_turf-0.4.1/lib/avro_turf.rb:19: Can't assign to nil
def initialize(schemas_path:, namespace: nil, codec: nil)
^
/home/embs/.rvm/gems/ruby-2.0.0-p598@me/gems/avro_turf-0.4.1/lib/avro_turf.rb:31: syntax error, unexpected ','
def encode(data, schema_name:, namespace: @namespace)
^
For this specific example, I was using
$ ruby -v
ruby 2.0.0p598 (2014-11-13 revision 48408) [x86_64-linux]
I think that would be nice at least make explicit in some place the required ruby version. I don't know how that should be for a gem, though.
I cant get fixed to work. How do i write it? the original "type":"fixed" didnt work because the gem passed the type thru. saw another issue that showed to write it this way..but this still doesnt work.
"fields": [
{
"name": "uuid",
"type": { "type" : "fixed" , "size" : 16}
},
I get the error #<Avro::SchemaParseError: Name is invalid for type fixed!
FYI I tested the compatibility of avro_turf with the release candidate for Avro v1.9.0.
To work with the new version, the avro_turf/schema_to_avro_patch
file needs to be excluded.
This can be done conditionally based on the new version of Avro. Unfortunately, Avro does not correctly report the gem's version (Avro::VERSION == "FIXME"
). But we can check for something that was introduced in the new version (e.g. Avro::LogicalTypes
) to indicate that it is v1.9.0 or later.
Currently if I would like to preload schemas and register them in schema registry when using Messaging
interface, I need to manually initialise SchemaStore
and ConfluentSchemaRegistry
(wrapped with CachedConfluentSchemaRegistry
), preload/register schemas and pass both instances to Messaging
initialiser. If there would be access to registry
and schema_store
they could be used without manually initialising them. Do you see any problem with creating readers for them?
I've been getting Avro::SchemaParseError
when using a schema similar to the person/addess/person_list example given for inter-schema references. Assuming I made a mistake I took the example from the readme and tried to load that schema and I still get the same error.
I used the following code for debugging:
store = AvroTurf::SchemaStore.new(path: 'schemas/')
store.find('address')
store.find('person')
store.find('person_list')
This results in:
gems/avro-1.8.1/lib/avro/schema.rb:48:in `real_parse': Unknown type: {"type"=>"array", "items"=>"person"} (Avro::SchemaParseError)
from gems/2.3.0/gems/avro_turf-0.7.2/lib/avro_turf/schema_store.rb:20:in `find'
I am using avro_turf 0.7.2 and avro 1.8.1
Hey @dasch,
It seems an assumption of AvroTurf::Messaging
that you will have Avro schema files on disk already, is that true? My assumption is that the purpose of a schema registry is to avoid dependency on schema files accessible to your client code but maybe I'm missing something. I'm not sure if the workflow that makes sense in my head will mesh w/ this library, ideally I'd like AvroTurf::Messaging.new(...)
(opt-in via option possibly?) to load the schemas from the registry into memory, rather than registering + fetching as side-effect of calling encode/decode
AvroTurf fails when trying to use complex types other than record ("map", "array", "enum", "fixed") in a schema. It appears to be due to AvroTurf attempting to look up a schema with the name of the type rather than falling through to a built-in type. Example below demonstrates this with the "map" type but it fails the same way with any of the other types listed above.
Example schema:
{
"name": "Person",
"type": "record",
"fields": [
{
"name": "name",
"type": "string"
},
{
"name": "attributes",
"type": "map",
"values": "string"
}
]
}
Ruby code:
avro = AvroTurf.new(schemas_path: 'schemas/')
data = {
name: 'Joe',
attribute: {
"height": "tall"
}
}
avro.encode(data, schema_name: 'Person')
Stacktrace:
Traceback (most recent call last):
6: from test.rb:16:in `<main>'
5: from vendor/ruby/2.5.0/gems/avro_turf-0.8.0/lib/avro_turf.rb:37:in `encode'
4: from vendor/ruby/2.5.0/gems/avro_turf-0.8.0/lib/avro_turf.rb:51:in `encode_to_stream'
3: from vendor/ruby/2.5.0/gems/avro_turf-0.8.0/lib/avro_turf/schema_store.rb:12:in `find'
2: from vendor/ruby/2.5.0/gems/avro_turf-0.8.0/lib/avro_turf/schema_store.rb:31:in `rescue in find'
1: from vendor/ruby/2.5.0/gems/avro_turf-0.8.0/lib/avro_turf/schema_store.rb:12:in `find'
vendor/ruby/2.5.0/gems/avro_turf-0.8.0/lib/avro_turf/schema_store.rb:40:in `rescue in find': could not find Avro schema at `schemas/map.avsc' (AvroTurf::SchemaNotFoundError)
There has not been a release of the gem since August 2018. It would be cool if we could get a new release of the gem so we can enjoy some of the awesome goodness in master
.
Greetings...
The CachedConfluentSchemaRegistry provides an in-memory cache of the registry. It would be nice if this was extended to support a disk based cache as well. This would allow the cache to be preloaded from disk upon an application restart.
In our use-case, we publish messages via a Chef recipe and/or event handler. The trouble is that the recipe is reinitialized on every Chef client convergence (30 minutes). The net result is millions of POSTs to the upstream schema registry (e.g. 1000 chef nodes X 30 minutes/cycle => 2000 redundant POSTs/hour per schema forever).
An extension of the CachedConfluentSchemaRegistry
, something like this:
require 'json'
require 'avro_turf/cached_confluent_schema_registry'
class AvroTurf::WriteThruDiskCachedConfluentSchemaRegistry < AvroTurf::CachedConfluentSchemaRegistry
def initialize(upstream, disk_path)
super(upstream)
# load the write-thru cache on startup if it exists
@schemas_by_id_path = ::File.join(disk_path, 'schemas_by_id.json')
@schemas_by_id = ::JSON.parse(::File.read(@schemas_by_id_path)) if ::File.exist?(@schemas_by_id_path)
@ids_by_schema_path = ::File.join(disk_path, 'ids_by_schema.json')
@ids_by_schema = ::JSON.parse(::File.read(@ids_by_schema_path)) if ::File.exist?(@ids_by_schema_path)
end
# override to include write-thru cache
def fetch(id)
# the write-thru cache (json) does not store keys in numeric format
# so, convert id to a string for caching purposes only
ids = id.to_s
if @schemas_by_id[ids].nil?
# when fetching from upstream, be sure to use the numeric id
@schemas_by_id[ids] = @upstream.fetch(id)
::File.write(@schemas_by_id_path, ::JSON.pretty_generate(@schemas_by_id))
end
@schemas_by_id[ids]
end
# override to include write-thru cache
def register(subject, schema)
key = subject + schema.to_s
if @ids_by_schema[key].nil?
@ids_by_schema[key] = @upstream.register(subject, schema)
::File.write(@ids_by_schema_path, ::JSON.pretty_generate(@ids_by_schema))
end
@ids_by_schema[key]
end
end
Hi! Thanks for your library and your time <3
I'm having some trouble using the Messaging API. Everything works fine using AvroTurf.new
API, but when I move to AvroTurf::Messaging.new(registry_url: my_registry)
the encode
function raises Avro::IO::AvroTypeError
. It still does update the registry with my local schema.
This is my schema:
{
"type": "record",
"name": "campaign",
"namespace": "com.test",
"fields": [
{
"name": "id",
"type": "int"
},
{
"name": "user_id",
"type": "string"
},
{
"name": "contract_id",
"type": "string"
},
{
"name": "publisher_id",
"type": "string"
},
{
"name": "name",
"type": "string"
},
{
"name": "goal",
"type": "int"
},
{
"name": "start_date",
"type": "string"
},
{
"name": "end_date",
"type": "string"
},
{
"name": "activated_at",
"default": null,
"type": [
"null",
"string"
]
},
{
"name": "deactivated_at",
"default": null,
"type": [
"null",
"string"
]
},
{
"name": "active",
"type": "boolean"
},
{
"name": "listings",
"type": {
"type": "array",
"items": "int"
}
}
]
}
This is the actual hash i'm trying to serialize:
{
:id=>4,
:user_id=>"[email protected]",
:contract_id=>"c34c728a-09e6-a593-0808-9b2343d80763",
:publisher_id=>"be6f112b-807a-4c7b-dcad-7374ab6aef5e",
:name=>"string",
:goal=>50,
:start_date=>"2019-12-01",
:end_date=>"2019-12-30",
:activated_at=>"teste",
:deactivated_at=>"teste",
:active=>false,
:listings=>[
1,
2
]
}
The entire error message is:
Avro::IO::AvroTypeError: The datum {:id=>4, :user_id=>"[email protected]", :contract_id=>"c34c728a-09e6-a593-0808-9b2343d80763", :publisher_id=>"be6f112b-807a-4c7b-dcad-7374ab6aef5e", :name=>"string", :goal=>50, :start_date=>"2019-12-01", :end_date=>"2019-12-30", :activated_at=>"teste", :deactivated_at=>"teste", :active=>false, :listings=>[1, 2]} is not an example of schema {"type":"record","name":"campaign","namespace":"com.test","fields":[{"name":"id","type":"int"},{"name":"user_id","type":"string"},{"name":"contract_id","type":"string"},{"name":"publisher_id","type":"string"},{"name":"name","type":"string"},{"name":"goal","type":"int"},{"name":"start_date","type":"string"},{"name":"end_date","type":"string"},{"name":"activated_at","type":["null","string"],"default":null},{"name":"deactivated_at","type":["null","string"],"default":null},{"name":"active","type":"boolean"},{"name":"listings","type":{"type":"array","items":"int"}}]}
Thank you again in advance.
Hi
I'm trying to use your 'avro_turf/messaging' with schema registry, but already at very simply examples I'm getting the error message: Excon::Error::Socket: end of file reached (EOFError).
In the internet I found, that it could be a problem of Excon. Either the :idempotent => true or timeout problem. Could you help me, how to tune, correct this problem?
Here is my example of code:
require 'avro_turf/messaging'
avro = AvroTurf::Messaging.new(registry_url: "https://schema-registry.mycompany.com", schemas_path: "/path_to_my/avroschema")
avro_data = avro.encode({ "full_name" => "Jane", "age" => 28 }, schema_name: "person")
Excon::Error::Socket: end of file reached (EOFError)
from <internal:prelude>:76:in `__read_nonblock'
Caused by EOFError: end of file reached
from <internal:prelude>:76:in `__read_nonblock'
person.avsc
{
"name": "person",
"type": "record",
"fields": [
{
"name": "full_name",
"type": "string"
},
{
"name": "age",
"type": "long"
}
]
}
Just for information, I could use the schema registry with a confluent API without any problem, so I suppose, that there is no problem with the registry server, or connection.
thanks for help
When I switched from using the avro file format to the messaging format I now get an encoding problem I did not see before:
/usr/local/bundle/gems/avro-1.8.1/lib/avro/io.rb:206:in write': incompatible character encodings: US-ASCII and UTF-8 (Encoding::CompatibilityError) 19:01:44 from /usr/local/bundle/gems/avro-1.8.1/lib/avro/io.rb:206:in
write_bytes'
19:01:44 from /usr/local/bundle/gems/avro-1.8.1/lib/avro/io.rb:213:in `write_string'
This is happening the UTF-* symbol for degree (\xC2\xB0).
Hello!
Trying out avro turf and got surprised that when reading using messaging with schema registry, it expects the schemas to be on disc, although it also fetches them from the registry. What is the thought behind that? It would be nice if the clients did not have to care about storing files for schemas since they are in the registry.
disclaimer: i could be misunderstanding something fullly, first time using avro with schema registry.
In some cases I need a consumer group to reprocess from the beginning. Is it possible to reset the offset for this case?
Currently encoding data that doesn't conform to the schema using Messaging#encode
, results in exception with not very useful message like Avro::IO::AvroTypeError: The datum nil is not an example of schema "string"
which doesn't refer to the field that was not filled. Avro gem includes a class Avro::SchemaValidator
which can be used to get meaningful error. It is already used indirectly in DatumWriter
through Schema#validate
which converts presence/absence of exception to boolean.
As a very basic way to implement validation in Messaging#encode
is to add
Avro::SchemaValidator.validate!(schema, message, recursive: true, encoded: false, fail_on_extra_fields: true)
after resolving schema
and before encoding the data. message
is also required to have all keys deep stringified (which is not required for encoding).
What do you think about adding validation to messaging interface?
in the schema registry or outside of the SR? the SR is JUST the crud storage of schemas?
so..if another client (say python or java) uses the SR and an event stored in kafka, they have to use another library to decode the message that i have. encoding using avro_turf? no decoding happens in the SR?
Using Confluent Schema Registry Version 2.0.1
Attempting to register a new schema with the registry always, without fail, results in a 500 Internal Server Error and the following output in the Schema Registry logs:
[2016-04-30 23:01:35,007] ERROR Unhandled exception resulting in internal server error response (io.confluent.rest.exceptions.GenericExceptionMapper:37)
com.fasterxml.jackson.databind.JsonMappingException: Can not deserialize instance of java.lang.String out of START_OBJECT token
at [Source: org.glassfish.jersey.message.internal.ReaderInterceptorExecutor$UnCloseableInputStream@4923d986; line: 1, column: 2] (through reference chain: io.confluent.kafka.schemaregistry.client.rest.entities.requests.RegisterSchemaRequest["schema"])
at com.fasterxml.jackson.databind.JsonMappingException.from(JsonMappingException.java:148)
at com.fasterxml.jackson.databind.DeserializationContext.mappingException(DeserializationContext.java:835)
at com.fasterxml.jackson.databind.deser.std.StringDeserializer.deserialize(StringDeserializer.java:59)
at com.fasterxml.jackson.databind.deser.std.StringDeserializer.deserialize(StringDeserializer.java:12)
at com.fasterxml.jackson.databind.deser.SettableBeanProperty.deserialize(SettableBeanProperty.java:523)
at com.fasterxml.jackson.databind.deser.impl.MethodProperty.deserializeAndSet(MethodProperty.java:95)
at com.fasterxml.jackson.databind.deser.impl.BeanPropertyMap.findDeserializeAndSet(BeanPropertyMap.java:285)
at com.fasterxml.jackson.databind.deser.BeanDeserializer.vanillaDeserialize(BeanDeserializer.java:248)
at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:136)
at com.fasterxml.jackson.databind.ObjectReader._bind(ObjectReader.java:1410)
at com.fasterxml.jackson.databind.ObjectReader.readValue(ObjectReader.java:860)
at com.fasterxml.jackson.jaxrs.base.ProviderBase.readFrom(ProviderBase.java:810)
at io.confluent.rest.validation.JacksonMessageBodyProvider.readFrom(JacksonMessageBodyProvider.java:65)
at org.glassfish.jersey.message.internal.ReaderInterceptorExecutor$TerminalReaderInterceptor.invokeReadFrom(ReaderInterceptorExecutor.java:260)
at org.glassfish.jersey.message.internal.ReaderInterceptorExecutor$TerminalReaderInterceptor.aroundReadFrom(ReaderInterceptorExecutor.java:236)
at org.glassfish.jersey.message.internal.ReaderInterceptorExecutor.proceed(ReaderInterceptorExecutor.java:156)
at org.glassfish.jersey.server.internal.MappableExceptionWrapperInterceptor.aroundReadFrom(MappableExceptionWrapperInterceptor.java:74)
at org.glassfish.jersey.message.internal.ReaderInterceptorExecutor.proceed(ReaderInterceptorExecutor.java:156)
at org.glassfish.jersey.message.internal.MessageBodyFactory.readFrom(MessageBodyFactory.java:1085)
at org.glassfish.jersey.message.internal.InboundMessageContext.readEntity(InboundMessageContext.java:853)
at org.glassfish.jersey.server.ContainerRequest.readEntity(ContainerRequest.java:270)
at org.glassfish.jersey.server.internal.inject.EntityParamValueFactoryProvider$EntityValueFactory.provide(EntityParamValueFactoryProvider.java:96)
at org.glassfish.jersey.server.spi.internal.ParameterValueHelper.getParameterValues(ParameterValueHelper.java:81)
at org.glassfish.jersey.server.model.internal.JavaResourceMethodDispatcherProvider$AbstractMethodParamInvoker.getParamValues(JavaResourceMethodDispatcherProvider.java:127)
at org.glassfish.jersey.server.model.internal.JavaResourceMethodDispatcherProvider$VoidOutInvoker.doDispatch(JavaResourceMethodDispatcherProvider.java:143)
at org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher.dispatch(AbstractJavaResourceMethodDispatcher.java:99)
at org.glassfish.jersey.server.model.ResourceMethodInvoker.invoke(ResourceMethodInvoker.java:389)
at org.glassfish.jersey.server.model.ResourceMethodInvoker.apply(ResourceMethodInvoker.java:347)
at org.glassfish.jersey.server.model.ResourceMethodInvoker.apply(ResourceMethodInvoker.java:102)
at org.glassfish.jersey.server.ServerRuntime$2.run(ServerRuntime.java:308)
at org.glassfish.jersey.internal.Errors$1.call(Errors.java:271)
at org.glassfish.jersey.internal.Errors$1.call(Errors.java:267)
at org.glassfish.jersey.internal.Errors.process(Errors.java:315)
at org.glassfish.jersey.internal.Errors.process(Errors.java:297)
at org.glassfish.jersey.internal.Errors.process(Errors.java:267)
at org.glassfish.jersey.process.internal.RequestScope.runInScope(RequestScope.java:317)
at org.glassfish.jersey.server.ServerRuntime.process(ServerRuntime.java:291)
at org.glassfish.jersey.server.ApplicationHandler.handle(ApplicationHandler.java:1140)
at org.glassfish.jersey.servlet.WebComponent.service(WebComponent.java:403)
at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:386)
at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:334)
at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:221)
at org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:808)
at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:587)
at org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:221)
at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1127)
at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:515)
at org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:185)
at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1061)
at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)
at org.eclipse.jetty.server.handler.HandlerCollection.handle(HandlerCollection.java:110)
at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:97)
at org.eclipse.jetty.server.handler.StatisticsHandler.handle(StatisticsHandler.java:159)
at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:97)
at org.eclipse.jetty.server.Server.handle(Server.java:499)
at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:310)
at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:257)
at org.eclipse.jetty.io.AbstractConnection$2.run(AbstractConnection.java:540)
at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:635)
at org.eclipse.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:555)
at java.lang.Thread.run(Thread.java:745)
[2016-04-30 23:01:35,011] INFO 172.17.0.1 - - [30/Apr/2016:23:01:35 +0000] "POST /subjects/test_schema/versions HTTP/1.1" 500 52 8 (io.confluent.rest-utils.requests:77)
The reason is that the schema registry expects the actual schema to be provided as double-stringified JSON.
Correct post body:
{
"schema": "{\"type\": \"string\"}"
}
Incorrect post body (what AvroTurf currently sends):
{
"schema": {
"type": {
"type_sym":"string"
}
}
}
I'm having a major integration issue: lensesio/fast-data-dev#22
The root cause is incorrect/outdated handling of schema names which must apparently be registered with -value
and -key
suffixes, according to https://github.com/confluentinc/schema-registry
I wonder if anyone involved can give any clues or a code change directions (if it's not too much of a hassle, I might do a PR). Thanks!
We are using avro turf on Jruby 9.2.3.0.
I understand that Avro Turf registers schemas in-memory lazily i.e. if a schema name is not present; it'll register it. However, when multiple threads are trying to use the same schema, there will be concurrent schema registration attempts which throw the following exception:
<Avro::SchemaParseError: The name "precomputation_record" is already in use.>}}</b> Per task Stacktrace: {
"aw_conv"=>{:stacktrace=>"[\"/home/deploy/.rvm/gems/jruby-9.2.3.0/gems/avro-1.8.2/lib/avro/schema.rb:410:in `add_name'\",
\"/home/deploy/.rvm/gems/jruby-9.2.3.0/gems/avro-1.8.2/lib/avro/schema.rb:187:in `initialize'\",
\"/home/deploy/.rvm/gems/jruby-9.2.3.0/gems/avro-1.8.2/lib/avro/schema.rb:235:in `initialize'\",
\"/home/deploy/.rvm/gems/jruby-9.2.3.0/gems/avro-1.8.2/lib/avro/schema.rb:67:in `real_parse'\",
\"/home/deploy/.rvm/gems/jruby-9.2.3.0/gems/avro_turf-0.8.1/lib/avro_turf/schema_store.rb:21:in `find'\",
\"/home/deploy/.rvm/gems/jruby-9.2.3.0/gems/avro_turf-0.8.1/lib/avro_turf.rb:53:in `encode_to_stream'\",
\"/home/deploy/.rvm/gems/jruby-9.2.3.0/gems/avro_turf-0.8.1/lib/avro_turf.rb:39:in `encode'\",
What if I don't want to hard-code a schema in a file path and I don't want to set up the registry stuff? Is there any way to just define the schema in a variable and use that?
Was looking for an example for using a logical data type? In particular, I'm looking for how to pass a DateTime field via Avro?
hi @dasch ,
I don't understand why do we need to maintain local copies of the schema when we can fetch them from the schema registry. Isn't the whole point of the schema registry is to maintain the latest copy of schema across all applications trying to produce and consume?
Encoding or decoding with avro_turf requires the schema files to be present at ./schemas directory (DEFAULT_SCHEMAS_PATH = "./schemas").
how can I just leverage the schemas present in the schema registry without storing them locally?
I also encounter this error when I try to run
require 'avro_turf/messaging'
require 'ruby-kafka'
require 'pry'
kafka = Kafka.new("kafka-0.x.net:32400,kafka-1.x.net:32401,kafka-2.x.net:32402".split(","))
avro = AvroTurf::Messaging.new(registry_url: "https://schema-registry.x.net/")
producer = kafka.producer
data = avro.encode({"org" => "foobar", "job_id" => "123abc"}, schema_name: "Source")
producer.produce(data, topic: "samples")
producer.deliver_messages
Traceback (most recent call last):
2: from play.rb:14:in `<main>'
1: from /Users/shashiravula/.rvm/gems/ruby-2.5.0/gems/avro_turf-0.8.0/lib/avro_turf/messaging.rb:52:in `encode'
/Users/shashiravula/.rvm/gems/ruby-2.5.0/gems/avro_turf-0.8.0/lib/avro_turf/schema_store.rb:23:in `find': expected schema `./schemas/Source.avsc' to define type `Source' (AvroTurf::SchemaError)
but then again if I change the schema name to namespace.schemaname. I can register the schema and decode the message using the same.
pry(main)> sample = avro.encode(obj, schema_name: "betterdoctor.Source")
I, [2018-05-01T15:33:05.213874 #21628] INFO -- : Registered schema for subject `betterdoctor.Source`; id = 2
But outside of pry if use it in the example code above
data = avro.encode({"org" => "foobar", "job_id" => "123abc"}, schema_name: "betterdoctor.Source")
could not find Avro schema at `./schemas/betterdoctor/Source.avsc' (AvroTurf::SchemaNotFoundError)
I would really appreciate if you can help me tell where I am going wrong.
Best,
shashi
Not all schema registries are rooted at /
. Clients based on confluent libraries accept a URI with a path to the schema registry. AvroTurf uses Excon for http handling, which unfortunately strips out any path passed in with the URI. This makes avro turf not fully compatible with all registry installations.
I am trying to create a consumer that utilizes Schema Registry. I am trying to use avro_turf
to top of Racecar gem.
I am able to get object from this:
avro = AvroTurf::Messaging.new(registry_url: "http://my-registry:8081/")
But I am confused what I should do next. I am just creating consumer, I don't need a producer. And I suppose this is what I should be doing:
avro.decode(data)
result = avro.decode_message(data)
result.message
But where would I get that data
from? I saw that I could do this:
data = avro.encode({ "title" => "hello, world" }, schema_id: 2)
And found out, I can leave the first hash empty:
data = avro.encode({ }, schema_id: 2)
Is keeping it empty the right way to do it?
Using the ConfluentSchemaRegistry with Confluent Cloud may require using a very hacky workaround for user/password auth.
excon
for user
& password
https://user:password@host
but if the password has special characters URI parse will failclass CustomUriParser
def initialize(host:, user:, password:)
@host = host
@user = user
@password = password
end
def parse(uri)
OpenStruct.new(
host: @host,
hostname: @host,
path: '/',
port: nil,
query: nil,
scheme: 'https',
user: @user,
password: @password,
)
end
end
Excon.defaults[:uri_parser] = CustomUriParser.new(
host: 'somehost.foo.aws.confluent.cloud',
user: 'token',
password: 'secret_key_with_some_special_chars',
)
avro = AvroTurf::Messaging.new(registry_url: 'does not matter')
Ideally we could just do:
AvroTurf::Messaging.new(
registry_url: 'somehost.foo.aws.confluent.cloud',
user: 'token',
password: 'secret_key'
)
Happy to create a PR to address this.
This is a critical issue. All falsey default values in a schema definition are stripped when sending the schema to the registry.
These field definitions...
{"name": "is_usable", "type": "boolean", "default": false},
{"name": "address_2", "type": ["null","string"], "default": null}
...are submitted to the registry as...
{"name": "is_usable", "type": "boolean"}
{"name": "address_2", "type": ["null","string"]}
This is caused by a bug in Apache's Avro library for Ruby. I've filed AVRO-1848 and submitted this pull request to fix it.
Obviously, this introduces unexpected behavior in downstream consumers. Additionally, the registry rejects certain changes as being incompatible only because of the missing default.
I'll submit a monkey-patch PR for avro_turf shortly to stop the bleeding in the interim.
Hi,
Thanks guys for your lib. I'm using it with a Schema Registry but i got a problem.
I was using Avro without avro_turf
for encoding data.
I used this code to encode my data with a specific Schema:
buffer = StringIO.new
encoder = Avro::IO::BinaryEncoder.new(buffer)
datum_writer = Avro::IO::DatumWriter.new(SCHEMA)
datum_writer.write(data, encoder)
I had a producer and multiple consumers. Next when i started to use avro_turf
, the encoding and producing was ok but my consumers could not decode the avro data.
I check your code for encoding the data and i saw :
# Always start with the magic byte.
encoder.write(MAGIC_BYTE)
# The schema id is encoded as a 4-byte big-endian integer.
encoder.write([schema_id].pack("N"))
# The actual message comes last.
writer.write(message, encoder)
I removed the first 5 bytes of my Buffer (magic byte + shema_id) and every things goes right.
My question is why are you adding this 5 bytes in the buffer ? Is it a way provided by Apache or Confluent ?
Thanks in advance !
So I'm using it schemas - working great. I'm thinking of using the registry - but am confused about a few things. How do I get the schema into the registry in the first place? Can I do that with this gem or is that left to do in a different way?
I'm trying a similar example to the one in the README, but I get the following errors:
Loading development environment (Rails 4.2.7.1)
[1] pry(main)> require "avro_turf"
=> true
[2] pry(main)> avro = AvroTurf.new(schemas_path: "#{Rails.root}/app/schemas/")
=> #<AvroTurf:0x000000138eb508 @codec=nil, @namespace=nil, @schema_store=#<AvroTurf::SchemaStore:0x000000138eb490 @path="/vagrant/app/schemas/", @schemas={}>>
[3] pry(main)> avro.encode({ "name" => "jaime" }, schema_name: "order")
Avro::UnknownSchemaError: nil is not a schema we know about.
from /home/vagrant/.rvm/gems/ruby-2.2.6/gems/avro-1.8.1/lib/avro/schema.rb:89:in `real_parse'
[4] pry(main)> avro.encode({ "name" => "jaime" }, schema_name: "order")
NoMethodError: undefined method `map' for nil:NilClass
from /home/vagrant/.rvm/gems/ruby-2.2.6/gems/avro-1.8.1/lib/avro/schema.rb:247:in `to_avro'
I have a single file named app/schemas/order.avsc
with this content:
{
"name": "order",
"type": "record",
"fields": [
{
"name": { "type": "string" }
}
]
}
Using Ruby 2.2.6 and Rails 4.2.7.1
What am I doing wrong? Thanks!
While using reader and performing avro.decode(data)
in cases where reader is responsible for decoding multiple schemas, it would be very useful, that object returned is container packing schema meta data in addition to decoded payload.
I would be happy to offer a backwards compatible PR with this feature in case it's wanted.
Do you have an example on how to set this up with docker and docker-compose
I am doing this
zookeeper:
image: 'bitnami/zookeeper:latest'
ports:
- '2181:2181'
environment:
- ALLOW_ANONYMOUS_LOGIN=yes
networks:
- my-app-tier
kafka:
image: 'bitnami/kafka:latest'
ports:
- '9092:9092'
environment:
- KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
- ALLOW_PLAINTEXT_LISTENER=yes
volumes:
- kafka-persistence:/bitnami/kafka
networks:
- my-app-tier
schema-registry:
image: 'confluent/schema-registry'
ports:
- '8081:8081'
environment:
- SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL=zookeeper:2181
- SCHEMA_REGISTRY_HOST_NAME=localhost
- SCHEMA_REGISTRY_DEBUG=true
networks:
- my-app-tier
kafka-rest-proxy:
image: 'confluent/rest-proxy'
ports:
- '8082:8082'
environment:
- KAFKA_REST_ZOOKEEPER_CONNECT=zookeeper:2181
- KAFKA_REST_HOST_NAME=localhost
networks:
- my-app-tier
But it does not seem to work?
avro = AvroTurf::Messaging.new(registry_url: "schema-registry:8081/", schemas_path: Rails.root.join('app', 'schemas/'))
[36] pry(main)> data = avro.encode({ "name" => "hello, world", data: "test"}, schema_name: "integration")
NoMethodError: undefined method `+' for nil:NilClass
from /gems/gems/excon-0.62.0/lib/excon/connection.rb:239:in `request'
Hi there, i did see #66. I'm asking if someone can offer even more explanation and reasoning for this.
Why do i need local storage for schemas when they're already being stored in the schema registry?
We have a rails API app using kafka & need schema validation. We had expected that schemas would be stored in the schema registry, and dynamically fetched and cached from this when encoding/decoding messages (btw. does schema validation happen thru encoding/decoding? or do we need to write code to validate an incoming message against a particular schema)?
We were surprised to see that avro_turf (and others) require the schemas to be stored locally w/ the code. Would love to see more explanation of this (why local) and how the schema validation & versioning works exactly.
Is this a design that we just have to do? we have to store our schemas locally. What's the point of a remote schema registry (e.g., in our case where we have all messages going thru a API server).
I'm using avro_turf for binary encoding for Kafka and use configuration to switch between in-message and registry schemas (in-message for simpler local testing, registry for production). The transformation from Ruby objects to Avro types for these two cases is inconsistent.
For in-message encoding (AvroTurf
class), I can do:
avro.encode(serializable_hash, schema_name: schema_name)
This serializes Ruby objects in the hash (e.g. Time objects to ISO8601 strings) and creates the binary encoding.
For registry-based encoding (AvroTurf::Messaging
class), I need to call as_json
to serialize the Ruby objects first otherwise I get encoding errors. That is:
avro.encode(serializable_hash.as_json, schema_name: schema_name)
I'll aim to find some time in the near future to do a PR, but describing the issue here to make sure it isn't lost. Please comment if the current behaviour is intentional and shouldn't be changed.
Hi,
I'm actually trying to build specs to test your lib but i'm stuck right now.
My first question is where do you initialise the AvroTurf::Messaging
? I do it in the initializer, i'm using it in producers and consumers.
But it block me for testing it, when i initialize it in the initializee directory, i can't mock the url before it have been load. That's why i'm not sure i should load it in the initialize directory.
Thanks in advance
Hi there,
We're using avro turf along with the confluent schema registry and we're looking to provide some resilience to schema registry dropouts. Given enough Rails instances and enough (potentially rarely used) schemas the probability that a given registry dropout will occur on a cold cache is quite high.
We can warm the cache in a bit of a bodge by encoding a sample message for each schema on startup, but that requires us to maintain a set of sample messages and is a bit grotty.
Would you be up for a PR which adds a method to Messaging
which e.g. registers all schemas in the schema store? Or something similar?
I run a comparison benchmark of AVRO vs JSON serialization within a Rails app and was surprised to see these numbers:
https://gist.github.com/jaimeiniesta/2ca95f0026e693441c581fc61a06c458
I expected AVRO to be faster than JSON. Was I wrong in my assumption? Is there a way to make this faster?
Specification of array type may be wrong:
// person_list.avsc
{
"name": "person_list",
"type": "array",
"items": "person"
}
Correct syntax instead should be:
// person_list.avsc
{
"name": "person_list",
"type": {
"type": "array",
"items": "person"
}
}
In RubyGem excon before 0.71.0, there was a race condition around persistent connections, where a connection which is interrupted (such as by a timeout) would leave data on the socket. Subsequent requests would then read this data, returning content from the previous response. The race condition window appears to be short, and it would be difficult to purposefully exploit this.
I succesfully produce and consume avro messages using avro_turf and phobos. Something like this:
data = avro.encode(
contents,
schema_name: 'myschema',
namespace: 'mynamespace',
subject: 'mynamespace.myschema-value'
)
producer.publish('mynamespace.myschema', data)
However, I wanted to use a key when publishing producer.publish('mynamespace.myschema', data, mykey)
, and suddenly everything failed to work. Unknown magic byte for instance. And the Kafka topic UI did not indentify the type as Avro anymore.
What is the correct way to do this? Do I need to encode the key somehow too?
Hi,
it would be nice to have an ability to configure connection options to Schema Registry like connect_timeout
, retry_limit
etc.
Default timeout in Excon is 60 seconds. So if I register all schemas during app initialization step and for some reason schema is unavailable - it will fail only after 60 seconds, which is unacceptable in my case.
Currently I solve this issue with a little wrapper around AvroTurf::ConfluentSchemaRegistry
that accepts Excon
connection options:
class AvroSchemaRegistry < AvroTurf::ConfluentSchemaRegistry
def initialize(url, logger: Logger.new($stdout), **connect_options)
super(url, logger: logger)
@connection.params.merge!(connect_options)
end
end
I just tried using avro_turf to read in a schema file to process some JSON. I got the above error.
The problem is that the schema includes a namespace field, so when the check for schema.fullname != fullname
occurrs, it fails, because schema.fullname
is equal to 'avro.tree.schema.red.tree'.
Your find function takes a namespace parameter, but it's unused by the load_schemas! function, so it never actually gets passed.
Filename: tree.avsc
snippet:
{
"name": "tree",
"type": "record",
"namespace": "avro.tree.schema.red"
...
}
Am I using it wrong? Thanks!
A declarative, efficient, and flexible JavaScript library for building user interfaces.
🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
An Open Source Machine Learning Framework for Everyone
The Web framework for perfectionists with deadlines.
A PHP framework for web artisans
Bring data to life with SVG, Canvas and HTML. 📊📈🎉
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
Some thing interesting about web. New door for the world.
A server is a program made to process requests and deliver data to clients.
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
Some thing interesting about visualization, use data art
Some thing interesting about game, make everyone happy.
We are working to build community through open source technology. NB: members must have two-factor auth.
Open source projects and samples from Microsoft.
Google ❤️ Open Source for everyone.
Alibaba Open Source for everyone
Data-Driven Documents codes.
China tencent open source team.