|
文章目录
- 分布式NoSQL列存储数据库Hbase(九)
- 知识点01:课程回顾
- 知识点02:课程目标
- 知识点03:MR集成Hbase:读Hbase规则
- 知识点04:MR集成Hbase:读Hbase实现
- 知识点05:MR集成Hbase:写Hbase规则
- 知识点06:MR集成Hbase:写Hbase实现
- 知识点07:BulkLoad的介绍
- 知识点08:BulkLoad的实现
- 知识点09:ImportTSV的使用
- 知识点10:协处理器的介绍
- 知识点11:协处理器的实现
- 知识点12:Hbase优化:内存分配
- 知识点13:Hbase优化:压缩机制
- 知识点14:Hbase优化:布隆过滤
- 知识点15:Hbase优化:列族属性
- 知识点16:Hbase优化:其他优化
- 附录一:Maven依赖
分布式NoSQL列存储数据库Hbase(九)
知识点01:课程回顾
- 简述Hbase中hbase:meta表的功能及存储内容
- 功能:记录表的元数据信息
- 内容
- rowkey:Hbase中每张表的每个Region的名称
- 列
- Region名称
- Region范围:startKey,stopKey
- Region所在的RegionServer地址
- 简述Hbase中数据写入流程
- step1:客户端连接ZK,获取meta表所在的地址,读取meta表数据
- step2:根据表名,获取当前要操作的表的所有region的信息
region名称前缀:表名,startKey
- step3:根据Rowkey,判断具体操作哪个Region
- step4:获取对应Region的地址,请求对应的RegionServer
- step5:RegionServer接受请求,将数据写入Region,先写入WAL
- step6:根据列族来判断写入哪个Store中
- 简述Hbase中数据读取流程
- step1:客户端连接ZK,获取meta表所在的地址,读取meta表数据
- step2:根据表名,获取当前要操作的表的所有region的信息
region名称前缀:表名,startKey
- step3:根据Rowkey,判断具体操作哪个Region
- step4:获取对应Region的地址,请求对应的RegionServer
- step5::RegionServer接受请求,从Region中读取数据
- 先读memstore
- 判断查询数据是否做了缓存,如果做了缓存:就读BlockCache
- 最后读StoreFile
- 如果开启了缓存,查询结果会放入BlockCache
- 简述LSM模型的流程设计
- step1:不论什么数据操作:增删改,都只对内存进行操作
- 删除和修改都是写入操作来代替的
- 内存写入成功,就返回
- 顺序读写内存
- 顺序读写磁盘
- 随机读写内存:memStore,BlockCache
- 随机读写磁盘:StoreFile
- step2:数据写入内存,达到一定阈值,会将内存的数据写入磁盘
- step3:定期将所有小文件和并为大文件,加快检索的效率
- 简述Hbase中的Flush、Compaction、Split的功能
- Flush:将memstore中的数据刷写到HDFS,变成StoreFile文件
- 2.0之前
- memstore:单个memstore达到128M,就会Flush
- 所有的memstore总存储达到95%,就会触发整个RS的Flush
- 2.0之后
- 设置一个水位线:max(128 / 列族个数,16)
- 高于水位线的memstore:就会flush
- 低于水位线的memstore:不会flush
- 所有都低于,都flush
- Compaction:用于将storefile文件进行合并,并且删除过期数据【被标记为更新和删除的数据】
- minor compact:轻量级合并,将最早的几个小的storefile文件进行合并,不会删除过期数据
- major compact:重量级合并,将所有的storefile合并为一个storefile,会删除过期数据
- 2.0版本开始:in-memory-compact:在memstore中将数据提前进行合并
- none:不开启
- basic:只合并,不删除过期数据
- eager:合并并且删除过期数据
- adapter:合并,根据数据量来判断是否自动删除过期数据
- Split:为了避免一个Region存储的数据量过大,导致负载过高,通过Split将一个region分为两个region,分摊负载
知识点02:课程目标
- MapReduce读写Hbase
- 重点:记住读写的规则
- Spark中读写Hbase规则与MapReduce的规则是一模一样的
- 应用:一般在工作中都是使用Spark来读写Hbase,如果是MapReduce可以使用Hive来实现
- BulkLoad的实现【了解】
- 问题:大量的数据并发往Hbase中写入,会导致内存和磁盘的利用率非常高,会影响其他程序的性能
- Hbase中提供两种写入数据的方式
- Put:直接写入memstore
- BulkLoad:先将数据转换为storefile文件,将storefile文件直接放入Hbase表的目录中
- 实现方式
- 协处理的介绍【了解】
- 什么是协处理器,分类
- 怎么开发协处理器:自己开发协处理器,实现索引表与原表数据同步
- Hbase中的优化方案【重点:记住】
- 对于Hbase做了哪些性能的优化?
- 内存优化
- 压缩优化
- 参数优化
- ……
知识点03:MR集成Hbase:读Hbase规则
- 目标
- 分析
- 读取由InputFormat决定
- TextInputFormat:读取文件中的内容,每一行返回一个KV
- K:行的偏移量:LongWritable
- V:行的内容值:Text
- TableInputFormat:负责实现读取Hbase的数据,将每个Rowkey的数据转换为一个KV对象
- K:Rowkey的字节对象:ImmutableBytesWritable
- V:Rowkey的数据内容:Result
- 实现
- step1:调用工具类方法,初始化Input和Map
- MapReduce中封装了工具类,实现读取Hbase数据
TableMapReduceUtil.initTableMapperJob
public static void initTableMapperJob(
String table,
Scan scan,
Class<? extends TableMapper> mapper,
Class<?> outputKeyClass,
Class<?> outputValueClass,
Job job
);
- step2:构建Map类继承TableMapper类
/**
* Extends the base <code>Mapper</code> class to add the required input key
* and value classes.
*
* @param <KEYOUT> The type of the key.
* @param <VALUEOUT> The type of the value.
* @see org.apache.hadoop.mapreduce.Mapper
*/
@InterfaceAudience.Public
public abstract class TableMapper<KEYOUT, VALUEOUT>
extends Mapper<ImmutableBytesWritable, Result, KEYOUT, VALUEOUT> {
}
- 总结
- MapReduce读取Hbase数据的API已经封装好了,只需要调用工具类实现即可
知识点04:MR集成Hbase:读Hbase实现
- 目标
- 分析
- step1:使用TableInputFormat读取Hbase数据
- step2:使用TextOutputFormat写入文件
- 实现
package bigdata.itcast.cn.hbase.mr;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import java.io.IOException;
/**
* @ClassName ReadHbaseTable
* @Description TODO 通过MapReduce读取Hbase表中的数据
* @Create By Frank
*/
public class ReadHbaseTable extends Configured implements Tool {
public int run(String[] args) throws Exception {
//todo:1-创建
Job job = Job.getInstance(this.getConf(),"read");
job.setJarByClass(ReadHbaseTable.class);
//todo:2-配置
//input&map
// job.setInputFormatClass(TextInputFormat.class);
// TextInputFormat.setInputPaths(job,new Path(""));
// job.setMapperClass(null);
// job.setMapOutputKeyClass(null);
// job.setMapOutputValueClass(null);
//input&map
/**
* public static void initTableMapperJob(
* String table, 指定从哪张表读取
* Scan scan, 读取Hbase数据使用的Scan对象,自定义过滤器
* Class<? extends TableMapper> mapper, Mapper类
* Class<?> outputKeyClass, Map输出的Key类型
* Class<?> outputValueClass, Map输出的Value类型
* Job job 当前的job
* )
*/
//构建TableInputFormat用于读取Hbase的scan对象
Scan scan = new Scan();//为了方便让你使用过滤器,提前过滤数据,再传递到MapReduce中,所以让你自定义一个scan对象
//可以为scan设置过滤器,将过滤后的数据加载到MapReduce程序中
TableMapReduceUtil.initTableMapperJob(
"itcast:t1",
scan,
ReadHbaseMap.class,
Text.class,
Text.class,
job
);
//reduce
job.setNumReduceTasks(0);
//output
TextOutputFormat.setOutputPath(job,new Path("datas/output/hbase"));
//todo:3-提交
return job.waitForCompletion(true) ? 0:-1;
}
public static void main(String[] args) throws Exception {
Configuration conf = HBaseConfiguration.create();
//指定Hbase服务端地址
conf.set("hbase.zookeeper.quorum", "node1:2181,node2:2181,node3:2181");
int status = ToolRunner.run(conf, new ReadHbaseTable(), args);
System.exit(status);
}
/**
* TableMapper<KEYOUT, VALUEOUT>
* extends Mapper<ImmutableBytesWritable, Result, KEYOUT, VALUEOUT>
*/
public static class ReadHbaseMap extends TableMapper<Text, Text>{
//rowkey
Text outputKey = new Text();
//每一列的数据
Text outputValue = new Text();
/**
* 每个KV【一个Rowkey】调用一次map方法
* @param key:rowkey
* @param value:这个rowkey的数据
* @param context
* @throws IOException
* @throws InterruptedException
*/
@Override
protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
//给key进行赋值
String rowkey = Bytes.toString(key.get());
this.outputKey.set(rowkey);
//给value赋值
for(Cell cell : value.rawCells()){
//得到每一列的数据
String family = Bytes.toString(CellUtil.cloneFamily(cell));
String column = Bytes.toString(CellUtil.cloneQualifier(cell));
String val = Bytes.toString(CellUtil.cloneValue(cell));
long ts = cell.getTimestamp();
this.outputValue.set(family+"\t"+column+"\t"+val+"\t"+ts);
//输出每一列的数据
context.write(this.outputKey,this.outputValue);
}
}
}
}
- 总结
- 最终也是调用了Hbase Java API
- 通过Scan来读取表的数据,返回到MapReduce程序汇总
知识点05:MR集成Hbase:写Hbase规则
- 目标
- 分析
- 实现
- step1:调用工具类初始化Reduce和Output
- MapReduce中封装了工具类,实现读取Hbase数据
TableMapReduceUtil.initTableReducerJob
/**
* Use this before submitting a TableReduce job. It will
* appropriately set up the JobConf.
*
* @param table The output table.
* @param reducer The reducer class to use.
* @param job The current job to adjust.
* @throws IOException When determining the region count fails.
*/
public static void initTableReducerJob(
String table,
Class<? extends TableReducer> reducer, 指定Reduce类,不用传递Key和Value类型,因为Key不重要,Value定死了
Job job
);
- step2:构建Reduce类继承TableReducer
/**
* Extends the basic <code>Reducer</code> class to add the required key and
* value input/output classes.
*
* @param <KEYIN> The type of the input key.
* @param <VALUEIN> The type of the input value.
* @param <KEYOUT> The type of the output key.
* @see org.apache.hadoop.mapreduce.Reducer
*/
@InterfaceAudience.Public
public abstract class TableReducer<KEYIN, VALUEIN, KEYOUT>
extends Reducer<KEYIN, VALUEIN, KEYOUT, Mutation> {
}
- 总结
- MapReduce写入Hbase数据的API已经封装好了,只需要调用工具类实现即可
知识点06:MR集成Hbase:写Hbase实现
- 目标
- 分析
- step1:使用TextInputFormat读取文件中的数据
- step2:构建Put对象,封装Rowkey以及列
- step3:使用TableOutputFormat将数据写入Hbase表中
- 实现
- Hbase中建表
create 'itcast:mrwrite','info'
- 实现
package bigdata.itcast.cn.hbase.mr;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import java.io.IOException;
/**
* @ClassName WriteHbaseTable
* @Description TODO 通过MapReduce将数据写入Hbase
* @Create By Frank
*/
public class WriteHbaseTable extends Configured implements Tool {
public int run(String[] args) throws Exception {
//todo:1-创建
Job job = Job.getInstance(this.getConf(),"write");
job.setJarByClass(WriteHbaseTable.class);
//todo:2-配置
//input
TextInputFormat.setInputPaths(job,new Path("datas/hbase/writeHbase.txt"));
//map
job.setMapperClass(WriteToHbaseMap.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Put.class);
//shuffle
//reduce&output
/**
* public static void initTableReducerJob(
* String table, 将数据写入Hbase的哪张表
* Class<? extends TableReducer> reducer, reducer的类
* Job job) 当前的job
*
* 以前输出的写法:
* job.setoutputKey:因为Key可以任意的,这里根本用不到
* job.setoutputValue:在TableReduce中将outputValue定死了,所以不用写
*
*/
TableMapReduceUtil.initTableReducerJob(
"itcast:mrwrite",
WriteToHbaseReduce.class,
job
);
//output & reduce
// job.setReducerClass(null);
// job.setOutputKeyClass(null);
// job.setOutputValueClass(null);
// job.setOutputFormatClass(TextOutputFormat.class);
// TextOutputFormat.setOutputPath(job,new Path(""));
//todo:3-提交
return job.waitForCompletion(true) ? 0:-1;
}
public static void main(String[] args) throws Exception {
Configuration conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum", "node1:2181,node2:2181,node3:2181");
int status = ToolRunner.run(conf, new WriteHbaseTable(), args);
System.exit(status);
}
/**
* 读取文件,将文件中的内容,id作为key,其他的每一列作为一个Put对象
*/
public static class WriteToHbaseMap extends Mapper<LongWritable,Text,Text, Put>{
Text rowkey = new Text();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//value:1liudehua18male
String[] split = value.toString().split("\t");
String row = split[0];
String name = split[1];
String age = split[2];
String sex = split[3];
//将id作为rowkey,放在key中输出
this.rowkey.set(row);
//构造输出的Value
Put putname = new Put(Bytes.toBytes(row));
putname.addColumn(Bytes.toBytes("info"),Bytes.toBytes("name"),Bytes.toBytes(name));
context.write(rowkey,putname);
Put putage = new Put(Bytes.toBytes(row));
putage.addColumn(Bytes.toBytes("info"),Bytes.toBytes("age"),Bytes.toBytes(age));
context.write(rowkey,putage);
Put putsex = new Put(Bytes.toBytes(row));
putsex.addColumn(Bytes.toBytes("info"),Bytes.toBytes("sex"),Bytes.toBytes(sex));
context.write(rowkey,putsex);
}
}
/**
* public abstract class TableReducer<KEYIN, VALUEIN, KEYOUT>
* extends Reducer<KEYIN, VALUEIN, KEYOUT, Mutation>
* 最后Reduce输出的Value类型必须为Put类型,才能将数据写入Hbase
*/
public static class WriteToHbaseReduce extends TableReducer<Text,Put,Text>{
/**
* 相同rowkey的所有Put都在一个迭代器中
* @param key
* @param values
* @param context
* @throws IOException
* @throws InterruptedException
*/
@Override
protected void reduce(Text key, Iterable<Put> values, Context context) throws IOException, InterruptedException {
//直接遍历每个put对象,输出即可
for (Put value : values) {
context.write(key,value);
}
}
}
}
- 总结
- 最终还是调用了Hbase Java API来实现的
- 通过构建Table对象,执行所有的Put对象实现将数据写入Hbase
知识点07:BulkLoad的介绍
- 目标
- 分析
- 问题:有一批大数据量的数据,要写入Hbase中,如果按照传统的方案来写入Hbase,必须先写入内存,然后内存溢写到HDFS,导致Hbase的内存负载和HDFS的磁盘负载过高,影响业务
- 解决
- 写入Hbase方式
- 方式一:构建Put对象,先写内存
- 方式二:BulkLoad,直接将数据变成StoreFile文件,放入Hbase对应的HDFS目录中
- 实现
- step1:先将要写入的数据转换为HFILE文件
- step2:将HFILE文件加载到Hbase的表中【直接将文件放入了Hbase表对应的HDFS目录中】
- 总结
- 应用场景:Hbase提供BulkLoad来实现大数据量不经过内存直接写入Hbase
- 特点
- 优点:不经过内存,降低了内存和磁盘的IO吞吐
- 缺点:性能上相对来说要慢一些,所有数据都不会在内存中被读取
知识点08:BulkLoad的实现
- 目标
- 实现BulkLoad方式加载数据到Hbase的表中
- 分析
- step1:先将要写入的数据转换为HFILE文件
- step2:将HFILE文件加载到Hbase的表中【直接将文件放入了Hbase表对应的HDFS目录中】
- 实现
- 开发代码
- 创建表
create 'mrhbase','info'
- 上传测试文件
hdfs dfs -mkdir -p /bulkload/input
hdfs dfs -put writeHbase.txt /bulkload/input/
- 上传jar包到Linux上
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-dQV6Ylwx-1616741489711)(20210326_分布式NoSQL列存储数据库Hbase(九).assets/image-20210326103006352.png)]
- step1:转换为HFILE
yarn jar bulkload.jar bigdata.itcast.cn.hbase.bulkload.TransHfileMR /bulkload/input/ /bulkload/output
- 运行找不到Hbase的jar包,手动申明HADOOP的环境变量即可,只在当前窗口有效
export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:/export/server/hbase-2.1.0/lib/shaded-clients/hbase-shaded-mapreduce-2.1.0.jar:/export/server/hbase-2.1.0/lib/client-facing-thirdparty/audience-annotations-0.5.0.jar:/export/server/hbase-2.1.0/lib/client-facing-thirdparty/commons-logging-1.2.jar:/export/server/hbase-2.1.0/lib/client-facing-thirdparty/findbugs-annotations-1.3.9-1.jar:/export/server/hbase-2.1.0/lib/client-facing-thirdparty/htrace-core4-4.2.0-incubating.jar:/export/server/hbase-2.1.0/lib/client-facing-thirdparty/log4j-1.2.17.jar:/export/server/hbase-2.1.0/lib/client-facing-thirdparty/slf4j-api-1.7.25.jar
- step2:加载到Hbase表中
yarn jar bulkload.jar bigdata.itcast.cn.hbase.bulkload.BulkLoadToHbase /bulkload/output
- 总结
- step1:先将数据转换为HFILE文件
- step2:将HFILE加载到Hbase表中
知识点09:ImportTSV的使用
- 目标
- 了解ImportTSV工具的功能及使用
- 字面意思:导入tsv格式的数据文件
- tsv:以制表符分隔每一列的文件
- csv:以逗号分隔每一列的文件
- 分析
- importtsv功能:将可以将任何一种结构化的文件导入Hbase的表中,【默认是使用Put方式来导入的】
- 实现
|
免责声明:
1. 本站所有资源来自网络搜集或用户上传,仅作为参考不担保其准确性!
2. 本站内容仅供学习和交流使用,版权归原作者所有!© 查看更多
3. 如有内容侵害到您,请联系我们尽快删除,邮箱:kf@codeae.com
|