前段时间Google的Sameer Ajmani在Google I/O上做了关于Go的并发模式的介绍。Slides在此,youtube视频在此(注:上述链接均需翻墙)。

本篇的前提是对goroutine+channel的并发编程模式有基本的了解,建议能读懂下面这个经典ping-pong程序为好。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
package main
import (
"fmt"
"time"
)
//定义一个结构
type Ball struct{ hits int }
func main() {
//创建一个可传输Ball的channel
table := make(chan *Ball)
//分别启动ping/pong的goroutine
go Player("Ping", table)
go Player("Pong", table)
//一个Ball进入channel,游戏开始
table <- new(Ball)
//“主”程序暂停1s,等待ping/pong的goroutine执行
time.Sleep(1 * time.Second)
//从channel取出Ball,游戏开始
<-table
//可通过引发异常,显示调用栈的详细信息
//panic("show me the stacks")
}
func Player(name string, table chan *Ball) {
for {
//channel取出Ball,并hits++
ball := <-table
ball.hits++
fmt.Println(name, ball.hits)
//暂停1ms
time.Sleep(1 * time.Millisecond)
//将Ball放回channel
table <- ball
}
}

ping-pong程序的执行过程,可以用下图来表示。

ping-pong程序执行过程

接下来主要说说Go的并发编程里的一些“文艺”使用:如何通信?如何周期性处理事件?如何取消执行?这些高级用法的支持,除了依赖我们上面看到的goroutine+channel外,还要依赖于Go的一个statement:select+case。它可以用来管理和监听多个channel,从而起到“多路复用”的效果。他的基本语法如下。

1
2
3
4
5
6
select {
case xc <- x:
// 向channel(xc)发送一个对象(x)
case y := <-yc:
// 从channel(yc)获取一个对象并赋值到变量(y)
}

下面我们以一个能持续从RSS获取资源项的例子来说明select的使用。 假设我们已经拥有下面这个接口所定义的功能:从一个RSS url获取资源项目(一次调用,获取一次,这个接口的模拟实现,见附1。)

1
2
3
type Fetcher interface {
Fetch() (items []Item, next time.Time, err error)//能从某个rss url获取它的资源项,并能同时返回下一次获取的时间next
}

`我们用下面这个接口来表示我们希望达到的功能:能从rss url上循环获取资源项,形成资源流的形式;循环获取功能,可以中止。

1
2
3
4
type Subscription interface {
Updates() <-chan Item//用channel来存放资源,即可实现流的显示
Close() error//关闭获取
}

先看一个这项功能的简单实现,熟悉多线程编程的,应该觉得很眼熟。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
type NavieSub struct {
closed bool
err error
updates chan Item
fetcher Fetcher
}
func (s *NavieSub) Close() error {
s.closed = true//设置关闭标识为true
return s.err
}
func (s *NavieSub) Updates() <-chan Item {
return s.updates//返回已经获取的资源项
}
func (s *NavieSub) loop() {//循环获取的方法实现
for {
if s.closed {//判断关闭标识
close(s.updates)//close是内置函数
return
}
items, next, err := s.fetcher.Fetch()//执行一次获取
if err != nil {
s.err = err
time.Sleep(10 * time.Second)
continue//出错时暂停10秒后再开始下次循环
}
for _, item := range items {//将获取的资源项写入,用于返回
s.updates <- item
}
if now := time.Now(); next.After(now) {//暂停到下次获取时间时,再开始下一次获取
time.Sleep(next.Sub(now))
}
}
}
func main() {
fetcher := &FakeFether{channel: "sharecore.info"}
s := &NavieSub{
fetcher: fetcher,
updates: make(chan Item),
}
go s.loop()//启动一个例程执行loop方法(与启动一个线程类似)
time.AfterFunc(3*time.Second, func() {
fmt.Println("closed", s.Close())
})
for item := range s.Updates() {
fmt.Println(item.Channel, item.Title)
}
}

`那以上的简单实现,会有什么问题呢?

首先,明显发现s.err和s.closed的访问是非同步的。

1
2
3
4
5
6
s.closed = true //设置关闭标识为true
if s.closed {//判断关闭标识
close(s.updates) //close是内置函数
return
}

`然后,我们看到s.updates的定义如下:

1
2
3
4
s := &NavieSub{
fetcher: fetcher,
updates: make(chan Item),//定义为没有buffer的channel,一个channel中同时只能有一个元素
}

`根据上面的定义,s.updates一次只能有一个item进入,当它没有其他goroutine从它里面取出元素时,下面这行代码会发生堵塞

1
s.updates <- item

`那以上问题我们有什么办法来避免呢?

对于第一个问题,自然想到的解决办法是加锁,但加锁的方式太不符合Go的“口味”了,同时,加锁的方式,在面对比较复杂的并发场景时,容易导致各类由“锁”引发的问题,这也是线程模型的“恶魔”了。

对于第二个问题,普通的办法时当然是将s.updates定义为一个带buffer的channel。但是buffer定义为多大才合适呢?当取出元素的routine太慢,还是一样可能会导致buffer满了,发生堵塞。

下面,我们来看看如何用一个比较“文艺”的办法来解决上面的问题吧。前面我们提到的select这时可以派上用场了。select的机制实现,差不多可以称为是“事件驱动”的,当然这里的“事件”并不是我们平常其他的事件驱动模型里常看到的I/O,网络请求/响应这样的“事件”,而是监听channel变更的“事件”。

将select与for循环结合起来,可以构造持续监听channel的结构,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
func(s *AdvSub) loop(){
//可变状态的定义
for{
//设置不同的channel的监听case
select{
case <-c1:
// 读/写状态
case <-c2:
// 读/写状态
case y:<-c3:
// 读/写状态
}
}
}

我们先来看怎么利用for-select结构来解决第一个close同步的问题:

1
2
3
type AdvSub struct{
closing chan chan error
}

`如上代码所示,我们给sub定义,加了一个“状态”——closing,而我们就可以利用for-select结构来监听从closing的“状态变化”。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
`//close方法,
func (s *AdvSub) Close() error{
errc:=make(chan error)
s.closing<-errc
return <-errc
}
var err error//错误状态信息
for{
select{
case errc:=<-s.closing//当Close方法调用时会触发
errc<-err//将错误信息放到Close返回的channel
close(s.updates)
return
}
}

采用这种方式,同步的实现是完全依赖且只依赖于channel的同步机制的,这是可以信赖的。

对于第二个问题,我们可以给sub在加一个状态,比如队列,用来保持它已经获到的资源项。如下代码:

1
2
3
4
5
6
7
var pending []item//存入新获取的资源(Fetch方法调时),并同时被消费(Update()被调用,取出元素时)
for{
select{
case s.updates<-pending[0]:
pending=pending[1:]//取出后将第一个元素移除,更新状态
}
}

可是你会发现,上面的代码这么修改后并不会如期的正常运行,而是出现如下的错误: s.updates获取错误

这是因为,一开始pending数组是空的,当执行s.updates<-pending[0]时,会抛出数组越界的异常。 我们可以采取下面的方式来解决这个问题:

1
2
3
4
5
6
7
8
9
10
11
12
13
var pending []item
for{
var first Item
var updates chan Item
if len(pending) > 0 {
first = pending[0]
updates = s.updates
}
select{
case updates<-first:
pending=pending[1:]
}
}

上面,我们通过for-select结构,解决了同步的问题和堵塞的问题。在NavieSub的loop实现中,我们发现有time.Sleep的调用,对于time.Sleep的模式,我们其实也可以通过for-select结构来解决,这得益于time包下的许多方法/状态,也提供了返回channel的方式来便于监听,比如time.After(duration),time.Ticker等。所以我们就可以方便地将time.Sleep去掉,整合到for-select的结构中来

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
var pending []Item
var next time.Time
var err error
for{
var fetchDelay time.Duration //下次获取的延迟时间,默认是0(无延迟)
if now:=time.Now();next.After(now){
fetchDelay=next.Sub(now)//计算延迟时间
}
startFetch:=time.After(fetchDelay)//startFetch是一个channnel,时间到达后,会被写入
select {
case startFetch://到达下一次获取时间
var fetched []Item
fetched,next,err=s.fetcher.Fetch()
if err!=nil{
next=time.Now().Add(10*time.Sencond)
break
}
pending=append(pending,fetched...)
}
}

对于以上三种情况,我们总结起来的for-select实现代码就如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
func (s *AdvSub) loop() {
var err error
var next time.Time
var pending []Item
//综合了三种情况的for-select结构
for {
var fetchDelay time.Duration
if now := time.Now(); next.After(now) {
fetchDelay = next.Sub(now)
}
startFetch := time.After(fetchDelay)
var first Item
var updates chan Item
if len(pending) > 0 {
first = pending[0]
updates = s.updates
}
select {
case errc := <-s.closing://关闭
errc <- err
close(s.updates)
return
case <-startFetch://获取资源
var fetched []Item
fetched, next, err = s.fetcher.Fetch()
if err != nil {
next = time.Now().Add(10 * time.Second)
break
}
pending = append(pending, fetched...)
case updates <- first://取出资源
pending = pending[1:]
}
}
}

最后,分别附上Fetcher接口的模拟实现,以及普通方式和for-select结构方式的完整实现代码。

附1:Fetcher接口的一个实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
type Item struct {
Title, Channel, GUID string
}
type FakeFether struct {
channel string
items []Item
}
func (f *FakeFether) Fetch() (items []Item, next time.Time, err error) {
now := time.Now()
next = now.Add(time.Duration(rand.Intn(5)) * 500 * time.Millisecond)
item := Item{
Channel: f.channel,
Title: fmt.Sprintf("Item %d", len(f.items)),
}
item.GUID = item.Channel + "/" + item.Title
f.items = append(f.items, item)
items = []Item{item}
return
}

##附2:普通方式完整实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
package main
import (
"fmt"
"math/rand"
"time"
)
type Item struct {
Title, Channel, GUID string
}
type Fetcher interface {
Fetch() (tems []Item, next time.Time, err error)
}
type Subscription interface {
Updates() <-chan Item
Close() error
}
type FakeFether struct {
channel string
items []Item
}
func (f *FakeFether) Fetch() (items []Item, next time.Time, err error) {
now := time.Now()
next = now.Add(time.Duration(rand.Intn(5)) * 500 * time.Millisecond)
item := Item{
Channel: f.channel,
Title: fmt.Sprintf("Item %d", len(f.items)),
}
item.GUID = item.Channel + "/" + item.Title
f.items = append(f.items, item)
items = []Item{item}
return
}
type NaiveSub struct {
closed bool
err error
updates chan Item
fetcher Fetcher
}
func (s *NaiveSub) Close() error {
s.closed = true
return s.err
}
func (s *NaiveSub) Updates() <-chan Item {
return s.updates
}
func (s *NaiveSub) loop() {
for {
if s.closed {
close(s.updates)
return
}
items, next, err := s.fetcher.Fetch()
if err != nil {
s.err = err
time.Sleep(10 * time.Second)
continue
}
for _, item := range items {
s.updates <- item
}
if now := time.Now(); next.After(now) {
time.Sleep(next.Sub(now))
}
}
}
type NaiveMerge struct {
subs []Subscription
updates chan Item
}
func (m *NaiveMerge) Close() (err error) {
for _, sub := range m.subs {
if e := sub.Close(); err == nil && e != nil {
err = e
}
}
close(m.updates)
return
}
func (m *NaiveMerge) Updates() <-chan Item {
return m.updates
}
func Merge(subs ...Subscription) Subscription {
m := &NaiveMerge{
subs: subs,
updates: make(chan Item),
}
for _, sub := range subs {
go func(s Subscription) {
for it := range s.Updates() {
m.updates <- it
}
}(sub)
}
return m
}
func Subscripbe(fetcher Fetcher) Subscription {
s := &NaiveSub{
fetcher: fetcher,
updates: make(chan Item),
}
go s.loop()
return s
}
func main() {
fetcher1 := &FakeFether{channel: "sharecore.info"}
fetcher2 := &FakeFether{channel: "blog.golang.org"}
fetcher3 := &FakeFether{channel: "googleblog.blogspot.com"}
fetcher4 := &FakeFether{channel: "googledevelopers.blogspot.com"}
m := Merge(Subscripbe(fetcher1), Subscripbe(fetcher2), Subscripbe(fetcher3), Subscripbe(fetcher4))
time.AfterFunc(3*time.Second, func() {
fmt.Println("closed:", m.Close())
})
for item := range m.Updates() {
fmt.Println(item.Channel, item.Title)
}
panic("show me the stacks")
}

##附3:for-select结构的完整实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
package main
import (
// "errors"
"fmt"
"math/rand"
"time"
)
type Item struct {
Title, Channel, GUID string
}
type Fetcher interface {
Fetch() (tems []Item, next time.Time, err error)
}
type Subscription interface {
Update() <-chan Item
Close() error
}
type FakeFether struct {
channel string
items []Item
}
func (f *FakeFether) Fetch() (items []Item, next time.Time, err error) {
now := time.Now()
next = now.Add(time.Duration(rand.Intn(5)) * 500 * time.Millisecond)
item := Item{
Channel: f.channel,
Title: fmt.Sprintf("Item %d", len(f.items)),
}
item.GUID = item.Channel + "/" + item.Title
f.items = append(f.items, item)
items = []Item{item}
return
}
type AdvSub struct {
closed bool
err error
updates chan Item
fetcher Fetcher
closing chan chan error
}
func (s *AdvSub) Close() error {
errc := make(chan error)
s.closing <- errc
err := <-errc
fmt.Println(err)
return err
}
func (s *AdvSub) Update() <-chan Item {
return s.updates
}
func (s *AdvSub) loop() {
var err error
var next time.Time
var pending []Item
for {
var fetchDelay time.Duration
if now := time.Now(); next.After(now) {
fetchDelay = next.Sub(now)
}
startFetch := time.After(fetchDelay)
var first Item
var updates chan Item
if len(pending) > 0 {
first = pending[0]
updates = s.updates
}
select {
case errc := <-s.closing:
errc <- err
close(s.updates)
return
case <-startFetch:
var fetched []Item
fetched, next, err = s.fetcher.Fetch()
if err != nil {
next = time.Now().Add(10 * time.Second)
break
}
pending = append(pending, fetched...)
case updates <- first:
pending = pending[1:]
}
}
}
type NaiveMerge struct {
subs []Subscription
updates chan Item
}
func (m *NaiveMerge) Close() (err error) {
for _, sub := range m.subs {
if e := sub.Close(); err == nil && e != nil {
err = e
}
}
close(m.updates)
return
}
func (m *NaiveMerge) Update() <-chan Item {
return m.updates
}
func Merge(subs ...Subscription) Subscription {
m := &NaiveMerge{
subs: subs,
updates: make(chan Item),
}
for _, sub := range subs {
go func(s Subscription) {
for it := range s.Update() {
m.updates <- it
}
}(sub)
}
return m
}
func Subscripbe(fetcher Fetcher) Subscription {
s := &AdvSub{
fetcher: fetcher,
updates: make(chan Item),
closing: make(chan chan error),
}
go s.loop()
return s
}
func main() {
fetcher1 := &FakeFether{channel: "sharecore.info"}
fetcher2 := &FakeFether{channel: "blog.golang.org"}
fetcher3 := &FakeFether{channel: "googleblog.blogspot.com"}
fetcher4 := &FakeFether{channel: "googledevelopers.blogspot.com"}
m := Merge(Subscripbe(fetcher1), Subscripbe(fetcher2), Subscripbe(fetcher3), Subscripbe(fetcher4))
time.AfterFunc(3*time.Second, func() {
fmt.Println("closed:", m.Close())
})
for item := range m.Update() {
fmt.Println(item.Channel, item.Title)
}
//panic("show me the stacks")
}