版本
mysql:mysql-connector-java:8.0.30
现象
使用流式处理,处理过程中希望中断,关闭流却无响应
@Override
public void close() throws Exception {
if (!connection.isClosed()) {
if (!statement.isClosed()) {
statement.close();//这行代码也等待很长时间,表越大,等待时间越长
}
connection.close();
}
}
原因
mysql需要将流式查询中的所有记录全部读取才能关闭流
中断时剩余的记录数量过多,遍历时间长导致假死现象
源码
com.mysql.cj.protocol.a.result.ResultsetRowsStreaming
// 关闭流
public void close() {
Object mutex = this.owner != null && this.owner.getSyncMutex() != null ? this.owner.getSyncMutex() : this;
boolean hadMore = false;
int howMuchMore = 0;
synchronized (mutex) {
// 读取剩余的所有记录
while (next() != null) {
hadMore = true;
howMuchMore++;
if (howMuchMore % 100 == 0) {
Thread.yield();
}
}
if (!this.protocol.getPropertySet().getBooleanProperty(PropertyKey.clobberStreamingResults).getValue()
&& this.protocol.getPropertySet().getIntegerProperty(PropertyKey.netTimeoutForStreamingResults).getValue() > 0) {
int oldValue = this.protocol.getServerSession().getServerVariable("net_write_timeout", 60);
this.protocol.clearInputStream();
try {
this.protocol.sendCommand(this.commandBuilder.buildComQuery(this.protocol.getSharedSendPacket(), "SET net_write_timeout=" + oldValue,
this.protocol.getPropertySet().getStringProperty(PropertyKey.characterEncoding).getValue()), false, 0);
} catch (Exception ex) {
throw ExceptionFactory.createException(ex.getMessage(), ex, this.exceptionInterceptor);
}
}
if (this.protocol.getPropertySet().getBooleanProperty(PropertyKey.useUsageAdvisor).getValue()) {
if (hadMore) {
this.owner.getSession().getProfilerEventHandler().processEvent(ProfilerEvent.TYPE_USAGE, this.owner.getSession(),
this.owner.getOwningQuery(), null, 0, new Throwable(),
Messages.getString("RowDataDynamic.1", new String[] { String.valueOf(howMuchMore), this.owner.getPointOfOrigin() }));
}
}
}
this.metadata = null;
this.owner = null;
}
修改方案
原来的流式查询代码修改成游标查询
流式查询
public void enableStreamingResults() throws SQLException {
try {
synchronized(this.checkClosed().getConnectionMutex()) {
this.originalResultSetType = this.query.getResultType();
this.originalFetchSize = this.query.getResultFetchSize();
this.setFetchSize(Integer.MIN_VALUE);
this.setResultSetType(Type.FORWARD_ONLY);
}
} catch (CJException var5) {
throw SQLExceptionsMapping.translateException(var5, this.getExceptionInterceptor());
}
}
游标查询
public void query(String sql, ResultSetConsumer resultSetConsumer) throws SQLException {
try (
Connection connection = getConnection();
Statement statement = connection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)
) {
statement.setFetchSize(2000); //protected from OM
try (
ResultSet resultSet = statement.executeQuery(sql)
) {
if (EmptyKit.isNotNull(resultSet)) {
resultSetConsumer.accept(resultSet);
}
}
}
}