2012-04-27 49 views
7

Vì vậy, các Play2.0 Enumeratee page thấy một ví dụ của việc sử dụng một phương pháp &> hoặc through để thay đổi một Enumerator[String] thành một Enumerator[Int]:Làm thế nào để viết một enumeratee đến đoạn một Enumerator dọc theo ranh giới khác nhau

val toInt: Enumeratee[String,Int] = Enumeratee.map[String]{ s => s.toInt } 
val ints: Enumerator[Int] = strings &> toInt 

Ngoài ra còn có một Enumeratee.grouped liệt kê để tạo ra một điều tra các khối từ các phần tử riêng lẻ. Điều đó dường như làm việc tốt.

Nhưng những gì tôi thấy là đầu vào thông thường sẽ ở dạng Array[Byte] (được trả về bởi Enumerator.fromFileEnumerator.fromStream). Với ý nghĩ đó, tôi muốn lấy những đầu vào Array[Byte] và biến chúng thành một Enumerator[String], ví dụ trong đó mỗi chuỗi là một dòng (được kết thúc bởi một '\n'). Ranh giới cho các đường và các phần tử Array[Byte] thường không khớp. Làm thế nào để tôi viết một điều tra viên có thể chuyển đổi các mảng chunked thành chuỗi chunked?

Mục đích là để chunk những dòng này trở lại trình duyệt vì mỗi Array[Byte] trở nên khả dụng và giữ các byte còn thừa không phải là một phần của dòng hoàn chỉnh cho đến khi đoạn nhập tiếp theo xuất hiện.

Lý tưởng nhất là tôi muốn có phương thức được cấp iter: Iteratee[Array[Byte], T]Enumerator[Array[Byte]] sẽ trả lại cho tôi Enumerator[T], trong đó các phần tử T của tôi được phân tích cú pháp theo iter.

Thông tin bổ sung: Tôi đã có một chút thời gian để dọn sạch mã của tôi và đây là một ví dụ cụ thể về những gì tôi đang cố gắng làm. Tôi có iteratees sau đó phát hiện các dòng tiếp theo:

import play.api.libs.iteratee._ 
type AB = Array[Byte] 

def takeWhile(pred: Byte => Boolean): Iteratee[AB, AB] = { 
    def step(e: Input[AB], acc: AB): Iteratee[AB, AB] = e match { 
    case Input.EOF => Done(acc, Input.EOF) 
    case Input.Empty => Cont(step(_, acc)) 
    case Input.El(arr) => 
     val (taking, rest) = arr.span(pred) 
     if (rest.length > 0) Done(acC++ taking, Input.El(rest)) 
     else Cont(step(_, acC++ taking)) 
    } 
    Cont(step(_, Array())) 
} 

val line = for { 
    bytes <- takeWhile(b => !(b == '\n' || b == '\r')) 
    _  <- takeWhile(b => b == '\n' || b == '\r') 
} yield bytes 

Và những gì tôi muốn làm là một cái gì đó như thế:

Ok.stream(Enumerator.fromFile(filename) &> chunkBy(line)).as("text/plain") 

Trả lời

5

https://github.com/playframework/Play20/commit/f979006a7e2c1c08ca56ee0bae67b5463ee099c1#L3R131 Có điều gì đó tương tự với những gì bạn đang làm. Tôi cố định nhóm để chăm sóc các đầu vào còn lại. Mã này về cơ bản trông giống như:

val upToNewLine = 
    Traversable.splitOnceAt[String,Char](_ != '\n') &>> 
    Iteratee.consume() 

Enumeratee.grouped(upToNewLine) 

Ngoài ra tôi phải sửa lặp lại trong cùng một cách

+0

mát. Có vẻ như 'nhóm lại' nên đã làm những gì tôi muốn. – huynhjl

2

Dưới đây là những gì tôi có sau vài giờ thử nghiệm. Tôi hy vọng rằng ai đó có thể đến với một thực hiện thanh lịch hơn, như tôi hầu như không thể theo tôi.

def chunkBy(chunker: Iteratee[AB, AB]) = new Enumeratee[AB, AB] { 
    def applyOn[A](inner: Iteratee[AB, A]): Iteratee[AB, Iteratee[AB, A]] = { 
    def step(e: Input[AB], in: Iteratee[AB, A], leftover: Input[AB]): 
      Iteratee[AB, Iteratee[AB, A]] = { 
     e match { 
     case Input.EOF => 
      // if we have a leftover and it's a chunk, then output it 
      leftover match { 
      case Input.EOF | Input.Empty => Done(in, leftover) 
      case Input.El(_) => 
       val lastChunk = Iteratee.flatten(Enumerator.enumInput(leftover) 
       >>> Enumerator.eof |>> chunker) 
       lastChunk.pureFlatFold(
       done = { (chunk, rest) => 
        val nextIn = Iteratee.flatten(Enumerator(chunk) |>> in) 
        nextIn.pureFlatFold(
        done = (a, e2) => Done(nextIn, e2), 
        // nothing more will come 
        cont = k => Done(nextIn, Input.EOF), 
        error = (msg, e2) => Error(msg, e2)) 
       }, 
       // not enough content to get a chunk, so drop content 
       cont = k => Done(in, Input.EOF), 
       error = (msg, e2) => Error(msg, e2)) 
      } 
     case Input.Empty => Cont(step(_, in, leftover)) 
     case Input.El(arr) => 
      // feed through chunker 
      val iChunks = Iteratee.flatten(
      Enumerator.enumInput(leftover) 
       >>> Enumerator(arr) 
       >>> Enumerator.eof // to extract the leftover 
       |>> repeat(chunker)) 
      iChunks.pureFlatFold(
      done = { (chunks, rest) => 
       // we have our chunks, feed them to the inner iteratee 
       val nextIn = Iteratee.flatten(Enumerator(chunks: _*) |>> in) 
       nextIn.pureFlatFold(
       done = (a, e2) => Done(nextIn, e2), 
       // inner iteratee needs more data 
       cont = k => Cont(step(_: Input[AB], nextIn, 
        // we have to ignore the EOF we fed to repeat 
        if (rest == Input.EOF) Input.Empty else rest)), 
       error = (msg, e2) => Error(msg, e2)) 
      }, 
      // not enough content to get a chunk, continue 
      cont = k => Cont(step(_: Input[AB], in, leftover)), 
      error = (msg, e2) => Error(msg, e2)) 
     } 
    } 
    Cont(step(_, inner, Input.Empty)) 
    } 
} 

Dưới đây là định nghĩa để tùy chỉnh của tôi repeat:

// withhold the last chunk so that it may be concatenated with the next one 
def repeat(chunker: Iteratee[AB, AB]) = { 
    def loop(e: Input[AB], ch: Iteratee[AB, AB], acc: Vector[AB], 
     leftover: Input[AB]): Iteratee[AB, Vector[AB]] = e match { 
    case Input.EOF => ch.pureFlatFold(
     done = (a, e) => Done(acc, leftover), 
     cont = k => k(Input.EOF).pureFlatFold(
     done = (a, e) => Done(acc, Input.El(a)), 
     cont = k => sys.error("divergent iter"), 
     error = (msg, e) => Error(msg, e)), 
     error = (msg, e) => Error(msg, e)) 
    case Input.Empty => Cont(loop(_, ch, acc, leftover)) 
    case Input.El(_) => 
     val i = Iteratee.flatten(Enumerator.enumInput(leftover) 
      >>> Enumerator.enumInput(e) |>> ch) 
     i.pureFlatFold(
     done = (a, e) => loop(e, chunker, acc :+ a, Input.Empty), 
     cont = k => Cont(loop(_, i, acc, Input.Empty)), 
     error = (msg, e) => Error(msg, e)) 
    } 
    Cont(loop(_: Input[AB], chunker, Vector(), Input.Empty)) 
} 

này hoạt động trên một vài mẫu trong đó có một này:

val source = Enumerator(
    "bippy".getBytes, 
    "foo\n\rbar\n\r\n\rbaz\nb".getBytes, 
    "azam\ntoto\n\n".getBytes) 
Ok.stream(source 
    &> chunkBy(line) 
    &> Enumeratee.map(l => l ++ ".\n".getBytes) 
).as("text/plain") 

nào in:

bippyfoo. 
bar. 
baz. 
bazam. 
toto.