2011-10-11 1 views
34

Tôi có tệp .csv chứa hơn 70 triệu dòng trong đó mỗi dòng là tạo ra một Runnable và sau đó được thực hiện bởi luồng. Runnable này sẽ chèn một bản ghi vào Mysql.Làm thế nào có thể sử dụng lại một threadpool sau khi tắt

Hơn nữa, tôi muốn ghi lại vị trí của tệp csv cho số RandomAccessFile để định vị. Vị trí được ghi vào File. Tôi muốn viết bản ghi này khi tất cả các chủ đề trong threadpool được hoàn thành.So ThreadPoolExecutor.shutdown() được gọi. Nhưng khi có nhiều dòng hơn, tôi cần một lần nữa. Làm thế nào tôi có thể tái sử dụng threadpool hiện tại này thay vì làm cho một cái mới.

Mã này là như sau:

public static boolean processPage() throws Exception { 

    long pos = getPosition(); 
    long start = System.currentTimeMillis(); 
    raf.seek(pos); 
    if(pos==0) 
     raf.readLine(); 
    for (int i = 0; i < PAGESIZE; i++) { 
     String lineStr = raf.readLine(); 
     if (lineStr == null) 
      return false; 
     String[] line = lineStr.split(","); 
     final ExperienceLogDO log = CsvExperienceLog.generateLog(line); 
     //System.out.println("userId: "+log.getUserId()%512); 

     pool.execute(new Runnable(){ 
      public void run(){ 
       try { 
        experienceService.insertExperienceLog(log); 
       } catch (BaseException e) { 
        e.printStackTrace(); 
       } 
      } 
     }); 

     long end = System.currentTimeMillis(); 
    } 

    BufferedWriter resultWriter = new BufferedWriter(
      new OutputStreamWriter(new FileOutputStream(new File(
        RESULT_FILENAME), true))); 
    resultWriter.write("\n"); 
    resultWriter.write(String.valueOf(raf.getFilePointer())); 
    resultWriter.close(); 
    long time = System.currentTimeMillis()-start; 
    System.out.println(time); 
    return true; 
} 

Cảm ơn!

Trả lời

31

Như đã nêu trong documentation, bạn không thể tái sử dụng một ExecutorService đã được đóng cửa. Tôi khuyên bạn nên chống lại bất kỳ giải pháp nào , vì (a) chúng có thể không hoạt động như mong đợi trong mọi tình huống; và (b) bạn có thể đạt được những gì bạn muốn bằng cách sử dụng các lớp tiêu chuẩn.

Bạn phải hoặc

  1. thuyết minh mới ExecutorService; hoặc

  2. không chấm dứt ExecutorService.

Giải pháp đầu tiên được triển khai dễ dàng, vì vậy tôi sẽ không trình bày chi tiết.

Thứ hai, vì bạn muốn thực hiện một hành động sau khi tất cả các tác vụ đã gửi hoàn tất, bạn có thể xem ExecutorCompletionService và sử dụng nó thay thế. Nó kết thúc tốt đẹp một ExecutorService mà sẽ làm công tác quản lý luồng, nhưng Runnables sẽ được bọc vào cái gì đó sẽ nói với ExecutorCompletionService khi họ đã hoàn thành, vì vậy nó có thể báo cáo lại cho bạn:

ExecutorService executor = ...; 
ExecutorCompletionService ecs = new ExecutorCompletionService(executor); 

for (int i = 0; i < totalTasks; i++) { 
    ... ecs.submit(...); ... 
} 

for (int i = 0; i < totalTasks; i++) { 
    ecs.take(); 
} 

Phương pháp take() trên ExecutorCompletionService lớp sẽ chặn cho đến khi một tác vụ đã hoàn thành (bình thường hoặc đột ngột). Nó sẽ trả về một số Future, vì vậy bạn có thể kiểm tra kết quả nếu muốn.

Tôi hy vọng điều này có thể giúp bạn, vì tôi đã không hoàn toàn hiểu vấn đề của bạn.

+0

làm cách nào để buộc dừng chuỗi hoặc hủy tất cả tác vụ? sử dụng exectuer.shutdownNow()? nó sẽ làm việc tốt ngay cả với wrapper này? –

3

tạo và nhóm tất cả các nhiệm vụ và nộp cho bên hồ bơi với invokeAll (mà chỉ trả lại khi tất cả các nhiệm vụ được hoàn thành)

+0

Cách tiếp cận tốt hơn để sử dụng 'ExecutorCompletionService' hoặc' invokeAll' là gì? –

1

Sau khi gọi tắt máy trên ExecutorService, no new Task will be accepted. Điều này bạn phải tạo một ExecutorService mới cho mỗi vòng nhiệm vụ.

Tuy nhiên, với Java 8 ForkJoinPool.awaitQuiescence đã được giới thiệu. Nếu bạn có thể chuyển từ một ExecutorService bình thường sang ForkJoinPool, bạn có thể sử dụng phương thức này để chờ cho đến khi không có thêm nhiệm vụ nào đang chạy trong ForkJoinPool mà không cần phải gọi tắt máy. Điều này có nghĩa là bạn có thể điền vào một ForkJoinPool with Tasks, đợi cho đến khi nó trống (bị ngừng), và sau đó lại bắt đầu điền vào nó với Tasks, và cứ thế.