
kreds's Introduction

Who am I?

Engineer with 9+ years of experience in full-stack software development and DevOps engineering.

You can reach me at [email protected]

  • 🔭 Maintainer of Kreds, a Redis client for Kotlin.
  • 💬 Ask me about guitar, travel, photography and music.

kreds's People

Contributors

crackthecodeabhi, dependabot[bot], fossabot, gitter-badger, neagogu, vytskalt

kreds's Issues

Arguments not being escaped causing ERR syntax error

Hey,

I recently found that writing a value such as a JSON object fails with ERR syntax error, because the argument is not escaped.

I'm almost sure the key is not escaped either. When the arguments are wrapped in double quotes, the issue does not occur.

Thanks.
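For context, Redis clients normally send each argument as a length-prefixed RESP bulk string, so quotes, spaces and braces inside a value need no escaping at all. The following is a minimal sketch of that encoding, not kreds's actual implementation; `encodeCommand` is a hypothetical helper name.

```kotlin
import java.io.ByteArrayOutputStream

// Hypothetical helper (not part of kreds): encodes a command as a RESP array
// of bulk strings. Each argument is prefixed with its length in bytes, so the
// server never has to parse quotes or whitespace inside a value.
fun encodeCommand(args: List<String>): ByteArray {
    val out = ByteArrayOutputStream()
    out.write("*${args.size}\r\n".toByteArray(Charsets.US_ASCII))
    for (arg in args) {
        val bytes = arg.toByteArray(Charsets.UTF_8)
        out.write("\$${bytes.size}\r\n".toByteArray(Charsets.US_ASCII))
        out.write(bytes)
        out.write("\r\n".toByteArray(Charsets.US_ASCII))
    }
    return out.toByteArray()
}

fun main() {
    val json = """{"name": "kreds", "tags": ["a b", "c"]}"""
    val wire = encodeCommand(listOf("SET", "user:1", json))
    // Show the framing with visible line terminators.
    println(String(wire, Charsets.UTF_8).replace("\r\n", "\\r\\n\n"))
}
```

If a client instead joins arguments into one space-separated line, a value containing spaces or quotes is split into extra tokens, which is one way a syntax error like the one reported can arise.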

It's not possible to use kreds with a JDK less than 17

Trying to use this library with JDK 1.8 or 11 fails with the following error:

Execution failed for task ':compileKotlin'.
> Error while evaluating property 'filteredArgumentsMap' of task ':compileKotlin'
   > Could not resolve all files for configuration ':compileClasspath'.
      > Could not resolve io.github.crackthecodeabhi:kreds:0.7.
        Required by:
            project :
         > No matching variant of io.github.crackthecodeabhi:kreds:0.7 was found. The consumer was configured to find an API of a library compatible with Java 11, preferably in the form of class files, preferably optimized for standard JVMs, and its dependencies declared externally, as well as attribute 'org.jetbrains.kotlin.platform.type' with value 'jvm' but:
             - Variant 'apiElements' capability io.github.crackthecodeabhi:kreds:0.7 declares an API of a library, packaged as a jar, preferably optimized for standard JVMs, and its dependencies declared externally, as well as attribute 'org.jetbrains.kotlin.platform.type' with value 'jvm':
                 - Incompatible because this component declares a component compatible with Java 17 and the consumer needed a component compatible with Java 11
             - Variant 'runtimeElements' capability io.github.crackthecodeabhi:kreds:0.7 declares a runtime of a library, packaged as a jar, preferably optimized for standard JVMs, and its dependencies declared externally, as well as attribute 'org.jetbrains.kotlin.platform.type' with value 'jvm':
                 - Incompatible because this component declares a component compatible with Java 17 and the consumer needed a component compatible with Java 11
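
The error says that the published 0.7 artifact targets Java 17 while the consuming build targets Java 11, so Gradle rejects both variants. One possible workaround, assuming the project is free to move to JDK 17 and uses a Kotlin Gradle plugin version that offers `jvmToolchain(Int)` (1.7.20 or newer), is to raise the consumer's toolchain:

```kotlin
// build.gradle.kts (sketch; assumes Kotlin Gradle plugin 1.7.20+ and that
// the project is allowed to compile and run on JDK 17)
kotlin {
    jvmToolchain(17)
}
```

Projects that must stay on JDK 8 or 11 cannot consume this variant as published.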

Auth issues randomly appearing

Recently, fetching a key failed with a NOAUTH error, even though the auth method had been invoked when the client was created, as it should be.

The key is set once; however, at a later point further operations fail with the following error:

io.github.crackthecodeabhi.kreds.protocol.KredsRedisDataException: NOAUTH Authentication required.
        at io.github.crackthecodeabhi.kreds.protocol.CommandProcessor.decode(Command.kt:70)
        at io.github.crackthecodeabhi.kreds.connection.AbstractKredsClient$execute$suspendImpl$$inlined$withReentrantLock$2.invokeSuspend(ExclusiveObject.kt:49)
        at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
        at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:106)
        at kotlinx.coroutines.scheduling.CoroutineScheduler.runSafely(CoroutineScheduler.kt:570)
        at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.executeTask(CoroutineScheduler.kt:750)
        at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.runWorker(CoroutineScheduler.kt:677)
        at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.run(CoroutineScheduler.kt:664)

In my case I'm only using Redis with a password.

ERR syntax error on SetOption ULong options

SetOption throws KredsRedisDataException every time I use ULong options. Example:

newClient(Endpoint.from("127.0.0.1:${redis.firstMappedPort}")).use { client ->
    client.set("key", "val", SetOption.Builder(exSeconds = 30u).build())
...
ERR syntax error
io.github.crackthecodeabhi.kreds.protocol.KredsRedisDataException: ERR syntax error
	at app//io.github.crackthecodeabhi.kreds.protocol.CommandProcessor.decode(Command.kt:69)
	at app//io.github.crackthecodeabhi.kreds.connection.AbstractKredsClient$execute$suspendImpl$$inlined$withReentrantLock$2.invokeSuspend(ExclusiveObject.kt:49)
	(Coroutine boundary)
	at io.github.crackthecodeabhi.kreds.connection.AbstractKredsClient.execute$suspendImpl(Client.kt:151)

client.expire("key", 30u) works without issues.

Library version: 0.7
Redis server: redis:6.2.7-alpine
Kotlin 1.6.10
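
For reference, the Redis SET command expects the expiry as two separate arguments, the option name followed by the number, for example `SET key val EX 30`. The sketch below shows that layout with a hypothetical `buildSetArgs` helper. It does not reproduce how kreds's `SetOption` builds its arguments, and the actual cause of the error may differ.

```kotlin
// Hypothetical helper (not kreds's API): builds the argument list for
// SET with an optional expiry. EX takes seconds, PX takes milliseconds,
// and Redis rejects a command that supplies both.
fun buildSetArgs(
    key: String,
    value: String,
    exSeconds: ULong? = null,
    pxMilliseconds: ULong? = null,
): List<String> {
    require(exSeconds == null || pxMilliseconds == null) {
        "EX and PX are mutually exclusive"
    }
    val args = mutableListOf("SET", key, value)
    // Option name and numeric value are separate tokens; ULong.toString()
    // yields plain decimal digits.
    exSeconds?.let { args += listOf("EX", it.toString()) }
    pxMilliseconds?.let { args += listOf("PX", it.toString()) }
    return args
}

fun main() {
    println(buildSetArgs("key", "val", exSeconds = 30u).joinToString(" "))
}
```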

SSL Support

Add SSL support to Kreds.

This could be implemented with Netty's SSL support.

Error with SLF4J when connecting

I added kreds to my project, and now it fails when I try to connect:

SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Exception in thread "main" java.lang.NoClassDefFoundError: mu/KotlinLogging
        at io.github.crackthecodeabhi.kreds.connection.KonnectionImplKt.<clinit>(KonnectionImpl.kt:52)
        at io.github.crackthecodeabhi.kreds.connection.KonnectionImpl.connect$suspendImpl(KonnectionImpl.kt:164)
        at io.github.crackthecodeabhi.kreds.connection.KonnectionImpl$connect$1.invokeSuspend(KonnectionImpl.kt)
        at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
        at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:108)
        at kotlinx.coroutines.scheduling.CoroutineScheduler.runSafely(CoroutineScheduler.kt:584)
        at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.executeTask(CoroutineScheduler.kt:800)
        at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.runWorker(CoroutineScheduler.kt:704)
        at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.run(CoroutineScheduler.kt:691)
Caused by: java.lang.ClassNotFoundException: mu.KotlinLogging
        at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:641)
        at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:188)
        at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:520)
        ... 9 more

Dependencies used:

implementation(kotlin("reflect"))

implementation("org.jetbrains.kotlin:kotlin-stdlib:1.9.21")
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.8.0-RC")
implementation("org.jetbrains.kotlinx:kotlinx-datetime:0.5.0")
implementation("org.jetbrains.kotlinx:kotlinx-serialization-json:1.6.2")

implementation("io.ktor:ktor-server-netty:2.3.7") {
    exclude(group = "io.netty", module = "netty-all")
}
implementation("io.netty:netty-all:4.1.104.Final")
implementation("io.ktor:ktor-network:$ktorVersion")

implementation("org.jetbrains.exposed:exposed-core:0.45.0")
implementation("org.jetbrains.exposed:exposed-jdbc:0.45.0")
implementation("org.jetbrains.exposed:exposed-kotlin-datetime:0.45.0")

implementation("io.github.crackthecodeabhi:kreds:0.9.1")
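
In the output above, the SLF4J lines are only warnings. The failure is the NoClassDefFoundError for `mu/KotlinLogging`, which means the kotlin-logging library is not on the runtime classpath. That class ships in the `io.github.microutils:kotlin-logging-jvm` artifact. A possible workaround, assuming the dependency is simply missing from the resolved classpath (the version below is an assumption, not taken from the report), is to declare it explicitly:

```kotlin
// build.gradle.kts (sketch; the version is an assumption, pick one that
// matches the rest of the build)
dependencies {
    // Provides the mu.KotlinLogging class named in the stack trace.
    implementation("io.github.microutils:kotlin-logging-jvm:2.1.23")
}
```

Running `gradle dependencies --configuration runtimeClasspath` shows whether the artifact is being excluded or never requested in the first place.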

Error with SLF4J when adding Kreds to my project

I just added the dependency and my Ktor application won't run:

SLF4J: No SLF4J providers were found.
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See https://www.slf4j.org/codes.html#noProviders for further details.
SLF4J: Class path contains SLF4J bindings targeting slf4j-api versions 1.7.x or earlier.
SLF4J: Ignoring binding found at [jar:file:/Users/leon.latsch/.gradle/caches/modules-2/files-2.1/ch.qos.logback/logback-classic/1.2.11/4741689214e9d1e8408b206506cbe76d1c6a7d60/logback-classic-1.2.11.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See https://www.slf4j.org/codes.html#ignoredBindings for an explanation.

Did I miss any step in the setup?
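
These messages indicate that slf4j-api 2.x is on the classpath while logback-classic 1.2.11 only provides the older 1.7.x-style binding, so SLF4J ignores it. One way to resolve the mismatch, assuming the application can run on Java 11 or newer and nothing else pins Logback to 1.2.x, is to move to a Logback release line that implements the SLF4J 2.x provider mechanism (the exact version below is an assumption):

```kotlin
// build.gradle.kts (sketch; the version is an assumption)
dependencies {
    // Logback 1.4.x targets slf4j-api 2.x and requires Java 11+;
    // the 1.3.x line is the Java 8 compatible equivalent.
    implementation("ch.qos.logback:logback-classic:1.4.14")
}
```

A NOP logger on its own does not normally stop an application from starting, so if the application still fails after this change, the cause is probably elsewhere in the output.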

How to configure a connection to a Redis cluster

Hi,
I want to use this Redis client to connect to a Redis cluster on K8s.
Do I need to set some configuration?
For example, with the Quarkus Redis client I have to add configuration properties
such as: quarkus.redis.client-type = cluster

Thank you

More examples are required

One example is really too few for the library. I would be grateful if you added some more.

Error with netty

Using kreds occasionally generates the following error:

[2023-03-03 04:11:12] [nioEventLoopGroup-2-1] [io.netty.util.ResourceLeakDetector] ERROR - LEAK: ByteBuf.release() was not called before it's garbage-collected. See https://netty.io/wiki/reference-counted-objects.html for more information.
Recent access records: 
Created at:
	io.netty.buffer.AbstractByteBufAllocator.compositeDirectBuffer(AbstractByteBufAllocator.java:224)
	io.netty.buffer.AbstractByteBufAllocator.compositeBuffer(AbstractByteBufAllocator.java:202)
	io.netty.handler.codec.MessageAggregator.decode(MessageAggregator.java:269)
	io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:88)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346)
	io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:318)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
	io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
	io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788)
	io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
	io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)
	io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
	io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
	io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	java.base/java.lang.Thread.run(Thread.java:831)

Unix sockets

Hello,

Redis can listen on Unix sockets, but kreds can currently only reach Redis through a TCP port.
Is Unix socket support planned?

Kind regards
Jonathan

Is cluster support dead?

Looks like the branch that should be handling this is untouched for 11 months now and conflicts with the main codebase as well. I've not seen any signs of life around using cluster with kreds. Is it safe to assume that cluster support is dead and that I need to look for a different solution or is there something else I can do that doesn't involve rewriting code?

Scaling...

Hi, I just came across your library. Can you create a connection, or a pool of connections, once and use it throughout a program? If so, what locking considerations (if any) need to be taken into account?

Shared mutable state and concurrency

Hello,

I have tried to replicate a bare-bones version of this library so that I can quickly extend it and test new functionality such as streaming, SSL and AUTH.
The issue is that I'm observing unsafe access to the read channel when multiple clients are both reading and writing. Clients are reading the responses to the writes (XADD, EXPIRE) instead of the responses to XREAD, INFO, etc.

Here is my bare-bones code. Hopefully I have just copied something wrong or missed something, and it is not an underlying issue with the implementation 🤞

Bare bones code
@Suppress("TooGenericExceptionThrown", "TooManyFunctions")
@Service
class NettyRedisClient(
    @Value("\${redis.host}") private val host: String,
    @Value("\${redis.port}") private val port: Int,
    @Value("\${redis.password}") private val password: String
) : ExclusiveObject, DisposableBean {
    private val logger = KotlinLogging.logger {}

    final override val mutex: Mutex = Mutex()
    override val key: ReentrantMutexContextKey = ReentrantMutexContextKey(mutex)

    private val group = NioEventLoopGroup()
    private val bootstrap = Bootstrap()
        .group(group)
        .remoteAddress(host, port)
        .option(ChannelOption.SO_KEEPALIVE, true)
        .channel(NioSocketChannel::class.java)

    val sslContext: SslContext = SslContextBuilder
        .forClient()
        .trustManager(InsecureTrustManagerFactory.INSTANCE)
        .build()

    private var writeChannel: SocketChannel? = null
    private var readChannel: KChannel<RedisMessage>? = null

    fun pipelined(): Pipelined = Pipelined(this)

    suspend fun connect() = withReentrantLock {
        if (!isConnected()) {
            val newReadChannel = KChannel<RedisMessage>(KChannel.UNLIMITED)
            val newWriteChannel = bootstrap
                .handler(LoggingHandler(LogLevel.INFO))
                .handler(channelInitializer(newReadChannel))
                .connect()
                .suspendableAwait() as SocketChannel

            readChannel = newReadChannel
            writeChannel = newWriteChannel

            auth(newWriteChannel)
        }
    }

    private fun channelInitializer(newReadChannel: KChannel<RedisMessage>): ChannelInitializer<SocketChannel> {
        return object : ChannelInitializer<SocketChannel>() {
            override fun initChannel(channel: SocketChannel) {
                val pipeline: ChannelPipeline = channel.pipeline()
                pipeline.addLast(sslContext.newHandler(channel.alloc(), host, port))
                pipeline.addLast(RedisDecoder())
                pipeline.addLast(RedisBulkStringAggregator())
                pipeline.addLast(RedisArrayAggregator())
                pipeline.addLast(RedisEncoder())
                pipeline.addLast(commandHandler(newReadChannel))
            }
        }
    }

    private fun commandHandler(newReadChannel: KChannel<RedisMessage>) = object : ChannelDuplexHandler() {
        override fun write(
            handlerContext: ChannelHandlerContext,
            message: Any,
            promise: ChannelPromise
        ) {
            val commands = (message as String)
                .trim()
                .split(Regex("\\s+"))
                .map { command ->
                    FullBulkStringRedisMessage(
                        ByteBufUtil.writeUtf8(
                            handlerContext.alloc(),
                            command
                        )
                    )
                }
            val request: RedisMessage = ArrayRedisMessage(commands)
            handlerContext.write(request, promise)
        }

        override fun channelRead(handlerContext: ChannelHandlerContext, message: Any) {
            message as RedisMessage
            newReadChannel.trySend(message)
        }

        override fun exceptionCaught(handlerContext: ChannelHandlerContext, cause: Throwable) {
            handlerContext.close()
            newReadChannel.close(cause)
        }
    }

    suspend fun executeCommands(commands: List<String>): List<RedisMessage> = withReentrantLock {
        connect()

        commands.forEach {
            write(it)
        }

        flush()

        commands.map {
            read()
        }
    }

    suspend fun executeCommand(command: String): RedisMessage = withReentrantLock {
        connect()
        writeAndFlush(command)
        read()
    }

    suspend fun info(): String {
        return decode(executeCommand("INFO"))
    }

    suspend fun flushAll(): String {
        return decode(executeCommand("FLUSHALL"))
    }

    suspend fun dbSize(): Long {
        return decode(executeCommand("DBSIZE")).toLong()
    }

    suspend fun xread(streamNames: List<String>, block: Long?): String {
        val streamNamesString = streamNames.joinToString(" ")
        val streamOffsetsString = streamNames.joinToString(" ") { "0-0" }
        val string = "$streamNamesString $streamOffsetsString"
        val command = if (block == null) {
            "XREAD STREAMS $string"
        } else {
            "XREAD BLOCK $block STREAMS $string"
        }

        return decode(executeCommand((command)))
    }

    private suspend fun auth(writeChannel: SocketChannel) = withReentrantLock {
        writeChannel.writeAndFlush("AUTH $password")
        val response = (read() as SimpleStringRedisMessage).content()
        if (response == "OK") {
            logger.info("AUTH successful")
        } else {
            throw RuntimeException("AUTH failed")
        }
    }

    private suspend fun isConnected(): Boolean = withReentrantLock {
        if (writeChannel == null || readChannel == null) {
            false
        } else {
            writeChannel!!.isActive
        }
    }

    private suspend fun write(message: String): Unit = withReentrantLock {
        if (!isConnected()) {
            throw RuntimeException("Not yet connected")
        } else {
            writeChannel!!.write(message)
        }
    }

    private suspend fun writeAndFlush(message: String): Unit = withReentrantLock {
        if (!isConnected()) {
            throw RuntimeException("Not yet connected")
        } else {
            writeChannel!!.writeAndFlush(message)
        }
    }

    private suspend fun flush(): Unit = withReentrantLock {
        if (!isConnected()) {
            throw RuntimeException("Not yet connected")
        } else {
            writeChannel!!.flush()
        }
    }

    private suspend fun read(): RedisMessage = withReentrantLock {
        if (!isConnected()) {
            throw RuntimeException("Not yet connected")
        } else {
            readChannel!!.receive()
        }
    }

    override fun destroy() {
        runBlocking {
            withReentrantLock {
                readChannel?.close()
                writeChannel?.close()
                group.shutdownGracefully()
            }
        }
    }
}

@Suppress("TooGenericExceptionThrown")
class Pipelined(private val client: NettyRedisClient) : ExclusiveObject {
    override val mutex: Mutex = Mutex()
    override val key: ReentrantMutexContextKey = ReentrantMutexContextKey(mutex)

    private var done = false
    private val responseFlow = MutableSharedFlow<List<String>>(1)
    private val sharedResponseFlow: Flow<List<String>> = responseFlow.asSharedFlow()
    private val commands = mutableListOf<String>()
    private val commandResponse = mutableListOf<String>()

    suspend fun xadd(streamName: String, keyValues: Map<String, String>): Response {
        val keyValuesString = keyValues.map { "${it.key} ${it.value}" }.joinToString(" ")
        val command = "XADD $streamName * $keyValuesString"

        return add(command)
    }

    suspend fun expire(streamName: String, seconds: Long): Response {
        return add("EXPIRE $streamName $seconds")
    }

    suspend fun execute(): Unit = withReentrantLock {
        if (!done) {
            commandResponse.addAll(executePipeline(commands))
            done = true
            responseFlow.tryEmit(commandResponse.toMutableList())
        }
    }

    private suspend fun add(command: String): Response = withReentrantLock {
        commands.add(command)
        Response(sharedResponseFlow, commands.lastIndex)
    }

    private suspend fun executePipeline(commands: List<String>): List<String> = withReentrantLock {
        val responseMessages = client.executeCommands(commands)

        responseMessages.map { message ->
            decode(message)
        }
    }
}

@Suppress("TooGenericExceptionThrown")
internal fun decode(message: RedisMessage): String {
    return when (message) {
        is ErrorRedisMessage -> message.content()
        is SimpleStringRedisMessage -> message.content()
        is IntegerRedisMessage -> message.value().toString()
        is FullBulkStringRedisMessage -> {
            if (message.isNull) {
                throw RuntimeException("Stream response is null")
            } else {
                message.content().toString(Charset.defaultCharset())
            }
        }

        is ArrayRedisMessage -> {
            message.children().joinToString(" ") { child ->
                decode(child)
            }
        }

        else -> throw NotImplementedError("Message type not implemented")
    }
}

@Suppress("TooGenericExceptionThrown")
class Response internal constructor(
    private val flow: Flow<List<String>>,
    private val index: Int
) {
    suspend operator fun invoke(): String {
        return flow.first().ifEmpty { throw RuntimeException("Operation was cancelled.") }[index]
    }

    suspend fun get(): String = invoke()
}

internal interface ExclusiveObject {
    val mutex: Mutex
    val key: ReentrantMutexContextKey
}

data class ReentrantMutexContextKey(val mutex: Mutex) : CoroutineContext.Key<ReentrantMutexContextElement>
internal class ReentrantMutexContextElement(override val key: ReentrantMutexContextKey) : CoroutineContext.Element

internal suspend inline fun <R> ExclusiveObject.withReentrantLock(crossinline block: suspend () -> R): R {
    if (coroutineContext[key] != null) return block()

    return withContext(ReentrantMutexContextElement(key)) {
        this@withReentrantLock.mutex.withLock {
            block()
        }
    }
}

internal suspend fun ChannelFuture.suspendableAwait(): Channel {
    return suspendCoroutine { continuation ->
        addListener(object : ChannelFutureListener {
            override fun operationComplete(future: ChannelFuture) {
                if (future.isDone && future.isSuccess) {
                    continuation.resume(future.channel())
                } else {
                    continuation.resumeWithException(future.cause())
                }
            }
        })
    }
}
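
Redis answers commands on a connection strictly in the order they were sent, so correctness depends on every caller consuming exactly the replies that belong to its own writes. One possible explanation for the mismatch described in this issue (an assumption, not a confirmed diagnosis of the code above) is that a reply is left unread, for example when a caller fails or is cancelled between write and read, and the next caller then receives it. The sketch below simulates that with plain collections; `FakeConnection` and its methods are hypothetical stand-ins for the Netty channel and the `KChannel`.

```kotlin
// Hypothetical stand-in for one Redis connection: replies are queued in the
// same order as the commands that produced them, as on a real connection.
// Simplification: the reply is queued at write time instead of arriving later.
class FakeConnection {
    private val replies = ArrayDeque<String>()
    private var abandoned = 0   // replies owed to callers that gave up

    fun write(command: String) {
        replies.addLast("reply-to:$command")
    }

    // Naive read: hands out whatever reply is first in the queue.
    fun readNaive(): String = replies.removeFirst()

    // Called when a caller abandons a command after writing it.
    fun abandon() {
        abandoned++
    }

    // Safer read: first discards replies owed to abandoned commands.
    fun readSkippingAbandoned(): String {
        while (abandoned > 0) {
            replies.removeFirst()
            abandoned--
        }
        return replies.removeFirst()
    }
}

fun main() {
    val naive = FakeConnection()
    naive.write("XADD s * k v")   // caller A writes, then fails before reading
    naive.write("INFO")           // caller B writes
    println(naive.readNaive())    // B receives the reply meant for A

    val safer = FakeConnection()
    safer.write("XADD s * k v")
    safer.abandon()               // A records that its reply is still owed
    safer.write("INFO")
    println(safer.readSkippingAbandoned())
}
```

In a real client the same idea is often implemented by closing and reopening the connection after a failed exchange, which discards any stale replies in one step.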
