转载

Java性能 -- 并发设计模式

  1. 线程上下文指的是 贯穿线程整个生命周期 的对象中的一些 全局 信息,如Spring中的 ApplicationContext
  2. 可以使用 ThreadLocal 实现上下文
    • ThreadLocal是 线程本地变量 ,可以实现 多线程的数据隔离 ,每个线程只能访问各自内部的副本变量

Thread-Per-Message模式

  1. 一个消息一个线程
    • 在Socket通信中,一个线程监听IO事件,每当监听到一个IO事件,就交给另一个处理线程执行IO操作
  2. 如果遇到高并发,就会出现 严重的性能问题 ,因为线程在操作系统中也是昂贵的资源,不能无限制地创建
    • 如果针对每个IO请求都创建一个线程来处理,在有大量请求同时进来时,就会创建大量线程
    • 每次请求都需要 创建销毁 线程,性能开销很大
  3. 可以使用 线程池 来代替线程的创建和销毁

ServerHandler

@AllArgsConstructor
public class ServerHandler implements Runnable {
    private Socket socket;

    @Override
    public void run() {
        BufferedReader in = null;
        PrintWriter out = null;
        String msg;
        try {
            in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
            out = new PrintWriter(socket.getOutputStream(), true);
            while ((msg = in.readLine()) != null && msg.length() != 0) {
                System.out.println("server received : " + msg);
                out.print("received~/n");
                out.flush();
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                in.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
            try {
                out.close();
            } catch (Exception e) {
                e.printStackTrace();
            }
            try {
                socket.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

Server

public class Server {
    private static final int DEFAULT_PORT = 12345;
    private static ServerSocket server;

    public static void start() throws IOException {
        start(DEFAULT_PORT);
    }

    public static void start(int port) throws IOException {
        if (server != null) {
            return;
        }

        try {
            server = new ServerSocket(port);
            while (true) {
                Socket socket = server.accept();
                new Thread(new ServerHandler(socket)).start();
            }
        } finally {
            if (server != null) {
                server.close();
            }
        }
    }

    public static void main(String[] args) {
        new Thread(() -> {
            try {
                Server.start();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }).start();
    }
}

Worker-Thread模式

  1. Worker是 工人 的意思,代表在Worker-Thread模式中,会有一些工人不断轮流处理过来的工作
    • 当没有工作时,工人会处于 等待 状态,直到有新的工作进来
    • 除了 工人 角色,Worker-Thread模式还包括了 流水线产品
  2. 相比于Thread-Per-Message模式
    • 可以减少频繁创建、销毁线程所带来的性能开销
    • 也能避免无限制创建线程所带来的内存溢出风险

Package

@Data
public class Package {
    private String name;
    private String address;

    public void execute() {
        System.out.println(Thread.currentThread().getName() + " executed " + this);
    }
}

Worker

public class Worker extends Thread {
    private static final Random RANDOM = new Random(System.currentTimeMillis());
    private final PackageChannel channel;

    public Worker(String name, PackageChannel channel) {
        super(name);
        this.channel = channel;
    }

    @Override
    public void run() {
        while (true) {
            channel.take().execute();
            try {
                Thread.sleep(RANDOM.nextInt(1000));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

PackageChannel

public class PackageChannel {
    private final static int MAX_PACKAGE_NUM = 100;

    private final Package[] packageQueue;
    private final Worker[] workerPool;
    private int head;
    private int tail;
    private int count;

    public PackageChannel(int workers) {
        this.packageQueue = new Package[MAX_PACKAGE_NUM];
        this.head = 0;
        this.tail = 0;
        this.count = 0;
        this.workerPool = new Worker[workers];
        this.init();
    }

    private void init() {
        for (int i = 0; i < workerPool.length; i++) {
            workerPool[i] = new Worker("Worker-" + i, this);
        }
    }

    /**
     * push switch to start all of worker to work
     */
    public void startWorker() {
        Arrays.asList(workerPool).forEach(Worker::start);
    }

    public synchronized void put(Package packageReq) {
        while (count >= packageQueue.length) {
            try {
                this.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        this.packageQueue[tail] = packageReq;
        this.tail = (tail + 1) % packageQueue.length;
        this.count++;
        this.notifyAll();
    }

    public synchronized Package take() {
        while (count <= 0) {
            try {
                this.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        Package request = this.packageQueue[head];
        this.head = (this.head + 1) % this.packageQueue.length;
        this.count--;
        this.notifyAll();
        return request;
    }
}

Test

public class Test {
    public static void main(String[] args) {
        // 新建8个工人
        final PackageChannel channel = new PackageChannel(8);
        // 开始工作
        channel.startWorker();
        // 为流水线添加包裹
        for (int i = 0; i < 100; i++) {
            Package packageReq = new Package();
            packageReq.setAddress("test");
            packageReq.setName("test");
            channel.put(packageReq);
        }
    }
}
原文  http://zhongmingmao.me/2019/09/21/java-performance-concurrent-design-pattern/
正文到此结束
Loading...