
Comments (13)

xiaobaxiang commented on May 23, 2024

This is the log:

hstreamdb-hserver-1  | /usr/local/script/wait-for-storage.sh: connect: Connection refused
hstreamdb-hserver-1  | /usr/local/script/wait-for-storage.sh: line 11: /dev/tcp/zookeeper/2181: Connection refused
hstreamdb-hserver-1  | Waiting for zookeeper zookeeper:2181 ...
hstreamdb-hserver-1  | /usr/local/script/wait-for-storage.sh: connect: Connection refused
hstreamdb-hserver-1  | /usr/local/script/wait-for-storage.sh: line 11: /dev/tcp/zookeeper/2181: Connection refused
hstreamdb-hserver-1  | Waiting for zookeeper zookeeper:2181 ...
hstreamdb-hserver-1  | HStore is up - Starting HServer...
hstreamdb-hserver-1  | 2023-05-09 01:25:25,123:1(0x7fb0b9688640):ZOO_INFO@log_env@753: Client environment:zookeeper.version=zookeeper C client 3.4.13
hstreamdb-hserver-1  | 2023-05-09 01:25:25,123:1(0x7fb0b9688640):ZOO_INFO@log_env@757: Client environment:host.name=1fecedecbee8
hstreamdb-hserver-1  | 2023-05-09 01:25:25,123:1(0x7fb0b9688640):ZOO_INFO@log_env@764: Client environment:os.name=Linux
hstreamdb-hserver-1  | 2023-05-09 01:25:25,123:1(0x7fb0b9688640):ZOO_INFO@log_env@765: Client environment:os.arch=5.10.134-13.1.al8.x86_64
hstreamdb-hserver-1  | 2023-05-09 01:25:25,123:1(0x7fb0b9688640):ZOO_INFO@log_env@766: Client environment:os.version=#1 SMP Mon Feb 6 14:54:50 CST 2023
hstreamdb-hserver-1  | 2023-05-09 01:25:25,123:1(0x7fb0b9688640):ZOO_INFO@log_env@774: Client environment:user.name=(null)
hstreamdb-hserver-1  | 2023-05-09 01:25:25,123:1(0x7fb0b9688640):ZOO_INFO@log_env@782: Client environment:user.home=/root
hstreamdb-hserver-1  | 2023-05-09 01:25:25,123:1(0x7fb0b9688640):ZOO_INFO@log_env@794: Client environment:user.dir=/
hstreamdb-hserver-1  | 2023-05-09 01:25:25,123:1(0x7fb0b9688640):ZOO_INFO@zookeeper_init@818: Initiating client connection, host=zookeeper:2181 sessionTimeout=5000 watcher=(nil) sessionId=0 sessionPasswd=<null> context=(nil) flags=0
hstreamdb-hserver-1  | 2023-05-09 01:25:25,126:1(0x7fb09b7f6700):ZOO_INFO@check_events@1763: initiated connection to server [172.19.0.2:2181]
hstreamdb-hserver-1  | 2023-05-09 01:25:25,161:1(0x7fb09b7f6700):ZOO_INFO@check_events@1809: session establishment complete on server [172.19.0.2:2181], sessionId=0x100129730310000, negotiated timeout=5000
hstreamdb-hserver-1  | E0509 01:25:25.379181       1 [not-set] FileConfigSource.cpp:129] stat_mtime() stat() on config file "" failed. errno=2 (No such file or directory)
hstreamdb-hserver-1  | E0509 01:25:25.379660       1 [not-set] FileConfigSource.cpp:129] stat_mtime() stat() on config file "" failed. errno=2 (No such file or directory)
hstreamdb-hserver-1  | E0509 01:25:25.383151      54 [logdevice:WG0] SSLFetcher.cpp:77] reloadSSLContext() Failed to load SSL certificate, ex: SSL_CTX_load_verify_locations: No such file or directory; no such file; system lib
hstreamdb-hserver-1  | E0509 01:25:25.384287      54 [logdevice:WG0] GetClusterStateRequest.cpp:243] onError() Retrieving the state of the cluster failed. giving up.
hstreamdb-hserver-1  | E0509 01:25:25.384299      54 [logdevice:WG0] ClusterState.cpp:260] onGetClusterStateDone() Unable to refresh cluster state: FAILED: request failed
hstreamdb-hserver-1  | 2023-05-09 01:25:26,921:1(0x7fb09b7f6700):ZOO_WARN@zookeeper_interest@1597: Exceeded deadline by 13ms
hstreamdb-hserver-1  | W0509 01:25:28.958837      54 [logdevice:WG0] SocketSender.cpp:633] forceShutdown() Force shutdown of SocketSender w/o graceful shutdown attempt
hstreamdb-hserver-1  | E0509 01:25:28.961684       1 [not-set] FileConfigSource.cpp:129] stat_mtime() stat() on config file "" failed. errno=2 (No such file or directory)
hstreamdb-hserver-1  | E0509 01:25:28.961949       1 [not-set] FileConfigSource.cpp:129] stat_mtime() stat() on config file "" failed. errno=2 (No such file or directory)
hstreamdb-hserver-1  | E0509 01:25:28.962907      57 [logdevice:WG0] SSLFetcher.cpp:77] reloadSSLContext() Failed to load SSL certificate, ex: SSL_CTX_load_verify_locations: No such file or directory; no such file; system lib
hstreamdb-hserver-1  | W0509 01:25:30.103831      57 [logdevice:WG0] ClientReadStream.cpp:2476] updateCurrentMetaData() Existing epoch metadata has changed for log 4611686018427387901 on epoch 1, check metadata log!
hstreamdb-hserver-1  | [INFO][2023-05-09T01:25:30+0000][./HStream/IO/Worker.hs:35:3][thread#4]new Worker with hsConfig:HStreamConfig {serviceUrl = "hstream://********:6570"}
hstreamdb-hserver-1  | [INFO][2023-05-09T01:25:30+0000][app/server.hs:153:3][thread#10]************************
hstreamdb-hserver-1  | 
hstreamdb-hserver-1  |    _  _   __ _____ ___ ___  __  __ __
hstreamdb-hserver-1  |   | || |/' _/_   _| _ \ __|/  \|  V  |
hstreamdb-hserver-1  |   | >< |`._`. | | | v / _|| /\ | \_/ |
hstreamdb-hserver-1  |   |_||_||___/ |_| |_|_\___|_||_|_| |_|
hstreamdb-hserver-1  | 
hstreamdb-hserver-1  |   
hstreamdb-hserver-1  | [INFO][2023-05-09T01:25:30+0000][app/server.hs:161:3][thread#10]************************
hstreamdb-hserver-1  | [INFO][2023-05-09T01:25:30+0000][app/server.hs:234:3][thread#10]Starting server with hs-grpc-server...
hstreamdb-hserver-1  | [INFO][2023-05-09T01:25:30+0000][src/HStream/Gossip/Start.hs:117:7][thread#11]Starting gossip server with hs-grpc-server...
hstreamdb-hserver-1  | [INFO][2023-05-09T01:25:30+0000][app/server.hs:164:9][thread#2]Server is started on port 6570, waiting for cluster to get ready
hstreamdb-hserver-1  | [INFO][2023-05-09T01:25:31+0000][src/HStream/Gossip/Start.hs:147:3][thread#4]Only one node in the cluster, no bootstrapping needed
hstreamdb-hserver-1  | [INFO][2023-05-09T01:25:31+0000][src/HStream/Gossip/Start.hs:150:3][thread#4]All servers have been initialized
hstreamdb-hserver-1  | [INFO][2023-05-09T01:25:31+0000][app/server.hs:166:59][thread#26]Cluster is ready!
hstreamdb-hserver-1  | [INFO][2023-05-09T01:25:31+0000][app/server.hs:173:11][thread#26]recovering local io tasks
hstreamdb-hserver-1  | [INFO][2023-05-09T01:25:31+0000][app/server.hs:175:11][thread#26]recovering local query tasks
hstreamdb-hserver-1  | [INFO][2023-05-09T01:25:31+0000][app/server.hs:177:11][thread#26]recovered tasks
hstreamdb-hserver-1  | 2023-05-09 01:25:42,966:1(0x7fb09b7f6700):ZOO_WARN@zookeeper_interest@1597: Exceeded deadline by 16ms
hstreamdb-hserver-1  | [INFO][2023-05-09T01:25:57+0000][src/HStream/Server/Core/Cluster.hs:93:3][thread#30]receive lookupShard request: LookupShardRequest {lookupShardRequestShardId = 1766532669049322}, should send to "ServerNode {serverNodeId = 100, serverNodeHost = \"********\", serverNodePort = 6570}"
hstreamdb-hserver-1  | E0509 01:25:57.225585      57 [logdevice:WG0] HashBasedSequencerLocator.cpp:208] locateSequencer() No available sequencer node for log 1766532669049322. All sequencer nodes are unavailable.
hstreamdb-hserver-1  | E0509 01:25:57.225614      57 [logdevice:WG0] SequencerRouter.cpp:56] operator()() Error during sequencer lookup for log 1766532669049322 (NOSEQUENCER), reporting NOSEQUENCER.
hstreamdb-hserver-1  | E0509 01:25:57.225619      57 [logdevice:WG0] SequencerRouter.cpp:105] onFailure() Error during sequencer routing for log 1766532669049322 (NOSEQUENCER), request[type:APPEND_REQ_TYPE, handler:0x7fb090019a28].
hstreamdb-hserver-1  | [WARNING][2023-05-09T01:25:57+0000][src/HStream/Server/Handler/Stream.hs:203:57][thread#31]NOSEQUENCER {name:NOSEQUENCER, description:NOSEQUENCER: no sequencer was found for log, callstack:CallStack (from HasCallStack):
hstreamdb-hserver-1  |   throwStreamErrorIfNotOK', called at ./HStream/Store/Internal/LogDevice/Writer.hs:135:16 in hstream-store-0.1.0.0-b4bf2d5a6afba9e5cba28f15d29ae5e2c366a59feb5ce6752a10dc8423ad197f:HStream.Store.Internal.LogDevice.Writer
hstreamdb-hserver-1  |   appendBatchBS, called at ./HStream/Store/Internal/LogDevice/Writer.hs:71:43 in hstream-store-0.1.0.0-b4bf2d5a6afba9e5cba28f15d29ae5e2c366a59feb5ce6752a10dc8423ad197f:HStream.Store.Internal.LogDevice.Writer
hstreamdb-hserver-1  |   appendCompressedBS, called at src/HStream/Server/Core/Stream.hs:190:30 in hstream-0.1.0.0-5bbf944534ed1ba138333a394d08b11e0e84afac34970537d78579944ddd8596:HStream.Server.Core.Stream
hstreamdb-hserver-1  |   appendStream, called at src/HStream/Server/Core/Stream.hs:172:11 in hstream-0.1.0.0-5bbf944534ed1ba138333a394d08b11e0e84afac34970537d78579944ddd8596:HStream.Server.Core.Stream
hstreamdb-hserver-1  |   append, called at src/HStream/Server/Handler/Stream.hs:147:3 in hstream-0.1.0.0-5bbf944534ed1ba138333a394d08b11e0e84afac34970537d78579944ddd8596:HStream.Server.Handler.Stream}
hstreamdb-hserver-1  | [INFO][2023-05-09T01:26:00+0000][src/HStream/Server/Core/Cluster.hs:93:3][thread#33]receive lookupShard request: LookupShardRequest {lookupShardRequestShardId = 1766532669049322}, should send to "ServerNode {serverNodeId = 100, serverNodeHost = \"********\", serverNodePort = 6570}"
hstreamdb-hserver-1  | [INFO][2023-05-09T01:26:02+0000][src/HStream/Server/Core/Cluster.hs:93:3][thread#35]receive lookupShard request: LookupShardRequest {lookupShardRequestShardId = 1766532669049322}, should send to "ServerNode {serverNodeId = 100, serverNodeHost = \"********\", serverNodePort = 6570}"
hstreamdb-hserver-1  | [INFO][2023-05-09T01:26:08+0000][src/HStream/Server/Core/Cluster.hs:106:3][thread#38]receive lookupSubscription request: LookupSubscriptionRequest {lookupSubscriptionRequestSubscriptionId = "rawdata_topic"}
hstreamdb-hserver-1  | [INFO][2023-05-09T01:26:08+0000][src/HStream/Server/Core/Cluster.hs:106:3][thread#39]receive lookupSubscription request: LookupSubscriptionRequest {lookupSubscriptionRequestSubscriptionId = "rawdata_topic"}
hstreamdb-hserver-1  | [INFO][2023-05-09T01:26:08+0000][src/HStream/Server/Core/Subscription.hs:346:7][thread#43]Create a checkpoint store reader for rawdata_topic
hstreamdb-hserver-1  | [INFO][2023-05-09T01:26:08+0000][src/HStream/Server/Core/Subscription.hs:353:7][thread#43]Create a reader for rawdata_topic
hstreamdb-hserver-1  | 2023-05-09 01:26:27,235:1(0x7fb09b7f6700):ZOO_WARN@zookeeper_interest@1597: Exceeded deadline by 43ms
hstreamdb-hserver-1  | [INFO][2023-05-09T01:26:27+0000][src/HStream/Server/Core/Cluster.hs:93:3][thread#169]receive lookupShard request: LookupShardRequest {lookupShardRequestShardId = 1766532669049322}, should send to "ServerNode {serverNodeId = 100, serverNodeHost = \"********\", serverNodePort = 6570}"
hstreamdb-hserver-1  | 2023-05-09 01:26:40,632:1(0x7fb09b7f6700):ZOO_WARN@zookeeper_interest@1597: Exceeded deadline by 11ms
hstreamdb-hserver-1  | 2023-05-09 01:26:54,031:1(0x7fb09b7f6700):ZOO_WARN@zookeeper_interest@1597: Exceeded deadline by 12ms
hstreamdb-hserver-1  | 2023-05-09 01:26:57,381:1(0x7fb09b7f6700):ZOO_WARN@zookeeper_interest@1597: Exceeded deadline by 12ms
hstreamdb-hserver-1  | [INFO][2023-05-09T01:26:57+0000][src/HStream/Server/Core/Cluster.hs:93:3][thread#326]receive lookupShard request: LookupShardRequest {lookupShardRequestShardId = 1766532669049322}, should send to "ServerNode {serverNodeId = 100, serverNodeHost = \"********\", serverNodePort = 6570}"
hstreamdb-hserver-1  | 2023-05-09 01:27:02,405:1(0x7fb09b7f6700):ZOO_WARN@zookeeper_interest@1597: Exceeded deadline by 13ms
hstreamdb-hserver-1  | 2023-05-09 01:27:05,760:1(0x7fb09b7f6700):ZOO_WARN@zookeeper_interest@1597: Exceeded deadline by 15ms
hstreamdb-hserver-1  | 2023-05-09 01:27:09,106:1(0x7fb09b7f6700):ZOO_WARN@zookeeper_interest@1597: Exceeded deadline by 11ms
hstreamdb-hserver-1  | 2023-05-09 01:27:12,454:1(0x7fb09b7f6700):ZOO_WARN@zookeeper_interest@1597: Exceeded deadline by 11ms

from hstream.

YangKian commented on May 23, 2024

Hi, we need some information to help locate the problem:

  • Did you call Ack() after the message was received by the consumer? Here is an example:
          var consumer = client.newConsumer().subscription(subId).rawRecordReceiver(
                  (receivedRawRecord, responder) -> {
                      System.out.println(receivedRawRecord.getRecordId());
                      responder.ack();  // <- call ACK here
                  }).build();
  • Were all previously received messages retransmitted? Or just some of them?


xiaobaxiang commented on May 23, 2024

Yes, I call ack():

        RawRecordReceiver receiver =
                ((rRecord, responder) -> {
                    var receiveData = rRecord.getRawRecord();
                    responder.ack(); // ack immediately on receipt; if the processing below throws, the record will not be redelivered
                    System.out.println("Received message: " + HexFormat.of().formatHex(receiveData));
                });

I don't know if it was all of the messages, but the first messages I received were from 6 days ago.


YangKian commented on May 23, 2024

In fact, our current subscription only supports at-least-once delivery. To make full use of bandwidth, ack responses are sent to the server in batches, so in some scenarios it is indeed possible that already-acked data is retransmitted after the server restarts, e.g.:

  • The server is restarted before it has finished processing all the acks it received
  • The server is restarted before the client has sent its buffered acks to the server

(Note: an ack only applies to the current subscription; acks are not shared between multiple subscriptions created on the same stream.)
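Since delivery is at-least-once, a consumer that must not apply the same record twice can deduplicate on the record ID itself. A minimal self-contained sketch of such a cache (the bounded `LinkedHashMap` and the `firstTime` helper are illustrative assumptions, not part of the hstream client API):

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Bounded "seen record IDs" cache: with at-least-once delivery, skip IDs
// that have already been processed. The capacity bound keeps memory fixed;
// entries are evicted in least-recently-used order once it is exceeded.
public class DedupCache {
    private final Map<String, Boolean> seen;

    public DedupCache(int maxEntries) {
        // accessOrder=true makes eviction LRU rather than insertion-order
        this.seen = new LinkedHashMap<>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, Boolean> eldest) {
                return size() > maxEntries;
            }
        };
    }

    // Returns true the first time an ID is observed, false on redelivery.
    public synchronized boolean firstTime(String recordId) {
        return seen.put(recordId, Boolean.TRUE) == null;
    }

    public static void main(String[] args) {
        DedupCache cache = new DedupCache(10_000);
        System.out.println(cache.firstTime("rec-1")); // true
        System.out.println(cache.firstTime("rec-1")); // false: redelivery, skip processing
        System.out.println(cache.firstTime("rec-2")); // true
    }
}
```

In a receiver you would call `firstTime(rRecord.getRecordId().toString())` before processing, and still ack the redelivered copy so the server can advance.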

That said, with continuous consumption it should not be possible for a consumer to see retransmitted data from 6 days ago; consumption progress would be stuck at the first unacked record.

So, for your scenario:

  • If you created a new subscription after the restart, instead of consuming the old one directly, then reading the old data is expected, because the two subscriptions are independent.
  • If you consumed the previous subscription directly, I can only think of one possible cause: in v0.15.0 we updated the underlying handling of consumption progress, so if you created a subscription before v0.15.0 and then upgraded to v0.15.0, the consumption progress may have been lost. In that case you can only clear all the data and rebuild the entire cluster, then check whether retransmissions still happen.


xiaobaxiang commented on May 23, 2024

Yes, I upgraded from latest to v0.15.0.
How do I clear all the data in a stream? Delete the stream and recreate it?


YangKian commented on May 23, 2024

I suggest you use the hstreamdb/hstream:latest image; we have made additional updates after v0.15.0, but no new versioned image has been released yet. The client version does not need to be updated.

Follow these steps (note that all previous data will be removed):

  • Stop cluster: docker compose -f quick-start.yaml down
  • Remove volume: docker volume rm quickstart_data_store quickstart_data_zk_data quickstart_data_zk_datalog
  • Fetch new image: docker pull hstreamdb/hstream:latest
  • Update quick-start.yaml to use hstreamdb/hstream:latest
  • Restart cluster: docker compose -f quick-start.yaml up -d

Now you have a clean cluster and can create streams and subscriptions as before.
If any retransmission still occurs, feel free to let us know.


xiaobaxiang commented on May 23, 2024

I have dropped the stream and recreated it. It is working now.
By the way, I created two consumer threads following the official tutorial, but only one of them receives messages after each startup; the other never receives anything. Is this normal?


YangKian commented on May 23, 2024

Do you mean two consumers consuming the same subscription, or each consumer bound to a different subscription? Would you mind posting the code for the subscription and consumer sections?

In addition, did you set the shard count to more than one when you created the stream?


xiaobaxiang commented on May 23, 2024

With the same subscription.

        String subscription = "rawdata_topic";// "your_subscription_id";
        HStreamClient client = HStreamClient.builder()
                .serviceUrl(serviceUrl)
                .build();

        // create two consumers to consume records with several partition keys.
        Thread t1 = new Thread(() -> consumeDataFromSubscriptionShared(client, subscription, consumer1));
        Thread t2 = new Thread(() -> consumeDataFromSubscriptionShared(client, subscription, consumer2));
        t1.start();
        t2.start();
        try {
            t1.join();
            t2.join();
            var i = System.in.read();
            client.close();
        } catch (Exception ex) {
            ex.printStackTrace();
        }


 public static void consumeDataFromSubscriptionShared(
            HStreamClient client, String subscription, String consumerName) {
        RawRecordReceiver receiver =
                ((rRecord, responder) -> {
                    var receiveData = rRecord.getRawRecord();
                    responder.ack();
                    System.out.println("Received message: " + HexFormat.of().formatHex(receiveData));
                });
        Consumer consumer =
                client
                        .newConsumer()
                        .subscription(subscription)
                        .name(consumerName)
                        .rawRecordReceiver(receiver)
                        // When ack() is called, the consumer does not send it to the server immediately;
                        // the ack request is buffered until the ack count reaches ackBufferSize,
                        // the ackAgeLimit is reached, or the consumer is stopping
                        .ackBufferSize(100)
                        .ackAgeLimit(100)
                        .build();

        // add Listener for handling failed consumer
        var threadPool = new ScheduledThreadPoolExecutor(1);
        consumer.addListener(
                new Service.Listener() {
                    public void failed(Service.State from, Throwable failure) {
                        //deleteSubscription(client,streamName);
                        System.out.println("Consumption failed, error: " + failure.getMessage());
                        consumer.stopAsync().awaitTerminated();
                    }

                    @Override
                    public void terminated(Service.State from) {
                        //deleteSubscription(client,streamName);
                        System.out.println("Consumer stopped, terminated from state: " + from.name());
                        consumer.stopAsync().awaitTerminated();
                    }
                },
                threadPool);

        // start Consumer as a background service and return
        consumer.startAsync().awaitRunning();

    }
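The `ackBufferSize` / `ackAgeLimit` settings in the code above describe a flush-on-count-or-age policy. As a standalone illustration of that batching behavior (the `AckBuffer` class and its names are hypothetical, not the client's internals):

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative flush-on-count-or-age buffer: acks accumulate until either
// the buffer holds maxSize entries or the oldest pending entry is older
// than maxAgeMillis, then the whole batch is sent at once.
public class AckBuffer {
    private final int maxSize;
    private final long maxAgeMillis;
    private final List<String> pending = new ArrayList<>();
    private long oldestAt = -1;
    private int flushes = 0;

    public AckBuffer(int maxSize, long maxAgeMillis) {
        this.maxSize = maxSize;
        this.maxAgeMillis = maxAgeMillis;
    }

    public synchronized void ack(String recordId, long nowMillis) {
        if (pending.isEmpty()) oldestAt = nowMillis;
        pending.add(recordId);
        if (pending.size() >= maxSize || nowMillis - oldestAt >= maxAgeMillis) {
            flush();
        }
    }

    private void flush() {
        // A real client would send `pending` to the server in one request here.
        flushes++;
        pending.clear();
        oldestAt = -1;
    }

    public synchronized int flushCount() { return flushes; }

    public static void main(String[] args) {
        AckBuffer buf = new AckBuffer(3, 100);
        buf.ack("r1", 0);
        buf.ack("r2", 10);
        buf.ack("r3", 20);   // third ack hits maxSize -> flush
        buf.ack("r4", 30);
        buf.ack("r5", 150);  // oldest pending ack (r4) is 120 ms old -> flush
        System.out.println(buf.flushCount()); // prints 2
    }
}
```

This buffering is why acks made just before a crash or restart may never reach the server, producing the redelivery discussed earlier in the thread.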


YangKian commented on May 23, 2024

In addition, have you set the shard count more than one when you create stream?

Sorry, I added that question to my previous reply after posting it. For the concept of shards, please refer to https://docs.hstream.io/write/shards.html (by the way, this is our new documentation site).

Multiple consumers on the same subscription form a consumer group. The data of a stream is distributed to the consumers in the group with the shard as the smallest unit. That is: if you have two consumers but the stream has only one shard, only one consumer will receive data and the other will be idle. If you have 4 shards, each consumer will receive data from 2 shards.
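As a rough illustration of why one key (or no key) keeps a consumer idle: a partition key hashes to exactly one shard, so all records sharing a key land in the same shard. The md5-then-modulo mapping below is a simplification for demonstration only; HStreamDB's actual routing assigns keys to shards by hash ranges:

```java
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;

// Simplified key -> shard routing: the same partition key always maps to
// the same shard, so a single fixed key can never be spread across the
// consumers of a group.
public class ShardRouting {
    static int shardOf(String partitionKey, int shardCount) throws Exception {
        byte[] digest = MessageDigest.getInstance("MD5")
                .digest(partitionKey.getBytes(StandardCharsets.UTF_8));
        return new BigInteger(1, digest).mod(BigInteger.valueOf(shardCount)).intValue();
    }

    public static void main(String[] args) throws Exception {
        int shards = 4;
        // The same key maps to the same shard on every call ...
        System.out.println(shardOf("device-42", shards) == shardOf("device-42", shards)); // true
        // ... so writing every record with one fixed key (or no key at all)
        // fills a single shard, and only one consumer in the group gets data.
        for (String key : new String[]{"device-1", "device-2", "device-3"}) {
            System.out.println(key + " -> shard " + shardOf(key, shards));
        }
    }
}
```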


xiaobaxiang commented on May 23, 2024

I have recreated the stream with:

public static void createStreamWithAttrsExample(HStreamClient client, String streamName) {
        client.createStream(
                streamName,
                (short) 1,      // replication factor
                4,              // number of shards
                7 * 24 * 3600   // backlog retention time in seconds
        );
    }
> show streams;
+-------------+---------+----------------+-------------+
| Stream Name | Replica | Retention Time | Shard Count |
+-------------+---------+----------------+-------------+
| rawdemo     | 1       | 604800 seconds | 4           |
+-------------+---------+----------------+-------------+

But still only one thread receives messages; the other one gets nothing.


YangKian commented on May 23, 2024

This is probably because the partition key was not set when the records were created. In hstream, data with the same partition key is written to the same shard. If no key is set, all data is written to the same shard, and is naturally delivered to the same consumer.

Here is an example of setting the partition key:

            Record record =
                    Record.newBuilder().rawRecord(message.getBytes(StandardCharsets.UTF_8)).build();
            record.setPartitionKey("key-" + System.currentTimeMillis());


xiaobaxiang commented on May 23, 2024

OK, it is working now. Thanks a lot!

