2013-06-27 35 views
5

Tôi đang sử dụng KafkaSpout. Vui lòng tìm chương trình thử nghiệm dưới đây.Tích hợp Kafka Storm bằng Kafka Spout

Tôi đang sử dụng Storm 0.8.1. Lớp Multischeme có trong Storm 0.8.2. Tôi sẽ sử dụng nó. Tôi chỉ muốn biết các phiên bản trước đó hoạt động như thế nào bằng cách khởi tạo lớp StringScheme()? Tôi có thể tải xuống phiên bản Kafka Spout trước đây ở đâu? Nhưng tôi nghi ngờ đó sẽ là một thay thế chính xác hơn để làm việc trên Storm 0.8.2. ??? (Confused)

Khi tôi chạy mã (được đưa ra dưới đây) trên cụm bão (tức là khi tôi đẩy cấu trúc liên kết), tôi nhận được lỗi sau (Điều này xảy ra khi phần Đề án được nhận xét khác tất nhiên tôi sẽ gặp lỗi trình biên dịch lớp học không có trong 0.8.1):

java.lang.NoClassDefFoundError: backtype/storm/spout/MultiScheme 
     at storm.kafka.TestTopology.main(TestTopology.java:37) 
Caused by: java.lang.ClassNotFoundException: backtype.storm.spout.MultiScheme 

Trong mã dưới đây bạn có thể tìm thấy spoutConfig.scheme = new StringScheme(); một phần nhận xét. Tôi đã nhận được lỗi biên dịch nếu tôi không bình luận rằng dòng đó là nhưng tự nhiên như không có nhà thầu trong đó. Ngoài ra khi tôi khởi tạo MultiScheme tôi nhận được lỗi như tôi không có lớp đó trong 0.8.1.

public class TestTopology { 
    public static class PrinterBolt extends BaseBasicBolt { 
     public void declareOutputFields(OutputFieldsDeclarer declarer) { 
     } 

     public void execute(Tuple tuple, BasicOutputCollector collector) { 
      System.out.println(tuple.toString()); 
     } 
    } 

    public static void main(String [] args) throws Exception { 
     List<HostPort> hosts = new ArrayList<HostPort>(); 
     hosts.add(new HostPort("127.0.0.1",9092)); 
     LocalCluster cluster = new LocalCluster(); 
     TopologyBuilder builder = new TopologyBuilder(); 
     SpoutConfig spoutConfig = new SpoutConfig(new KafkaConfig.StaticHosts(hosts, 1), "test", "/zkRootStorm", "STORM-ID"); 
     spoutConfig.zkServers=ImmutableList.of("localhost"); 
     spoutConfig.zkPort=2181; 
     //spoutConfig.scheme=new StringScheme(); 
     spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme()); 
     builder.setSpout("spout",new KafkaSpout(spoutConfig)); 
     builder.setBolt("printer", new PrinterBolt()) 
       .shuffleGrouping("spout"); 
     Config config = new Config(); 

     cluster.submitTopology("kafka-test", config, builder.createTopology()); 

     Thread.sleep(600000); 
    } 
+0

Tôi đoán Tôi không hiểu vấn đề: nó chỉ hoạt động nếu bạn đi đến 0.8.2? Nếu có, tại sao thậm chí cố gắng chạy trong 0.8.1: 0.8.2 thay thế nó bằng một số sửa lỗi và cải tiến khác. –

Trả lời

8

Tôi cũng gặp vấn đề tương tự. Cuối cùng giải quyết nó, và tôi đưa ví dụ chạy hoàn chỉnh lên trên github.

Bạn được chào đón để kiểm tra xem nó ra ở đây> https://github.com/buildlackey/cep

(bấm vào thư mục bão + Kafka cho một chương trình mẫu mà nên giúp bạn có được và chạy).

+8

Cân nhắc thêm một hoặc hai câu vào câu trả lời của bạn để mô tả những gì bạn đã làm để câu trả lời của bạn có liên quan mà không dựa vào kho Git đang hoạt động. – neontapir

+0

Chắc chắn: dự án chứa các bài kiểm tra đơn vị và các chương trình mẫu minh họa cách phát triển các ứng dụng xử lý sự kiện phức tạp (CEP) trên Storm, Kafka và Esper. –

+0

nghe có vẻ tốt với tôi –

5

Chúng tôi gặp sự cố tương tự.

giải pháp của chúng tôi:

  1. mở pom.xml

  2. Thay đổi phạm vi từ cung cấp cho <scope>compile</scope>

Nếu bạn muốn biết thêm về phạm vi phụ thuộc kiểm tra docu maven: Maven docu - dependency scopes