JAQL 是 Query Language for JSON 的简称,是 InfoSphere BigInsights 的主要组件之一。作为一种 high-level 的查询语言,它主要帮助用户使用和操纵“大数据”。它可以执行各种类型的数据分析,无论是需求分析,探索性分析或是实际的生产环境中,它都是一个自动化能力强大,并提供多种控制和丰富功能的脚本语言。
对于初学者来说,第一步是将各种不同格式的文本正确的读入,才能进行下一步的分析操作。而由于 JAQL 语言的灵活性,初学者很难正确的选择合适的适配函数来读取不同格式的文本。
本文列举了各种不同格式的 JSON 文本,以及读写范例,从而帮助初学者快速掌握 JAQL 语言的 I/O,为进一步的分析打下基础。
回页首
JAQL 作为一个灵活的大数据处理脚本,可以很方便的处理各类数据存储格式。JAQL 可以处理的文件格式主要分为文本文件和二进制文件。其提供两类函数来进行这两种文件的读写,一类是用于读写的函数,一类是文件描述函数。
JAQL 脚本共有 5 个读写函数,分别是 jaqlGet() 函数、read() 函数、write() 函数、localRead() 函数和 localWrite() 函数:
文件描述函数主要用来描述输入文件的格式信息,可以将其理解为输入文件和读写文件之间的适配器,只有选择正确的适配器,读写函数才能正确的读取输入文件的内容。表格 1 中描述了常用的文件描述函数的名称,使用范围等信息。
表 1. 文件描述函数列表
名称 | 描述 | 使用范围 | 支持的文件类型 |
---|---|---|---|
del() | 用于读取以逗号(也可以是其他符号)为分隔符的文本文件 | 当CSV文件中无空行、错行时,可以使用del()文件描述函数读取 | 本地文件系统、分布式文件系统 |
lines() | 用于读取以行为单位的文本文件 | 读取以行为单位的文件,每一行可以是CSV文件,也可以是JSON格式的记录 | 本地文件系统、分布式文件系统 |
jaqltemp() | 用于JAQL临时文件,例如map-reduce的中间步骤 | 该文件格式以JAQL模式存储数据,比一般的二进制格式更简洁,从而提高性能,节省磁盘空间。以jaqltemp()描述函数存储的文件,需要以它来读取 | 本地文件系统、分布式文件系统 |
seq() | 用于读取二进制格式的JSON文件 | 当JSON格式的文件以二进制存储时,使用该描述函数读取 | 本地文件系统、分布式文件系统 |
jsonTextFile() | 用于从本地文件系统读写JSON格式的文本文件 | 读写的文件内容必须是正确的JSON格式文件 | 本地文件系统 |
jsonText() | 可以读写本地文件系统或分布式文件系统的JSON格式文本文件 | 读写的文件内容必须是正确的JSON格式文件 | 本地文件系统、分布式文件系统 |
http() | 用于读取http协议返回的JSON格式数据 | 该函数的输入参数为URL | URL |
需要注意的是,JAQL 的二进制文件并不适合夸版本的应用,当需要在不同的 JAQL 版本中交叉使用时,需要将文件存储为文本格式。
当文件存储在本地文件系统时,我们通过给文件路径加前缀“file://”来标明,而当文件存储在分布式文件系统时,则加前缀“hdfs://”。
回页首
以下我们通过读取不同格式的文本文件为例,详细说明各个文件描述函数的用法。
通常,CSV 文件使用 del() 文件描述函数来读取。本文以文件 filecsv.csv 为例,描述如何正确读入 CSV 文件。filecsv.csv 文件内容如清单 1 所示:
清单 1. filecsv.csv 文件
> cat filecsv.csv Tom,man,23,student Alice,female,35,teacher Ben, male,33,police
使用 del() 函数读取 filecsv.csv 的代码如清单 2 所示:
清单 2. 读取 filecsv.csv 文件命令
jaql> read(del(location='file:///xvdc/gaoyunhe/develop/filecsv.csv')); [ [ "Tom", "man", "23", "student" ], [ "Alice", "female", "35", "teacher" ], [ "Ben", "male", "33", "police" ] ]
而在实际应用中,我们很难保证输入的 CSV 文件中无空行、错行,而一旦输入的 CSV 文件不是完全正确的,使用 del() 函数就会出错。
仍以上述的 filecsv.csv 文件为例,我们添加几个错误的记录和空行,修改后 filecsv.csv 文件如清单 3 所示:
清单 3. 修改后的 filecsv.csv 文件
> cat filecsv.csv Tom,man,23,student Alice,female,35,teacher Ben,male,33,polic Bob,18,student
可以看到,第 3 行是空行,而第 5 行缺少了性别列。当我们仍然使用 del()函数来读取的时候,就会发生清单 4 所示错误。
清单 4. JAQL 错误示例
jaql> read(del(location='file:///xvdc/gaoyunhe/develop/filecsv.csv', schema=schema{name: string, sex: string, age: long, occupation:string})); encountered an exception during the evaluation of a statement java.lang.reflect.UndeclaredThrowableException originating expression ends at <stdin> (line: 2, column: 136) java.lang.RuntimeException: Wrong number of fields on input at position 43 [ { "name": "Tom", "sex": "man", "age": 23, "occupation": "student" }, { "name": "Alice", "sex": "female", "age": 35, "occupation": "teacher" }
此时,为了增加容错性,我们需要使用 lines() 函数替代 del() 函数。首先需要加入错误检查,当某一行不满足 schema 格式的时候抛出错误,并继续执行,而不是终止程序。errorDel()module 用来处理在一个 CSV 文件中不满足指定格式的错误。因此,我们需要首先在 jaqlshell 中 import 该 module:
jaql> import errorDel;
然后,我们使用 lines() 函数来读取这个 CSV 文件,将 lines() 函数的输出传给 transform 操作,由 transform 操作调用 catch() 函数,将所有的 error 捕获,但不终止程序;然后将 transform 操作的输出传给 filter 操作,过滤掉所有不满足条件的记录(列数不为 4 的记录),最后再调用 transfrom 操作将 filter 的输出转化为 JSON 格式。代码如清单 5 所示。
清单 5. 包含容错的 csv 文件读取
jaql> read(lines('file:///xvdc/gaoyunhe/develop/filecsv.csv')) -> transform catch (delToJson($), { errThresh: 3}, $ ) -> filter count($) == 4 -> transform { name: $[0], sex: $[1], age: $[2], occupation: $[3] }; [ { "name": "Tom", "sex": "man", "age": "23", "occupation": "student" }, { "name": "Alice", "sex": "female", "age": "35", "occupation": "teacher" }, { "name": "Ben", "sex": "male", "age": "33", "occupation": "police" } ]
至此,我们基本列出了所有 CSV 格式可能会遇到的问题,当原始文件中无错误行,使用 del() 来读取,当原始文件中可能存在错误行,则需使用 lines() 函数来读取。
在实际应用的过程中,我们还会遇到这样的文件:每一行是一个独立的 JSON 格式记录,而文件的整体并不满足 JSON 格式的要求。给出一个实例文件 jsonline.txt,如清单 6 所示:
清单 6. 一行为一条 JSON 记录的文本文件
> cat jsonline.txt {"name": "Tom", "sex": "man", "age": "23", "occupation": "student"} {"name": "Alice", "sex": "femal", "age": "35", "occupation": "teacher"} {"name": "Ben", "sex": "male", "age": "33", "occupation": "police"}
可以看到,每一行都是一个标准格式的 JSON 记录,而整个文档并不满足 JSON 的格式要求。
因此,我们不能简单的使用 jsonTextFile() 或 seq() 来读取。和之前读取 CSV 文件一样,需要使用 lines() 函数现将每一行读取,然后再对每一行进行分别处理。以下给出了一个示例,通过对清单 6 所示文件的读取,来阐述如何读取此类文件。
首先,当使用 lines() 函数读取时,可以将每一行读入到一个 string 的记录中,如清单 7 所示:
清单 7. 使用 lines() 函数读取 JSON 文件
jaql> read(lines('file:///xvdc/gaoyunhe/develop/jsonline.txt')); [ "{/"name/": /"Tom/", /"sex/": /"man/", /"age/": /"23/", /"occupation/": /"student/"}", "{/"name/": /"Alice/", /"sex/": /"femal/", /"age/": 35/", /"occupation/": /"teacher/"}", "{/"name/": /"Ben/", /"sex/": /"male/", /"age/": /"33/", /"occupation/": /"police/"}" ]
而仅仅读入并不是我们需要的,我们需要读入为 JSON 的格式,因此我们需要添加 transform 操作来将每一行转换为 JSON 格式。这里用到了 json() 函数。如清单 8 示例:
清单 8. 将读取的文件转换成 JSON 格式
jaql> read(lines('file:///xvdc/gaoyunhe/develop/jsonline.txt')) -> transform json($); [ { "age": "33", "name": "Ben", "occupation": "police", "sex": "male" }, { "age": "23", "name": "Tom", "occupation": "student", "sex": "man" }, { "age": "35", "name": "Alice", "occupation": "teacher", "sex": "femal" } ]
同样的,为了避免原始文件中的空行或错误的 JSON 记录导致程序终止,需要 catch 错误。当我们给 jsonline.txt 文件添加空行和错误行后,需要修改代码如清单 9 所示:
清单 9. 包含容错的 JSON 格式文件读取
jaql> read(lines('file:///xvdc/gaoyunhe/develop/jsonline.txt')) -> transform catch( json($), {errThresh: 3} , $) -> filter not(isnull($)); [ { "age": "33", "name": "Ben", "occupation": "police", "sex": "male" }, { "age": "23", "name": "Tom", "occupation": "student", "sex": "man" }, { "age": "35", "name": "Alice", "occupation": "teacher", "sex": "femal" } ]
以上描述的文件都是文本文件,当以二进制格式存储文件,并且一行上一条 JSON 记录时,我们需要使用 seq() 函数来代替 lines() 函数。例如,当我们将 jsonline.txt 存储为二进制格式,则需使用如清单 10 所示的函数来读取:
清单 10. 读取二进制格式文件
jaql> read(seq("file:///xvdc/gaoyunhe/develop/jsonline.seq")) -> transform catch( json($), {errThresh: 3} , $) -> filter not(isnull($)); [ { "age": "23", "name": "Tom", "occupation": "student", "sex": "man" }, { "age": "35", "name": "Alice", "occupation": "teacher", "sex": "femal" }, { "age": "33", "name": "Ben", "occupation": "police", "sex": "male" } ]
当输入文件的内容,完全符合 JSON 格式,那么可以使用 jsonTextFile() 和 jsonText() 函数读取文本文件,而使用 seq() 函数读取二进制文件。如清单 11 所示:
清单 11. 读取 JSON 格式文件
jaql> read(jsonTextFile('file:///xvdc/gaoyunhe/develop/json.txt')); [ { "age": 23, "name": "Ton", "occupation": "student", "sex": "male" }, { "name": "Ben", "occupation": "police", "sex": "male" } ]
JAQL 不但可以读取本地文件,还可以读取从 HTTP 请求下载的文件。可以使用 http() 函数来读取这类流文件。
以一个 solr server 的查询为例,当输入查询参数,solr server 将会返回一组 JSON 格式的查询结果数据,如清单 12 所示:
清单 12. 读取流文件
jaql> url = 'http://****:8983/solr/inst/select?q=love&wt=json&indent=true&rows=2'; jaql> read(http(url)); [ { "response": { "docs": [ { "_version_": 1502821708428476417, "created_at": "2015-06-01T08:57:17Z", "doctype": "tweet", "documentType": "tweet", "favorite_count": 0, "id": "605221554826412032", "id_str": "605221554826412032", "is_retweet": false, "retweet_count": 0, "text": "Love" }, { "_version_": 1502821871360409603, "created_at": "2015-06-01T09:19:11Z", "doctype": "tweet", "documentType": "tweet", "favorite_count": 0, "id": "605227063230881793", "id_str": "605227063230881793", "is_retweet": false, "retweet_count": 0, "text": "Love" } ], "numFound": 11944745, "start": 0 }, "responseHeader": { "QTime": 1, "params": { "indent": "true", "q": "love", "rows": "5", "wt": "json" }, "status": 0 } } ]
至此,我们已经列出了最常用的四种文件格式的读取方式。JAQL 提供了各种文件描述函数,选择正确的文件描述函数是正确读入文件的第一步。通常情况下,使用 JAQL 脚本读取文件,如果文件是以行来存储的,那么首要选择是 lines() 函数或 seq() 函数,而如果整个文件满足 JSON 格式,则可以选择 jsonText() 函数或 seq() 函数。
当文件读入 JAQL 脚本后,则可以使用 JAQL 提供的各种操作将读入的数据流进行转换,使之成为需要的格式。主要的操作由 transform、filter 等。
回页首
通过写文件函数 write()、localWrite() 与文件描述函数的配合,我们可以将数据流写成 CSV 格式文件或 JSON 格式文件。
以清单 1 所示的读入文件为例,一般情况下,我们处理的结果数据流是以 JSON 格式显示,当需要将其输出为 CSV 格式文件,则可以通过 write()/localWrite(0 函数来实现。此时传入输出函数的文件描述函数为 del() 函数,并且我们可以选择需要输出的列,以 schema 参数来描述。
如下示例,example 为清单 1 所示的读入文件,我们选择只输出两列,分别为 name 和 occupation,代码如清单 13 所示:
清单 13. 输出 JSON 格式文件
jaql> example; [ { "name": "Tom", "sex": "man", "age": 23, "occupation": "student" }, { "name": "Alice", "sex": "female", "age": 35, "occupation": "teacher" }, { "name": "Ben", "sex": "male", "age": 33, "occupation": "police" } ] jaql> example -> localWrite(del (location='file:///xvdc/gaoyunhe/develop/csvoutput.txt', schema=schema{name, occupation})); { "location": "file:///xvdc/gaoyunhe/develop/csvoutput.txt", "inoptions": { "adapter": "com.ibm.jaql.io.hadoop.DelInputAdapter", "format": "com.ibm.biginsights.compress.mapred.CompressedTextInputFormat", "configurator": "com.ibm.jaql.io.hadoop.FileInputConfigurator", "converter": "com.ibm.jaql.io.hadoop.converter.FromDelConverter", "delimiter": ",", "quoted": true, "doubleq": true, "ddquote": true, "escape": true, "schema": schema { "name", "occupation" } }, "outoptions": { "adapter": "com.ibm.jaql.io.hadoop.DelOutputAdapter", "format": "org.apache.hadoop.mapred.TextOutputFormat", "configurator": "com.ibm.jaql.io.hadoop.FileOutputConfigurator", "converter": "com.ibm.jaql.io.hadoop.converter.ToDelConverter", "delimiter": ",", "quoted": true, "doubleq": true, "ddquote": true, "escape": true, "schema": schema { "name", "occupation" } }, "options": { "schema": schema { "name", "occupation" } } }
这里我们使用 localWrite() 函数,是为了将数据流输出到一个文件中,如果使用 write() 函数,则会调用 map-reduce 过程,数据流将会被分割为多份,分别输出到 part 文件中。从 localWrite() 函数的返回值中可以看到,我们在 schema 参数中只指定了 name 和 occupation,因此,只有这两项写入了新文件中。
当我们需要将上例中的 example 参数输出为 json 格式,则 jsonTextFile(), jsonText(), seq() 函数均可以使用。如清单 14 所示,分别将 example 参数输出到 json 文本文件和 json 格式的二进制文件中。
清单 14. 输出为 JSON 文件
jaql> example -> localWrite(jsonTextFile('file:///xvdc/gaoyunhe/develop/csvoutput.json')); { "location": "file:///xvdc/gaoyunhe/develop/csvoutput.json", "inoptions": { "adapter": "com.ibm.jaql.io.stream.FileStreamInputAdapter", "format": "com.ibm.jaql.io.stream.converter.JsonTextInputStream" }, "outoptions": { "adapter": "com.ibm.jaql.io.stream.FileStreamOutputAdapter", "format": "com.ibm.jaql.io.stream.converter.JsonTextOutputStream" } } jaql> example -> localWrite(seq('file:///xvdc/gaoyunhe/develop/csvoutput.json')); { "location": "file:///xvdc/gaoyunhe/develop/csvoutput.json", "inoptions": { "adapter": "com.ibm.jaql.io.hadoop.DefaultHadoopInputAdapter", "format": "org.apache.hadoop.mapred.SequenceFileInputFormat", "configurator": "com.ibm.jaql.io.hadoop.FileInputConfigurator" }, "outoptions": { "adapter": "com.ibm.jaql.io.hadoop.DefaultHadoopOutputAdapter", "format": "org.apache.hadoop.mapred.SequenceFileOutputFormat", "configurator": "com.ibm.jaql.io.hadoop.FileOutputConfigurator" } }
jaqltemp() 函数用于将数据流输出为二进制文件,而以 jaqltemp () 函数写出的二进制文件比一般的二进制文件更节省空间。如清单 15 所示,将 example 以 jaqltemp() 函数输出:
清单 15. 将数据流临时输出为二进制文件
jaql> example -> localWrite(jaqltemp(location='file:///xvdc/gaoyunhe/develop/csvoutput.bin', schema=schema{"name":string, "sex":string, "age":long, "occupation":string})); { "location": "file:///xvdc/gaoyunhe/develop/csvoutput.bin", "inoptions": { "adapter": "com.ibm.jaql.io.hadoop.TempHadoopInputAdapter", "format": "org.apache.hadoop.mapred.SequenceFileInputFormat", "configurator": "com.ibm.jaql.io.hadoop.TempFileInputConfigurator", "schema": schema { "name": string, "sex": string, "age": long, "occupation": string } }, "outoptions": { "adapter": "com.ibm.jaql.io.hadoop.TempHadoopOutputAdapter", "format": "org.apache.hadoop.mapred.SequenceFileOutputFormat", "configurator": "com.ibm.jaql.io.hadoop.TempFileOutputConfigurator", "schema": schema { "name": string, "sex": string, "age": long, "occupation": string } }, "options": { "schema": schema { "name": string, "sex": string, "age": long, "occupation": string } } }
对比 csvoutput.seq 和 csvoutput.bin,可以发现,同样的数据量,csvoutput.bin 使用的存储空间要比 csvoutput.seq 的存储空间小,如清单 16 所示:
清单 16. 输出文件大小对比
-rwxrwxrwx 1 hadoop hadoop 233 09-28 02:08 csvoutput.bin -rwxrwxrwx 1 hadoop hadoop 332 09-28 02:09 csvoutput.seq
jaqltemp() 函数的缺点是它并不适合作为长期存储的数据使用,因为 jaqltemp() 各个版本的格式并不兼容,所以一般情况下,我们只是在同一个程序中,临时存储数据使用。
回页首
JAQL 作为一种灵活的,强大的大数据处理脚本语言,其提供了各种输入文件的数据描述函数,使用正确的文件数据描述函数才可以将输入文件读入 JAQL 脚本中。虽然文件描述函数众多,只要我们明确要读入的数据是以怎样的格式来描述,就可以选择需要的文件描述函数。