在这一小节中,我将介绍基于数据(函数式)的方法来构建数据应用。这里会介绍monadic设计来创建动态工作流,利用依赖注入这样的高级函数式特性来构建轻便的计算工作流。
在统计学和概率论中,一个模型通过描述从一个系统中观察到的数据来表达任何形式的不确定性,模型使得我们可以用来推断规则,进行预测,从数据中学习有用的东西。对于有经验的Scala程序员而言,模型常常和monoid联系起来。monoid是一些观测的集合,其中的操作是实现模型所需的函数。
关于模型的特征
模型特征的选择是从可用变量中发现最小集合来构建模型的过程。数据中常常包含多余和不相干的特征,这些多余特征并不能提供任何有用信息,所以需要通过特征选择将有用的特征挑选出来。
特征选择包含两个具体步骤
观测数据是一组隐含特征(也称为隐含变量,latent variables)的间接测量,他们可能是噪声,也可能包含高度的相关性和冗余。直接使用原始观测进行预测任务常常得到不准确的结果,使用从观测数据提取的所有特征又带来了计算代价。特征抽取可以通过去除冗余或不相关的特征来减少特征数量或维度。
首先,所选的数学模型是从原始输入数据中抽取知识的,那么模型的选择中需要考虑以下几个方面:
然后,从工程角度出发,需要选择一种计算调度框架来处理数据,这需要考虑以下几个方面:
下面的图标给出了计算模型的工作流程:
在这个流程图中,下游的数据转换(data transformation)的参数需要根据上游数据转换的输出进行配置,Scala的高阶函数非常适合实现可配置的数据转换。
创建足够灵活和可重用的框架的目的是为了更好地适应不同工作流程,支持各种类型的机器学习算法。
Scala通过特质(traits)语法实现了丰富的语言特性,可以通过下面的设计层级来构建复杂的程序框架:
数据转换是对数据进行分类、训练验证模型、结果可视化等每个步骤环节的基础。定义一个符号,表示不同类型的数据转换,而不暴露算法实现的内部状态。而管道操作符就是用来表示数据转换的。
trait PipeOperator[-T, +U] {
def |>(data: T): Option[U]
}
|>
操作符将类型为T的数据转换成类型为U的数据,返回一个Option来处理中间的错误和异常。
接下来需要创建单子化的设计(monadic design)来实现管道操作(pipe operator)。通过单子化设计来包装类 _FCT
。 _FCT
类的方法代表了传统Scala针对集合的高阶函数子集。
class _FCT[+T](val _fct: T) {
def map[U](c: T => U): _FCT[U] = new _FCT[U]( c(_fct))
def flatMap[U](f: T =>_FCT[U]): _FCT[U] = f(_fct)
def filter(p: T =>Boolean): _FCT[T] =
if( p(_fct) ) new _FCT[T](_fct) else zeroFCT(_fct)
def reduceLeft[U](f: (U,T) => U)(implicit c: T=> U): U =
f(c(_fct),_fct)
def foldLeft[U](zero: U)(f: (U, T) => U)(implicit c: T=> U): U =
f(c(_fct), _fct)
def foreach(p: T => Unit): Unit = p(_fct)
}
最后, Transform
类将 PipeOperator
实例作为参数输入,自动调用其操作符,像这样:
class Transform[-T, +U](val op: PipeOperator[T, U]) extends _FCT[Function[T, Option[U]]](op.|>) {
def |>(data: T): Option[U] = _fct(data)
}
也许你会对数据转换 Transform
的单子化表示背后的原因表示怀疑,毕竟本来可以通过 PipeOperator
的实现来创建任何算法。
原因是 Transform
含有丰富的方法,使得开发者可以创建丰富的工作流。
下面的代码片段描述的是使用单子化方法来进行数据转换组合:
val op = new PipeOperator[Int, Double] {
def |> (n: Int):Option[Double] =Some(Math.sin(n.toDouble))
}
def g(f: Int =>Option[Double]): (Int=> Long) = {
(n: Int) => {
f(n) match {
case Some(x) => x.toLong
case None => -1L
}
}
}
val gof = new Transform[Int,Double](op).map(g(_))
这里使用函数g作为现有的数据转换来扩展op。
一个由可配置的数据转换构成的工作流在其不同的流程阶段都需要动态的模块化。蛋糕模式(Cake Pattern)是使用混入特质(mix-in traits)来满足可配置计算工作流的一种高级类组合模式。Scala通过特质这一语法特性使得开发者能够使用一种灵活的、可重用的方法来创建和管理模块,特质是可嵌套的、可混入类中的、可堆叠的、可继承的。
val myApp = new Classification with Validation with PreProcessing {
val filter = ..
}
val myApp = new Clustering with Validation with PreProcessing {
val filter = ..
}
对于上面两个应用来说,都需要数据的预处理和验证模块,在代码中都重复定义了filter方法,使得代码重复、缺乏灵活性。当特质在组合中存在依赖性时,这个问题凸现出来。
混入的线性化
在混入的特质中,方法调用遵循从右到左的顺序:
M => B => C => A => N
的线性顺序来实现 trait PreProcessingWithValidation extends PreProcessing {
self: Validation =>
val filter = ..
}
val myApp = new Classification with PreProcessingWithValidation {
val validation: Validation
}
在PreProcessingWithValidation中使用self类型来解决上述问题。(tips:原书的内容在这里我没怎么搞清楚,不知道是通过自身类型混入了Validation后filter方法具体是怎么实现的,以及实例化Classification时混入PreProcessingWithValidation难道不需要混入Validation吗?我表示疑问)
由PipeOperator定义的数据转换动态地嵌入了通过抽象val定义的模块中,下面我们定义工作流的三个阶段:
trait PreprocModule[-T, +U] { val preProc: PipeOperator[T, U] }
trait ProcModule[-T, +U] { val proc: PipeOperator[T, U] }
trait PostprocModule[-T, +U] { val postProc: PipeOperator[T, U] }
上面的特质(模块)仅包含一个抽象值,蛋糕模式的一个特点是用模块内部封装的类型初始化抽象值来执行严格的模块化:
trait ProcModule[-T, +U] {
val proc: PipeOperator [T, U]
class Classification[-T, +U] extends PipeOperator [T,U] { }
}
构建框架的一个目的是允许开发者可以从任何工作流中独立创建数据转换(继承自PipeOperator)。
接下来就是将不同的模块写入一个工作流中,通过上一小节中的三个特质的堆叠作为自身引用来实现:
class WorkFlow[T, U, V, W] {
self: PreprocModule[T,U] with ProcModule[U,V] with PostprocModule[V,W] =>
def |> (data: T): Option[W] = {
preProc |> data match {
case Some(input) => {
proc |> input match {
case Some(output) => postProc |> output
case None => { … }
}
}
case None => { … }
}
}
}
下面介绍如何具体地实现一个工作流。
首先通过继承PipeOperator来定义集中数据转换:
class Sampler(val samples: Int) extends PipeOperator[Double => Double, DblVector] {
override def |> (f: Double => Double): Option[DblVector] =
Some(Array.tabulate(samples)(n => f(n.toDouble/samples)) )
}
class Normalizer extends PipeOperator[DblVector, DblVector] {
override def |> (data: DblVector): Option[DblVector] =
Some(Stats[Double](data).normalize)
}
class Reducer extends PipeOperator[DblVector, Int] {
override def |> (data: DblVector): Option[Int] =
Range(0, data.size) find(data(_) == 1.0)
}
工作流工厂由这个UML类图描述。
最终通过动态地初始化抽象值preProc、proc和postProc来实例化工作流。
val dataflow = new Workflow[Double => Double, DblVector, DblVector, Int]
with PreprocModule[Double => Double, DblVector]
with ProcModule[DblVector, DblVector]
with PostprocModule[DblVector, Int] {
val preProc: PipeOperator[Double => Double,DblVector] = new Sampler(100) //1
val proc: PipeOperator[DblVector,DblVector]= new Normalizer //1
val postProc: PipeOperator[DblVector,Int] = new Reducer//1
}
dataflow |> ((x: Double) => Math.log(x+1.0)+Random.nextDouble) match {
case Some(index) => …