
Comments (13)

cadonna commented on May 30, 2024

@AleksandarTokarev This looks like duplicates caused by the at-least-once semantics (ALOS) that KSQL guarantees by default. Under ALOS, when a failure happens, records might be written multiple times to an aggregate or to an output topic [1]. To solve your issue, you could try running the application under exactly-once semantics (EOS). I am also wondering whether, in your specific case, it would suffice to use COLLECT_SET [2] instead of COLLECT_LIST.

[1] https://docs.ksqldb.io/en/latest/operate-and-deploy/exactly-once-semantics/#at-least-once-semantics
[2] https://docs.ksqldb.io/en/latest/developer-guide/ksqldb-reference/aggregate-functions/#collect_set
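
To make the list-versus-set distinction concrete, here is a minimal plain-Python sketch. It is not ksqlDB code: the function names and the sample records are made up for illustration, and the redelivery is simulated by simply repeating a record.

```python
def collect_list(records):
    """Append every record, so a redelivered record shows up twice."""
    out = []
    for record in records:
        out.append(record)
    return out


def collect_set(records):
    """Keep only distinct records, preserving first-seen order."""
    out = []
    for record in records:
        if record not in out:
            out.append(record)
    return out


# Hypothetical input: the last record is delivered a second time,
# as can happen under at-least-once processing after a failure.
records = [
    ("TOMATOES", 1),
    ("Potatoes", 0),
    ("Potatoes", 0),  # simulated redelivery
]

print(collect_list(records))  # three entries, including the repeat
print(collect_set(records))   # two entries, the repeat is dropped
```

Note that a set only removes exact duplicates. Two entries that differ in any field are both kept.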

from ksql.

AleksandarTokarev commented on May 30, 2024

@cadonna Is it possible that the items are being retained by COLLECT_LIST? Those stale/old items are no longer in ITEMS_TABLE.

Also, I assume these ALOS semantics would apply at the level of the application that writes to ITEMS_TABLE?
And how would COLLECT_SET know which comparator to use and how to pick the latest item (in our case, the one with the newest updatedTime)?


cadonna commented on May 30, 2024

Now I realize that updatedTime is different for the "duplicates". I am sorry for that!
Could you please post a minimal example that reproduces what you experience?
More specifically:

  • the CREATE-statement for ITEMS_TABLE (BTW, in step 2 above it should be ITEMS_TABLE, right?)
  • when to restart ksql to reproduce the situation


AleksandarTokarev commented on May 30, 2024

Yes, the updatedTime is different for the records; I guess you missed that.
The CREATE statement above is actually for ITEMS_PER_USER_TABLE (pasting it below). I will need to find the one for ITEMS_TABLE.

CREATE or replace table ITEMS_PER_USER_TABLE
    WITH (PARTITIONS = 12, VALUE_FORMAT = 'JSON') AS
SELECT userId, COLLECT_LIST(
        STRUCT("name":=name, "price":=price, "id":=id, "updatedTime":=updatedTime))
        AS items
FROM ITEMS_TABLE
GROUP BY userId
EMIT CHANGES;

These duplicated/retained items appear when we have KSQLDB issues. What is stranger to me is that, even if the issues last only a few moments, the stale records are gone from ITEMS_TABLE while the actual new records keep being updated (hence the changing updatedTime). My assumption was that the duplicated/retained items would go away, but they do not: they stay, while the current ones keep being updated.


cadonna commented on May 30, 2024

At the moment, I do not understand why the duplicates have updatedTime values that do not exist in ITEMS_TABLE.


AleksandarTokarev commented on May 30, 2024

They do not exist because I have not included them above (and they do not exist in the beginning) - with time the items are being updated in ITEMS_TABLE.
After some time they exist, but the old ones are gone from ITEMS_TABLE. The main issue we have is with ITEMS_PER_USER_TABLE: the new ones are there and updating, but we are seeing some stale ones (which in my opinion should not even appear, because COLLECT_LIST is collecting the items from ITEMS_TABLE).


cadonna commented on May 30, 2024

I see. We need a minimal example to reproduce the situation; otherwise it is just guessing. I tried to reproduce it but was not able to.


AleksandarTokarev commented on May 30, 2024

What else can I provide in order to be more helpful?


AleksandarTokarev commented on May 30, 2024

The ksql logs I see in our Datadog system when these issues occur are not really helpful. Let me know if anything makes sense to you. I can try to look for more logs/data.
[image: screenshot of the Datadog logs]


AleksandarTokarev commented on May 30, 2024

OK @cadonna, some more follow-up on the issues.
Here are the CREATE TABLE statements for the other tables.
ITEMS_TABLE

CREATE OR REPLACE TABLE ITEMS_TABLE WITH (CLEANUP_POLICY='compact', KAFKA_TOPIC='ITEMS_TABLE',
PARTITIONS=12, REPLICAS=3, RETENTION_MS=600000) AS SELECT * FROM ITEMS_TABLE_INITIAL EMIT CHANGES;

It seems there is one more table backing ITEMS_TABLE, the initial one. Here is the DDL:
ITEMS_TABLE_INITIAL

CREATE TABLE ITEMS_TABLE_INITIAL (ID STRING PRIMARY KEY, USERID BIGINT, NAME STRING, PRICE DOUBLE, UPDATEDTIME) WITH (KAFKA_TOPIC='items.active', KEY_FORMAT='KAFKA', VALUE_FORMAT='JSON');

Also, it seems that these issues occurred when there was an OOM in KSQLDB. Here is one of the logs from when the OOM occurred:

Process _confluent-ksql (pid: 1886306) triggered an OOM kill on itself. The process had reached 2560000 pages in size.
This OOM kill was invoked by a cgroup, containerID: xxxxx.

A few things that come to mind:

  1. The OOM itself is concerning - this should not happen in the first place. We might want to increase the RAM.
  2. Do you think the issue could come from the fact that we are not using the INITIAL table, but the one created from it?

Thanks a lot


cadonna commented on May 30, 2024

@AleksandarTokarev The best thing would be, if you could provide a reproduction with a docker-compose setup. You can find a docker-compose setup that you can re-use under the following link: https://ksqldb.io/quickstart.html#quickstart-content
Once we can reproduce the issue, it gets easier to find the root cause. Otherwise, it is a lot of crystal balling.


AleksandarTokarev commented on May 30, 2024

@cadonna We added exactly-once as a server parameter and it was fine for about 2 weeks. Today we upgraded to ARM instances; during the upgrade there was an issue with the volumes/resources and the bug occurred again.
We are thinking of writing a custom UDAF that would deduplicate the list in the aggregate step.
Is there a proper example of how to do that? Or maybe some further hints on how to do it?

Also, I am not sure I understand how the merge and undo functions are used in the COLLECT_LIST example:
https://github.com/confluentinc/ksql/blob/master/ksqldb-engine/src/main/java/io/confluent/ksql/function/udaf/array/CollectListUdaf.java#L104-L116
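
For context, in a table aggregation the general idea is that undo retracts a row's previous value when that row is updated or deleted, while merge combines two partial aggregates. The plain-Python sketch below illustrates only the add/undo idea. It is not the ksqlDB UDAF API, and the function names (aggregate, undo, apply_update) are chosen for illustration.

```python
def aggregate(agg, value):
    """Add a row's value to the collected list."""
    return agg + [value]


def undo(agg, value):
    """Remove one occurrence of a row's previous value, if present."""
    out = list(agg)
    if value in out:
        out.remove(value)
    return out


def apply_update(agg, old_value, new_value):
    """A table row changed: retract the old value, then add the new one."""
    if old_value is not None:
        agg = undo(agg, old_value)
    return aggregate(agg, new_value)


first = {"name": "TOMATOES", "amount": 1}
second = {"name": "TOMATOES", "amount": 2}

agg = []
agg = apply_update(agg, None, first)     # first insert: nothing to retract
agg = apply_update(agg, first, second)   # update: old value is retracted
print(agg)  # only the latest version of the row remains
```

If the old value is not known at the time of the update, nothing is retracted and both versions stay in the list.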


AleksandarTokarev commented on May 30, 2024

@cadonna I think I have finally found a way to reproduce this.
It does not reproduce at the same point every time, but it happens fairly often.

Docker Compose file:

version: "3"

networks:
  kafka-net:
    name: kafka-net
    driver: bridge

services:
  zookeeper:
    image: docker.io/bitnami/zookeeper:3.8
    container_name: zookeeper
    restart: unless-stopped
    networks:
      - kafka-net
    ports:
      - "2181:2181"
    environment:
      ALLOW_ANONYMOUS_LOGIN: "yes"

  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    ports:
      - 8080:8080
    networks:
      - kafka-net
    environment:
      - KAFKA_CLUSTERS_0_NAME=local
      - KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka:29092
      - KAFKA_CLUSTERS_0_ZOOKEEPER=zookeeper:2181
    depends_on:
      - kafka

  kafka:
    image: docker.io/bitnami/kafka:3
    container_name: kafka
    restart: unless-stopped
    networks:
      - kafka-net
    ports:
      - "9092:9092"
    environment:
      ALLOW_PLAINTEXT_LISTENER: "yes"
      KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: DOCKER_INTERNAL:PLAINTEXT,DOCKER_EXTERNAL:PLAINTEXT
      KAFKA_CFG_LISTENERS: DOCKER_INTERNAL://:29092,DOCKER_EXTERNAL://:9092
      KAFKA_CFG_ADVERTISED_LISTENERS: DOCKER_INTERNAL://kafka:29092,DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092
      KAFKA_CFG_INTER_BROKER_LISTENER_NAME: DOCKER_INTERNAL
      KAFKA_CFG_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_CFG_BROKER_ID: 1
      KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE: "true"
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 100
    depends_on:
      - zookeeper

  kafka-ksqldb:
    image: confluentinc/ksqldb-server:0.29.0
    container_name: kafka-ksqldb
    restart: unless-stopped
    networks:
      - kafka-net
    ports:
      - "8088:8088"
    environment:
      KSQL_BOOTSTRAP_SERVERS: PLAINTEXT://kafka:29092
      KSQL_LISTENERS: http://0.0.0.0:8088/
      KSQL_KSQL_SERVICE_ID: kafka-ksqldb
      KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: "true"
      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: "true"
      KSQL_KSQL_EXTENSION_DIR: "/data/udfs"
    volumes:
      - ./data/kafka-ksqldb-data/scripts:/data/scripts/
      - ./data/kafka-ksqldb-data/udfs:/data/udfs/
    depends_on:
      - kafka

Steps:
a) Create topic users.items
b) Create table on top of the topic

CREATE TABLE USERS_ITEMS_INITIAL (F_KEY STRING PRIMARY KEY, USERID BIGINT, NAME STRING, DESCRIPTION STRING, AMOUNT DOUBLE, UPDATED BIGINT) WITH (KAFKA_TOPIC='users.items', KEY_FORMAT='KAFKA', VALUE_FORMAT='JSON');

c) Use a COLLECT_LIST

CREATE TABLE IF NOT EXISTS ITEMS_PER_USER_TABLE WITH (KAFKA_TOPIC='ITEMS_PER_USER_TABLE', PARTITIONS=3, VALUE_FORMAT='JSON') AS SELECT USERS_ITEMS_INITIAL.USERID USERID,
  COLLECT_LIST(STRUCT(`NAME`:=USERS_ITEMS_INITIAL.NAME, `DESCRIPTION`:=USERS_ITEMS_INITIAL.DESCRIPTION, `AMOUNT`:=USERS_ITEMS_INITIAL.AMOUNT, `UPDATED`:=USERS_ITEMS_INITIAL.UPDATED)) ACTIVE_ITEMS
FROM USERS_ITEMS_INITIAL
GROUP BY USERS_ITEMS_INITIAL.USERID
EMIT CHANGES;

d) Insert the following records into the users.items topic

Key
10-T-TOM
Value
{
	"name": "TOMATOES",
	"description": "They are tasty",
	"amount": 1,
	"updated": 1905906822314,
	"userId": 10
}

Key
10-T-TOM
Value
{
	"name": "TOMATOES",
	"description": "They are tasty",
	"amount": 2,
	"updated": 1925906822314,
	"userId": 10
}

Key
10-T-TOM
Value
{
	"name": "TOMATOES",
	"description": "They are tasty",
	"amount": 3,
	"updated": 1935906822314,
	"userId": 10
}

Key
10-T-POT
Value
{
	"name": "Potatoes",
	"description": "They are tasty too",
	"amount": 0,
	"updated": 1905906822314,
	"userId": 10
}

Key
10-T-POT
Value
{
	"name": "Potatoes",
	"description": "They are tasty too",
	"amount": 1,
	"updated": 1915906822314,
	"userId": 10
}

After the last one, I seem to have ended up with 2 Potatoes records in the resulting table, which is not what I want:

{
	"ACTIVE_ITEMS": [
		{
			"NAME": "TOMATOES",
			"DESCRIPTION": "They are tasty",
			"AMOUNT": 3,
			"UPDATED": 1935906822314
		},
		{
			"NAME": "Potatoes",
			"DESCRIPTION": "They are tasty too",
			"AMOUNT": 0,
			"UPDATED": 1905906822314
		},
		{
			"NAME": "Potatoes",
			"DESCRIPTION": "They are tasty too",
			"AMOUNT": 1,
			"UPDATED": 1915906822314
		}
	]
}
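
As a stopgap on the consumer side, the stale entry could be filtered out after reading the aggregate. The sketch below is plain Python and makes one loud assumption that the thread does not confirm: that NAME identifies an item within a user's list (the real key, F_KEY, is not part of the struct). The helper name latest_per_name is hypothetical.

```python
def latest_per_name(items):
    """Keep, for each NAME, only the entry with the largest UPDATED value.

    Assumption: NAME uniquely identifies an item in the list.
    """
    latest = {}
    for item in items:
        name = item["NAME"]
        if name not in latest or item["UPDATED"] > latest[name]["UPDATED"]:
            latest[name] = item
    return list(latest.values())


# The ACTIVE_ITEMS list from the output above (DESCRIPTION omitted for brevity).
active_items = [
    {"NAME": "TOMATOES", "AMOUNT": 3, "UPDATED": 1935906822314},
    {"NAME": "Potatoes", "AMOUNT": 0, "UPDATED": 1905906822314},
    {"NAME": "Potatoes", "AMOUNT": 1, "UPDATED": 1915906822314},
]

for item in latest_per_name(active_items):
    print(item["NAME"], item["AMOUNT"])
```

This only hides the symptom for readers of the topic; it does not fix the aggregate itself.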

Here is a screenshot:
[image: screenshot]

DISCLAIMER: This does not always happen, and not always for the same records; it is intermittent.
I would greatly appreciate the help.

UPDATE 1: It seems this happens when the same message key goes into different partitions. I wonder why, and whether this could happen in the failure scenarios (like OOM, volume issues, etc.).
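
One possible explanation for the partition observation, sketched in plain Python under a simplifying assumption: each partition tracks the last value it saw per key in its own local state, and the previous value is retracted from the list only when that partition knows about it. This is a toy model and not ksqlDB internals; the function name run and the event tuples are made up for illustration.

```python
from collections import defaultdict


def run(events):
    """Process (partition, key, value) events in arrival order.

    Each partition remembers the last value it saw per key. The old value
    is retracted from the collected list only if that partition knows it.
    """
    last_seen = defaultdict(dict)  # partition -> {key: last value}
    items = []                     # the collected list for a single user
    for partition, key, value in events:
        old = last_seen[partition].get(key)
        if old is not None:
            items.remove(old)      # retract the previous version
        items.append(value)
        last_seen[partition][key] = value
    return items


# Same key, both writes on partition 0: the old version is retracted.
same_partition = [
    (0, "10-T-POT", ("Potatoes", 0)),
    (0, "10-T-POT", ("Potatoes", 1)),
]

# Same key, second write lands on partition 1: nothing is retracted.
split_partitions = [
    (0, "10-T-POT", ("Potatoes", 0)),
    (1, "10-T-POT", ("Potatoes", 1)),
]

print(run(same_partition))    # [('Potatoes', 1)]
print(run(split_partitions))  # [('Potatoes', 0), ('Potatoes', 1)]
```

In this model the stale entry is never cleaned up afterwards, because later updates on the second partition only retract values that partition has seen.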
