Comments (13)
@AleksandarTokarev This looks like duplicates caused by the at-least-once semantics (ALOS) that KSQL guarantees by default. Under ALOS, when a failure happens, records might be written multiple times to an aggregate or to an output topic [1]. To solve your issue, you could try running the application under exactly-once semantics (EOS). I am also wondering whether in your specific case it would suffice to use COLLECT_SET [2] instead of COLLECT_LIST.
[1] https://docs.ksqldb.io/en/latest/operate-and-deploy/exactly-once-semantics/#at-least-once-semantics
[2] https://docs.ksqldb.io/en/latest/developer-guide/ksqldb-reference/aggregate-functions/#collect_set
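To make the at-least-once failure mode concrete, here is a minimal Python sketch. It is a toy model, not ksqlDB code: it assumes a hypothetical processor that persists its aggregate update before committing the input offset, so a crash between those two steps causes the record to be replayed after restart. The function and variable names are made up for illustration.

```python
def run(records, make_agg, add, crash_after=None):
    """Toy at-least-once loop: state is updated before the offset is committed."""
    agg = make_agg()
    committed = 0      # offset to resume from after a restart
    offset = 0
    crashed = False
    while offset < len(records):
        add(agg, records[offset])           # 1. aggregate update is persisted
        if crash_after == offset and not crashed:
            crashed = True
            offset = committed              # 2. crash before the commit: replay
            continue
        committed = offset + 1              # 3. offset commit
        offset = committed
    return agg


records = ["tomatoes", "potatoes"]
as_list = run(records, list, lambda agg, r: agg.append(r), crash_after=1)
as_set = run(records, set, lambda agg, r: agg.add(r), crash_after=1)

print(as_list)  # the replayed record appears twice
print(as_set)   # the replay is absorbed because the value is identical
```

In this model the list aggregate keeps the replayed record twice, while the set aggregate only hides the replay because the replayed value is exactly equal to the original one.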
from ksql.
@cadonna Is it possible that the items are being retained by COLLECT_LIST, given that those stale/old items are no longer in the ITEMS_TABLE table?
Also, I assume these ALOS semantics would apply at the level of the application that writes to ITEMS_TABLE?
And how would COLLECT_SET know which comparator to use and how to pick the latest item (in our case, the one with the newest updatedTime)?
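To illustrate the comparator question: as far as I understand, COLLECT_SET has no notion of "latest"; it keeps distinct values, so two structs that differ only in updatedTime would count as different elements. The Python sketch below contrasts that with a custom "latest per id" deduplication. The helper names and sample values are hypothetical, and tuples stand in for ksqlDB structs.

```python
def collect_set(items):
    """Keep distinct items in first-seen order; equality covers every field."""
    seen, out = set(), []
    for item in items:
        if item not in seen:
            seen.add(item)
            out.append(item)
    return out


def latest_per_id(items):
    """Custom dedup: one entry per id, keeping the largest updatedTime."""
    newest = {}
    for item_id, name, updated_time in items:
        if item_id not in newest or updated_time > newest[item_id][2]:
            newest[item_id] = (item_id, name, updated_time)
    return list(newest.values())


# (id, name, updatedTime); made-up sample values
items = [("p1", "Potatoes", 100), ("p1", "Potatoes", 200), ("t1", "Tomatoes", 150)]
distinct = collect_set(items)    # both Potatoes versions survive
deduped = latest_per_id(items)   # only the newest Potatoes version survives
```

Picking the newest item per id therefore needs logic that knows which field is the identity and which is the timestamp, which a generic set cannot infer.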
from ksql.
Now I realize that updatedTime is different for the "duplicates". I am sorry for that!
Could you please post a minimal example that reproduces what you experience?
More specifically:
- the CREATE statement for ITEMS_TABLE (BTW, in step 2 above it should be ITEMS_TABLE, right?)
- when to restart ksql to reproduce the situation
from ksql.
Yes, updatedTime is different for the records; I guess you missed that.
The actual CREATE statement above is for ITEMS_PER_USER_TABLE (pasting it below). I will need to find the one for ITEMS_TABLE.
CREATE OR REPLACE TABLE ITEMS_PER_USER_TABLE
  WITH (PARTITIONS = 12, VALUE_FORMAT = 'JSON') AS
  SELECT userId, COLLECT_LIST(
    STRUCT("name":=name, "price":=price, "id":=id, "updatedTime":=updatedTime))
    AS items
  FROM ITEMS_TABLE
  GROUP BY userId
  EMIT CHANGES;
These duplicated/retained items occur when we have ksqlDB issues. What is stranger to me is that even if the issues last only a few moments, the stale records are gone from ITEMS_TABLE, and the actual new records are constantly being updated (hence the changing updatedTime). My assumption was that these duplicated/retained items would go away, but they do not: they stay, while the updated ones keep being updated.
from ksql.
At the moment, I do not understand why the updatedTime of the duplicates has values that do not exist in ITEMS_TABLE.
from ksql.
They do not exist because I have not included them above (and they do not exist in the beginning). Over time, the items are updated in ITEMS_TABLE.
After some time the new values exist, but the old ones are gone from ITEMS_TABLE. The main issue we have is with ITEMS_PER_USER_TABLE: the new ones are there and updating, but we are also seeing some stale ones, which in my opinion should not appear at all because COLLECT_LIST is collecting the items from ITEMS_TABLE.
from ksql.
I see. We need a minimal example to reproduce the situation; otherwise it is just guessing. I tried to reproduce it but was not able to.
from ksql.
What else can I provide in order to be more helpful?
from ksql.
Regarding the logs that I see in our Datadog system for ksql when these issues occur: they are not really helpful. Let me know if anything makes sense to you. I can try to look for more logs/data.
from ksql.
OK @cadonna, some more follow-up on the issues.
Here are the CREATE TABLE statements for the other tables.
ITEMS_TABLE
CREATE OR REPLACE TABLE ITEMS_TABLE WITH (CLEANUP_POLICY='compact', KAFKA_TOPIC='ITEMS_TABLE',
PARTITIONS=12, REPLICAS=3, RETENTION_MS=600000) AS SELECT * FROM ITEMS_TABLE_INITIAL EMIT CHANGES;
It seems there is one more table backing this ITEMS_TABLE, the initial one. Here is the DDL:
ITEMS_TABLE_INITIAL
CREATE TABLE ITEMS_TABLE_INITIAL (ID STRING PRIMARY KEY, USERID BIGINT, NAME STRING, PRICE DOUBLE, UPDATEDTIME) WITH (KAFKA_TOPIC='items.active', KEY_FORMAT='KAFKA', VALUE_FORMAT='JSON');
Also, it seems that these issues occurred when there was an OOM in ksqlDB. Here is one of the logs from when the OOM occurred:
Process _confluent-ksql (pid: 1886306) triggered an OOM kill on itself. The process had reached 2560000 pages in size.
This OOM kill was invoked by a cgroup, containerID: xxxxx.
A few things that come to mind:
- The OOM itself is concerning; this should not happen in the first place. We might want to increase the RAM.
- Do you think the issue could come from the fact that we are not using the INITIAL table, but the one created from it?
Thanks a lot
from ksql.
@AleksandarTokarev The best thing would be if you could provide a reproduction with a docker-compose setup. You can find a docker-compose setup that you can reuse under the following link: https://ksqldb.io/quickstart.html#quickstart-content
Once we can reproduce the issue, it gets easier to find the root cause. Otherwise, it is a lot of crystal balling.
from ksql.
@cadonna We added exactly-once as a server parameter and it was fine for about 2 weeks. Today we upgraded to ARM instances, and during the upgrade there was an issue with the volumes/resources and the bug occurred again.
We are thinking of writing a custom UDAF that would deduplicate the list in the aggregate step.
Is there a proper example of how to do that, or maybe some further hints?
Also, I am not sure I understand: how are the merge and undo functions used in the COLLECT_LIST example?
https://github.com/confluentinc/ksql/blob/master/ksqldb-engine/src/main/java/io/confluent/ksql/function/udaf/array/CollectListUdaf.java#L104-L116
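For context on the undo and merge question: in a table aggregation, an update to a source row is typically applied as an undo of the old value followed by an aggregate of the new one, while merge combines two partial aggregates. The Python sketch below is a simplified model of that contract, not a translation of the linked Java class. The function names mirror the UDAF method names, `apply_update` is a hypothetical helper, and removing the last matching occurrence in `undo` is an assumption about how a list-based undo could behave.

```python
def aggregate(value, agg):
    """Add a new row value to the list aggregate."""
    return agg + [value]


def undo(value, agg):
    """Retract a row value: drop its last occurrence, if present."""
    for i in range(len(agg) - 1, -1, -1):
        if agg[i] == value:
            return agg[:i] + agg[i + 1:]
    return agg  # nothing to retract


def merge(agg_one, agg_two):
    """Combine two partial aggregates."""
    return agg_one + agg_two


def apply_update(agg, old_value, new_value):
    """Model a source-table update: undo the old row, then add the new one."""
    if old_value is not None:
        agg = undo(old_value, agg)
    return aggregate(new_value, agg)


agg = []
agg = apply_update(agg, None, ("Potatoes", 0))
agg = apply_update(agg, ("Potatoes", 0), ("Potatoes", 1))  # normal update
clean = list(agg)
agg = apply_update(agg, None, ("Potatoes", 2))             # old value unknown
stale = list(agg)
```

In this model a stale entry stays in the list whenever the new value arrives without the matching old value, because nothing triggers the undo.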
from ksql.
@cadonna I think I have finally found a way to reproduce this.
It does not reproduce every time, but it happens fairly often.
docker compose file
version: "3"
networks:
  kafka-net:
    name: kafka-net
    driver: bridge
services:
  zookeeper:
    image: docker.io/bitnami/zookeeper:3.8
    container_name: zookeeper
    restart: unless-stopped
    networks:
      - kafka-net
    ports:
      - "2181:2181"
    environment:
      ALLOW_ANONYMOUS_LOGIN: "yes"
  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    ports:
      - 8080:8080
    networks:
      - kafka-net
    environment:
      - KAFKA_CLUSTERS_0_NAME=local
      - KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka:29092
      - KAFKA_CLUSTERS_0_ZOOKEEPER=zookeeper:2181
    depends_on:
      - kafka
  kafka:
    image: docker.io/bitnami/kafka:3
    container_name: kafka
    restart: unless-stopped
    networks:
      - kafka-net
    ports:
      - "9092:9092"
    environment:
      ALLOW_PLAINTEXT_LISTENER: "yes"
      KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: DOCKER_INTERNAL:PLAINTEXT,DOCKER_EXTERNAL:PLAINTEXT
      KAFKA_CFG_LISTENERS: DOCKER_INTERNAL://:29092,DOCKER_EXTERNAL://:9092
      KAFKA_CFG_ADVERTISED_LISTENERS: DOCKER_INTERNAL://kafka:29092,DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092
      KAFKA_CFG_INTER_BROKER_LISTENER_NAME: DOCKER_INTERNAL
      KAFKA_CFG_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_CFG_BROKER_ID: 1
      KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE: "true"
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 100
    depends_on:
      - zookeeper
  kafka-ksqldb:
    image: confluentinc/ksqldb-server:0.29.0
    container_name: kafka-ksqldb
    restart: unless-stopped
    networks:
      - kafka-net
    ports:
      - "8088:8088"
    environment:
      KSQL_BOOTSTRAP_SERVERS: PLAINTEXT://kafka:29092
      KSQL_LISTENERS: http://0.0.0.0:8088/
      KSQL_KSQL_SERVICE_ID: kafka-ksqldb
      KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: "true"
      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: "true"
      KSQL_KSQL_EXTENSION_DIR: "/data/udfs"
    volumes:
      - ./data/kafka-ksqldb-data/scripts:/data/scripts/
      - ./data/kafka-ksqldb-data/udfs:/data/udfs/
    depends_on:
      - kafka
Steps:
a) Create topic users.items
b) Create table on top of the topic
CREATE TABLE USERS_ITEMS_INITIAL (F_KEY STRING PRIMARY KEY, USERID BIGINT, NAME STRING, DESCRIPTION STRING, AMOUNT DOUBLE, UPDATED BIGINT) WITH (KAFKA_TOPIC='users.items', KEY_FORMAT='KAFKA', VALUE_FORMAT='JSON');
c) Use a COLLECT_LIST
CREATE TABLE IF NOT EXISTS ITEMS_PER_USER_TABLE WITH (KAFKA_TOPIC='ITEMS_PER_USER_TABLE', PARTITIONS=3, VALUE_FORMAT='JSON') AS SELECT USERS_ITEMS_INITIAL.USERID USERID,
COLLECT_LIST(STRUCT(`NAME`:=USERS_ITEMS_INITIAL.NAME, `DESCRIPTION`:=USERS_ITEMS_INITIAL.DESCRIPTION, `AMOUNT`:=USERS_ITEMS_INITIAL.AMOUNT, `UPDATED`:=USERS_ITEMS_INITIAL.UPDATED)) ACTIVE_ITEMS
FROM USERS_ITEMS_INITIAL
GROUP BY USERS_ITEMS_INITIAL.USERID
EMIT CHANGES;
d) Insert some records into the users.items topic:
Key
10-T-TOM
Value
{
"name": "TOMATOES",
"description": "They are tasty",
"amount": 1,
"updated": 1905906822314,
"userId": 10
}
Key
10-T-TOM
Value
{
"name": "TOMATOES",
"description": "They are tasty",
"amount": 2,
"updated": 1925906822314,
"userId": 10
}
Key
10-T-TOM
Value
{
"name": "TOMATOES",
"description": "They are tasty",
"amount": 3,
"updated": 1935906822314,
"userId": 10
}
Key
10-T-POT
Value
{
"name": "Potatoes",
"description": "They are tasty too",
"amount": 0,
"updated": 1905906822314,
"userId": 10
}
Key
10-T-POT
Value
{
"name": "Potatoes",
"description": "They are tasty too",
"amount": 1,
"updated": 1915906822314,
"userId": 10
}
After the last one, I ended up with 2 Potatoes records in the resulting table, which is not what I want:
{
"ACTIVE_ITEMS": [
{
"NAME": "TOMATOES",
"DESCRIPTION": "They are tasty",
"AMOUNT": 3,
"UPDATED": 1935906822314
},
{
"NAME": "Potatoes",
"DESCRIPTION": "They are tasty too",
"AMOUNT": 0,
"UPDATED": 1905906822314
},
{
"NAME": "Potatoes",
"DESCRIPTION": "They are tasty too",
"AMOUNT": 1,
"UPDATED": 1915906822314
}
]
}
DISCLAIMER: This does not always happen, and not always for the same records; it is intermittent.
I would appreciate any help.
UPDATE 1: It seems this happens when the same message key goes into different partitions. I wonder why, and whether this could happen in the failure scenarios (like OOM, volume issues, etc.).
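The observation in UPDATE 1 fits a simple model: if each source partition keeps its own key-to-latest-value state, a record whose previous version lives in another partition arrives with no known old value, so nothing is retracted from the collected list. The Python sketch below simulates that with the two Potatoes records from the steps above. It is a toy model rather than a description of ksqlDB internals; the partition numbers and the `collect_per_user` helper are hypothetical.

```python
from collections import defaultdict


def collect_per_user(records):
    """records: (partition, key, user_id, value) tuples in arrival order."""
    table_state = defaultdict(dict)   # partition -> {key: (user_id, value)}
    lists = defaultdict(list)         # user_id -> collected values
    for partition, key, user_id, value in records:
        old = table_state[partition].get(key)
        if old is not None:
            old_user, old_value = old
            if old_value in lists[old_user]:
                lists[old_user].remove(old_value)   # retract the old row
        lists[user_id].append(value)                # add the new row
        table_state[partition][key] = (user_id, value)
    return dict(lists)


# (name, amount, updated) taken from the Potatoes records above
pot_v0 = ("Potatoes", 0, 1905906822314)
pot_v1 = ("Potatoes", 1, 1915906822314)

same_partition = collect_per_user(
    [(0, "10-T-POT", 10, pot_v0), (0, "10-T-POT", 10, pot_v1)])
split_partitions = collect_per_user(
    [(0, "10-T-POT", 10, pot_v0), (1, "10-T-POT", 10, pot_v1)])
```

With both records in one partition the list ends with only the newest Potatoes entry; when the second record lands in a different partition, both entries remain, which matches the output shown above. Kafka's default partitioner hashes the record key, so one key normally maps to one partition as long as the partition count and the partitioning strategy of every producer stay the same.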
from ksql.