Tôi đang làm việc thông qua các quickstart Kafka:Kafka Quickstart: Tôi cần những phụ thuộc nào?
http://kafka.apache.org/07/quickstart.html
và cơ bản ví dụ Nhóm Tiêu Dùng:
https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example
Tôi đã được mã hóa lên người tiêu dùng và ConsumerThreadPool như trên:
import kafka.consumer.KafkaStream;
import kafka.consumer.ConsumerIterator;
public class Consumer implements Runnable {
private KafkaStream m_stream;
private Integer m_threadNumber;
public Consumer(KafkaStream a_stream, Integer a_threadNumber) {
m_threadNumber = a_threadNumber;
m_stream = a_stream;
}
public void run() {
ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
while (it.hasNext()) {
System.out.println("Thread " + m_threadNumber + ": " + new String(it.next().message()));
}
System.out.println("Shutting down Thread: " + m_threadNumber);
}
}
Một vài khía cạnh khác: Tôi là người dùng ng mùa xuân để quản lý sở thú của tôi:
import javax.inject.Named;
import java.util.Properties;
import kafka.consumer.ConsumerConfig;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
@Configuration
@ComponentScan("com.truecar.inventory.worker.core")
public class AppConfig {
@Bean
@Named("consumerConfig")
private static ConsumerConfig createConsumerConfig() {
String zookeeperAddress = "127.0.0.1:2181";
String groupId = "inventory";
Properties props = new Properties();
props.put("zookeeper.connect", zookeeperAddress);
props.put("group.id", groupId);
props.put("zookeeper.session.timeout.ms", "400");
props.put("zookeeper.sync.time.ms", "200");
props.put("auto.commit.interval.ms", "1000");
return new ConsumerConfig(props);
}
}
Và tôi đang biên dịch với Maven và plugin OneJar Maven. Tuy nhiên, tôi biên dịch và sau đó chạy kết quả một bình tôi nhận được lỗi sau:
Aug 26, 2013 6:15:41 PM org.springframework.context.annotation.ClassPathScanningCandidateComponentProvider registerDefaultFilters
INFO: JSR-330 'javax.inject.Named' annotation found and supported for component scanning
Exception in thread "main" java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at com.simontuffs.onejar.Boot.run(Boot.java:340)
at com.simontuffs.onejar.Boot.main(Boot.java:166)
Caused by: java.lang.NoClassDefFoundError: scala/ScalaObject
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:792)
at com.simontuffs.onejar.JarClassLoader.defineClass(JarClassLoader.java:803)
at com.simontuffs.onejar.JarClassLoader.findClass(JarClassLoader.java:710)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at com.simontuffs.onejar.JarClassLoader.loadClass(JarClassLoader.java:630)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.getDeclaredMethods0(Native Method)
at java.lang.Class.privateGetDeclaredMethods(Class.java:2521)
at java.lang.Class.getDeclaredMethods(Class.java:1845)
at org.springframework.core.type.StandardAnnotationMetadata.getAnnotatedMethods(StandardAnnotationMetadata.java:180)
at org.springframework.context.annotation.ConfigurationClassParser.doProcessConfigurationClass(ConfigurationClassParser.java:222)
at org.springframework.context.annotation.ConfigurationClassParser.processConfigurationClass(ConfigurationClassParser.java:165)
at org.springframework.context.annotation.ConfigurationClassParser.parse(ConfigurationClassParser.java:140)
at org.springframework.context.annotation.ConfigurationClassPostProcessor.processConfigBeanDefinitions(ConfigurationClassPostProcessor.java:282)
at org.springframework.context.annotation.ConfigurationClassPostProcessor.postProcessBeanDefinitionRegistry(ConfigurationClassPostProcessor.java:223)
at org.springframework.context.support.AbstractApplicationContext.invokeBeanFactoryPostProcessors(AbstractApplicationContext.java:630)
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:461)
at org.springframework.context.annotation.AnnotationConfigApplicationContext.<init>(AnnotationConfigApplicationContext.java:73)
at com.truecar.inventory.worker.core.consumer.ConsumerThreadPool.<clinit>(ConsumerThreadPool.java:31)
at com.truecar.inventory.worker.core.application.Starter.main(Starter.java:20)
... 6 more
Caused by: java.lang.ClassNotFoundException: scala.ScalaObject
at com.simontuffs.onejar.JarClassLoader.findClass(JarClassLoader.java:713)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at com.simontuffs.onejar.JarClassLoader.loadClass(JarClassLoader.java:630)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 27 more
Bây giờ, tôi biết rất ít về Kafka, và không có gì về Scala. Làm thế nào để sửa lỗi này? Tôi nên thử gì tiếp theo? Đây có phải là một vấn đề được biết đến? Tôi có cần phụ thuộc khác không? Dưới đây là phiên bản Kafka trong pom.xml của tôi:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.9.2</artifactId>
<version>0.8.0-beta1</version>
</dependency>
Cập nhật: Tôi đã liên lạc với mailing list Kafka dev, và họ cho tôi biết một số yêu cầu phiên bản cụ thể cho các phụ thuộc scala. Tuy nhiên, cũng có một phụ thuộc log4j không có giấy tờ, kết quả trong một thời gian chạy khác, không biên dịch thời gian, ngoại lệ.
Exception in thread "main" java.lang.reflect.InvocationTargetException
Caused by: java.lang.NoSuchMethodError: ch.qos.logback.classic.Logger.filterAndLog(Ljava/lang/String;Lorg/slf4j/Marker;Lch/qos/logback/classic/Level;Ljava/lang/String;[Ljava/lang/Object;Ljava/lang/Throwable;)V
at org.apache.log4j.Category.log(Category.java:333)
at org.apache.commons.logging.impl.Log4JLogger.debug(Log4JLogger.java:177)
Một Cập nhật:
Tôi tìm thấy sự phụ thuộc log4j đúng:
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
Nhưng bây giờ tôi đang gặp một thậm chí khó hiểu hơn thời gian chạy ngoại lệ:
Exception in thread "main" java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at com.simontuffs.onejar.Boot.run(Boot.java:340)
at com.simontuffs.onejar.Boot.main(Boot.java:166)
Caused by: java.lang.NoClassDefFoundError: org/I0Itec/zkclient/IZkStateListener
at kafka.javaapi.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:64)
at kafka.javaapi.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:66)
at kafka.consumer.Consumer$.createJavaConsumerConnector(ConsumerConnector.scala:100)
at kafka.consumer.Consumer.createJavaConsumerConnector(ConsumerConnector.scala)
Tại đây tôi nhận được loại cảm giác WTF. Vì vậy, tôi đã thêm một sự phụ thuộc:
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.3</version>
</dependency>
Nhưng tiếp xúc thêm một ngoại lệ runtime này:
Exception in thread "main" java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at com.simontuffs.onejar.Boot.run(Boot.java:340)
at com.simontuffs.onejar.Boot.main(Boot.java:166)
Caused by: java.lang.NoClassDefFoundError: com/yammer/metrics/core/Gauge
at kafka.consumer.ZookeeperConsumerConnector.createFetcher(ZookeeperConsumerConnector.scala:146)
at kafka.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:113)
at kafka.javaapi.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:64)
at kafka.javaapi.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:66)
at kafka.consumer.Consumer$.createJavaConsumerConnector(ConsumerConnector.scala:100)
at kafka.consumer.Consumer.createJavaConsumerConnector(ConsumerConnector.scala)
Tôi hy vọng sẽ có thể nhận được ví dụ bé này lên và chạy, nhưng có lẽ đây là mức giá để trả tiền để sử dụng các sản phẩm beta? Có lẽ tôi nên chuyển sang Apache Active MQ. Nhưng điều đó nghe có vẻ ít vui hơn. Tui bỏ lỡ điều gì vậy?
Awesome, cảm ơn bạn đã trả lời. Tôi muốn thử 0.7, nhưng trên Maven chỉ có 0.8 lọ có sẵn. Bạn sẽ đề xuất điều gì về truy cập có lập trình? –
@DavidWilliams chúng tôi sử dụng [kafka được xây dựng bởi twitter] (http://search.maven.org/#artifactdetails%7Ccom.twitter%7Ckafka_2.9.2%7C0.7.0%7Cjar) cho 0.7. Bạn có ý nghĩa gì khi truy cập * lập trình *? –
Ah, phải cụ thể hơn trong Java. Các hiện vật Maven duy nhất cho Kafka là 0,8 –