You can reach me at [email protected]
- Maintainer of Kreds, a Redis client for Kotlin.
- Ask me about guitar, travel, photography and music.
A thread-safe, non-blocking, coroutine-based Redis client implementation for Kotlin
Home Page: https://crackthecodeabhi.github.io/kreds
License: Apache License 2.0
Hey,
I recently found that writing a value such as a JSON object fails with ERR syntax error because the value is not escaped.
I'm almost sure the key is not escaped either. The issue doesn't occur when the arguments are sent wrapped in double quotes.
Thanks.
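For context, the Redis wire protocol (RESP) sends each command argument as a length-prefixed bulk string, so a client that frames arguments this way does not need to escape JSON, quotes, or spaces. The following is a minimal sketch of that framing. encodeCommand is a hypothetical helper written for illustration; it is not part of Kreds, and whether the Kreds encoder behaves differently is not established here.

```kotlin
import java.io.ByteArrayOutputStream

// Hypothetical helper (not a Kreds API): frames a command as a RESP array
// of bulk strings. Each argument is prefixed with its byte length, so the
// payload bytes are never parsed for quotes or whitespace.
fun encodeCommand(vararg args: String): ByteArray {
    val out = ByteArrayOutputStream()
    // Array header: "*<number of arguments>\r\n"
    out.write("*${args.size}\r\n".toByteArray(Charsets.US_ASCII))
    for (arg in args) {
        val bytes = arg.toByteArray(Charsets.UTF_8)
        // Bulk string header: "$<byte length>\r\n"
        out.write("\$${bytes.size}\r\n".toByteArray(Charsets.US_ASCII))
        out.write(bytes)
        out.write("\r\n".toByteArray(Charsets.US_ASCII))
    }
    return out.toByteArray()
}
```

With this framing, a call such as encodeCommand("SET", "k", "{\"a\": 1}") yields exactly three arguments, and the quotes and space inside the JSON travel as plain bytes.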
Trying to use this library with JDK 1.8 or 11 fails with the following error:
Execution failed for task ':compileKotlin'.
> Error while evaluating property 'filteredArgumentsMap' of task ':compileKotlin'
> Could not resolve all files for configuration ':compileClasspath'.
> Could not resolve io.github.crackthecodeabhi:kreds:0.7.
Required by:
project :
> No matching variant of io.github.crackthecodeabhi:kreds:0.7 was found. The consumer was configured to find an API of a library compatible with Java 11, preferably in the form of class files, preferably optimized for standard JVMs, and its dependencies declared externally, as well as attribute 'org.jetbrains.kotlin.platform.type' with value 'jvm' but:
- Variant 'apiElements' capability io.github.crackthecodeabhi:kreds:0.7 declares an API of a library, packaged as a jar, preferably optimized for standard JVMs, and its dependencies declared externally, as well as attribute 'org.jetbrains.kotlin.platform.type' with value 'jvm':
- Incompatible because this component declares a component compatible with Java 17 and the consumer needed a component compatible with Java 11
- Variant 'runtimeElements' capability io.github.crackthecodeabhi:kreds:0.7 declares a runtime of a library, packaged as a jar, preferably optimized for standard JVMs, and its dependencies declared externally, as well as attribute 'org.jetbrains.kotlin.platform.type' with value 'jvm':
- Incompatible because this component declares a component compatible with Java 17 and the consumer needed a component compatible with Java 11
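The resolution failure above says that the published kreds 0.7 variants are built for Java 17 while the consuming build targets Java 11. If moving the consumer to JDK 17 is an option, a build.gradle.kts fragment along these lines may resolve it. This is only a sketch: it assumes Kotlin Gradle plugin 1.7.20 or newer, where the jvmToolchain(Int) shorthand is available, and it does not help projects that must stay on JDK 8 or 11.

```kotlin
// build.gradle.kts (fragment)
kotlin {
    // Compile with a Java 17 toolchain so Gradle can match the
    // Java 17 variants that kreds 0.7 publishes.
    jvmToolchain(17)
}
```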
Recently, when trying to get a key, the library failed with a NOAUTH error, even though the auth method was invoked when the client was created, as it should be.
The key is set once; however, at a later point further operations fail with the following error:
io.github.crackthecodeabhi.kreds.protocol.KredsRedisDataException: NOAUTH Authentication required.
at io.github.crackthecodeabhi.kreds.protocol.CommandProcessor.decode(Command.kt:70)
at io.github.crackthecodeabhi.kreds.connection.AbstractKredsClient$execute$suspendImpl$$inlined$withReentrantLock$2.invokeSuspend(ExclusiveObject.kt:49)
at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:106)
at kotlinx.coroutines.scheduling.CoroutineScheduler.runSafely(CoroutineScheduler.kt:570)
at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.executeTask(CoroutineScheduler.kt:750)
at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.runWorker(CoroutineScheduler.kt:677)
at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.run(CoroutineScheduler.kt:664)
In my case I'm only using redis with a password.
SetOption throws KredsRedisDataException every time I use ULong options. Example:
newClient(Endpoint.from("127.0.0.1:${redis.firstMappedPort}")).use { client ->
client.set("key", "val", SetOption.Builder(exSeconds = 30u).build())
...
ERR syntax error
io.github.crackthecodeabhi.kreds.protocol.KredsRedisDataException: ERR syntax error
at app//io.github.crackthecodeabhi.kreds.protocol.CommandProcessor.decode(Command.kt:69)
at app//io.github.crackthecodeabhi.kreds.connection.AbstractKredsClient$execute$suspendImpl$$inlined$withReentrantLock$2.invokeSuspend(ExclusiveObject.kt:49)
(Coroutine boundary)
at io.github.crackthecodeabhi.kreds.connection.AbstractKredsClient.execute$suspendImpl(Client.kt:151)
client.expire("key", 30u)
works without issues.
Library version: 0.7
Redis server: redis:6.2.7-alpine
Kotlin 1.6.10
Support SSL for Kreds.
Use Netty's SSL support to implement it.
I added Kreds to my project, and now it fails when I try to connect:
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Exception in thread "main" java.lang.NoClassDefFoundError: mu/KotlinLogging
at io.github.crackthecodeabhi.kreds.connection.KonnectionImplKt.<clinit>(KonnectionImpl.kt:52)
at io.github.crackthecodeabhi.kreds.connection.KonnectionImpl.connect$suspendImpl(KonnectionImpl.kt:164)
at io.github.crackthecodeabhi.kreds.connection.KonnectionImpl$connect$1.invokeSuspend(KonnectionImpl.kt)
at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:108)
at kotlinx.coroutines.scheduling.CoroutineScheduler.runSafely(CoroutineScheduler.kt:584)
at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.executeTask(CoroutineScheduler.kt:800)
at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.runWorker(CoroutineScheduler.kt:704)
at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.run(CoroutineScheduler.kt:691)
Caused by: java.lang.ClassNotFoundException: mu.KotlinLogging
at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:641)
at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:188)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:520)
... 9 more
used dependencies:
implementation(kotlin("reflect"))
implementation("org.jetbrains.kotlin:kotlin-stdlib:1.9.21")
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.8.0-RC")
implementation("org.jetbrains.kotlinx:kotlinx-datetime:0.5.0")
implementation("org.jetbrains.kotlinx:kotlinx-serialization-json:1.6.2")
implementation("io.ktor:ktor-server-netty:2.3.7") {
    exclude(group = "io.netty", module = "netty-all")
}
implementation("io.netty:netty-all:4.1.104.Final")
implementation("io.ktor:ktor-network:$ktorVersion")
implementation("org.jetbrains.exposed:exposed-core:0.45.0")
implementation("org.jetbrains.exposed:exposed-jdbc:0.45.0")
implementation("org.jetbrains.exposed:exposed-kotlin-datetime:0.45.0")
implementation("io.github.crackthecodeabhi:kreds:0.9.1")
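The NoClassDefFoundError for mu/KotlinLogging means the kotlin-logging classes are missing from the runtime classpath. One thing that might be worth trying, as a sketch rather than a confirmed fix, is to declare kotlin-logging and an SLF4J provider explicitly. The version numbers below are assumptions and should be aligned with whatever kreds 0.9.1 actually expects.

```kotlin
// build.gradle.kts (fragment)
dependencies {
    // Provides the mu.KotlinLogging class named in the stack trace.
    implementation("io.github.microutils:kotlin-logging-jvm:3.0.5")
    // Any SLF4J provider will do; slf4j-simple is only an example.
    runtimeOnly("org.slf4j:slf4j-simple:2.0.9")
}
```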
Do you plan to support Kotlin Multiplatform?
I think multiplatform communication could be implemented using https://github.com/korlibs/korio as a base.
I just added the dependency and my ktor application won't run:
SLF4J: No SLF4J providers were found.
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See https://www.slf4j.org/codes.html#noProviders for further details.
SLF4J: Class path contains SLF4J bindings targeting slf4j-api versions 1.7.x or earlier.
SLF4J: Ignoring binding found at [jar:file:/Users/leon.latsch/.gradle/caches/modules-2/files-2.1/ch.qos.logback/logback-classic/1.2.11/4741689214e9d1e8408b206506cbe76d1c6a7d60/logback-classic-1.2.11.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See https://www.slf4j.org/codes.html#ignoredBindings for an explanation.
Did I miss any step in the setup?
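The messages above indicate an SLF4J 2.x API on the classpath together with logback-classic 1.2.11, which only ships the older 1.7.x-style binding and is therefore ignored. A possible fix, sketched here with an assumed version number, is to move to a Logback release line that supports SLF4J 2.x (1.4.x for Java 11 and later, 1.3.x for Java 8).

```kotlin
// build.gradle.kts (fragment)
dependencies {
    // Replaces logback-classic 1.2.11; the exact version is an assumption.
    implementation("ch.qos.logback:logback-classic:1.4.14")
}
```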
Hi,
I want to use this Redis client to connect to a Redis cluster on K8s.
Do I need to set some configuration?
For example, when using quarkus.redis.client, I have to add configuration properties
like: quarkus.redis.client-type = cluster
Thank you
One example is really too little for the library. I'd be grateful if you added some more examples.
Please add support for connecting to a Redis Sentinel setup.
How do I authenticate the SubscriberClient? It doesn't implement the auth() method.
Using kreds occasionally generates the following error:
[2023-03-03 04:11:12] [nioEventLoopGroup-2-1] [io.netty.util.ResourceLeakDetector] ERROR - LEAK: ByteBuf.release() was not called before it's garbage-collected. See https://netty.io/wiki/reference-counted-objects.html for more information.
Recent access records:
Created at:
io.netty.buffer.AbstractByteBufAllocator.compositeDirectBuffer(AbstractByteBufAllocator.java:224)
io.netty.buffer.AbstractByteBufAllocator.compositeBuffer(AbstractByteBufAllocator.java:202)
io.netty.handler.codec.MessageAggregator.decode(MessageAggregator.java:269)
io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:88)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346)
io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:318)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788)
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)
io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
java.base/java.lang.Thread.run(Thread.java:831)
Hello,
Redis can listen on Unix sockets, but currently Kreds can only access Redis through a port.
Is Unix socket support planned?
Kind regards
Jonathan
It would be great if there was support for RedisJSON commands
The Pipeline::zrange() command always returns scores regardless of the value passed to the withScores parameter.
This is most likely due to the following line which fails to check the actual value of withScores and always executes with WITHSCORES as an argument.
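The behaviour the report expects can be sketched as follows. zrangeArgs is a hypothetical helper written for illustration, not the actual Kreds implementation; it only shows the WITHSCORES token being appended when the flag is true.

```kotlin
// Hypothetical helper (not a Kreds API): builds the argument list for
// ZRANGE and appends WITHSCORES only when the caller asks for scores.
fun zrangeArgs(key: String, min: Long, max: Long, withScores: Boolean): List<String> =
    buildList {
        add(key)
        add(min.toString())
        add(max.toString())
        if (withScores) add("WITHSCORES")
    }
```

With withScores = false the list contains only the key and the two range bounds, so the server returns members without scores.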
Looks like the branch that should be handling this is untouched for 11 months now and conflicts with the main codebase as well. I've not seen any signs of life around using cluster with kreds. Is it safe to assume that cluster support is dead and that I need to look for a different solution or is there something else I can do that doesn't involve rewriting code?
Hi, I just came across your library. Can you create a connection or a pool of connections once and use it throughout a program? If so, what locking considerations (if any) would need to be taken into account?
Hello,
I have tried to replicate a bare-bones version of this library so that I can quickly extend and test new functionality like streaming, SSL and AUTH.
The issue is that I'm observing unsafe access to the read channel when multiple clients are both reading and writing. Clients are reading the responses to the writes (XADD, EXPIRE) instead of the responses to XREAD, INFO, etc.
Here is my bare-bones code. Hopefully I have just copied something wrong or missed something, and it is not an underlying issue with the implementation.
@Suppress("TooGenericExceptionThrown", "TooManyFunctions")
@Service
class NettyRedisClient(
    @Value("\${redis.host}") private val host: String,
    @Value("\${redis.port}") private val port: Int,
    @Value("\${redis.password}") private val password: String
) : ExclusiveObject, DisposableBean {
    private val logger = KotlinLogging.logger {}
    final override val mutex: Mutex = Mutex()
    override val key: ReentrantMutexContextKey = ReentrantMutexContextKey(mutex)
    private val group = NioEventLoopGroup()
    private val bootstrap = Bootstrap()
        .group(group)
        .remoteAddress(host, port)
        .option(ChannelOption.SO_KEEPALIVE, true)
        .channel(NioSocketChannel::class.java)
    val sslContext: SslContext = SslContextBuilder
        .forClient()
        .trustManager(InsecureTrustManagerFactory.INSTANCE)
        .build()
    private var writeChannel: SocketChannel? = null
    private var readChannel: KChannel<RedisMessage>? = null

    fun pipelined(): Pipelined = Pipelined(this)

    suspend fun connect() = withReentrantLock {
        if (!isConnected()) {
            val newReadChannel = KChannel<RedisMessage>(KChannel.UNLIMITED)
            val newWriteChannel = bootstrap
                .handler(LoggingHandler(LogLevel.INFO))
                .handler(channelInitializer(newReadChannel))
                .connect()
                .suspendableAwait() as SocketChannel
            readChannel = newReadChannel
            writeChannel = newWriteChannel
            auth(newWriteChannel)
        }
    }

    private fun channelInitializer(newReadChannel: KChannel<RedisMessage>): ChannelInitializer<SocketChannel> {
        return object : ChannelInitializer<SocketChannel>() {
            override fun initChannel(channel: SocketChannel) {
                val pipeline: ChannelPipeline = channel.pipeline()
                pipeline.addLast(sslContext.newHandler(channel.alloc(), host, port))
                pipeline.addLast(RedisDecoder())
                pipeline.addLast(RedisBulkStringAggregator())
                pipeline.addLast(RedisArrayAggregator())
                pipeline.addLast(RedisEncoder())
                pipeline.addLast(commandHandler(newReadChannel))
            }
        }
    }

    private fun commandHandler(newReadChannel: KChannel<RedisMessage>) = object : ChannelDuplexHandler() {
        override fun write(
            handlerContext: ChannelHandlerContext,
            message: Any,
            promise: ChannelPromise
        ) {
            val commands = (message as String)
                .trim()
                .split(Regex("\\s+"))
                .map { command ->
                    FullBulkStringRedisMessage(
                        ByteBufUtil.writeUtf8(
                            handlerContext.alloc(),
                            command
                        )
                    )
                }
            val request: RedisMessage = ArrayRedisMessage(commands)
            handlerContext.write(request, promise)
        }

        override fun channelRead(handlerContext: ChannelHandlerContext, message: Any) {
            message as RedisMessage
            newReadChannel.trySend(message)
        }

        override fun exceptionCaught(handlerContext: ChannelHandlerContext, cause: Throwable) {
            handlerContext.close()
            newReadChannel.close(cause)
        }
    }

    suspend fun executeCommands(commands: List<String>): List<RedisMessage> = withReentrantLock {
        connect()
        commands.forEach {
            write(it)
        }
        flush()
        commands.map {
            read()
        }
    }

    suspend fun executeCommand(command: String): RedisMessage = withReentrantLock {
        connect()
        writeAndFlush(command)
        read()
    }

    suspend fun info(): String {
        return decode(executeCommand("INFO"))
    }

    suspend fun flushAll(): String {
        return decode(executeCommand("FLUSHALL"))
    }

    suspend fun dbSize(): Long {
        return decode(executeCommand("DBSIZE")).toLong()
    }

    suspend fun xread(streamNames: List<String>, block: Long?): String {
        val streamNamesString = streamNames.joinToString(" ")
        val streamOffsetsString = streamNames.joinToString(" ") { "0-0" }
        val string = "$streamNamesString $streamOffsetsString"
        val command = if (block == null) {
            "XREAD STREAMS $string"
        } else {
            "XREAD BLOCK $block STREAMS $string"
        }
        return decode(executeCommand((command)))
    }

    private suspend fun auth(writeChannel: SocketChannel) = withReentrantLock {
        writeChannel.writeAndFlush("AUTH $password")
        val response = (read() as SimpleStringRedisMessage).content()
        if (response == "OK") {
            logger.info("AUTH successful")
        } else {
            throw RuntimeException("AUTH failed")
        }
    }

    private suspend fun isConnected(): Boolean = withReentrantLock {
        if (writeChannel == null || readChannel == null) {
            false
        } else {
            writeChannel!!.isActive
        }
    }

    private suspend fun write(message: String): Unit = withReentrantLock {
        if (!isConnected()) {
            throw RuntimeException("Not yet connected")
        } else {
            writeChannel!!.write(message)
        }
    }

    private suspend fun writeAndFlush(message: String): Unit = withReentrantLock {
        if (!isConnected()) {
            throw RuntimeException("Not yet connected")
        } else {
            writeChannel!!.writeAndFlush(message)
        }
    }

    private suspend fun flush(): Unit = withReentrantLock {
        if (!isConnected()) {
            throw RuntimeException("Not yet connected")
        } else {
            writeChannel!!.flush()
        }
    }

    private suspend fun read(): RedisMessage = withReentrantLock {
        if (!isConnected()) {
            throw RuntimeException("Not yet connected")
        } else {
            readChannel!!.receive()
        }
    }

    override fun destroy() {
        runBlocking {
            withReentrantLock {
                readChannel?.close()
                writeChannel?.close()
                group.shutdownGracefully()
            }
        }
    }
}
@Suppress("TooGenericExceptionThrown")
class Pipelined(private val client: NettyRedisClient) : ExclusiveObject {
    override val mutex: Mutex = Mutex()
    override val key: ReentrantMutexContextKey = ReentrantMutexContextKey(mutex)
    private var done = false
    private val responseFlow = MutableSharedFlow<List<String>>(1)
    private val sharedResponseFlow: Flow<List<String>> = responseFlow.asSharedFlow()
    private val commands = mutableListOf<String>()
    private val commandResponse = mutableListOf<String>()

    suspend fun xadd(streamName: String, keyValues: Map<String, String>): Response {
        val keyValuesString = keyValues.map { "${it.key} ${it.value}" }.joinToString(" ")
        val command = "XADD $streamName * $keyValuesString"
        return add(command)
    }

    suspend fun expire(streamName: String, seconds: Long): Response {
        return add("EXPIRE $streamName $seconds")
    }

    suspend fun execute(): Unit = withReentrantLock {
        if (!done) {
            commandResponse.addAll(executePipeline(commands))
            done = true
            responseFlow.tryEmit(commandResponse.toMutableList())
        }
    }

    private suspend fun add(command: String): Response = withReentrantLock {
        commands.add(command)
        Response(sharedResponseFlow, commands.lastIndex)
    }

    private suspend fun executePipeline(commands: List<String>): List<String> = withReentrantLock {
        val responseMessages = client.executeCommands(commands)
        responseMessages.map { message ->
            decode(message)
        }
    }
}
@Suppress("TooGenericExceptionThrown")
internal fun decode(message: RedisMessage): String {
    return when (message) {
        is ErrorRedisMessage -> message.content()
        is SimpleStringRedisMessage -> message.content()
        is IntegerRedisMessage -> message.value().toString()
        is FullBulkStringRedisMessage -> {
            if (message.isNull) {
                throw RuntimeException("Stream response is null")
            } else {
                message.content().toString(Charset.defaultCharset())
            }
        }
        is ArrayRedisMessage -> {
            message.children().joinToString(" ") { child ->
                decode(child)
            }
        }
        else -> throw NotImplementedError("Message type not implemented")
    }
}
@Suppress("TooGenericExceptionThrown")
class Response internal constructor(
    private val flow: Flow<List<String>>,
    private val index: Int
) {
    suspend operator fun invoke(): String {
        return flow.first().ifEmpty { throw RuntimeException("Operation was cancelled.") }[index]
    }

    suspend fun get(): String = invoke()
}
internal interface ExclusiveObject {
    val mutex: Mutex
    val key: ReentrantMutexContextKey
}

data class ReentrantMutexContextKey(val mutex: Mutex) : CoroutineContext.Key<ReentrantMutexContextElement>

internal class ReentrantMutexContextElement(override val key: ReentrantMutexContextKey) : CoroutineContext.Element

internal suspend inline fun <R> ExclusiveObject.withReentrantLock(crossinline block: suspend () -> R): R {
    if (coroutineContext[key] != null) return block()
    return withContext(ReentrantMutexContextElement(key)) {
        this@withReentrantLock.mutex.withLock {
            block()
        }
    }
}
internal suspend fun ChannelFuture.suspendableAwait(): Channel {
    return suspendCoroutine { continuation ->
        addListener(object : ChannelFutureListener {
            override fun operationComplete(future: ChannelFuture) {
                if (future.isDone && future.isSuccess) {
                    continuation.resume(future.channel())
                } else {
                    continuation.resumeWithException(future.cause())
                }
            }
        })
    }
}
A performance benchmark against other widely used Redis clients for JVM.