2012-04-09 20 views
16

chéo được đưa lên http://social.msdn.microsoft.com/Forums/en-US/tpldataflow/thread/89b3f71d-3777-4fad-9c11-50d8dc81a4a9biểu kiến ​​BufferBlock.Post/Receive/ReceiveAsync đua/lỗi

Tôi biết ... Tôi không thực sự sử dụng TplDataflow để tối đa tiềm năng của nó. ATM Tôi chỉ đơn giản là sử dụng BufferBlock làm hàng đợi an toàn cho việc chuyển tin nhắn, nơi nhà sản xuất và người tiêu dùng đang chạy ở các mức giá khác nhau. Tôi thấy một số hành vi kỳ lạ khiến tôi bối rối như thế nào để tiến hành .

private BufferBlock<object> messageQueue = new BufferBlock<object>(); 

public void Send(object message) 
{ 
    var accepted=messageQueue.Post(message); 
    logger.Info("Send message was called qlen = {0} accepted={1}", 
    messageQueue.Count,accepted); 
} 

public async Task<object> GetMessageAsync() 
{ 
    try 
    { 
     var m = await messageQueue.ReceiveAsync(TimeSpan.FromSeconds(30)); 
     //despite messageQueue.Count>0 next line 
     //occasionally does not execute 
     logger.Info("message received"); 
     //....... 
    } 
    catch(TimeoutException) 
    { 
     //do something 
    } 
} 

Trong mã trên (là một phần của giải pháp phân phối 2000), Send được gọi định kỳ 100ms hoặc hơn. Điều này có nghĩa là một mục là Post được chỉnh sửa thành messageQueue vào khoảng 10 lần mỗi giây. Điều này đã được xác minh. Tuy nhiên, đôi khi có vẻ như ReceiveAsync không hoàn thành trong khoảng thời gian chờ (tức là Post không gây ra ReceiveAsync để hoàn tất) và TimeoutException sẽ được cập nhật sau 30 giây. Tại thời điểm này, messageQueue.Count nằm trong hàng trăm. Điều này là bất ngờ. Vấn đề này đã được quan sát thấy ở tốc độ gửi bài chậm hơn (1 bài/giây) và thường xảy ra trước khi 1000 mục đã vượt qua BufferBlock.

Vì vậy, để làm việc xung quanh vấn đề này, tôi đang sử dụng đoạn mã sau, mà làm việc, nhưng đôi khi gây ra độ trễ 1s khi nhận được (do lỗi ở trên xảy ra)

public async Task<object> GetMessageAsync() 
    { 
     try 
     { 
      object m; 
      var attempts = 0; 
      for (; ;) 
      { 
       try 
       { 
        m = await messageQueue.ReceiveAsync(TimeSpan.FromSeconds(1)); 
       } 
       catch (TimeoutException) 
       { 
        attempts++; 
        if (attempts >= 30) throw; 
        continue; 
       } 
       break; 

      } 

      logger.Info("message received"); 
      //....... 
     } 
     catch(TimeoutException) 
     { 
      //do something 
     } 
    } 

này trông giống như một điều kiện chủng tộc trong TDF với tôi, nhưng tôi không thể hiểu được lý do tại sao điều này không xảy ra ở những nơi khác mà tôi sử dụng BufferBlock theo cách tương tự. Thử nghiệm thay đổi từ ReceiveAsync thành Receive không giúp ích gì. Tôi đã không kiểm tra, nhưng tôi tưởng tượng trong sự cô lập, các mã ở trên hoạt động hoàn hảo. Đó là một mẫu mà tôi đã thấy trong "Giới thiệu về TPL Dataflow" tpldataflow.docx.

Tôi có thể làm gì để tìm hiểu điều này? Có bất kỳ số liệu nào có thể giúp suy ra điều gì đang xảy ra không? Nếu tôi không thể tạo ra một trường hợp kiểm tra đáng tin cậy, tôi có thể cung cấp thêm thông tin gì?

Trợ giúp!

+1

Tôi không thấy bất kỳ điều gì sai trái với những gì bạn đang làm hoặc mong đợi của bạn ở đây. Tôi chắc chắn nghĩ rằng bạn cần phải giữ cho hoạt động này trên diễn đàn MSDN hơn ở đây. Bạn đã nhận được sự chú ý của @StephenToub và anh ấy chắc chắn là người bạn muốn nhìn vào nó. –

+0

Không. Chưa bao giờ đến đáy của nó. Tôi không thể tái tạo vấn đề trong một ví dụ nhỏ, khép kín. Vì tôi chỉ sử dụng BufferBlock nên tôi đã triển khai thực hiện hàng đợi async của riêng mình. Tôi không phải thay đổi bất kỳ mã nào khác ... Tôi chỉ đơn giản là reimplemented các phần của giao diện BufferBlock mà tôi đã sử dụng. Làm việc một điều trị bây giờ, mà lá tôi nghĩ rằng có cái gì đó không ổn, nhưng tôi không thể chứng minh điều đó. Grr. – spender

+0

@spendor Rất thú vị, kỳ quặc, đủ, tôi đã loại bỏ việc thực hiện hàng đợi đồng thời không đồng bộ của riêng mình sau khi tìm BufferBlock ...bây giờ tôi sẽ phải xem xét lại. Cảm ơn. –

Trả lời

1

Stephen dường như nghĩ rằng đây là giải pháp

var m = chờ đợi messageQueue.ReceiveAsync();

thay vì:

var m = chờ đợi messageQueue.ReceiveAsync (TimeSpan.FromSeconds (30));

Bạn có thể xác nhận hoặc từ chối điều này không?

+0

Điều đó không thành công. Nó không quan trọng mà quá tải ReceiveAsync tôi đã chọn, kết quả là như nhau. Xem bình luận của tôi ở trên cho độ phân giải của tôi. – spender