hdt3213 / godis Goto Github PK
View Code? Open in Web Editor NEWA Golang implemented Redis Server and Cluster. Go 语言实现的 Redis 服务器和分布式集群
Home Page: https://www.cnblogs.com/Finley/category/1598973.html
License: GNU General Public License v3.0
A Golang implemented Redis Server and Cluster. Go 语言实现的 Redis 服务器和分布式集群
Home Page: https://www.cnblogs.com/Finley/category/1598973.html
License: GNU General Public License v3.0
for example node A do INCR X
, and node B INCR X
also, will they receive different value (first one get 1, second one get 2) or there can be race condition?
版本:发布版本1.2.8
redis.conf配置如下
`
bind 0.0.0.0
port 6399
maxclients 128
appendonly no
appendfilename appendonly.aof
dbfilename test.rdb
`
当前目录没有生成
appendonly.aof 和 test.rdb 文件
for {
// may occurs: client EOF, client timeout, server early close
msg, err := reader.ReadString('\n')
if err != nil {
if err == io.EOF {
logger.Info("connection close")
h.activeConn.Delete(client)
} else {
logger.Warn(err)
}
return
}
client.Waiting.Add(1)
//logger.Info("sleeping")
//time.Sleep(10 * time.Second)
b := []byte(msg)
_, _ = conn.Write(b)
client.Waiting.Done()
}
在echo_test中,客户端调用了close(),会走到EOF分支,但是这个时候服务端好像没有调用Conn.Close(),仅仅是从Map中移除
这样貌似导致被动关闭方卡在Close wait状态了?
https://www.cnblogs.com/Finley/p/12590718.html
您的系列文章中的第三章,设计lock_map中说的给哈希槽上锁,哈希槽是不是就是Shard?
给哈希槽上锁与Shard类型中的锁,有什么区别?
第一次执行set key value ex 10000,那么key设置成功,并且ttl为10000秒,
过几秒接下来执行set key value nx ex 20000,会发现虽然命令返回nil,但是该key的ttl已经被改成了20000。
bug的问题就出现在如下代码(见注释)
if ttl != unlimitedTTL {
expireTime := time.Now().Add(time.Duration(ttl) * time.Millisecond)
db.Expire(key, expireTime) // 可以看到不管怎么样,只要有ex,这里key的ttl就会被更新
db.addAof(CmdLine{
[]byte("SET"),
args[0],
args[1],
})
db.addAof(aof.MakeExpireCmd(key, expireTime).Args)
} else if result > 0 {
db.Persist(key) // override ttl
db.addAof(utils.ToCmdLine3("set", args...))
} else {
db.addAof(utils.ToCmdLine3("set", args...))
}
建议改成如下
if ttl != unlimitedTTL {
// 如果有nx命令,并且set设置失败,则不更新ttl,直接返回nil了
if policy == insertPolicy && result == 0 {
return reply.MakeNullBulkReply()
}
db.Persist(key)
expireTime := time.Now().Add(time.Duration(ttl) * time.Millisecond)
db.Expire(key, expireTime) // 可以看到不管怎么样,只要有ex,这里key的ttl就会被更新
db.addAof(CmdLine{
[]byte("SET"),
args[0],
args[1],
})
db.addAof(aof.MakeExpireCmd(key, expireTime).Args)
} else if result > 0 {
db.Persist(key) // override ttl
db.addAof(utils.ToCmdLine3("set", args...))
} else {
db.Persist(key)
db.addAof(utils.ToCmdLine3("set", args...))
}
I want to help with this project, but don't know what to start with. It will be cool to have some easy tasks to start with, to understand how things works.
keys error:
无法加载键:Scan command not supported by redis-server
如题,调用 dict.MakeConcurrent 创建 ConcurrentDict 时,会先用该函数计算shardCount,但是这个函数的算法没明白时啥意思,作者能辛苦解惑一下吗?谢谢
在测试client_test.go时,发现发送Ping
消息能够收到Pong
。但是似乎并没有看到启动server的代码。我在sys.go的Ping
方法打了个断点,发现并没有进入,看了下代码似乎只有调用sys.go的Ping
方法才会返回&reply.PongReply{}
。所以我有点疑惑这个Pong是什么时候写到client.conn里的呢?
(菜鸡顺便求个godis交流群)
Line 71 in bf21b9f
在博客中看到waitDone.Add(1)位置是在协程里面调用的
redis/client/client.go
// Start starts asynchronous goroutines
func (client *Client) Start() {
client.ticker = time.NewTicker(10 * time.Second)
go client.handleWrite()
go func() {
err := client.handleRead() // 不需要处理err
if err != nil {
logger.Error(err)
}
}()
go client.heartbeat()
}
// 实际函数中没有返回error
func (client *Client) handleRead() error {
ch := parser.ParseStream(client.conn)
for payload := range ch {
if payload.Err != nil {
client.finishRequest(reply.MakeErrReply(payload.Err.Error()))
continue
}
client.finishRequest(payload.Data)
}
return nil
}
是否可以写成
// Start starts asynchronous goroutines
func (client *Client) Start() {
client.ticker = time.NewTicker(10 * time.Second)
go client.handleWrite()
go client.handleRead()
go client.heartbeat()
}
// 实际函数中没有返回error
func (client *Client) handleRead() {
ch := parser.ParseStream(client.conn)
for payload := range ch {
if payload.Err != nil {
client.finishRequest(reply.MakeErrReply(payload.Err.Error()))
continue
}
client.finishRequest(payload.Data)
}
}
func parseBulkHeader(msg []byte, state *readState) error {
var err error
state.bulkLen, err = strconv.ParseInt(string(msg[1:len(msg)-2]), 10, 64)
if err != nil {
return errors.New("protocol error: " + string(msg))
}
if state.bulkLen == -1 { // null bulk
return nil
} else if state.bulkLen > 0 {
state.msgType = msg[0]
state.readingMultiLine = true
state.expectedArgsCount = 1
state.args = make([][]byte, 0, 1)
return nil
} else {
return errors.New("protocol error: " + string(msg))
}
}
想问下什么情况下会是 state.bulkLen == -1
大佬,您博客中h.activeConn.Delete(conn)应该改为h.activeConn.Delete(client)才对吧
谢谢
// WaitWithTimeout blocks until the WaitGroup counter is zero or timeout
// returns true if timeout
func (w *Wait) WaitWithTimeout(timeout time.Duration) bool {
c := make(chan bool, 1)
go func() {
defer close(c)
w.wg.Wait()
c <- true
}()
select {
case <-c:
return false // completed normally
case <-time.After(timeout):
return true // timed out
}
}
w.wg.Wait()
换成w.Wait()
会不会更配套点hh
func(sortedSet *SortedSet) Add(member string, score float64) bool {
element, ok := sortedSet.dict[member]
sortedSet.dict[member] = &Element{
Member: member,
Score: score,
}
if ok {
if score != element.Score {
sortedSet.skiplist.remove(member, score)
sortedSet.skiplist.insert(member, score)
}
return false
}
sortedSet.skiplist.insert(member, score)
return true
}
应改为
func (sortedSet *SortedSet) Add(member string, score float64) bool {
element, ok := sortedSet.dict[member]
sortedSet.dict[member] = &Element{
Member: member,
Score: score,
}
if ok {
if score != element.Score {
sortedSet.skiplist.remove(member, element.Score)
sortedSet.skiplist.insert(member, score)
}
return false
}
sortedSet.skiplist.insert(member, score)
return true
}
提pull有点麻烦,就在这里说下了
redis-benchmark 性能测试默认需要使用
1、start中会开启一个协程来handleRead
2、在dorequest的时候如果出现handleConnectionError的情况,那么很显然是网络出现了错误,故1中的handleRead肯定也会从ch中收到网络err,并执行finishRequest,此时执行finishRequest会阻塞在request := <-client.waitingReqs这一步(因为waitingReqs ch中没有值)
3、如果2中出现handleConnectionError后,会重试,重试如果成功后将request往client.waitingReqs中送,那么这个request最后会和2中err Reply绑定,这好像会导致重试成功后的reply无法绑定?
可能我的理解有所偏差,麻烦大佬帮我解答,感谢!
// Write sends response to client over tcp connection
func (c *Connection) Write(b []byte) error {
if len(b) == 0 {
return nil
}
c.mu.Lock()
c.waitingReply.Add(1)
defer func() {
c.waitingReply.Done()
c.mu.Unlock()
}()
_, err := c.conn.Write(b)
return err
}
部分GUI客户端 会请求这个命令 如RDM
ERR unknown command 'info'
Hi, do you support replication of data in cluster mode for data redundancy?
描述:
并发打印日志时,会出现日志级别与设置不符合情况
详情:
查看源码时, 发现 lib/logger.go 文件中对新增日志是使用 Info(v interface{}) Debug(v interface{})等函数
但是日志级别是在每个函数中调用全局的setPrefix(level logLevel) , 该函数修改全局变量logger的prefix配置;
log.Logger.SetPrefix函数每次修改都会加锁(超大并发性能是否会影响) :
这里的log.Logger.SetPrefix加了锁,但是锁的范围是setPrefix函数
Info日志函数后面的 logger.Println(v...) 并没有在锁中:
简单思考
是否可以每个日志级别单独使用一个 log.Logger
是否可以将prefix与全局的logger对象解绑
Settings对象中是否增加CallerDepth可配置项(如上结果图,我这里文件显示层级有问题)
i read from here https://www.cnblogs.com/Finley/p/14028402.html
i am confused that i think pipeline seems doesn't work that way.
这里的读锁加在循环里面肯定拦不住对shard的写请求,会报读写冲突的吧
func (dict *ConcurrentDict) ForEach(consumer Consumer) {
if dict == nil {
panic("dict is nil")
}
for _, shard := range dict.table {
for key, value := range shard.m {
shard.mutex.RLock()
continues := consumer(key, value)
shard.mutex.RUnlock()
if !continues {
return
}
}
}
}
描述:
lib/utils/limitd_reader.go中的LimitedReader 是有什么特殊考虑吗? (没有使用标准库)
标准库: io.LimitedReader 提供了类似的limit功能, 但是和当前仓库的utils.LimitedReader 有使用的区别
测试如下:
utils.LimitedReader 在limited触发时,会多读取超过buffer (读取缓存)的额外内容
代码如下:
func TestNewLimitedReader(t *testing.T) {
resourceStr := "0123456789"
readFunc := func(r io.Reader) (string, error) {
full := ""
i := 0
for i < 16 {
buffer := make([]byte, 3) // 每次读3个byte
n, err := r.Read(buffer)
if err != nil {
if err == io.EOF {
// ignore eof err in this test
return string(full), nil
}
return "", nil
}
if n != 0 {
full += string(buffer[0:n])
}
i++
}
return full, nil
}
// test
readerList := []struct {
reader io.Reader
name string
}{
{NewLimitedReader(strings.NewReader(resourceStr), 5), "utils/LimitedReader"},
{&io.LimitedReader{
R: strings.NewReader(resourceStr),
N: 5,
}, "io.LimitReader"},
}
for _, r := range readerList {
result, err := readFunc(r.reader)
if err != nil {
log.Print(r.name + " read err:" + err.Error())
}
log.Printf("%s:%s\n", r.name, result)
}
}
测试结果如下:
utils.LimitedReader 额外读取了字符“5” 读取了6个字符
因为limit=5, readFunc中的的buffer设置为3(小于5且不能被5整除),导致utils.LimitedReader 额外多的读出来一个字符“5“
see titile, thank u. 😁
https://github.com/HDT3213/godis/blob/master/src/tcp/echo.go#L58
This storage and deletion are not of the same type?
This is a bit of a puzzle.
基本说明
当 WaitWithTimeout方法由于超时退出后,如果没有调用Done(),那么内部的goroutine中的Wait()方法还在阻塞,导致该goroutine泄露。
代码位置
lib/sync/wait/wait.go
测试代码
func main() {
w := &Wait{}
w.Add(1)
fmt.Printf("goroutines: %d\n", runtime.NumGoroutine())
b := w.WaitWithTimeout(5 * time.Second)
fmt.Println(b)
//w.Done()
time.Sleep(10 * time.Second)
fmt.Printf("goroutines: %d\n", runtime.NumGoroutine())
}
// Wait is similar with sync.WaitGroup which can wait with timeout
type Wait struct {
wg sync.WaitGroup
}
// Add adds delta, which may be negative, to the WaitGroup counter.
func (w *Wait) Add(delta int) {
w.wg.Add(delta)
}
// Done decrements the WaitGroup counter by one
func (w *Wait) Done() {
w.wg.Done()
}
// Wait blocks until the WaitGroup counter is zero.
func (w *Wait) Wait() {
w.wg.Wait()
}
// WaitWithTimeout blocks until the WaitGroup counter is zero or timeout
// returns true if timeout
func (w *Wait) WaitWithTimeout(timeout time.Duration) bool {
c := make(chan bool)
go func() {
defer close(c)
fmt.Println("wait...")
w.wg.Wait()
fmt.Println("done...")
c <- true
}()
select {
case <-c:
return false // completed normally
case <-time.After(timeout):
return true // timed out
}
}
结果
goroutines: 1
wait...
true
goroutines: 2
func rollbackGivenKeys(db *DB, keys ...string) []CmdLine {
var undoCmdLines [][][]byte
for _, key := range keys {
entity, ok := db.GetEntity(key)
if !ok {
undoCmdLines = append(undoCmdLines,
utils.ToCmdLine("DEL", key),
)
} else {
undoCmdLines = append(undoCmdLines,
utils.ToCmdLine("DEL", key), // clean existed first
aof.EntityToCmd(key, entity).Args,
toTTLCmd(db, key).Args,
)
}
}
return undoCmdLines
}
为什么这里key不存在 也是执行的DEL命令进行回滚
pie 是一个用于Go程序插件扩展的库,可以以独立进程插件的方式来扩展Go程序,稳定性和安全性可以有保证。
如果godis能引入pie
作为插件系统,那就很方便了,可以形成一个社区。
例如用插件扩展godis实现业务逻辑等。
用telnet连接真正的reids时 直接输入 ping 就会返回+pong
但是我按照您的第二章(Golang 实现 Redis(2): 实现 Redis 协议解析器)
实现的demo中
必须用先输入一个 *1 回车
`
➜ src telnet 127.0.0.1 8000
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
*1
ping
+PONG
`
连接真实的redis服务:
`
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
ping
+PONG
`
请问产生这个区别的原因是 redis用的空格来截取cmd 而您的例子是用的\n 吗?
为什么您的例子不也使用空格呢?
感谢大佬的开源,请问datastruct里面的list在插入删除的时候为什么不用加锁
我看到这里有一句注释是中文的,是故意用中文说明的吗?还是遗漏忘记改的?
https://github.com/HDT3213/godis/blob/master/aof/rewrite.go#L154
最近都在看你的golang实现redis系列文章,受益匪浅。
目前还差redis的geo功能,对redis中georadius的功能比较感兴趣,作者不能把geo的实现也出个文章
if h.closing.Get() {
// closing handler refuse new connection
_ = conn.Close()
}
这段代码应该写到for循环里面,如果客户端一直不关闭,handler协程永远无法关闭。而当客户端再次尝试命令时,服务端主动断开连接,一般情况下我们遇到的是客户端不主动关闭连接,继续发送命令这种情况
aof重写步骤为:
开始rewrite,对应startRewrite方法
获取写锁,暂停aof写入
获取aof文件大小
创建rewriteBuffer channel(带缓冲channel)
生成临时文件
释放写锁,恢复aof写入
读取aof文件内容,加载内容到临时DB对象
根据临时DB对象的数据,生成命令写入临时文件
结束rewrite
获取写锁,暂停aof写入
读取rewriteBuffer,写入aof临时文件
关闭rewriteBuffer,并设置为nil
重命名临时文件为aof文件
open 新的aof文件,并设置为db.aofFile
释放写锁,恢复aof写入
而主程序,在第一步与第四步之间,一直可以写入aof chan,在处理aof chan中的数据时,同步写入一份到rewriteBuffer chan中,此时会存在一个问题: 程序写入aof时命令较多,超过了rewriteBuffer的缓冲大小,此时会出现 handleAof方法获取到了读锁,但是在写入rewriteBuffer时,阻塞住了,无法释放读锁
而 finishRewrite 方法,在结束rewrite时,需要先获取到写锁,才会接收 rewriteBuffer chan的数据,就会出现锁已被 handleAof占用,finishRewrite方法获取不到锁的情况,从而导致死锁
func (dict *SimpleDict) Keys() []string {
result := make([]string, len(dict.m))
i := 0
for k := range dict.m {
result[i] = k
}
return result
}
i的值一直不变
ps:虽然除了测试并没有发现这个方法在其它地方用到过
打不过就加入,跟着大佬学了,感谢分享,感谢开源。
启动项目电脑系统:windows10 / 64位
go版本:go version go1.17.7 windows/386
在clone项目之后,进入到目录cmd命令行,执行以下命令
go mod tidy
安装依赖之后在目录下执行:
go run main.go
报以下错误:
# github.com/hdt3213/rdb/core
D:\Code\golang\go\pkg\mod\github.com\hdt3213\[email protected]\core\list.go:283:21: constant 4294967295 overflows int
这个问题只在 Windows10
操作系统出现,我另外一台 Ubuntu 20.04
操作系统的电脑项目正常运行。
那请问如何在 Windows10
操作系统电脑上运行项目代码呢?
大佬,时间轮是不并没有真正的运行起来,而是靠 IsExpired 方法实现过期 key 的删除,我没有看到在哪里调用了 timewheel.init 方法。
keys := append(writeKeys, readKeys...)
请问这个不会有改变原 writeKeys 切片的风险吗
logger.Setup函数里面,
fileName := fmt.Sprintf("%s-%s.%s",
settings.Name,
time.Now().Format(settings.TimeFormat),
settings.Ext)
因为拓展名加了.,格式化字符串也有.,导致最后文件名有两个.
expectedLine的类型是uint64,而uint64的范围是0 到 18446744073709551615,只会等于或大于0不会出现小于0的情况,最后的else是走不进去的,是否可以去掉呢?
A declarative, efficient, and flexible JavaScript library for building user interfaces.
🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
An Open Source Machine Learning Framework for Everyone
The Web framework for perfectionists with deadlines.
A PHP framework for web artisans
Bring data to life with SVG, Canvas and HTML. 📊📈🎉
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
Some thing interesting about web. New door for the world.
A server is a program made to process requests and deliver data to clients.
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
Some thing interesting about visualization, use data art
Some thing interesting about game, make everyone happy.
We are working to build community through open source technology. NB: members must have two-factor auth.
Open source projects and samples from Microsoft.
Google ❤️ Open Source for everyone.
Alibaba Open Source for everyone
Data-Driven Documents codes.
China tencent open source team.