转载

使用 JAQL 语言读写大数据文件

简介

JAQL 是 Query Language for JSON 的简称,是 InfoSphere BigInsights 的主要组件之一。作为一种 high-level 的查询语言,它主要帮助用户使用和操纵“大数据”。它可以执行各种类型的数据分析,无论是需求分析,探索性分析或是实际的生产环境中,它都是一个自动化能力强大,并提供多种控制和丰富功能的脚本语言。

对于初学者来说,第一步是将各种不同格式的文本正确的读入,才能进行下一步的分析操作。而由于 JAQL 语言的灵活性,初学者很难正确的选择合适的适配函数来读取不同格式的文本。

本文列举了各种不同格式的 JSON 文本,以及读写范例,从而帮助初学者快速掌握 JAQL 语言的 I/O,为进一步的分析打下基础。

回页首

JAQL 脚本的读写函数

JAQL 作为一个灵活的大数据处理脚本,可以很方便的处理各类数据存储格式。JAQL 可以处理的文件格式主要分为文本文件和二进制文件。其提供两类函数来进行这两种文件的读写,一类是用于读写的函数,一类是文件描述函数。

读写函数

JAQL 脚本共有 5 个读写函数,分别是 jaqlGet() 函数、read() 函数、write() 函数、localRead() 函数和 localWrite() 函数:

  • jaqlGet() 函数:该函数只能读取单条 JAQL 记录的文档,它并不会使用 Hadoop 的 I/O 功能,也就是说,该函数并不支持分布式文件系统,也不会被解析为一个 map-reduce job,因此,它只支持本地文件系统和 HTTP 请求。jaqlGet() 函数的输入为一条 URI 记录。例如,当文件在本地文件系统中,并且只有一条 JAQL 记录,就可以使用 jaqlGet() 函数来读取。
  • read() 函数: read() 函数是使用最为广泛的读函数,任何 JAQL 支持的文件,都可以使用 read() 函数来读取。read() 函数支持 map-reduce job,并且以文件描述函数作为输入参数,因此,read() 函数可以支持各种格式的输入文件,有更大的灵活性。
  • write() 函数:write() 函数用于将 JSON 格式的数据流写入到文件系统中。输入参数为文件描述函数,并且根据选择的文件描述函数不同,可以输出成不同格式的文件。
  • localRead() 函数:localRead() 函数与 read() 函数类似,唯一不同的是,localRead() 函数并不会使用 map-reduce 过程来读取文件。localRead() 函数同样可以读取本地文件系统的文件和分布式文件系统中的文件。
  • localWrite() 函数:localWrite() 函数与 write() 函数的关系,如同 localRead() 函数与 read() 函数的关系,不同的是,由于 write() 函数会调用 map-reduce 过程,它的最终结果可能会被写在一个目录下,而 localWrite() 函数的结果则会被写成一个文件。

文件描述函数

文件描述函数主要用来描述输入文件的格式信息,可以将其理解为输入文件和读写文件之间的适配器,只有选择正确的适配器,读写函数才能正确的读取输入文件的内容。表格 1 中描述了常用的文件描述函数的名称,使用范围等信息。

表 1. 文件描述函数列表

名称 描述 使用范围 支持的文件类型
del() 用于读取以逗号(也可以是其他符号)为分隔符的文本文件 当CSV文件中无空行、错行时,可以使用del()文件描述函数读取 本地文件系统、分布式文件系统
lines() 用于读取以行为单位的文本文件 读取以行为单位的文件,每一行可以是CSV文件,也可以是JSON格式的记录 本地文件系统、分布式文件系统
jaqltemp() 用于JAQL临时文件,例如map-reduce的中间步骤 该文件格式以JAQL模式存储数据,比一般的二进制格式更简洁,从而提高性能,节省磁盘空间。以jaqltemp()描述函数存储的文件,需要以它来读取 本地文件系统、分布式文件系统
seq() 用于读取二进制格式的JSON文件 当JSON格式的文件以二进制存储时,使用该描述函数读取 本地文件系统、分布式文件系统
jsonTextFile() 用于从本地文件系统读写JSON格式的文本文件 读写的文件内容必须是正确的JSON格式文件 本地文件系统
jsonText() 可以读写本地文件系统或分布式文件系统的JSON格式文本文件 读写的文件内容必须是正确的JSON格式文件 本地文件系统、分布式文件系统
http() 用于读取http协议返回的JSON格式数据 该函数的输入参数为URL URL

需要注意的是,JAQL 的二进制文件并不适合夸版本的应用,当需要在不同的 JAQL 版本中交叉使用时,需要将文件存储为文本格式。

当文件存储在本地文件系统时,我们通过给文件路径加前缀“file://”来标明,而当文件存储在分布式文件系统时,则加前缀“hdfs://”。

回页首

使用 JAQL 脚本读文件

以下我们通过读取不同格式的文本文件为例,详细说明各个文件描述函数的用法。

CSV 文件

通常,CSV 文件使用 del() 文件描述函数来读取。本文以文件 filecsv.csv 为例,描述如何正确读入 CSV 文件。filecsv.csv 文件内容如清单 1 所示:

清单 1. filecsv.csv 文件

> cat filecsv.csv Tom,man,23,student Alice,female,35,teacher Ben, male,33,police

使用 del() 函数读取 filecsv.csv 的代码如清单 2 所示:

清单 2. 读取 filecsv.csv 文件命令

jaql> read(del(location='file:///xvdc/gaoyunhe/develop/filecsv.csv')); [  [  "Tom",  "man",  "23",  "student"  ],  [  "Alice",  "female",  "35",  "teacher"  ],  [  "Ben",  "male",  "33",  "police"  ] ]

而在实际应用中,我们很难保证输入的 CSV 文件中无空行、错行,而一旦输入的 CSV 文件不是完全正确的,使用 del() 函数就会出错。

仍以上述的 filecsv.csv 文件为例,我们添加几个错误的记录和空行,修改后 filecsv.csv 文件如清单 3 所示:

清单 3. 修改后的 filecsv.csv 文件

> cat filecsv.csv  Tom,man,23,student Alice,female,35,teacher Ben,male,33,polic Bob,18,student

可以看到,第 3 行是空行,而第 5 行缺少了性别列。当我们仍然使用 del()函数来读取的时候,就会发生清单 4 所示错误。

清单 4. JAQL 错误示例

jaql> read(del(location='file:///xvdc/gaoyunhe/develop/filecsv.csv',   schema=schema{name: string, sex: string, age: long, occupation:string})); encountered an exception during the evaluation of a statement java.lang.reflect.UndeclaredThrowableException originating expression ends at <stdin> (line: 2, column: 136) java.lang.RuntimeException: Wrong number of fields on input at position 43 [  {  "name": "Tom",  "sex": "man",  "age": 23,  "occupation": "student"  },  {  "name": "Alice",  "sex": "female",  "age": 35,  "occupation": "teacher"  }

此时,为了增加容错性,我们需要使用 lines() 函数替代 del() 函数。首先需要加入错误检查,当某一行不满足 schema 格式的时候抛出错误,并继续执行,而不是终止程序。errorDel()module 用来处理在一个 CSV 文件中不满足指定格式的错误。因此,我们需要首先在 jaqlshell 中 import 该 module:

jaql> import errorDel;

然后,我们使用 lines() 函数来读取这个 CSV 文件,将 lines() 函数的输出传给 transform 操作,由 transform 操作调用 catch() 函数,将所有的 error 捕获,但不终止程序;然后将 transform 操作的输出传给 filter 操作,过滤掉所有不满足条件的记录(列数不为 4 的记录),最后再调用 transfrom 操作将 filter 的输出转化为 JSON 格式。代码如清单 5 所示。

清单 5. 包含容错的 csv 文件读取

jaql> read(lines('file:///xvdc/gaoyunhe/develop/filecsv.csv'))  -> transform catch (delToJson($), { errThresh: 3}, $ ) -> filter count($) == 4  -> transform {   name: $[0],   sex: $[1],   age: $[2],   occupation: $[3]  }; [  {  "name": "Tom",  "sex": "man",  "age": "23",  "occupation": "student"  },  {  "name": "Alice",  "sex": "female",  "age": "35",  "occupation": "teacher"  },  {  "name": "Ben",  "sex": "male",  "age": "33",  "occupation": "police"  } ]

至此,我们基本列出了所有 CSV 格式可能会遇到的问题,当原始文件中无错误行,使用 del() 来读取,当原始文件中可能存在错误行,则需使用 lines() 函数来读取。

一行为一条 JSON 格式记录

在实际应用的过程中,我们还会遇到这样的文件:每一行是一个独立的 JSON 格式记录,而文件的整体并不满足 JSON 格式的要求。给出一个实例文件 jsonline.txt,如清单 6 所示:

清单 6. 一行为一条 JSON 记录的文本文件

> cat jsonline.txt  {"name": "Tom", "sex": "man", "age": "23", "occupation": "student"} {"name": "Alice", "sex": "femal", "age": "35", "occupation": "teacher"} {"name": "Ben", "sex": "male", "age": "33", "occupation": "police"}

可以看到,每一行都是一个标准格式的 JSON 记录,而整个文档并不满足 JSON 的格式要求。

因此,我们不能简单的使用 jsonTextFile() 或 seq() 来读取。和之前读取 CSV 文件一样,需要使用 lines() 函数现将每一行读取,然后再对每一行进行分别处理。以下给出了一个示例,通过对清单 6 所示文件的读取,来阐述如何读取此类文件。

首先,当使用 lines() 函数读取时,可以将每一行读入到一个 string 的记录中,如清单 7 所示:

清单 7. 使用 lines() 函数读取 JSON 文件

jaql> read(lines('file:///xvdc/gaoyunhe/develop/jsonline.txt')); [  "{/"name/": /"Tom/", /"sex/": /"man/", /"age/": /"23/", /"occupation/": /"student/"}",  "{/"name/": /"Alice/", /"sex/": /"femal/", /"age/": 35/", /"occupation/": /"teacher/"}",  "{/"name/": /"Ben/", /"sex/": /"male/", /"age/": /"33/", /"occupation/": /"police/"}" ]

而仅仅读入并不是我们需要的,我们需要读入为 JSON 的格式,因此我们需要添加 transform 操作来将每一行转换为 JSON 格式。这里用到了 json() 函数。如清单 8 示例:

清单 8. 将读取的文件转换成 JSON 格式

jaql> read(lines('file:///xvdc/gaoyunhe/develop/jsonline.txt')) -> transform json($); [  {  "age": "33",  "name": "Ben",  "occupation": "police",  "sex": "male"  },  {  "age": "23",  "name": "Tom",  "occupation": "student",  "sex": "man"  },  {  "age": "35",  "name": "Alice",  "occupation": "teacher",  "sex": "femal"  } ]

同样的,为了避免原始文件中的空行或错误的 JSON 记录导致程序终止,需要 catch 错误。当我们给 jsonline.txt 文件添加空行和错误行后,需要修改代码如清单 9 所示:

清单 9. 包含容错的 JSON 格式文件读取

jaql> read(lines('file:///xvdc/gaoyunhe/develop/jsonline.txt')) ->   transform catch( json($), {errThresh: 3} , $) -> filter not(isnull($)); [  {  "age": "33",  "name": "Ben",  "occupation": "police",  "sex": "male"  },  {  "age": "23",  "name": "Tom",  "occupation": "student",  "sex": "man"  },  {  "age": "35",  "name": "Alice",  "occupation": "teacher",  "sex": "femal"  } ]

以上描述的文件都是文本文件,当以二进制格式存储文件,并且一行上一条 JSON 记录时,我们需要使用 seq() 函数来代替 lines() 函数。例如,当我们将 jsonline.txt 存储为二进制格式,则需使用如清单 10 所示的函数来读取:

清单 10. 读取二进制格式文件

jaql> read(seq("file:///xvdc/gaoyunhe/develop/jsonline.seq")) -> transform catch( json($), {errThresh: 3} , $) -> filter not(isnull($)); [  {  "age": "23",  "name": "Tom",  "occupation": "student",  "sex": "man"  },  {  "age": "35",  "name": "Alice",  "occupation": "teacher",  "sex": "femal"  },  {  "age": "33",  "name": "Ben",  "occupation": "police",  "sex": "male"  } ]

整个文件符合 JSON 格式

当输入文件的内容,完全符合 JSON 格式,那么可以使用 jsonTextFile() 和 jsonText() 函数读取文本文件,而使用 seq() 函数读取二进制文件。如清单 11 所示:

清单 11. 读取 JSON 格式文件

jaql> read(jsonTextFile('file:///xvdc/gaoyunhe/develop/json.txt')); [  {  "age": 23,  "name": "Ton",  "occupation": "student",  "sex": "male"  },  {  "name": "Ben",  "occupation": "police",  "sex": "male"  } ]

读取流文件

JAQL 不但可以读取本地文件,还可以读取从 HTTP 请求下载的文件。可以使用 http() 函数来读取这类流文件。

以一个 solr server 的查询为例,当输入查询参数,solr server 将会返回一组 JSON 格式的查询结果数据,如清单 12 所示:

清单 12. 读取流文件

jaql> url = 'http://****:8983/solr/inst/select?q=love&wt=json&indent=true&rows=2'; jaql> read(http(url)); [  {  "response": {  "docs": [  {  "_version_": 1502821708428476417,  "created_at": "2015-06-01T08:57:17Z",  "doctype": "tweet",  "documentType": "tweet",  "favorite_count": 0,  "id": "605221554826412032",  "id_str": "605221554826412032",  "is_retweet": false,  "retweet_count": 0,  "text": "Love"  },  {  "_version_": 1502821871360409603,  "created_at": "2015-06-01T09:19:11Z",  "doctype": "tweet",  "documentType": "tweet",  "favorite_count": 0,  "id": "605227063230881793",  "id_str": "605227063230881793",  "is_retweet": false,  "retweet_count": 0,  "text": "Love"  }  ],  "numFound": 11944745,  "start": 0  },  "responseHeader": {  "QTime": 1,  "params": {  "indent": "true",  "q": "love",  "rows": "5",  "wt": "json"  },  "status": 0  }  } ]

至此,我们已经列出了最常用的四种文件格式的读取方式。JAQL 提供了各种文件描述函数,选择正确的文件描述函数是正确读入文件的第一步。通常情况下,使用 JAQL 脚本读取文件,如果文件是以行来存储的,那么首要选择是 lines() 函数或 seq() 函数,而如果整个文件满足 JSON 格式,则可以选择 jsonText() 函数或 seq() 函数。

当文件读入 JAQL 脚本后,则可以使用 JAQL 提供的各种操作将读入的数据流进行转换,使之成为需要的格式。主要的操作由 transform、filter 等。

回页首

写文件

通过写文件函数 write()、localWrite() 与文件描述函数的配合,我们可以将数据流写成 CSV 格式文件或 JSON 格式文件。

写 CSV 文件

以清单 1 所示的读入文件为例,一般情况下,我们处理的结果数据流是以 JSON 格式显示,当需要将其输出为 CSV 格式文件,则可以通过 write()/localWrite(0 函数来实现。此时传入输出函数的文件描述函数为 del() 函数,并且我们可以选择需要输出的列,以 schema 参数来描述。

如下示例,example 为清单 1 所示的读入文件,我们选择只输出两列,分别为 name 和 occupation,代码如清单 13 所示:

清单 13. 输出 JSON 格式文件

jaql> example; [  {  "name": "Tom",  "sex": "man",  "age": 23,  "occupation": "student"  },  {  "name": "Alice",  "sex": "female",  "age": 35,  "occupation": "teacher"  },  {  "name": "Ben",  "sex": "male",  "age": 33,  "occupation": "police"  } ] jaql> example -> localWrite(del (location='file:///xvdc/gaoyunhe/develop/csvoutput.txt', schema=schema{name, occupation})); {  "location": "file:///xvdc/gaoyunhe/develop/csvoutput.txt",  "inoptions": {  "adapter": "com.ibm.jaql.io.hadoop.DelInputAdapter",  "format": "com.ibm.biginsights.compress.mapred.CompressedTextInputFormat",  "configurator": "com.ibm.jaql.io.hadoop.FileInputConfigurator",  "converter": "com.ibm.jaql.io.hadoop.converter.FromDelConverter",  "delimiter": ",",  "quoted": true,  "doubleq": true,  "ddquote": true,  "escape": true,  "schema": schema {  "name",  "occupation"  }  },  "outoptions": {  "adapter": "com.ibm.jaql.io.hadoop.DelOutputAdapter",  "format": "org.apache.hadoop.mapred.TextOutputFormat",  "configurator": "com.ibm.jaql.io.hadoop.FileOutputConfigurator",  "converter": "com.ibm.jaql.io.hadoop.converter.ToDelConverter",  "delimiter": ",",  "quoted": true,  "doubleq": true,  "ddquote": true,  "escape": true,  "schema": schema {  "name",  "occupation"  }  },  "options": {  "schema": schema {  "name",  "occupation"  }  } }

这里我们使用 localWrite() 函数,是为了将数据流输出到一个文件中,如果使用 write() 函数,则会调用 map-reduce 过程,数据流将会被分割为多份,分别输出到 part 文件中。从 localWrite() 函数的返回值中可以看到,我们在 schema 参数中只指定了 name 和 occupation,因此,只有这两项写入了新文件中。

写 json 格式文件

当我们需要将上例中的 example 参数输出为 json 格式,则 jsonTextFile(), jsonText(), seq() 函数均可以使用。如清单 14 所示,分别将 example 参数输出到 json 文本文件和 json 格式的二进制文件中。

清单 14. 输出为 JSON 文件

jaql> example -> localWrite(jsonTextFile('file:///xvdc/gaoyunhe/develop/csvoutput.json')); {  "location": "file:///xvdc/gaoyunhe/develop/csvoutput.json",  "inoptions": {  "adapter": "com.ibm.jaql.io.stream.FileStreamInputAdapter",  "format": "com.ibm.jaql.io.stream.converter.JsonTextInputStream"  },  "outoptions": {  "adapter": "com.ibm.jaql.io.stream.FileStreamOutputAdapter",  "format": "com.ibm.jaql.io.stream.converter.JsonTextOutputStream"  } } jaql> example -> localWrite(seq('file:///xvdc/gaoyunhe/develop/csvoutput.json')); {  "location": "file:///xvdc/gaoyunhe/develop/csvoutput.json",  "inoptions": {  "adapter": "com.ibm.jaql.io.hadoop.DefaultHadoopInputAdapter",  "format": "org.apache.hadoop.mapred.SequenceFileInputFormat",  "configurator": "com.ibm.jaql.io.hadoop.FileInputConfigurator"  },  "outoptions": {  "adapter": "com.ibm.jaql.io.hadoop.DefaultHadoopOutputAdapter",  "format": "org.apache.hadoop.mapred.SequenceFileOutputFormat",  "configurator": "com.ibm.jaql.io.hadoop.FileOutputConfigurator"  } }

将数据流临时输出为二进制文件

jaqltemp() 函数用于将数据流输出为二进制文件,而以 jaqltemp () 函数写出的二进制文件比一般的二进制文件更节省空间。如清单 15 所示,将 example 以 jaqltemp() 函数输出:

清单 15. 将数据流临时输出为二进制文件

jaql> example -> localWrite(jaqltemp(location='file:///xvdc/gaoyunhe/develop/csvoutput.bin',   schema=schema{"name":string, "sex":string, "age":long, "occupation":string})); {  "location": "file:///xvdc/gaoyunhe/develop/csvoutput.bin",  "inoptions": {  "adapter": "com.ibm.jaql.io.hadoop.TempHadoopInputAdapter",  "format": "org.apache.hadoop.mapred.SequenceFileInputFormat",  "configurator": "com.ibm.jaql.io.hadoop.TempFileInputConfigurator",  "schema": schema {  "name": string,  "sex": string,  "age": long,  "occupation": string  }  },  "outoptions": {  "adapter": "com.ibm.jaql.io.hadoop.TempHadoopOutputAdapter",  "format": "org.apache.hadoop.mapred.SequenceFileOutputFormat",  "configurator": "com.ibm.jaql.io.hadoop.TempFileOutputConfigurator",  "schema": schema {  "name": string,  "sex": string,  "age": long,  "occupation": string  }  },  "options": {  "schema": schema {  "name": string,  "sex": string,  "age": long,  "occupation": string  }  } }

对比 csvoutput.seq 和 csvoutput.bin,可以发现,同样的数据量,csvoutput.bin 使用的存储空间要比 csvoutput.seq 的存储空间小,如清单 16 所示:

清单 16. 输出文件大小对比

-rwxrwxrwx 1 hadoop hadoop 233 09-28 02:08 csvoutput.bin -rwxrwxrwx 1 hadoop hadoop 332 09-28 02:09 csvoutput.seq

jaqltemp() 函数的缺点是它并不适合作为长期存储的数据使用,因为 jaqltemp() 各个版本的格式并不兼容,所以一般情况下,我们只是在同一个程序中,临时存储数据使用。

回页首

结束语

JAQL 作为一种灵活的,强大的大数据处理脚本语言,其提供了各种输入文件的数据描述函数,使用正确的文件数据描述函数才可以将输入文件读入 JAQL 脚本中。虽然文件描述函数众多,只要我们明确要读入的数据是以怎样的格式来描述,就可以选择需要的文件描述函数。

正文到此结束
Loading...