Linux文件系统十问

今天读到这篇文章:Linux文件系统十问,你知道吗?,作了个总结笔记: 1、机械磁盘随机读写时速度非常慢,操作系统是采用什么技巧来提高随机读写

怎么写Go的基准测试

Dave Cheney在他的blog写了一篇关于Go的基准测试编写的基本介绍(链接)。我以此为内容,整理输出内容。 对自己编写package编写基准测

春秋五霸之首——齐桓公的故事

昨晚重新翻阅了《史记·齐太公世家》,今天就跟大家重新说说春秋五霸之首齐桓公姜小白的故事吧。 在说齐桓公这位“明君”的故事前,我们得先说点“恶心

Go并发编程模式进阶

前段时间Google的Sameer Ajmani在Google I/O上做了关于Go的并发模式的介绍。Slides在此,youtube视频在此(注:上述链接均需翻墙)。

本篇的前提是对goroutine+channel的并发编程模式有基本的了解,建议能读懂下面这个经典ping-pong程序为好。

package main

import (
	"fmt"
	"time"
)
//定义一个结构
type Ball struct{ hits int }

func main() {
	//创建一个可传输Ball的channel
	table := make(chan *Ball)
	//分别启动ping/pong的goroutine 
	go Player("Ping", table)
	go Player("Pong", table)
	//一个Ball进入channel,游戏开始
	table <- new(Ball)
	//“主”程序暂停1s,等待ping/pong的goroutine执行
	time.Sleep(1 * time.Second)
	//从channel取出Ball,游戏开始
	<-table
	//可通过引发异常,显示调用栈的详细信息
	//panic("show me the stacks")
}

func Player(name string, table chan *Ball) {
	for {
	    //channel取出Ball,并hits++
	    ball := <-table
	    ball.hits++
	    fmt.Println(name, ball.hits)
	    //暂停1ms
	    time.Sleep(1 * time.Millisecond)
	    //将Ball放回channel
	    table <- ball
	}
} 

ping-pong程序的执行过程,可以用下图来表示。

ping-pong程序执行过程

接下来主要说说Go的并发编程里的一些“文艺”使用:如何通信?如何周期性处理事件?如何取消执行?这些高级用法的支持,除了依赖我们上面看到的goroutine+channel外,还要依赖于Go的一个statement:select+case。它可以用来管理和监听多个channel,从而起到“多路复用”的效果。他的基本语法如下。

select {
	case xc <- x:
		// 向channel(xc)发送一个对象(x)
	case y := <-yc:
		// 从channel(yc)获取一个对象并赋值到变量(y)
}

下面我们以一个能持续从RSS获取资源项的例子来说明select的使用。 假设我们已经拥有下面这个接口所定义的功能:从一个RSS url获取资源项目(一次调用,获取一次,这个接口的模拟实现,见附1。)

type Fetcher interface {
	Fetch() (items []Item, next time.Time, err error)//能从某个rss url获取它的资源项,并能同时返回下一次获取的时间next。
}

`我们用下面这个接口来表示我们希望达到的功能:能从rss url上循环获取资源项,形成资源流的形式;循环获取功能,可以中止。

type Subscription interface {
	Updates() <-chan Item//用channel来存放资源,即可实现流的显示
	Close() error//关闭获取
}

先看一个这项功能的简单实现,熟悉多线程编程的,应该觉得很眼熟。


type NavieSub struct {
	closed  bool
	err     error
	updates chan Item
	fetcher Fetcher
}

func (s *NavieSub) Close() error {
	s.closed = true//设置关闭标识为true
	return s.err
}
func (s *NavieSub) Updates() <-chan Item {
	return s.updates//返回已经获取的资源项
}
func (s *NavieSub) loop() {//循环获取的方法实现
	for {
	    if s.closed {//判断关闭标识
	        close(s.updates)//close是内置函数
	        return
	    }
	    items, next, err := s.fetcher.Fetch()//执行一次获取
	    if err != nil {
	        s.err = err
	        time.Sleep(10 * time.Second)
	        continue//出错时暂停10秒后再开始下次循环
	    }
	    for _, item := range items {//将获取的资源项写入,用于返回
	        s.updates <- item
	    }
	    if now := time.Now(); next.After(now) {//暂停到下次获取时间时,再开始下一次获取
	        time.Sleep(next.Sub(now))
	    }
	}
}

func main() {
	fetcher := &FakeFether{channel: "sharecore.info"}
	s := &NavieSub{
	    fetcher: fetcher,
	    updates: make(chan Item),
	}
	
	go s.loop()//启动一个例程执行loop方法(与启动一个线程类似)
	
	time.AfterFunc(3*time.Second, func() {
	    fmt.Println("closed", s.Close())
	})
	
	for item := range s.Updates() {
	    fmt.Println(item.Channel, item.Title)
	}
} 

`那以上的简单实现,会有什么问题呢?

首先,明显发现s.err和s.closed的访问是非同步的。

s.closed = true //设置关闭标识为true

if s.closed {//判断关闭标识
	        close(s.updates) //close是内置函数
	        return
	    }

`然后,我们看到s.updates的定义如下:

s := &NavieSub{
	    fetcher: fetcher,
	    updates: make(chan Item),//定义为没有buffer的channel,一个channel中同时只能有一个元素
	}

`根据上面的定义,s.updates一次只能有一个item进入,当它没有其他goroutine从它里面取出元素时,下面这行代码会发生堵塞

s.updates <- item

`那以上问题我们有什么办法来避免呢?