2012-03-03 8 views
5

Tôi có một tình huống của một nhà sản xuất và một người tiêu dùng duy nhất làm việc với hàng đợi các đối tượng. Có hai trường hợp khi hàng đợi có thể trống:Java LinkedBlockingQueue có khả năng báo hiệu khi hoàn thành?

  1. Người tiêu dùng xử lý các đối tượng nhanh hơn nhà sản xuất có khả năng tạo đối tượng mới (nhà sản xuất sử dụng I/O trước khi tạo đối tượng).
  2. Nhà sản xuất đã tạo xong các đối tượng.

Nếu hàng đợi trống, tôi muốn người tiêu dùng đợi cho đến khi có đối tượng mới hoặc cho đến khi nhà sản xuất báo hiệu là xong.

Nghiên cứu của tôi cho đến nay không có nơi nào vì tôi vẫn kết thúc bằng một vòng lặp kiểm tra cả hàng đợi và cờ boolean riêng biệt (isDone). Cho rằng không có cách nào để chờ đợi trên nhiều ổ khóa (suy nghĩ của chờ đợi trên hàng đợi VÀ cờ), những gì có thể được thực hiện để giải quyết điều này?

+0

Tôi đã thêm giải pháp được đề xuất bên dưới nếu bất kỳ ai quan tâm để xem xét giải pháp đó. Có đúng không khi thêm nó làm câu trả lời? – Yon

Trả lời

2

Trước hết, gợi ý rằng việc sử dụng một wrapper là "quá nhiều chi phí" là một đoán, và IMO là một trong rất xấu. Giả thiết này phải là được đo với thử nghiệm hiệu suất với các yêu cầu thực tế. Nếu và chỉ khi thử nghiệm không thành công, thì hãy xác minh bằng cách sử dụng một trình bao gồm đối tượng hàng đợi là lý do tại sao. Nếu bạn làm điều đó và bao bọc đối tượng hàng đợi (trong trường hợp này là String) thực sự là nguyên nhân của hiệu năng không được chấp nhận, thì bạn có thể sử dụng kỹ thuật này: tạo một chuỗi đã biết, duy nhất để phục vụ như là một "kết thúc của tin nhắn." " thông điệp.

public static final String NO_MORE_MESSAGES = UUID.randomUUID().toString(); 

Sau đó, khi truy xuất chuỗi từ hàng đợi, chỉ cần kiểm tra (có thể là kiểm tra tham chiếu) nếu chuỗi là NO_MORE_MESSAGES. Nếu có, thì bạn đã xử lý xong.

+0

Tôi cũng đã trả lời @esaj về điều này. Giả sử chúng ta có một String đủ độc đáo, không phải chi phí bổ sung của việc chạy "bằng" trên tất cả các chuỗi (hàng ngàn mỗi hàng đợi, hàng trăm hàng đợi một phút) có phải là một sự lãng phí không? – Yon

+1

Lưu ý câu trả lời - vì đây là trường cuối cùng tĩnh, bạn có thể thực hiện kiểm tra bình đẳng tham chiếu (==, không phải bằng .equals()) cho chuỗi đặc biệt. –

+0

Điểm tốt, tôi đã bỏ lỡ điều đó! – Yon

1

Đơn giản. Xác định một đối tượng đặc biệt mà nhà sản xuất có thể gửi để báo hiệu "đã hoàn thành".

+0

Trong hầu hết các trường hợp, hàng đợi là của Strings. Chúng tôi không muốn bọc đối tượng String bởi vì nó sẽ tạo ra một chi phí bổ sung. – Yon

+0

Sau đó, sử dụng giá trị chuỗi không thể xảy ra trong dữ liệu thông thường của bạn. Làm thế nào về một chuỗi không dài? –

1

Một tùy chọn là bao bọc dữ liệu của bạn trong một đối tượng giữ, có thể được sử dụng để báo hiệu kết thúc xử lý.

Ví dụ:

public class QueueMessage { 
public MessageType type; 
public Object thingToWorkOn; 
} 

nơi MessageType là một enum xác định một "công việc" tin nhắn hoặc một thông báo "tắt máy".

+0

Đây là một chi phí khá lớn, đặc biệt nếu nó là hàng đợi của hàng nghìn chuỗi, bạn có nghĩ vậy không? – Yon

+0

@Yon: Không hẳn. –

1

Bạn có thể sử dụng LinkedBlockingQueuespoll(long timeout, TimeUnit unit) -method trong người tiêu dùng và nếu nó trả về null (hết thời gian chờ), hãy kiểm tra cờ boolean. Một cách khác sẽ đi qua một số đặc biệt "EndOfWork" -object vào hàng đợi như là người cuối cùng, vì vậy người tiêu dùng biết rằng đó là kết thúc công việc.

Tuy nhiên, một cách khác sẽ làm gián đoạn chuỗi tiêu thụ từ chuỗi nhà sản xuất, nhưng điều này sẽ yêu cầu chuỗi nhà sản xuất phải nhận thức được người tiêu dùng. Nếu cả hai sẽ được thực hiện như các lớp lồng nhau, bạn có thể sử dụng lớp cha để giữ giá trị chạy boolean, cả hai đều có thể truy cập và chấm dứt cả hai luồng với boolean đơn.

+0

Cuộc thăm dò với bộ hẹn giờ là điều tôi nghĩ nhưng vẫn có vẻ khá lãng phí. Việc sử dụng một đối tượng đặc biệt có vấn đề ở đây vì hàng đợi là bình thường của Strings và chúng tôi không muốn bọc chúng (gây ra phí bổ sung). – Yon

+0

Bạn có thể sử dụng một số Chuỗi ký tự đặc biệt để biểu thị kết thúc công việc và yêu cầu nhà sản xuất viết nó làm Chuỗi cuối cùng trong hàng đợi không? Một cái gì đó ngắn mà sẽ không bao giờ xuất hiện trong các chuỗi công việc bình thường, sau đó chỉ cần sử dụng equals() để xem liệu chuỗi công việc có chính xác không, và nếu như vậy, chấm dứt người tiêu dùng. – esaj

+0

Nội dung của các chuỗi rất năng động. Nếu chúng ta muốn tạo một chuỗi như vậy, nó có lẽ sẽ khá dài và sử dụng các ký tự không chuẩn, mà sẽ đặt một tải CPU khi chạy bằng rất nhiều lần. – Yon

1

Các tùy chọn sau đây đã được nâng lên quá (không chắc chắn nếu điều này phải ở trong một câu trả lời cho bản thân mình nhưng không thể tìm thấy một nơi tốt hơn để viết những dòng này):

Tạo một wrapper cho hàng đợi. Trình bao bọc này sẽ có một màn hình sẽ được chờ đợi khi đọc bởi người tiêu dùng và sẽ được nhà sản xuất thông báo bất cứ khi nào một đối tượng mới được thêm vào hoặc cờ của isDone được nâng lên.

Khi người tiêu dùng đọc các đối tượng từ hàng đợi, các đối tượng này sẽ được bao bọc bằng thứ gì đó tương tự như những gì @ yann-ramin đề xuất ở trên. Tuy nhiên, để giảm chi phí, người tiêu dùng sẽ cung cấp một trường hợp duy nhất, có thể tái sử dụng, của QueueMessage khi mọi cuộc gọi đọc (nó sẽ luôn luôn là cùng một ví dụ). Trình bao bọc hàng đợi sẽ cập nhật các trường tương ứng trước khi trả về cá thể cho người tiêu dùng.

Điều này tránh bất kỳ việc sử dụng hết thời gian chờ, ngủ, v.v.

EDITED Đây là một thực hiện đề xuất:

/** 
* This work queue is designed to be used by ONE producer and ONE consumer 
* (no more, no less of neither). The work queue has certain added features, such 
* as the ability to signal that the workload generation is done and nothing will be 
* added to the queue. 
* 
* @param <E> 
*/ 
public class DefiniteWorkQueue<E> { 
    private final E[] EMPTY_E_ARRAY; 
    private LinkedBlockingQueue<E> underlyingQueue = new LinkedBlockingQueue<E>(); 
    private boolean isDone = false; 

    // This monitor allows for flagging when a change was done. 
    private Object changeMonitor = new Object(); 

    public DefiniteWorkQueue(Class<E> clazz) { 
     // Reuse this instance, makes calling toArray easier 
     EMPTY_E_ARRAY = (E[]) Array.newInstance(clazz, 0); 
    } 

    public boolean isDone() { 
     return isDone; 
    } 

    public void setIsDone() { 
     synchronized (changeMonitor) { 
      isDone = true; 
      changeMonitor.notifyAll(); 
     } 
    } 

    public int size() { 
     return underlyingQueue.size(); 
    } 

    public boolean isEmpty() { 
     return underlyingQueue.isEmpty(); 
    } 

    public boolean contains(E o) { 
     return underlyingQueue.contains(o); 
    } 

    public Iterator<E> iterator() { 
     return underlyingQueue.iterator(); 
    } 

    public E[] toArray() { 
     // The array we create is too small on purpose, the underlying 
     // queue will extend it as needed under a lock 
     return underlyingQueue.toArray(EMPTY_E_ARRAY); 
    } 

    public boolean add(E o) { 
     boolean retval; 
     synchronized (changeMonitor) { 
      retval = underlyingQueue.add(o); 
      if (retval) 
       changeMonitor.notifyAll(); 
     } 
     return retval; 
    } 

    public boolean addAll(Collection<? extends E> c) { 
     boolean retval; 
     synchronized (changeMonitor) { 
      retval = underlyingQueue.addAll(c); 
      if (retval) 
       changeMonitor.notifyAll(); 
     } 
     return retval; 
    } 

    public void remove(RemovalResponse<E> responseWrapper) throws InterruptedException { 
     synchronized (changeMonitor) { 
      // If there's nothing in the queue but it has not 
      // ended yet, wait for someone to add something. 
      if (isEmpty() && !isDone()) 
       changeMonitor.wait(); 

      // When we get here, we've been notified or 
      // the current underlying queue's state is already something 
      // we can respond about. 
      if (!isEmpty()) { 
       responseWrapper.type = ResponseType.ITEM; 
       responseWrapper.item = underlyingQueue.remove(); 
      } else if (isDone()) { 
       responseWrapper.type = ResponseType.IS_DONE; 
       responseWrapper.item = null; 
      } else { 
       // This should not happen 
       throw new IllegalStateException(
        "Unexpected state where a notification of change was  made but " + 
         "nothing is in the queue and work is not  done."); 
      } 
     } 
    } 

    public static class RemovalResponse<E> { 
     public enum ResponseType { 
      /** 
      * Used when the response contains the first item of the queue. 
      */ 
      ITEM, 

      /** 
      * Used when the work load is done and nothing new will arrive. 
      */ 
      IS_DONE 
     }; 

     private ResponseType type; 
     private E item; 

     public ResponseType getType() { 
      return type; 
     } 

     public void setType(ResponseType type) { 
      this.type = type; 
     } 

     public E getItem() { 
      return item; 
     } 

     public void setItem(E item) { 
      this.item = item; 
     } 

    } 
}