2013-06-14 28 views
12

Tôi muốn chạy một Task có "heartbeat" tiếp tục chạy tại một khoảng thời gian cụ thể cho đến khi tác vụ hoàn tất.Tạo một nhiệm vụ với nhịp tim

Tôi đang nghĩ đến một phương pháp mở rộng như thế này sẽ làm việc tốt:

public static async Task WithHeartbeat(this Task primaryTask, TimeSpan heartbeatInterval, Action<CancellationToken> heartbeatAction, CancellationToken cancellationToken) 

Ví dụ:

public class Program { 
    public static void Main() { 
     var cancelTokenSource = new CancellationTokenSource(); 
     var cancelToken = cancelTokenSource.Token; 
     var longRunningTask = Task.Factory.StartNew(SomeLongRunningTask, cancelToken, TaskCreationOptions.LongRunning, TaskScheduler.Current); 
     var withHeartbeatTask = longRunningTask.WithHeartbeat(TimeSpan.FromSeconds(1), PerformHeartbeat, cancelToken); 
     withHeartbeatTask.Wait(); 
     Console.WriteLine("Long running task completed!"); 
     Console.ReadLine() 
    } 

    private static void SomeLongRunningTask() { 
     Console.WriteLine("Starting long task"); 
     Thread.Sleep(TimeSpan.FromSeconds(9.5)); 
    } 
    private static int _heartbeatCount = 0; 
    private static void PerformHeartbeat(CancellationToken cancellationToken) { 
     Console.WriteLine("Heartbeat {0}", ++_heartbeatCount); 
    } 
} 

Chương trình này nên đầu ra:

Starting long task 
Heartbeat 1 
Heartbeat 2 
Heartbeat 3 
Heartbeat 4 
Heartbeat 5 
Heartbeat 6 
Heartbeat 7 
Heartbeat 8 
Heartbeat 9 
Long running task completed! 

Lưu ý rằng nó không nên (trong trường hợp bình thường) đầu ra "Heartbeat 10" vì nhịp tim bắt đầu sau thời gian chờ ban đầu (tức là 1 giây). Tương tự, nếu tác vụ mất ít thời gian hơn khoảng nhịp tim, thì nhịp tim sẽ không xảy ra chút nào.

Cách tốt để thực hiện điều này là gì?

Thông tin cơ bản: Tôi có dịch vụ đang nghe hàng đợi Azure Service Bus. Tôi không muốn Complete thông báo (sẽ xóa vĩnh viễn thư đó khỏi hàng đợi) cho đến khi tôi xử lý xong, có thể mất nhiều thời gian hơn thông báo tối đa LockDuration trong 5 phút. Vì vậy, tôi cần sử dụng cách tiếp cận nhịp tim này để gọi RenewLockAsync trước khi thời hạn khóa hết hạn để thông báo không hết thời gian chờ trong khi xử lý kéo dài.

+0

này nghe có vẻ tương tự như báo cáo tiến bộ trong một nhiệm vụ async (chỉ là điều gây ra một báo cáo là một khoảng thời gian, và không có tiến bộ thực sự để báo cáo, ngoại trừ có thể đếm nhịp tim). Một trong các liên kết này có giúp ích không? http://blogs.msdn.com/b/dotnet/archive/2012/06/06/async-in-4-5-enabling-progress-and-cancellation-in-async-apis.aspx http: // stackoverflow .com/questions/15408148/c-sharp-async-await-progress-event-on-task-object –

+0

@TimS. Chúng tương tự nhưng không hoàn toàn là những gì tôi muốn, đặc biệt là với trường hợp không bao giờ báo cáo nếu nhiệm vụ hoàn thành một cách nhanh chóng. Ngoài ra, nhịp tim không biết tiến trình nói. Tuy nhiên, tôi rất sẵn lòng xem liệu bạn có thể triển khai phương pháp tiếp cận tiến trình để khớp với API tiện ích mở rộng của tôi và có cùng hiệu ứng thực với mã đơn giản hơn không. –

Trả lời

11

Đây là nỗ lực của tôi:

public static class TaskExtensions { 
    /// <summary> 
    /// Issues the <paramref name="heartbeatAction"/> once every <paramref name="heartbeatInterval"/> while <paramref name="primaryTask"/> is running. 
    /// </summary> 
    public static async Task WithHeartbeat(this Task primaryTask, TimeSpan heartbeatInterval, Action<CancellationToken> heartbeatAction, CancellationToken cancellationToken) { 
     if (cancellationToken.IsCancellationRequested) { 
      return; 
     } 

     var stopHeartbeatSource = new CancellationTokenSource(); 
     cancellationToken.Register(stopHeartbeatSource.Cancel); 

     await Task.WhenAny(primaryTask, PerformHeartbeats(heartbeatInterval, heartbeatAction, stopHeartbeatSource.Token)); 
     stopHeartbeatSource.Cancel(); 
    } 

    private static async Task PerformHeartbeats(TimeSpan interval, Action<CancellationToken> heartbeatAction, CancellationToken cancellationToken) { 
     while (!cancellationToken.IsCancellationRequested) { 
      try { 
       await Task.Delay(interval, cancellationToken); 
       if (!cancellationToken.IsCancellationRequested) { 
        heartbeatAction(cancellationToken); 
       } 
      } 
      catch (TaskCanceledException tce) { 
       if (tce.CancellationToken == cancellationToken) { 
        // Totally expected 
        break; 
       } 
       throw; 
      } 
     } 
    } 
} 

hoặc với một tinh chỉnh nhẹ, bạn thậm chí có thể làm cho nhịp tim async như trong:

/// <summary> 
    /// Awaits a fresh Task created by the <paramref name="heartbeatTaskFactory"/> once every <paramref name="heartbeatInterval"/> while <paramref name="primaryTask"/> is running. 
    /// </summary> 
    public static async Task WithHeartbeat(this Task primaryTask, TimeSpan heartbeatInterval, Func<CancellationToken, Task> heartbeatTaskFactory, CancellationToken cancellationToken) { 
     if (cancellationToken.IsCancellationRequested) { 
      return; 
     } 

     var stopHeartbeatSource = new CancellationTokenSource(); 
     cancellationToken.Register(stopHeartbeatSource.Cancel); 

     await Task.WhenAll(primaryTask, PerformHeartbeats(heartbeatInterval, heartbeatTaskFactory, stopHeartbeatSource.Token)); 

     if (!stopHeartbeatSource.IsCancellationRequested) { 
      stopHeartbeatSource.Cancel(); 
     } 
    } 

    public static Task WithHeartbeat(this Task primaryTask, TimeSpan heartbeatInterval, Func<CancellationToken, Task> heartbeatTaskFactory) { 
     return WithHeartbeat(primaryTask, heartbeatInterval, heartbeatTaskFactory, CancellationToken.None); 
    } 

    private static async Task PerformHeartbeats(TimeSpan interval, Func<CancellationToken, Task> heartbeatTaskFactory, CancellationToken cancellationToken) { 
     while (!cancellationToken.IsCancellationRequested) { 
      try { 
       await Task.Delay(interval, cancellationToken); 
       if (!cancellationToken.IsCancellationRequested) { 
        await heartbeatTaskFactory(cancellationToken); 
       } 
      } 
      catch (TaskCanceledException tce) { 
       if (tce.CancellationToken == cancellationToken) { 
        // Totally expected 
        break; 
       } 
       throw; 
      } 
     } 
    } 

mà sẽ cho phép bạn thay đổi mẫu mã để một cái gì đó như này:

private static async Task PerformHeartbeat(CancellationToken cancellationToken) { 
    Console.WriteLine("Starting heartbeat {0}", ++_heartbeatCount); 
    await Task.Delay(1000, cancellationToken); 
    Console.WriteLine("Finishing heartbeat {0}", _heartbeatCount); 
} 

PerformHeartbeat có thể được thay thế bằng một cuộc gọi không đồng bộ như RenewLockAsync để bạn không phải lãng phí thời gian của luồng bằng cách sử dụng lệnh gọi chặn như RenewLock mà cách tiếp cận Hành động sẽ yêu cầu.

Tôi là answering my own question per SO guidelines, nhưng tôi cũng mở ra các cách tiếp cận trang nhã hơn cho vấn đề này.

+0

Xin chào, tôi đã nhận bài đăng này từ nhận xét của bạn về câu hỏi của tôi về sở thích tương tự. Trong trường hợp hàng đợi SB khi nào và chính xác thì bạn đang gia hạn Khóa? vai trò công nhân mà tôi có chỉ là một luồng đơn và việc nhận và xử lý thông điệp được chạy trong một vòng lặp while trong phương thức Run(). – Aravind

+0

@Aravind Khi tôi nhận được thông báo SB, tôi tạo một Task để xử lý nó. Nhiệm vụ sử dụng helper heartbeat này cho nhịp tim miễn là nó đang chạy. –

+0

Oh ok. Vì quá trình xử lý thư tôi sử dụng có thể không được sử dụng thường xuyên nên tôi không tạo các tác vụ để xử lý mọi thư. Tôi không tìm thấy việc sử dụng RenewLock trong mã mẫu đó là lý do tại sao được hỏi như vậy. – Aravind

0

Dưới đây là cách tiếp cận của tôi

using System; 
using System.Threading; 
using System.Threading.Tasks; 

namespace ConsoleApplication3 
{ 
class Program 
{ 
    static void Main(string[] args) 
    { 
     Console.WriteLine("Start Main"); 
     StartTest().Wait(); 
     Console.ReadLine(); 
     Console.WriteLine("Complete Main"); 
    } 

    static async Task StartTest() 
    { 
     var cts = new CancellationTokenSource(); 

     // ***Use ToArray to execute the query and start the download tasks. 
     Task<bool>[] tasks = new Task<bool>[2]; 
     tasks[0] = LongRunningTask("", 20, cts.Token); 
     tasks[1] = Heartbeat("", 1, cts.Token); 

     // ***Call WhenAny and then await the result. The task that finishes 
     // first is assigned to firstFinishedTask. 
     Task<bool> firstFinishedTask = await Task.WhenAny(tasks); 

     Console.WriteLine("first task Finished."); 
     // ***Cancel the rest of the downloads. You just want the first one. 
     cts.Cancel(); 

     // ***Await the first completed task and display the results. 
     // Run the program several times to demonstrate that different 
     // websites can finish first. 
     var isCompleted = await firstFinishedTask; 
     Console.WriteLine("isCompleted: {0}", isCompleted); 
    } 

    private static async Task<bool> LongRunningTask(string id, int sleep, CancellationToken ct) 
    { 
     Console.WriteLine("Starting long task"); 


     await Task.Delay(TimeSpan.FromSeconds(sleep)); 

     Console.WriteLine("Completed long task"); 
     return true; 
    } 

    private static async Task<bool> Heartbeat(string id, int sleep, CancellationToken ct) 
    { 
     while(!ct.IsCancellationRequested) 
     { 
      await Task.Delay(TimeSpan.FromSeconds(sleep)); 
      Console.WriteLine("Heartbeat Task Sleep: {0} Second", sleep); 
     } 

     return true; 
    } 

} 

}