13

Tôi có đoạn mã sau đây về cơ bản quét qua danh sách nhiệm vụ cần được thực thi và mỗi tác vụ sau đó được trao cho người thi hành để thực thi.Xử lý ngoại lệ cho ThreadPoolExecutor

Lần lượt JobExecutor tạo một trình xử lý khác (để thực hiện công cụ db ... đọc và ghi dữ liệu vào hàng đợi) và hoàn thành tác vụ.

JobExecutor trả về Future<Boolean> cho các tác vụ được gửi. Khi một trong những nhiệm vụ thất bại, tôi muốn gracefully ngắt tất cả các chủ đề và tắt máy thực thi bằng cách bắt tất cả các trường hợp ngoại lệ. Tôi cần phải thay đổi những gì?

public class DataMovingClass { 
    private static final AtomicInteger uniqueId = new AtomicInteger(0); 

    private static final ThreadLocal<Integer> uniqueNumber = new IDGenerator(); 

    ThreadPoolExecutor threadPoolExecutor = null ; 

    private List<Source> sources = new ArrayList<Source>(); 

    private static class IDGenerator extends ThreadLocal<Integer> { 
     @Override 
     public Integer get() { 
      return uniqueId.incrementAndGet(); 
     } 
    } 

    public void init(){ 

    // load sources list 

    } 

    public boolean execute() { 

    boolean succcess = true ; 
    threadPoolExecutor = new ThreadPoolExecutor(10,10, 
       10, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(1024), 
       new ThreadFactory() { 
        public Thread newThread(Runnable r) { 
         Thread t = new Thread(r); 
         t.setName("DataMigration-" + uniqueNumber.get()); 
         return t; 
        }// End method 
       }, new ThreadPoolExecutor.CallerRunsPolicy()); 

    List<Future<Boolean>> result = new ArrayList<Future<Boolean>>(); 

    for (Source source : sources) { 
        result.add(threadPoolExecutor.submit(new JobExecutor(source))); 
    } 

    for (Future<Boolean> jobDone : result) { 
       try { 
        if (!jobDone.get(100000, TimeUnit.SECONDS) && success) { 
         // in case of successful DbWriterClass, we don't need to change 
         // it. 
         success = false; 
        } 
       } catch (Exception ex) { 
        // handle exceptions 
       } 
      } 

    } 

    public class JobExecutor implements Callable<Boolean> { 

     private ThreadPoolExecutor threadPoolExecutor ; 
     Source jobSource ; 
     public SourceJobExecutor(Source source) { 
      this.jobSource = source; 
      threadPoolExecutor = new ThreadPoolExecutor(10,10,10, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(1024), 
        new ThreadFactory() { 
         public Thread newThread(Runnable r) { 
          Thread t = new Thread(r); 
          t.setName("Job Executor-" + uniqueNumber.get()); 
          return t; 
         }// End method 
        }, new ThreadPoolExecutor.CallerRunsPolicy()); 
     } 

     public Boolean call() throws Exception { 
      boolean status = true ; 
      System.out.println("Starting Job = " + jobSource.getName()); 
      try { 

         // do the specified task ; 


      }catch (InterruptedException intrEx) { 
       logger.warn("InterruptedException", intrEx); 
       status = false ; 
      } catch(Exception e) { 
       logger.fatal("Exception occurred while executing task "+jobSource.getName(),e); 
       status = false ; 
      } 
      System.out.println("Ending Job = " + jobSource.getName()); 
      return status ; 
     } 
    } 
} 

Trả lời

14

Khi bạn gửi tác vụ cho người thực thi, nó sẽ trả lại cho bạn một phiên bản FutureTask.

FutureTask.get() sẽ tái ném bất kỳ ngoại lệ nào được thực hiện bởi nhiệm vụ dưới dạng ExecutorException.

Vì vậy, khi bạn lặp lại thông qua List<Future> và gọi nhận trên mỗi máy, hãy bắt ExecutorException và gọi tắt máy theo thứ tự.

+0

ok .. bạn có thấy bất kỳ sai sót hoặc địa điểm nào khác mà tôi cần xử lý ngoại lệ không? – jagamot

1

Phân lớp ThreadPoolExecutor và ghi đè phương thức protected afterExecute (Runnable r, Throwable t) của nó.

Nếu bạn đang tạo một nhóm chủ đề thông qua lớp học tiện lợi java.util.concurrent.Executors (mà bạn không có), hãy xem nguồn của nó để xem cách nó gọi ThreadPoolExecutor.

0

Vì bạn đang gửi tác vụ đến ThreadPoolExecutor, ngoại lệ sẽ bị nuốt bởi FutureTask.

Có xem xét này code

**Inside FutureTask$Sync** 

void innerRun() { 
    if (!compareAndSetState(READY, RUNNING)) 
     return; 

    runner = Thread.currentThread(); 
    if (getState() == RUNNING) { // recheck after setting thread 
     V result; 
     try { 
      result = callable.call(); 
     } catch (Throwable ex) { 
      setException(ex); 
      return; 
     } 
     set(result); 
    } else { 
     releaseShared(0); // cancel 
    } 

}

protected void setException(Throwable t) { 
    sync.innerSetException(t); 
} 

Từ trên mã, rõ ràng là setException phương pháp đánh bắt Throwable.Vì lý do này, FutureTask được nuốt tất cả các trường hợp ngoại lệ nếu bạn sử dụng "submit()" phương pháp trên ThreadPoolExecutor

Theo java documentation, bạn có thể mở rộng afterExecute() phương pháp trong ThreadPoolExecutor

protected void afterExecute(Runnable r, 
          Throwable t) 

Mẫu mã theo tài liệu:

class ExtendedExecutor extends ThreadPoolExecutor { 
    // ... 
    protected void afterExecute(Runnable r, Throwable t) { 
    super.afterExecute(r, t); 
    if (t == null && r instanceof Future<?>) { 
     try { 
     Object result = ((Future<?>) r).get(); 
     } catch (CancellationException ce) { 
      t = ce; 
     } catch (ExecutionException ee) { 
      t = ee.getCause(); 
     } catch (InterruptedException ie) { 
      Thread.currentThread().interrupt(); // ignore/reset 
     } 
    } 
    if (t != null) 
     System.out.println(t); 
    } 
} 

Bạn có thể bắt Exceptions theo ba cách

  1. Future.get() như đề xuất trong câu trả lời được chấp nhận
  2. bọc toàn bộ run() hoặc call() phương pháp trong try{}catch{}Exceptoion{} khối
  3. override afterExecute của ThreadPoolExecutor phương pháp như trên

Để duyên dáng ngắt Chủ đề khác, có một cái nhìn tại bên dưới Câu hỏi SE:

How to stop next thread from running in a ScheduledThreadPoolExecutor

How to forcefully shutdown java ExecutorService