2013-07-05 20 views
52

Teaser: các bạn, câu hỏi này không phải là về cách triển khai chính sách thử lại. Đó là về việc hoàn thành chính xác khối TPL Dataflow.Thực hiện hoàn thành chính xác một khối có thể thử lại

Câu hỏi này chủ yếu là sự tiếp nối câu hỏi trước của tôi Retry policy within ITargetBlock. Câu trả lời cho câu hỏi này là giải pháp thông minh của @ svick sử dụng TransformBlock (nguồn) và TransformManyBlock (mục tiêu). Vấn đề duy nhất còn lại là hoàn thành khối này theo cách đúng đắn : chờ cho tất cả các lần thử lại được hoàn thành trước, sau đó hoàn tất khối mục tiêu. Đây là những gì tôi đã kết thúc với (nó chỉ là một đoạn, không phải trả quá nhiều sự chú ý đến một retries bộ phi thread):

var retries = new HashSet<RetryingMessage<TInput>>(); 

TransformManyBlock<RetryableMessage<TInput>, TOutput> target = null; 
target = new TransformManyBlock<RetryableMessage<TInput>, TOutput>(
    async message => 
    { 
     try 
     { 
      var result = new[] { await transform(message.Data) }; 
      retries.Remove(message); 
      return result; 
     } 
     catch (Exception ex) 
     { 
      message.Exceptions.Add(ex); 
      if (message.RetriesRemaining == 0) 
      { 
       if (failureHandler != null) 
        failureHandler(message.Exceptions); 

       retries.Remove(message); 
      } 
      else 
      { 
       retries.Add(message); 
       message.RetriesRemaining--; 

       Task.Delay(retryDelay) 
        .ContinueWith(_ => target.Post(message)); 
      } 
      return null; 
     } 
    }, dataflowBlockOptions); 

source.LinkTo(target); 

source.Completion.ContinueWith(async _ => 
{ 
    while (target.InputCount > 0 || retries.Any()) 
     await Task.Delay(100); 

    target.Complete(); 
}); 

Ý tưởng là để thực hiện một số loại phiếu và xác minh xem có các thư vẫn đang chờ xử lý và không có thư nào yêu cầu thử lại. Nhưng trong giải pháp này tôi không thích ý tưởng bỏ phiếu.

Có, tôi có thể đóng gói logic của việc thêm/xóa thử lại vào một lớp riêng biệt và thậm chí ví dụ: thực hiện một số hành động khi tập hợp các lần thử lại bị trống, nhưng cách xử lý với điều kiện target.InputCount > 0? Không có một cuộc gọi lại như vậy được gọi khi không có tin nhắn đang chờ xử lý cho khối, do đó, có vẻ như việc xác minh target.ItemCount trong một vòng lặp với một sự chậm trễ nhỏ là một lựa chọn duy nhất.

Có ai biết cách thông minh hơn để đạt được điều này không?

+1

Dường như ITargetBlock hỗ trợ thông báo dựa trên đẩy thông qua người quan sát được trả về bởi phương pháp Mở rộng AsObserver. Xem http://msdn.microsoft.com/en-us/library/hh160359.aspx và http://msdn.microsoft.com/en-us/library/ee850490.aspx. – JamieSee

+0

Có vẻ như bạn đang cố gắng sử dụng các ngoại lệ như luồng chương trình bình thường, đó là thực hành không tốt. Tìm kiếm trên Google hoặc xem chủ đề sau trên SO: http://stackoverflow.com/questions/729379/why-not-use-exceptions-as-regular-flow-of-control Tất cả logic thử lại nên ở trong khối thử, không phải trong khối ngoại lệ. Không phải là câu trả lời cho câu hỏi của bạn mà là điều tôi nghĩ bạn nên biết. – Nullius

+4

@Nullius, thử lại logic dựa trên * ngoại lệ * - thử lại trong trường hợp có lỗi tạm thời. Tôi không nghĩ rằng thử lại logic trong một khối 'try' là một ý tưởng tốt, vì bạn không biết loại lỗi và cho dù loại lỗi này là thoáng qua hay không. – Alex

Trả lời

1

Kết hợp câu trả lời hwcverwe và JamieSee bình luận có thể là giải pháp lý tưởng.

Trước tiên, bạn cần phải tạo ra nhiều hơn một sự kiện:

var signal = new ManualResetEvent(false); 
var completedEvent = new ManualResetEvent(false); 

Sau đó, bạn phải tạo một người quan sát, và đăng ký với TransformManyBlock, vì vậy bạn sẽ được thông báo khi một sự kiện có liên quan xảy ra:

var observer = new RetryingBlockObserver<TOutput>(completedEvent); 
var observable = target.AsObservable(); 
observable.Subscribe(observer); 

Các quan sát được có thể khá dễ dàng:

private class RetryingBlockObserver<T> : IObserver<T> { 
     private ManualResetEvent completedEvent; 

     public RetryingBlockObserver(ManualResetEvent completedEvent) {     
      this.completedEvent = completedEvent; 
     } 

     public void OnCompleted() { 
      completedEvent.Set(); 
     } 

     public void OnError(Exception error) { 
      //TODO 
     } 

     public void OnNext(T value) { 
      //TODO 
     } 
    } 

Và bạn ca n chờ tín hiệu hoặc hoàn thành (hết hiệu lực của tất cả các mục nguồn) hoặc cả hai

source.Completion.ContinueWith(async _ => { 

      WaitHandle.WaitAll(completedEvent, signal); 
      // Or WaitHandle.WaitAny, depending on your needs! 

      target.Complete(); 
     }); 

Bạn có thể kiểm tra giá trị kết quả của WaitAll để hiểu sự kiện nào đã được đặt và phản ứng tương ứng. Bạn cũng có thể thêm các sự kiện khác vào mã, chuyển chúng đến người quan sát để có thể đặt chúng khi cần.Bạn có thể phân biệt hành vi của mình và phản hồi khác nhau khi lỗi được nâng lên, ví dụ:

2

Có thể một ManualResetEvent có thể thực hiện thủ thuật cho bạn.

Thêm một tài sản công cộng để TransformManyBlock

private ManualResetEvent _signal = new ManualResetEvent(false); 
public ManualResetEvent Signal { get { return _signal; } } 

Và ở đây bạn đi:

var retries = new HashSet<RetryingMessage<TInput>>(); 

TransformManyBlock<RetryableMessage<TInput>, TOutput> target = null; 
target = new TransformManyBlock<RetryableMessage<TInput>, TOutput>(
    async message => 
    { 
     try 
     { 
      var result = new[] { await transform(message.Data) }; 
      retries.Remove(message); 

      // Sets the state of the event to signaled, allowing one or more waiting threads to proceed 
      if(!retries.Any()) Signal.Set(); 
      return result; 
     } 
     catch (Exception ex) 
     { 
      message.Exceptions.Add(ex); 
      if (message.RetriesRemaining == 0) 
      { 
       if (failureHandler != null) 
        failureHandler(message.Exceptions); 

       retries.Remove(message); 

       // Sets the state of the event to signaled, allowing one or more waiting threads to proceed 
       if(!retries.Any()) Signal.Set(); 
      } 
      else 
      { 
       retries.Add(message); 
       message.RetriesRemaining--; 

       Task.Delay(retryDelay) 
        .ContinueWith(_ => target.Post(message)); 
      } 
      return null; 
     } 
    }, dataflowBlockOptions); 

source.LinkTo(target); 

source.Completion.ContinueWith(async _ => 
{ 
    //Blocks the current thread until the current WaitHandle receives a signal. 
    target.Signal.WaitOne(); 

    target.Complete(); 
}); 

Tôi không chắc chắn nơi target.InputCount của bạn được thiết lập. Vì vậy, tại nơi bạn thay đổi target.InputCount bạn có thể thêm mã sau đây:

if(InputCount == 0) Signal.Set(); 
+0

Điều này là: 'target.InputCount' là một hộp đen * - đó là thuộc tính chỉ đọc của' TransformManyBlock' từ TPL Dataflow. – Alex