
aklivity / zilla


🦎 A multi-protocol edge & service proxy. Seamlessly interface web apps, IoT clients, & microservices to Apache Kafka® via declaratively defined, stateless APIs.

Home Page: https://docs.aklivity.io/zilla

License: Other

Java 98.33% ANTLR 0.18% Shell 0.06% HTML 0.01% CSS 0.01% Dockerfile 0.02% JavaScript 0.01% Smarty 0.01% Lua 1.39%
kafka rest server-sent-events streaming-api grpc mqtt event-driven-architecture proxy event-stream-proxy event-streaming

zilla's People

Contributors

akrambek, alfusainey, ankitk-me, antonmry, attilakreiner, bmaidics, dependabot[bot], dhanushnehru, jfallows, kalyanimhala, llukyanov, lukefallows, prajwalgraj, saakshii12, shresthasurav, ttimot24, vordimous, voutilad

zilla's Issues

Support key in event id field for sse-kafka binding

When using an SSE stream to synchronize a table of snapshots from a compacted Kafka topic, each SSE event needs to convey the message key so that updates for the same key can be easily detected to replace the previous values per distinct key.

Therefore we need to expose the message key in the SSE event, and the message key may not be present in the payload of the message.

Given that custom SSE event fields are not exposed to browser clients, it makes sense to encode that message key into the id field instead.

We already use the id field for last-event-id during stream reconnect, but we can extend it to include the message key (base64) and make it easily parseable by the client as JSON.

id: ["<base64(key)>","<progress>/<etag>"]

Then clients can do JSON.parse(event.lastEventId)[0] to get the base64-encoded message key.

Not every sse-kafka mapping will be used with compacted Kafka topics (which always require a message key for compaction), so there is a possibility that the message key could be null.

In this case we can choose to encode the id field as follows:

id: [null, "<progress>/<etag>"]

Then JSON.parse(event.lastEventId)[0] would return null for the message key, which is accurate.

Additionally, the message payload may already contain enough information to extract the message key, making it unnecessary to include an opaque message key in the SSE event id field. This should be captured in the binding configuration.

When the message key is omitted from the event id field, there is no change in behavior.

id: <progress>/<etag>

Note: see #26 regarding <etag> in above id field syntax.
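
A client-side sketch of how the three id layouts above could be told apart. The helper name parseEventId is hypothetical and not part of Zilla, and atob is only adequate for keys that are plain ASCII once decoded.

```javascript
// Hypothetical helper: parse the id layouts proposed in this issue.
// Returns { key, progress } where key is null when no key is conveyed.
function parseEventId(lastEventId) {
  // The array layouts are valid JSON and start with "[".
  if (lastEventId.startsWith("[")) {
    const [encodedKey, progress] = JSON.parse(lastEventId);
    // atob decodes base64 into a binary string (assumption: ASCII keys).
    const key = encodedKey === null ? null : atob(encodedKey);
    return { key, progress };
  }
  // Unchanged layout: the whole id is "<progress>/<etag>".
  return { key: null, progress: lastEventId };
}
```

In a browser this would be called with event.lastEventId from an EventSource listener.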

Mockito test failure only on GitHub Actions

Error:  Tests run: 2, Failures: 0, Errors: 2, Skipped: 0, Time elapsed: 0.9 s <<< FAILURE! - in io.aklivity.zilla.runtime.cog.http.internal.util.function.ObjectIntBiConsumerTest
Error:  io.aklivity.zilla.runtime.cog.http.internal.util.function.ObjectIntBiConsumerTest.shouldInvokeBeforeAndThenAfter  Time elapsed: 0.886 s  <<< ERROR!
org.mockito.exceptions.base.MockitoException: 

Cannot call abstract real method on java object!
Calling real methods is only possible when mocking non abstract method.
  //correct example:
  when(mockOfConcreteClass.nonAbstractMethod()).thenCallRealMethod();
	at io.aklivity.zilla.runtime.cog.http.internal.util.function.ObjectIntBiConsumerTest.shouldInvokeBeforeAndThenAfter(ObjectIntBiConsumerTest.java:52)

Error:  io.aklivity.zilla.runtime.cog.http.internal.util.function.ObjectIntBiConsumerTest.shouldInvokePrimitiveAccept  Time elapsed: 0.001 s  <<< ERROR!
org.mockito.exceptions.base.MockitoException: 

Cannot call abstract real method on java object!
Calling real methods is only possible when mocking non abstract method.
  //correct example:
  when(mockOfConcreteClass.nonAbstractMethod()).thenCallRealMethod();
	at io.aklivity.zilla.runtime.cog.http.internal.util.function.ObjectIntBiConsumerTest.shouldInvokePrimitiveAccept(ObjectIntBiConsumerTest.java:37)

Support default welcome path in http-filesystem binding

When a request arrives for /, support propagating a default filename, such as index.html.

This can be handled by a routing rule such as:

{
    "type" : "http-filesystem",
    "kind": "proxy",
    "routes":
    [
        {
            "when":
            [
                {
                    "path": "/"
                }
            ],
            "exit": "filesystem_server0",
            "with":
            {
                "path": "index.html"
            }
        },
        {
            "when":
            [
                {
                    "path": "/{path}"
                }
            ],
            "exit": "filesystem_server0",
            "with":
            {
                "path": "${params.path}"
            }
        }
    ]
}

However, it may be more helpful to support the concept of a welcome-path in http-filesystem binding options.

{
    "type" : "http-filesystem",
    "kind": "proxy",
    "options":
    {
        "welcome-path": "/index.html"
    },
    "routes":
    [
        {
            "when":
            [
                {
                    "path": "/{path}"
                }
            ],
            "exit": "filesystem_server0",
            "with":
            {
                "path": "${params.path}"
            }
        }
    ]
}

Then when the / request arrives, it is treated as if it was /index.html.
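
The intended rewrite can be sketched as a small function. This is only an illustration of the proposed semantics: resolvePath is a hypothetical name, and how the binding would apply the option internally is not specified here.

```javascript
// Hypothetical sketch of the proposed "welcome-path" option.
function resolvePath(requestPath, welcomePath) {
  // Only the root request is rewritten; every other path passes through.
  return requestPath === "/" && welcomePath ? welcomePath : requestPath;
}
```

With "welcome-path" set to "/index.html", a request for / resolves to /index.html, and without the option the request path is left unchanged.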

Optimize delete event payload for sse-kafka

Given that the id can contain a reference to the key (and etag), there is no need to send additional information in the payload of the delete event.

According to the HTML5 SSE Specification, each time data: is encountered in the stream, the payload following ("" in this case) is appended, plus a newline.

Then when the event is fully parsed, including potentially multiple data fields, the event is only ignored if the aggregate data is empty. This can only occur if no data field is present in the event. Then the final newline is removed, dispatching an empty payload for events such as:

event:delete\n
id:...\n
data:\n
\n

Today we send base64(key) as the delete event payload, but this is unnecessary as we now have access to the key via the id instead, for all event types including both message and delete.
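
The dispatch rule described above can be checked with a minimal parser for a single event block. This is a simplified sketch of the SSE processing model (no retry field, no multi-event streams), and parseSseEvent is a hypothetical name.

```javascript
// Minimal sketch of SSE dispatch rules for one event block.
// Returns null when the event would not be dispatched to listeners.
function parseSseEvent(block) {
  let type = "message";
  let id = "";
  let data = "";
  for (const line of block.split("\n")) {
    if (line === "" || line.startsWith(":")) continue; // blank line or comment
    const colon = line.indexOf(":");
    const field = colon === -1 ? line : line.slice(0, colon);
    let value = colon === -1 ? "" : line.slice(colon + 1);
    if (value.startsWith(" ")) value = value.slice(1); // strip one leading space
    if (field === "data") data += value + "\n"; // each data field appends a newline
    else if (field === "event") type = value || "message";
    else if (field === "id") id = value;
  }
  if (data === "") return null; // no data field at all: event is not dispatched
  return { type, id, data: data.slice(0, -1) }; // remove the final newline
}
```

An empty data: line leaves a single newline in the data buffer, so the delete event is still dispatched, with an empty payload.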

Support verbatim string keys in sse-kafka event id

We currently support the following event id format for sse-kafka:

{
  "options":
  {
    "event":
    {
      "id": "[\"${base64(key)}\",\"${progress}\"]"
    }
  }
}

This is often necessary because key is binary in kafka messages.

However, when key is actually a string, then base64 encoding is not needed.

Therefore, we can enhance the supported event id formats to include:

{
  "options":
  {
    "event":
    {
      "id": "[\"${key}\",\"${progress}\"]"
    }
  }
}

Enhance http-kafka idempotency key

When idempotency-key http header is specified by the client, the intention is to detect and prevent duplication of work for repeated requests.

Therefore, we check the reply-to topic for existing responses, correlated by the value of idempotency-key in zilla:correlation-id header.

However, it is possible that the same idempotency-key could be used with a different http request, making the correlated response inappropriate for that distinct request.

Therefore, we need to capture the md5 hash of the request payload and include it in both the request message sent to the request topic and in the filter on the reply-to topic for the correlated response.

This md5 hash can be appended to the http idempotency-key request header value and sent in the zilla:correlation-id header, so the requirements on the kafka service remain unchanged, i.e. copy the opaque value of zilla:correlation-id header on the kafka request message to a corresponding zilla:correlation-id header on the kafka response message.

Then if the same request is repeated, the correlated response will be sent back to the client immediately and the kafka service can safely detect and ignore the duplicate request due to the already processed response.

However, if a different request is sent using the same idempotency-key http header, the md5 hash will differ and the zilla:correlation-id will therefore also differ, causing the new request to be processed by the kafka service as desired.
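
A sketch of how the combined value might be derived, using Node's built-in crypto module. The "-" separator and hex encoding are assumptions made for illustration, since the issue does not specify how the hash is appended, and correlationId is a hypothetical name.

```javascript
const { createHash } = require("node:crypto");

// Hypothetical sketch: derive the zilla:correlation-id value from the
// idempotency-key header and the request payload.
// Assumption: "-" separator and hex-encoded md5, neither taken from the issue.
function correlationId(idempotencyKey, payload) {
  const hash = createHash("md5").update(payload).digest("hex");
  return `${idempotencyKey}-${hash}`;
}
```

Repeating the same request yields the same correlation id, while reusing the idempotency-key with a different payload yields a different one.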

Simplify configuration for JWT identity provider signing keys

Zilla supports JWT authorization and requires the public signing keys from the JWT identity provider to verify token integrity.

    "guards":
    {
        "jwt0":
        {
           "type":"jwt",
           "options":
           {
              "issuer": "https://auth.example.com/",
              "audience": "https://api.example.com/",
              "keys":
              [
                  ...
              ]
           }
        }
    }

This requires copying the contents of the identity provider supplied JSON Web Key Set (JWKS).

However, identity providers can rotate their signing keys, requiring the configuration to be updated.

Instead, we can add support for specifying the URL where the JSON Web Key Set can be retrieved.

    "guards":
    {
        "jwt0":
        {
           "type":"jwt",
           "options":
           {
              "issuer": "https://auth.example.com/",
              "audience": "https://api.example.com/",
              "keys": "https://auth.example.com/.well-known/jwks.json"
           }
        }
    }

In fact, we can derive a default value for keys URL by appending .well-known/jwks.json to the issuer URL, making it unnecessary to specify the keys in the configuration.

    "guards":
    {
        "jwt0":
        {
           "type":"jwt",
           "options":
           {
              "issuer": "https://auth.example.com/",
              "audience": "https://api.example.com/"
           }
        }
    }

The keys URL can be resolved when parsing configuration on startup.

Systems that prefer to configure the keys explicitly can still do so using the existing syntax.
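
The defaulting rule could look like the following sketch. resolveKeys is a hypothetical name, and tolerating an issuer without a trailing slash is an assumption rather than something the issue states.

```javascript
// Hypothetical sketch of resolving the "keys" option for a jwt guard.
function resolveKeys(options) {
  // Explicit keys (inline array or URL string) take precedence.
  if (options.keys !== undefined) return options.keys;
  // Assumption: accept an issuer with or without a trailing slash.
  const issuer = options.issuer.endsWith("/") ? options.issuer : options.issuer + "/";
  return issuer + ".well-known/jwks.json";
}
```

For the issuer https://auth.example.com/ this gives https://auth.example.com/.well-known/jwks.json, matching the explicit URL in the second example.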

Require exit be omitted from tcp client configuration

The tcp client binding currently requires exit to be present for routes, even though the next step is onto the network rather than towards another named binding, so the exit has no meaning.

In future, we may elect to provide meaning to this exit, such as guiding the traffic out through a specific network interface when more than one is viable.

Therefore, it makes sense to require the exit be omitted now, leaving the possibility of adding new semantics for exit later without breaking backwards compatibility.

Support `{{ mustache }}` syntax in zilla.json

Support {{ mustache }} syntax in zilla.json.

Consider making the root level objects pluggable, with initial support for env to provide access to environment variables from the executing shell.

This avoids the need to put any potentially sensitive information directly into the configuration.

For example:

...
    "options":
    {
        "sasl":
        {
            "mechanism": "plain",
            "username": "{{env.SASL_USERNAME}}",
            "password": "{{env.SASL_PASSWORD}}"
        }
    }
...

Note: initial support for env is already implemented, but disabled by default for now.
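
A minimal sketch of the substitution, limited to the env root object. expandEnv is a hypothetical name, and failing on a missing variable is an assumption about the desired behavior, not how Zilla handles it.

```javascript
// Hypothetical sketch: expand {{env.NAME}} placeholders in raw config text.
function expandEnv(text, env) {
  return text.replace(/\{\{\s*env\.([A-Za-z_][A-Za-z0-9_]*)\s*\}\}/g, (match, name) => {
    // Assumption: a missing variable is an error rather than an empty string.
    if (!(name in env)) throw new Error(`missing environment variable ${name}`);
    return env[name];
  });
}
```

It would typically be called as expandEnv(configText, process.env) before the JSON is parsed.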

Migrate implicit stream open, close, error, bytes metrics to explicit configuration syntax

Note: requires design #100.

Zilla engine synchronously captures the following transmit and receive metrics for every stream flowing over shared memory, for every binding.

  • tx.opens, rx.opens
  • tx.closes, rx.closes
  • tx.errors, rx.errors
  • tx.bytes, rx.bytes

Use the telemetry metrics configuration design to support opt-in for these counter metrics at specific bindings instead.

Record these metrics asynchronously, via passive observation of streams shared memory.

This requires supporting an upper bound on progress for shared memory streams to guarantee that the metrics observer cannot fall more than a full lap behind in the shared memory ring buffer, as this would lead to metrics loss.

Support tombstone messages via sse-kafka binding

Kafka tombstone messages have a value of null.

Per HTML5 SSE specification, events with id field only and no data field would not be dispatched by browsers, instead they would just update the local last-event-id to be used in future during SSE stream reconnect.

  1. Set the last event ID string of the event source to the value of the last event ID buffer. The buffer does not get reset, so the last event ID string of the event source remains set to this value until the next time it is set by the server.
  2. If the data buffer is an empty string, set the data buffer and the event type buffer to the empty string and return.

Therefore, we need to represent tombstone messages using a different SSE event type, such as delete, with a non-empty data field, as well as an id field to advance last-event-id in the same way for tombstone messages as for non-null messages.

This allows a UX list based on an SSE stream of Kafka snapshots to be kept in sync as SSE events arrive at the client, with message type SSE events representing upserts and delete type SSE events representing tombstones.
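
On the client, the upsert and delete handling might look like this sketch. It assumes the message key is carried in the id field as ["<base64(key)>","<progress>/<etag>"], which comes from a separate issue, and applyEvent is a hypothetical name.

```javascript
// Hypothetical client-side sketch: keep a Map in sync from sse-kafka events.
// Assumption: event.lastEventId is ["<base64(key)>","<progress>/<etag>"].
function applyEvent(table, event) {
  const [encodedKey] = JSON.parse(event.lastEventId);
  const key = atob(encodedKey); // adequate for ASCII keys only
  if (event.type === "delete") table.delete(key); // tombstone removes the row
  else table.set(key, event.data); // message event upserts the row
  return table;
}
```

The same function can be registered for both the message and delete event types of an EventSource.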

Integrate OpenTelemetry collectors by exporting local metrics over OTLP

Note: requires design #109 and #111.

Support the otlp kind of telemetry exporter, contributed via the telemetry-otlp component.

    "exporters":
    {
      "otlp0":
      {
        "kind": "otlp",
        "endpoint": "otlp://myserver.local:55690"
        "endpoint": "https://example.com:4318"  <-- /v1/traces, /v1/metrics, /v1/logs (json or protobuf, grpc)
        "vault": ...
      }
    }

We need to determine the frequency of metrics reporting and potential aggregation across cores.

It may be simpler to export directly on each local metrics collection core individually until we determine whether or not we need more than one metrics collection core to fully keep up with the multiple data cores.

CacheMergedIT.shouldFetchMergedMessageValues fails only on GitHub Actions

The CacheMergedIT.shouldFetchMergedMessageValues integration test has variable behavior as it merges multiple partition streams into a unified topic stream, where the order of messages in each partition is maintained but the total order of messages across all partitions can vary and still be correct.

We need to find a way to make the merge behavior more deterministic during this IT so that it can pass predictably in all environments and not be susceptible to any race conditions impacting the observed total message order.
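
One possible direction, sketched here rather than taken from the actual fix, is to assert only the property that must hold: order within each partition. preservesPartitionOrder and the { partition, offset } message shape are hypothetical.

```javascript
// Hypothetical test helper: true when offsets strictly increase within each
// partition, however the partitions interleave in the merged stream.
function preservesPartitionOrder(messages) {
  const lastOffset = new Map();
  for (const { partition, offset } of messages) {
    if (lastOffset.has(partition) && offset <= lastOffset.get(partition)) return false;
    lastOffset.set(partition, offset);
  }
  return true;
}
```

Any interleaving that keeps each partition's offsets increasing passes, so the check does not depend on the observed total order.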

Allow list of merged topics in kafka binding options to be optional

A "merged" topic combines messages from all partitions in the same topic into a unified stream while preserving message order per partition but making no guarantees about the ordering of messages across topic partitions. In practice, the progress is quite balanced such that no one topic partition dominates the others in the merged stream.

Allowing kafka binding options merged topics list to be omitted makes it easier to get the configuration correct for simpler cases while still supporting explicitly specifying the only allowed merged topics.

In practice, this involves changing from default deny to default allow for the list of merged topic names.
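
The change in default can be expressed as a one-line check. isMergedAllowed is a hypothetical name, and mergedTopics stands in for the optional list of merged topic names from the binding options.

```javascript
// Hypothetical sketch of the proposed default-allow rule.
function isMergedAllowed(topic, mergedTopics) {
  // Omitted list: any topic may be merged. Present list: only the named topics.
  return mergedTopics === undefined || mergedTopics.includes(topic);
}
```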

Implement `zilla dump` command similar to `tcpdump`

The Zilla runtime has a pluggable command structure, currently used for start and stop.

./zilla
usage: zilla <command> [ <args> ]

Commands are:
    help    Display help information
    start   Start engine
    stop    Stop engine

See 'zilla help <command>' for more information on a specific command.

Add a new zilla dump command to produce a virtual packet capture of Zilla shared memory streams that can be dissected in Wireshark as tcp, tls, http, mqtt, amqp, etc.

Capturing the raw byte stream for each binding in zilla.json in .pcap format is sufficient to let Wireshark dissectors present the structured form of the protocol frames.

Allow tls trustcacerts option to work without vault

According to the documentation, trustcacerts lets you trust public CA certificates. However, to get it to work you still need to reference a vault, which should be unnecessary.

Currently

        "tls_client0":
        {
            "type" : "tls",
            "kind": "client",
            "vault": "client_truststore",
            "options":
            {
                "trustcacerts": true
            },
            "exit": "tcp_client0"
        },

Expected

        "tls_client0":
        {
            "type" : "tls",
            "kind": "client",
            "options":
            {
                "trustcacerts": true
            },
            "exit": "tcp_client0"
        },

Support `websocket` over `http/2`

websocket predates http/2 and has historically relied on http/1.1 upgrade to unlock full-duplex bidirectional communication over a separate connection.

RFC 8441: Bootstrapping WebSockets with HTTP/2 adds support for websockets over the shared http/2 transport, without creating a separate connection.

Implementing support for this involves awareness at the ws binding of the underlying http version, such as http/1.1 or h2, as the required websocket handshake headers differ between http versions.

We likely need to enhance http binding to expose version in application stream HttpBeginEx to let ws binding behave correctly for either http/1.1 or h2.
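
The difference in client handshake headers can be sketched as follows. The header names follow RFC 6455 and RFC 8441, while the function name, its arguments, and the returned shape are hypothetical and do not reflect the HttpBeginEx layout.

```javascript
// Sketch of the client handshake request for each HTTP version.
function wsHandshake(version, authority, path, key) {
  if (version === "h2") {
    // RFC 8441: extended CONNECT with :protocol, no upgrade headers and no key.
    return {
      requestLine: null,
      headers: {
        ":method": "CONNECT",
        ":protocol": "websocket",
        ":scheme": "https",
        ":authority": authority,
        ":path": path,
        "sec-websocket-version": "13",
      },
    };
  }
  // RFC 6455 over HTTP/1.1: GET with upgrade headers and a client key.
  return {
    requestLine: `GET ${path} HTTP/1.1`,
    headers: {
      host: authority,
      upgrade: "websocket",
      connection: "Upgrade",
      "sec-websocket-key": key,
      "sec-websocket-version": "13",
    },
  };
}
```

Over h2 the server must first advertise SETTINGS_ENABLE_CONNECT_PROTOCOL, and a successful handshake is signalled by a 2xx status rather than 101.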

Chrome Status
Firefox Status

Support parallel builds

./mvnw -T 1C clean install attempts to execute a parallel build, but needs support for parallel execution from all plugins.

Need to get resolution to moditect/moditect#95.

[WARNING] *****************************************************************
[WARNING] * Your build is requesting parallel execution, but project      *
[WARNING] * contains the following plugin(s) that have goals not marked   *
[WARNING] * as @threadSafe to support parallel building.                  *
[WARNING] * While this /may/ work fine, please look for plugin updates    *
[WARNING] * and/or request plugins be made thread-safe.                   *
[WARNING] * If reporting an issue, report it against the plugin in        *
[WARNING] * question, not against maven-core                              *
[WARNING] *****************************************************************
[WARNING] The following plugins are not marked @threadSafe in zilla::specs::engine.spec:
[WARNING] io.aklivity.zilla:flyweight-maven-plugin:develop-SNAPSHOT
[WARNING] org.moditect:moditect-maven-plugin:1.0.0.RC1
[WARNING] Enable debug to see more precisely which goals are not marked @threadSafe.
[WARNING] *****************************************************************

Note: k3po plugin listens on a tcp port to receive unit test control protocol, so we need a different port for each project running in parallel to avoid collisions.

Ideally, we would run the zilla engine directly from the test, supporting k3po style scripts in a yet-to-be-implemented integration-test binding, allowing us to drive test behavior via configuration without using a tcp control port.

Optimize transfer-encoding for http-kafka correlated response

Currently, when we deliver the correlated response for http-kafka produce capability bindings, we omit the content-length so in HTTP/1.1 the response uses chunked transfer-encoding due to indeterminate length.

Instead we should use the kafka deferred length to set the http response content-length in http-kafka, then HTTP/1.1 will no longer need to use chunked transfer-encoding.

Extract credentials from HTTP path query string even when non-terminal parameter

HTTP path query string can consist of multiple parameters separated by &.

When configured to extract credentials from path query string such as...

                        "credentials":
                        {
                            "query":
                            {
                                "access_token": "{credentials}"
                            }
                        }

...then make sure requests such as the following still extract TOKEN as expected.

https://localhost:9090/?access_token=TOKEN&parameter=value
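
A sketch of position-independent extraction using the standard URL API. extractCredentials is a hypothetical name, and treating the configured value as a prefix/suffix pattern around {credentials} is an assumption about how the pattern is matched.

```javascript
// Hypothetical sketch: extract credentials from a named query parameter,
// regardless of where the parameter appears in the query string.
function extractCredentials(requestUrl, name, pattern) {
  const value = new URL(requestUrl).searchParams.get(name);
  if (value === null) return null;
  // Assumption: the pattern is literal text around a single {credentials}.
  const [prefix, suffix] = pattern.split("{credentials}");
  if (value.length < prefix.length + suffix.length) return null;
  if (!value.startsWith(prefix) || !value.endsWith(suffix)) return null;
  return value.slice(prefix.length, value.length - suffix.length);
}
```

Because searchParams.get looks the parameter up by name, a trailing &parameter=value does not affect the result.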

Feature request: Request limiter

Hi,

I was investigating gateway APIs for Kafka and found Zilla. Thanks for the detailed examples, which make clear what Zilla can and cannot do.

We are considering using Zilla in one of our POC projects where a rate limiter is an important part.

As a feature suggestion, it would be great to have a rate limiter that works per IP or per some value in the JWT token, similar to how the "echo:stream" scope works in the https://github.com/aklivity/zilla-examples/tree/main/http.echo.jwt example.

Another good improvement would be shared state about current limits between Zilla peers. Such functionality would block requests on all Zilla entry points at the same time. It could be implemented using a separate events topic in Kafka. The flow below demonstrates the idea:
                                                [Zilla_1]
[http client] -> [round robin load balancer] -> [Zilla_2] -> [Kafka]
                                                [Zilla_3]

Support additional http specific metrics

Note: requires design #109.

The following http metrics should be contributed to the Zilla engine's awareness of available metrics via the telemetry-http component, consistent with other pluggable aspects of the runtime.

  • http.duration: histogram
  • http.request_size: histogram
  • http.response_size: histogram
  • http.active_requests: updown

Note that the built-in counters for Zilla streams are all counters, so this will introduce new metrics types for updown and histogram.

The nature of the duration metric is also more stateful as there is a need to keep track of the request start timestamp and the later of the request and response end timestamps to capture the full duration of any request-response interaction.
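
The duration rule can be written down directly. This is only a sketch of the arithmetic, with requestDuration as a hypothetical name and all timestamps assumed to be in the same unit.

```javascript
// Hypothetical sketch: duration of one request-response interaction.
function requestDuration(requestStart, requestEnd, responseEnd) {
  // The interaction ends at the later of the request end and response end.
  return Math.max(requestEnd, responseEnd) - requestStart;
}
```

The request start timestamp has to be retained per stream until both ends are observed, which is what makes this metric stateful.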

Note: we need to validate that zilla metrics is reporting these new metrics types correctly as well.

Ws to tls proxy misinterprets begin extension

org.agrona.concurrent.AgentTerminationException: java.lang.NullPointerException: Cannot invoke "io.aklivity.zilla.runtime.binding.tcp.internal.types.ProxyAddressFamily.ordinal()" because the return value of "io.aklivity.zilla.runtime.binding.tcp.internal.types.ProxyAddressFW.kind()" is null
	at io.aklivity.zilla.runtime.engine/io.aklivity.zilla.runtime.engine.internal.registry.DispatchAgent.doWork(DispatchAgent.java:572)
	at org.agrona.core/org.agrona.concurrent.AgentRunner.doDutyCycle(AgentRunner.java:291)
	at org.agrona.core/org.agrona.concurrent.AgentRunner.run(AgentRunner.java:164)
	at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: java.lang.NullPointerException: Cannot invoke "io.aklivity.zilla.runtime.binding.tcp.internal.types.ProxyAddressFamily.ordinal()" because the return value of "io.aklivity.zilla.runtime.binding.tcp.internal.types.ProxyAddressFW.kind()" is null
	at io.aklivity.zilla.runtime.binding.tcp/io.aklivity.zilla.runtime.binding.tcp.internal.types.ProxyAddressFW.tryWrap(ProxyAddressFW.java:79)
	at io.aklivity.zilla.runtime.binding.tcp/io.aklivity.zilla.runtime.binding.tcp.internal.types.stream.ProxyBeginExFW.tryWrap(ProxyBeginExFW.java:52)
	at io.aklivity.zilla.runtime.binding.tcp/io.aklivity.zilla.runtime.binding.tcp.internal.types.OctetsFW.get(OctetsFW.java:15)
	at io.aklivity.zilla.runtime.binding.tcp/io.aklivity.zilla.runtime.binding.tcp.internal.stream.TcpClientFactory.newStream(TcpClientFactory.java:130)
	at io.aklivity.zilla.runtime.engine/io.aklivity.zilla.runtime.engine.internal.registry.DispatchAgent.handleBeginInitial(DispatchAgent.java:1278)
	at io.aklivity.zilla.runtime.engine/io.aklivity.zilla.runtime.engine.internal.registry.DispatchAgent.handleDefaultReadInitial(DispatchAgent.java:1070)
	at io.aklivity.zilla.runtime.engine/io.aklivity.zilla.runtime.engine.internal.registry.DispatchAgent.handleReadInitial(DispatchAgent.java:1010)
	at io.aklivity.zilla.runtime.engine/io.aklivity.zilla.runtime.engine.internal.registry.DispatchAgent.handleRead(DispatchAgent.java:953)
	at io.aklivity.zilla.runtime.engine/io.aklivity.zilla.runtime.engine.internal.concurent.ManyToOneRingBuffer.read(ManyToOneRingBuffer.java:181)
	at io.aklivity.zilla.runtime.engine/io.aklivity.zilla.runtime.engine.internal.registry.DispatchAgent.doWork(DispatchAgent.java:566)
	... 3 more
	Suppressed: java.lang.Exception: [engine/data#9]	[0x0909000000000009] streams=[consumeAt=0x000005c8 (0x00000000000005c8), produceAt=0x000005c8 (0x00000000000005c8)]
		at io.aklivity.zilla.runtime.engine/io.aklivity.zilla.runtime.engine.internal.registry.DispatchAgent.doWork(DispatchAgent.java:570)
		... 3 more

Recipe for brew install

Support brew install recipe for zpm.

When running zilla locally without using the docker image, you need to run the following commands.

zpm install
zpm clean --keep-image

Therefore we need to provide an easy way to make zpm available as a local command via brew install.

Support etag in event id field for sse-kafka binding

http-kafka binding uses etag to support conditional operations, such as GET if-none-match or PUT if-match.

When clients use both http-kafka binding and sse-kafka binding in combination, we need to expose the etag header of each kafka message on each SSE event, so that it can be used by the client for PUT if-match to support optimistic locking.

The [progress]/[etag header] format used for etag http header in http-kafka binding can also be used for the id field of each SSE event in sse-kafka binding to make them compatible.

Simplify duplicate request detection at event-driven microservices

In http-kafka binding routes with produce capability for HTTP request-response, we first send the request to a Kafka topic before attempting to fetch the correlated response. This makes sense as the request happens-before the response.

However, when the request is a duplicate we use the same zilla:correlation-id, and so the response may already be present if the event-driven microservice has already processed the original request.

Therefore, we should instead attempt to fetch the correlated response first, and if not yet present, then send the request to Kafka. This will reduce the number of duplicate requests observed by the microservice.

Naturally, there is still a window where the original request has not yet been processed when the duplicate request arrives. Thankfully, with the changes proposed above, this duplicate request window becomes significantly narrower, and is therefore much easier to detect accurately at the event-driven microservice.

When the event-driven microservice detects a duplicate request, it can safely ignore it if the correlated response message has not yet expired, which is much easier to guarantee with the changes above in place.

Provide zilla metrics command to report current values locally

Note: requires design #109.

Zilla currently has a zilla load command in the incubator that reports on the following metrics for each binding.

  • tx.opens, rx.opens
  • tx.closes, rx.closes
  • tx.errors, rx.errors
  • tx.bytes, rx.bytes

Rename the zilla load command to zilla metrics and report values using the new local storage structures for metrics.

Given that the list of metrics is now extensible, a tabular layout is probably no longer the best option.

Note that while the above metrics are all counters, there are general requirements to support gauge and histogram metrics as well, which affects the information presented for each metric.

Promote zilla metrics out of the incubator to the runtime.

Configurable isolation level for kafka binding

In kafka binding client, the consumer currently defaults to isolation 0 (read uncommitted).

The configuration in zilla.json can be enhanced to specify isolation of read_uncommitted or read_committed.

For example:

{
    "bindings":
    {
        "kafka_client0":
        {
            "type": "kafka",
            "kind": "client",
            "options":
            {
                "topics":
                [
                    {
                        "name": "items-snapshots",
                        "isolation": "read_committed"
                    }
                ]
            },
            "exit": "tcp_client0"
        }
    }
}

Additionally, mapping bindings that target kafka for produce, such as http-kafka binding, can be enhanced to configure isolation in their with clause when following a route.

{
    "bindings":
    {
        "http_kafka_proxy0":
        {
            "type": "http-kafka",
            "kind": "proxy",
            "routes":
            [
                {
                    "when":
                    [
                        {
                            "path": "/items/{id}"
                        }
                    ],
                    "exit": "kafka_cache_client0",
                    "with":
                    {
                        "capability": "fetch",
                        "topic": "items-snapshots",
                        "key": "${params.id}",
                        "isolation": "read_committed"
                    }
                }
            ]
        }
    }
}

This will require changes to KafkaMergedBeginEx and KafkaFetchBeginEx to convey the required read_committed mode, also internally in the kafka cache_server and cache_client to handle isolation level properly in the cache.

As part of implementing this enhancement, the default isolation level should also change to read_committed.

Design observable metrics configuration syntax

Zilla engine is designed to be observable at near-zero performance impact, so we can use that to provide low overhead metrics.

As with other first class concepts in zilla.yaml, such as vaults and guards, we need a general concept of metrics, independent of how they are stored locally or transmitted remotely, with more specific implementation for observability standards such as OpenTelemetry.

Metrics can fall into different categories, such as counter, gauge or histogram, which has implications on how metrics are stored in Zilla engine before being periodically exported.

Enhance idempotency support for HTTP-Kafka binding

Idempotent HTTP requests correlated over commands and replies Kafka topics must be able to properly handle duplicate requests with the same idempotency-key header.

Typically, both the commands and replies topics are configured for time based retention, so the correlated reply is typically already in the replies topic when the idempotent request is replayed by the client.

Note: the time based retention period for the replies topic effectively dictates the idempotency key expiration window.

Ideally, we would detect this scenario at zilla and avoid sending the replayed request to the commands topic, simply returning the already correlated response from the replies topic to the client. (not currently implemented)

Even if such an enhancement were implemented, we must still consider the race condition where the idempotent request has already been sent to the commands topic, and the reply will be sent but it is not yet present in the replies topic when the idempotent request is replayed, so zilla would not detect the correlated reply and would send the replayed request to the commands topic.

Note: even if zilla would retain some in-memory awareness of previously sent idempotent requests, a peer or restarted zilla instance could receive the replayed request with no such historical awareness.

If the service observes multiple idempotent requests, then it needs to retain knowledge of previously received idempotency keys to prevent duplicate processing.

When such a duplicate request is detected, it can be safely ignored, as long as the correlated response is guaranteed to still be present in the replies topic and not already cleaned up due to time based retention policy.

So the service has to handle this race condition, where the replayed idempotent request arrives on the commands topic after the response on the replies topic has expired.

This implies that the historical awareness at the service of previously observed idempotency keys would need to be synchronized perfectly with the time based retention policy of the replies topic.

Rather than placing this edge case requirement on the service, we can instead implement the proposed enhancement above, where zilla detects the already correlated response in the replies topic and avoids sending the replayed request to the commands topic.

This narrows the time window of the race condition where the service can receive a replayed request to something approximating the request-response round trip time, rather than the full time based retention period of the replies topic, making it straightforward for the service to detect and ignore the replayed request, as the correlated response is still present in the replies topic.

In http-kafka binding, we can change the sync and async implementations to first attempt to fetch the correlated response from the replies topic before producing the request to the commands topic, rather than always producing the request to the commands topic as we do now.
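The fetch-before-produce idea can be sketched as follows. This is only an illustration under stated assumptions: two in-memory maps stand in for the commands and replies topics, and the class and method names are hypothetical rather than part of the http-kafka binding.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch: maps stand in for the commands and replies topics.
final class IdempotentProduceSketch
{
    private final Map<String, String> commands = new HashMap<>(); // idempotency-key -> request
    private final Map<String, String> replies = new HashMap<>();  // idempotency-key -> response
    private int produced;

    // Returns the correlated reply if one is already retained; otherwise
    // produces the request to the commands topic and returns empty.
    Optional<String> handle(String idempotencyKey, String request)
    {
        String reply = replies.get(idempotencyKey);
        if (reply != null)
        {
            return Optional.of(reply); // replayed request: do not produce again
        }
        commands.put(idempotencyKey, request);
        produced++;
        return Optional.empty();
    }

    // Simulates the service writing its response to the replies topic.
    void reply(String idempotencyKey, String response)
    {
        replies.put(idempotencyKey, response);
    }

    int produced() { return produced; }

    public static void main(String[] args)
    {
        IdempotentProduceSketch sketch = new IdempotentProduceSketch();
        sketch.handle("k1", "request");
        sketch.reply("k1", "response");
        System.out.println(sketch.handle("k1", "request").isPresent());
    }
}
```

A replay that arrives before the reply exists would still be produced a second time in this sketch, which matches the narrowed race window described above that the service must still handle.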

Refer to sse-kafka event id progress as etag instead

Support event id formats that make clear that the second array element represents the etag, which can be used in if-match with http-kafka bindings.

  • "${etag}"
  • "[\"${base64(key)}\",\"${etag}\"]"
  • "[\"${key}\",\"${etag}\"]"

See #42 for non-base64 key.
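As a rough illustration of the three formats listed above, the sketch below builds each event id string. The helper names are hypothetical, and the code assumes the key and etag contain no characters that need JSON escaping.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Hypothetical helpers, one per event id format.
final class EventIdSketch
{
    // "${etag}"
    static String etagOnly(String etag)
    {
        return etag;
    }

    // "[\"${base64(key)}\",\"${etag}\"]"
    static String base64KeyAndEtag(byte[] key, String etag)
    {
        String encoded = Base64.getEncoder().encodeToString(key);
        return "[\"" + encoded + "\",\"" + etag + "\"]";
    }

    // "[\"${key}\",\"${etag}\"]" (assumes no JSON escaping is needed)
    static String keyAndEtag(String key, String etag)
    {
        return "[\"" + key + "\",\"" + etag + "\"]";
    }

    public static void main(String[] args)
    {
        byte[] key = "k1".getBytes(StandardCharsets.UTF_8);
        System.out.println(base64KeyAndEtag(key, "e1"));
        System.out.println(keyAndEtag("k1", "e1"));
    }
}
```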

Update zilla docs example configuration as well.

Review vault placement in configuration syntax

Currently vault is a top-level property of binding, even though it is not used by most binding types.

"tls0":
{
    "type" : "tls",
    "kind": "server",
    "vault": "server",
    "options":
    {
        "keys": [ "localhost" ],
        "sni": [ "localhost" ],
        "alpn": [ "echo" ]
    }
}

Consider moving to a binding-specific option instead, giving:

"tls0":
{
    "type" : "tls",
    "kind": "server",
    "options":
    {
        "vault": "server",
        "keys": [ "localhost" ],
        "sni": [ "localhost" ],
        "alpn": [ "echo" ]
    }
}

Error running http.kafka.oneway from zilla-examples

Describe the bug
I followed the instructions in the http.kafka.oneway/README.md file in the zilla-examples repository and ran into an error.

To Reproduce
Steps to reproduce the behavior:

  1. Go to https://github.com/aklivity/zilla-examples/blob/e3beb1eae6e338f93ada5105e1486ea7d02e8d95/http.kafka.oneway/README.md and follow the instructions there
  2. At the Verify behavior step I got an error with this stack trace when sending the POST request with curl:
org.agrona.concurrent.AgentTerminationException: java.lang.NullPointerException: Cannot invoke "io.aklivity.zilla.runtime.binding.kafka.internal.types.stream.KafkaDataExFW.merged()" because "kafkaDataEx" is null
	at io.aklivity.zilla.runtime.engine/io.aklivity.zilla.runtime.engine.internal.registry.DispatchAgent.doWork(DispatchAgent.java:572)
	at org.agrona.core/org.agrona.concurrent.AgentRunner.doDutyCycle(AgentRunner.java:291)
	at org.agrona.core/org.agrona.concurrent.AgentRunner.run(AgentRunner.java:164)
	at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: java.lang.NullPointerException: Cannot invoke "io.aklivity.zilla.runtime.binding.kafka.internal.types.stream.KafkaDataExFW.merged()" because "kafkaDataEx" is null
	at io.aklivity.zilla.runtime.binding.kafka/io.aklivity.zilla.runtime.binding.kafka.internal.stream.KafkaMergedFactory$KafkaUnmergedProduceStream.doProduceInitialData(KafkaMergedFactory.java:2818)
	at io.aklivity.zilla.runtime.binding.kafka/io.aklivity.zilla.runtime.binding.kafka.internal.stream.KafkaMergedFactory$KafkaMergedStream.onMergedInitialData(KafkaMergedFactory.java:1142)
	at io.aklivity.zilla.runtime.binding.kafka/io.aklivity.zilla.runtime.binding.kafka.internal.stream.KafkaMergedFactory$KafkaMergedStream.onMergedMessage(KafkaMergedFactory.java:1033)
	at io.aklivity.zilla.runtime.binding.kafka/io.aklivity.zilla.runtime.binding.kafka.internal.stream.KafkaMergedFactory.lambda$newStream$4(KafkaMergedFactory.java:228)
	at io.aklivity.zilla.runtime.engine/io.aklivity.zilla.runtime.engine.internal.registry.DispatchAgent.handleReadInitial(DispatchAgent.java:988)
	at io.aklivity.zilla.runtime.engine/io.aklivity.zilla.runtime.engine.internal.registry.DispatchAgent.handleRead(DispatchAgent.java:953)
	at io.aklivity.zilla.runtime.engine/io.aklivity.zilla.runtime.engine.internal.concurent.ManyToOneRingBuffer.read(ManyToOneRingBuffer.java:181)
	at io.aklivity.zilla.runtime.engine/io.aklivity.zilla.runtime.engine.internal.registry.DispatchAgent.doWork(DispatchAgent.java:566)
	... 3 more
	Suppressed: java.lang.Exception: [engine/data#3]	[0x0303000000000007] streams=[consumeAt=0x00001658 (0x0000000000001658), produceAt=0x00001770 (0x0000000000001770)]
		at io.aklivity.zilla.runtime.engine/io.aklivity.zilla.runtime.engine.internal.registry.DispatchAgent.doWork(DispatchAgent.java:570)
		... 3 more
stopped

Expected behavior
The curl command in the instructions should work well.

Desktop:

  • OS: macOS Big Sur 11.6.8

Review Kafka authentication methods

Kafka client currently supports mTLS via TLS client certificates.

Review alternative authentication methods and consider adding Kafka client support.

  • SASL / JAAS
  • SASL / GSSAPI (Kerberos)
  • SASL / OAUTHBEARER
  • SASL / PLAIN #89
  • SASL / SCRAM #126
  • Delegation Tokens (SASL / SSL)

Question: how does zilla work

I see that it's an event-driven gateway.

Can we take this and deploy our own instance? Does a Zilla instance connect to any external endpoints and then publish the message to the Kafka topic/Kafka server? Or is every message stored in the Zilla instance?

Zilla build fails on timeout

Describe the bug
I tried building the Zilla sources at least three times, and every time the zilla::runtime::binding-tcp module fails the build.

To Reproduce
Run ./mvnw clean install

Expected behavior
The build should complete without failure

Screenshots

...
[INFO] Running io.aklivity.zilla.runtime.binding.tcp.internal.streams.ServerPartialWriteLimitsIT
[INFO] Tests run: 2, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.759 s - in io.aklivity.zilla.runtime.binding.tcp.internal.streams.ServerPartialWriteLimitsIT
[INFO] Running io.aklivity.zilla.runtime.binding.tcp.internal.streams.ClientResetAndAbortIT
[ERROR] Tests run: 4, Failures: 0, Errors: 3, Skipped: 0, Time elapsed: 16.354 s <<< FAILURE! - in io.aklivity.zilla.runtime.binding.tcp.internal.streams.ClientResetAndAbortIT
[ERROR] io.aklivity.zilla.runtime.binding.tcp.internal.streams.ClientResetAndAbortIT.shouldShutdownOutputAndInputWhenClientSendsAbortAndReset  Time elapsed: 5.418 s  <<< ERROR!
org.junit.runners.model.TestTimedOutException: test timed out after 5 seconds
	at io.aklivity.zilla.runtime.binding.tcp.internal.streams.ClientResetAndAbortIT.shouldShutdownOutputAndInputWhenClientSendsAbortAndReset(ClientResetAndAbortIT.java:120)

[ERROR] io.aklivity.zilla.runtime.binding.tcp.internal.streams.ClientResetAndAbortIT.shouldShutdownInputWhenClientSendsReset  Time elapsed: 5.439 s  <<< ERROR!
org.junit.runners.model.TestTimedOutException: test timed out after 5 seconds
	at io.aklivity.zilla.runtime.binding.tcp.internal.streams.ClientResetAndAbortIT.shouldShutdownInputWhenClientSendsReset(ClientResetAndAbortIT.java:161)

[ERROR] io.aklivity.zilla.runtime.binding.tcp.internal.streams.ClientResetAndAbortIT.shouldShutdownOutputAndInputWhenClientSendsResetAndEnd  Time elapsed: 5.369 s  <<< ERROR!
org.junit.runners.model.TestTimedOutException: test timed out after 5 seconds
	at io.aklivity.zilla.runtime.binding.tcp.internal.streams.ClientResetAndAbortIT.shouldShutdownOutputAndInputWhenClientSendsResetAndEnd(ClientResetAndAbortIT.java:199)

[INFO] Running io.aklivity.zilla.runtime.binding.tcp.internal.streams.ClientLimitsIT
[INFO] Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.179 s - in io.aklivity.zilla.runtime.binding.tcp.internal.streams.ClientLimitsIT
[INFO] Running io.aklivity.zilla.runtime.binding.tcp.internal.streams.ClientIOExceptionFromWriteIT
[INFO] Tests run: 2, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 1.176 s - in io.aklivity.zilla.runtime.binding.tcp.internal.streams.ClientIOExceptionFromWriteIT
[INFO] Running io.aklivity.zilla.runtime.binding.tcp.internal.streams.ClientIOExceptionFromReadIT
[INFO] Tests run: 2, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.307 s - in io.aklivity.zilla.runtime.binding.tcp.internal.streams.ClientIOExceptionFromReadIT
[INFO] Running io.aklivity.zilla.runtime.binding.tcp.internal.streams.ClientRoutingIT
[WARNING] Tests run: 8, Failures: 0, Errors: 0, Skipped: 2, Time elapsed: 1.476 s - in io.aklivity.zilla.runtime.binding.tcp.internal.streams.ClientRoutingIT
[INFO] Running io.aklivity.zilla.runtime.binding.tcp.internal.streams.ServerResetAndAbortIT
[ERROR] Tests run: 4, Failures: 0, Errors: 3, Skipped: 0, Time elapsed: 16.76 s <<< FAILURE! - in io.aklivity.zilla.runtime.binding.tcp.internal.streams.ServerResetAndAbortIT
[ERROR] io.aklivity.zilla.runtime.binding.tcp.internal.streams.ServerResetAndAbortIT.shouldShutdownOutputAndInputWhenServerSendsResetAndEnd  Time elapsed: 5.579 s  <<< ERROR!
org.junit.runners.model.TestTimedOutException: test timed out after 5 seconds
	at io.aklivity.zilla.runtime.binding.tcp.internal.streams.ServerResetAndAbortIT.shouldShutdownOutputAndInputWhenServerSendsResetAndEnd(ServerResetAndAbortIT.java:187)

[ERROR] io.aklivity.zilla.runtime.binding.tcp.internal.streams.ServerResetAndAbortIT.shouldShutdownOutputAndInputWhenServerSendsAbortAndReset  Time elapsed: 5.372 s  <<< ERROR!
org.junit.runners.model.TestTimedOutException: test timed out after 5 seconds
	at io.aklivity.zilla.runtime.binding.tcp.internal.streams.ServerResetAndAbortIT.shouldShutdownOutputAndInputWhenServerSendsAbortAndReset(ServerResetAndAbortIT.java:112)

[ERROR] io.aklivity.zilla.runtime.binding.tcp.internal.streams.ServerResetAndAbortIT.shouldShutdownInputWhenServerSendsReset  Time elapsed: 5.606 s  <<< ERROR!
org.junit.runners.model.TestTimedOutException: test timed out after 5 seconds
	at io.aklivity.zilla.runtime.binding.tcp.internal.streams.ServerResetAndAbortIT.shouldShutdownInputWhenServerSendsReset(ServerResetAndAbortIT.java:149)

...

[INFO]
[INFO] Results:
[INFO]
[ERROR] Errors:
[ERROR]   ClientResetAndAbortIT.shouldShutdownInputWhenClientSendsReset:161 » TestTimedOut
[ERROR]   ClientResetAndAbortIT.shouldShutdownOutputAndInputWhenClientSendsAbortAndReset:120 » TestTimedOut
[ERROR]   ClientResetAndAbortIT.shouldShutdownOutputAndInputWhenClientSendsResetAndEnd:199 » TestTimedOut
[ERROR]   ServerResetAndAbortIT.shouldShutdownInputWhenServerSendsReset:149 » TestTimedOut
[ERROR]   ServerResetAndAbortIT.shouldShutdownOutputAndInputWhenServerSendsAbortAndReset:112 » TestTimedOut
[ERROR]   ServerResetAndAbortIT.shouldShutdownOutputAndInputWhenServerSendsResetAndEnd:187 » TestTimedOut
[INFO]
[ERROR] Tests run: 83, Failures: 0, Errors: 6, Skipped: 4

...

[INFO] Reactor Summary for zilla develop-SNAPSHOT:
[INFO]
[INFO] zilla .............................................. SUCCESS [  0.733 s]
[INFO] zilla::conf ........................................ SUCCESS [  1.122 s]
[INFO] zilla::build ....................................... SUCCESS [  0.010 s]
[INFO] zilla::build::flyweight-maven-plugin ............... SUCCESS [ 37.767 s]
[INFO] zilla::specs ....................................... SUCCESS [  0.014 s]
[INFO] zilla::specs::engine.spec .......................... SUCCESS [  4.632 s]
[INFO] zilla::runtime ..................................... SUCCESS [  0.012 s]
[INFO] zilla::runtime::engine ............................. SUCCESS [ 46.063 s]
[INFO] zilla::specs::binding-echo.spec .................... SUCCESS [  5.455 s]
[INFO] zilla::specs::binding-fan.spec ..................... SUCCESS [  4.926 s]
[INFO] zilla::specs::binding-filesystem.spec .............. SUCCESS [  5.444 s]
[INFO] zilla::specs::binding-proxy.spec ................... SUCCESS [ 18.539 s]
[INFO] zilla::specs::binding-tcp.spec ..................... SUCCESS [ 12.152 s]
[INFO] zilla::specs::vault-filesystem.spec ................ SUCCESS [  1.620 s]
[INFO] zilla::specs::binding-tls.spec ..................... SUCCESS [ 23.293 s]
[INFO] zilla::specs::binding-http.spec .................... SUCCESS [01:01 min]
[INFO] zilla::specs::binding-sse.spec ..................... SUCCESS [ 21.574 s]
[INFO] zilla::specs::binding-ws.spec ...................... SUCCESS [ 37.220 s]
[INFO] zilla::specs::binding-kafka.spec ................... SUCCESS [ 40.713 s]
[INFO] zilla::specs::binding-http-filesystem.spec ......... SUCCESS [  5.634 s]
[INFO] zilla::specs::binding-http-kafka.spec .............. SUCCESS [ 21.063 s]
[INFO] zilla::specs::binding-sse-kafka.spec ............... SUCCESS [  8.788 s]
[INFO] zilla::specs::guard-jwt.spec ....................... SUCCESS [  1.688 s]
[INFO] zilla::runtime::binding-echo ....................... SUCCESS [  4.284 s]
[INFO] zilla::runtime::binding-fan ........................ SUCCESS [  5.197 s]
[INFO] zilla::runtime::binding-filesystem ................. SUCCESS [  6.111 s]
[INFO] zilla::runtime::binding-proxy ...................... SUCCESS [ 20.880 s]
[INFO] zilla::runtime::binding-tcp ........................ FAILURE [01:00 min]
[INFO] zilla::runtime::vault-filesystem ................... SKIPPED

Additional context
Output of ./mvnw -v :

Apache Maven 3.8.4 (9b656c72d54e5bacbed989b64718c159fe39b537)
Maven home: $HOME/.m2/wrapper/dists/apache-maven-3.8.4-bin/52ccbt68d252mdldqsfsn03jlf/apache-maven-3.8.4
Java version: 11.0.11, vendor: AdoptOpenJDK, runtime: $HOME/.sdkman/candidates/java/11.0.11.j9-adpt
Default locale: en_US, platform encoding: UTF-8
OS name: "mac os x", version: "10.16", arch: "x86_64", family: "mac"

Malformed if-match value triggers exception

Steps to reproduce:

  1. Binding config
"http_kafka_proxy0":
{
    "type" : "http-kafka",
    "kind": "proxy",
    "routes":
    [
        {
            "when":
            [
                {
                    "method": "PUT",
                    "path": "/items/{id}"
                }
            ],
            "with":
            {
                "capability": "produce",
                "topic": "items-commands",
                "key": "${params.id}",
                "reply-to": "item-replies"
            }
            .....
  2. Curl command:
curl -v \
    -X "PUT" "http://localhost:8080/items/9417E83E-313E-468E-AC7C-1BCE0BAF9712" \
    -H "Idempotency-Key: `uuidgen`" \
    -H "Content-Type: application/json" \
    -H "If-Match: wrong" \
    -d "{\"@type\": \"UpdateItemCommand\", \"description\": \"Test1\"}"

Exception:

org.agrona.concurrent.AgentTerminationException: java.lang.IllegalArgumentException: negative length: -1
	at io.aklivity.zilla.runtime.engine.internal.registry.DispatchAgent.doWork(DispatchAgent.java:572)
	at org.agrona.core/org.agrona.concurrent.AgentRunner.doDutyCycle(AgentRunner.java:291)
	at org.agrona.core/org.agrona.concurrent.AgentRunner.run(AgentRunner.java:164)
	at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: java.lang.IllegalArgumentException: negative length: -1
	at org.agrona.core/org.agrona.concurrent.UnsafeBuffer.lengthCheck(UnsafeBuffer.java:1705)
	at org.agrona.core/org.agrona.concurrent.UnsafeBuffer.putBytes(UnsafeBuffer.java:944)
	at io.aklivity.zilla.runtime.binding.http.kafka.internal.types.OctetsFW$Builder.set(OctetsFW.java:77)
	at io.aklivity.zilla.runtime.binding.http.kafka.internal.types.KafkaHeaderFW$Builder.value(KafkaHeaderFW.java:256)
	at io.aklivity.zilla.runtime.binding.http.kafka.internal.config.HttpKafkaWithProduceResult.lambda$header$3(HttpKafkaWithProduceResult.java:168)
	at io.aklivity.zilla.runtime.binding.http.kafka.internal.types.Array32FW$Builder.item(Array32FW.java:192)
	at io.aklivity.zilla.runtime.binding.http.kafka.internal.config.HttpKafkaWithProduceResult.header(HttpKafkaWithProduceResult.java:164)
	at io.aklivity.zilla.runtime.binding.http.kafka.internal.config.HttpKafkaWithProduceResult.lambda$headers$1(HttpKafkaWithProduceResult.java:131)
	at io.aklivity.zilla.runtime.binding.http.kafka.internal.types.Array32FW.forEach(Array32FW.java:68)
	at io.aklivity.zilla.runtime.binding.http.kafka.internal.config.HttpKafkaWithProduceResult.headers(HttpKafkaWithProduceResult.java:131)
	at io.aklivity.zilla.runtime.binding.http.kafka.internal.stream.HttpKafkaProxyFactory$HttpProduceSyncProxy.lambda$onHttpBegin$2(HttpKafkaProxyFactory.java:3179)
	at io.aklivity.zilla.runtime.binding.http.kafka.internal.types.stream.KafkaMergedDataExFW$Builder.headers(KafkaMergedDataExFW.java:300)
	at io.aklivity.zilla.runtime.binding.http.kafka.internal.stream.HttpKafkaProxyFactory$HttpProduceSyncProxy.lambda$onHttpBegin$3(HttpKafkaProxyFactory.java:3179)
	at io.aklivity.zilla.runtime.binding.http.kafka.internal.types.stream.KafkaDataExFW$Builder.merged(KafkaDataExFW.java:265)
	at io.aklivity.zilla.runtime.binding.http.kafka.internal.stream.HttpKafkaProxyFactory$HttpProduceSyncProxy.onHttpBegin(HttpKafkaProxyFactory.java:3174)
	at io.aklivity.zilla.runtime.binding.http.kafka.internal.stream.HttpKafkaProxyFactory$HttpProduceSyncProxy.onHttpMessage(HttpKafkaProxyFactory.java:3106)
	at io.aklivity.zilla.runtime.binding.http.kafka.internal.stream.HttpKafkaProxyFactory.lambda$newStream$4(HttpKafkaProxyFactory.java:224)
	at io.aklivity.zilla.runtime.engine.internal.registry.DispatchAgent.handleDefaultReadInitial(DispatchAgent.java:1073)
	at io.aklivity.zilla.runtime.engine.internal.registry.DispatchAgent.handleReadInitial(DispatchAgent.java:1010)
	at io.aklivity.zilla.runtime.engine.internal.registry.DispatchAgent.handleRead(DispatchAgent.java:953)
	at io.aklivity.zilla.runtime.engine.internal.concurent.ManyToOneRingBuffer.read(ManyToOneRingBuffer.java:181)
	at io.aklivity.zilla.runtime.engine.internal.registry.DispatchAgent.doWork(DispatchAgent.java:566)

Enhance http binding to support header inclusion, exclusion and injection

http binding currently supports the following options:

{
  "binding":
  {
    "options":
    {
      "overrides":
      {
        "extra-header-name": "extra-header-value"
      }
    }
  }
}

Even though this is referred to as overrides, the behavior is actually injection.

Instead we can support specific header inclusion, exclusion and injection.

{
  "binding":
  {
    "options":
    {
      "headers":
      {
        "includes": [ "*" ],     <-- defaults to all headers, backwards compatible
        "excludes": [ ],         <-- defaults to no exclusions, backwards compatible
        "injects":               <-- defaults to no injections, equivalent to `overrides`, backwards compatible
        {
          "extra-header-name": "extra-header-value"
        }
      }
    }
  }
}

Note: we likely need to differentiate between request and response headers for full treatment.
Note: we might need to expose this functionality per http binding route instead of just at the binding level.
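One possible reading of the proposed includes, excludes and injects options is sketched below. The class name is hypothetical, header names are matched case-insensitively by lower-casing, and the choice to inject only when a header is absent is an assumption, since the issue does not say whether injection should replace an existing header.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;

// Hypothetical filter applying includes, then excludes, then injects.
final class HeaderFilterSketch
{
    static Map<String, String> apply(
        Map<String, String> headers,
        List<String> includes,
        List<String> excludes,
        Map<String, String> injects)
    {
        Map<String, String> result = new LinkedHashMap<>();
        for (Map.Entry<String, String> header : headers.entrySet())
        {
            String name = header.getKey().toLowerCase(Locale.ROOT);
            boolean included = includes.contains("*") || includes.contains(name);
            if (included && !excludes.contains(name))
            {
                result.put(name, header.getValue());
            }
        }
        // Assumption: injection adds a header only when it is not already present.
        for (Map.Entry<String, String> inject : injects.entrySet())
        {
            result.putIfAbsent(inject.getKey().toLowerCase(Locale.ROOT), inject.getValue());
        }
        return result;
    }

    public static void main(String[] args)
    {
        Map<String, String> filtered = apply(
            Map.of("Accept", "*/*", "Cookie", "a=b"),
            List.of("*"),
            List.of("cookie"),
            Map.of("extra-header-name", "extra-header-value"));
        System.out.println(filtered.keySet());
    }
}
```

With the defaults shown in the proposal (include everything, exclude nothing), this reduces to pure injection, which is how the backwards-compatible case would behave under these assumptions.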

Enhance http client binding to support h2 protocol

Supporting http2 as a client includes:

  • http2 without http/1.1 upgrade, like curl --http2-prior-knowledge ..., when options: { versions: [ "h2" ] }
  • http/1.1 upgrade to http/2, via upgrade: h2c, when options: { versions: [ "http/1.1", "h2" ] }
  • tls alpn negotiate h2 instead of http/1.1 (default), when options: { versions: [ "http/1.1", "h2" ] }
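The three cases above could be selected from the configured versions roughly as follows. This is a sketch only: the enum, the method name and the `tls` flag are hypothetical, and treating a cleartext connection with both versions as the h2c upgrade case is an interpretation of the list rather than confirmed behavior.

```java
import java.util.List;

// Hypothetical selection of how an http client would reach HTTP/2.
final class Http2ClientModeSketch
{
    enum Mode { HTTP_1_1, H2_PRIOR_KNOWLEDGE, H2C_UPGRADE, TLS_ALPN }

    static Mode select(List<String> versions, boolean tls)
    {
        boolean http11 = versions.contains("http/1.1");
        boolean h2 = versions.contains("h2");

        if (h2 && !http11)
        {
            return Mode.H2_PRIOR_KNOWLEDGE; // like curl --http2-prior-knowledge
        }
        if (h2)
        {
            // both versions allowed: negotiate via ALPN over TLS, else upgrade: h2c
            return tls ? Mode.TLS_ALPN : Mode.H2C_UPGRADE;
        }
        return Mode.HTTP_1_1;
    }

    public static void main(String[] args)
    {
        System.out.println(select(List.of("h2"), false));
        System.out.println(select(List.of("http/1.1", "h2"), true));
    }
}
```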

Support HTTP/1.1 upgrade to HTTP/2 via h2c

Support HTTP/1.1 implicitly via http/1.1.
Support HTTP/2 implicitly, assuming client prior knowledge of server HTTP/2 support via h2.
Support HTTP/1.1 upgrade to HTTP/2 via h2c.

See https://httpwg.org/specs/rfc7540.html#discover-http.

See specs added for h2c upgrade when starting HTTP/2.

Implementation approach:

on decoded http 1.1 headers
  normally creates an HttpExchange and sends BEGIN w/ HttpBeginEx
  if upgrade: h2c + HTTP2-Settings detected...
    send 101 switching protocols w/ upgrade: h2c
    handoff to Http2Server stream for further decode and encode
      HttpServer (http/1.1) will delegate directly to Http2Server::onNetMessage
      create an HttpExchange (http2) and send BEGIN w/ HttpBeginEx
      initial decode state allows body to be decoded for (upgraded) request body
      lack of flow control prevents response from being sent to client
      client sends PRI ..., then SETTINGS, then WINDOW_UPDATE
      on flow control available, WINDOW sent to HttpExchange (http2) response
      response encoded via HTTP/2 stream #1 as required
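The detection step in the approach above can be sketched as a check on the decoded HTTP/1.1 request headers. The class and method names are hypothetical; the rule that exactly one HTTP2-Settings header field must be present comes from RFC 7540 section 3.2.1. The sketch does not decode the settings payload or check the Connection header.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;

// Hypothetical check for an HTTP/1.1 request asking to upgrade to h2c.
final class H2cUpgradeSketch
{
    // headers: lower-cased header name -> all values received for that name
    static boolean isH2cUpgrade(Map<String, List<String>> headers)
    {
        List<String> upgrade = headers.getOrDefault("upgrade", List.of());
        List<String> settings = headers.getOrDefault("http2-settings", List.of());

        // the upgrade header may list several comma-separated protocol tokens
        boolean offersH2c = upgrade.stream()
            .flatMap(value -> Arrays.stream(value.split(",")))
            .map(String::trim)
            .anyMatch("h2c"::equalsIgnoreCase);

        // RFC 7540 section 3.2.1: exactly one HTTP2-Settings header field
        return offersH2c && settings.size() == 1;
    }

    public static void main(String[] args)
    {
        Map<String, List<String>> headers = Map.of(
            "upgrade", List.of("h2c"),
            "http2-settings", List.of("placeholder-settings-payload"));
        System.out.println(isH2cUpgrade(headers));
    }
}
```

When the check passes, the server would reply with 101 switching protocols and hand off to the HTTP/2 decoder as outlined in the steps above.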

Investigate GitHub Actions build inconsistencies

The following ITs are failing occasionally when running builds on GitHub Actions, even though local builds are consistently passing.

One option would be to provide access to diagnostic results from target directory by uploading them as a zipped artifact using the upload-artifact action. Then after the GitHub Actions build breaks again, with artifacts uploaded, we can download those artifacts to diagnose and repair the build by addressing whatever race conditions are causing these IT failures on GitHub Actions only.

See https://docs.github.com/en/actions/using-workflows/storing-workflow-data-as-artifacts#example.

zilla::runtime::binding-tls ClientIT.shouldReceiveServerSentWriteAbort
https://github.com/aklivity/zilla/runs/6168041575

zilla::runtime::binding-tls ClientIT.shouldReceiveServerSentReadAbort
https://github.com/aklivity/zilla/runs/6714745603

zilla::runtime::binding-tls ClientIT.shouldReceiveClientSentReadAbort
https://github.com/aklivity/zilla/actions/runs/2387455848

zilla::runtime::binding-tcp ServerLimitsIT.shouldResetWhenWindowIsExceeded
https://github.com/aklivity/zilla/runs/6136947505

zilla::runtime::binding-proxy ProxyServerIT.shouldRejectTcp4WithSslUnderflow
https://github.com/aklivity/zilla/runs/6136454299

zilla::runtime::binding-proxy ProxyServerIT.shouldRejectTcp4WithCrc32cOverflow
https://github.com/aklivity/zilla/runs/6136437911

zilla::runtime::binding-proxy ProxyServerIT.shouldRejectTcp4WithCrc32cMismatch
https://github.com/aklivity/zilla/runs/6119880141

zilla::runtime::binding-proxy ProxyServerIT.shouldRejectTcp4WithCrc32cUnderflow
https://github.com/aklivity/zilla/runs/6566099692

zilla::runtime::binding-sse ReconnectIT.shouldReconnectWithLastEventIdOnEnd
https://github.com/aklivity/zilla/runs/5647995097

zilla::runtime::binding-kafka CacheMergedIT.shouldFetchMergedMessagesWithNoFilter
https://github.com/aklivity/zilla/actions/runs/2430968606

Ignore failing tests as temporary workarounds

Parallelize kafka cache writers

kafka cache binding splits into 2 responsibilities: cache server (writer) and cache client (reader).

Ordering is maintained per Kafka partition by aligning each cache server as a single writer, while multiple cache clients can read from different worker threads in parallel.

By default, each binding runs on all available workers, though it can be tuned to only participate on specific workers.

When a kafka cache server binding runs on multiple workers, it will fail because the cached partition files already exist.

Therefore, we have several approaches:

  • constrain the engine to use only 1 worker (workaround)
  • constrain the kafka cache server to a single worker (bugfix)
  • consistently route each topic partition to the same cache server worker, while allowing different topic partitions to operate in parallel (enhancement)

This last option also requires that the discovered topic partition leader metadata is discoverable across cores, especially when a topic partition leader changes.
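Consistent routing of a topic partition to one cache server worker could be as simple as a deterministic hash. The sketch below is an assumption about how such an affinity might be computed, not the engine's actual mechanism; the class and method names are hypothetical.

```java
// Hypothetical affinity: the same topic partition always maps to the same worker.
final class PartitionAffinitySketch
{
    static int workerFor(String topic, int partition, int workerCount)
    {
        int hash = 31 * topic.hashCode() + partition;
        // floorMod keeps the result in [0, workerCount) even for negative hashes
        return Math.floorMod(hash, workerCount);
    }

    public static void main(String[] args)
    {
        for (int partition = 0; partition < 4; partition++)
        {
            System.out.println("items-" + partition + " -> worker " + workerFor("items", partition, 4));
        }
    }
}
```

Because the mapping depends only on the topic name, partition id and worker count, each partition keeps a single writer while different partitions can land on different workers.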

Note: when parallelizing cache server, we need to consider non-fetch behavior, such as group streams for Kafka consumer groups, and potentially provide affinity to the group coordinator to ensure consistent worker-specific group instance id.

Configurable acknowledgement mode for kafka binding

In kafka binding client, the producer currently defaults to acks -1 (all = in sync replicas).

Mapping bindings that target kafka for produce, such as http-kafka binding, can be enhanced to configure acks in their with clause when following a route.

{
    "bindings":
    {
        "http_kafka_proxy0":
        {
            "type": "http-kafka",
            "kind": "proxy",
            "routes":
            [
                {
                    "when":
                    [
                        {
                            "path": "/items/{id}"
                        }
                    ],
                    "exit": "kafka_cache_client0",
                    "with":
                    {
                        "capability": "produce",
                        "topic": "items-requests",
                        "key": "${params.id}",
                        "acks": "leader_only",
                        "reply-to": "items-responses"
                    }
                }
            ]
        }
    }
}

This will require changes to KafkaMergedBeginEx and KafkaProduceBeginEx to convey the required acks mode, also internally in the kafka cache_server and cache_client to propagate the acks mode across the cache.

As part of implementing this enhancement, the default acks mode should remain as in_sync_replicas.
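A sketch of how a configured acks value might map to the Kafka produce request is shown below. The names `leader_only` and `in_sync_replicas` appear in this issue; `none` is an assumed name for acks 0, and the enum and parse method are hypothetical. The numeric values 0, 1 and -1 are the standard Kafka producer acks settings.

```java
// Hypothetical mapping from configured acks name to Kafka acks value.
final class KafkaAcksSketch
{
    enum Acks
    {
        NONE("none", (short) 0),                          // assumed name
        LEADER_ONLY("leader_only", (short) 1),
        IN_SYNC_REPLICAS("in_sync_replicas", (short) -1);

        private final String configName;
        private final short wireValue;

        Acks(String configName, short wireValue)
        {
            this.configName = configName;
            this.wireValue = wireValue;
        }

        short wireValue() { return wireValue; }
    }

    // A missing value keeps the current default of in_sync_replicas.
    static Acks parse(String configured)
    {
        if (configured == null)
        {
            return Acks.IN_SYNC_REPLICAS;
        }
        for (Acks acks : Acks.values())
        {
            if (acks.configName.equals(configured))
            {
                return acks;
            }
        }
        throw new IllegalArgumentException("unknown acks mode: " + configured);
    }

    public static void main(String[] args)
    {
        System.out.println(parse("leader_only").wireValue());
    }
}
```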
