转载

Spark处理Json格式数据(Python)

前言

Spark能够自动推断出Json数据集的“数据模式”(Schema),并将它加载为一个SchemaRDD实例。这种“自动”的行为是通过下述两种方法实现的:

jsonFile:从一个文件目录中加载数据,这个目录中的文件的每一行均为一个JSON字符串(如果JSON字符串“跨行”,则可能导致解析错误);

jsonRDD:从一个已经存在的RDD中加载数据,这个RDD中的每一个元素均为一个JSON字符串;

这里我们仅讨论jsonFile的场景,jsonRDD处理方法类似。

典型示例

JSON的数据模式是非常灵活的,我们例举常见的几种可能性进行讨论。

(1)JSON文件中的数据模式一致,每一行的数据均为JSON字符串(非JSON数组)

假设数据模式包含三个属性:id、name、birthdate,测试数据如下所示:

Spark处理Json格式数据(Python)

Python测试代码如下:

Spark处理Json格式数据(Python)

在终端上执行可以看到有两部分输出:

Spark处理Json格式数据(Python)

Spark处理Json格式数据(Python)

可以看到数据模式一致的情况下,数据模式被正确推断,数据被正确解析。

(2)JSON文件中的数据模式一致,每一行的数据均为JSON数组字符串

假设数据模式包含三个属性:id、name、birthdate,测试数据如下所示:

Spark处理Json格式数据(Python)

Python测试代码同上,在终端上执行可以看到有两部分输出:

Spark处理Json格式数据(Python)

Spark处理Json格式数据(Python)

可以看到数据数据模式被正确推断,JSON数组中的“多个”对象数据被正确解析,解析结果共有三行。

(3)JSON文件中的数据模式一致,但每一行数据以两种形式出现:JSON字符串(非数组)、JSON数组字符串

假设数据模式包含三个属性:id、name、birthdate,测试数据如下所示:

Spark处理Json格式数据(Python)

其中第一行为JSON数组字符串,包含有两个JSON对象;第二行为JSON字符串,表示一个JSON对象。

Python测试代码同上,在终端上执行可以看到有两部分输出:

Spark处理Json格式数据(Python)

Spark处理Json格式数据(Python)

可以看到数据行混搭的情况下(同时包含JSON字符串(非数组)和JSON数组字符串),数据数据模式被正确推断,数据被正确解析。

(4)JSON文件中的数据模式不一致

假设数据模式有以下两种情况:

模式一:id、name、birthdate

模式二:id、name、birthdate、weight

测试数据如下所示:

Spark处理Json格式数据(Python)

其中第一行数据为JSON数组字符串,数据模式为模式一;第二行数据为JSON字符串(非数组),数据模式为模式二。

Python测试代码同上,在终端上执行可以看到有两部分输出:

Spark处理Json格式数据(Python)

Spark处理Json格式数据(Python)

可以看到数据模式被推断为模式二,数据也是按照模式二被解析;模式二相对于模式一多出一个属性“weight”,如果数据匹配模式一,则属性“weigth”的值以“None”的形式出现。

数据模式不一致还有一种比较复杂的情况:模式交错,如下:

模式一:id、name、birthdate、height

模式二:id、name、birthdate、weight

测试数据如下所示:

Spark处理Json格式数据(Python)

Python测试代码同上,在终端上执行可以看到有两部分输出:

Spark处理Json格式数据(Python)

Spark处理Json格式数据(Python)

可以看出数据模式被推断为模式一与模式二的并集,缺失的属性以“None”的形式出现。

结论:数据模式不一致时,推断模式为多个数据模式的并集。

(5)JSON文件中的数据模式不一致,且数据模式存在类型多样以及嵌套的情况;

模式一:id、name、birthdate、msg、extras

模式二:id、name、birthdate、weight

测试数据如下所示:

Spark处理Json格式数据(Python)

第一行是一个JSON数组字符串,包含两个JSON字符串,每个JSON字符串的属性msg是一个JSON字符串,属性extras是一个JSON数组字符串。

Python测试代码同上,在终端上执行可以看到有两部分输出:

Spark处理Json格式数据(Python)

Spark处理Json格式数据(Python)

可以看出数据模式被推断为模式一与模式二的并集,其中msg被解析为struct类型,extras被解析为array类型(元素类型为struct),缺失的属性以“None”的形式出现。

如果我们需要处理的JSON数据的模式是不一致的(对象属性不一致),我们在代码逻辑中如何进行判断呢?以(5)中的测试数据为例,属性weight并不是出现在所有数据行中,为了避免运行时抛出异常,需要根据是否包含属性weight作出不同的处理,这时就需要用到Python内建函数hasattr,如下:

Spark处理Json格式数据(Python)

代码中函数handle的作用:如果对象row含有属性weight,则返回对应的属性值;否则返回“no value”。而判断row是否含有属性“weight”是使用函数hasattr实现的。

通过上述的几个示例我们可以发现Spark可以正确应对常见的JSON日志格式,但我们在充分利用JSON特性的同时(日志属性灵活扩展、JSON处理方式统一),也应该考虑合理设计JSON日志数据模式,避免数据模式不一致或数据模式复杂给数据分析带来的不便。

jsonFile相当于是Spark提供给我们的便利工具,省去了我们自己解析JSON数据的麻烦,但也可能会出现工具失效的情况,大部分原因来自于数据格式不统一或不规范,此时我们就得自己解析JSON数据。

Python自带了一个操作JSON数据的库“json”,我们会经常使用到两个方法:

dumps:用于JSON对象序列化,即JSON对象 —> JSON字符串

loads:用于JSON字符串反序列化,即JSON字符串 —> JSON对象

方法使用示例如下所示:

Spark处理Json格式数据(Python)

解决了JSON数据的序列化与反序列化的问题后,还需要考虑一个情况:

数据分析时多数需要处理的都是文本数据,我们将一行文本(字符串)反序列化为一个JSON对象后,我们如何判断这个对象是一个普通JSON对象,还是一个JSON数组?特别是我们需要根据不同的数据类型进行不同的处理时,这种数据类型的判断就不可避免。

此时我们就需要用到Python的内建函数:isinstance,如下所示:

Spark处理Json格式数据(Python)

在终端执行上述代码,可得结果:

Spark处理Json格式数据(Python)

可以看出JSON数组对象的类型为“list”,而普通JSON对象的类型为“dict”。

总结

Spark在处理JSON数据时通常可以直接使用jsonFile,如果数据格式不统一、不规范或者我们需要更为灵活的数据处理方式时,则可以将文本数据以字符串的形式按行读入,然后使用Python JSON相关库、内建函数自行解析JSON数据。

正文到此结束
Loading...