bakdata / quick
The Fastest Way to Create Live Data Products
Home Page: https://bakdata.github.io/quick
License: Apache License 2.0
The manager should prepare the deployment of a range mirror and must pass the range field to it. The user passes the range field with the --range-field <field> option through the Quick CLI. For that, we need a new endpoint in the manager.
Similar to the documentation for range queries, we want to add a description of multi-subscriptions. This means we will have a part with a basic intro and an example, plus a separate section that goes into detail (for developers and interested users).
The gateway cannot retrieve values of a given list with a non-string type (e.g., long or integer).
For a given schema:
type Query {
findProducts(productId: [Int]): [Product] @topic(name: "schema-product-topic-test", keyArgument: "productId")
}
type Product {
productId: Int!,
name: String,
}
{
findProducts(productId: [123, 456]) {
productId
name
}
}
Currently, the gateway returns this error:
{
"errors": [
{
"message": "Exception while fetching data (/findProducts) : class java.lang.Integer cannot be cast to class java.lang.String (java.lang.Integer and java.lang.String are in module java.base of loader 'bootstrap')",
"locations": [
{
"line": 2,
"column": 5
}
],
"path": [
"findProducts"
],
"extensions": {
"classification": "DataFetchingException"
}
}
],
"data": {
"findProducts": null
}
}
The ListArgumentFetcher casts the arguments object to a list of strings:
final List<V> results = this.dataFetcherClient.fetchResults((List<String>) arguments);
This cast fails for a list of integers or longs.
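A type-safe alternative is to convert each argument explicitly instead of casting the whole list. The sketch below is an illustration, not Quick's actual code; the class and method names are assumptions.

```java
import java.util.List;
import java.util.stream.Collectors;

// Sketch of a type-safe alternative to the failing cast in ListArgumentFetcher.
// The class and method names here are hypothetical.
public class ArgumentConversion {

    // Instead of casting List<?> to List<String>, convert each element explicitly.
    // This works for Integer, Long, and String keys alike.
    static List<String> toStringArguments(List<?> arguments) {
        return arguments.stream()
                .map(String::valueOf)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> productIds = List.of(123, 456);
        // The converted list can then be handed to a client that expects strings.
        System.out.println(toStringArguments(productIds));
    }
}
```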
We require a GraphQL-to-Protobuf converter similar to GraphQLToAvroConverter.
It will be called when a user creates a new topic with a GraphQL schema and expects Quick to use Protobuf schemas.
As a stopgap solution for #88, we disabled the validation rule (KeyInformation) that clashed with the semantics of multi-subscriptions. We now want to refactor the validation rules so that the KeyInformation rule can be used again.
For MultiSubscriptionFetcher, we only have tests that involve string types. In the scope of this issue, we want to add extra tests that cover complex types (Avro and Protobuf).
As in the title.
The Gateway needs to prepare the range query request to send to the mirror. This can be done with a range data fetcher. The fetcher then sends a GET request to the mirror endpoint. Example: GET /user-request-mirror/mirror/range/1?from=1&to=2
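Composing the mirror request boils down to formatting the endpoint path from the key and range arguments. A minimal sketch, with a hypothetical helper class, following the URL layout from the example above:

```java
// Sketch of how a range data fetcher might compose the mirror's range endpoint URL.
// The path layout follows the example in this issue; the class itself is hypothetical.
public class RangeUrlBuilder {

    static String rangeUrl(String mirrorHost, String key, String from, String to) {
        return String.format("%s/mirror/range/%s?from=%s&to=%s", mirrorHost, key, from, to);
    }

    public static void main(String[] args) {
        // Reproduces: GET /user-request-mirror/mirror/range/1?from=1&to=2
        System.out.println(rangeUrl("/user-request-mirror", "1", "1", "2"));
    }
}
```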
Last updated: 04.05.2022
Milestone: Protobuf support
Development: 0.7
This issue describes our approach for the support of Protobuf in Quick.
Protobuf is a data format for (de-)serializing data that has gained a lot of support in the Kafka ecosystem recently. It is comparable to Avro, which so far is the only schema format supported by Quick.
We track all related issues in the Protobuf support milestone. As per the roadmap, the development of this feature is planned for Quick 0.7.
With the implementation of this enhancement, Quick supports:
Goal: users can create topics that are backed by Protobuf schemas
First, let's look into what happens when the user creates a new topic. Quick:
The steps affected by the proposed change are 1 and 3:
Quick additionally requires a way to let users decide between Avro and Protobuf. There are (at least) the following two options to implement this:
The advantage of option 1 is the flexibility that comes with it: a user can decide on the schema format with each topic creation.
However, this can also become repetitive, since most users stick to a single format. It also complicates the overall implementation: we would then require a way to propagate the information per topic.
We therefore start with option 2. If users require option 1, we can still add it later.
Goal: components should be able to tell which schema format a topic uses.
All the following goals require a mechanism in place that tells the corresponding components whether the topics use Avro or Protobuf. Since we start with a global environment variable as described in goal 1, this configuration can be used.
Other options that allow more granular configurations are:
Goal: users can ingest data into topics backed by Protobuf schemas
The ingest-service uses the TypeResolver
to transform JSON to Avro. We therefore require an additional implementation of TypeResolver for Protobuf. The configuration of the TypeResolver happens in QuickTopicTypeService. Here Quick has to differentiate between Avro and Protobuf and set the resolver accordingly.
Goal: gateways can query topics backed by Protobuf schemas
This is dependent on goal 6 (mirror). During a GraphQL query, the gateway forwards requests to corresponding mirror applications. The communication between gateway and mirror uses REST + JSON. Therefore, the underlying schema format is transparent from the gateway's point of view.
Goal: gateways can subscribe to topics backed by Protobuf schemas
Similar to the data ingest, the GraphQL subscription uses the SerDe provided by the QuickTopicTypeService. Since Quick can't know the exact message type, it has to use DynamicMessage. This is similar to the way Quick currently uses Avro's GenericRecord.
Goal: mirrors can read data from topics backed by Protobuf schemas
As in the data ingest, mirrors use the QuickTopicTypeService to get a TypeResolver for (de-)serializing data. Thus, the mirror can handle Protobuf with the updated TypeResolver.
We should consider adding a section to our documentation that addresses the supported GraphQL elements. For example, we do not support GraphQL schemas with Union or Interface types.
We're still on Micronaut 2.5 and could benefit from some of the latest updates.
The mirror should expose the REST API GET /user-request-mirror/mirror/range/1?from=1&to=2
. Moreover, we should implement a getRange function in the QueryService interface.
What should happen if a user passes the --no-point flag and no range is specified at the same time?
Some possible scenarios:
- Fall back to point=true and create a topic and the corresponding mirror -> log to the user what happens.
- The --retention-time option should only be set with the --point option and not with the --range-field option.
The mirror should be able to read data that was serialized with Protobuf.
This should mainly be done through #24. We have to evaluate if further changes to the mirror are necessary.
The user should be able to switch between both schema types as required. Right now, Quick assumes all schemas are Avro.
As described in #17, when creating a new topic, Quick checks whether the corresponding subject for its schema already exists in the schema registry.
We need to make sure the existing mechanism also works with Protobuf.
We require the automatic conversion from the Kafka schema to GraphQL for checking schema compatibility and automatically creating the target schema.
With #3, we therefore need the conversion from Protobuf to GraphQL.
Currently, the Internal Topic Registry does not contain the precise schema type of a topic. Concretely, if Quick is configured with one of the supported schemas (i.e., Avro or Protobuf), the Internal Topic Registry registers each topic only with the value type SCHEMA.
Expected behaviour: the Internal Topic Registry stores the correct schema type in the value type of a topic.
Current behaviour: the Internal Topic Registry registers the value type of a topic with a schema as SCHEMA.
Here is a screenshot of a registered topic with schema:
quick topic create example-topic --key-type int --value-type schema --schema test-gateway.mytype
The TopicController class sets the value type to SCHEMA
. The TopicService does not check if the schema type is Avro or Protobuf.
Consider the scenario in which a user wants to make a query according to the following:
type Query {
getPurchase(id: String): Purchase @topic(name: "purchase", keyArgument: "purchaseId")
}
with a concrete query being:
{
getPurchase(id: "abc") {
productId
}
}
What happens behind the scenes, for example in QueryKeyArgumentFetcher.get(), is that the value is fetched via the DataFetcherClient. If we start and end up with JSON, we make unnecessary conversions. Thus, we might refrain from working with different data types (e.g., Double, Protobuf, Avro) in the gateway and work directly with JSON.
For this:
a) MirrorDataFetcherClient
has to be rebuilt so that it does not work with TypeResolver<V>
but with JSON,
b) MirrorClient
should not receive a resolver that works on a given data type but simply on JSON.
Additionally, it might be worth considering to completely remove the generic V parameter from the gateway.
After upgrading Kafka to 3.1, the ingestion time increased noticeably. I had to add sleeps in the e2e tests after each ingest. We should investigate what the main reason is.
As described in #17, we require a configuration variable for letting users set their desired schema format (e.g., Avro, Protobuf).
Imagine the following scenario: a user has a topic filled with records and only wants to query them, so they only need a mirror that consumes the data from their topic. But right now, they have to create a topic with quick and move their records to the new topic to make them queryable. This limits the users.
We should consider updating the mirror creation command in the CLI so it sends the correct key and value type. The manager should check the Internal Topic Registry and register the topic with the correct key and value type. This check can be done in the KubernetesMirrorService.
I wanted to test multi-subscriptions using CLI. I created a gateway and applied the following schema:
type Query {
findPurchase(purchaseId: String): Purchase @topic(name: "purchase", keyArgument: "purchaseId")
allPurchases: [Purchase!] @topic(name: "purchase")
}
type Purchase {
purchaseId: String!
productId: Int!
userId: Int!
product: Product @topic(name: "product", keyField: "productId")
amount: Int
price: Price
}
type Product {
productId: Int!
name: String
description: String
price: Price
}
type Price {
total: Float
currency: String
}
type Click {
userId: Int!
timestamp: Int
}
type Subscription {
userStatistics: UserStatistics
}
type UserStatistics {
purchase: Purchase @topic(name: "purchase")
click: Click @topic(name: "click")
}
I received the following error: Internal Server Error: {"type":"errors/serverError","title":"Internal Server Error","code":500,"detail":"An unexpected error occurred:When the return type is not a list for a non-mutation and non-subscription type, key information (keyArgument or keyField) is needed.","uriPath":"/control/schema"} Could not apply schema to gateway: multisubstest
Investigating the error message led me to one of the validation rules: KeyInformation.
The semantics of multi-subscriptions, which transfer topic directives from the Subscription type to the user-defined type, clash with this rule.
As described by https://github.com/bakdata/kafka-key-value-store, Quick mirrors should expose information about the different partitions and the corresponding hosts.
Currently, Quick only supports point queries. Some use cases need support for range queries. Interactive Queries enable querying the state store in Kafka Streams. For range queries, we found these approaches:
- The range() method in the ReadOnlyKeyValueStore interface.
- The prefixScan() method.
- Adding a RangeQuery class to the IQs.
We should evaluate these approaches and set up a roadmap for the implementation.
The partition routing in the gateway is not working properly for mirrors with range index. There are some bugs and problems I noticed while working with the partition routing more in-depth:
The StreamsStateHost creates a wrong URL, so the request never arrives at the mirror.
The getResponseFromFallbackService method in MirrorRequestManagerWithFallback creates a wrong URL because it ignores the query parameters of the initial URL.
The MirrorDataFetcherClient creates a PartitionedMirrorClient using the String SerDe. This causes the partition calculation to be wrong. We need to know the type of the key that the user sends with the query request.
Expected behaviour: the partition router updates the partition-to-host map correctly.
Current behaviour: the partition router in the gateway fails to fetch the information from a mirror and cannot access the correct mirror.
We should add tracing to Quick to troubleshoot interactions between the microservices. The traces can be gathered with the OpenTelemetry API and SDK. For the tracing backend, we can choose between Jaeger, Zipkin, or SigNoz.
We use Checkstyle for our coding style. But currently, we are not enforcing the check in our CI runs. We should add this check and let the CI fail on violations.
Currently, we fetch information about the partition-host mapping once, during the initialisation of the PartitionRouter. Should the mapping change, for example because another replica is added, cache misses will occur, as there is no update mechanism. Thus, it is advisable to introduce such a mechanism, i.e., the possibility to update the mapping dynamically.
A possible approach:
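One option is a router that periodically re-fetches the mapping in the background. The sketch below is a hypothetical illustration of that idea; the class name and the supplier-based fetch are assumptions, and in Quick the fetch would call the mirror's partition-info endpoint.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical sketch of a partition router that periodically refreshes its
// partition-to-host mapping instead of fetching it only once at start-up.
public class RefreshingPartitionRouter {

    private final Map<Integer, String> partitionToHost = new ConcurrentHashMap<>();
    private final Supplier<Map<Integer, String>> fetchMapping;

    public RefreshingPartitionRouter(Supplier<Map<Integer, String>> fetchMapping, long refreshSeconds) {
        this.fetchMapping = fetchMapping;
        this.refresh(); // initial load, as done today
        // Periodically re-fetch the mapping so that added replicas are picked up.
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(runnable -> {
            Thread thread = new Thread(runnable);
            thread.setDaemon(true); // don't keep the JVM alive
            return thread;
        });
        scheduler.scheduleAtFixedRate(this::refresh, refreshSeconds, refreshSeconds, TimeUnit.SECONDS);
    }

    void refresh() {
        this.partitionToHost.putAll(this.fetchMapping.get());
    }

    public String hostFor(int partition) {
        return this.partitionToHost.get(partition);
    }

    public static void main(String[] args) {
        RefreshingPartitionRouter router =
                new RefreshingPartitionRouter(() -> Map.of(0, "mirror-0", 1, "mirror-1"), 30);
        System.out.println(router.hostFor(1));
    }
}
```

An event-driven alternative would be to refresh the mapping on demand whenever a lookup misses, which avoids polling at the cost of one failed request per topology change.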
The Gateway should be able to read and store the information for a range query
type Query {
userRequests(
userId: Int
timestampFrom: Int
timestampTo: Int
): [UserRequests] @topic(name: "user-request-range",
keyArgument: "userId",
rangeFrom: "timestampFrom",
rangeTo: "timestampTo")
}
type UserRequests {
userId: Int
serviceId: Int
timestamp: Int
requests: Int
success: Int
}
This issue is related to #49 and should address its limitations, i.e., adjusting SubscriptionFetcher and MultiSubscriptionFetcher to work with JSON.
We have two existing solutions, "Creating and querying real-time Customer Profiles" and "Real-time Monitoring and Analytics", that should be moved into the examples section of our documentation.
When we have a single mirror, then the situation is clear. All partitions are located on this particular mirror, and when there is a request for the value of a given key, it is retrieved seamlessly.
The problem arises when we have more than one replica of a mirror:
Scenario: Mirror1 stores partitions 1 and 4, and Mirror2 stores partitions 2 and 3. Let's say that we want to get data for the key="x"
. Our hashing function h says that h("x") = 1
which means that the value for the key "x" is stored in the first partition, which is located in Mirror1.
Currently, a request from the gateway goes to the (Kubernetes?) service, which chooses a replica in a round-robin fashion. This means that it is statistically wrong half of the time. When this happens, the request has to be redirected to the other replica.
Because a mirror knows which partitions it stores (it has a mapping between partitions and hosts), we can use this information to introduce routing based on the partition mapping.
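The routing idea can be sketched in a few lines. Note that Kafka's default partitioner hashes the serialized key bytes with murmur2; the plain hashCode below is only a simplified stand-in for the hashing function h from the scenario, and the class and mapping are hypothetical.

```java
import java.util.Map;

// Simplified sketch of routing a request directly to the replica that holds the
// key's partition. Kafka's default partitioner hashes the *serialized* key with
// murmur2; a plain hashCode stands in for h here to keep the example short.
public class PartitionRouting {

    static int partitionFor(String key, int numPartitions) {
        return Math.abs(key.hashCode() % numPartitions); // stand-in for Kafka's murmur2-based hash
    }

    static String routeTo(String key, int numPartitions, Map<Integer, String> partitionToHost) {
        return partitionToHost.get(partitionFor(key, numPartitions));
    }

    public static void main(String[] args) {
        // Hypothetical mapping: Mirror1 stores partitions 0 and 1, Mirror2 stores 2 and 3.
        Map<Integer, String> mapping = Map.of(0, "mirror1", 1, "mirror1", 2, "mirror2", 3, "mirror2");
        // Route the request for key "x" straight to the replica that owns its partition.
        System.out.println(routeTo("x", 4, mapping));
    }
}
```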
Questions:
Extra ideas:
Currently, we have concentrated only on range queries with topic directives on the Query type. It would be nice to also support range queries on field types.
type Query {
product(key: Int, timestampFrom: Int, timestampTo: Int): ProductInfo!
}
type ProductInfo {
key: String!
info: [Info!] @topic(name: "info-topic", keyField: "key", rangeFrom: "timestampFrom", rangeTo: "timestampTo")
}
type Info {
key: Int
timestamp: Int!
}
The TypeResolver is responsible for (de-)serializing data from and to strings. Quick requires this functionality for the ingest (String -> Protobuf) and mirror/gateway (Protobuf -> String). This can probably be done with Protobuf's included JSON conversion.
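Protobuf's bundled JSON conversion lives in protobuf-java-util's JsonFormat, which works together with DynamicMessage. The sketch below illustrates both directions; the descriptor is built by hand only so that the example is self-contained, whereas in Quick it would come from the Schema Registry. The message and field names are assumptions.

```java
import com.google.protobuf.DescriptorProtos.DescriptorProto;
import com.google.protobuf.DescriptorProtos.FieldDescriptorProto;
import com.google.protobuf.DescriptorProtos.FileDescriptorProto;
import com.google.protobuf.Descriptors.Descriptor;
import com.google.protobuf.Descriptors.FileDescriptor;
import com.google.protobuf.DynamicMessage;
import com.google.protobuf.util.JsonFormat;

// Sketch of a Protobuf TypeResolver based on protobuf's bundled JSON conversion
// (JsonFormat from protobuf-java-util). The hand-built descriptor is for
// illustration only; in Quick it would come from the Schema Registry.
public class ProtobufJsonResolver {

    static Descriptor buildDescriptor() throws Exception {
        FileDescriptorProto file = FileDescriptorProto.newBuilder()
                .setName("user_requests.proto")
                .setSyntax("proto3")
                .addMessageType(DescriptorProto.newBuilder()
                        .setName("UserRequests")
                        .addField(FieldDescriptorProto.newBuilder()
                                .setName("user_id").setNumber(1)
                                .setType(FieldDescriptorProto.Type.TYPE_INT32))
                        .addField(FieldDescriptorProto.newBuilder()
                                .setName("requests").setNumber(2)
                                .setType(FieldDescriptorProto.Type.TYPE_INT32)))
                .build();
        return FileDescriptor.buildFrom(file, new FileDescriptor[0])
                .findMessageTypeByName("UserRequests");
    }

    // String -> Protobuf (ingest direction)
    static DynamicMessage fromJson(String json) throws Exception {
        DynamicMessage.Builder builder = DynamicMessage.newBuilder(buildDescriptor());
        JsonFormat.parser().merge(json, builder);
        return builder.build();
    }

    // Protobuf -> String (mirror/gateway direction)
    static String toJson(DynamicMessage message) throws Exception {
        return JsonFormat.printer().print(message);
    }

    public static void main(String[] args) throws Exception {
        DynamicMessage message = fromJson("{\"userId\": 1, \"requests\": 10}");
        System.out.println(toJson(message));
    }
}
```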
Executing the gateway create command with the -s flag results in an error.
Example:
quick gateway create -s schema.graphql schematest
(quick gateway create --schema schema.graphql schematest
)
results in:
Internal Server Error: An unexpected error occurred:while parsing a block mapping
in 'reader', line 1, column 1:
apiVersion: v1
^
expected <block end>, but found '}'
in 'reader', line 14, column 1:
}
^
schema.graphql is:
type Query {
getPerson(id: ID): Person @topic(name: "person", keyArgument: "id")
getCorporate(id: ID): Corporate @topic(name: "corporate", keyArgument: "id")
}
type Person {
id: ID!
corporateId: ID!
firstName: String
lastName: String
# birthday: String
# birthLocation: String
corporate: Corporate @topic(name: "corporate", keyField: "corporateId")
}
type Corporate {
id: ID!
referenceId: String
name: String
street: String
city: String
}
There is a problem with transferring the schema.graphql to the .yml file with Thymeleaf. After correcting the schema.graphql manually so that the indentation is 2 spaces instead of 4, the error changes to:
mapping values are not allowed here
in 'reader', line 12, column 35:
findPurchase(purchaseId: String): Purchase @topic(name: "purchas ...
Issue description
Issuing the following command doesn't lead to an error message:
quick gateway create example
Expected behaviour
An error should be displayed
Current behaviour
No error
Steps to Reproduce
quick gateway create example
quick gateway create example
quick gateway create example
Detailed Description
Executing commands one after another yields the following result: Create gateway example (this may take a few seconds)
instead of an error.
Similar behaviour is expected when creating an application or a mirror because the corresponding services also reference the KubernetesManagerClient.deploy
function.
Range queries upper bound should be exclusive.
Sometimes it is desirable to use TestPyPI as the instance of the Python Package Index, for example when a specific new functionality of Quick demands a more recent version of the CLI that has not been deployed yet.
The possibility of choosing the package index and version while building a Docker image would be nice to have.
The index and the version of CLI could be passed to the docker build command as arguments. For example:
docker build --build-arg index=test --build-arg version=0.7.0.dev6 .
When this is done, the documentation can be adjusted accordingly.
The mirror should implement a custom processor and create the flattened-key (zero-padded) index structure.
Regarding streams app deployments, the manager is limited to open (public) container repositories. In order to allow users to deploy an app from a private repo, a possibility to add imagePullSecrets through quick and quick-cli should be added.
Example:
quick app deploy tiny-url-counter \
--registry bakdata \
--image quick-examples-tinyurl \
--tag 0.0.1 \
--imagePullSecrets=secret \
--args input-topics=track-fetch output-topic=count-fetch productive=false
We assume that a user has (uses) only one secret. Checking multiple secrets won’t be supported for now.
Below, some examples which show invalid behaviour or missing checks.
type Product {
id: ID!
name: String!
}
type Query {
getProduct(id: ID): Product @topic(name: "product-topic")
}
The query is missing the keyArgument, but since the mutation rule passes, no error is thrown.
Another example:
type Query {
getProduct(productId: ID): ProductInfo
}
type ProductInfo {
product: Product @topic(name: "product-topic")
url: String @topic(name: "url-topic", keyArgument: "productId")
}
type Mutation {
setClick(clickCount: Long): Long @topic(name: "click-topic")
}
The mutation should contain two inputs, but this is not validated.
To develop and debug Quick locally, we can use a tool like Telepresence.
We decided to drop the possibility of not creating a mirror index. A mirror index is always created by default. Thus, there is no need for the --point or --no-point flags. Since the manager has already been adjusted to these flags, it must be adjusted again.
We should provide documentation and a brief example of how the user can define and run range queries.
Development: 0.8
Last updated: 06.10.2022
This issue describes our approach for the support of Range queries in Quick.
Goal: the user defines range mirrors that index the data for range queries
During topic creation, the user can pass a --range-field <field> option. This option deploys a mirror with an extra state store containing the range query index.
Example:
quick topic create user-request-range --key integer --value schema --schema gateway.UserRequests --range-field timestamp
This command sends a request to the manager, and the manager prepares the deployment of a mirror called user-request-range
. This mirror creates two indexes:
- a range index over the key (userId) and the timestamp
- a point index over the key (userId)
Goal: the user defines the range (from and to) in the GraphQL Query type
The user needs to define the range query and arguments in the GraphQL schema. The GraphQL schema should contain the necessary information for the range data fetcher. For simplicity, we decided to extend the @topic directive. The @topic directive gets two new arguments, rangeFrom and rangeTo. These two arguments define the range for a specific field.
Example:
type Query {
userRequests(
userId: Int
timestampFrom: Int
timestampTo: Int
): [UserRequests] @topic(name: "user-request-range",
keyArgument: "userId",
rangeFrom: "timestampFrom",
rangeTo: "timestampTo")
}
type UserRequests {
userId: Int
serviceId: Int
timestamp: Int
requests: Int
success: Int
}
Goal: the gateway extracts the range information and prepares the request to the mirror
Given the example below:
# query from 1 to 2
{
userRequests(userId: 1, timestampFrom: 1, timestampTo: 2) {
requests
}
}
The range data fetcher gets the necessary information and prepares a range call to the mirror's range endpoint: GET /user-request-mirror/mirror/range/1?from=1&to=2. It is important to note that the range query's upper bound is exclusive, i.e., it is not included in the range. For this specific example, only the value at timestamp 1 is included in the returned result.
Goal: the mirror builds a range index in a separate state store
The mirror needs a new processor to prepare a range index in a separate state store for range queries. Consider the following example. The topic contains the following information:
| key (userId) | value |
|---|---|
| 1 | {timestamp: 1, serviceId: 2, requests: 10, success: 8} |
| 1 | {timestamp: 2, serviceId: 3, requests: 5, success: 3} |
| 2 | {timestamp: 1, serviceId: 4, requests: 7, success: 2} |
The range mirror will materialize the topic in RocksDB in two ways.

As a range index with the flattened key:

| key | value |
|---|---|
| 1_00000000001 | {timestamp: 1, serviceId: 2, requests: 10, success: 8} |
| 1_00000000002 | {timestamp: 2, serviceId: 3, requests: 5, success: 3} |
| 2_00000000001 | {timestamp: 1, serviceId: 4, requests: 7, success: 2} |

As a point index with the latest value per key:

| key | value |
|---|---|
| 1 | {timestamp: 2, serviceId: 3, requests: 5, success: 3} |
| 2 | {timestamp: 1, serviceId: 4, requests: 7, success: 2} |
Goal: a flattened string key in the mirror
The mirror implements the processor API to create an index that supports range queries. This index is a flattened string combining the topic key and the value over which range queries are requested. The index needs to pad the values with zeros (10 digits for Int, 19 digits for Long) to keep the lexicographic order. The generic format of the key in the state store is <topicKeyValue>_<zero_paddings><rangeFieldValue>. In our example, if we have a topic with userId as its key and want to create a range over the timestamp, the key in the state store would look like this:
1_00000000001
The flattened-key approach creates a unique key for each user and timestamp. Therefore, all values are accessible when running a range query.
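Building the flattened key can be sketched with a format string; the 10-digit padding below follows the rule for Int stated above, and the helper class is hypothetical.

```java
// Sketch of building the flattened range-index key described above.
// Int range values are padded to 10 digits so that string comparison
// preserves numeric order (use 19 digits for Long fields).
public class KeyFlattener {

    static String flatten(Object topicKey, int rangeValue) {
        // %010d left-pads the range field with zeros
        return String.format("%s_%010d", topicKey, rangeValue);
    }

    public static void main(String[] args) {
        System.out.println(flatten(1, 1));
        // Lexicographic order now matches numeric order, e.g.
        // flatten(1, 2) sorts before flatten(1, 10).
    }
}
```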
Goal: a service that calls the Interactive Query API to fetch the data from the range state store
When the mirror receives the request GET /user-request-mirror/mirror/range/<key>?from=<rangeFrom>&to=<rangeTo> (e.g., GET /user-request-mirror/mirror/range/1?from=1&to=2), it creates the range-from argument (in the above example, this would be 00000000001_00000000001) and the range-to argument (in the example, this value would be 00000000001_00000000002), passes these values to the range method of the Interactive Queries, puts the returned values in a list, and returns them to the requesting gateway.
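The lookup itself can be illustrated without Kafka Streams: below, a TreeMap stands in for the RocksDB-backed state store, since both iterate keys in lexicographic order. The class is a hypothetical sketch; it uses an inclusive lower bound and an exclusive upper bound, matching the range semantics described above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Sketch of the mirror's range lookup. A TreeMap stands in for the
// RocksDB-backed state store; like the store's range scan, it iterates
// keys in lexicographic order. The upper bound is excluded.
public class RangeLookup {

    static List<String> range(TreeMap<String, String> store, String key, int from, int to) {
        String fromKey = String.format("%s_%010d", key, from);
        String toKey = String.format("%s_%010d", key, to);
        // inclusive lower bound, exclusive upper bound
        return new ArrayList<>(store.subMap(fromKey, true, toKey, false).values());
    }

    public static void main(String[] args) {
        TreeMap<String, String> store = new TreeMap<>();
        store.put("1_0000000001", "{timestamp: 1, requests: 10}");
        store.put("1_0000000002", "{timestamp: 2, requests: 5}");
        store.put("2_0000000001", "{timestamp: 1, requests: 7}");
        // A range query for key 1 with from=1&to=2 returns only the timestamp-1 record.
        System.out.println(range(store, "1", 1, 2));
    }
}
```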
Kafka 3.0 has been out for a while. We should check whether there are any problems when running on Kafka 3.0.