http://cs.oswego.edu/pipermail/concurrency-interest/2002-January/000000.html.
В качестве отправной точки взять такой пример кода:
class RebindableCondition implements Condition {
private Condition delegee;
public RebindableCondition(Condition delegee) {
this.delegee = delegee;
}
public void await()
throws InterruptedException {
delegee.await();
}
public void awaitUninterruptibly() {
delegee.awaitUninterruptibly();
}
public long awaitNanos(long nanosTimeout)
throws InterruptedException {
return delegee.awaitNanos(nanosTimeout);
}
public boolean await(long time, TimeUnit unit)
throws InterruptedException {
return delegee.await(time, unit);
}
public boolean awaitUntil(Date deadline)
throws InterruptedException {
return delegee.awaitUntil(deadline);
}
public void signal() {
delegee.signal();
}
public void signalAll() {
delegee.signalAll();
}
public void rebind(Condition newDelegee) {
this.delegee = newDelegee;
}
}
Компоненты:
1) конструктор, инициализирующий поле this.delegee;
2) все методы интерфейса Condition делегируются полю this.delegee;
3) метод rebind(Condition newDelegee) позволяет "сменить" this.delegee.
1 комментарий:
Соображение #1:
Мы не можем оставить все как есть, так как без всякой синхронизации мы, как минимум, сталкиваемся с такими проблемами:
Проблема №1(unsafe publishing):
- Результат публикации нового delegee методом rebind() будет неопределенно долго не виден остальным методам;
- В том случае, если delegee не приспособлен к unsafe publishing, когда мы и увидим изменившуюся ссылку на новый delegee, мы можем не увидеть все дерево объяектов, связанное с delegee. Т.е. мы приходим к проблемам схожим с "DCL is broken";
Проблема №2(lost notify):
- Если мы вызываем rebind() в тот момент, когда какой-то поток "залип" в одном из методов awaitXXX(), то появляется возможность, что этот поток никогда не получит signal()/signalAll(), так как ссылка this.delegee будет указывать уже на другой объект.
Соображение #2:
Просто сделать this.delegee volatile/AtomicReference решает проблему №1 (unsafe publishing), но не решает проблему №2 (lost notify)
Соображение #3:
Окружить все методы synchronize{...} или ReentrantLock.lock()/.unlock() решает проблемы №1(unsafe publishing) и №2(lost notify), но порождает
Проблема №3(serialized access):
- теперь в каждый момент времени ТОЛЬКО ОДИН поток может находиться в awaitXXX(), что никак не соответствует спецификации для Condition.
Эта проблема связана с тем, что synchronize/ReentrantLock предоставляют exclusive доступ, а нам требуется для awaitXXX() - shared access + семантика read volatile, для rebind() - exclusive access + семантика write volatile, для signal()+signalAll() - семантика read volatile (нет необходимости в контроле доступа в принципе, мы посылаем сигнал тому delegee, который сейчас установлен).
Все идет к тому, что нам необходима ReadWriteLock. Делаем набросок:
class RebindableCondition implements Condition {
// set 'fair=true'
private final ReentrantReadWriteLock rwLock
= new ReentrantReadWriteLock(true);
private final Lock readLock = rwLock.readLock();
private final Lock writeLock = rwLock.writeLock();
private volatile Condition delegee;
....
// other fields and methods here
....
public void await() throws InterruptedException {
readLock.lockInterruptibly();
try {
delegee.await();
} finally {
readLock.unlock();
}
}
public void signal() {
readLock.lock();
try {
delegee.signal();
} finally {
readLock.unlock();
}
}
public void rebind(Condition newDelegee) throws InterruptedException {
writeLock.lockInterruptibly();
try {
this.delegee = newDelegee;
} finally {
writeLock.unlock();
}
}
// added new method
public boolean tryRebind(Condition newDelegee) {
if (writeLock.tryLock()) {
try {
this.delegee = newDelegee;
} finally {
writeLock.unlock();
}
return true;
} else {
return false;
}
}
}
Соображение #4:
решает проблемы №1(unsafe publishing), №2(lost notify), №3(serialized access), но порождает как минимум 2 новых проблемы:
Проблема №4(fairness degradation):
Заключается в том, что могут ослабится условия честности(fairness) для "слишком честных" delegee. Например в методе await():
1: public void await() throws InterruptedException {
2: readLock.lockInterruptibly();
3: try {
4: delegee.await();
5: } finally {
6: readLock.unlock();
7: }
8: }
между строками 1 и 3 планировщик потоков может приостановить поток и тогда, хотя он ПЕРВЫМ вошел в RebindableCondition.await() и ПЕРВЫМ прошел readLock.lockInterruptibly(), он может войти в delegee.await() - НЕ ПЕРВЫМ. Похоже это проблема для всех вложенных вызовов сопровождаемых получением lock with shared access. Если бы в нашем случае в строке 2 было fair writeLock, а не readLock, то тот, кто бы первым достиг writeLock тот бы первым и вызвал delegee.await(). Важно заметить, что часть свойств честности все же сохраняется. А именно: тот кто достиг RebindableCondition.await() достигнет и delegee.await(). Это звучит странно, но ведь при большой конкуренции на захват non fair lock нам спецификация не обещает, что каждый из потоков в конце концов получит доступ. Спецификация не запрещает ситуацию когда другие потоки будут раз за разом получать доступ, а один из потоков не получит никогда (предполагается, что в очереди на захвал блокировки он все время не один). Таким образом нам удалось частично сохранить свойства живучести. Будем считать, что мы хорошо задокументируем свойства нашей обертки и остановимся на этом.
Проблема №5(non correct blocking in await):
Проблема состоит в том, что единственная реализация ReadWriteLock поставляемая с JDK 6.0, а именно java.util.concurrent.ReentrantReadWriteLock с параметров fairness=true, обладает следующим свойством(from javadoc) "...[in] Fair mode[:] ... A thread that tries to acquire a fair read lock (non-reentrantly) will block if either the write lock is held, or there is a waiting writer thread. The thread will not acquire the read lock until after the oldest currently waiting writer thread has acquired and released the write lock. ...". Таким образом, если есть читатель/читатели внутри и снаружи ожидает хотя бы один писатель, то новые читатели не войдут, а будут ждать пока не обслужатся все писатели пришедшие раньше. Это приведет к тому, что поток, вызвавший RebindableCondition.await() может быть, как и ожидалось, заблокирован, но не реагировать на signal/signalAll, так как будет ожидать получения readLock.lock() а не спать в delegee.await(). Что бы этого не произошло, можно использовать tryRebind(), этот метод не блокирует новых читателей, но его неоходимо вызывать в цикле, пока не удасться сменить delegee. Таким образом мы приходим к тому, что было бы крайне желетельно иметь такую реализацию RWL, что новые читатели могут войти если внутри уже есть читатели и их не блокирует наличие писателя в очереди ожидания. Конечно, тогда писатель сможет войти исключительно когда читатели кончатся "самопроизвольно", так как он не может никак на них влиять. Соответственно писатель может надолго быть заблокирован в воем вызове, но это точнее сохранит свойтсва "оборачиваемого" delegee.
Выходит, мы должны или написать сами или найти другую реализацию RWL.
Отправить комментарий