import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
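/**
 * Inverted-index variant that stores its results in HBase: the mapper emits
 * (word:filename, 1) for every occurrence, the combiner collapses each group
 * into the word's frequency in that document, and the reducer averages those
 * frequencies across documents and writes one row per word (column
 * content:average) into the "Wuxia" table.
 *
 * Note: this code targets the older (pre-1.0) HBase client API; HBaseAdmin,
 * HTableDescriptor(String), and Put.add are deprecated or removed in newer
 * HBase releases.
 */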
public class InvertedIndexHbase {
    // Create (or recreate) the HBase output table with a minimal schema.
    public static void createHBaseTable(Configuration conf, String tablename) throws IOException {
        HBaseAdmin admin = new HBaseAdmin(conf);
        if (admin.tableExists(tablename)) { // drop the table first if it already exists
            System.out.println("Table exists, recreating it!");
            admin.disableTable(tablename);
            admin.deleteTable(tablename);
        }
        HTableDescriptor htd = new HTableDescriptor(tablename); // table descriptor
        HColumnDescriptor col = new HColumnDescriptor("content"); // single column family "content"
        htd.addFamily(col); // register the column family on the table
        System.out.println("Create new table: " + tablename);
        admin.createTable(htd); // create the table
        admin.close();
    }
    // The map function is unchanged from the earlier (non-HBase) version:
    // it emits (word:filename, 1) for every token in the input.
    public static class Map extends Mapper<Object, Text, Text, Text> {
        private Text keyWord = new Text();
        private Text valueDocCount = new Text();

        @Override
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            // find out which document this split came from
            FileSplit fileSplit = (FileSplit) context.getInputSplit();
            String fileName = fileSplit.getPath().getName();
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                keyWord.set(itr.nextToken() + ":" + fileName); // key is word:filename
                valueDocCount.set("1"); // value is a count of 1 per occurrence
                context.write(keyWord, valueDocCount);
            }
        }
    }
    // The combine function is likewise unchanged: it collapses each
    // (word:filename, [1, 1, ...]) group into (word, frequency-in-that-document).
    public static class InvertedIndexCombiner extends Reducer<Text, Text, Text, Text> {
        private Text wordCount = new Text();
        private Text wordDoc = new Text();

        @Override
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (Text value : values) {
                sum += Integer.parseInt(value.toString());
            }
            int splitIndex = key.toString().indexOf(":"); // separator between word and filename
            wordDoc.set(key.toString().substring(0, splitIndex)); // key becomes the bare word
            wordCount.set(String.valueOf(sum)); // value becomes the word's frequency in this document
            context.write(wordDoc, wordCount);
        }
    }
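    // Caveat: this combiner rewrites the key (word:filename becomes word),
    // which goes beyond the MapReduce combiner contract. Hadoop may run a
    // combiner zero, one, or several times, so a correct combiner should be an
    // optional, key-preserving reduction; if this one were skipped or re-run
    // on a spill, the reducer's document counts (and thus the averages) could
    // be off. A more robust design would do the re-keying in the map or reduce
    // phase instead.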
    // The reduce step averages each word's per-document frequencies and writes
    // the result into HBase instead of HDFS.
    public static class Reduce extends TableReducer<Text, Text, NullWritable> {

        @Override
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            int count = 0;
            // sum the per-document frequencies and count the contributing documents
            for (Text value : values) {
                count++;
                sum += Integer.parseInt(value.toString());
            }
            float averageCount = (float) sum / count;
            // One Put (one row) per word: the row key is the word itself, and
            // the average is stored as a string under content:average.
            Put put = new Put(Bytes.toBytes(key.toString()));
            put.add(Bytes.toBytes("content"), Bytes.toBytes("average"),
                    Bytes.toBytes(Float.toString(averageCount)));
            context.write(NullWritable.get(), put);
        }
    }
    public static void main(String[] args) throws Exception {
        String tablename = "Wuxia";
        Configuration conf = HBaseConfiguration.create();
        conf.set(TableOutputFormat.OUTPUT_TABLE, tablename); // tell TableOutputFormat where to write
        createHBaseTable(conf, tablename);
        Job job = Job.getInstance(conf, "Wuxia"); // job name
        // wire up the job's classes
        job.setJarByClass(InvertedIndexHbase.class);
        job.setMapperClass(Map.class);
        job.setCombinerClass(InvertedIndexCombiner.class);
        job.setReducerClass(Reduce.class);
        // these describe the map/combiner output; the reducer itself emits
        // (NullWritable, Put) pairs, which TableOutputFormat writes to HBase
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputFormatClass(TableOutputFormat.class);
        // input documents are read from HDFS; the output goes to the HBase
        // table, so no FileOutputFormat output path is needed
        FileInputFormat.addInputPath(job, new Path(args[0]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
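// A minimal sketch of how the job might be run; the jar name and HDFS input
// path below are illustrative, not taken from the original:
//
//   hadoop jar inverted-index-hbase.jar InvertedIndexHbase /user/hadoop/wuxia-input
//
// The per-word averages can then be inspected from the HBase shell:
//
//   scan 'Wuxia', {COLUMNS => 'content:average', LIMIT => 10}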