2011-09-24 7 views
16

Điều này phần nào là ảnh chụp trong bóng tối trong trường hợp ai đó hiểu biết về việc triển khai Java của Apache Avro đang đọc.Trong Java, làm cách nào tôi có thể tạo một tệp tương đương với tệp vùng chứa Avro Apache mà không bị buộc phải sử dụng Tệp làm phương tiện?

Mục tiêu cấp cao của tôi là có một số cách để truyền tải một số dữ liệu avro qua mạng (ví dụ, chỉ cần nói HTTP, nhưng giao thức cụ thể không quan trọng cho mục đích này). Trong bối cảnh của tôi, tôi có một HttpServletResponse tôi cần phải viết dữ liệu này bằng cách nào đó.

ban đầu tôi cố gắng ghi dữ liệu như những gì lên tới một phiên bản ảo của một tập tin chứa Avro (giả sử rằng "phản ứng" là loại HttpServletResponse):

response.setContentType("application/octet-stream"); 
response.setHeader("Content-transfer-encoding", "binary"); 
ServletOutputStream outStream = response.getOutputStream(); 
BufferedOutputStream bos = new BufferedOutputStream(outStream); 

Schema someSchema = Schema.parse(".....some valid avro schema...."); 
GenericRecord someRecord = new GenericData.Record(someSchema); 
someRecord.put("somefield", someData); 
... 

GenericDatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<GenericRecord>(someSchema); 
DataFileWriter<GenericRecord> fileWriter = new DataFileWriter<GenericRecord>(datumWriter); 
fileWriter.create(someSchema, bos); 
fileWriter.append(someRecord); 
fileWriter.close(); 
bos.flush(); 

Đây là tất cả tiền phạt và dandy, ngoại trừ mà nó quay ra Avro không thực sự cung cấp một cách để đọc một tập tin chứa ngoài từ một tập tin thực tế: các DataFileReader chỉ có hai cấu trúc:

public DataFileReader(File file, DatumReader<D> reader); 

public DataFileReader(SeekableInput sin, DatumReader<D> reader); 

trong đó SeekableInput là một số biểu mẫu tùy chỉnh dành riêng cho từng người có sáng tạo cũng kết thúc đọc từ một tệp. Bây giờ cho rằng, trừ khi có một số cách để bằng cách nào đó ép buộc một InputStream vào một File (http://stackoverflow.com/questions/578305/create-a-java-file-object-or-equivalent-using-a-byte- array-in-memory-without-a gợi ý rằng không có, và tôi đã thử tìm kiếm xung quanh tài liệu Java), cách tiếp cận này sẽ không hoạt động nếu người đọc ở đầu kia của OutputStream nhận được tệp chứa avro đó (Tôi không chắc chắn lý do tại sao họ cho phép một trong những đầu ra tập tin container nhị phân avro để một OutputStream tùy ý mà không cung cấp một cách để đọc chúng từ InputStream tương ứng ở đầu bên kia, nhưng đó là bên cạnh điểm). Dường như việc triển khai trình đọc tệp chứa yêu cầu chức năng "có thể tìm kiếm" mà một Tệp cụ thể cung cấp.

Được rồi, do đó, có vẻ như cách tiếp cận đó sẽ không làm những gì tôi muốn. Làm thế nào về việc tạo ra một phản ứng JSON bắt chước tập tin container avro?

public static Schema WRAPPER_SCHEMA = Schema.parse(
    "{\"type\": \"record\", " + 
    "\"name\": \"AvroContainer\", " + 
    "\"doc\": \"a JSON avro container file\", " + 
    "\"namespace\": \"org.bar.foo\", " + 
    "\"fields\": [" + 
    "{\"name\": \"schema\", \"type\": \"string\", \"doc\": \"schema representing the included data\"}, " + 
    "{\"name\": \"data\", \"type\": \"bytes\", \"doc\": \"packet of data represented by the schema\"}]}" 
); 

Tôi không chắc đây có phải là cách tốt nhất để tiếp cận những hạn chế nêu trên không, nhưng có vẻ như điều này có thể làm được điều đó. Ví dụ, tôi sẽ đặt lược đồ (ví dụ "Schema someSchema" ở trên) dưới dạng một String bên trong trường "schema", và sau đó đặt vào biểu mẫu tuần tự hóa nhị phân của một bản ghi phù hợp với lược đồ đó (ví dụ: "GenericRecord someRecord ") bên trong trường" dữ liệu ". Tôi thực sự muốn biết về một chi tiết cụ thể được mô tả dưới đây, nhưng tôi nghĩ rằng nó sẽ là đáng giá để cung cấp cho một bối cảnh lớn hơn là tốt, để nếu có một cách tiếp cận cao cấp tốt hơn tôi có thể tham gia (phương pháp này hoạt động nhưng không cảm thấy tối ưu) xin vui lòng cho tôi biết.

Câu hỏi của tôi là, giả sử tôi đi với phương pháp dựa trên JSON này, làm cách nào để viết biểu diễn nhị phân avro của Bản ghi của tôi vào trường "dữ liệu" của lược đồ AvroContainer? Ví dụ, tôi đứng dậy để ở đây:

ByteArrayOutputStream baos = new ByteArrayOutputStream(); 
GenericDatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<GenericRecord>(someSchema); 
Encoder e = new BinaryEncoder(baos); 
datumWriter.write(resultsRecord, e); 
e.flush(); 

GenericRecord someRecord = new GenericData.Record(someSchema); 
someRecord.put("schema", someSchema.toString()); 
someRecord.put("data", ByteBuffer.wrap(baos.toByteArray())); 
datumWriter = new GenericDatumWriter<GenericRecord>(WRAPPER_SCHEMA); 
JsonGenerator jsonGenerator = new JsonFactory().createJsonGenerator(baos, JsonEncoding.UTF8); 
e = new JsonEncoder(WRAPPER_SCHEMA, jsonGenerator); 
datumWriter.write(someRecord, e); 
e.flush(); 

PrintWriter printWriter = response.getWriter(); // recall that response is the HttpServletResponse 
response.setContentType("text/plain"); 
response.setCharacterEncoding("UTF-8"); 
printWriter.print(baos.toString("UTF-8")); 

ban đầu tôi cố gắng bỏ qua mệnh đề ByteBuffer.wrap, nhưng sau đó thì dòng

datumWriter.write(someRecord, e); 

ném một ngoại lệ mà tôi không thể đúc một mảng byte vào ByteBuffer.Khá công bằng, có vẻ như khi lớp Encoder (trong đó JsonEncoder là một lớp con) được gọi để viết một đối tượng Byte avro, nó đòi hỏi một ByteBuffer được đưa ra như một đối số. Vì vậy, tôi đã thử đóng gói byte [] bằng java.nio.ByteBuffer.wrap, nhưng khi dữ liệu được in ra, nó được in dưới dạng một chuỗi byte thẳng, mà không được chuyển qua đại diện hệ thập lục phân avro:

"data": {"bytes": ".....some gibberish other than the expected format...} 

Điều đó có vẻ không đúng. Theo tài liệu hướng dẫn avro, đối tượng byte ví dụ mà chúng đưa ra nói rằng tôi cần đưa vào một đối tượng json, một ví dụ trong đó trông giống như "\ u00FF", và những gì tôi đưa vào đó rõ ràng không phải là định dạng đó. Những gì tôi muốn biết là:

  • Ví dụ về định dạng byte avro là gì? Nó trông giống như "\ uDEADBEEFDEADBEEF ..."?
  • Làm cách nào để ép buộc dữ liệu nhị phân của tôi (như đầu ra của BinaryEncoder thành mảng byte []) thành định dạng mà tôi có thể gắn vào đối tượng GenericRecord và in chính xác trong JSON? Ví dụ, tôi muốn một DATA đối tượng mà tôi có thể gọi trên một số GenericRecord "someRecord.put (" dữ liệu ", DATA);" với dữ liệu tuần tự hóa của tôi ở bên trong?
  • Làm thế nào sau đó tôi sẽ đọc dữ liệu đó trở lại thành một mảng byte ở đầu kia (người tiêu dùng), khi nó được biểu diễn văn bản JSON và muốn tạo lại GenericRecord như được biểu diễn bằng định dạng AvroContainer JSON?
  • (nhắc lại câu hỏi từ trước) Có cách nào tốt hơn tôi có thể làm tất cả những điều này không?
+1

org.apache.avro.file.DataFileStream? – Chikei

+3

SeekableInput không chỉ là một số biểu mẫu tùy chỉnh cụ thể cho từng công ty mà việc tạo ra kết thúc đọc từ một tệp. Có [SeekableByteArrayInput] (http://avro.apache.org/docs/current/api/java/org/apache/avro/file/SeekableByteArrayInput.html) đọc từ một mảng byte trong bộ nhớ. –

+0

Rất tốt câu hỏi - và yêu cầu cần truy cập ngẫu nhiên là rất lạ, vì nó là không thể đáp ứng mà không có bộ đệm rất lớn. Và có vẻ như không cần thiết phải làm tốt ... Tôi không biết tại sao nó được cảm thấy truy cập ngẫu nhiên là cần thiết. Nhiều định dạng dữ liệu khác không thêm các yêu cầu như vậy để xử lý. – StaxMan

Trả lời

1

Như Knut nói, nếu bạn muốn sử dụng một cái gì đó khác hơn là một tập tin, bạn có thể:

  • sử dụng SeekableByteArrayInput, như Knut cho biết, đối với bất cứ điều gì bạn có thể giày-sừng vào một mảng byte
  • Thực hiện SeekablInput theo cách riêng của bạn - ví dụ nếu bạn đã nhận được nó từ một số cấu trúc cơ sở dữ liệu lạ.
  • Hoặc chỉ sử dụng một tệp. Tại sao không?

Đó là câu trả lời của bạn.

+0

Tuyệt vời, đó là chính xác những gì tôi cần. –

+4

Ngoài ra, sử dụng một tập tin làm tăng chi phí cho đĩa I/O vì vậy nếu bạn đang nhận được một mảng byte thông qua mạng bạn không muốn đặt nó trong một tập tin đầu tiên và sau đó đọc nó (đĩa I/O vòng chuyến đi! !!). –

0

Cách tôi giải quyết vấn đề này là gửi riêng các lược đồ từ dữ liệu. Tôi thiết lập một cái bắt tay kết nối truyền các lược đồ xuống từ máy chủ, sau đó tôi gửi dữ liệu được mã hóa qua lại. Bạn phải tạo đối tượng bao bọc bên ngoài như thế này:

{'name':'Wrapper','type':'record','fields':[ 
    {'name':'schemaName','type':'string'}, 
    {'name':'records','type':{'type':'array','items':'bytes'}} 
]} 

Lần đầu tiên bạn mã hóa mảng bản ghi, thành một mảng mảng byte được mã hóa. Mọi thứ trong một mảng phải có cùng một lược đồ. Sau đó, bạn mã hóa đối tượng trình bao bọc với lược đồ trên - đặt "schemaName" là tên của lược đồ mà bạn đã sử dụng để mã hóa mảng.

Trên máy chủ, trước tiên bạn sẽ giải mã đối tượng trình bao bọc. Khi bạn giải mã đối tượng bao bọc, bạn biết schemaName và bạn có một mảng các đối tượng mà bạn biết cách giải mã - sử dụng như bạn sẽ làm!

Lưu ý rằng bạn có thể thoát ra mà không sử dụng đối tượng bao bọc nếu bạn sử dụng giao thức như WebSockets và động cơ như Socket.IO (cho Node.js) Socket.io cung cấp cho bạn lớp giao tiếp dựa trên kênh giữa trình duyệt và máy chủ. Trong trường hợp đó, chỉ cần sử dụng một lược đồ cụ thể cho mỗi kênh, mã hóa từng tin nhắn trước khi bạn gửi nó. Bạn vẫn phải chia sẻ các lược đồ khi kết nối khởi tạo - nhưng nếu bạn đang sử dụng WebSockets thì việc này dễ thực hiện.Và khi bạn hoàn thành, bạn có một số lượng tùy ý các luồng hai chiều mạnh mẽ giữa máy khách và máy chủ.

+0

Trong khi không phải là một giải pháp tồi, nó thậm chí không đến gần để giải quyết câu hỏi đã nêu của OP. – rbellamy

0

Trong Java và Scala, chúng tôi đã thử sử dụng khởi tạo qua mã được tạo bằng cách sử dụng codegen Scala nitro. Khởi đầu là cách thư viện Javascript mtth/avsc giải quyết được điều này problem. Tuy nhiên, chúng tôi đã gặp phải một số vấn đề về tuần tự hóa bằng cách sử dụng thư viện Java, nơi có các byte sai được tiêm vào luồng byte, nhất quán - và chúng tôi có thể không phải tìm ra các byte đó đến từ đâu.

Tất nhiên điều đó có nghĩa là xây dựng triển khai của riêng chúng ta về Varint với mã hóa ZigZag. Meh.

Ở đây là:

package com.terradatum.query 

import java.io.ByteArrayOutputStream 
import java.nio.ByteBuffer 
import java.security.MessageDigest 
import java.util.UUID 

import akka.actor.ActorSystem 
import akka.stream.stage._ 
import akka.stream.{Attributes, FlowShape, Inlet, Outlet} 
import com.nitro.scalaAvro.runtime.GeneratedMessage 
import com.terradatum.diagnostics.AkkaLogging 
import org.apache.avro.Schema 
import org.apache.avro.generic.{GenericDatumWriter, GenericRecord} 
import org.apache.avro.io.EncoderFactory 
import org.elasticsearch.search.SearchHit 

import scala.collection.mutable.ArrayBuffer 
import scala.reflect.ClassTag 

/* 
* The original implementation of this helper relied exclusively on using the Header Avro record and inception to create 
* the header. That didn't work for us because somehow erroneous bytes were injected into the output. 
* 
* Specifically: 
* 1. 0x08 prepended to the magic 
* 2. 0x0020 between the header and the sync marker 
* 
* Rather than continue to spend a large number of hours trying to troubleshoot why the Avro library was producing such 
* erroneous output, we build the Avro Container File using a combination of our own code and Avro library code. 
* 
* This means that Terradatum code is responsible for the Avro Container File header (including magic, file metadata and 
* sync marker) and building the blocks. We only use the Avro library code to build the binary encoding of the Avro 
* records. 
* 
* @see https://avro.apache.org/docs/1.8.1/spec.html#Object+Container+Files 
*/ 
object AvroContainerFileHelpers { 

    val magic: ByteBuffer = { 
    val magicBytes = "Obj".getBytes ++ Array[Byte](1.toByte) 
    val mg = ByteBuffer.allocate(magicBytes.length).put(magicBytes) 
    mg.position(0) 
    mg 
    } 

    def makeSyncMarker(): Array[Byte] = { 
    val digester = MessageDigest.getInstance("MD5") 
    digester.update(s"${UUID.randomUUID}@${System.currentTimeMillis()}".getBytes) 
    val marker = ByteBuffer.allocate(16).put(digester.digest()).compact() 
    marker.position(0) 
    marker.array() 
    } 

    /* 
    * Note that other implementations of avro container files, such as the javascript library 
    * mtth/avsc uses "inception" to encode the header, that is, a datum following a header 
    * schema should produce valid headers. We originally had attempted to do the same but for 
    * an unknown reason two bytes wore being inserted into our header, one at the very beginning 
    * of the header before the MAGIC marker, and one right before the syncmarker of the header. 
    * We were unable to determine why this wasn't working, and so this solution was used instead 
    * where the record/map is encoded per the avro spec manually without the use of "inception." 
    */ 
    def header(schema: Schema, syncMarker: Array[Byte]): Array[Byte] = { 
    def avroMap(map: Map[String, ByteBuffer]): Array[Byte] = { 
     val mapBytes = map.flatMap { 
     case (k, vBuff) => 
      val v = vBuff.array() 
      val byteStr = k.getBytes() 
      Varint.encodeLong(byteStr.length) ++ byteStr ++ Varint.encodeLong(v.length) ++ v 
     } 
     Varint.encodeLong(map.size.toLong) ++ mapBytes ++ Varint.encodeLong(0) 
    } 

    val schemaBytes = schema.toString.getBytes 
    val schemaBuffer = ByteBuffer.allocate(schemaBytes.length).put(schemaBytes) 
    schemaBuffer.position(0) 
    val metadata = Map("avro.schema" -> schemaBuffer) 
    magic.array() ++ avroMap(metadata) ++ syncMarker 
    } 

    def block(binaryRecords: Seq[Array[Byte]], syncMarker: Array[Byte]): Array[Byte] = { 
    val countBytes = Varint.encodeLong(binaryRecords.length.toLong) 
    val sizeBytes = Varint.encodeLong(binaryRecords.foldLeft(0)(_+_.length).toLong) 

    val buff: ArrayBuffer[Byte] = new scala.collection.mutable.ArrayBuffer[Byte]() 

    buff.append(countBytes:_*) 
    buff.append(sizeBytes:_*) 
    binaryRecords.foreach { rec => 
     buff.append(rec:_*) 
    } 
    buff.append(syncMarker:_*) 

    buff.toArray 
    } 

    def encodeBlock[T](schema: Schema, records: Seq[GenericRecord], syncMarker: Array[Byte]): Array[Byte] = { 
    //block(records.map(encodeRecord(schema, _)), syncMarker) 
    val writer = new GenericDatumWriter[GenericRecord](schema) 
    val out = new ByteArrayOutputStream() 
    val binaryEncoder = EncoderFactory.get().binaryEncoder(out, null) 
    records.foreach(record => writer.write(record, binaryEncoder)) 
    binaryEncoder.flush() 
    val flattenedRecords = out.toByteArray 
    out.close() 

    val buff: ArrayBuffer[Byte] = new scala.collection.mutable.ArrayBuffer[Byte]() 

    val countBytes = Varint.encodeLong(records.length.toLong) 
    val sizeBytes = Varint.encodeLong(flattenedRecords.length.toLong) 

    buff.append(countBytes:_*) 
    buff.append(sizeBytes:_*) 
    buff.append(flattenedRecords:_*) 
    buff.append(syncMarker:_*) 

    buff.toArray 
    } 

    def encodeRecord[R <: GeneratedMessage with com.nitro.scalaAvro.runtime.Message[R]: ClassTag](
     entity: R 
): Array[Byte] = 
    encodeRecord(entity.companion.schema, entity.toMutable) 

    def encodeRecord(schema: Schema, record: GenericRecord): Array[Byte] = { 
    val writer = new GenericDatumWriter[GenericRecord](schema) 
    val out = new ByteArrayOutputStream() 
    val binaryEncoder = EncoderFactory.get().binaryEncoder(out, null) 
    writer.write(record, binaryEncoder) 
    binaryEncoder.flush() 
    val bytes = out.toByteArray 
    out.close() 
    bytes 
    } 
} 

/** 
    * Encoding of integers with variable-length encoding. 
    * 
    * The avro specification uses a variable length encoding for integers and longs. 
    * If the most significant bit in a integer or long byte is 0 then it knows that no 
    * more bytes are needed, if the most significant bit is 1 then it knows that at least one 
    * more byte is needed. In signed ints and longs the most significant bit is traditionally 
    * used to represent the sign of the integer or long, but for us it's used to encode whether 
    * more bytes are needed. To get around this limitation we zig-zag through whole numbers such that 
    * negatives are odd numbers and positives are even numbers: 
    * 
    * i.e. -1, -2, -3 would be encoded as 1, 3, 5, and so on 
    * while 1, 2, 3 would be encoded as 2, 4, 6, and so on. 
    * 
    * More information is available in the avro specification here: 
    * @see http://lucene.apache.org/core/3_5_0/fileformats.html#VInt 
    *  https://developers.google.com/protocol-buffers/docs/encoding?csw=1#types 
    */ 
object Varint { 

    import scala.collection.mutable 

    def encodeLong(longVal: Long): Array[Byte] = { 
    val buff = new ArrayBuffer[Byte]() 
    Varint.zigZagSignedLong(longVal, buff) 
    buff.toArray[Byte] 
    } 

    def encodeInt(intVal: Int): Array[Byte] = { 
    val buff = new ArrayBuffer[Byte]() 
    Varint.zigZagSignedInt(intVal, buff) 
    buff.toArray[Byte] 
    } 

    def zigZagSignedLong[T <: mutable.Buffer[Byte]](x: Long, dest: T): Unit = { 
    // sign to even/odd mapping: http://code.google.com/apis/protocolbuffers/docs/encoding.html#types 
    writeUnsignedLong((x << 1)^(x >> 63), dest) 
    } 

    def writeUnsignedLong[T <: mutable.Buffer[Byte]](v: Long, dest: T): Unit = { 
    var x = v 
    while ((x & 0xFFFFFFFFFFFFFF80L) != 0L) { 
     dest += ((x & 0x7F) | 0x80).toByte 
     x >>>= 7 
    } 
    dest += (x & 0x7F).toByte 
    } 

    def zigZagSignedInt[T <: mutable.Buffer[Byte]](x: Int, dest: T): Unit = { 
    writeUnsignedInt((x << 1)^(x >> 31), dest) 
    } 

    def writeUnsignedInt[T <: mutable.Buffer[Byte]](v: Int, dest: T): Unit = { 
    var x = v 
    while ((x & 0xFFFFF80) != 0L) { 
     dest += ((x & 0x7F) | 0x80).toByte 
     x >>>= 7 
    } 
    dest += (x & 0x7F).toByte 
    } 
}