2013-02-12 15 views
5

Tôi mới đến apache flume.
Tôi đang cố gắng để xem làm thế nào tôi có thể nhận được một json (như http nguồn), phân tích nó và lưu nó vào một đường dẫn động trên hdfs theo nội dung.
Ví dụ:
nếu json là:Có thể apache flume hdfs sink chấp nhận đường dẫn động để viết?

[{ 
    "field1" : "value1", 
    "field2" : "value2" 
}] 

sau đó con đường HDFS sẽ là:
/some-mặc-root-path/value1/value2/some-trị-tên-file
Có cấu hình như vậy cho phép tôi làm điều đó không?

Dưới đây là cấu hình hiện tại của tôi (chấp nhận một json qua http, và lưu trữ nó trong một đường dẫn theo timestamp):

#flume.conf: http source, hdfs sink 

# Name the components on this agent 
a1.sources = r1 
a1.sinks = k1 
a1.channels = c1 

# Describe/configure the source 
a1.sources.r1.type = org.apache.flume.source.http.HTTPSource 
a1.sources.r1.port = 9000 
#a1.sources.r1.handler = org.apache.flume.http.JSONHandler 

# Describe the sink 
a1.sinks.k1.type = hdfs 
a1.sinks.k1.hdfs.path = /user/uri/events/%y-%m-%d/%H%M/%S 
a1.sinks.k1.hdfs.filePrefix = events- 
a1.sinks.k1.hdfs.round = true 
a1.sinks.k1.hdfs.roundValue = 10 
a1.sinks.k1.hdfs.roundUnit = minute 

# Use a channel which buffers events in memory 
a1.channels.c1.type = memory 
a1.channels.c1.capacity = 1000 
a1.channels.c1.transactionCapacity = 100 

# Bind the source and sink to the channel 
a1.sources.r1.channels = c1 
a1.sinks.k1.channel = c1 

Cảm ơn!

Trả lời

8

Giải pháp là trong máng documentation for the hdfs sink:

Dưới đây là cấu hình sửa đổi:

#flume.conf: http source, hdfs sink 

# Name the components on this agent 
a1.sources = r1 
a1.sinks = k1 
a1.channels = c1 

# Describe/configure the source 
a1.sources.r1.type = org.apache.flume.source.http.HTTPSource 
a1.sources.r1.port = 9000 
#a1.sources.r1.handler = org.apache.flume.http.JSONHandler 

# Describe the sink 
a1.sinks.k1.type = hdfs 
a1.sinks.k1.hdfs.path = /user/uri/events/%{field1} 
a1.sinks.k1.hdfs.filePrefix = events- 
a1.sinks.k1.hdfs.round = true 
a1.sinks.k1.hdfs.roundValue = 10 
a1.sinks.k1.hdfs.roundUnit = minute 

# Use a channel which buffers events in memory 
a1.channels.c1.type = memory 
a1.channels.c1.capacity = 1000 
a1.channels.c1.transactionCapacity = 100 

# Bind the source and sink to the channel 
a1.sources.r1.channels = c1 
a1.sinks.k1.channel = c1 

và curl:

curl -X POST -d '[{ "headers" : {   "timestamp" : "434324343", "host" :"random_host.example.com", "field1" : "val1"   }, "body" : "random_body" }]' localhost:9000 
+0

Tôi đang sử dụng một nguồn khác nhau (RabbitMQ) và tôi đang tự mình tải xuống một tải trọng JSON. Phương pháp bạn mô tả dường như không hoạt động trong trường hợp của tôi. Tôi giả sử, có điều gì đó sai ở cuối của tôi trừ khi bạn gặp phải các vấn đề tương tự –

+0

Cảm ơn, điều này đã làm việc cho tôi –