2012-06-25 11 views
9

Đơn đăng ký của tôi chấm dứt khi một người quan sát bị lỗi khi ObserveOn(Scheduler.ThreadPool) phát hiện lỗi. Cách duy nhất tôi đã tìm thấy để đối phó với điều này là bằng cách sử dụng một phương pháp mở rộng tùy chỉnh dưới đây (ngoài việc đảm bảo OnNext không bao giờ ném một ngoại lệ). Và sau đó đảm bảo rằng mỗi ObserveOn được theo sau là ExceptionToError.Làm cách nào để xử lý ngoại lệ trong OnNext khi sử dụng ObserveOn?

public static IObservable<T> ExceptionToError<T>(this IObservable<T> source) { 
     var sub = new Subject<T>(); 
     source.Subscribe(i => { 
      try { 
       sub.OnNext(i); 
      } catch (Exception err) { 
       sub.OnError(err); 
      } 
     } 
      , e => sub.OnError(e),() => sub.OnCompleted()); 
     return sub; 
    } 

Tuy nhiên, điều này không đúng. Có cách nào tốt hơn để giải quyết vấn đề này không?

Ví dụ

Chương trình này bị treo vì ngoại lệ còn tự do.

class Program { 
    static void Main(string[] args) { 
     try { 
      var xs = new Subject<int>(); 

      xs.ObserveOn(Scheduler.ThreadPool).Subscribe(x => { 
       Console.WriteLine(x); 
       if (x % 5 == 0) { 
        throw new System.Exception("Bang!"); 
       } 
      }, ex => Console.WriteLine("Caught:" + ex.Message)); // <- not reached 

      xs.OnNext(1); 
      xs.OnNext(2); 
      xs.OnNext(3); 
      xs.OnNext(4); 
      xs.OnNext(5); 
     } catch (Exception e) { 
      Console.WriteLine("Caught : " + e.Message); // <- also not reached 
     } finally { 

      Console.ReadKey(); 
     } 
    } 
} 

Trả lời

14

Chúng tôi đang giải quyết vấn đề này trong Rx v2.0, bắt đầu với bản phát hành RC. Bạn có thể đọc tất cả về nó trên blog của chúng tôi tại http://blogs.msdn.com/rxteam. Về cơ bản, nó xử lý lỗi xử lý kỷ luật trong chính đường ống dẫn, kết hợp với phương pháp mở rộng SubscribeSafe (để chuyển hướng lỗi trong quá trình đăng ký vào kênh OnError) và phương thức mở rộng Catch trên IScheduler (để bọc lịch biểu với logic xử lý ngoại lệ theo lịch biểu hành động).

Liên quan đến phương pháp ExceptionToError được đề xuất ở đây, nó có một lỗ hổng. Đối tượng đăng ký IDisposable vẫn có thể là null khi các cuộc gọi lại chạy; có một điều kiện chủng tộc cơ bản. Để giải quyết vấn đề này, bạn phải sử dụng một SingleAssignmentDisposable.

0

Bạn nói đúng - nó sẽ cảm thấy xấu. Sử dụng và trả lại các chủ đề như thế này không phải là một cách tốt để đi.

Ít nhất bạn nên thực hiện phương pháp này như sau:

public static IObservable<T> ExceptionToError<T>(this IObservable<T> source) 
{ 
    return Observable.Create<T>(o => 
    { 
     var subscription = (IDisposable)null; 
     subscription = source.Subscribe(x => 
     { 
      try 
      { 
       o.OnNext(x); 
      } 
      catch (Exception ex) 
      { 
       o.OnError(ex); 
       subscription.Dispose(); 
      } 
     }, e => o.OnError(e),() => o.OnCompleted()); 
     return subscription; 
    }); 
} 

Thông báo không có đối tượng sử dụng và rằng nếu tôi bắt một lỗi mà tôi sau đó xử lý các thuê bao để ngăn chặn các chuỗi liên tục qua một lỗi.

Tuy nhiên, tại sao không chỉ thêm trình xử lý OnError trong đăng ký của bạn. Một chút như sau:

var xs = new Subject<int>(); 

xs.ObserveOn(Scheduler.ThreadPool).Subscribe(x => 
{ 
    Console.WriteLine(x); 
    if (x % 5 == 0) 
    { 
     throw new System.Exception("Bang!"); 
    } 
}, ex => Console.WriteLine(ex.Message)); 

xs.OnNext(1); 
xs.OnNext(2); 
xs.OnNext(3); 
xs.OnNext(4); 
xs.OnNext(5); 

Mã này gặp lỗi đúng trong đăng ký.

Phương pháp khác là sử dụng phương pháp mở rộng Materialize, nhưng có thể hơi quá mức nếu các giải pháp trên không hoạt động.

+0

OnError không được gọi trong ví dụ OnError của bạn, trừ khi ExceptionToError được thêm vào (đã thử sử dụng ứng dụng bảng điều khiển và với dự án thử nghiệm đơn vị). Bạn đã thử nó chưa? Rất cám ơn sự cải tiến của ExceptionToError, tuy nhiên nó vẫn cảm thấy không tự nhiên. Có thể xử lý lỗi nên được thêm vào bộ lập lịch? Tức là một trình bao bọc SafeScheduler. – Herman

+0

Tôi đã xem xét 'Materialize' nhưng không thấy cách tôi có thể sử dụng. OnError không được gọi mà không có 'ExceptionToError'. Thử nghiệm nó với: public static IObservable Log (IObservable nguồn) { trở source.Materialize (Chọn (n => { Trace.WriteLine ("Lỗi săn:"). + N); trở n; }). Dematerialize(); } – Herman

+0

@Herman - Bạn có thể cung cấp một số mã hủy bỏ lỗi của bạn không? – Enigmativity

5

Có sự khác biệt giữa các lỗi trong đăng ký và lỗi ở mức có thể quan sát được. Xét nghiệm nhanh:

var xs = new Subject<int>(); 

xs.Subscribe(x => { Console.WriteLine(x); if (x % 3 == 0) throw new System.Exception("Error in subscription"); }, 
      ex => Console.WriteLine("Error in source: " + ex.Message)); 

Run với điều này và bạn sẽ nhận được một lỗi xử lý thoải mái tại nguồn:

xs.OnNext(1); 
xs.OnNext(2); 
xs.OnError(new Exception("from source")); 

Run với điều này và bạn sẽ nhận được một lỗi unhandled trong đăng ký:

xs.OnNext(1); 
xs.OnNext(2); 
xs.OnNext(3); 

Giải pháp của bạn đã xảy ra là có lỗi trong đăng ký và khiến lỗi trong nguồn. Và bạn đã thực hiện điều này trên luồng gốc, thay vì trên cơ sở mỗi đăng ký. Bạn có thể hoặc có thể không có ý định làm điều này, nhưng nó gần như chắc chắn sai.

Cách 'đúng' để thực hiện việc này là thêm xử lý lỗi bạn cần trực tiếp vào hành động đăng ký, là nơi nó thuộc về.Nếu bạn không muốn thay đổi các chức năng đăng ký của bạn trực tiếp, bạn có thể sử dụng một chút helper:

public static Action<T> ActionAndCatch<T>(Action<T> action, Action<Exception> catchAction) 
{ 
    return item => 
    { 
     try { action(item); } 
     catch (System.Exception e) { catchAction(e); } 
    }; 
} 

Và bây giờ sử dụng nó, một lần nữa cho thấy sự khác biệt giữa các lỗi khác nhau:

xs.Subscribe(ActionAndCatch<int>(x => { Console.WriteLine(x); if (x % 3 == 0) throw new System.Exception("Error in subscription"); }, 
           ex => Console.WriteLine("Caught error in subscription: " + ex.Message)), 
      ex => Console.WriteLine("Error in source: " + ex.Message)); 

Bây giờ chúng tôi có thể xử lý (riêng) các lỗi trong nguồn và lỗi trong đăng ký. Tất nhiên, bất kỳ những hành động này có thể được định nghĩa trong một phương pháp, làm cho mã trên đơn giản như (khả năng):

xs.Subscribe(ActionAndCatch(Handler, ExceptionHandler), SourceExceptionHandler); 

Sửa

Trong những ý kiến ​​sau đó chúng tôi bắt đầu thảo luận về thực tế là lỗi trong đăng ký đang trỏ đến lỗi trong chính luồng đó và bạn sẽ không muốn những người đăng ký khác trên luồng đó. Đây là loại hoàn toàn khác nhau của vấn đề. Tôi xin được nghiêng để viết một Validate mở rộng quan sát để xử lý tình huống này:

public static IObservable<T> Validate<T>(this IObservable<T> source, Predicate<T> valid) 
{ 
    return Observable.Create<T>(o => { 
     return source.Subscribe(
      x => { 
       if (valid(x)) o.OnNext(x); 
       else  o.OnError(new Exception("Could not validate: " + x)); 
      }, e => o.OnError(e),() => o.OnCompleted() 
     ); 
    }); 
} 

Sau đó, đơn giản để sử dụng, mà không cần trộn ẩn dụ (lỗi chỉ trong nguồn):

xs 
.Validate(x => x != 3) 
.Subscribe(x => Console.WriteLine(x), 
      ex => Console.WriteLine("Error in source: " + ex.Message)); 

Nếu bạn vẫn muốn ngoại lệ ức chế trong Subscribe bạn nên sử dụng một trong các phương pháp được thảo luận khác.

+0

Tôi đồng ý rằng làm cho nó có lỗi trong nguồn có thể không chính xác. Tuy nhiên, với giải pháp của bạn, mỗi người quan sát phải tự xử lý lỗi. Cách tiếp cận của tôi có thể được thay đổi thành 'ExceptionToAction (nguồn IObservable này, Hành động xử lý)'. Dù bằng cách nào, nó cảm thấy không hoàn toàn đúng với tôi. – Herman

+0

Tôi không thực sự hiểu. Trong mọi trường hợp, bạn cần cung cấp một trình xử lý ngoại lệ. Hoặc bạn có thể đính kèm trình xử lý ngoại lệ vào thời điểm đăng ký (phương thức ưa thích) hoặc bạn có thể đính kèm trình xử lý ngoại lệ vào nguồn có thể quan sát được. Sau này đi ngược lại thiết kế RX chung kể từ khi bạn đang sửa đổi một thuê bao trong tương lai trên quan sát của bạn, chứ không phải là quan sát chính nó. Rất 'hiệu quả phụ'. Thay đổi được đề xuất của bạn ('ExceptionToAction') là tối thiểu cần thiết, nhưng nó vẫn không làm cho nó rõ ràng rằng đây là trường hợp ngoại lệ trong đăng ký. – yamen

+0

Cũng lưu ý bạn nói "mỗi người quan sát phải tự xử lý lỗi". Nếu bạn muốn 'mở rộng' một cái gì đó, có lẽ tạo ra một người quan sát tiêu chuẩn xử lý các lỗi, và chỉ là một? Đó là bản chất 'ActionAndCatch' là gì, ngoại trừ đó là người quan sát tối thiểu cho' OnNext'. – yamen

3

Giải pháp hiện tại của bạn không lý tưởng. Như đã nêu bởi một trong những người Rx here:

Nhà khai thác Rx không bắt ngoại lệ xảy ra trong cuộc gọi đến OnNext, OnError hoặc OnCompleted. Điều này là do chúng tôi hy vọng rằng (1) người thực hiện quan sát biết cách xử lý những trường hợp ngoại lệ đó và chúng tôi không thể làm gì hợp lý với họ và (2) nếu một ngoại lệ xảy ra thì chúng tôi muốn bong bóng ra ngoài và không được xử lý bởi Rx .

Giải pháp hiện tại của bạn được IObservable để xử lý các lỗi do IObserver ném, điều này không có ý nghĩa về mặt ngữ nghĩa mà IObservable không có kiến ​​thức về những thứ quan sát nó. Hãy xem xét ví dụ sau:

var errorFreeSource = new Subject<int>(); 
var sourceWithExceptionToError = errorFreeSource.ExceptionToError(); 
var observerThatThrows = Observer.Create<int>(x => 
    { 
     if (x % 5 == 0) 
      throw new Exception(); 
    }, 
    ex => Console.WriteLine("There's an argument that this should be called"), 
() => Console.WriteLine("OnCompleted")); 
var observerThatWorks = Observer.Create<int>(
    x => Console.WriteLine("All good"), 
    ex => Console.WriteLine("But definitely not this"), 
    () => Console.WriteLine("OnCompleted")); 
sourceWithExceptionToError.Subscribe(observerThatThrows); 
sourceWithExceptionToError.Subscribe(observerThatWorks); 
errorFreeSource.OnNext(1); 
errorFreeSource.OnNext(2); 
errorFreeSource.OnNext(3); 
errorFreeSource.OnNext(4); 
errorFreeSource.OnNext(5); 
Console.ReadLine(); 

Ở đây không có vấn đề với nguồn, hoặc observerThatWorks, nhưng onerror của nó sẽ được gọi là do một lỗi liên quan với Observer khác. Để ngăn chặn các ngoại lệ trong một luồng khác từ kết thúc quá trình, bạn sẽ phải bắt chúng trong chuỗi đó, vì vậy hãy đặt một khối try/catch trong quan sát của bạn.