Tôi có một định lượng & vấn đề lặp lại bằng cách sử dụng thư viện Parallel Task, BlockingCollection<T>
, ConcurrentQueue<T>
& GetConsumingEnumerable
trong khi cố gắng để tạo ra một đường ống dẫn đơn giản.Tại sao iterating trên GetConsumingEnumerable() không hoàn toàn trống nằm bên dưới bộ sưu tập chặn
Tóm lại, thêm các mục vào một mặc định BlockingCollection<T>
(mà dưới mui xe được dựa vào một ConcurrentQueue<T>
) từ một thread, không đảm bảo rằng họ sẽ được popped ra khỏi BlockingCollection<T>
từ một thread gọi phương pháp GetConsumingEnumerable()
.
Tôi đã tạo một Ứng dụng Winforms rất đơn giản để tái tạo/mô phỏng điều này chỉ in số nguyên vào màn hình.
Timer1
có trách nhiệm xếp hàng các hạng mục công trình ... Nó sử dụng một từ điển đồng thời gọi_tracker
để nó biết những gì nó đã được bổ sung vào bộ sưu tập chặn.Timer2
chỉ được đăng nhập trạng thái đếm cảBlockingCollection
& của nút_tracker
- START khởi động một
Paralell.ForEach
mà chỉ đơn giản lặp trên các bộ sưu tập chặnGetConsumingEnumerable()
và bắt đầu in chúng vào hộp danh sách thứ hai. - Nút DỪNG dừng lại
Timer1
ngăn không cho thêm nhiều mục nhập vào bộ sưu tập chặn.
public partial class Form1 : Form
{
private int Counter = 0;
private BlockingCollection<int> _entries;
private ConcurrentDictionary<int, int> _tracker;
private CancellationTokenSource _tokenSource;
private TaskFactory _factory;
public Form1()
{
_entries = new BlockingCollection<int>();
_tracker = new ConcurrentDictionary<int, int>();
_tokenSource = new CancellationTokenSource();
_factory = new TaskFactory();
InitializeComponent();
}
private void timer1_Tick(object sender, EventArgs e)
{ //ADDING TIMER -> LISTBOX 1
for(var i = 0; i < 3; i++,Counter++)
{
if (_tracker.TryAdd(Counter, Counter))
_entries.Add(Counter);
listBox1.Items.Add(string.Format("Adding {0}", Counter));
}
}
private void timer2_Tick_1(object sender, EventArgs e)
{ //LOGGING TIMER -> LIST BOX 3
listBox3.Items.Add(string.Format("Tracker Count : {0}/Entries Count : {1}", _tracker.Count, _entries.Count));
}
private void button1_Click(object sender, EventArgs e)
{ //START BUTTON -> LOGS TO LIST BOX 2
var options = new ParallelOptions {
CancellationToken = _tokenSource.Token,
MaxDegreeOfParallelism = 1
};
_factory.StartNew(() => { Parallel.ForEach(_entries.GetConsumingEnumerable(), options, DoWork); });
timer1.Enabled = timer2.Enabled = true;
timer1.Start();
timer2.Start();
}
private void DoWork(int entry)
{
Thread.Sleep(1000); //Sleep for 1 second to simulate work being done.
Invoke((MethodInvoker)(() => listBox2.Items.Add(string.Format("Processed {0}", entry))));
int oldEntry;
_tracker.TryRemove(entry, out oldEntry);
}
private void button2_Click(object sender, EventArgs e)
{ //STOP BUTTON
timer1.Stop();
timer1.Enabled = false;
}
Đây là chuỗi các sự kiện:
- Nhấn Start
- Timer1 ve & ListBox1 ngay lập tức được cập nhật với 3 thông điệp (Thêm 0, 1, 2)
- ListBox2 được cập nhật tiếp theo với 3 tin nhắn, cách nhau 1 giây
- Processing 0
- Processing 1
- Processing 2
- Timer1 ve & ListBox1 ngay lập tức được cập nhật với 3 thông điệp (Thêm 3, 4, 5)
- ListBox2 được sbsequent cập nhật với 2 bài viết, 1 thứ hai ngoài
- Processing 3
- Processing 4
- Processing 5 không được in ... sẽ dường như đã "mất tích"
- Nhấn STOP để ngăn chặn nhiều tin nhắn được bổ sung bởi timer 1
- Chờ ..."Xử lý 5" vẫn không xuất hiện
Bạn có thể thấy rằng các từ điển đồng thời vẫn theo dõi đó 1 item vẫn chưa được xử lý & sau đó lấy ra từ _tracker
Nếu tôi Nhấn Start một lần nữa, sau đó timer1 bắt đầu thêm nhiều hơn 3 mục và vòng lặp song song trở lại với cuộc sống in ấn 5, 6, 7 & 8.
Tôi mất hoàn toàn về lý do tại sao điều này xảy ra. Gọi bắt đầu một lần nữa rõ ràng là gọi một newtask, trong đó kêu gọi một foreach song song, và tái thực hiện GetConsumingEnumerable() mà kỳ diệu phát hiện sự xâm nhập bị mất ... Tôi
Tại sao BlockingCollection.GetConsumingEnumerable()
không đảm bảo để lặp qua tất cả các mục đó là thêm vào bộ sưu tập.
Tại sao việc thêm nhiều mục nhập hơn sau đó khiến nó bị "bỏ liên tục" và tiếp tục xử lý?
Xin cảm ơn các bạn. Cả hai đã chỉ cho tôi đúng hướng dẫn tôi về phía này. @Svick này có lẽ là lý do tại sao nó không phải là một vấn đề trong .net4.5 beta http://connect.microsoft.com/VisualStudio/feedback/details/674705/blockingcollection-getconsumingenumerable-and-parallel-foreach-hang và steven toup cho nhóm MS Parallel đã thực sự viết blog về vấn đề này. http://blogs.msdn.com/b/pfxteam/archive/2010/04/06/9990420.aspx –