2012-04-18 10 views
15

Tôi có một định lượng & vấn đề lặp lại bằng cách sử dụng thư viện Parallel Task, BlockingCollection<T>, ConcurrentQueue<T> & GetConsumingEnumerable trong khi cố gắng để tạo ra một đường ống dẫn đơn giản.Tại sao iterating trên GetConsumingEnumerable() không hoàn toàn trống nằm bên dưới bộ sưu tập chặn

Tóm lại, thêm các mục vào một mặc định BlockingCollection<T> (mà dưới mui xe được dựa vào một ConcurrentQueue<T>) từ một thread, không đảm bảo rằng họ sẽ được popped ra khỏi BlockingCollection<T> từ một thread gọi phương pháp GetConsumingEnumerable().

Tôi đã tạo một Ứng dụng Winforms rất đơn giản để tái tạo/mô phỏng điều này chỉ in số nguyên vào màn hình.

  • Timer1 có trách nhiệm xếp hàng các hạng mục công trình ... Nó sử dụng một từ điển đồng thời gọi _tracker để nó biết những gì nó đã được bổ sung vào bộ sưu tập chặn.
  • Timer2 chỉ được đăng nhập trạng thái đếm cả BlockingCollection & của nút _tracker
  • START khởi động một Paralell.ForEach mà chỉ đơn giản lặp trên các bộ sưu tập chặn GetConsumingEnumerable() và bắt đầu in chúng vào hộp danh sách thứ hai.
  • Nút DỪNG dừng lại Timer1 ngăn không cho thêm nhiều mục nhập vào bộ sưu tập chặn.
public partial class Form1 : Form 
{ 
    private int Counter = 0; 
    private BlockingCollection<int> _entries; 
    private ConcurrentDictionary<int, int> _tracker; 
    private CancellationTokenSource _tokenSource; 
    private TaskFactory _factory; 

    public Form1() 
    { 
     _entries = new BlockingCollection<int>(); 
     _tracker = new ConcurrentDictionary<int, int>(); 
     _tokenSource = new CancellationTokenSource(); 
     _factory = new TaskFactory(); 
     InitializeComponent(); 
    } 

    private void timer1_Tick(object sender, EventArgs e) 
    { //ADDING TIMER -> LISTBOX 1 
     for(var i = 0; i < 3; i++,Counter++) 
     { 
      if (_tracker.TryAdd(Counter, Counter)) 
      _entries.Add(Counter); 
      listBox1.Items.Add(string.Format("Adding {0}", Counter)); 
     } 
    } 

    private void timer2_Tick_1(object sender, EventArgs e) 
    { //LOGGING TIMER -> LIST BOX 3 
     listBox3.Items.Add(string.Format("Tracker Count : {0}/Entries Count : {1}", _tracker.Count, _entries.Count)); 
    } 

    private void button1_Click(object sender, EventArgs e) 
    { //START BUTTON -> LOGS TO LIST BOX 2 

     var options = new ParallelOptions { 
           CancellationToken = _tokenSource.Token, 
           MaxDegreeOfParallelism = 1 
          }; 

     _factory.StartNew(() => { Parallel.ForEach(_entries.GetConsumingEnumerable(), options, DoWork); }); 

     timer1.Enabled = timer2.Enabled = true; 
     timer1.Start(); 
     timer2.Start(); 
    } 

    private void DoWork(int entry) 
    { 
     Thread.Sleep(1000); //Sleep for 1 second to simulate work being done. 
     Invoke((MethodInvoker)(() => listBox2.Items.Add(string.Format("Processed {0}", entry)))); 
     int oldEntry; 
     _tracker.TryRemove(entry, out oldEntry); 
    } 

    private void button2_Click(object sender, EventArgs e) 
    { //STOP BUTTON 
     timer1.Stop(); 
     timer1.Enabled = false; 
    } 

Đây là chuỗi các sự kiện:

  • Nhấn Start
  • Timer1 ve & ListBox1 ngay lập tức được cập nhật với 3 thông điệp (Thêm 0, 1, 2)
  • ListBox2 được cập nhật tiếp theo với 3 tin nhắn, cách nhau 1 giây
    • Processing 0
    • Processing 1
    • Processing 2
  • Timer1 ve & ListBox1 ngay lập tức được cập nhật với 3 thông điệp (Thêm 3, 4, 5)
  • ListBox2 được sbsequent cập nhật với 2 bài viết, 1 thứ hai ngoài
    • Processing 3
    • Processing 4
    • Processing 5 không được in ... sẽ dường như đã "mất tích"
  • Nhấn STOP để ngăn chặn nhiều tin nhắn được bổ sung bởi timer 1
  • Chờ ..."Xử lý 5" vẫn không xuất hiện

Missing Entry

Bạn có thể thấy rằng các từ điển đồng thời vẫn theo dõi đó 1 item vẫn chưa được xử lý & sau đó lấy ra từ _tracker

Nếu tôi Nhấn Start một lần nữa, sau đó timer1 bắt đầu thêm nhiều hơn 3 mục và vòng lặp song song trở lại với cuộc sống in ấn 5, 6, 7 & 8.

Entry returned after subsequent items shoved in behind it

Tôi mất hoàn toàn về lý do tại sao điều này xảy ra. Gọi bắt đầu một lần nữa rõ ràng là gọi một newtask, trong đó kêu gọi một foreach song song, và tái thực hiện GetConsumingEnumerable() mà kỳ diệu phát hiện sự xâm nhập bị mất ... Tôi

Tại sao BlockingCollection.GetConsumingEnumerable() không đảm bảo để lặp qua tất cả các mục đó là thêm vào bộ sưu tập.

Tại sao việc thêm nhiều mục nhập hơn sau đó khiến nó bị "bỏ liên tục" và tiếp tục xử lý?

+0

Xin cảm ơn các bạn. Cả hai đã chỉ cho tôi đúng hướng dẫn tôi về phía này. @Svick này có lẽ là lý do tại sao nó không phải là một vấn đề trong .net4.5 beta http://connect.microsoft.com/VisualStudio/feedback/details/674705/blockingcollection-getconsumingenumerable-and-parallel-foreach-hang và steven toup cho nhóm MS Parallel đã thực sự viết blog về vấn đề này. http://blogs.msdn.com/b/pfxteam/archive/2010/04/06/9990420.aspx –

Trả lời

17

Bạn không thể sử dụng GetConsumingEnumerable() trong Parallel.ForEach().

Sử dụng GetConsumingPartitioner từ TPL extras

Trong bài viết trên blog bạn cũng sẽ nhận được một lời giải thích lý do tại sao không thể sử dụng GetConsumingEnumerable()

Thuật toán phân vùng làm việc mặc định của cả hai Parallel.ForEach và PLINQ sử dụng chunking để giảm thiểu chi phí đồng bộ hóa: thay vì lấy khóa một lần cho mỗi phần tử, nó sẽ lấy khóa, lấy một nhóm các phần tử (một đoạn) và sau đó nhả khóa.

tức là Parallel.ForEach đợi cho đến khi nó nhận được một nhóm mục công việc trước khi tiếp tục. Chính xác những gì thử nghiệm của bạn hiển thị.

+2

Các tính năng bổ sung TPL là theo giấy phép MS-LPL có nghĩa là nếu bạn sử dụng chúng, bạn đang khóa toàn bộ tác phẩm phái sinh của mình lên Windows. Nó không phải là giấy phép được OSI phê chuẩn ... –

+0

@Daniel, Tốt để biết. Cảm ơn các cập nhật. Bạn có biết rằng nếu không phải cửa sổ TPL cũng sử dụng một phân vùng nhóm như mặc định – adrianm

2

Tôi không thể sao chép hành vi của bạn bằng ứng dụng giao diện điều khiển đơn giản thực hiện cùng một điều (chạy trên .Net 4.5 beta, có thể tạo sự khác biệt). Nhưng tôi nghĩ rằng lý do này xảy ra là Parallel.ForEach() cố gắng tối ưu hóa việc thực hiện bằng cách chia bộ sưu tập đầu vào thành nhiều phần. Và với số đếm của bạn, một đoạn không thể được tạo ra cho đến khi bạn thêm nhiều mục vào bộ sưu tập. Để biết thêm thông tin, xem Custom Partitioners for PLINQ and TPL on MSDN.

Để khắc phục điều này, không sử dụng Parallel.ForEach(). Nếu bạn vẫn muốn xử lý các mục song song, bạn có thể bắt đầu Task trong mỗi lần lặp lại.

+1

Cảm ơn bạn đã kiểm tra. Một trong những tính năng "đẹp" mà parallel.foreach cung cấp là khả năng điều tiết MaxDegreesOfParallelism cho tôi (phiên bản thế giới thực đang gọi một Dịch vụ WCF). Nếu tôi chỉ cần xoay qua một foreach bình thường để làm mới Nhiệm vụ trong mỗi lần lặp lại, bạn sẽ đề nghị tôi điều chỉnh số lượng tối đa các nhiệm vụ đồng nhất như thế nào. –

0

Tôi cảm thấy như tôi cần lưu ý chỉ để rõ ràng rằng trong trường hợp bạn có thể gọi phương thức .CompleteAdding() của BlockingCollection trước khi thực thi Parallel.foreach, vấn đề bạn mô tả ở trên sẽ không thành vấn đề. Tôi đã sử dụng hai đối tượng này với nhau nhiều lần với kết quả tuyệt vời.

Bên cạnh đó, bạn luôn có thể tái thiết lập BlockingCollection của bạn sau khi gọi CompleteAdding() để thêm các mục khi cần thiết (_entries = new BlockingCollection();)

Thay đổi mã sự kiện click trên như sau sẽ giải quyết của bạn vấn đề với mục mất tích và làm cho nó làm việc như mong đợi, nếu bạn nhấp vào nút khởi động và dừng nhiều lần:

private void button2_Click(object sender, EventArgs e) 
{ //STOP BUTTON 
    timer1.Stop(); 
    timer1.Enabled = false; 
>>>>_entries.CompleteAdding(); 
>>>>_entries = new BlockingCollection<int>(); 
} 
5

Tính đến .net 4.5, bạn có thể tạo một phân vùng đó sẽ chỉ mất 1 mục tại một thời điểm :

var partitioner = Partitioner.Create(jobsBatchesQ.queue.GetConsumingEnumerable(), EnumerablePartitionerOptions.NoBuffering); 
Parallel.ForEach(partitioner, new ParallelOptions { MaxDegreeOfParallelism = (currentTask.ParallelLevel > 0 ? currentTask.ParallelLevel : 1) }, (batch, state) => {//do stuff} 

https://msdn.microsoft.com/en-us/library/system.collections.concurrent.enumerablepartitioneroptions(v=vs.110).aspx

+0

Tôi xác nhận rằng như của .NET 4.5, đây là tùy chọn ưa thích. – MaYaN

+0

Câu trả lời được chấp nhận là thông tin. Điều này dường như được cập nhật nhiều hơn. –