2012-03-28 35 views
9

Tôi đã rối tung xung quanh với các chiến lược khác nhau cho việc tạo nhóm bằng cách sử dụng ThreadPoolExecutor với JDK6. Tôi có một hàng đợi ưu tiên làm việc nhưng không chắc chắn nếu tôi thích cách hồ bơi không kích thước sau khi keepAliveTime (những gì bạn nhận được với một hàng đợi không bị chặn). Vì vậy, tôi đang xem xét một ThreadPoolExecutor bằng cách sử dụng một LinkedBlockingQueue và chính sách CallerRuns.Tại sao ThreadPoolExecutor giảm các chủ đề bên dưới corePoolSize sau keepAliveTime?

Vấn đề tôi gặp phải bây giờ là các hồ bơi dốc lên, vì các tài liệu giải thích rằng nó nên, nhưng sau khi hoàn thành nhiệm vụ và keepAliveTime đi vào getPoolSize cho thấy hồ bơi bị giảm xuống không. Đoạn mã ví dụ dưới đây sẽ cho phép bạn xem cơ sở cho câu hỏi của tôi:

public class ThreadPoolingDemo { 
    private final static Logger LOGGER = 
     Logger.getLogger(ThreadPoolingDemo.class.getName()); 

    public static void main(String[] args) throws Exception { 
     LOGGER.info("MAIN THREAD:starting"); 
     runCallerTestPlain(); 
    } 

    private static void runCallerTestPlain() throws InterruptedException { 
     //10 core threads, 
     //50 max pool size, 
     //100 tasks in queue, 
     //at max pool and full queue - caller runs task 
     ThreadPoolExecutor tpe = new ThreadPoolExecutor(10, 50, 
      5L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(100), 
      new ThreadPoolExecutor.CallerRunsPolicy()); 

     //dump 5000 tasks on the queue 
     for (int i = 0; i < 5000; i++) { 
      tpe.submit(new Runnable() { 
       @Override 
       public void run() { 
        //just to eat some time and give a little feedback 
        for (int j = 0; j < 20; j++) { 
         LOGGER.info("First-batch Task, looping:" + j + "[" 
           + Thread.currentThread().getId() + "]"); 
        } 
       } 
      }, null); 
     } 
     LOGGER.info("MAIN THREAD:!!Done queueing!!"); 

     //check tpe statistics forever 
     while (true) { 
      LOGGER.info("Active count: " + tpe.getActiveCount() + " Pool size: " 
       + tpe.getPoolSize() + " Largest Pool: " + tpe.getLargestPoolSize()); 
      Thread.sleep(1000); 
     } 
    } 
} 

Tôi tìm thấy một lỗi cũ mà dường như là vấn đề này nhưng nó đã được đóng cửa: http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6458662. Điều này có thể vẫn còn hiện diện trong 1,6 hoặc tôi thiếu cái gì?

Dường như tôi Rubber Ducked cái này (http://www.codinghorror.com/blog/2012/03/rubber-duck-problem-solving.html). Lỗi tôi liên kết ở trên có liên quan đến lỗi này: http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6576792, trong đó vấn đề dường như được giải quyết trong 1,7 (tôi tải lên 1,7 và được xác minh - đã sửa ...). Tôi đoán vấn đề chính của tôi là một lỗi cơ bản này vẫn tồn tại trong gần một thập kỷ. Tôi đã dành quá nhiều thời gian để viết bài này để không đăng nó ngay bây giờ, hy vọng nó sẽ giúp ai đó.

+0

+1 Tìm kiếm thú vị, đã rất ngạc nhiên khi thấy hành vi này. –

+1

Có lẽ sẽ tốt hơn khi cấu trúc bài đăng của bạn dưới dạng câu hỏi và sau đó cung cấp những gì bạn đã học được dưới dạng câu trả lời? –

Trả lời

6

... sau khi hoàn thành nhiệm vụ và keepAliveTime đi vào hoạt động getPoolSize cho thấy hồ bơi bị giảm xuống không.

Vì vậy, đây có vẻ là điều kiện chạy đua trong số ThreadPoolExecutor. Tôi đoán nó đang làm việc theo thiết kế mặc dù không mong đợi. Trong getTask() phương pháp mà đề người lao động lặp qua để có được nhiệm vụ từ hàng đợi chặn, bạn sẽ thấy mã này:

if (state == SHUTDOWN) // Help drain queue 
    r = workQueue.poll(); 
else if (poolSize > corePoolSize || allowCoreThreadTimeOut) 
    r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS); 
else 
    r = workQueue.take(); 
if (r != null) 
    return r; 
if (workerCanExit()) { 
    if (runState >= SHUTDOWN) // Wake up others 
     interruptIdleWorkers(); 
    return null; 
} 

Nếu poolSize mọc trên corePoolSize sau đó nếu các lần thăm dò ra sau keepAliveTime, mã rơi xuống đến workerCanExit() từ rnull. Tất cả các chủ đề có thể trở lại true từ phương pháp đó vì nó chỉ được kiểm tra nhà nước của poolSize:

mainLock.lock(); 
    boolean canExit; 
    try { 
     canExit = runState >= STOP || 
      workQueue.isEmpty() || 
      (allowCoreThreadTimeOut && 
      poolSize > Math.max(1, corePoolSize)); << test poolSize here 
    } finally { 
     mainLock.unlock();       << race to workerDone() begins 
    } 

Khi đã trả true sau đó thoát ra sợi nhân và sau đó các poolSize được giảm đi. Nếu tất cả các chủ đề công nhân thực hiện kiểm tra đó cùng một lúc thì tất cả chúng sẽ xuất phát vì cuộc đua giữa thử nghiệm của poolSize và sự dừng của nhân viên khi xảy ra --poolSize.

Điều làm tôi ngạc nhiên là mức độ phù hợp của điều kiện chủng tộc đó. Nếu bạn thêm một số ngẫu nhiên vào sleep() bên trong của run() dưới đây thì bạn có thể nhận được một số chủ đề cốt lõi để không bỏ thuốc lá nhưng tôi đã có thể nghĩ rằng điều kiện chủng tộc sẽ có khó khăn hơn để đạt.


Bạn có thể thấy hành vi này trong các thử nghiệm sau đây:

@Test 
public void test() throws Exception { 
    int before = Thread.activeCount(); 
    int core = 10; 
    int max = 50; 
    int queueSize = 100; 
    ThreadPoolExecutor tpe = 
      new ThreadPoolExecutor(core, max, 1L, TimeUnit.SECONDS, 
        new LinkedBlockingQueue<Runnable>(queueSize), 
        new ThreadPoolExecutor.CallerRunsPolicy()); 
    tpe.allowCoreThreadTimeOut(false); 
    assertEquals(0, tpe.getActiveCount()); 
    // if we start 1 more than can go into core or queue, poolSize goes to 0 
    int startN = core + queueSize + 1; 
    // if we only start jobs the core can take care of, then it won't go to 0 
    // int startN = core + queueSize; 
    for (int i = 0; i < startN; i++) { 
     tpe.submit(new Runnable() { 
      @Override 
      public void run() { 
       try { 
        Thread.sleep(100); 
       } catch (InterruptedException e) { 
        e.printStackTrace(); 
       } 
      } 
     }); 
    } 
    while (true) { 
     System.out.println("active = " + tpe.getActiveCount() + ", poolSize = " + tpe.getPoolSize() 
       + ", largest = " + tpe.getLargestPoolSize() + ", threads = " + (Thread.activeCount() - before)); 
     Thread.sleep(1000); 
    } 
} 

Nếu bạn thay đổi sleep dòng bên trong của phương pháp run() một cái gì đó như thế này:

private final Random random = new Random(); 
... 
    Thread.sleep(100 + random.nextInt(100)); 

Điều này sẽ làm các điều kiện chủng tộc khó khăn hơn để nhấn vì vậy một số chủ đề cốt lõi vẫn sẽ được xung quanh.

+0

OP quan tâm nhiều hơn đến 'getPoolSize()' và không phải 'getActiveCount()' Sau khi hoàn thành tất cả các nhiệm vụ và keepAliveTime hết hạn tất cả các luồng làm trong thực tế chấm dứt mặc dù TPE không. –

+0

@Gray thông số TPE của tôi rất quan trọng. Điều này không phải lúc nào cũng thất bại. Sử dụng các tham số bạn đã sử dụng, chúng ta sẽ thấy nó hoạt động như mong đợi. Các thông số tôi đã chỉ định chơi với các quy tắc của TPE theo cách độc đáo nhưng cũng thực tế (nghĩa là công việc hàng loạt lớn được bán phá giá trên hàng đợi). Vấn đề chính với các thông số của bạn là bạn không bao giờ kích hoạt tình huống khiến cho các luồng được thêm vào nhóm bên trên và vượt quá kích thước lõi. Để vượt quá/thêm ngoài kích thước lõi, nhóm phải ở kích thước lõi (10) và hàng đợi phải vượt quá giới hạn (100). Hãy thử ví dụ của bạn với 'for (int i = 0; i andematt

+0

@andematt Hành trình thú vị. Đây là một điều kiện chủng tộc trong 'ThreadPoolExecutor'. Tôi đã chỉnh sửa câu trả lời của mình. – Gray