Java手写生产者消费者

January 23, 2020

BlockingQueue

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
public class Main {
    public static void main(String[] args) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(5);
        new Thread(new Producer(queue), "Producer1").start();
        new Thread(new Producer(queue), "Producer2").start();
        new Thread(new Consumer(queue), "Consumer1").start();
    }
}

class Producer implements Runnable {
    private final BlockingQueue<String> queue;

    public Producer(BlockingQueue<String> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        final String name = Thread.currentThread().getName();
        for (int i = 0; i < 5; i++) {
            try {
                System.out.println(name + " produces message " + i);
                queue.put("Message " + i);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

class Consumer implements Runnable {
    private final BlockingQueue<String> queue;

    public Consumer(BlockingQueue<String> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        final String name = Thread.currentThread().getName();
        while (true) {
            try {
                System.out.println(name + " consumes " + queue.take());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

Synchronized-wait-notifyAll

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
public class Main {
    private static final int THREAD_COUNT = 2;
    private static final int THREAD_PRODUCTS = 5;
    private static final Buffer<Integer> buffer = new Buffer<>(3);

    public static void main(String[] args) {
        for (int i = 0; i < THREAD_COUNT; i++) {
            new Thread(() -> {
                for (int j = 0; j < THREAD_PRODUCTS; j++) {
                    try {
                        buffer.add(j);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }).start();
        }

        new Thread(() -> {
            while (true) {
                try {
                    System.out.println("消费数值:" + buffer.take());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }
}

class Buffer<V> {
    private final List<V> list = new LinkedList<>();
    private final int size;

    public Buffer(int size) {
        this.size = size;
    }

    public void add(V v) throws InterruptedException {
        synchronized (this) {
            while (list.size() == size) {
                wait();
            }

            list.add(v);
            notifyAll();
        }
    }

    public V take() throws InterruptedException {
        synchronized (this) {
            while (list.size() == 0) {
                wait();
            }

            V v = list.remove(0);
            notifyAll();
            return v;
        }
    }
}

ReentrantLock-Condition

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
public class Main {
    private static final int THREAD_COUNT = 2;
    private static final int THREAD_PRODUCTS = 5;
    private static final Buffer<Integer> buffer = new Buffer<>(2);

    public static void main(String[] args) {
        for (int i = 0; i < THREAD_COUNT; i++) {
            new Thread(() -> {
                for (int j = 0; j < THREAD_PRODUCTS; j++) {
                    try {
                        buffer.add(j);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }).start();
        }

        new Thread(() -> {
            while (true) {
                try {
                    System.out.println("消费数值:" + buffer.take());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }
}

class Buffer<V> {
    private final List<V> list = new LinkedList<V>();
    private final Lock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();
    private final Condition notEmpty = lock.newCondition();
    private final int size;

    public Buffer(int size) {
        this.size = size;
    }

    public void add(V v) throws InterruptedException {
        lock.lock();
        try {
            while (list.size() == size) {
                notFull.await();
            }

            list.add(v);
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }

    public V take() throws InterruptedException {
        lock.lock();
        try {
            while (list.isEmpty()) {
                notEmpty.await();
            }

            V v = list.remove(0);
            notFull.signal();
            return v;
        } finally {
            lock.unlock();
        }
    }
}

Semaphore

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
public class Main {
    private static final int THREAD_COUNT = 2;
    private static final int THREAD_PRODUCTS = 5;
    private static final Buffer<Integer> buffer = new Buffer<>(2);

    public static void main(String[] args) {
        for (int i = 0; i < THREAD_COUNT; i++) {
            new Thread(() -> {
                for (int j = 0; j < THREAD_PRODUCTS; j++) {
                    try {
                        buffer.add(j);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }).start();
        }

        new Thread(() -> {
            while (true) {
                try {
                    System.out.println("消费数值:" + buffer.take());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }
}

class Buffer<V> {
    private final List<V> list = new LinkedList<>();
    private final Semaphore mutex = new Semaphore(1);
    private final Semaphore notEmpty = new Semaphore(0);
    private final Semaphore notFull;

    public Buffer(int size) {
        notFull = new Semaphore(size);
    }

    public void add(V v) throws InterruptedException {
        notFull.acquire();

        try {
            mutex.acquire();
            list.add(v);
        } catch (InterruptedException e) {
            notFull.release();
        } finally {
            mutex.release();
            notEmpty.release();
        }
    }

    public V take() throws InterruptedException {
        notEmpty.acquire();

        try {
            mutex.acquire();
            return list.remove(0);
        } catch (InterruptedException e) {
            notEmpty.release();
            throw e;
        } finally {
            mutex.release();
            notFull.release();
        }
    }
}