This is something I came up with recently and wanted to keep track of as a possible upcoming feature, to improve both the resource definition and the state handling (see #2) at the same time.
I won't have the time to help implement it in the next weeks, so I wanted to throw the idea out there and see where it goes.
There is a lot of documentation below to help figure out the rationale, but it's mostly excerpts from the KSQL documentation. The proposed schemas are therefore a little hidden: look for the Proposal sections.
Objective
Be more "Terraform-like" in the resource definition and auto-link resource dependencies, leading to better state handling and a more declarative resource configuration.
Changes
- Differentiate `CREATE` and `CREATE AS SELECT`
- Use the `query` schema field as a map of properties instead of a string
- Update the resource state handling
Resource schemas
CREATE
Reference
```sql
CREATE TABLE table_name ( { column_name data_type } [, ...] )
  WITH ( property_name = expression [, ...] );
```
The supported column data types are:

- `BOOLEAN`
- `INTEGER`
- `BIGINT`
- `DOUBLE`
- `VARCHAR` (or `STRING`)
- `ARRAY<ArrayType>` (JSON and AVRO only. Index starts from 0)
- `MAP<VARCHAR, ValueType>` (JSON and AVRO only)
- `STRUCT<FieldName FieldType, ...>` (JSON and AVRO only) The STRUCT type requires you to specify a list of fields. For each field you must specify the field name (FieldName) and field type (FieldType). The field type can be any of the supported KSQL types, including the complex types MAP, ARRAY, and STRUCT. STRUCT fields can be accessed in expressions using the struct dereference (`->`) operator. See Operators for more details.
| Property | Description |
| --- | --- |
| `KAFKA_TOPIC` (required) | The name of the Kafka topic that backs this table. The topic must already exist in Kafka. |
| `VALUE_FORMAT` (required) | Specifies the serialization format of message values in the topic. Supported formats: `JSON`, `DELIMITED` (comma-separated value), and `AVRO`. |
| `KEY` (required for tables) | Associates a field/column within the Kafka message value with the implicit `ROWKEY` column (message key) in the KSQL table. KSQL currently requires that the Kafka message key, which will be available as the implicit `ROWKEY` column in the table, must also be present as a field/column in the message value. You must set the `KEY` property to this corresponding field/column in the message value, and this column must be in `VARCHAR` (aka `STRING`) format. See Key Requirements for more information. |
| `TIMESTAMP` | By default, the implicit `ROWTIME` column is the timestamp of the message in the Kafka topic. The `TIMESTAMP` property can be used to override `ROWTIME` with the contents of the specified field/column within the Kafka message value (similar to timestamp extractors in Kafka's Streams API). Timestamps have a millisecond accuracy. Time-based operations, such as windowing, will process a record according to the timestamp in `ROWTIME`. |
| `TIMESTAMP_FORMAT` | Used in conjunction with `TIMESTAMP`. If not set, the timestamp field is assumed to be a `bigint`. If it is set, then the `TIMESTAMP` field must be of type `varchar` and have a format that can be parsed with the Java `DateTimeFormatter`. If your timestamp format has characters requiring single quotes, you can escape them with two successive single quotes, `''`, for example: `'yyyy-MM-dd''T''HH:mm:ssX'`. For more information on timestamp formats, see `DateTimeFormatter`. |
| `WINDOW_TYPE` (stream only) | By default, the topic is assumed to contain non-windowed data. If the data is windowed, i.e. was created by KSQL using a query that contains a `WINDOW` clause, then the `WINDOW_TYPE` property can be used to provide the window type. Valid values are `SESSION`, `HOPPING`, and `TUMBLING`. |
Proposal

```hcl
resource "ksql_stream" "stream_resource_name" {
  name = "some stream name"

  fields = {
    registertime = {
      "type": "BIGINT",
      "fields": null,
      "memberSchema": null
    }
    gender = {
      "type": "VARCHAR"
    }
    regionid = {
      "type": "VARCHAR"
    }
    userid = {
      "type": "VARCHAR"
    }
  }

  settings = {
    /* Required */
    kafka_topic  = "some input topic"
    value_format = 'AVRO' | 'JSON' | 'DELIMITED'

    /* Optional */
    key              = "${computed}"
    timestamp        = "ROWTIME"
    timestamp_format = ""
    window_type      = "SESSION" | "HOPPING" | "TUMBLING"
  }
}
```
```hcl
resource "ksql_table" "table_resource_name" {
  name = "some table name"

  fields = {
    registertime = {
      "type": "BIGINT",
      "fields": null,
      "memberSchema": null
    }
    gender = {
      "type": "VARCHAR"
    }
    regionid = {
      "type": "VARCHAR"
    }
    userid = {
      "type": "VARCHAR"
    }
  }

  settings = {
    /* Required */
    kafka_topic  = "some input topic"
    value_format = 'AVRO' | 'JSON' | 'DELIMITED'
    key          = ""

    /* Optional */
    timestamp        = "ROWTIME"
    timestamp_format = ""
  }
}
```
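For illustration, the `ksql_stream` resource above would map to roughly the following KSQL statement (a sketch following the reference syntax above; topic and stream names are the placeholders from the example, shown here without the non-AVRO quoting discussed later):

```sql
CREATE STREAM "some stream name" (
  registertime BIGINT,
  gender VARCHAR,
  regionid VARCHAR,
  userid VARCHAR
) WITH (
  KAFKA_TOPIC = 'some input topic',
  VALUE_FORMAT = 'AVRO'
);
```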
CREATE AS SELECT
Reference

```sql
CREATE STREAM stream_name
  [WITH ( property_name = expression [, ...] )]
  AS SELECT select_expr [, ...]
  FROM from_stream
  [ LEFT | FULL | INNER ] JOIN [join_table | join_stream] [ WITHIN [(before TIMEUNIT, after TIMEUNIT) | N TIMEUNIT] ] ON join_criteria
  [ WHERE condition ]
  [PARTITION BY column_name];
```

```sql
CREATE TABLE table_name
  [WITH ( property_name = expression [, ...] )]
  AS SELECT select_expr [, ...]
  FROM from_item
  [ LEFT | FULL | INNER ] JOIN join_table ON join_criteria
  [ WINDOW window_expression ]
  [ WHERE condition ]
  [ GROUP BY grouping_expression ]
  [ HAVING having_expression ];
```
The supported `WITH` properties are the same as for `CREATE` above.
Proposal

```hcl
resource "ksql_stream_as_select" "stream_as_select_resource_name" {
  name = "some_stream_as_select_name"

  query = {
    /* Required */
    select = {
      field1 = "expression"
      ...
    }
    from = ${ksql_stream_or_table}

    /* Optional */
    join = {
      type = "left" | "full" | "inner"
      on = {
        resource = ${ref}
        criteria = ""
      }
      within = ""
    }
    where        = ""
    partition_by = "${column_name}"
  }

  settings = {
    /* Required */
    kafka_topic  = "some input topic"
    value_format = 'AVRO' | 'JSON' | 'DELIMITED'

    /* Optional */
    partitions       = 1 /* Default ksql.sink.partitions */
    replicas         = 1
    timestamp        = "ROWTIME"
    timestamp_format = ""
  }

  /* Computed from select */
  fields = { }
}
```
```hcl
resource "ksql_table_as_select" "table_as_select_resource_name" {
  name = "some_table_as_select_name"

  query = {
    /* Required */
    select = {
      field1 = "expression"
      ...
    }
    from = ${ksql_stream_or_table}

    /* Optional */
    join = {
      type = "left" | "full" | "inner"
      on = {
        resource = ${ref}
        criteria = ""
      }
    }
    window   = ""
    where    = ""
    group_by = ""
    having   = ""
  }

  settings = {
    /* Required */
    kafka_topic  = "some input topic"
    value_format = 'AVRO' | 'JSON' | 'DELIMITED'

    /* Optional */
    partitions       = 1 /* Default ksql.sink.partitions */
    replicas         = 1
    timestamp        = "ROWTIME"
    timestamp_format = ""
  }
}
```
Building the CREATE statement

Because of the new format, the schema would have to be thoroughly parsed and digested in order to generate a valid and equivalent KSQL `CREATE` statement. In doing so, some cases have to be taken into account:

- If `VALUE_FORMAT` is not `'AVRO'`, then the column name should be wrapped in double quotes and exposed `AS "[fields.key]"` to preserve case sensitivity; otherwise KSQL will uppercase the field name.
- The `fields` key representing the field name should always be explicitly defined (`[expression] AS [field.key]`) in a `CREATE AS SELECT` resource, so the user doesn't have to duplicate the name himself, which would defeat the purpose of the field key.
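As a sketch of the first case, here is how the statement generation could look in Go. Everything here (function name, parameter shapes) is hypothetical, not the provider's actual code; it only illustrates the quoting rule for non-AVRO formats:

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// buildCreate assembles a CREATE statement from a proposed-schema-like
// fields map and settings map. Hypothetical helper for illustration.
func buildCreate(kind, name string, fields map[string]string, settings map[string]string) string {
	// Sort field names so the generated statement is deterministic.
	names := make([]string, 0, len(fields))
	for n := range fields {
		names = append(names, n)
	}
	sort.Strings(names)

	// Non-AVRO formats need double-quoted column names to preserve case.
	quote := settings["value_format"] != "AVRO"
	cols := make([]string, 0, len(names))
	for _, n := range names {
		col := n
		if quote {
			col = fmt.Sprintf("%q", n)
		}
		cols = append(cols, fmt.Sprintf("%s %s", col, fields[n]))
	}
	return fmt.Sprintf("CREATE %s %s (%s) WITH (KAFKA_TOPIC='%s', VALUE_FORMAT='%s');",
		kind, name, strings.Join(cols, ", "), settings["kafka_topic"], settings["value_format"])
}

func main() {
	stmt := buildCreate("STREAM", "users",
		map[string]string{"userid": "VARCHAR"},
		map[string]string{"kafka_topic": "users", "value_format": "JSON"})
	fmt.Println(stmt)
}
```

With `value_format = "JSON"` the field name comes out double-quoted; with `"AVRO"` it is emitted bare.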
Resource state handling

For demo purposes, the Mongey ksql client will be abstracted away and the Confluent REST API will be used directly instead, to show the mapping from the source data to the resource state. Of course, the implementation should go through the Mongey ksql client to issue the `DESCRIBE` query.
KSQL API

From the KSQL REST API documentation, getting the `STREAM` and `TABLE` resources' state looks like:

Query:

```json
{
  "ksql": "DESCRIBE [RESOURCE_NAME];",
  "streamsProperties": {}
}
```
Result:

```json
[
  {
    "@type": "sourceDescription",
    "statementText": "DESCRIBE [RESOURCE_NAME];",
    "sourceDescription": {
      "name": "[RESOURCE_NAME]",
      "readQueries": [
        /* => resource dependencies */
        /* Other queries listening for data */
      ],
      "writeQueries": [
        {
          "sinks": [
            "[RESOURCE_NAME]"
          ],
          "id": "[QUERY_ID]",
          "queryString": "CREATE ..."
        }
      ],
      "fields": [
        {
          "name": "ROWTIME",
          "schema": {
            "type": "BIGINT",
            "fields": null,
            "memberSchema": null
          }
        },
        {
          "name": "ROWKEY",
          "schema": {
            "type": "STRING",
            "fields": null,
            "memberSchema": null
          }
        },
        {
          "name": "[CUSTOM_NAME]",
          "schema": {
            "type": "[CUSTOM_TYPES]",
            "fields": "[CUSTOM_FIELDS]",
            "memberSchema": "[CUSTOM_MEMBER_SCHEMA]"
          }
        },
        ...
      ],
      "type": "[STREAM|TABLE]",
      "key": "[value from the `WITH KEY` option]",
      "timestamp": "",
      "statistics": "",
      "errorStats": "",
      "extended": false,
      "format": "[AVRO|JSON|DELIMITED]",
      "topic": "[THE_TOPIC_RELATED_TO_THE_OUTPUT_DATA]",
      "partitions": 0,
      "replication": 0
    }
  }
]
```
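On the provider side, the payload above could be decoded with a few small structs. This is a hedged sketch: the type and field names below are hypothetical, and only the parts of the response the provider would need for state are modelled:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// FieldSchema mirrors the "schema" object of a DESCRIBE field entry.
type FieldSchema struct {
	Type         string       `json:"type"`
	Fields       []Field      `json:"fields"`
	MemberSchema *FieldSchema `json:"memberSchema"`
}

// Field mirrors one entry of the "fields" array.
type Field struct {
	Name   string      `json:"name"`
	Schema FieldSchema `json:"schema"`
}

// SourceDescription holds the subset of "sourceDescription" used for state.
type SourceDescription struct {
	Name        string  `json:"name"`
	Fields      []Field `json:"fields"`
	Type        string  `json:"type"`
	Key         string  `json:"key"`
	Timestamp   string  `json:"timestamp"`
	Format      string  `json:"format"`
	Topic       string  `json:"topic"`
	Partitions  int     `json:"partitions"`
	Replication int     `json:"replication"`
}

// DescribeResponse is one element of the top-level array.
type DescribeResponse struct {
	StatementText     string            `json:"statementText"`
	SourceDescription SourceDescription `json:"sourceDescription"`
}

// ParseDescribe decodes a raw DESCRIBE response body.
func ParseDescribe(raw []byte) ([]DescribeResponse, error) {
	var resp []DescribeResponse
	err := json.Unmarshal(raw, &resp)
	return resp, err
}

func main() {
	raw := []byte(`[{"@type":"sourceDescription","statementText":"DESCRIBE USERS;","sourceDescription":{"name":"USERS","type":"TABLE","format":"AVRO","topic":"users","partitions":1,"replication":1,"fields":[{"name":"ROWTIME","schema":{"type":"BIGINT","fields":null,"memberSchema":null}}]}}]`)
	resp, err := ParseDescribe(raw)
	if err != nil {
		panic(err)
	}
	fmt.Println(resp[0].SourceDescription.Name, resp[0].SourceDescription.Topic)
}
```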
To then make it match the resource state:

```json
{
  "name": "[RESOURCE_NAME]",
  "readQueries": [],
  "writeQueries": [
    {
      "sinks": [
        "[RESOURCE_NAME]"
      ],
      "id": "[QUERY_ID]",
      "queryString": "CREATE ..."
    }
  ],
  "fields": [
    {
      "name": "ROWTIME",
      "schema": {
        "type": "BIGINT",
        "fields": null,
        "memberSchema": null
      }
    },
    {
      "name": "ROWKEY",
      "schema": {
        "type": "STRING",
        "fields": null,
        "memberSchema": null
      }
    },
    {
      "name": "[CUSTOM_NAME]",
      "schema": {
        "type": "[CUSTOM_TYPES]",
        "fields": "[CUSTOM_FIELDS]",
        "memberSchema": "[CUSTOM_MEMBER_SCHEMA]"
      }
    },
    ...
  ],
  "type": "[STREAM|TABLE]",
  "key": "[value from the `WITH KEY` option]",
  "timestamp": "",
  "format": "[AVRO|JSON|DELIMITED]",
  "topic": "[THE_TOPIC_RELATED_TO_THE_OUTPUT_DATA]",
  "partitions": 0,
  "replication": 0
}
```
CREATE

Let's use `info` as the `DESCRIBE` returned payload defined above for the schema fields below:

| Name | Value |
| --- | --- |
| `name` | `info.name` |
| `fields[name]` | `info.fields[i].schema` where `i` fits the `fields[name] == info.fields[i].name` criteria without case sensitivity |
| `settings.kafka_topic` | `info.topic` |
| `settings.value_format` | `info.format` |
| `settings.key` | `info.key` |
| `settings.timestamp` | `info.timestamp` |
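The case-insensitive `fields[name]` lookup in the table above could be sketched like this (the helper name is hypothetical):

```go
package main

import (
	"fmt"
	"strings"
)

// lookupField finds the index i that fits the
// fields[name] == info.fields[i].name criteria without case sensitivity.
func lookupField(want string, names []string) (int, bool) {
	for i, n := range names {
		if strings.EqualFold(n, want) {
			return i, true
		}
	}
	return -1, false
}

func main() {
	names := []string{"ROWTIME", "ROWKEY", "REGISTERTIME"}
	i, ok := lookupField("registertime", names)
	fmt.Println(i, ok) // 2 true
}
```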
CREATE AS SELECT

Let's use `info` as the `DESCRIBE` returned payload defined above for the schema fields below:

| Name | Value |
| --- | --- |
| `name` | `info.name` |
| `query.select` | Parse the `writeQueries[0].queryString` between `SELECT` and `FROM` and split the fields to map them |
| `query.from` | Resource name matching the `writeQueries[0].queryString` word after `FROM` |
| `query.join` | `writeQueries[0].queryString` parsed value from `JOIN` to `WHERE`, `PARTITION BY` or `;` |
| `query.where` | `writeQueries[0].queryString` statement from `WHERE` to `PARTITION BY` or `;` |
| `query.partition_by` | `writeQueries[0].queryString` after `PARTITION BY` |
| `settings.kafka_topic` | `info.topic` |
| `settings.value_format` | `info.format` |
| `settings.partitions` | `info.partitions` |
| `settings.replicas` | `info.replication` |
| `settings.timestamp` | `info.timestamp` |
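The `query.select` row above could be implemented along these lines. This is a naive sketch (hypothetical helper name; it ignores nested queries, quoted identifiers, and commas inside function calls with multiple arguments), just to show the SELECT-to-FROM split:

```go
package main

import (
	"fmt"
	"strings"
)

// splitSelect extracts the projection list of a writeQueries[0].queryString
// value: the text between SELECT and FROM, split on commas.
func splitSelect(query string) []string {
	upper := strings.ToUpper(query)
	start := strings.Index(upper, "SELECT ")
	end := strings.Index(upper, " FROM ")
	if start < 0 || end < 0 || end <= start {
		return nil
	}
	parts := strings.Split(query[start+len("SELECT "):end], ",")
	for i := range parts {
		parts[i] = strings.TrimSpace(parts[i])
	}
	return parts
}

func main() {
	q := "CREATE STREAM s AS SELECT userid, UCASE(gender) AS gender FROM users;"
	fmt.Println(splitSelect(q)) // [userid UCASE(gender) AS gender]
}
```

The same scanning approach would extend to `query.from`, `query.join`, and the other rows by looking for the next keyword (`FROM`, `JOIN`, `WHERE`, `PARTITION BY`) or the trailing `;`.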