2013-06-04 26 views
10

Tôi đang cố gắng để viết một topo nào sau đây:Grouping trong một topo bão kết hợp đơn giản

  1. Một vòi mà đặt mua một thức ăn twitter (dựa trên một từ khóa)
  2. Một tia tập hợp đó tổng hợp một số tweet (nói N) trong một bộ sưu tập và gửi cho chúng bolt
  3. Một bu-lông đơn giản in bộ sưu tập vào bảng điều khiển cùng một lúc.

Thực tế tôi muốn thực hiện thêm một số thao tác trên bộ sưu tập.

Tôi đã thử nghiệm tại địa phương và có vẻ như nó hoạt động. Tuy nhiên, tôi không chắc chắn nếu tôi đã đặt các nhóm trên bu lông một cách chính xác và nếu điều này sẽ làm việc một cách chính xác khi triển khai trên một cụm bão thực tế. Tôi sẽ đánh giá cao nếu ai đó có thể giúp xem xét cấu trúc liên kết này và đề xuất bất kỳ lỗi, thay đổi hoặc cải tiến nào.

Cảm ơn.

Đây là cấu trúc liên kết của tôi.

builder.setSpout("spout", new TwitterFilterSpout("pittsburgh")); 
    builder.setBolt("sampleaggregate", new SampleAggregatorBolt()) 
       .shuffleGrouping("spout"); 
    builder.setBolt("printaggreator",new PrinterBolt()).shuffleGrouping("sampleaggregate"); 

Aggregation bolt

public class SampleAggregatorBolt implements IRichBolt { 

    protected OutputCollector collector; 
    protected Tuple currentTuple; 
    protected Logger log; 
    /** 
    * Holds the messages in the bolt till you are ready to send them out 
    */ 
    protected List<Status> statusCache; 

    @Override 
    public void prepare(Map stormConf, TopologyContext context, 
         OutputCollector collector) { 
     this.collector = collector; 

     log = Logger.getLogger(getClass().getName()); 
     statusCache = new ArrayList<Status>(); 
    } 

    @Override 
    public void execute(Tuple tuple) { 
     currentTuple = tuple; 

     Status currentStatus = null; 
     try { 
      currentStatus = (Status) tuple.getValue(0); 
     } catch (ClassCastException e) { 
     } 
     if (currentStatus != null) { 

      //add it to the status cache 
      statusCache.add(currentStatus); 
      collector.ack(tuple); 


      //check the size of the status cache and pass it to the next stage if you have enough messages to emit 
      if (statusCache.size() > 10) { 
       collector.emit(new Values(statusCache)); 
      } 

     } 
    } 

    @Override 
    public void cleanup() { 


    } 

    @Override 
    public void declareOutputFields(OutputFieldsDeclarer declarer) { 
     declarer.declare(new Fields("tweets")); 

    } 

    @Override 
    public Map<String, Object> getComponentConfiguration() { 
     return null; //To change body of implemented methods use File | Settings | File Templates. 
    } 


    protected void setupNonSerializableAttributes() { 

    } 

} 

Printer bolt

public class PrinterBolt extends BaseBasicBolt { 

    @Override 
    public void execute(Tuple tuple, BasicOutputCollector collector) { 
     System.out.println(tuple.size() + " " + tuple); 
    } 

    @Override 
    public void declareOutputFields(OutputFieldsDeclarer ofd) { 
    } 

} 

Trả lời

4

Từ những gì tôi có thể nhìn thấy nó có vẻ tốt. Con quỷ trong chi tiết, mặc dù. Tôi không chắc chắn những gì bolt tổng hợp của bạn không nhưng nếu nó làm cho bất kỳ giả định về các giá trị được truyền cho nó thì bạn nên xem xét một nhóm lĩnh vực thích hợp. Điều này có thể không tạo ra sự khác biệt lớn như bạn đang sử dụng gợi ý song song mặc định là 1, nhưng bạn nên quyết định chia tỷ lệ với nhiều trường hợp bolt tổng hợp các giả định logic tiềm ẩn mà bạn thực hiện có thể gọi cho nhóm không phân nhóm.

+0

Tôi đã cung cấp mã cho bu lông tổng hợp ở trên (xem phương thức thực thi). Bây giờ nó đang chờ đợi cho đến khi nó đã tích lũy N (10 trong ví dụ trên) tin nhắn và chia chúng ra ngay sau khi nó có 10 tin nhắn. BTW Tôi chỉ tìm thấy một lỗi mà tôi sẽ sửa chữa. Tôi cần phải xóa bộ nhớ cache khi tôi phát ra các giá trị. Vì vậy, những thay đổi cần phải được cần thiết nếu tôi cần phải sử dụng nhiều hơn một aggregator. –

0

Xin chào ngay khi bạn đang cố đăng ký nhiều hơn một từ khóa, bạn sẽ gặp sự cố. Tôi đề nghị vòi của bạn cũng phát ra từ khóa gốc đã được sử dụng để lọc.

Sau đó, thay vì làm shuffleGrouping tôi sẽ làm một fieldsGrouping

builder.setBolt("sampleaggregate", new SampleAggregatorBolt()) 
      .shuffleGrouping("spout", new Fields("keyword")); 

Bằng cách này bạn chắc chắn rằng kết quả của một từ khóa duy nhất kết thúc trên bolt cùng mỗi lần. Như vậy bạn có thể tính toán tổng hợp một cách chính xác. Nếu bạn bỏ qua các fieldGrouping Storm có thể khởi tạo bất kỳ số lượng bolt tổng hợp của bạn và gửi bất kỳ tin nhắn từ vòi cho bất kỳ trường hợp của bu lông tổng hợp mà sẽ trong trường hợp kết quả sai kết quả.