BlockingQueue
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
public class Main {
public static void main(String[] args) {
BlockingQueue<String> queue = new ArrayBlockingQueue<>(5);
new Thread(new Producer(queue), "Producer1").start();
new Thread(new Producer(queue), "Producer2").start();
new Thread(new Consumer(queue), "Consumer1").start();
}
}
class Producer implements Runnable {
private final BlockingQueue<String> queue;
public Producer(BlockingQueue<String> queue) {
this.queue = queue;
}
@Override
public void run() {
final String name = Thread.currentThread().getName();
for (int i = 0; i < 5; i++) {
try {
System.out.println(name + " produces message " + i);
queue.put("Message " + i);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
class Consumer implements Runnable {
private final BlockingQueue<String> queue;
public Consumer(BlockingQueue<String> queue) {
this.queue = queue;
}
@Override
public void run() {
final String name = Thread.currentThread().getName();
while (true) {
try {
System.out.println(name + " consumes " + queue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
Synchronized-wait-notifyAll
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
public class Main {
private static final int THREAD_COUNT = 2;
private static final int THREAD_PRODUCTS = 5;
private static final Buffer<Integer> buffer = new Buffer<>(3);
public static void main(String[] args) {
for (int i = 0; i < THREAD_COUNT; i++) {
new Thread(() -> {
for (int j = 0; j < THREAD_PRODUCTS; j++) {
try {
buffer.add(j);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
new Thread(() -> {
while (true) {
try {
System.out.println("消费数值:" + buffer.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
}
class Buffer<V> {
private final List<V> list = new LinkedList<>();
private final int size;
public Buffer(int size) {
this.size = size;
}
public void add(V v) throws InterruptedException {
synchronized (this) {
while (list.size() == size) {
wait();
}
list.add(v);
notifyAll();
}
}
public V take() throws InterruptedException {
synchronized (this) {
while (list.size() == 0) {
wait();
}
V v = list.remove(0);
notifyAll();
return v;
}
}
}
ReentrantLock-Condition
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
public class Main {
private static final int THREAD_COUNT = 2;
private static final int THREAD_PRODUCTS = 5;
private static final Buffer<Integer> buffer = new Buffer<>(2);
public static void main(String[] args) {
for (int i = 0; i < THREAD_COUNT; i++) {
new Thread(() -> {
for (int j = 0; j < THREAD_PRODUCTS; j++) {
try {
buffer.add(j);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
new Thread(() -> {
while (true) {
try {
System.out.println("消费数值:" + buffer.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
}
class Buffer<V> {
private final List<V> list = new LinkedList<V>();
private final Lock lock = new ReentrantLock();
private final Condition notFull = lock.newCondition();
private final Condition notEmpty = lock.newCondition();
private final int size;
public Buffer(int size) {
this.size = size;
}
public void add(V v) throws InterruptedException {
lock.lock();
try {
while (list.size() == size) {
notFull.await();
}
list.add(v);
notEmpty.signal();
} finally {
lock.unlock();
}
}
public V take() throws InterruptedException {
lock.lock();
try {
while (list.isEmpty()) {
notEmpty.await();
}
V v = list.remove(0);
notFull.signal();
return v;
} finally {
lock.unlock();
}
}
}
Semaphore
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
public class Main {
private static final int THREAD_COUNT = 2;
private static final int THREAD_PRODUCTS = 5;
private static final Buffer<Integer> buffer = new Buffer<>(2);
public static void main(String[] args) {
for (int i = 0; i < THREAD_COUNT; i++) {
new Thread(() -> {
for (int j = 0; j < THREAD_PRODUCTS; j++) {
try {
buffer.add(j);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
new Thread(() -> {
while (true) {
try {
System.out.println("消费数值:" + buffer.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
}
class Buffer<V> {
private final List<V> list = new LinkedList<>();
private final Semaphore mutex = new Semaphore(1);
private final Semaphore notEmpty = new Semaphore(0);
private final Semaphore notFull;
public Buffer(int size) {
notFull = new Semaphore(size);
}
public void add(V v) throws InterruptedException {
notFull.acquire();
try {
mutex.acquire();
list.add(v);
} catch (InterruptedException e) {
notFull.release();
} finally {
mutex.release();
notEmpty.release();
}
}
public V take() throws InterruptedException {
notEmpty.acquire();
try {
mutex.acquire();
return list.remove(0);
} catch (InterruptedException e) {
notEmpty.release();
throw e;
} finally {
mutex.release();
notFull.release();
}
}
}