Skip to content

Latest commit

 

History

History
161 lines (142 loc) · 5.2 KB

生产者和消费者.md

File metadata and controls

161 lines (142 loc) · 5.2 KB

synchronized

public class Test {

    private final LinkedList<String> lists = new LinkedList<>();

    public synchronized void put(String s) {
        while (lists.size() != 0) { // 用while怕有存在虚拟唤醒线程
            // 满了, 不生产了
            try {
                this.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        lists.add(s);
        System.out.println(Thread.currentThread().getName() + " " + lists.peekFirst());
        this.notifyAll(); // 这里可是通知所有被挂起的线程,包括其他的生产者线程
    }

    public synchronized void get() {
        while (lists.size() == 0) {
            try {
                this.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        System.out.println(Thread.currentThread().getName() + " " + lists.removeFirst());
        this.notifyAll(); // 通知所有被wait挂起的线程  用notify可能就死锁了。
    }

    public static void main(String[] args) {
        Test test = new Test();

        // 启动消费者线程
        for (int i = 0; i < 5; i++) {
            new Thread(test::get, "ConsA" + i).start();
        }

        // 启动生产者线程
        for (int i = 0; i < 5; i++) {
            int tempI = i;
            new Thread(() -> {
                test.put("" + tempI);
            }, "ProdA" + i).start();
        }
    }
}

这里讲一下为什么用while,不用if?

假如,此时队列元素为空,那么消费者肯定都挂起来了哈。在挂起前通知了生产者线程去生产,那么,生产者产了一个之后唤醒消费者,所有消费者醒了以后,就一个消费者抢到锁,开始消费,当消费过后释放锁,其他消费者线程的某一个抢到锁之后,从唤醒处走代码,如果是if,往下走取元素发现队列空的,直接抛异常。如果是while的话,还会继续判断队列是否为空,空就挂起。不会抛异常。

ReentrantLock

public class Test {

    private LinkedList<String> lists = new LinkedList<>();
    private Lock lock = new ReentrantLock();
    private Condition prod = lock.newCondition();
    private Condition cons = lock.newCondition();

    public void put(String s) {
        lock.lock();
        try {
            // 1. 判断
            while (lists.size() != 0) {
                // 只要队列有元素,就不生产了,就停会儿
                prod.await();
            }
            // 2.干活
            lists.add(s);
            System.out.println(Thread.currentThread().getName() + " " + lists.peekFirst());
            // 3. 通知
            cons.signalAll();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    public void get() {
        lock.lock();
        try {
            // 1. 判断
            while (lists.size() == 0) {
                // 队列为空,消费者肯定等待呀
                cons.await();
            }
            // 2.干活
            System.out.println(Thread.currentThread().getName() + " " + lists.removeFirst());
            // 3. 通知
            prod.signalAll();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        Test test = new Test();
        for (int i = 0; i < 5; i++) {
            int tempI = i;
            new Thread(() -> {
                test.put(tempI + "");
            }, "ProdA" + i).start();
        }
        for (int i = 0; i < 5; i++) {
            new Thread(test::get, "ConsA" + i).start();
        }
    }
}

BlockingQueue

public class Test {
    public static void main(String[] args) {
        ArrayBlockingQueue<Object> queue = new ArrayBlockingQueue<>(10);
        // 生产者
        Runnable product = () -> {
            while (true) {
                try {
                    String s = "生产者:" + Thread.currentThread().getName() + " "+ new Object();
                    System.out.println(s);
                    queue.put(s);
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };
        new Thread(product, "p1").start();
        new Thread(product, "p2").start();
        // 消费者
        Runnable consume = () -> {
            while (true) {
                try {
                    Object o = queue.take();
                    System.out.println("消费者:" + Thread.currentThread().getName() + " " + o);
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };
        new Thread(consume, "c1").start();
        new Thread(consume, "c2").start();
    }
}

利用 BlockingQueue 实现生产者消费者模式的代码。虽然代码非常简单,但实际上 ArrayBlockingQueue 已经在背后完成了很多工作,比如队列满了就去阻塞生产者线程,队列有空就去唤醒生产者线程等。