本文主要研究一下flink的JDBCAppendTableSink
// Build an append-only JDBC sink; each '?' in the INSERT statement is bound from a row field.
JDBCAppendTableSink sink = JDBCAppendTableSink.builder()
    .setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
    .setDBUrl("jdbc:derby:memory:ebookshop")
    .setQuery("INSERT INTO books (id) VALUES (?)")
    .setParameterTypes(INT_TYPE_INFO)  // SQL type of each '?' parameter, in order
    .build();

// Register the sink under a table name so it can be used as an insertInto target.
tableEnv.registerTableSink(
    "jdbcOutputTable",
    // specify table schema
    new String[]{"id"},
    new TypeInformation[]{Types.INT},
    sink);

// NOTE: '...' is a placeholder in the original example — any Table whose
// schema matches [id: INT] works here.
Table table = ...
table.insertInto("jdbcOutputTable");
flink-jdbc_2.11-1.7.0-sources.jar!/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSink.java
public class JDBCAppendTableSink implements AppendStreamTableSink<Row>, BatchTableSink<Row> { private final JDBCOutputFormat outputFormat; private String[] fieldNames; private TypeInformation[] fieldTypes; JDBCAppendTableSink(JDBCOutputFormat outputFormat) { this.outputFormat = outputFormat; } public static JDBCAppendTableSinkBuilder builder() { return new JDBCAppendTableSinkBuilder(); } @Override public void emitDataStream(DataStream<Row> dataStream) { dataStream .addSink(new JDBCSinkFunction(outputFormat)) .name(TableConnectorUtil.generateRuntimeName(this.getClass(), fieldNames)); } @Override public void emitDataSet(DataSet<Row> dataSet) { dataSet.output(outputFormat); } @Override public TypeInformation<Row> getOutputType() { return new RowTypeInfo(fieldTypes, fieldNames); } @Override public String[] getFieldNames() { return fieldNames; } @Override public TypeInformation<?>[] getFieldTypes() { return fieldTypes; } @Override public TableSink<Row> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) { int[] types = outputFormat.getTypesArray(); String sinkSchema = String.join(", ", IntStream.of(types).mapToObj(JDBCTypeUtil::getTypeName).collect(Collectors.toList())); String tableSchema = String.join(", ", Stream.of(fieldTypes).map(JDBCTypeUtil::getTypeName).collect(Collectors.toList())); String msg = String.format("Schema of output table is incompatible with JDBCAppendTableSink schema. 
" + "Table schema: [%s], sink schema: [%s]", tableSchema, sinkSchema); Preconditions.checkArgument(fieldTypes.length == types.length, msg); for (int i = 0; i < types.length; ++i) { Preconditions.checkArgument( JDBCTypeUtil.typeInformationToSqlType(fieldTypes[i]) == types[i], msg); } JDBCAppendTableSink copy; try { copy = new JDBCAppendTableSink(InstantiationUtil.clone(outputFormat)); } catch (IOException | ClassNotFoundException e) { throw new RuntimeException(e); } copy.fieldNames = fieldNames; copy.fieldTypes = fieldTypes; return copy; } @VisibleForTesting JDBCOutputFormat getOutputFormat() { return outputFormat; } }
flink-jdbc_2.11-1.7.0-sources.jar!/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSinkBuilder.java
/**
 * Fluent builder for {@link JDBCAppendTableSink}. Collects the JDBC connection
 * settings, the insert query, and the SQL types of the query parameters, then
 * assembles the underlying {@link JDBCOutputFormat} in {@link #build()}.
 */
public class JDBCAppendTableSinkBuilder {
	private String username;
	private String password;
	private String driverName;
	private String dbURL;
	private String query;
	private int batchSize = DEFAULT_BATCH_INTERVAL;
	private int[] parameterTypes;

	/**
	 * Specify the username of the JDBC connection.
	 * @param username the username of the JDBC connection.
	 */
	public JDBCAppendTableSinkBuilder setUsername(String username) {
		this.username = username;
		return this;
	}

	/**
	 * Specify the password of the JDBC connection.
	 * @param password the password of the JDBC connection.
	 */
	public JDBCAppendTableSinkBuilder setPassword(String password) {
		this.password = password;
		return this;
	}

	/**
	 * Specify the fully-qualified class name of the JDBC driver to load.
	 * @param drivername the name of the JDBC driver.
	 */
	public JDBCAppendTableSinkBuilder setDrivername(String drivername) {
		this.driverName = drivername;
		return this;
	}

	/**
	 * Specify the URL of the JDBC database.
	 * @param dbURL the URL of the database, whose format is specified by the
	 *              corresponding JDBC driver.
	 */
	public JDBCAppendTableSinkBuilder setDBUrl(String dbURL) {
		this.dbURL = dbURL;
		return this;
	}

	/**
	 * Specify the parameterized statement the sink will execute for each row.
	 * Usually an INSERT, REPLACE or UPDATE statement with '?' placeholders.
	 * @param query the query to be executed by the sink.
	 * @see org.apache.flink.api.java.io.jdbc.JDBCOutputFormat.JDBCOutputFormatBuilder#setQuery(String)
	 */
	public JDBCAppendTableSinkBuilder setQuery(String query) {
		this.query = query;
		return this;
	}

	/**
	 * Specify how many rows are buffered before a batched execution.
	 * By default the sink batches the query to improve performance.
	 * @param batchSize the size of a batch.
	 */
	public JDBCAppendTableSinkBuilder setBatchSize(int batchSize) {
		this.batchSize = batchSize;
		return this;
	}

	/**
	 * Specify the types of the rows the sink accepts, as Flink type information;
	 * each entry is translated to its {@link java.sql.Types} constant.
	 * @param types the type of each field.
	 */
	public JDBCAppendTableSinkBuilder setParameterTypes(TypeInformation<?>... types) {
		int[] sqlTypes = new int[types.length];
		for (int i = 0; i < types.length; ++i) {
			sqlTypes[i] = JDBCTypeUtil.typeInformationToSqlType(types[i]);
		}
		this.parameterTypes = sqlTypes;
		return this;
	}

	/**
	 * Specify the types of the rows the sink accepts.
	 * @param types the type of each field defined by {@see java.sql.Types}.
	 */
	public JDBCAppendTableSinkBuilder setParameterTypes(int... types) {
		this.parameterTypes = types;
		return this;
	}

	/**
	 * Finalizes the configuration and checks validity.
	 *
	 * @return a configured {@link JDBCAppendTableSink} wrapping the resulting
	 *         {@link JDBCOutputFormat}.
	 * @throws NullPointerException if no parameter types were specified.
	 */
	public JDBCAppendTableSink build() {
		// Parameter types are the only mandatory setting checked up front here;
		// the output-format builder validates the remaining connection settings.
		Preconditions.checkNotNull(parameterTypes,
			"Types of the query parameters are not specified. Please specify types using the setParameterTypes() method.");

		JDBCOutputFormat format = JDBCOutputFormat.buildJDBCOutputFormat()
			.setUsername(username)
			.setPassword(password)
			.setDBUrl(dbURL)
			.setQuery(query)
			.setDrivername(driverName)
			.setBatchInterval(batchSize)
			.setSqlTypes(parameterTypes)
			.finish();

		return new JDBCAppendTableSink(format);
	}
}