Tôi đang sử dụng KafkaSpout. Vui lòng tìm chương trình thử nghiệm dưới đây.Tích hợp Kafka Storm bằng Kafka Spout
Tôi đang sử dụng Storm 0.8.1. Lớp Multischeme có trong Storm 0.8.2. Tôi sẽ sử dụng nó. Tôi chỉ muốn biết các phiên bản trước đó hoạt động như thế nào bằng cách khởi tạo lớp StringScheme()? Tôi có thể tải xuống phiên bản Kafka Spout trước đây ở đâu? Nhưng tôi nghi ngờ đó sẽ là một thay thế chính xác hơn để làm việc trên Storm 0.8.2. ??? (Confused)
Khi tôi chạy mã (được đưa ra dưới đây) trên cụm bão (tức là khi tôi đẩy cấu trúc liên kết), tôi nhận được lỗi sau (Điều này xảy ra khi phần Đề án được nhận xét khác tất nhiên tôi sẽ gặp lỗi trình biên dịch lớp học không có trong 0.8.1):
java.lang.NoClassDefFoundError: backtype/storm/spout/MultiScheme
at storm.kafka.TestTopology.main(TestTopology.java:37)
Caused by: java.lang.ClassNotFoundException: backtype.storm.spout.MultiScheme
Trong mã dưới đây bạn có thể tìm thấy spoutConfig.scheme = new StringScheme(); một phần nhận xét. Tôi đã nhận được lỗi biên dịch nếu tôi không bình luận rằng dòng đó là nhưng tự nhiên như không có nhà thầu trong đó. Ngoài ra khi tôi khởi tạo MultiScheme tôi nhận được lỗi như tôi không có lớp đó trong 0.8.1.
public class TestTopology {
public static class PrinterBolt extends BaseBasicBolt {
public void declareOutputFields(OutputFieldsDeclarer declarer) {
}
public void execute(Tuple tuple, BasicOutputCollector collector) {
System.out.println(tuple.toString());
}
}
public static void main(String [] args) throws Exception {
List<HostPort> hosts = new ArrayList<HostPort>();
hosts.add(new HostPort("127.0.0.1",9092));
LocalCluster cluster = new LocalCluster();
TopologyBuilder builder = new TopologyBuilder();
SpoutConfig spoutConfig = new SpoutConfig(new KafkaConfig.StaticHosts(hosts, 1), "test", "/zkRootStorm", "STORM-ID");
spoutConfig.zkServers=ImmutableList.of("localhost");
spoutConfig.zkPort=2181;
//spoutConfig.scheme=new StringScheme();
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
builder.setSpout("spout",new KafkaSpout(spoutConfig));
builder.setBolt("printer", new PrinterBolt())
.shuffleGrouping("spout");
Config config = new Config();
cluster.submitTopology("kafka-test", config, builder.createTopology());
Thread.sleep(600000);
}
Tôi đoán Tôi không hiểu vấn đề: nó chỉ hoạt động nếu bạn đi đến 0.8.2? Nếu có, tại sao thậm chí cố gắng chạy trong 0.8.1: 0.8.2 thay thế nó bằng một số sửa lỗi và cải tiến khác. –