// First, build an RDD in PairRDD form (xxx is a placeholder for your own RDD)
JavaPairRDD<String, JSONObject> javaPairRDD = xxx;

// Write to the target directory with saveAsHadoopFile. The arguments are:
// (target directory, key class, value class, output format class)
javaPairRDD.saveAsHadoopFile("D://Test", String.class, JSONObject.class, RDDMultipleTextOutputFormat2.class);

// Define a custom RDDMultipleTextOutputFormat2 that extends MultipleTextOutputFormat
public static class RDDMultipleTextOutputFormat2 extends MultipleTextOutputFormat<String, JSONObject> {
    @Override
    public String generateFileNameForKeyValue(String key, JSONObject value, String name) {
        // Build the relative output path from two fields of the value
        String object_type = value.getString("object_type");
        String object_id = value.getString("object_id");
        return object_type + "/" + object_id + ".json";
    }
}
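The final output is written to paths of the form "D:/Test/object_type/object_id.json", where object_type and object_id are taken from each record's value.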
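To make the path rule concrete, here is a minimal sketch in plain Java with no Spark or Hadoop dependency. The names `PathSketch` and `relativeFileName`, the sample record, and the `Map<String, String>` standing in for `JSONObject` are all assumptions made for illustration; they are not part of the original code.

```java
import java.util.Map;

class PathSketch {
    // Mirrors the return expression of generateFileNameForKeyValue above.
    // A Map stands in for JSONObject (assumption for this sketch).
    static String relativeFileName(Map<String, String> value) {
        return value.get("object_type") + "/" + value.get("object_id") + ".json";
    }

    public static void main(String[] args) {
        // Hypothetical record used only for this demonstration
        Map<String, String> record = Map.of("object_type", "book", "object_id", "42");
        // The relative name is resolved under the output directory
        System.out.println("D:/Test/" + relativeFileName(record));
    }
}
```

Records that share the same object_type and object_id produce the same relative name, so they end up in the same file.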
// Each line of an output file has the following form
key value
// First, build an RDD in PairRDD form (xxx is a placeholder for your own RDD)
JavaPairRDD<String, JSONObject> javaPairRDD = xxx;

// Convert the PairRDD to the form <NullWritable, T>
JavaPairRDD<NullWritable, JSONObject> nullKeyJavaPairRDD = javaPairRDD.mapToPair(
        tuple2 -> new Tuple2<>(NullWritable.get(), tuple2._2));

// The rest is the same as before
nullKeyJavaPairRDD.saveAsHadoopFile("D://Test", NullWritable.class, JSONObject.class, RDDMultipleTextOutputFormat.class);

public static class RDDMultipleTextOutputFormat extends MultipleTextOutputFormat<NullWritable, JSONObject> {
    @Override
    public String generateFileNameForKeyValue(NullWritable key, JSONObject value, String name) {
        String object_type = value.getString("object_type");
        String object_id = value.getString("object_id");
        return object_type + "/" + object_id + ".json";
    }
}
So let's step into MultipleTextOutputFormat, the class we extend:
public class MultipleTextOutputFormat<K, V> extends MultipleOutputFormat<K, V> {

    private TextOutputFormat<K, V> theTextOutputFormat = null;

    @Override
    protected RecordWriter<K, V> getBaseRecordWriter(FileSystem fs, JobConf job,
            String name, Progressable arg3) throws IOException {
        if (theTextOutputFormat == null) {
            theTextOutputFormat = new TextOutputFormat<K, V>();
        }
        return theTextOutputFormat.getRecordWriter(fs, job, name, arg3);
    }
}
There is no relevant method here, so we move on to the parent class, MultipleOutputFormat:
public void write(K key, V value) throws IOException {

    // get the file name based on the key
    String keyBasedPath = generateFileNameForKeyValue(key, value, myName);

    // get the file name based on the input file name
    String finalPath = getInputFileBasedOutputFileName(myJob, keyBasedPath);

    // get the actual key
    K actualKey = generateActualKey(key, value);
    V actualValue = generateActualValue(key, value);

    RecordWriter<K, V> rw = this.recordWriters.get(finalPath);
    if (rw == null) {
        // if we don't have the record writer yet for the final path, create
        // one
        // and add it to the cache
        rw = getBaseRecordWriter(myFS, myJob, finalPath, myProgressable);
        this.recordWriters.put(finalPath, rw);
    }
    rw.write(actualKey, actualValue);
};
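The routing in this write() method can be sketched without Hadoop: compute a path per record, then look up (or lazily create) one writer per path. The class below is only an illustration under stated assumptions. `RoutingSketch`, `fileNameFor`, `files`, and the "body" field are hypothetical names, a `StringBuilder` stands in for a real `RecordWriter`, and the tab separator is assumed.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class RoutingSketch {
    // One in-memory "writer" per output path, like the recordWriters cache above
    private final Map<String, StringBuilder> writers = new LinkedHashMap<>();

    // Hypothetical stand-in for generateFileNameForKeyValue
    String fileNameFor(String key, Map<String, String> value) {
        return value.get("object_type") + "/" + value.get("object_id") + ".json";
    }

    void write(String key, Map<String, String> value) {
        String path = fileNameFor(key, value);
        // Create the writer on first use for this path, then reuse it
        StringBuilder out = writers.computeIfAbsent(path, p -> new StringBuilder());
        out.append(key).append('\t').append(value.get("body")).append('\n');
    }

    Map<String, StringBuilder> files() {
        return writers;
    }

    public static void main(String[] args) {
        RoutingSketch sketch = new RoutingSketch();
        sketch.write("k1", Map.of("object_type", "book", "object_id", "1", "body", "a"));
        sketch.write("k2", Map.of("object_type", "book", "object_id", "1", "body", "b"));
        sketch.write("k3", Map.of("object_type", "film", "object_id", "9", "body", "c"));
        // Print each simulated file and its content
        sketch.files().forEach((path, content) -> System.out.print(path + ":\n" + content));
    }
}
```

The first two records share a path and therefore share a writer, while the third gets a writer of its own.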
The answer feels close now. We keep following the call rw.write(actualKey, actualValue); and, by setting a breakpoint, we can see that it enters TextOutputFormat#write():
public synchronized void write(K key, V value) throws IOException {

    boolean nullKey = key == null || key instanceof NullWritable;
    boolean nullValue = value == null || value instanceof NullWritable;
    if (nullKey && nullValue) {
        return;
    }
    if (!nullKey) {
        writeObject(key);
    }
    if (!(nullKey || nullValue)) {
        out.write(keyValueSeparator);
    }
    if (!nullValue) {
        writeObject(value);
    }
    out.write(newline);
}
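The branching in the write() method above decides what each output line looks like. As a rough sketch, the same logic can be reproduced in plain Java and tried with different inputs. `LineWriteSketch` and `formatLine` are hypothetical names, a Java `null` stands in for both `null` and `NullWritable`, and the result is returned as a string rather than written to a stream.

```java
class LineWriteSketch {
    // Same branching as the write() method above, but building a String.
    // A null argument plays the role of both null and NullWritable here.
    static String formatLine(Object key, Object value, String separator) {
        boolean nullKey = key == null;
        boolean nullValue = value == null;
        if (nullKey && nullValue) {
            return ""; // nothing is written at all
        }
        StringBuilder out = new StringBuilder();
        if (!nullKey) {
            out.append(key);
        }
        if (!(nullKey || nullValue)) {
            out.append(separator); // only between a real key and a real value
        }
        if (!nullValue) {
            out.append(value);
        }
        out.append('\n');
        return out.toString();
    }

    public static void main(String[] args) {
        System.out.print(formatLine("id-1", "{\"a\":1}", "\t")); // key, separator, value
        System.out.print(formatLine(null, "{\"a\":1}", "\t"));   // value only
    }
}
```

With a null key, neither the key nor the separator is emitted, so the line contains only the value.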
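With this in mind, the solution is easy to see: we only need to make sure the key passed into write() is null. Going back to the MultipleOutputFormat class, we can see that the key passed in is obtained from this method, via K actualKey = generateActualKey(key, value);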
protected K generateActualKey(K key, V value) {
    return key;
}
public static class RDDMultipleTextOutputFormat3 extends MultipleTextOutputFormat<String, JSONObject> {

    @Override
    public String generateFileNameForKeyValue(String key, JSONObject value, String name) {
        // Output layout: <output dir>/object_type/object_id.json
        String object_type = value.getString("object_type");
        String object_id = value.getString("object_id");
        return object_type + "/" + object_id + ".json";
    }

    @Override
    public String generateActualKey(String key, JSONObject value) {
        // Return null so that TextOutputFormat writes only the value
        return null;
    }
}
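The effect of overriding generateActualKey can be checked in isolation with a small simulation. This is only a sketch under stated assumptions: `ActualKeySketch`, `BaseFormat`, `ValueOnlyFormat`, and `render` are hypothetical names, plain strings replace `JSONObject`, and `render` condenses the key handling of TextOutputFormat into one expression with an assumed tab separator.

```java
class ActualKeySketch {

    static class BaseFormat {
        // Default behaviour, as in MultipleOutputFormat: the key is passed through
        protected String generateActualKey(String key, String value) {
            return key;
        }

        // Simplified line rendering: a null key drops both key and separator
        final String render(String key, String value) {
            String actualKey = generateActualKey(key, value);
            return (actualKey == null ? "" : actualKey + "\t") + value + "\n";
        }
    }

    static class ValueOnlyFormat extends BaseFormat {
        // Same idea as RDDMultipleTextOutputFormat3: hide the key from the writer
        @Override
        protected String generateActualKey(String key, String value) {
            return null;
        }
    }

    public static void main(String[] args) {
        System.out.print(new BaseFormat().render("id-1", "{\"a\":1}"));
        System.out.print(new ValueOnlyFormat().render("id-1", "{\"a\":1}"));
    }
}
```

The original key is still available for choosing the file name, because only the key handed to the underlying writer is replaced.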