Собственно ошибка:
Пусть у нас есть "классическая" реализация bounded buffer на на основе пары [synchronize(...) {} + notifyAll()]
public class WaitNotifyAllBuffer{
private final T[] items;
private int takeIndex;
private int putIndex;
private int count;
private final Object lock = new Object();
public WaitNotifyAllBuffer(int capacity) {
this.items = (T[]) new Object[capacity];
}
private boolean isFull() {
return count == items.length;
}
private boolean isEmpty() {
return count == 0;
}
private void doPut(T elem) {
items[takeIndex] = elem;
if (++takeIndex == items.length) {
takeIndex = 0;
}
++count;
}
private T doTake() {
T elem = items[putIndex];
items[putIndex] = null;
if (++putIndex == items.length) {
putIndex = 0;
}
--count;
return elem;
}
public void put(T elem) throws InterruptedException {
synchronized (lock) {
while (isFull()) {
lock.wait();
}
doPut(elem);
lock.notifyAll();
}
}
public T take() throws InterruptedException {
synchronized (lock) {
while (isEmpty()) {
lock.wait();
}
lock.notifyAll();
return doTake();
}
}
}
И по каким-то причинам мы решаем перейти на пару [Lock.lock() + Condition.signal()], и делаем это вот таким способом
public class LockConditionBuffer{
protected final Lock lock = new ReentrantLock();
protected final Condition notEmpty = lock.newCondition();
protected final Condition notFull = lock.newCondition();
...
...
...
public void put(T elem) throws InterruptedException {
lock.lock();
try {
while (isFull()) {
notFull.await();
}
doPut(elem);
notEmpty.signal();
} finally {
lock.unlock();
}
}
public T take() throws InterruptedException {
lock.lock();
try {
while (isEmpty()) {
notEmpty.await();
}
notFull.signal();
return doTake();
} finally {
lock.unlock();
}
}
}
Сдесь кроется весьма тонкая ошибка:
Мы передаем сигнал из-рук-в-руки и если вылетим из, например, метода notEmpty.await(); в методе take(){...} по InterruptedException, ТО - ПОТЕРЯЕМ СИГНАЛ! в том смысле, что мы были единственными кто получил сигнал о возможности осуществить выемку из буфера. Допустим, что в очереди на take() стояло два потока на пустом буфере, третий поток вызвал put() и положил что-то в буфер, далее он нотифицировал об этом один из потоков в очереди на take(), но тот вылетел по ...
Суть проблемы в том, что метод Condition.await() может И получить сигнал И получить статус interrupted а в ответ ТОЛЬКО бросить InterruptedException, т.е. "проглотить" сигнал. Вот что об этом сказано в javadoc для Condition.await():
"... An implementation can favor responding to an interrupt over normal method return in response to a signal. In that case the implementation must ensure that the signal is redirected to another waiting thread, if there is one."
Таким образом, в качестве правильного решения следует рассматривать такое:
public class LockConditionBuffer{
protected final Lock lock = new ReentrantLock();
protected final Condition notEmpty = lock.newCondition();
protected final Condition notFull = lock.newCondition();
...
...
...
public void put(T elem) throws InterruptedException {
lock.lock();
try {
while (isFull()) {
try {
notFull.await();
} catch(InterruptedException e) {
notFull.signal();
throw e;
}
}
doPut(elem);
notEmpty.signal();
} finally {
lock.unlock();
}
}
public T take() throws InterruptedException {
lock.lock();
try {
while (isEmpty()) {
try {
notEmpty.await();
} catch(InterruptedException e) {
notEmpty.signal();
throw e;
}
}
notFull.signal();
return doTake();
} finally {
lock.unlock();
}
}
}
Замечу, что
1) "эталонная" реализация bounded buffer - java.util.concurrent.ArrayBlockingQueue содержит именно такой вариант;
2) пример из книги "Java Concurrency in Practice - Addison Wesley - 2006" содержит первый, не совсем корректный вариант.
Открытый вопрос: привести пример нетривиальной потери сигнала при использовании notifyAll() или signalAll().
Комментариев нет:
Отправить комментарий