
terraform-provider-ksql's Introduction

terraform-provider-ksql

A Terraform plugin for managing Confluent KSQL Server.

Installation

Download and extract the latest release to your third-party Terraform plugin directory (typically ~/.terraform.d/plugins/).

Developing

  1. Install Go

  2. Clone the repository to $GOPATH/src/github.com/Mongey/terraform-provider-ksql:

    mkdir -p $GOPATH/src/github.com/Mongey; cd $GOPATH/src/github.com/Mongey
    git clone https://github.com/Mongey/terraform-provider-ksql.git

  3. Build the provider: make build

  4. Run the tests: make test

Provider Configuration

Example

provider "ksql" {
  url = "http://localhost:8083"
}

Resources

ksql_stream

A resource for managing KSQL streams

resource "ksql_stream" "actions" {
  name = "vip_actions"
  query = "SELECT userid, page, action
              FROM clickstream c
              LEFT JOIN users u ON c.userid = u.user_id
              WHERE u.level =
              'Platinum';"
}

ksql_table

A resource for managing KSQL tables

resource "ksql_table" "users" {
  name = "users-thing"
  query = "SELECT error_code,
            count(*),
            FROM monitoring_stream
            WINDOW TUMBLING (SIZE 1 MINUTE)
            WHERE  type = 'ERROR'
            GROUP BY error_code;"
  }
}

terraform-provider-ksql's People

Contributors

louich, mongey


terraform-provider-ksql's Issues

Update resource schemas

This is something I came up with recently and wanted to keep track of as a possible upcoming feature, to improve the resource definition and the state handling (see #2) at the same time.

I won't have the time to help implement this in the next weeks, so I wanted to throw the idea out and see where it goes from there 😉.

There is a lot of documentation below to help figure out the rationale, but it is mostly excerpts from the KSQL documentation. The proposed schemas are therefore a little hidden: they are located in the Proposal sections.

Objective

Be more "Terraform like" in the resource definition and auto-link resource dependencies, leading to better state handling and a more declarative resource configuration.

Changes

  • Differentiate CREATE and CREATE AS SELECT
  • Use the query schema field as a map of properties instead of a string
  • Update the resource state handling

Resource schemas

CREATE

Reference

CREATE TABLE table_name ( { column_name data_type } [, ...] )
  WITH ( property_name = expression [, ...] );

The supported column data types are:

  • BOOLEAN
  • INTEGER
  • BIGINT
  • DOUBLE
  • VARCHAR (or STRING)
  • ARRAY<ArrayType> (JSON and AVRO only. Index starts from 0)
  • MAP<VARCHAR, ValueType> (JSON and AVRO only)
  • STRUCT<FieldName FieldType, ...> (JSON and AVRO only) The STRUCT type requires
    you to specify a list of fields. For each field you must specify the field name
    (FieldName) and field type (FieldType). The field type can be any of the supported
    KSQL types, including the complex types MAP, ARRAY, and STRUCT. STRUCT fields
    can be accessed in expressions using the struct dereference (->) operator. See
    Operators for more details.
The supported WITH properties are:

  • KAFKA_TOPIC (required): The name of the Kafka topic that backs this table. The topic must already exist in Kafka.
  • VALUE_FORMAT (required): Specifies the serialization format of message values in the topic. Supported formats: JSON, DELIMITED (comma-separated values), and AVRO.
  • KEY (required for tables): Associates a field/column within the Kafka message value with the implicit ROWKEY column (message key) in the KSQL table. KSQL currently requires that the Kafka message key, which will be available as the implicit ROWKEY column in the table, must also be present as a field/column in the message value. You must set the KEY property to this corresponding field/column in the message value, and this column must be in VARCHAR (aka STRING) format. See Key Requirements for more information.
  • TIMESTAMP: By default, the implicit ROWTIME column is the timestamp of the message in the Kafka topic. The TIMESTAMP property can be used to override ROWTIME with the contents of the specified field/column within the Kafka message value (similar to timestamp extractors in Kafka's Streams API). Timestamps have millisecond accuracy. Time-based operations, such as windowing, will process a record according to the timestamp in ROWTIME.
  • TIMESTAMP_FORMAT: Used in conjunction with TIMESTAMP. If not set, the timestamp field is assumed to be a bigint. If set, the TIMESTAMP field must be of type varchar and have a format that can be parsed with the Java DateTimeFormatter. If your timestamp format has characters requiring single quotes, you can escape them with two successive single quotes, '', for example: 'yyyy-MM-dd''T''HH:mm:ssX'. For more information on timestamp formats, see DateTimeFormatter.
  • WINDOW_TYPE (stream only): By default, the topic is assumed to contain non-windowed data. If the data is windowed, i.e. was created using a KSQL query that contains a WINDOW clause, the WINDOW_TYPE property can be used to provide the window type. Valid values are SESSION, HOPPING, and TUMBLING.

Proposal

resource "ksql_stream" "stream_resource_name" {
  name = "some stream name"
  
  fields = {
    registertime = {
      "type": "BIGINT",
      "fields": null,
      "memberSchema": null
    }
    gender       = {
      "type": "VARCHAR"
    }
    regionid     = {
      "type": "VARCHAR"
    }
    userid       = {
      "type": "VARCHAR"
    }
  }

  settings = {
    /* Required */
    kafka_topic = "some input topic"
    value_format = 'AVRO' | 'JSON' | 'DELIMITED'
    /* Optional */
    key = "${computed}"
    timestamp = "ROWTIME"
    timestamp_format = ""
    window_type =  "SESSION" | "HOPPING" | "TUMBLING"
  }

}
resource "ksql_table" "table_resource_name" {
  name = "some table name"
  
  fields = {
    registertime = {
      "type": "BIGINT",
      "fields": null,
      "memberSchema": null
    }
    gender       = {
      "type": "VARCHAR"
    }
    regionid     = {
      "type": "VARCHAR"
    }
    userid       = {
      "type": "VARCHAR"
    }
  }

  settings = {
    /* Required */
    kafka_topic = "some input topic"
    value_format = 'AVRO' | 'JSON' | 'DELIMITED'
    key = ""
    /* Optional */
    timestamp = "ROWTIME"
    timestamp_format = ""

  }

}

CREATE AS SELECT

Reference

CREATE STREAM stream_name
  [WITH ( property_name = expression [, ...] )]
  AS SELECT  select_expr [, ...]
  FROM from_stream
  [ LEFT | FULL | INNER ] JOIN [join_table | join_stream] [ WITHIN [(before TIMEUNIT, after TIMEUNIT) | N TIMEUNIT] ] ON join_criteria
  [ WHERE condition ]
  [PARTITION BY column_name];
CREATE TABLE table_name
  [WITH ( property_name = expression [, ...] )]
  AS SELECT  select_expr [, ...]
  FROM from_item
  [ LEFT | FULL | INNER ] JOIN join_table ON join_criteria
  [ WINDOW window_expression ]
  [ WHERE condition ]
  [ GROUP BY grouping_expression ]
  [ HAVING having_expression ];
The supported WITH properties are the same as for CREATE; see the property list above.

Proposal

resource "ksql_stream_as_select" "stream_as_select_resource_name" {
  name = "some_stream_as_select_name"
  
  query = {
    /* Required */
    select = {
      field1 = "expression"
      ...
    }
    from = ${ksql_stream_or_table}

    /* Optional */
    join = {
      type = "left" | "full" | "inner"
      on = {
        resource = ${ref}
        criteria = ""
      }
      within = ""
    }
    where = ""
    partition_by = "${column_name}"
  }

  settings = {
    /* Required */
    kafka_topic = "some input topic"
    value_format = 'AVRO' | 'JSON' | 'DELIMITED'
    /* Optional */
    partitions = 1 /* Default ksql.sink.partitions */
    replicas = 1
    timestamp = "ROWTIME"
    timestamp_format = ""
  }

  /* Computed from select */
  fields = { }
}
resource "ksql_table_as_select" "table_as_select_resource_name" {
  name = "some_table_as_select_name"
  
  query = {
    /* Required */
    select = {
      field1 = "expression"
      ...
    }
    from = ${ksql_stream_or_table}

    /* Optional */
    join = {
      type = "left" | "full" | "inner"
      on = {
        resource = ${ref}
        criteria = ""
      }
    }
    window = ""
    where = ""
    group_by = ""
    having = ""
  }

  settings = {
    /* Required */
    kafka_topic = "some input topic"
    value_format = 'AVRO' | 'JSON' | 'DELIMITED'
    /* Optional */
    partitions = 1 /* Default ksql.sink.partitions */
    replicas = 1
    timestamp = "ROWTIME"
    timestamp_format = ""
  }
}

Building the CREATE statement

Because of the new format, the schema would have to be thoroughly parsed and digested in order to generate a valid and equivalent KSQL CREATE statement. In doing so, some cases have to be taken into account (a statement-builder sketch follows this list):

  • If VALUE_FORMAT is not 'AVRO', the column name should be wrapped in double quotes and exposed AS "[fields.key]" to preserve case sensitivity; otherwise KSQL will uppercase the field name.
  • The field key representing the field name should always be explicitly defined ([expression] AS [field.key]) in a CREATE AS SELECT resource, to avoid having the user do it themselves, which would duplicate the name and defeat the purpose of the field key.
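A minimal sketch in Go of such a statement builder, assuming hypothetical field and settings maps that mirror the proposed schema (buildCreateStatement and its signature are illustrative, not the provider's current API):

package main

import (
	"fmt"
	"sort"
	"strings"
)

// buildCreateStatement renders the proposed schema into a KSQL CREATE
// statement. fields maps column names to KSQL types; settings maps
// WITH-property names to values.
func buildCreateStatement(kind, name string, fields, settings map[string]string) string {
	names := make([]string, 0, len(fields))
	for n := range fields {
		names = append(names, n)
	}
	sort.Strings(names) // deterministic output, so plans stay stable

	cols := make([]string, 0, len(names))
	for _, n := range names {
		col := n
		// Quote column names to preserve case when the format is not AVRO,
		// otherwise KSQL uppercases the field name.
		if !strings.EqualFold(settings["value_format"], "AVRO") {
			col = fmt.Sprintf("%q", n)
		}
		cols = append(cols, fmt.Sprintf("%s %s", col, fields[n]))
	}

	props := make([]string, 0, len(settings))
	for k, v := range settings {
		props = append(props, fmt.Sprintf("%s='%s'", strings.ToUpper(k), v))
	}
	sort.Strings(props)

	return fmt.Sprintf("CREATE %s %s (%s) WITH (%s);",
		strings.ToUpper(kind), name, strings.Join(cols, ", "), strings.Join(props, ", "))
}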

Resource state handling

For demo purposes, the Mongey ksql client is abstracted away and the Confluent REST API is used directly, to show the mapping from the source data to the resource state. Of course, the implementation should call the Mongey ksql client, which wraps the DESCRIBE query.

KSQL API

From the KSQL REST API documentation, getting the state of the STREAM and TABLE resources looks like:

Query:

{
  "ksql": "DESCRIBE [RESOURCE_NAME];",
  "streamsProperties": {}
}

Result:

[
    {
        "@type": "sourceDescription",
        "statementText": "DESCRIBE [RESOURCE_NAME];",
        "sourceDescription": {
            "name": "[RESOURCE_NAME]",
            "readQueries": [
              /* => resource dependencies */
              /* Other queries listening for data */
            ],
            "writeQueries": [
                {
                    "sinks": [
                        "[RESOURCE_NAME]"
                    ],
                    "id": "[QUERY_ID]",
                    "queryString": "CREATE ..."
                }
            ],
            "fields": [
                {
                    "name": "ROWTIME",
                    "schema": {
                        "type": "BIGINT",
                        "fields": null,
                        "memberSchema": null
                    }
                },
                {
                    "name": "ROWKEY",
                    "schema": {
                        "type": "STRING",
                        "fields": null,
                        "memberSchema": null
                    }
                },
                {
                    "name": "[CUSTOM_NAME]",
                    "schema": {
                        "type": "[CUSTOM_TYPES]",
                        "fields": "[CUSTOM_FIELDS]",
                        "memberSchema": "[CUSTOM_MEMBER_SCHEMA]"
                    }
                },
                ...
            ],
            "type": "[STREAM|TABLE]",
            "key": "[value from the `WITH KEY` option]",
            "timestamp": "",
            "statistics": "",
            "errorStats": "",
            "extended": false,
            "format": "[AVRO|JSON|DELIMITED]",
            "topic": "[THE_TOPIC_RELATED_TO_THE_OUTPUT_DATA]",
            "partitions": 0,
            "replication": 0
        }
    }
]

To then make it match the resource state:

{
  "name": "[RESOURCE_NAME]",
  "readQueries": [],
  "writeQueries": [
    {
      "sinks": [
        "[RESOURCE_NAME]"
      ],
      "id": "[QUERY_ID]",
      "queryString": "CREATE ..."
    }
  ],
  "fields": [
      {
          "name": "ROWTIME",
          "schema": {
              "type": "BIGINT",
              "fields": null,
              "memberSchema": null
          }
      },
      {
          "name": "ROWKEY",
          "schema": {
              "type": "STRING",
              "fields": null,
              "memberSchema": null
          }
      },
      {
          "name": "[CUSTOM_NAME]",
          "schema": {
              "type": "[CUSTOM_TYPES]",
              "fields": "[CUSTOM_FIELDS]",
              "memberSchema": "[CUSTOM_MEMBERSCHEMA]"
          }
      },
      ...
  ],
  "type": "[STREAM|TABLE]",
  "key": "[value from the `WITH KEY` option]",
  "timestamp": "",
  "format": "[AVRO|JSON|DELIMITED]",
  "topic": "[THE_TOPIC_RELATED_TO_THE_OUTPUT_DATA]",
  "partitions": 0,
  "replication": 0
}

CREATE

Let info be the DESCRIBE payload defined above; the schema fields then map as follows (a lookup sketch follows the list):

  • name: info.name
  • fields[name]: info.fields[i].schema, where i is the index for which fields[name] == info.fields[i].name (case-insensitive)
  • settings.kafka_topic: info.topic
  • settings.value_format: info.format
  • settings.key: info.key
  • settings.timestamp: info.timestamp
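A sketch of that case-insensitive field lookup, with hypothetical struct names mirroring the JSON payload above:

package main

import "strings"

// Hypothetical subset of a DESCRIBE field entry.
type fieldSchema struct {
	Type         string      `json:"type"`
	Fields       interface{} `json:"fields"`
	MemberSchema interface{} `json:"memberSchema"`
}

type fieldDescription struct {
	Name   string      `json:"name"`
	Schema fieldSchema `json:"schema"`
}

// findFieldSchema returns info.fields[i].schema for the index i whose
// name matches, ignoring case, as described in the mapping above.
func findFieldSchema(fields []fieldDescription, name string) (fieldSchema, bool) {
	for _, f := range fields {
		if strings.EqualFold(f.Name, name) {
			return f.Schema, true
		}
	}
	return fieldSchema{}, false
}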

CREATE AS SELECT

Let info be the DESCRIBE payload defined above; the schema fields then map as follows (a parsing sketch follows the list):

  • name: info.name
  • query.select: parse writeQueries[0].queryString between SELECT and FROM and split the fields to map them
  • query.from: resource name matching the word after FROM in writeQueries[0].queryString
  • query.join: the writeQueries[0].queryString value parsed from JOIN to WHERE, PARTITION BY, or ;
  • query.where: the writeQueries[0].queryString statement from WHERE to PARTITION BY or ;
  • query.partition_by: writeQueries[0].queryString after PARTITION BY
  • settings.kafka_topic: info.topic
  • settings.value_format: info.format
  • settings.partitions: info.partitions
  • settings.replicas: info.replication
  • settings.timestamp: info.timestamp
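A naive sketch of that queryString parsing, using regular expressions; the function name is illustrative, the JOIN clause is ignored, and a real implementation would likely need a proper SQL parser:

package main

import (
	"regexp"
	"strings"
)

// parseCSAS extracts the clauses described in the mapping above from a
// CREATE ... AS SELECT statement. Sketch only: it assumes a single,
// well-formed statement and skips JOIN handling.
func parseCSAS(query string) map[string]string {
	re := regexp.MustCompile(`(?is)SELECT\s+(?P<select>.+?)\s+FROM\s+(?P<from>\S+)` +
		`(?:\s+WHERE\s+(?P<where>.+?))?` +
		`(?:\s+PARTITION\s+BY\s+(?P<partition_by>.+?))?\s*;?\s*$`)
	out := map[string]string{}
	m := re.FindStringSubmatch(query)
	if m == nil {
		return out
	}
	for i, name := range re.SubexpNames() {
		if name != "" && m[i] != "" {
			out[name] = strings.TrimSpace(m[i])
		}
	}
	return out
}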

Terraform v0.12 support?

Hi,

I tried to get the plugin v0.0.1 to work in Atlantis v0.8.3 (Terraform v0.12.3)

Error: Failed to instantiate provider "ksql" to obtain schema: Incompatible API version with plugin. Plugin version: 4, Client versions: [5]

I then tried building the plugin from master on macOS (Darwin 18.7.0) but got:

Error: Failed to instantiate provider "ksql" to obtain schema: fork/exec /home/atlantis/.terraform.d/plugins/linux_amd64/terraform-provider-ksql: exec format error

Is the plugin supposed to work with Terraform ≥ v0.12?

Thank you for sharing this great work.

More options for the HTTP client

While testing, I have an invalid certificate and need to be able to skip TLS verification.
We also need to be able to provide basic auth, so I wonder if the provider could support additional options, such as:

provider "ksql" {
   url = "https://localhost:8083"

   skip_tls_verification = true

   auth {
      type     = "basic"
      username = "user"
      password = "password"
   }
}
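A minimal sketch of how the provider could honor such options when building its HTTP client; the option names above and everything in this snippet are assumptions rather than the provider's current API:

package main

import (
	"crypto/tls"
	"net/http"
)

// basicAuthTransport adds basic-auth credentials to every request.
type basicAuthTransport struct {
	username, password string
	inner              http.RoundTripper
}

func (t *basicAuthTransport) RoundTrip(req *http.Request) (*http.Response, error) {
	req = req.Clone(req.Context()) // don't mutate the caller's request
	req.SetBasicAuth(t.username, t.password)
	return t.inner.RoundTrip(req)
}

// newHTTPClient builds a client that optionally skips TLS verification
// and optionally injects basic-auth credentials.
func newHTTPClient(skipTLSVerification bool, username, password string) *http.Client {
	transport := &http.Transport{
		TLSClientConfig: &tls.Config{InsecureSkipVerify: skipTLSVerification},
	}
	var rt http.RoundTripper = transport
	if username != "" {
		rt = &basicAuthTransport{username: username, password: password, inner: transport}
	}
	return &http.Client{Transport: rt}
}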

Update resources state handling

Goals

Terraform plugins basics

  • The create function:
    • Should define the resource unique identifier with d.SetId("[ID]") to indicate the resource was successfully created.

      It uses SetId, a built-in function, to set the ID of the resource to the address. The existence of a non-blank ID is what tells Terraform that a resource was created. This ID can be any string value, but should be a value that can be used to read the resource again.

      ...

      Again, because of the call to SetId, Terraform believes the resource was created. When running plan, Terraform properly determines there are no changes to apply.

    • Should return the read function, to ensure the state is reflected.
  • The destroy function:
    • Should destroy the resource (a minimal sketch follows this list).
    • Can call d.SetId("") to indicate that the resource was successfully deleted, but this is not necessary, since any non-error return value assumes the resource was deleted successfully.
    • Should never update any state on the resource.
  • The read function:
    • Should sync the local state with the actual (upstream) state.
    • Should be a read-only operation and therefore should never modify the real resource.
    • Should set the ID to blank if the resource no longer exists (maybe it was destroyed out of band).
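As a minimal sketch, a destroy function honoring this contract could look like the following; MyClient mirrors the placeholder in the read example further below, and its Delete method is a hypothetical stand-in for issuing a DROP statement:

func resourceServerDelete(d *schema.ResourceData, m interface{}) error {
  client := m.(*MyClient)

  // Any non-error return value tells Terraform the resource was deleted,
  // so calling d.SetId("") here is optional.
  return client.Delete(d.Id())
}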

Proposed changes

Update resources create function to return the read function

As stated in the Terraform doc:

"The create and update function should always return the read function to ensure the state is reflected"
https://www.terraform.io/docs/extend/writing-custom-providers.html#defining-resources

Example from the Terraform documentation:

func resourceServerCreate(d *schema.ResourceData, m interface{}) error {
        address := d.Get("address").(string)
        d.SetId(address)
        return resourceServerRead(d, m)
}

Suppress diffs for resource name case sensitivity

The resource name should be treated as case-insensitive: SQL does not take capitalization into account, and the KSQL server stores the value uppercased. No diff should therefore be reported when only the capitalization differs.
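A sketch of how this could be done with the helper/schema SDK's DiffSuppressFunc on the resource's name attribute (the attribute layout is assumed, not taken from the provider):

"name": &schema.Schema{
  Type:     schema.TypeString,
  Required: true,
  ForceNew: true,
  // The KSQL server stores names uppercased, so compare ignoring case.
  DiffSuppressFunc: func(k, old, new string, d *schema.ResourceData) bool {
    return strings.EqualFold(old, new)
  },
},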

Update read function to alter the resource state

func resourceServerRead(d *schema.ResourceData, m interface{}) error {
  client := m.(*MyClient)

  // Attempt to read from an upstream API
  obj, ok := client.Get(d.Id())

  // If the resource does not exist, inform Terraform. We want to immediately
  // return here to prevent further processing.
  if !ok {
    d.SetId("")
    return nil
  }

  d.Set("address", obj.Address)
  return nil
}

All the schema fields should be updated from the upstream configuration for sync
purposes.

Update the state with the DESCRIBE infos

If the schema gets more complex, it could be worthwhile to parse the content of the DESCRIBE response in order to resolve the resource's attributes.

KSQL DESCRIBE

Query:

{
  "ksql": "DESCRIBE [RESOURCE_NAME];",
  "streamsProperties": {}
}

Result:

[
    {
        "@type": "sourceDescription",
        "statementText": "DESCRIBE [RESOURCE_NAME];",
        "sourceDescription": {
            "name": "[RESOURCE_NAME]",
            "readQueries": [
              /* => resource dependencies */
              /* Other queries listening for data */
            ],
            "writeQueries": [
                {
                    "sinks": [
                        "[RESOURCE_NAME]"
                    ],
                    "id": "[QUERY_ID]",
                    "queryString": "CREATE ..." // Good state value to compare against for changes
                }
            ],
            "fields": [
                {
                    "name": "ROWTIME",
                    "schema": {
                        "type": "BIGINT",
                        "fields": null,
                        "memberSchema": null
                    }
                },
                {
                    "name": "ROWKEY",
                    "schema": {
                        "type": "STRING",
                        "fields": null,
                        "memberSchema": null
                    }
                },
                {
                    "name": "[CUSTOM_NAME]",
                    "schema": {
                        "type": "[CUSTOM_TYPES]",
                        "fields": "[CUSTOM_FIELDS]",
                        "memberSchema": "[CUSTOM_MEMBER_SCHEMA]"
                    }
                },
                ...
            ],
            "type": "[STREAM|TABLE]",
            "key": "[value from the `WITH KEY` option]",
            "timestamp": "",
            "statistics": "",
            "errorStats": "",
            "extended": false,
            "format": "[AVRO|JSON|DELIMITED]",
            "topic": "[THE_TOPIC_RELATED_TO_THE_OUTPUT_DATA]",
            "partitions": 0,
            "replication": 0
        }
    }
]

Resource state

Not all of the payload above is required or relevant to the resource's state. Considering the actual schema, the state could be as simple as:

{
  "name": sourceDescription.name,
  "query": sourceDescription.writeQueries[index_where_sinks_contains_res_name].queryString
}

Note that query would only be present for a CREATE AS SELECT resource.
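A minimal sketch of that mapping in Go, with hypothetical struct names mirroring the JSON payload above:

package main

import "strings"

// Hypothetical subset of the DESCRIBE payload.
type writeQuery struct {
	Sinks       []string `json:"sinks"`
	ID          string   `json:"id"`
	QueryString string   `json:"queryString"`
}

type sourceDescription struct {
	Name         string       `json:"name"`
	WriteQueries []writeQuery `json:"writeQueries"`
}

// stateFromDescription keeps only the fields the current schema tracks.
// query stays empty for plain CREATE resources, since only CREATE AS
// SELECT sources carry a write query sinking into themselves.
func stateFromDescription(desc sourceDescription) map[string]string {
	state := map[string]string{"name": desc.Name}
	for _, wq := range desc.WriteQueries {
		for _, sink := range wq.Sinks {
			if strings.EqualFold(sink, desc.Name) {
				state["query"] = wq.QueryString
			}
		}
	}
	return state
}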
