HBase

HBase

Hbase宽表设计注意事项

郭小白hsf 回复了问题 5 人关注 5 个回复 3827 次浏览 2019-08-21 10:59 来自相关话题

我读取hbase表的快照时报错

sunbalze 回复了问题 3 人关注 7 个回复 2982 次浏览 2019-03-25 14:03 来自相关话题

如何在Hbase中同一rowkey,同一列族下存储多组数据?

回复

asdsadsa 发起了问题 1 人关注 0 个回复 2964 次浏览 2018-11-16 10:41 来自相关话题

Hbase 启动时 HRegionserver 进程启动失败

KKKK6dn 回复了问题 2 人关注 1 个回复 6980 次浏览 2018-09-04 16:38 来自相关话题

Hbase 2.0+ 不支持hbck -fixMeta等大部分指令,怎么完成数据迁移

蟋蟀哥 回复了问题 2 人关注 1 个回复 3652 次浏览 2018-08-22 20:59 来自相关话题

请教个问题,业务日志即要做实时查询,又要做离线分析,是只存hbase好呢,还是hbase和hdfs都存?

回复

存在 发起了问题 1 人关注 0 个回复 1400 次浏览 2018-03-12 19:49 来自相关话题

phoenix操作hbase,对于key 位数不一致的。

回复

JVMer 发起了问题 1 人关注 0 个回复 1095 次浏览 2018-03-12 19:49 来自相关话题

在phoenix中做查询

回复

JVMer 发起了问题 1 人关注 0 个回复 1347 次浏览 2018-03-12 19:40 来自相关话题

有个问题,实时计算广告位曝光,使用hbase。

回复

JVMer 发起了问题 1 人关注 0 个回复 1192 次浏览 2018-03-12 19:37 来自相关话题

启动hbase master 出现 Error: Could not find or load main class for谁见过

回复

JVMer 发起了问题 1 人关注 0 个回复 1775 次浏览 2018-03-12 19:34 来自相关话题

指定了版本数,后面的插入数据还是会把上一条记录替换掉

回复

岗雍木措 发起了问题 1 人关注 0 个回复 1021 次浏览 2018-03-12 19:29 来自相关话题

hbase org.apache.hadoop.hbase.mapreduce.RowCounter

回复

存在 发起了问题 1 人关注 0 个回复 2108 次浏览 2018-03-12 19:16 来自相关话题

咨询一下啊,怎么在windiws上能编写java程序操作远程linux的hbase表?

比特币 回复了问题 2 人关注 1 个回复 926 次浏览 2018-03-11 16:55 来自相关话题

一个集群的HBase数据复制到另一份个HBase

史晓江 回复了问题 2 人关注 1 个回复 1708 次浏览 2018-02-23 13:37 来自相关话题

我MR导入HFile时错误

史晓江 回复了问题 2 人关注 1 个回复 3093 次浏览 2018-02-23 13:34 来自相关话题

MR程序扫快照表

张晓慧 回复了问题 2 人关注 2 个回复 1940 次浏览 2018-02-23 13:33 来自相关话题

你们怎么做HBase数据的异地灾备的?

史晓江 回复了问题 2 人关注 3 个回复 1997 次浏览 2018-02-23 13:32 来自相关话题

把这些region的目录下的.regioninfo写到meta表中

史晓江 回复了问题 2 人关注 3 个回复 1586 次浏览 2018-02-23 13:29 来自相关话题

region的rowkey不连续

史晓江 回复了问题 2 人关注 1 个回复 1639 次浏览 2018-02-23 13:27 来自相关话题

条新动态, 点击查看
[root@dx2-1 hbase-chinahadoop]# cat pom.xml   4.0.0   com.sogou   hbase-chinahadoop   0.0.1-SNAPSHOT   testbasichbaseapi        ... 显示全部 »
[root@dx2-1 hbase-chinahadoop]# cat pom.xml   4.0.0   com.sogou   hbase-chinahadoop   0.0.1-SNAPSHOT   testbasichbaseapi                                                                      central                                         Central Repository                                         http://repo.maven.apache.org/maven2                                         default                                                                                                 false                                                                                                                                   github-releases                                         http://oss.snoatype.org/content/repositories/github-releases/                                                                                         clojars.org                                         http://clojars.org/repo                                                                                         sogou.repo                                         http://cloud.sogou-inc.com/nexus/content/groups/public                                         Sogou Repository                                                                                         sogou.repo.old                                         http://10.12.8.218:8080/nexus/content/groups/public                                         Old Sogou Repository                                      UTF-8           org.apache.hadoop     hadoop-client     2.6.0     org.apache.hbase     hbase-client     1.0.0                   junit             junit             3.8.1             test        

小米何亮亮——HBase服务化实践,DCon2015文字实录

fish 回复了问题 18 人关注 4 个回复 9890 次浏览 2015-12-08 22:28 来自相关话题

Hbase宽表设计注意事项

回复

郭小白hsf 回复了问题 5 人关注 5 个回复 3827 次浏览 2019-08-21 10:59 来自相关话题

我读取hbase表的快照时报错

回复

sunbalze 回复了问题 3 人关注 7 个回复 2982 次浏览 2019-03-25 14:03 来自相关话题

如何在Hbase中同一rowkey,同一列族下存储多组数据?

回复

asdsadsa 发起了问题 1 人关注 0 个回复 2964 次浏览 2018-11-16 10:41 来自相关话题

Hbase 启动时 HRegionserver 进程启动失败

回复

KKKK6dn 回复了问题 2 人关注 1 个回复 6980 次浏览 2018-09-04 16:38 来自相关话题

Hbase 2.0+ 不支持hbck -fixMeta等大部分指令,怎么完成数据迁移

回复

蟋蟀哥 回复了问题 2 人关注 1 个回复 3652 次浏览 2018-08-22 20:59 来自相关话题

请教个问题,业务日志即要做实时查询,又要做离线分析,是只存hbase好呢,还是hbase和hdfs都存?

回复

存在 发起了问题 1 人关注 0 个回复 1400 次浏览 2018-03-12 19:49 来自相关话题

phoenix操作hbase,对于key 位数不一致的。

回复

JVMer 发起了问题 1 人关注 0 个回复 1095 次浏览 2018-03-12 19:49 来自相关话题

在phoenix中做查询

回复

JVMer 发起了问题 1 人关注 0 个回复 1347 次浏览 2018-03-12 19:40 来自相关话题

有个问题,实时计算广告位曝光,使用hbase。

回复

JVMer 发起了问题 1 人关注 0 个回复 1192 次浏览 2018-03-12 19:37 来自相关话题

启动hbase master 出现 Error: Could not find or load main class for谁见过

回复

JVMer 发起了问题 1 人关注 0 个回复 1775 次浏览 2018-03-12 19:34 来自相关话题

指定了版本数,后面的插入数据还是会把上一条记录替换掉

回复

岗雍木措 发起了问题 1 人关注 0 个回复 1021 次浏览 2018-03-12 19:29 来自相关话题

hbase org.apache.hadoop.hbase.mapreduce.RowCounter

回复

存在 发起了问题 1 人关注 0 个回复 2108 次浏览 2018-03-12 19:16 来自相关话题

咨询一下啊,怎么在windiws上能编写java程序操作远程linux的hbase表?

回复

比特币 回复了问题 2 人关注 1 个回复 926 次浏览 2018-03-11 16:55 来自相关话题

一个集群的HBase数据复制到另一份个HBase

回复

史晓江 回复了问题 2 人关注 1 个回复 1708 次浏览 2018-02-23 13:37 来自相关话题

我MR导入HFile时错误

回复

史晓江 回复了问题 2 人关注 1 个回复 3093 次浏览 2018-02-23 13:34 来自相关话题

MR程序扫快照表

回复

张晓慧 回复了问题 2 人关注 2 个回复 1940 次浏览 2018-02-23 13:33 来自相关话题

你们怎么做HBase数据的异地灾备的?

回复

史晓江 回复了问题 2 人关注 3 个回复 1997 次浏览 2018-02-23 13:32 来自相关话题

把这些region的目录下的.regioninfo写到meta表中

回复

史晓江 回复了问题 2 人关注 3 个回复 1586 次浏览 2018-02-23 13:29 来自相关话题

region的rowkey不连续

回复

史晓江 回复了问题 2 人关注 1 个回复 1639 次浏览 2018-02-23 13:27 来自相关话题

nosql数据库比较

唐半张 发表了文章 0 个评论 1694 次浏览 2015-10-10 10:07 来自相关话题

nosql数据库比较 互联网公司常用的基本集中在以下几种,每种只举一个比较常见或者应用比较成功的例子吧。 1. In-Memory KV Store : Redis in memory key-value store,同时提 ...查看全部
nosql数据库比较
互联网公司常用的基本集中在以下几种,每种只举一个比较常见或者应用比较成功的例子吧。
1. In-Memory KV Store : Redis
in memory key-value store,同时提供了更加丰富的数据结构和运算的能力,成功用法是替代memcached,通过checkpoint和commit log提供了快速的宕机恢复,同时支持replication提供读可扩展和高可用。
2. Disk-Based KV Store: Leveldb
真正基于磁盘的key-value storage, 模型单一简单,数据量不受限于内存大小,数据落盘高可靠,Google的几位大神出品的精品,LSM模型天然写优化,顺序写盘的方式对于新硬件ssd再适合不过了,不足是仅提供了一个库,需要自己封装server端。
3. Document Store: Mongodb
分布式nosql,具备了区别mysql的最大亮点:可扩展性。mongodb 最新引人的莫过于提供了sql接口,是目前nosql里最像mysql的,只是没有ACID的特性,发展很快,支持了索引等特性,上手容易,对于数据量远超内存限制的场景来说,还需要慎重。
4. Column Table Store: HBase
这个富二代似乎不用赘述了,最大的优势是开源,对于普通的scan和基于行的get等基本查询,性能完全不是问题,只是只提供裸的api,易用性上是短板,可扩展性方面是最强的,其次坐上了Hadoop的快车,社区发展很快,各种基于其上的开源产品不少,来解决诸如join、聚集运算等复杂查询

HBase Client使用注意点

唐半张 发表了文章 0 个评论 1767 次浏览 2015-10-09 10:25 来自相关话题

1  HTable线程不安全。    建议使用HTablePool,或者每次new一个HTable出来。     2  HTable和HConnection的关系。    注意HTable对象之间通过Config ...查看全部
1  HTable线程不安全。
   建议使用HTablePool,或者每次new一个HTable出来。
   
2  HTable和HConnection的关系。
   注意HTable对象之间通过Configuration共享HConnection。
   好吧,我偷懒了,实际上是通过HConnectionKey来共享HConnection的。
   因此,相同的Configuration(更精准的说法是连接相关参数相同,参阅HConnectionKey)实际上使用的是同一个HConnection。
   HConnectionKey可以查看源码。
   
3  HTable ResultScanner等等记着close。
   这个没有什么好说的,java世界里面,凡是有close方法的类型,使用结束后,调用close是合情合理的操作。
   
4  HTablePool。
   大部分应用都会使用HTablePool来管理HTable。
   注意这个Pool和一般的Pool有些不一样。
   超出pool maxsize时,一样可以得到HTable的引用,分别在于close时,如果未超出maxsize,htable返回给pool,如果超出maxsize,就close掉了事,和pool没有关系了。

5  HTable的构建性能
   由于HTable和HConnection的关系,因此,无论是自己new Htable出来,还是使用HTablePool,都是第一次得到HConnection时比较耗时,后续相同的Configuration获取HTable时,
   都是很高效的,可以认为是一个快速操作。
   当然凡事有例外(很少见),当HConnectionImplementation的cachedRegionLocations中EMPTY_START_ROW的缓存被冲掉的时候,由于构造HTable,默认会定位一下该row所在的HRegionLocation,
   这个时候构建一个HTable又变慢了。

6  HTablePool的maxSize设置。
   从性能角度来看,由于pool的特性,不会使用maxSize来限制可以获得HTable的数量,因此,maxSize大小无关。
   因此,该问题就变成了,应用中使用HTable的数量对应用有什么影响。
   
7  HTable的数量。   
   从内存角度看,由于HTable中有writeBuffer(不论autoFlush设置为true或者false,autoFlush只是影响flush的时机),默认2M,太多的HTable并发,对内存有一定的压力。
   从线程角度,由于每个HTable中都有ExecutorService pool,因此HTable的数量会影响到应用线程的多少,线程的多少,反过来又影响内存,性能。
   
8  HTable的批量方法
   可以使用批量方法的地方尽量使用批量方法。性能比较好,原因是底层RPC次数少。
   
9  HTable的autoflush
   实时应用建议设置为true(默认值),设置为false时性能较好,但是注意有可能丢数据。
   


HBase ORM framework simplehbase除了以下ORM的功能外,提供了HTablePool的管理功能。
1 可以配置autoflush为false时,定时commit writebuffer的当前cache内容。
2 另外,支持HTable的ExecutorService灵活配置,可以多个HTable使用同一个ExecutorService。


simplehbase功能简介 https://github.com/zhang-xzhi/simplehbase
数据类型映射:java类型和hbase的bytes之间的数据转换。
简单操作封装:封装了hbase的put,get,scan等操作为简单的java操作方式。
hbase query封装:封装了hbase的filter,可以使用sql-like的方式操作hbase。
动态query封装:类似于myibatis,可以使用xml配置动态语句查询hbase。
insert,update支持: 建立在hbase的checkAndPut之上。
hbase多版本支持:提供接口可以对hbase多版本数据进行查询,映射。
HTablePool管理。  

hbase权威指南阅读随手笔记二之过滤器

唐半张 发表了文章 0 个评论 1626 次浏览 2015-10-08 10:18 来自相关话题

base过滤器的比较操作符: LESS   ...查看全部
base过滤器的比较操作符:
LESS  <
LESS_OR_EQUAL <=
EQUAL =
NOT_EQUAL <>
GREATER_OR_EQUAL >=
GREATER >
NO_OP no operation
比较器:
 
BinaryComparator  按字节索引顺序比较指定字节数组,采用Bytes.compareTo(byte[])
BinaryPrefixComparator 跟前面相同,只是比较左端的数据是否相同
NullComparator 判断给定的是否为空
BitComparator 按位比较 a BitwiseOp class 做异或,与,并操作
RegexStringComparator 提供一个正则的比较器,仅支持 EQUAL 和非EQUAL
SubstringComparator 判断提供的子串是否出现在table的value中。
 
Hbase的过滤器分类
 
比较过滤器
 
1、Comparision Filters
     1.1  RowFilter
构造函数
[java] view plaincopy
public RowFilter(org.apache.hadoop.hbase.filter.CompareFilter.CompareOp rowCompareOp, org.apache.hadoop.hbase.filter.WritableByteArrayComparable rowComparator) {}
[size=13]
[/size]
选择比较rowkey来确定选择合适的行信息。
[java] view plaincopy
public class RowFilterExample {

public static void main(String[] args) throws IOException {
Configuration conf = HBaseConfiguration.create();

HBaseHelper helper = HBaseHelper.getHelper(conf);
helper.dropTable("testtable");
helper.createTable("testtable", "colfam1", "colfam2");
System.out.println("Adding rows to table...");
helper.fillTable("testtable", 1, 100, 100, "colfam1", "colfam2");

HTable table = new HTable(conf, "testtable");

// vv RowFilterExample
Scan scan = new Scan();
scan.addColumn(Bytes.toBytes("colfam1"), Bytes.toBytes("col-0"));

Filter filter1 = new RowFilter(CompareFilter.CompareOp.LESS_OR_EQUAL, // co RowFilterExample-1-Filter1 Create filter, while specifying the comparison operator and comparator. Here an exact match is needed.
new BinaryComparator(Bytes.toBytes("row-22")));
scan.setFilter(filter1);
ResultScanner scanner1 = table.getScanner(scan);
// ^^ RowFilterExample
System.out.println("Scanning table #1...");
// vv RowFilterExample
for (Result res : scanner1) {
System.out.println(res);
}
scanner1.close();

Filter filter2 = new RowFilter(CompareFilter.CompareOp.EQUAL, // co RowFilterExample-2-Filter2 Another filter, this time using a regular expression to match the row keys.
new RegexStringComparator(".*-.5"));
scan.setFilter(filter2);
ResultScanner scanner2 = table.getScanner(scan);
// ^^ RowFilterExample
System.out.println("Scanning table #2...");
// vv RowFilterExample
for (Result res : scanner2) {
System.out.println(res);
}
scanner2.close();

Filter filter3 = new RowFilter(CompareFilter.CompareOp.EQUAL, // co RowFilterExample-3-Filter3 The third filter uses a substring match approach.
new SubstringComparator("-5"));
scan.setFilter(filter3);
ResultScanner scanner3 = table.getScanner(scan);
// ^^ RowFilterExample
System.out.println("Scanning table #3...");
// vv RowFilterExample
for (Result res : scanner3) {
System.out.println(res);
}
scanner3.close();
// ^^ RowFilterExample
}
}


     1.2  FamilyFilter
构造函数[size=13]
[/size]
[java] view plaincopy
public FamilyFilter(CompareOp familyCompareOp, WritableByteArrayComparable familyComparator) {}
通过和列簇比较得到,返回结果为真的数据,示例:
[java] view plaincopy
public class FamilyFilterExample {

public static void main(String[] args) throws IOException {
Configuration conf = HBaseConfiguration.create();

HBaseHelper helper = HBaseHelper.getHelper(conf);
helper.dropTable("testtable");
helper.createTable("testtable", "colfam1", "colfam2", "colfam3", "colfam4");
System.out.println("Adding rows to table...");
helper.fillTable("testtable", 1, 10, 2, "colfam1", "colfam2", "colfam3", "colfam4");

HTable table = new HTable(conf, "testtable");

// vv FamilyFilterExample
Filter filter1 = new FamilyFilter(CompareFilter.CompareOp.LESS, // co FamilyFilterExample-1-Filter Create filter, while specifying the comparison operator and comparator.
new BinaryComparator(Bytes.toBytes("colfam3")));

Scan scan = new Scan();
scan.setFilter(filter1);
ResultScanner scanner = table.getScanner(scan); // co FamilyFilterExample-2-Scan Scan over table while applying the filter.
// ^^ FamilyFilterExample
System.out.println("Scanning table... ");
// vv FamilyFilterExample
for (Result result : scanner) {
System.out.println(result);
}
scanner.close();

Get get1 = new Get(Bytes.toBytes("row-5"));
get1.setFilter(filter1);
Result result1 = table.get(get1); // co FamilyFilterExample-3-Get Get a row while applying the same filter.
System.out.println("Result of get(): " + result1);

Filter filter2 = new FamilyFilter(CompareFilter.CompareOp.EQUAL,
new BinaryComparator(Bytes.toBytes("colfam3")));
Get get2 = new Get(Bytes.toBytes("row-5")); // co FamilyFilterExample-4-Mismatch Create a filter on one column family while trying to retrieve another.
get2.addFamily(Bytes.toBytes("colfam1"));
get2.setFilter(filter2);
Result result2 = table.get(get2); // co FamilyFilterExample-5-Get2 Get the same row while applying the new filter, this will return "NONE".
System.out.println("Result of get(): " + result2);
// ^^ FamilyFilterExample
}
}
1.3 QualifierFilter构造函数
[java] view plaincopy
public QualifierFilter(CompareOp qualifierCompareOp, WritableByteArrayComparable qualifierComparator) { }
通过和列名比较,返回为真的数据,示例:
[java] view plaincopy
// vv QualifierFilterExample
Filter filter = new QualifierFilter(CompareFilter.CompareOp.LESS_OR_EQUAL,
new BinaryComparator(Bytes.toBytes("col-2")));

Scan scan = new Scan();
scan.setFilter(filter);
ResultScanner scanner = table.getScanner(scan);
// ^^ QualifierFilterExample
System.out.println("Scanning table... ");
// vv QualifierFilterExample
for (Result result : scanner) {
System.out.println(result);
}
scanner.close();

Get get = new Get(Bytes.toBytes("row-5"));
get.setFilter(filter);
Result result = table.get(get);
System.out.println("Result of get(): " + result);
     1.4 ValueFilter构造函数
[java] view plaincopy
public ValueFilter(CompareOp valueCompareOp, WritableByteArrayComparable valueComparator) {}

通过和列名比较,返回为真的数据,示例:  
[java] view plaincopy
Filter filter = new ValueFilter(CompareFilter.CompareOp.EQUAL, // co ValueFilterExample-1-Filter Create filter, while specifying the comparison operator and comparator.
new SubstringComparator(".4") );

Scan scan = new Scan();
scan.setFilter(filter); // co ValueFilterExample-2-SetFilter Set filter for the scan.
ResultScanner scanner = table.getScanner(scan);
// ^^ ValueFilterExample
System.out.println("Results of scan:");
// vv ValueFilterExample
for (Result result : scanner) {
for (KeyValue kv : result.raw()) {
System.out.println("KV: " + kv + ", Value: " + // co ValueFilterExample-3-Print1 Print out value to check that filter works.
Bytes.toString(kv.getValue()));
}
}
scanner.close();

Get get = new Get(Bytes.toBytes("row-5"));
get.setFilter(filter); // co ValueFilterExample-4-SetFilter2 Assign same filter to Get instance.
Result result = table.get(get);
// ^^ ValueFilterExample
System.out.println("Result of get: ");
// vv ValueFilterExample
for (KeyValue kv : result.raw()) {
System.out.println("KV: " + kv + ", Value: " +
Bytes.toString(kv.getValue()));
}
     1.5 DependentColumnFilter
该过滤器有两个参数 —— 列族和列修饰。 尝试找到该列所在的每一行,并返回该行具有相同时间戳的全部键值对。如果某一行不包含指定的列,则该行的任何键值对都不返回。
该过滤器还可以有一个可选布尔参数 —— dropDependentColumn. 如果为true, 从属的列不返回。
该过滤器还可以有两个可选参数 —— 一个比较操作符和一个值比较器,用于列族和修饰的进一步检查。如果从属的列找到,其值还必须通过值检查,然后就是时间戳必须考虑。
[java] view plaincopy
package filters;

// cc DependentColumnFilterExample Example using a filter to include only specific column families
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.BinaryPrefixComparator;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.DependentColumnFilter;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.RegexStringComparator;
import org.apache.hadoop.hbase.filter.WritableByteArrayComparable;
import org.apache.hadoop.hbase.util.Bytes;
import util.HBaseHelper;

import java.io.IOException;

public class DependentColumnFilterExample {

private static HTable table = null;

// vv DependentColumnFilterExample
private static void filter(boolean drop,
CompareFilter.CompareOp operator,
WritableByteArrayComparable comparator)
throws IOException {
Filter filter;
if (comparator != null) {
filter = new DependentColumnFilter(Bytes.toBytes("colfam1"), // co DependentColumnFilterExample-1-CreateFilter Create the filter with various options.
Bytes.toBytes("col-5"), drop, operator, comparator);
} else {
filter = new DependentColumnFilter(Bytes.toBytes("colfam1"),
Bytes.toBytes("col-5"), drop);

}

Scan scan = new Scan();
scan.setFilter(filter);
ResultScanner scanner = table.getScanner(scan);
// ^^ DependentColumnFilterExample
System.out.println("Results of scan:");
// vv DependentColumnFilterExample
for (Result result : scanner) {
for (KeyValue kv : result.raw()) {
System.out.println("KV: " + kv + ", Value: " +
Bytes.toString(kv.getValue()));
}
}
scanner.close();

Get get = new Get(Bytes.toBytes("row-5"));
get.setFilter(filter);
Result result = table.get(get);
// ^^ DependentColumnFilterExample
System.out.println("Result of get: ");
// vv DependentColumnFilterExample
for (KeyValue kv : result.raw()) {
System.out.println("KV: " + kv + ", Value: " +
Bytes.toString(kv.getValue()));
}
// ^^ DependentColumnFilterExample
System.out.println("");
// vv DependentColumnFilterExample
}

public static void main(String[] args) throws IOException {
// ^^ DependentColumnFilterExample
Configuration conf = HBaseConfiguration.create();

HBaseHelper helper = HBaseHelper.getHelper(conf);
helper.dropTable("testtable");
helper.createTable("testtable", "colfam1", "colfam2");
System.out.println("Adding rows to table...");
helper.fillTable("testtable", 1, 10, 10, true, "colfam1", "colfam2");

table = new HTable(conf, "testtable");

// vv DependentColumnFilterExample
filter(true, CompareFilter.CompareOp.NO_OP, null);
filter(false, CompareFilter.CompareOp.NO_OP, null); // co DependentColumnFilterExample-2-Filter Call filter method with various options.
filter(true, CompareFilter.CompareOp.EQUAL,
new BinaryPrefixComparator(Bytes.toBytes("val-5")));
filter(false, CompareFilter.CompareOp.EQUAL,
new BinaryPrefixComparator(Bytes.toBytes("val-5")));
filter(true, CompareFilter.CompareOp.EQUAL,
new RegexStringComparator(".*\\.5"));
filter(false, CompareFilter.CompareOp.EQUAL,
new RegexStringComparator(".*\\.5"));
}
// ^^ DependentColumnFilterExample}
2、Dedicated Filters
     2.1 SingleColumnValueFilter
选定列簇和某一列,然后与列的value相比,正确的返回全部的row,注意如果某一行不含有该列,同样返回,除非通过filterIfColumnMissing 设置成真。
构造函数
[java] view plaincopy
SingleColumnValueFilter(byte[] family, byte[] qualifier, CompareOp compareOp, byte[] value)   
SingleColumnValueFilter(byte[] family, byte[] qualifier, CompareOp compareOp, WritableByteArrayComparable comparator)

第一个构造函数相当于构建了一个BinaryComparator的实例。其他的跟CompareFilter的参数含义一样。  
该过滤器通过下面两个参数 filterIfMissing,latestVersionOnly
[java] view plaincopy
boolean getFilterIfMissing()
void setFilterIfMissing(boolean filterIfMissing)
boolean getLatestVersionOnly()
void setLatestVersionOnly(boolean latestVersionOnly)
如果 filterIfColumnMissing 标志设为真,如果该行没有指定的列,那么该行的所有列将不发出。缺省值为假。
如果setLatestVersionOnly 标志设为假,将检查此前的版本。缺省值为真。实例如下:
[java] view plaincopy
// vv SingleColumnValueFilterExample
SingleColumnValueFilter filter = new SingleColumnValueFilter(
Bytes.toBytes("colfam1"),
Bytes.toBytes("col-5"),
CompareFilter.CompareOp.NOT_EQUAL,
new SubstringComparator("val-5"));
filter.setFilterIfMissing(true);

Scan scan = new Scan();
scan.setFilter(filter);
ResultScanner scanner = table.getScanner(scan);
// ^^ SingleColumnValueFilterExample
System.out.println("Results of scan:");
// vv SingleColumnValueFilterExample
for (Result result : scanner) {
for (KeyValue kv : result.raw()) {
System.out.println("KV: " + kv + ", Value: " +
Bytes.toString(kv.getValue()));
}
}
scanner.close();

Get get = new Get(Bytes.toBytes("row-6"));
get.setFilter(filter);
Result result = table.get(get);
System.out.println("Result of get: ");
for (KeyValue kv : result.raw()) {
System.out.println("KV: " + kv + ", Value: " +
Bytes.toString(kv.getValue()));
}
     2.2 SingleColumnValueExcludeFilter
该过滤器同上面的过滤器正好相反,如果条件相符,将不会返回该列的内容。
     2.3 PrefixFilter
所有的row的实例匹配prefix的时候返回结果集合
[java] view plaincopy
Filter filter = new PrefixFilter(Bytes.toBytes("row1"));
Scan scan = new Scan();
scan.setFilter(filter);
ResultScanner scanner = table.getScanner(scan);


for(Result result: scanner){
for(KeyValue kv: result.raw()) {
System.out.println("KV:" + kv + ", Value:" + Bytes.toString(kv.getValue()));
}
}
scanner.close();


Get get = new Get(Bytes.toBytes("row-5"));
get.setFilter(filter);
Result result = table.get(get);
for(KeyValue kv : result.raw()){
System.out.println("KV:" + kv + ", Value:" + Bytes.toString(kv.getValue()));
}
     2.4 PageFilter
页过滤,通过设置pagesize参数可以返回每一页page的数量。
客户端需要记住上一次访问的row的key值。
[java] view plaincopy
package hbaseTest;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.PageFilter;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;

/**
* Hello world!
*/
public class PageFilterExample {
public static void main(String[] args) throws IOException {
Configuration config = HBaseConfiguration.create();
config.set("hbase.zookeeper.quorum", "QT-H-0038");

String tableName = "testTable";
String cfName = "colfam1";
final byte[] POSTFIX = new byte[] { 0x00 };
HTable table = new HTable(config, tableName);
Filter filter = new PageFilter(15);
byte[] lastRow = null;
int totalRows = 0;
while (true) {
Scan scan = new Scan();
scan.setFilter(filter);
if(lastRow != null){
//注意这里添加了POSTFIX操作,不然死循环了
byte[] startRow = Bytes.add(lastRow,POSTFIX);
scan.setStartRow(startRow);
}
ResultScanner scanner = table.getScanner(scan);
int localRows = 0;
Result result;
while((result = scanner.next()) != null){
System.out.println(localRows++ + ":" + result);
totalRows ++;
lastRow = result.getRow();
}
scanner.close();
if(localRows == 0) break;
}
System.out.println("total rows:" + totalRows);
}

}
因为hbase的row是字典序列排列的,因此上一次的lastrow需要添加额外的0表示新的开始。另外startKey的那一行是包含在scan里面的。
2.5 KeyOnlyFilter
因为一些应用只想获取data数据,而不是真实的val,可以使用这个过滤器。该过滤器通过
[java] view plaincopy
KeyOnlyFilter(boolean lenAsVal) [size=13] [/size]
lenAsVal默认为假,表示不把val的长度作为val。否则 val的长度将作为val输出。
[java] view plaincopy
final byte[] POSTFIX = new byte[] { 0x00 };
HTable table = new HTable(config, tableName);
Filter filter = new KeyOnlyFilter(false);
byte[] lastRow = null;
int totalRows = 0;


Scan scan = new Scan();
scan.setFilter(filter);
ResultScanner scanner = table.getScanner(scan);
for(Result result: scanner){
for(KeyValue kv: result.raw()){
System.out.println(kv + ":" + Bytes.toString(kv.getValue()));
}
}

     2.6 FirstKeyOnlyFilter
在对hbase的表进行扫描的时候,如果指定了FirstKeyOnlyFilter过滤条件则仅仅会返回相同key的第一条kv。
当对hbase中的表进行count,sum操作等集合操作的时候,使用FirstKeyOnlyFilter会带来性能上的提升。
[java] view plaincopy
public class KeyOnlyFilterExample {  
    public static void main(String[] args) throws IOException {  
        Configuration config = HBaseConfiguration.create();  
        config.set("hbase.zookeeper.quorum", "QT-H-0038");  
  
        String tableName = "testTable";  
        String cfName = "colfam1";  
        final byte[] POSTFIX = new byte[] { 0x00 };  
        HTable table = new HTable(config, tableName);  
        Filter filter = new FirstKeyOnlyFilter();  
        byte[] lastRow = null;  
        int totalRows = 0;  
  
        Scan scan = new Scan();  
        scan.setFilter(filter);  
        ResultScanner scanner = table.getScanner(scan);  
        for(Result result: scanner){  
            for(KeyValue kv: result.raw()){  
                System.out.println(kv + ":" + Bytes.toString(kv.getValue()));  
            }  
        }  
    }  
}  
返回的结果是
[html] view plaincopy
row-5/colfam1:qual1/1354673733503/Put/vlen=4:row1  
row1/colfam1:qual1/1354432930568/Put/vlen=4:val1  
row2/colfam1:qual2/1354432930568/Put/vlen=4:val3  

如果注释掉过滤器的返回的结果是:
[html] view plaincopy
row-5/colfam1:qual1/1354673733503/Put/vlen=4:row1  
row1/colfam1:qual1/1354432930568/Put/vlen=4:val1  
row1/colfam1:qual2/1354435819120/Put/vlen=4:val2  
row2/colfam1:qual2/1354432930568/Put/vlen=4:val3  
     2.7 InclusiveStopFilter
因为hbase的scan包含start-row不包含stop-row 如果使用这个过滤器我们可以包含stop-row
[java] view plaincopy
HTable table = new HTable(config, tableName);  
  
Filter filter  = new InclusiveStopFilter(Bytes.toBytes("row1"));  
  
Scan scan = new Scan();  
scan.setFilter(filter);  
scan.setStartRow(Bytes.toBytes("row-5"));  
ResultScanner scanner = table.getScanner(scan);  
  
for(Result result: scanner){  
    System.out.println(result);  
}  

会看到row1包含在结果中了。
 
     2.8 TimestampsFilter
当访问某个Timestamp的新闻的时候,我们需要如下的代码:
[java] view plaincopy
TimestampsFilter(List timestamps)  
接受的参数的list参数,该Filter也可以和scan.setTimeRange混合使用。例如:

 

[java] view plaincopy
// vv TimestampFilterExample  
    List ts = new ArrayList();  
    ts.add(new Long(5));  
    ts.add(new Long(10)); // co TimestampFilterExample-1-AddTS Add timestamps to the list.  
    ts.add(new Long(15));  
    Filter filter = new TimestampsFilter(ts);  
  
    Scan scan1 = new Scan();  
    scan1.setFilter(filter); // co TimestampFilterExample-2-AddFilter Add the filter to an otherwise default Scan instance.  
    ResultScanner scanner1 = table.getScanner(scan1);  
    // ^^ TimestampFilterExample  
    System.out.println("Results of scan #1:");  
    // vv TimestampFilterExample  
    for (Result result : scanner1) {  
      System.out.println(result);  
    }  
    scanner1.close();  
  
    Scan scan2 = new Scan();  
    scan2.setFilter(filter);  
    scan2.setTimeRange(8, 12); // co TimestampFilterExample-3-AddTSRange Also add a time range to verify how it affects the filter  
    ResultScanner scanner2 = table.getScanner(scan2);  
    // ^^ TimestampFilterExample  
    System.out.println("Results of scan #2:");  
    // vv TimestampFilterExample  
    for (Result result : scanner2) {  
      System.out.println(result);  
    }  
    scanner2.close();  
 
 
     2.9  ColumnCountGetFilter
这个在scan时,无用。
     2.10 ColumnPaginationFilter(下来用到的时候在仔细研究下)
 * A filter, based on the ColumnCountGetFilter, takes two arguments: limit and offset.
 * This filter can be used for row-based indexing, where references to other tables are stored across many columns,
 * in order to efficient lookups and paginated results for end users.
 
[java] view plaincopy
Filter filter = new ColumnPaginationFilter(5, 15);  
  
   Scan scan = new Scan();  
   scan.setFilter(filter);  
   ResultScanner scanner = table.getScanner(scan);  
   // ^^ ColumnPaginationFilterExample  
   System.out.println("Results of scan:");  
   // vv ColumnPaginationFilterExample  
   for (Result result : scanner) {  
     System.out.println(result);  
   }  
   scanner.close();  
     2.11 ColumnPrefixFilter
跟prefxiFilter相似,只是改成了Column,实例如下:
[java] view plaincopy
// vv ColumnPaginationFilterExample  
      Filter filter = new ColumnPrefixFilter(Bytes.toBytes("qual2"));  
  
      Scan scan = new Scan();  
      scan.setFilter(filter);  
      ResultScanner scanner = table.getScanner(scan);  
      // ^^ ColumnPaginationFilterExample  
      System.out.println("Results of scan:");  
      // vv ColumnPaginationFilterExample  
      for (Result result : scanner) {  
          System.out.println(result);  
      }  
      scanner.close();  
   
值scan到与列值与前面匹配的数据。例如qual2匹配qual21。
 
     2.12 RandomRowFilter
随即的返回row的数据,构造函数为
[java] view plaincopy
RandomRowFilter(float chance)  
chance取值为0到1.0,如果<0则为空,如果>1则包含所有的行。
3、Decorating Filters
装饰性过滤器
     3.1  SkipFilter
这个过滤器只作用到keyValueFilter上。KeyValueFilter会返回所有满足条件的row及对应的列。
而加上SkipFilter以后。会发现如果某一行的某一列不符合条件,则这一行全部不返回了。
[java] view plaincopy
public static void main(String[] args) throws IOException {  
   Configuration conf = HBaseConfiguration.create();  
  
   HBaseHelper helper = HBaseHelper.getHelper(conf);  
   helper.dropTable("testtable");  
   helper.createTable("testtable", "colfam1");  
   System.out.println("Adding rows to table...");  
   helper.fillTable("testtable", 1, 30, 5, 2, true, true, "colfam1");  
  
   HTable table = new HTable(conf, "testtable");  
  
   // vv SkipFilterExample  
   Filter filter1 = new ValueFilter(CompareFilter.CompareOp.NOT_EQUAL,  
     new BinaryComparator(Bytes.toBytes("val-0")));  
  
   Scan scan = new Scan();  
   scan.setFilter(filter1); // co SkipFilterExample-1-AddFilter1 Only add the ValueFilter to the first scan.  
   ResultScanner scanner1 = table.getScanner(scan);  
   // ^^ SkipFilterExample  
   System.out.println("Results of scan #1:");  
   int n = 0;  
   // vv SkipFilterExample  
   for (Result result : scanner1) {  
     for (KeyValue kv : result.raw()) {  
       System.out.println("KV: " + kv + ", Value: " +  
         Bytes.toString(kv.getValue()));  
       // ^^ SkipFilterExample  
       n++;  
       // vv SkipFilterExample  
     }  
   }  
   scanner1.close();  
  
   Filter filter2 = new SkipFilter(filter1);  
  
   scan.setFilter(filter2); // co SkipFilterExample-2-AddFilter2 Add the decorating skip filter for the second scan.  
   ResultScanner scanner2 = table.getScanner(scan);  
   // ^^ SkipFilterExample  
   System.out.println("Total KeyValue count for scan #1: " + n);  
   n = 0;  
   System.out.println("Results of scan #2:");  
   // vv SkipFilterExample  
   for (Result result : scanner2) {  
     for (KeyValue kv : result.raw()) {  
       System.out.println("KV: " + kv + ", Value: " +  
         Bytes.toString(kv.getValue()));  
       // ^^ SkipFilterExample  
       n++;  
       // vv SkipFilterExample  
     }  
   }  
   scanner2.close();  
   // ^^ SkipFilterExample  
   System.out.println("Total KeyValue count for scan #2: " + n);  
 }  

sqoop mysql\oracle数据导入到hbase 各种异常解决

唐半张 发表了文章 0 个评论 2156 次浏览 2015-10-08 09:55 来自相关话题

使用sqoop 导mysql的数据时 一切顺利很方便   导oracle的时候 问题就来了  --query命令: 使用这个命令的时候 需要注意的是 where后面的参数,$CONDITIONS 这个参数必须加上 而且存在单 ...查看全部
使用sqoop 导mysql的数据时 一切顺利很方便  
导oracle的时候 问题就来了 
--query命令: 使用这个命令的时候 需要注意的是 where后面的参数,$CONDITIONS 这个参数必须加上
而且存在单引号与双引号的区别,如果--query后面使用的是双引号 那么需要在$CONDITIONS加上\ 即 \$CONDITIONS
./sqoop import --connect jdbcracle:thin192.168.8.130:1521:dcshdev --username User_data2 --password yhdtest123qa --query "select * from so_ext t where \$CONDITIONS " -m 4 --hbase-create-table --hbase-table hso --column-family so --hbase-row-key id --split-by id
如果使用--columns指令来指定字段 也出现了问题 
因为在一行中写太多命令是不可能的,--columns 如果字段太多在命令中写也不方便 
所以使用shell脚本要方便的多
那么在脚本中换行使用 \  来换行

有个问题就是 使用--columns 指定的字段大小写要注意 得使用小写。
如果使用大写 导数据不会有异常 会成功的,hbase中表也会创建好,但是scan的时候 你会发现没有数据 这个蛋疼吧
--columns id,order_id,order_code
而且每个字段之间用逗号分隔,但是不能有空格,在sqoop中没有处理空格,如果在这个命令后的参数有空格的话
就不能和oracle表中的字段对应上了 结果虽然没有错误能够显示成功导入多了条数据,但是scan的时候 会是0条数据

关于导mysql和oracle的时候 还有个区别:
导mysql表的数据时 不需要指定太多的命令就可以成功导入,但是oracle就需要很多命令 ,如--split-by 这个切分参数
在导mysql的时候 就不需要 ,但是如果在导oracle的时候 不加上就会出错了 不信你试试

sqoop从关系型数据库导数据到hdfs和hbase

唐半张 发表了文章 0 个评论 2289 次浏览 2015-10-06 11:28 来自相关话题

最近弄了些sqoop导数据方法和命令,关于批量导入和一般导入,在本机器上linux环境运行正常; (一)Oracle导入到hdfs上            sqoop import --connect jdbcracle:thinip: ...查看全部
最近弄了些sqoop导数据方法和命令,关于批量导入和一般导入,在本机器上linux环境运行正常;
(一)Oracle导入到hdfs上
           sqoop import --connect jdbcracle:thinip:端口号:databaseName --username userName --password password --query "sql语句 where \$CONDITIONS" --target-dir 目录名 --split-by 主键 -m 4

(二)Mysql导入到hbase上
    (1)一般导入(新建hbase表)
            sqoop import --connect jdbc:mysql://ip:端口号/databaseName?charset=utf-8 --username userName --password password --table tableName --hbase-table hbaseTableName --column-family 列族 --hbase-row-key HbaseRowKey --split-by 主键 -m 4 --hbase-create-table
    (2)批量导入
            sqoop import --connect jdbc:mysql://ip:端口号/databaseName?charset=utf-8 --username userName --password password --table tableName --hbase-table hbaseTableName --column-family 列族 --hbase-row-key hbaseRowKey --split-by 主键 -m 4  --incremental append  --check-column 主键  --last-value 0
(三)Mysql导入到hdfs上
            sqoop import --connect jdbc:mysql://ip:端口号/databaseName?charset=utf-8 --username userName --password password --table tableName --target-dir 目录名 --split-by 主键 -m 4 --incremental append  --check-column 主键 --last-value 0 

sqoop import --connect jdbc:mysql://ip:端口号/databaseName?charset=utf-8 --username userName --password password --query "SQL语句 where \$CONDITIONS " --target-dir 目录名 --split-by 主键 -m 4 --incremental append  --check-column 主键 --last-value 0 
 

hbase开启lzo压缩

唐半张 发表了文章 0 个评论 1828 次浏览 2015-10-06 11:25 来自相关话题

hbase只支持对gzip的压缩,对lzo压缩支持不好。在io成为系统瓶颈的情况下,一般开启lzo压缩会提高系统的吞吐量。但这需要参考具体的应用场景,即是否值得进行压缩、压缩率是否足够等等。 要hbase支持lzo压缩,参照以下步骤: ...查看全部
hbase只支持对gzip的压缩,对lzo压缩支持不好。在io成为系统瓶颈的情况下,一般开启lzo压缩会提高系统的吞吐量。但这需要参考具体的应用场景,即是否值得进行压缩、压缩率是否足够等等。
要hbase支持lzo压缩,参照以下步骤:
1 首先要让系统支持lzo动态库,安装lzo-2.00以上版本:http://www.oberhumer.com/opensource/lzo/download/
2 默认安装完后应该在系统的/usr/local/lib/找到liblzo*等几个so
3 到http://code.google.com/p/hadoop-gpl-compression/下载lzo相关的native库4 拷贝3中下载的 hadoop-gpl-compression-0.1.0-dev.jar 或 hadoop-gpl-compression-0.1.0.jar 到 hbase/lib以及hadoop/lib 中
5 拷贝3中下载的 lib/native/* 到 hadoop/lib/native 及 hbase/lib/native
6 确保以上4/5中的文件同步到了每台regionserver上
7 在core-site.xml中加上:

Xml代码
  • < span>property<
  • < span>namename<
  • < span>valuevalue<
  • property<
  • < span>property<
  • < span>namename<
  • < span>valuevalue<
  • property<
8 重启dfs及hbase,建表时使用:Java代码
  • create 'mytable', {NAME=<'colfam:', COMPRESSION=<'lzo'}  
此时这个column即为lzo了,往它读写数据会自动进行lzo压缩和解压缩。P.S:1 如果不清楚你的hbase是否配置好了支持lzo,可以执行以下命令检查之,如果不支持的话这个命令会提示你还缺什么文件:Java代码
  • hbase org.apache.hadoop.hbase.util.CompressionTest hdfs://namenode:9000/test_path lzo
2 如果你和我一样使用了CDH3版本的hdfs,那么该版本的hdfs与hadoop-gpl-compression会有冲突,原因是CDH3修改了compression.java,增加了reinit()接口。此时需要重新编译hadoop-gpl-compression工程,修改 src/java/com/hadoop/compression/lzo/LzoCompressor.java,增加以下行:Java代码
  • publicvoid reinit(Configuration conf) {  
  • // do nothing
  • }  



  然后重新编译工程,将生成的jar包替换以上第4步中的那个jar包

hbase flushcommit使用

唐半张 发表了文章 0 个评论 2072 次浏览 2015-10-06 10:11 来自相关话题

hbase数据入库程序不要手动做table.flushcommit(),而是使用hbase自带的memstore->硬盘的写入机制,只需要table.put即可。然后设置table的自动提交功能为false(如果为true的话每次调用table.put的时候都 ...查看全部
hbase数据入库程序不要手动做table.flushcommit(),而是使用hbase自带的memstore->硬盘的写入机制,只需要table.put即可。然后设置table的自动提交功能为false(如果为true的话每次调用table.put的时候都会触发一次操作)。

数据入库时只对自动提交选项设为false,而不对writebuffersize做设置效果会比较好,看了hbase的代码不设置的话默认值是2MB,writebuffersize的设置不能太大,也不能太小,实际测试中2-5MB之间比较合理

入库程序设计要开尽可能多的线程,推荐20-25线程,在多个机器上运行,同时读取本地文件,在并发数量大的情况下入库效率会得到较大提升。

使用HBase Shell 接口的注意事项

唐半张 发表了文章 0 个评论 1870 次浏览 2015-09-30 10:36 来自相关话题

总结培训当天反馈的问题,新手们需要注意几点: 问题1,  HBase(可以理解为不需要建'name'列,hbase自动建立一个用于存储“行标识”的“列”),举例如下: 例一: create 'employees', 'SN' ...查看全部
总结培训当天反馈的问题,新手们需要注意几点:
问题1,  HBase(可以理解为不需要建'name'列,hbase自动建立一个用于存储“行标识”的“列”),举例如下:
例一:
create 'employees', 'SN', 'department', 'address'   这个employees表的结构将为:
row_id     SN    department    address
--------------------------------------------------
共有四列,第一列用于标识行, 这里你可以当做‘name’来用
插入数据: put 'employees', 'HongKong', 'SN:', '20080501' 
注意是put,不是Ruby的puts
对比的情况:
创建表: create 'employees', 'name', 'SN', 'department', 'address'
此时数据为: 除了标识本身外,还有一个name列,下面简单设置为一样的值。
put 'employees', 'HongKong', 'name:', 'HongKong'
例二:
网上流行资料的例子:

一个存储学生成绩的表:

name grad      course:math   course:art
Tom    1                87                    97
Jerry   2            100                  80
这里grad对于表来说是一个列,course对于表来说是一个列族,这个列族由两个列组成:math和art,当然我们可以根据我们的需要在course中建立更多的列族,如computer,physics等相应的列添加入course列族.  建立一个表格 scores 具有两个列族grad 和courese
hbase(main):002:0> create 'scores', 'grade', 'course'
0 row(s) in 4.1610 seconds
分析,请注意,为什么创建的表是没有“name”这一列呢? 其实这里的name列就对应例一的row_id,不用显式创建的。
导入数据为:  put 'scores', 'Tom', 'grade:', '1'     , Tom对应name 
问题2. 参数的警告说明
很多人开始都碰到类似 
hbase(main):034:0> put 'employees', 'HongKong', 'name:', 'Hongkong', 'SN:', '20080501'
ArgumentError: wrong number of arguments (6 for 5) 
hbase(main):033:0> put 'employees', 'Kong', 'name:' 'Kong'
ArgumentError: wrong number of arguments (3 for 4)
这是参数数量不对的说明, 请尤其注意逗号, 空格不能用来分隔参数的。 
以put为例,参数一般为5个, 6个 10个都报错。但为什么又有(3 for 4)呢?  5和4个的时候可以工作呢?  timestamp 是optional的。所以参数多的时候, 按照上限5报警,少的时候按照下限4报警。
put       Put a cell 'value' at specified table/row/column and optionally
           timestamp coordinates.  To put a cell value into table 't1' at
           row 'r1' under column 'c1' marked with the time 'ts1', do:

           hbase> put 't1', 'r1', 'c1', 'value', ts1 
问题3.  插入数据
hbase(main):030:0> put 'employees', 'Tom', 'name:' 'Tom', 'SN:', '20091101', 'department:', 'D&R', 'address:country', 'China', 'address:city', 'Beijing'
ArgumentError: wrong number of arguments (11 for 5)
怎么回事呢?  不要老想着SQL, put插入的Cell数据,  这么多一起来,当然报错咯 
问题4.  删除表必须先停,然后再删: To remove the table, you must first disable it before dropping it
hbase(main):025:0> disable 'test'
09/04/19 06:40:13 INFO client.HBaseAdmin: Disabled test
0 row(s) in 6.0426 seconds
hbase(main):026:0> drop 'test'
09/04/19 06:40:17 INFO client.HBaseAdmin: Deleted test 
问题5.  如何运行脚本文件
${HBASE_HOME}/bin/hbase shell PATH_TO_SCRIPT示例: 
./hbase shell /data/automation/create_import.hbase
--------------------------------------------------------------------------------------------
disable 'employees'
drop 'employees'

create 'employees', 'SN', 'department', 'address'
put 'employees', 'HongKong', 'SN:', '20080501189'
put 'employees', 'HongKong', 'department:', 'R&D'
put 'employees', 'HongKong', 'address:country', 'China'
put 'employees', 'HongKong', 'address:city', 'Beijing'
put 'employees', 'Cudynia', 'SN:', '20010807368'
put 'employees', 'Cudynia', 'department:', 'HR'
put 'employees', 'Cudynia', 'address:country', 'US'
put 'employees', 'Cudynia', 'address:city', 'San Francisco'

exit

解读Hadoop Hbase适合存储哪类数据

唐半张 发表了文章 0 个评论 1487 次浏览 2015-09-30 10:15 来自相关话题

最适合使用Hbase存储的数据是非常稀疏的数据(非结构化或者半结构化的数据)。Hbase之所以擅长存储这类数据,是因为Hbase是column-oriented列导向的存储机制,而我们熟知的RDBMS都是row- oriented行导向的存储机制(郁闷的是我看 ...查看全部
最适合使用Hbase存储的数据是非常稀疏的数据(非结构化或者半结构化的数据)。Hbase之所以擅长存储这类数据,是因为Hbase是column-oriented列导向的存储机制,而我们熟知的RDBMS都是row- oriented行导向的存储机制(郁闷的是我看过N本关于关系数据库的介绍从来没有提到过row- oriented行导向存储这个概念)。在列导向的存储机制下对于Null值得存储是不占用任何空间的。比如,如果某个表 UserTable有10列,但在存储时只有一列有数据,那么其他空值的9列是不占用存储空间的(普通的数据库MySql是如何占用存储空间的呢?)。
Hbase适合存储非结构化的稀疏数据的另一原因是他对列集合 column families 处理机制。 打个比方,ruby和python这样的动态语言和c++、java类的编译语言有什么不同? 对于我来说,最显然的不同就是你不需要为变量预先指定一个类型。Ok ,现在Hbase为未来的DBA也带来了这个激动人心的特性,你只需要告诉你的数据存储到Hbase的那个column families 就可以了,不需要指定它的具体类型:char,varchar,int,tinyint,text等等。
Hbase还有很多特性,比如不支持join查询,但你存储时可以用:parent-child tuple 的方式来变相解决。

Hadoop Hive与Hbase整合

唐半张 发表了文章 0 个评论 1821 次浏览 2015-09-29 11:08 来自相关话题

用hbase做数据库,但由于hbase没有类sql查询方式,所以操作和计算数据非常不方便,于是整合hive,让hive支撑在hbase数据库层面 的 hql查询.hive也即 做数据仓库  1. 基于Hadoop+Hive架构对海量数 ...查看全部
用hbase做数据库,但由于hbase没有类sql查询方式,所以操作和计算数据非常不方便,于是整合hive,让hive支撑在hbase数据库层面 的 hql查询.hive也即 做数据仓库 

1. 基于Hadoop+Hive架构对海量数据进行查询:http://blog.csdn.net/kunshan_shenbin/article/details/7105319
2. HBase 0.90.5 + Hadoop 1.0.0 集成:http://blog.csdn.net/kunshan_shenbin/article/details/7209990 
本文的目的是要讲述如何让Hbase和Hive能互相访问,让Hadoop/Hbase/Hive协同工作,合为一体。 
本文测试步骤主要参考自:http://running.iteye.com/blog/898399 
当然,这边博文也是按照官网的步骤来的:http://wiki.apache.org/hadoop/Hive/HBaseIntegration 
1. 拷贝hbase-0.90.5.jar和zookeeper-3.3.2.jar到hive/lib下。 
注意:如何hive/lib下已经存在这两个文件的其他版本(例如zookeeper-3.3.1.jar),建议删除后使用hbase下的相关版本。 
2. 修改hive/conf下hive-site.xml文件,在底部添加如下内容: 
[html] view plaincopy



hive.querylog.location
/usr/local/hive/logs



hive.aux.jars.path
file:///usr/local/hive/lib/hive-hbase-handler-0.8.0.jar,file:///usr/local/hive/lib/hbase-0.90.5.jar,file:///usr/local/hive/lib/zookeeper-3.3.2.jar

注意:如果hive-site.xml不存在则自行创建,或者把hive-default.xml.template文件改名后使用。 
具体请参见:http://blog.csdn.net/kunshan_shenbin/article/details/7210020 

3. 拷贝hbase-0.90.5.jar到所有hadoop节点(包括master)的hadoop/lib下。 
4. 拷贝hbase/conf下的hbase-site.xml文件到所有hadoop节点(包括master)的hadoop/conf下。 
注意,hbase-site.xml文件配置信息参照:http://blog.csdn.net/kunshan_shenbin/article/details/7209990 
注意,如果3,4两步跳过的话,运行hive时很可能出现如下错误: 
[html] view plaincopy
org.apache.hadoop.hbase.ZooKeeperConnectionException: HBase is able to connect to ZooKeeper but the connection closes immediately.
This could be a sign that the server has too many connections (30is thedefault). Consider inspecting your ZK server logsforthat error and
then make sure you are reusing HBaseConfiguration as often as you can. See HTable's javadocformore information. at org.apache.hadoop.
hbase.zookeeper.ZooKeeperWatcher.
参考:http://blog.sina.com.cn/s/blog_410d18710100vlbq.html 

现在可以尝试启动Hive了。 
单节点启动: > bin/hive -hiveconf hbase.master=master:60000
集群启动: > bin/hive -hiveconf hbase.zookeeper.quorum=slave
如何hive-site.xml文件中没有配置hive.aux.jars.path,则可以按照如下方式启动。 
> bin/hive --auxpath /usr/local/hive/lib/hive-hbase-handler-0.8.0.jar, /usr/local/hive/lib/hbase-0.90.5.jar, /usr/local/hive/lib/zookeeper-3.3.2.jar -hiveconf hbase.zookeeper.quorum=slave
接下来可以做一些测试了。 
1.创建hbase识别的数据库: 
[sql] view plaincopy 
CREATE TABLE hbase_table_1(key int, value string)  
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'  
WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,cf1:val")  
TBLPROPERTIES ("hbase.table.name" = "xyz");  
hbase.table.name 定义在hbase的table名称 
hbase.columns.mapping 定义在hbase的列族 
2.使用sql导入数据 
a) 新建hive的数据表 
[sql] view plaincopy 
hive> CREATE TABLE pokes (foo INT, bar STRING);  
b) 批量插入数据 
[sql] view plaincopy 
hive> LOAD DATA LOCAL INPATH'./examples/files/kv1.txt'OVERWRITE INTO TABLE
pokes;  
c) 使用sql导入hbase_table_1 
[sql] view plaincopy 
hive> INSERT OVERWRITE TABLE hbase_table_1 SELECT * FROM pokes WHERE foo=86;  
3. 查看数据 
[sql] view plaincopy 
hive> select * from  hbase_table_1;  
这时可以登录Hbase去查看数据了. 
> /usr/local/hbase/bin/hbase shell 
hbase(main):001:0> describe 'xyz'   
hbase(main):002:0> scan 'xyz'   
hbase(main):003:0> put 'xyz','100','cf1:val','www.360buy.com' 
这时在Hive中可以看到刚才在Hbase中插入的数据了。 
hive> select * from hbase_table_1 
4. hive访问已经存在的hbase 
使用CREATE EXTERNAL TABLE 
[sql] view plaincopy 
CREATE EXTERNAL TABLE hbase_table_2(key int, value string)  
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'  
WITH SERDEPROPERTIES ("hbase.columns.mapping" = "cf1:val")  
TBLPROPERTIES("hbase.table.name" = "some_existing_table");  
多列和多列族(Multiple Columns and Families) 
1.创建数据库 
Java代码  
CREATE TABLE hbase_table_2(key int, value1 string, value2 int, value3 int)   
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'  
WITH SERDEPROPERTIES (  
"hbase.columns.mapping" = ":key,a:b,a:c,d:e"  
);  

2.插入数据 
Java代码  
INSERT OVERWRITE TABLE hbase_table_2 SELECT foo, bar, foo+1, foo+2   
FROM pokes WHERE foo=98 OR foo=100;  


这个有3个hive的列(value1和value2,value3),2个hbase的列族(a,d) 
Hive的2列(value1和value2)对应1个hbase的列族(a,在hbase的列名称b,c),hive的另外1列(value3)对应列(e)位于列族(d) 

3.登录hbase查看结构 
Java代码  
hbase(main):003:0> describe"hbase_table_2" 
DESCRIPTION ENABLED
{NAME =>'hbase_table_2', FAMILIES => [{NAME =>'a', COMPRESSION => 'Ntrue
ONE', VERSIONS => '3', TTL => '2147483647', BLOCKSIZE => '65536', IN_M
EMORY =>'false', BLOCKCACHE =>'true'}, {NAME =>'d', COMPRESSION =>
'NONE', VERSIONS =>'3', TTL =>'2147483647', BLOCKSIZE =>'65536', IN
_MEMORY =>'false', BLOCKCACHE =>'true'}]}
1row(s) in1.0630seconds
4.查看hbase的数据 
Java代码  
hbase(main):004:0> scan'hbase_table_2' 
ROW COLUMN+CELL
100 column=a:b, timestamp=1297695262015, value=val_100
100 column=a:c, timestamp=1297695262015, value=101
100 column=d:e, timestamp=1297695262015, value=102
98 column=a:b, timestamp=1297695242675, value=val_98
98 column=a:c, timestamp=1297695242675, value=99
98 column=d:e, timestamp=1297695242675, value=100
2row(s) in0.0380seconds
在hive中查看 
Java代码  
hive> select * from hbase_table_2; 
OK
100 val_100101 102
98 val_98 99 100
Time taken:3.238seconds

Hfile存储结构-转载

唐半张 发表了文章 0 个评论 2142 次浏览 2015-09-29 10:15 来自相关话题

HBase中的所有数据文件都存储在Hadoop HDFS文件系统上,主要包括两种文件类型: 1. HFile, HBase中KeyValue数据的存储格式,HFile是Hadoop的二进制格式文件,实际上StoreFile就是对HFile做了轻量级 ...查看全部
HBase中的所有数据文件都存储在Hadoop HDFS文件系统上,主要包括两种文件类型:
1. HFile, HBase中KeyValue数据的存储格式,HFile是Hadoop的二进制格式文件,实际上StoreFile就是对HFile做了轻量级包装,即StoreFile底层就是HFile
2. HLog File,HBase中WAL(Write Ahead Log) 的存储格式,物理上是Hadoop的Sequence File
下面主要通过代码理解一下HFile的存储格式。
HFile下图是HFile的存储格式:
开始是两个固定长度的数值,分别表示Key的长度和Value的长度。紧接着是Key,开始是固定长度的数值,表示RowKey的长度,紧接着是 RowKey,然后是固定长度的数值,表示Family的长度,然后是Family,接着是Qualifier,然后是两个固定长度的数值,表示Time Stamp和Key Type(Put/Delete)。Value部分没有这么复杂的结构,就是纯粹的二进制数据了。
Data Block:由DATABLOCKMAGIC和若干个record组成,其中record就是一个KeyValue(key length, value length, key, value),默认大小是64k,小的数据块有利于随机读操作,而大的数据块则有利于scan操作,这是因为读KeyValue的时候,HBase会将查询到的data block全部读到Lru Block Cache中去,而不是仅仅将这个record读到cache中去。
private void append(final byte [] key, final int koffset, final int klength, final byte [] value, final int voffset, final int vlength) throws IOException {
this.out.writeInt(klength);
this.keylength += klength;
this.out.writeInt(vlength);
this.valuelength += vlength;
this.out.write(key, koffset, klength);
this.out.write(value, voffset, vlength);
}
Meta Block:由METABLOCKMAGIC和Bloom Filter信息组成。
public void close() throws IOException {
if (metaNames.size() > 0) {
for (int i = 0 ; i < metaNames.size() ; ++ i ) {
dos.write(METABLOCKMAGIC);
metaData.get(i).write(dos);
}
}
}
File Info: 由MapSize和若干个key/value,这里保存的是HFile的一些基本信息,如hfile.LASTKEY, hfile.AVG_KEY_LEN, hfile.AVG_VALUE_LEN, hfile.COMPARATOR。
private long writeFileInfo(FSDataOutputStream o) throws IOException {
if (this.lastKeyBuffer != null) {
// Make a copy.  The copy is stuffed into HMapWritable.  Needs a clean
// byte buffer.  Won’t take a tuple.
byte [] b = new byte[this.lastKeyLength];
System.arraycopy(this.lastKeyBuffer, this.lastKeyOffset, b, 0, this.lastKeyLength);
appendFileInfo(this.fileinfo, FileInfo.LASTKEY, b, false);
}
int avgKeyLen = this.entryCount == 0? 0: (int)(this.keylength/this.entryCount);
appendFileInfo(this.fileinfo, FileInfo.AVG_KEY_LEN, Bytes.toBytes(avgKeyLen), false);
int avgValueLen = this.entryCount == 0? 0: (int)(this.valuelength/this.entryCount);
appendFileInfo(this.fileinfo, FileInfo.AVG_VALUE_LEN,
Bytes.toBytes(avgValueLen), false);
appendFileInfo(this.fileinfo, FileInfo.COMPARATOR, Bytes.toBytes(this.comparator.getClass().getName()), false);
long pos = o.getPos();
this.fileinfo.write(o);
return pos;
}
Data/Meta Block Index: 由INDEXBLOCKMAGIC和若干个record组成,而每一个record由3个部分组成 — block的起始位置,block的大小,block中的第一个key。
static long writeIndex(final FSDataOutputStream o, final List keys, final List offsets, final List sizes) throws IOException {
long pos = o.getPos();
// Don’t write an index if nothing in the index.
if (keys.size() > 0) {
o.write(INDEXBLOCKMAGIC);
// Write the index.
for (int i = 0; i < keys.size(); ++i) {
o.writeLong(offsets.get(i).longValue());
o.writeInt(sizes.get(i).intValue());
byte [] key = keys.get(i);
Bytes.writeByteArray(o, key);
}
}
return pos;
}
Fixed file trailer: 大小固定,主要是可以根据它查找到File Info, Block Index的起始位置。
public void close() throws IOException {
trailer.fileinfoOffset = writeFileInfo(this.outputStream);
trailer.dataIndexOffset = BlockIndex.writeIndex(this.outputStream,
this.blockKeys, this.blockOffsets, this.blockDataSizes);
if (metaNames.size() > 0) {
trailer.metaIndexOffset = BlockIndex.writeIndex(this.outputStream,
this.metaNames, metaOffsets, metaDataSizes);
}
trailer.dataIndexCount = blockKeys.size();
trailer.metaIndexCount = metaNames.size();
trailer.totalUncompressedBytes = totalBytes;
trailer.entryCount = entryCount;
trailer.compressionCodec = this.compressAlgo.ordinal();
trailer.serialize(outputStream);
}
注:上面的代码剪切自HFile.java中的代码,更多信息可以查看Hbase源代码。
参考:
http://www.searchtb.com/2011/01/understanding-hbase.html
http://th30z.blogspot.com/2011/02/hbase-io-hfile.html
 

hadoop2.6.0对应的hbase版本

唐半张 发表了文章 0 个评论 3846 次浏览 2015-09-25 10:46 来自相关话题

adoop2.6.0对应的hbase版本是      我们安装了hadoop,想学习habse,但是hbase的版本选择不正确,会造成后面不能进行下去。 那么hbase和hadoop、hive版本是如何对应的呢? 下面给大家列出来: ...查看全部
adoop2.6.0对应的hbase版本是      我们安装了hadoop,想学习habse,但是hbase的版本选择不正确,会造成后面不能进行下去。
那么hbase和hadoop、hive版本是如何对应的呢?
下面给大家列出来:
hadoop与HBase版本对应关系:
Hbase          Hadoop
0.92.0   1.0.0
0.92.1   1.0.0
0.92.2   1.0.3
0.94.0   1.0.2
0.94.1   1.0.3
0.94.2   1.0.3
0.94.3   1.0.4
0.94.4   1.0.4
0.94.5   1.0.4
0.94.9   1.2.0
0.95.0   1.2.0

hadoop1.2+hbase0.95.0+hive0.11.0 会产生hbase+hive的不兼容,创建hive+hbase的关联表就会报pair对异常。
hadoop1.2+hbase0.94.9+hive0.10.0 没问题,解决了上个版本的不兼容问题。

hadoop zookeeper hbase部署

夕阳丶一抹红颜 发表了文章 0 个评论 2153 次浏览 2015-09-22 11:33 来自相关话题

hadoop zookeeper hbase部署 一 机器 192.168.0.203 hd203: hadoop namenode & hbase HMaster 192.168.0.204 hd204: hadoop d ...查看全部
hadoop zookeeper hbase部署
一 机器
192.168.0.203 hd203: hadoop namenode & hbase HMaster
192.168.0.204 hd204: hadoop datanode & hbase HRegionServer & zookeeper
192.168.0.205 hd205: hadoop datanode & hbase HRegionServer & zookeeper
192.168.0.206 hd206: hadoop datanode & hbase HRegionServer & zookeeper
192.168.0.202 hd202: hadoop second namenode & hbase HMaster
共 5 台机器 (生产环境需要把zookeeper单独安装)

二 操作系统和软件版本
1 操作系统:CentOS release 5.6 (Final)x64,关闭防火墙
2 相关软件:
jdk-6u24-linux-x64.bin
hadoop-0.20.2-cdh3u0.tar.gz
hbase-0.90.1-cdh3u0.tar.gz
zookeeper-3.3.3-cdh3u0.tar.gz

三 安装

1 安装jdk (所有机器)
mkdir /usr/java
mv jdk-6u24-linux-x64.bin /usr/java
chmod 744 jdk-6u24-linux-x64.bin
./jdk-6u24-linux-x64.bin
ln -s jdk1.6.0_24 default

2 建立集群平台用户 (所有机器)
useradd cbcloud
passwd cbcloud 密码设为123456


3 编辑/etc/hosts (所有机器)
192.168.0.203 hd203
192.168.0.204 hd204
192.168.0.205 hd205
192.168.0.206 hd206
192.168.0.202 hd202

4 hd203登录集群所有机器免密码设置 (hd203)
su - cbcloud
ssh登录本机也要免密码 如下设置

 $ ssh-keygen -t dsa -P '' -f ~/.ssh/id_dsa
 $ cat ~/.ssh/id_dsa.pub >> ~/.ssh/authorized_keys

将hd203上的id_dsa.pub 文件追加到所有机器的cbcloud用户的authorized_keys文件内
细节不在详述
ssh 所有机器 第一次需要确认一下

5 安装hadoop
5.1 建立目录 (所有机器)
mkdir /home/cbcloud/hdtmp
mkdir /home/cbcloud/hddata
mkdir /home/cbcloud/hdconf
chown -R cbcloud:cbcloud 以上目录
tar zxvf hadoop-0.20.2-cdh3u0.tar.gz -C /home/cbcloud
cd /home/cbcloud
mv hadoop-0.20.2-cdh3u0 hadoop
chown -R cbcloud:cbcloud hadoop/


5.2 配置环境变量 vi /etc/profile (所有机器)
export JAVA_HOME=/usr/java/default
export CLASSPATH=$CLASSPATH:$JAVA_HOME/lib:$JAVA_HOME/jre/lib
export PATH=$JAVA_HOME/bin:$JAVA_HOME/lib:$JAVA_HOME/jre/bin:$PATH:$HOME/bin
export HADOOP_HOME=/home/cbcloud/hadoop
export HADOOP_CONF_DIR=/home/cbcloud/hdconf
export PATH=$PATH:$HADOOP_HOME/bin

把hadoop的配置文件目录从源目录拿出来,方便以后升级hadoop
mv hadoop的conf目录内文件到/home/cbcloud/hdconf内

5.3 编辑 hadoop 配置文件 core-site.xml
加入


fs.default.name
hdfs://hd203:9000


  fs.checkpoint.dir
  /home/cbcloud/hdtmp/dfs/namesecondary
  Determines where on the local filesystem the DFS secondary
      name node should store the temporary images to merge.
      If this is a comma-delimited list of directories then the image is
      replicated in all of the directories for redundancy.
 



  fs.checkpoint.period
  60
  Determines where on the local filesystem the DFS secondary
      name node should store the temporary images to merge.
      If this is a comma-delimited list of directories then the image is
      replicated in all of the directories for redundancy.
 



5.4 编辑hdfs-site.xml
加入

       dfs.replication
       3
   



       dfs.data.dir
       /home/cbcloud/hddata
   


   
       hadoop.tmp.dir
       /home/cbcloud/hdtmp/
   



    dfs.balance.bandwidthPerSec
    10485760


    dfs.hosts.exclude
    /home/cbcloud/hdconf/excludes
    true


5.5 编辑mapred-site.xml
加入

        mapred.job.tracker
        hd203:9001
   


5.6 编辑 hadoop-env.sh
export JAVA_HOME=/usr/java/default

5.7  编辑masters 该文件指定 secondary name 机器,
加入
hd202
编辑 slaves
加入
hd204
hd205
hd206

5.8 拷贝hd203的hadoop和hdconf到所有机器
# scp -r /home/cbcloud/hadoop cbcloud@hd204:/home/cbcloud
# scp -r /home/cbcloud/hdconf cbcloud@hd204:/home/cbcloud

完成后,在hd203 格式化hadoop文件系统 
执行
 hadoop namenode -format
启动
 start-all.sh 
查看集群内datanode的机器 
执行jps
5764 Jps
18142 DataNode
18290 TaskTracker
看到以上结果 说明启动正确
web方式
http://hd203:50070/dfshealth.jsp 
注意 本地PC hosts文件也要配置
192.168.0.203 hd203
192.168.0.204 hd204
192.168.0.205 hd205
192.168.0.206 hd206
192.168.0.202 hd202
web方式可以查看集群状态和job状态等,至此hadoop安装完毕

6 安装zookeeper (hd203)
tar zxvf zookeeper-3.3.3-cdh3u0.tar.gz -C /home/cbcloud
在hd204-hd206上
mkdir /home/cbcloud/zookeeperdata

chown -R cbcloud:cbcloud /home/cbcloud/zookeeperdata
chown -R cbcloud:cbcloud /home/cbcloud/zookeeper-3.3.3-cdh3u0

编辑 /home/cbcloud/zookeeper-3.3.3-cdh3u0/conf/zoo.cfg 

# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
dataDir=/home/cbcloud/zookeeperdata
# the port at which the clients will connect
clientPort=2181
server.1=hd204:2888:3888
server.2=hd205:2888:3888
server.3=hd206:2888:3888

scp hd203的zookeeper到hd204,hd205,hd206
# scp -r /home/cbcloud/zookeeper-3.3.3-cdh3u0/ cbcloud@hd205:/home/cbcloud/ 
在hd204-206 的/home/cbcloud/zookeeperdata目录touch myid文件,
内容分别为1,2,3 和server编号一致 chown   cbcloud:cbcloud myid

启动zookeeper,在hd204-206上bin目录下 执行
# zkServer.sh start
启动后 通过
# zkServer.sh status
查看状态 注意 在centos5.6上 执行这个会报错
Error contacting service. It is probably not running.
通过查看脚本 是因为
echo stat | nc -q 1 localhost
nc版本不同,没有-q的参数,更改脚本去掉-q 1 即可
另外 可以通过
echo stat | nc   localhost 2181来查看状态

7 安装hbase
7.1 建立目录 (所有机器)
mkdir /home/cbcloud/hbconf
chown -R cbcloud:cbcloud /home/cbcloud/hbconf
tar zxvf hbase-0.90.1-cdh3u0.tar.gz -C /home/cbcloud
cd /home/cbcloud
mv hbase-0.90.1-cdh3u0  hbase
chown -R cbcloud:cbcloud hbase/

7.2 配置环境变量 
vi /etc/profile (所有机器) 追加如下内容
export HBASE_CONF_DIR=/home/cbcloud/hbconf
export HBASE_HOME=/home/hadoop/hbase

把hbase的配置文件目录从源目录拿出来,方便以后升级hbase
mv hbase的conf目录内文件到/home/cbcloud/hbconf内

7.3 编辑 hbase-env.sh
export HBASE_OPTS="$HBASE_OPTS -XX:+HeapDumpOnOutOfMemoryError -XX:+UseConcMarkSweepGC -XX:+CMSIncrementalMode"
export JAVA_HOME=/usr/java/default
export HBASE_MANAGES_ZK=false
export HBASE_HOME=/home/cbcloud/hbase
export HADOOP_HOME=/home/cbcloud/hadoop

7.4 编辑hbase-site.xml
加入

 hbase.rootdir
 hdfs://hd203:9000/hbase


 hbase.cluster.distributed
 true


hbase.master
hd203:60000

 
 hbase.master.port
 60000
 The port master should bind to.
 

 
 
   hbase.zookeeper.quorum
   hd204,hd205,hd206
 


7.5 编辑regionservers
加入
hd204
hd205
hd206

scp hd203 的hbase到hd204-206,202
# scp -r /home/cbcloud/hbase/ cbcloud@hd204:/home/cbcloud
# scp -r /home/cbcloud/hbconf/ cbcloud@hd204:/home/cbcloud

su - cbcloud
启动hbase
在hd203上hbase的bin目录执行
$ ./start-hbase.sh
启动hbase的第二个HMaster
在202上执行
$ ./hbase-daemon.sh start master

查看Master:http://hd203:60010/master.jsp 

此时查看datanode 服务器 jps
5764 Jps
32091 HRegionServer
18142 DataNode
11221 QuorumPeerMain 
18290 TaskTracker
以上结果说明启动正常

至此 hadoop+zookeeper+hbase安装完成
启动顺序
1.hadoop
2.zookeeper
3.hbase
4.第二个HMaster

停止顺序
1.第二个HMaster, kill-9删除
2.hbase
3.zookeeper
4.hadoop
注意 一定要按顺序停止,
如果先停zookeeper再停hbase的话,基本停不下来(自己测试结果)

学习Hadoop2.x下安装HBase的教程

夕阳丶一抹红颜 发表了文章 0 个评论 1690 次浏览 2015-09-22 09:15 来自相关话题

学习Hadoop已经不短时间了,今天我们来学习Hadoop2.x下安装HBase,我们所使用的环境:CentOS6.5  Hadoop2.5.2  HBase1.0.0,大家如果和使用的环境相符,那么,详细教程如下: ...查看全部
学习Hadoop已经不短时间了,今天我们来学习Hadoop2.x下安装HBase,我们所使用的环境:CentOS6.5  Hadoop2.5.2  HBase1.0.0,大家如果和使用的环境相符,那么,详细教程如下:

1.安装好 hadoop 集群,并启动
[grid@hadoop4 ~]$ sh hadoop-2.5.2/sbin/start-dfs.sh
[grid@hadoop4 ~]$ sh hadoop-2.5.2/sbin/start-yarn.sh
查看 hadoop 版本:
[grid@hadoop4 ~]$ hadoop-2.5.2/bin/hadoop version
Hadoop 2.5.2
Subversion Unknown -r Unknown
Compiled by grid on 2014-12-31T01:40Z
Compiled with protoc 2.5.0
From source with checksum df7537a4faa4658983d397abf4514320
This command was run using /home/grid/hadoop-2.5.2/share/hadoop/common/hadoop-common-2.5.2.jar

2.查看 hbase 官方文档(http://hbase.apache.org/book.html#basic.prerequisites)
找到与 hadoop 版本对应的 hbase 并下载
[grid@hadoop4 ~]$ wget http://mirrors.cnnic.cn/apache/hbase/hbase-1.0.0/hbase-1.0.0-bin.tar.gz

3.解压
[grid@hadoop4 ~]$ tar -zxf hbase-1.0.0-bin.tar.gz

4.进入 hbase 的 lib 目录,查看 hadoop jar 包的版本
[grid@hadoop4 ~]$ cd hbase-1.0.0/lib/
[grid@hadoop4 lib]$ find -name 'hadoop*jar'
./hadoop-common-2.5.1.jar
./hadoop-mapreduce-client-common-2.5.1.jar
./hadoop-annotations-2.5.1.jar
./hadoop-yarn-server-common-2.5.1.jar
./hadoop-hdfs-2.5.1.jar
./hadoop-client-2.5.1.jar
./hadoop-mapreduce-client-shuffle-2.5.1.jar
./hadoop-yarn-common-2.5.1.jar
./hadoop-yarn-server-nodemanager-2.5.1.jar
./hadoop-yarn-client-2.5.1.jar
./hadoop-mapreduce-client-core-2.5.1.jar
./hadoop-auth-2.5.1.jar
./hadoop-mapreduce-client-app-2.5.1.jar
./hadoop-yarn-api-2.5.1.jar
./hadoop-mapreduce-client-jobclient-2.5.1.jar
发现与 hadoop 集群的版本号不一致,需要用 hadoop 目录下的 jar 替换 hbase/lib 目录下的 jar 文件。
编写脚本来完成替换,如下所示:
 
[grid@hadoop4 lib]$ pwd
/home/grid/hbase-1.0.0/lib
[grid@hadoop4 lib]$ vim f.sh
find -name "hadoop*jar" | sed 's/2.5.1/2.5.2/g' | sed 's/\.\///g' > f.log
rm ./hadoop*jar
cat ./f.log | while read Line
do
find /home/grid/hadoop-2.5.2 -name "$Line" | xargs -i cp {} ./
done
rm ./f.log
[grid@hadoop4 lib]$ chmod u+x f.sh
[grid@hadoop4 lib]$ ./f.sh
[grid@hadoop4 lib]$ find -name 'hadoop*jar'
./hadoop-yarn-api-2.5.2.jar
./hadoop-mapreduce-client-app-2.5.2.jar
./hadoop-common-2.5.2.jar
./hadoop-mapreduce-client-jobclient-2.5.2.jar
./hadoop-mapreduce-client-core-2.5.2.jar
./hadoop-yarn-server-nodemanager-2.5.2.jar
./hadoop-hdfs-2.5.2.jar
./hadoop-yarn-common-2.5.2.jar
./hadoop-mapreduce-client-shuffle-2.5.2.jar
./hadoop-auth-2.5.2.jar
./hadoop-mapreduce-client-common-2.5.2.jar
./hadoop-yarn-client-2.5.2.jar
./hadoop-annotations-2.5.2.jar
./hadoop-yarn-server-common-2.5.2.jar
OK,jar 包替换成功;hbase/lib 目录下还有个 slf4j-log4j12-XXX.jar,在机器有装hadoop时,由于classpath中会有hadoop中的这个jar包,会有冲突,直接删除掉[grid@hadoop4 lib]$ rm `find -name 'slf4j-log4j12-*jar'`

5.修改配置文件
5.1.
[grid@hadoop4 hbase-1.0.0]$ vi conf/hbase-env.sh
export JAVA_HOME=/usr/java/jdk1.7.0_72
export HBASE_CLASSPATH=/home/grid/hadoop-2.5.2/etc/hadoop
export HBASE_MANAGES_ZK=true
第一个参数指定了JDK路径;第二个参数指定了 hadoop 的配置文件路径;第三个参数设置使用 hbase 默认自带的 Zookeeper

5.2.
[grid@hadoop4 hbase-1.0.0]$ vim conf/hbase-site.xml

hbase.rootdir
hdfs://hadoop4:9000/hbase


hbase.cluster.distributed
true


hbase.tmp.dir
/home/grid/hbase-1.0.0/tmp


hbase.zookeeper.quorum
hadoop4,hadoop5,hadoop6


hbase.zookeeper.property.dataDir
/home/grid/hbase-1.0.0/zookeeper

创建目录
[grid@hadoop4 hbase-1.0.0]$ mkdir tmp
[grid@hadoop4 hbase-1.0.0]$ mkdir zookeeper

5.3.
[grid@hadoop4 hbase-1.0.0]$ vim conf/regionservers
hadoop4
hadoop5
hadoop6

6.设置环境变量
[grid@hadoop4 ~]$ vi .bash_profile
export HBASE_HOME=/home/grid/hbase-1.0.0
export PATH=$PATH:$HBASE_HOME/bin
[grid@hadoop4 ~]$ source .bash_profile

7.分发 hbase 到其它机器,并在其上设置环境变量
[grid@hadoop4 ~]$ scp -r hbase-1.0.0 grid@hadoop5:~
[grid@hadoop4 ~]$ scp -r hbase-1.0.0 grid@hadoop6:~

8.启动 hbase
[grid@hadoop4 ~]$ sh start-hbase.sh
[grid@hadoop4 ~]$ jps
2388 ResourceManager
3692 Jps
2055 NameNode
3375 HQuorumPeer
2210 SecondaryNameNode
3431 HMaster
[grid@hadoop5 ~]$ jps
2795 Jps
2580 HQuorumPeer
2656 HRegionServer
2100 NodeManager
1983 DataNode
[grid@hadoop6 ~]$ jps
2566 HQuorumPeer
1984 DataNode
2101 NodeManager
2803 Jps
2639 HRegionServer
jps查看发现 Master 机上 HRegionServer 服务未启动,查看日志显示因16020端口被占用导致 HRegionServer 启动失败,查证发现占用16020端口的是 HMaster 进程,查看官方文档后解决:[grid@hadoop4 ~]$ sh local-regionservers.sh start 2 
官方文档截图:


9.shell


10.Web管理界面

hbase源码系列(十五)终结篇&Scan续集-->如何查询出来下一个KeyValue

cenyuhai 发表了文章 0 个评论 1935 次浏览 2015-09-11 15:17 来自相关话题

这是这个系列的最后一篇了,实在没精力写了,本来还想写一下hbck的,这个东西很常用,当hbase的Meta表出现错误的时候,它能够帮助我们进行修复,无奈看到3000多行的代码时,退却了,原谅我这点自私的想法吧。 在讲《Get、Scan在服务端是如何 ...查看全部
这是这个系列的最后一篇了,实在没精力写了,本来还想写一下hbck的,这个东西很常用,当hbase的Meta表出现错误的时候,它能够帮助我们进行修复,无奈看到3000多行的代码时,退却了,原谅我这点自私的想法吧。
在讲《Get、Scan在服务端是如何处理?》当中的nextInternal流程,它的第一步从storeHeap当中取出当前kv,这块其实有点儿小复杂的,因为它存在异构的Scanner(一个MemStoreScanner和多个StoreFileScanner),那怎么保证从storeHeap里面拿出来的总是离上一个kv最接近的kv呢?
这里我们知道,在打开这些Scanner之后,就对他们进行了一下seek操作,它们就已经调整到最佳位置了。
我们看看KeyValueHeap的构造函数里面去看看吧。
public KeyValueHeap(List scanners, KVComparator comparator) throws IOException {
this.comparator = new KVScannerComparator(comparator);
if (!scanners.isEmpty()) {
this.heap = new PriorityQueue(scanners.size(),
this.comparator);
//...
   this.current = pollRealKV();
}
}


它内部有一个叫heap的PriorityQueue队列,它会对所有的Scanner进行排序,排序的比较器是KVScannerComparator, 然后current又调用了pollRealKV通过比较获得当前的Scanner,后面会讲。
那好,我们直接进去KVScannerComparator看看它的compare方法就能知道怎么回事了。
public int compare(KeyValueScanner left, KeyValueScanner right) {
// 先各取出来一个KeyValue进行比较
int comparison = compare(left.peek(), right.peek());
if (comparison != 0) {
return comparison;
} else {
// key相同,选择最新的那个
long leftSequenceID = left.getSequenceID();
long rightSequenceID = right.getSequenceID();
if (leftSequenceID > rightSequenceID) {
return -1;
} else if (leftSequenceID < rightSequenceID) {
return 1;
} else {
return 0;
}
}
}


额,从上面代码看得出来,把left和right各取出一个kv来进行比较,如果一样就比较SequenceID,SequenceID越大说明这个文件越新,返回-1,在升序的情况下,这个Scanner就跑到前面去了。
这样就实现了heap里面拿出来的第一个就是最小的kv的最新版。
在继续将之前,我们看一下在KeyValue是怎么被调用的,这样我们好理清思路。
//从storeHeap里面取出一个来
KeyValue current = this.storeHeap.peek();
//后面是一顿比较,比较通过,把结果保存到results当中
KeyValue nextKv = populateResult(results, this.storeHeap, limit, currentRow, offset, length);

接着看populateResult方法。
private KeyValue populateResult(List results, KeyValueHeap heap, int limit,
byte[] currentRow, int offset, short length) throws IOException {
KeyValue nextKv;
do {
//从heap当中取出剩下的结果保存在results当中
heap.next(results, limit - results.size());
//如果够数了,就返回了
if (limit > 0 && results.size() == limit) {
return KV_LIMIT;
}
nextKv = heap.peek();
} while (nextKv != null && nextKv.matchingRow(currentRow, offset, length));
return nextKv;
}


我们对KeyValueHeap的使用,就是先peek,然后再next,我们接下来就按这个顺序看吧。
先从peek取出来一个,peek就是从heap队列取出来的current的scanner取出来的当前的KeyValue。
if (this.current == null) {
return null;
}
return this.current.peek();

然后我们看next方法。
public boolean next(List result, int limit) throws IOException {
if (this.current == null) {
return false;
}
InternalScanner currentAsInternal = (InternalScanner)this.current;
boolean mayContainMoreRows = currentAsInternal.next(result, limit);
KeyValue pee = this.current.peek();
if (pee == null || !mayContainMoreRows) {
this.current.close();
} else {
this.heap.add(this.current);
}
this.current = pollRealKV();
return (this.current != null);
}


1、通过currentAsInternal.next继续获取kv,它是只针对通过通过检查的当前行的剩下的KeyValue,这个过程在之前那篇文章讲过了。
2、如果后面没有值了,就关闭这个Scanner。
3、然后还有,就把这个Scanner放回heap上,等待下一次调用。
4、使用pollRealKV再去一个新的Scanner出来。

private KeyValueScanner pollRealKV() throws IOException {
KeyValueScanner kvScanner = heap.poll();
if (kvScanner == null) {
return null;
}

while (kvScanner != null && !kvScanner.realSeekDone()) {
if (kvScanner.peek() != null) {
//查询之前没有查的
kvScanner.enforceSeek();
//把之前的查到位置的kv拿出来
KeyValue curKV = kvScanner.peek();
if (curKV != null) {
//再选出来下一个的scanner
KeyValueScanner nextEarliestScanner = heap.peek();
if (nextEarliestScanner == null) {
// 后面没了,只能是它了
return kvScanner;
}

// 那下一个Scanner的kv也出来比较比较
KeyValue nextKV = nextEarliestScanner.peek();
if (nextKV == null || comparator.compare(curKV, nextKV) < 0) {
// 它确实小,那么就把它放出去吧
return kvScanner;
}

// 把它放回去,和别的kv进行竞争
heap.add(kvScanner);
} else {
// 它没东西了,关闭完事
kvScanner.close();
}
} else {
// 它没东西了,关闭完事
kvScanner.close();
}
kvScanner = heap.poll();
}

return kvScanner;
}
View Code
鉴于它每次都要比较的情况,如果一个列族下的HFile比较多的话,它的比较次数也会增大,会影响查询效率,查询时间和HFile的数量成线性关系。
另外补充点内容,是前面写Scan的时候拉下的:
由于写入同一个rowkey相关的KeyValue的时候时间戳在前的先写入,查询的时候又需要总是读该rowkey最新的KeyValue,所以在查询的时候会先seek到该rowkey的时间戳最大的位置,具体查的时候,不断的向前seekBefore,直到这个rowkey的KeyValue全部查完位置,然后再向前定位到一个rowkey的位置。
简而言之:
不同rowkey的向前查,从rowkey小的查到rowkey大的;查相同rowkey的向后查,从最新的时间戳到查到最久的时间戳。
 
总结:
这就把如何查询出来下一个KeyValue的过程讲完了,它的peek方法、next方法、比较的方法,希望对大家有帮助,这个系列的文章到此也就结束了,下个目标是跟随超哥学习Spark源码,感谢广大读者的支持,觉得我写得好的,可以关注一下我的博客,谢谢!
 

hbase源码系列(十四)Compact和Split

cenyuhai 发表了文章 1 个评论 1977 次浏览 2015-09-11 15:16 来自相关话题

先上一张图讲一下Compaction和Split的关系,这样会比较直观一些。 Compaction把多个MemStore flush出来的StoreFile合并成一个文件,而Split则是把过大的文件Split成两个。 之 ...查看全部
先上一张图讲一下Compaction和Split的关系,这样会比较直观一些。

Compaction把多个MemStore flush出来的StoreFile合并成一个文件,而Split则是把过大的文件Split成两个。
之前在Delete的时候,我们知道它其实并没有真正删除数据的,那总不能一直不删吧,下面我们就介绍一下它删除数据的过程,它就是Compaction。
在讲源码之前,先说一下它的分类和作用。
Compaction主要起到如下几个作用:
1)合并文件
2)清除删除、过期、多余版本的数据
3)提高读写数据的效率
Minor & Major Compaction的区别
1)Minor操作只用来做部分文件的合并操作以及包括minVersion=0并且设置ttl的过期版本清理,不做任何删除数据、多版本数据的清理工作。
2)Major操作是对Region下的HStore下的所有StoreFile执行合并操作,最终的结果是整理合并出一个文件。
先说一下怎么使用吧,下面分别是它们是shell命令,可以在hbase的shell里面执行。
//major compaction
major compact '表名或region名'

//minor compaction
compact '表名或region名'

下面我们开始看入口吧,入口在HBaseAdmin,找到compact方法,都知道我们compact可以对表操作或者对region进行操作。
1、先把表或者region相关的region信息和server信息全部获取出来
2、循环遍历这些region信息,依次请求compact操作
AdminService.BlockingInterface admin = this.connection.getAdmin(sn);
CompactRegionRequest request = RequestConverter.buildCompactRegionRequest(hri.getRegionName(), major, family);
try {
admin.compactRegion(null, request);
} catch (ServiceException se) {
throw ProtobufUtil.getRemoteException(se);
}


到这里,客户端的工作就结束了,我们直接到HRegionServer找compactRegion这个方法吧。
    //major compaction多走这一步骤
if (major) {
if (family != null) {
store.triggerMajorCompaction();
} else {
region.triggerMajorCompaction();
}
}
    //请求compaction走这里
if(family != null) {
compactSplitThread.requestCompaction(region, store, log, Store.PRIORITY_USER, null);
} else {
compactSplitThread.requestCompaction(region, log, Store.PRIORITY_USER, null);
}


我们先看major compaction吧,直接去看triggerMajorCompaction和requestCompaction方法。
Compaction
进入方法里面就发现了它把forceMajor置为true就完了,看来这个参数是major和minor的开关,接着看requestCompaction。
CompactionContext compaction = null;
if (selectNow) {
compaction = selectCompaction(r, s, priority, request);
if (compaction == null) return null; // message logged inside
}
// 要根据文件的size来判断用给个大的线程池还是小的线程池
long size = selectNow ? compaction.getRequest().getSize() : 0;
ThreadPoolExecutor pool = (!selectNow && s.throttleCompaction(size)) ? largeCompactions : smallCompactions;
pool.execute(new CompactionRunner(s, r, compaction, pool));


上面的步骤是执行selectCompaction创建一个CompactionContext,然后提交CompactionRunner。
我们接着看CompactionContext的创建过程吧,这里还需要分是用户创建的Compaction和系统创建的Compaction。1、创建CompactionContext2、判断是否是非高峰时间,下面是这两个参数的值
int startHour = conf.getInt("hbase.offpeak.start.hour", -1);
int endHour = conf.getInt("hbase.offpeak.end.hour", -1);
3、选择需要进行compaction的文件,添加到CompactionRequest和filesCompacting列表当中
compaction.select(this.filesCompacting, isUserCompaction, mayUseOffPeak, forceMajor && filesCompacting.isEmpty());

我们看看这个select的具体实现吧。
public boolean select(List filesCompacting, boolean isUserCompaction,
boolean mayUseOffPeak, boolean forceMajor) throws IOException {
request = compactionPolicy.selectCompaction(storeFileManager.getStorefiles(),
filesCompacting, isUserCompaction, mayUseOffPeak, forceMajor);
return request != null;
}

这里的select方法,从名字上看是压缩策略的意思,它是由这个参数控制的hbase.hstore.defaultengine.compactionpolicy.class,默认是ExploringCompactionPolicy这个类。
接着看ExploringCompactionPolicy的selectCompaction方法,发现这个方法是继承来的,找它的父类RatioBasedCompactionPolicy。

public CompactionRequest selectCompaction(Collection candidateFiles,
final List filesCompacting, final boolean isUserCompaction,
final boolean mayUseOffPeak, final boolean forceMajor) throws IOException {
ArrayList candidateSelection = new ArrayList(candidateFiles);
int futureFiles = filesCompacting.isEmpty() ? 0 : 1;
boolean mayBeStuck = (candidateFiles.size() - filesCompacting.size() + futureFiles)
>= storeConfigInfo.getBlockingFileCount();
//从candidateSelection排除掉filesCompacting中的文件
candidateSelection = getCurrentEligibleFiles(candidateSelection, filesCompacting);long cfTtl = this.storeConfigInfo.getStoreFileTtl();
if (!forceMajor) {
// 如果不是强制major的话,包含了过期的文件,先删除过期的文件
if (comConf.shouldDeleteExpired() && (cfTtl != Long.MAX_VALUE)) {
ArrayList expiredSelection = selectExpiredStoreFiles(
candidateSelection, EnvironmentEdgeManager.currentTimeMillis() - cfTtl);
if (expiredSelection != null) {
return new CompactionRequest(expiredSelection);
}
}
//居然还要跳过大文件,看来不是major的还是不行的,净挑小的弄
candidateSelection = skipLargeFiles(candidateSelection);
}
// 是不是major的compaction还需要判断,做这个操作还是比较谨慎的
boolean majorCompaction = (
(forceMajor && isUserCompaction)
|| ((forceMajor || isMajorCompaction(candidateSelection))
&& (candidateSelection.size() < comConf.getMaxFilesToCompact()))
|| StoreUtils.hasReferences(candidateSelection)
);

if (!majorCompaction) {
   //过滤掉bulk load进来的文件
candidateSelection = filterBulk(candidateSelection);
//过滤掉一些不满足大小的文件
candidateSelection = applyCompactionPolicy(candidateSelection, mayUseOffPeak, mayBeStuck);
//检查文件数是否满足最小的要求,文件不够,也不做compaction
candidateSelection = checkMinFilesCriteria(candidateSelection);
}
//非major的超过最大可以compact的文件数量也要剔除掉,major的只是警告一下
candidateSelection = removeExcessFiles(candidateSelection, isUserCompaction, majorCompaction);
CompactionRequest result = new CompactionRequest(candidateSelection);
result.setOffPeak(!candidateSelection.isEmpty() && !majorCompaction && mayUseOffPeak);
return result;
}
View Code
从上面可以看出来,major compaction的选择文件几乎没什么限制,只要排除掉正在compacting的文件就行了,反而是minor compact有诸多的排除选项,因为默认的compaction是定时执行的,所以它这方面的考虑吧,排除太大的文件,选择那些过期的文件,排除掉bulkload的文件等等内容。Minor Compaction的文件选择策略
我们再简单看看applyCompactionPolicy这个方法吧,它是minor的时候用的,它的过程就像下图一样。

这个是双层循环:
从0开始,循环N遍(N=文件数),就相当于窗口向右滑动,指针为start
----->从currentEnd=start + MinFiles(默认是3)-1,每次增加一个文件作为考虑,类似扩张的动作, 窗口扩大, 指针为
-------------->从candidateSelection文件里面取出(start, currentEnd + 1)开始
-------------->小于最小compact数量文件,默认是3,continue
-------------->大于最大compact数量文件,默认是10,continue
-------------->获取这部分文件的大小
-------------->如果这部分文件数量比上次选择方案的文件还小,替换为最小文件方案
-------------->大于MemStore flush的大小128M并且符合有一个文件不满这个公式(FileSize(i) <= ( 文件总大小- FileSize(i) ) * Ratio),continue
       (注意上面的Ratio是干嘛的,这个和前面提到的非高峰时间的数值有关系,非高峰时段这个数值是5,高峰时间段这个值是1.2, 这说明高峰时段不允许compact过大的文件)
-------------->开始判断是不是最优的选择(下面讲的mayBeStuck是从selectCompaction传入的,可选择的文件超过7个的情况,上面黄色那部分代码)
          1)如果mayBeStuck并且不是初次,如果 文件平均大小 > 上次选择的文件的平均大小*1.05, 替换上次的选择文件方案成为最优解
          2)初次或者不是mayBeStuck的情况,文件更多的或者文件相同、总文件大小更小的会成为最新的选择文件方案
如果经过比较之后的最优文件选择方案不为空,就把它返回,否则就把最小文件方案返回。
下面是之前的Ratio的参数值,需要配合之前提到的参数配合使用的。
hbase.hstore.compaction.ratio              高峰时段,默认值是1.2
hbase.hstore.compaction.ratio.offpeak 非高峰时段,默认值是5

 
到这里先来个小结吧,从上面可以看得出来,这个Minor Compaction的文件选择策略就是选小的来,选最多的小文件来合并。选择文件结束,回到compact的主流程4、把CompactionRequest放入CompactionRunner,走线程池提交
之前的代码我再贴一下,省得大家有点凌乱。
ThreadPoolExecutor pool = (!selectNow && s.throttleCompaction(size)) ? largeCompactions : smallCompactions;
pool.execute(new CompactionRunner(s, r, compaction, pool));

我们去看CompactionRunner的run方法吧,它也在当前的类里面。
      if (this.compaction == null) {this.compaction = selectCompaction(this.region, this.store, queuedPriority, null); 
     // 出口,实在选不出东西来了,它会走这里跑掉
     if (this.compaction == null) return;
     // ....还有别的限制,和父亲运行的线程池也要一致,尼玛,什么逻辑
    }

    boolean completed = region.compact(compaction, store);if (completed) {
// blocked的regions再来一次,这次又要一次compaction意欲何为啊
// 其实它的出口在上面的那段代码,它执行之后,没有这里这么恶心
if (store.getCompactPriority() <= 0) {
requestSystemCompaction(region, store, "Recursive enqueue");
} else {
// compaction之后的region可能很大,超过split的数量就要split了
requestSplit(region);
}


先是对region进行compact,如果完成了,判断一下优先级,优先级小于等于0,请求系统级别的compaction,否则请求split。
我们还是先看HRegion的compact方法,compact开始前,它要先上读锁,不让读了,然后调用HStore中的compact方法。
     // 执行compact,生成新文件
List newFiles = compaction.compact();
//把compact生成的文件移动到正确的位置
sfs = moveCompatedFilesIntoPlace(cr, newFiles);
//记录WALEdit日志
writeCompactionWalRecord(filesToCompact, sfs);
//更新HStore相关的数据结构
replaceStoreFiles(filesToCompact, sfs);/
/归档旧的文件,关闭reader,重新计算file的大小
completeCompaction(filesToCompact);


comact生成新文件的方法很简单,给源文件创建一个StoreScanner,之前说过StoreScanner能从多个Scanner当中每次都取出最小的kv,然后用StoreFile.Append的方法不停地追加写入即可,这些过程在前面的章节都介绍过了,这里不再重复。
简单的说,就是把这些文件合并到一个文件去了,尼玛,怪不得io那么大。
剩下的就是清理工作了,这里面有意思的就是它会记录一笔日志到writeCompactionWalRecord当中,在之间日志恢复那一章的时候,贴出来的代码里面有,只是没有详细的讲。因为走到这里它已经完成了compaction的过程,只是没有把旧的文件移入归档文件当中,它挂掉重启的时候进行恢复干的事情,就是替换文件。5、store.getCompactPriority() 下一步是天堂抑或是地狱?
compact完了,要判断一下这个,真是天才啊
public int getStoreCompactionPriority() {
int blockingFileCount = conf.getInt(
HStore.BLOCKING_STOREFILES_KEY, HStore.DEFAULT_BLOCKING_STOREFILE_COUNT);
int priority = blockingFileCount - storefiles.size();
return (priority == HStore.PRIORITY_USER) ? priority + 1 : priority;
}

比较方法是这个,blockingFileCount的默认值是7,如果compact之后storefiles的文件数量大于7的话,就很有可能再触发一下,那么major compaction触发的可能性低,minor触发的可能性非常大。
不过没关系,实在选不出文件来,它会退出的。咱们可以将它这个参数hbase.hstore.blockingStoreFiles设置得大一些,弄出来一个比较大的数字。Split
好,我们接着看requestSplit。
if (shouldSplitRegion() && r.getCompactPriority() >= Store.PRIORITY_USER) {
byte[] midKey = r.checkSplit();
if (midKey != null) {
requestSplit(r, midKey);
return true;
}
}


先检查一下是否可以进行split,如果可以,把中间的key返回来。
那条件是啥?在这里,if的条件是成立的,条件判断在IncreasingToUpperBoundRegionSplitPolicy的shouldSplit方法当中。
遍历region里面所有的store
1、Store当中不能有Reference文件。
2、store.size > Math.min(getDesiredMaxFileSize(), this.flushSize * (tableRegionsCount * (long)tableRegionsCount)) 就返回ture,可以split。
getDesiredMaxFileSize()默认是10G,由这个参数来确定hbase.hregion.max.filesize, 当没超过10G的时候它就会根据128MB * (该表在这个RS上的region数量)平方。
midKey怎么找呢?找出最大的HStore,然后通过它来找这个分裂点,最大的文件的中间点。
return StoreUtils.getLargestFile(this.storefiles).getFileSplitPoint(this.kvComparator);

但是如果是另外一种情况,我们通过客户端来分裂Region,我们强制指定的分裂点,这种情况是按照我们设置的分裂点来进行分裂。
分裂点有了,我们接着看,我们发现它又提交了一个SplitRequest线程,看run方法。
1、先获得一个tableLock,给这个表上锁
2、执行SplitTransaction的prepare方法,然后execute
3、结束了释放tableLock
      // 先做准备工作,然后再execute执行主流程,过程当中出错了,就rollback
if (!st.prepare()) return;
try {
st.execute(this.server, this.server);
} catch (Exception e) {
try {
if (st.rollback(this.server, this.server)) {
} catch (RuntimeException ee) {this.server.abort(msg);
}
return;
}


prepare方法当中,主要做了这么件事,new了两个新的region出来
this.hri_a = new HRegionInfo(hri.getTable(), startKey, this.splitrow, false, rid);
this.hri_b = new HRegionInfo(hri.getTable(), this.splitrow, endKey, false, rid);

我们接着看execute方法,这个是重头戏。
PairOfSameType regions = createDaughters(server, services);
openDaughters(server, services, regions.getFirst(), regions.getSecond());
transitionZKNode(server, services, regions.getFirst(), regions.getSecond());

总共分三步:
1、创建子region
2、上线子region
3、更改zk当中的状态
我们先看createDaughters
    //在region-in-transition节点下给父region创建一个splitting的节点
createNodeSplitting(server.getZooKeeper(), parent.getRegionInfo(), server.getServerName(), hri_a, hri_b);
this.journal.add(JournalEntry.SET_SPLITTING_IN_ZK);//在parent的region目录下创建.splits目录
this.parent.getRegionFileSystem().createSplitsDir();
this.journal.add(JournalEntry.CREATE_SPLIT_DIR);

Map[], List> hstoreFilesToSplit = null;
//关闭parent,然后返回相应的列族和storefile的map
hstoreFilesToSplit = this.parent.close(false);
//从在线列表里下线parent
services.removeFromOnlineRegions(this.parent, null);
this.journal.add(JournalEntry.OFFLINED_PARENT);
// 把parent的storefile均分给两个daughter,所谓均分,只是创建引用文件而已
splitStoreFiles(hstoreFilesToSplit);

// 把临时的Region A目录重名为正式的region A 的目录
this.journal.add(JournalEntry.STARTED_REGION_A_CREATION);
HRegion a = this.parent.createDaughterRegionFromSplits(this.hri_a);

// 把临时的Region B目录重名为正式的region B的目录
this.journal.add(JournalEntry.STARTED_REGION_B_CREATION);
HRegion b = this.parent.createDaughterRegionFromSplits(this.hri_b);
this.journal.add(JournalEntry.PONR);

// 修改meta表中的信息,设置parent的状态为下线、并且split过,在增加两列左右孩子,左右孩子的信息也通过put插入到meta中
MetaEditor.splitRegion(server.getCatalogTracker(), parent.getRegionInfo(),
a.getRegionInfo(), b.getRegionInfo(), server.getServerName());
return new PairOfSameType(a, b);


在splitStoreFiles这块的,它给每个文件都开一个线程去进行split。
fs.splitStoreFile(this.hri_a, familyName, sf, this.splitrow, false);
fs.splitStoreFile(this.hri_b, familyName, sf, this.splitrow, true);

这里其实是给每个文件都创建了Reference文件,无论它的文件当中包不包括splitRow。
    //parentRegion/.splits/region/familyName目录
Path splitDir = new Path(getSplitsDir(hri), familyName);
// 其实它并没有真正的split,而是通过创建Reference
Reference r = top ? Reference.createTopReference(splitRow): Reference.createBottomReference(splitRow);
String parentRegionName = regionInfo.getEncodedName();
// 原来通过这么关联啊,storefile名字 + 父parent的name
Path p = new Path(splitDir, f.getPath().getName() + "." + parentRegionName);
return r.write(fs, p);


把引用文件生成在每个子region对应的目录,以便下一步直接重命令目录即可。
重命名目录之后,就是修改Meta表了,splitRegion的方法是通过Put来进行操作的,它修改parent的regioninfo这一列更新为最新的信息,另外又增加了splitA和splitB两列,hri_a和hri_b则通过另外两个Put插入到Meta表当中。
这个过程当中如果出现任何问题,就需要根据journal记录的过程信息进行回滚操作。
怎么open这两个子region就不讲了,之前讲《HMaster启动过程》的时候讲过了。
到这里split的过程就基本结束了,鉴于Compaction和Split的对io方面的巨大影响,所以在任何资料里面都是推荐屏蔽自动执行,写脚本在晚上自动进行这些操作。
 
 
 

hbase源码系列(十三)缓存机制MemStore与Block Cache

cenyuhai 发表了文章 0 个评论 2540 次浏览 2015-09-11 15:16 来自相关话题

这一章讲hbase的缓存机制,这里面涉及的内容也是比较多,呵呵,我理解中的缓存是保存在内存中的特定的便于检索的数据结构就是缓存。 之前在讲put的时候,put是被添加到Store里面,这个Store是个接口,实现是在HStore里面,MemStor ...查看全部
这一章讲hbase的缓存机制,这里面涉及的内容也是比较多,呵呵,我理解中的缓存是保存在内存中的特定的便于检索的数据结构就是缓存。
之前在讲put的时候,put是被添加到Store里面,这个Store是个接口,实现是在HStore里面,MemStore其实是它底下的小子。
那它和Region Server、Region是什么关系?
Region Server下面有若干个Region,每个Region下面有若干的列族,每个列族对应着一个HStore。
HStore里面有三个很重要的类,在这章的内容都会提到。
protected final MemStore memstore;
private final CacheConfig cacheConf;
final StoreEngine storeEngine;

MemStore是存储着两个有序的kv集合,kv进来先写到里面,超过阀值之后就会写入硬盘。
CacheConf是针对HFileBlock的缓存,专门用来缓存快,默认是在读的时候缓存块,也可以修改列族的参数,让它在写的时候也缓存,这个在数据模型定义的时候提到过。
StoreEngine是StoreFile的管理器,它管理着这个列族对应的所有StoreFiles。1. MemStore
memstore比较有意思,我们先看它的add方法,这个是入口。
long add(final KeyValue kv) {
this.lock.readLock().lock();
try {
KeyValue toAdd = maybeCloneWithAllocator(kv);
return internalAdd(toAdd);
} finally {
this.lock.readLock().unlock();
}
}


先把kv放到maybeCloneWithAllocator里面复制出来一个新的kv,然后再走internalAdd的方法,为啥要这么搞呢?1.1 MemStoreLAB
先看maybeCloneWithAllocator,我们慢慢看,没关系。
private KeyValue maybeCloneWithAllocator(KeyValue kv) {
if (allocator == null) {
return kv;
}
int len = kv.getLength();
//从allocator当中分配出来len长度的非堆空间
Allocation alloc = allocator.allocateBytes(len);
if (alloc == null) {
// 太大了,allocator决定不给它分配
return kv;
}
//用allocator生成的空间,new一个kv出来
assert alloc != null && alloc.getData() != null;
System.arraycopy(kv.getBuffer(), kv.getOffset(), alloc.getData(), alloc.getOffset(), len);
KeyValue newKv = new KeyValue(alloc.getData(), alloc.getOffset(), len);
newKv.setMvccVersion(kv.getMvccVersion());
return newKv;
}


allocator是何许人也,它是一个MemStoreLAB,它是干啥的呀,这个让人很纠结呀?

public Allocation allocateBytes(int size) {
// 如果申请的size比maxAlloc大,就不分了
if (size > maxAlloc) {
return null;
}

while (true) {
Chunk c = getOrMakeChunk();

// 给它分配个位置,返回数组的起始位置
int allocOffset = c.alloc(size);
if (allocOffset != -1) {
// 用一个数据结构Allocation来描述这个,它主要包括两个信息,1:数组的引用,2:数据在数组当中的起始位置
return new Allocation(c.data, allocOffset);
}

// 空间不足了,释放掉它
tryRetireChunk(c);
}
}
View Code
下面看看getOrMakeChunk看看是啥情况,挺疑惑的东西。

private Chunk getOrMakeChunk() {
while (true) {
// 当前的Chunk不为空,就取当前的
Chunk c = curChunk.get();
if (c != null) {
return c;
}
// 这里还有个Chunk的Pool,默认是没有的,走的是new Chunk这条路径
c = (chunkPool != null) ? chunkPool.getChunk() : new Chunk(chunkSize);
if (curChunk.compareAndSet(null, c)) {
// curChunk是为空的话,就设置为c,然后加到chunkQueue里面
c.init();
this.chunkQueue.add(c);
return c;
} else if (chunkPool != null) {
// 先放回去,待会儿再拿出来
chunkPool.putbackChunk(c);
}
}
}
View Code
Chunk是一个持有一个byte[]数组的数据结构,属性如下。
static class Chunk {
/* 实际数据保存的地方,被不停地分配 */
private byte[] data;
private static final int UNINITIALIZED = -1;
private static final int OOM = -2;
/* 下一个chunk的起始位置,也是上一个chunk的结束位置 */
private AtomicInteger nextFreeOffset = new AtomicInteger(UNINITIALIZED);

/** 分配给了多少个kv */
private AtomicInteger allocCount = new AtomicInteger();

/** Chunk的大小 */
private final int size;


好吧,我们现在清楚了,它是给每个kv的数据又重新找了个地方混,从注释上面讲这个Chunk未初始化,没有被分配内存,所以开销小。不太理解这个东西,人家之前也是在byte数组里面混,只不顾挪了个窝了,莫非是为了减少内存碎片?尼玛,还真被我说中了,在我以前的资料里面有《调优》
不管怎么样吧,把多个小的kv写到一个连续的数组里面可能是好点好处吧,下面讲一下它的相关参数吧。
/** 可分配的最大值,超过这个值就不给它分配了,默认值是256K */
hbase.hregion.memstore.mslab.max.allocation 默认值是256 * 1024
/** 每个Chunk的大小,默认是2M */
hbase.hregion.memstore.mslab.chunksize 默认值是2048 * 1024

那我们继续讲讲这个MemStoreChunkPool吧,它默认是不被开启的,因为它的参数hbase.hregion.memstore.chunkpool.maxsize默认是0 (只允许输入0->1的数值),它是通过堆内存的最大值*比例来计算得出来的结果。
它可以承受的最大的Chunk的数量是这么计算的 MaxCount = MemStore内存限制 * Chunkpool.Maxsize / Chunksize。
MemStore的内存最大最小值分别是0.35 --> 0.4,这个在我之前的博客里面也有。
hbase.regionserver.global.memstore.upperLimit 
hbase.regionserver.global.memstore.lowerLimit

还有这个参数hbase.hregion.memstore.chunkpool.initialsize需要设置,默认又是0,输入0->1的数值,MaxCount乘以它就设置初始的Chunk大小。
没试过开启这个Pool效果是否会好,它是依附在MemStore里面的,它设置过大了,最直接的影响就是,另外两个集合的空间就小了。1.2 有序集合
分配完Chunk之后,干的是这个函数,就是添加到一个有序集合当中kvset。
private long internalAdd(final KeyValue toAdd) {
long s = heapSizeChange(toAdd, addToKVSet(toAdd));
//把时间戳范围加到内部去
timeRangeTracker.includeTimestamp(toAdd);
this.size.addAndGet(s);
return s;
}


MemStore里面有两个有序的集合,kvset和snapshot,KeyValueSkipListSet的内部实现是ConcurrentNavigableMap。
volatile KeyValueSkipListSet kvset;
volatile KeyValueSkipListSet snapshot;

它们的排序规则上一章已经说过了,排过序的在搜索的时候方便查找,这里为什么还有一个snapshot呢?snapshot是一个和它一样的东西,我们都知道MemStore是要flush到文件生成StoreFile的,那我不能写文件的时候让别人都没法读了吧,那怎么办,先把它拷贝到snapshot当中,这个时间很短,复制完了就可以访问kvset,实际flush的之后,我们flush掉snapshot当中的kv就可以啦。2. CacheConfig
在看这个之前,先推荐看一下我的另外一篇文章《缓存机制以及可以利用SSD作为存储的BucketCache》,否则后面有很多概念,你看不懂的。
这里我们主要关注的是LruBlockCache和BucketCache,至于他们的使用,请参照上面的博客设置,这里不再介绍哦。
CacheConfig是一个HStore一个,属性是根据列族定制的,比如是否常驻内存,但是它内存用来缓存块的BlockCache是Region Server全局共享的的globalBlockCache,在new一个CacheConfig的时候,它会调用instantiateBlockCache方法返回一个BlockCache缓存Block的,如果已经存在globalBlockCache,就直接返回,没有才会重新实例化一个globalBlockCache。
这里还分堆上内存和直接分配的内存,堆上的内存的参数hfile.block.cache.size默认是0.25。2.1 DoubleCache
直接分配的内存,要通过设置JVM参数-XX:MaxDirectMemorySize来设置,设置了这个之后我们还需要设置hbase.offheapcache.percentage(默认是0)来设置占直接分配内存的比例。
offHeapCacheSize =offheapcache.percentage * DirectMemorySize
这里我们还真不能设置它,因为如果设置了它的话,它会把new一个DoubleCache出来,它是LruBlockCache和SlabCache的合体,之前我提到的那篇文章里面说到SlabCache是一个只能存固定大小的Block大小的Cache,比较垃圾。2.2 LruBlockCache
如果offHeapCacheSize <= 0,就走下面的逻辑,这里我就简单陈述一下了,代码没啥可贴的。
LruBlockCache和BucketCache的合作方式有两种,一种是BucketCache作为二级缓存使用,比如SSD,一种是在内存当中,它俩各占比列0.1和0.9,还是建议上SSD做二级缓存,其实也不贵。
不管如何,BlockCache这块的总大小是固定的,是由这个参数决定hfile.block.cache.size,默认它是0.25,所以LruBlockCache最大也就是0.25的最大堆内存。
在LruBlockCache当中还分了三种优先级的缓存块,分别是SINGLE、MULTI、MEMORY,比列分别是0.25、0.5、0.25,当快要满的时候,要把块剔除出内存的时候,就要遍历所有的块了,然后计算他们的分别占的比例,剔除的代码还挺有意思。
     PriorityQueue bucketQueue =
new PriorityQueue(3);

bucketQueue.add(bucketSingle);
bucketQueue.add(bucketMulti);
bucketQueue.add(bucketMemory);

int remainingBuckets = 3;
long bytesFreed = 0;

BlockBucket bucket;
while((bucket = bucketQueue.poll()) != null) {
long overflow = bucket.overflow();
if(overflow > 0) {
//把要释放的空间bytesToFree分给3个bucket,3个分完
long bucketBytesToFree = Math.min(overflow,
(bytesToFree - bytesFreed) / remainingBuckets);
bytesFreed += bucket.free(bucketBytesToFree);
}
remainingBuckets--;
}


搞了一个优先级队列,先从SINGLE的开刀、SINGLE不行了,再拿MULTI开刀,最后是MEMORY。bytesToFree是之前计算好的,要释放的大小=当前值-最小值。
在我们设置列族参数的时候,有一个InMemory的参数,如果设置了它就是MEMORY,如果没设置,就是SINGLE,SINGLE类型的一旦被访问过之后,立马变成高富帅的MULTI,但是没有希望变成MEMORY。
这里之前百度的一个哥么问我,Meta表的块会不会一直被保存在MEMORY当中呢,这块的代码写得让人有点儿郁闷的,它是按照列族的参数设置的,但是我怎么去找Meta表的列族设置啊,啊被我找到了,在代码里面写着的。
public static final HTableDescriptor META_TABLEDESC = new HTableDescriptor(
TableName.META_TABLE_NAME,
new HColumnDescriptor[] {
new HColumnDescriptor(HConstants.CATALOG_FAMILY)
// 保持10个版本是为了帮助调试
.setMaxVersions(10)
.setInMemory(true)
.setBlocksize(8 * 1024)
.setScope(HConstants.REPLICATION_SCOPE_LOCAL)
// 不使用BloomFilter
.setBloomFilterType(BloomType.NONE)
});


可以看出来Meta表的块只有8K,常驻内存,不使用BloomFilter,允许集群间复制。
再吐槽一下hbase这个Lru算法吧,做得挺粗糙的,它记录了每个Block块的访问次数,但是它并没有按照这个来排序,就是简单的依赖哈希值来排序。
Tips:江湖传言一个Regionserver上有一个BlockCache和N个Memstore,它们的大小之和不能大于等于heapsize * 0.8,否则HBase不能正常启动,想想也是,hbase是内存大户,内存稍有不够就挂掉,大家要小心设置这个缓存的参数。2.3 BucketCache
原来这块的图在上面的那篇文章已经提到了,我就不再重复了,之前没看的请一定要看,那边有很详细的图解,我这里只是讲点我了解的实现。
我们可以从两个方法里面看LruBlockCache和BucketCache的关系,一个是getBlock,一个是evictBlock,先看evictBlock。
protected long evictBlock(CachedBlock block, boolean evictedByEvictionProcess) {
  //从map里面删除
map.remove(block.getCacheKey());if (evictedByEvictionProcess && victimHandler != null) {
boolean wait = getCurrentSize() < acceptableSize();
boolean inMemory = block.getPriority() == BlockPriority.MEMORY;
   //保存到victimHandler里面
victimHandler.cacheBlockWithWait(block.getCacheKey(), block.getBuffer(),
inMemory, wait);
}
return block.heapSize()
}


在把block剔除出内存之后,就把块加到victimHandler里面,这个victimHandler就是BucketCache,在CacheConfig实例化LruBlockCache之后就用setVictimCache方法传进去的。
看完这个我们再看getBlock。
public Cacheable getBlock(BlockCacheKey cacheKey, boolean caching, boolean repeat) {
CachedBlock cb = map.get(cacheKey);
if(cb == null) {if (victimHandler != null)
return victimHandler.getBlock(cacheKey, caching, repeat);
return null;
}
return cb.getBuffer();
}


先从map中取,如果找不到就从victimHandler中取得。
从上面两个方法,我们可以看出来BucketCache是LruBlockCache的二级缓存,它不要了才会存到BucketCache当中,取得时候也是,找不到了才想起人家来。
好,我们现在进入到BucketCache里面看看,它里面有几个重要的属性。
// Store/read block data
IOEngine ioEngine;
// 内存map
private ConcurrentHashMap ramCache;
// 后备队列,质保存块的索引信息,比如offset, length
private ConcurrentHashMap backingMap;

这里怎么又来了两个,一个内存的,一个后备队里的,这个是有区别的RAMQueueEntry当中直接保存了块的buffer数据,BucketEntry只是保存了起始位置和长度。
下面我们看看这个流程吧,还是老规矩,先看入口,再看出口,入口在哪里,前面的代码中提到了,入口在cacheBlockWithWait方法。
    //已经有就不加啦
if (backingMap.containsKey(cacheKey) || ramCache.containsKey(cacheKey))
return;
//写入一级缓存
RAMQueueEntry re = new RAMQueueEntry(cacheKey, cachedItem, accessCount.incrementAndGet(), inMemory);
ramCache.put(cacheKey, re);
//用哈希值给计算出一个随机的队列来
int queueNum = (cacheKey.hashCode() & 0x7FFFFFFF) % writerQueues.size();
//把实体也插入到写入队列
BlockingQueue bq = writerQueues.get(queueNum);


 
可以看得出来在这个方法当中,先把块写入到ramCache当中,然后再插入到一个随机的写入队列,写入线程有3个,每个写入线程持有一个写入队列,线程的数量由参数hbase.bucketcache.writer.threads控制。
我们看看这个WriterThread的run方法吧。
     List entries = new ArrayList();
try {
while (cacheEnabled && writerEnabled) {
try {
//从inputQueue拿出来放到entries,然后再对entries操作
entries.add(inputQueue.take());
inputQueue.drainTo(entries);
} catch (InterruptedException ie) {
if (!cacheEnabled) break;
}
doDrain(entries);
}


 
那我们要关注的就是doDrain的方法了,在这个方法里面,它主要干了4件事情。
1、把ramCache当中的实体给剔除出来转换成BucketEntry,并切入到ioEngine。
2、ioEngine同步,ioEngine包括3种(file,offheap,heap),第一种就是写入SSD,用的是FileChannel,后两种是写入到一个ByteBufferArray
3、把BucketEntry添加到backingMap
4、如果空间不足的话,调用freeSpace清理空间,清理空间的方法和LruBlockCache的方法类似。
这里面的Bucket它也不是一个具体的东西,它里面记住的也是起始位置,使用了多少次的这些参数,所以说它是一个逻辑上的,而不是物理上的分配的一块随机的地址。
final private static class Bucket {
//基准起始位置
private long baseOffset;
//每个item分配的大小
private int itemAllocationSize;
//对应的在bucketSizeInfos中的位置
private int sizeIndex;
//总容量
private int itemCount;
private int freeList[];
//空闲的数量
private int freeCount;
//已经使用的数量
private int usedCount;
}


 
我们是不是可以这么理解:就是当我们不需要某个块的时候我们不用去物理的删除它,只需要不断的重用它里面的空间就可以了,而不需要管怎么删除、释放等相关内容。
BucketSizeInfo是负责管理这些Bucket的,它管理着3个队列,同时它可以动态根据需求,new一些新的不同大小的Bucket出来,也可以把现有的Bucket变更它的大小,Bucket的大小最小是5K,最大是513K。
final class BucketSizeInfo {
// Free bucket means it has space to allocate a block;
// Completely free bucket means it has no block.
private List bucketList, freeBuckets, completelyFreeBuckets;
private int sizeIndex;
}

 
sizeIndex是啥意思?是在BucketSizeInfo的数组里面的位置,它的大小都是有固定的值的,不能多也不能少,这里就不详细介绍了。我们直接看WriteToCache这个方法吧,好验证一下之前的想法。

    //序列化长度 = 数据长度 + 额外的序列化的长度16个字节
int len = data.getSerializedLength();
// This cacheable thing can't be serialized...
if (len == 0) return null;
//bucketAllocator给分配点空间
long offset = bucketAllocator.allocateBlock(len);
//生成一个实体
BucketEntry bucketEntry = new BucketEntry(offset, len, accessTime, inMemory);
//设置Deserializer,具体的实现在HFileBlock当中
bucketEntry.setDeserialiserReference(data.getDeserializer(), deserialiserMap);
try {
if (data instanceof HFileBlock) {
ByteBuffer sliceBuf = ((HFileBlock) data).getBufferReadOnlyWithHeader();
sliceBuf.rewind();
assert len == sliceBuf.limit() + HFileBlock.EXTRA_SERIALIZATION_SPACE;
ByteBuffer extraInfoBuffer = ByteBuffer.allocate(HFileBlock.EXTRA_SERIALIZATION_SPACE);
((HFileBlock) data).serializeExtraInfo(extraInfoBuffer);
//先写入数据信息,再写入头信息
ioEngine.write(sliceBuf, offset);
ioEngine.write(extraInfoBuffer, offset + len - HFileBlock.EXTRA_SERIALIZATION_SPACE);
} else {
//如果不是HFileBlock的话,把数据序列化到bb当中,然后写入到IOEngine
ByteBuffer bb = ByteBuffer.allocate(len);
data.serialize(bb);
ioEngine.write(bb, offset);
}
} catch (IOException ioe) {
// 出错了就释放掉这个这个块
bucketAllocator.freeBlock(offset);
throw ioe;
   }


 
这里我们看这一句就可以了ioEngine.write(sliceBuf, offset); 在写入ioEngine的时候是要传这个offset的,也正好验证了我之前的想法,所以BucketAllocator.allocateBlock的分配管理这块就很关键了。
关于怎么分配这块,还是留个能人讲吧,我是讲不好了。
 

hbase源码系列(十二)Get、Scan在服务端是如何处理?

cenyuhai 发表了文章 0 个评论 2281 次浏览 2015-09-11 15:15 来自相关话题

继上一篇讲了Put和Delete之后,这一篇我们讲Get和Scan, 因为我发现这两个操作几乎是一样的过程,就像之前的Put和Delete一样,上一篇我本来只打算写Put的,结果发现Delete也可以走这个过程,所以就一起写了。Get 我们打开HR ...查看全部
继上一篇讲了Put和Delete之后,这一篇我们讲Get和Scan, 因为我发现这两个操作几乎是一样的过程,就像之前的Put和Delete一样,上一篇我本来只打算写Put的,结果发现Delete也可以走这个过程,所以就一起写了。Get
我们打开HRegionServer找到get方法。Get的方法处理分两种,设置了ClosestRowBefore和没有设置的,一般来讲,我们都是知道了明确的rowkey,不太会设置这个参数,它默认是false的。
if (get.hasClosestRowBefore() && get.getClosestRowBefore()) {
byte[] row = get.getRow().toByteArray();
byte[] family = get.getColumn(0).getFamily().toByteArray();
r = region.getClosestRowBefore(row, family);
} else {
Get clientGet = ProtobufUtil.toGet(get);
if (existence == null) {
r = region.get(clientGet);
}
}


所以我们走的是HRegion的get方法,杀过去。
public Result get(final Get get) throws IOException {
checkRow(get.getRow(), "Get");
// 检查列族,以下省略代码一百字
List results = get(get, true);
return Result.create(results, get.isCheckExistenceOnly() ? !results.isEmpty() : null);
}

先检查get的row是否在这个region里面,然后检查列族,如果没有的话,它会根据表定义给补全的,然后它转身又进入了另外一个get方法,真是狠心啊!
    List results = new ArrayList();
Scan scan = new Scan(get);
RegionScanner scanner = null;
try {
scanner = getScanner(scan);
scanner.next(results);
} finally {
if (scanner != null)
scanner.close();
}


从上面可以看得出来,为什么我要把get和Scanner一起讲了吧,因为get也是一种特殊的Scan的方法,它只寻找一个row的数据。Scan
下面开始讲Scan,在《HTable探秘》里面有个细节不知道注意到没,在查询之前,它要先OpenScanner获得要给ScannerId,这个OpenScanner其实也调用了scan方法,但是它过去不是干活的,而是先过去注册一个Scanner,订个租约,然后再把这个返回的ScannerId再次发送一个scan请求,这次才开始调用开始扫描。
扫描的时候,走的是这一段

      if (!done) {
long maxResultSize = scanner.getMaxResultSize();
if (maxResultSize <= 0) {
maxResultSize = maxScannerResultSize;
}
List values = new ArrayList();
MultiVersionConsistencyControl.setThreadReadPoint(scanner.getMvccReadPoint());
region.startRegionOperation(Operation.SCAN);
try {
int i = 0;
synchronized(scanner) {
for (; i < rows && currentScanResultSize < maxResultSize; i++) {
// 它用的是这个nextRaw方法
boolean moreRows = scanner.nextRaw(values);
if (!values.isEmpty()) {
results.add(Result.create(values));
}
if (!moreRows) {
break;
}
values.clear();
}
}
} finally {
region.closeRegionOperation();
}
}

// 没找到设置moreResults为false,找到了把结果添加到builder里面去
if (scanner.isFilterDone() && results.isEmpty()) {
moreResults = false;
results = null;
} else {
addResults(builder, results, controller);
}
}
}
View Code
这里面有controller和result,这块的话,我求证了一下RpcServer那块,如果Rpc传输的时候使用了codec来压缩的话,就用controller返回结果,否则用response返回。
这块就不管了不是重点,下面我们看一下RegionScanner。RegionScanner详解与代码拆分
我们冲过去看RegionScannerImpl吧,它在HRegion里面,我们直接去看nextRaw方法就可以了,get方法的那个next方法也是调用了nextRaw方法。
if (outResults.isEmpty()) {
     // 把结果存到outResults当中
returnResult = nextInternal(outResults, limit);
} else {
List tmpList = new ArrayList();
returnResult = nextInternal(tmpList, limit);
outResults.addAll(tmpList);
}


去nextInternal方法吧,这方法真大,尼玛,我要歇菜了,我们进入下一个阶段吧。

/**  把查询出来的结果保存到results当中  */
private boolean nextInternal(List results, int limit)
throws IOException {

while (true) {
//从storeHeap里面取出一个来
KeyValue current = this.storeHeap.peek();

byte[] currentRow = null;
int offset = 0;
short length = 0;
if (current != null) {
currentRow = current.getBuffer();
offset = current.getRowOffset();
length = current.getRowLength();
}
//检查一下到这个row是否应该停止了
boolean stopRow = isStopRow(currentRow, offset, length);
if (joinedContinuationRow == null) {
// 如果要停止了,就用filter的filterRowCells过滤一下results.
if (stopRow) {
if (filter != null && filter.hasFilterRow()) {
//使用filter过滤掉一些cells
filter.filterRowCells(results);
}
return false;
}
// 如果有filter的话,过滤通过
if (filterRowKey(currentRow, offset, length)) {
boolean moreRows = nextRow(currentRow, offset, length);
if (!moreRows) return false;
results.clear();
continue;
}
//把结果保存到results当中
KeyValue nextKv = populateResult(results, this.storeHeap, limit, currentRow, offset,
length);
// Ok, we are good, let's try to get some results from the main heap.
// 在populateResult找到了足够limit数量的
if (nextKv == KV_LIMIT) {
if (this.filter != null && filter.hasFilterRow()) {
throw new IncompatibleFilterException(
"Filter whose hasFilterRow() returns true is incompatible with scan with limit!");
}
return true; // We hit the limit.
}

stopRow = nextKv == null ||
isStopRow(nextKv.getBuffer(), nextKv.getRowOffset(), nextKv.getRowLength());
// save that the row was empty before filters applied to it.
final boolean isEmptyRow = results.isEmpty();

// We have the part of the row necessary for filtering (all of it, usually).
// First filter with the filterRow(List). 过滤一下刚才找出来的
if (filter != null && filter.hasFilterRow()) {
filter.filterRowCells(results);
}
//如果result的空的,啥也没找到,这是。。。悲剧啊
if (isEmptyRow) {
boolean moreRows = nextRow(currentRow, offset, length);
if (!moreRows) return false;
results.clear();
// This row was totally filtered out, if this is NOT the last row,
// we should continue on. Otherwise, nothing else to do.
if (!stopRow) continue;
return false;
}

// Ok, we are done with storeHeap for this row.
// Now we may need to fetch additional, non-essential data into row.
// These values are not needed for filter to work, so we postpone their
// fetch to (possibly) reduce amount of data loads from disk.
if (this.joinedHeap != null) {
KeyValue nextJoinedKv = joinedHeap.peek();
// If joinedHeap is pointing to some other row, try to seek to a correct one.
boolean mayHaveData =
(nextJoinedKv != null && nextJoinedKv.matchingRow(currentRow, offset, length))
|| (this.joinedHeap.requestSeek(KeyValue.createFirstOnRow(currentRow, offset, length),
true, true)
&& joinedHeap.peek() != null
&& joinedHeap.peek().matchingRow(currentRow, offset, length));
if (mayHaveData) {
joinedContinuationRow = current;
populateFromJoinedHeap(results, limit);
}
}
} else {
// Populating from the joined heap was stopped by limits, populate some more.
populateFromJoinedHeap(results, limit);
}

// We may have just called populateFromJoinedMap and hit the limits. If that is
// the case, we need to call it again on the next next() invocation.
if (joinedContinuationRow != null) {
return true;
}

// Finally, we are done with both joinedHeap and storeHeap.
// Double check to prevent empty rows from appearing in result. It could be
// the case when SingleColumnValueExcludeFilter is used.
if (results.isEmpty()) {
boolean moreRows = nextRow(currentRow, offset, length);
if (!moreRows) return false;
if (!stopRow) continue;
}

// We are done. Return the result.
return !stopRow;
}
}
View Code
上面那段代码真的很长很臭,尼玛。。被我折叠起来了,有兴趣的看一眼就行,我们先分解开来看吧,这里面有两个Heap,一个是storeHeap,一个是JoinedHeap,他们啥时候用呢?看一下它的构造方法吧
for (Map.Entry[], NavigableSet[]>> entry :
scan.getFamilyMap().entrySet()) {
//遍历列族和列的映射关系,设置store相关的内容
Store store = stores.get(entry.getKey());
KeyValueScanner scanner = store.getScanner(scan, entry.getValue());
if (this.filter == null || !scan.doLoadColumnFamiliesOnDemand()
|| this.filter.isFamilyEssential(entry.getKey())) {
scanners.add(scanner);
} else {
joinedScanners.add(scanner);
}
}
this.storeHeap = new KeyValueHeap(scanners, comparator);
if (!joinedScanners.isEmpty()) {
this.joinedHeap = new KeyValueHeap(joinedScanners, comparator);
}
}


如果joinedScanners不空的话,就new一个joinedHeap出来,但是我们看看它的成立条件,有点儿难吧。
1、filter不为null
2、scan设置了doLoadColumnFamiliesOnDemand为true
3、设置了的filter的isFamilyEssential方法返回false,这个估计得自己写一个,因为我刚才去看了几个filter的这个方法默认都是用的FilterBase的方法返回false。
好的,到这里我们有可以把上面那段代码砍掉很大一部分了,它的成立条件比较困难,所以很难出现了,那我们就挑重点的storeHeap来讲吧,我们先看着这三行。
Store store = stores.get(entry.getKey());
KeyValueScanner scanner = store.getScanner(scan, entry.getValue());
this.storeHeap = new KeyValueHeap(scanners, comparator);

通过列族获得相应的Store,然后通过getScanner返回scanner加到KeyValueHeap当中,我们应该去刺探一下HStore的getScanner方法,它new了一个StoreScanner返回,继续看StoreScanner。

public StoreScanner(Store store, ScanInfo scanInfo, Scan scan, final NavigableSet[]> columns) throws IOException {

matcher = new ScanQueryMatcher(scan, scanInfo, columns,
ScanType.USER_SCAN, Long.MAX_VALUE, HConstants.LATEST_TIMESTAMP,
oldestUnexpiredTS);

// 返回MemStore、所有StoreFile的Scanner.
List scanners = getScannersNoCompaction();

//explicitColumnQuery:是否过滤列族 lazySeekEnabledGlobally默认是true 如果文件数量超过1个,isParallelSeekEnabled就是true
if (explicitColumnQuery && lazySeekEnabledGlobally) {
for (KeyValueScanner scanner : scanners) {
scanner.requestSeek(matcher.getStartKey(), false, true);
}
} else {
if (!isParallelSeekEnabled) {
for (KeyValueScanner scanner : scanners) {
scanner.seek(matcher.getStartKey());
}
} else {
     //一般走这里,并行查
parallelSeek(scanners, matcher.getStartKey());
}
}

// 一个堆里面包括了两个scanner,MemStore、StoreFile的Scanner
heap = new KeyValueHeap(scanners, store.getComparator());

this.store.addChangedReaderObserver(this);
}
View Code
对上面的代码,我们再慢慢来分解。
1、先new了一个ScanQueryMatcher,它是一个用来过滤的类,传参数的时候,需要传递scan和oldestUnexpiredTS进去,oldestUnexpiredTS是个参数,是(当前时间-列族的生存周期),小于这个时间戳的kv视为已经过期了,在它初始化的时候,我们注意一下它的startKey和stopRow,这个startKey要注意,它可不是我们设置的那个startRow,而是用这个startRow来new了一个DeleteFamily类型的KeyValue。
this.stopRow = scan.getStopRow();
this.startKey = KeyValue.createFirstDeleteFamilyOnRow(scan.getStartRow())

2、接着我们看getScannersNoCompaction这个方法,它这里是返回了两个Scanner,MemStoreScanner和所有StoreFile的Scanner,在从StoreHeap中peak出来一个kv的时候,是从他们当中交替取出kv来的,StoreHeap从它的名字上面来看像是用了堆排序的算法,它的peek方法和next方法真有点儿复杂,下一章讲MemStore的时候再讲吧。
//获取所有的storefile,默认的实现没有用上startRow和stopRow
storeFilesToScan = this.storeEngine.getStoreFileManager().getFilesForScanOrGet(isGet, startRow, stopRow);
memStoreScanners = this.memstore.getScanners();

默认的getStoreFileManager的getFilesForScanOrGet是返回了所有的StoreFile的Scanner,而不是通过startRow和stopRow做过滤,它的注释里面给出的解释,里面的files默认是按照seq id来排序的,而不是startKey,需要优化的可以从这里下手。
3、然后就开始先seek一下,而不是全表扫啊!
//过滤列族的情况
scanner.requestSeek(matcher.getStartKey(), false, true);
//一般走这里,并行查
parallelSeek(scanners, matcher.getStartKey());

scanner.requestSeek不是所有情况都要seek,是查询Delete的时候,如果查询的kv的时间戳比文件的最大时间戳小,就seek到上次未查询到的kv;它这里可能会用上DeleteFamily删除真个family这种情况。
parallelSeek就是开多线程去调用Scanner的seek方法, MemStore的seek很简单,因为它的kv集合是一个排序好的集合,HFile的seek比较复杂,下面我用一个图来表达吧。

在搜索HFile的时候,key先从一级索引找,通过它定位到细的二级索引,然后再定位到具体的block上面,到了HFileBlock之后,就不是seek了,就是遍历,遍历没什么好说的,不熟悉的朋友建议先回去看看《StoreFile存储格式》。注意哦,这个key就是我们的startKey哦,所以大家知道为什么要在scan的时候要设置StartKey了吗?nextInternal的流程
通过前面的分析,我们可以把nextInternal分解与拆分、抹去一些不必要的代码,我发现代码还是很难懂,所以我画了一个过程图出来代替那段代码。

 
特别注意事项:
1、这个图是被我处理过的简化之后的图,还有在放弃该row的kv们 之后并非都要进行是StopRow的判断,只是为了合并这个流程,我加上去的isStopRow的判断,但并不影响整个流程。
2、!isStopRow代表返回代码的(!isStopRow)的意思, 根据isStopRow的当前值来返回true或者false
3、true意味着退出,并且还有结果,false意味着退出,没有结果
 
诶,看到这里,还是没看到它是怎么用ScanQueryMatcher去过滤被删除的kv们啊,好,接下来我们重点考察这个问题。ScanQueryMatcher如何过滤已经被删除的KeyValue
这个过程屏蔽在了filterRow之后通过的把该row的kv接到结果集的这一步里面去了。它在里面不停的调用KeyValueHeap的next方法,match的调用正好在这个方法。我们现在就去追踪这遗失的部分。
我们直接去看它的match方法就好了,别的不用看了,它处理的情况好多好多,尼玛,这是要死人的节奏啊。
ScanQueryMatcher是用来处理一行数据之间的版本问题的,在每遇到一个新的row的时候,它都会先被设置matcher.setRow(row, offset, length)。
if (limit < 0 || matcher.row == null || !Bytes.equals(row, offset, length, matcher.row,
matcher.rowOffset, matcher.rowLength)) {
this.countPerRow = 0;
matcher.setRow(row, offset, length);
}

 
上面这段代码在StoreScanner的next方法里面,每当一行结束之后,都会调用这个方法。
在讲match方法之前,我先讲一下rowkey的排序规则,rowkey 正序->family 正序->qualifier 正序->ts 降序->type 降序,那么对于同一个行、列族、列的数据,时间越近的排在前面,类型越大的排在前面,比如Delete就在Put前面,下面是它的类型表。
//search用
Minimum((byte)0),
Put((byte)4),
Delete((byte)8),
DeleteFamilyVersion((byte)10),
DeleteColumn((byte)12),
DeleteFamily((byte)14),
//search用
Maximum((byte)255);


 
为什么这里先KeyValue的排序规则呢,这当然有关系了,这关系着扫描的时候,谁先谁后的问题,如果时间戳小的在前面,下面这个过滤就不生效了。
下面我们看看它的match方法的检查规则。
1、和当前行比较
//和当前的行进行比较,只有相等才继续,大于当前的行就要跳到下一行,小于说明有问题,停止
int ret = this.rowComparator.compareRows(row, this.rowOffset, this.rowLength,
bytes, offset, rowLength);
if (ret <= -1) {
return MatchCode.DONE;
} else if (ret >= 1) {
return MatchCode.SEEK_NEXT_ROW;
}


2、检查是否所有列都查过了
//所有的列都扫描过来
if (this.columns.done()) {
stickyNextRow = true;
return MatchCode.SEEK_NEXT_ROW;
}

3、检查列的时间戳是否过期
long timestamp = kv.getTimestamp();
// 检查列的时间是否过期
if (columns.isDone(timestamp)) {
return columns.getNextRowOrNextColumn(bytes, offset, qualLength);
}

 
4a、如果是Delete的类型,加到ScanDeleteTraker。
if (kv.isDelete()) {
this.deletes.add(bytes, offset, qualLength, timestamp, type);
}

 
4b、如果不是,如果ScanDeleteTraker里面有Delete,就要让它经历ScanDeleteTraker的检验了(进宫前先验一下身)
DeleteResult deleteResult = deletes.isDeleted(bytes, offset, qualLength,
timestamp);
switch (deleteResult) {
case FAMILY_DELETED:
case COLUMN_DELETED:
return columns.getNextRowOrNextColumn(bytes, offset, qualLength);
case VERSION_DELETED:
case FAMILY_VERSION_DELETED:
return MatchCode.SKIP;
case NOT_DELETED:
break;
default:
throw new RuntimeException("UNEXPECTED");
}


 
这里就要说一下刚才那几个Delete的了:
1)DeleteFamily是最凶狠的,生命周期也长,整个列族全删,基本上会一直存在
2)DeleteColum只删掉一个列,出现这个列的都会被干掉
3)DeleteFamilyVersion没遇到过
4)Delete最差劲儿了,只能删除指定时间戳的,时间戳一定要对哦,否则一旦发现不对的,这个Delete就失效了,可以说,生命周期只有一次,下面是源代码。

public DeleteResult isDeleted(byte [] buffer, int qualifierOffset,
int qualifierLength, long timestamp) {
//时间戳小于删除列族的时间戳,说明这个列族被删掉是后来的事情
if (hasFamilyStamp && timestamp <= familyStamp) {
return DeleteResult.FAMILY_DELETED;
}
//检查时间戳
if (familyVersionStamps.contains(Long.valueOf(timestamp))) {
return DeleteResult.FAMILY_VERSION_DELETED;
}

if (deleteBuffer != null) {
  
int ret = Bytes.compareTo(deleteBuffer, deleteOffset, deleteLength,
buffer, qualifierOffset, qualifierLength);

if (ret == 0) {
if (deleteType == KeyValue.Type.DeleteColumn.getCode()) {
return DeleteResult.COLUMN_DELETED;
}
// 坑爹的Delete它只删除相同时间戳的,遇到不想的它就pass了
if (timestamp == deleteTimestamp) {
return DeleteResult.VERSION_DELETED;
}

//时间戳不对,这个Delete失效了
deleteBuffer = null;
} else if(ret < 0){
// row比当前的大,这个Delete也失效了
deleteBuffer = null;
} else {
throw new IllegalStateException(...);
}
}

return DeleteResult.NOT_DELETED;
View Code
上一章说过,Delete new出来之后什么都不设置,就是DeleteFamily级别的选手,所以在它之后的会全部被干掉,所以你们懂的,我们也会用DeleteColum来删除某一列数据,只要时间戳在它之前的kv就会被干掉,删某个指定版本的少,因为你得知道具体的时间戳,否则你删不了。例子详解DeleteFamily
假设我们有这些数据
KeyValue [] kvs1 = new KeyValue[] {
KeyValueTestUtil.create("R1", "cf", "a", now, KeyValue.Type.Put, "dont-care"),
KeyValueTestUtil.create("R1", "cf", "a", now, KeyValue.Type.DeleteFamily, "dont-care"),
KeyValueTestUtil.create("R1", "cf", "a", now-500, KeyValue.Type.Put, "dont-care"),
KeyValueTestUtil.create("R1", "cf", "a", now+500, KeyValue.Type.Put, "dont-care"),
KeyValueTestUtil.create("R1", "cf", "a", now, KeyValue.Type.Put, "dont-care"),
KeyValueTestUtil.create("R2", "cf", "z", now, KeyValue.Type.Put, "dont-care")
};


 
Scan的参数是这些。
Scan scanSpec = new Scan(Bytes.toBytes("R1"));
scanSpec.setMaxVersions(3);
scanSpec.setBatch(10);
StoreScanner scan = new StoreScanner(scanSpec, scanInfo, scanType, getCols("a","z"), scanners);

 
然后,我们先将他们排好序,是这样的。
R1/cf:a/1400602376242(now+500)/Put/vlen=9/mvcc=0, 
R1/cf:a/1400602375742(now)/DeleteFamily/vlen=9/mvcc=0,
R1/cf:a/1400602375742(now)/Put/vlen=9/mvcc=0,
R1/cf:a/1400602375742(now)/Put/vlen=9/mvcc=0,
R1/cf:a/1400602375242(now-500)/Put/vlen=9/mvcc=0,
R2/cf:z/1400602375742(now)/Put/vlen=9/mvcc=0

 
所以到最后,黄色的三行会被删除,只剩下第一行和最后一行,但是最后一行也会被排除掉,因为它已经换行了,不是同一个行的,不在这一轮进行比较,返回MatchCode.DONE。---->回到前面是match过程
5、检查时间戳,即设置给Scan的时间戳,这个估计一般很少设置,时间戳过期,就返回下一个MatchCode.SEEK_NEXT_ROW。
6、检查列是否是Scan里面设置的需要查询的列。
7、检查列的版本,Scan设置的MaxVersion,超过了这个version就要赶紧闪人了哈,返回MatchCode.SEEK_NEXT_COL。
 
对于match的结果,有几个常见的:
1、MatchCode.INCLUDE_AND_SEEK_NEXT_COL 包括当前这个,跳到下一列,会引发StoreScanner的reseek方法。
2、MatchCode.SKIP 忽略掉,继续调用next方法。
3、MatchCode.SEEK_NEXT_ROW 不包括当前这个,继续调用next方法。
4、MatchCode.SEEK_NEXT_COL 不包括它,跳过下一列,会引发StoreScanner的reseek方法。
5、MatchCode.DONE rowkey变了,要留到下次进行比较了
 
讲到这里基本算结束了。关于测试
呵呵,有兴趣测试的童鞋可以打开下hbase源码,找到TestStoreScanner这个类自己调试看下结果。
 
 

hbase源码系列(十一)Put、Delete在服务端是如何处理?

cenyuhai 发表了文章 0 个评论 2166 次浏览 2015-09-11 15:15 来自相关话题

List> e : familyMap.entrySet()) { byte[] family = e.getKey(); List cells = e.getValue(); //列和count的映射 Map ...查看全部
List> e : familyMap.entrySet()) { byte[] family = e.getKey(); List cells = e.getValue(); //列和count的映射 Map kvCount = new TreeMap(Bytes.BYTES_COMPARATOR); for (Cell cell: cells) { KeyValue kv = KeyValueUtil.ensureKeyValue(cell); // 如果是时间戳是最新的话就执行下面这些操作
if (kv.isLatestTimestamp() && kv.isDeleteType()) { //new一个Get从Store里面去搜索
} else { kv.updateLatestStamp(byteNow); } } } }

看来一下代码,这里是上来先判断是否是最新的时间戳,我就回去看来一下Delete的构造函数,尼玛。。。
public Delete(byte [] row) {
this(row, HConstants.LATEST_TIMESTAMP);
}

public Delete(byte [] row, long timestamp) {
this(row, 0, row.length, timestamp);
}


只传了rowkey进去的,它就是最新的。。然后看了一下注释,凡是在这个时间点之前的所有版本的所有列,我们都要删除。
好吧,我们很无奈的宣布,我们只能走kv.isLatestTimestamp() && kv.isDeleteType(),下面是没放出来的代码。
byte[] qual = kv.getQualifier();
if (qual == null) qual = HConstants.EMPTY_BYTE_ARRAY;
//想到相同列的每次+1
Integer count = kvCount.get(qual);
if (count == null) {
kvCount.put(qual, 1);
} else {
kvCount.put(qual, count + 1);
}
//更新之后把最新的count数量
count = kvCount.get(qual);

Get get = new Get(kv.getRow());
get.setMaxVersions(count);
get.addColumn(family, qual);
//从store当中取出相应的result来
List result = get(get, false);

if (result.size() < count) {
// Nothing to delete 数量不够。。 更新最新的时间戳为现在的时间
kv.updateLatestStamp(byteNow);
continue;
}
//数量超过了也不行
if (result.size() > count) {
throw new RuntimeException("Unexpected size: " + result.size());
}
//取最后一个的时间戳
KeyValue getkv = KeyValueUtil.ensureKeyValue(result.get(count - 1));
//更新kv的时间戳为getkv的时间戳
Bytes.putBytes(kv.getBuffer(), kv.getTimestampOffset(),
getkv.getBuffer(), getkv.getTimestampOffset(), Bytes.SIZEOF_LONG);


 
这里又干了一个Get操作,把列族的多个版本的内容取出来,如果数量不符合预期也会有问题,但是这后面操作的中心思想就是:
(a)按照预期来说,取出来的少了,就设置删除的时间戳为现在;
(b)取出来的多了,就报错;
(c)刚好的,就把Delete的时间戳设置为最大的那个的时间戳,但即便是这样也没有删除数据。
回到这里我又想起来,只有在Compaction之后,hbase的文件才会变小,难道是在那个时候删除的?那在删除之前,我们进行Get或者Scan操作的时候,会不会读到这些没有被删除的数据呢?
好,让我们拭目以待。
 
 

hbase源码系列(十)HLog与日志恢复

cenyuhai 发表了文章 0 个评论 2164 次浏览 2015-09-11 15:14 来自相关话题

HLog概述 hbase在写入数据之前会先写入MemStore,成功了再写入HLog,当MemStore的数据丢失的时候,还可以用HLog的数据来进行恢复,下面先看看HLog的图。 旧版的HLog是实际上是一个Sequce ...查看全部
HLog概述
hbase在写入数据之前会先写入MemStore,成功了再写入HLog,当MemStore的数据丢失的时候,还可以用HLog的数据来进行恢复,下面先看看HLog的图。

旧版的HLog是实际上是一个SequceneFile,0.96的已经使用Protobuf来进行序列化了。从Writer和Reader上来看HLog的都是Entry的,换句话说就是,它的每一条记录就是一个Entry。
class Entry implements Writable {
private WALEdit edit;
private HLogKey key;
}

所以上面那个图已经不准确了,HLogKey没变,但是Value缺不是KeyValue,而是WALEdit。
下面我们看看HLogKey的五要素,region、tableName、log的顺序、写入时间戳、集群id。
public HLogKey(final byte [] encodedRegionName, final TableName tablename,
long logSeqNum, final long now, List clusterIds){
init(encodedRegionName, tablename, logSeqNum, now, clusterIds);
}

protected void init(final byte [] encodedRegionName, final TableName tablename,
long logSeqNum, final long now, List clusterIds) {
this.logSeqNum = logSeqNum;
this.writeTime = now;
this.clusterIds = clusterIds;
this.encodedRegionName = encodedRegionName;
this.tablename = tablename;
}


下面看看WALEdit的属性, 这里只列出来一个重要的,它是内部持有的一群KeyValue。。
public class WALEdit implements Writable, HeapSize {
......private final ArrayList kvs = new ArrayList();

HLog的具体实现类是FSHLog,一个Region Server有两个FSHLog,一个负责RS上面所有的用户region的日志,一个负责RS上面的META表的region的日志。
对于日志来说,我们关心的是它如何保证一致性和准确性,在需要它的时候可以发挥救命作用。HLog同步
对于meta region的HLog写入之后,它会立即同步到硬盘,非meta表的region,它会先把Entry添加到一个队列里面等待同步。
while(!this.isInterrupted() && !closeLogSyncer.get()) {
try {
if (unflushedEntries.get() <= syncedTillHere) {
synchronized (closeLogSyncer) {
closeLogSyncer.wait(this.optionalFlushInterval);
}
}// 同步已经添加的entry
sync();
} catch (IOException e) {
LOG.error("Error while syncing, requesting close of hlog ", e);
requestLogRoll();
Threads.sleep(this.optionalFlushInterval);
}
}


 
它这里是有一个判断条件的,如果判断条件不成立就立即同步,等待this.optionalFlushInterval时间,默认的同步间隔是1000,它是通过参数hbase.regionserver.optionallogflushinterval设置。unflushedEntries是一个AtomicLong在写入entry的时候递增,syncedTillHere是一个volatile long,同步完成之后也是变大,因为可能被多个线程调用同步操作,所以它是volatile的,从条件上来看,如果没有日志需要同步就等待一秒再进行判断,如果有日志需要同步,也是立马就写入硬盘的,如果发生错误,就是调用requestLogRoll方法,进行回滚,这个回滚比较有意思,它是跑过去flush掉MemStore中的数据,把他们写入硬盘。
下面是回滚的方法。中间我忽略了几步,然后找到LogRoller中的这段代码。
byte [][] regionsToFlush = getWAL().rollWriter(rollLog.get());
if (regionsToFlush != null) {
for (byte [] r: regionsToFlush) scheduleFlush(r);
}

找出来需要flush的region,然后计划flush。
regions = findMemstoresWithEditsEqualOrOlderThan(this.outputfiles.firstKey(),
this.oldestUnflushedSeqNums);

static byte[][] findMemstoresWithEditsEqualOrOlderThan(
final long walSeqNum, final Map[], Long> regionsToSeqNums) {
List[]> regions = null;
for (Map.Entry[], Long> e : regionsToSeqNums.entrySet()) {
//逐个对比,找出小于已输出为文件的最小的seq id的region
if (e.getValue().longValue() <= walSeqNum) {
if (regions == null) regions = new ArrayList[]>();
regions.add(e.getKey());
}
}
return regions == null ? null : regions
.toArray(new byte[][] { HConstants.EMPTY_BYTE_ARRAY });
}


逐个对比,找出来未flush MemStore的比输出的文件的HLog流水号还小的region,当它准备flush MemStore之前会调用startCacheFlush方法来把region从oldestUnflushedSeqNums这个map当中去除,添加到已经flush的map当中。从日志恢复
看过《HMaster启动过程》的童鞋都知道,如果之前有region失败的话,在启动之前会把之前的HLog进行split,把属于该region的为flush过的日志提取出来,然后生成一个新的HLog到recovered.edits目录下,中间的过程控制那块有点儿类似于snapshot的那种,在zk里面建立一个splitWAL节点,在这个节点下面建立任务,不一样的是,snapshot那块是自己处理自己的,这里是别人的闲事它也管,处理完了之后就更新这个任务的状态了,没有snapshot那么复杂的交互过程。
那啥时候会用到这个呢,在region打开的时候,我们从HRegionServer的openRegion方法一路跟踪,中间历经OpenMetaHandler,再到HRegion.openHRegion方法,终于在initializeRegionStores方法里面找到了那么一句话。
    // 如果recovered.edits有日志的话,就恢复日志
maxSeqId = Math.max(maxSeqId, replayRecoveredEditsIfAny(
this.fs.getRegionDir(), maxSeqIdInStores, reporter, status));

高潮来了!!!
    HLog.Reader reader = null;
try {
//创建reader读取hlog
reader = HLogFactory.createReader(fs, edits, conf);
long currentEditSeqId = -1;
long firstSeqIdInLog = -1;
long skippedEdits = 0;
long editsCount = 0;
long intervalEdits = 0;
HLog.Entry entry;
Store store = null;
boolean reported_once = false;

try {//逐个读取
while ((entry = reader.next()) != null) {
HLogKey key = entry.getKey();
WALEdit val = entry.getEdit();
//实例化firstSeqIdInLog
if (firstSeqIdInLog == -1) {
firstSeqIdInLog = key.getLogSeqNum();
}
boolean flush = false;
for (KeyValue kv: val.getKeyValues()) {
// 从WALEdits里面取出kvs
if (kv.matchingFamily(WALEdit.METAFAMILY) ||
!Bytes.equals(key.getEncodedRegionName(),
this.getRegionInfo().getEncodedNameAsBytes())) {//是meta表的kv就有compaction
CompactionDescriptor compaction = WALEdit.getCompaction(kv);
if (compaction != null) {
//完成compaction未完成的事情,校验输入输出文件,完成文件替换等操作
completeCompactionMarker(compaction);
}

skippedEdits++;
continue;
}
// 获得kv对应的store
if (store == null || !kv.matchingFamily(store.getFamily().getName())) {
store = this.stores.get(kv.getFamily());
}
if (store == null) {
// 应该不会发生,缺少它对应的列族
skippedEdits++;
continue;
}
// seq id小,呵呵,说明已经被处理过了这个日志
if (key.getLogSeqNum() <= maxSeqIdInStores.get(store.getFamily().getName())) {
skippedEdits++;
continue;
}
currentEditSeqId = key.getLogSeqNum();
// 这个就是我们要处理的日志,添加到MemStore里面就ok了
flush = restoreEdit(store, kv);
editsCount++;
}
//MemStore太大了,需要flush掉
if (flush) internalFlushcache(null, currentEditSeqId, status);

}
} catch (IOException ioe) {
// 就是把名字改了,然后在后面加上".时间戳",这个有毛意思?
if (ioe.getCause() instanceof ParseException) {
Path p = HLogUtil.moveAsideBadEditsFile(fs, edits);
msg = "File corruption encountered! " +
"Continuing, but renaming " + edits + " as " + p;
} else {// 不知道是啥错误,抛错误吧,处理不了
throw ioe;
}
}
status.markComplete(msg);
return currentEditSeqId;
} finally {
status.cleanup();
if (reader != null) {
reader.close();
}
}


呵呵,读取recovered.edits下面的日志,符合条件的就加到MemStore里面去,完成之后,就把这些文件删掉。大家也看到了,这里通篇讲到一个logSeqNum,哪里都有它的身影,它实际上是FSHLog当中的一个递增的AtomicLong,每当往FSLog里面写入一条日志的时候,它都会加一,然后MemStore请求flush的时候,会调用FSLog的startCacheFlush方法,获取(logSeqNum+1)回来,然后写入到StoreFile的sequenceid字段,再次拿出来的时候,就遍历这个HStore下面的StoreFile的logSeqNum,取出来最大的跟它比较,小于它的都已经写过了,没必要再写了。
好了,HLog结束了,累死我了,要睡了。