2010-03-15 12 views
13

Tôi có một kiểm soát dòng chảy không đồng bộ như sau:Làm cách nào để tách và gửi luồng điều khiển async bằng Continuations?

ActorA ! DoA(dataA, callback1, callbackOnErrorA) 

def callback1() = { 
    ... 
    ActorB ! DoB(dataB, callback2, callbackOnErrorB) 
} 

def callback2() = { 
    ActorC ! DoC(dataC, callback3, callbackOnErrorC) 
} 

... 

Làm thế nào tôi sẽ chia dòng chảy này ra thành nhiều phần (continuations) và liên tục cử các các diễn viên khác nhau (hoặc đề/nhiệm vụ) trong khi duy trì trạng thái tổng thể ?

Bất kỳ gợi ý nào được đánh giá cao, Cảm ơn

Trả lời

7

này rất đơn giản, nhưng cho thấy làm thế nào để chia tay một dòng điều khiển duy nhất trong số ba diễn viên, đi qua nhà nước cùng với nhau:

package blevins.example 

import scala.continuations._ 
import scala.continuations.ControlContext._ 
import scala.actors.Actor._ 
import scala.actors._ 

object App extends Application { 

    val actorA, actorB, actorC = actor { 
    receive { 
     case f: Function1[Unit,Unit] => { f() } 
    } 
    } 

    def handle(a: Actor) = shift { k: (Unit=>Unit) => 
    a ! k 
    } 

    // Control flow to split up 
    reset { 
     // this is not handled by any actor 
     var x = 1 
     println("a: " + x) 

     handle(actorA) // actorA handles the below 
     x += 4 
     println("b: " + x) 

     handle(actorB) // then, actorB handles the rest 
     var y = 2 
     x += 2 
     println("c: " + x) 

     handle(actorC) // and so on... 
     y += 1 
     println("d: " + x + ":" + y) 
    } 

} 
+0

Tuyệt vời, cảm ơn bạn! – hotzen

9

Tôi thích sử dụng scalaz.concurrent.Promise. Ví dụ này không giống như ví dụ trong câu hỏi của bạn, nhưng nó cho bạn ý tưởng.

object Async extends Application { 
    import scalaz._ 
    import Scalaz._ 
    import concurrent._ 
    import concurrent.strategy._ 
    import java.util.concurrent.{ExecutorService, Executors} 

    case class ResultA(resultb: ResultB, resulta: ResultC) 
    case class ResultB() 
    case class ResultC() 

    run 

    def run { 
    implicit val executor: ExecutorService = Executors.newFixedThreadPool(8) 
    import Executor.strategy 

    val promiseA = doA 
    println("waiting for results") 
    val a: ResultA = promiseA.get 
    println("got " + a) 
    executor.shutdown  
    } 

    def doA(implicit s: Strategy[Unit]): Promise[ResultA] = { 
    println("triggered A") 
    val b = doB 
    val c = doC 
    for {bb <- b; cc <- c} yield ResultA(bb, cc) 
    } 

    def doB(implicit s: Strategy[Unit]): Promise[ResultB] = { 
    println("triggered B") 
    promise { Thread.sleep(1000); println("returning B"); ResultB() } 
    } 

    def doC(implicit s: Strategy[Unit]): Promise[ResultC] = { 
    println("triggered C") 
    promise { Thread.sleep(1000); println("returning C"); ResultC() } 
    } 
} 

Output:

triggered A 
triggered B 
triggered C 
waiting for results 
returning B 
returning C 
got ResultA(ResultB(),ResultC()) 

Bạn sẽ tìm thấy một giới thiệu về Scalaz đồng thời trong presentation này từ Runar.

Cách tiếp cận này không linh hoạt như diễn viên, nhưng sáng tác tốt hơn và không thể bế tắc.

+0

Nice Scalaz Promise-Example, cảm ơn bạn. Tuy nhiên tôi muốn hiểu sâu hơn về điều Scala2.8-CPS mới và sẽ đánh giá cao câu trả lời cụ thể của CPS. – hotzen

+0

+1 để đề cập đến những lợi thế của tương lai trên diễn viên và việc sử dụng các vals ngầm để xác định chiến lược. – thSoft