вторник, 22 июля 2008 г.

Нерешенная задачка: RebindableCondition

Реализовать RebindableCondition. Дополнительную информацию взять из обсуждение в этой ветке:
http://cs.oswego.edu/pipermail/concurrency-interest/2002-January/000000.html.
В качестве отправной точки взять такой пример кода:
class RebindableCondition implements Condition {
private Condition delegee;

public RebindableCondition(Condition delegee) {
this.delegee = delegee;
}

public void await()
throws InterruptedException {
delegee.await();
}

public void awaitUninterruptibly() {
delegee.awaitUninterruptibly();
}

public long awaitNanos(long nanosTimeout)
throws InterruptedException {
return delegee.awaitNanos(nanosTimeout);
}

public boolean await(long time, TimeUnit unit)
throws InterruptedException {
return delegee.await(time, unit);
}

public boolean awaitUntil(Date deadline)
throws InterruptedException {
return delegee.awaitUntil(deadline);
}

public void signal() {
delegee.signal();
}

public void signalAll() {
delegee.signalAll();
}

public void rebind(Condition newDelegee) {
this.delegee = newDelegee;
}
}

Компоненты:
1) конструктор, инициализирующий поле this.delegee;
2) все методы интерфейса Condition делегируются полю this.delegee;
3) метод rebind(Condition newDelegee) позволяет "сменить" this.delegee.

воскресенье, 20 июля 2008 г.

Потеря сигнала

Чем хороши пары [synchronize(...) {} + notifyAll()] или [Lock.lock() + Condition.signalAll()], так это тем, что при их использовании меньше возможностей потерять сигнал.

Собственно ошибка:
Пусть у нас есть "классическая" реализация bounded buffer на на основе пары [synchronize(...) {} + notifyAll()]

public class WaitNotifyAllBuffer {
private final T[] items;
private int takeIndex;
private int putIndex;
private int count;
private final Object lock = new Object();

public WaitNotifyAllBuffer(int capacity) {
this.items = (T[]) new Object[capacity];
}

private boolean isFull() {
return count == items.length;
}

private boolean isEmpty() {
return count == 0;
}

private void doPut(T elem) {
items[takeIndex] = elem;
if (++takeIndex == items.length) {
takeIndex = 0;
}
++count;
}

private T doTake() {
T elem = items[putIndex];
items[putIndex] = null;
if (++putIndex == items.length) {
putIndex = 0;
}
--count;
return elem;
}

public void put(T elem) throws InterruptedException {
synchronized (lock) {
while (isFull()) {
lock.wait();
}
doPut(elem);
lock.notifyAll();
}
}


public T take() throws InterruptedException {
synchronized (lock) {
while (isEmpty()) {
lock.wait();
}
lock.notifyAll();
return doTake();
}
}

}

И по каким-то причинам мы решаем перейти на пару [Lock.lock() + Condition.signal()], и делаем это вот таким способом

public class LockConditionBuffer {

protected final Lock lock = new ReentrantLock();
protected final Condition notEmpty = lock.newCondition();
protected final Condition notFull = lock.newCondition();


...
...
...

public void put(T elem) throws InterruptedException {
lock.lock();
try {
while (isFull()) {
notFull.await();
}
doPut(elem);
notEmpty.signal();
} finally {
lock.unlock();
}
}


public T take() throws InterruptedException {
lock.lock();
try {
while (isEmpty()) {
notEmpty.await();
}
notFull.signal();
return doTake();
} finally {
lock.unlock();
}
}

}

Сдесь кроется весьма тонкая ошибка:
Мы передаем сигнал из-рук-в-руки и если вылетим из, например, метода notEmpty.await(); в методе take(){...} по InterruptedException, ТО - ПОТЕРЯЕМ СИГНАЛ! в том смысле, что мы были единственными кто получил сигнал о возможности осуществить выемку из буфера. Допустим, что в очереди на take() стояло два потока на пустом буфере, третий поток вызвал put() и положил что-то в буфер, далее он нотифицировал об этом один из потоков в очереди на take(), но тот вылетел по ...

Суть проблемы в том, что метод Condition.await() может И получить сигнал И получить статус interrupted а в ответ ТОЛЬКО бросить InterruptedException, т.е. "проглотить" сигнал. Вот что об этом сказано в javadoc для Condition.await():
"... An implementation can favor responding to an interrupt over normal method return in response to a signal. In that case the implementation must ensure that the signal is redirected to another waiting thread, if there is one."
Таким образом, в качестве правильного решения следует рассматривать такое:

public class LockConditionBuffer {

protected final Lock lock = new ReentrantLock();
protected final Condition notEmpty = lock.newCondition();
protected final Condition notFull = lock.newCondition();


...
...
...

public void put(T elem) throws InterruptedException {
lock.lock();
try {
while (isFull()) {
try {
notFull.await();
} catch(InterruptedException e) {
notFull.signal();
throw e;
}
}
doPut(elem);
notEmpty.signal();
} finally {
lock.unlock();
}
}


public T take() throws InterruptedException {
lock.lock();
try {
while (isEmpty()) {
try {
notEmpty.await();
} catch(InterruptedException e) {
notEmpty.signal();
throw e;
}
}
notFull.signal();
return doTake();
} finally {
lock.unlock();
}
}

}

Замечу, что
1) "эталонная" реализация bounded buffer - java.util.concurrent.ArrayBlockingQueue содержит именно такой вариант;
2) пример из книги "Java Concurrency in Practice - Addison Wesley - 2006" содержит первый, не совсем корректный вариант.

Открытый вопрос: привести пример нетривиальной потери сигнала при использовании notifyAll() или signalAll().