Tôi đang cố gắng để viết một topo nào sau đây:Grouping trong một topo bão kết hợp đơn giản
- Một vòi mà đặt mua một thức ăn twitter (dựa trên một từ khóa)
- Một tia tập hợp đó tổng hợp một số tweet (nói N) trong một bộ sưu tập và gửi cho chúng bolt
- Một bu-lông đơn giản in bộ sưu tập vào bảng điều khiển cùng một lúc.
Thực tế tôi muốn thực hiện thêm một số thao tác trên bộ sưu tập.
Tôi đã thử nghiệm tại địa phương và có vẻ như nó hoạt động. Tuy nhiên, tôi không chắc chắn nếu tôi đã đặt các nhóm trên bu lông một cách chính xác và nếu điều này sẽ làm việc một cách chính xác khi triển khai trên một cụm bão thực tế. Tôi sẽ đánh giá cao nếu ai đó có thể giúp xem xét cấu trúc liên kết này và đề xuất bất kỳ lỗi, thay đổi hoặc cải tiến nào.
Cảm ơn.
Đây là cấu trúc liên kết của tôi.
builder.setSpout("spout", new TwitterFilterSpout("pittsburgh"));
builder.setBolt("sampleaggregate", new SampleAggregatorBolt())
.shuffleGrouping("spout");
builder.setBolt("printaggreator",new PrinterBolt()).shuffleGrouping("sampleaggregate");
Aggregation bolt
public class SampleAggregatorBolt implements IRichBolt {
protected OutputCollector collector;
protected Tuple currentTuple;
protected Logger log;
/**
* Holds the messages in the bolt till you are ready to send them out
*/
protected List<Status> statusCache;
@Override
public void prepare(Map stormConf, TopologyContext context,
OutputCollector collector) {
this.collector = collector;
log = Logger.getLogger(getClass().getName());
statusCache = new ArrayList<Status>();
}
@Override
public void execute(Tuple tuple) {
currentTuple = tuple;
Status currentStatus = null;
try {
currentStatus = (Status) tuple.getValue(0);
} catch (ClassCastException e) {
}
if (currentStatus != null) {
//add it to the status cache
statusCache.add(currentStatus);
collector.ack(tuple);
//check the size of the status cache and pass it to the next stage if you have enough messages to emit
if (statusCache.size() > 10) {
collector.emit(new Values(statusCache));
}
}
}
@Override
public void cleanup() {
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("tweets"));
}
@Override
public Map<String, Object> getComponentConfiguration() {
return null; //To change body of implemented methods use File | Settings | File Templates.
}
protected void setupNonSerializableAttributes() {
}
}
Printer bolt
public class PrinterBolt extends BaseBasicBolt {
@Override
public void execute(Tuple tuple, BasicOutputCollector collector) {
System.out.println(tuple.size() + " " + tuple);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer ofd) {
}
}
Tôi đã cung cấp mã cho bu lông tổng hợp ở trên (xem phương thức thực thi). Bây giờ nó đang chờ đợi cho đến khi nó đã tích lũy N (10 trong ví dụ trên) tin nhắn và chia chúng ra ngay sau khi nó có 10 tin nhắn. BTW Tôi chỉ tìm thấy một lỗi mà tôi sẽ sửa chữa. Tôi cần phải xóa bộ nhớ cache khi tôi phát ra các giá trị. Vì vậy, những thay đổi cần phải được cần thiết nếu tôi cần phải sử dụng nhiều hơn một aggregator. –