Giter VIP home page Giter VIP logo

Comments (4)

nihohit avatar nihohit commented on June 12, 2024

can you please recreate this in a minimal example, without the extra dependencies?

from redis-rs.

seal90 avatar seal90 commented on June 12, 2024

https://github.com/seal90/tonic_learn/blob/main/src/minimal/server.rs
I try just use tokio tokio-stream redis the problem also exists, when i start the log like this
send ok
send ok
receive ok, data: "hello"
receive ok, data: "hello"
send ok
send ok
receive ok, data: "hello"
receive ok, data: "hello"

from redis-rs.

nihohit avatar nihohit commented on June 12, 2024

I think the issue is that you're using a sync connection instead of an async connection. I added 2 prints, after publishing and receiving messages, and now I see these prints:

publish ok
received message
send ok
publish ok
received message
send ok
publish ok
received message
receive ok, data: "hello"
receive ok, data: "hello"
send ok
publish ok
received message

Which show that the issue is in pulling from the channel, not in redis-rs. This hinted that the issue stems from the spawned tasks not yielding to the async runtime, so I switched to using async connections, which solved the issue:

publish ok
received message
send ok
receive ok, data: "hello"
publish ok
received message
send ok
receive ok, data: "hello"
publish ok
received message
send ok
receive ok, data: "hello"
use std::time::Duration;

use redis::AsyncCommands;
use tokio::sync::mpsc;
use tokio_stream::StreamExt;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel(2);
    let channel_name = "hello_1";

    tokio::spawn(async move {
        let client = redis::Client::open("redis://127.0.0.1/").unwrap();
        let con = client.get_async_connection().await.unwrap();

        let mut pubsub = con.into_pubsub();
        pubsub.subscribe(&channel_name).await.unwrap();
        let mut stream = pubsub.on_message();

        loop {
            let Some(msg) = stream.next().await else {
                println!("nothing");
                continue;
            };
            println!("received message");

            match tx.send("hello").await {
                Ok(_) => {
                    println!("send ok")
                    // item (server response) was queued to be send to client
                }
                Err(_item) => {
                    // output_stream was build from rx and both are dropped
                    break;
                }
            }
        }
    });

    tokio::spawn(async move {
        loop {
            let recv = rx.recv().await;
            println!("receive ok, data: {:?}", recv.unwrap());
        }
    });

    let repeat = std::iter::repeat("repeat");
    let mut stream = Box::pin(tokio_stream::iter(repeat).throttle(Duration::from_millis(1000)));

    let client = redis::Client::open("redis://127.0.0.1/").unwrap();
    let mut con = client.get_async_connection().await.unwrap();

    let channel = "hello_1";

    while let Some(item) = stream.next().await {
        let success: bool = con.publish(channel, item).await.unwrap();
        if success {
            println!("publish ok");
        } else {
            println!("publish failed :(");
        }
    }
    println!("\tclient disconnected");
}

from redis-rs.

seal90 avatar seal90 commented on June 12, 2024

thank you

from redis-rs.

Related Issues (20)

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.