2012-06-26 19 views
9

Tôi cần một hàm Java trả về kết quả của truy vấn SQL SELECT làm tham số InputStream cho một hệ thống khác gửi kết quả qua mạng.Kết quả SQL Java vào InputStream

Tuy nhiên, InputStream phải là String với dấu phân tách tùy chỉnh (ví dụ: thường, nhưng không phải lúc nào cũng là CSV).

Trong khi tôi có thể dễ dàng tạo một hàm để truy xuất kết quả, hãy tạo String được phân cách và cuối cùng chuyển đổi String thành InputStream, kết quả SQL thường quá lớn để xử lý trong bộ nhớ. Ngoài ra, việc xử lý toàn bộ tập kết quả trước khi trả lại kết quả sẽ phải chịu một thời gian chờ không mong muốn.

Làm cách nào tôi có thể trả lại InputStream để lặp qua kết quả SQL và gửi dữ liệu đã xử lý (được phân cách) khi dữ liệu được trả về từ cơ sở dữ liệu?

+0

bạn đã xem xét sử dụng tập hợp hàng jdbc được lưu trong bộ nhớ cache chưa? Điều đó có thể hữu ích cho những gì bạn đang cố gắng làm. http://docs.oracle.com/javase/1.5.0/docs/api/javax/sql/rowset/CachedRowSet.html – ChadNC

+0

Không, nhưng điều đó có thể giúp tôi như thế nào? Vấn đề không phải là rời khỏi kết nối mở, nhưng có kết quả trong bộ nhớ. –

+0

đó là những gì một hàng rào được lưu trữ. cung cấp một cách dễ dàng hơn để gửi kết quả truy vấn qua mạng tới các thiết bị, ứng dụng khác, v.v. – ChadNC

Trả lời

8

viết bài (không kiểm tra) đoạn mã, mà sẽ cho bạn ý tưởng cơ bản:

/** 
* Implementors of this interface should only convert current row to byte array and return it. 
* 
* @author yura 
*/ 
public interface RowToByteArrayConverter { 
    byte[] rowToByteArray(ResultSet resultSet); 
} 

public class ResultSetAsInputStream extends InputStream { 

    private final RowToByteArrayConverter converter; 
    private final PreparedStatement statement; 
    private final ResultSet resultSet; 

    private byte[] buffer; 
    private int position; 

    public ResultSetAsInputStream(final RowToByteArrayConverter converter, final Connection connection, final String sql, final Object... parameters) throws SQLException { 
     this.converter = converter; 
     statement = createStatement(connection, sql, parameters); 
     resultSet = statement.executeQuery(); 
    } 

    private static PreparedStatement createStatement(final Connection connection, final String sql, final Object[] parameters) { 
     // PreparedStatement should be created here from passed connection, sql and parameters 
     return null; 
    } 

    @Override 
    public int read() throws IOException { 
     try { 
      if(buffer == null) { 
       // first call of read method 
       if(!resultSet.next()) { 
        return -1; // no rows - empty input stream 
       } else { 
        buffer = converter.rowToByteArray(resultSet); 
        position = 0; 
        return buffer[position++] & (0xff); 
       } 
      } else { 
       // not first call of read method 
       if(position < buffer.length) { 
        // buffer already has some data in, which hasn't been read yet - returning it 
        return buffer[position++] & (0xff); 
       } else { 
        // all data from buffer was read - checking whether there is next row and re-filling buffer 
        if(!resultSet.next()) { 
         return -1; // the buffer was read to the end and there is no rows - end of input stream 
        } else { 
         // there is next row - converting it to byte array and re-filling buffer 
         buffer = converter.rowToByteArray(resultSet); 
         position = 0; 
         return buffer[position++] & (0xff); 
        } 
       } 
      } 
     } catch(final SQLException ex) { 
      throw new IOException(ex); 
     } 
    } 



    @Override 
    public void close() throws IOException { 
     try { 
      statement.close(); 
     } catch(final SQLException ex) { 
      throw new IOException(ex); 
     } 
    } 
} 

Đây là thực hiện rất thẳng về phía trước và nó có thể được cải thiện trong những cách sau đây:

  • đang sao chép giữa nếu và khác trong phương pháp đọc có thể được gỡ bỏ - nó đã được đăng chỉ để làm rõ
  • thay vì tái tạo bộ đệm mảng byte cho mỗi hàng (new byte[] là tốn kém opera tion), logic phức tạp hơn có thể được thực hiện để sử dụng bộ đệm mảng byte được khởi tạo chỉ một lần và sau đó được điền lại. Sau đó, bạn phải thay đổi chữ ký của phương thức RowToByteArrayConverter.rowToByteArray thành int fillByteArrayFromRow(ResultSet rs, byte[] array) để trả về số byte đã điền và điền vào mảng byte đã qua.

Vì mảng byte chứa ký byte nó có thể chứa -1 (mà thực sự là 255 byte như unsigned) và do đó cho biết kết thúc không chính xác của dòng, vì vậy & (0xff) được sử dụng để chuyển đổi byte ký hợp đồng với byte unsigned như các giá trị số nguyên. Để biết chi tiết, hãy tham khảo How does Java convert int into byte?. Cũng xin lưu ý rằng nếu tốc độ truyền mạng chậm, điều này có thể giữ bộ kết quả mở cho một thời gian dài, do đó gây ra các vấn đề cho cơ sở dữ liệu.

Hope this helps ...

2

tôi sẽ cải thiện câu trả lời gợi ý của @Yura, bằng cách giới thiệu như sau:
Sử dụng DataOutputStream được khởi tạo với một ByteArrayOutputStream để thuận tiện ghi dữ liệu vào mảng byte, bên trong triển khai RowToByteArrayConverter.
Trong thực tế, tôi sẽ đề nghị để có một hệ thống các bộ chuyển đổi, tất cả trong số họ mở rộng các lớp trừu tượng cùng (đây là một đoạn mã của ý tưởng của tôi - có thể không biên dịch từ lần đầu tiên)

public abstract class RowToByteArrayConverter { 
    public byte[] rowToByteArray(ResultSet resultSet) { 
     parseResultSet(dataOutputStream, resultSet); 
     return byteArrayOutputSteam.toByteArray(); 
    } 

    public RowToByteArrayConverter() { 
    dataOutputStream = new DataOutputStream(byteArrayOutputStream); 
    } 

    protected DataOutputStream dataOutputStream; 
    protected ByteArrayOutputStream byteArrayOutputStream; 

    protected abstract void parseResultSet(DataOutputStream dataOutputStresm, ResultSet rs); 
} 

Bây giờ, bạn có thể ghi đè lên lớp này bằng cách chỉ đơn giản là ghi đè phương thức parseResultSet, ví dụ:
- viết mã nhận dạng String một tên từ cột "tên" trong bản ghi. và thực hiện writeUTF8 trên DataOputputStream.

0

Các câu trả lời ở trên cung cấp giải pháp hữu ích cho vấn đề của trình tạo chuỗi kích thước có giới hạn bị vượt quá.Chúng cũng là bộ nhớ hiệu quả. Tuy nhiên, thử nghiệm của tôi cho thấy rằng họ là chậm hơn so với chỉ ghi dữ liệu vào một StringBuilder, và kêu gọi

ByteArrayInputStream mới (data.getBytes ("UTF-8"))

để có được một InputStream.

Những gì tôi tìm thấy là xa performant hơn là để phân loại dữ liệu đến bằng cách sử dụng một phân vùng chức năng và sau đó sử dụng nhiều chủ đề cho mỗi:

  1. truy vấn cơ sở dữ liệu nguồn cho một tập hợp con của dữ liệu
  2. Ghi dữ liệu vào mục tiêu

Điều này cũng tránh được vấn đề trong đó tổng dữ liệu có thể vượt quá kích thước tối đa của bộ đệm chuỗi.

Ví dụ tôi có bản ghi 6 triệu với cột được gọi là "RecordDate" trong bảng SQL Server. Giá trị trong Recorddate khác nhau giữa năm 2013 và 2016. Vì vậy, tôi cấu hình mỗi thread cho mỗi yêu cầu dữ liệu cho 2013,14,15,16 tương ứng. Sau đó, mỗi luồng ghi dữ liệu được chuyển mã vào một StringBuilder và mỗi tải khối lượng lớn đến đích bằng cách chuyển đổi sang một Inputstream bằng cách sử dụng getBytes() như trên.

Điều này dẫn đến tăng tốc 2x.

Tại sao? Bởi vì cơ sở dữ liệu nguồn và đích có thể xử lý nhiều yêu cầu đồng thời, và do đó khối lượng công việc tổng thể được trải rộng trên nhiều luồng trong cả ba quy trình: Cơ sở dữ liệu nguồn, bộ chuyển mã, cơ sở dữ liệu đích.