2013-02-18 55 views
29

Tôi đang sử dụng vườn thú để lấy dữ liệu từ kafka. Và ở đây tôi luôn lấy dữ liệu từ điểm bù cuối cùng. Có cách nào để xác định thời gian bù đắp để lấy dữ liệu cũ không?Làm cách nào để lấy dữ liệu từ điểm bù cũ ở Kafka?

Có một tùy chọn autooffset.reset. Nó chấp nhận nhỏ nhất hoặc lớn nhất. Ai đó có thể giải thích những gì nhỏ nhất và lớn nhất. Autooffset.reset có thể giúp lấy dữ liệu từ điểm bù cũ thay vì điểm bù mới nhất không?

Trả lời

20

Người tiêu dùng luôn thuộc nhóm và đối với mỗi phân vùng, Người quản lý theo dõi tiến độ của nhóm người tiêu dùng đó trong phân vùng.

Để lấy từ đầu, bạn có thể xóa tất cả các dữ liệu liên quan đến sự tiến bộ như Hussain refered

ZkUtils.maybeDeletePath(${zkhost:zkport}", "/consumers/${group.id}"); 

Bạn cũng có thể chỉ định bù đắp của phân vùng mà bạn muốn, theo quy định tại lõi/src/main/scala /kafka/tools/UpdateOffsetsInZK.scala

ZkUtils.updatePersistentPath(zkClient, topicDirs.consumerOffsetDir + "/" + partition, offset.toString) 

Tuy nhiên, chênh lệch không được lập chỉ mục thời gian, nhưng bạn biết cho mỗi phân vùng là một chuỗi.

Nếu thư của bạn chứa dấu thời gian (và hãy cẩn thận rằng dấu thời gian này không liên quan đến thời điểm Kafka nhận được thư của bạn), bạn có thể thử làm một người lập chỉ mục cố gắng truy xuất một mục nhập theo các bước bằng cách tăng độ lệch bằng N và lưu trữ tuple (chủ đề X, phần 2, offset 100, dấu thời gian) ở đâu đó.

Khi bạn muốn truy xuất các mục từ một thời điểm cụ thể, bạn có thể áp dụng tìm kiếm nhị phân cho chỉ mục thô cho đến khi tìm thấy mục nhập bạn muốn và tìm nạp từ đó.

3

Tham khảo tài liệu về cấu hình kafka: http://kafka.apache.org/08/configuration.html cho truy vấn của bạn về thông số bù trừ nhỏ nhất và lớn nhất.

BTW, Trong khi khám phá kafka, tôi đã tự hỏi làm thế nào để phát lại tất cả các tin nhắn cho người tiêu dùng. Tôi có nghĩa là nếu một nhóm người tiêu dùng đã thăm dò ý kiến ​​tất cả các tin nhắn và nó muốn lấy lại những thông điệp đó.

Cách có thể đạt được là xóa dữ liệu khỏi sở thú. Sử dụng lớp kafka.utils.ZkUtils để xóa một nút trên zookeeper. Dưới đây là cách sử dụng của nó:

ZkUtils.maybeDeletePath(${zkhost:zkport}", "/consumers/${group.id}"); 
7

Từ Kafka documentation họ nói "kafka.api.OffsetRequest.EarliestTime() tìm thấy sự khởi đầu của dữ liệu trong các bản ghi và bắt đầu trực tiếp từ đó, kafka.api.OffsetRequest.LatestTime() sẽ chỉ phát tin nhắn mới Đừng cho rằng offset 0 là giá trị bắt đầu bù trừ, vì các thông điệp đã hết tuổi trong nhật ký theo thời gian."

Sử dụng SimpleConsumerExample đây: https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example

câu hỏi tương tự: Kafka High Level Consumer Fetch All Messages From Topic Using Java API (Equivalent to --from-beginning)

này có thể giúp

+0

Họ cũng có mẫu mã để tham khảo. đáng xem xét – Hild

+0

Ví dụ mà Hild đề cập đến là: https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example Bạn không thể sử dụng ví dụ 'Người tiêu dùng', bạn phải sử dụng Ví dụ 'SimpleConsumerDemo' để chơi với các offset. – pherris

1

Kafka Nghị định thư Doc là một nguồn tuyệt vời để chơi với request/response/Hiệu số/Tin nhắn: https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol bạn sử dụng ví dụ Đơn giản về người tiêu dùng như mã sau đây minh họa trạng thái:

FetchRequest req = new FetchRequestBuilder() 

     .clientId(clientName) 

     .addFetch(a_topic, a_partition, readOffset, 100000) 

     .build(); 

FetchResponse fetchResponse = simpleConsumer.fetch(req); 

đặt readOffset để bắt đầu bù đắp ban đầu từ. nhưng bạn cần phải kiểm tra bù đắp tối đa cũng như ở trên sẽ cung cấp bù đắp giới hạn tính theo FetchSize trong param cuối cùng của phương thức addFetch.

+0

Làm kiểm tra Api mới được cung cấp trong phiên bản 0.9.0.0 của Kafka họ đã đi một bước lên bằng cách kết hợp kết hợp người tiêu dùng đơn giản và cao cấp. – usman

1

Đối Bây giờ

Kafka FAQ đưa ra một câu trả lời cho vấn đề này.

Làm cách nào để nhận được chính xác các thông báo cho một dấu thời gian nhất định bằng cách sử dụng OffsetRequest?

Kafka cho phép truy vấn tập hợp các thông điệp theo thời gian và nó thực hiện tại phân đoạn chi tiết. Tham số dấu thời gian là dấu thời gian unix và truy vấn bù đắp bằng dấu thời gian trả về độ lệch có thể mới nhất của thông điệp được nối thêm không muộn hơn dấu thời gian đã cho. Có 2 giá trị đặc biệt của dấu thời gian - mới nhất và sớm nhất. Đối với bất kỳ giá trị nào khác của dấu thời gian unix, Kafka sẽ nhận được độ lệch bắt đầu của đoạn nhật ký được tạo không muộn hơn dấu thời gian đã cho. Do điều này và do yêu cầu bù đắp chỉ được phân phối ở mức độ chi tiết của phân khúc, yêu cầu tìm nạp bù sẽ trả lại kết quả ít chính xác hơn cho các kích thước phân đoạn lớn hơn.

Để có kết quả chính xác hơn, bạn có thể định cấu hình kích thước phân khúc nhật ký dựa trên thời gian (log.roll.ms) thay vì kích thước (log.segment.bytes). Tuy nhiên, bạn nên cẩn thận vì làm như vậy có thể tăng số lượng trình xử lý tệp do phân khúc nhật ký thường xuyên lăn.


Kế hoạch tương lai

Kafka sẽ thêm dấu thời gian để định dạng tin nhắn. Tham khảo

https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Enriched+Message+Metadata

0

bạn đã thử điều này chưa?

bin/kafka-console-consumer.sh --bootstrap-server localhost: 9092 kiểm tra --topic --from-bắt đầu

Nó sẽ in ra tất cả các thông điệp cho các chủ đề nhất định, "test" trong ví dụ này.

Chi tiết khác từ liên kết này https://kafka.apache.org/quickstart