mynote's Introduction
mynote's People
mynote's Issues
`go runtime` 中的`netpoll`
network poller
所有网络操作都以网络描述符 netFD 为中心,同时netFD 与底层 pollDesc 结构绑定,
当在一个 netFD 上读写遇到 EAGAIN 错误时,就将当前 G 存储到这个 netFD 对应的 pollDesc 中,然后调用 gopark,将当前 G 设置为等待状态;
直到这个 netFD 上再次发生读写事件,调用 goready,将当前 G 设置为可运行状态;
// net/fd_posix.go
type netFD struct {
pfd poll.FD
// ...
}
// internal/poll/fd_unix.go
type FD struct {
// ...
// Platform dependent state of the file descriptor. // 真正的系统文件描述符
SysFile
// I/O poller. // 底层事件的封装,netFD 通过它完成各种 I/O 相关操作
pd pollDesc
// ...
}
// internal/poll/fd_poll_runtime.go
//
// 只是一个指针,但是可以通过 init 方法 中 调用的 runtime_pollOpen 函数
// 得到 指针指向 runtime.pollDesc
type pollDesc struct {
runtimeCtx uintptr
}
func (pd *pollDesc) init(fd *FD) error {
serverInit.Do(runtime_pollServerInit)
ctx, errno := runtime_pollOpen(uintptr(fd.Sysfd))
if errno != 0 {
return errnoErr(syscall.Errno(errno))
}
pd.runtimeCtx = ctx
return nil
}
// runtime/netpoll.go
//
// 编译器将 poll_runtime_pollOpen 函数链接到 runtime_pollOpen 函数
//
//go:linkname poll_runtime_pollOpen internal/poll.runtime_pollOpen
func poll_runtime_pollOpen(fd uintptr) (*pollDesc, int) {}
// runtime/netpoll.go
//
// Network poller descriptor.
//
// No heap pointers.
type pollDesc struct {
// ...
link *pollDesc // in pollcache, protected by pollcache.lock // 用以实现链表
fd uintptr // constant for pollDesc usage lifetime // 文件描述符
// ...
// rg, wg are accessed atomically and hold g pointers. // 取值分别可能是 pdReady、pdWait、等待 fd 可读或可写的 G 、nil
// (Using atomic.Uintptr here is similar to using guintptr elsewhere.)
rg atomic.Uintptr // pdReady, pdWait, G waiting for read or pdNil
wg atomic.Uintptr // pdReady, pdWait, G waiting for write or pdNil
// ...
rseq uintptr // protects from stale read timers // rseq/wseq 表示fd被重用或者计时器被重置
rt timer // read deadline timer (set if rt.f != nil) // rt/wt 等待文件描述符的计时器
rd int64 // read deadline (a nanotime in the future, -1 when expired) // rd/wd 等待fd可读或可写的截至日期
wseq uintptr // protects from stale write timers // 上面提到
wt timer // write deadline timer // 上面提到
wd int64 // write deadline (a nanotime in the future, -1 when expired)// 上面提到
// ...
}
// pollDesc 缓存:包含一个用于保护轮询数据的互斥锁和链表头
// 运行时包中的全局变量 pollcache
//
// runtime/netpoll.go
type pollCache struct {
lock mutex
first *pollDesc
// PollDesc objects must be type-stable,
// because we can get ready notification from epoll/kqueue
// after the descriptor is closed/reused.
// Stale notifications are detected using seq variable,
// seq is incremented when deadlines are changed or descriptor is reused.
}
// 初次调用,批量初始化 pollDesc
func (c *pollCache) alloc() *pollDesc {}
// 回收 pollDesc,结构体被重复利用时才会由 runtime.poll_runtime_pollOpen 函数重置
func (c *pollCache) free(pd *pollDesc) {}
实现
网络轮询器,与平台无关,具体实现需要定义如下函数:
netpollinit()
: 初始化网络轮询器,仅调用一次netpollopen(fd uintptr, pd *pollDesc) int32
: 为文件描述符 fd 启用边缘触发通知,pd 参数用于在 fd 就绪时传递给 netpollready,返回一个 errno 值netpollclose(fd uintptr) int32
: 禁用文件描述符 fd 的通知,返回一个 errno 值netpoll(delta int64) (gList, int32)
: 进行网络轮询,如果 delta < 0,则无限期阻塞,如果 delta == 0,则不阻塞,如果 delta > 0,则阻塞至多 delta 纳秒,返回通过调用 netpollready 构建的 goroutine 列表,以及要添加到 netpollWaiters 的 delta。这永远不会返回一个非空列表和一个非零的 delta。netpollBreak()
: 唤醒网络轮询器,假设它在 netpoll 中被阻塞netpollIsPollDescriptor(fd uintptr) bool
: 报告 fd 是否是轮询器使用的文件描述符
epoll 实现
runtime/netpoll_epoll.go
- 网络轮询器的
初始化
- 向网络轮询器
加入待监控
的任务 - 从网络轮询器
获取触发
的事件
// 1. 网络轮询器的`初始化`
//
// 调用链 poll.runtime_pollServerInit -> runtime.poll_runtime_pollServerInit -> runtime.netpollGenericInit -> runtime.netpollinit
func netpollinit() {
var errno uintptr // 系统调用返回的错误码
epfd, errno = syscall.EpollCreate1(syscall.EPOLL_CLOEXEC) // 创建一个 epoll 示例 epfd,作为 runtime 的唯一 event-loop 使用
// ...
r, w, errpipe := nonblockingPipe() // 创建一个非阻塞管道,用于和 epoll 实例通信
// ...
ev := syscall.EpollEvent{
Events: syscall.EPOLLIN, // 表示读事件
}
*(**uintptr)(unsafe.Pointer(&ev.Data)) = &netpollBreakRd // 将netpollBreakRd 通知信号量,封装成EpollEvent,注册进 epoll 实例,从而监听读事件
errno = syscall.EpollCtl(epfd, syscall.EPOLL_CTL_ADD, r, &ev)
// ...
netpollBreakRd = uintptr(r) // 保存管道的读文件描述符
netpollBreakWr = uintptr(w) // 保存管道的写文件描述符
}
// 2. 向网络轮询器`加入待监控`的任务
//
// 调用链 poll.runtime_pollOpen -> runtime.poll_runtime_pollOpen -> runtime.netpollopen
// 调用 poll_runtime_pollOpen(fd), 注册 fd 到 epoll 实例,pd 从 pollcache 中获取
func netpollopen(fd uintptr, pd *pollDesc) uintptr {
var ev syscall.EpollEvent
ev.Events = syscall.EPOLLIN | syscall.EPOLLOUT | syscall.EPOLLRDHUP | syscall.EPOLLET // 可读/可写事件、对端断开连接事件、边缘触发模式
tp := taggedPointerPack(unsafe.Pointer(pd), pd.fdseq.Load()) // 打包 pd 数据
*(*taggedPointer)(unsafe.Pointer(&ev.Data)) = tp
return syscall.EpollCtl(epfd, syscall.EPOLL_CTL_ADD, int32(fd), &ev) // 将 fd 加入 epoll 进行监听
}
func netpollclose(fd uintptr) uintptr {
var ev syscall.EpollEvent
return syscall.EpollCtl(epfd, syscall.EPOLL_CTL_DEL, int32(fd), &ev) // 将文件描述符 fd 从 epoll 实例中移除监听
}
// 3. 从网络轮询器`获取触发`的事件
//
// 1. 根据参数 delay,设置对应的调用 epollwait 的 timeout 值
// 2. 调用 epollwait 等待发生了可读/可写事件的 fd
// 3. 循环 epollwait 返回的事件列表,处理对应的事件类型,组装可运行的 goroutine 链表并返回。
// 随后调用 `runtime.injectglist(glist *gList)` 加入到全局队列或 P 本地队列
//
// netpoll checks for ready network connections.
// Returns list of goroutines that become runnable.
// delay < 0: blocks indefinitely
// delay == 0: does not block, just polls
// delay > 0: block for up to that many nanoseconds
func netpoll(delay int64) (gList, int32) { // 参数 delay 表等待的时间,返回一个 gList,也就是可运行的G列表
// ...
var waitms int32 // 根据 delay 设置`epollwait` 等待的时间:无限制阻塞、不阻塞只轮询、阻塞1ms、阻塞 delay ns、阻塞约11.5天
if delay < 0 {
waitms = -1
} else if delay == 0 {
waitms = 0
} else if delay < 1e6 {
waitms = 1
} else if delay < 1e15 {
waitms = int32(delay / 1e6)
} else {
// An arbitrary cap on how long to wait for a timer.
// 1e9 ms == ~11.5 days.
waitms = 1e9
}
var events [128]syscall.EpollEvent // 声明长度为 128 的 EpollEvent 数组,用于储存 epoll 事件
retry:
n, errno := syscall.EpollWait(epfd, events[:], int32(len(events)), waitms)
if errno != 0 { // 是否发生了错误
if errno != _EINTR {
// ...
}
// If a timed sleep was interrupted, just return to
// recalculate how long we should sleep now.
if waitms > 0 {
return gList{}, 0
}
goto retry // 进行重试
}
var toRun gList
delta := int32(0)
for i := int32(0); i < n; i++ { // 遍历 epoll 事件数组
ev := events[i]
if ev.Events == 0 { // 如果事件为 0,表示没有发生事件,跳过当前循环
continue
}
if *(**uintptr)(unsafe.Pointer(&ev.Data)) == &netpollBreakRd { // 是否是唤醒网络轮询的事件
if ev.Events != syscall.EPOLLIN { // 如果事件类型不是 EPOLLIN,则打印错误信息并抛出异常
// ...
}
if delay != 0 { // 不等于 0,表示需要阻塞
// netpollBreak could be picked up by a
// nonblocking poll. Only read the byte
// if blocking.
var tmp [16]byte
read(int32(netpollBreakRd), noescape(unsafe.Pointer(&tmp[0])), int32(len(tmp))) // 从 netpollBreakRd 读取一个字节,清除唤醒标志
netpollWakeSig.Store(0)
}
continue
}
var mode int32 // 表示事件的模式
if ev.Events&(syscall.EPOLLIN|syscall.EPOLLRDHUP|syscall.EPOLLHUP|syscall.EPOLLERR) != 0 { // 发生读取相关的事件
mode += 'r'
}
if ev.Events&(syscall.EPOLLOUT|syscall.EPOLLHUP|syscall.EPOLLERR) != 0 { // 发生写入相关的事件
mode += 'w'
}
if mode != 0 { // 模式不为 0
tp := *(*taggedPointer)(unsafe.Pointer(&ev.Data)) // 获取与事件相关联的 pollDesc 结构体(在 netpollopen 时储存的)
pd := (*pollDesc)(tp.pointer())
tag := tp.tag()
if pd.fdseq.Load() == tag {
pd.setEventErr(ev.Events == syscall.EPOLLERR, tag)
delta += netpollready(&toRun, pd, mode) // 处理准备好的网络连接(不同的模式:读/写)
}
}
}
return toRun, delta
}
// 往通信管道里写入信号去唤醒 epollwait
//
// netpollBreak interrupts an epollwait.
func netpollBreak() {
// Failing to cas indicates there is an in-flight wakeup, so we're done here. // 避免重复的唤醒信号
if !netpollWakeSig.CompareAndSwap(0, 1) {
return
}
for {
// ...
}
}
无关实现
runtime/netpoll.go
// 调用链 netpoll -> netpollready
// 调用 netpollunblock,返回就绪 pd 对应的 G
//
// netpollready is called by the platform-specific netpoll function.
// It declares that the fd associated with pd is ready for I/O.
// The toRun argument is used to build a list of goroutines to return
// from netpoll. The mode argument is 'r', 'w', or 'r'+'w' to indicate
// whether the fd is ready for reading or writing or both.
//
// This returns a delta to apply to netpollWaiters.
//
// This may run while the world is stopped, so write barriers are not allowed.
//
//go:nowritebarrier
func netpollready(toRun *gList, pd *pollDesc, mode int32) int32 {
// ...
var rg, wg *g
if mode == 'r' || mode == 'r'+'w' {
rg = netpollunblock(pd, 'r', true, &delta)
}
if mode == 'w' || mode == 'r'+'w' {
wg = netpollunblock(pd, 'w', true, &delta)
}
// ...
}
// 根据传入的 mode 决定从 pollDesc 的 rg 或者 wg 取出当时 gopark 之时存入的 G
//
// netpollunblock moves either pd.rg (if mode == 'r') or
// pd.wg (if mode == 'w') into the pdReady state.
// This returns any goroutine blocked on pd.{rg,wg}.
// It adds any adjustment to netpollWaiters to *delta;
// this adjustment should be applied after the goroutine has
// been marked ready.
func netpollunblock(pd *pollDesc, mode int32, ioready bool, delta *int32) *g {}
// 在文件描述符上执行读写操作时,如果文件描述符不可读或者不可写
// 调用链 poll.runtime_pollWait -> runtime.poll_runtime_pollWait -> netpollblock
//
// returns true if IO is ready, or false if timed out or closed
// waitio - wait only for completed IO, ignore errors
// Concurrent calls to netpollblock in the same mode are forbidden, as pollDesc
// can hold only a single waiting goroutine for each mode.
func netpollblock(pd *pollDesc, mode int32, waitio bool) bool {
gpp := &pd.rg
if mode == 'w' { // 根据模式获取到 G
gpp = &pd.wg
}
// set the gpp semaphore to pdWait // 循环等待 IO ready 或者 IO wait
for {
// Consume notification if already ready. // IO ready
if gpp.CompareAndSwap(pdReady, pdNil) {
return true // 直接返回并执行相关操作
}
if gpp.CompareAndSwap(pdNil, pdWait) { // 没有 IO 事件发生
break
}
// Double check that this isn't corrupt; otherwise we'd loop
// forever.
if v := gpp.Load(); v != pdReady && v != pdNil {
throw("runtime: double wait")
}
}
// need to recheck error states after setting gpp to pdWait
// this is necessary because runtime_pollUnblock/runtime_pollSetDeadline/deadlineimpl
// do the opposite: store to closing/rd/wd, publishInfo, load of rg/wg
if waitio || netpollcheckerr(pd, mode) == pollNoError {
gopark(netpollblockcommit, unsafe.Pointer(gpp), waitReasonIOWait, traceBlockNet, 5) // 将 G 设为等待状态,通过 netpollblockcommit 将 G 存入 pd 中
}
// be careful to not lose concurrent pdReady notification
old := gpp.Swap(pdNil)
if old > pdWait {
throw("runtime: corrupted polldesc")
}
return old == pdReady
}
`luajit`的`ffi`库
luajit ffi
FFI 库允许在 Lua 代码里调用外部 C 函数并使用 C 数据结构
FFI 库很大程度上消除了用 C 编写繁琐的手动 Lua/C 绑定的需要
对`git`四种`object`的理解
假设你有一个文件夹projectName
,希望对这个文件夹进行版本管理,你可能想到的办法:
- 复制整个文件夹
- 给新复制的文件夹重命名,加上版本后缀,比如
projectName_Version2
然后,你会发现,不同版本之间的文件夹,比如projectName_Version2
和projectName_Version3
之间的差异可能只有少数几个文件,而剩余的文件是相同的,所以你会希望:
3. 相同的文件只存储一份,然后在不同版本的文件夹中,相同的文件不存储实际文件内容,只储存实际文件内容的地址
然后,你希望在每次复制文件夹的时候,添加一些额外信息,比如:为什么复制?复制的时间?谁复制的?
最后,你希望标记某个重要的复制节点,比如projectName_Version12
。
在上面的场景中,已经出现了git
的四种对象:文件对象(blob)、树对象(tree)、提交对象(commit)、标签对象(tag)
blob对象就是文件,tree对象就是文件夹,commit对象指向一棵树同时携带一些额外信息,tag对象指向一个commit对象同时携带一些额外信息。
`go`中的`Garbage Collector`
Garbage collector (GC)
Go 语言使用的是一种并发标记-清除(concurrent mark-and-sweep)垃圾回收算法:
- GC 和用户代码(mutator)线程并发运行,且允许多个 GC 线程并发运行
- GC 是类型准确的,精确知道内存中的每个指针
- GC 不是分代回收、不对内存进行压缩
- 每个P中都有着大小分离的内存以最小化内存碎片、同时减小锁粒度
算法步骤:
- GC 执行清扫终止
a. Stop The Word: 所有的 P 到达 GC 安全点
b. 清扫未清扫的内存块:GC 提前才会有未清扫的内存块 - GC 执行标记阶段
a. 准备标记阶段:设置 gcphase 状态_GCoff -> _GCmark
,启用写屏障和 mutator 助手,
根标记任务入对,通过 STW 确保所有P启用写屏障后才开始扫描
b. Start The Word: 标记线程和助手开始执行,写屏障对任意指针写操作置灰?新分配对象标记为黑色
c. GC 执行根标记任务:扫描所有的栈,全局变量、堆指针置灰,扫描栈时会暂停G
d. GC 排空灰色对象的工作队列:扫描灰色对象,置为黑色,其关联的对象置灰,递归操作
e. 分布式终止算法 - GC 执行标记终止
a. Stop The Word:
b. 设置 gcphase 状态_GCmark -> _GCmarktermination
,禁用工作线程、mutator 助手
c. 执行清理工作,例如刷新 mcaches - GC 执行清扫阶段
a. 准备清扫阶段,设置 gcphase 状态_GCmarktermination -> _GCoff
,设置清扫状态,禁用写屏障
b. Start The Word: 分配的对象都是白色,在必要分配时会清扫内存块
c. 在后台或响应分配时并发清扫 - 当分配了足够的内存后,重新开始上述步骤。GC 速率由 GOGC 环境变量控制
NOTE:
GC清扫阶段与正常程序执行并发进行。
堆按span逐个清扫,惰性(当一个G需要span时)或后台G并发同时进行。
写屏障
写屏障(write barrier)是一种在并发
垃圾回收(GC)中使用的技术,用于跟踪对堆中对象的写操作
。
在并发 GC 中,当 mutator 线程(用户代码)修改指向堆对象的指针时,可能会导致垃圾回收器无法正确地追踪堆中对象的引用关系。
为了解决这个问题,GC 在一些写操作(通常是对指针的写入)时会插入一些额外的代码,即写屏障代码。
- 在 GC 的标记阶段,当 mutator 线程执行写操作时,写屏障会同时将旧指针值和新指针值标记为灰色,以便 GC 可以跟踪到这些对象的引用关系
- 新分配的对象会立即标记为黑色(已扫描过),以确保 GC 可以正确地追踪对象的引用关系
实现
// runtime/mgc.go
// 从 _GCoff 转换至 _GCmark 阶段,进入并发标记阶段并打开写屏障
func gcStart(trigger gcTrigger) {}
// 如果所有可达对象都已经完成扫描,调用 gcMarkTermination
func gcMarkDone() {}
// 从 _GCmark 转换 _GCmarktermination 阶段,进入标记终止阶段并在完成后进入 _GCoff
func gcMarkTermination(stw worldStop) {}
`linux`中的`io_uring`
Linux I/O 的演进
最开始是同步(阻塞式)系统调用
随着实际需求和具体场景,不断加入新的异步接口,还要保持与老接口的兼容和协同工作(在非阻塞式读写的问题上并没有形成统一方案)
到 io_uring 的出现
- 阻塞式:基于文件描述符fd的系统调用,fd指向文件/网络套接字
- 非阻塞式+轮询:只支持网络套接字/管道,部分不支持文件
- 线程池:主线程分发I/O,工作线程进行阻塞调用
- 直接I/O访问:指定
O_DIRECT
flag,零拷贝I/O - 异步I/O linux-aio:只支持 O_DIRECT 文件,对非数据库应用基本无用;拓展很复杂;可能导致阻塞
- io_uring
io_uring
Linux内核(5.1)提供的新异步I/O框架
原生 Linux AIO 框架存在各种限制,io_uring 旨在克服这些限制:
- 原生AIO不支持缓冲 I/O,仅支持直接 I/O
- 原生AIO具有不确定性行为,可能在各种情况下阻塞
- 原生AIO有一个不理想的 API,每个 I/O 至少需要两次系统调用,一次用于提交请求,一次用于等待其完成
io_uring实例有两个环:
提交任务队列SQ
和完成任务队列CQ
,被内核和用户程序共享;
这两个队列都是单生产者、单消费者,大小为2的幂;
- 用户程序创建一个或多个
提交任务项SQE
,更新提交任务队列SQ
队尾; - 内核消费
提交任务项SQE
,更新提交任务队列SQ
队头; - 内核创建
完成任务项CQE
,更新完成任务队列CQ
队尾; - 用户程序消费
完成任务项CQE
,更新完成任务队列CQ
队头;
API
io_uring_setup
io_uring_register
io_uring_enter
// 设置好`提交队列`和`完成队列`,至少`entries`个项
// 返回的 fd 用于执行后续操作
// 通过`io_uring_params`可以配置三种模式:Interrupt driven, Polled, Kernel polled
//
// setup a context for performing asynchronous I/O
int io_uring_setup(u32 entries, struct io_uring_params *p);
// 注册文件或用户缓冲区允许:
// 1. 内核长期引用与文件关联的内部内核数据结构,
// 2. 内核创建与缓冲区关联的应用程序内存的长期映射,
// 仅在注册期间而不是在处理每个 I/O 期间进行一次请求,从而减少每个 I/O 开销。
// 根据不同的 opcode 决定不同的 arg
//
// register files or user buffers for asynchronous I/O
int io_uring_register(unsigned int fd, unsigned int opcode, void *arg, unsigned int nr_args);
// 使用共享的SQ和CQ(由io_uring_setup创建)进行初始化或完成I/O操作
// 单次调用`io_uring_enter`可以同时:1. 提交新的I/O操作 2. 等待上一次调用`io_uring_enter`提交的I/O完成
//
// initiate and/or complete asynchronous I/O
int io_uring_enter(unsigned int fd, unsigned int to_submit, unsigned int min_complete, unsigned int flags, sigset_t *sig);
阅读`go runtime`中的`sysmon`函数
// Always runs without a P, so write barriers are not allowed.
//
//go:nowritebarrierrec
func sysmon() {
lock(&sched.lock) // 获取调度器的锁
sched.nmsys++
checkdead()
unlock(&sched.lock) // 释放调度器的锁
lasttrace := int64(0) // 记录上一次调度追踪的时间
idle := 0 // how many cycles in succession we had not wokeup somebody /// 循环中的空闲周期数
delay := uint32(0) // 控制休眠时长
for { /// 进入循环
if idle == 0 { // start with 20us sleep...
delay = 20
} else if idle > 50 { // start doubling the sleep after 1ms...
delay *= 2
}
if delay > 10*1000 { // up to 10ms
delay = 10 * 1000
}
usleep(delay) /// 休眠20us,如果空闲次数idle大于50次,则休眠时间逐次翻倍,直到10ms
// sysmon should not enter deep sleep if schedtrace is enabled so that
// it can print that information at the right time.
//
// It should also not enter deep sleep if there are any active P's so
// that it can retake P's from syscalls, preempt long running G's, and
// poll the network if all P's are busy for long stretches.
//
// It should wakeup from deep sleep if any P's become active either due
// to exiting a syscall or waking up due to a timer expiring so that it
// can resume performing those duties. If it wakes from a syscall it
// resets idle and delay as a bet that since it had retaken a P from a
// syscall before, it may need to do it again shortly after the
// application starts work again. It does not reset idle when waking
// from a timer to avoid adding system load to applications that spend
// most of their time sleeping.
now := nanotime()
if debug.schedtrace <= 0 && (sched.gcwaiting.Load() || sched.npidle.Load() == gomaxprocs) { /// 没有启动schedtrace,GC等待中或没有活跃P,进入深度休眠
lock(&sched.lock)
if sched.gcwaiting.Load() || sched.npidle.Load() == gomaxprocs { /// GC等待中或没有活跃P
syscallWake := false
next := timeSleepUntil() /// 触发下一个timer的时间
if next > now {
sched.sysmonwait.Store(true) /// 标记进入深度休眠
unlock(&sched.lock) /// 释放sched.lock锁
// Make wake-up period small enough
// for the sampling to be correct.
sleep := forcegcperiod / 2 /// 计算睡眠时长
if next-now < sleep {
sleep = next - now /// 睡眠时长不超过下一个定时器的触发时间与当前时间的差值
}
shouldRelax := sleep >= osRelaxMinNS
if shouldRelax { /// 睡眠时长大于等于osRelaxMinNS
osRelax(true) /// 告知操作系统进入休眠状态。
}
syscallWake = notetsleep(&sched.sysmonnote, sleep) /// 使用信号sysmonnote进入休眠状态,等待被唤醒
if shouldRelax {
osRelax(false) /// 告知操作系统退出休眠状态
}
lock(&sched.lock) /// 重新获取sched.lock锁
sched.sysmonwait.Store(false) /// 标记退出深度休眠
noteclear(&sched.sysmonnote) /// 清除sysmonnote信号
}
if syscallWake { /// 唤醒后,重置idle和delay
idle = 0
delay = 20
}
}
unlock(&sched.lock)
}
lock(&sched.sysmonlock)
// Update now in case we blocked on sysmonnote or spent a long time
// blocked on schedlock or sysmonlock above.
now = nanotime()
// trigger libc interceptors if needed
if *cgo_yield != nil {
asmcgocall(*cgo_yield, nil)
}
// poll network if not polled for more than 10ms
lastpoll := sched.lastpoll.Load()
if netpollinited() && lastpoll != 0 && lastpoll+10*1000*1000 < now { /// 网络轮询已初始化,且上次轮询时间超过10ms,则进行网络轮询,并调整等待时间
sched.lastpoll.CompareAndSwap(lastpoll, now)
list, delta := netpoll(0) // non-blocking - returns list of goroutines
if !list.empty() {
// Need to decrement number of idle locked M's
// (pretending that one more is running) before injectglist.
// Otherwise it can lead to the following situation:
// injectglist grabs all P's but before it starts M's to run the P's,
// another M returns from syscall, finishes running its G,
// observes that there is no work to do and no other running M's
// and reports deadlock.
incidlelocked(-1)
injectglist(&list)
incidlelocked(1)
netpollAdjustWaiters(delta)
}
}
if GOOS == "netbsd" && needSysmonWorkaround {
// netpoll is responsible for waiting for timer
// expiration, so we typically don't have to worry
// about starting an M to service timers. (Note that
// sleep for timeSleepUntil above simply ensures sysmon
// starts running again when that timer expiration may
// cause Go code to run again).
//
// However, netbsd has a kernel bug that sometimes
// misses netpollBreak wake-ups, which can lead to
// unbounded delays servicing timers. If we detect this
// overrun, then startm to get something to handle the
// timer.
//
// See issue 42515 and
// https://gnats.netbsd.org/cgi-bin/query-pr-single.pl?number=50094.
if next := timeSleepUntil(); next < now {
startm(nil, false, false)
}
}
if scavenger.sysmonWake.Load() != 0 { /// 有清理器的唤醒信号,唤醒清理器
// Kick the scavenger awake if someone requested it.
scavenger.wake()
}
// retake P's blocked in syscalls
// and preempt long running G's
if retake(now) != 0 { /// 重新获取被阻塞在系统调用中的P,并抢占运行时间过长的G
idle = 0
} else {
idle++
}
// check if we need to force a GC
if t := (gcTrigger{kind: gcTriggerTime, now: now}); t.test() && forcegc.idle.Load() { /// 是否需要进行强制的GC
lock(&forcegc.lock)
forcegc.idle.Store(false)
var list gList
list.push(forcegc.g)
injectglist(&list)
unlock(&forcegc.lock)
}
if debug.schedtrace > 0 && lasttrace+int64(debug.schedtrace)*1000000 <= now { /// 启用了schedtrace,并且到达了指定的时间间隔,记录调度追踪信息
lasttrace = now
schedtrace(debug.scheddetail > 0)
}
unlock(&sched.sysmonlock) /// 释放sysmonlock锁,完成一次系统监控周期的循环
}
}
`linux`中5种`IO`模型
Five different types IO in Linux
阻塞I/O、非阻塞I/O、I/O多路复用、信号驱动I/O、异步I/O
以数据读取为例,分为:1. 准备数据 2. 将数据从内核复制到应用
阻塞I/O
在阻塞 I/O 模式下,应用程序执行 I/O 操作时,会一直阻塞,直到操作完成
第1步的准备数据:阻塞
第2步的复制数据:阻塞
非阻塞I/O
在非阻塞 I/O 模式下,应用程序执行 I/O 操作时,不会被阻塞,即使数据不可用也会立即返回
如果操作因为数据不可用而无法立即完成,它将返回一个错误代码(通常是 EAGAIN 或 EWOULDBLOCK),而不会阻塞应用程序的执行
第1步的准备数据:不阻塞,请求即立刻返回(无论是否准备好数据)
第2步的复制数据:阻塞
I/O多路复用
多路复用 I/O 模式允许应用程序同时监视多个文件描述符,以确定哪些文件描述符上有数据可读或可写
第1步的准备数据:监听多个fd,阻塞(可选)在epoll_wait
第2步的复制数据:阻塞
信号驱动 I/O
当数据可读或可写时,内核会向应用程序发送一个信号,应用程序可以捕获该信号并执行相应的操作
第1步的准备数据:不阻塞
第2步的复制数据:阻塞
异步 I/O
异步 I/O 模式允许应用程序在发起 I/O 操作后继续执行其他任务,而不必等待操作完成
当操作完成时,内核会通知应用程序,应用程序可以继续处理操作的结果
第1步的准备数据:不阻塞
第2步的复制数据:不阻塞(内核完成复制操作,也就是说其他方式是内核通知应用启动IO操作,而异步IO是内核通知应用IO操作已完成)
`go runtime`的GMP
GMP
go version go1.22.2 linux/amd64
G
G 是被调度的对象,有状态、栈
status
src/runtime/runtime2.go:18
// _Grunnable: 创建一个可运行的 G
func newproc1(fn *funcval, callergp *g, callerpc uintptr) *g {}
// _Grunnable -> _Grunning: 将G调度在M上运行
func execute(gp *g, inheritTime bool) {}
// _Grunning -> _Grunnable: 当前 G 让出当前 CPU,允许其他 G 使用
func goyield() {} // 调用 goyield_m
func goyield_m(gp *g) {}
// _Grunning -> _Gdead: G 表示的任务已经完成
//
// 每个 G 的调用栈都构造得像是 goexit 调用了该 G 的入口函数,
// 因此当入口函数返回时,它会返回到 goexit,
// goexit 会调用 goexit1 来执行实际的退出操作
func goexit(neverCallThisFunction)
func goexit1() {} // 调用 goexit0
func goexit0(gp *g) {} // 调用 gdestroy
func gdestroy(gp *g) {} // _Grunning -> _Gdead
// _Grunning -> _Gsyscall: 执行 G 的时候,进入系统调用状态
//
// 此时 G 不在使用 CPU 资源
func entersyscall() {} // 调用 reentersyscall
func reentersyscall(pc, sp uintptr) {}
//
func entersyscallblock() {} // 同上,但是进入了一个阻塞的系统调用
// _Gsyscall -> _Grunning:
// _Gsyscall -> _Grunnable:
//
// 退出系统调用状态:
// 调用 exitsyscallfast 为true,则 _Gsyscall -> _Grunning
// 否则调用 exitsyscall0,则 _Gsyscall -> _Grunnable
func exitsyscall() {}
// _Grunning -> _Gwaiting: 将 G 设置为等待状态
func gopark(unlockf func(*g, unsafe.Pointer) bool, lock unsafe.Pointer, reason waitReason, traceReason traceBlockReason, traceskip int) {} // 调用 park_m
func park_m(gp *g) {} // _Grunning -> _Gwaiting
// _Gwaiting -> _Grunnable: 将 G 设置为可运行状态
func goready(gp *g, traceskip int) {} // 调用 ready
func ready(gp *g, traceskip int, next bool) {}
---
title: G status
---
stateDiagram-v2
direction LR
gidle : _Gidle
grunable: _Grunnable
grunning: _Grunning
gsyscall: _Gsyscall
gwaiting: _Gwaiting
gdead : _Gdead
[*] --> gidle
gdead --> [*]
gidle --> grunable: newproc1
grunable --> grunning: execute
grunning --> grunable: goyield
grunning --> gsyscall: entersyscall/entersyscallblock
state gsyscall_state <<choice>>
gsyscall --> gsyscall_state
gsyscall_state --> grunning: exitsyscall,exitsyscallfast
gsyscall_state --> grunable: exitsyscall,exitsyscall0
grunning --> gwaiting: gopark
gwaiting --> grunable: goready
grunning --> gdead: goexit
P
P是逻辑CPU,主要保存着可执行的G队列
status
src/runtime/runtime2.go:110
// _Pidle:
//
// 所有的 P 被设置为 _Pidle 状态
func procresize(nprocs int32) *p {}
// _Pidle -> _Prunning:
//
// 将 P 和 M 绑定,运行 G 或调度器
func acquirep(pp *p) {} // 调用 wirep
func wirep(pp *p) {} // pp.status = _Prunning
// _Prunning -> _Pidle:
//
// 将 P 和 M 解绑
func releasep() *p {} // 调用 releasepNoTrace
func releasepNoTrace() *p {} // pp.status = _Pidle
// _Pdead
func procresize(nprocs int32) *p {} // 重新设置 P 数量时,如果少于原 P 的数量,多余的 P 调用 destroy
func (pp *p) destroy() {} // pp.status = _Pdead
// _Psyscall
//
// P 关联了一个进行系统调用的M,也就是没有在运行G,此时可能被其他M窃取的
func entersyscall() {} // atomic.Store(&pp.status, _Psyscall)
---
title: P status
---
stateDiagram-v2
pidle : _Pidle
prunning: _Prunning
psyscall: _Psyscall
pdead : _Pdead
pidle --> prunning
prunning --> pidle
prunning --> psyscall
M
M 是OS线程,执行代码,必须绑定一个P才能获取到G进行执行
status
running/spinning: 运行状态或自旋状态
// 运行M: 新建M或唤醒M
// 新建M
func startm(pp *p, spinning, lockheld bool) {}
func newm(fn func(), pp *p, id int64) {} // 调用 newosproc, 不同`OS`不同实现
func newosproc(mp *m) {}
// 唤醒M
func wakep() {} // 调用 startm
func startm(pp *p, spinning, lockheld bool) {} // 调用 mget
func mget() *m {}
// 自旋M
func (mp *m) becomeSpinning() {}
// 从自旋中恢复
func resetspinning() {}
// 休眠 M
// M 没有 G 可以执行,也不是自旋状态,进入休眠状态
func stopm() {} // 调用 mput
func mput(mp *m) {}
TODO 状态图
`git`中的`revisions`是什么?
revision
通常是指commit对象,但是也是可以用来指向commit对象指向的tree对象和里面的blob对象。
`cloudwego`的`netpoll`包
netpoll
1. 监听net.Listener
的fd
// 新建`EventLoop`,是一个事件驱动的调度器
// 处于连接管理、事件调度等...
eventLoop, _ := netpoll.NewEventLoop(onRequest OnRequest, ops ...Option)
// 通过绑定`net.Listener`提供服务
eventLoop.Serve(net.Listener)
// 优雅关闭服务
eventLoop.Shutdown(context.Context)
- 先看第一步返回的
EventLoop
:
// eventloop.go
// 提供服务/优雅退出
//
// A EventLoop is a network server.
type EventLoop interface {
// Serve registers a listener and runs blockingly to provide services, including listening to ports,
// accepting connections and processing trans data. When an exception occurs or Shutdown is invoked,
// Serve will return an error which describes the specific reason.
Serve(ln net.Listener) error
// Shutdown is used to graceful exit.
// It will close all idle connections on the server, but will not change the underlying pollers.
//
// Argument: ctx set the waiting deadline, after which an error will be returned,
// but will not force the closing of connections in progress.
Shutdown(ctx context.Context) error
}
- 实现了
EventLoop
接口的eventLoop
结构体的Serve
方法:
// netpoll.go
type eventLoop struct {
sync.Mutex
opts *options
svr *server
stop chan error
}
// Serve implements EventLoop.
func (evl *eventLoop) Serve(ln net.Listener) error {
npln, err := ConvertListener(ln) // 转换 listener
// ...
// 新建一个server并运行
evl.svr = newServer(npln, evl.opts, evl.quit)
evl.svr.Run()
// ...
}
// Shutdown signals a shutdown a begins server closing.
func (evl *eventLoop) Shutdown(ctx context.Context) error {}
- 看函数
newServer
新建的server
,和(*server).Run
方法:
// netpoll_server.go
type server struct {
operator FDOperator
ln Listener
opts *options
onQuit func(err error)
connections sync.Map // key=fd, value=connection
}
// Run this server.
func (s *server) Run() (err error) {
s.operator = FDOperator{ // FDOperator: 在 fd 上操作的集合
FD: s.ln.Fd(), // Listener 的 fd
OnRead: s.OnRead,
OnHup: s.OnHup,
}
s.operator.poll = pollmanager.Pick() // pollmanager(在 init 函数初始化)管理多个(每20个cpu一个) pollers,这里获取一个poll(用于监听 fd)
err = s.operator.Control(PollReadable)
// ...
}
- 看结构体
FDOperator
的Control
方法,参数为PollReadable
:
// fd_operator.go
// FDOperator is a collection of operations on file descriptors.
type FDOperator struct {
// ...
}
func (op *FDOperator) Control(event PollEvent) error {
if event == PollDetach && atomic.AddInt32(&op.detached, 1) > 1 {
return nil
}
return op.poll.Control(op, event) // 调用 poll 的 Control 方法, poll是一个接口Poll,具体实现是结构体 defaultPoll
}
- 看接口
Poll
的具体实现defaultPoll
的Control
方法,参数为FDOperator
和PollReadable
:
// poll.go
// Poll monitors fd(file descriptor), calls the FDOperator to perform specific actions,
// and shields underlying differences. On linux systems, poll uses epoll by default,
// and kevent by default on bsd systems.
type Poll interface {
// ...
// Control the event of file descriptor and the operations is defined by PollEvent.
Control(operator *FDOperator, event PollEvent) error
// ...
}
// poll_default_linux.go
type defaultPoll struct {
// ...
}
// Control implements Poll.
func (p *defaultPoll) Control(operator *FDOperator, event PollEvent) error {
// ...
p.setOperator(unsafe.Pointer(&evt.data), operator) /// 将 FDOperator 设置为事件的自定义数据,在触发事件的时候可以取出
switch event {
case PollReadable: // server accept a new connection and wait read
operator.inuse() /// 设置状态,表示 FDOperator 正在使用
op, evt.events = syscall.EPOLL_CTL_ADD, syscall.EPOLLIN|syscall.EPOLLRDHUP|syscall.EPOLLERR /// 设置 EPOLL_CTL 系统调用参数:增加监听,监听可读、对端关闭...
// ...
}
return EpollCtl(p.fd, op, operator.FD, &evt) /// 调用 EPOLL_CTL 系统调用
}
// sys_epoll_linux.go
// 封装的 SYS_EPOLL_CTL 系统调用函数
//
// EpollCtl implements epoll_ctl.
func EpollCtl(epfd int, op int, fd int, event *epollevent) (err error) {
_, _, err = syscall.RawSyscall6(syscall.SYS_EPOLL_CTL, uintptr(epfd), uintptr(op), uintptr(fd), uintptr(unsafe.Pointer(event)), 0, 0)
if err == syscall.Errno(0) {
err = nil
}
return err
}
2. poll
运行:轮询是否有就绪事件
上一段第5点提到:
(*server).Run
方法内会通过pollmanager.Pick()
选择一个poll
,方法内已经将所有的poll运行起来了
// poll_manager.go
// LoadBalance is used to do load balancing among multiple pollers.
// a single poller may not be optimal if the number of cores is large (40C+).
type manager struct {
numLoops int32
status int32 // 0: uninitialized, 1: initializing, 2: initialized
balance loadbalance // load balancing method
polls []Poll // all the polls
}
// Pick will select the poller for use each time based on the LoadBalance.
func (m *manager) Pick() Poll {
START:
// fast path
if atomic.LoadInt32(&m.status) == managerInitialized { /// 已经初始化完成,直接选择一个poll返回
return m.balance.Pick()
}
// slow path
// try to get initializing lock failed, wait others finished the init work, and try again
if !atomic.CompareAndSwapInt32(&m.status, managerUninitialized, managerInitializing) { /// 尝试设置初始化状态,失败则重试
runtime.Gosched()
goto START
}
// adjust polls
// m.Run() will finish very quickly, so will not many goroutines block on Pick.
_ = m.Run() /// 运行 manager, 会运行其管理的所有 poll
if !atomic.CompareAndSwapInt32(&m.status, managerInitializing, managerInitialized) { /// 尝试设置初始化状态
// SetNumLoops called during m.Run() which cause CAS failed
// The polls will be adjusted next Pick
}
return m.balance.Pick() /// 选择一个poll返回
}
// Run all pollers.
func (m *manager) Run() (err error) {
defer func() {
if err != nil {
_ = m.Close()
}
}()
numLoops := int(atomic.LoadInt32(&m.numLoops)) /// numLoops 的值在 init 函数内通过调用 newManager 函数进行设置了
if numLoops == len(m.polls) {
return nil
}
var polls = make([]Poll, numLoops) /// 初始化 poll 设置数量
if numLoops < len(m.polls) { /// 小于原 poll 的个数,收缩
// shrink polls
copy(polls, m.polls[:numLoops]) /// 保留numLoops个poll
for idx := numLoops; idx < len(m.polls); idx++ {
// close redundant polls
if err = m.polls[idx].Close(); err != nil { /// 多余的关闭
logger.Printf("NETPOLL: poller close failed: %v\n", err)
}
}
} else { /// 大于等于原 poll 的个数,收缩
// growth polls
copy(polls, m.polls) /// 复制原所有的 poll
for idx := len(m.polls); idx < numLoops; idx++ {/// 需要新增的 poll
var poll Poll
poll, err = openPoll() /// 新增一个 poll
if err != nil {
return err
}
polls[idx] = poll
go poll.Wait() /// 运行 poll
}
}
m.polls = polls
// LoadBalance must be set before calling Run, otherwise it will panic.
m.balance.Rebalance(m.polls)
return nil
}
新建poll
:
// poll_default_linux.go
func openPoll() (Poll, error) {
return openDefaultPoll()
}
func openDefaultPoll() (*defaultPoll, error) {
var poll = new(defaultPoll)
// ...
var p, err = EpollCreate(0) /// 创建一个 epoll
// ...
poll.fd = p /// 保存 epoll 的 fd
var r0, _, e0 = syscall.Syscall(syscall.SYS_EVENTFD2, 0, 0, 0) /// 事件通知,返回`事件fd`
// ...
// ...
poll.Handler = poll.handler /// 注册处理就绪事件的函数`(p *defaultPoll) handler`
poll.wop = &FDOperator{FD: int(r0)} /// 用于 epoll_wait 的时候唤醒
if err = poll.Control(poll.wop, PollReadable); err != nil { /// 上一段分析过,增加对`事件fd`的监听
// ...
return nil, err
}
// ...
return poll, nil
}
// sys_epoll_linux.go
// 创建一个 epoll
//
// EpollCreate implements epoll_create1.
func EpollCreate(flag int) (fd int, err error) {
var r0 uintptr
r0, _, err = syscall.RawSyscall(syscall.SYS_EPOLL_CREATE1, uintptr(flag), 0, 0)
if err == syscall.Errno(0) {
err = nil
}
return int(r0), err
}
运行poll
:
// poll.go
// Poll monitors fd(file descriptor), calls the FDOperator to perform specific actions,
// and shields underlying differences. On linux systems, poll uses epoll by default,
// and kevent by default on bsd systems.
type Poll interface {
// Wait will poll all registered fds, and schedule processing based on the triggered event.
// The call will block, so the usage can be like:
//
// go wait()
//
Wait() error
//...
}
// poll_default_linux.go
type defaultPoll struct {
// ...
}
// Wait implements Poll.
func (p *defaultPoll) Wait() (err error) {
// ...
// wait
for {
// ...
n, err = EpollWait(p.fd, p.events, msec) /// 调用 epoll_wait,将就绪的事件保存在 p.events 中;参数 msec 表示是否阻塞调用 -1 为永久阻塞;返回值n表示就绪事件数量
// ...
if n <= 0 { /// 发生错误
msec = -1
runtime.Gosched()
continue
}
msec = 0
if p.Handler(p.events[:n]) { /// 处理就绪的事件,处理函数在新建 poll 的时候注册,即`(p *defaultPoll) handler`
return nil
}
// ...
}
}
// sys_epoll_linux.go
// 对 EPOLL_WAIT 的封装
// 参数 msec 决定是否阻塞,如果阻塞,则需要调用 Syscall6,会通知 runtime 调度当前 P
//
// EpollWait implements epoll_wait.
func EpollWait(epfd int, events []epollevent, msec int) (n int, err error) {
var r0 uintptr
var _p0 = unsafe.Pointer(&events[0])
if msec == 0 {
r0, _, err = syscall.RawSyscall6(syscall.SYS_EPOLL_WAIT, uintptr(epfd), uintptr(_p0), uintptr(len(events)), 0, 0, 0)
} else {
r0, _, err = syscall.Syscall6(syscall.SYS_EPOLL_WAIT, uintptr(epfd), uintptr(_p0), uintptr(len(events)), uintptr(msec), 0, 0)
}
if err == syscall.Errno(0) {
err = nil
}
return int(r0), err
}
3. 处理就绪事件
上一段的
运行poll
提到:(*defaultPoll).Wait
方法内会通过Handler
方法处理就绪的事件
此处理函数在新建 poll 的时候注册,即(p *defaultPoll) handler
func (p *defaultPoll) handler(events []epollevent) (closed bool) {
var triggerRead, triggerWrite, triggerHup, triggerError bool
var err error
for i := range events {
operator := p.getOperator(0, unsafe.Pointer(&events[i].data)) /// 获取 FDOperator,在调用`(*defaultPoll).Control`方法时被设置为事件的自定义数据
// ...
var totalRead int
evt := events[i].events /// 获取事件,进行判断:
triggerRead = evt&syscall.EPOLLIN != 0 /// 触发读事件
triggerWrite = evt&syscall.EPOLLOUT != 0 /// 触发写事件
triggerHup = evt&(syscall.EPOLLHUP|syscall.EPOLLRDHUP) != 0 /// 触发挂起或对端关闭事件
triggerError = evt&syscall.EPOLLERR != 0 /// 触发错误
// trigger or exit gracefully
if operator.FD == p.wop.FD { /// 事件通知 fd
// must clean trigger first
syscall.Read(p.wop.FD, p.buf)
atomic.StoreUint32(&p.trigger, 0)
// if closed & exit
if p.buf[0] > 0 {
syscall.Close(p.wop.FD)
syscall.Close(p.fd) /// 关闭
operator.done()
return true
}
operator.done()
continue
}
if triggerRead { /// 1. 触发读
if operator.OnRead != nil {
// for non-connection /// 非连接
operator.OnRead(p) /// 调用 OnRead 方法
} else if operator.Inputs != nil {
// for connection /// 连接
bs := operator.Inputs(p.barriers[i].bs) /// 调用 Inputs 方法
if len(bs) > 0 {
n, err := ioread(operator.FD, bs, p.barriers[i].ivs) /// 读取数据
operator.InputAck(n) /// 确认读取完
totalRead += n
if err != nil {
p.appendHup(operator)
continue
}
}
} else {
logger.Printf("NETPOLL: operator has critical problem! event=%d operator=%v", evt, operator)
}
}
if triggerHup { /// 2. 触发挂起
if triggerRead && operator.Inputs != nil {
// read all left data if peer send and close
var leftRead int
// read all left data if peer send and close
if leftRead, err = readall(operator, p.barriers[i]); err != nil && !errors.Is(err, ErrEOF) {
logger.Printf("NETPOLL: readall(fd=%d)=%d before close: %s", operator.FD, total, err.Error())
}
totalRead += leftRead
}
// only close connection if no further read bytes
if totalRead == 0 {
p.appendHup(operator)
continue
}
}
if triggerError { /// 3. 触发错误
// Under block-zerocopy, the kernel may give an error callback, which is not a real error, just an EAGAIN.
// So here we need to check this error, if it is EAGAIN then do nothing, otherwise still mark as hup.
if _, _, _, _, err := syscall.Recvmsg(operator.FD, nil, nil, syscall.MSG_ERRQUEUE); err != syscall.EAGAIN {
p.appendHup(operator)
} else {
operator.done()
}
continue
}
if triggerWrite { /// 4. 触发写
if operator.OnWrite != nil {
// for non-connection /// 非连接
operator.OnWrite(p) /// 调用 OnWrite 方法
} else if operator.Outputs != nil {
// for connection /// 连接
bs, supportZeroCopy := operator.Outputs(p.barriers[i].bs) /// 调用 Outputs 方法
if len(bs) > 0 {
// TODO: Let the upper layer pass in whether to use ZeroCopy.
n, err := iosend(operator.FD, bs, p.barriers[i].ivs, false && supportZeroCopy) /// 发送数据
operator.OutputAck(n) /// 确认发送完
if err != nil {
p.appendHup(operator)
continue
}
}
} else {
logger.Printf("NETPOLL: operator has critical problem! event=%d operator=%v", evt, operator)
}
}
operator.done()
}
// hup conns together to avoid blocking the poll.
p.onhups()
return false
}
重新看一下FDOperator
结构体:
// FDOperator is a collection of operations on file descriptors.
type FDOperator struct {
// FD is file descriptor, poll will bind when register.
FD int
// The FDOperator provides three operations of reading, writing, and hanging.
// The poll actively fire the FDOperator when fd changes, no check the return value of FDOperator.
OnRead func(p Poll) error
OnWrite func(p Poll) error
OnHup func(p Poll) error
// The following is the required fn, which must exist when used, or directly panic.
// Fns are only called by the poll when handles connection events.
Inputs func(vs [][]byte) (rs [][]byte)
InputAck func(n int) (err error)
// Outputs will locked if len(rs) > 0, which need unlocked by OutputAck.
Outputs func(vs [][]byte) (rs [][]byte, supportZeroCopy bool)
OutputAck func(n int) (err error)
// poll is the registered location of the file descriptor.
poll Poll
// protect only detach once
detached int32
// private, used by operatorCache
next *FDOperator
state int32 // CAS: 0(unused) 1(inuse) 2(do-done)
index int32 // index in operatorCache
}
OnRead
// netpoll_server.go
type server struct {
// ...
}
// Run this server.
func (s *server) Run() (err error) {
s.operator = FDOperator{
FD: s.ln.Fd(),
OnRead: s.OnRead, /// 注册 OnRead 方法
OnHup: s.OnHup,
}
// ...
}
// OnRead implements FDOperator.
func (s *server) OnRead(p Poll) error {
// accept socket
conn, err := s.ln.Accept() /// 接受连接
if err == nil {
if conn != nil {
s.onAccept(conn.(Conn)) /// 处理连接:存出连接池,并处理连接
}
// EAGAIN | EWOULDBLOCK if conn and err both nil
return nil
}
logger.Printf("NETPOLL: accept conn failed: %v", err)
// delay accept when too many open files
// ...
// shut down
// ...
return err
}
OnWrite
type netFD struct {
// ...
}
func (c *netFD) connect(ctx context.Context, la, ra syscall.Sockaddr) (rsa syscall.Sockaddr, retErr error) {
// ...
c.pd = newPollDesc(c.fd) /// 初始化 pd,注册 pd.onwrite 方法到OnWrite,在处理时间时被调用,通过 pd.writeTrigger 通知
for {
// Performing multiple connect system calls on a
// non-blocking socket under Unix variants does not
// necessarily result in earlier errors being
// returned. Instead, once runtime-integrated network
// poller tells us that the socket is ready, get the
// SO_ERROR socket option to see if the connection
// succeeded or failed. See issue 7474 for further
// details.
if err := c.pd.WaitWrite(ctx); err != nil { /// 阻塞,会通过 pd.writeTrigger 唤醒
return nil, err
}
// ...
}
}
Inputs, InputAck; Outputs, OutputAck
先看一下LinkBuffer的解析
// connection_impl.go
type connection struct {
//...
}
func (c *connection) init(conn Conn, opts *options) (err error) {
// ...
c.initFDOperator()
// ...
}
func (c *connection) initFDOperator() {
// ...
op.Inputs, op.InputAck = c.inputs, c.inputAck
op.Outputs, op.OutputAck = c.outputs, c.outputAck
// ...
}
// connection_reactor.go
// inputs implements FDOperator.
func (c *connection) inputs(vs [][]byte) (rs [][]byte) {
vs[0] = c.inputBuffer.book(c.bookSize, c.maxSize)
return vs[:1]
}
// inputAck implements FDOperator.
func (c *connection) inputAck(n int) (err error) {
if n <= 0 {
c.inputBuffer.bookAck(0)
return nil
}
// Auto size bookSize.
if n == c.bookSize && c.bookSize < mallocMax {
c.bookSize <<= 1
}
length, _ := c.inputBuffer.bookAck(n)
if c.maxSize < length {
c.maxSize = length
}
if c.maxSize > mallocMax {
c.maxSize = mallocMax
}
var needTrigger = true
if length == n { // first start onRequest
needTrigger = c.onRequest()
}
if needTrigger && length >= int(atomic.LoadInt64(&c.waitReadSize)) {
c.triggerRead(nil)
}
return nil
}
// outputs implements FDOperator.
func (c *connection) outputs(vs [][]byte) (rs [][]byte, supportZeroCopy bool) {
if c.outputBuffer.IsEmpty() {
c.rw2r()
return rs, c.supportZeroCopy
}
rs = c.outputBuffer.GetBytes(vs)
return rs, c.supportZeroCopy
}
// outputAck implements FDOperator.
func (c *connection) outputAck(n int) (err error) {
if n > 0 {
c.outputBuffer.Skip(n)
c.outputBuffer.Release()
}
if c.outputBuffer.IsEmpty() {
c.rw2r()
}
return nil
}
`go`中的`stack`
stack for G
- 编译器会在编译阶段会通过 cmd/internal/obj/x86.stacksplit 在调用函数前插入 runtime.morestack 或者 runtime.morestack_noctxt 函数;
- 运行时在创建新的 G 时会在 runtime.malg 中调用 runtime.stackalloc 申请新的栈内存,并在编译器插入的 runtime.morestack 中检查栈空间是否充足;
// runtime/runtime2.go
// Stack describes a Go execution stack.
// The bounds of the stack are exactly [lo, hi),
// with no implicit data structures on either side.
type stack struct {
lo uintptr
hi uintptr
}
// runtime/stack.go
//
// Stack frame layout
//
// (x86)
// +------------------+
// | args from caller |
// +------------------+ <- frame->argp
// | return address |
// +------------------+
// | caller's BP (*) | (*) if framepointer_enabled && varp > sp
// +------------------+ <- frame->varp
// | locals |
// +------------------+
// | args to callee |
// +------------------+ <- frame->sp
//
// (arm)
// +------------------+
// | args from caller |
// +------------------+ <- frame->argp
// | caller's retaddr |
// +------------------+
// | caller's FP (*) | (*) on ARM64, if framepointer_enabled && varp > sp
// +------------------+ <- frame->varp
// | locals |
// +------------------+
// | args to callee |
// +------------------+
// | return address |
// +------------------+ <- frame->sp
//
// varp > sp means that the function has a frame;
// varp == sp means frameless function.
初始化
// runtime/stack.go
// stackpool 表示全局栈缓存:分配小于 32KB 的栈空间
// stackLarge 表示大栈缓存:分配大于 32KB 的栈空间
// Global pool of spans that have free stacks.
// Stacks are assigned an order according to size.
//
// order = log_2(size/FixedStack)
//
// There is a free list for each order.
var stackpool [_NumStackOrders]struct {
item stackpoolItem
_ [(cpu.CacheLinePadSize - unsafe.Sizeof(stackpoolItem{})%cpu.CacheLinePadSize) % cpu.CacheLinePadSize]byte
}
type stackpoolItem struct {
_ sys.NotInHeap
mu mutex
span mSpanList
}
// Global pool of large stack spans.
var stackLarge struct {
lock mutex
free [heapAddrBits - pageShift]mSpanList // free lists by log_2(s.npages)
}
// runtime/stack.go
// 初始化 stackpool 和 stackLarge
// 两者都与`mSpanList`有关,也就是`mspan`相关,可以认为 Go 的栈内存都是分配在堆上的
func stackinit() {
if _StackCacheSize&_PageMask != 0 {
throw("cache size must be a multiple of page size")
}
for i := range stackpool {
stackpool[i].item.span.init()
lockInit(&stackpool[i].item.mu, lockRankStackpool)
}
for i := range stackLarge.free {
stackLarge.free[i].init()
lockInit(&stackLarge.lock, lockRankStackLarge)
}
}
// 使用全局进行分配的话,会造成锁竞争
// 栈与 P 关系密切,在每一个 P 增加栈内存缓存
type p struct {
// ...
mcache *mcache
// ...
}
type mcache struct {
// ...
stackcache [_NumStackOrders]stackfreelist
// ...
}
type stackfreelist struct {
list gclinkptr // linked list of free stacks
size uintptr // total size of stacks in list
}
分配
// runtime/stack.go
// 分配 n 字节的栈
// 1. 栈空间较小:使用全局缓存或P上缓存进行分配
// 2. 栈空间较大:使用大栈缓存 stackLarge 分配
// 3. 栈空间较大且 stackLarge 空间不足:从堆上申请内存
//
// stackalloc allocates an n byte stack.
//
// stackalloc must run on the system stack because it uses per-P
// resources and must not split the stack.
//
//go:systemstack
func stackalloc(n uint32) stack {
// Stackalloc must be called on scheduler stack, so that we
// never try to grow the stack during the code that stackalloc runs.
// Doing so would cause a deadlock (issue 1547).
thisg := getg()
// ...
// Small stacks are allocated with a fixed-size free-list allocator.
// If we need a stack of a bigger size, we fall back on allocating
// a dedicated span.
var v unsafe.Pointer
if n < fixedStack<<_NumStackOrders && n < _StackCacheSize { // n < 32768 /// 1. 小于32KB字节
order := uint8(0)
n2 := n
for n2 > fixedStack {
order++
n2 >>= 1
}
var x gclinkptr
if stackNoCache != 0 || thisg.m.p == 0 || thisg.m.preemptoff != "" { /// 禁用 P 的栈缓存,当前P为空,当前G不可被抢占
// thisg.m.p == 0 can happen in the guts of exitsyscall
// or procresize. Just get a stack from the global pool.
// Also don't touch stackcache during gc
// as it's flushed concurrently.
lock(&stackpool[order].item.mu)
x = stackpoolalloc(order) /// 从 stackpool 中分配
unlock(&stackpool[order].item.mu)
} else { /// 从 P 的缓存分配
c := thisg.m.p.ptr().mcache
x = c.stackcache[order].list
if x.ptr() == nil {
stackcacherefill(c, order) /// 从 stackpool 中拿一些到 P 缓存
x = c.stackcache[order].list
}
c.stackcache[order].list = x.ptr().next
c.stackcache[order].size -= uintptr(n)
}
v = unsafe.Pointer(x)
} else { /// 2. 大于等于32KB
var s *mspan
npage := uintptr(n) >> _PageShift
log2npage := stacklog2(npage)
// Try to get a stack from the large stack cache.
lock(&stackLarge.lock)
if !stackLarge.free[log2npage].isEmpty() { /// 如果大栈缓存不为空,进行分配
s = stackLarge.free[log2npage].first
stackLarge.free[log2npage].remove(s)
}
unlock(&stackLarge.lock)
lockWithRankMayAcquire(&mheap_.lock, lockRankMheap)
if s == nil { /// 分配失败了
// Allocate a new stack from the heap.
s = mheap_.allocManual(npage, spanAllocStack) /// 直接从堆上分配一个
if s == nil {
throw("out of memory")
}
osStackAlloc(s) /// 初始化分配的栈内存
s.elemsize = uintptr(n)
}
v = unsafe.Pointer(s.base())
}
// ...
return stack{uintptr(v), uintptr(v) + uintptr(n)}
}
扩容
// runtime/asm_amd64.s
// Called during function prolog when more stack is needed.
//
// The traceback routines see morestack on a g0 as being
// the top of a stack (for example, morestack calling newstack
// calling the scheduler calling newm calling gc), so we must
// record an argument size. For that purpose, it has no arguments.
TEXT runtime·morestack(SB),NOSPLIT|NOFRAME,$0-0
// ...
CALL runtime·newstack(SB)
// ...
// runtime/stack.go
// 需要更多栈空间时调用:
// 1. 分配更大的空间
// 2. 将当前栈内容迁移到新的栈空间中
//
// Called from runtime·morestack when more stack is needed.
// Allocate larger stack and relocate to new stack.
// Stack growth is multiplicative, for constant amortized cost.
//
// g->atomicstatus will be Grunning or Gscanrunning upon entry.
// If the scheduler is trying to stop this g, then it will set preemptStop.
//
// This must be nowritebarrierrec because it can be called as part of
// stack growth from other nowritebarrierrec functions, but the
// compiler doesn't check this.
//
//go:nowritebarrierrec
func newstack() {
thisg := getg()
// ...
gp := thisg.m.curg
// ...
morebuf := thisg.m.morebuf /// 清空 morebuf
thisg.m.morebuf.pc = 0
thisg.m.morebuf.lr = 0
thisg.m.morebuf.sp = 0
thisg.m.morebuf.g = 0
// NOTE: stackguard0 may change underfoot, if another thread
// is about to try to preempt gp. Read it just once and use that same
// value now and below.
stackguard0 := atomic.Loaduintptr(&gp.stackguard0) /// 加载当前栈的 stackguard0
// Be conservative about where we preempt.
// We are interested in preempting user Go code, not runtime code.
// If we're holding locks, mallocing, or preemption is disabled, don't
// preempt.
// This check is very early in newstack so that even the status change
// from Grunning to Gwaiting and back doesn't happen in this case.
// That status change by itself can be viewed as a small preemption,
// because the GC might change Gwaiting to Gscanwaiting, and then
// this goroutine has to wait for the GC to finish before continuing.
// If the GC is in some way dependent on this goroutine (for example,
// it needs a lock held by the goroutine), that small preemption turns
// into a real deadlock.
preempt := stackguard0 == stackPreempt /// 检查是否需要抢占
if preempt {
if !canPreemptM(thisg.m) { /// 不能抢占,则继续运行
// Let the goroutine keep running for now.
// gp->preempt is set, so it will be preempted next time.
gp.stackguard0 = gp.stack.lo + stackGuard
gogo(&gp.sched) // never return
}
}
// ...
// Allocate a bigger segment and move the stack.
oldsize := gp.stack.hi - gp.stack.lo
newsize := oldsize * 2 /// 分配新栈空间,旧栈大小的两倍
// Make sure we grow at least as much as needed to fit the new frame.
// (This is just an optimization - the caller of morestack will
// recheck the bounds on return.)
if f := findfunc(gp.sched.pc); f.valid() {
max := uintptr(funcMaxSPDelta(f))
needed := max + stackGuard
used := gp.stack.hi - gp.sched.sp
for newsize-used < needed {
newsize *= 2
}
}
if stackguard0 == stackForceMove {
// Forced stack movement used for debugging.
// Don't double the stack (or we may quickly run out
// if this is done repeatedly).
newsize = oldsize
}
// ...
// The goroutine must be executing in order to call newstack,
// so it must be Grunning (or Gscanrunning).
casgstatus(gp, _Grunning, _Gcopystack)
// The concurrent GC will not scan the stack while we are doing the copy since
// the gp is in a Gcopystack status.
copystack(gp, newsize) /// 迁移旧栈内容到新栈道
if stackDebug >= 1 {
print("stack grow done\n")
}
casgstatus(gp, _Gcopystack, _Grunning)
gogo(&gp.sched) /// 继续运行
}
// Copies gp's stack to a new stack of a different size.
// Caller must have changed gp status to Gcopystack.
func copystack(gp *g, newsize uintptr) {
old := gp.stack /// 旧栈
used := old.hi - gp.sched.sp /// 旧栈使用大小
// Add just the difference to gcController.addScannableStack.
// g0 stacks never move, so this will never account for them.
// It's also fine if we have no P, addScannableStack can deal with
// that case.
gcController.addScannableStack(getg().m.p.ptr(), int64(newsize)-int64(old.hi-old.lo)) /// 调整GC可以扫描的栈大小
// allocate new stack
new := stackalloc(uint32(newsize)) /// 分配新栈
// ...
// Compute adjustment.
var adjinfo adjustinfo /// 记录栈调整信息
adjinfo.old = old
adjinfo.delta = new.hi - old.hi
// Adjust sudogs, synchronizing with channel ops if necessary.
ncopy := used
if !gp.activeStackChans { /// 是否有chan指向当前栈
if newsize < old.hi-old.lo && gp.parkingOnChan.Load() {
// It's not safe for someone to shrink this stack while we're actively
// parking on a channel, but it is safe to grow since we do that
// ourselves and explicitly don't want to synchronize with channels
// since we could self-deadlock.
throw("racy sudog adjustment due to parking on channel")
}
adjustsudogs(gp, &adjinfo) /// 调整
} else {
// sudogs may be pointing in to the stack and gp has
// released channel locks, so other goroutines could
// be writing to gp's stack. Find the highest such
// pointer so we can handle everything there and below
// carefully. (This shouldn't be far from the bottom
// of the stack, so there's little cost in handling
// everything below it carefully.)
adjinfo.sghi = findsghi(gp, old)
// Synchronize with channel ops and copy the part of
// the stack they may interact with.
ncopy -= syncadjustsudogs(gp, used, &adjinfo) /// 调整,同步相关的chan
}
// Copy the stack (or the rest of it) to the new location
memmove(unsafe.Pointer(new.hi-ncopy), unsafe.Pointer(old.hi-ncopy), ncopy) /// 复制栈内容
// Adjust remaining structures that have pointers into stacks.
// We have to do most of these before we traceback the new
// stack because gentraceback uses them.
adjustctxt(gp, &adjinfo) /// 调整指向新栈的结构:上下文
adjustdefers(gp, &adjinfo) /// 调整指向新栈的结构:defer 调用
adjustpanics(gp, &adjinfo) /// 调整指向新栈的结构:panic 调用
if adjinfo.sghi != 0 {
adjinfo.sghi += adjinfo.delta
}
// Swap out old stack for new one
gp.stack = new /// g指向新栈
gp.stackguard0 = new.lo + stackGuard // NOTE: might clobber a preempt request /// 更新g的栈保护
gp.sched.sp = new.hi - used /// 调整g栈sp
gp.stktopsp += adjinfo.delta /// g栈最高sp
// Adjust pointers in the new stack.
var u unwinder
for u.init(gp, 0); u.valid(); u.next() { /// 遍历栈帧
adjustframe(&u.frame, &adjinfo) /// 进行调整
}
// free old stack
if stackPoisonCopy != 0 {
fillstack(old, 0xfc)
}
stackfree(old) /// 释放旧栈
}
缩容
// runtime/stack.go
// Maybe shrink the stack being used by gp.
//
// gp must be stopped and we must own its stack. It may be in
// _Grunning, but only if this is our own user G.
func shrinkstack(gp *g) {
//...
oldsize := gp.stack.hi - gp.stack.lo
newsize := oldsize / 2 /// 计算新栈大小:新栈的大小会是原始栈的一半
// Don't shrink the allocation below the minimum-sized stack
// allocation.
if newsize < fixedStack { /// 大小低于程序的最低限制 2KB,不进行缩容
return
}
// Compute how much of the stack is currently in use and only
// shrink the stack if gp is using less than a quarter of its
// current stack. The currently used stack includes everything
// down to the SP plus the stack guard space that ensures
// there's room for nosplit functions.
avail := gp.stack.hi - gp.stack.lo
if used := gp.stack.hi - gp.sched.sp + stackNosplit; used >= avail/4 { /// 栈内存使用大于 1/4 时不进行缩容
return
}
// ...
copystack(gp, newsize) /// 复制栈
}
阅读`go`中的`runtime·rt0_go`函数
go version go1.22.2 linux/amd64
runtime·rt0_go
给runtime.g0分配栈空间,将g0写入TLS,将m0和g0互相绑定、调用
osinit, schedinit
进行初始化,新建goroutine,运行runtime.main,运行当前M
逐行代码分析
TEXT runtime·rt0_go(SB),NOSPLIT|NOFRAME|TOPFRAME,$0
TEXT
: 定义代码段
runtime·rt0_go(SB)
: 定义函数
NOSPLIT|NOFRAME|TOPFRAME
: 汇编指令Directives,不允许栈分裂,不使用帧指针,在栈的顶部
$0
: 表示不使用栈空间
// copy arguments forward on an even stack
MOVQ DI, AX // argc
MOVQ SI, BX // argv
SUBQ $(5*8), SP // 3args 2auto
ANDQ $~15, SP
MOVQ AX, 24(SP)
MOVQ BX, 32(SP)
MOVQ DI, AX
: 将参数 argc 复制到寄存器 AX 中
MOVQ SI, BX
: 将参数 argv 复制到寄存器 BX 中
SUBQ $(5*8), SP
: 在栈上分配 5 个 8 字节的空间,用于存储参数(后面的第2,3个操作)
ANDQ $~15, SP
: 将栈指针 SP 按 16 字节边界对齐
MOVQ AX, 24(SP)
: 将参数 argc 写入栈上的偏移量为 24 的位置
MOVQ BX, 32(SP)
: 将参数 argv 写入栈上的偏移量为 32 的位置
NOTE:
栈是从高到低地址进行增长,ANDQ $~15, SP
操作中的SP与16进行与操作,SP的低4位被清零,也就是栈空间变大,同时对齐了16字节边界
// create istack out of the given (operating system) stack.
// _cgo_init may update stackguard.
MOVQ $runtime·g0(SB), DI
LEAQ (-64*1024)(SP), BX
MOVQ BX, g_stackguard0(DI)
MOVQ BX, g_stackguard1(DI)
MOVQ BX, (g_stack+stack_lo)(DI)
MOVQ SP, (g_stack+stack_hi)(DI)
MOVQ $runtime·g0(SB), DI
: 将 runtime.g0 的地址加载到寄存器 DI 中
LEAQ (-64*1024)(SP), BX
: 计算栈顶减去 64KB 后的地址,并加载到寄存器 BX 中
MOVQ BX, g_stackguard0(DI)
: 将 g0 的 stackguard0 的值赋值为 BX 的内容
MOVQ BX, g_stackguard1(DI)
: 将 g0 的 stackguard1 的值赋值为 BX 的内容
MOVQ BX, (g_stack+stack_lo)(DI)
: 将 g0 的 stack 的 stack_lo 的值赋值为 BX 的内容
MOVQ SP, (g_stack+stack_hi)(DI)
: 将 g0 的 stack 的 stack_hi 的值赋值为 SP 的内容
// find out information about the processor we're on
MOVL $0, AX
CPUID
CMPL AX, $0
JE nocpuinfo
CMPL BX, $0x756E6547 // "Genu"
JNE notintel
CMPL DX, $0x49656E69 // "ineI"
JNE notintel
CMPL CX, $0x6C65746E // "ntel"
JNE notintel
MOVB $1, runtime·isIntel(SB)
notintel:
// Load EAX=1 cpuid flags
MOVL $1, AX
CPUID
MOVL AX, runtime·processorVersionInfo(SB)
nocpuinfo:
// if there is an _cgo_init, call it.
MOVQ _cgo_init(SB), AX
TESTQ AX, AX
JZ needtls
// arg 1: g0, already in DI
MOVQ $setg_gcc<>(SB), SI // arg 2: setg_gcc
MOVQ $0, DX // arg 3, 4: not used when using platform's TLS
MOVQ $0, CX
MOVL $0, AX
: 将寄存器 AX 设置为 0。
CPUID
: 执行 CPUID 指令,将结果存储在寄存器 AX、BX、CX 和 DX 中。
CMPL AX, $0
: 比较寄存器 AX 和立即数 0。
JE nocpuinfo
: 如果 AX 和 0 相等,则跳转到标签 nocpuinfo。
CMPL BX, $0x756E6547
: 比较 BX 和字符串 "Genu" 的 ASCII 码。
JNE notintel
: 如果 BX 不等于 "Genu" 的 ASCII 码,则跳转到标签 notintel。
CMPL DX, $0x49656E69
: 比较 DX 和字符串 "ineI" 的 ASCII 码。
JNE notintel
: 如果 DX 不等于 "ineI" 的 ASCII 码,则跳转到标签 notintel。
CMPL CX, $0x6C65746E
: 比较 CX 和字符串 "ntel" 的 ASCII 码。
JNE notintel
: 如果 CX 不等于 "ntel" 的 ASCII 码,则跳转到标签 notintel。
MOVB $1, runtime·isIntel(SB)
: 将地址为 runtime·isIntel 的变量设置为 1,表示当前的处理器是 Intel 架构。
标签notintel:
MOVL $1, AX
: 将立即数 1 装载到寄存器 AX。
CPUID
: 执行 CPUID 指令,获取处理器信息。
MOVL AX, runtime·processorVersionInfo(SB)
: 将 CPUID 的结果存储到 runtime·processorVersionInfo 变量中。
标签nocpuinfo:
MOVQ _cgo_init(SB), AX
: 将 _cgo_init 函数的地址加载到寄存器 AX 中。
TESTQ AX, AX
: 测试寄存器 AX 是否为 0。
JZ needtls
: 如果 AX 为 0,则跳转到标签 needtls。
MOVQ $setg_gcc<>(SB), SI
: 将 setg_gcc 函数的地址加载到寄存器 SI 中。
MOVQ $0, DX
: 将 0 装载到寄存器 DX 中。
MOVQ $0, CX
: 将 0 装载到寄存器 CX 中。
TODO: setg_gcc做了什么?
#ifdef GOOS_android
MOVQ $runtime·tls_g(SB), DX // arg 3: &tls_g
// arg 4: TLS base, stored in slot 0 (Android's TLS_SLOT_SELF).
// Compensate for tls_g (+16).
MOVQ -16(TLS), CX
#endif
#ifdef GOOS_windows
MOVQ $runtime·tls_g(SB), DX // arg 3: &tls_g
// Adjust for the Win64 calling convention.
MOVQ CX, R9 // arg 4
MOVQ DX, R8 // arg 3
MOVQ SI, DX // arg 2
MOVQ DI, CX // arg 1
#endif
TODO: 针对安卓和windows一段逻辑
CALL AX
// update stackguard after _cgo_init
MOVQ $runtime·g0(SB), CX
MOVQ (g_stack+stack_lo)(CX), AX
ADDQ $const_stackGuard, AX
MOVQ AX, g_stackguard0(CX)
MOVQ AX, g_stackguard1(CX)
CALL AX
: 调用 _cgo_init 函数。
MOVQ $runtime·g0(SB), CX
: 将 runtime·g0 的地址加载到寄存器 CX 中
MOVQ (g_stack+stack_lo)(CX), AX
: 将 g_stack 中的 stack_lo 值加载到寄存器 AX 中
ADDQ $const_stackGuard, AX
: 将常数 const_stackGuard 加到 AX 寄存器中的值上
MOVQ AX, g_stackguard0(CX)
: 将 AX 中的值写入 g_stackguard0
MOVQ AX, g_stackguard1(CX)
: 将 AX 中的值写入 g_stackguard1
#ifndef GOOS_windows
JMP ok
#endif
needtls:
#ifdef GOOS_plan9
// skip TLS setup on Plan 9
JMP ok
#endif
#ifdef GOOS_solaris
// skip TLS setup on Solaris
JMP ok
#endif
#ifdef GOOS_illumos
// skip TLS setup on illumos
JMP ok
#endif
#ifdef GOOS_darwin
// skip TLS setup on Darwin
JMP ok
#endif
#ifdef GOOS_openbsd
// skip TLS setup on OpenBSD
JMP ok
#endif
#ifdef GOOS_windows
CALL runtime·wintls(SB)
#endif
LEAQ runtime·m0+m_tls(SB), DI
CALL runtime·settls(SB)
// store through it, to make sure it works
get_tls(BX)
MOVQ $0x123, g(BX)
MOVQ runtime·m0+m_tls(SB), AX
CMPQ AX, $0x123
JEQ 2(PC)
CALL runtime·abort(SB)
JMP ok
: 在某些操作系统,忽略TLS设置,直接跳转到标签 ok
CALL runtime·wintls(SB)
: 在 Windows 系统上设置 TLS
LEAQ runtime·m0+m_tls(SB), DI
: 将 runtime·m0+m_tls 的地址加载到寄存器 DI 中
CALL runtime·settls(SB)
: 调用 runtime·settls 函数(参数在DI寄存器中,也就是m0.tls地址),设置 TLS
get_tls(BX)
: 获取 TLS
MOVQ $0x123, g(BX)
: 将 0x123 存储到 TLS 中
MOVQ runtime·m0+m_tls(SB), AX
: 将 runtime·m0+m_tls 的地址加载到寄存器 AX 中
CMPQ AX, $0x123
: 比较 AX 和 0x123
JEQ 2(PC)
: 如果相等,跳转到当前位置的下两条指令(也就是跳过了下面的abort函数调用)
CALL runtime·abort(SB)
: 调用 runtime·abort 函数
NOTE:
TLS(线程局部存储),存储在线程开始时分配,线程结束时回收,在Go汇编中,TLS
是一个伪寄存器
get_tls(BX)
是一个macro:#define get_tls(r) MOVQ TLS, r
,将TLS
伪寄存器存储的值加载到某个寄存器,这里是BX,定义在src/runtime/go_tls.h
中
g(BX)
,同上,也是一个macro:#define g(r) 0(r)(TLS*1)
,off(reg)(TLS*1)
是一种TLS偏移量的写法
ok:
// set the per-goroutine and per-mach "registers"
get_tls(BX)
LEAQ runtime·g0(SB), CX
MOVQ CX, g(BX)
LEAQ runtime·m0(SB), AX
// save m->g0 = g0
MOVQ CX, m_g0(AX)
// save m0 to g0->m
MOVQ AX, g_m(CX)
CLD // convention is D is always left cleared
get_tls(BX)
: 获取 TLS,并将其保存到寄存器 BX 中
LEAQ runtime·g0(SB), CX
: 将 runtime·g0 的地址加载到寄存器 CX 中
MOVQ CX, g(BX)
: 将 CX 寄存器中的值(即 runtime·g0 的地址)写入 TLS
LEAQ runtime·m0(SB), AX
: 将 runtime·m0 的地址加载到寄存器 AX 中
MOVQ CX, m_g0(AX)
: 将 CX 寄存器中的值(即 runtime·g0 的地址)写入 m_g0
MOVQ AX, g_m(CX)
: 将 AX 寄存器中的值(即 runtime·m0 的地址)写入 g_m
NOTE: 这里将runtime.g0写入TLS,将runtime.m0和runtime.g0互相绑定
// Check GOAMD64 requirements
// We need to do this after setting up TLS, so that
// we can report an error if there is a failure. See issue 49586.
#ifdef NEED_FEATURES_CX
MOVL $0, AX
CPUID
CMPL AX, $0
JE bad_cpu
MOVL $1, AX
CPUID
ANDL $NEED_FEATURES_CX, CX
CMPL CX, $NEED_FEATURES_CX
JNE bad_cpu
#endif
#ifdef NEED_MAX_CPUID
MOVL $0x80000000, AX
CPUID
CMPL AX, $NEED_MAX_CPUID
JL bad_cpu
#endif
#ifdef NEED_EXT_FEATURES_BX
MOVL $7, AX
MOVL $0, CX
CPUID
ANDL $NEED_EXT_FEATURES_BX, BX
CMPL BX, $NEED_EXT_FEATURES_BX
JNE bad_cpu
#endif
#ifdef NEED_EXT_FEATURES_CX
MOVL $0x80000001, AX
CPUID
ANDL $NEED_EXT_FEATURES_CX, CX
CMPL CX, $NEED_EXT_FEATURES_CX
JNE bad_cpu
#endif
#ifdef NEED_OS_SUPPORT_AX
XORL CX, CX
XGETBV
ANDL $NEED_OS_SUPPORT_AX, AX
CMPL AX, $NEED_OS_SUPPORT_AX
JNE bad_cpu
#endif
#ifdef NEED_DARWIN_SUPPORT
MOVQ $commpage64_version, BX
CMPW (BX), $13 // cpu_capabilities64 undefined in versions < 13
JL bad_cpu
MOVQ $commpage64_cpu_capabilities64, BX
MOVQ (BX), BX
MOVQ $NEED_DARWIN_SUPPORT, CX
ANDQ CX, BX
CMPQ BX, CX
JNE bad_cpu
#endif
CALL runtime·check(SB)
TODO: 该部分代码主要用于检查 GOAMD64 的要求,并调用 runtime·check
函数进行检查
MOVL 24(SP), AX // copy argc
MOVL AX, 0(SP)
MOVQ 32(SP), AX // copy argv
MOVQ AX, 8(SP)
CALL runtime·args(SB)
CALL runtime·osinit(SB)
CALL runtime·schedinit(SB)
MOVL 24(SP), AX
: 将参数 argc 复制到 AX 寄存器中。
MOVL AX, 0(SP)
: 将 AX 寄存器中的值写入栈顶位置。
MOVQ 32(SP), AX
: 将参数 argv 复制到 AX 寄存器中。
MOVQ AX, 8(SP)
: 将 AX 寄存器中的值写入栈顶位置的下一个位置。
CALL runtime·args(SB)
: 调用 runtime·args 函数。
CALL runtime·osinit(SB)
: 调用 runtime·osinit 函数。
CALL runtime·schedinit(SB)
: 调用 runtime·schedinit 函数。
NOTE: 处理命令行参数,初始化程序
TODO: osinit做了什么? schedinit做了什么?
// create a new goroutine to start program
MOVQ $runtime·mainPC(SB), AX // entry
PUSHQ AX
CALL runtime·newproc(SB)
POPQ AX
// start this M
CALL runtime·mstart(SB)
CALL runtime·abort(SB) // mstart should never return
RET
MOVQ $runtime·mainPC(SB), AX
: 将 runtime·mainPC 的地址加载到寄存器 AX 中。
PUSHQ AX
: 将 AX 寄存器中的值推入栈中。
CALL runtime·newproc(SB)
: 调用 runtime·newproc 函数,创建一个新的 goroutine。
POPQ AX
: 将栈顶的值弹出,并存储到寄存器 AX 中。
CALL runtime·mstart(SB)
: 调用 runtime·mstart 函数,启动当前 M
NOTE: 创建一个新的 goroutine 来启动程序,$runtime·mainPC(SB)
指向的是runtime.main
TODO: 具体分析下newproc和mstart函数
bad_cpu: // show that the program requires a certain microarchitecture level.
MOVQ $2, 0(SP)
MOVQ $bad_cpu_msg<>(SB), AX
MOVQ AX, 8(SP)
MOVQ $84, 16(SP)
CALL runtime·write(SB)
MOVQ $1, 0(SP)
CALL runtime·exit(SB)
CALL runtime·abort(SB)
RET
// Prevent dead-code elimination of debugCallV2, which is
// intended to be called by debuggers.
MOVQ $runtime·debugCallV2<ABIInternal>(SB), AX
RET
NOTE: 在 CPU 不满足要求的情况下,输出错误消息并退出程序
总结
- 首先,保存了命令行参数的地址,并对栈顶地址SP进行对齐
- 分配了一个 64KB 大小的栈空间给
runtime.g0
- 进行了一些 CPU 信息相关的检查
- 调用了
_cgo_init
函数,并在此之后更新了stackguard
- 将当前线程的 TLS 地址存储到 m0.tls 中,并测试了读写是否成功
- 将 g0 的地址写入 TLS,并将 g0 与 m0 相互绑定
- 对于 GOAMD64 平台进行了一些检查
- 处理了命令行参数,调用了
osinit
和schedinit
函数,初始化了与操作系统相关的功能和调度器 - 调用了
newproc
函数创建一个新的 Goroutine,并运行了runtime.main
函数,最后调用了mstart
启动当前 M
阅读 `go runtime`中的 `systemstack`, `mcall`函数
// func systemstack(fn func())
TEXT runtime·systemstack(SB), NOSPLIT, $0-8 // NOSPLIT 不允许栈分割
MOVQ fn+0(FP), DI // DI = fn // 参数 fn 的指针储存到 DI 寄存器中
get_tls(CX)
MOVQ g(CX), AX // AX = g // 获取当前 g
MOVQ g_m(AX), BX // BX = m // 获取当前 m
CMPQ AX, m_gsignal(BX) // 检查当前 g 是不是gsignal,是则跳转到 noswitch
JEQ noswitch
MOVQ m_g0(BX), DX // DX = g0 // 将当前 m 的 g0 储存到 DX 寄存器中
CMPQ AX, DX // 检查当前 g 是不是 g0,是则跳转到 noswitch
JEQ noswitch
CMPQ AX, m_curg(BX) // 检查当前 g 是不是 m.curg,不是则跳转到 bad
JNE bad
// Switch stacks.
// The original frame pointer is stored in BP,
// which is useful for stack unwinding.
// Save our state in g->sched. Pretend to
// be systemstack_switch if the G stack is scanned.
CALL gosave_systemstack_switch<>(SB) // 保存当前 goroutine 的状态,在切换栈之后恢复
// switch to g0
MOVQ DX, g(CX) // DX 里是 g0, 将 g0 存储到 TLS 中
MOVQ DX, R14 // set the g register
MOVQ (g_sched+gobuf_sp)(DX), SP // 将 g0 的 栈指针 SP 设置到 SP 寄存器,也就是切换了栈
// call target function
MOVQ DI, DX // 储存函数指针到 DX
MOVQ 0(DI), DI // 将函数实际地址储存到 DI
CALL DI // 调用函数
// switch back to g
get_tls(CX)
MOVQ g(CX), AX // g0
MOVQ g_m(AX), BX // g0.m
MOVQ m_curg(BX), AX // g
MOVQ AX, g(CX) // 将 g 存储到 TLS 中
MOVQ (g_sched+gobuf_sp)(AX), SP // 切换回 g 的栈
MOVQ (g_sched+gobuf_bp)(AX), BP
MOVQ $0, (g_sched+gobuf_sp)(AX) // 清零,帮助垃圾回收器(GC)定位未使用的栈空间
MOVQ $0, (g_sched+gobuf_bp)(AX)
RET
noswitch:
// already on m stack; tail call the function
// Using a tail call here cleans up tracebacks since we won't stop
// at an intermediate systemstack.
MOVQ DI, DX // 将函数指针移动到 DX 寄存器中
MOVQ 0(DI), DI // 函数地址储存到 DI 寄存器中
// The function epilogue is not called on a tail call.
// Pop BP from the stack to simulate it.
POPQ BP // 模拟函数调用的栈帧弹出操作
JMP DI // 直接跳转函数处,执行代码:将当前函数的控制器移动到新函数上,且未保留当前函数的调用帧
bad:
// Bad: g is not gsignal, not g0, not curg. What is it?
MOVQ $runtime·badsystemstack(SB), AX
CALL AX
INT $3
`go`中的`syscall`
go syscall
系统调用描述的是
用户程序
进入内核
后执行的任务。
用户程序利用系统调用能执行许多操作:创建进程、网络、文件以及I/O操作等。
man page for syscalls(2) 列出了全部系统调用。
// syscall/syscall_linux.go
func Syscall(trap, a1, a2, a3 uintptr) (r1, r2 uintptr, err Errno) {}
func Syscall6(trap, a1, a2, a3, a4, a5, a6 uintptr) (r1, r2 uintptr, err Errno) {}
func RawSyscall(trap, a1, a2, a3 uintptr) (r1, r2 uintptr, err Errno) {}
func RawSyscall6(trap, a1, a2, a3, a4, a5, a6 uintptr) (r1, r2 uintptr, err Errno){}
// 不带 Raw 函数会在进入/退出系统调用的时候,通知 runtime
// 不通知 runtime,则没有办法通过调度,把这个 G 的 M 的 P 调度走
// runtime/internal/syscall/asm_linux_amd64.s
// func Syscall6(num, a1, a2, a3, a4, a5, a6 uintptr) (r1, r2, errno uintptr)
//
// We need to convert to the syscall ABI.
//
// arg | ABIInternal | Syscall
// ---------------------------
// num | AX | AX
// a1 | BX | DI
// a2 | CX | SI
// a3 | DI | DX
// a4 | SI | R10
// a5 | R8 | R8
// a6 | R9 | R9
//
// r1 | AX | AX
// r2 | BX | DX
// err | CX | part of AX
//
// Note that this differs from "standard" ABI convention, which would pass 4th
// arg in CX, not R10.
TEXT ·Syscall6<ABIInternal>(SB),NOSPLIT,$0
// a6 already in R9.
// a5 already in R8.
MOVQ SI, R10 // a4
MOVQ DI, DX // a3
MOVQ CX, SI // a2
MOVQ BX, DI // a1
// num already in AX.
SYSCALL
CMPQ AX, $0xfffffffffffff001
JLS ok
NEGQ AX
MOVQ AX, CX // errno
MOVQ $-1, AX // r1
MOVQ $0, BX // r2
RET
ok:
// r1 already in AX.
MOVQ DX, BX // r2
MOVQ $0, CX // errno
RET
以上函数的实现都是汇编,按照
linux
的syscall
调用规范:
在汇编中把参数依次传入相关寄存器,并调用SYSCALL
指令即可进入内核处理逻辑,系统调用执行完毕之后,返回值放在 RAX 中。
RDI | RSI | RDX | R10 | R8 | R9 | RAX |
---|---|---|---|---|---|---|
参数1 | 参数2 | 参数3 | 参数4 | 参数5 | 参数6 | 系统调用编号/返回值 |
SYSCALL 定义
syscall/syscall_linux_amd64.go
阻塞的系统调用,定义成:
//sys sendmsg(s int, msg *Msghdr, flags int) (n int, err error)
非阻塞的系统调用,定义成:
//sysnb socket(domain int, typ int, proto int) (fd int, err error)
根据这些注释,mksyscall.pl 脚本会生成对应的平台的具体实现:
标记 //sys 的系统调用使用的是 Syscall 或者 Syscall6,因为阻塞调用需要通知runtime调度p
标记 //sysnb 的系统调用使用的是 RawSyscall 或 RawSyscall6
runtime 中的 SYSCALL
提供给用户的 syscall 库,在使用时,会使 G 和 P 分别进入 Gsyscall 和 Psyscall 状态(使用 entersyscall/exitsyscall 与调度交互)
而 runtime 封装的这些 syscall 无论是否阻塞,都不会调用 entersyscall 和 exitsyscall
`linux` 中的`epoll`
epoll
epoll 是 Linux 下高效的 I/O 事件通知机制,用于监视多个文件描述符上的事件。
它允许程序等待多个 I/O 事件同时发生,而不必为每个 I/O 事件都创建一个线程或进程。
// 1. 创建 epoll 实例
// int epoll_create1(int flags);
int epoll_fd;
epoll_fd = epoll_create1(0);
// 2. 将标准输入文件描述符添加到 epoll 实例中
// int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
// 参数 op 包含:EPOLL_CTL_ADD 增加、EPOLL_CTL_MOD 修改、EPOLL_CTL_DEL 删除;
// 参数 event 的 events 字段描述了epoll监听的事件类型:EPOLLIN 读、EPOLLOUT 写等...
// 参数 event 的 data 字段描述了 fd 关联的数据,在调用epoll_wait 同 fd 一起返回;
struct epoll_event event;
event.events = EPOLLIN; // 监听读事件
event.data.fd = STDIN_FILENO;
epoll_ctl(epoll_fd, EPOLL_CTL_ADD, STDIN_FILENO, &event)
#define MAX_EVENTS 10 // 事件数量
struct epoll_event events[MAX_EVENTS]; // 存储 epoll 事件
int num_events;
// 3. 等待事件发生
// int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
// 参数 events 保存触发的事件列表
// 参数 timeout 表示 epoll_wait 会阻塞的时间
num_events = epoll_wait(epoll_fd, events, MAX_EVENTS, -1);
// 4. 处理触发的事件列表
for (int i = 0; i < num_events; i++) {
// event 同2中一致,可以获取2中 fd 关联的 data
}
// 5. 关闭 epoll
close(epoll_fd);
Recommend Projects
-
React
A declarative, efficient, and flexible JavaScript library for building user interfaces.
-
Vue.js
🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
-
Typescript
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
-
TensorFlow
An Open Source Machine Learning Framework for Everyone
-
Django
The Web framework for perfectionists with deadlines.
-
Laravel
A PHP framework for web artisans
-
D3
Bring data to life with SVG, Canvas and HTML. 📊📈🎉
-
Recommend Topics
-
javascript
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
-
web
Some thing interesting about web. New door for the world.
-
server
A server is a program made to process requests and deliver data to clients.
-
Machine learning
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
-
Visualization
Some thing interesting about visualization, use data art
-
Game
Some thing interesting about game, make everyone happy.
Recommend Org
-
Facebook
We are working to build community through open source technology. NB: members must have two-factor auth.
-
Microsoft
Open source projects and samples from Microsoft.
-
Google
Google ❤️ Open Source for everyone.
-
Alibaba
Alibaba Open Source for everyone
-
D3
Data-Driven Documents codes.
-
Tencent
China tencent open source team.