使用 Flink 编写处理逻辑时,新手总是容易被林林总总的概念所混淆:
为什么 Flink 有那么多的类型声明方式?
BasicTypeInfo.STRING_TYPE_INFO、Types.STRING 、Types.STRING() 有何区别?
TypeInfoFactory 又是什么?
TypeInformation.of 和 TypeHint 是如何使用的呢?
接下来本文将逐步解密 Flink 的类型和序列化机制。
Flink 的类型系统源码位于 org.apache.flink.api.common.typeinfo 包,让我们对图 1 深入追踪,看一下类的继承关系图:
可以看到,图 1 和 图 2 是一一对应的,TypeInformation 类是描述一切类型的公共基类,它和它的所有子类必须可序列化(Serializable),因为类型信息将会伴随 Flink 的作业提交,被传递给每个执行节点。
由于 Flink 自己管理内存,采用了一种非常紧凑的存储格式(见 官方博文 ),因而类型信息在整个数据处理流程中属于至关重要的元数据。
Flink 内部实现了名为 TypeExtractror 的类,可以利用方法签名、子类信息等蛛丝马迹,自动提取和恢复类型信息(当然也可以显式声明,即本文所介绍的内容)。
然而由于 Java 的类型擦除,自动提取并不是总是有效。因而一些情况下(例如通过 URLClassLoader 动态加载的类),仍需手动处理;例如下图中对 DataSet 变换时,使用 .returns() 方法声明返回类型。
这里需要说明一下,returns() 接受三种类型的参数:字符串描述的类名(例如 "String")、TypeHint(接下来会讲到,用于泛型类型参数)、Java 原生 Class(例如 String.class) 等;不过字符串形式的用法即将废弃,如果确实有必要,请使用 Class.forName() 等方法来解决。
下面是 ExecutionEnvironment 类的 registerType 方法,它可以向 Flink 注册子类信息(Flink 认识父类,但不一定认识子类的一些独特特性,因而需要注册),下面是 Flink-ML 机器学习库代码的例子:
从下图可以看到,如果通过 TypeExtractor.createTypeInfo(type) 方法获取到的类型信息属于 PojoTypeInfo 及其子类,那么将其注册到一起;否则统一交给 Kryo 去处理,Flink 并不过问(这种情况下性能会变差)。
通过 TypeInformation.of() 方法,可以简单地创建类型信息对象。
TypeHint 的原理是创建匿名子类,运行时 TypeExtractor 可以通过 getGenericSuperclass(). getActualTypeArguments() 方法获取保存的实际类型。
例如 BasicTypeInfo,这个类定义了一系列常用类型的快捷方式,对于 String、Boolean、Byte、Short、Integer、Long、Float、Double、Char 等基本类型的类型声明,可以直接使用。
例如下面是对 Row 类型各字段的类型声明,使用方法非常简明,不再需要 new XxxTypeInfo<>(很多很多参数)
当然,如果觉得 BasicTypeInfo 还是太长,Flink 还提供了完全等价的 Types 类(org.apache.flink.api.common.typeinfo.Types):
通过自定义 TypeInfo 为任意类提供 Flink 原生内存管理(而非 Kryo),可令存储更紧凑,运行时也更高效。
开发者在自定义类上使用 @TypeInfo 注解,随后创建相应的 TypeInfoFactory 并覆盖 createTypeInfo 方法。
注意需要继承 TypeInformation 类,为每个字段定义类型,并覆盖元数据方法,例如是否是基本类型(isBasicType)、是否是 Tuple(isTupleType)、元数(对于一维的 Row 类型,等于字段的个数)等等,从而为 TypeExtractor 提供决策依据。
更多示例,请参考 Flink 源码的 org/apache/flink/api/java/typeutils/TypeInfoFactoryTest.java
Flink 自带了很多 TypeSerializer 子类,大多数情况下各种自定义类型都是常用类型的排列组合,因而可以直接复用:
如果不能满足,那么可以继承 TypeSerializer 及其子类以实现自己的序列化器。
对于 Flink 无法序列化的类型(例如用户自定义类型,没有 registerType,也没有自定义 TypeInfo 和 TypeInfoFactory),默认会交给 Kryo 处理。
如果 Kryo 仍然无法处理(例如 Guava、Thrift、Protobuf 等第三方库的一些类),有以下两种解决方案:
1. 可以强制使用 Avro 来替代 Kryo:
env.getConfig().enableForceAvro(); // env 代表 ExecutionEnvironment 对象, 下同
2. 为 Kryo 增加自定义的 Serializer 以增强 Kryo 的功能:
env.getConfig().addDefaultKryoSerializer(Class<?> type, Class<? extends Serializer<?>> serializerClass
以及
env.getConfig().registerTypeWithKryoSerializer(Class<?> type, T serializer)
如果希望完全禁用 Kryo(100% 使用 Flink 的序列化机制),则可以使用以下设置,但注意一切无法处理的类都将导致异常:
env.getConfig().disableGenericTypes();
金无足赤,人无完人。Flink 内置的类型系统虽然强大而灵活,但仍然有一些需要注意的点:
由于 Flink 类型提取依赖于继承等机制,而 lambda 函数比较特殊,它是匿名的,也没有与之相关的类,所以其类型信息较难获取。
Eclipse 的 JDT 编译器会把 lambda 函数的泛型签名等信息写入编译后的字节码中,而对于 javac 等常见的其他编译器,则不会这样做,因而 Flink 就无法获取具体类型信息了。
2. Kryo 的 JavaSerializer 在 Flink 下存在 Bug
推荐使用 org.apache.flink.api.java.typeutils.runtime.kryo.JavaSerializer 而非 com.esotericsoftware.kryo.serializers.JavaSerializer 以防止与 Flink 不兼容。
下面以 StringSerializer 为例,来看下 Flink 是如何紧凑管理内存的:
下面是具体的序列化过程:
可以看到,Flink 对于内存管理是非常细致的,层次分明,代码也容易理解。
Data Types & Serialization
Flink 原理与实现:内存管理
Flink 的数据类型和序列化