转载

MySQL 流式查询:中途中断并关闭流时出现假死现象

版本

mysql:mysql-connector-java:8.0.30

现象

使用流式查询处理数据时,如果希望在处理过程中中断并关闭流,关闭操作会长时间无响应:
/**
 * Closes the statement and then the connection, unless the connection is
 * already closed.
 *
 * <p>The connection is closed in a {@code finally} block so that a failure
 * while closing the statement cannot leak the connection. If both close calls
 * throw, the exception from closing the connection is the one that propagates.
 *
 * @throws Exception if closing the statement or the connection fails
 */
@Override
public void close() throws Exception {
    if (connection.isClosed()) {
        return;
    }
    try {
        if (!statement.isClosed()) {
            // Observed: this call can also block for a long time; the larger
            // the table, the longer the wait.
            statement.close();
        }
    } finally {
        connection.close();
    }
}

原因

MySQL 驱动在关闭流式结果集时,必须先把流式查询中尚未读取的所有记录全部读完,才能真正关闭流。中断时如果剩余的记录数量过多,遍历耗时就会很长,从而表现为假死现象。

源码

com.mysql.cj.protocol.a.result.ResultsetRowsStreaming
// Closes the streaming row source.
// Before releasing its references, this method reads every row that has not
// been consumed yet, so the time spent here grows with the number of rows
// still pending.
public void close() {
 // Lock on the owner's sync mutex when one is available, otherwise on this object.
 Object mutex = this.owner != null && this.owner.getSyncMutex() != null ? this.owner.getSyncMutex() : this;

 // Track whether any rows were left unread and how many were drained.
 boolean hadMore = false;
 int howMuchMore = 0;

 synchronized (mutex) {
 // Read all remaining rows.
 while (next() != null) {
 hadMore = true;
 howMuchMore++;

 // Yield the CPU after every 100 drained rows.
 if (howMuchMore % 100 == 0) {
 Thread.yield();
 }
 }

 // When clobberStreamingResults is off and netTimeoutForStreamingResults is positive,
 // send "SET net_write_timeout=<value>" using the session's recorded value.
 if (!this.protocol.getPropertySet().getBooleanProperty(PropertyKey.clobberStreamingResults).getValue()
 && this.protocol.getPropertySet().getIntegerProperty(PropertyKey.netTimeoutForStreamingResults).getValue() > 0) {
 // NOTE(review): 60 is presumably the fallback used when the variable is unknown - confirm.
 int oldValue = this.protocol.getServerSession().getServerVariable("net_write_timeout", 60);

 this.protocol.clearInputStream();

 try {
 this.protocol.sendCommand(this.commandBuilder.buildComQuery(this.protocol.getSharedSendPacket(), "SET net_write_timeout=" + oldValue,
 this.protocol.getPropertySet().getStringProperty(PropertyKey.characterEncoding).getValue()), false, 0);
 } catch (Exception ex) {
 // Rethrow through the exception factory, keeping the original exception as the cause.
 throw ExceptionFactory.createException(ex.getMessage(), ex, this.exceptionInterceptor);
 }
 }

 // With useUsageAdvisor enabled, report a usage event when rows were left unread at close time.
 if (this.protocol.getPropertySet().getBooleanProperty(PropertyKey.useUsageAdvisor).getValue()) {
 if (hadMore) {
 this.owner.getSession().getProfilerEventHandler().processEvent(ProfilerEvent.TYPE_USAGE, this.owner.getSession(),
 this.owner.getOwningQuery(), null, 0, new Throwable(),
 Messages.getString("RowDataDynamic.1", new String[] { String.valueOf(howMuchMore), this.owner.getPointOfOrigin() }));
 }
 }
 }

 // Drop references held by this object.
 this.metadata = null;
 this.owner = null;
}

修改方案

将原来的流式查询代码修改为游标查询(注意:使用 MySQL Connector/J 时,需要在连接 URL 中设置 useCursorFetch=true,正数的 fetchSize 才会以服务端游标方式分批读取)。

流式查询

/**
 * Configures this statement for streaming results.
 *
 * <p>While holding the connection mutex, the current result type and fetch
 * size of the query are saved into {@code originalResultSetType} and
 * {@code originalFetchSize}; then the fetch size is set to
 * {@link Integer#MIN_VALUE} and the result set type to forward-only.
 *
 * @throws SQLException if a {@code CJException} is raised; it is translated
 *         using this statement's exception interceptor
 */
public void enableStreamingResults() throws SQLException {
    try {
        final Object connectionMutex = this.checkClosed().getConnectionMutex();
        synchronized (connectionMutex) {
            // Save the current settings before overwriting them.
            this.originalResultSetType = this.query.getResultType();
            this.originalFetchSize = this.query.getResultFetchSize();

            this.setFetchSize(Integer.MIN_VALUE);
            this.setResultSetType(Type.FORWARD_ONLY);
        }
    } catch (CJException cause) {
        throw SQLExceptionsMapping.translateException(cause, this.getExceptionInterceptor());
    }
}

游标查询

/**
 * Executes {@code sql} on a forward-only, read-only statement with a fetch
 * size of 2000 and hands the result set to {@code resultSetConsumer}.
 *
 * <p>The connection, statement and result set are all closed when this method
 * returns, whether normally or by exception.
 *
 * <p>NOTE(review): with MySQL Connector/J a positive fetch size presumably only
 * takes effect as a server-side cursor when the connection is opened with
 * {@code useCursorFetch=true} - confirm against the connection URL.
 *
 * @param sql               the query to execute
 * @param resultSetConsumer callback that receives the open result set
 * @throws SQLException if obtaining the connection, executing the query or
 *         consuming the result set fails
 */
public void query(String sql, ResultSetConsumer resultSetConsumer) throws SQLException {
    try (Connection conn = getConnection();
         Statement stmt = conn.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)) {
        // Fetch rows in batches to protect against OOM.
        stmt.setFetchSize(2000);
        try (ResultSet rows = stmt.executeQuery(sql)) {
            if (!EmptyKit.isNotNull(rows)) {
                return;
            }
            resultSetConsumer.accept(rows);
        }
    }
}
 
正文到此结束
Loading...