Random walk to my blog

my blog for sharing my knowledge,experience and viewpoint

0%

Golang context包源码分析

本文基于 go1.15.2 darwin/amd64

作用介绍

context的目的是实现主协程对子协程的控制,作用包括取消执行、设置超时时间、携带键值对等。
下面是一个使用context防止协程泄露的例子。不使用context,创建了goroutine之后没有办法取消,在程序退出之前,会一直打印”in go loop”。

1
2
3
4
5
6
7
8
9
10
// 协程泄露的例子
func TestRoutine1(t *testing.T) {
// 创建了go routine,没有办法取消
go func() {
for {
fmt.Print("in go loop\n")
}
}()
time.Sleep(2 * time.Second)
}

为了实现对子协程的控制,一般在goroutine中添加对context信号的等待。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func TestRoutine2(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
go func(ctx context.Context) {
for {
select {
case <-ctx.Done():
fmt.Print("context control\n")
return
default:
fmt.Print("in go loop\n")
time.Sleep(500 * time.Millisecond)
}
}
}(ctx)
time.Sleep(2 * time.Second)
cancel()
}

Context 内部结构

context包里面,有接口Context,对应四个实现emptyCtx, cancelCtx, timerCtx,valueCtx

Context接口

1
2
3
4
5
6
7
8
9
type Context interface {
Deadline() (deadline time.Time, ok bool)

Done() <-chan struct{}

Err() error

Value(key interface{}) interface{}
}
  • Deadline:获取到期时间。如果没有到期时间,ok返回false。
  • Done:返回一个channel,表示取消的信号。如果通道关闭则代表该 Context 已经被取消;如果返回的为 nil,则代表该 Context 是一个永远不会被取消的 Context。
  • Err:返回该 Context 被取消的原因。如果只使用 Context 包的 Context 类型的话,那么只可能返回 Canceled (代表被明确取消)或者 DeadlineExceeded (因超时而取消)。
  • Value:获取Context中的键值对。

emptyCtx

emptyCtxint类型的重新定义,它不是空的struct{}是因为每个这种类型的变量需要用不同的内存地址。
emptyCtx没有过期时间,不能被取消。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// An emptyCtx is never canceled, has no values, and has no deadline. It is not
// struct{}, since vars of this type must have distinct addresses.
type emptyCtx int

func (*emptyCtx) Deadline() (deadline time.Time, ok bool) {
return
}

func (*emptyCtx) Done() <-chan struct{} {
return nil
}

func (*emptyCtx) Err() error {
return nil
}

func (*emptyCtx) Value(key interface{}) interface{} {
return nil
}

emptyCtx的作用是作为Context树的根节点。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
var (
background = new(emptyCtx)
todo = new(emptyCtx)
)

// Background returns a non-nil, empty Context. It is never canceled, has no
// values, and has no deadline. It is typically used by the main function,
// initialization, and tests, and as the top-level Context for incoming
// requests.
func Background() Context {
return background
}

// TODO returns a non-nil, empty Context. Code should use context.TODO when
// it's unclear which Context to use or it is not yet available (because the
// surrounding function has not yet been extended to accept a Context
// parameter).
func TODO() Context {
return todo
}

cancelCtx

cancelFunc

1
type CancelFunc func()

调用函数WithCancel,WithTimeout,WithDeadline都会返回新的子ContextCancelFunc类型的函数。
CancelFunc用于取消的操作。

cancenler接口

1
2
3
4
5
6
// A canceler is a context type that can be canceled directly. The
// implementations are *cancelCtx and *timerCtx.
type canceler interface {
cancel(removeFromParent bool, err error)
Done() <-chan struct{}
}

可以被取消的context都实现了这个接口。

cancelCtx内部结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
type cancelCtx struct {
Context

mu sync.Mutex // protects following fields
done chan struct{} // created lazily, closed by first cancel call
children map[canceler]struct{} // set to nil by the first cancel call
err error // set to non-nil by the first cancel call
}

func (c *cancelCtx) Done() <-chan struct{} {
c.mu.Lock()
if c.done == nil {
c.done = make(chan struct{})
}
d := c.done
c.mu.Unlock()
return d
}

cancelCtx每个字段作用:

  • 包含了一个Context类型的值,存储了当前cancelCtx的父Context的指针。
  • done作为取消信号的channel,子协程监听该通道了解到是否需要取消任务。
  • children存储了当前Context衍生的所有可取消类型的子Context
  • err 会被第一次取消的时候设置

cancelCtx实现了canceler接口。
下面看一下cancel(removeFromParent bool, err error)方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// cancel closes c.done, cancels each of c's children, and, if
// removeFromParent is true, removes c from its parent's children.
func (c *cancelCtx) cancel(removeFromParent bool, err error) {
if err == nil {
panic("context: internal error: missing cancel error")
}
c.mu.Lock()
if c.err != nil {
c.mu.Unlock()
return // already canceled
}
c.err = err
if c.done == nil {
c.done = closedchan
} else {
close(c.done)
}
for child := range c.children {
// NOTE: acquiring the child's lock while holding parent's lock.
child.cancel(false, err)
}
c.children = nil
c.mu.Unlock()

if removeFromParent {
removeChild(c.Context, c)
}
}
  • 如果cancelCtx(父协程)被调用cancel方法,cancel会调用childeren里面每个子Context(子协程)的cancel方法。

  • 如果当前的cancelCtx是一个子Context,它被取消了,其父 Context 的children中也就没有必要再存储该子Context了。这通过调用了removeChild来实现:根据存储的父 Context 向上一层层的找(由 parentCancelCtx 实现),如果父Context是已知的 cancelCtxtimerCtx类型就从children中删除,如果是valueCtx类型则继续向上层查找其父Context

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    // parentCancelCtx returns the underlying *cancelCtx for parent.
    // It does this by looking up parent.Value(&cancelCtxKey) to find
    // the innermost enclosing *cancelCtx and then checking whether
    // parent.Done() matches that *cancelCtx. (If not, the *cancelCtx
    // has been wrapped in a custom implementation providing a
    // different done channel, in which case we should not bypass it.)
    func parentCancelCtx(parent Context) (*cancelCtx, bool) {
    done := parent.Done()
    if done == closedchan || done == nil {
    return nil, false
    }
    p, ok := parent.Value(&cancelCtxKey).(*cancelCtx)
    if !ok {
    return nil, false
    }
    p.mu.Lock()
    ok = p.done == done
    p.mu.Unlock()
    if !ok {
    return nil, false
    }
    return p, true
    }

    // removeChild removes a context from its parent.
    func removeChild(parent Context, child canceler) {
    p, ok := parentCancelCtx(parent)
    if !ok {
    return
    }
    p.mu.Lock()
    if p.children != nil {
    delete(p.children, child)
    }
    p.mu.Unlock()
    }

WithCancel方法

WithCancel帮助创建一个子cancelCtx,并保证父Context取消时该新建的子cancelCtx也能被通知取消。

  • propagateCancel 根据传入的父Context值沿着树向上查找到cancelCtx类型的节点,将新建的子cancelCtx加入到该节点的 children中。如果发现父 Context 已经取消了,那么会立刻将当前新产生的子 Context 也取消掉。

  • 如果找不到cancelCtx类型的节点的话,那么就要新启一个协程等待父Context被取消的时候明确调用新产生的子 cancelCtx的取消函数,从而将parent和子cancelCtx组织成一树形结构。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
// WithCancel returns a copy of parent with a new Done channel. The returned
// context's Done channel is closed when the returned cancel function is called
// or when the parent context's Done channel is closed, whichever happens first.
//
// Canceling this context releases resources associated with it, so code should
// call cancel as soon as the operations running in this Context complete.
func WithCancel(parent Context) (ctx Context, cancel CancelFunc) {
if parent == nil {
panic("cannot create context from nil parent")
}
c := newCancelCtx(parent)
propagateCancel(parent, &c)
return &c, func() { c.cancel(true, Canceled) }
}

// propagateCancel arranges for child to be canceled when parent is.
func propagateCancel(parent Context, child canceler) {
done := parent.Done()
if done == nil {
return // parent is never canceled
}

select {
case <-done:
// parent is already canceled
child.cancel(false, parent.Err())
return
default:
}

if p, ok := parentCancelCtx(parent); ok {
p.mu.Lock()
// 找到父 cancelCtx
if p.err != nil {
// parent has already been canceled
child.cancel(false, p.err)
} else {
if p.children == nil {
p.children = make(map[canceler]struct{})
}
p.children[child] = struct{}{}
}
p.mu.Unlock()
} else {
atomic.AddInt32(&goroutines, +1)
// 找不到父 cancelCtx,新建goroutine去等待
go func() {
select {
case <-parent.Done():
child.cancel(false, parent.Err())
case <-child.Done():
}
}()
}
}

Context树:
context_tree

timerCtx

timerCtx是超时自动取消的Context,内部使用cancelCtx实现取消功能,它增加了定时器Timer,定时调用cancle函数实现该功能。
但是如果其父 Context 也有超时过期的取消功能,且父 Context 的超时时间点在传入的时间点之前,那么就没有必要再使用 timerCtx 生成子 Context 了,使用 WithCancel 就可以了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// A timerCtx carries a timer and a deadline. It embeds a cancelCtx to
// implement Done and Err. It implements cancel by stopping its timer then
// delegating to cancelCtx.cancel.
type timerCtx struct {
cancelCtx
timer *time.Timer // Under cancelCtx.mu.

deadline time.Time
}

func (c *timerCtx) Deadline() (deadline time.Time, ok bool) {
return c.deadline, true
}

func (c *timerCtx) cancel(removeFromParent bool, err error) {
c.cancelCtx.cancel(false, err)
if removeFromParent {
// Remove this timerCtx from its parent cancelCtx's children.
removeChild(c.cancelCtx.Context, c)
}
c.mu.Lock()
if c.timer != nil {
c.timer.Stop()
c.timer = nil
}
c.mu.Unlock()
}

下面是WithTimeOutWithCancel函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
// WithTimeout returns WithDeadline(parent, time.Now().Add(timeout)).
func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc) {
return WithDeadline(parent, time.Now().Add(timeout))
}

// WithDeadline returns a copy of the parent context with the deadline adjusted
// to be no later than d. If the parent's deadline is already earlier than d,
// WithDeadline(parent, d) is semantically equivalent to parent. The returned
// context's Done channel is closed when the deadline expires, when the returned
// cancel function is called, or when the parent context's Done channel is
// closed, whichever happens first.
func WithDeadline(parent Context, d time.Time) (Context, CancelFunc) {
if parent == nil {
panic("cannot create context from nil parent")
}
if cur, ok := parent.Deadline(); ok && cur.Before(d) {
// The current deadline is already sooner than the new one.
return WithCancel(parent)
}
c := &timerCtx{
cancelCtx: newCancelCtx(parent),
deadline: d,
}
propagateCancel(parent, c)
dur := time.Until(d)
if dur <= 0 {
c.cancel(true, DeadlineExceeded) // deadline has already passed
return c, func() { c.cancel(false, Canceled) }
}
c.mu.Lock()
defer c.mu.Unlock()
if c.err == nil {
c.timer = time.AfterFunc(dur, func() {
c.cancel(true, DeadlineExceeded)
})
}
return c, func() { c.cancel(true, Canceled) }
}

valueCtx

valueCtx内部仍然使用Context存储父Context的指针,并用interface{}存储键值。
如果当前valueCtx找不到需要的key,会沿着树向上一直查找直到根节点,类似链表的搜索。

1
2
3
4
5
6
7
8
9
10
11
12
13
// A valueCtx carries a key-value pair. It implements Value for that key and
// delegates all other calls to the embedded Context.
type valueCtx struct {
Context
key, val interface{}
}

func (c *valueCtx) Value(key interface{}) interface{} {
if c.key == key {
return c.val
}
return c.Context.Value(key)
}

使用WithValue创建时,会判断key是否实现Comparable接口。如果没有实现,会触发panic

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// Use context Values only for request-scoped data that transits processes and
// APIs, not for passing optional parameters to functions.
//
// The provided key must be comparable and should not be of type
// string or any other built-in type to avoid collisions between
// packages using context. Users of WithValue should define their own
// types for keys. To avoid allocating when assigning to an
// interface{}, context keys often have concrete type
// struct{}. Alternatively, exported context key variables' static
// type should be a pointer or interface.
func WithValue(parent Context, key, val interface{}) Context {
if parent == nil {
panic("cannot create context from nil parent")
}
if key == nil {
panic("nil key")
}
if !reflectlite.TypeOf(key).Comparable() {
panic("key is not comparable")
}
return &valueCtx{parent, key, val}
}

注释里写了,key的类类型不应该是内置类型,以避免冲突。使用的时候应该自定义类型:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
func main() {
ProcessRequest("jane","abc123")
}

type ctxKey int

const (
ctxUserID ctxKey = iota
ctxAuthToken
)

func UserID(c context.Context)string{
return c.Value(ctxUserID).(string)
}

func AuthToken(c context.Context)string{
return c.Value(ctxAuthToken).(string)
}

func ProcessRequest(userID,authToken string){
ctx := context.WithValue(context.Background(),ctxUserID,userID)
ctx = context.WithValue(ctx,ctxAuthToken,authToken)
HandleRequest(ctx)
}

func HandleRequest(ctx context.Context){
fmt.Printf("handling response for %v (%v)",
UserID(ctx),
AuthToken(ctx),
)
}

参考文档

我的公众号:lyp分享的地方

我的知乎专栏: https://zhuanlan.zhihu.com/c_1275466546035740672

我的博客:www.liangyaopei.com
Github Page: https://liangyaopei.github.io/