前段时间Google的Sameer Ajmani在Google I/O上做了关于Go的并发模式的介绍。Slides在此,youtube视频在此(注:上述链接均需翻墙)。

本篇的前提是对goroutine+channel的并发编程模式有基本的了解,建议能读懂下面这个经典ping-pong程序为好。

package main

import (
	"fmt"
	"time"
)
//定义一个结构
type Ball struct{ hits int }

func main() {
	//创建一个可传输Ball的channel
	table := make(chan *Ball)
	//分别启动ping/pong的goroutine 
	go Player("Ping", table)
	go Player("Pong", table)
	//一个Ball进入channel,游戏开始
	table <- new(Ball)
	//“主”程序暂停1s,等待ping/pong的goroutine执行
	time.Sleep(1 * time.Second)
	//从channel取出Ball,游戏开始
	<-table
	//可通过引发异常,显示调用栈的详细信息
	//panic("show me the stacks")
}

func Player(name string, table chan *Ball) {
	for {
	    //channel取出Ball,并hits++
	    ball := <-table
	    ball.hits++
	    fmt.Println(name, ball.hits)
	    //暂停1ms
	    time.Sleep(1 * time.Millisecond)
	    //将Ball放回channel
	    table <- ball
	}
} 

ping-pong程序的执行过程,可以用下图来表示。

ping-pong程序执行过程

接下来主要说说Go的并发编程里的一些“文艺”使用:如何通信?如何周期性处理事件?如何取消执行?这些高级用法的支持,除了依赖我们上面看到的goroutine+channel外,还要依赖于Go的一个statement:select+case。它可以用来管理和监听多个channel,从而起到“多路复用”的效果。他的基本语法如下。

select {
	case xc <- x:
		// 向channel(xc)发送一个对象(x)
	case y := <-yc:
		// 从channel(yc)获取一个对象并赋值到变量(y)
}

下面我们以一个能持续从RSS获取资源项的例子来说明select的使用。 假设我们已经拥有下面这个接口所定义的功能:从一个RSS url获取资源项目(一次调用,获取一次,这个接口的模拟实现,见附1。)

type Fetcher interface {
	Fetch() (items []Item, next time.Time, err error)//能从某个rss url获取它的资源项,并能同时返回下一次获取的时间next。
}

`我们用下面这个接口来表示我们希望达到的功能:能从rss url上循环获取资源项,形成资源流的形式;循环获取功能,可以中止。

type Subscription interface {
	Updates() <-chan Item//用channel来存放资源,即可实现流的显示
	Close() error//关闭获取
}

先看一个这项功能的简单实现,熟悉多线程编程的,应该觉得很眼熟。


type NavieSub struct {
	closed  bool
	err     error
	updates chan Item
	fetcher Fetcher
}

func (s *NavieSub) Close() error {
	s.closed = true//设置关闭标识为true
	return s.err
}
func (s *NavieSub) Updates() <-chan Item {
	return s.updates//返回已经获取的资源项
}
func (s *NavieSub) loop() {//循环获取的方法实现
	for {
	    if s.closed {//判断关闭标识
	        close(s.updates)//close是内置函数
	        return
	    }
	    items, next, err := s.fetcher.Fetch()//执行一次获取
	    if err != nil {
	        s.err = err
	        time.Sleep(10 * time.Second)
	        continue//出错时暂停10秒后再开始下次循环
	    }
	    for _, item := range items {//将获取的资源项写入,用于返回
	        s.updates <- item
	    }
	    if now := time.Now(); next.After(now) {//暂停到下次获取时间时,再开始下一次获取
	        time.Sleep(next.Sub(now))
	    }
	}
}

func main() {
	fetcher := &FakeFether{channel: "sharecore.info"}
	s := &NavieSub{
	    fetcher: fetcher,
	    updates: make(chan Item),
	}
	
	go s.loop()//启动一个例程执行loop方法(与启动一个线程类似)
	
	time.AfterFunc(3*time.Second, func() {
	    fmt.Println("closed", s.Close())
	})
	
	for item := range s.Updates() {
	    fmt.Println(item.Channel, item.Title)
	}
} 

`那以上的简单实现,会有什么问题呢?

首先,明显发现s.err和s.closed的访问是非同步的。

s.closed = true //设置关闭标识为true

if s.closed {//判断关闭标识
	        close(s.updates) //close是内置函数
	        return
	    }

`然后,我们看到s.updates的定义如下:

s := &NavieSub{
	    fetcher: fetcher,
	    updates: make(chan Item),//定义为没有buffer的channel,一个channel中同时只能有一个元素
	}

`根据上面的定义,s.updates一次只能有一个item进入,当它没有其他goroutine从它里面取出元素时,下面这行代码会发生堵塞

s.updates <- item

`那以上问题我们有什么办法来避免呢?

对于第一个问题,自然想到的解决办法是加锁,但加锁的方式太不符合Go的“口味”了,同时,加锁的方式,在面对比较复杂的并发场景时,容易导致各类由“锁”引发的问题,这也是线程模型的“恶魔”了。

对于第二个问题,普通的办法时当然是将s.updates定义为一个带buffer的channel。但是buffer定义为多大才合适呢?当取出元素的routine太慢,还是一样可能会导致buffer满了,发生堵塞。

下面,我们来看看如何用一个比较“文艺”的办法来解决上面的问题吧。前面我们提到的select这时可以派上用场了。select的机制实现,差不多可以称为是“事件驱动”的,当然这里的“事件”并不是我们平常其他的事件驱动模型里常看到的I/O,网络请求/响应这样的“事件”,而是监听channel变更的“事件”。

将select与for循环结合起来,可以构造持续监听channel的结构,如下:

func(s *AdvSub) loop(){
	//可变状态的定义
	for{
	    //设置不同的channel的监听case
	    select{
	    case <-c1:
	        // 读/写状态
	    case <-c2:
	        // 读/写状态
	    case y:<-c3:
	        // 读/写状态
	    }
	}
}

我们先来看怎么利用for-select结构来解决第一个close同步的问题:

type AdvSub struct{
	closing chan chan error
}

`如上代码所示,我们给sub定义,加了一个“状态”——closing,而我们就可以利用for-select结构来监听从closing的“状态变化”。

`//close方法,
func (s *AdvSub) Close() error{
	errc:=make(chan error)
	 	s.closing<-errc
	 	return <-errc
	}
	var err error//错误状态信息
	for{
		select{
	case errc:=<-s.closing//当Close方法调用时会触发
		errc<-err//将错误信息放到Close返回的channel
	close(s.updates)
	return 
	}
}

采用这种方式,同步的实现是完全依赖且只依赖于channel的同步机制的,这是可以信赖的。

对于第二个问题,我们可以给sub在加一个状态,比如队列,用来保持它已经获到的资源项。如下代码:

var pending []item//存入新获取的资源(Fetch方法调时),并同时被消费(Update()被调用,取出元素时)
for{
	select{
	    case s.updates<-pending[0]:
	        pending=pending[1:]//取出后将第一个元素移除,更新状态
	}
}

可是你会发现,上面的代码这么修改后并不会如期的正常运行,而是出现如下的错误: s.updates获取错误

这是因为,一开始pending数组是空的,当执行s.updates<-pending[0]时,会抛出数组越界的异常。 我们可以采取下面的方式来解决这个问题:

var pending []item
for{
	    var first Item
	    var updates chan Item
	    if len(pending) > 0 {
	        first = pending[0]
	        updates = s.updates
	    }
	    select{
	    case updates<-first:
	        pending=pending[1:]
	    }
}

上面,我们通过for-select结构,解决了同步的问题和堵塞的问题。在NavieSub的loop实现中,我们发现有time.Sleep的调用,对于time.Sleep的模式,我们其实也可以通过for-select结构来解决,这得益于time包下的许多方法/状态,也提供了返回channel的方式来便于监听,比如time.After(duration),time.Ticker等。所以我们就可以方便地将time.Sleep去掉,整合到for-select的结构中来

	var pending []Item
	var next time.Time
	var err error
	for{
	    var fetchDelay time.Duration //下次获取的延迟时间,默认是0(无延迟)
	    if now:=time.Now();next.After(now){
	        fetchDelay=next.Sub(now)//计算延迟时间    
	    }
	    startFetch:=time.After(fetchDelay)//startFetch是一个channnel,时间到达后,会被写入
	    select {
	        case startFetch://到达下一次获取时间
	        var fetched []Item
	        fetched,next,err=s.fetcher.Fetch()
	        if err!=nil{
	            next=time.Now().Add(10*time.Sencond)
	            break
	        }
	        pending=append(pending,fetched...)
	    }
	}

对于以上三种情况,我们总结起来的for-select实现代码就如下:

func (s *AdvSub) loop() {
	var err error
	var next time.Time
	var pending []Item
	//综合了三种情况的for-select结构
	for {
	    var fetchDelay time.Duration
	    if now := time.Now(); next.After(now) {
	        fetchDelay = next.Sub(now)
	    }
	    startFetch := time.After(fetchDelay)
	    var first Item
	    var updates chan Item
	    if len(pending) > 0 {
	        first = pending[0]
	        updates = s.updates
	    }
	    select {
	    case errc := <-s.closing://关闭
	        errc <- err
	        close(s.updates)
	        return
	    case <-startFetch://获取资源
	        var fetched []Item
	        fetched, next, err = s.fetcher.Fetch()
	        if err != nil {
	            next = time.Now().Add(10 * time.Second)
	            break
	        }
	        pending = append(pending, fetched...)
	    case updates <- first://取出资源
	        pending = pending[1:]
	    }
	}
}

最后,分别附上Fetcher接口的模拟实现,以及普通方式和for-select结构方式的完整实现代码。

附1:Fetcher接口的一个实现

type Item struct {
	Title, Channel, GUID string
}
type FakeFether struct {
	channel string
	items   []Item
}
func (f *FakeFether) Fetch() (items []Item, next time.Time, err error) {
	now := time.Now()
	next = now.Add(time.Duration(rand.Intn(5)) * 500 * time.Millisecond)
	item := Item{
	    Channel: f.channel,
	    Title:   fmt.Sprintf("Item %d", len(f.items)),
	}
	item.GUID = item.Channel + "/" + item.Title
	f.items = append(f.items, item)
	items = []Item{item}
	return
}

##附2:普通方式完整实现

package main

import (
	"fmt"
	"math/rand"
	"time"
)

type Item struct {
	Title, Channel, GUID string
}
type Fetcher interface {
	Fetch() (tems []Item, next time.Time, err error)
}
type Subscription interface {
	Updates() <-chan Item
	Close() error
}

type FakeFether struct {
	channel string
	items   []Item
}

func (f *FakeFether) Fetch() (items []Item, next time.Time, err error) {
	now := time.Now()
	next = now.Add(time.Duration(rand.Intn(5)) * 500 * time.Millisecond)
	item := Item{
	    Channel: f.channel,
	    Title:   fmt.Sprintf("Item %d", len(f.items)),
	}
	item.GUID = item.Channel + "/" + item.Title
	f.items = append(f.items, item)
	items = []Item{item}
	return
}

type NaiveSub struct {
	closed  bool
	err     error
	updates chan Item
	fetcher Fetcher
}

func (s *NaiveSub) Close() error {
	s.closed = true
	return s.err
}
func (s *NaiveSub) Updates() <-chan Item {
	return s.updates
}
func (s *NaiveSub) loop() {
	for {
	    if s.closed {
	        close(s.updates)
	        return
	    }
	    items, next, err := s.fetcher.Fetch()
	    if err != nil {
	        s.err = err
	        time.Sleep(10 * time.Second)
	        continue
	    }
	    for _, item := range items {
	        s.updates <- item
	    }
	    if now := time.Now(); next.After(now) {
	        time.Sleep(next.Sub(now))
	    }
	}
}

type NaiveMerge struct {
	subs    []Subscription
	updates chan Item
}

func (m *NaiveMerge) Close() (err error) {
	for _, sub := range m.subs {
	    if e := sub.Close(); err == nil && e != nil {
	        err = e
	    }
	}
	close(m.updates)
	return
}

func (m *NaiveMerge) Updates() <-chan Item {
	return m.updates
}
func Merge(subs ...Subscription) Subscription {
	m := &NaiveMerge{
	    subs:    subs,
	    updates: make(chan Item),
	}
	for _, sub := range subs {
	    go func(s Subscription) {
	        for it := range s.Updates() {
	            m.updates <- it
	        }
	    }(sub)
	}
	return m
}

func Subscripbe(fetcher Fetcher) Subscription {
	s := &NaiveSub{
	    fetcher: fetcher,
	    updates: make(chan Item),
	}
	go s.loop()
	return s
}

func main() {
	fetcher1 := &FakeFether{channel: "sharecore.info"}
	fetcher2 := &FakeFether{channel: "blog.golang.org"}
	fetcher3 := &FakeFether{channel: "googleblog.blogspot.com"}
	fetcher4 := &FakeFether{channel: "googledevelopers.blogspot.com"}
	m := Merge(Subscripbe(fetcher1), Subscripbe(fetcher2), Subscripbe(fetcher3), Subscripbe(fetcher4))
	time.AfterFunc(3*time.Second, func() {
	    fmt.Println("closed:", m.Close())
	})
	for item := range m.Updates() {
	    fmt.Println(item.Channel, item.Title)
	}
	panic("show me the stacks")
}

##附3:for-select结构的完整实现

package main

import (
	//  "errors"
	"fmt"
	"math/rand"
	"time"
)

type Item struct {
	Title, Channel, GUID string
}
type Fetcher interface {
	Fetch() (tems []Item, next time.Time, err error)
}
type Subscription interface {
	Update() <-chan Item
	Close() error
}

type FakeFether struct {
	channel string
	items   []Item
}

func (f *FakeFether) Fetch() (items []Item, next time.Time, err error) {
	now := time.Now()
	next = now.Add(time.Duration(rand.Intn(5)) * 500 * time.Millisecond)
	item := Item{
	    Channel: f.channel,
	    Title:   fmt.Sprintf("Item %d", len(f.items)),
	}
	item.GUID = item.Channel + "/" + item.Title
	f.items = append(f.items, item)
	items = []Item{item}
	return
}

type AdvSub struct {
	closed  bool
	err     error
	updates chan Item
	fetcher Fetcher
	closing chan chan error
}

func (s *AdvSub) Close() error {
	errc := make(chan error)
	s.closing <- errc
	err := <-errc
	fmt.Println(err)
	return err
}
func (s *AdvSub) Update() <-chan Item {
	return s.updates
}
func (s *AdvSub) loop() {
	var err error
	var next time.Time
	var pending []Item
	
	for {
	    var fetchDelay time.Duration
	    if now := time.Now(); next.After(now) {
	        fetchDelay = next.Sub(now)
	    }
	    startFetch := time.After(fetchDelay)
	    var first Item
	    var updates chan Item
	    if len(pending) > 0 {
	        first = pending[0]
	        updates = s.updates
	    }
	    select {
	    case errc := <-s.closing:
	        errc <- err
	        close(s.updates)
	        return
	    case <-startFetch:
	        var fetched []Item
	        fetched, next, err = s.fetcher.Fetch()
	        if err != nil {
	            next = time.Now().Add(10 * time.Second)
	            break
	        }
	        pending = append(pending, fetched...)
	    case updates <- first:
	        pending = pending[1:]
	    }
	}
}

type NaiveMerge struct {
	subs    []Subscription
	updates chan Item
}

func (m *NaiveMerge) Close() (err error) {
	for _, sub := range m.subs {
	    if e := sub.Close(); err == nil && e != nil {
	        err = e
	    }
	}
	close(m.updates)
	return
}

func (m *NaiveMerge) Update() <-chan Item {
	return m.updates
}
func Merge(subs ...Subscription) Subscription {
	m := &NaiveMerge{
	    subs:    subs,
	    updates: make(chan Item),
	}
	for _, sub := range subs {
	    go func(s Subscription) {
	        for it := range s.Update() {
	            m.updates <- it
	        }
	    }(sub)
	}
	return m
}

func Subscripbe(fetcher Fetcher) Subscription {
	s := &AdvSub{
	    fetcher: fetcher,
	    updates: make(chan Item),
	    closing: make(chan chan error),
	}
	go s.loop()
	return s
}

func main() {
	fetcher1 := &FakeFether{channel: "sharecore.info"}
	fetcher2 := &FakeFether{channel: "blog.golang.org"}
	fetcher3 := &FakeFether{channel: "googleblog.blogspot.com"}
	fetcher4 := &FakeFether{channel: "googledevelopers.blogspot.com"}
	m := Merge(Subscripbe(fetcher1), Subscripbe(fetcher2), Subscripbe(fetcher3), Subscripbe(fetcher4))
	time.AfterFunc(3*time.Second, func() {
	    fmt.Println("closed:", m.Close())
	})
	for item := range m.Update() {
	    fmt.Println(item.Channel, item.Title)
	}
	//panic("show me the stacks")
}