2012-07-04 17 views
7

Tôi cần tạo ra các luồng tiêu dùng N, xử lý cùng một InputStream đồng thời, ví dụ: - biến đổi nó bằng cách nào đó, tính toán tổng kiểm tra hoặc chữ ký số vv Những người tiêu dùng này không phụ thuộc vào nhau và tất cả chúng đang sử dụng thư viện của bên thứ ba chấp nhận InputStream làm nguồn dữ liệu.Xử lý đồng thời một InputStream đơn lẻ với người tiêu dùng độc lập

Vì vậy, những gì tôi có thể làm là - tạo ra một số thực hiện của InputStream, mà sẽ

  • đọc đoạn dữ liệu từ "cha mẹ" dòng
  • người tiêu dùng bỏ cấm
  • chờ đợi cho đến khi mỗi người tiêu dùng đọc toàn bộ đoạn
  • đọc đoạn tiếp theo

trong khi trông đơn giản, nó có thể làm tăng các vấn đề khác nhau như livelo ck khi một số người tiêu dùng chết, thực hiện tất cả các phương pháp InputStream, kiểm soát ngã ba/tham gia của người tiêu dùng bằng cách sử dụng các rào cản/chốt, v.v.

Một người bạn nói với tôi rằng đó là nửa giờ để thực hiện.

Tôi muốn sử dụng một cái gì đó đủ trưởng thành (googling không đi kèm với kết quả như vậy, google-fu của tôi không đủ tốt?) Hoặc không bận tâm và sao chép toàn bộ nguồn "" vào một tệp tạm thời và sử dụng nó làm nguồn dữ liệu. Các giải pháp sau này có vẻ đáng tin cậy hơn, nhưng có thể kết thúc trong việc tạo ra các tập tin gigabyte (khi xử lý streaming âm thanh ví dụ).

+0

Bạn có thể ghi dữ liệu vào tệp và tạo N FileInputStream không? –

+0

@JonLin Khi anh ấy nói về phía cuối câu hỏi, anh ấy có thể. –

Trả lời

3

Cách tôi xem, bạn nên có ít nhất một số loại đệm để người tiêu dùng khác nhau có thể di chuyển qua luồng với tốc độ khác nhau mà không bị mọi người liên tục bị chậm lại. Điều đó về cơ bản đảm bảo hiệu suất trường hợp xấu nhất và rất ít lợi ích của đồng thời.

Bạn có thể nói, gắn thẻ từng đoạn với người tiêu dùng đã sử dụng nó cho đến nay và sau đó xóa những người đã sử dụng hết. Có lẽ điều này có thể đạt được bởi mỗi người tiêu dùng giữ một tham chiếu đến từng đoạn nó chưa được sử dụng, mà sẽ cho phép GC tự động chăm sóc các khối đã sử dụng. Nhà sản xuất có thể giữ một danh sách WeakReference s để các khối do đó, nó có một xử lý về số lượng các khối chưa được sử dụng và cơ sở điều chỉnh của nó trên đó.

Tôi cũng đang nghĩ về việc có một phiên bản riêng biệt InputStream cho mỗi luồng, liên lạc nội bộ với nhà sản xuất InputStream. Bằng cách này, bạn có một giải pháp dễ dàng cho mối nguy hiểm cho livelock của bạn: try ... finally { is.close(); } - người tiêu dùng sắp chết sẽ đóng luồng đầu vào của chính nó. Điều này được truyền đạt tới nhà sản xuất.

Tôi có một số ý tưởng với việc sử dụng một ArrayBlockingQueue cho mỗi người tiêu dùng. Sẽ có một số khó khăn trong việc đảm bảo rằng tất cả người tiêu dùng được cho ăn đúng cách, mà không làm cho nhà sản xuất hoặc chặn hoặc bận rộn chờ đợi.

+0

Tôi sẽ không nói rằng nó là rất ít lợi ích - có 5 người tiêu dùng làm việc cho 1 secons và một người tiêu dùng làm việc trong 2 giây, yêu cầu đồng thời sẽ cho 2 giây trong khi tuần tự sẽ cho 7 giây. Hay tôi đang thiếu thứ gì đó ở đây? Với việc có các khối và bộ đệm được gắn thẻ, tôi sẽ đạt mức tiêu thụ bộ nhớ mà tôi muốn tránh. – jdevelop

+0

Vâng, những gì bạn nói là không thể tránh khỏi. Tuy nhiên, nếu bạn có người tiêu dùng cân bằng trung bình, nhưng hiệu suất của họ thay đổi rất nhiều, bạn sẽ mất cơ hội cho sự đồng bộ nếu bạn luôn chờ đợi mỗi người tiêu dùng hiện đang tụt lại phía sau. Buffering sẽ giúp bạn. Và nếu bạn giới thiệu cân bằng ưu tiên luồng, bạn thực sự có thể đạt được một tình huống như vậy. –

0

Bạn đã cân nhắc sử dụng các luồng ống? Nhà sản xuất của bạn có thể có một hoặc nhiều PipedOuputStream, trong đó nó ném bất cứ điều gì nó đọc từ tập tin. Ở phía bên kia của đường ống, bạn có các chủ đề tiêu dùng khác nhau đọc trên PipedInputstream tương ứng (đây là một InputStream mà bạn có thể chia sẻ với thư viện của mình).

Chủ đề sản xuất của bạn có thể quyết định thông qua dữ liệu đường ống nào, bằng cách này, cung cấp dữ liệu được xử lý cho một chuỗi người tiêu dùng đã đọc ở phía bên kia của đường ống.

Nếu bạn cần lấy lại dữ liệu từ chuỗi tiêu thụ, bạn có thể tạo một đường ống khác theo hướng ngược lại để gửi dữ liệu về cho bạn.

+1

Một 'PipedOutputStream' sẽ chặn nhà sản xuất ngay sau khi bất kỳ người tiêu dùng nào bị tụt lại phía sau, bỏ đói tất cả người tiêu dùng khác. –

0

Bạn có thể thử một số dịch vụ Java Messaging Service (JMS) như Apache ActiveMQ.

Trong trường hợp của bạn, bạn cần tạo một cái gọi là Chủ đề (xem Topics vs. Queues). Một chủ đề được tạo bởi nhà sản xuất và được xuất bản cho người tiêu dùng N, có thể chạy đồng thời, với mỗi người tiêu dùng nhận được chính xác cùng một dữ liệu.

Vì bạn muốn sử dụng InputStream s, có một chương về cách send messages are streams.

Tôi cho rằng, thông thường, nhà sản xuất và người tiêu dùng sẽ là các quy trình riêng biệt, có thể chạy trên các máy khác nhau trên mạng. Tôi nghĩ rằng bạn có thể cấu hình nó để chạy hoàn toàn trong một JVM duy nhất, mặc dù. Điều này sẽ phụ thuộc vào việc thực hiện JMS. Đây cũng là khá nổi tiếng: HornetQ by JBoss, RabbitMQ, và một bó toàn bộ những người khác.