Có cách nào thanh lịch, dễ dàng và nhanh chóng để di chuyển dữ liệu ra khỏi Hive vào MongoDB không?Cách hiệu quả nhất để di chuyển dữ liệu ra khỏi Hive và vào MongoDB là gì?
Trả lời
Bạn có thể thực hiện xuất bằng đầu nối Hadoop-MongoDB. Chỉ cần chạy truy vấn Hive trong phương thức chính của công việc của bạn. Đầu ra này sẽ được Mapper sử dụng để chèn dữ liệu vào MongoDB
.
Ví dụ:
Ở đây tôi đang chèn một dấu chấm phẩy tách ra tập tin văn bản (id; firstname; LastName) vào một MongoDB bộ sưu tập sử dụng một truy vấn Hive đơn giản:
import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import com.mongodb.hadoop.MongoOutputFormat;
import com.mongodb.hadoop.io.BSONWritable;
import com.mongodb.hadoop.util.MongoConfigUtil;
public class HiveToMongo extends Configured implements Tool {
private static class HiveToMongoMapper extends
Mapper<LongWritable, Text, IntWritable, BSONWritable> {
//See: https://issues.apache.org/jira/browse/HIVE-634
private static final String HIVE_EXPORT_DELIMETER = '\001' + "";
private IntWritable k = new IntWritable();
private BSONWritable v = null;
@Override
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String [] split = value.toString().split(HIVE_EXPORT_DELIMETER);
k.set(Integer.parseInt(split[0]));
v = new BSONWritable();
v.put("firstname", split[1]);
v.put("lastname", split[2]);
context.write(k, v);
}
}
public static void main(String[] args) throws Exception {
try {
Class.forName("org.apache.hadoop.hive.jdbc.HiveDriver");
}
catch (ClassNotFoundException e) {
System.out.println("Unable to load Hive Driver");
System.exit(1);
}
try {
Connection con = DriverManager.getConnection(
"jdbc:hive://localhost:10000/default");
Statement stmt = con.createStatement();
String sql = "INSERT OVERWRITE DIRECTORY " +
"'hdfs://localhost:8020/user/hive/tmp' select * from users";
stmt.executeQuery(sql);
}
catch (SQLException e) {
System.exit(1);
}
int res = ToolRunner.run(new Configuration(), new HiveToMongo(), args);
System.exit(res);
}
@Override
public int run(String[] args) throws Exception {
Configuration conf = getConf();
Path inputPath = new Path("/user/hive/tmp");
String mongoDbPath = "mongodb://127.0.0.1:6900/mongo_users.mycoll";
MongoConfigUtil.setOutputURI(conf, mongoDbPath);
/*
Add dependencies to distributed cache via
DistributedCache.addFileToClassPath(...) :
- mongo-hadoop-core-x.x.x.jar
- mongo-java-driver-x.x.x.jar
- hive-jdbc-x.x.x.jar
HadoopUtils is an own utility class
*/
HadoopUtils.addDependenciesToDistributedCache("/libs/mongodb", conf);
HadoopUtils.addDependenciesToDistributedCache("/libs/hive", conf);
Job job = new Job(conf, "HiveToMongo");
FileInputFormat.setInputPaths(job, inputPath);
job.setJarByClass(HiveToMongo.class);
job.setMapperClass(HiveToMongoMapper.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(MongoOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setNumReduceTasks(0);
job.submit();
System.out.println("Job submitted.");
return 0;
}
}
Một nhược điểm là một khu vực dàn dựng ' (/ người dùng/hive/tmp) là cần thiết để lưu trữ đầu ra trung gian Hive. Hơn nữa, theo như tôi biết, đầu nối Mongo-Hadoop không hỗ trợ upserts.
Tôi không khá chắc chắn nhưng bạn cũng có thể thử để lấy dữ liệu từ Hive
mà không cần chạy hiveserver
đó cho thấy một dịch vụ tiết kiệm để bạn có lẽ có thể tiết kiệm một số chi phí. Nhìn vào mã nguồn của phương thức org.apache.hadoop.hive.cli.CliDriver#processLine(String line, boolean allowInterupting)
của Hive thực sự thực hiện truy vấn. Sau đó, bạn có thể hack cùng một cái gì đó như thế này:
...
LogUtils.initHiveLog4j();
CliSessionState ss = new CliSessionState(new HiveConf(SessionState.class));
ss.in = System.in;
ss.out = new PrintStream(System.out, true, "UTF-8");
ss.err = new PrintStream(System.err, true, "UTF-8");
SessionState.start(ss);
Driver qp = new Driver();
processLocalCmd("SELECT * from users", qp, ss); //taken from CliDriver
...
ghi chú Side:
Ngoài ra còn có một thực hiện hive-mongo kết nối bạn cũng có thể kiểm tra. Cũng cần xem xét việc triển khai trình kết nối Hive-HBase để có ý tưởng nào đó nếu bạn muốn triển khai một kết nối tương tự cho MongoDB
.
Bạn đã xem xét Sqoop? Nó được coi là rất đơn giản để di chuyển dữ liệu giữa các cơ sở dữ liệu Hadoop và SQL/NoSQL. This article cũng đưa ra một ví dụ về việc sử dụng nó với Hive.
Hãy nhìn vào các dự án hadoop-MongoDB
nối:
http://api.mongodb.org/hadoop/MongoDB%2BHadoop+Connector.html
"kết nối này có các hình thức cho phép cả dữ liệu MongoDB đọc vào Hadoop (để sử dụng trong công việc MapReduce cũng như các thành phần khác của Hadoop hệ sinh thái), cũng như viết kết quả của các công việc Hadoop lên MongoDB. "
không chắc chắn nếu nó sẽ làm việc cho trường hợp sử dụng của bạn nhưng nó đáng xem.
Sqoop chỉ hỗ trợ chuyển giữa hadoop và cơ sở dữ liệu quan hệ trong thời điểm này: http://sqoop.apache.org/docs/1.4.2/SqoopUserGuide.html#_supported_databases –
Vì @Jp Bond chỉ ra Sqoop có thể được sử dụng với JDBC các DBs tương thích, nhưng không có hỗ trợ JDBC cho MongoDB: https://jira.mongodb.org/browse/JAVA-539 –