Hbase和Hdfs的相互通讯/Hbase的mapreduce操作

1、HBase 结合 MapReduce

为什么需要用 mapreduce 去访问 hbase 的数据?
——加快分析速度和扩展分析能力
Mapreduce 访问 hbase 数据作分析一定是在离线分析的场景下应用
在这里插入图片描述

1.1将hbase数据转到hdfs中

需求:将hbase中的数据导出到hdfs中,下边结合代码进行解释

/***********************************************
 * 将hbase中的数据输出到hdfs中。现有一个user_info的hbase表,表中的数据如下(有两个列簇,多数行有三个列):
 *   baiyc_20150716_0001               column=base_info:age, timestamp=1559098923727, value=21                                          
 baiyc_20150716_0001               column=base_info:name, timestamp=1559098922536, value=baiyc1                                     
 baiyc_20150716_0001               column=extra_info:Hobbies, timestamp=1559098925248, value=music                                  
 baiyc_20150716_0002               column=base_info:age, timestamp=1559098923899, value=22                                          
 baiyc_20150716_0002               column=base_info:name, timestamp=1559098922677, value=baiyc2                                     
 baiyc_20150716_0002               column=extra_info:Hobbies, timestamp=1559098925396, value=sport                                  
 baiyc_20150716_0003               column=base_info:age, timestamp=1559098924045, value=23                                          
 baiyc_20150716_0003               column=base_info:name, timestamp=1559098922856, value=baiyc3                                     
 baiyc_20150716_0003               column=extra_info:Hobbies, timestamp=1559098925524, value=music                                  
 baiyc_20150716_0004               column=base_info:age, timestamp=1559098924176, value=24                                          
 baiyc_20150716_0004               column=base_info:name, timestamp=1559098923008, value=baiyc4                                     
 baiyc_20150716_0004               column=extra_info:Hobbies, timestamp=1559098925673, value=sport                                  
 baiyc_20150716_0005               column=base_info:age, timestamp=1559098924356, value=25                                          
 baiyc_20150716_0005               column=base_info:name, timestamp=1559098923175, value=baiyc5                                     
 baiyc_20150716_0005               column=extra_info:Hobbies, timestamp=1559098925778, value=music                                  
 baiyc_20150716_0006               column=base_info:age, timestamp=1559098924839, value=26                                          
 baiyc_20150716_0006               column=base_info:name, timestamp=1559098923301, value=baiyc6                                     
 baiyc_20150716_0006               column=extra_info:Hobbies, timestamp=1559098925898, value=sport                                  
 baiyc_20150716_0007               column=base_info:age, timestamp=1559098924997, value=27                                          
 baiyc_20150716_0007               column=base_info:name, timestamp=1559098923438, value=baiyc7                                     
 baiyc_20150716_0007               column=extra_info:Hobbies, timestamp=1559098925954, value=music                                  
 baiyc_20150716_0008               column=base_info:age, timestamp=1559098925132, value=28                                          
 baiyc_20150716_0008               column=base_info:name, timestamp=1559098923570, value=baiyc8                                     
 baiyc_20150716_0008               column=extra_info:Hobbies, timestamp=1559098927723, value=sport                                  
 rk0001                            column=base_info:name, timestamp=1559098940985, value=zhangsan                                   
 user0000                          column=base_info:age, timestamp=1559098810355, value=18                                          
 user0000                          column=base_info:gender, timestamp=1559098810512, value=female                                   
 user0000                          column=base_info:name, timestamp=1559098810155, value=luoyufeng                                  
 user0000                          column=extra_info:size, timestamp=1559098825877, value=34                                        
 user0001                          column=base_info:name, timestamp=1559098837152, value=zhangsan1                                  
 zhangsan_20150701_0001            column=base_info:age, timestamp=1559098919139, value=21                                          
 zhangsan_20150701_0001            column=base_info:name, timestamp=1559098837291, value=zhangsan1                                  
 zhangsan_20150701_0001            column=extra_info:Hobbies, timestamp=1559098920946, value=music                                  
 zhangsan_20150701_0002            column=base_info:age, timestamp=1559098919522, value=22                                          
 zhangsan_20150701_0002            column=base_info:name, timestamp=1559098837458, value=zhangsan2                                  
 zhangsan_20150701_0002            column=extra_info:Hobbies, timestamp=1559098921133, value=sport                                  
 zhangsan_20150701_0003            column=base_info:age, timestamp=1559098919915, value=23                                          
 zhangsan_20150701_0003            column=base_info:name, timestamp=1559098837744, value=zhangsan3                                  
 zhangsan_20150701_0003            column=extra_info:Hobbies, timestamp=1559098921300, value=music                                  
 zhangsan_20150701_0004            column=base_info:age, timestamp=1559098920096, value=24                                          
 zhangsan_20150701_0004            column=base_info:name, timestamp=1559098838083, value=zhangsan4                                  
 zhangsan_20150701_0004            column=extra_info:Hobbies, timestamp=1559098921640, value=sport                                  
 zhangsan_20150701_0005            column=base_info:age, timestamp=1559098920288, value=25                                          
 zhangsan_20150701_0005            column=base_info:name, timestamp=1559098838237, value=zhangsan5                                  
 zhangsan_20150701_0005            column=extra_info:Hobbies, timestamp=1559098921866, value=music                                  
 zhangsan_20150701_0006            column=base_info:age, timestamp=1559098920456, value=26                                          
 zhangsan_20150701_0006            column=base_info:name, timestamp=1559098838431, value=zhangsan6                                  
 zhangsan_20150701_0006            column=extra_info:Hobbies, timestamp=1559098922014, value=sport                                  
 zhangsan_20150701_0007            column=base_info:age, timestamp=1559098920622, value=27                                          
 zhangsan_20150701_0007            column=base_info:name, timestamp=1559098838602, value=zhangsan7                                  
 zhangsan_20150701_0007            column=extra_info:Hobbies, timestamp=1559098922228, value=music                                  
 zhangsan_20150701_0008            column=base_info:age, timestamp=1559098920792, value=28                                          
 zhangsan_20150701_0008            column=base_info:name, timestamp=1559098902687, value=zhangsan8   
 *               将表中的数据读取到hdfs的文件中,输出路径为 /user_inf/
 ***********************************************/
/**
 * MapReduce job that dumps every row of the HBase table {@code user_info}
 * into text files under the HDFS directory {@code /user_inf/}.
 *
 * <p>Each output record pairs a row key with that row's cells rendered as
 * {@code family:qualifier:value} and joined by commas.
 */
public class HbasetoHdfs {

	/**
	 * Mapper fed by the HBase table input. It is called once per row and emits
	 * (row key, comma-joined cell list). The two type parameters of
	 * {@code TableMapper} are the key and value types sent to the reducer.
	 */
	static class MyMapper extends TableMapper<Text, Text> {
		// Reused across map() calls so no Text is allocated per row.
		private final Text mk = new Text();
		private final Text mv = new Text();

		/**
		 * Converts one HBase row into a single text record.
		 *
		 * @param key     the row key of the current row
		 * @param value   all cells of the current row
		 * @param context MapReduce context used to emit the record
		 */
		@Override
		protected void map(ImmutableBytesWritable key, Result value,
				Mapper<ImmutableBytesWritable, Result, Text, Text>.Context context)
				throws IOException, InterruptedException {
			// listCells() yields null for a Result without cells; there is nothing to export then.
			List<Cell> cells = value.listCells();
			if (cells == null || cells.isEmpty()) {
				return;
			}

			// Copy exactly the row-key bytes; the writable exposes offset/length
			// next to its backing array, so the array alone is not the key.
			mk.set(key.get(), key.getOffset(), key.getLength());

			StringBuilder line = new StringBuilder();
			for (Cell cell : cells) {
				if (line.length() > 0) {
					line.append(',');
				}
				line.append(decode(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength()))
						.append(':')
						.append(decode(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()))
						.append(':')
						.append(decode(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()));
			}
			mv.set(line.toString());

			// key = row key, value = rendered cells of that row
			context.write(mk, mv);
		}

		/** Decodes a byte range as UTF-8 instead of relying on the JVM default charset. */
		private static String decode(byte[] buffer, int offset, int length) {
			return new String(buffer, offset, length, java.nio.charset.StandardCharsets.UTF_8);
		}
	}

	/**
	 * Pass-through reducer: writes every mapper record unchanged. Business
	 * logic, if any is needed, belongs in {@code reduce}.
	 */
	static class MyReduce extends Reducer<Text, Text, Text, Text> {
		@Override
		protected void reduce(Text key, Iterable<Text> values, Reducer<Text, Text, Text, Text>.Context context)
				throws IOException, InterruptedException {
			for (Text text : values) {
				context.write(key, text);
			}
		}
	}

	/**
	 * Configures and runs the export job; terminates the JVM with status 1 when
	 * the job fails so that calling scripts can detect the failure.
	 */
	public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
		// User name presented to Hadoop.
		System.setProperty("HADOOP_USER_NAME", "hadoop");

		Configuration conf = new Configuration();
		// ZooKeeper ensemble the HBase client uses to locate the HBase cluster.
		conf.set("hbase.zookeeper.quorum", "hadoop01:2181,hadoop02:2181,hadoop03:2181");
		// Default file system (HDFS nameservice) for the output path.
		conf.set("fs.defaultFS", "hdfs://bd1901/");

		Job job = Job.getInstance(conf);
		job.setJarByClass(HbasetoHdfs.class);

		// Full-table scan tuned for MapReduce: fetch rows in batches and do not
		// fill the region servers' block cache with data that is read only once.
		Scan scan = new Scan();
		scan.setCaching(500);
		scan.setCacheBlocks(false);

		// Binds the mapper to the HBase table and declares the mapper output types.
		TableMapReduceUtil.initTableMapperJob("user_info", scan, MyMapper.class, Text.class, Text.class, job);

		job.setReducerClass(MyReduce.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);

		// Remove the output of a previous run; the job refuses to start when
		// the output directory already exists.
		Path path = new Path("/user_inf/");
		FileSystem fs = FileSystem.get(conf);
		if (fs.exists(path)) {
			fs.delete(path, true);
		}
		FileOutputFormat.setOutputPath(job, path);

		if (!job.waitForCompletion(true)) {
			System.exit(1);
		}
	}

}

输出结果如下:
baiyc_20150716_0001 base_info:age:21,base_info:name:baiyc1,extra_info:Hobbies:music
baiyc_20150716_0002 base_info:age:22,base_info:name:baiyc2,extra_info:Hobbies:sport
baiyc_20150716_0003 base_info:age:23,base_info:name:baiyc3,extra_info:Hobbies:music
baiyc_20150716_0004 base_info:age:24,base_info:name:baiyc4,extra_info:Hobbies:sport
baiyc_20150716_0005 base_info:age:25,base_info:name:baiyc5,extra_info:Hobbies:music
baiyc_20150716_0006 base_info:age:26,base_info:name:baiyc6,extra_info:Hobbies:sport
baiyc_20150716_0007 base_info:age:27,base_info:name:baiyc7,extra_info:Hobbies:music
baiyc_20150716_0008 base_info:age:28,base_info:name:baiyc8,extra_info:Hobbies:sport
rk0001 base_info:name:zhangsan
user0000 base_info:age:18,base_info:gender:female,base_info:name:luoyufeng,extra_info:size:34
user0001 base_info:name:zhangsan1
zhangsan_20150701_0001 base_info:age:21,base_info:name:zhangsan1,extra_info:Hobbies:music
zhangsan_20150701_0002 base_info:age:22,base_info:name:zhangsan2,extra_info:Hobbies:sport
zhangsan_20150701_0003 base_info:age:23,base_info:name:zhangsan3,extra_info:Hobbies:music
zhangsan_20150701_0004 base_info:age:24,base_info:name:zhangsan4,extra_info:Hobbies:sport
zhangsan_20150701_0005 base_info:age:25,base_info:name:zhangsan5,extra_info:Hobbies:music
zhangsan_20150701_0006 base_info:age:26,base_info:name:zhangsan6,extra_info:Hobbies:sport
zhangsan_20150701_0007 base_info:age:27,base_info:name:zhangsan7,extra_info:Hobbies:music
zhangsan_20150701_0008 base_info:age:28,base_info:name:zhangsan8

1.2将hdfs中的数据转到hbase中

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import com.aura.cn.HM.HbasetoHdfs.MyMapper;
import com.aura.cn.HM.HbasetoHdfs.MyReduce;

/***********************************************
 * 将hdfs的数据导入到hbase中。下边是hdfs文件中的数据,每行的格式为:
 * 行键<TAB>列簇:列:值,列簇:列:值,列簇:列:值
 * 例如:baiyc_20150716_0001<TAB>base_info:age:21,base_info:name:baiyc1,extra_info:Hobbies:music
 **baiyc_20150716_0001	base_info:age:21,base_info:name:baiyc1,extra_info:Hobbies:music
baiyc_20150716_0002	base_info:age:22,base_info:name:baiyc2,extra_info:Hobbies:sport
baiyc_20150716_0003	base_info:age:23,base_info:name:baiyc3,extra_info:Hobbies:music
baiyc_20150716_0004	base_info:age:24,base_info:name:baiyc4,extra_info:Hobbies:sport
baiyc_20150716_0005	base_info:age:25,base_info:name:baiyc5,extra_info:Hobbies:music
baiyc_20150716_0006	base_info:age:26,base_info:name:baiyc6,extra_info:Hobbies:sport
baiyc_20150716_0007	base_info:age:27,base_info:name:baiyc7,extra_info:Hobbies:music
baiyc_20150716_0008	base_info:age:28,base_info:name:baiyc8,extra_info:Hobbies:sport
rk0001	base_info:name:zhangsan
user0000	base_info:age:18,base_info:gender:female,base_info:name:luoyufeng,extra_info:size:34
user0001	base_info:name:zhangsan1
zhangsan_20150701_0001	base_info:age:21,base_info:name:zhangsan1,extra_info:Hobbies:music
zhangsan_20150701_0002	base_info:age:22,base_info:name:zhangsan2,extra_info:Hobbies:sport
zhangsan_20150701_0003	base_info:age:23,base_info:name:zhangsan3,extra_info:Hobbies:music
zhangsan_20150701_0004	base_info:age:24,base_info:name:zhangsan4,extra_info:Hobbies:sport
zhangsan_20150701_0005	base_info:age:25,base_info:name:zhangsan5,extra_info:Hobbies:music
zhangsan_20150701_0006	base_info:age:26,base_info:name:zhangsan6,extra_info:Hobbies:sport
zhangsan_20150701_0007	base_info:age:27,base_info:name:zhangsan7,extra_info:Hobbies:music
zhangsan_20150701_0008	base_info:age:28,base_info:name:zhangsan8
 * 
 * 
 ***********************************************/
public class HdfsToHBase {
//用mapper读取一行的hdfs数据解析
	static class MyMapper extends Mapper<LongWritable, Text, Text, Text> {
		Text mk = new Text();
		Text mv = new Text();

		@Override
		protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context)
				throws IOException, InterruptedException {
			String[] split = value.toString().trim().split("\t");
			mk.set(split[0]);
			mv.set(split[1]);
			context.write(mk, mv);
		}
	}
//创建reduce类来连接reduce将map端的数据输出到hbase中  ,这个类有三个泛型  前两个是map输出的类型  第三个是reduce输出的key  
//ImmutableBytesWritable    带表就是行键   将行键进行一个封装  如果将key作为行键的话就用这种类型  
//reduce输出的value默认就是Mutation类型 Mutation类的实现类有Put  Delete append 等类 也就是hbase的数据的操作的对象(容器)
//本例就是将key作为行键
	static class MyReduce extends TableReducer<Text, Text, ImmutableBytesWritable> {
		@Override
		protected void reduce(Text key, Iterable<Text> values, Reducer<Text, Text, ImmutableBytesWritable, Mutation>.Context context)
				throws IOException, InterruptedException {
			Put put = new Put(key.toString().getBytes());
			for (Text value : values) {
				String[] split = value.toString().trim().split(",");
				for (String sp : split) {
					String[] column = sp.trim().split(":");
					put.addColumn(column[0].getBytes(), column[1].getBytes(), column[2].getBytes());
					context.write(new ImmutableBytesWritable(key.toString().getBytes()), put);
				}

			}
		}
	}

	public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
		// 设置用户名
		System.setProperty("HADOOP_USER_NAME", "hadoop");
		// 加载配置文件
		Configuration conf = new Configuration();
		// 设定zookeeper的访问路径,通过zookeeper来寻找active的namenode
		conf.set("hbase.zookeeper.quorum", "hadoop01:2181,hadoop02:2181,hadoop03:2181");
		// 设置hdfs的访问入口
		conf.set("fs.defaultFS", "hdfs://bd1901/");
		// 创建job
		Job job = Job.getInstance(conf);
		// 设置类
		job.setJarByClass(HdfsToHBase.class);
		// 设置Reducer
		Scan scan = new Scan();
		// 通过工具类设置和Reducer和Hbase 
		TableMapReduceUtil.initTableReducerJob("bd1901:user_info", MyReduce.class, job);
		// 设置Mapper
		job.setMapperClass(MyMapper.class);
		// 设置输出类型
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(Text.class);
		job.setOutputKeyClass(ImmutableBytesWritable.class);
		job.setOutputValueClass(Mutation.class);
		
		FileInputFormat.setInputPaths(job, "/user_inf/");
		job.waitForCompletion(true);

	}

}

hbase端的结果:
baiyc_20150716_0001 column=base_info:age, timestamp=1559380160116, value=21
baiyc_20150716_0001 column=base_info:name, timestamp=1559380160116, value=baiyc1
baiyc_20150716_0001 column=extra_info:Hobbies, timestamp=1559380160116, value=music
baiyc_20150716_0002 column=base_info:age, timestamp=1559380160116, value=22
baiyc_20150716_0002 column=base_info:name, timestamp=1559380160116, value=baiyc2
baiyc_20150716_0002 column=extra_info:Hobbies, timestamp=1559380160116, value=sport
baiyc_20150716_0003 column=base_info:age, timestamp=1559380160116, value=23
baiyc_20150716_0003 column=base_info:name, timestamp=1559380160116, value=baiyc3
baiyc_20150716_0003 column=extra_info:Hobbies, timestamp=1559380160116, value=music
baiyc_20150716_0004 column=base_info:age, timestamp=1559380160116, value=24
baiyc_20150716_0004 column=base_info:name, timestamp=1559380160116, value=baiyc4
baiyc_20150716_0004 column=extra_info:Hobbies, timestamp=1559380160116, value=sport
baiyc_20150716_0005 column=base_info:age, timestamp=1559380160116, value=25
baiyc_20150716_0005 column=base_info:name, timestamp=1559380160116, value=baiyc5
baiyc_20150716_0005 column=extra_info:Hobbies, timestamp=1559380160116, value=music
baiyc_20150716_0006 column=base_info:age, timestamp=1559380160116, value=26
baiyc_20150716_0006 column=base_info:name, timestamp=1559380160116, value=baiyc6
baiyc_20150716_0006 column=extra_info:Hobbies, timestamp=1559380160116, value=sport
baiyc_20150716_0007 column=base_info:age, timestamp=1559380160116, value=27
baiyc_20150716_0007 column=base_info:name, timestamp=1559380160116, value=baiyc7
baiyc_20150716_0007 column=extra_info:Hobbies, timestamp=1559380160116, value=music
baiyc_20150716_0008 column=base_info:age, timestamp=1559380160116, value=28
baiyc_20150716_0008 column=base_info:name, timestamp=1559380160116, value=baiyc8
baiyc_20150716_0008 column=extra_info:Hobbies, timestamp=1559380160116, value=sport
rk0001 column=base_info:name, timestamp=1559380160116, value=zhangsan
user0000 column=base_info:age, timestamp=1559380160116, value=18
user0000 column=base_info:gender, timestamp=1559380160116, value=female
user0000 column=base_info:name, timestamp=1559380160116, value=luoyufeng
user0000 column=extra_info:size, timestamp=1559380160116, value=34
user0001 column=base_info:name, timestamp=1559380160116, value=zhangsan1
zhangsan_20150701_0001 column=base_info:age, timestamp=1559380160116, value=21
zhangsan_20150701_0001 column=base_info:name, timestamp=1559380160116, value=zhangsan1
zhangsan_20150701_0001 column=extra_info:Hobbies, timestamp=1559380160116, value=music
zhangsan_20150701_0002 column=base_info:age, timestamp=1559380160116, value=22
zhangsan_20150701_0002 column=base_info:name, timestamp=1559380160116, value=zhangsan2
zhangsan_20150701_0002 column=extra_info:Hobbies, timestamp=1559380160116, value=sport
zhangsan_20150701_0003 column=base_info:age, timestamp=1559380160116, value=23
zhangsan_20150701_0003 column=base_info:name, timestamp=1559380160116, value=zhangsan3
zhangsan_20150701_0003 column=extra_info:Hobbies, timestamp=1559380160116, value=music
zhangsan_20150701_0004 column=base_info:age, timestamp=1559380160116, value=24
zhangsan_20150701_0004 column=base_info:name, timestamp=1559380160116, value=zhangsan4
zhangsan_20150701_0004 column=extra_info:Hobbies, timestamp=1559380160116, value=sport
zhangsan_20150701_0005 column=base_info:age, timestamp=1559380160116, value=25
zhangsan_20150701_0005 column=base_info:name, timestamp=1559380160116, value=zhangsan5
zhangsan_20150701_0005 column=extra_info:Hobbies, timestamp=1559380160116, value=music
zhangsan_20150701_0006 column=base_info:age, timestamp=1559380160116, value=26
zhangsan_20150701_0006 column=base_info:name, timestamp=1559380160116, value=zhangsan6
zhangsan_20150701_0006 column=extra_info:Hobbies, timestamp=1559380160116, value=sport
zhangsan_20150701_0007 column=base_info:age, timestamp=1559380160116, value=27
zhangsan_20150701_0007 column=base_info:name, timestamp=1559380160116, value=zhangsan7
zhangsan_20150701_0007 column=extra_info:Hobbies, timestamp=1559380160116, value=music
zhangsan_20150701_0008 column=base_info:age, timestamp=1559380160116, value=28
zhangsan_20150701_0008 column=base_info:name, timestamp=1559380160116, value=zhangsan8

1.3使用mapreduce对Hbase进行统计,将统计结果放在hbase的表中

需求:根据hbase的ratings表的内容,计算每个用户的影评平均分,并将结果放在一张新表中

/**
 * 
 */
package com.aura.cn.HM;
import java.io.IOException;
import java.text.DecimalFormat;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;


/***********************************************
*将Hbase中数据使用MR进行统计,之后将结果输出到Hbase的表中
*1.将hbase的ratings表的数据进行分析  求每一个用户的平均评分 
将最终的分析结果  存在hbase中
ratings表中的数据是这样的
 rk1                               column=info:movieid, timestamp=1559219500445, value=1                                            
 rk1                               column=info:rate, timestamp=1559219500445, value=5                                               
 rk1                               column=info:ts, timestamp=1559219500445, value=978824268                                         
 rk1                               column=info:user_id, timestamp=1559219500445, value=1                                            
 rk10                              column=info:movieid, timestamp=1559219500445, value=1246                                         
 rk10                              column=info:rate, timestamp=1559219500445, value=4                                               
 rk10                              column=info:ts, timestamp=1559219500445, value=978302091                                         
 rk10                              column=info:user_id, timestamp=1559219500445, value=1                                            
 rk100                             column=info:movieid, timestamp=1559219500445, value=1220                                         
 rk100                             column=info:rate, timestamp=1559219500445, value=4                                               
 rk100                             column=info:ts, timestamp=1559219500445, value=978227912                                         
 rk100                             column=info:user_id, timestamp=1559219500445, value=10      
***********************************************/
/**
 * MapReduce job that reads the HBase table {@code bd1901:ratings}, computes
 * the average rating per user and stores it in the HBase table
 * {@code bd1901:user_avg_rate} as column {@code info:avg-rate}.
 */
public class HbaseToHbase {

	/** Charset for converting between strings and HBase bytes, independent of the JVM default. */
	private static final java.nio.charset.Charset UTF8 = java.nio.charset.StandardCharsets.UTF_8;

	/**
	 * Emits (user_id, rate) for every rating row so that the shuffle groups all
	 * ratings of one user together.
	 */
	static class MyMapper extends TableMapper<Text, Text> {
		// Reused across map() calls so no Text is allocated per row.
		private final Text mk = new Text();
		private final Text mv = new Text();

		/**
		 * @param key     row key of the rating row (unused)
		 * @param value   all cells of the rating row
		 * @param context MapReduce context used to emit the record
		 */
		@Override
		protected void map(ImmutableBytesWritable key, Result value,
				Mapper<ImmutableBytesWritable, Result, Text, Text>.Context context)
				throws IOException, InterruptedException {
			// listCells() yields null for a Result without cells.
			List<Cell> cells = value.listCells();
			if (cells == null) {
				return;
			}

			// Track which of the two required columns this row has supplied so
			// far; mk/mv are reused and may still hold a previous row's data.
			boolean hasUser = false;
			boolean hasRate = false;
			for (Cell cell : cells) {
				String qualifier = new String(CellUtil.cloneQualifier(cell), UTF8);
				if ("user_id".equals(qualifier)) {
					mk.set(CellUtil.cloneValue(cell));
					hasUser = true;
				} else if ("rate".equals(qualifier)) {
					mv.set(CellUtil.cloneValue(cell));
					hasRate = true;
				}
				// Emit as soon as both columns have been seen; rows lacking
				// either column produce no output.
				if (hasUser && hasRate) {
					context.write(mk, mv);
					break;
				}
			}
		}
	}

	/**
	 * Averages all ratings of one user. The user id becomes the row key and the
	 * average, formatted with two decimals, becomes the cell value.
	 */
	static class MyReduce extends TableReducer<Text, Text, ImmutableBytesWritable> {
		// Fixed symbols so the decimal separator is always '.', whatever the
		// default locale of the node running the reducer is.
		private final DecimalFormat df = new DecimalFormat("#.00",
				java.text.DecimalFormatSymbols.getInstance(java.util.Locale.ROOT));

		/**
		 * @param key     user id
		 * @param values  ratings of that user as decimal strings
		 * @param context MapReduce context used to emit the mutation
		 */
		@Override
		protected void reduce(Text key, Iterable<Text> values,
				Reducer<Text, Text, ImmutableBytesWritable, Mutation>.Context context)
				throws IOException, InterruptedException {
			int count = 0;
			double sum = 0;
			for (Text value : values) {
				sum += Double.parseDouble(value.toString().trim());
				count++;
			}
			String avgRate = df.format(sum / count);

			// One Put per user: row key = user id, info:avg-rate = average.
			byte[] row = key.toString().getBytes(UTF8);
			Put put = new Put(row);
			put.addColumn("info".getBytes(UTF8), "avg-rate".getBytes(UTF8), avgRate.getBytes(UTF8));
			context.write(new ImmutableBytesWritable(row), put);
		}
	}

	/**
	 * Configures and runs the job; terminates the JVM with status 1 when the
	 * job fails so that calling scripts can detect the failure.
	 */
	public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
		// User name presented to Hadoop.
		System.setProperty("HADOOP_USER_NAME", "hadoop");

		Configuration conf = new Configuration();
		// ZooKeeper ensemble the HBase client uses to locate the HBase cluster.
		conf.set("hbase.zookeeper.quorum", "hadoop01:2181,hadoop02:2181,hadoop03:2181");
		// Default file system (HDFS nameservice).
		conf.set("fs.defaultFS", "hdfs://bd1901/");

		Job job = Job.getInstance(conf);
		job.setJarByClass(HbaseToHbase.class);

		// Full-table scan tuned for MapReduce: fetch rows in batches and do not
		// fill the region servers' block cache with data that is read only once.
		Scan scan = new Scan();
		scan.setCaching(500);
		scan.setCacheBlocks(false);

		// Source table -> mapper, with the mapper output types.
		TableMapReduceUtil.initTableMapperJob("bd1901:ratings", scan, MyMapper.class,
				Text.class, Text.class, job);
		// Reducer -> target table.
		TableMapReduceUtil.initTableReducerJob("bd1901:user_avg_rate", MyReduce.class, job);

		job.setOutputKeyClass(ImmutableBytesWritable.class);
		job.setOutputValueClass(Mutation.class);

		if (!job.waitForCompletion(true)) {
			System.exit(1);
		}
	}
}

hbase新表中生成数据如下
162 column=info:avg-rate, timestamp=1559389843125, value=4.12
1620 column=info:avg-rate, timestamp=1559389843125, value=3.94
1621 column=info:avg-rate, timestamp=1559389843125, value=3.14
1622 column=info:avg-rate, timestamp=1559389843125, value=3.73
1623 column=info:avg-rate, timestamp=1559389843125, value=4.21
1624 column=info:avg-rate, timestamp=1559389843125, value=3.01
1625 column=info:avg-rate, timestamp=1559389843125, value=3.36
1626 column=info:avg-rate, timestamp=1559389843125, value=3.44
1627 column=info:avg-rate, timestamp=1559389843125, value=2.98
1628 column=info:avg-rate, timestamp=1559389843125, value=3.75
1629 column=info:avg-rate, timestamp=1559389843125, value=3.49
163 column=info:avg-rate, timestamp=1559389843125, value=2.18
1630 column=info:avg-rate, timestamp=1559389843125, value=2.26
1631 column=info:avg-rate, timestamp=1559389843125, value=3.84
1632 column=info:avg-rate, timestamp=1559389843125, value=2.78
1633 column=info:avg-rate, timestamp=1559389843125, value=2.97
1634 column=info:avg-rate, timestamp=1559389843125, value=3.58
1635 column=info:avg-rate, timestamp=1559389843125, value=3.78
1636 column=info:avg-rate, timestamp=1559389843125, value=2.80
1637 column=info:avg-rate, timestamp=1559389843125, value=3.99
1638 column=info:avg-rate, timestamp=1559389843125, value=4.21
1639 column=info:avg-rate, timestamp=1559389843125, value=3.66
164 column=info:avg-rate, timestamp=1559389843125, value=4.38
1640 column=info:avg-rate, timestamp=1559389843125, value=2.47
1641 column=info:avg-rate, timestamp=1559389843125, value=2.68
1642 column=info:avg-rate, timestamp=1559389843125, value=3.50
1643 column=info:avg-rate, timestamp=1559389843125, value=4.29
1644 column=info:avg-rate, timestamp=1559389843125, value=3.06