flix-tech / avro-serde-php

Avro Serialisation/Deserialisation (SerDe) library for PHP 7.3+ & 8.0 with a Symfony Serializer integration

Home Page: https://www.flix.tech/

License: MIT License

PHP 93.80% Dockerfile 0.46% Makefile 1.83% Shell 3.91%
confluent-platform avro avro-schema php serialization deserialization confluent serde avro-format symfony

avro-serde-php's Introduction

Avro SerDe for PHP 7.3+ and 8.0

Motivation

When serializing and deserializing messages using the Avro serialization format, especially when integrating with the Confluent Platform, you want to make sure that schemas are evolved in a way that downstream consumers are not affected.

Hence Confluent developed the Schema Registry which has the responsibility to validate a given schema evolution against a configurable compatibility policy.

Unfortunately, Confluent does not provide an official Avro SerDe package for PHP. This library aims to fill that gap: it implements the Confluent wire format and integrates FlixTech's Schema Registry Client.

Installation

Install the library with Composer, the package manager for PHP.

composer require 'flix-tech/avro-serde-php:^1.6'

Quickstart

NOTE

You should always use a cached schema registry client, since otherwise you'd make an HTTP request for every message serialized or deserialized.

1. Create a cached Schema Registry client

See the Schema Registry client documentation on caching for more detailed information.

<?php

use FlixTech\SchemaRegistryApi\Registry\Cache\AvroObjectCacheAdapter;
use FlixTech\SchemaRegistryApi\Registry\CachedRegistry;
use FlixTech\SchemaRegistryApi\Registry\PromisingRegistry;
use GuzzleHttp\Client;

$schemaRegistryClient = new CachedRegistry(
    new PromisingRegistry(
        new Client(['base_uri' => 'registry.example.com'])
    ),
    new AvroObjectCacheAdapter()
);

2. Build the RecordSerializer instance

The RecordSerializer is the main way you interact with this library. It provides the encodeRecord and decodeMessage methods for SerDe operations.

<?php

use FlixTech\AvroSerializer\Objects\RecordSerializer;

/** @var \FlixTech\SchemaRegistryApi\Registry $schemaRegistryClient */
$recordSerializer = new RecordSerializer(
    $schemaRegistryClient,
    [
        // If you want to auto-register missing schemas set this to true
        RecordSerializer::OPTION_REGISTER_MISSING_SCHEMAS => false,
        // If you want to auto-register missing subjects set this to true
        RecordSerializer::OPTION_REGISTER_MISSING_SUBJECTS => false,
    ]
);

3. Encoding records

This is a simple example of how you can use the RecordSerializer to encode messages in the Confluent Avro wire format.

<?php

/** @var \FlixTech\AvroSerializer\Objects\RecordSerializer $recordSerializer */
$subject = 'my-topic-value';
$avroSchema = AvroSchema::parse('{"type": "string"}');
$record = 'Test message';

$encodedBinaryAvro = $recordSerializer->encodeRecord($subject, $avroSchema, $record);
// Send this over the wire...
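
For orientation, the Confluent wire format puts a five-byte header in front of the Avro body: one magic byte (0) followed by the schema ID as a 4-byte big-endian integer. The sketch below builds such a frame by hand with pack() and reads the header back with unpack(). The schema ID 42 is an assumed value, and the code is an illustration only, since encodeRecord produces the frame for you.

```php
<?php

// Illustration only: the library builds this frame for you inside encodeRecord().
$schemaId = 42; // assumed ID, as it might be returned by the registry

// Avro encodes the string "Test message" as its length (12, zigzag-encoded to 0x18) plus the raw bytes.
$avroBody = "\x18Test message";

// Confluent header: magic byte 0, then the schema ID as a 4-byte big-endian integer.
$framed = pack('CN', 0, $schemaId) . $avroBody;

$header = unpack('Cmagic/NschemaId', $framed);

echo bin2hex(substr($framed, 0, 5)), PHP_EOL; // 000000002a
echo $header['schemaId'], PHP_EOL;            // 42
echo substr($framed, 6), PHP_EOL;             // Test message
```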

4. Decoding messages

This is a simple example of how you can use the RecordSerializer to decode messages.

<?php

/** @var \FlixTech\AvroSerializer\Objects\RecordSerializer $recordSerializer */
/** @var string $encodedBinaryAvro */
$record = $recordSerializer->decodeMessage($encodedBinaryAvro);

echo $record; // 'Test message'

Schema Resolvers

Schema Resolvers are responsible for knowing which Avro schema belongs to which type of record. This is especially useful if you want to manage your Avro schemas in separate files. Schema Resolvers enable you to integrate with whatever schema management concept you may have outside of the scope of this library.

Schema Resolvers take a $record of any type and try to resolve a matching AvroSchema instance for it.

FileResolver

In even moderately complicated applications you want to manage your schemas within the VCS, most probably as .avsc files. These files contain JSON that describes the Avro schema.

The resolver takes a $baseDir in which you want to manage the files and an inflector callable, which is a simple function that takes the record as first parameter, and a second boolean $isKey parameter indicating if the inflection is targeting a key schema.

<?php

namespace MyNamespace;

use FlixTech\AvroSerializer\Objects\SchemaResolvers\FileResolver;
use function get_class;
use function is_object;
use function str_replace;

class MyRecord {}

$record = new MyRecord();

$baseDir = __DIR__ . '/files';

$inflector = static function ($record, bool $isKey) {
    $ext = $isKey ? '.key.avsc' : '.avsc';
    $fileName = is_object($record)
        ? str_replace('\\', '.', get_class($record))
        : 'default';
    
    return $fileName . $ext;
};


echo $inflector($record, false); // MyNamespace.MyRecord.avsc
echo $inflector($record, true); // MyNamespace.MyRecord.key.avsc

$resolver = new FileResolver($baseDir, $inflector);

$resolver->valueSchemaFor($record); // This will load from $baseDir . '/' . MyNamespace.MyRecord.avsc
$resolver->keySchemaFor($record); // This will load from $baseDir . '/' . MyNamespace.MyRecord.key.avsc

CallableResolver

This is the simplest but also the most flexible resolver. It takes two callables that are responsible for fetching the value schema and the key schema respectively. The key schema callable is optional.

<?php

use FlixTech\AvroSerializer\Objects\SchemaResolvers\CallableResolver;
use PHPUnit\Framework\Assert;
use function Widmogrod\Functional\constt;

$valueSchemaJson = '
{
  "type": "record",
  "name": "user",
  "fields": [
    {"name": "name", "type": "string"},
    {"name": "age", "type": "int"}
  ]
}
';
$valueSchema = AvroSchema::parse($valueSchemaJson);

$resolver = new CallableResolver(
    constt(
        AvroSchema::parse($valueSchemaJson)
    )
);

$record = [ 'foo' => 'bar' ];

$schema = $resolver->valueSchemaFor($record);

Assert::assertEquals($schema, $valueSchema);

DefinitionInterfaceResolver

This library also provides a HasSchemaDefinitionInterface that exposes two static methods:

  • HasSchemaDefinitionInterface::valueSchemaJson returns the schema definition for the value as JSON string
  • HasSchemaDefinitionInterface::keySchemaJson returns either NULL or the schema definition for the key as JSON string.

The DefinitionInterfaceResolver checks if a given record implements that interface (if not it will throw an InvalidArgumentException) and resolves the schemas via the static methods.

<?php

namespace MyNamespace;

use FlixTech\AvroSerializer\Objects\HasSchemaDefinitionInterface;
use FlixTech\AvroSerializer\Objects\SchemaResolvers\DefinitionInterfaceResolver;

class MyRecord implements HasSchemaDefinitionInterface {
    public static function valueSchemaJson() : string
    {
        return '
               {
                 "type": "record",
                 "name": "user",
                 "fields": [
                   {"name": "name", "type": "string"},
                   {"name": "age", "type": "int"}
                 ]
               }
               ';
    }
    
    public static function keySchemaJson() : ?string
    {
        return '{"type": "string"}';
    }
}

$record = new MyRecord();

$resolver = new DefinitionInterfaceResolver();

$resolver->valueSchemaFor($record); // Will resolve from $record::valueSchemaJson();
$resolver->keySchemaFor($record); // Will resolve from $record::keySchemaJson();

ChainResolver

The chain resolver is a useful tool for composing multiple resolvers. The first resolver to be able to resolve a schema will win. If none of the resolvers in the chain is able to determine a schema, an InvalidArgumentException is thrown.

<?php

namespace MyNamespace;

use FlixTech\AvroSerializer\Objects\SchemaResolvers\ChainResolver;

$record = ['foo' => 'bar'];

/** @var \FlixTech\AvroSerializer\Objects\SchemaResolvers\FileResolver $fileResolver */
/** @var \FlixTech\AvroSerializer\Objects\SchemaResolvers\CallableResolver $callableResolver */

$resolver = new ChainResolver($fileResolver, $callableResolver);
// or new ChainResolver(...[$fileResolver, $callableResolver]);

$resolver->valueSchemaFor($record); // Will resolve $fileResolver, then $callableResolver
$resolver->keySchemaFor($record); // Will resolve $fileResolver, then $callableResolver

Symfony Serializer Integration

This library provides integrations with the Symfony Serializer component.

<?php

use FlixTech\AvroSerializer\Integrations\Symfony\Serializer\AvroSerDeEncoder;
use FlixTech\AvroSerializer\Objects\DefaultRecordSerializerFactory;
use PHPUnit\Framework\Assert;
use Symfony\Component\Serializer\Normalizer\GetSetMethodNormalizer;
use Symfony\Component\Serializer\Serializer;

class User
{
    /** @var string */
    private $name;

    /** @var int */
    private $age;

    public function __construct(string $name, int $age)
    {
        $this->name = $name;
        $this->age = $age;
    }

    public function getName(): string
    {
        return $this->name;
    }

    public function setName(string $name): void
    {
        $this->name = $name;
    }

    public function getAge(): int
    {
        return $this->age;
    }

    public function setAge(int $age): void
    {
        $this->age = $age;
    }
}

$recordSerializer = DefaultRecordSerializerFactory::get(
    getenv('SCHEMA_REGISTRY_HOST')
);

$avroSchemaJson = '{
  "type": "record",
  "name": "user",
  "fields": [
    {"name": "name", "type": "string"},
    {"name": "age", "type": "int"}
  ]
}';

$user = new User('Thomas', 38);

$normalizer = new GetSetMethodNormalizer();
$encoder = new AvroSerDeEncoder($recordSerializer);

$symfonySerializer = new Serializer([$normalizer], [$encoder]);

$serialized = $symfonySerializer->serialize(
    $user,
    AvroSerDeEncoder::FORMAT_AVRO,
    [
        AvroSerDeEncoder::CONTEXT_ENCODE_SUBJECT => 'users-value',
        AvroSerDeEncoder::CONTEXT_ENCODE_WRITERS_SCHEMA => AvroSchema::parse($avroSchemaJson),
    ]
);

$deserializedUser = $symfonySerializer->deserialize(
    $serialized,
    User::class,
    AvroSerDeEncoder::FORMAT_AVRO
);

Assert::assertEquals($deserializedUser, $user);

Name converter

Sometimes your property names may differ from the names of the fields in your schema. One option to solve this is by using custom Serializer annotations. However, if you're using the annotations provided by this library, you may use our name converter that parses these annotations and maps between the schema field names and the property names.

<?php

use FlixTech\AvroSerializer\Integrations\Symfony\Serializer\AvroSerDeEncoder;
use FlixTech\AvroSerializer\Integrations\Symfony\Serializer\NameConverter\AvroNameConverter;
use FlixTech\AvroSerializer\Objects\DefaultRecordSerializerFactory;
use Symfony\Component\Serializer\Normalizer\GetSetMethodNormalizer;
use Symfony\Component\Serializer\Serializer;
use Doctrine\Common\Annotations\AnnotationReader as DoctrineAnnotationReader;
use Doctrine\Common\Annotations\AnnotationRegistry;
use FlixTech\AvroSerializer\Objects\Schema\Generation\AnnotationReader;

$recordSerializer = DefaultRecordSerializerFactory::get(
    getenv('SCHEMA_REGISTRY_HOST')
);

AnnotationRegistry::registerLoader('class_exists');

$reader = new AnnotationReader(
    new DoctrineAnnotationReader()
);

$nameConverter = new AvroNameConverter($reader);

$normalizer = new GetSetMethodNormalizer(null, $nameConverter);
$encoder = new AvroSerDeEncoder($recordSerializer);

$symfonySerializer = new Serializer([$normalizer], [$encoder]);

Schema builder

This library also provides a means of defining schemas in PHP, very similar to the SchemaBuilder API provided by the Java SDK:

<?php

use FlixTech\AvroSerializer\Objects\Schema;
use FlixTech\AvroSerializer\Objects\Schema\Record\FieldOption;

Schema::record()
    ->name('object')
    ->namespace('org.acme')
    ->doc('A test object')
    ->aliases(['stdClass', 'array'])
    ->field('name', Schema::string(), FieldOption::doc('Name of the object'), FieldOption::orderDesc())
    ->field('answer', Schema::int(), FieldOption::default(42), FieldOption::orderAsc(), FieldOption::aliases('wrong', 'correct'))
    ->field('ignore', Schema::boolean(), FieldOption::orderIgnore())
    ->parse();

Schema generator

Besides providing a fluent API for defining schemas, we also provide a means of generating schemas from class metadata (annotations). For this to work, you have to install the doctrine/annotations package.

<?php

use FlixTech\AvroSerializer\Objects\DefaultSchemaGeneratorFactory;
use FlixTech\AvroSerializer\Objects\Schema\Generation\Annotations as SerDe;

/**
 * @SerDe\AvroType("record")
 * @SerDe\AvroName("user")
 */
class User
{
    /**
     * @SerDe\AvroType("string")
     * @var string
     */
    private $firstName;

    /**
     * @SerDe\AvroType("string")
     * @var string
     */
    private $lastName;

    /**
     * @SerDe\AvroType("int")
     * @var int
     */
    private $age;

    public function __construct(string $firstName, string $lastName, int $age)
    {
        $this->firstName = $firstName;
        $this->lastName = $lastName;
        $this->age = $age;
    }

    public function getFirstName(): string
    {
        return $this->firstName;
    }

    public function getLastName(): string
    {
        return $this->lastName;
    }

    public function getAge(): int
    {
        return $this->age;
    }
}

$generator = DefaultSchemaGeneratorFactory::get();

$schema = $generator->generate(User::class);
$avroSchema = $schema->parse();

Further examples on the possible annotations can be seen in the test case.

Examples

This library provides a few executable examples in the examples folder. Have a look at them to get an understanding of how this library works.

avro-serde-php's People

Contributors

alexeevdv, alumarcu, bafs, kandrejevs2, konstantincodes, mauriziomoreo, mente, nick-zh, tpl0ch, xico42

avro-serde-php's Issues

unpack with A* is causing trouble for specific values

Hello, we have problems decoding messages when the binary message contains certain values. When the last element of the Avro schema is an int and that int has the value 5 or 16, it is decoded as 0 in our application.

After some hours of debugging, we realized that Protocol::decode() uses the format C%s/N%s/A*%s for unpacking binary data. It seems A* is causing the problem in these specific cases. Incoming int values are represented left-shifted by 1, so an int 5 is represented as 10, whose string representation is \n, since ASCII 10 is line feed (\n). According to the PHP documentation (https://www.php.net/manual/en/function.unpack.php), unpack with the format code A means:

A strips all trailing ASCII whitespace (spaces, tabs, newlines, carriage returns, and NULL bytes).

The same occurs with the value 16 (shifted left by 1 = 32; ASCII 32 is SPACE, which gets stripped).

Tab (ASCII 9) and carriage return (ASCII 13) do not occur for non-negative single-byte values, since those always encode to an even byte; NULL (ASCII 0) encodes the value 0 itself.

Changing the format string to C%s/N%s/a*%s seems to be a fix.

It seems to be the same issue as #31, but the pr #45 fixed only encode, not decode.
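
The effect described above can be reproduced with nothing but pack() and unpack(). This sketch frames a single Avro int (5, which zigzag-encodes to the byte 0x0A) and unpacks it with both format codes. The key names magic, id and payload, and the schema ID 42, are chosen for the example and are not necessarily what Protocol::decode() uses.

```php
<?php

// Avro int 5 -> zigzag 10 -> single byte 0x0A, which is also ASCII line feed.
$avroBody = "\x0A";
$message = pack('CN', 0, 42) . $avroBody;

// "A" is a space-padded string: unpack() strips trailing whitespace and NUL bytes.
$stripped = unpack('Cmagic/Nid/A*payload', $message);

// "a" is a NUL-padded string: unpack() returns the bytes untouched.
$intact = unpack('Cmagic/Nid/a*payload', $message);

var_dump(strlen($stripped['payload'])); // int(0)
var_dump(strlen($intact['payload']));   // int(1)
```

With the payload reduced to an empty string, the Avro decoder has no bytes left to read for the final int, which matches the reported result of 0.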

String representation of avro schema is not correct (flix-tech/avro-php)

Background: I have a Java application which uses the Java Kafka client and Avro packages to register a schema. Messages for that schema are pushed to the topic by a PHP application.

Issue: Before a Kafka message is produced, the following API is called to determine the schema ID of the schema we want to produce the message with. The current API call sent from flix-tech/schema-registry-php-client resembles the following:

POST /subjects/test HTTP/1.1
Host: schemaregistry.example.com
Accept: application/vnd.schemaregistry.v1+json
{"schema":"{\"type\":\"record\",\"name\":\"test\",\"fields\":[{\"type\":\"string\",\"name\":\"field1\"},{\"type\":\"int\",\"name\":\"field2\"}]}"}

This results in an error from kafka schema registry 40403 schema not found

The above API call is being sent from here:
https://github.com/flix-tech/schema-registry-php-client/blob/ad1a7960731fa2b4f8a0708286368b4b21a0ad40/src/Requests/Functions.php#L76

The schema string used in the above API call is generated in the flix-tech/avro-php package by this function, which simply uses json_encode on the Avro object:
https://github.com/flix-tech/avro-php/blob/0eadbc19f3e0f79804d73b71f8c3e309a37efe77/lib/avro/schema.php#L548

Ideally, the API call would be sent as follows, with a properly formatted JSON string according to the Confluent docs:

POST /subjects/test HTTP/1.1
Host: schemaregistry.example.com
Accept: application/vnd.schemaregistry.v1+json, application/vnd.schemaregistry+json, application/json

{
      "schema":
         "{
                \"type\": \"record\",
                \"name\": \"test\",
                \"fields\":
                  [
                    {
                      \"type\": \"string\",
                      \"name\": \"field1\"
                    },
                    {
                      \"type\": \"int\",
                      \"name\": \"field2\"
                    }
                  ]
              }"
    }

https://docs.confluent.io/3.0.0/schema-registry/docs/api.html
The above API call works fine with correct json encoded formatting.

I had to extend AvroSchema class to define a static variable called $json which is initialized in parse function
https://github.com/flix-tech/avro-php/blob/0eadbc19f3e0f79804d73b71f8c3e309a37efe77/lib/avro/schema.php#L307

and for returning the schema from __toString I am simply returning the static variable value. Could you please add a proper fix for this? Also, I believe the schema should be registered with same format as well.

Avro producer being used for the PHP application is
https://github.com/jobcloud/php-kafka-lib

@nick-zh

Unable to decode avro message where a schema field is of multiple types

I have a situation where I need a boolean field with a default value of null in the Avro schema. To do that, I tried using a schema of this type:

{
    "namespace": "kafka",
    "name": "sample_schema",
    "type": "record",
    "fields": [
        {
            "name": "field_1",
            "type": "int"
        },
        {
            "name": "field_2",
            "type": [
                "null",
                "boolean"
            ]
        },
        {
            "name": "field_3",
            "type": "string"
        }
    ]
}

These are the errors I get:

AvroIOSchemaMatchException: Writer's schema "null" and Reader's schema "boolean" do not match.

Next FlixTech\AvroSerializer\Objects\Exceptions\AvroDecodingException: Could not decode message.

How to define logicalTypes

Hi everyone !

I'm trying to define a field as "timestamp-millis", and I can see that this definition does exist in the code as type long with a logical type.
But I don't see any usage of it in the tests, and it seems to be unavailable as a type in the annotations Avro\AvroType.

Can anyone give me an example of how to define it? If it's not available right now, I can work on a PR adding it.

Unions of complex types are not handled correctly

Since the supposed bugfix #66 in avro-php 4.3.0, the handling of unions with complex types no longer works correctly: no matter which of the union members is provided, the first member is always written (with value NULL).
For illustration please see the attached file containing tests for a sample schema.
UnionsWithoutDefaultsTest.txt

In my opinion, there are multiple aspects causing this problem:

  • The function default_value() returns NULL if no default value exists for a field.
  • The function is_valid_datum in schema.php replaces field->name() by field->default_value() and thus gets a NULL, if no default value has been defined. Therefore, always the first union member is evaluated as "valid datum".
  • The same happens in function write_record in datum.php: Always the first union member is written with (supposed) default NULL.

I think a solution would include the following fixes:
=> A record that is missing an expected field must not pass as a valid record if the field is not nullable.
=> If an expected field is missing, this field must not be written at all (instead of writing it with value NULL). If the field is not nullable, this must raise an error.

BTW according to Avro spec, default values are not to be used when writing. They only provide a means for the reader to replace missing fields by default values, see https://avro.apache.org/docs/1.10.2/spec.html:
default: A default value for this field, only used when reading instances that lack the field for schema evolution purposes. The presence of a default value does not make the field optional at encoding time.

Integers decode incorrectly when the last encoded byte is a whitespace character.

<?php

require_once('vendor/autoload.php');

use GuzzleHttp\Client;

use FlixTech\AvroSerializer\Objects\RecordSerializer;
use FlixTech\SchemaRegistryApi\Registry\CachedRegistry;
use FlixTech\SchemaRegistryApi\Registry\PromisingRegistry;
use FlixTech\SchemaRegistryApi\Registry\Cache\AvroObjectCacheAdapter;

$registry = new CachedRegistry(
	new PromisingRegistry(
		new Client(['base_uri' => 'schema-registry:8081'])
	),
	new AvroObjectCacheAdapter()
);

$serializer = new RecordSerializer(
	$registry,
	[
		RecordSerializer::OPTION_REGISTER_MISSING_SCHEMAS => true,
		RecordSerializer::OPTION_REGISTER_MISSING_SUBJECTS => true, 
	]
);

$schema = AvroSchema::parse('{"name":"Key","type":"record","fields":[{"name":"project_date_id","type":"int"}]}');
$record = ['project_date_id' => 13829974];

$encoded = $serializer->encodeRecord('bytes-test', $schema, $record);
echo implode(' ', array_map('ord', str_split($encoded))) . PHP_EOL;
echo $serializer->decodeMessage($encoded)['project_date_id'] . PHP_EOL;

Produces this output:

0 0 0 0 21 172 157 152 13
198486

The fifth byte will vary with your registry, but the result is that the original id, 13829974, becomes 198486 after encoding/decoding. I've tracked it down to this line:

'C%s/N%s/A*%s',

Where this happens:

https://www.php.net/manual/en/function.unpack.php

The "A" code now strips all trailing ASCII whitespace (spaces, tabs, newlines, carriage returns, and NULL bytes).

So the uppercase A in the format string is dropping the last byte (13) because it thinks it is whitespace. A lowercase a seems to work correctly.
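
The arithmetic behind the two numbers can be checked by hand. Avro writes an int as a zigzag-encoded variable-length integer, seven bits per byte, with the high bit marking continuation. The sketch below is an independent re-implementation for illustration: encodeAvroInt and decodeAvroInt are hypothetical names, not the avro-php functions. It assumes 64-bit PHP and values in the 32-bit int range, and its decoder simply stops when the input runs out, which is an assumption about how the truncated value ends up being read.

```php
<?php

/** Zigzag + varint encoding for values in the Avro int (32-bit) range. */
function encodeAvroInt(int $n): string
{
    $z = ($n << 1) ^ ($n >> 63); // zigzag: maps small magnitudes to small non-negative values
    $out = '';
    while ($z > 0x7F) {
        $out .= chr(($z & 0x7F) | 0x80); // low 7 bits with the continuation bit set
        $z >>= 7;
    }

    return $out . chr($z);
}

/** Reverses encodeAvroInt(); stops early if the input ends mid-value. */
function decodeAvroInt(string $bytes): int
{
    $z = 0;
    $shift = 0;
    foreach (str_split($bytes) as $char) {
        $byte = ord($char);
        $z |= ($byte & 0x7F) << $shift;
        $shift += 7;
        if (($byte & 0x80) === 0) {
            break; // no continuation bit: this was the last byte
        }
    }

    return ($z >> 1) ^ -($z & 1); // undo zigzag
}

$encoded = encodeAvroInt(13829974);

echo implode(' ', array_map('ord', str_split($encoded))), PHP_EOL; // 172 157 152 13
echo decodeAvroInt($encoded), PHP_EOL;                             // 13829974
echo decodeAvroInt(substr($encoded, 0, -1)), PHP_EOL;              // 198486
```

Dropping the trailing byte 13 leaves exactly the three bytes that decode to 198486, which matches the output in the report.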

Not able to use union as map values using annotations

Hey, I found that it's not possible to do the following with annotations

This works fine with the schema builder

$avroSchema = Schema::record()
    ->name('SubmissionReceived')
    ...
    ->field(
        'formData',
        Schema::map()
            ->values(
                Schema::union(
                    Schema::string(),
                    Schema::array()->items(Schema::string())
                )
            )
    )
    ->parse();

but I'm not able to do the same with annotations

    /**
     * @SerDe\AvroName("formData")
     * @SerDe\AvroType("map", attributes={
     *     @SerDe\AvroValues(???),
     * })
     */
    private array $formData;

Could someone help?

Karapace issue with PromisingRegistry::schemaId()

I've been tracking down an issue where I get a "Schema not found" exception even though I'm 100% sure the schema is there, and I think I've found the problem. PromisingRegistry's schemaId() method uses the function checkIfSubjectHasSchemaRegisteredRequest(), which sets up a call to the registry with a URL of "subjects/", but with Karapace this call isn't supported and returns a 404.

I'd be happy to raise a PR except I'm not sure the best way to fix this. Looking at the code around schemaId() it appears to extract the global Id for the schema from the rest response but it doesn't appear to make any use of the schema version. Karapace looks like it will return the needed info but only if you use "subjects//versions/<version number or 'latest'>" and the global Id is going to be different depending on the schema version you want.

So any idea how to fix this? For reference, the call to schemaId() I am debugging is made from RecordSerializer's getSchemaIdForSchema() method so it does look like it expects to use the returned id.

help about creating record

hi
i was reading the document but I did not understand how to create a record and encode it according to the schema

    $avro = AvroSchema::parse(file_get_contents('./schemas/user.avsc'));
    $schemaRegistryClient=new CachedRegistry(
        new PromisingRegistry(
            new Client([
                'base_uri'=>"registry.example.com"
            ])
        ),
        new AvroObjectCacheAdapter()
    );
    $recordSerializer=new RecordSerializer($schemaRegistryClient,[
        RecordSerializer::OPTION_REGISTER_MISSING_SCHEMAS=>false,
        RecordSerializer::OPTION_REGISTER_MISSING_SUBJECTS=>false
    ]);
    $record=[];//how to create record ??
    $recordSerializer->encodeRecord('user_topic',$avro,$record);

can anybody explain to me how to create record like in java SDK?

SchemaNotFound when decoding

Hi !

I'm trying to consume a message from a topic. When i try, i get
SchemaNotFoundException (40403)] Schema 577074789 not found
and theoretically i agree. There is no Schema in the registry with that ID but where does that value come from ? The correct schema id would be 42.
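
One way to investigate such an ID is to look at the four bytes it was read from. Rendering 577074789 as a big-endian 32-bit integer gives printable ASCII, which would be consistent with a payload that does not carry the Confluent header at all (for example plain JSON or text). This is only a diagnostic sketch and an inference from the number in the report, not a confirmed diagnosis.

```php
<?php

$reportedSchemaId = 577074789;

// The wire format stores the schema ID as 4 big-endian bytes after the magic byte.
$idBytes = pack('N', $reportedSchemaId);

echo bin2hex($idBytes), PHP_EOL; // 22657665
echo $idBytes, PHP_EOL;          // "eve
```

If the raw message on the topic starts with something like {"eve, the producer is probably not writing Confluent-framed Avro, and dumping the first bytes of the message with bin2hex() should confirm or rule that out.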

Schema default value is not taken into account

The code snippet below doesn't work.
$dataWriter->append(...); does not seem to process the default value.

Isn't the encoded string expected to contain the "married" field with a "null" value? It throws an AvroIOTypeException instead.

$schema = <<<_JSON
{
 "name":"member",
 "type":"record",
 "fields":[
    {"name": "member_id", "type": "int"},
    {"name": "member_name", "type": "string"},
    {"name": "married", "type": ["null","boolean"], "default": null}
 ]
}
_JSON;

$jose = [
    'member_id' => 1392,
    'member_name' => 'Jose',
    //    'married' => true
];

$writerSchema = AvroSchema::parse($schema);
$io = new AvroStringIO();
$writer = new AvroIODatumWriter($writerSchema);
$dataWriter = new AvroDataIOWriter($io, $writer, $writerSchema);
$dataWriter->append($jose);
$dataWriter->close();
echo $binaryString = $io->string();
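
The Avro specification applies defaults only when reading, so a writer that rejects the incomplete datum is consistent with the spec. If you want the behavior described here, one option is to fill in the declared defaults yourself before writing. The helper below is a hypothetical sketch that works on the schema JSON of a flat record; withSchemaDefaults is not part of avro-php and does not recurse into nested records.

```php
<?php

/**
 * Hypothetical helper: returns $datum with every missing top-level field
 * that declares a "default" in the record schema filled in.
 */
function withSchemaDefaults(string $schemaJson, array $datum): array
{
    $schema = json_decode($schemaJson, true, 512, JSON_THROW_ON_ERROR);

    foreach ($schema['fields'] ?? [] as $field) {
        // array_key_exists() distinguishes "missing" from "explicitly null".
        $isMissing = !array_key_exists($field['name'], $datum);
        if ($isMissing && array_key_exists('default', $field)) {
            $datum[$field['name']] = $field['default'];
        }
    }

    return $datum;
}

$schemaJson = '{
  "name": "member",
  "type": "record",
  "fields": [
    {"name": "member_id", "type": "int"},
    {"name": "member_name", "type": "string"},
    {"name": "married", "type": ["null", "boolean"], "default": null}
  ]
}';

$jose = withSchemaDefaults($schemaJson, ['member_id' => 1392, 'member_name' => 'Jose']);

var_dump(array_key_exists('married', $jose)); // bool(true)
var_dump($jose['married']);                   // NULL
```

The completed array can then be passed to $dataWriter->append() in place of the original datum.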

schemas registry and refernce to schemas

I'm looking at the library for now, trying to figure out the possibility of using schema references. Is this supported in the library?
For example I have 2 files:

com.myapp.Children.avsc:

{
  "namespace": "com.myapp",
  "type": "record",
  "name": "Children",
  "fields": [
    {"name": "first_name", "type": "string"},
    {"name": "last_name", "type": "string"}
  ]
}

com.myapp.People.avsc:

{
  "namespace": "com.myapp",
  "type": "record",
  "name": "People",
  "fields": [
    {"name": "id", "type": "string"},
    {"name": "first_name", "type": "string"},
    {"name": "last_name", "type": "string"},
    {
      "name": "children",
      "type": ["null",
        {
          "type": "array",
          "items": "Children"
        }],
      "doc": ""
    }
  ]
}

And I want to get a schema, that matches People.avsc.

My test code:

        $schemaRegistryClient = new CachedRegistry(
            new PromisingRegistry(
                new Client(['base_uri' => 'http://****:8081'])
            ),
            new AvroObjectCacheAdapter()
        );

        $recordSerializer = new RecordSerializer(
            $schemaRegistryClient,
            [
                // If you want to auto-register missing schemas set this to true
                RecordSerializer::OPTION_REGISTER_MISSING_SCHEMAS => false,
                // If you want to auto-register missing subjects set this to true
                RecordSerializer::OPTION_REGISTER_MISSING_SUBJECTS => false,
            ]
        );

        $baseDir = "./avro";

        $inflector = static function ($record, bool $isKey) {
            echo "Searching for:".get_class($record);
            $ext = $isKey ? '.key.avsc' : '.avsc';
            $fileName = is_object($record)
                ? str_replace('\\', '.', get_class($record))
                : 'default';

            return $fileName . $ext;
        };

        $resolver = new FileResolver($baseDir, $inflector);
        $normalizer = new GetSetMethodNormalizer();
        $encoder = new AvroSerDeEncoder($recordSerializer);

        $symfonySerializer = new Serializer([$normalizer], [$encoder]);

        $children=new Children("Nikolai","Lubiagov");

        $people=new People("Nikolai","Lubiagov", [$children]);

        $schemac= $resolver->valueSchemaFor($children);
        $schemap= $resolver->valueSchemaFor($people);

I don't see any code at all to resolve the link. Or how to transfer the already obtained Children scheme to People, is this supported?

I also can't fetch an already registered schema with references via:
$sc=$schemaRegistryClient->schemaForSubjectAndVersion("phptopic-value",1)->wait();

{
  "subject": "phptopic-value",
  "version": 1,
  "id": 70,
  "references": [
    {
      "name": "Children",
      "subject": "Children",
      "version": 1
    }
  ],
  "schema": "{\"type\":\"record\",\"name\":\"People\",\"namespace\":\"com.myapp\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"first_name\",\"type\":\"string\"},{\"name\":\"last_name\",\"type\":\"string\"},{\"name\":\"children\",\"type\":[\"null\",{\"type\":\"array\",\"items\":\"Children\"}],\"doc\":\"\"}]}"
}

I found 322fcd6, which handles previously defined types, but it does not seem to fit the case where the type is defined in another subject.

Question: RecordSerializer::decodeMessage

Hey @tPl0ch

I have the following scenario: I have a consumer that is consuming from multiple topics (meaning different schemas). Now let's say the consumer needs to have fixed versions for some of them.
From what I see, I would need to duplicate the first few lines of decodeMessage, namely:

$decoded = decode($binaryMessage);
$schemaId = valueOf($decoded->bind($this->schemaIdGetter));

To check if i have a fixed version for a specific schemaId. Since i don't want to decode twice, i was thinking about maybe splitting decodeMessage in two parts.
I was thinking something in the line of getValidatedMessage,
but not sure if this is the best approach...
Any input would be highly appreciated
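
One approach that avoids decoding the Avro body twice is to read only the header before calling decodeMessage, since the schema ID sits in bytes 1 to 4 of the Confluent wire format. The sketch below uses a hypothetical peekSchemaId helper and a hypothetical $pinnedVersions map; neither exists in this library, and how a pinned version is then turned into a reader schema is left open.

```php
<?php

/** Hypothetical helper: reads the schema ID from a Confluent-framed message without touching the body. */
function peekSchemaId(string $binaryMessage): int
{
    if (strlen($binaryMessage) < 5 || $binaryMessage[0] !== "\x00") {
        throw new InvalidArgumentException('Not a Confluent-framed message.');
    }

    // Skip the magic byte (offset 1) and read the 4-byte big-endian ID.
    return unpack('Nid', $binaryMessage, 1)['id'];
}

// Hypothetical map from writer schema ID to the version this consumer is pinned to.
$pinnedVersions = [42 => 3];

$message = pack('CN', 0, 42) . "\x02"; // header plus a dummy one-byte body
$schemaId = peekSchemaId($message);
$pinnedVersion = $pinnedVersions[$schemaId] ?? null;

var_dump($schemaId);      // int(42)
var_dump($pinnedVersion); // int(3)
```

Reading the header this way costs a single unpack() call, so the full decode still happens only once.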

Version 2 is not release

in your documentation, you guys write we can use

composer require 'flix-tech/avro-serde-php:^2.0'

but version 2 is not released yet.

Using a custom datum reader

Hi all, first of all, awesome work on this library and on the other two Avro libraries.
My question: is there any way to replace the datum reader with a custom datum reader, or is overriding RecordSerializer the only way for now?

Deprecation warnings

Environment

  • Confluent Kafka running on Confluent Cloud
  • php 8.2.5 running on macOS M1
  • librdkafka: stable 2.1.0 from homebrew
  • rdkafka 6.0.3 from pecl
  • flix-tech/avro-serde-php version 1.7.2
  • jobcloud/php-kafka-lib version 2.0.0

The following deprecation warnings were printed when trying to consume messages from a Kafka topic that had Avro messages. There is no loss of functionality, merely these warnings. I presume at some point they will turn into errors, and hence this is being reported.

Deprecated: Creation of dynamic property AvroPrimitiveSchema::$type is deprecated in /tmp/php/vendor/flix-tech/avro-php/lib/avro/schema.php on line 486
Deprecated: Creation of dynamic property AvroPrimitiveSchema::$logical_type is deprecated in /tmp/php/vendor/flix-tech/avro-php/lib/avro/schema.php on line 491
PHP Deprecated:  Creation of dynamic property AvroPrimitiveSchema::$extra_attributes is deprecated in /tmp/php/vendor/flix-tech/avro-php/lib/avro/schema.php on line 492
PHP Deprecated:  Creation of dynamic property AvroField::$type is deprecated in /tmp/php/vendor/flix-tech/avro-php/lib/avro/schema.php on line 1560
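
For context, PHP 8.2 emits this deprecation whenever a property is assigned on an object whose class does not declare it. The sketch below reproduces the pattern with made-up classes (LegacySchema, DeclaredSchema). It does not show the actual avro-php sources, only the general shape of the usual fix, which is to declare the properties.

```php
<?php

// Hypothetical classes for illustration; not the avro-php code.

class LegacySchema
{
    public function __construct(string $type)
    {
        $this->type = $type; // undeclared: "Creation of dynamic property" deprecation on PHP 8.2+
    }
}

class DeclaredSchema
{
    /** @var string */
    public $type; // declared up front, so no deprecation is raised

    public function __construct(string $type)
    {
        $this->type = $type;
    }
}

// Collect deprecation notices instead of printing them.
$deprecations = [];
set_error_handler(static function (int $severity, string $message) use (&$deprecations): bool {
    $deprecations[] = $message;

    return true;
}, E_DEPRECATED);

new LegacySchema('string');
new DeclaredSchema('string');

restore_error_handler();

echo count($deprecations), PHP_EOL; // 1 on PHP 8.2 and later, 0 on earlier versions
```

Where a class cannot be changed right away, PHP 8.2 also offers the #[\AllowDynamicProperties] attribute to silence the notice for that class.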

Schema default not taken into account when writing record

Not sure if this is the correct place to raise this issue but not able to raise it directly on https://github.com/flix-tech/avro-php

I may be missing something but I always get an error being thrown from https://github.com/flix-tech/avro-php/blob/master/lib/avro/datum.php#L254 when using default values and not passing any data for that field, for example:

{
    "namespace": "test",
    "type": "record",
    "name": "AccountCreated",
    "fields": [

        {"name": "email", "type" : "string"},
        {"name": "first_name", "type" : "string"},
        {"name": "last_name", "type" : "string"},
        {"name": "test", "type": ["null", "string"], "default": null}
    ]
}

Should the code at https://github.com/flix-tech/avro-php/blob/master/lib/avro/datum.php#L254 be falling back to the default value of the field if the data is not provided? Something like:

 $value = isset($datum[$field->name()]) ? $datum[$field->name()] : $field->default_value();
 $this->write_data($field->type(), $value, $encoder);

This is running on PHP 8.1.3 using avro-serde-php 1.7.1, via the AvroSerDeEncoder Symfony Serializer integration.
