这几天在做分库分表,涉及到数据迁移,然后设计的方案是使用mybatis将数据分批读取出来,每批大概40--50万条数据,然后将查询来的数据再insert进去,发现这个效率很低,程序有卡顿,而且客户端很容易OOM。
找了一下原因就是jdbc默认的读取数据的时候,会将要查询的数据一次性读到内存中,再通过resultSet循环读取出来,这样子40--50万条数据很容易就撑爆内存,然后调研了下发现,其实可以通过jdbc流的方式读取数据,这种方式可以将读取出来的每一条数据查询到客户端,再执行insert操作,这样机器负载就会轻很多,实际操作了下,确实是这样的
示例代码
package io.tapdata.flow.engine.V2.node.hazelcast.data.pdk;
public class StreamTest {
public static void main(String[] args) throws SQLException, InterruptedException {
StreamTest streamTest = new StreamTest();
System.out.println("---------------------------");
streamTest.selectData("select * from t_item_base_snapshot");
System.out.println("---------------------------");
streamTest.selectDataStream("select * from t_item_base_snapshot");
}
public static Connection getSqlConnection() {
String url = "";
String user = "";
String password = "";
String driverClass = "com.mysql.jdbc.Driver";
Connection connection = null;
try {
Class.forName(driverClass);
Properties info = new Properties();
info.setProperty("user", user);//property的key必须是"user"
info.setProperty("password", password);//property的key必须是"password"
connection = DriverManager.getConnection(url, info);
return connection;
} catch (ClassNotFoundException e) {
System.out.println("Not found driver class, load driver failed.");
} catch (SQLException e) {
System.out.println("create db connection error.");
}
return null;
}
public void selectData(String sqlCmd) throws SQLException {
Connection conn = null;
PreparedStatement stmt = null;
ResultSet rs = null;
try {
conn = getSqlConnection();
stmt = conn.prepareStatement(sqlCmd);
rs = stmt.executeQuery();
try {
while (rs.next()) {
try {
System.out.println("one:" + rs.getString(1) + "two:" + rs.getString(2) + "thrid:" + rs.getString(3));
} catch (SQLException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
} catch (SQLException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
} finally {
}
}
public void selectDataStream(String sqlCmd) throws SQLException {
Connection conn = null;
PreparedStatement stmt = null;
ResultSet rs = null;
try {
conn = getSqlConnection();
stmt = conn.prepareStatement(sqlCmd, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
stmt.setFetchSize(Integer.MIN_VALUE);
rs = stmt.executeQuery();
try {
while (rs.next()) {
try {
System.out.println("one:" + rs.getString(1) + "two:" + rs.getString(2) + "thrid:" + rs.getString(3));
} catch (SQLException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
} catch (SQLException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
} finally {
}
}
}
查询的表中大概有10万条数据,分别运行main方法中的selectData和selectDataStream方法,发现selectData执行过程中程序会有卡顿,说明正在将符合条件的数据全部load进内存,然后一次展示出来;selectDataStream执行的时候没有卡顿,数据会快速的分批的查询出来;最重要的selectData占用的内存比selectDataStream高两三倍,cpu使用率也高了一些
DataStream运行情况:
selectDataStream运行情况:
具体使用stream来读取数据,只是需要在生成prepareStatement时加上ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY这两个参数,再设置一下FetchSize为Integer.MIN_VALUE就可以了,mysql判断是否开启流式读取结果就是看这三个条件forward-only,read-only,fatch size=Integer.MIN_VALUE有没有被设置
conn = DriverManager.getConnection("jdbc:mysql://localhost/?useCursorFetch=true", "user", "s3cr3t");
stmt = conn.createStatement();
stmt.setFetchSize(100);
rs = stmt.executeQuery("SELECT * FROM your_table_here");
如果不设置(不使用stream),就是默认的,mysql会将查询出来的数据全部返回客户端后,再执行ResultSet的next方法,在数据全部返回之前,next方法是阻塞的,设置了之后(使用stream),ResultSet的next方法就会立即返回server端返回的数据,这样客户端就不用使用大量的内存存储返回的数据了
除此之外,insert数据的时候,也可以使用jdbc提供的批次insert的方法addBatch(),以提高效率
还有就是,不管何种操作mysql的方式,比如常用的mybatis,因为底层都是使用的jdbc,所以要想高效的使用mysql,还是要从jdbc着手,要不好多坑还是绕不过去的
参考资料:
- https://dev.mysql.com/doc/connectors/en/connector-j-reference-implementation-notes.html
- https://blog.csdn.net/qq_22912803/article/details/88998415