2012-06-14 11 views
8

Tôi đang nghĩ đến việc sử dụng HBase như một nguồn cho một trong những công việc MapReduce của tôi. Tôi biết rằng TableInputFormat chỉ định một phân tách đầu vào (và do đó một bản đồ) cho mỗi Vùng. Tuy nhiên, điều này có vẻ không hiệu quả. Tôi thực sự muốn có nhiều người lập bản đồ làm việc trên một Vùng nhất định cùng một lúc. Tôi có thể đạt được điều này bằng cách mở rộng TableInputFormatBase không? Bạn có thể vui lòng chỉ cho tôi một ví dụ không? Hơn nữa, đây có phải là một ý tưởng hay không?Khi sử dụng HBase làm nguồn cho MapReduce, tôi có thể mở rộng TableInputFormatBase để tạo nhiều phân tách và nhiều trình lập bản đồ cho từng vùng không?

Cảm ơn sự giúp đỡ.

Trả lời

1

Không chắc chắn nếu bạn có thể chỉ định nhiều người vẽ bản đồ cho một khu vực nhất định, nhưng hãy cân nhắc những điều sau đây:

Nếu bạn nghĩ rằng một ánh xạ là không hiệu quả cho mỗi khu vực (có thể các nút dữ liệu của bạn không có đủ nguồn lực như #cpus) , bạn có thể xác định kích thước vùng nhỏ hơn trong tệp hbase-site.xml.

đây là một trang web cho các tùy chọn configs mặc định nếu bạn muốn xem xét thay đổi điều đó: http://hbase.apache.org/configuration.html#hbase_default_configurations

xin lưu ý rằng bằng cách làm cho kích thước vùng nhỏ, bạn sẽ được tăng số lượng các tập tin trong DFS của bạn, và điều này có thể hạn chế dung lượng của DFS hadoop của bạn tùy thuộc vào bộ nhớ của nút tên của bạn. Hãy nhớ rằng việc sử dụng bộ nhớ của nút tên có liên quan trực tiếp đến số lượng tệp trong DFS của bạn. Điều này có thể hoặc có thể không liên quan đến tình huống của bạn vì tôi không biết cụm của bạn đang được sử dụng như thế nào. Không bao giờ có một câu trả lời viên đạn bạc cho những câu hỏi này!

-1

Điều này sẽ hữu ích nếu bạn muốn quét các khu vực lớn (hàng trăm triệu hàng) bằng quét có điều kiện chỉ tìm thấy một vài bản ghi. Điều này sẽ ngăn không cho ScannerTimeoutException

package org.apache.hadoop.hbase.mapreduce; 

import java.io.IOException; 
import java.util.ArrayList; 
import java.util.Arrays; 
import java.util.List; 

import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.hbase.util.Bytes; 
import org.apache.hadoop.mapreduce.InputSplit; 
import org.apache.hadoop.mapreduce.JobContext; 

public class RegionSplitTableInputFormat extends TableInputFormat { 

    public static final String REGION_SPLIT = "region.split"; 

    @Override 
    public List<InputSplit> getSplits(JobContext context) throws IOException { 

     Configuration conf = context.getConfiguration(); 
     int regionSplitCount = conf.getInt(REGION_SPLIT, 0); 
     List<InputSplit> superSplits = super.getSplits(context); 
     if (regionSplitCount <= 0) { 
      return superSplits; 
     } 

     List<InputSplit> splits = new ArrayList<InputSplit>(superSplits.size() * regionSplitCount); 

     for (InputSplit inputSplit : superSplits) { 
      TableSplit tableSplit = (TableSplit) inputSplit; 
      System.out.println("splitting by " + regionSplitCount + " " + tableSplit); 
      byte[] startRow0 = tableSplit.getStartRow(); 
      byte[] endRow0 = tableSplit.getEndRow(); 
      boolean discardLastSplit = false; 
      if (endRow0.length == 0) { 
       endRow0 = new byte[startRow0.length]; 
       Arrays.fill(endRow0, (byte) 255); 
       discardLastSplit = true; 
      } 
      byte[][] split = Bytes.split(startRow0, endRow0, regionSplitCount); 
      if (discardLastSplit) { 
       split[split.length - 1] = new byte[0]; 
      } 
      for (int regionSplit = 0; regionSplit < split.length - 1; regionSplit++) { 
       byte[] startRow = split[regionSplit]; 
       byte[] endRow = split[regionSplit + 1]; 
       TableSplit newSplit = new TableSplit(tableSplit.getTableName(), startRow, endRow, 
         tableSplit.getLocations()[0]); 
       splits.add(newSplit); 
      } 
     } 

     return splits; 
    } 
} 
0

1. Nó hoàn toàn tốt đẹp chỉ cần đảm bảo rằng tập hợp khóa là loại trừ lẫn nhau giữa những người lập bản đồ.

  1. bạn arent tạo quá nhiều khách hàng vì điều này có thể dẫn đến rất nhiều gc, như trong HBase đọc HBase khối bộ nhớ cache khuấy động hơi xảy ra
0

Sử dụng MultipleScanTableInputFormat này, bạn có thể sử dụng Cấu hình MultipleScanTableInputFormat.PARTITIONS_PER_REGION_SERVER để kiểm soát số lượng người lập bản đồ nên thực thi đối với một máy chủ vùng. Lớp này sẽ nhóm tất cả các phân đoạn đầu vào chia theo vị trí của chúng (máy chủ vùng), và RecordReader sẽ lặp lại đúng cách thông qua tất cả các phân chia tổng hợp cho trình ánh xạ.

Dưới đây là ví dụ

https://gist.github.com/bbeaudreault/9788499#file-multiplescantableinputformat-java-L90

Đó làm việc bạn đã tạo ra nhiều chia rẽ tổng hợp cho một mapper đơn

private List<InputSplit> getAggregatedSplits(JobContext context) throws IOException { 
final List<InputSplit> aggregatedSplits = new ArrayList<InputSplit>(); 

final Scan scan = getScan(); 

for (int i = 0; i < startRows.size(); i++) { 
    scan.setStartRow(startRows.get(i)); 
    scan.setStopRow(stopRows.get(i)); 

    setScan(scan); 

    aggregatedSplits.addAll(super.getSplits(context)); 
} 

// set the state back to where it was.. 
scan.setStopRow(null); 
scan.setStartRow(null); 

setScan(scan); 

return aggregatedSplits; 
} 

Tạo phân vùng của máy chủ Region

@Override 
public List<InputSplit> getSplits(JobContext context) throws IOException { 
List<InputSplit> source = getAggregatedSplits(context); 

if (!partitionByRegionServer) { 
    return source; 
} 

// Partition by regionserver 
Multimap<String, TableSplit> partitioned = ArrayListMultimap.<String, TableSplit>create(); 
for (InputSplit split : source) { 
    TableSplit cast = (TableSplit) split; 
    String rs = cast.getRegionLocation(); 

    partitioned.put(rs, cast); 
}