原创

jdbc流streaming读取超大数据

这几天在做分库分表,涉及到数据迁移,然后设计的方案是使用mybatis将数据分批读取出来,每批大概40--50万条数据,然后将查询来的数据再insert进去,发现这个效率很低,程序有卡顿,而且客户端很容易OOM。
找了一下原因就是jdbc默认的读取数据的时候,会将要查询的数据一次性读到内存中,再通过resultSet循环读取出来,这样子40--50万条数据很容易就撑爆内存,然后调研了下发现,其实可以通过jdbc流的方式读取数据,这种方式可以将读取出来的每一条数据查询到客户端,再执行insert操作,这样机器负载就会轻很多,实际操作了下,确实是这样的

示例代码

package io.tapdata.flow.engine.V2.node.hazelcast.data.pdk;

public class StreamTest {

    public static void main(String[] args) throws SQLException, InterruptedException {
        StreamTest streamTest = new StreamTest();

        System.out.println("---------------------------");
 
        streamTest.selectData("select * from t_item_base_snapshot");

        System.out.println("---------------------------");

        streamTest.selectDataStream("select * from t_item_base_snapshot");
    }

    public static Connection getSqlConnection() {

        String url = "";
        String user = "";
        String password = "";
        String driverClass = "com.mysql.jdbc.Driver";

        Connection connection = null;
        try {
            Class.forName(driverClass);
            Properties info = new Properties();
            info.setProperty("user", user);//property的key必须是"user"
            info.setProperty("password", password);//property的key必须是"password"
            connection = DriverManager.getConnection(url, info);
            return connection;
        } catch (ClassNotFoundException e) {
            System.out.println("Not found driver class, load driver failed.");
        } catch (SQLException e) {
            System.out.println("create db connection error.");
        }
        return null;
    }


    public void selectData(String sqlCmd) throws SQLException {

        Connection conn = null;
        PreparedStatement stmt = null;
        ResultSet rs = null;

        try {

            conn = getSqlConnection();
            stmt = conn.prepareStatement(sqlCmd);
            rs = stmt.executeQuery();

            try {
                while (rs.next()) {
                    try {
                        System.out.println("one:" + rs.getString(1) + "two:" + rs.getString(2) + "thrid:" + rs.getString(3));
                    } catch (SQLException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                }
            } catch (SQLException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }

        } finally {

        }
    }


    public void selectDataStream(String sqlCmd) throws SQLException {

        Connection conn = null;
        PreparedStatement stmt = null;
        ResultSet rs = null;

        try {

            conn = getSqlConnection();

            stmt = conn.prepareStatement(sqlCmd, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
            stmt.setFetchSize(Integer.MIN_VALUE);

            rs = stmt.executeQuery();

            try {
                while (rs.next()) {
                    try {
                        System.out.println("one:" + rs.getString(1) + "two:" + rs.getString(2) + "thrid:" + rs.getString(3));
                    } catch (SQLException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                }
            } catch (SQLException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }

        } finally {

        }
    }

}
查询的表中大概有10万条数据,分别运行main方法中的selectData和selectDataStream方法,发现selectData执行过程中程序会有卡顿,说明正在将符合条件的数据全部load进内存,然后一次展示出来;selectDataStream执行的时候没有卡顿,数据会快速的分批的查询出来;最重要的selectData占用的内存比selectDataStream高两三倍,cpu使用率也高了一些

DataStream运行情况:

20190403163143477

selectDataStream运行情况:

20190403162957294 具体使用stream来读取数据,只是需要在生成prepareStatement时加上ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY这两个参数,再设置一下FetchSize为Integer.MIN_VALUE就可以了,mysql判断是否开启流式读取结果就是看这三个条件forward-only,read-only,fatch size=Integer.MIN_VALUE有没有被设置
conn = DriverManager.getConnection("jdbc:mysql://localhost/?useCursorFetch=true", "user", "s3cr3t"); 
stmt = conn.createStatement(); 
stmt.setFetchSize(100);
rs = stmt.executeQuery("SELECT * FROM your_table_here");
如果不设置(不使用stream),就是默认的,mysql会将查询出来的数据全部返回客户端后,再执行ResultSet的next方法,在数据全部返回之前,next方法是阻塞的,设置了之后(使用stream),ResultSet的next方法就会立即返回server端返回的数据,这样客户端就不用使用大量的内存存储返回的数据了
除此之外,insert数据的时候,也可以使用jdbc提供的批次insert的方法addBatch(),以提高效率
还有就是,不管何种操作mysql的方式,比如常用的mybatis,因为底层都是使用的jdbc,所以要想高效的使用mysql,还是要从jdbc着手,要不好多坑还是绕不过去的

参考资料:

  • https://dev.mysql.com/doc/connectors/en/connector-j-reference-implementation-notes.html
  • https://blog.csdn.net/qq_22912803/article/details/88998415
正文到此结束
Loading...