我从 1.0 版开始就一直在使用 Spring 的 JdbcTemplate,它的发展也很好;但我原本希望它在版本 5 中加入一些流式处理功能,用于处理结果集非常大的查询,可惜这并没有实现。
尽管如此,有时我需要执行返回数百万行的查询,这时就不能使用 JdbcTemplate 中返回 List 的方法。RowCallbackHandler 非常适合这种场景,但如果能直接得到一个 Stream 会好得多,不是吗?尤其是在你已经有自定义 RowMapper 的情况下……
<b>public</b> <b>static</b> <T> Stream<T> streamForQuery(<b>int</b> bufferSize, T endOfStreamMarker, Consumer<Consumer<T>> query) { <b>final</b> LinkedBlockingQueue<T> queue = <b>new</b> LinkedBlockingQueue<>(bufferSize); <font><i>//This is the consumer that is usually passed to queries;</i></font><font> </font><font><i>//it will receive each item from the query and put it in the queue</i></font><font> Consumer<T> filler = t -> { <b>try</b> { </font><font><i>//Try to add to the queue, waiting up to 1 second</i></font><font> </font><font><i>//Honestly if after 1 second the queue is still full, either the stream consumer</i></font><font> </font><font><i>//needs some serious optimization or, more likely, a short-circuit terminal</i></font><font> </font><font><i>//operation was performed on the stream.</i></font><font> <b>if</b> (!queue.offer(t, 1, TimeUnit.SECONDS)) { </font><font><i>//If the queue is full after 1 second, time out.</i></font><font> </font><font><i>//Throw an exception to stop the producer queue.</i></font><font> log.error(</font><font>"Timeoud waiting to feed elements to stream"</font><font>); <b>throw</b> <b>new</b> BufferOverflowException(); } } <b>catch</b> (InterruptedException ex) { System.err.println(</font><font>"Interrupted trying to add item to stream"</font><font>); ex.printStackTrace(); } }; </font><font><i>//For the stream that we return, we use a Spliterator.</i></font><font> <b>return</b> StreamSupport.stream(() -> <b>new</b> Spliterators.AbstractSpliterator<T>(Long.MAX_VALUE, Spliterator.ORDERED) { </font><font><i>//We need to know if the producer thread has been started</i></font><font> <b>private</b> <b>boolean</b> started = false; </font><font><i>//If there's an exception in the producer, keep it here</i></font><font> <b>private</b> <b>volatile</b> Throwable boom; </font><font><i>/** This method is called once, before advancing to the first element. 
* It will start the producer thread, which runs the query, passing it our * queue filler. */</i></font><font> <b>private</b> <b>void</b> startProducer() { </font><font><i>//Get the consumer thread</i></font><font> Thread interruptMe = Thread.currentThread(); </font><font><i>//First time this is called it will run the query in a separate thread</i></font><font> </font><font><i>//This is the producer thread</i></font><font> <b>new</b> Thread(() -> { <b>try</b> { </font><font><i>//Run the query, with our special consumer</i></font><font> query.accept(filler); } <b>catch</b> (BufferOverflowException ignore) { </font><font><i>//The filler threw this, means the queue is not being consumed fast enough</i></font><font> </font><font><i>//(or, more likely, not at all)</i></font><font> } <b>catch</b> (Throwable thr) { </font><font><i>//Something bad happened, store the exception and interrupt the reader</i></font><font> boom = thr; interruptMe.interrupt(); } }).start(); started = <b>true</b>; } @Override <b>public</b> <b>boolean</b> tryAdvance(Consumer<? <b>super</b> T> action) { <b>if</b> (!started) { startProducer(); } <b>try</b> { </font><font><i>//Take an item from the queue and if it's not the end of stream maker, pass it</i></font><font> </font><font><i>//to the action consumer.</i></font><font> T t = queue.take(); <b>if</b> (t != endOfStreamMarker) { action.accept(t); <b>return</b> <b>true</b>; } } <b>catch</b> (InterruptedException ex) { <b>if</b> (boom == <b>null</b>) { System.err.println(</font><font>"Interrupted reading from stream"</font><font>); ex.printStackTrace(); } <b>else</b> { </font><font><i>//Throw the exception from the producer on the consumer side</i></font><font> <b>throw</b> <b>new</b> RuntimeException(boom); } } <b>return</b> false; } }, Spliterator.IMMUTABLE, false); } </font>
<b>final</b> MyRow marker = <b>new</b> MyRow(); Stream<MyRow> stream = streamForQuery(100, marker, callback -> { <font><i>//Pass a RowCallbackHandler that passes a MyRow to the callback</i></font><font> jdbcTemplate.query(</font><font>"SELECT * FROM really_big_table_with_millions_of_rows"</font><font>, rs -> { callback.accept(myRowMapper.mapRow(rs, 0)); } ); </font><font><i>//Pass the marker to the callback, to signal end of stream</i></font><font> callback.accept(marker); }); </font>
// Keep only the rows that report themselves as pretty.
stream = stream.filter(candidate -> candidate.isPretty());
// Skip the first 100,000 matches, look at no more than the next 1,000, and take any one of them.
Stream<MyRow> window = stream.skip(100_000).limit(1000);
Optional<MyRow> row = window.findAny();