Các tùy chọn sau đây đã được nâng lên quá (không chắc chắn nếu điều này phải ở trong một câu trả lời cho bản thân mình nhưng không thể tìm thấy một nơi tốt hơn để viết những dòng này):
Tạo một wrapper cho hàng đợi. Trình bao bọc này sẽ có một màn hình sẽ được chờ đợi khi đọc bởi người tiêu dùng và sẽ được nhà sản xuất thông báo bất cứ khi nào một đối tượng mới được thêm vào hoặc cờ của isDone được nâng lên.
Khi người tiêu dùng đọc các đối tượng từ hàng đợi, các đối tượng này sẽ được bao bọc bằng thứ gì đó tương tự như những gì @ yann-ramin đề xuất ở trên. Tuy nhiên, để giảm chi phí, người tiêu dùng sẽ cung cấp một trường hợp duy nhất, có thể tái sử dụng, của QueueMessage khi mọi cuộc gọi đọc (nó sẽ luôn luôn là cùng một ví dụ). Trình bao bọc hàng đợi sẽ cập nhật các trường tương ứng trước khi trả về cá thể cho người tiêu dùng.
Điều này tránh bất kỳ việc sử dụng hết thời gian chờ, ngủ, v.v.
EDITED Đây là một thực hiện đề xuất:
/**
* This work queue is designed to be used by ONE producer and ONE consumer
* (no more, no less of neither). The work queue has certain added features, such
* as the ability to signal that the workload generation is done and nothing will be
* added to the queue.
*
* @param <E>
*/
public class DefiniteWorkQueue<E> {
private final E[] EMPTY_E_ARRAY;
private LinkedBlockingQueue<E> underlyingQueue = new LinkedBlockingQueue<E>();
private boolean isDone = false;
// This monitor allows for flagging when a change was done.
private Object changeMonitor = new Object();
public DefiniteWorkQueue(Class<E> clazz) {
// Reuse this instance, makes calling toArray easier
EMPTY_E_ARRAY = (E[]) Array.newInstance(clazz, 0);
}
public boolean isDone() {
return isDone;
}
public void setIsDone() {
synchronized (changeMonitor) {
isDone = true;
changeMonitor.notifyAll();
}
}
public int size() {
return underlyingQueue.size();
}
public boolean isEmpty() {
return underlyingQueue.isEmpty();
}
public boolean contains(E o) {
return underlyingQueue.contains(o);
}
public Iterator<E> iterator() {
return underlyingQueue.iterator();
}
public E[] toArray() {
// The array we create is too small on purpose, the underlying
// queue will extend it as needed under a lock
return underlyingQueue.toArray(EMPTY_E_ARRAY);
}
public boolean add(E o) {
boolean retval;
synchronized (changeMonitor) {
retval = underlyingQueue.add(o);
if (retval)
changeMonitor.notifyAll();
}
return retval;
}
public boolean addAll(Collection<? extends E> c) {
boolean retval;
synchronized (changeMonitor) {
retval = underlyingQueue.addAll(c);
if (retval)
changeMonitor.notifyAll();
}
return retval;
}
public void remove(RemovalResponse<E> responseWrapper) throws InterruptedException {
synchronized (changeMonitor) {
// If there's nothing in the queue but it has not
// ended yet, wait for someone to add something.
if (isEmpty() && !isDone())
changeMonitor.wait();
// When we get here, we've been notified or
// the current underlying queue's state is already something
// we can respond about.
if (!isEmpty()) {
responseWrapper.type = ResponseType.ITEM;
responseWrapper.item = underlyingQueue.remove();
} else if (isDone()) {
responseWrapper.type = ResponseType.IS_DONE;
responseWrapper.item = null;
} else {
// This should not happen
throw new IllegalStateException(
"Unexpected state where a notification of change was made but " +
"nothing is in the queue and work is not done.");
}
}
}
public static class RemovalResponse<E> {
public enum ResponseType {
/**
* Used when the response contains the first item of the queue.
*/
ITEM,
/**
* Used when the work load is done and nothing new will arrive.
*/
IS_DONE
};
private ResponseType type;
private E item;
public ResponseType getType() {
return type;
}
public void setType(ResponseType type) {
this.type = type;
}
public E getItem() {
return item;
}
public void setItem(E item) {
this.item = item;
}
}
}
Tôi đã thêm giải pháp được đề xuất bên dưới nếu bất kỳ ai quan tâm để xem xét giải pháp đó. Có đúng không khi thêm nó làm câu trả lời? – Yon