Giter VIP home page Giter VIP logo

Comments (27)

vleushin avatar vleushin commented on September 13, 2024

What I propose is this:

connection.set("mykey", "myval")
  .flatMap(result -> connection.get("mykey"))

this executes set command and when in returns result it executes get command. Errors and correctly propageted, so if set command failed get command won't be invoked. And there are many more features of Rx.

from lettuce-core.

vleushin avatar vleushin commented on September 13, 2024

I tried to wrap current async API into observable:

public abstract class RedisFutureOnSubscribe<T> implements Observable.OnSubscribe<T>{
    private final RedisConnectionPool<RedisAsyncConnection<String, String>> redisConnectionPool;

    public RedisFutureOnSubscribe(RedisConnectionPool<RedisAsyncConnection<String, String>> redisConnectionPool) {
        this.redisConnectionPool = redisConnectionPool;
    }

    @Override
    public void call(Subscriber<? super T> subscriber) {
        RedisAsyncConnection<String, String> connection = redisConnectionPool.allocateConnection();
        try {
            RedisFuture<T> future = createFuture(connection);
            future.addListener(() -> {
                if (future.getError() != null) {
                    subscriber.onError(new RuntimeException(future.getError()));
                } else {
                    try {
                        subscriber.onNext(future.get());
                        subscriber.onCompleted();
                    } catch (InterruptedException | ExecutionException e) {
                        e.printStackTrace(); // TODO
                    }
                }
            }, MoreExecutors.directExecutor());
        } finally {
            redisConnectionPool.freeConnection(connection);
        }
    }

    protected abstract RedisFuture<T> createFuture(RedisAsyncConnection<String, String> connection);
}
public class AsyncTest {

    @Test
    public void test() throws ExecutionException, InterruptedException {
        RedisClient redisClient = new RedisClient("localhost");
        RedisConnectionPool<RedisAsyncConnection<String, String>> pool = redisClient.asyncPool();

        Observable<String> getObservable = Observable.create(new RedisFutureOnSubscribe<String>(pool) {

            @Override
            protected RedisFuture<String> createFuture(RedisAsyncConnection<String, String> connection) {
                return connection.get("test{key}");
            }
        });

        Observable<String> putObservable = Observable.create(new RedisFutureOnSubscribe<String>(pool) {

            @Override
            protected RedisFuture<String> createFuture(RedisAsyncConnection<String, String> connection) {
                return connection.set("test{key}", "hello");
            }
        });

        assertThat(
                putObservable
                        .flatMap(result ->
                                getObservable
                                        .map(value -> value + " world"))
                        .toBlocking().first(), equalTo("hello world"));
    }
}

I rely on listener runnable. But I'm nervous :). Other than async API your lib is very good, I will be digging more into it.

from lettuce-core.

mp911de avatar mp911de commented on September 13, 2024

Thanks for your input. You're right about the futures. The futures are a pain point since they're limited right now. I'm collecting ideas on how to proceed. I want to support with 3.x Java 6+ and I do not want to break the current API. My current plan is to use CompletionStage for lettuce 4.x and to drop Java 6/7 support.

Using RxJava is a new and interesting impulse that could deliver some benefits. Same applies to vert.x and so on. lettuce already depends on netty, commons-pool2 and guava. I'd love to get more input on this topic and then find the best way to satisfy most common needs. Everything else can be a how-to or a contrib/util.

from lettuce-core.

mp911de avatar mp911de commented on September 13, 2024

My feedback to your code:
0. Great!

  1. You do not need pooled connections for regular use cases. Pooled connections are checked with a PING before obtaining the connection. Just be aware of that behavior. Lettuce connections are thread-safe and long-living. Connections get reconnected if the socket gets reset.
  2. The part if error != null is good. I'd propose to use RedisException. It's a runtime exception too and it's not a raw type
  3. You should handle InterruptedException and ExecutionException in separate ways. The ExecutionException can carry exceptions which occurred in the I/O handling due to socket/protocol/SSL errors. It's a good idea to propagate the cause to the subscriber.

from lettuce-core.

vleushin avatar vleushin commented on September 13, 2024
  1. How do I know if connection is good or not? There is isOpen method on connection, can I rely on that and if it is closed then I get new one from pool?
  2. OK
  3. OK, I forgot to call onError there, for simpllicity I have one caluse to hande them all
  4. I did more testing, second requirement for me is binary data. Jedis, for instance, has two interfaces like set(String, String) and set(byte[], byte[]). You rely on Codecs, which is OK. The only thing I don't like or don't understand -- why you share codec for each connection but not have separate instance for each (share nothing).

Netty and Guava are pretty much standard. For Rx people usually create separate module or go native (like this guys -- http://blog.couchbase.com/why-couchbase-chose-rxjava-new-java-sdk ), as it is becoming more and more common for async api. Regarding your API -- I would suggest you leave it as is, use CompletionStage, but for Rx use separate module.

from lettuce-core.

vleushin avatar vleushin commented on September 13, 2024

Here is my code and question about encoding -- do I need two connections to make <String, String> and <String, byte[]> requests?

public class AsyncTest {

    @Test
    public void test() {
        RedisClient redisClient = new RedisClient("localhost");

        RedisAsyncConnection<String, String> connection = redisClient.connectAsync();
        RedisRxConnection<String, String> rxConnection = new RedisRxConnection<>(connection);
        Observable<String> observable = rxConnection.set("test{key}", "hello")
                .flatMap(setResult ->
                        rxConnection.get("test{key}"))
                .map(getResult -> getResult + " world");

        assertThat(observable.toBlocking().first(), equalTo("hello world"));


        RedisAsyncConnection<String, byte[]> binaryConnection = redisClient.connectAsync(new ByteArrayCodec());
        RedisRxConnection<String, byte[]> binaryRxConnection = new RedisRxConnection<>(binaryConnection);
        Auth auth = Auth.newBuilder() // random protobuf object
                .setAuthId(1)
                .build();
        Observable<byte[]> binaryObservable = binaryRxConnection.set("test{key}", auth.toByteArray())
                .flatMap(result ->
                        binaryRxConnection.get("test{key}"));

        assertThat(binaryObservable.toBlocking().first(), equalTo(auth.toByteArray()));
    }
}
public class RedisRxConnection<K, V> {
    private RedisAsyncConnection<K, V> connection;
    public RedisRxConnection(RedisAsyncConnection<K, V> connection) {
        this.connection = connection; // TODO what to do if connection is bad?
    }

    public Observable<String> set(K key, V value) {
        return Observable.create(new RedisFutureOnSubscribe<K, V, String>(connection) {

            @Override
            protected RedisFuture<String> createFuture(RedisAsyncConnection<K, V> connection) {
                return connection.set(key, value);
            }
        });
    }

    public Observable<V> get(K key) {
        return Observable.create(new RedisFutureOnSubscribe<K, V, V>(connection) {

            @Override
            protected RedisFuture<V> createFuture(RedisAsyncConnection<K, V> connection) {
                return connection.get(key);
            }
        });
    }
}
public abstract class RedisFutureOnSubscribe<K, V, T> implements Observable.OnSubscribe<T> {
    private final RedisAsyncConnection<K, V> connection;

    public RedisFutureOnSubscribe(RedisAsyncConnection<K, V> connection) {
        this.connection = connection;
    }

    @Override
    public void call(Subscriber<? super T> subscriber) {
        RedisFuture<T> future = createFuture(connection);
        future.addListener(() -> {
            if (future.getError() != null) {
                subscriber.onError(new RedisException(future.getError()));
            } else {
                try {
                    subscriber.onNext(future.get());
                    subscriber.onCompleted();
                } catch (InterruptedException | ExecutionException e) {
                    subscriber.onError(e);
                }
            }
        }, MoreExecutors.directExecutor());
    }

    protected abstract RedisFuture<T> createFuture(RedisAsyncConnection<K, V> connection);
}

from lettuce-core.

mp911de avatar mp911de commented on September 13, 2024

Knowing whether the connection is good: If you want to check the connection state, isOpen is your best option, but this should not be an issue. lettuce tries to reconnect your connection and so, in the end, whether you create a new connection or reconnect the old one, it ends up with working connections. The only real difference is, that a reconnect will re-issue commands that were buffered.

Multiple connections can share a codec but that's not necessary. You can create two connections with different codecs. You could go for example for RedisConnection<String, byte[]> and do the string/byte conversion on your own, as you need it. Otherwise, yes, two connections would be the alternative.

Until now, there was no request for not sharing the codec. Since the codec has a synchronized block it would speed up a little with non-sharing. The non-sharing approach would require a codec factory instead of the codec itself.

The code above looks good. You might want to use timeouts on future.get() since the get() method waits indefinitely. At least I would go for timeouts, but this depends on your application and requirements.

from lettuce-core.

vleushin avatar vleushin commented on September 13, 2024

Thanks for your input. I will look into timeouts and give feedback. Another quick question -- I noticed that there is no pipeline. Can async mode achieve same performance as pipeline? For example, I know that I'm going to work on keys that are on same slot and it is safe (I assume, but I might be wrong) to use pipeline on Redis Cluster.

from lettuce-core.

mp911de avatar mp911de commented on September 13, 2024

I'm not sure I understood you right: You're talking that you want to connect with lettuce to a dedicated cluster node and then work on the key/keys which are handled by this particular node?

If so, then you've the same performance. RedisClusterClient can do the same for you. It looks up the cluster topology at the moment you connect and then dispatches the calls to the particular nodes.

from lettuce-core.

vleushin avatar vleushin commented on September 13, 2024

Redis pipeline allows to execute bunch of commands in one go (one trip to server). I can do different operations and they will be executed as a bulk. Jedis for instance has pipeline if used without cluster: https://github.com/xetorthio/jedis/wiki/AdvancedUsage#pipelining

Question was what will be faster -- put 20 different commands in pipeline, or make 20 different async calls.

from lettuce-core.

vleushin avatar vleushin commented on September 13, 2024

Here comes first road block for me:

        Map<String, String> map = newHashMap();
        map.put("key1", "value1");
        map.put("key2", "value2");
        map.put("key3", "value3");
        map.put("key4", "value4");
        map.put("key5", "value5");
        List<String> values = rxConnection.mset(map)
                .flatMap(result ->
                        Observable.from(newArrayList("key5", "key4", "key3", "key2", "key1"))
                                .flatMap(rxConnection::get))
                .toList()
                .toBlocking()
                .first();
        assertThat(values, equalTo(newArrayList("value5", "value4", "value3", "value2", "value1")));

This fails on cluster environment because order of values can be different. It is different beause listeners fire at different time, which is understandable. I cannot use mget because keys can be on different nodes and I get CROSSSLOT. And this leaves me puzzled

from lettuce-core.

mp911de avatar mp911de commented on September 13, 2024

Ah, ok, now I got what you mean. lettuce does not split multi-key commands. It rather routes these commands in one piece to the particular cluster node.

from lettuce-core.

mp911de avatar mp911de commented on September 13, 2024

Will continue the RxJava/Future topic in #48

from lettuce-core.

mp911de avatar mp911de commented on September 13, 2024

Is there anything I can do for you within this ticket or can I close it?

from lettuce-core.

vleushin avatar vleushin commented on September 13, 2024

You can close ticket. I sorted out my problem, will continue with prototype and share when I'm done

from lettuce-core.

mp911de avatar mp911de commented on September 13, 2024

Cool. I would appreciate all contributions that help to improve lettuce.

from lettuce-core.

mp911de avatar mp911de commented on September 13, 2024

@vleushin Interested in an alpha of lettuce 4.x? Can handle certain CROSSSLOT commands (#66), has CompletionStage's (#48) and an evolving advanced cluster API.

from lettuce-core.

vleushin avatar vleushin commented on September 13, 2024

Yes, I don't mind.

BTW I sorted out most of my issues (last issue is about executor, but I will probably pass it in constructor) and living with this:

public abstract class RedisClusterFutureOnSubscribe<K, V, T> implements Observable.OnSubscribe<T> {
    // TODO
    private static final Executor executor = Executors.newSingleThreadExecutor();
    private final RedisClusterAsyncConnection<K, V> connection;

    public RedisClusterFutureOnSubscribe(RedisClusterAsyncConnection<K, V> connection) {
        this.connection = connection;
    }

    @Override
    public void call(Subscriber<? super T> subscriber) {
        RedisFuture<T> future = createFuture(connection);
        future.addListener(() -> {
            if (future.getError() != null) {
                if (!subscriber.isUnsubscribed()) {
                    subscriber.onError(new RedisException(future.getError()));
                }
            } else {
                try {
                    if (!subscriber.isUnsubscribed()) {
                        subscriber.onNext(future.get());
                        subscriber.onCompleted();
                    }
                } catch (InterruptedException | ExecutionException e) {
                    if (!subscriber.isUnsubscribed()) {
                        subscriber.onError(e);
                    }
                }
            }
        }, executor);
        future.await(5, TimeUnit.SECONDS);
    }

    protected abstract RedisFuture<T> createFuture(RedisClusterAsyncConnection<K, V> connection);
}

and this guy (I manually collect results into array, this is needed to say "here is bunch of ids, get them, in order, and call me when you are done"):

public abstract class RedisClusterMultipleFutureOnSubscribe<K, V, T> implements Observable.OnSubscribe<List<T>> {
    // TODO
    private static final Executor executor = Executors.newSingleThreadExecutor();
    private final RedisClusterAsyncConnection<K, V> connection;

    public RedisClusterMultipleFutureOnSubscribe(RedisClusterAsyncConnection<K, V> connection) {
        this.connection = connection;
    }

    @Override
    public void call(Subscriber<? super List<T>> subscriber) {
        List<RedisFuture<T>> futures = createFutures(connection);
        int size = futures.size();
        //noinspection unchecked
        T[] responses = (T[]) new Object[size];
        AtomicInteger counter = new AtomicInteger(size);
        AtomicBoolean hadErrors = new AtomicBoolean(false);
        for (int i = 0; i < size; i++) {
            RedisFuture<T> future = futures.get(i);
            final int number = i;
            future.addListener(() -> {
                if (future.getError() != null) {
                    hadErrors.set(true);
                    if (!subscriber.isUnsubscribed()) {
                        subscriber.onError(new RedisException(future.getError()));
                    }
                } else {
                    try {
                        responses[number] = future.get();
                    } catch (InterruptedException | ExecutionException e) {
                        hadErrors.set(true);
                        if (!subscriber.isUnsubscribed()) {
                            subscriber.onError(new RedisException(future.getError()));
                        }
                    } catch (Throwable e) {
                        hadErrors.set(true);
                        if (!subscriber.isUnsubscribed()) {
                            subscriber.onError(new RuntimeException(future.getError()));
                        }
                    }
                    if (counter.decrementAndGet() == 0) {
                        if (!subscriber.isUnsubscribed() && !hadErrors.get()) {
                            subscriber.onNext(Arrays.asList(responses));
                            subscriber.onCompleted();
                        }
                    }
                }
                future.await(5, TimeUnit.SECONDS);
            }, executor);
        }
    }

    protected abstract List<RedisFuture<T>> createFutures(RedisClusterAsyncConnection<K, V> connection);
}
public class RedisClusterRxConnection<K, V> {
    private final RedisClusterAsyncConnection<K, V> connection;

    public RedisClusterRxConnection(RedisClusterAsyncConnection<K, V> connection) {
        this.connection = connection;
    }

    public Observable<Long> incr(K key) {
        return Observable.create(new RedisClusterFutureOnSubscribe<K, V, Long>(connection) {

            @Override
            protected RedisFuture<Long> createFuture(RedisClusterAsyncConnection<K, V> connection) {
                return connection.incr(key);
            }
        });
    }

   ...

   public Observable<Map<K, V>> hgetall(K key) {
        return Observable.create(new RedisClusterFutureOnSubscribe<K, V, Map<K, V>>(connection) {

            @Override
            protected RedisFuture<Map<K, V>> createFuture(RedisClusterAsyncConnection<K, V> connection) {
                return connection.hgetall(key);
            }
        });
    }

    public Observable<List<Map<K, V>>> hgetall(List<K> keys) {
        return Observable.create(new RedisClusterMultipleFutureOnSubscribe<K, V, Map<K, V>>(connection) {

            @Override
            protected List<RedisFuture<Map<K, V>>> createFutures(RedisClusterAsyncConnection<K, V> connection) {
                return keys.stream()
                        .map(connection::hgetall)
                        .collect(toList());
            }
        });
    }

   ...

}

from lettuce-core.

mp911de avatar mp911de commented on September 13, 2024

RedisClusterMultipleFutureOnSubscribe passes future.getError() within the catch blocks (instead of the catched exception)
Here's the improved code:

public abstract class RedisClusterFutureOnSubscribe<K, V, T> implements Observable.OnSubscribe<T> {
    // TODO
    private static final Executor executor = Executors.newSingleThreadExecutor();
    private final RedisClusterAsyncConnection<K, V> connection;

    public RedisClusterFutureOnSubscribe(RedisClusterAsyncConnection<K, V> connection) {
        this.connection = connection;
    }

    @Override
    public void call(Subscriber<? super T> subscriber) {
        RedisFuture<T> future = createFuture(connection);
        future.addListener(() -> {
          if(subscriber.isUnsubscribed()) {
            return;
          }
            if (future.getError() != null) {
                  subscriber.onError(new RedisException(future.getError()));
            } else {
                try {
                      subscriber.onNext(future.get());
                      subscriber.onCompleted();
                } catch (InterruptedException | ExecutionException e) {
                      subscriber.onError(e);
                }
            }
        }, executor);
        future.await(5, TimeUnit.SECONDS);
    }

    protected abstract RedisFuture<T> createFuture(RedisClusterAsyncConnection<K, V> connection);
}
public abstract class RedisClusterMultipleFutureOnSubscribe<K, V, T> implements Observable.OnSubscribe<List<T>> {
    // TODO
    private static final Executor executor = Executors.newSingleThreadExecutor();
    private final RedisClusterAsyncConnection<K, V> connection;

    public RedisClusterMultipleFutureOnSubscribe(RedisClusterAsyncConnection<K, V> connection) {
        this.connection = connection;
    }

    @Override
    public void call(Subscriber<? super List<T>> subscriber) {
        List<RedisFuture<T>> futures = createFutures(connection);
        int size = futures.size();
        //noinspection unchecked
        List<T> responses = new ArrayList<>();
        AtomicInteger counter = new AtomicInteger(size);
        AtomicBoolean hadErrors = new AtomicBoolean(false);
        for (int i = 0; i < size; i++) {
            RedisFuture<T> future = futures.get(i);

            future.addListener(() -> {
                if (future.getError() != null) {
                    hadErrors.set(true);
                }
                if (subscriber.isUnsubscribed()) {
                    return;
                }

                if(future.getError() != null) {
                    subscriber.onError(new RedisException(future.getError()));
                }
                else {
                    try {
                        responses.add(future.get());
                    } catch (InterruptedException | ExecutionException e) {
                        hadErrors.set(true);
                        subscriber.onError(e);
                    }

                    if (counter.decrementAndGet() == 0  && !hadErrors.get()) {
                        subscriber.onNext(responses);
                        subscriber.onCompleted();
                    }
                }
                future.await(5, TimeUnit.SECONDS);
            }, executor);
        }
    }

    protected abstract List<RedisFuture<T>> createFutures(RedisClusterAsyncConnection<K, V> connection);
}

I noticed you're using Java 8. I would like to take your code to reshape it for a use that results in more compact "client" code. Something like:

  public Observable<List<Map<K, V>>> hgetall(List<K> keys) {
        return RedisFutureObservable.create(() -> 
                return keys.stream()
                        .map(connection::hgetall)
                        .collect(toList());
            });
    }

What do you think?

from lettuce-core.

mp911de avatar mp911de commented on September 13, 2024

You can sneek peek the lettuce 4.0 at:

https://oss.sonatype.org/content/repositories/snapshots/biz/paluch/redis/lettuce/4.0-SNAPSHOT/

<dependency>
  <groupId>biz.paluch.redis</groupId>
  <artifactId>lettuce</artifactId>
  <version>4.0-SNAPSHOT</version>
</dependency>

from lettuce-core.

mp911de avatar mp911de commented on September 13, 2024

@vleushin I refactored the code to simplify the use of RxJava with lettuce. You can find it at https://gist.github.com/mp911de/8664c7c7cae3cb4c0092
Kudos to you.

from lettuce-core.

vleushin avatar vleushin commented on September 13, 2024

Your code is OK except one (most) important thing:

This test illustrates problem with your code of RedisClusterMultipleFutureOnSubscribe

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.IsEqual.equalTo;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

@RunWith(MockitoJUnitRunner.class)
public class RedisClusterMultipleFutureOnSubscribeTest {

    @Mock
    RedisAsyncConnection<String, String> connection;

    @Mock
    RedisFuture<String> future1;

    @Mock
    RedisFuture<String> future2;

    @Mock
    RedisFuture<String> future3;

    @Captor
    ArgumentCaptor<Runnable> listener1Captor;

    @Captor
    ArgumentCaptor<Runnable> listener2Captor;

    @Captor
    ArgumentCaptor<Runnable> listener3Captor;

    @Test
    public void test() throws ExecutionException, InterruptedException {
        Observable.create(new RedisClusterMultipleFutureOnSubscribe<String, String, String>(connection) {

            @Override
            protected List<RedisFuture<String>> createFutures(RedisClusterAsyncConnection<String, String> connection) {
                return ImmutableList.of(future1, future2, future3);
            }
        }).subscribe(strings -> {
            assertThat(strings.get(0), equalTo("future1 result"));
            assertThat(strings.get(1), equalTo("future2 result"));
            assertThat(strings.get(2), equalTo("future3 result"));
        });

        verify(future1).addListener(listener1Captor.capture(), any());
        verify(future2).addListener(listener2Captor.capture(), any());
        verify(future3).addListener(listener3Captor.capture(), any());

        when(future1.get()).thenReturn("future1 result");
        when(future2.get()).thenReturn("future2 result");
        when(future3.get()).thenReturn("future3 result");

        // order is different here
        listener3Captor.getValue().run();
        listener2Captor.getValue().run();
        listener1Captor.getValue().run();
    }

}

Because order matters! I used array to collect result, and you use responses.add(future.get()); which depends on order in which listeners run. But listeners can fire in any order.

from lettuce-core.

vleushin avatar vleushin commented on September 13, 2024

Thanks, I will get my hands on your code today or tomorrow.

from lettuce-core.

mp911de avatar mp911de commented on September 13, 2024

You're right. I was not aware of the importance of order.

from lettuce-core.

mp911de avatar mp911de commented on September 13, 2024

I updated the code of my gist accordingly and used the array solution you proposed

from lettuce-core.

vleushin avatar vleushin commented on September 13, 2024

I'm back to lettuce and updating from 3.2.Final to 4.0.Final. Glad you went with Rx :)

from lettuce-core.

mp911de avatar mp911de commented on September 13, 2024

Appreciate your feedback.

from lettuce-core.

Related Issues (20)

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.