2010-08-06 5 views
6

Tôi đang thử nghiệm với các thuật toán song song trong Java. Tôi bắt đầu với việc sắp xếp hợp nhất và đăng nỗ lực của mình vào số này question. Cố gắng sửa đổi của tôi là trong đoạn code dưới đây, nơi mà bây giờ tôi cố gắng song song sắp xếp nhanh chóng.Java: Song song sắp xếp nhanh qua đa luồng

Có bất kỳ sai lầm tân binh nào trong việc triển khai đa luồng hoặc cách tiếp cận của tôi cho vấn đề này không? Nếu không, tôi không nên mong đợi hơn một tốc độ tăng 32% giữa một tuần tự và một thuật toán song song trên một duel-core (xem timings ở dưới)?

Đây là thuật toán đa luồng:

public class ThreadedQuick extends Thread 
    { 
     final int MAX_THREADS = Runtime.getRuntime().availableProcessors(); 

     CountDownLatch doneSignal; 
     static int num_threads = 1; 

     int[] my_array; 
     int start, end; 

     public ThreadedQuick(CountDownLatch doneSignal, int[] array, int start, int end) { 
      this.my_array = array; 
      this.start = start; 
      this.end = end; 
      this.doneSignal = doneSignal; 
     } 

     public static void reset() { 
      num_threads = 1; 
     } 

     public void run() { 
      quicksort(my_array, start, end); 
      doneSignal.countDown(); 
      num_threads--; 
     } 

     public void quicksort(int[] array, int start, int end) { 
      int len = end-start+1; 

      if (len <= 1) 
       return; 

      int pivot_index = medianOfThree(array, start, end); 
      int pivotValue = array[pivot_index]; 

      swap(array, pivot_index, end); 

      int storeIndex = start; 
      for (int i = start; i < end; i++) { 
       if (array[i] <= pivotValue) { 
        swap(array, i, storeIndex); 
        storeIndex++; 
       } 
      } 

      swap(array, storeIndex, end); 

      if (num_threads < MAX_THREADS) { 
       num_threads++; 

       CountDownLatch completionSignal = new CountDownLatch(1); 

       new ThreadedQuick(completionSignal, array, start, storeIndex - 1).start(); 
       quicksort(array, storeIndex + 1, end); 

       try { 
        completionSignal.await(1000, TimeUnit.SECONDS); 
       } catch(Exception ex) { 
        ex.printStackTrace(); 
       } 
      } else { 
       quicksort(array, start, storeIndex - 1); 
       quicksort(array, storeIndex + 1, end); 
      } 
     } 
    } 

Sau đây là cách tôi bắt đầu nó đi:

ThreadedQuick.reset(); 
CountDownLatch completionSignal = new CountDownLatch(1); 
new ThreadedQuick(completionSignal, array, 0, array.length-1).start(); 
try { 
    completionSignal.await(1000, TimeUnit.SECONDS); 
} catch(Exception ex){ 
    ex.printStackTrace(); 
} 

Tôi đã thử nghiệm này để chống lại Arrays.sort và tuần tự thuật toán sắp xếp nhanh chóng tương tự. Đây là kết quả thời gian trên một intel đấu-core máy tính xách tay dell, chỉ trong vài giây:

Elements: 500.000, tuần tự: 0,068592, luồng: 0,046871, Arrays.sort: 0,079677

Elements: 1.000.000, tuần tự: 0,14416, luồng: 0,095492, Arrays.sort: 0,167155

Elements: 2.000.000, tuần tự: 0,301666, luồng: 0,205719, Arrays.sort: 0,350982

Elements: 4.000.000, tuần tự: 0,623291, luồng: 0,424119, Arrays.sort: 0,712698

Elements: 8.000.000, tuần tự: 1,279374, luồng: 0,859363, Arrays.sort: 1,487671

Mỗi số ở trên là thời gian trung bình của 100 bài kiểm tra, ném ra 3 trường hợp thấp nhất và 3 trường hợp cao nhất. Tôi đã sử dụng Random.nextInt (Integer.MAX_VALUE) để tạo một mảng cho mỗi bài kiểm tra, được khởi tạo mỗi 10 bài kiểm tra với cùng một hạt giống. Mỗi bài kiểm tra bao gồm thời gian thuật toán đã cho với System.nanoTime. Tôi làm tròn đến sáu chữ số thập phân sau khi lấy trung bình. Và rõ ràng, tôi đã kiểm tra xem từng loại có hoạt động hay không.

Như bạn có thể thấy, tốc độ tăng khoảng 32% giữa các trường hợp tuần tự và luồng trong mỗi bộ kiểm tra. Như tôi đã hỏi ở trên, tôi có nên mong đợi nhiều hơn thế không?

+1

câu hỏi của bạn là gì? –

+0

Bạn đang nói về ngôn ngữ nào? Bạn có muốn biết làm thế nào để sắp xếp trong Java hoặc C# –

+0

Vì vậy, câu hỏi là làm thế nào để thực hiện cú pháp này trong Java và tối ưu hóa các chủ đề làm việc với CPU. –

Trả lời

10

Làm numThreads tĩnh có thể gây ra sự cố, rất có khả năng bạn sẽ kết thúc với nhiều hơn MAX_THREADS đang chạy tại một số thời điểm.

Có lẽ lý do tại sao bạn không có được hiệu suất gấp đôi đầy đủ là việc sắp xếp nhanh chóng của bạn không thể hoàn toàn song song. Lưu ý rằng cuộc gọi đầu tiên tới quicksort sẽ thực hiện một lần truyền qua toàn bộ mảng trong chuỗi ban đầu trước khi nó bắt đầu thực sự chạy song song. Ngoài ra còn có một đầu vào song song hóa một thuật toán trong các hình thức chuyển đổi ngữ cảnh và chuyển đổi chế độ khi nuôi ra để tách chủ đề.

Hãy nhìn vào khung Fork/Join, vấn đề này có thể sẽ khá gọn gàng ở đó.

Một vài điểm về triển khai. Thực hiện Runnable thay vì mở rộng Thread. Mở rộng một Thread chỉ nên được sử dụng khi bạn tạo một số phiên bản mới của lớp Thread. Khi bạn chỉ muốn thực hiện một số công việc để chạy song song, bạn nên sử dụng Runnable. Trong khi thực hiện một Runnable bạn vẫn có thể mở rộng một lớp khác cho phép bạn linh hoạt hơn trong thiết kế OO. Sử dụng một hồ bơi thread được giới hạn số lượng các chủ đề bạn có sẵn trong hệ thống. Ngoài ra không sử dụng numThreads để đưa ra quyết định về việc có nên bỏ qua một chuỗi mới hay không. Bạn có thể tính toán điều này lên phía trước. Sử dụng kích thước phân vùng tối thiểu là kích thước của tổng mảng được chia cho số lượng bộ xử lý có sẵn. Một cái gì đó như:

public class ThreadedQuick implements Runnable { 

    public static final int MAX_THREADS = Runtime.getRuntime().availableProcessors(); 
    static final ExecutorService executor = Executors.newFixedThreadPool(MAX_THREADS); 

    final int[] my_array; 
    final int start, end; 

    private final int minParitionSize; 

    public ThreadedQuick(int minParitionSize, int[] array, int start, int end) { 
     this.minParitionSize = minParitionSize; 
     this.my_array = array; 
     this.start = start; 
     this.end = end; 
    } 

    public void run() { 
     quicksort(my_array, start, end); 
    } 

    public void quicksort(int[] array, int start, int end) { 
     int len = end - start + 1; 

     if (len <= 1) 
      return; 

     int pivot_index = medianOfThree(array, start, end); 
     int pivotValue = array[pivot_index]; 

     swap(array, pivot_index, end); 

     int storeIndex = start; 
     for (int i = start; i < end; i++) { 
      if (array[i] <= pivotValue) { 
       swap(array, i, storeIndex); 
       storeIndex++; 
      } 
     } 

     swap(array, storeIndex, end); 

     if (len > minParitionSize) { 

      ThreadedQuick quick = new ThreadedQuick(minParitionSize, array, start, storeIndex - 1); 
      Future<?> future = executor.submit(quick); 
      quicksort(array, storeIndex + 1, end); 

      try { 
       future.get(1000, TimeUnit.SECONDS); 
      } catch (Exception ex) { 
       ex.printStackTrace(); 
      } 
     } else { 
      quicksort(array, start, storeIndex - 1); 
      quicksort(array, storeIndex + 1, end); 
     } 
    }  
} 

Bạn có thể kick nó ra bằng cách thực hiện:

ThreadedQuick quick = new ThreadedQuick(array/ThreadedQuick.MAX_THREADS, array, 0, array.length - 1); 
quick.run(); 

này sẽ bắt đầu các loại trong cùng một thread, mà tránh một hop chủ đề không cần thiết khi khởi động lên.

Lưu ý: Không chắc việc triển khai ở trên thực sự sẽ nhanh hơn vì tôi chưa đánh giá nó.

1

Couple ý kiến ​​nếu tôi hiểu đúng mã của bạn:

  1. Tôi không thấy một khóa xung quanh numthreads đối tượng mặc dù nó có thể được truy cập thông qua nhiều chủ đề. Có lẽ bạn nên biến nó thành AtomicInteger.

  2. Sử dụng nhóm chủ đề và sắp xếp các tác vụ, tức là một cuộc gọi đơn lẻ đến quicksort, để tận dụng lợi thế của một nhóm chủ đề. Sử dụng tương lai.

Phương pháp chia phân chia hiện tại của bạn có thể để phân chia nhỏ hơn với chỉ và một bộ phận lớn hơn không có chỉ. Đó là để nói, nó không ưu tiên phân đoạn lớn hơn với chủ đề của riêng họ.

+0

Trên thực tế cho một int nguyên thủy bạn có thể sử dụng dễ bay hơi để đảm bảo bạn làm o bộ nhớ và không thông qua bộ nhớ cache. –

3

Điều này sử dụng kết hợp sắp xếp nhanh và sắp xếp hợp nhất.

import java.util.Arrays; 
import java.util.Random; 
import java.util.concurrent.ExecutorService; 
import java.util.concurrent.Executors; 
import java.util.concurrent.TimeUnit; 

public class ParallelSortMain { 
    public static void main(String... args) throws InterruptedException { 
     Random rand = new Random(); 
     final int[] values = new int[100*1024*1024]; 
     for (int i = 0; i < values.length; i++) 
      values[i] = rand.nextInt(); 

     int threads = Runtime.getRuntime().availableProcessors(); 
     ExecutorService es = Executors.newFixedThreadPool(threads); 
     int blockSize = (values.length + threads - 1)/threads; 
     for (int i = 0; i < values.length; i += blockSize) { 
      final int min = i; 
      final int max = Math.min(min + blockSize, values.length); 
      es.submit(new Runnable() { 
       @Override 
       public void run() { 
        Arrays.sort(values, min, max); 
       } 
      }); 
     } 
     es.shutdown(); 
     es.awaitTermination(10, TimeUnit.MINUTES); 
     for (int blockSize2 = blockSize; blockSize2 < values.length/2; blockSize2 *= 2) { 
      for (int i = 0; i < values.length; i += blockSize2) { 
       final int min = i; 
       final int mid = Math.min(min + blockSize2, values.length); 
       final int max = Math.min(min + blockSize2 * 2, values.length); 
       mergeSort(values, min, mid, max); 
      } 
     } 
    } 

    private static boolean mergeSort(int[] values, int left, int mid, int end) { 
     int[] results = new int[end - left]; 
     int l = left, r = mid, m = 0; 
     for (; l < left && r < mid; m++) { 
      int lv = values[l]; 
      int rv = values[r]; 
      if (lv < rv) { 
       results[m] = lv; 
       l++; 
      } else { 
       results[m] = rv; 
       r++; 
      } 
     } 
     while (l < mid) 
      results[m++] = values[l++]; 
     while (r < end) 
      results[m++] = values[r++]; 
     System.arraycopy(results, 0, values, left, results.length); 
     return false; 
    } 
}