confluentinc / ksql
The database purpose-built for stream processing applications.
Home Page: https://ksqldb.io
License: Other
ksql> CREATE STREAM my_logs_stream ( "@timestamp" varchar, "@version" bigint, message
varchar, logger_name varchar, thread_name varchar, level varchar, level_value bigint, HOSTNAME
varchar, app varchar) WITH (kafka_topic='applogs', value_format='JSON');
SELECT app, HOSTNAME, "@version" FROM my_logs_stream LIMIT 3;
Error:
Line 1, Column 17: Unexpected token "@"
Unfortunately, my JSON data looks like this:
{
"@timestamp":"2017-08-30T06:48:05.149+00:00",
"@version":1,
"message":"cancel()",
"logger_name":"ss-QuoteGenerator",
"thread_name":"reactor-http-nio-4",
"level":"INFO",
"level_value":20000,
"HOSTNAME":"weqeqe",
"app":"stream-service"
}
I have read KSQL's documentation but could not find anything covering this.
There's a dependency for KQL:kql-build-tools:1.0-SNAPSHOT specified in the top-level pom.xml file, but we don't appear to have anything like that in our internal Nexus server. Commenting out the surrounding element allows things to build just fine.
I'm guessing the solution is to deploy kql-build-tools to Nexus, which sounds pretty simple, but I don't know how to do that yet (sorry) so here's an issue instead.
Hi,
I tried to let KSQL loose on a topic that contains RSVP data from Meetup.com in JSON format.
One of the fields in this dataset is called "group", which is also a keyword in KSQL (GROUP BY). The parser fails with:
ksql> CREATE STREAM rsvps2 (group VARCHAR) WITH (value_format='json', kafka_topic='rsvps');
line 1:23: extraneous input 'group' expecting {'ADD', 'APPROXIMATE', 'AT', 'CONFIDENCE', 'NO', 'SUBSTRING', 'POSITION', 'TINYINT', 'SMALLINT', 'INTEGER', 'DATE', 'TIME', 'TIMESTAMP', 'INTERVAL', 'YEAR', 'MONTH', 'DAY', 'HOUR', 'MINUTE', 'SECOND', 'ZONE', 'OVER', 'PARTITION', 'RANGE', 'ROWS', 'PRECEDING', 'FOLLOWING', 'CURRENT', 'ROW', 'VIEW', 'REPLACE', 'GRANT', 'REVOKE', 'PRIVILEGES', 'PUBLIC', 'OPTION', 'EXPLAIN', 'ANALYZE', 'FORMAT', 'TYPE', 'TEXT', 'GRAPHVIZ', 'LOGICAL', 'DISTRIBUTED', 'TRY', 'SHOW', 'TABLES', 'SCHEMAS', 'CATALOGS', 'COLUMNS', 'COLUMN', 'USE', 'PARTITIONS', 'FUNCTIONS', 'TO', 'SYSTEM', 'BERNOULLI', 'POISSONIZED', 'TABLESAMPLE', 'RESCALED', 'ARRAY', 'MAP', 'SET', 'RESET', 'SESSION', 'DATA', 'START', 'TRANSACTION', 'COMMIT', 'ROLLBACK', 'WORK', 'ISOLATION', 'LEVEL', 'SERIALIZABLE', 'REPEATABLE', 'COMMITTED', 'UNCOMMITTED', 'READ', 'WRITE', 'ONLY', 'CALL', 'NFD', 'NFC', 'NFKD', 'NFKC', 'IF', 'NULLIF', 'COALESCE', IDENTIFIER, DIGIT_IDENTIFIER, QUOTED_IDENTIFIER, BACKQUOTED_IDENTIFIER}
I tried to escape the name by using double quotes, but then KSQL seems to include those quotes in the name and never finds any data.
Issue with the CLI exec command -- docker-compose exec ksql-cli ksql-cli local --bootstrap-server kafka:29092 returned an OCI path error.
This command worked: docker-compose exec ksql-cli java -jar /usr/share/confluent/ksql-cli-0.1-SNAPSHOT-standalone.jar local --bootstrap-server kafka:29092
Hi all,
We are using Siddhi now, and some of our logic can't be expressed as SQL, so we use Siddhi's UDFs to handle it.
I have read KSQL's documentation but can't find anything about UDFs, so I want to know whether KSQL has plans to support UDFs.
Refactor integration tests and enhance KSQLContext to improve testing capability
After completing the quickstart, I run SHOW TOPICS;
within KSQL and get the following output:
Kafka Topic | Registered | Partitions | Partition Replicas
-------------------------------------------------------------------
_confluent-metrics | false | 12 | 1
_schemas | false | 1 | 1
ksql__commands | true | 1 | 1
The two demonstration topics, pageviews and users, don't appear. Running docker container ls from bash, I get the following output:
CONTAINER ID   IMAGE                                     COMMAND                  CREATED          STATUS          PORTS                                                    NAMES
d2853fffdfac   confluentinc/ksql-cli:latest              "perl -e 'while(1)..."   13 minutes ago   Up 13 minutes                                                            quickstart_ksql-cli_1
6f54121daf3e   confluentinc/cp-schema-registry:latest    "/etc/confluent/do..."   13 minutes ago   Up 13 minutes   0.0.0.0:8081->8081/tcp                                   quickstart_schema-registry_1
7dedcd9fba36   confluentinc/cp-enterprise-kafka:latest   "/etc/confluent/do..."   13 minutes ago   Up 13 minutes   0.0.0.0:9092->9092/tcp, 0.0.0.0:29092->29092/tcp         quickstart_kafka_1
4337617d71f6   confluentinc/cp-zookeeper:latest          "/etc/confluent/do..."   13 minutes ago   Up 13 minutes   2181/tcp, 2888/tcp, 3888/tcp, 0.0.0.0:32181->32181/tcp   quickstart_zookeeper_1
The remedy turns out to be running docker-compose up -d again from the /ksql/docs/quickstart directory:
user@server:~/ksql/docs/quickstart$ docker-compose up -d
quickstart_zookeeper_1 is up-to-date
quickstart_kafka_1 is up-to-date
quickstart_schema-registry_1 is up-to-date
Starting quickstart_ksql-datagen-users_1
Starting quickstart_ksql-datagen-pageviews_1
quickstart_ksql-cli_1 is up-to-date
I then get:
CONTAINER ID   IMAGE                                     COMMAND                  CREATED          STATUS          PORTS                                                    NAMES
d2853fffdfac   confluentinc/ksql-cli:latest              "perl -e 'while(1)..."   15 minutes ago   Up 15 minutes                                                            quickstart_ksql-cli_1
27a412186e20   confluentinc/ksql-examples:latest         "bash -c 'echo Wai..."   15 minutes ago   Up 18 seconds                                                            quickstart_ksql-datagen-pageviews_1
82ffabc651e3   confluentinc/ksql-examples:latest         "bash -c 'echo Wai..."   15 minutes ago   Up 17 seconds                                                            quickstart_ksql-datagen-users_1
6f54121daf3e   confluentinc/cp-schema-registry:latest    "/etc/confluent/do..."   15 minutes ago   Up 15 minutes   0.0.0.0:8081->8081/tcp                                   quickstart_schema-registry_1
7dedcd9fba36   confluentinc/cp-enterprise-kafka:latest   "/etc/confluent/do..."   15 minutes ago   Up 15 minutes   0.0.0.0:9092->9092/tcp, 0.0.0.0:29092->29092/tcp         quickstart_kafka_1
4337617d71f6   confluentinc/cp-zookeeper:latest          "/etc/confluent/do..."   15 minutes ago   Up 15 minutes   2181/tcp, 2888/tcp, 3888/tcp, 0.0.0.0:32181->32181/tcp   quickstart_zookeeper_1
And I can see the topics from KSQL:
ksql> show topics;
Kafka Topic | Registered | Partitions | Partition Replicas
-------------------------------------------------------------------
_confluent-metrics | false | 12 | 1
_schemas | false | 1 | 1
ksql__commands | true | 1 | 1
pageviews | false | 1 | 1
users | false | 1 | 1
I'm running this on Ubuntu Server 16.04.
Looks like the scripts for the pageviews and users topics need to retry creation a few times?
Given two tables
CREATE TABLE pageviews_avg_1b \
AS SELECT \
userid, \
SUM(viewtime) AS sum_viewtime, \
MIN(viewtime) AS min_viewtime, \
MAX(viewtime) AS max_viewtime, \
COUNT(*) AS view_count \
FROM pageviews \
WINDOW SESSION (60 SECONDS) GROUP BY userid;
CREATE TABLE pageviews_avg_5b \
AS SELECT \
userid, \
SUM(viewtime) AS sum_viewtime, \
MIN(viewtime) AS min_viewtime, \
MAX(viewtime) AS max_viewtime, \
COUNT(*) AS view_count \
FROM pageviews \
WINDOW SESSION (300 SECONDS) GROUP BY userid;
which individually return results, we want to compare them with a query:
SELECT \
p1.view_count AS view_count_p1, \
p5.view_count AS view_count_p5 \
FROM \
pageviews_avg_1b p1 \
LEFT JOIN pageviews_avg_5b p5 \
ON p1.user_id = p5.user_id;
The query returns immediately and the user interface returns "No value present".
Is this not supported?
[2017-09-22 15:27:02,565] ERROR No value present
java.util.Optional.get(Optional.java:135)
io.confluent.ksql.physical.PhysicalPlanBuilder.buildJoin(PhysicalPlanBuilder.java:506)
io.confluent.ksql.physical.PhysicalPlanBuilder.kafkaStreamsDsl(PhysicalPlanBuilder.java:112)
io.confluent.ksql.physical.PhysicalPlanBuilder.kafkaStreamsDsl(PhysicalPlanBuilder.java:104)
io.confluent.ksql.physical.PhysicalPlanBuilder.buildProject(PhysicalPlanBuilder.java:363)
io.confluent.ksql.physical.PhysicalPlanBuilder.kafkaStreamsDsl(PhysicalPlanBuilder.java:119)
io.confluent.ksql.physical.PhysicalPlanBuilder.kafkaStreamsDsl(PhysicalPlanBuilder.java:104)
io.confluent.ksql.physical.PhysicalPlanBuilder.buildOutput(PhysicalPlanBuilder.java:137)
io.confluent.ksql.physical.PhysicalPlanBuilder.kafkaStreamsDsl(PhysicalPlanBuilder.java:127)
io.confluent.ksql.physical.PhysicalPlanBuilder.kafkaStreamsDsl(PhysicalPlanBuilder.java:104)
io.confluent.ksql.physical.PhysicalPlanBuilder.buildPhysicalPlan(PhysicalPlanBuilder.java:100)
io.confluent.ksql.QueryEngine.buildQueryPhysicalPlan(QueryEngine.java:221)
io.confluent.ksql.QueryEngine.buildPhysicalPlans(QueryEngine.java:199)
io.confluent.ksql.KsqlEngine.planQueries(KsqlEngine.java:139)
io.confluent.ksql.KsqlEngine.buildMultipleQueries(KsqlEngine.java:125)
io.confluent.ksql.rest.server.resources.streaming.QueryStreamWriter.<init>(QueryStreamWriter.java:53)
io.confluent.ksql.rest.server.resources.streaming.StreamedQueryResource.streamQuery(StreamedQueryResource.java:68)
sun.reflect.GeneratedMethodAccessor51.invoke(Unknown Source)
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.lang.reflect.Method.invoke(Method.java:498)
org.glassfish.jersey.server.model.internal.ResourceMethodInvocationHandlerFactory$1.invoke(ResourceMethodInvocationHandlerFactory.java:81)
org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher$1.run(AbstractJavaResourceMethodDispatcher.java:144)
org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher.invoke(AbstractJavaResourceMethodDispatcher.java:161)
org.glassfish.jersey.server.model.internal.JavaResourceMethodDispatcherProvider$ResponseOutInvoker.doDispatch(JavaResourceMethodDispatcherProvider.java:160)
org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher.dispatch(AbstractJavaResourceMethodDispatcher.java:99)
org.glassfish.jersey.server.model.ResourceMethodInvoker.invoke(ResourceMethodInvoker.java:389)
org.glassfish.jersey.server.model.ResourceMethodInvoker.apply(ResourceMethodInvoker.java:347)
org.glassfish.jersey.server.model.ResourceMethodInvoker.apply(ResourceMethodInvoker.java:102)
org.glassfish.jersey.server.ServerRuntime$2.run(ServerRuntime.java:326)
org.glassfish.jersey.internal.Errors$1.call(Errors.java:271)
org.glassfish.jersey.internal.Errors$1.call(Errors.java:267)
org.glassfish.jersey.internal.Errors.process(Errors.java:315)
org.glassfish.jersey.internal.Errors.process(Errors.java:297)
org.glassfish.jersey.internal.Errors.process(Errors.java:267)
org.glassfish.jersey.process.internal.RequestScope.runInScope(RequestScope.java:317)
org.glassfish.jersey.server.ServerRuntime.process(ServerRuntime.java:305)
org.glassfish.jersey.server.ApplicationHandler.handle(ApplicationHandler.java:1154)
org.glassfish.jersey.servlet.WebComponent.serviceImpl(WebComponent.java:473)
org.glassfish.jersey.servlet.ServletContainer.serviceImpl(ServletContainer.java:408)
org.glassfish.jersey.servlet.ServletContainer.doFilter(ServletContainer.java:583)
org.glassfish.jersey.servlet.ServletContainer.doFilter(ServletContainer.java:524)
org.glassfish.jersey.servlet.ServletContainer.doFilter(ServletContainer.java:461)
org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1652)
org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:585)
org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:221)
org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1127)
org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:515)
org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:185)
org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1061)
org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)
org.eclipse.jetty.server.handler.HandlerCollection.handle(HandlerCollection.java:110)
org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:97)
org.eclipse.jetty.server.handler.StatisticsHandler.handle(StatisticsHandler.java:159)
org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:97)
org.eclipse.jetty.server.Server.handle(Server.java:499)
org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:311)
org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:257)
org.eclipse.jetty.io.AbstractConnection$2.run(AbstractConnection.java:544)
org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:635)
org.eclipse.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:555)
java.lang.Thread.run(Thread.java:748)
(io.confluent.ksql.cli.console.Console:130)
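As an aside, both CREATE TABLE statements project the column as userid, while the join condition references user_id. A minimal sketch of the comparison using the column name the tables actually expose (untested, and assuming joins between two windowed tables are supported at all):
SELECT \
p1.view_count AS view_count_p1, \
p5.view_count AS view_count_p5 \
FROM \
pageviews_avg_1b p1 \
LEFT JOIN pageviews_avg_5b p5 \
ON p1.userid = p5.userid;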
When creating a stream from a Kafka topic whose records use an Avro schema, values are not mapped to column names properly. Some fields do not read values, and others have their first character truncated.
"ksql-cli local" worked very well when first used and after that it started hanging.
even after reboot the machine and can not fix the bug
and there is no error message and can not find the related logs
how to resolve the issue?
thanks a lot.
According to the documentation, KSQL supports JSON and CSV as VALUE_FORMAT.
It would be really nice to start testing the concept with higher-velocity streams of protobuf data. Any plans to support this?
Needs to be cleaned up with queue passing and handling.
If any single message with the wrong format, or a blank line, is published, it stops processing of the entire stream/table altogether unless the faulty entry is removed by recreating the topic. Only recreating the stream/table fixes the issue; otherwise KSQL keeps throwing exceptions as below.
Steps to reproduce:
Create a table from another stream whose messages are in JSON format. Publish a blank line to the topic and observe the exception in KSQL.
`Exception in thread "ksql_query_10-f8dae1f2-2fc1-4d1e-a8d2-927da88bbab7-StreamThread-67" org.apache.kafka.streams.errors.StreamsException: Failed to deserialize value for record. topic=beacons-input2, partition=0, offset=3000046
at org.apache.kafka.streams.processor.internals.SourceNodeRecordDeserializer.deserialize(SourceNodeRecordDeserializer.java:46)
at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:84)
at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:117)
at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:474)
at org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:642)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:548)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:519)
Caused by: org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.databind.JsonMappingException: No content to map due to end-of-input
at [Source: [B@7f3b5dc3; line: 1, column: 0]
Caused by: com.fasterxml.jackson.databind.JsonMappingException: No content to map due to end-of-input
at [Source: [B@7f3b5dc3; line: 1, column: 0]
at com.fasterxml.jackson.databind.JsonMappingException.from(JsonMappingException.java:270)
at com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:3838)
at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3783)
at com.fasterxml.jackson.databind.ObjectMapper.readTree(ObjectMapper.java:2404)
at io.confluent.ksql.serde.json.KsqlJsonDeserializer.getGenericRow(KsqlJsonDeserializer.java:74)
at io.confluent.ksql.serde.json.KsqlJsonDeserializer.deserialize(KsqlJsonDeserializer.java:66)
at io.confluent.ksql.serde.json.KsqlJsonDeserializer.deserialize(KsqlJsonDeserializer.java:37)
at org.apache.kafka.common.serialization.ExtendedDeserializer$Wrapper.deserialize(ExtendedDeserializer.java:65)
at org.apache.kafka.common.serialization.ExtendedDeserializer$Wrapper.deserialize(ExtendedDeserializer.java:55)
at org.apache.kafka.streams.processor.internals.SourceNode.deserializeValue(SourceNode.java:56)
at org.apache.kafka.streams.processor.internals.SourceNodeRecordDeserializer.deserialize(SourceNodeRecordDeserializer.java:44)
at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:84)
at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:117)
at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:474)
at org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:642)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:548)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:519)
`
When I used the command bin/ksql-cli local to start KSQL, it threw a FileNotFoundException telling me that it can't find xxx/ksql-cli-0.1-SNAPSHOT-package/config/log4j-file.properties. I checked the project and found that after packaging there is no config directory in ksql-cli-0.1-SNAPSHOT-package, only an etc directory. What can I do to solve this problem?
The existing topic with data is mysite. All data is flowing through, with two associated streams:
ksql> SHOW STREAMS;
Stream Name | Kafka Topic | Format
--------------------------------------------
BIRF_BROWSER_SESSID | mysite | JSON
RUM_TTL_SESSID | mysite | JSON
ksql> CREATE STREAM ttl_browser_request WITH (kafka_topic='browser_requests', value_format='JSON') AS SELECT r.rum_ttl AS ttl, b.brand_name AS browser, r.sessid AS sessid, b.camg_reqid AS requestid FROM rum_ttl_sessid r LEFT JOIN birf_browser_sessid b ON r.requestid=b.camg_reqid;
Invalid topology building: Topic mysite has already been registered by another source.
ksql> LIST TOPICS;
Kafka Topic | Registered | Partitions | Partition Replicas
-------------------------------------------------------------------
_confluent-metrics | false | 12 | 1
_schemas | false | 1 | 1
browser_requests | true | 2 | 1
mysite | true | 1 | 1
ksql__commands | true | 1 | 1
pageviews | false | 1 | 1
users | false | 1 | 1
ksql>
Am I reading this correctly? How can I associate ttl_browser_request with browser_requests?
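If the error comes from both source streams registering the mysite topic in the same join topology, one hedged workaround sketch (the _copy name is illustrative, not from the issue) is to first materialize one of the streams onto its own topic and join against that copy instead:
CREATE STREAM rum_ttl_sessid_copy AS SELECT * FROM rum_ttl_sessid;
CREATE STREAM ttl_browser_request WITH (kafka_topic='browser_requests', value_format='JSON') AS SELECT r.rum_ttl AS ttl, b.brand_name AS browser, r.sessid AS sessid, b.camg_reqid AS requestid FROM rum_ttl_sessid_copy r LEFT JOIN birf_browser_sessid b ON r.requestid=b.camg_reqid;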
After reading the documentation, I still have no idea how I'd make a connection to KSQL from another, non-JVM language. Would a PostgreSQL driver for Node work to connect? The top FAQ question speaks to KSQL being useful from non-JVM languages, yet there's no indication of how this is possible.
Right now, the core code base is full of redundant calls to toUpperCase() and equalsIgnoreCase() so that case insensitivity can be supported. These calls should be greatly reduced, but first we should decide on how far we actually want to go with case-insensitivity in KSQL.
I can think of two ways to approach case-sensitivity:
1. Follow idiomatic SQL: unquoted identifiers are case-insensitive, while quoted identifiers preserve case.
2. Make KSQL fully case-sensitive everywhere.
From what I've seen of the code base, it looks like door number 2 would be easier to implement--it might be as simple as changing SqlBase.g4 and flat-out removing virtually every call to toUpperCase and equalsIgnoreCase. However, door number 1 seems closer to idiomatic SQL syntax and may be more familiar to the data scientists who may end up using this some day.
I'm leaning slightly towards door number 2, but I'd love to hear everyone else's thoughts on the matter.
I've already implemented rough drafts of both options, which can be seen here and here for options 1 and 2, respectively, but I'd rather not open an official PR until we decide which way we want to go.
@ewencp, @hjafarpour, @blueedgenick, thoughts?
io.confluent.ksql.parser.SqlBaseParser does not exist.
In SQL, we often use self joins to obtain data from the same table multiple times in the same query, for example on parent-child relationships etc.
This statement for example
SELECT \
p1.view_count AS view_count_p1, \
p5.view_count AS view_count_p5 \
FROM \
pageviews_avg_1b p1 \
LEFT JOIN pageviews_avg_1b p5 \
ON p1.min_viewtime = p5.max_viewtime;
throws "Invalid topology building: Topic PAGEVIEWS_AVG_1B has already been registered by another source.".
Are self joins not supported yet?
The log shows
ERROR Invalid topology building: Topic PAGEVIEWS_AVG_1B has already been registered by another source.
org.apache.kafka.streams.processor.TopologyBuilder.validateTopicNotAlreadyRegistered(TopologyBuilder.java:659)
org.apache.kafka.streams.processor.TopologyBuilder.addSource(TopologyBuilder.java:538)
org.apache.kafka.streams.kstream.KStreamBuilder.stream(KStreamBuilder.java:240)
org.apache.kafka.streams.kstream.KStreamBuilder.stream(KStreamBuilder.java:182)
io.confluent.ksql.physical.PhysicalPlanBuilder.buildSource(PhysicalPlanBuilder.java:422)
io.confluent.ksql.physical.PhysicalPlanBuilder.kafkaStreamsDsl(PhysicalPlanBuilder.java:110)
io.confluent.ksql.physical.PhysicalPlanBuilder.buildJoin(PhysicalPlanBuilder.java:497)
io.confluent.ksql.physical.PhysicalPlanBuilder.kafkaStreamsDsl(PhysicalPlanBuilder.java:112)
io.confluent.ksql.physical.PhysicalPlanBuilder.kafkaStreamsDsl(PhysicalPlanBuilder.java:104)
io.confluent.ksql.physical.PhysicalPlanBuilder.buildProject(PhysicalPlanBuilder.java:363)
io.confluent.ksql.physical.PhysicalPlanBuilder.kafkaStreamsDsl(PhysicalPlanBuilder.java:119)
io.confluent.ksql.physical.PhysicalPlanBuilder.kafkaStreamsDsl(PhysicalPlanBuilder.java:104)
io.confluent.ksql.physical.PhysicalPlanBuilder.buildOutput(PhysicalPlanBuilder.java:137)
io.confluent.ksql.physical.PhysicalPlanBuilder.kafkaStreamsDsl(PhysicalPlanBuilder.java:127)
io.confluent.ksql.physical.PhysicalPlanBuilder.kafkaStreamsDsl(PhysicalPlanBuilder.java:104)
io.confluent.ksql.physical.PhysicalPlanBuilder.buildPhysicalPlan(PhysicalPlanBuilder.java:100)
io.confluent.ksql.QueryEngine.buildQueryPhysicalPlan(QueryEngine.java:221)
io.confluent.ksql.QueryEngine.buildPhysicalPlans(QueryEngine.java:199)
io.confluent.ksql.KsqlEngine.planQueries(KsqlEngine.java:139)
io.confluent.ksql.KsqlEngine.buildMultipleQueries(KsqlEngine.java:125)
io.confluent.ksql.rest.server.resources.streaming.QueryStreamWriter.<init>(QueryStreamWriter.java:53)
io.confluent.ksql.rest.server.resources.streaming.StreamedQueryResource.streamQuery(StreamedQueryResource.java:68)
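Until self joins are supported, one possible workaround (a rough sketch only, untested) is to materialize the table under a second name, which gives it its own backing topic, and join against the copy:
CREATE TABLE pageviews_avg_1b_copy AS SELECT * FROM pageviews_avg_1b;
SELECT \
p1.view_count AS view_count_p1, \
p5.view_count AS view_count_p5 \
FROM \
pageviews_avg_1b p1 \
LEFT JOIN pageviews_avg_1b_copy p5 \
ON p1.min_viewtime = p5.max_viewtime;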
When running a query in KSQL such as:
SELECT * FROM users
and then pressing Ctrl-C, the message "Query terminated" appears. The wording is inappropriate because the query continues to run, as documented and as can be seen using SHOW QUERIES.
A better wording would be "Query output interrupted" or maybe "Query output stopped".
Hi there, I'm super excited for the potential ksql can bring. I'm curious to hear what the roadmap looks like for adding new serialization formats. In particular we would require protobuf support to integrate with our existing topics.
I have a simple select * from xyz running (where xyz is obtained from a simple create stream over a topic), and if there is no data being produced into the topic, it takes tens of seconds to get back to the CLI prompt after pressing Ctrl-C.
Hello, I am running the pageviews example. I don't get the same behaviour in local mode and in standalone mode. Did I miss something?
$ bin/ksql-cli local
.........
ksql> SELECT pageid FROM pageviews_original LIMIT 3;
Page_58
Page_30
Page_74
LIMIT reached for the partition.
Query terminated
ksql> exit
Exiting KSQL.
$ cat ./ksql-examples/src/main/resources/simpleQuery.sql
SELECT pageid FROM pageviews_original LIMIT 3;
$ bin/ksql-cli standalone ./ksql-examples/src/main/resources/simpleQuery.sql
io.confluent.ksql.exception.ParseFailedException: Parsing failed on KsqlEngine msg:PAGEVIEWS_ORIGINAL does not exist.
at io.confluent.ksql.KsqlEngine.parseQueries(KsqlEngine.java:204)
at io.confluent.ksql.cli.StandaloneExecutor.executeStatements(StandaloneExecutor.java:50)
at io.confluent.ksql.cli.commands.Standalone.run(Standalone.java:74)
Bests,
phil
OS: SUSE 12 SP2
NKG1000114449:/home/ksql/ksql-0.1.x/ksql-clickstream-demo/demo # ./ksql-tables-to-grafana.sh
Loading Clickstream-Demo TABLES to Confluent-Connect => Elastic => Grafana datasource
Logging to: /tmp/ksql-connect.log
Charting CLICK_USER_SESSIONS_TS
Charting USER_IP_ACTIVITY_TS
Charting CLICKSTREAM_STATUS_CODES_TS
Charting ENRICHED_ERROR_CODES_TS
Charting ERRORS_PER_MIN_ALERT_TS
Charting ERRORS_PER_MIN_TS
Charting EVENTS_PER_MIN_MAX_AVG_TS
Charting EVENTS_PER_MIN_TS
Charting PAGES_PER_MIN_TS
Navigate to http://localhost:3000/dashboard/db/click-stream-analysis
NKG1000114449:/home/ksql/ksql-0.1.x/ksql-clickstream-demo/demo # ./clickstream-analysis-dashboard.sh
Loading Grafana ClickStream Dashboard
{"slug":"click-stream-analysis","status":"success","version":5}
ksql> SELECT * FROM EVENTS_PER_MIN_TS LIMIT 5;
1505526580000 | 4^�� | 1505526580000 | 4 | 1 1505526580000 | 27^�
� | 1505526580000 | 27 | 2
1505526580000 | 2^�� | 1505526580000 | 2 | 1 1505526580000 | 15^�
� | 1505526580000 | 15 | 1
1505526580000 | 39^�`� | 1505526580000 | 39 | 1
LIMIT reached for the partition.
Query terminated
It seems there is no index in Elasticsearch:
NKG1000114449:/home/ksql/ksql-0.1.x/ksql-clickstream-demo/demo # curl -XGET 'localhost:9200/_cat/indices?v&pretty'
health status index uuid pri rep docs.count docs.deleted store.size pri.store.size
After a permission error I had to restart the ksql clickstream installation.
On re-running ksql-clickstream-demo/demo/clickstream-schema.sql, the exception
io.confluent.ksql.exception.KafkaTopicException: Topic 'EVENTS_PER_MIN' does not conform to the requirements Partitions:1 v 4. Replication: 1 v 1
was raised. At that point, I had no tables
ksql> show tables;
Table Name | Kafka Topic | Format | Windowed
but plenty of leftover topics
ksql> show topics;
Kafka Topic | Registered | Partitions | Partition Replicas
----------------------------------------------------------------------------
_schemas | false | 1 | 1
CLICK_USER_SESSIONS | false | 1 | 1
CLICK_USER_SESSIONS_TS | false | 1 | 1
clickstream | true | 1 | 1
clickstream_codes | false | 1 | 1
CLICKSTREAM_STATUS_CODES_TS | false | 1 | 1
clickstream_users | false | 1 | 1
connect-configs | false | 1 | 1
connect-offsets | false | 25 | 1
connect-statuses | false | 5 | 1
CUSTOMER_CLICKSTREAM | false | 1 | 1
ENRICHED_ERROR_CODES | false | 1 | 1
ENRICHED_ERROR_CODES_COUNT | false | 1 | 1
ENRICHED_ERROR_CODES_TS | false | 1 | 1
ERRORS_PER_MIN | false | 1 | 1
ERRORS_PER_MIN_ALERT | false | 1 | 1
ERRORS_PER_MIN_ALERT_TS | false | 1 | 1
ERRORS_PER_MIN_TS | false | 1 | 1
EVENTS_PER_MIN | false | 1 | 1
EVENTS_PER_MIN_MAX_AVG | false | 1 | 1
EVENTS_PER_MIN_MAX_AVG_TS | false | 1 | 1
EVENTS_PER_MIN_TS | false | 1 | 1
ksql__commands | true | 1 | 1
PAGES_PER_MIN | false | 1 | 1
PAGES_PER_MIN_TS | false | 1 | 1
USER_IP_ACTIVITY | false | 1 | 1
USER_IP_ACTIVITY_TS | false | 1 | 1
That could be recovered from by removing the topic and restarting the script; however, it then ran into a similar error with EVENTS_PER_MIN_TS. This time we attempted to simply increase the partitions to 4.
On the next run of the script, we now seem to fail when the EVENTS_PER_MIN table is dropped; in this case we run into
io.confluent.ksql.exception.ParseFailedException: Parsing failed on KsqlEngine msg:Cannot add the new data source. Another data source with the same name already exists: KsqlStream name:EVENTS_PER_MIN
In the previous step, the script attempts to drop the table; when you execute that manually, you fail with
-------------------------------------------------------------------------------
io.confluent.ksql.util.KsqlException: No topic with name true was registered.
in a scenario where
ksql> show tables;
Table Name | Kafka Topic | Format | Windowed
-----------------------------------------------------
EVENTS_PER_MIN | EVENTS_PER_MIN | JSON | true
and
ksql> show topics;
Kafka Topic | Registered | Partitions | Partition Replicas
----------------------------------------------------------------------------
...
...
EVENTS_PER_MIN | false | 4 | 1
...
Are these messages expected? Is there a better way to recover once you've got a similar failure?
I read a warning about not using KSQL in a production environment, and I would like to know what risks are involved. I would love to use KSQL in a production environment on an isolated topic with debug data not related to the operational status of production. I am completely fine with KSQL itself not behaving as expected; I would just like reassurance that using KSQL is not a threat to the Kafka cluster itself or to other streams.
Create a table:
command: CREATE table users_count as select user, count(*) from users group by user;
response: Table created and running
Drop it:
command: drop stream users_count;
response: Source USERS_COUNT was dropped
result: the users_count table is dropped
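For contrast, the behaviour one would expect (hedged, since this is exactly what the issue disputes) is:
drop stream users_count;   -- should be rejected: USERS_COUNT is a table, not a stream
drop table users_count;    -- should be the statement that drops the table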
First, I executed a CREATE STREAM AS SELECT statement, but received an error message:
ksql> CREATE STREAM test_timestamp2 WITH (kafka_topic='test_time',key='flag',partitions=3,replications=3,timestamp='genTime') as select flag,id,username,money,stringtotimestamp(create_time,'yyyy-MM-dd HH:mm:ss') as genTime, random from test1;
io.confluent.ksql.util.KsqlException: Invalid config variable in the WITH clause: KEY
Then I reviewed the source code and found that the actual property is PARTITION_BY.
The documentation is inconsistent with the code.
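Going by that reading of the source, the key presumably has to be set with a PARTITION BY clause on the SELECT rather than a KEY property in WITH. A hedged sketch of the same statement rewritten that way (untested; the partitions/replications properties are also dropped here, since their validity in the WITH clause is equally unclear):
CREATE STREAM test_timestamp2 WITH (kafka_topic='test_time', timestamp='genTime') AS \
SELECT flag, id, username, money, stringtotimestamp(create_time,'yyyy-MM-dd HH:mm:ss') AS genTime, random \
FROM test1 PARTITION BY flag;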
Refactor core into parser, metastore, and common to break down some of the code structure.
Apologies if this is not the right channel to ask about this but I would like to know if it is possible to create a query such as:
CREATE STREAM active_users AS SELECT user_id, user_name FROM user_created_events WHERE user_id NOT IN (SELECT user_id FROM user_deleted_events)
Or somehow implement this behaviour in a different way.
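One way this is sometimes approximated without subqueries is an anti-join: LEFT JOIN the deleted events and keep only rows with no match. A hedged sketch, assuming user_deleted_events can be joined on user_id (and that this join type is supported at all):
CREATE STREAM active_users AS \
SELECT c.user_id, c.user_name \
FROM user_created_events c \
LEFT JOIN user_deleted_events d ON c.user_id = d.user_id \
WHERE d.user_id IS NULL;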
I am trying to create tables from my own data via Connect / JDBC (JSON converted with the default /etc/kafka/connect-standalone.properties), and it seems the Connect output topic that I use as a source for the KSQL CREATE TABLE statement needs a key to make it work (I had to use a ValueToKey transform in Connect so the topic's key is not null). Otherwise, the table is created but SELECT * FROM table hangs.
I have also tried the WITH KEY option at table creation, without success. If I use CREATE STREAM I don't need this key.
Can you confirm or deny this need for a key in the source topic for table creation?
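For reference, this is the shape of statement being described; the table name, topic, and columns here are illustrative only, and the KEY property assumes the id column really is present as the message key in the topic:
CREATE TABLE my_jdbc_table (id BIGINT, name VARCHAR) \
WITH (kafka_topic='my_jdbc_topic', value_format='JSON', key='id');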
Given a BIGINT column, or an integer value, null is returned in the result set unless a mathematical operation is applied.
For example, looking at the example pageviews stream
ksql> select viewtime, abs(viewtime), abs (viewtime+0.0), abs (-1), abs (1), abs (-1.1), round (1), floor (1), ceil (1) from pageviews;
1506088116031 | null | 1.506088116031E12 | null | null | 1.1 | null | null | null
We would expect values to be returned for all columns, even if a value is already an absolute number (in the case of ABS) or an integer (in the case of FLOOR/CEIL/ROUND).
The CLI log shows:
[2017-09-22 13:48:36,283] ERROR Error calculating column with index 1 : KSQL_COL_1 (io.confluent.ksql.structured.SchemaKStream:180)
[2017-09-22 13:48:36,284] ERROR Error calculating column with index 3 : KSQL_COL_3 (io.confluent.ksql.structured.SchemaKStream:180)
[2017-09-22 13:48:36,284] ERROR Error calculating column with index 4 : KSQL_COL_4 (io.confluent.ksql.structured.SchemaKStream:180)
[2017-09-22 13:48:36,284] ERROR Error calculating column with index 6 : KSQL_COL_6 (io.confluent.ksql.structured.SchemaKStream:180)
[2017-09-22 13:48:36,284] ERROR Error calculating column with index 7 : KSQL_COL_7 (io.confluent.ksql.structured.SchemaKStream:180)
[2017-09-22 13:48:36,284] ERROR Error calculating column with index 8 : KSQL_COL_8 (io.confluent.ksql.structured.SchemaKStream:180)
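Until this is fixed, the one column that did return a value above suggests a workaround: force the argument to a double, either by adding 0.0 as in the example or with an explicit cast (a sketch, untested):
SELECT viewtime, ABS(CAST(viewtime AS DOUBLE)), FLOOR(CAST(1 AS DOUBLE)), CEIL(CAST(1 AS DOUBLE)), ROUND(CAST(1 AS DOUBLE)) FROM pageviews;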
I figured out the stop command, but after going through the demo, how to clean up the Grafana entries and stop the KSQL server were the first things I looked for but couldn't find in the docs.
Hi all,
I'm trying to run 'ksql-cli local' using another kafka server instance which runs on a different port than the default one (port 10092) and is not working.
Everytime when ksql-cli starts it uses the default port for the bootstrap.server (TCP:9092), even that in the config file I set another port (TCP:10092).
The setting was made after the ksql compilation and also before the compilation and in the both cases the result was the same.
The contents of the ksqlserver.properties config file:
cat config/ksqlserver.properties
bootstrap.servers=localhost:10092
application.id=ksql_server_quickstart
ksql.command.topic.suffix=commands
listeners=http://localhost:8080
The output of ./bin/ksql-run-class io.confluent.ksql.Ksql "local":
(io.confluent.ksql.util.KsqlConfig:223)
[2017-10-03 17:30:22,204] INFO KsqlConfig values:
application.id = ksql_
application.server =
bootstrap.servers = [localhost:9092]
buffered.records.per.partition = 1000
cache.max.bytes.buffering = 10000000
client.id =
commit.interval.ms = 2000
connections.max.idle.ms = 540000
default.key.serde = class org.apache.kafka.common.serialization.Serdes$ByteArraySerde
default.timestamp.extractor = class org.apache.kafka.streams.processor.FailOnInvalidTimestamp
default.value.serde = class org.apache.kafka.common.serialization.Serdes$ByteArraySerde
key.serde = null
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
num.standby.replicas = 0
num.stream.threads = 4
partition.grouper = class org.apache.kafka.streams.processor.DefaultPartitionGrouper
poll.ms = 100
processing.guarantee = at_least_once
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
replication.factor = 1
request.timeout.ms = 40000
retry.backoff.ms = 100
rocksdb.config.setter = null
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
state.cleanup.delay.ms = 600000
state.dir = /tmp/kafka-streams
timestamp.extractor = null
value.serde = null
windowstore.changelog.additional.retention.ms = 86400000
zookeeper.connect =
On creating tables in the CLI, we ran into
io.confluent.ksql.exception.KafkaResponseGetFailedException: Failed to guarantee existence of topic PAGEVIEWS_AVG_1
when creating a table like so:
CREATE TABLE pageviews_avg_1b \
AS SELECT \
userid, \
SUM(viewtime) AS sum_viewtime, \
MIN(viewtime) AS min_viewtime, \
MAX(viewtime) AS max_viewtime, \
COUNT(*) AS view_count \
FROM pageviews \
WINDOW SESSION (60 SECONDS) GROUP BY userid;
If we re-issued that statement, the table would create fine and SELECT worked.
This could be reproduced within the session (CREATE TABLE pageviews_avg_1c, etc.), and it also happened in another CLI session by another user. Exiting the CLI and starting it again healed the issue, and the tables then created straight away.
After a while, the issue started happening again in the session where it had just worked.
The CLI log showed
[2017-09-22 14:35:34,795] ERROR io.confluent.ksql.exception.KafkaResponseGetFailedException: Failed to guarantee existence of topic PAGEVIEWS_AVG_1A
at io.confluent.ksql.util.KafkaTopicClientImpl.createTopic(KafkaTopicClientImpl.java:63)
at io.confluent.ksql.structured.SchemaKStream.createSinkTopic(SchemaKStream.java:336)
at io.confluent.ksql.structured.SchemaKTable.into(SchemaKTable.java:70)
at io.confluent.ksql.structured.SchemaKTable.into(SchemaKTable.java:51)
at io.confluent.ksql.physical.PhysicalPlanBuilder.buildOutput(PhysicalPlanBuilder.java:208)
at io.confluent.ksql.physical.PhysicalPlanBuilder.kafkaStreamsDsl(PhysicalPlanBuilder.java:127)
at io.confluent.ksql.physical.PhysicalPlanBuilder.kafkaStreamsDsl(PhysicalPlanBuilder.java:104)
at io.confluent.ksql.physical.PhysicalPlanBuilder.buildPhysicalPlan(PhysicalPlanBuilder.java:100)
at io.confluent.ksql.QueryEngine.buildQueryPhysicalPlan(QueryEngine.java:221)
at io.confluent.ksql.QueryEngine.buildPhysicalPlans(QueryEngine.java:199)
at io.confluent.ksql.KsqlEngine.planQueries(KsqlEngine.java:139)
at io.confluent.ksql.KsqlEngine.buildMultipleQueries(KsqlEngine.java:125)
at io.confluent.ksql.rest.server.computation.StatementExecutor.startQuery(StatementExecutor.java:331)
at io.confluent.ksql.rest.server.computation.StatementExecutor.executeStatement(StatementExecutor.java:274)
at io.confluent.ksql.rest.server.computation.StatementExecutor.handleStatementWithTerminatedQueries(StatementExecutor.java:215)
at io.confluent.ksql.rest.server.computation.StatementExecutor.handleStatement(StatementExecutor.java:114)
at io.confluent.ksql.rest.server.computation.CommandRunner.executeStatement(CommandRunner.java:109)
at io.confluent.ksql.rest.server.computation.CommandRunner.run(CommandRunner.java:75)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TopicExistsException: Topic 'PAGEVIEWS_AVG_1A' already exists.
at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:213)
at io.confluent.ksql.util.KafkaTopicClientImpl.createTopic(KafkaTopicClientImpl.java:61)
... 18 more
Caused by: org.apache.kafka.common.errors.TopicExistsException: Topic 'PAGEVIEWS_AVG_1A' already exists.
(io.confluent.ksql.rest.server.computation.StatementExecutor:220)
It would be easy to implement a web client for the KSQL server if SSE and CORS were supported.
SSE provides a better abstraction for representing streaming structured data (JSON) in the browser or server-side (WebClient).
CORS helps to deploy the KSQL server and web clients on separate hosts.
First pass, rename ksql-core to ksql-engine
Once you start a ksql-server, you can do cool REST API calls, such as running a simple request
## Simple KSQL
curl -X "POST" "http://server:port/ksql";; \
-H "Content-Type: application/json; charset=utf-8" \
-d $'{
"ksql": "LIST STREAMS;",
"streamsProperties": {}
}'
or retrieving streaming data
## Stream Query
curl -X "POST" "http://server:port/query";; \
-H "Content-Type: application/json; charset=utf-8" \
-d $'{
"ksql": "SELECT * FROM TEST_A;",
"streamsProperties": {}
}'
This is a neat way to query KSQL from elsewhere, particularly for ad-hoc information. Even better when it runs under https. Some authorization/authentication capabilities in future would also be valuable.
In the documentation, the REST component only gets a short mention, and it's used internally by the CLI and in quickstart.html. Is it planned to support it in the production release?
It's natural to want to create a stream, but what data exists in a topic? It may be completely free form without a schema.
How do I sample the topic a bit to see what data is coming through in order to know what I want to create?
Perhaps to start just something simple like:
select * from mytopic limit 5;
A more powerful query would be something like:
select random_sample(*, 100, 0) from mytopic;
selecting all the columns and 100 randomly sampled messages starting at offset 0.
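In the meantime, one crude way to eyeball an unknown topic from inside KSQL (a sketch; it treats each message as a single text column, and messages containing the delimiter may not parse cleanly) is to register the topic as a one-column DELIMITED stream:
CREATE STREAM mytopic_raw (payload VARCHAR) WITH (kafka_topic='mytopic', value_format='DELIMITED');
SELECT payload FROM mytopic_raw LIMIT 5;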
I have a few issues running ksql-server-start in a Docker environment:
- ksql-server-start /etc/ksql/ksqlserver.properties ignores the KSQL_CONFIG_DIR environment variable.
- Setting the bootstrap.servers property via an environment variable (STREAMS_BOOTSTRAP_SERVERS) has no effect, which is inconsistent with the statement at https://github.com/confluentinc/ksql/blob/master/docs/concepts.md
Use with default settings:
./bin/ksql-server-start
version: '2.1'
services:
zookeeper:
image: confluentinc/cp-zookeeper
environment:
ZOOKEEPER_CLIENT_PORT: 2181
volumes:
- zookeeper:/var/lib/zookeeper
networks:
- reactive-network
kafka:
image: confluentinc/cp-kafka
environment:
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
KAFKA_NUM_PARTITIONS: 1
KAFKA_DEFAULT_REPLICATION_FACTOR: 1
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_DELETE_TOPIC_ENABLE: "true"
volumes:
- kafka:/var/lib/kafka
networks:
- reactive-network
links:
- zookeeper
depends_on:
- zookeeper
ksql-server:
image: confluentinc/ksql-cli
ports:
- 9098:8080
networks:
- reactive-network
links:
- kafka
depends_on:
- kafka
environment:
KSQL_CONFIG_DIR: "/etc/ksql"
STREAMS_BOOTSTRAP_SERVERS: kafka:9092
entrypoint: 'ksql-server-start /etc/ksql/ksqlserver.properties'
networks:
reactive-network:
driver: bridge
volumes:
zookeeper:
kafka:
Currently, the LIST STREAMS; statement lists both streams and tables, and includes a column in the output that specifies which of the two each listed item is. Couldn't we limit the LIST STREAMS; statement to only listing streams, and add a new LIST TABLES; statement which then lists tables (and also includes state store information)? @hjafarpour maybe you can provide some insight into why LIST STREAMS; works as it currently does?
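The proposed split would look something like this at the prompt (illustrative only):
ksql> LIST STREAMS;  -- streams only
ksql> LIST TABLES;   -- tables only, plus state store information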
The following test fails with a NullPointerException when added to the KQLParserTest.java class:
@Test
public void testMultipleJoins() throws Exception {
  // Copy the shared metastore so this test can add its own topic and source.
  MetaStore tempMetaStore = new MetaStoreImpl();
  for (KQLTopic kqlTopic : metaStore.getAllKQLTopics().values()) {
    tempMetaStore.putTopic(kqlTopic);
  }
  for (StructuredDataSource dataSource : metaStore.getAllStructuredDataSources().values()) {
    tempMetaStore.putSource(dataSource);
  }
  // Register a third stream, TEST3, to join against.
  SchemaBuilder schemaBuilder = SchemaBuilder.struct()
      .field("COL0", SchemaBuilder.INT64_SCHEMA)
      .field("COL1", SchemaBuilder.FLOAT64_SCHEMA)
      .field("COL2", SchemaBuilder.STRING_SCHEMA)
      .field("COL3", SchemaBuilder.INT64_SCHEMA);
  KQLTopic kqlTopic = new KQLTopic("TEST3", "test3", new KQLJsonTopicSerDe());
  KQLStream kqlStream = new KQLStream("TEST3", schemaBuilder, schemaBuilder.field("COL0"), kqlTopic);
  tempMetaStore.putTopic(kqlTopic);
  tempMetaStore.putSource(kqlStream);
  // A query whose left side is itself a parenthesized join.
  String queryStr =
      "SELECT t1.col1, t2.col2, t3.col3 FROM "
      + "(test1 t1 LEFT JOIN test2 t2 ON t1.col1 = t2.col1) "
      + "LEFT JOIN test3 t3 ON t1.col1 = t3.col1;";
  Statement statement = kqlParser.buildAST(queryStr, tempMetaStore).get(0);
  Assert.assertTrue("testSimpleQuery fails", statement instanceof Query);
  Query query = (Query) statement;
  Assert.assertTrue("testTripleJoin fails", query.getQueryBody() instanceof QuerySpecification);
}
The problem appears to be in the DataSourceExtractor.visitAliasedRelation() method, where it is assumed that the relationPrimary component of an aliasedRelation rule is a qualifiedName, when in fact it can also be another relation contained between parentheses, like (test1 t1 LEFT JOIN test2 t2 ON t1.col1 = t2.col1). The result is that this relationPrimary is then visited as a joinRelation, which causes the visitJoinRelation() method to be called, which then returns null all the way up the call chain back to the original invocation of visitAliasedRelation, which then causes the NPE when the result (assumed to be a Table, actually null) is accessed.
Initially we ran into this on one of our own queries. We have reproduced it with the pageviews table from the quickstart:
CREATE STREAM pageviews \
(viewtime BIGINT, \
userid VARCHAR, \
pageid VARCHAR) \
WITH (kafka_topic='pageviews', \
value_format='DELIMITED', \
key='pageid', \
timestamp='viewtime');
create table pageviews_avg as select userid, sum(viewtime) / count(*) as avg_viewtime, min(viewtime) as min_viewtime, max(viewtime) as max_viewtime, count(*) as view_count from pageviews window session (60 seconds) group by userid;
SELECT p.userid, p.viewtime, pa.avg_viewtime FROM pageviews p LEFT JOIN pageviews_avg pa ON p.userid = pa.userid;
On the SELECT statement, KSQL errors with
java.lang.String cannot be cast to org.apache.kafka.streams.kstream.Windowed
Query terminated
and the CLI log shows these errors
[2017-09-22 12:23:20,633] ERROR stream-thread [ksql_transient_7722472958265653440_1506082995954-66bcd070-20e1-4769-a599-43dfcaa2d723-StreamThread-47] Streams application error during processing: (org.apache.kafka.streams.processor.internals.StreamThread:527)
java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.kafka.streams.kstream.Windowed
at org.apache.kafka.streams.kstream.internals.WindowedSerializer.serialize(WindowedSerializer.java:33)
at org.apache.kafka.streams.state.StateSerdes.rawKey(StateSerdes.java:173)
at org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:134)
at org.apache.kafka.streams.kstream.internals.KStreamReduce$KStreamReduceValueGetter.get(KStreamReduce.java:113)
at org.apache.kafka.streams.kstream.internals.KStreamKTableJoinProcessor.process(KStreamKTableJoinProcessor.java:56)
at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:47)
at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:187)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:82)
at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:80)
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:190)
at org.apache.kafka.streams.processor.internals.StreamThread.processAndPunctuate(StreamThread.java:671)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:549)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:519)
[2017-09-22 12:23:20,660] WARN stream-thread [ksql_transient_7722472958265653440_1506082995954-66bcd070-20e1-4769-a599-43dfcaa2d723-StreamThread-47] Unexpected state transition from RUNNING to DEAD. (org.apache.kafka.streams.processor.internals.StreamThread:985)
[2017-09-22 12:23:21,028] ERROR Exception occurred while writing to connection stream: (io.confluent.ksql.rest.server.resources.streaming.QueryStreamWriter:105)
java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.kafka.streams.kstream.Windowed
at org.apache.kafka.streams.kstream.internals.WindowedSerializer.serialize(WindowedSerializer.java:33)
at org.apache.kafka.streams.state.StateSerdes.rawKey(StateSerdes.java:173)
at org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:134)
at org.apache.kafka.streams.kstream.internals.KStreamReduce$KStreamReduceValueGetter.get(KStreamReduce.java:113)
at org.apache.kafka.streams.kstream.internals.KStreamKTableJoinProcessor.process(KStreamKTableJoinProcessor.java:56)
at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:47)
at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:187)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:82)
at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:80)
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:190)
at org.apache.kafka.streams.processor.internals.StreamThread.processAndPunctuate(StreamThread.java:671)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:549)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:519)
[2017-09-22 12:23:21,032] WARN stream-thread [ksql_transient_7722472958265653440_1506082995954-66bcd070-20e1-4769-a599-43dfcaa2d723-StreamThread-47] Unexpected state transition from DEAD to PENDING_SHUTDOWN. (org.apache.kafka.streams.processor.internals.StreamThread:985)
[2017-09-22 12:23:21,036] ERROR java.lang.String cannot be cast to org.apache.kafka.streams.kstream.Windowed
org.apache.kafka.streams.kstream.internals.WindowedSerializer.serialize(WindowedSerializer.java:33)
org.apache.kafka.streams.state.StateSerdes.rawKey(StateSerdes.java:173)
org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:134)
org.apache.kafka.streams.kstream.internals.KStreamReduce$KStreamReduceValueGetter.get(KStreamReduce.java:113)
org.apache.kafka.streams.kstream.internals.KStreamKTableJoinProcessor.process(KStreamKTableJoinProcessor.java:56)
org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:47)
org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:187)
org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:82)
org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:80)
org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:190)
org.apache.kafka.streams.processor.internals.StreamThread.processAndPunctuate(StreamThread.java:671)
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:549)
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:519)
(io.confluent.ksql.cli.console.Console:130)
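One hedged workaround sketch while windowed keys break the join: build the aggregate table without the WINDOW clause, so that its key stays a plain string and the stream-table join should not hit the Windowed cast (untested; the _nowindow name is illustrative):
create table pageviews_avg_nowindow as select userid, sum(viewtime) / count(*) as avg_viewtime from pageviews group by userid;
SELECT p.userid, p.viewtime, pa.avg_viewtime FROM pageviews p LEFT JOIN pageviews_avg_nowindow pa ON p.userid = pa.userid;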
It would be great to have the ability to get the KSQL definition of existing tables and streams.
Use case: experimenting in development and moving structures to production.
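In the meantime, DESCRIBE at least shows the column schema of an existing object, though not the full CREATE statement, e.g.:
ksql> DESCRIBE pageviews_avg;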
We had to cycle our KSQL dev box. Sadly, KSQL hadn't been played with for a few days; after the server maintenance we attempted to go back into the CLI in local mode (non-dockerized), which showed no response or error after launching.
The CLI started to repeat
[2017-09-21 15:17:33,781] WARN No records received after 30 seconds of polling; something may be wrong (io.confluent.ksql.rest.server.computation.CommandStore:153)
After updating our build to the latest release, the issue persisted. We then deleted test topics in this order:
ksql_transient*
ksql_transient_528162554338760763_1505245884471-TEST_Atransient_-changelog
ksql_transient_528162554338760763_1505245884471-TEST_Atransient_-repartition
ksql_transient_6259632277308348632_1505245661426-TEST_Atransient_-changelog
ksql_transient_6259632277308348632_1505245661426-TEST_Atransient_-repartition
ksql_transient_7529857592400492031_1505246139152-TEST_Atransient_-changelog
ksql_transient_7529857592400492031_1505246139152-TEST_Atransient_-repartition
ksql_query_1-KSQL_Agg_Query*
ksql_query_1-KSQL_Agg_Query_1505245639135-changelog
ksql_query_1-KSQL_Agg_Query_1505245639135-repartition
and finally
ksql__commands
The CLI was stopped and started after each of the three groups of topics. After deleting ksql__commands (the topic was empty) and restarting the CLI, we got to the welcome screen and command prompt again.
A full log file is here ksql_hung.txt
I could not reproduce the issue after that, will add info if a pattern emerges.
I compiled SqlBase.g4 with ANTLR, but could not find SqlBaseBaseVisitor. What am I still missing?
When compiling/packaging using mvn
, it is not possible to skip compiling and running the unit tests (via mvn package -Dmaven.test.skip=true
) as attempts to do so yield the following:
[ERROR] Failed to execute goal on project ksql-cli: Could not resolve dependencies for project io.confluent.ksql:ksql-cli:jar:0.1-SNAPSHOT: Could not find artifact io.confluent.ksql:ksql-core:jar:tests:0.1-SNAPSHOT in confluent (http://packages.confluent.io/maven/) -> [Help 1]
It's a bit tedious to have to run the unit tests each time as they take a few minutes for each run.
Let me know if there's a way to get around having to sit through the test execution each time.
Thanks.