方法其实挺简单的,disruptor自带优雅停机方法,不用我们自己去实现逻辑,只需要调用disruptor.shutdown();就可以实现优雅关闭。
使用kill -9命令强制终止进程,在某些情况下可能会导致数据丢失或资源未正确释放。以下列出kill -9存在的一些问题以及替代方案,帮助你安全地停止应用程序:
kill -9
数据丢失:kill -9会立即终止进程,不会给应用程序任何机会去保存数据或完成正在进行的操作。
资源泄漏:进程被强制终止后,可能无法正确释放内存、文件句柄或网络连接等资源。
不执行清理逻辑:应用程序通常在关闭时执行一些清理逻辑(如关闭数据库连接、写入日志等),kill -9会跳过这些步骤。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
放开shutdown接口
management:
  endpoints:
    web:
      exposure:
        include: "*"
  endpoint:
    shutdown:
      enabled: true
server:
  port: 8088
然后向 http://127.0.0.1:8088/actuator/shutdown 发送POST请求,即可实现优雅停机。但是在spring boot 2.3以下的版本中,停机过程中无法阻止api继续对外提供服务。我们可以使用过滤器来禁止api对外提供服务,手动将响应状态设置为HttpServletResponse.SC_SERVICE_UNAVAILABLE(即HTTP 503)。
package com.et.disruptor.config;
import org.springframework.stereotype.Component;
import javax.servlet.*;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
/**
 * Servlet filter that turns requests away once shutdown has been announced.
 *
 * <p>While the shutdown flag is unset every request is passed down the filter chain
 * untouched. After {@link #startShutdown()} has been called, requests are no longer
 * forwarded; the response status is set to 503 (Service Unavailable) instead.
 */
@Component
public class GracefulShutdownFilter implements Filter {

    /** Set to {@code true} by {@link #startShutdown()}; nothing in this class resets it. */
    private final AtomicBoolean shuttingDown = new AtomicBoolean(false);

    /**
     * Forwards the request down the chain, or answers with 503 when shutdown has started.
     *
     * @param request  the incoming request
     * @param response the response; cast to {@link HttpServletResponse} on the rejection path
     * @param chain    the remaining filter chain
     * @throws IOException      propagated from the rest of the chain
     * @throws ServletException propagated from the rest of the chain
     */
    @Override
    public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain)
            throws IOException, ServletException {
        // Normal operation: hand the request on and we are done.
        if (!shuttingDown.get()) {
            chain.doFilter(request, response);
            return;
        }

        // Shutdown in progress: do not forward, only mark the response as unavailable.
        HttpServletResponse httpResponse = (HttpServletResponse) response;
        httpResponse.setStatus(HttpServletResponse.SC_SERVICE_UNAVAILABLE);
    }

    /** Raises the shutdown flag so that subsequent requests are rejected. */
    public void startShutdown() {
        shuttingDown.set(true);
    }
}
DisposableBean是Spring框架中的一个接口,用于在Spring容器销毁Bean时执行一些自定义的清理逻辑。容器关闭时,会自动调用实现了该接口的Bean的destroy()方法。这对于需要在应用程序关闭时释放资源或执行其他清理操作的Bean非常有用。
package com.et.disruptor.config;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
 * Coordinates the application's shutdown sequence from the {@link DisposableBean#destroy()}
 * callback: stop accepting requests, shut the Disruptor down, then wait for remaining tasks.
 */
@Component
public class GracefulShutdownManager implements DisposableBean {

    /** Duration, in milliseconds, of the simulated "remaining work" wait. */
    private static final long MOCK_TASK_MILLIS = 100000L;

    @Autowired
    private GracefulShutdownFilter shutdownFilter;

    @Autowired
    MqManager mqManager;

    /**
     * Runs the three shutdown steps in order.
     *
     * @throws Exception if any step fails, including interruption of the final wait
     */
    @Override
    public void destroy() throws Exception {
        // Step 1: make the filter reject new requests.
        shutdownFilter.startShutdown();

        // Step 2: shut the Disruptor down so that already published events are handled.
        mqManager.shutdownDisruptor();

        // Step 3: give application-specific tasks time to finish.
        waitForTasksToComplete();
    }

    /**
     * Placeholder for waiting on application-specific work. It only sleeps for a fixed
     * period; a real implementation would use a CountDownLatch or a similar mechanism.
     *
     * @throws InterruptedException if the sleeping thread is interrupted
     */
    private void waitForTasksToComplete() throws InterruptedException {
        System.out.println("Waiting for tasks to complete...");
        Thread.sleep(MOCK_TASK_MILLIS);
    }
}
@PreDestroy
package com.et.disruptor.config;
import com.et.disruptor.event.HelloEventFactory;
import com.et.disruptor.event.HelloEventHandler;
import com.et.disruptor.model.MessageModel;
import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
 * Spring configuration that creates and starts the Disruptor, publishes its ring buffer as
 * the {@code ringBuffer} bean, and offers {@link #shutdownDisruptor()} for graceful shutdown.
 */
@Configuration
public class MqManager {

    /** The running Disruptor; assigned by {@link #messageModelRingBuffer()}. */
    private static Disruptor<MessageModel> disruptor;

    /**
     * Thread pool handed to the Disruptor to run its event handlers. It is kept in a field
     * so that {@link #shutdownDisruptor()} can stop it; otherwise its worker threads would
     * stay alive after the Disruptor has been halted.
     */
    private static ExecutorService executor;

    /**
     * Builds the Disruptor, registers the event handler, starts it and returns the ring
     * buffer that producers publish to.
     *
     * @return the ring buffer of the started Disruptor
     */
    @Bean("ringBuffer")
    public RingBuffer<MessageModel> messageModelRingBuffer() {
        // Thread pool that runs the consumer (event handler) threads.
        executor = Executors.newFixedThreadPool(2);
        // Factory used to pre-populate the ring buffer with event instances.
        HelloEventFactory factory = new HelloEventFactory();
        // Ring buffer capacity, counted in event slots (not bytes).
        int bufferSize = 1024 * 256;
        disruptor = new Disruptor<>(factory, bufferSize, executor, ProducerType.SINGLE, new BlockingWaitStrategy());
        // Register the consumer.
        disruptor.handleEventsWith(new HelloEventHandler());
        // Start the event processor threads.
        disruptor.start();
        // Producers publish events through the ring buffer.
        return disruptor.getRingBuffer();
    }

    /**
     * Shuts the Disruptor down and then stops the executor that ran its handlers.
     *
     * <p>Does nothing if the Disruptor was never created. Called from
     * {@code GracefulShutdownManager.destroy()} rather than through {@code @PreDestroy}.
     */
    public void shutdownDisruptor() {
        if (disruptor == null) {
            return;
        }
        System.out.println("close Disruptor...");
        try {
            // Blocks until the published events have been consumed, then halts the processors.
            disruptor.shutdown();
        } finally {
            // Stop the pool as well, even if shutdown() threw, so its idle worker threads
            // do not outlive the application context.
            if (executor != null) {
                executor.shutdown();
            }
        }
    }
}
/**
 * Spins until {@code hasBacklog()} reports no backlog and then calls {@code halt()}, giving
 * up with a {@code TimeoutException} once the deadline has passed.
 *
 * @param timeout  how long to wait; a negative value disables the deadline check
 * @param timeUnit unit of {@code timeout}
 * @throws TimeoutException if a backlog is still present after the deadline
 */
public void shutdown(long timeout, TimeUnit timeUnit) throws TimeoutException {
    final long deadline = System.currentTimeMillis() + timeUnit.toMillis(timeout);
    while (this.hasBacklog()) {
        // The deadline only applies to non-negative timeouts.
        if (timeout >= 0L && System.currentTimeMillis() > deadline) {
            throw TimeoutException.INSTANCE;
        }
        // Otherwise keep polling (busy spin).
    }
    this.halt();
}
/**
 * Reports whether any sequence returned by
 * {@code consumerRepository.getLastSequenceInChain(false)} is still behind the ring
 * buffer's cursor.
 *
 * @return {@code true} if at least one such sequence is lower than the cursor
 */
private boolean hasBacklog() {
    final long publishedCursor = this.ringBuffer.getCursor();
    for (Sequence consumer : this.consumerRepository.getLastSequenceInChain(false)) {
        if (consumer.get() < publishedCursor) {
            return true;
        }
    }
    return false;
}