2012-03-18 4 views
25

Tôi đang cố gắng để mô hình một truy vấn Rx đó không phải là tầm thường (với tôi):Tham gia Rx Streams

  • Trong một căn phòng có nam và nữ.
  • Họ vào và rời khỏi phòng, và trong khi trong phòng đôi khi họ thay đổi vị trí của họ.
  • Mỗi người đàn ông có thể nhìn vào một (hoặc không) người phụ nữ tại một thời điểm nhất định.
  • Mỗi người đều có các thuộc tính sau:

    class Man 
    { 
        public const int LookingAtNobody = 0; 
        public int Id { get; set; } 
        public double Location { get; set; } 
        public int LookingAt { get; set; } 
    } 
    
  • Mỗi người phụ nữ có các thuộc tính sau:

    class Woman 
    { 
        public int Id { get; set; } 
        public double Location { get; set; } 
    } 
    
  • Để đại diện cho Đàn ông chúng tôi có IObservable<IObservable<Man>>, và đại diện cho phụ nữ chúng tôi có IObservable<IObservable<Woman>> .

Làm cách nào bạn sử dụng Rx để tạo vectơ từ nam sang nữ mà họ đang xem: IObservable<IObservable<Tuple<double,double>>>?

Để giúp đỡ, đây là một vài đơn vị xét nghiệm cho một số trường hợp đơn giản:

public class Tests : ReactiveTest 
{ 
    [Test] 
    public void Puzzle1() 
    { 
     var scheduler = new TestScheduler(); 

     var m1 = scheduler.CreateHotObservable(
      OnNext(100, new Man { Id = 1, Location = 1.0, LookingAt = Man.LookingAtNobody }), 
      OnNext(200, new Man { Id = 1, Location = 2.0, LookingAt = 10 }), 
      OnCompleted<Man>(300)); 

     var w1 = scheduler.CreateHotObservable(
      OnNext(150, new Woman { Id = 10, Location = 10.0 }), 
      OnNext(250, new Woman { Id = 10, Location = 20.0 }), 
      OnCompleted<Woman>(350)); 

     var men = scheduler.CreateHotObservable(OnNext(50, m1)); 
     var women = scheduler.CreateHotObservable(OnNext(50, w1)); 

     var results = runQuery(scheduler, women, men); 

     var innerResults = (from msg in results 
          where msg.Value.HasValue 
          select msg.Value.Value).ToArray(); 
     var expectedVector1 = new[] 
         { 
          OnNext(200, Tuple.Create(2.0, 10.0)), 
          OnNext(250, Tuple.Create(2.0, 20.0)), 
          OnCompleted<Tuple<double,double>>(300), 
         }; 
     ReactiveAssert.AreElementsEqual(expectedVector1, innerResults[0]); 
    } 
    [Test] 
    public void Puzzle2() 
    { 
     var scheduler = new TestScheduler(); 

     var m1 = scheduler.CreateHotObservable(
      OnNext(100, new Man { Id = 1, Location = 1.0, LookingAt = Man.LookingAtNobody }), 
      OnNext(200, new Man { Id = 1, Location = 2.0, LookingAt = 10 }), 
      OnCompleted<Man>(400)); 

     var w1 = scheduler.CreateHotObservable(
      OnNext(150, new Woman { Id = 10, Location = 10.0 }), 
      OnNext(250, new Woman { Id = 10, Location = 20.0 }), 
      OnCompleted<Woman>(350)); 

     var men = scheduler.CreateHotObservable(OnNext(50, m1)); 
     var women = scheduler.CreateHotObservable(OnNext(50, w1)); 

     var results = runQuery(scheduler, women, men); 

     var innerResults = (from msg in results 
          where msg.Value.HasValue 
          select msg.Value.Value).ToArray(); 
     var expectedVector1 = new[] 
         { 
          OnNext(200, Tuple.Create(2.0, 10.0)), 
          OnNext(250, Tuple.Create(2.0, 20.0)), 
          OnCompleted<Tuple<double,double>>(350), 
         }; 
     ReactiveAssert.AreElementsEqual(expectedVector1, innerResults[0]); 
    } 
    [Test] 
    public void Puzzle3() 
    { 
     var scheduler = new TestScheduler(); 

     var m1 = scheduler.CreateHotObservable(
      OnNext(100, new Man { Id = 1, Location = 1.0, LookingAt = Man.LookingAtNobody }), 
      OnNext(200, new Man { Id = 1, Location = 2.0, LookingAt = 10 }), 
      OnNext(300, new Man { Id = 1, Location = 2.0, LookingAt = Man.LookingAtNobody }), 
      OnCompleted<Man>(400)); 

     var w1 = scheduler.CreateHotObservable(
      OnNext(150, new Woman { Id = 10, Location = 10.0 }), 
      OnNext(250, new Woman { Id = 10, Location = 20.0 }), 
      OnCompleted<Woman>(350)); 

     var men = scheduler.CreateHotObservable(OnNext(50, m1)); 
     var women = scheduler.CreateHotObservable(OnNext(50, w1)); 

     var results = runQuery(scheduler, women, men); 

     var innerResults = (from msg in results 
          where msg.Value.HasValue 
          select msg.Value.Value).ToArray(); 
     var expectedVector1 = new[] 
         { 
          OnNext(200, Tuple.Create(2.0, 10.0)), 
          OnNext(250, Tuple.Create(2.0, 20.0)), 
          OnCompleted<Tuple<double,double>>(300), 
         }; 
     ReactiveAssert.AreElementsEqual(expectedVector1, innerResults[0]); 
    } 
    [Test] 
    public void Puzzle4() 
    { 
     var scheduler = new TestScheduler(); 

     var m1 = scheduler.CreateHotObservable(
      OnNext(100, new Man { Id = 1, Location = 1.0, LookingAt = Man.LookingAtNobody }), 
      OnNext(200, new Man { Id = 1, Location = 2.0, LookingAt = 10 }), 
      OnNext(300, new Man { Id = 1, Location = 3.0, LookingAt = 20 }), 
      OnNext(400, new Man { Id = 1, Location = 4.0, LookingAt = 20 }), 
      OnCompleted<Man>(500)); 

     var w1 = scheduler.CreateHotObservable(
      OnNext(150, new Woman { Id = 10, Location = 10.0 }), 
      OnNext(250, new Woman { Id = 10, Location = 20.0 }), 
      OnCompleted<Woman>(350)); 
     var w2 = scheduler.CreateHotObservable(
      OnNext(155, new Woman { Id = 20, Location = 100.0 }), 
      OnNext(255, new Woman { Id = 20, Location = 200.0 }), 
      OnNext(355, new Woman { Id = 20, Location = 300.0 }), 
      OnCompleted<Woman>(455)); 

     var men = scheduler.CreateHotObservable(OnNext(50, m1)); 
     var women = scheduler.CreateHotObservable(OnNext(50, w1), OnNext(50, w2)); 

     var results = runQuery(scheduler, women, men); 

     var innerResults = (from msg in results 
          where msg.Value.HasValue 
          select msg.Value.Value).ToArray(); 
     var expectedVector1 = new[] 
         { 
          OnNext(200, Tuple.Create(2.0, 10.0)), 
          OnNext(250, Tuple.Create(2.0, 20.0)), 
          OnCompleted<Tuple<double,double>>(300), 
         }; 
     var expectedVector2 = new[] 
         { 
          OnNext(300, Tuple.Create(3.0, 200.0)), 
          OnNext(355, Tuple.Create(3.0, 300.0)), 
          OnNext(400, Tuple.Create(4.0, 300.0)), 
          OnCompleted<Tuple<double,double>>(455), 
         }; 
     ReactiveAssert.AreElementsEqual(expectedVector1, innerResults[0]); 
     ReactiveAssert.AreElementsEqual(expectedVector2, innerResults[1]); 
    } 

    private static IEnumerable<Recorded<Notification<IList<Recorded<Notification<Tuple<double, double>>>>>>> runQuery(TestScheduler scheduler, IObservable<IObservable<Woman>> women, IObservable<IObservable<Man>> men) 
    { 
     // assuming nested sequences are hot 
     var vectors = 
      from manDuration in men 
      join womanDuration in women on manDuration equals womanDuration 
      select from man in manDuration 
        join woman in womanDuration on manDuration equals womanDuration 
        where man.LookingAt == woman.Id 
        select Tuple.Create(man.Location, woman.Location); 

     var query = vectors.Select(vectorDuration => 
     { 
      var vectorResults = scheduler.CreateObserver<Tuple<double, double>>(); 
      vectorDuration.Subscribe(vectorResults); 
      return vectorResults.Messages; 
     }); 

     var results = scheduler.Start(() => query, 0, 0, 1000).Messages; 
     return results; 
    } 
} 

(lưu ý: Câu hỏi này đã được chéo được đưa lên các diễn đàn Rx: http://social.msdn.microsoft.com/Forums/en-US/rx/thread/e73ae4e2-68c3-459a-a5b6-ea957b205abe)

+4

DAT IEnumerable >>>>>> – Asti

+0

Bài đăng của bạn lên diễn đàn MSDN, và số lượng trò chuyện được tạo ra cho thấy rằng đây không phải là điều tốt câu hỏi cho trang web Hỏi & Đáp. –

+9

"Đừng băng qua suối" - Tiến sĩ Egon Spengler –

Trả lời

1

Nếu tôi hiểu bạn một cách chính xác , mục tiêu là tạo ra một quan sát “theo dõi các quan sát”, nơi một “người theo dõi được” bắt đầu khi một người đàn ông bắt đầu nhìn vào một người phụ nữ, và kết thúc khi người đàn ông dừng nhìn người phụ nữ. "Theo dõi được" nên bao gồm các bộ dữ liệu của các địa điểm gần đây nhất của người đàn ông và người phụ nữ. Ý tưởng ở đây là sử dụng CombineLatest, sẽ có hai quan sát và khi bất kỳ giá trị nào tạo ra giá trị, bộ kết hợp được đánh giá cho hai giá trị gần đây nhất của các quan sát, tạo ra giá trị trong quan sát kết hợp. Tuy nhiên, CombineLatest chỉ hoàn thành khi cả hai lần quan sát được hoàn thành. Trong trường hợp này, chúng tôi muốn hoàn thành việc quan sát được khi bất kỳ nguồn nào trong hai nguồn hoàn thành. Để làm như vậy, chúng ta định nghĩa phương pháp mở rộng sau (tôi không tin một phương pháp như vậy đã tồn tại, nhưng có thể có một giải pháp dễ dàng hơn):

public static IObservable<TSource> 
    UntilCompleted<TSource, TWhile>(this IObservable<TSource> source, 
             IObservable<TWhile> lifetime) 
{ 
    return Observable.Create<TSource>(observer => 
    { 
    var subscription = source.Subscribe(observer); 
    var limiter = lifetime.Subscribe(next => { },() => 
    { 
     subscription.Dispose(); 
     observer.OnCompleted(); 
    }); 
    return new CompositeDisposable(subscription, limiter); 
    }); 
} 

Phương pháp này cũng giống như TakeUntil, nhưng nó thay vì cho đến khi lifetime tạo ra một giá trị, phải mất đến khi lifetime hoàn tất. Chúng tôi cũng có thể định nghĩa một phương pháp mở rộng đơn giản mà mất streak đầu tiên thỏa mãn một vị:

public static IObservable<TSource> 
    Streak<TSource>(this IObservable<TSource> source, 
         Func<TSource, bool> predicate) 
{ 
    return source.SkipWhile(x => !predicate(x)).TakeWhile(predicate); 
} 

Bây giờ cho các truy vấn cuối cùng, chúng tôi kết hợp tất cả mọi người với tất cả phụ nữ sử dụng CombineLatest, và hoàn thành mà quan sát đầu sử dụng UntilCompleted. Để có được "theo dõi các quan sát", chúng tôi chọn các vệt nơi người đàn ông đang nhìn vào người phụ nữ. Sau đó, chúng tôi chỉ đơn giản là bản đồ đó đến một tuple các địa điểm.

var vectors = 
    from manDuration in men 
    from womanDuration in women 
    select manDuration 
    .CombineLatest(womanDuration, (m, w) => new { Man = m, Woman = w }) 
    .UntilCompleted(womanDuration) 
    .UntilCompleted(manDuration) 
    .Streak(pair => pair.Man.LookingAt == pair.Woman.Id) 
    .Select(pair => Tuple.Create(pair.Man.Location, pair.Woman.Location)); 

này vượt qua tất cả các thử nghiệm của bạn, nhưng nó không xử lý các kịch bản mà một người đàn ông nhìn người phụ nữ 10 trong một thời gian, sau đó ở mức 20 trong một thời gian, và sau đó ở mức 10 trong một thời gian nữa; chỉ sử dụng số đầu tiên được sử dụng.Để quan sát tất cả những vệt, chúng ta có thể sử dụng phương pháp mở rộng sau đây, mà trả về một thể quan sát được các vệt:

public static IObservable<IObservable<TSource>> 
    Streaks<TSource>(this IObservable<TSource> source, 
         Func<TSource, bool> predicate) 
{ 
    return Observable.Create<IObservable<TSource>>(observer => 
    { 
    ReplaySubject<TSource> subject = null; 
    bool previous = false; 
    return source.Subscribe(x => 
    { 
     bool current = predicate(x); 
     if (!previous && current) 
     { 
     subject = new ReplaySubject<TSource>(); 
     observer.OnNext(subject); 
     } 
     if (previous && !current) subject.OnCompleted(); 
     if (current) subject.OnNext(x); 
     previous = current; 
    },() => 
    { 
     if (subject != null) subject.OnCompleted(); 
     observer.OnCompleted(); 
    }); 
    }); 
} 

Bằng cách đăng ký một lần duy nhất vào dòng nguồn, và bằng cách sử dụng một ReplaySubject, phương pháp này làm việc cho nóng cũng như lạnh quan sát. Bây giờ cho các truy vấn cuối cùng, chúng tôi chọn tất cả các vệt như sau:

var vectors = 
    from manDuration in men 
    from womanDuration in women 
    from streak in manDuration 
    .CombineLatest(womanDuration, (m, w) => new { Man = m, Woman = w }) 
    .UntilCompleted(womanDuration) 
    .UntilCompleted(manDuration) 
    .Streaks(pair => pair.Man.LookingAt == pair.Woman.Id) 
    select streak.Select(pair => 
    Tuple.Create(pair.Man.Location, pair.Woman.Location)); 
0

Tôi không chắc là tôi hiểu lý do tại sao bạn đang mô hình hóa các dòng địa điểm của cả hai người đàn ông và người phụ nữ như một IObservable<IObservable<T>> thay vì chỉ một IObservable<T>, nhưng điều này có thể làm việc:

public static IObservable<Tuple<double, double>> GetLocationsObservable(IObservable<IObservable<Man>> menObservable, 
                      IObservable<IObservable<Woman>> womenObservable) 
{ 
    return Observable.CombineLatest(
     menObservable.Switch(), 
     womenObservable.Switch(), 
     (man, woman) => new {man, woman}) 
      .Where(manAndWoman => manAndWoman.man.LookingAt == manAndWoman.woman.Id) 
      .Select(manAndWoman => Tuple.Create(manAndWoman.man.Location, manAndWoman.woman.Location)); 
} 

các công tắc cơ bản "chuyển đổi" vào quan sát mới khi nó được đẩy, mà flattens suối. Vị trí và lựa chọn khá đơn giản.

Tôi có một nghi ngờ lén lút Tôi hiểu nhầm điều gì đó về các yêu cầu, nhưng tôi nghĩ tôi sẽ gửi câu trả lời của tôi chỉ trong trường hợp nó giúp.