myNote

mynote's Issues

`netpoll` in the `go runtime`

network poller

All network operations are centered on the network file descriptor netFD, and each netFD is bound to an underlying pollDesc structure.
When a read or write on a netFD hits EAGAIN, the current G is stored in that netFD's pollDesc and gopark is called to put the G into the waiting state;
when a read/write event happens on that netFD again, goready is called to make the G runnable again.
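From the user's point of view this machinery is invisible. Below is a minimal sketch (ordinary net package code, not runtime code) of where it kicks in:

// conn.Read below looks blocking, but when the socket has no data the runtime
// gets EAGAIN, parks this goroutine in the fd's pollDesc via gopark, and
// goready's it once epoll reports the fd readable.
package main

import (
	"fmt"
	"net"
)

func main() {
	ln, _ := net.Listen("tcp", "127.0.0.1:0")
	go func() {
		c, _ := net.Dial("tcp", ln.Addr().String())
		c.Write([]byte("ping"))
		c.Close()
	}()

	conn, _ := ln.Accept()
	buf := make([]byte, 16)
	n, _ := conn.Read(buf) // the G parks here until netpoll marks the fd ready
	fmt.Println("got:", string(buf[:n]))
	conn.Close()
}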

// net/fd_posix.go
type netFD struct {
	pfd poll.FD

	// ...
}

// internal/poll/fd_unix.go
type FD struct {
	// ...

	// Platform dependent state of the file descriptor. // the actual OS file descriptor
	SysFile

	// I/O poller.                                      // wrapper around the underlying event mechanism; netFD performs its I/O-related operations through it
	pd pollDesc

	// ...
}

// internal/poll/fd_poll_runtime.go
// 
// Just a uintptr, but the runtime_pollOpen call made in the init method below
// returns a pointer to a runtime.pollDesc, which is what this value points to
type pollDesc struct {
	runtimeCtx uintptr
}

func (pd *pollDesc) init(fd *FD) error {
	serverInit.Do(runtime_pollServerInit)
	ctx, errno := runtime_pollOpen(uintptr(fd.Sysfd))
	if errno != 0 {
		return errnoErr(syscall.Errno(errno))
	}
	pd.runtimeCtx = ctx
	return nil
}

// runtime/netpoll.go
//
// go:linkname makes poll_runtime_pollOpen the implementation behind internal/poll.runtime_pollOpen
//
//go:linkname poll_runtime_pollOpen internal/poll.runtime_pollOpen
func poll_runtime_pollOpen(fd uintptr) (*pollDesc, int) {}

// runtime/netpoll.go
// 
// Network poller descriptor.
//
// No heap pointers.
type pollDesc struct {
	// ...
	link  *pollDesc      // in pollcache, protected by pollcache.lock              // used to form the pollcache free list
	fd    uintptr        // constant for pollDesc usage lifetime                   // the file descriptor

	// ...

	// rg, wg are accessed atomically and hold g pointers.                         // possible values: pdReady, pdWait, the G waiting for the fd to become readable/writable, or pdNil
	// (Using atomic.Uintptr here is similar to using guintptr elsewhere.)
	rg atomic.Uintptr                                                              // pdReady, pdWait, G waiting for read or pdNil
	wg atomic.Uintptr                                                              // pdReady, pdWait, G waiting for write or pdNil
	// ...
	rseq    uintptr   // protects from stale read timers                           // rseq/wseq detect fd reuse or timer resets
	rt      timer     // read deadline timer (set if rt.f != nil)                  // rt/wt: timers for the fd's deadlines
	rd      int64     // read deadline (a nanotime in the future, -1 when expired) // rd/wd: deadlines for the fd becoming readable/writable
	wseq    uintptr   // protects from stale write timers                          // see rseq above
	wt      timer     // write deadline timer                                      // see rt above
	wd      int64     // write deadline (a nanotime in the future, -1 when expired)// see rd above
	// ...
}
// The pollDesc cache: a mutex protecting the poll data plus the head of a free list.
// It lives in the runtime package as the global variable pollcache.
//
// runtime/netpoll.go
type pollCache struct {
	lock  mutex
	first *pollDesc
	// PollDesc objects must be type-stable,
	// because we can get ready notification from epoll/kqueue
	// after the descriptor is closed/reused.
	// Stale notifications are detected using seq variable,
	// seq is incremented when deadlines are changed or descriptor is reused.
}

// On first use, initializes pollDescs in batches
func (c *pollCache) alloc() *pollDesc {}
// Returns a pollDesc to the cache; the struct is only reset by runtime.poll_runtime_pollOpen when it gets reused
func (c *pollCache) free(pd *pollDesc) {}

Implementation

The network poller itself is platform independent; each platform-specific implementation has to define the following functions:

  1. netpollinit(): initializes the network poller; called only once
  2. netpollopen(fd uintptr, pd *pollDesc) int32: enables edge-triggered notifications for fd; pd is the value to pass to netpollready when fd becomes ready; returns an errno value
  3. netpollclose(fd uintptr) int32: disables notifications for fd; returns an errno value
  4. netpoll(delta int64) (gList, int32): polls the network; if delta < 0 it blocks indefinitely, if delta == 0 it does not block, and if delta > 0 it blocks for at most delta nanoseconds. It returns the list of goroutines built by calling netpollready, plus a delta to add to netpollWaiters. It never returns both a non-empty list and a non-zero delta.
  5. netpollBreak(): wakes the network poller, assuming it is blocked in netpoll
  6. netpollIsPollDescriptor(fd uintptr) bool: reports whether fd is a file descriptor used by the poller

The epoll implementation

runtime/netpoll_epoll.go

  1. Initialization of the network poller
  2. Registering fds to be monitored
  3. Retrieving triggered events from the poller
// 1. `Initialization` of the network poller
// 
// Call chain: poll.runtime_pollServerInit -> runtime.poll_runtime_pollServerInit -> runtime.netpollGenericInit -> runtime.netpollinit
func netpollinit() {
	var errno uintptr                                         // errno returned by the system calls
	epfd, errno = syscall.EpollCreate1(syscall.EPOLL_CLOEXEC) // create an epoll instance epfd, used as the runtime's single event loop
	// ...
	r, w, errpipe := nonblockingPipe()                        // create a non-blocking pipe used to communicate with the epoll instance
	// ...
	ev := syscall.EpollEvent{
		Events: syscall.EPOLLIN,                              // a read event
	}
	*(**uintptr)(unsafe.Pointer(&ev.Data)) = &netpollBreakRd  // wrap the netpollBreakRd wakeup fd in an EpollEvent and register it with the epoll instance, so its read events are observed
	errno = syscall.EpollCtl(epfd, syscall.EPOLL_CTL_ADD, r, &ev)
	// ...
	netpollBreakRd = uintptr(r)                               // save the pipe's read fd
	netpollBreakWr = uintptr(w)                               // save the pipe's write fd
}

// 2. Registering fds `to be monitored` with the network poller
//
// Call chain: poll.runtime_pollOpen -> runtime.poll_runtime_pollOpen -> runtime.netpollopen
// poll_runtime_pollOpen(fd) registers fd with the epoll instance; the pd comes from pollcache
func netpollopen(fd uintptr, pd *pollDesc) uintptr {
	var ev syscall.EpollEvent
	ev.Events = syscall.EPOLLIN | syscall.EPOLLOUT | syscall.EPOLLRDHUP | syscall.EPOLLET // readable/writable events, peer hang-up, edge-triggered mode
	tp := taggedPointerPack(unsafe.Pointer(pd), pd.fdseq.Load())                          // pack pd and its fdseq tag into a tagged pointer
	*(*taggedPointer)(unsafe.Pointer(&ev.Data)) = tp
	return syscall.EpollCtl(epfd, syscall.EPOLL_CTL_ADD, int32(fd), &ev)                  // add fd to the epoll instance for monitoring
}

func netpollclose(fd uintptr) uintptr {
	var ev syscall.EpollEvent
	return syscall.EpollCtl(epfd, syscall.EPOLL_CTL_DEL, int32(fd), &ev)                  // remove fd from the epoll instance
}

// 3. Retrieving `triggered` events from the network poller
// 
// 1. Derive the epollwait timeout from the delay parameter
// 2. Call epollwait and wait for fds with readable/writable events
// 3. Loop over the event list epollwait returned, handle each event type, and assemble and return a list of runnable goroutines.
// The caller then uses `runtime.injectglist(glist *gList)` to put them on the global run queue or a P's local queue
// 
// netpoll checks for ready network connections.
// Returns list of goroutines that become runnable.
// delay < 0: blocks indefinitely
// delay == 0: does not block, just polls
// delay > 0: block for up to that many nanoseconds
func netpoll(delay int64) (gList, int32) { // delay is how long to wait; returns a gList, i.e. a list of runnable Gs
	// ...
	var waitms int32 // derive the `epollwait` timeout from delay: block forever, don't block (just poll), block 1ms, block delay ns, or block at most ~11.5 days
	if delay < 0 {
		waitms = -1
	} else if delay == 0 {
		waitms = 0
	} else if delay < 1e6 {
		waitms = 1
	} else if delay < 1e15 {
		waitms = int32(delay / 1e6)
	} else {
		// An arbitrary cap on how long to wait for a timer.
		// 1e9 ms == ~11.5 days.
		waitms = 1e9
	}

	var events [128]syscall.EpollEvent	// an array of 128 EpollEvents to receive the ready epoll events
retry:
	n, errno := syscall.EpollWait(epfd, events[:], int32(len(events)), waitms)
	if errno != 0 {						// did epollwait fail?
		if errno != _EINTR {
			// ...
		}
		// If a timed sleep was interrupted, just return to
		// recalculate how long we should sleep now.
		if waitms > 0 {
			return gList{}, 0
		}
		goto retry						// retry
	}

	var toRun gList
	delta := int32(0)
	for i := int32(0); i < n; i++ { // iterate over the ready epoll events
		ev := events[i]
		if ev.Events == 0 { // no event on this entry, skip it
			continue
		}

		if *(**uintptr)(unsafe.Pointer(&ev.Data)) == &netpollBreakRd { // is this the event used to wake the poller?
			if ev.Events != syscall.EPOLLIN { // anything other than EPOLLIN is unexpected: print an error and throw
				// ...
			}
			if delay != 0 { // non-zero delay means this was a blocking poll
				// netpollBreak could be picked up by a
				// nonblocking poll. Only read the byte
				// if blocking.
				var tmp [16]byte
				read(int32(netpollBreakRd), noescape(unsafe.Pointer(&tmp[0])), int32(len(tmp))) // drain netpollBreakRd to clear the wakeup signal
				netpollWakeSig.Store(0)
			}
			continue
		}

		var mode int32                                                                             // which kinds of I/O the event indicates
		if ev.Events&(syscall.EPOLLIN|syscall.EPOLLRDHUP|syscall.EPOLLHUP|syscall.EPOLLERR) != 0 { // a read-related event occurred
			mode += 'r'
		}
		if ev.Events&(syscall.EPOLLOUT|syscall.EPOLLHUP|syscall.EPOLLERR) != 0 {                   // a write-related event occurred
			mode += 'w'
		}
		if mode != 0 {                                                                             // at least one mode was set
			tp := *(*taggedPointer)(unsafe.Pointer(&ev.Data))                                      // recover the pollDesc associated with the event (stored by netpollopen)
			pd := (*pollDesc)(tp.pointer())
			tag := tp.tag()
			if pd.fdseq.Load() == tag {
				pd.setEventErr(ev.Events == syscall.EPOLLERR, tag)
				delta += netpollready(&toRun, pd, mode)                                            // handle the ready network connection for the given mode(s) (read/write)
			}
		}
	}
	return toRun, delta
}

// Writes into the wakeup pipe to interrupt an epollwait
//
// netpollBreak interrupts an epollwait.
func netpollBreak() {
	// Failing to cas indicates there is an in-flight wakeup, so we're done here. // avoids duplicate wakeup signals
	if !netpollWakeSig.CompareAndSwap(0, 1) {
		return
	}

	for {
		// ...
	}
}

Platform-independent implementation

runtime/netpoll.go

// Call chain: netpoll -> netpollready
// Calls netpollunblock and returns the G associated with the ready pd
//
// netpollready is called by the platform-specific netpoll function.
// It declares that the fd associated with pd is ready for I/O.
// The toRun argument is used to build a list of goroutines to return
// from netpoll. The mode argument is 'r', 'w', or 'r'+'w' to indicate
// whether the fd is ready for reading or writing or both.
//
// This returns a delta to apply to netpollWaiters.
//
// This may run while the world is stopped, so write barriers are not allowed.
//
//go:nowritebarrier
func netpollready(toRun *gList, pd *pollDesc, mode int32) int32 {
	// ...
	var rg, wg *g
	if mode == 'r' || mode == 'r'+'w' {
		rg = netpollunblock(pd, 'r', true, &delta)
	}
	if mode == 'w' || mode == 'r'+'w' {
		wg = netpollunblock(pd, 'w', true, &delta)
	}
	// ...
}

// Depending on mode, takes from pollDesc.rg or pollDesc.wg the G that was stored there by gopark
//
// netpollunblock moves either pd.rg (if mode == 'r') or
// pd.wg (if mode == 'w') into the pdReady state.
// This returns any goroutine blocked on pd.{rg,wg}.
// It adds any adjustment to netpollWaiters to *delta;
// this adjustment should be applied after the goroutine has
// been marked ready.
func netpollunblock(pd *pollDesc, mode int32, ioready bool, delta *int32) *g {}
// Called when a read or write on a file descriptor finds it not yet readable/writable.
// Call chain: poll.runtime_pollWait -> runtime.poll_runtime_pollWait -> netpollblock
//
// returns true if IO is ready, or false if timed out or closed
// waitio - wait only for completed IO, ignore errors
// Concurrent calls to netpollblock in the same mode are forbidden, as pollDesc
// can hold only a single waiting goroutine for each mode.
func netpollblock(pd *pollDesc, mode int32, waitio bool) bool {
	gpp := &pd.rg 
	if mode == 'w' { // pick rg or wg depending on the mode
		gpp = &pd.wg
	}

	// set the gpp semaphore to pdWait			  // loop until I/O is ready or we commit to waiting
	for {
		// Consume notification if already ready. // I/O is already ready
		if gpp.CompareAndSwap(pdReady, pdNil) {
			return true							  // return immediately and let the caller proceed
		}
		if gpp.CompareAndSwap(pdNil, pdWait) {	  // no I/O event has happened yet
			break
		}

		// Double check that this isn't corrupt; otherwise we'd loop
		// forever.
		if v := gpp.Load(); v != pdReady && v != pdNil {
			throw("runtime: double wait")
		}
	}

	// need to recheck error states after setting gpp to pdWait
	// this is necessary because runtime_pollUnblock/runtime_pollSetDeadline/deadlineimpl
	// do the opposite: store to closing/rd/wd, publishInfo, load of rg/wg
	if waitio || netpollcheckerr(pd, mode) == pollNoError {
		gopark(netpollblockcommit, unsafe.Pointer(gpp), waitReasonIOWait, traceBlockNet, 5) // park the G; netpollblockcommit stores the G into pd
	}
	// be careful to not lose concurrent pdReady notification
	old := gpp.Swap(pdNil)
	if old > pdWait {
		throw("runtime: corrupted polldesc")
	}
	return old == pdReady
}

The `ffi` library in `luajit`

luajit ffi

The FFI library allows calling external C functions and using C data structures from Lua code.
It largely removes the need to write tedious manual Lua/C bindings in C.

Understanding the four kinds of `git` `object`

Suppose you have a folder projectName that you want to put under version control. The obvious approach is:

  1. Copy the whole folder
  2. Rename the copy with a version suffix, e.g. projectName_Version2

You will then notice that folders of different versions, say projectName_Version2 and projectName_Version3, may differ in only a few files while the rest are identical, so you will want:
3. Identical files stored only once; the versioned folders then store not the actual content of such files but only the address of that content

Next, you want to attach some extra information each time you copy the folder: why was it copied, when, and by whom?

Finally, you want to mark certain important copies, e.g. projectName_Version12.

This scenario already contains git's four object types: the file object (blob), the tree object (tree), the commit object (commit), and the tag object (tag).

A blob object is a file, a tree object is a folder, a commit object points to a tree and carries some extra information, and a tag object points to a commit and carries some extra information.
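As a small check of the blob idea (a sketch that relies only on the standard library): a blob id is the SHA-1 of the header "blob <size>\0" followed by the file content, so it depends only on the content, which is exactly why identical files are stored once.

// Computes a git blob id by hand; the result can be compared with the output
// of `git hash-object <file>` for a file containing the same bytes.
package main

import (
	"crypto/sha1"
	"fmt"
)

func main() {
	content := []byte("hello\n")
	header := fmt.Sprintf("blob %d\x00", len(content))
	sum := sha1.Sum(append([]byte(header), content...))
	fmt.Printf("%x\n", sum)
}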

The `Garbage Collector` in `go`

Garbage collector (GC)

Go uses a concurrent mark-and-sweep garbage collection algorithm:

  1. The GC runs concurrently with mutator (user code) threads and allows multiple GC threads to run in parallel
  2. It is type-accurate (precise): it knows exactly which words in memory are pointers
  3. It is not generational and does not compact memory
  4. Each P has size-segregated allocation caches to minimize fragmentation and reduce lock granularity

Algorithm steps:

  1. GC performs sweep termination
    a. Stop The World: all Ps reach a GC safe point
    b. Sweep any unswept spans: these only exist if this GC cycle was forced before the expected time
  2. GC performs the mark phase
    a. Prepare for marking: switch gcphase from _GCoff to _GCmark, enable the write barrier and mutator assists,
    enqueue the root mark jobs; a STW ensures that no scanning starts until all Ps have enabled the write barrier
    b. Start The World: mark workers and assists begin running; the write barrier shades both the overwritten and the newly written pointer of any pointer write, and newly allocated objects are immediately marked black
    c. GC runs the root marking jobs: scan all stacks, shade globals and heap pointers held in off-heap structures; scanning a stack briefly pauses that G
    d. GC drains the work queue of grey objects: scan each grey object, blacken it, and shade the objects it references, recursively
    e. A distributed termination algorithm detects when there is no more mark work
  3. GC performs mark termination
    a. Stop The World
    b. Switch gcphase from _GCmark to _GCmarktermination, disable mark workers and mutator assists
    c. Perform housekeeping, e.g. flush the mcaches
  4. GC performs the sweep phase
    a. Prepare for sweeping: switch gcphase from _GCmarktermination to _GCoff, set up sweep state, disable the write barrier
    b. Start The World: from now on newly allocated objects are white, and allocation sweeps spans when needed
    c. Sweep concurrently in the background and in response to allocation
  5. Once enough allocation has happened, repeat from step 1. The GC rate is controlled by the GOGC environment variable (a small GOGC example follows the NOTE below)

NOTE:
The GC sweep phase runs concurrently with normal program execution.
The heap is swept span by span, both lazily (when a goroutine needs a span) and concurrently by background goroutines.
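A small illustration of the GOGC knob mentioned in step 5 (a sketch using only the standard library): debug.SetGCPercent is the programmatic equivalent of the GOGC environment variable, and runtime.ReadMemStats shows how many cycles have completed.

package main

import (
	"fmt"
	"runtime"
	"runtime/debug"
)

func main() {
	old := debug.SetGCPercent(50) // like GOGC=50: collect when the heap grows 50% over the live set
	fmt.Println("previous GOGC value:", old)

	sink := make([][]byte, 0, 1024)
	for i := 0; i < 1024; i++ {
		sink = append(sink, make([]byte, 1<<20)) // allocate 1MB per iteration
		if i%256 == 255 {
			sink = sink[:0] // drop references so the GC has garbage to reclaim
		}
	}
	_ = sink

	var ms runtime.MemStats
	runtime.ReadMemStats(&ms)
	fmt.Println("completed GC cycles:", ms.NumGC)
}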

Write barrier

A write barrier is a technique used in concurrent garbage collection to track writes to objects in the heap.
In a concurrent GC, when a mutator thread (user code) modifies a pointer to a heap object, the collector could otherwise lose track of the reference relationships between heap objects.
To solve this, the GC inserts extra code, the write barrier, around certain writes (typically pointer stores).

  1. During the GC mark phase, when the mutator performs a pointer write, the write barrier shades both the old and the new pointer values grey so the GC can still track those references (a toy sketch follows this list)
  2. Newly allocated objects are immediately marked black (treated as already scanned), so the GC keeps tracking references correctly without revisiting them
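A toy illustration of that idea (plain Go, not the runtime's actual barrier; the object, shade, and writePointer names are invented for this sketch): on every pointer store made while marking is active, both the old target and the new target are shaded grey so neither can be lost.

package main

import "fmt"

type color int

const (
	white color = iota // not yet visited
	grey               // visited, children not yet scanned
	black              // visited, children scanned
)

type object struct {
	name  string
	color color
	ref   *object // a single pointer field, for simplicity
}

var greyQueue []*object

func shade(o *object) {
	if o != nil && o.color == white {
		o.color = grey
		greyQueue = append(greyQueue, o)
	}
}

// writePointer plays the role of the write barrier: it is invoked on every
// pointer store while marking is enabled.
func writePointer(slot **object, newVal *object) {
	shade(*slot)  // Yuasa part: keep the old target reachable for this cycle
	shade(newVal) // Dijkstra part: make sure the new target gets scanned
	*slot = newVal
}

func main() {
	a := &object{name: "a"}
	b := &object{name: "b"}
	c := &object{name: "c"}
	a.ref = b

	// The mutator swaps a.ref from b to c while the GC is marking.
	writePointer(&a.ref, c)

	for _, o := range greyQueue {
		fmt.Println("shaded:", o.name) // both b and c stay visible to the GC
	}
}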

Implementation

// runtime/mgc.go

// Transitions from the _GCoff to the _GCmark phase, entering concurrent marking and enabling the write barrier
func gcStart(trigger gcTrigger) {}

//  If all reachable objects have been scanned, calls gcMarkTermination
func gcMarkDone() {}

// Transitions from _GCmark to _GCmarktermination, entering mark termination, and moves to _GCoff once that is done
func gcMarkTermination(stw worldStop) {}

`io_uring` in `linux`

The evolution of Linux I/O

It started with synchronous (blocking) system calls.
Driven by real needs and concrete scenarios, new asynchronous interfaces kept being added while staying compatible and interoperable with the old ones (without ever converging on a unified answer for non-blocking reads and writes),
until io_uring appeared.

  1. Blocking: system calls based on a file descriptor fd, where the fd refers to a file or a network socket
  2. Non-blocking + polling: only supported for network sockets/pipes, mostly unsupported for regular files
  3. Thread pool: the main thread dispatches I/O, worker threads make the blocking calls
  4. Direct I/O: the O_DIRECT flag, zero-copy I/O
  5. Asynchronous I/O (linux-aio): only supports O_DIRECT files, so it is of little use to non-database applications; extending it is complex; it can still block
  6. io_uring

io_uring

A new asynchronous I/O framework provided by the Linux kernel since 5.1.

The native Linux AIO framework has a number of limitations that io_uring is designed to overcome:

  1. Native AIO does not support buffered I/O, only direct I/O
  2. Native AIO has non-deterministic behaviour and can block under various conditions
  3. Native AIO has an awkward API: each I/O takes at least two system calls, one to submit the request and one to wait for its completion

An io_uring instance has two rings, the submission queue (SQ) and the completion queue (CQ), shared between the kernel and the user program;
both queues are single-producer, single-consumer, and their size is a power of two (a toy in-process model of such a ring follows the list below).

  1. The user program creates one or more submission queue entries (SQEs) and advances the SQ tail;
  2. the kernel consumes SQEs and advances the SQ head;
  3. the kernel creates completion queue entries (CQEs) and advances the CQ tail;
  4. the user program consumes CQEs and advances the CQ head.
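A toy, in-process model of one such ring (Go, not the kernel ABI; the ring type and its methods are invented for illustration): single producer, single consumer, power-of-two size, and head/tail counters that only ever increase and are masked to index the slots.

package main

import "fmt"

type ring struct {
	entries []int // slots; len must be a power of two
	mask    uint32
	head    uint32 // the consumer advances head
	tail    uint32 // the producer advances tail
}

func newRing(size uint32) *ring { // size must be a power of two
	return &ring{entries: make([]int, size), mask: size - 1}
}

func (r *ring) push(v int) bool {
	if r.tail-r.head == uint32(len(r.entries)) {
		return false // full
	}
	r.entries[r.tail&r.mask] = v
	r.tail++
	return true
}

func (r *ring) pop() (int, bool) {
	if r.head == r.tail {
		return 0, false // empty
	}
	v := r.entries[r.head&r.mask]
	r.head++
	return v, true
}

func main() {
	sq := newRing(8)
	sq.push(42)      // user space queues an "SQE" and bumps the tail
	v, _ := sq.pop() // the "kernel" consumes it and bumps the head
	fmt.Println("consumed submission:", v)
}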

API

io_uring_setup
io_uring_register
io_uring_enter

// Sets up the `submission queue` and the `completion queue` with at least `entries` entries each.
// The returned fd is used for all subsequent operations.
// Three modes can be configured via `io_uring_params`: interrupt driven, polled, and kernel polled.
// 
// setup a context for performing asynchronous I/O
int io_uring_setup(u32 entries, struct io_uring_params *p);

// Registering files or user buffers allows:
// 1. the kernel to hold long-lived references to the internal kernel data structures associated with the files,
// 2. the kernel to create long-lived mappings of the application memory associated with the buffers,
// doing this work once at registration time instead of for every I/O, which reduces per-I/O overhead.
// The meaning of arg depends on the opcode.
//
// register files or user buffers for asynchronous I/O
int io_uring_register(unsigned int fd, unsigned int opcode, void *arg, unsigned int nr_args);

// Initiates and/or completes I/O using the shared SQ and CQ created by io_uring_setup.
// A single `io_uring_enter` call can both 1. submit new I/O operations and 2. wait for I/O submitted by an earlier `io_uring_enter` call to complete.
//
// initiate and/or complete asynchronous I/O
int io_uring_enter(unsigned int fd, unsigned int to_submit, unsigned int min_complete, unsigned int flags, sigset_t *sig);

Reading the `sysmon` function in the `go runtime`

// Always runs without a P, so write barriers are not allowed.
//
//go:nowritebarrierrec
func sysmon() {
	lock(&sched.lock) // acquire the scheduler lock
	sched.nmsys++
	checkdead()
	unlock(&sched.lock) // release the scheduler lock

	lasttrace := int64(0) // time of the last scheduler trace
	idle := 0             // how many cycles in succession we had not wokeup somebody                   /// consecutive idle iterations of the loop
	delay := uint32(0)    // controls how long to sleep

	for {                                                                                               /// the monitoring loop
		if idle == 0 { // start with 20us sleep...
			delay = 20
		} else if idle > 50 { // start doubling the sleep after 1ms...
			delay *= 2
		}
		if delay > 10*1000 { // up to 10ms
			delay = 10 * 1000
		}
		usleep(delay)                                                                               /// sleep 20us; once idle exceeds 50, the sleep doubles each iteration, up to 10ms

		// sysmon should not enter deep sleep if schedtrace is enabled so that
		// it can print that information at the right time.
		//
		// It should also not enter deep sleep if there are any active P's so
		// that it can retake P's from syscalls, preempt long running G's, and
		// poll the network if all P's are busy for long stretches.
		//
		// It should wakeup from deep sleep if any P's become active either due
		// to exiting a syscall or waking up due to a timer expiring so that it
		// can resume performing those duties. If it wakes from a syscall it
		// resets idle and delay as a bet that since it had retaken a P from a
		// syscall before, it may need to do it again shortly after the
		// application starts work again. It does not reset idle when waking
		// from a timer to avoid adding system load to applications that spend
		// most of their time sleeping.
		now := nanotime()
		if debug.schedtrace <= 0 && (sched.gcwaiting.Load() || sched.npidle.Load() == gomaxprocs) { /// schedtrace is off, and GC is waiting or no P is active: consider deep sleep
			lock(&sched.lock)
			if sched.gcwaiting.Load() || sched.npidle.Load() == gomaxprocs {                    /// GC waiting or no active P
				syscallWake := false
				next := timeSleepUntil()                                                    /// time when the next timer fires
				if next > now {
					sched.sysmonwait.Store(true)                                        /// mark sysmon as in deep sleep
					unlock(&sched.lock)                                                 /// release sched.lock
					// Make wake-up period small enough
					// for the sampling to be correct.
					sleep := forcegcperiod / 2                                          /// compute the sleep duration
					if next-now < sleep {
						sleep = next - now                                          /// cap the sleep at the time until the next timer fires
					}
					shouldRelax := sleep >= osRelaxMinNS
					if shouldRelax {                                                    /// sleep is at least osRelaxMinNS
						osRelax(true)                                               /// tell the OS it may enter a low-power state
					}
					syscallWake = notetsleep(&sched.sysmonnote, sleep)                  /// sleep on the sysmonnote note until woken
					if shouldRelax {
						osRelax(false)                                              /// tell the OS to leave the low-power state
					}
					lock(&sched.lock)                                                   /// re-acquire sched.lock
					sched.sysmonwait.Store(false)                                       /// mark sysmon as awake again
					noteclear(&sched.sysmonnote)                                        /// clear the sysmonnote note
				}
				if syscallWake {                                                            /// if woken by a syscall exit, reset idle and delay
					idle = 0
					delay = 20
				}
			}
			unlock(&sched.lock)
		}

		lock(&sched.sysmonlock)
		// Update now in case we blocked on sysmonnote or spent a long time
		// blocked on schedlock or sysmonlock above.
		now = nanotime()

		// trigger libc interceptors if needed
		if *cgo_yield != nil {
			asmcgocall(*cgo_yield, nil)
		}
		// poll network if not polled for more than 10ms
		lastpoll := sched.lastpoll.Load()
		if netpollinited() && lastpoll != 0 && lastpoll+10*1000*1000 < now {                        /// if netpoll is initialized and the last poll was more than 10ms ago, poll the network and adjust the waiter count
			sched.lastpoll.CompareAndSwap(lastpoll, now)
			list, delta := netpoll(0) // non-blocking - returns list of goroutines
			if !list.empty() {
				// Need to decrement number of idle locked M's
				// (pretending that one more is running) before injectglist.
				// Otherwise it can lead to the following situation:
				// injectglist grabs all P's but before it starts M's to run the P's,
				// another M returns from syscall, finishes running its G,
				// observes that there is no work to do and no other running M's
				// and reports deadlock.
				incidlelocked(-1)
				injectglist(&list)
				incidlelocked(1)
				netpollAdjustWaiters(delta)
			}
		}
		if GOOS == "netbsd" && needSysmonWorkaround {
			// netpoll is responsible for waiting for timer
			// expiration, so we typically don't have to worry
			// about starting an M to service timers. (Note that
			// sleep for timeSleepUntil above simply ensures sysmon
			// starts running again when that timer expiration may
			// cause Go code to run again).
			//
			// However, netbsd has a kernel bug that sometimes
			// misses netpollBreak wake-ups, which can lead to
			// unbounded delays servicing timers. If we detect this
			// overrun, then startm to get something to handle the
			// timer.
			//
			// See issue 42515 and
			// https://gnats.netbsd.org/cgi-bin/query-pr-single.pl?number=50094.
			if next := timeSleepUntil(); next < now {
				startm(nil, false, false)
			}
		}
		if scavenger.sysmonWake.Load() != 0 {                                                       /// the scavenger asked to be woken: wake it
			// Kick the scavenger awake if someone requested it.
			scavenger.wake()
		}
		// retake P's blocked in syscalls
		// and preempt long running G's
		if retake(now) != 0 {                                                                       /// retake Ps blocked in syscalls and preempt long-running Gs
			idle = 0
		} else {
			idle++
		}
		// check if we need to force a GC
		if t := (gcTrigger{kind: gcTriggerTime, now: now}); t.test() && forcegc.idle.Load() {       /// check whether a forced GC is due
			lock(&forcegc.lock)
			forcegc.idle.Store(false)
			var list gList
			list.push(forcegc.g)
			injectglist(&list)
			unlock(&forcegc.lock)
		}
		if debug.schedtrace > 0 && lasttrace+int64(debug.schedtrace)*1000000 <= now {               /// schedtrace is enabled and the interval has elapsed: emit a scheduler trace
			lasttrace = now
			schedtrace(debug.scheddetail > 0)
		}
		unlock(&sched.sysmonlock)                                                                   /// release sysmonlock, completing one sysmon cycle
	}
}

The five `IO` models in `linux`

Five different types of IO in Linux

Blocking I/O, non-blocking I/O, I/O multiplexing, signal-driven I/O, asynchronous I/O

Taking a read as the example, it consists of two steps: 1. waiting for the data to be ready, 2. copying the data from the kernel to the application

Blocking I/O

In blocking I/O mode, an application that performs an I/O operation blocks until the operation completes.

Step 1, waiting for the data: blocks
Step 2, copying the data: blocks

(figure: blocking I/O)

Non-blocking I/O

In non-blocking I/O mode, an I/O operation returns immediately even when the data is not available, so the application is not blocked.
If the operation cannot complete because no data is available, it returns an error code (usually EAGAIN or EWOULDBLOCK) instead of blocking the application.

Step 1, waiting for the data: does not block; the call returns immediately whether or not data is ready (a small sketch follows the figure placeholder)
Step 2, copying the data: blocks
(figure: non-blocking I/O)
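A minimal Linux/Unix sketch of step 1 not blocking: with the fd switched to non-blocking mode, a read with no data available returns EAGAIN immediately instead of waiting.

package main

import (
	"fmt"
	"syscall"
)

func main() {
	var fds [2]int
	if err := syscall.Pipe(fds[:]); err != nil {
		panic(err)
	}
	// Switch the read end to non-blocking mode.
	if err := syscall.SetNonblock(fds[0], true); err != nil {
		panic(err)
	}
	buf := make([]byte, 8)
	// Nothing has been written yet, so the call returns EAGAIN at once
	// instead of blocking in step 1.
	if _, err := syscall.Read(fds[0], buf); err == syscall.EAGAIN {
		fmt.Println("no data ready yet:", err)
	}
}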

I/O multiplexing

I/O multiplexing lets an application monitor many file descriptors at once to find out which of them are readable or writable.

Step 1, waiting for the data: monitor many fds, optionally blocking in epoll_wait (a user-level epoll sketch follows the figure placeholder)
Step 2, copying the data: blocks

(figure: I/O multiplexing)
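A minimal user-level sketch of the same pattern the runtime uses (Linux only; assumes golang.org/x/sys/unix as a dependency): register an fd with epoll, block in EpollWait until something is ready, then do the copy with a plain read.

package main

import (
	"fmt"

	"golang.org/x/sys/unix"
)

func main() {
	// A pipe stands in for a socket so the example is self-contained.
	var p [2]int
	if err := unix.Pipe(p[:]); err != nil {
		panic(err)
	}
	epfd, err := unix.EpollCreate1(unix.EPOLL_CLOEXEC)
	if err != nil {
		panic(err)
	}
	ev := unix.EpollEvent{Events: unix.EPOLLIN, Fd: int32(p[0])}
	if err := unix.EpollCtl(epfd, unix.EPOLL_CTL_ADD, p[0], &ev); err != nil {
		panic(err)
	}

	// Make the read end ready by writing to the write end.
	unix.Write(p[1], []byte("hi"))

	events := make([]unix.EpollEvent, 8)
	n, err := unix.EpollWait(epfd, events, -1) // step 1: block until some fd is ready
	if err != nil {
		panic(err)
	}
	buf := make([]byte, 16)
	for i := 0; i < n; i++ {
		m, _ := unix.Read(int(events[i].Fd), buf) // step 2: copy the data out
		fmt.Printf("fd %d readable: %q\n", events[i].Fd, buf[:m])
	}
}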

Signal-driven I/O

When data becomes readable or writable, the kernel sends the application a signal; the application catches the signal and performs the corresponding operation.

Step 1, waiting for the data: does not block
Step 2, copying the data: blocks

(figure: signal-driven I/O)

Asynchronous I/O

Asynchronous I/O lets the application keep doing other work after initiating an I/O operation instead of waiting for it to finish.
When the operation completes, the kernel notifies the application, which can then process the result.

Step 1, waiting for the data: does not block
Step 2, copying the data: does not block (the kernel performs the copy; in other words, the other models notify the application so it can start the I/O, while asynchronous I/O notifies the application that the I/O has already completed)

(figure: asynchronous I/O)

GMP in the `go runtime`

GMP

go version go1.22.2 linux/amd64

G

A G is the entity that gets scheduled; it has a status and a stack.

status

src/runtime/runtime2.go:18

// _Grunnable: creates a runnable G
func newproc1(fn *funcval, callergp *g, callerpc uintptr) *g {}

// _Grunnable -> _Grunning: schedules the G onto an M and runs it
func execute(gp *g, inheritTime bool) {}

// _Grunning -> _Grunnable: the current G gives up the CPU so other Gs can run
func goyield() {} // calls goyield_m
func goyield_m(gp *g) {}

// _Grunning -> _Gdead: the task represented by the G has finished
// 
// Each G's call stack is constructed as if goexit had called the G's entry function,
// so when the entry function returns, it returns into goexit,
// and goexit calls goexit1 to do the actual exit work
func goexit(neverCallThisFunction)
func goexit1() {} // calls goexit0
func goexit0(gp *g) {} // calls gdestroy
func gdestroy(gp *g) {} // _Grunning -> _Gdead

// _Grunning -> _Gsyscall: the G enters a system call
// 
// while in this state the G is not using the CPU
func entersyscall() {} // calls reentersyscall
func reentersyscall(pc, sp uintptr) {}
//
func entersyscallblock() {} // same as above, but for a system call that is known to block

// _Gsyscall -> _Grunning: 
// _Gsyscall -> _Grunnable: 
// 
// Leaving the system call state:
// if exitsyscallfast returns true, _Gsyscall -> _Grunning,
// otherwise exitsyscall0 is called and _Gsyscall -> _Grunnable
func exitsyscall() {}

// _Grunning -> _Gwaiting: puts the G into the waiting state
func gopark(unlockf func(*g, unsafe.Pointer) bool, lock unsafe.Pointer, reason waitReason, traceReason traceBlockReason, traceskip int) {} // calls park_m
func park_m(gp *g) {} // _Grunning -> _Gwaiting

// _Gwaiting -> _Grunnable: makes the G runnable again
func goready(gp *g, traceskip int) {} // calls ready
func ready(gp *g, traceskip int, next bool) {}
---
title: G status
---
stateDiagram-v2
	direction LR

	gidle   : _Gidle     
	grunable: _Grunnable 
	grunning: _Grunning   
	gsyscall: _Gsyscall   
	gwaiting: _Gwaiting   
	gdead   : _Gdead      

	[*]      --> gidle
	gdead    --> [*]

	gidle    --> grunable: newproc1

	grunable --> grunning: execute
	grunning --> grunable: goyield

	grunning --> gsyscall: entersyscall/entersyscallblock

	state gsyscall_state <<choice>>
	gsyscall --> gsyscall_state
	gsyscall_state --> grunning: exitsyscall,exitsyscallfast
	gsyscall_state --> grunable: exitsyscall,exitsyscall0

	grunning --> gwaiting: gopark
	gwaiting --> grunable: goready

	grunning --> gdead: goexit

P

A P is a logical CPU; it mainly holds a queue of runnable Gs.

status

src/runtime/runtime2.go:110

// _Pidle:
//
// all Ps are put into the _Pidle state
func procresize(nprocs int32) *p {}

// _Pidle -> _Prunning:
//
// binds the P to an M so it can run Gs or the scheduler
func acquirep(pp *p) {} // calls wirep
func wirep(pp *p) {} // pp.status = _Prunning

// _Prunning -> _Pidle:
// 
// unbinds the P from its M
func releasep() *p {} // calls releasepNoTrace
func releasepNoTrace() *p {} // pp.status = _Pidle

// _Pdead
func procresize(nprocs int32) *p {} // when the number of Ps is reduced, destroy is called on the surplus Ps
func (pp *p) destroy() {} // pp.status = _Pdead

// _Psyscall
//
// the P is associated with an M that is in a system call, i.e. it is not running a G and may be stolen by another M
func entersyscall() {} // atomic.Store(&pp.status, _Psyscall)
---
title: P status
---
stateDiagram-v2

pidle   : _Pidle
prunning: _Prunning
psyscall: _Psyscall
pdead   : _Pdead

pidle --> prunning
prunning --> pidle

prunning --> psyscall

M

An M is an OS thread that executes code; it must be bound to a P before it can obtain Gs to run.

status

running/spinning: the running state or the spinning state

// Running an M: create a new M or wake an existing one
// Creating an M
func startm(pp *p, spinning, lockheld bool) {} 
func newm(fn func(), pp *p, id int64) {} // calls newosproc, which has a per-`OS` implementation
func newosproc(mp *m) {}
// Waking an M
func wakep() {} // calls startm
func startm(pp *p, spinning, lockheld bool) {} // calls mget
func mget() *m {} 

// Spinning M
func (mp *m) becomeSpinning() {}

// Leaving the spinning state
func resetspinning() {}

// Parking an M
// an M with no G to run and not spinning goes to sleep
func stopm() {} // calls mput
func mput(mp *m) {}

TODO: state diagram

The `netpoll` package from `cloudwego`

netpoll

1. Listening on the net.Listener fd

// Create an `EventLoop`, an event-driven scheduler
// used for connection management, event dispatching, and so on
eventLoop, _ := netpoll.NewEventLoop(onRequest OnRequest, ops ...Option)

// Serve by binding a `net.Listener`
eventLoop.Serve(net.Listener)

// Shut the server down gracefully
eventLoop.Shutdown(context.Context)
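Putting the three calls together, a minimal sketch of an echo server against the github.com/cloudwego/netpoll API (error handling abridged; treat the exact method set as an assumption to be checked against the current version):

package main

import (
	"context"
	"net"
	"time"

	"github.com/cloudwego/netpoll"
)

func main() {
	ln, _ := net.Listen("tcp", "127.0.0.1:8888")

	// onRequest is fired by the poller when a connection has readable data.
	onRequest := func(ctx context.Context, conn netpoll.Connection) error {
		reader, writer := conn.Reader(), conn.Writer()
		buf, err := reader.Next(reader.Len()) // take everything currently buffered
		if err != nil {
			return err
		}
		writer.WriteBinary(buf) // echo it back
		writer.Flush()
		return reader.Release()
	}

	eventLoop, _ := netpoll.NewEventLoop(onRequest)
	go func() {
		time.Sleep(time.Minute)
		eventLoop.Shutdown(context.Background()) // graceful shutdown after a while
	}()
	_ = eventLoop.Serve(ln) // blocks until Shutdown or an error
}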
  1. First, look at the EventLoop returned in the first step:
// eventloop.go

// Serve / graceful shutdown
//
// A EventLoop is a network server.
type EventLoop interface {
	// Serve registers a listener and runs blockingly to provide services, including listening to ports,
	// accepting connections and processing trans data. When an exception occurs or Shutdown is invoked,
	// Serve will return an error which describes the specific reason.
	Serve(ln net.Listener) error

	// Shutdown is used to graceful exit.
	// It will close all idle connections on the server, but will not change the underlying pollers.
	//
	// Argument: ctx set the waiting deadline, after which an error will be returned,
	// but will not force the closing of connections in progress.
	Shutdown(ctx context.Context) error
}
  2. The Serve method of the eventLoop struct, which implements the EventLoop interface:
// netpoll.go

type eventLoop struct {
	sync.Mutex
	opts *options
	svr  *server
	stop chan error
}

// Serve implements EventLoop.
func (evl *eventLoop) Serve(ln net.Listener) error {
	npln, err := ConvertListener(ln) // convert the listener
	// ...
	// create a server and run it
	evl.svr = newServer(npln, evl.opts, evl.quit)
	evl.svr.Run()
	// ...
}

// Shutdown signals a shutdown and begins server closing.
func (evl *eventLoop) Shutdown(ctx context.Context) error {}
  3. The server created by newServer, and its (*server).Run method:
// netpoll_server.go

type server struct {
	operator    FDOperator
	ln          Listener
	opts        *options
	onQuit      func(err error)
	connections sync.Map // key=fd, value=connection
}

// Run this server.
func (s *server) Run() (err error) {
	s.operator = FDOperator{             // FDOperator: a collection of operations on an fd
		FD:     s.ln.Fd(),               // the Listener's fd
		OnRead: s.OnRead, 
		OnHup:  s.OnHup,
	}
	s.operator.poll = pollmanager.Pick() // pollmanager (initialized in an init function) manages multiple pollers (one per 20 CPUs); Pick returns one poll, used to monitor fds
	err = s.operator.Control(PollReadable)
	// ...
}
  4. The Control method of the FDOperator struct, called with PollReadable:
// fd_operator.go

// FDOperator is a collection of operations on file descriptors.
type FDOperator struct {
	// ...
}

func (op *FDOperator) Control(event PollEvent) error {
	if event == PollDetach && atomic.AddInt32(&op.detached, 1) > 1 {
		return nil
	}
	return op.poll.Control(op, event) // call the poll's Control method; poll is the Poll interface, whose concrete implementation is the defaultPoll struct
}
  5. The Control method of defaultPoll, the concrete implementation of the Poll interface, called with the FDOperator and PollReadable:
// poll.go

// Poll monitors fd(file descriptor), calls the FDOperator to perform specific actions,
// and shields underlying differences. On linux systems, poll uses epoll by default,
// and kevent by default on bsd systems.
type Poll interface {
	// ...

	// Control the event of file descriptor and the operations is defined by PollEvent.
	Control(operator *FDOperator, event PollEvent) error

	// ...
}
// poll_default_linux.go

type defaultPoll struct {
	// ...
}

// Control implements Poll.
func (p *defaultPoll) Control(operator *FDOperator, event PollEvent) error {
	// ...
	p.setOperator(unsafe.Pointer(&evt.data), operator)                                              /// store the FDOperator as the event's user data so it can be retrieved when the event fires
	switch event {
	case PollReadable: // server accept a new connection and wait read
		operator.inuse()                                                                            /// mark the FDOperator as in use
		op, evt.events = syscall.EPOLL_CTL_ADD, syscall.EPOLLIN|syscall.EPOLLRDHUP|syscall.EPOLLERR /// epoll_ctl arguments: add the fd, listening for readable, peer-close and error events
	// ...
	}
	return EpollCtl(p.fd, op, operator.FD, &evt)                                                    /// issue the epoll_ctl system call
}
// sys_epoll_linux.go

// Wrapper around the SYS_EPOLL_CTL system call
//
// EpollCtl implements epoll_ctl.
func EpollCtl(epfd int, op int, fd int, event *epollevent) (err error) {
	_, _, err = syscall.RawSyscall6(syscall.SYS_EPOLL_CTL, uintptr(epfd), uintptr(op), uintptr(fd), uintptr(unsafe.Pointer(event)), 0, 0)
	if err == syscall.Errno(0) {
		err = nil
	}
	return err
}

2. Running a poll: polling for ready events

As noted in step 3 above, the (*server).Run method selects a poll via pollmanager.Pick(), and by the time Pick returns, all the polls have already been started.

// poll_manager.go

// LoadBalance is used to do load balancing among multiple pollers.
// a single poller may not be optimal if the number of cores is large (40C+).
type manager struct {
	numLoops int32
	status   int32       // 0: uninitialized, 1: initializing, 2: initialized
	balance  loadbalance // load balancing method
	polls    []Poll      // all the polls
}

// Pick will select the poller for use each time based on the LoadBalance.
func (m *manager) Pick() Poll {
START:
	// fast path
	if atomic.LoadInt32(&m.status) == managerInitialized {                                 /// already initialized: just pick a poll and return it
		return m.balance.Pick()
	}
	// slow path
	// try to get initializing lock failed, wait others finished the init work, and try again
	if !atomic.CompareAndSwapInt32(&m.status, managerUninitialized, managerInitializing) { /// try to take the initializing lock; on failure, yield and retry
		runtime.Gosched()
		goto START
	}
	// adjust polls
	// m.Run() will finish very quickly, so will not many goroutines block on Pick.
	_ = m.Run()                                                                            /// run the manager, which starts all the polls it manages

	if !atomic.CompareAndSwapInt32(&m.status, managerInitializing, managerInitialized) {   /// try to mark initialization as finished
		// SetNumLoops called during m.Run() which cause CAS failed
		// The polls will be adjusted next Pick
	}
	return m.balance.Pick()                                                                /// pick a poll and return it
}

// Run all pollers.
func (m *manager) Run() (err error) {
	defer func() {
		if err != nil {
			_ = m.Close()
		}
	}()

	numLoops := int(atomic.LoadInt32(&m.numLoops))      /// numLoops was set in an init function via newManager
	if numLoops == len(m.polls) {
		return nil
	}
	var polls = make([]Poll, numLoops)                  /// allocate the target number of poll slots
	if numLoops < len(m.polls) {                        /// fewer than the current number of polls: shrink
		// shrink polls
		copy(polls, m.polls[:numLoops])                 /// keep the first numLoops polls
		for idx := numLoops; idx < len(m.polls); idx++ {
			// close redundant polls
			if err = m.polls[idx].Close(); err != nil { /// close the surplus ones
				logger.Printf("NETPOLL: poller close failed: %v\n", err)
			}
		}
	} else {                                            /// greater than or equal to the current number: grow
		// growth polls
		copy(polls, m.polls)                            /// copy all the existing polls
		for idx := len(m.polls); idx < numLoops; idx++ {/// the polls that still need to be created
			var poll Poll
			poll, err = openPoll()                      /// open a new poll
			if err != nil {
				return err
			}
			polls[idx] = poll
			go poll.Wait()                              /// run the poll
		}
	}
	m.polls = polls

	// LoadBalance must be set before calling Run, otherwise it will panic.
	m.balance.Rebalance(m.polls)
	return nil
}

Creating a poll:

// poll_default_linux.go

func openPoll() (Poll, error) {
	return openDefaultPoll()
}

func openDefaultPoll() (*defaultPoll, error) {
	var poll = new(defaultPoll)
	// ...
	var p, err = EpollCreate(0)                                     /// create an epoll instance
	// ...
	poll.fd = p                                                     /// save the epoll fd

	var r0, _, e0 = syscall.Syscall(syscall.SYS_EVENTFD2, 0, 0, 0)  /// eventfd for notifications; returns the `event fd`
	// ...

	// ...
	poll.Handler = poll.handler                                     /// register `(p *defaultPoll) handler` as the function that handles ready events
	poll.wop = &FDOperator{FD: int(r0)}                             /// used to wake up epoll_wait

	if err = poll.Control(poll.wop, PollReadable); err != nil {     /// as analysed above, add the `event fd` to the monitored set
		// ...
		return nil, err
	}

	// ...
	return poll, nil
}
// sys_epoll_linux.go

// Creates an epoll instance
// 
// EpollCreate implements epoll_create1.
func EpollCreate(flag int) (fd int, err error) {
	var r0 uintptr
	r0, _, err = syscall.RawSyscall(syscall.SYS_EPOLL_CREATE1, uintptr(flag), 0, 0)
	if err == syscall.Errno(0) {
		err = nil
	}
	return int(r0), err
}

Running a poll:

// poll.go

// Poll monitors fd(file descriptor), calls the FDOperator to perform specific actions,
// and shields underlying differences. On linux systems, poll uses epoll by default,
// and kevent by default on bsd systems.
type Poll interface {
	// Wait will poll all registered fds, and schedule processing based on the triggered event.
	// The call will block, so the usage can be like:
	//
	//  go wait()
	//
	Wait() error

	//...
}
// poll_default_linux.go

type defaultPoll struct {
	// ...
}

// Wait implements Poll.
func (p *defaultPoll) Wait() (err error) {
	// ...
	// wait
	for {
		// ...
		n, err = EpollWait(p.fd, p.events, msec) /// call epoll_wait and store the ready events in p.events; msec controls blocking (-1 blocks forever); n is the number of ready events
		// ...
		if n <= 0 {                              /// nothing ready or an error occurred: switch to blocking mode and yield
			msec = -1
			runtime.Gosched()
			continue
		}
		msec = 0
		if p.Handler(p.events[:n]) {             /// handle the ready events; the handler was registered when the poll was created, i.e. `(p *defaultPoll) handler`
			return nil
		}
		// ...
	}
}
// sys_epoll_linux.go

// Wrapper around EPOLL_WAIT.
// The msec parameter decides whether the call blocks; a blocking call must use Syscall6, which notifies the runtime so it can hand the current P to another M while this one blocks
//
// EpollWait implements epoll_wait.
func EpollWait(epfd int, events []epollevent, msec int) (n int, err error) {
	var r0 uintptr
	var _p0 = unsafe.Pointer(&events[0])
	if msec == 0 {
		r0, _, err = syscall.RawSyscall6(syscall.SYS_EPOLL_WAIT, uintptr(epfd), uintptr(_p0), uintptr(len(events)), 0, 0, 0)
	} else {
		r0, _, err = syscall.Syscall6(syscall.SYS_EPOLL_WAIT, uintptr(epfd), uintptr(_p0), uintptr(len(events)), uintptr(msec), 0, 0)
	}
	if err == syscall.Errno(0) {
		err = nil
	}
	return int(r0), err
}

3. Handling ready events

As described under "Running a poll" above, the (*defaultPoll).Wait method handles ready events through the Handler field,
which was registered when the poll was created, i.e. (p *defaultPoll) handler.

func (p *defaultPoll) handler(events []epollevent) (closed bool) {
	var triggerRead, triggerWrite, triggerHup, triggerError bool
	var err error
	for i := range events {
		operator := p.getOperator(0, unsafe.Pointer(&events[i].data)) /// recover the FDOperator that (*defaultPoll).Control stored as the event's user data
		// ...

		var totalRead int
		evt := events[i].events                                       /// inspect the event flags:
		triggerRead = evt&syscall.EPOLLIN != 0                        /// a read event fired
		triggerWrite = evt&syscall.EPOLLOUT != 0                      /// a write event fired
		triggerHup = evt&(syscall.EPOLLHUP|syscall.EPOLLRDHUP) != 0   /// a hang-up or peer-close event fired
		triggerError = evt&syscall.EPOLLERR != 0                      /// an error event fired

		// trigger or exit gracefully
		if operator.FD == p.wop.FD {                                  /// the notification (event) fd
			// must clean trigger first
			syscall.Read(p.wop.FD, p.buf)
			atomic.StoreUint32(&p.trigger, 0)
			// if closed & exit
			if p.buf[0] > 0 {
				syscall.Close(p.wop.FD)
				syscall.Close(p.fd)                                   /// close the poll
				operator.done()
				return true
			}
			operator.done()
			continue
		}

		if triggerRead {                                                 /// 1. read triggered
			if operator.OnRead != nil {
				// for non-connection                                    /// not a connection (e.g. the listener)
				operator.OnRead(p)                                       /// call the OnRead method
			} else if operator.Inputs != nil {
				// for connection                                        /// a connection
				bs := operator.Inputs(p.barriers[i].bs)                  /// call the Inputs method to get buffers
				if len(bs) > 0 {
					n, err := ioread(operator.FD, bs, p.barriers[i].ivs) /// read the data
					operator.InputAck(n)                                 /// acknowledge the read
					totalRead += n
					if err != nil {
						p.appendHup(operator)
						continue
					}
				}
			} else {
				logger.Printf("NETPOLL: operator has critical problem! event=%d operator=%v", evt, operator)
			}
		}
		if triggerHup {                                               /// 2. hang-up triggered
			if triggerRead && operator.Inputs != nil {
				// read all left data if peer send and close
				var leftRead int
				// read all left data if peer send and close
				if leftRead, err = readall(operator, p.barriers[i]); err != nil && !errors.Is(err, ErrEOF) {
					logger.Printf("NETPOLL: readall(fd=%d)=%d before close: %s", operator.FD, total, err.Error())
				}
				totalRead += leftRead
			}
			// only close connection if no further read bytes
			if totalRead == 0 {
				p.appendHup(operator)
				continue
			}
		}
		if triggerError {                                             /// 3. error triggered
			// Under block-zerocopy, the kernel may give an error callback, which is not a real error, just an EAGAIN.
			// So here we need to check this error, if it is EAGAIN then do nothing, otherwise still mark as hup.
			if _, _, _, _, err := syscall.Recvmsg(operator.FD, nil, nil, syscall.MSG_ERRQUEUE); err != syscall.EAGAIN {
				p.appendHup(operator)
			} else {
				operator.done()
			}
			continue
		}
		if triggerWrite {                                                                          /// 4. write triggered
			if operator.OnWrite != nil {
				// for non-connection                                                              /// not a connection
				operator.OnWrite(p)                                                                /// call the OnWrite method
			} else if operator.Outputs != nil {
				// for connection                                                                  /// a connection
				bs, supportZeroCopy := operator.Outputs(p.barriers[i].bs)                          /// call the Outputs method to get buffers
				if len(bs) > 0 {
					// TODO: Let the upper layer pass in whether to use ZeroCopy.
					n, err := iosend(operator.FD, bs, p.barriers[i].ivs, false && supportZeroCopy) /// send the data
					operator.OutputAck(n)                                                          /// acknowledge the send
					if err != nil {
						p.appendHup(operator)
						continue
					}
				}
			} else {
				logger.Printf("NETPOLL: operator has critical problem! event=%d operator=%v", evt, operator)
			}
		}
		operator.done()
	}
	// hup conns together to avoid blocking the poll.
	p.onhups()
	return false
}

Looking at the FDOperator struct again:

// FDOperator is a collection of operations on file descriptors.
type FDOperator struct {
	// FD is file descriptor, poll will bind when register.
	FD int

	// The FDOperator provides three operations of reading, writing, and hanging.
	// The poll actively fire the FDOperator when fd changes, no check the return value of FDOperator.
	OnRead  func(p Poll) error
	OnWrite func(p Poll) error
	OnHup   func(p Poll) error

	// The following is the required fn, which must exist when used, or directly panic.
	// Fns are only called by the poll when handles connection events.
	Inputs   func(vs [][]byte) (rs [][]byte)
	InputAck func(n int) (err error)

	// Outputs will locked if len(rs) > 0, which need unlocked by OutputAck.
	Outputs   func(vs [][]byte) (rs [][]byte, supportZeroCopy bool)
	OutputAck func(n int) (err error)

	// poll is the registered location of the file descriptor.
	poll Poll

	// protect only detach once
	detached int32

	// private, used by operatorCache
	next  *FDOperator
	state int32 // CAS: 0(unused) 1(inuse) 2(do-done)
	index int32 // index in operatorCache
}
OnRead
// netpoll_server.go

type server struct {
	// ...
}

// Run this server.
func (s *server) Run() (err error) {
	s.operator = FDOperator{
		FD:     s.ln.Fd(),
		OnRead: s.OnRead,           /// register the OnRead method
		OnHup:  s.OnHup,
	}
	// ...
}

// OnRead implements FDOperator.
func (s *server) OnRead(p Poll) error {
	// accept socket
	conn, err := s.ln.Accept()      /// accept a connection
	if err == nil {
		if conn != nil {
			s.onAccept(conn.(Conn)) /// handle the connection: store it in the connection map and start processing it
		}
		// EAGAIN | EWOULDBLOCK if conn and err both nil
		return nil
	}
	logger.Printf("NETPOLL: accept conn failed: %v", err)

	// delay accept when too many open files
	// ...

	// shut down
	// ...

	return err
}
OnWrite
type netFD struct {
	// ...
}

func (c *netFD) connect(ctx context.Context, la, ra syscall.Sockaddr) (rsa syscall.Sockaddr, retErr error) {
	// ...
	c.pd = newPollDesc(c.fd) /// initialize pd; its onwrite method is registered as OnWrite and, when the event is handled, notifies via pd.writeTrigger
	for {
		// Performing multiple connect system calls on a
		// non-blocking socket under Unix variants does not
		// necessarily result in earlier errors being
		// returned. Instead, once runtime-integrated network
		// poller tells us that the socket is ready, get the
		// SO_ERROR socket option to see if the connection
		// succeeded or failed. See issue 7474 for further
		// details.
		if err := c.pd.WaitWrite(ctx); err != nil { /// blocks; woken up via pd.writeTrigger
			return nil, err
		}
		// ...
	}
}
Inputs, InputAck; Outputs, OutputAck

(First, see the separate analysis of LinkBuffer.)

// connection_impl.go

type connection struct {
	//... 
}

func (c *connection) init(conn Conn, opts *options) (err error) {
	// ...
	c.initFDOperator()
	// ...
}

func (c *connection) initFDOperator() {
	// ...
	op.Inputs, op.InputAck = c.inputs, c.inputAck
	op.Outputs, op.OutputAck = c.outputs, c.outputAck
	// ...
}
// connection_reactor.go

// inputs implements FDOperator.
func (c *connection) inputs(vs [][]byte) (rs [][]byte) {
	vs[0] = c.inputBuffer.book(c.bookSize, c.maxSize)
	return vs[:1]
}

// inputAck implements FDOperator.
func (c *connection) inputAck(n int) (err error) {
	if n <= 0 {
		c.inputBuffer.bookAck(0)
		return nil
	}

	// Auto size bookSize.
	if n == c.bookSize && c.bookSize < mallocMax {
		c.bookSize <<= 1
	}

	length, _ := c.inputBuffer.bookAck(n)
	if c.maxSize < length {
		c.maxSize = length
	}
	if c.maxSize > mallocMax {
		c.maxSize = mallocMax
	}

	var needTrigger = true
	if length == n { // first start onRequest
		needTrigger = c.onRequest()
	}
	if needTrigger && length >= int(atomic.LoadInt64(&c.waitReadSize)) {
		c.triggerRead(nil)
	}
	return nil
}

// outputs implements FDOperator.
func (c *connection) outputs(vs [][]byte) (rs [][]byte, supportZeroCopy bool) {
	if c.outputBuffer.IsEmpty() {
		c.rw2r()
		return rs, c.supportZeroCopy
	}
	rs = c.outputBuffer.GetBytes(vs)
	return rs, c.supportZeroCopy
}

// outputAck implements FDOperator.
func (c *connection) outputAck(n int) (err error) {
	if n > 0 {
		c.outputBuffer.Skip(n)
		c.outputBuffer.Release()
	}
	if c.outputBuffer.IsEmpty() {
		c.rw2r()
	}
	return nil
}

The `stack` in `go`

stack for G

  1. At compile time, cmd/internal/obj/x86.stacksplit inserts a call to runtime.morestack or runtime.morestack_noctxt into the prologue of functions;
  2. at run time, when a new G is created, runtime.malg calls runtime.stackalloc to allocate its stack, and the compiler-inserted runtime.morestack checks whether the stack still has enough space (a small demonstration follows).
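A tiny demonstration of point 2: each goroutine starts with a small stack (about 2KB), yet the deep recursion below works because the prologue check triggers morestack/newstack and the stack is grown (and copied) transparently.

package main

import "fmt"

// Each frame of deep carries a 1KB local array, so thousands of nested calls
// need far more than the initial stack; the runtime grows it on demand.
func deep(n int) byte {
	var pad [1024]byte
	pad[0] = byte(n)
	if n == 0 {
		return pad[0]
	}
	return deep(n-1) + pad[0]
}

func main() {
	done := make(chan byte)
	go func() { done <- deep(10000) }()
	fmt.Println("deep recursion finished, result:", <-done)
}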
// runtime/runtime2.go

// Stack describes a Go execution stack.
// The bounds of the stack are exactly [lo, hi),
// with no implicit data structures on either side.
type stack struct {
	lo uintptr
	hi uintptr
}

// runtime/stack.go
//
// Stack frame layout
//
// (x86)
// +------------------+
// | args from caller |
// +------------------+ <- frame->argp
// |  return address  |
// +------------------+
// |  caller's BP (*) | (*) if framepointer_enabled && varp > sp
// +------------------+ <- frame->varp
// |     locals       |
// +------------------+
// |  args to callee  |
// +------------------+ <- frame->sp
//
// (arm)
// +------------------+
// | args from caller |
// +------------------+ <- frame->argp
// | caller's retaddr |
// +------------------+
// |  caller's FP (*) | (*) on ARM64, if framepointer_enabled && varp > sp
// +------------------+ <- frame->varp
// |     locals       |
// +------------------+
// |  args to callee  |
// +------------------+
// |  return address  |
// +------------------+ <- frame->sp
//
// varp > sp means that the function has a frame;
// varp == sp means frameless function.

Initialization

// runtime/stack.go
// stackpool is the global stack cache, used for stacks smaller than 32KB
// stackLarge is the large-stack cache, used for stacks of 32KB and above

// Global pool of spans that have free stacks.
// Stacks are assigned an order according to size.
//
//	order = log_2(size/FixedStack)
//
// There is a free list for each order.
var stackpool [_NumStackOrders]struct {
	item stackpoolItem
	_    [(cpu.CacheLinePadSize - unsafe.Sizeof(stackpoolItem{})%cpu.CacheLinePadSize) % cpu.CacheLinePadSize]byte
}

type stackpoolItem struct {
	_    sys.NotInHeap
	mu   mutex
	span mSpanList
}

// Global pool of large stack spans.
var stackLarge struct {
	lock mutex
	free [heapAddrBits - pageShift]mSpanList // free lists by log_2(s.npages)
}
// runtime/stack.go
// Initializes stackpool and stackLarge.
// Both are built on `mSpanList`, i.e. on `mspan`s, so Go stack memory can be thought of as coming from the heap allocator.

func stackinit() {
	if _StackCacheSize&_PageMask != 0 {
		throw("cache size must be a multiple of page size")
	}
	for i := range stackpool {
		stackpool[i].item.span.init()
		lockInit(&stackpool[i].item.mu, lockRankStackpool)
	}
	for i := range stackLarge.free {
		stackLarge.free[i].init()
		lockInit(&stackLarge.lock, lockRankStackLarge)
	}
}
// Allocating only from the global pools would cause lock contention.
// Stacks are closely tied to Ps, so each P also gets a per-P stack cache.
type p struct {
	// ...
	mcache      *mcache
	// ...
}

type mcache struct {
	// ...
	stackcache [_NumStackOrders]stackfreelist
	// ...
}

type stackfreelist struct {
	list gclinkptr // linked list of free stacks
	size uintptr   // total size of stacks in list
}

Allocation

// runtime/stack.go

// Allocates an n-byte stack:
// 1. small stacks: allocate from the global cache or the per-P cache
// 2. large stacks: allocate from the large-stack cache stackLarge
// 3. large stacks when stackLarge has nothing suitable: allocate memory from the heap
//
// stackalloc allocates an n byte stack.
//
// stackalloc must run on the system stack because it uses per-P
// resources and must not split the stack.
//
//go:systemstack
func stackalloc(n uint32) stack {
	// Stackalloc must be called on scheduler stack, so that we
	// never try to grow the stack during the code that stackalloc runs.
	// Doing so would cause a deadlock (issue 1547).
	thisg := getg()
	// ...

	// Small stacks are allocated with a fixed-size free-list allocator.
	// If we need a stack of a bigger size, we fall back on allocating
	// a dedicated span.
	var v unsafe.Pointer
	if n < fixedStack<<_NumStackOrders && n < _StackCacheSize { // n < 32768 /// 1. smaller than 32KB
		order := uint8(0)
		n2 := n
		for n2 > fixedStack {
			order++
			n2 >>= 1
		}
		var x gclinkptr
		if stackNoCache != 0 || thisg.m.p == 0 || thisg.m.preemptoff != "" { /// per-P stack caching disabled, no current P, or the current G must not be preempted
			// thisg.m.p == 0 can happen in the guts of exitsyscall
			// or procresize. Just get a stack from the global pool.
			// Also don't touch stackcache during gc
			// as it's flushed concurrently.
			lock(&stackpool[order].item.mu)
			x = stackpoolalloc(order)                                        /// allocate from stackpool
			unlock(&stackpool[order].item.mu)
		} else {                                                             /// allocate from the P's cache
			c := thisg.m.p.ptr().mcache
			x = c.stackcache[order].list
			if x.ptr() == nil {
				stackcacherefill(c, order)                                   /// refill the P cache from stackpool
				x = c.stackcache[order].list
			}
			c.stackcache[order].list = x.ptr().next
			c.stackcache[order].size -= uintptr(n)
		}
		v = unsafe.Pointer(x)
	} else {                                                                 /// 2. 32KB or larger
		var s *mspan
		npage := uintptr(n) >> _PageShift
		log2npage := stacklog2(npage)

		// Try to get a stack from the large stack cache.
		lock(&stackLarge.lock)
		if !stackLarge.free[log2npage].isEmpty() {                           /// if the large-stack cache is not empty, allocate from it
			s = stackLarge.free[log2npage].first
			stackLarge.free[log2npage].remove(s)
		}
		unlock(&stackLarge.lock)

		lockWithRankMayAcquire(&mheap_.lock, lockRankMheap)

		if s == nil {                                                        /// the cache had nothing suitable
			// Allocate a new stack from the heap.
			s = mheap_.allocManual(npage, spanAllocStack)                    /// allocate a span directly from the heap
			if s == nil {
				throw("out of memory")
			}
			osStackAlloc(s)                                                  /// initialize the newly allocated stack memory
			s.elemsize = uintptr(n)
		}
		v = unsafe.Pointer(s.base())
	}
	// ...

	return stack{uintptr(v), uintptr(v) + uintptr(n)}
}

Growing the stack

// runtime/asm_amd64.s

// Called during function prolog when more stack is needed.
//
// The traceback routines see morestack on a g0 as being
// the top of a stack (for example, morestack calling newstack
// calling the scheduler calling newm calling gc), so we must
// record an argument size. For that purpose, it has no arguments.
TEXT runtime·morestack(SB),NOSPLIT|NOFRAME,$0-0
	// ...
	CALL	runtime·newstack(SB)
	// ...
// runtime/stack.go

// Called when more stack space is needed:
// 1. allocate a larger stack
// 2. move the contents of the current stack into the new one
// 
// Called from runtime·morestack when more stack is needed.
// Allocate larger stack and relocate to new stack.
// Stack growth is multiplicative, for constant amortized cost.
//
// g->atomicstatus will be Grunning or Gscanrunning upon entry.
// If the scheduler is trying to stop this g, then it will set preemptStop.
//
// This must be nowritebarrierrec because it can be called as part of
// stack growth from other nowritebarrierrec functions, but the
// compiler doesn't check this.
//
//go:nowritebarrierrec
func newstack() {
	thisg := getg()
	// ...
	gp := thisg.m.curg
	// ...

	morebuf := thisg.m.morebuf                         /// save and then clear morebuf
	thisg.m.morebuf.pc = 0
	thisg.m.morebuf.lr = 0
	thisg.m.morebuf.sp = 0
	thisg.m.morebuf.g = 0

	// NOTE: stackguard0 may change underfoot, if another thread
	// is about to try to preempt gp. Read it just once and use that same
	// value now and below.
	stackguard0 := atomic.Loaduintptr(&gp.stackguard0) /// load the current stack's stackguard0

	// Be conservative about where we preempt.
	// We are interested in preempting user Go code, not runtime code.
	// If we're holding locks, mallocing, or preemption is disabled, don't
	// preempt.
	// This check is very early in newstack so that even the status change
	// from Grunning to Gwaiting and back doesn't happen in this case.
	// That status change by itself can be viewed as a small preemption,
	// because the GC might change Gwaiting to Gscanwaiting, and then
	// this goroutine has to wait for the GC to finish before continuing.
	// If the GC is in some way dependent on this goroutine (for example,
	// it needs a lock held by the goroutine), that small preemption turns
	// into a real deadlock.
	preempt := stackguard0 == stackPreempt             /// check whether this is a preemption request
	if preempt {
		if !canPreemptM(thisg.m) {                     /// cannot preempt now, keep running
			// Let the goroutine keep running for now.
			// gp->preempt is set, so it will be preempted next time.
			gp.stackguard0 = gp.stack.lo + stackGuard
			gogo(&gp.sched) // never return
		}
	}

	// ...

	// Allocate a bigger segment and move the stack.
	oldsize := gp.stack.hi - gp.stack.lo
	newsize := oldsize * 2                             /// allocate a new stack of twice the old size

	// Make sure we grow at least as much as needed to fit the new frame.
	// (This is just an optimization - the caller of morestack will
	// recheck the bounds on return.)
	if f := findfunc(gp.sched.pc); f.valid() {
		max := uintptr(funcMaxSPDelta(f))
		needed := max + stackGuard
		used := gp.stack.hi - gp.sched.sp
		for newsize-used < needed {
			newsize *= 2
		}
	}

	if stackguard0 == stackForceMove {
		// Forced stack movement used for debugging.
		// Don't double the stack (or we may quickly run out
		// if this is done repeatedly).
		newsize = oldsize
	}

	// ...

	// The goroutine must be executing in order to call newstack,
	// so it must be Grunning (or Gscanrunning).
	casgstatus(gp, _Grunning, _Gcopystack)

	// The concurrent GC will not scan the stack while we are doing the copy since
	// the gp is in a Gcopystack status.
	copystack(gp, newsize)                             /// move the old stack's contents to the new stack
	if stackDebug >= 1 {
		print("stack grow done\n")
	}
	casgstatus(gp, _Gcopystack, _Grunning)
	gogo(&gp.sched)                                    /// resume execution
}


// Copies gp's stack to a new stack of a different size.
// Caller must have changed gp status to Gcopystack.
func copystack(gp *g, newsize uintptr) {
	old := gp.stack                                                                       /// the old stack
	used := old.hi - gp.sched.sp                                                          /// how much of the old stack is in use
	// Add just the difference to gcController.addScannableStack.
	// g0 stacks never move, so this will never account for them.
	// It's also fine if we have no P, addScannableStack can deal with
	// that case.
	gcController.addScannableStack(getg().m.p.ptr(), int64(newsize)-int64(old.hi-old.lo)) /// adjust the amount of stack the GC considers scannable

	// allocate new stack
	new := stackalloc(uint32(newsize))                                                    /// allocate the new stack
	// ...

	// Compute adjustment.
	var adjinfo adjustinfo                                                                /// records how the stack is being adjusted
	adjinfo.old = old
	adjinfo.delta = new.hi - old.hi

	// Adjust sudogs, synchronizing with channel ops if necessary.
	ncopy := used
	if !gp.activeStackChans {                                                             /// are there channel ops pointing into this stack?
		if newsize < old.hi-old.lo && gp.parkingOnChan.Load() {
			// It's not safe for someone to shrink this stack while we're actively
			// parking on a channel, but it is safe to grow since we do that
			// ourselves and explicitly don't want to synchronize with channels
			// since we could self-deadlock.
			throw("racy sudog adjustment due to parking on channel")
		}
		adjustsudogs(gp, &adjinfo)                                                        /// adjust the sudogs
	} else {
		// sudogs may be pointing in to the stack and gp has
		// released channel locks, so other goroutines could
		// be writing to gp's stack. Find the highest such
		// pointer so we can handle everything there and below
		// carefully. (This shouldn't be far from the bottom
		// of the stack, so there's little cost in handling
		// everything below it carefully.)
		adjinfo.sghi = findsghi(gp, old)

		// Synchronize with channel ops and copy the part of
		// the stack they may interact with.
		ncopy -= syncadjustsudogs(gp, used, &adjinfo)                                     /// adjust, synchronizing with the related channels
	}

	// Copy the stack (or the rest of it) to the new location
	memmove(unsafe.Pointer(new.hi-ncopy), unsafe.Pointer(old.hi-ncopy), ncopy)            /// copy the stack contents

	// Adjust remaining structures that have pointers into stacks.
	// We have to do most of these before we traceback the new
	// stack because gentraceback uses them.
	adjustctxt(gp, &adjinfo)                                                              /// adjust structures pointing into the stack: the context
	adjustdefers(gp, &adjinfo)                                                            /// adjust structures pointing into the stack: defer calls
	adjustpanics(gp, &adjinfo)                                                            /// adjust structures pointing into the stack: panic calls
	if adjinfo.sghi != 0 {
		adjinfo.sghi += adjinfo.delta
	}

	// Swap out old stack for new one
	gp.stack = new                                                                        /// point the g at the new stack
	gp.stackguard0 = new.lo + stackGuard // NOTE: might clobber a preempt request         /// update the g's stack guard
	gp.sched.sp = new.hi - used                                                           /// adjust the g's saved sp
	gp.stktopsp += adjinfo.delta                                                          /// and the highest sp of the g's stack

	// Adjust pointers in the new stack.
	var u unwinder
	for u.init(gp, 0); u.valid(); u.next() {                                              /// 遍历栈帧
		adjustframe(&u.frame, &adjinfo)                                                   /// 进行调整
	}

	// free old stack
	if stackPoisonCopy != 0 {
		fillstack(old, 0xfc)
	}
	stackfree(old)                                                                        /// 释放旧栈
}
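
copystack 的关键除了 memmove,就是把所有指向旧栈内部的指针按 delta 整体平移。下面是一个极简的 Go 示意(非 runtime 实际代码,栈边界均为假设值),演示这一调整的核心算术:

package main

import "fmt"

// adjust 模拟指针调整的核心逻辑:只有落在旧栈 [oldLo, oldHi) 内的指针才需要加上 delta
func adjust(p, oldLo, oldHi, newHi uintptr) uintptr {
	delta := newHi - oldHi // 与 adjinfo.delta 的计算一致;缩容时“为负”,依赖 uintptr 回绕,加法仍然正确
	if p >= oldLo && p < oldHi {
		return p + delta
	}
	return p // 不指向旧栈的指针保持不变
}

func main() {
	oldLo, oldHi := uintptr(0x1000), uintptr(0x2000)         // 假设的旧栈边界
	newHi := uintptr(0x4000)                                  // 假设的新栈 hi
	fmt.Printf("%#x\n", adjust(0x1800, oldLo, oldHi, newHi)) // 输出 0x3800
}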

缩容

// runtime/stack.go

// Maybe shrink the stack being used by gp.
//
// gp must be stopped and we must own its stack. It may be in
// _Grunning, but only if this is our own user G.
func shrinkstack(gp *g) {
	//...

	oldsize := gp.stack.hi - gp.stack.lo
	newsize := oldsize / 2                                                 /// 计算新栈大小:新栈的大小会是原始栈的一半
	// Don't shrink the allocation below the minimum-sized stack
	// allocation.
	if newsize < fixedStack {                                              /// 大小低于程序的最低限制 2KB,不进行缩容
		return
	}
	// Compute how much of the stack is currently in use and only
	// shrink the stack if gp is using less than a quarter of its
	// current stack. The currently used stack includes everything
	// down to the SP plus the stack guard space that ensures
	// there's room for nosplit functions.
	avail := gp.stack.hi - gp.stack.lo
	if used := gp.stack.hi - gp.sched.sp + stackNosplit; used >= avail/4 { /// 栈内存使用大于 1/4 时不进行缩容
		return
	}

	// ...

	copystack(gp, newsize)                                                 /// 复制栈
}
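
把 shrinkstack 的判定条件单独提出来,可以写成下面的 Go 示意(fixedStack、stackNosplit 作为参数传入,仅演示判定逻辑,不是 runtime 代码):

package main

import "fmt"

// shouldShrink:新栈为旧栈的一半;低于最小栈大小、或已用空间(含 nosplit 预留)不少于 1/4 时不缩容
func shouldShrink(hi, lo, sp, stackNosplit, fixedStack uintptr) (newsize uintptr, ok bool) {
	oldsize := hi - lo
	newsize = oldsize / 2
	if newsize < fixedStack {
		return 0, false
	}
	if used := hi - sp + stackNosplit; used >= oldsize/4 {
		return 0, false
	}
	return newsize, true
}

func main() {
	// 8KB 的栈只用了约 1KB:满足缩容条件,缩到 4KB
	fmt.Println(shouldShrink(0x4000, 0x2000, 0x4000-0x300, 0x100, 0x800))
}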

阅读`go`中的`runtime·rt0_go`函数

go version go1.22.2 linux/amd64

runtime·rt0_go

给 runtime.g0 分配栈空间;将 g0 写入 TLS,并将 m0 与 g0 互相绑定;调用 osinit、schedinit 完成初始化;新建 goroutine 运行 runtime.main;最后调用 mstart 运行当前 M。

逐行代码分析

TEXT runtime·rt0_go(SB),NOSPLIT|NOFRAME|TOPFRAME,$0

TEXT: 定义代码段
runtime·rt0_go(SB): 定义函数
NOSPLIT|NOFRAME|TOPFRAME: 汇编指示(directives):不允许栈分裂、不分配栈帧(也不保存/恢复帧指针)、标记为调用栈最外层帧(traceback 到此为止)
$0: 表示不使用栈空间

	// copy arguments forward on an even stack
	MOVQ	DI, AX		// argc
	MOVQ	SI, BX		// argv
	SUBQ	$(5*8), SP		// 3args 2auto
	ANDQ	$~15, SP
	MOVQ	AX, 24(SP)
	MOVQ	BX, 32(SP)

MOVQ DI, AX: 将参数 argc 复制到寄存器 AX 中
MOVQ SI, BX: 将参数 argv 复制到寄存器 BX 中
SUBQ $(5*8), SP: 在栈上预留 5 个 8 字节的空间(3 个参数 + 2 个局部变量),供后续写入 argc、argv
ANDQ $~15, SP: 将栈指针 SP 按 16 字节边界对齐
MOVQ AX, 24(SP): 将参数 argc 写入栈上的偏移量为 24 的位置
MOVQ BX, 32(SP): 将参数 argv 写入栈上的偏移量为 32 的位置
NOTE:
栈从高地址向低地址增长;ANDQ $~15, SP 将 SP 与掩码 ~15 按位与,清零 SP 的低 4 位,使 SP 向低地址对齐到 16 字节边界(对齐后栈顶只会下移,相当于栈空间略微变大)
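
ANDQ $~15, SP 这一步用 Go 表达就是清零低 4 位,例如(地址为假设值):

package main

import "fmt"

func main() {
	sp := uintptr(0x7ffc_dead_beef) // 假设的栈指针
	aligned := sp &^ 15             // 清零低 4 位:向低地址对齐到 16 字节边界
	fmt.Printf("%#x -> %#x\n", sp, aligned)
}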

	// create istack out of the given (operating system) stack.
	// _cgo_init may update stackguard.
	MOVQ	$runtime·g0(SB), DI
	LEAQ	(-64*1024)(SP), BX
	MOVQ	BX, g_stackguard0(DI)
	MOVQ	BX, g_stackguard1(DI)
	MOVQ	BX, (g_stack+stack_lo)(DI)
	MOVQ	SP, (g_stack+stack_hi)(DI)

MOVQ $runtime·g0(SB), DI: 将 runtime.g0 的地址加载到寄存器 DI 中
LEAQ (-64*1024)(SP), BX: 计算栈顶减去 64KB 后的地址,并加载到寄存器 BX 中
MOVQ BX, g_stackguard0(DI): 将 g0 的 stackguard0 的值赋值为 BX 的内容
MOVQ BX, g_stackguard1(DI): 将 g0 的 stackguard1 的值赋值为 BX 的内容
MOVQ BX, (g_stack+stack_lo)(DI): 将 g0 的 stack 的 stack_lo 的值赋值为 BX 的内容
MOVQ SP, (g_stack+stack_hi)(DI): 将 g0 的 stack 的 stack_hi 的值赋值为 SP 的内容

	// find out information about the processor we're on
	MOVL	$0, AX
	CPUID
	CMPL	AX, $0
	JE	nocpuinfo

	CMPL	BX, $0x756E6547  // "Genu"
	JNE	notintel
	CMPL	DX, $0x49656E69  // "ineI"
	JNE	notintel
	CMPL	CX, $0x6C65746E  // "ntel"
	JNE	notintel
	MOVB	$1, runtime·isIntel(SB)

notintel:
	// Load EAX=1 cpuid flags
	MOVL	$1, AX
	CPUID
	MOVL	AX, runtime·processorVersionInfo(SB)

nocpuinfo:
	// if there is an _cgo_init, call it.
	MOVQ	_cgo_init(SB), AX
	TESTQ	AX, AX
	JZ	needtls
	// arg 1: g0, already in DI
	MOVQ	$setg_gcc<>(SB), SI // arg 2: setg_gcc
	MOVQ	$0, DX	// arg 3, 4: not used when using platform's TLS
	MOVQ	$0, CX

MOVL $0, AX: 将寄存器 AX 设置为 0。
CPUID: 执行 CPUID 指令,将结果存储在寄存器 AX、BX、CX 和 DX 中。
CMPL AX, $0: 比较寄存器 AX 和立即数 0。
JE nocpuinfo: 如果 AX 和 0 相等,则跳转到标签 nocpuinfo。
CMPL BX, $0x756E6547: 比较 BX 和字符串 "Genu" 的 ASCII 码。
JNE notintel: 如果 BX 不等于 "Genu" 的 ASCII 码,则跳转到标签 notintel。
CMPL DX, $0x49656E69: 比较 DX 和字符串 "ineI" 的 ASCII 码。
JNE notintel: 如果 DX 不等于 "ineI" 的 ASCII 码,则跳转到标签 notintel。
CMPL CX, $0x6C65746E: 比较 CX 和字符串 "ntel" 的 ASCII 码。
JNE notintel: 如果 CX 不等于 "ntel" 的 ASCII 码,则跳转到标签 notintel。
MOVB $1, runtime·isIntel(SB): 将地址为 runtime·isIntel 的变量设置为 1,表示当前的处理器是 Intel 架构。

标签notintel:
MOVL $1, AX: 将立即数 1 装载到寄存器 AX。
CPUID: 执行 CPUID 指令,获取处理器信息。
MOVL AX, runtime·processorVersionInfo(SB): 将 CPUID 的结果存储到 runtime·processorVersionInfo 变量中。

标签nocpuinfo:
MOVQ _cgo_init(SB), AX: 将 _cgo_init 函数的地址加载到寄存器 AX 中。
TESTQ AX, AX: 测试寄存器 AX 是否为 0。
JZ needtls: 如果 AX 为 0,则跳转到标签 needtls。
MOVQ $setg_gcc<>(SB), SI: 将 setg_gcc 函数的地址加载到寄存器 SI 中。
MOVQ $0, DX: 将 0 装载到寄存器 DX 中。
MOVQ $0, CX: 将 0 装载到寄存器 CX 中。

TODO: setg_gcc做了什么?

#ifdef GOOS_android
	MOVQ	$runtime·tls_g(SB), DX 	// arg 3: &tls_g
	// arg 4: TLS base, stored in slot 0 (Android's TLS_SLOT_SELF).
	// Compensate for tls_g (+16).
	MOVQ	-16(TLS), CX
#endif
#ifdef GOOS_windows
	MOVQ	$runtime·tls_g(SB), DX 	// arg 3: &tls_g
	// Adjust for the Win64 calling convention.
	MOVQ	CX, R9 // arg 4
	MOVQ	DX, R8 // arg 3
	MOVQ	SI, DX // arg 2
	MOVQ	DI, CX // arg 1
#endif

针对 Android 和 Windows 的特殊处理:Android 下把 &tls_g 与 TLS 基址作为 _cgo_init 的第 3、4 个参数;Windows 下按 Win64 调用约定把参数重排到 CX/DX/R8/R9

	CALL	AX

	// update stackguard after _cgo_init
	MOVQ	$runtime·g0(SB), CX
	MOVQ	(g_stack+stack_lo)(CX), AX
	ADDQ	$const_stackGuard, AX
	MOVQ	AX, g_stackguard0(CX)
	MOVQ	AX, g_stackguard1(CX)

CALL AX: 调用 _cgo_init 函数。
MOVQ $runtime·g0(SB), CX: 将 runtime·g0 的地址加载到寄存器 CX 中
MOVQ (g_stack+stack_lo)(CX), AX: 将 g_stack 中的 stack_lo 值加载到寄存器 AX 中
ADDQ $const_stackGuard, AX: 将常数 const_stackGuard 加到 AX 寄存器中的值上
MOVQ AX, g_stackguard0(CX): 将 AX 中的值写入 g_stackguard0
MOVQ AX, g_stackguard1(CX): 将 AX 中的值写入 g_stackguard1

#ifndef GOOS_windows
	JMP ok
#endif
needtls:
#ifdef GOOS_plan9
	// skip TLS setup on Plan 9
	JMP ok
#endif
#ifdef GOOS_solaris
	// skip TLS setup on Solaris
	JMP ok
#endif
#ifdef GOOS_illumos
	// skip TLS setup on illumos
	JMP ok
#endif
#ifdef GOOS_darwin
	// skip TLS setup on Darwin
	JMP ok
#endif
#ifdef GOOS_openbsd
	// skip TLS setup on OpenBSD
	JMP ok
#endif

#ifdef GOOS_windows
	CALL	runtime·wintls(SB)
#endif

	LEAQ	runtime·m0+m_tls(SB), DI
	CALL	runtime·settls(SB)

	// store through it, to make sure it works
	get_tls(BX)
	MOVQ	$0x123, g(BX)
	MOVQ	runtime·m0+m_tls(SB), AX
	CMPQ	AX, $0x123
	JEQ 2(PC)
	CALL	runtime·abort(SB)

JMP ok: 在 Plan 9、Solaris、illumos、Darwin、OpenBSD 等系统上跳过 TLS 设置,直接跳转到标签 ok
CALL runtime·wintls(SB): 在 Windows 系统上设置 TLS
LEAQ runtime·m0+m_tls(SB), DI: 将 runtime·m0+m_tls 的地址加载到寄存器 DI 中
CALL runtime·settls(SB): 调用 runtime·settls 函数(参数在DI寄存器中,也就是m0.tls地址),设置 TLS
get_tls(BX): 获取 TLS
MOVQ $0x123, g(BX): 将 0x123 存储到 TLS 中
MOVQ runtime·m0+m_tls(SB), AX: 将 runtime·m0+m_tls 的地址加载到寄存器 AX 中
CMPQ AX, $0x123: 比较 AX 和 0x123
JEQ 2(PC): 如果相等,跳转到当前指令之后的第 2 条指令处(即跳过紧随其后的 abort 调用)
CALL runtime·abort(SB): 调用 runtime·abort 函数

NOTE:
TLS(线程局部存储)在线程创建时分配、线程结束时回收;在 Go 汇编中,TLS 是一个伪寄存器
get_tls(BX) 是一个 macro:#define get_tls(r) MOVQ TLS, r,把 TLS 伪寄存器中保存的值加载到指定寄存器(这里是 BX),定义在 src/runtime/go_tls.h
g(BX) 同理,也是一个 macro:#define g(r) 0(r)(TLS*1);其中 off(reg)(TLS*1) 是一种基于 TLS 的偏移寻址写法

ok:
	// set the per-goroutine and per-mach "registers"
	get_tls(BX)
	LEAQ	runtime·g0(SB), CX
	MOVQ	CX, g(BX)
	LEAQ	runtime·m0(SB), AX

	// save m->g0 = g0
	MOVQ	CX, m_g0(AX)
	// save m0 to g0->m
	MOVQ	AX, g_m(CX)

	CLD				// convention is D is always left cleared

get_tls(BX): 获取 TLS,并将其保存到寄存器 BX 中
LEAQ runtime·g0(SB), CX: 将 runtime·g0 的地址加载到寄存器 CX 中
MOVQ CX, g(BX): 将 CX 寄存器中的值(即 runtime·g0 的地址)写入 TLS
LEAQ runtime·m0(SB), AX: 将 runtime·m0 的地址加载到寄存器 AX 中
MOVQ CX, m_g0(AX): 将 CX 寄存器中的值(即 runtime·g0 的地址)写入 m_g0
MOVQ AX, g_m(CX): 将 AX 寄存器中的值(即 runtime·m0 的地址)写入 g_m

NOTE: 这里将runtime.g0写入TLS,将runtime.m0和runtime.g0互相绑定

	// Check GOAMD64 requirements
	// We need to do this after setting up TLS, so that
	// we can report an error if there is a failure. See issue 49586.
#ifdef NEED_FEATURES_CX
	MOVL	$0, AX
	CPUID
	CMPL	AX, $0
	JE	bad_cpu
	MOVL	$1, AX
	CPUID
	ANDL	$NEED_FEATURES_CX, CX
	CMPL	CX, $NEED_FEATURES_CX
	JNE	bad_cpu
#endif

#ifdef NEED_MAX_CPUID
	MOVL	$0x80000000, AX
	CPUID
	CMPL	AX, $NEED_MAX_CPUID
	JL	bad_cpu
#endif

#ifdef NEED_EXT_FEATURES_BX
	MOVL	$7, AX
	MOVL	$0, CX
	CPUID
	ANDL	$NEED_EXT_FEATURES_BX, BX
	CMPL	BX, $NEED_EXT_FEATURES_BX
	JNE	bad_cpu
#endif

#ifdef NEED_EXT_FEATURES_CX
	MOVL	$0x80000001, AX
	CPUID
	ANDL	$NEED_EXT_FEATURES_CX, CX
	CMPL	CX, $NEED_EXT_FEATURES_CX
	JNE	bad_cpu
#endif

#ifdef NEED_OS_SUPPORT_AX
	XORL    CX, CX
	XGETBV
	ANDL	$NEED_OS_SUPPORT_AX, AX
	CMPL	AX, $NEED_OS_SUPPORT_AX
	JNE	bad_cpu
#endif

#ifdef NEED_DARWIN_SUPPORT
	MOVQ	$commpage64_version, BX
	CMPW	(BX), $13  // cpu_capabilities64 undefined in versions < 13
	JL	bad_cpu
	MOVQ	$commpage64_cpu_capabilities64, BX
	MOVQ	(BX), BX
	MOVQ	$NEED_DARWIN_SUPPORT, CX
	ANDQ	CX, BX
	CMPQ	BX, CX
	JNE	bad_cpu
#endif

	CALL	runtime·check(SB)

TODO: 该部分代码检查 GOAMD64 所要求的 CPU 特性(微架构级别),随后调用 runtime·check 做运行时自检

	MOVL	24(SP), AX		// copy argc
	MOVL	AX, 0(SP)
	MOVQ	32(SP), AX		// copy argv
	MOVQ	AX, 8(SP)
	CALL	runtime·args(SB)
	CALL	runtime·osinit(SB)
	CALL	runtime·schedinit(SB)

MOVL 24(SP), AX: 将参数 argc 复制到 AX 寄存器中。
MOVL AX, 0(SP): 将 AX 寄存器中的值写入栈顶位置。
MOVQ 32(SP), AX: 将参数 argv 复制到 AX 寄存器中。
MOVQ AX, 8(SP): 将 AX 寄存器中的值写入栈上偏移为 8 的位置。
CALL runtime·args(SB): 调用 runtime·args 函数。
CALL runtime·osinit(SB): 调用 runtime·osinit 函数。
CALL runtime·schedinit(SB): 调用 runtime·schedinit 函数。

NOTE: 处理命令行参数,初始化程序
TODO: osinit做了什么? schedinit做了什么?

	// create a new goroutine to start program
	MOVQ	$runtime·mainPC(SB), AX		// entry
	PUSHQ	AX
	CALL	runtime·newproc(SB)
	POPQ	AX

	// start this M
	CALL	runtime·mstart(SB)

	CALL	runtime·abort(SB)	// mstart should never return
	RET

MOVQ $runtime·mainPC(SB), AX: 将 runtime·mainPC 的地址加载到寄存器 AX 中。
PUSHQ AX: 将 AX 寄存器中的值推入栈中。
CALL runtime·newproc(SB): 调用 runtime·newproc 函数,创建一个新的 goroutine。
POPQ AX: 将栈顶的值弹出,并存储到寄存器 AX 中。
CALL runtime·mstart(SB): 调用 runtime·mstart 函数,启动当前 M

NOTE: 创建一个新的 goroutine 来启动程序,$runtime·mainPC(SB)指向的是runtime.main
TODO: 具体分析下newproc和mstart函数

bad_cpu: // show that the program requires a certain microarchitecture level.
	MOVQ	$2, 0(SP)
	MOVQ	$bad_cpu_msg<>(SB), AX
	MOVQ	AX, 8(SP)
	MOVQ	$84, 16(SP)
	CALL	runtime·write(SB)
	MOVQ	$1, 0(SP)
	CALL	runtime·exit(SB)
	CALL	runtime·abort(SB)
	RET

	// Prevent dead-code elimination of debugCallV2, which is
	// intended to be called by debuggers.
	MOVQ	$runtime·debugCallV2<ABIInternal>(SB), AX
	RET

NOTE: 在 CPU 不满足要求的情况下,输出错误消息并退出程序

总结

  1. 首先,保存了命令行参数的地址,并对栈顶地址SP进行对齐
  2. 分配了一个 64KB 大小的栈空间给 runtime.g0
  3. 进行了一些 CPU 信息相关的检查
  4. 调用了 _cgo_init 函数,并在此之后更新了 stackguard
  5. 将当前线程的 TLS 地址存储到 m0.tls 中,并测试了读写是否成功
  6. 将 g0 的地址写入 TLS,并将 g0 与 m0 相互绑定
  7. 对于 GOAMD64 平台进行了一些检查
  8. 处理了命令行参数,调用了 osinit 和 schedinit 函数,初始化了与操作系统相关的功能和调度器
  9. 调用了 newproc 函数创建一个新的 Goroutine,并运行了 runtime.main 函数,最后调用了 mstart 启动当前 M
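
把以上步骤串起来,可以用一段 Go 伪代码概括 rt0_go 的整体流程(仅为帮助理解的示意,并非 runtime 的真实实现,辅助函数名均为虚构):

func rt0_go(argc int32, argv **byte) {
	setupG0Stack()     // 从系统栈划出约 64KB 作为 g0 的栈,并设置 stackguard
	detectCPU()        // CPUID:识别厂商,记录 processorVersionInfo
	callCgoInitIfAny() // 存在 _cgo_init 则调用,之后重新计算 stackguard
	settls()           // 设置线程本地存储(TLS),并写入 0x123 验证可用
	bindM0G0()         // 将 g0 写入 TLS;m0.g0 = g0,g0.m = m0
	checkGOAMD64()     // 校验 GOAMD64 要求的 CPU 特性,失败则报错退出
	args(argc, argv)   // 保存命令行参数
	osinit()           // 操作系统相关初始化
	schedinit()        // 调度器初始化
	newproc(mainPC)    // 创建运行 runtime.main 的 goroutine
	mstart()           // 启动当前 M,进入调度循环,不会返回
}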

阅读 `go runtime`中的 `systemstack`, `mcall`函数

// func systemstack(fn func())
TEXT runtime·systemstack(SB), NOSPLIT, $0-8 // NOSPLIT 不允许栈分割
	MOVQ	fn+0(FP), DI	// DI = fn  // 参数 fn 的指针储存到 DI 寄存器中
	get_tls(CX)
	MOVQ	g(CX), AX	// AX = g   // 获取当前 g
	MOVQ	g_m(AX), BX	// BX = m   // 获取当前 m

	CMPQ	AX, m_gsignal(BX)           // 检查当前 g 是不是gsignal,是则跳转到 noswitch
	JEQ	noswitch

	MOVQ	m_g0(BX), DX	// DX = g0  // 将当前 m 的 g0 储存到 DX 寄存器中
	CMPQ	AX, DX                      // 检查当前 g 是不是 g0,是则跳转到 noswitch
	JEQ	noswitch

	CMPQ	AX, m_curg(BX)              // 检查当前 g 是不是 m.curg,不是则跳转到 bad
	JNE	bad

	// Switch stacks.
	// The original frame pointer is stored in BP,
	// which is useful for stack unwinding.
	// Save our state in g->sched. Pretend to
	// be systemstack_switch if the G stack is scanned.
	CALL	gosave_systemstack_switch<>(SB) // 保存当前 goroutine 的状态,在切换栈之后恢复

	// switch to g0
	MOVQ	DX, g(CX)                     // DX 里是 g0, 将 g0 存储到 TLS 中
	MOVQ	DX, R14 // set the g register 
	MOVQ	(g_sched+gobuf_sp)(DX), SP    // 将 g0 的 栈指针 SP 设置到 SP 寄存器,也就是切换了栈

	// call target function
	MOVQ	DI, DX                        // 储存函数指针到 DX
	MOVQ	0(DI), DI                     // 将函数实际地址储存到 DI 
	CALL	DI                            // 调用函数

	// switch back to g
	get_tls(CX)
	MOVQ	g(CX), AX                  // g0
	MOVQ	g_m(AX), BX                // g0.m
	MOVQ	m_curg(BX), AX             // g
	MOVQ	AX, g(CX)                  // 将 g 存储到 TLS 中
	MOVQ	(g_sched+gobuf_sp)(AX), SP // 切换回 g 的栈
	MOVQ	(g_sched+gobuf_bp)(AX), BP 
	MOVQ	$0, (g_sched+gobuf_sp)(AX) // 清零,帮助垃圾回收器(GC)定位未使用的栈空间
	MOVQ	$0, (g_sched+gobuf_bp)(AX)
	RET

noswitch:
	// already on m stack; tail call the function
	// Using a tail call here cleans up tracebacks since we won't stop
	// at an intermediate systemstack.
	MOVQ	DI, DX    // 将函数指针移动到 DX 寄存器中
	MOVQ	0(DI), DI // 函数地址储存到 DI 寄存器中
	// The function epilogue is not called on a tail call.
	// Pop BP from the stack to simulate it.
	POPQ	BP        // 模拟函数调用的栈帧弹出操作
	JMP	DI        // 直接跳转到目标函数入口执行:控制权转移到新函数,且不保留当前函数的调用帧(尾调用)

bad:
	// Bad: g is not gsignal, not g0, not curg. What is it?
	MOVQ	$runtime·badsystemstack(SB), AX
	CALL	AX
	INT	$3

`go`中的`syscall`

go syscall

系统调用描述的是用户程序进入内核后执行的任务。
用户程序利用系统调用能执行许多操作:创建进程、网络、文件以及I/O操作等。

man page for syscalls(2) 列出了全部系统调用。

// syscall/syscall_linux.go

func Syscall(trap, a1, a2, a3 uintptr) (r1, r2 uintptr, err Errno) {}
func Syscall6(trap, a1, a2, a3, a4, a5, a6 uintptr) (r1, r2 uintptr, err Errno) {}
func RawSyscall(trap, a1, a2, a3 uintptr) (r1, r2 uintptr, err Errno) {}
func RawSyscall6(trap, a1, a2, a3, a4, a5, a6 uintptr) (r1, r2 uintptr, err Errno){}
// 不带 Raw 的函数会在进入/退出系统调用时通知 runtime
// 若不通知 runtime,调度器就无法在 G 阻塞于系统调用期间,把该 G 所在 M 绑定的 P 移交给其他 M 使用
// runtime/internal/syscall/asm_linux_amd64.s

// func Syscall6(num, a1, a2, a3, a4, a5, a6 uintptr) (r1, r2, errno uintptr)
//
// We need to convert to the syscall ABI.
//
// arg | ABIInternal | Syscall
// ---------------------------
// num | AX          | AX
// a1  | BX          | DI
// a2  | CX          | SI
// a3  | DI          | DX
// a4  | SI          | R10
// a5  | R8          | R8
// a6  | R9          | R9
//
// r1  | AX          | AX
// r2  | BX          | DX
// err | CX          | part of AX
//
// Note that this differs from "standard" ABI convention, which would pass 4th
// arg in CX, not R10.
TEXT ·Syscall6<ABIInternal>(SB),NOSPLIT,$0
	// a6 already in R9.
	// a5 already in R8.
	MOVQ	SI, R10 // a4
	MOVQ	DI, DX  // a3
	MOVQ	CX, SI  // a2
	MOVQ	BX, DI  // a1
	// num already in AX.
	SYSCALL
	CMPQ	AX, $0xfffffffffffff001
	JLS	ok
	NEGQ	AX
	MOVQ	AX, CX  // errno
	MOVQ	$-1, AX // r1
	MOVQ	$0, BX  // r2
	RET
ok:
	// r1 already in AX.
	MOVQ	DX, BX // r2
	MOVQ	$0, CX // errno
	RET

以上函数都以汇编实现,遵循 Linux 的 syscall 调用规范:
在汇编中把参数依次放入相应寄存器,执行 SYSCALL 指令即可进入内核处理逻辑;系统调用执行完毕后,返回值放在 RAX 中。

寄存器: RDI    RSI    RDX    R10    R8     R9     RAX
含义:   参数1  参数2  参数3  参数4  参数5  参数6  系统调用编号/返回值
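
在用户代码中直接使用 syscall.Syscall 的一个最小示例(Linux 下调用 write(2) 向标准输出写一行,寄存器的布置由 Syscall 内部完成):

package main

import (
	"syscall"
	"unsafe"
)

func main() {
	msg := []byte("hello via SYSCALL\n")
	r1, _, errno := syscall.Syscall(syscall.SYS_WRITE,
		uintptr(1),                       // fd: stdout
		uintptr(unsafe.Pointer(&msg[0])), // buf
		uintptr(len(msg)))                // count
	if errno != 0 {
		panic(errno)
	}
	_ = r1 // 实际写入的字节数
}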

SYSCALL 定义

syscall/syscall_linux_amd64.go

阻塞的系统调用,定义成:

//sys sendmsg(s int, msg *Msghdr, flags int) (n int, err error)

非阻塞的系统调用,定义成:

//sysnb socket(domain int, typ int, proto int) (fd int, err error)

根据这些注释,mksyscall.pl 脚本会生成对应平台的具体实现:
标记 //sys 的系统调用使用 Syscall 或 Syscall6,因为阻塞调用需要通知 runtime,以便调度器处理 P;
标记 //sysnb 的系统调用使用 RawSyscall 或 RawSyscall6。
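
生成的代码位于 zsyscall_linux_amd64.go,大致形如下面的样子(示意,属于 syscall 包内部代码,细节以实际生成文件为准):

// //sysnb:socket 不会阻塞,直接使用 RawSyscall
func socket(domain int, typ int, proto int) (fd int, err error) {
	r0, _, e1 := RawSyscall(SYS_SOCKET, uintptr(domain), uintptr(typ), uintptr(proto))
	fd = int(r0)
	if e1 != 0 {
		err = errnoErr(e1)
	}
	return
}

// //sys:sendmsg 可能阻塞,使用 Syscall(内部会进入/退出系统调用状态)
func sendmsg(s int, msg *Msghdr, flags int) (n int, err error) {
	r0, _, e1 := Syscall(SYS_SENDMSG, uintptr(s), uintptr(unsafe.Pointer(msg)), uintptr(flags))
	n = int(r0)
	if e1 != 0 {
		err = errnoErr(e1)
	}
	return
}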

runtime 中的 SYSCALL

提供给用户的 syscall 库,在使用时,会使 G 和 P 分别进入 Gsyscall 和 Psyscall 状态(使用 entersyscall/exitsyscall 与调度交互)
而 runtime 封装的这些 syscall 无论是否阻塞,都不会调用 entersyscall 和 exitsyscall
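 
两者的差异可以用下面的伪代码概括(示意,并非逐字源码;真实实现见 syscall/syscall_linux.go 与 runtime/internal/syscall):

func Syscall(trap, a1, a2, a3 uintptr) (r1, r2 uintptr, err Errno) {
	runtime_entersyscall() // G 进入 _Gsyscall、P 进入 _Psyscall,P 可被调度器移交给其他 M
	r1, r2, errno := RawSyscall6(trap, a1, a2, a3, 0, 0, 0)
	runtime_exitsyscall()  // 尝试重新获取 P,拿不到则当前 G 进入运行队列等待调度
	if errno != 0 {
		err = errno
	}
	return
}

func RawSyscall(trap, a1, a2, a3 uintptr) (r1, r2 uintptr, err Errno) {
	// 不与调度器交互,直接陷入内核;只用于确定不会长时间阻塞的系统调用
	r1, r2, errno := RawSyscall6(trap, a1, a2, a3, 0, 0, 0)
	if errno != 0 {
		err = errno
	}
	return
}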

`linux` 中的`epoll`

epoll

epoll 是 Linux 下高效的 I/O 事件通知机制,用于监视多个文件描述符上的事件。
它允许程序等待多个 I/O 事件同时发生,而不必为每个 I/O 事件都创建一个线程或进程。

// 所需头文件(补充,使示例可独立使用)
#include <sys/epoll.h>   // epoll_create1 / epoll_ctl / epoll_wait
#include <unistd.h>      // close, STDIN_FILENO

// 1. 创建 epoll 实例
// int epoll_create1(int flags);
int epoll_fd;
epoll_fd = epoll_create1(0);

// 2. 将标准输入文件描述符添加到 epoll 实例中
// int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
// 参数 op 包含:EPOLL_CTL_ADD 增加、EPOLL_CTL_MOD 修改、EPOLL_CTL_DEL 删除;
// 参数 event 的 events 字段描述了epoll监听的事件类型:EPOLLIN 读、EPOLLOUT 写等...
// 参数 event 的 data 字段描述了 fd 关联的数据,在调用epoll_wait 同 fd 一起返回;
struct epoll_event event;
event.events = EPOLLIN;                // 监听读事件
event.data.fd = STDIN_FILENO; 
epoll_ctl(epoll_fd, EPOLL_CTL_ADD, STDIN_FILENO, &event);


#define MAX_EVENTS 10                  // 事件数量
struct epoll_event events[MAX_EVENTS]; // 存储 epoll 事件
int num_events;
// 3. 等待事件发生
// int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
// 参数 events 保存触发的事件列表
// 参数 timeout 表示 epoll_wait 会阻塞的时间
num_events = epoll_wait(epoll_fd, events, MAX_EVENTS, -1);

// 4. 处理触发的事件列表
for (int i = 0; i < num_events; i++) {
    // event 同2中一致,可以获取2中 fd 关联的 data
}

// 5. 关闭 epoll 
close(epoll_fd);
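
在 Go(Linux)下,用 syscall 包可以写出与上面 C 代码等价的流程,示例如下(仅为示意):

package main

import (
	"fmt"
	"syscall"
)

func main() {
	// 1. 创建 epoll 实例
	epfd, err := syscall.EpollCreate1(0)
	if err != nil {
		panic(err)
	}
	defer syscall.Close(epfd) // 5. 退出时关闭 epoll

	// 2. 将标准输入加入监听,关注可读事件
	ev := syscall.EpollEvent{Events: syscall.EPOLLIN, Fd: int32(syscall.Stdin)}
	if err := syscall.EpollCtl(epfd, syscall.EPOLL_CTL_ADD, syscall.Stdin, &ev); err != nil {
		panic(err)
	}

	// 3. 等待事件发生(-1 表示一直阻塞)
	events := make([]syscall.EpollEvent, 10)
	n, err := syscall.EpollWait(epfd, events, -1)
	if err != nil {
		panic(err)
	}

	// 4. 处理触发的事件
	for i := 0; i < n; i++ {
		fmt.Printf("fd %d ready, events %#x\n", events[i].Fd, events[i].Events)
	}
}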
