Comments (13)
This is the log:
hstreamdb-hserver-1 | /usr/local/script/wait-for-storage.sh: connect: Connection refused
hstreamdb-hserver-1 | /usr/local/script/wait-for-storage.sh: line 11: /dev/tcp/zookeeper/2181: Connection refused
hstreamdb-hserver-1 | Waiting for zookeeper zookeeper:2181 ...
hstreamdb-hserver-1 | /usr/local/script/wait-for-storage.sh: connect: Connection refused
hstreamdb-hserver-1 | /usr/local/script/wait-for-storage.sh: line 11: /dev/tcp/zookeeper/2181: Connection refused
hstreamdb-hserver-1 | Waiting for zookeeper zookeeper:2181 ...
hstreamdb-hserver-1 | HStore is up - Starting HServer...
hstreamdb-hserver-1 | 2023-05-09 01:25:25,123:1(0x7fb0b9688640):ZOO_INFO@log_env@753: Client environment:zookeeper.version=zookeeper C client 3.4.13
hstreamdb-hserver-1 | 2023-05-09 01:25:25,123:1(0x7fb0b9688640):ZOO_INFO@log_env@757: Client environment:host.name=1fecedecbee8
hstreamdb-hserver-1 | 2023-05-09 01:25:25,123:1(0x7fb0b9688640):ZOO_INFO@log_env@764: Client environment:os.name=Linux
hstreamdb-hserver-1 | 2023-05-09 01:25:25,123:1(0x7fb0b9688640):ZOO_INFO@log_env@765: Client environment:os.arch=5.10.134-13.1.al8.x86_64
hstreamdb-hserver-1 | 2023-05-09 01:25:25,123:1(0x7fb0b9688640):ZOO_INFO@log_env@766: Client environment:os.version=#1 SMP Mon Feb 6 14:54:50 CST 2023
hstreamdb-hserver-1 | 2023-05-09 01:25:25,123:1(0x7fb0b9688640):ZOO_INFO@log_env@774: Client environment:user.name=(null)
hstreamdb-hserver-1 | 2023-05-09 01:25:25,123:1(0x7fb0b9688640):ZOO_INFO@log_env@782: Client environment:user.home=/root
hstreamdb-hserver-1 | 2023-05-09 01:25:25,123:1(0x7fb0b9688640):ZOO_INFO@log_env@794: Client environment:user.dir=/
hstreamdb-hserver-1 | 2023-05-09 01:25:25,123:1(0x7fb0b9688640):ZOO_INFO@zookeeper_init@818: Initiating client connection, host=zookeeper:2181 sessionTimeout=5000 watcher=(nil) sessionId=0 sessionPasswd=<null> context=(nil) flags=0
hstreamdb-hserver-1 | 2023-05-09 01:25:25,126:1(0x7fb09b7f6700):ZOO_INFO@check_events@1763: initiated connection to server [172.19.0.2:2181]
hstreamdb-hserver-1 | 2023-05-09 01:25:25,161:1(0x7fb09b7f6700):ZOO_INFO@check_events@1809: session establishment complete on server [172.19.0.2:2181], sessionId=0x100129730310000, negotiated timeout=5000
hstreamdb-hserver-1 | E0509 01:25:25.379181 1 [not-set] FileConfigSource.cpp:129] stat_mtime() stat() on config file "" failed. errno=2 (No such file or directory)
hstreamdb-hserver-1 | E0509 01:25:25.379660 1 [not-set] FileConfigSource.cpp:129] stat_mtime() stat() on config file "" failed. errno=2 (No such file or directory)
hstreamdb-hserver-1 | E0509 01:25:25.383151 54 [logdevice:WG0] SSLFetcher.cpp:77] reloadSSLContext() Failed to load SSL certificate, ex: SSL_CTX_load_verify_locations: No such file or directory; no such file; system lib
hstreamdb-hserver-1 | E0509 01:25:25.384287 54 [logdevice:WG0] GetClusterStateRequest.cpp:243] onError() Retrieving the state of the cluster failed. giving up.
hstreamdb-hserver-1 | E0509 01:25:25.384299 54 [logdevice:WG0] ClusterState.cpp:260] onGetClusterStateDone() Unable to refresh cluster state: FAILED: request failed
hstreamdb-hserver-1 | 2023-05-09 01:25:26,921:1(0x7fb09b7f6700):ZOO_WARN@zookeeper_interest@1597: Exceeded deadline by 13ms
hstreamdb-hserver-1 | W0509 01:25:28.958837 54 [logdevice:WG0] SocketSender.cpp:633] forceShutdown() Force shutdown of SocketSender w/o graceful shutdown attempt
hstreamdb-hserver-1 | E0509 01:25:28.961684 1 [not-set] FileConfigSource.cpp:129] stat_mtime() stat() on config file "" failed. errno=2 (No such file or directory)
hstreamdb-hserver-1 | E0509 01:25:28.961949 1 [not-set] FileConfigSource.cpp:129] stat_mtime() stat() on config file "" failed. errno=2 (No such file or directory)
hstreamdb-hserver-1 | E0509 01:25:28.962907 57 [logdevice:WG0] SSLFetcher.cpp:77] reloadSSLContext() Failed to load SSL certificate, ex: SSL_CTX_load_verify_locations: No such file or directory; no such file; system lib
hstreamdb-hserver-1 | W0509 01:25:30.103831 57 [logdevice:WG0] ClientReadStream.cpp:2476] updateCurrentMetaData() Existing epoch metadata has changed for log 4611686018427387901 on epoch 1, check metadata log!
hstreamdb-hserver-1 | [INFO][2023-05-09T01:25:30+0000][./HStream/IO/Worker.hs:35:3][thread#4]new Worker with hsConfig:HStreamConfig {serviceUrl = "hstream://********:6570"}
hstreamdb-hserver-1 | [INFO][2023-05-09T01:25:30+0000][app/server.hs:153:3][thread#10]************************
hstreamdb-hserver-1 |
hstreamdb-hserver-1 | _ _ __ _____ ___ ___ __ __ __
hstreamdb-hserver-1 | | || |/' _/_ _| _ \ __|/ \| V |
hstreamdb-hserver-1 | | >< |`._`. | | | v / _|| /\ | \_/ |
hstreamdb-hserver-1 | |_||_||___/ |_| |_|_\___|_||_|_| |_|
hstreamdb-hserver-1 |
hstreamdb-hserver-1 |
hstreamdb-hserver-1 | [INFO][2023-05-09T01:25:30+0000][app/server.hs:161:3][thread#10]************************
hstreamdb-hserver-1 | [INFO][2023-05-09T01:25:30+0000][app/server.hs:234:3][thread#10]Starting server with hs-grpc-server...
hstreamdb-hserver-1 | [INFO][2023-05-09T01:25:30+0000][src/HStream/Gossip/Start.hs:117:7][thread#11]Starting gossip server with hs-grpc-server...
hstreamdb-hserver-1 | [INFO][2023-05-09T01:25:30+0000][app/server.hs:164:9][thread#2]Server is started on port 6570, waiting for cluster to get ready
hstreamdb-hserver-1 | [INFO][2023-05-09T01:25:31+0000][src/HStream/Gossip/Start.hs:147:3][thread#4]Only one node in the cluster, no bootstrapping needed
hstreamdb-hserver-1 | [INFO][2023-05-09T01:25:31+0000][src/HStream/Gossip/Start.hs:150:3][thread#4]All servers have been initialized
hstreamdb-hserver-1 | [INFO][2023-05-09T01:25:31+0000][app/server.hs:166:59][thread#26]Cluster is ready!
hstreamdb-hserver-1 | [INFO][2023-05-09T01:25:31+0000][app/server.hs:173:11][thread#26]recovering local io tasks
hstreamdb-hserver-1 | [INFO][2023-05-09T01:25:31+0000][app/server.hs:175:11][thread#26]recovering local query tasks
hstreamdb-hserver-1 | [INFO][2023-05-09T01:25:31+0000][app/server.hs:177:11][thread#26]recovered tasks
hstreamdb-hserver-1 | 2023-05-09 01:25:42,966:1(0x7fb09b7f6700):ZOO_WARN@zookeeper_interest@1597: Exceeded deadline by 16ms
hstreamdb-hserver-1 | [INFO][2023-05-09T01:25:57+0000][src/HStream/Server/Core/Cluster.hs:93:3][thread#30]receive lookupShard request: LookupShardRequest {lookupShardRequestShardId = 1766532669049322}, should send to "ServerNode {serverNodeId = 100, serverNodeHost = \"********\", serverNodePort = 6570}"
hstreamdb-hserver-1 | E0509 01:25:57.225585 57 [logdevice:WG0] HashBasedSequencerLocator.cpp:208] locateSequencer() No available sequencer node for log 1766532669049322. All sequencer nodes are unavailable.
hstreamdb-hserver-1 | E0509 01:25:57.225614 57 [logdevice:WG0] SequencerRouter.cpp:56] operator()() Error during sequencer lookup for log 1766532669049322 (NOSEQUENCER), reporting NOSEQUENCER.
hstreamdb-hserver-1 | E0509 01:25:57.225619 57 [logdevice:WG0] SequencerRouter.cpp:105] onFailure() Error during sequencer routing for log 1766532669049322 (NOSEQUENCER), request[type:APPEND_REQ_TYPE, handler:0x7fb090019a28].
hstreamdb-hserver-1 | [WARNING][2023-05-09T01:25:57+0000][src/HStream/Server/Handler/Stream.hs:203:57][thread#31]NOSEQUENCER {name:NOSEQUENCER, description:NOSEQUENCER: no sequencer was found for log, callstack:CallStack (from HasCallStack):
hstreamdb-hserver-1 | throwStreamErrorIfNotOK', called at ./HStream/Store/Internal/LogDevice/Writer.hs:135:16 in hstream-store-0.1.0.0-b4bf2d5a6afba9e5cba28f15d29ae5e2c366a59feb5ce6752a10dc8423ad197f:HStream.Store.Internal.LogDevice.Writer
hstreamdb-hserver-1 | appendBatchBS, called at ./HStream/Store/Internal/LogDevice/Writer.hs:71:43 in hstream-store-0.1.0.0-b4bf2d5a6afba9e5cba28f15d29ae5e2c366a59feb5ce6752a10dc8423ad197f:HStream.Store.Internal.LogDevice.Writer
hstreamdb-hserver-1 | appendCompressedBS, called at src/HStream/Server/Core/Stream.hs:190:30 in hstream-0.1.0.0-5bbf944534ed1ba138333a394d08b11e0e84afac34970537d78579944ddd8596:HStream.Server.Core.Stream
hstreamdb-hserver-1 | appendStream, called at src/HStream/Server/Core/Stream.hs:172:11 in hstream-0.1.0.0-5bbf944534ed1ba138333a394d08b11e0e84afac34970537d78579944ddd8596:HStream.Server.Core.Stream
hstreamdb-hserver-1 | append, called at src/HStream/Server/Handler/Stream.hs:147:3 in hstream-0.1.0.0-5bbf944534ed1ba138333a394d08b11e0e84afac34970537d78579944ddd8596:HStream.Server.Handler.Stream}
hstreamdb-hserver-1 | [INFO][2023-05-09T01:26:00+0000][src/HStream/Server/Core/Cluster.hs:93:3][thread#33]receive lookupShard request: LookupShardRequest {lookupShardRequestShardId = 1766532669049322}, should send to "ServerNode {serverNodeId = 100, serverNodeHost = \"********\", serverNodePort = 6570}"
hstreamdb-hserver-1 | [INFO][2023-05-09T01:26:02+0000][src/HStream/Server/Core/Cluster.hs:93:3][thread#35]receive lookupShard request: LookupShardRequest {lookupShardRequestShardId = 1766532669049322}, should send to "ServerNode {serverNodeId = 100, serverNodeHost = \"********\", serverNodePort = 6570}"
hstreamdb-hserver-1 | [INFO][2023-05-09T01:26:08+0000][src/HStream/Server/Core/Cluster.hs:106:3][thread#38]receive lookupSubscription request: LookupSubscriptionRequest {lookupSubscriptionRequestSubscriptionId = "rawdata_topic"}
hstreamdb-hserver-1 | [INFO][2023-05-09T01:26:08+0000][src/HStream/Server/Core/Cluster.hs:106:3][thread#39]receive lookupSubscription request: LookupSubscriptionRequest {lookupSubscriptionRequestSubscriptionId = "rawdata_topic"}
hstreamdb-hserver-1 | [INFO][2023-05-09T01:26:08+0000][src/HStream/Server/Core/Subscription.hs:346:7][thread#43]Create a checkpoint store reader for rawdata_topic
hstreamdb-hserver-1 | [INFO][2023-05-09T01:26:08+0000][src/HStream/Server/Core/Subscription.hs:353:7][thread#43]Create a reader for rawdata_topic
hstreamdb-hserver-1 | 2023-05-09 01:26:27,235:1(0x7fb09b7f6700):ZOO_WARN@zookeeper_interest@1597: Exceeded deadline by 43ms
hstreamdb-hserver-1 | [INFO][2023-05-09T01:26:27+0000][src/HStream/Server/Core/Cluster.hs:93:3][thread#169]receive lookupShard request: LookupShardRequest {lookupShardRequestShardId = 1766532669049322}, should send to "ServerNode {serverNodeId = 100, serverNodeHost = \"********\", serverNodePort = 6570}"
hstreamdb-hserver-1 | 2023-05-09 01:26:40,632:1(0x7fb09b7f6700):ZOO_WARN@zookeeper_interest@1597: Exceeded deadline by 11ms
hstreamdb-hserver-1 | 2023-05-09 01:26:54,031:1(0x7fb09b7f6700):ZOO_WARN@zookeeper_interest@1597: Exceeded deadline by 12ms
hstreamdb-hserver-1 | 2023-05-09 01:26:57,381:1(0x7fb09b7f6700):ZOO_WARN@zookeeper_interest@1597: Exceeded deadline by 12ms
hstreamdb-hserver-1 | [INFO][2023-05-09T01:26:57+0000][src/HStream/Server/Core/Cluster.hs:93:3][thread#326]receive lookupShard request: LookupShardRequest {lookupShardRequestShardId = 1766532669049322}, should send to "ServerNode {serverNodeId = 100, serverNodeHost = \"********\", serverNodePort = 6570}"
hstreamdb-hserver-1 | 2023-05-09 01:27:02,405:1(0x7fb09b7f6700):ZOO_WARN@zookeeper_interest@1597: Exceeded deadline by 13ms
hstreamdb-hserver-1 | 2023-05-09 01:27:05,760:1(0x7fb09b7f6700):ZOO_WARN@zookeeper_interest@1597: Exceeded deadline by 15ms
hstreamdb-hserver-1 | 2023-05-09 01:27:09,106:1(0x7fb09b7f6700):ZOO_WARN@zookeeper_interest@1597: Exceeded deadline by 11ms
hstreamdb-hserver-1 | 2023-05-09 01:27:12,454:1(0x7fb09b7f6700):ZOO_WARN@zookeeper_interest@1597: Exceeded deadline by 11ms
from hstream.
Hi, we need some information to help locate the problem:
- Did you call ack() after the message was received by the consumer? Here is an example:
var consumer = client.newConsumer()
    .subscription(subId)
    .rawRecordReceiver(
        (receivedRawRecord, responder) -> {
          System.out.println(receivedRawRecord.getRecordId());
          responder.ack(); // <- call ack here
        })
    .build();
- Were all previously received messages retransmitted? Or just some of them?
from hstream.
Yes, I call ack():
RawRecordReceiver receiver =
    ((rRecord, responder) -> {
      var receiveData = rRecord.getRawRecord();
      responder.ack(); // ack immediately on receipt; if the processing below throws, the record will not be reprocessed
      System.out.println("Received message: " + HexFormat.of().formatHex(receiveData));
    });
I don't know whether it was all of them; I saw that the first retransmitted messages were from 6 days ago.
from hstream.
In fact, our current subscription only supports at-least-once delivery, and to make full use of bandwidth, ack responses are sent to the server in batches. So in some scenarios it is indeed possible that acked data will still be retransmitted after the server restarts, e.g.:
- The server is restarted before it has finished processing all the acks it received.
- The client has not yet sent the ack to the server when the server restarts.
(Note: an ack only applies to the current subscription; acks are not shared between multiple subscriptions created on the same stream.)
Anyway, it seems impossible for a consumer that consumes continuously to see data from 6 days ago retransmitted; consumption progress would only be stuck at the first unacked record.
So, for your scenario:
- If you created a new subscription after the restart instead of directly consuming the old one, then reading the old data is expected, because the two subscriptions are independent.
- If you did directly consume the previous subscription, I can only think of one possible cause: in v0.15.0 we changed the underlying handling of consumption progress, so if you created a subscription before v0.15.0 and then upgraded to v0.15.0, the consumption progress may have been lost. In this case, you can only clear all the data and rebuild the entire cluster, then check whether retransmissions still happen.
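Since delivery is at-least-once, a consumer that must not process duplicates can deduplicate on its own side, for example by record ID. A minimal sketch; the `DedupFilter` class is hypothetical and not part of the HStream client:

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Tracks record IDs that have already been processed, so that a record
// redelivered after a server restart can be acked without reprocessing.
class DedupFilter {
  private final Set<String> seen = ConcurrentHashMap.newKeySet();

  // Returns true only the first time a given record ID is observed.
  boolean firstDelivery(String recordId) {
    return seen.add(recordId);
  }
}
```

In a receiver you would check `firstDelivery(...)` with the record's ID before doing the side-effecting work, and ack either way; a production version would also bound or persist the set.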
from hstream.
Yes, I upgraded from latest to v0.15.0.
How do I clear all the data in a stream? Delete the stream and recreate it?
from hstream.
I suggest you use the hstreamdb/hstream:latest image; we have made additional updates since v0.15.0, but no new versioned image has been released yet. The client version does not need to be updated.
Follow these steps (note that all previous data will be removed):
- Stop the cluster:
docker compose -f quick-start.yaml down
- Remove the volumes:
docker volume rm quickstart_data_store quickstart_data_zk_data quickstart_data_zk_datalog
- Fetch the new image:
docker pull hstreamdb/hstream:latest
- Update quick-start.yaml to use hstreamdb/hstream:latest
- Restart the cluster:
docker compose -f quick-start.yaml up -d
Now you have a clean cluster and can create streams and subscriptions as before.
If any retransmission still occurs in the future, feel free to let us know.
from hstream.
I have dropped the stream and recreated it. It is working now.
By the way, I created two consumer threads following the official website tutorial, but only one of them receives messages after each startup; the other has never received a message. Is this normal?
from hstream.
Do you mean two consumers consuming the same subscription, or each consumer bound to a different subscription? Would you mind posting the code for the subscription and consumer sections?
In addition, did you set the shard count to more than one when you created the stream?
from hstream.
With the same subscription.
String subscription = "rawdata_topic"; // "your_subscription_id"
HStreamClient client = HStreamClient.builder()
    .serviceUrl(serviceUrl)
    .build();
// create two consumers to consume records with several partition keys
Thread t1 = new Thread(() -> consumeDataFromSubscriptionShared(client, subscription, consumer1));
Thread t2 = new Thread(() -> consumeDataFromSubscriptionShared(client, subscription, consumer2));
t1.start();
t2.start();
try {
  t1.join();
  t2.join();
  var i = System.in.read();
  client.close();
} catch (Exception ex) {
  ex.printStackTrace();
}
public static void consumeDataFromSubscriptionShared(
    HStreamClient client, String subscription, String consumerName) {
  RawRecordReceiver receiver =
      ((rRecord, responder) -> {
        var receiveData = rRecord.getRawRecord();
        responder.ack();
        System.out.println("Received message: " + HexFormat.of().formatHex(receiveData));
      });
  Consumer consumer =
      client
          .newConsumer()
          .subscription(subscription)
          .name(consumerName)
          .rawRecordReceiver(receiver)
          // When ack() is called, the consumer does not send the ack to the server
          // immediately; the ack request is buffered until the ack count reaches
          // ackBufferSize, the consumer is stopping, or ackAgeLimit is reached.
          .ackBufferSize(100)
          .ackAgeLimit(100)
          .build();
  // add a listener for handling consumer failures
  var threadPool = new ScheduledThreadPoolExecutor(1);
  consumer.addListener(
      new Service.Listener() {
        public void failed(Service.State from, Throwable failure) {
          // deleteSubscription(client, streamName);
          System.out.println("Consumption failed, exception: " + failure.getMessage());
          consumer.stopAsync().awaitTerminated();
        }

        @Override
        public void terminated(Service.State from) {
          // deleteSubscription(client, streamName);
          System.out.println("Consumption stopped, thread terminated: " + from.name());
          consumer.stopAsync().awaitTerminated();
        }
      },
      threadPool);
  // start the consumer as a background service and return
  consumer.startAsync().awaitRunning();
}
from hstream.
In addition, did you set the shard count to more than one when you created the stream?
Sorry, an additional question was appended. For the concept of shards, please refer to https://docs.hstream.io/write/shards.html (by the way, this is our new documentation address).
Multiple consumers on the same subscription form a consumer group. The data of a stream is distributed to the consumers in the group with the shard as the smallest unit. That is: if you have two consumers but the stream has only one shard, only one consumer will be delivered data and the other will stay idle. If you have 4 shards, each of the two consumers will be delivered data from 2 shards.
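To see why one shard means only one active consumer, here is an illustrative sketch of hash-based key-to-shard assignment. This is not HStream's actual hash function; it only demonstrates the principle that records sharing a partition key always land in the same shard:

```java
// Illustrative key -> shard mapping: every record with the same partition
// key is assigned to the same shard, so with shardCount == 1 all records
// go to a single shard and therefore to a single consumer of the group.
class ShardDemo {
  static int shardFor(String partitionKey, int shardCount) {
    // floorMod keeps the result in [0, shardCount) even for negative hash codes
    return Math.floorMod(partitionKey.hashCode(), shardCount);
  }
}
```

With shardCount = 4 and varied keys, records spread across all four shards; with a single shard every record maps to shard 0, and with a single fixed key every record maps to one and the same shard.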
from hstream.
I have recreated the stream with:
public static void createStreamWithAttrsExample(HStreamClient client, String streamName) {
  client.createStream(
      streamName,
      (short) 1, // replication factor
      4, // number of shards
      7 * 24 * 3600 // backlog retention time in seconds
  );
}
> show streams;
+-------------+---------+----------------+-------------+
| Stream Name | Replica | Retention Time | Shard Count |
+-------------+---------+----------------+-------------+
| rawdemo | 1 | 604800 seconds | 4 |
+-------------+---------+----------------+-------------+
But still only one thread receives messages; the other one does not receive any.
from hstream.
This is probably because the partition key was not set when the records were created. In HStream, data with the same partition key is written to the same shard. If the key is not set, all the data is written to the same shard and will naturally be delivered to the same consumer.
Here is an example of setting a partition key:
Record record =
    Record.newBuilder().rawRecord(message.getBytes(StandardCharsets.UTF_8)).build();
record.setPartitionKey("key-" + System.currentTimeMillis());
from hstream.
OK, it is working now. Thanks a lot!
from hstream.