2010-07-15 14 views
8

Có một số lượng lớn công việc. Mỗi tác vụ thuộc về một nhóm duy nhất. Yêu cầu là mỗi nhóm nhiệm vụ nên được thực thi serially giống như được thực thi trong một luồng đơn và thông lượng phải được tối đa hóa trong một môi trường đa lõi (hoặc đa CPU). Lưu ý: cũng có một số lượng lớn các nhóm tỷ lệ thuận với số lượng nhiệm vụ.Tôi nên sử dụng ThreadPool nào trong Java?

Giải pháp ngây thơ đang sử dụng ThreadPoolExecutor và đồng bộ hóa (hoặc khóa). Tuy nhiên, các luồng sẽ chặn lẫn nhau và thông lượng không được tối đa hóa.

Bạn có ý tưởng nào tốt hơn không? Hoặc là có tồn tại một thư viện của bên thứ ba đáp ứng các yêu cầu?

+2

"Tuy nhiên, chủ đề sẽ chặn lẫn nhau và thông không được tối đa hóa.". Bạn có nghĩa là các nhiệm vụ cá nhân đang truy cập vào một cấu trúc dữ liệu được chia sẻ hoặc tài nguyên và đây là nguyên nhân gây tranh cãi? – Adamski

+0

Bạn có biết trước tất cả các nhiệm vụ của một nhóm không? Điều này là quan trọng khi lựa chọn một giải pháp (hàng đợi vs không có hàng đợi) –

Trả lời

3

một cách tiếp cận đơn giản sẽ được "nối" tất cả các nhiệm vụ nhóm vào một nhiệm vụ siêu, do đó làm cho tiểu nhiệm vụ chạy nối tiếp. Nhưng điều này có thể sẽ gây ra sự chậm trễ trong các nhóm khác mà sẽ không bắt đầu trừ khi một số nhóm khác hoàn toàn kết thúc và làm cho một số không gian trong hồ bơi thread.

Thay thế, hãy cân nhắc việc chuỗi các tác vụ của nhóm. Mã sau đây minh họa:

public class MultiSerialExecutor { 
    private final ExecutorService executor; 

    public MultiSerialExecutor(int maxNumThreads) { 
     executor = Executors.newFixedThreadPool(maxNumThreads); 
    } 

    public void addTaskSequence(List<Runnable> tasks) { 
     executor.execute(new TaskChain(tasks)); 
    } 

    private void shutdown() { 
     executor.shutdown(); 
    } 

    private class TaskChain implements Runnable { 
     private List<Runnable> seq; 
     private int ind; 

     public TaskChain(List<Runnable> seq) { 
      this.seq = seq; 
     } 

     @Override 
     public void run() { 
      seq.get(ind++).run(); //NOTE: No special error handling 
      if (ind < seq.size()) 
       executor.execute(this); 
     }  
    } 

Lợi thế là không có thêm tài nguyên (thread/queue) đang được sử dụng và độ chi tiết của các tác vụ tốt hơn so với cách tiếp cận ngây thơ. Bất lợi là tất cả các nhiệm vụ của nhóm nên được biết trước.

--edit--

Để thực hiện giải pháp này chung chung và đầy đủ, bạn có thể muốn để quyết định về xử lý lỗi (tức là liệu một chuỗi tiếp tục ngay cả khi một lỗi occures), và cũng có thể nó sẽ là một ý tưởng tốt để triển khai ExecutorService và ủy nhiệm tất cả các cuộc gọi đến trình thực thi bên dưới.

+0

Giải pháp thông minh! +1 –

+0

Tôi thích giải pháp này. – James

+0

Có lẽ chúng ta cũng nên thêm một bản đồ để chúng ta có thể tìm thấy TaskChain của một Task đã được chỉ định và thêm nó vào TaskChain của nó. – James

2

tôi sẽ đề nghị để sử dụng hàng đợi công việc:

  • Đối với mỗi nhóm nhiệm vụ Bạn có tạo ra một hàng đợi và chèn tất cả các nhiệm vụ từ nhóm đó vào nó.
  • Bây giờ tất cả các hàng đợi của bạn có thể được thực hiện song song trong khi các tác vụ bên trong một hàng đợi được thực thi theo kiểu serially.

Tìm kiếm nhanh trên google gợi ý rằng java api không có hàng đợi nhiệm vụ/chủ đề. Tuy nhiên có rất nhiều hướng dẫn có sẵn trên mã hóa một. Mọi người đều cảm thấy thoải mái khi liệt kê các hướng dẫn/triển khai tốt nếu bạn biết một số:

+0

Cảm ơn Dave. Nếu có một số lượng lớn các nhóm, thì số lượng chuỗi sẽ đạt đến giới hạn. – James

+0

@James Không nhất thiết. Chỉ vì bạn có n nhóm không có nghĩa là bạn cần tạo n luồng để thực thi chúng. Chỉ cần tạo ra bao nhiêu chủ đề như bạn nghĩ là phù hợp và họ sẽ chăm sóc hàng đợi hoặc trong một vòng robin thời trang hoặc serially. –

1

Tôi chủ yếu đồng ý với câu trả lời của Dave, nhưng nếu bạn cần cắt thời gian CPU trên tất cả các nhóm, nghĩa là tất cả các nhóm nhiệm vụ sẽ tiến hành song song, bạn có thể tìm loại cấu trúc hữu ích (. sử dụng loại bỏ như "khóa" này làm việc tốt trong trường hợp của tôi mặc dù tôi tưởng tượng nó có xu hướng sử dụng nhiều bộ nhớ hơn):

class TaskAllocator { 
    private final ConcurrentLinkedQueue<Queue<Runnable>> entireWork 
     = childQueuePerTaskGroup(); 

    public Queue<Runnable> lockTaskGroup(){ 
     return entireWork.poll(); 
    } 

    public void release(Queue<Runnable> taskGroup){ 
     entireWork.offer(taskGroup); 
    } 
} 

class DoWork implmements Runnable { 
    private final TaskAllocator allocator; 

    public DoWork(TaskAllocator allocator){ 
     this.allocator = allocator; 
    } 

    pubic void run(){ 
     for(;;){ 
      Queue<Runnable> taskGroup = allocator.lockTaskGroup(); 
      if(task==null){ 
       //No more work 
       return; 
      } 
      Runnable work = taskGroup.poll(); 
      if(work == null){ 
       //This group is done 
       continue; 
      } 

      //Do work, but never forget to release the group to 
      // the allocator. 
      try { 
       work.run(); 
      } finally { 
       allocator.release(taskGroup); 
      } 
     }//for 
    } 
} 

Bạn có thể sau đó sử dụng số lượng chủ đề tối ưu để chạy DoWork bài tập. Đó là loại cân bằng tải round robin ..

Bạn thậm chí có thể làm điều gì đó phức tạp hơn, bằng cách sử dụng này thay vì một hàng đợi đơn giản trong TaskAllocator (nhóm nhiệm vụ với nhiều nhiệm vụ còn lại có xu hướng để thực hiện)

ConcurrentSkipListSet<MyQueue<Runnable>> sophisticatedQueue = 
    new ConcurrentSkipListSet(new SophisticatedComparator()); 

nơi SophisticatedComparator

class SophisticatedComparator implements Comparator<MyQueue<Runnable>> { 
    public int compare(MyQueue<Runnable> o1, MyQueue<Runnable> o2){ 
     int diff = o2.size() - o1.size(); 
     if(diff==0){ 
      //This is crucial. You must assign unique ids to your 
      //Subqueue and break the equality if they happen to have same size. 
      //Otherwise your queues will disappear... 
      return o1.id - o2.id; 
     } 
     return diff; 
    } 
} 
+1

Hàng đợi công việc +1 cho phép bạn sử dụng bất kỳ thuật toán lập lịch nào phù hợp với nhu cầu của bạn. –

+0

Có vẻ như bạn đang triển khai lại một nhóm luồng. Tại sao không sử dụng tiêu chuẩn ThreadPoolExecutor cộng với một số chức năng bổ sung như trong giải pháp của tôi? Giải pháp của tôi không yêu cầu hàng đợi và không đồng bộ hóa. –

+0

@Eyal: Nếu có thể tiêu thụ các nhóm tác vụ theo tuần tự, tôi đồng ý với bạn. Tuy nhiên, nếu chúng phải được tiêu thụ song song, điều này là cần thiết. –

0

Diễn viên cũng là một giải pháp khác cho loại vấn đề được chỉ định này. Scala có các diễn viên và cũng là Java, được cung cấp bởi AKKA.

-2

Tôi gặp sự cố tương tự như của bạn và tôi đã sử dụng ExecutorCompletionService hoạt động với Executor để hoàn thành các bộ sưu tập tác vụ. Dưới đây là một chiết xuất từ ​​API java.util.concurrent, vì Java7:

Giả sử bạn có một bộ giải quyết cho một vấn đề nhất định, mỗi trả lại một giá trị của một số loại quả, và muốn chạy chúng đồng thời , xử lý các kết quả của mỗi trong số chúng trả về một giá trị không null, trong một số phương thức sử dụng (Result r). Bạn có thể viết này như:

void solve(Executor e, Collection<Callable<Result>> solvers) 
     throws InterruptedException, ExecutionException { 
    CompletionService<Result> ecs = new ExecutorCompletionService<Result>(e); 
    for (Callable<Result> s : solvers) 
     ecs.submit(s); 
    int n = solvers.size(); 
    for (int i = 0; i < n; ++i) { 
     Result r = ecs.take().get(); 
     if (r != null) 
      use(r); 
    } 
} 

Vì vậy, trong trường hợp của bạn, mỗi nhiệm vụ sẽ là một single Callable<Result>, và nhiệm vụ sẽ được nhóm lại trong một Collection<Callable<Result>>.

tham khảo: http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ExecutorCompletionService.html