Go并发编程模式进阶
文章目录
前段时间Google的Sameer Ajmani在Google I/O上做了关于Go的并发模式的介绍。Slides在此,youtube视频在此(注:上述链接均需翻墙)。
本篇的前提是对goroutine+channel的并发编程模式有基本的了解,建议能读懂下面这个经典ping-pong程序为好。
|
|
ping-pong程序的执行过程,可以用下图来表示。
接下来主要说说Go的并发编程里的一些“文艺”使用:如何通信?如何周期性处理事件?如何取消执行?这些高级用法的支持,除了依赖我们上面看到的goroutine+channel外,还要依赖于Go的一个statement:select+case。它可以用来管理和监听多个channel,从而起到“多路复用”的效果。他的基本语法如下。
|
|
下面我们以一个能持续从RSS获取资源项的例子来说明select的使用。 假设我们已经拥有下面这个接口所定义的功能:从一个RSS url获取资源项目(一次调用,获取一次,这个接口的模拟实现,见附1。)
|
|
`我们用下面这个接口来表示我们希望达到的功能:能从rss url上循环获取资源项,形成资源流的形式;循环获取功能,可以中止。
|
|
先看一个这项功能的简单实现,熟悉多线程编程的,应该觉得很眼熟。
|
|
`那以上的简单实现,会有什么问题呢?
首先,明显发现s.err和s.closed的访问是非同步的。
|
|
`然后,我们看到s.updates的定义如下:
|
|
`根据上面的定义,s.updates一次只能有一个item进入,当它没有其他goroutine从它里面取出元素时,下面这行代码会发生堵塞。
|
|
`那以上问题我们有什么办法来避免呢?
对于第一个问题,自然想到的解决办法是加锁,但加锁的方式太不符合Go的“口味”了,同时,加锁的方式,在面对比较复杂的并发场景时,容易导致各类由“锁”引发的问题,这也是线程模型的“恶魔”了。
对于第二个问题,普通的办法时当然是将s.updates定义为一个带buffer的channel。但是buffer定义为多大才合适呢?当取出元素的routine太慢,还是一样可能会导致buffer满了,发生堵塞。
下面,我们来看看如何用一个比较“文艺”的办法来解决上面的问题吧。前面我们提到的select这时可以派上用场了。select的机制实现,差不多可以称为是“事件驱动”的,当然这里的“事件”并不是我们平常其他的事件驱动模型里常看到的I/O,网络请求/响应这样的“事件”,而是监听channel变更的“事件”。
将select与for循环结合起来,可以构造持续监听channel的结构,如下:
|
|
我们先来看怎么利用for-select结构来解决第一个close同步的问题:
|
|
`如上代码所示,我们给sub定义,加了一个“状态”——closing,而我们就可以利用for-select结构来监听从closing的“状态变化”。
|
|
采用这种方式,同步的实现是完全依赖且只依赖于channel的同步机制的,这是可以信赖的。
对于第二个问题,我们可以给sub在加一个状态,比如队列,用来保持它已经获到的资源项。如下代码:
|
|
可是你会发现,上面的代码这么修改后并不会如期的正常运行,而是出现如下的错误:
这是因为,一开始pending数组是空的,当执行s.updates<-pending[0]时,会抛出数组越界的异常。 我们可以采取下面的方式来解决这个问题:
|
|
上面,我们通过for-select结构,解决了同步的问题和堵塞的问题。在NavieSub的loop实现中,我们发现有time.Sleep的调用,对于time.Sleep的模式,我们其实也可以通过for-select结构来解决,这得益于time包下的许多方法/状态,也提供了返回channel的方式来便于监听,比如time.After(duration),time.Ticker等。所以我们就可以方便地将time.Sleep去掉,整合到for-select的结构中来
|
|
对于以上三种情况,我们总结起来的for-select实现代码就如下:
|
|
最后,分别附上Fetcher接口的模拟实现,以及普通方式和for-select结构方式的完整实现代码。
附1:Fetcher接口的一个实现
|
|
##附2:普通方式完整实现
|
|
##附3:for-select结构的完整实现
|
|
文章作者 justin huang
上次更新 2013-06-10