A Terraform plugin for managing Confluent KSQL Server.
Download and extract the latest release to your [Terraform plugin directory][third-party-plugins] (typically `~/.terraform.d/plugins/`).
- Install Go
- Clone repository
- Build the provider
make build
- Run the tests
make test
- Adjust the VERSION
- Run the build
make -f Makefile all
provider "ksql" {
url = "http://localhost:8083"
}
A resource for managing KSQL streams
resource "ksql_stream" "actions" {
name = "vip_actions"
query = <<EOF
SELECT userid, page, action
FROM clickstream c
LEFT JOIN users u ON c.userid = u.user_id
WHERE u.level = 'Platinum';
EOF
}
The same stream, defined with just a KSQL query string:
resource "ksql_stream" "actions" {
ksql = <<EOF
CREATE STREAM vip_actions AS SELECT userid, page, action
FROM clickstream c
LEFT JOIN users u ON c.userid = u.user_id
WHERE u.level =
'Platinum';
EOF
}
A resource for managing KSQL tables
resource "ksql_table" "users" {
name = "users-thing"
query = <<EOF
SELECT error_code,
count(*)
FROM monitoring_stream
WINDOW TUMBLING (SIZE 1 MINUTE)
WHERE type = 'ERROR'
GROUP BY error_code;
EOF
}
The same table, defined with just a KSQL query string:
resource "ksql_table" "users" {
ksql = <<EOF
CREATE TABLE `users-thing` AS SELECT error_code,
count(*)
FROM monitoring_stream
WINDOW TUMBLING (SIZE 1 MINUTE)
WHERE type = 'ERROR'
GROUP BY error_code;
EOF
}
resource "ksql_source_connector" "jdbcconnector" {
ksql = <<EOF
CREATE SOURCE CONNECTOR `jdbc-connector` WITH(
"connector.class"='io.confluent.connect.jdbc.JdbcSourceConnector',
"connection.url"='jdbc:postgresql://localhost:5432/my.db',
"mode"='bulk',
"topic.prefix"='jdbc-',
"table.whitelist"='users',
"key"='username');
EOF
}
resource "ksql_sink_connector" "docs" {
ksql = <<EOF
CREATE SINK CONNECTOR DOCS WITH (
'connector.class' = 'io.confluent.connect.jdbc.JdbcSinkConnector',
'connection.url' = 'jdbc:postgresql://localhost:5432/my_db',
'connection.user' = 'pguser',
'connection.password' = 'pgpass',
'tasks.max' = '1',
'topics' = 'docs_avro',
'batch.size' = '3',
'table.name.format' = 'docs',
'auto.create' = 'true',
'auto.evolve' = 'true',
'delete.enabled' = 'false',
'pk.mode' = 'record_key',
'pk.fields' = 'docId',
'insert.mode' = 'UPSERT'
);
EOF
}