原文刊载于《日经Linux》2015/01号中(©日经BP社 2015)。原文题为《跟Matz边做边学编程语言:21世纪的并发编程语言》。文/松本行弘,译/刘斌。
随着多核CPU的普及,shell脚本的(一部分)价值也在逐渐被我们重新认识。shell脚本的基本计算模型是基于管道来连接多个进程。如果操作系统支持多核的话,则各进程会被分配到不同的CPU上去执行,这样就可以充分发挥多核CPU的优势。同时这也证明了一点,那就是只要选择合适的计算模型,就能非常容易地实现并发执行。
在实际的业务系统中,我也听说有人采用shell脚本来进行处理。虽说是用shell脚本进行信息的筛选和加工,但是和传统的软件开发模式相比,它有着成本低、灵活性高等优点。
但也并不能说shell脚本有多么理想,实际上它也有它的局限性。
比如,创建OS进程的成本非常高,如果需要使用shell脚本创建大量轻量进程的话,那么在性能上将会非常不利。
还有另外一种成本,由于连接进程的管道只能发送字节数组的数据,所以发送方需要先将数据转换为字节数组,接收方则需要将字节数组还原。比如很多时候我们都会使用以逗号分隔的CSV(Comma Separated Values)格式或表示JavaScript对象的JSON(JavaScript Object Notation)格式,将数据从这些格式转换为字节数组,或者对字节数组进行解析并还原,这样做的成本是非常高的。
在进行大数据处理、高性能计算等时,我们多会选择使用多核CPU。因此,数据转换或创建进程所花费的成本是不可忽视的。这可以说是shell脚本的一个缺陷。
更进一步来说,构成管道的进程(process)所执行的命令(command),可能并不是由同一个开发者所开发的,这些命令的参数设置方法等往往并不统一,因此要想熟练使用这些命令,难度会有所增加。
这样说来,如果能将shell脚本的优点,和通用编程语言的优点结合起来的话,应该就可以创造出一门非常强大的语言。
首先我们来看看这门强大的语言都需要满足哪些必要条件。
第1个条件是可以进行轻量的并发。由于不管是OS级别的进程还是线程,创建成本都很高,因此我们应该尽量避免去使用它们。比较现实的方式是在一个OS的进程中,预先生成与CPU的核数(+α)相同个数的线程,让它们轮番去执行各种操作请求。采用这种实现方式的典型语言包括Erlang和Go。在本文中,我们将相当于Erlang中的“process”、Go中的“goroutine”的概念称为“任务”(task)。
第2个条件就是解决并发执行时的竞争条件。具体来说就是“状态”的排除。也就是说,如果变量或者属性的值发生变化,就会产生一个新的状态,这也带来了因执行时机(timing)不同而产生问题的危险。所以需要将所有数据都设为不可变(immutable),这样就可以避免因执行时机而出现的缺陷。
第3个条件是计算模型。线程模型虽然应用领域非常广泛,但自由程度也很高,因此程序可能会变得难以掌控。于是我们可以参考shell的执行模型,引入一个抽象度非常高的并发计算模型。抽象度高了,反过来表现的自由度就会降低,所以在编写代码的时候就要下一番功夫。而另一方面,我们的程序也会变得非常容易调试。
于是我就开始设计一门满足上述条件的新语言。由于是以流(Stream)为计算模型的语言,因此我们就将它命名为“Streem”。
首先我们来看看它的语法。由于它是基于shell的,因此也没有什么特别的语法。它的基本语法如下所示。
表达式1 | 表达式2 | ...
若要采用这种语法来实现一个从标准输入读取数据,然后输出到标准输出,功能类似于cat命令的程序,只需编写如下代码即可。
STDIN | STOUT
就是如此简单。STDIN和STDOUT是用常量表示标准输入输出的对象。Streem程序中的STDIN会从标准输入中一行一行地读取(字符串)数据,并将这一行行的数据传递给其他表达式,就像“流”一样。像这样用来表示数据的流动的对象就称为“流”(对象)。STDOUT则正相反,它是一个接收字符串参数并输出到外部(标准输出)的流。如果想读取指定文件名的文件内容的话,可以使用如下代码。
read(path)
如果是写入到指定文件,则可以使用如下代码。
write(path)
这两个方法都会返回用来读取或者写入的流对象。
Streem的表达式包括常量、变量引用、函数调用、数组表达式、Map表达式、函数表达式、运算符表达式以及if表达式。它们的语法如表1所示。如果你有其他语言的编程经验的话,这些内容应该都很容易理解。
类型 | 语法 | 示例 |
字符串常量(字面值) | "字符串" | "foobar" |
数值常量 | 数值表现形式 | 123 |
符号(Symbol)常量 | :标识符 | :Foo |
变量引用 | 标识符 | FooBar |
函数调用 | 标识符(参数...) | square(2) |
方法调用 | 表达式.标识符(参数...) | ary.push(2) |
数组表达式 | [表达式, ...] | [1,2,3] |
Map表达式 | [表达式:表达式, ...] | [1:"2", 3:"4"], [:](空Map) |
函数表达式 | {|变量..| 语句 } | {|x| x + 1} |
运算符表达式 | 表达式 运算符 表达式 | 1 + 1 |
if表达式 | if 表达式 {语句 ...} else { 语句 ...} | if true {1} else {2} |
Streem中的赋值有两种方式。第一种是和其他语言类似的使用“=”的方式。
ONE = 1
还有一种就是使用“->”的反方向赋值的方法。如果把上面采用等号的赋值语句改为使用“->”的话,代码将如下所示。
1 -> ONE
这种赋值方式非常适合在将管道执行结果存放到变量时使用,这样代码的顺序就能和程序的执行流程保持一致,非常方便。
不管采取上面哪种赋值方式,为了避免变量状态的改变,都需要遵循如下规则。
规则1:不能给同一个变量进行多次赋值。对一个变量的赋值在其作用域之内只能进行一次。
规则2:仅在交互执行环境的顶层作用域(top level)下,才允许对一个变量进行重复赋值。不过,你可以将这看作是对变量名相同的不同变量进行的赋值。
Streem支持将多条表达式语句并列编写。语句之间用分号(;)或者换行来分割。也可以认为这些语句会按照编码的顺序来执行。如果这些语句之间没有依赖关系的话,在实际执行的时候也可能会并发执行。
接下来让我们来看一些Streem程序的具体例子。
前面我们看到了一个实现类似于cat命令的例子,下面我们来看一个稍微有点不同的例子。这里我们将使用Streem来实现经常被拿来举例的FizzBuzz游戏(图1)。这个游戏要求玩家从1开始输入数字,当这个数字能被3整除的时候,会输出“Fizz”;当能被5整除的时候,会输出“Buzz”;当能同时被3和5整除的时候,则会输出“FizzBuzz”。
seq(100) | { |x| if x % 15 == 0 { "FizzBuzz" } else if x % 3 == 0 { "Fizz" } else if x % 5 == 0 { "Buzz" } else { x } } | STOUT
seq函数用来生成一个从1到指定参数的整数数列。如果将该数列连接到管道上的话,则该数列会将各元素的值按顺序传递给管道。STDOUT则将接收到的数列的值进行输出。
从上面的例子我们可以看出,Streem的管道表述方式直接体现了要干什么,是不是更为直截了当呢?
通过图1的例子,我们已经知道了使用Streem可以非常简单地完成诸如对数值序列进行处理并输出的程序。然而现实中的程序并不完全都是对这种1对1关系的数据进行处理。比如类似于grep(单词搜索)这样“查找所有满足指定条件”的类型,以及类似于wc(统计单词数量)这样对数据进行聚合计算的类型。
Streem也支持这种应用场景,并提供了一些关键字来进行这类操作。
在一次执行需要返回多个值的时候,可以使用emit。如果给它传递多个(参数)值的话,那么它也会返回多个值。也就是说,
emit 1, 2
就相当于下面这行代码。
emit 1; emit 2
此外,如果在数组前面加上“*”的话,就表示要返回这个数组的所有元素。比如,
a = [1, 2, 3]; emit *a
就相当于如下代码。
emit 1; emit 2; emit 3
图2是一个使用emit的例子。这个程序会将从1到100之间的整数每个都打印两次。
# 将从1到100的整数分别打印两次 seq(100) | {|x| emit x, x} | STDOUT
return用来终止函数的执行并返回值。return可以返回多个值,这时候它就相当于对多个值进行了emit操作。有一点前面我们没有提到,那就是如果一个函数主体只有一个表达式的话,那么即使不使用return,这个表达式的执行结果也会作为函数的返回值。
使用emit和return的话,就可以产生比输入值个数更多的返回值。与之相反,如果我们想生成少于输入值个数的返回值的话,则可以使用skip函数。skip用来终止当前函数的执行,但是并不产生任何返回值。图3是一个使用skip的例子,该程序用来筛选出1到100之间的偶数。
# skip奇数,选择偶数 seq(100) | {|x| if x % 2 == 1 {skip}; x} | STDOUT
前面我们已经说过,在Streem中,为了避免竞争条件的出现,所有的数据结构都是不可变的。数组和Map(类似于Ruby中的Hash)类型的变量也是不可变的。向这些结构的数据添加新元素的时候,并不是直接修改已有的数据,而是在原数据的基础上添加新元素来创建新的数据(图4)。
a = [1, 2, 3, 4] # a是一个拥有4个元素的数组 b = a.push(5) # b是在a之后添加了5的数组
在一般的面向对象编程语言中,对象的属性(实例的变量)都是可以修改的,而在Streem中,这种操作是被禁止的,这需要注意一下。从这一点上来说,Streem非常像函数式编程语言。
接着我们再看另一个Streem程序的例子。这里我们选择了在介绍MapReduce时经常使用的一个例子——统计单词出现次数。下面我们用Streem来实现一下(图5)。
STDIN | { |line| return *line.split } | reduce([:]) { |map, word| # [:]是一个空Map map.set(word, map.get(word,0) + 1) } | STDOUT
首先我们对图5的程序中新出现的语法进行说明。在调用reduce函数的地方,我们看到了类似于Ruby中的Block的语句。这是Streem语言中的一个语法糖,如果函数的参数列表后面是一个函数表达式的话,那么这个函数表达式就会被视为该函数的参数列表的最后一个元素。也就是说表达式
reduce(0) {|x, y| x + y }
是下面的表达式的另一种写法。
reduce(0, {|x, y| x + y })
这也是Streem为了能在普通函数调用中将类似于Ruby中的Block变量作为参数而做出的努力,而不必使用&block的方式。
如果我们看一下图5中程序的实际执行情况,就会看到具体流程是首先从STDIN中一行行地读取数据,用split进行单词分割,再通过reduce函数来统计各个单词出现的次数,并将结果存放到Map中去。如果作为key的单词不存在的话,map.get就会返回第二个参数作为默认值(这里是0),这样就可以通过map.get得到该单词的出现次数。map.set用来更新单词出现的次数,并创建一个新的Map。因为每次更新单词出现次数时都会创建一个新的Map,所以看上去有点浪费系统资源,但实际上我们无需为此担心,完全可以将这些问题交给垃圾收集器或者系统运行时环境的内部实现。实际上Clojure及Haskell等很多函数式编程语言也都采用了相同的策略。
最后,程序将生成的Map和STDIN通过管道连接起来,将Map中的键值对打印出来,显示各个单词及其出现次数。这个例子中我们并没有做其他的额外处理,需要的话你可以增加一个管道以在输出结果之前对单词进行排序等工作。
Unix的Socket也是基于流而设计的,Streem当然也支持Socket操作。图6的程序是一个最简单的使用了Socket的网络Echo服务器(将接收到的数据原封不动地返回给客户端)。
# 在8007端口提供服务 tcp_server(8007) | { |s| s | s # 直接将输入数据作为输出返回给客户端 }
代码是不是非常简单?如果程序的应用场景非常匹配流模型,那么采用Streem语言的话,编码工作将会非常简单。
我们还是来解析一下这段代码吧。tcp_server会在参数指定的端口上打开一个服务器端Socket进行监听,等待客户端的连接。在Streem中,服务器端Socket是客户端Socket的流对象。
客户端Socket是客户端的输入和输出的流,所以如下代码
s | s
的实际功能就是“原封不动地将客户端输入直接返回给客户端”。如果需要对输入内容进行加工处理的话,只需要在管道之间加入一个进行数据处理的流就可以了。
目前为止我们看到的管道的组成都是如下方式。
表达式1 | 表达式2 ... | 表达式n
表达式1是一个产生值的流(产生器,generator),表达式2及后续表达式都是对值进行变换、处理的流(过滤器,filter),管道最后的表达式n则可以认为是输出目的地(消费者,consumer)。
产生器有很多种,比如像STDIN这样的从外部获取输入的流,以及像seq()这样的通过计算产生值序列的函数。如果将产生器替换为一个函数表达式的话,那么这个函数表达式就成为了一个通过return或emit来产生值的产生器。
过滤器在大多数情况下都是一个函数,通过参数接收前面的流传过来的值,再通过emit或return将值传递给下一个流。
最后的消费者只会接收值,是一个不会emit值的流。
Streem程序的基本结构就是像这样将流通过管道串联起来,从产生器开始对数据进行流式处理。也许我们也可以称之为“管道业务”。虽说这种计算模型并不是万能的,但是它具有抽象程度高、容易理解、支持并发编程等优点。有时候我们并不需要做到100%的功能,而是专注于那重要的80%就可以了。
但是,并不是说所有程序中的数据流都只有一种(即一条管线),因此完全放弃这样的程序的做法也有点过头。我们需要更加复杂的管线配置。具体来说,我们还需要将多个流合并(merge)为一个流,以及从一个流派生出多条通知(广播)这两种类型的结构。
更进一步来说,在将流进行连接的时候,如果有一个能指定缓冲区大小的方法的话,是不是更好呢?
到这里为止我们看到的例子中数据流都只有一条管线,这在简单的应用场景下倒没什么问题,但是这种方式并不能解决现实中的所有问题。
有时候我们可能需要将多个管道合并为一个,或者对一条管道进行分割操作。管道的合并可以使用“&”操作符。
管道1 & 管道2
通过使用“&”操作符,就能将管道1和管道2的值合并成一个数组,并创建一个新的管道。合并后的新管道在任意一个原管道(这里为管道1和管道2)终止的时候都会同时终止。比如本文前面的cat的例子,我们如果想像cat -n一样同时输出行号的话,可以使用图9中的代码。
seq() & STDIN | STDOUT
由于“&”运算符的优先级高于“|”,所以下面的代码
a & b | c
会被解释为
(a & b) | c
当省略seq()的参数的时候,该函数会从1开始进行无限循环。由于STDIN是从标准输入一行一行地读取数据并写入到管道中的,因此管道合并的结果如下所示。
[行号, 行内容]
将这个流合并后得到的新数组写入到STDOUT(标准输出),就实现了带行号的cat。从实用角度来讲,也许我们还需要对行号进行显示位数的格式化等工作,不过这也只需要你在STDOUT之前加入一个用来格式化的管道操作就可以了 注1 。
注1 由于Streem还在开发之中,因此还没有格式化相关的规范。
如果管道中最后一个流不是消费者的话,则会返回一个被称为“通道”(channel)的对象。比如下面的代码。
seq() & STDIN -> sequence
这里的sequence就是一个用来表现合并了seq()产生的数列和从STDIN读取的输入内容的通道。我们可以将管道理解为使用通道将进行流处理的task串联起来的结构。
当然各个流中对数据进行处理的速度都有所不同。如果前面的流中数据产生速度太快的话,就会将数据堆积到通道中,进而导致占用大量内存。反过来说,如果通道中没有任何缓存数据的话,则会增加前面处理的等待时间,从而降低整体效率。
所以,Streem会将适当数量(当然这个数量既不多也不少最理想了)的通道放到缓冲区中。但是真正合适的缓冲区大小则是由程序来决定的,我们不能进行准确的预测。从性能的角度来讲,有时需要根据实际情况来手动设置这个缓冲区的大小。这时候我们可以使用chan()这个非常方便的函数。
chan()函数用来显式地创建通道对象。管道运算符“|”的右边如果是通道对象的话,则该通道就会直接作为输出目的地。另外你也可以为chan()指定一个整数型的参数,来设置缓冲区的大小。也就是说,如果我们想在图9的程序中将缓冲区大小显式地设置为3的话,代码就会变为图10那样。
seq() & STDIN | chan(3) | STDOUT
如果将缓冲区大小设置为0的话,那么在一个通道对象被创建之后,直到其被消费掉之前,流会进行等待,这样管道就会以前后交互的方式来运行。这在单核CPU环境下也许会非常实用。
在聊天类的应用程序中,一个人发送的消息要被广播给所有参与聊天的成员。通道也可以应用在这种场景下。如果将通过chan()创建的通道连接到多个流的话,那么作为输入发送给该通道的值就会被广播给所有和其连接的流。
如果我们将图6中的Echo服务器修改为聊天服务器,将接收到的消息发送给所有参与者,则代码如图9所示。
broadcast = chan() # 打开8008端口上的服务 tcp_server(8008) | { |s| broadcast | s # 返回参与者的消息 s | broadcast # 将消息发送给所有参与者 }
聪明的你也许已经发现了,广播通道是具有状态的。也就是说,连接到broadcast的流作为消息接收方,是会被保存到broadcast中的。另外,作为输出目标的流如果关闭了的话,或者通过disconnect方法被显式地断开连接的话,则该流就不再是输出目标了。immutable是基本的Streem,但是为了编写容易理解的程序,有时候我们需要牺牲一点纯粹性。当然,由于broadcast的状态变化在Streem内部实现了互斥操作,因此即使在并行环境下运行也不会有问题。
我们围绕管道计算模型设计了的新语言Streem。如果是非常适合流处理的程序的话,写起来将简单得让人吃惊。
实际上Streem语言刚开始设计没多久,在达到实用的程度之前,还有许多需要考虑的东西。比如如何进行异常处理、如何支持用户自定义流、类似于对象的概念该如何定义等问题。随着软件规模变得越来越大,编程语言不得不考虑的问题也会越来越多。
“这种语言不能用来编写大型软件项目”,这是编程语言设计者经常使用的“借口”。但是,只要这种语言还不是一无是处,还没有什么证据能表明这种借口会有什么实际作用。
下次我们将会对Streem的设计进行更深入的讲解,同时也会涉及一些具体的实现细节。