2013-07-06 21 views
6

Vì vậy, dữ liệu đầu vào của tôi có hai lĩnh vực/cột: id1 & id2, và mã của tôi là như sau:Scalding: Làm thế nào để giữ lại trường khác, sau một groupBy ('field) {. Size}?

TextLine(args("input")) 
.read 
.mapTo('line->('id1,'id2)) {line: String => 
    val fields = line.split("\t") 
     (fields(0),fields(1)) 
} 
.groupBy('id2){.size} 
.write(Tsv(args("output"))) 

Các kết quả đầu ra trong (những gì tôi giả định) hai lĩnh vực: id2 * kích thước. Tôi là một chút khó khăn về việc tìm ra nếu nó có thể giữ lại giá trị id1 cũng được nhóm lại với id2 và thêm nó như là một lĩnh vực khác?

Trả lời

8

Bạn không thể làm điều này theo cách tốt đẹp mà tôi sợ. Hãy suy nghĩ về cách nó hoạt động dưới mui xe - nó chia tách dữ liệu được tính thành các khối và gửi nó đi đến các quy trình khác nhau, mỗi quá trình đếm nó là đoạn, sau đó một bộ giảm tốc duy nhất thêm tất cả vào cuối. Trong khi mỗi quá trình đếm nó không biết toàn bộ kích thước để nó không thể thêm trường vào. Cách duy nhất là quay trở lại và thêm nó vào dữ liệu khi toàn bộ kích thước được biết (tức là tham gia).

Nếu mỗi nhóm phù hợp trong bộ nhớ (và bạn có thể cấu hình bộ nhớ), bạn có thể:

Tsv(args("input"), ('id1, 'id2)) 
.groupBy('id2)(_.size.toList[(String, String)](('id1, 'id2) -> 'list)) 
.flatMapTo[(Iterable[(String, String)], Int), (String, String, Int)](('list, 'size) -> ('id1, 'id2, 'size)) { 
    case (list, size) => list.map(record => (record._1, record._2, size)) 
} 
.write(Tsv(args("output"))) 

Nhưng nếu hệ thống của bạn không có đủ bộ nhớ, bạn sẽ phải sử dụng một đắt tham gia.

Lưu ý: Bạn có thể sử dụng Tsv thay vì TextLine, sau đó là mapTo và tách.

+0

Vui lòng xem liệu điều đó có hợp lý không, tôi cảm thấy cùng một nỗi đau. http://stackoverflow.com/questions/25994879/scalding-flatten-fields-after-groupby – Sergey