Tôi nhận được NullPointerException
khi khởi chạy công việc MapReduce
. Nó được ném theo phương pháp getSerializer()
của SerializationFactory
. Tôi đang sử dụng lớp tùy chỉnh InputSplit
, InputFormat
, RecordReader
và MapReduce
.NullPointerException từ JobSplitWriter/SerializationFactory của Hadoop khi gọi getClass của InputSplit()
Tôi biết lỗi đang được ném một thời gian sau khi chia tách được tạo bởi lớp InputFormat
của tôi, nhưng trước khi tạo RecordReader
. Theo như tôi có thể nói, nó xảy ra trực tiếp sau khi "làm sạch khu vực dàn dựng".
Bằng cách kiểm tra nguồn Hadoop ở các địa điểm được chỉ báo bằng dấu vết ngăn xếp, có vẻ như lỗi xảy ra khi getSerialization()
nhận con trỏ rỗng Class<T>
. Của JobClient writeNewSplits()
cuộc gọi rằng phương pháp như thế này:
Serializer<T> serializer = factory.getSerializer((Class<T>) split.getClass());
Vì vậy, tôi cho rằng khi getClass()
đang được kêu gọi đối tượng tùy chỉnh InputSplit
của tôi, nó trả về một con trỏ null
, nhưng đó chỉ là khó hiểu. Bất kỳ ý tưởng?
Các vết đống đầy đủ từ các lỗi sau:
12/06/24 14:26:49 INFO mapred.JobClient: Cleaning up the staging area hdfs://localhost:54310/tmp/hadoop-s3cur3/mapred/staging/s3cur3/.staging/job_201206240915_0035 Exception in thread "main" java.lang.NullPointerException at org.apache.hadoop.io.serializer.SerializationFactory.getSerializer(SerializationFactory.java:73) at org.apache.hadoop.mapreduce.split.JobSplitWriter.writeNewSplits(JobSplitWriter.java:123) at org.apache.hadoop.mapreduce.split.JobSplitWriter.createSplitFiles(JobSplitWriter.java:74) at org.apache.hadoop.mapred.JobClient.writeNewSplits(JobClient.java:968) at org.apache.hadoop.mapred.JobClient.writeSplits(JobClient.java:979) at org.apache.hadoop.mapred.JobClient.access$600(JobClient.java:174) at org.apache.hadoop.mapred.JobClient$2.run(JobClient.java:897) at org.apache.hadoop.mapred.JobClient$2.run(JobClient.java:850) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:396) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1121) at org.apache.hadoop.mapred.JobClient.submitJobInternal(JobClient.java:850) at org.apache.hadoop.mapreduce.Job.submit(Job.java:500) at org.apache.hadoop.mapreduce.Job.waitForCompletion(Job.java:530) at edu.cs.illinois.cogcomp.hadoopinterface.infrastructure.CuratorJob.start(CuratorJob.java:94) at edu.cs.illinois.cogcomp.hadoopinterface.HadoopInterface.main(HadoopInterface.java:58) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25) at java.lang.reflect.Method.invoke(Method.java:597) at org.apache.hadoop.util.RunJar.main(RunJar.java:156)
Cảm ơn!
EDIT: Mã của tôi cho InputSplit tùy chỉnh sau:
import . . .
/**
* A document directory within the input directory.
* Returned by DirectoryInputFormat.getSplits()
* and passed to DirectoryInputFormat.createRecordReader().
*
* Represents the data to be processed by an individual Map process.
*/
public class DirectorySplit extends InputSplit {
/**
* Constructs a DirectorySplit object
* @param docDirectoryInHDFS The location (in HDFS) of this
* document's directory, complete with all annotations.
* @param fs The filesystem associated with this job
*/
public DirectorySplit(Path docDirectoryInHDFS, FileSystem fs)
throws IOException {
this.inputPath = docDirectoryInHDFS;
hash = FileSystemHandler.getFileNameFromPath(inputPath);
this.fs = fs;
}
/**
* Get the size of the split so that the input splits can be sorted by size.
* Here, we calculate the size to be the number of bytes in the original
* document (i.e., ignoring all annotations).
*
* @return The number of characters in the original document
*/
@Override
public long getLength() throws IOException, InterruptedException {
Path origTxt = new Path(inputPath, "original.txt");
HadoopInterface.logger.log(msg);
return FileSystemHandler.getFileSizeInBytes(origTxt, fs);
}
/**
* Get the list of nodes where the data for this split would be local.
* This list includes all nodes that contain any of the required data---it's
* up to Hadoop to decide which one to use.
*
* @return An array of the nodes for whom the split is local
* @throws IOException
* @throws InterruptedException
*/
@Override
public String[] getLocations() throws IOException, InterruptedException {
FileStatus status = fs.getFileStatus(inputPath);
BlockLocation[] blockLocs = fs.getFileBlockLocations(status, 0,
status.getLen());
HashSet<String> allBlockHosts = new HashSet<String>();
for(BlockLocation blockLoc : blockLocs) {
allBlockHosts.addAll(Arrays.asList(blockLoc.getHosts()));
}
return (String[])allBlockHosts.toArray();
}
/**
* @return The hash of the document that this split handles
*/
public String toString() {
return hash;
}
private Path inputPath;
private String hash;
private FileSystem fs;
}
Bạn có thể gửi mã cho Tuỳ chỉnh của bạn InputSplit - nó có mở rộng được không? đoán của tôi không phải là –
Mã được thêm vào. Tôi đã giả định rằng bởi vì tôi đã mở rộng InputSplit, mà thực hiện Writable, tôi sẽ không cần phải thực hiện có thể ghi trực tiếp. đây không phải là trường hợp? – s3cur3