源码分析

源码分析

president_poll的代码在资料下载中没有找到

fish 回复了问题 2 人关注 1 个回复 1512 次浏览 2017-02-18 22:37 来自相关话题

推荐个好的源码阅读工具

mopishv0 回复了问题 4 人关注 3 个回复 2680 次浏览 2016-03-31 11:33 来自相关话题

Hadoop源码分析之心跳机制

唐半张 发表了文章 0 个评论 1872 次浏览 2015-09-30 11:05 来自相关话题

一、心跳机制 1、hadoop集群是master/slave模式,master包括Namenode和Jobtracker,slave包括Datanode和T ...查看全部
一、心跳机制
1、hadoop集群是master/slave模式,master包括Namenode和Jobtracker,slave包括Datanode和Tasktracker。
2、master启动的时候,会开一个ipc server在那里,等待slave心跳。
3、slave启动时,会连接master,并每隔3秒钟主动向master发送一个“心跳”,这个时间可 以通过”heartbeat.recheck.interval”属性来设置。将自己的状态信息告诉master,然后master也是通过这个心跳的返回值,向slave节点传达指令。
4、需要指出的是:namenode与datanode之间的通信,jobtracker与tasktracker之间的通信,都是通过“心跳”完成的。
二、Datanode、Namenode心跳源码分析
既然“心跳”是Datanode主动给Namenode发送的。那Datanode是怎么样发送的呢?下面贴出Datanode.class中的关键代码:
代码一:
/**
* 循环调用“发送心跳”方法,直到shutdown
* 调用远程Namenode的方法
*/
public void offerService() throws Exception {
•••
while (shouldRun) {
try {
long startTime = now();
// heartBeatInterval是在启动Datanode时根据配置文件设置的,是心跳间隔时间
if (startTime - lastHeartbeat > heartBeatInterval) {
lastHeartbeat = startTime;
//Datanode发送心跳
DatanodeCommand[] cmds = namenode.sendHeartbeat(dnRegistration,
data.getCapacity(),
data.getDfsUsed(),
data.getRemaining(),
xmitsInProgress.get(),
getXceiverCount());
myMetrics.addHeartBeat(now() - startTime);

if (!processCommand(cmds))
continue;
}

•••
}
} // while (shouldRun)
} // offerService

需要注意的是:发送心跳的对象并不是datanode,而是一个名为namenode的对象,难道在datanode端就直接有个namenode的引用吗?其实不然,我们来看看这个namenode吧:
代码二:
public DatanodeProtocol namenode = null;

namenode其实是一个DatanodeProtocol的引用,在对hadoop RPC机制分析的文章中我提到过,这是一个Datanode和Namenode通信的协议,其中有许多未实现的接口方法,sendHeartbeat()就是其中的一个。下面看看这个namenode对象是怎么被实例化的吧:
代码三:
this.namenode = (DatanodeProtocol)   
RPC.waitForProxy(DatanodeProtocol.class,
DatanodeProtocol.versionID,
nameNodeAddr,
conf);
其实这个namenode并不是Namenode的一个对象,而只是一个Datanode端对Namenode的代理对象,正是这个代理完成了“心跳”。代理的底层实现就是RPC机制了。

hadoop 源码求解 RecordReader initialize方法如何调用

回复

VanquisherCsn 发起了问题 1 人关注 0 个回复 3288 次浏览 2015-09-16 10:37 来自相关话题

VersionInfo何时初始化的,怎么初始化的

回复

VanquisherCsn 发起了问题 1 人关注 0 个回复 1863 次浏览 2015-09-16 09:47 来自相关话题

源码分析之灰太狼手札(六):HBase的客户端请求处理

mopishv0 发表了文章 0 个评论 3107 次浏览 2015-09-08 15:28 来自相关话题

六 客户端请求处理 6.1 Region相关操作     代码起始位置:HRegionInterface.java 57、79、85 ...查看全部
六 客户端请求处理

6.1 Region相关操作

    代码起始位置:HRegionInterface.java 57、79、85、90、98、104、110、116、124、133、141、149行。

    Region的相关操作都比较简单,基本上就是对集合信息的过滤与获取,看一眼就能了解主要逻辑了。HRegionInterface 的实现类是HRegionServer,接口方法对应的实现十分好找。因此,此处就忽略以上开始行数的api说明了。

6.2 getClosestRowBefore

    代码起始位置:HRegionInterface.java 71行。

    该函数获取符合参数的所有数据或一条最接近且小于参数row key的数据。代码中首先获取region对象;判断key值是否在该region的范围内;为锁对象splitsAndClosesLock加上读锁;根据列名获取store对象;构造获取最接近行的状态记录对象,其中会初始化key特征信息、删除记录集合、如果是meta表还会计算表名分隔符位置;再次获取store中的锁对象,获取读锁。在store中的整个查找过程中,由一个GetClosestRowBeforeTracker类对象负责记录候选记录。

首先,从memstore中获取最接近的记录,根据3.4中的逻辑,memstore中的数据保存在两个有序集合中(数据集合和用于flush的快照集合,集合中的记录按照key升序、按照进入memstore的时间降序),针对每个集合做相同的操作。先根据目标key获得大于等于此key的集合子集,对于其中不大于key值的记录(也就是只处理等于该值的记录了),如果遇到已经超过TTL的记录,则从原集合中删除;如果遇到删除操作,则先缓存删除操作,如果候选记录命中缓存的删除操作的key,则进一步判断是否时间戳相等或为删除列或列族,如果匹配则移除候选;如果遇到非删除操作,则先判断其是否命中之前的删除操作,然后判断其是否更接近且不大于目标key,如果是则替换候选值。如果找到能替换候选值的记录,则不再进一步查找。如果在大于等于目标值的集合中,没有找到候选记录,则从小于目标值的子集中开始查找,直至找到非目标表的记录或首个不如候选记录的记录,对于子集中的记录,处理过程和处理大于等于目标子集的逻辑一样(吐槽:甚至有点牵强附会,为了重用搞了一些不符合逻辑的代码)。

然后,遍历每个storefile对象。首先确定文件中的最后一个key值与目标值中较小的确定为起始key(奇怪的是这里还要判断是否不是目标表);构造扫描器(详细内容在第六章进行分析其构造过程);将扫描器seek到起始key(其中有一句逻辑似乎没用);接下来的逻辑和memstore中很像,先向后找直到不等于起始key,然后再向前找直到与目标表不相等或不优于候选值。

随着函数的退出,释放锁。拿到候选值后,进行一次get操作(get细节见后续分析)。

6.3 get

    代码起始位置:HRegionInterface.java 158、160行。

    多个get批量执行,实际上就是循环执行get,除去锁,我觉得批量get是可以优化的。接下来分析单个get请求的处理。

    获取到对应region后,如果有行锁,则延长行锁的租期,开始get处理。首先,如果get请求对象中有列族过滤需求则通过表描述对象先检测列族是否存在。否则,将所有现有列族添加到get请求对象中(吐槽:lock id并没用处)。

    接下来将get请求转化为scanner操作。首先根据get请求对象构造scan请求对象,逻辑比较简单,就是将各个参数对应的赋值;然后开始构造扫描器InternalScanner类对象,构造前后会获取newScannerLock读锁,此时首先依然会进行一次get请求已做过的列族合法性检测(吐槽:那之前的逻辑真是多余);

6.4 写入

6.4.1 客户端部分—Put


    代码起始位置0.98.13  HTable.java 983行。

    首先判断之前的异步flush是否出现异常,如果出现异常,则将put加入异步写入缓冲中并同步执行flushcommits(backgroundFlushCommits方法)。诡异的是,处理到此并没有直接返回,异常也没有处理,而是继续进入没有异常的逻辑:调研Put大小是否超过hbase.client.keyvalue.maxsize(默认值-1,TableConfiguration:76,HTable:996),然后再次将put加入写缓冲,如果当前缓冲中数据长度大于hbase.client.write.buffer(默认值2097152,,TableConfiguration:58,HTable:1001)则进行一次非阻塞flushcommits。最后如果设置autoflush,则进行一次阻塞flushcommits。在flushcommits中,将写入缓冲交给异步处理对象(AsyncProcess.submit)处理,根据是否阻塞决定是否等待处理完成,如果发生异常,则阻塞的等待剩余操作执行完毕,根据失败后是否清空缓冲(clearBufferOnFail) 确定是否将失败操作重新放入缓冲。最后,重新统计缓冲中的数据长度。

    代码起始位置0.98.13  AsyncProcess.java 285行。

    接下来分析AsyncProcess接收到put缓冲后,向RS发送请求前的处理过程。代码中先做流控处理,如果正在处理的任务(之前发送的,每个rs的一次rpc视作一个任务)数大于等于hbase.client.max.total.tasks(默认值100,AsyncProcess:239,AsyncProcess:323),则等待一个任务执行完成;然后遍历处理每个请求,首先根据rowkey找到所在region的HRegionLocation对象,然后按以下条件过滤当前不可操作的请求,被过滤掉的请求会保留在缓冲中,参与下次flushcommits:

  •  每个region正在处理的任务数大于等于hbase.client.max.perregion.tasks(默认值1 ,AsyncProcess:241,AsyncProcess:435);

  •  region所在rs已被判为不可用;

  • 判断当前正在处理的任务数加上当前准备发送的任务(为每个已知rs生成一个任务)是否小于hbase.client.max.total.tasks,否则rs判为不可用,不再为新遇到的rs生成任务;

  • 请求的目标rs的任务数大于等于hbase.client.max.perserver.tasks(默认值2 ,AsyncProcess:243,AsyncProcess:455),如果大于,rs判为不可用;



对于没被过滤的请求,会进行进一步封装并和HRegionLocation对象建立映射关系以便后续发送RPC(代码逻辑中有对于Increase和Append的预防重复提交的逻辑,但是这两类请求的批量请求处理,已不由AsyncProcess.submit函数处理,而是由AsyncProcess.submitAll处理,所以相关逻辑后续介绍)。所有请求整理完成后,进入发送请求部分。

6.4.2 客户端部分—批量Delete、batch

代码起始位置0.98.13  HTable.java 886行。

批量Delete直接调用batch。所以我们直接分析batch即可。可处理batch操作的有多个函数,这里我们分析的是public void batchCallback( final List actions, final Object[] results, final Batch.Callback callback),因为这些函数最终都是调用的该函数,而该函数调用的HConnection.processBatchCallback,这个接口的实现类为HConnectionImplementation。在HConnectionImplementation.processBatchCallback中,主要逻辑只有构造AsyncProcess对象并调用submitAll和waitUntilDone阻塞的发起RPC。submitAll逻辑和上面说的submit大体相似,区别为:

  • 没有请求过滤,所有请求都会被发送;

  • 为防止Increase和Append请求被重复处理,在hbase.client.nonces.enabled(默认值为true,HConnectionImplementation:761,)被设置为true会被加上Nonce。Nonce的本质是给请求附带一个随机数,rs可以根据此随机数防止重复提交(rs可以单方面禁用Nonce,也可以配置Nonce值缓存时间,详见服务端分析)。



6.4.3 客户端部分—发送批量请求

代码起始位置0.98.13  AsyncProcess.java 542行。

代码主要可以分为两部分:构造线程和线程执行内容。首先来看构造线程,代码中遍历之前构建好的HRegionLocation对象及其请求列表映射,对其中的每个元素,统计、缓存rs和region正在执行的任务数;构造对应的任务线程;当线程构造失败时,减去相应的任务数,并进入尝试重试逻辑。

在构造线程时,有一个默认关闭的特性:当hbase.client.backpressure.enabled(默认值为false,ServerStatisticTracker:63,AsyncProcess:629)为ture时,客户端会根据服务端压力和客户端配置的策略,生成带延时的Runnable对象,用以动态缓解服务端压力。

在重试逻辑中,先尝试根据异常重新加载region位置,RegionMovedException中带有region新位置信息,RegionTooBusyException和RegionOpeningException时位置不变,其他异常移除原有region位置缓存;根据hbase.client.retries.number(默认值为31,AsyncProcess:243,AsyncProcess:753)和hbase.client.pause(默认值100,AsyncProcess:239,AsyncProcess:753,并非单纯的阈值,而是根据重试次数计算得出总的超时时间)确定是否还可以重试;通过回调函数对象(此段逻辑中为ObjectResultFiller类对象)确定请求是否可以重试(此逻辑中总是可以重试);然后根据重试次数和hbase.client.pause计算重试前休眠时间,休眠后重新调用submit再次发起请求。

代码起始位置0.98.13  AsyncProcess.java 580行。

接下来我们来分析执行内容。由于请求已经按rs组织好,所以每个线程都只处理向一个rs发送的RPC。代码中首先构造封装过的Callable对象;然后RpcRetryingCaller类(如果hbase.client.backpressure.enabled配置为true,则使用其子类)对象进行调用,整个调用过程,超时时间为hbase.rpc.timeout(默认值为60000ms,AsyncProcess:235,RpcRetryingCaller:753)。如果出现异常调用上述重试逻辑,并终止处理;如果请求正常,则开始处理返回结果,主要是将结果转成batch请求回调函数可用的参数,然后调用对应的回调函数处理结果的组装;然后减小rs、region对应的执行中的任务数。

调用过程:重置超时时间,使其最小不小于两秒;判断rs是否在故障节点缓存中,如果存在抛出异常;如果对rs的客户端没有缓存,则构造实际的客户端(RpcClient.BlockingRpcChannelImplementation.BlockingRpcChannelImplementation类对象),然后包装进protobuf生成类BlockingStub;然后开始通过protobuf生成的各种Builder类组装请求,根据hbase.client.rpc.codec(默认值为空字符串,MultiServerCallable:133)确定请求值的组装位置(如果不为空则集中封装到cellblock,服务端可使用cellscanner遍历);通过protobuf发送multi函数的RPC请求;然后将返回的protobuf的MultiResponse转为自定义的MultiResponse类对象并返回。

6.4.4 客户端部分—Delete、Increase、Append、mutateRow、checkAnd**等

这几个方法实际上就是发起一次独立的rpc调用,过程类似上面所说的执行内容,单个KV请求发送mutate函数的RPC请求,多个则发送multi函数的RPC请求。

对于checkAnd**会额外传入条件参数,服务端会负责进行检测和处理。

6.4.5 客户端部分—根据rowkey获取HRegionLocation

代码起始位置0.98.13  HConnectionManager.HConnectionImplementation.java 1114行。

如果是获取meta表region的物理位置,则从zookeeper读取,超时时间为hbase.rpc.timeout配置值,节点为zookeeper.znode.parent(默认值为/hbase,ZooKeeperWatcher:321,ZooKeeperRegistry:58)和zookeeper.znode.metaserver(默认值为meta-region-server,ZooKeeperWatcher:324,ZooKeeperRegistry:58)两个配置的连接。如果无法读取,则会一直阻塞直到超时。

如果是读取用户表,则先从缓存中拿到rowkey最接近的记录,并比较确认是否是符合要求的记录;如果缓存中不存在,则从meta表中获取:首先获取meta表位置,然后向meta表所载的rs发送getRowOrBefore请求,然后根据返回的region状态,确定是否能当做正常返回结果,如果出现异常则最多重试hbase.client.retries.number次。

6.4.6 服务端部分—mutate 、multi函数响应

代码起始位置0.98.13 HRegionServer.java 3545行。

mutate可以看做是multi的简化版,处理过程不多,即并入multi的处理过程了,所以这里着重分析multi的实现。

首先检测rs是否正常运行中,然后遍历每个region的请求并获取相关HRegion对象,如果region不存在或正在打开,则抛出异常;接下来会有比较多的分支,主要由于不同请求都由一个函数相应造成:

1. checkAndMutate:串行处理请求,目前只支持put和delete,首先根据数据是否保存在cellblock,将protobuf中的信息转换为HBase的请求对象;然后判断region是否是只读状态,region是否因为memstore写满而无法继续相应写请求,region是否正在修复或关闭,进行协处理器处理;为正在处理的数据行加读锁并等待mvcc中到目前为止的写入操作处理结束;然后进行一次get处理,获取到相应的需要check的值并check;如果check通过则调用HRegion对象的mutateRows;最后释放读锁,并调用协处理器。该方法返回请求是否被处理。

2. mutateRow:目前只支持put和delete,直接调用HRegion对象的mutateRows。该方法返回region负载。

3.其他请求(包括get):串行的处理每个请求,对于get或协处理器,执行相应处理。对于写入请求,目前支持append、increment、put和delete,对于put和delete将请求放入集合mutations中稍后批量处理,对于append和increment,先将mutations中的请求处理完毕,然后再调用append或increment,然后将返回结果或异常组织为protobuf对象;最后批量处理put和delete请求(处理内容与mutateRows十分类似,只是顺序不同,代码不同)。这些方法返回CellScannable接口(一般是Result类)对象。

处理完成后,组装protobuf对象,然后返回结果。

6.4.6.1 mutateRows

代码起始位置0.98.13 HRegion.java 5054行。

接下来我们来看Region对象的mutateRows方法的处理过程,注释中将处理分为14步,第一部分预处理:检测rowkey是否符合region范围;检测region是否可写;检测region是否正在关闭;调用协处理器。 

第二、三部分,是批量加行锁:对于每一个rowkey,尝试获取行锁对象,如果行锁对象在其他线程存在,则最多等待hbase.rowlock.wait.duration(默认值30000,HRegion:602,HRegion:3698)毫秒;然后尝试加HRegion对象中updatesLock的读锁,最多等待Math.min{hbase.ipc.client.call.purge.timeout(默认值60000,HRegion:621,HRegion:6255), hbase.busy.wait.duration(默认值60000,HRegion:613,HRegion:6255)* Math.min[行锁个数, hbase.busy.wait.multiplier.max(默认值2,HRegion:615,HRegion:6255)]}毫秒。以上等待超时后,均抛出异常。

第四部分,预处理数据和WAL:对于put,检测列族是否存在,检测每个KeyValue的时间戳范围是否超过当前服务端时间加上hbase.hregion.keyvalue.timestamp.slop.millisecs(默认值Long.MAX_VALUE,HRegion:630,HRegion:3201),如果客户端没有给KeyValue设定时间戳,则设置为当前时间;对于delete,如果没有提供列族,则用所有列族补全,否则检测列族是否存在,对于每个KeyValue,如果删除操作是删除具体列中的数据,则需要先将列中最新的数据get出来,然后根据结果写入时间戳,其他删除操作的时间戳直接使用当前时间;如果没有跳过WAL,将处理过的KeyValue保存到WALEdit类的对象中;再次调用协处理器。

第五部分,获取mvcc值。第六部分,调用协处理器,并将协处理器生成的写入操作加入WALEdit对象。

第七部分,写入memstore:如果启用了tag,则先填充tag;然后给每个KeyValue填充mvcc值;然后将KeyValue对象传入列族对应的MemStore对象中;在MemStore构造中,根据hbase.hregion.memstore.mslab.enabled(默认值为true,MemStore:123,MemStore:230) 确定是否开启memstore内存池,如果开启,根据hbase.hregion.memstore.chunkpool.maxsize(默认值0.0,MemStoreChunkPool:185,MemStore:230)、memstorelimit的两个配置、hbase.hregion.memstore.mslab.chunksize(默认值2097152,MemStoreChunkPool:199,MemStore:230)以及hbase.hregion.memstore.chunkpool.initialsize(默认值0.0,MemStoreChunkPool:203,MemStore:230)确定是否开启和如何构造memstore chunk pool,根据hbase.hregion.memstore.mslab.max.allocation(默认值262144,MemStoreLAB:89,MemStore:230)确定内存池接收的对象最大长度;基于二者全开的情况下,当对象传入MemStore时,首先尝试获取当前空余chunk,如果没有则通过memstore chunk pool获取一个空闲的或者申请一个新的chunk对象,初始化并加入到chunk队列中;拿到chunk后,尝试基于当前偏移量申请占用内存,如果内存不足,则重新获取chunk重试;然后开始拷贝数据,重新构造KeyValue对象(值得注意的是,内存池这部分逻辑包含典型的利用CAS的无锁编程,如果CAS失败则会循环重试);然后保存到MemStore的内存中,更新memstore中数据的时间戳范围和总数据长度。

第八部分,写入hlog:首先获取seqId并进行封装;然后调用各种协处理,并把封装后的对象放入待写入列表中。

第九部分,释放updatesLock的读锁。第十部分,释放行锁。

第十一部分,根据region和操作中最高的Durability设定进行WAL同步。如果为同步写入,则通过同步结果通知线程等待写入线程WAL完成;写入线程所用的hdfs writer,每次rollhlog时重生成,实现类根据hbase.regionserver.hlog.writer.impl(默认值org.apache.hadoop.hbase.regionserver.wal.ProtobufLogWriter,HLogFactory:204,FSHLog:556)确定;对于写入线程,只要需要写入的seqid大于已写入id,线程就开始写入,写入前重新生成写入列表,然后将原列表中的内容写入writer缓冲中;写入完成后,选择一个空闲的sync线程,如果没有则按照最后的seqid取模取一个(其数量取决于hbase.hlog.asyncer.number(默认值为5,FSHLog:438,FSHLog:1158)),然后通知其已写入的最新的seqid;对于sync线程,每当已写入seqid大于已同步seqid,则会调用hdfs writer的对应同步方法,通知同步结果通知线程最新同步的seqid,然后检测是否需要rollhlog;同步结果通知结果线程每当最新同步seqid更新时,都会更新共享变量,然后唤醒阻塞线程。

第十二和十四部分,再次调用协处理器。第十三部分,更新mvcc最近写入完成的mvcc值。

最后,检测是否需要flush,如果需要则触发一次flush。

6.4.6.2 append和increment

代码起始位置0.98.13 HRegion.java 4421行。

append和increment处理逻辑非常类似,所以这里主要说明append的处理过程。代码中首先将protobuf类转为请求类对象;然后调用协处理器;然后如果服务端开启了hbase.regionserver.nonces.enabled(默认值true,HRegion:576,HRegion:4488)根据nonce获取之前的相同操作,如果没有,则将本操作放入共享集合继续及执行,如果有则等待之前的操作处理结束,根据上个操作的结果确定是否可继续执行;后续真正处理append的过程和6.4.6.1中类似,只不过是需要先通过get读取原值,然后再进一步处理得出新值;append处理完成后,向共享集合中写入结果,更新清理时间;最后调用协处理器。

6.4.7 简要调用关系图



3.png


 

源码分析之灰太狼手札(五):HBase RPC机制

mopishv0 发表了文章 0 个评论 2074 次浏览 2015-09-08 15:22 来自相关话题

五 HBase RPC机制     本章以region server为例讲解HBase的RPC机制。 5.1 初始化和启动 ...查看全部
五 HBase RPC机制

    本章以region server为例讲解HBase的RPC机制。

5.1 初始化和启动

代码起始位置:HRegionServer.java 506行。

    服务实例的创建简单粗暴,直接new出org.apache.hadoop.hbase.ipc.HBaseRPC.Server的实例,其父类为org.apache.hadoop.hbase.ipc.HBaseServer。

    父类构造函数的初始化流程为:初始化监听地址(域名或ip)、端口、配置对象、请求参数实现类类名、请求处理线程数(handler);设置socket发送缓冲长度为0;初始化请求队列;根据max(.small、large和无).callqueue.memory.size、hbase.server.multithrottler和small.queue.request.limit构造请求流控对象(流控逻辑后续分析);设置客户端最大空闲时间、每次最多回收连接数以及回收后预期空闲连接数;根据ipc.server.tcpnodelay和ipc.server.tcpkeepalive设置tcp连接处理方式(Nagle's Algorithm和keepalive?);根据ipc.server.response.queue.maxsize构造请求响应返回流控;构造请求监听线程和请求响应返回线程,请求监听线程构造较为复杂,下面对他的构造进行分析。

    首先,构造一个套接字通道实例(nio的ServerSocketChannel类)并设置为非阻塞模式;绑定socket监听地址和端口,并根据ipc.server.listen.queue.size指定侦听backlog长度;构造选择器实例(nio的Selector类)并向套接字通道实例注册;设置线程名,并指定为守护线程;ipc.server.reader.count构造指定个数的连接读取线程;Listener将得到的请求连接直接交给Reader线程读取,其构造比较简单:设置为守护线程并设置线程名,构造连接对象列表,构造一个未注册的选择器实例。

    至此,基类中的初始化就结束了,接下来分析Server中的初始化操作:设置方法调用实例(此处为HRegionServer实例)及其类型;设置每个请求是否都打印log(默认设置为false);根据hbase.ipc.warn.response.time设置响应过慢时间阈值;根据hbase.ipc.warn.response.size设置请求过多阈值;构造参数说明类。

    最后是线程的启动,比较简单,依次启动响应发送线程,监听线程和请求处理线程(handler)。

5.2 请求监听&分发线程:Listener的运行

代码起始位置:HBaseServer.java 407行。

    从选择器中获取有新socket连接的选择器键;通过每个选择器键获取ServerSocketChannel实例,并通过其尝试获取10次SocketChannel实例;获取后设置连接处理方式(Nagle's Algorithm和keepalive);如果获取成功,则创建请求连接实例,并将其分发给Reader线程、加到连接集合中;最后是输出日志。

    当running为false时,线程结束运行。线程首先关闭socket连接器和通道(调用对应的close函数);然后统计每个请求未响应的的总字节数,并将连接的响应队列清空;减少响应流控对象中的统计值(由于会有某些对象wait在此统计值,所以即使线程退出也要减少统计值从而notifyAll)。

5.3 请求读取线程:Reader的运行

代码起始位置:HBaseServer.java 636行。

    线程首次执行时,会从选择器中获取选择键(SelectionKey),由于没有进行过注册,所以必然没有办法得到结果,所以跳过接下来的处理,先进入连接注册逻辑;注册逻辑是对列表newConnections进行同步的,首先遍历其中元素,对于每个元素拿到其中的通道并注册选择器,向选择键中附加对应的连接对象,便利后清空列表;注册逻辑过后循环结束,如果进程没有终止则会继续循环;注册过的选择器,是能够获取选择键的(有请求时);处理过程中遍历获得的选择键集合,移除选择键后进行真正的请求读取处理。

    先判断选择键是否有效且可读;然后拿到刚刚附加到选择键中的连接对象;标记连接对象最后的访问时间;之后在连接对象中做进一步处理;处理中首先读取4个字节的HEADER信息(合法值为” hrpc”);然后读取1个字节的版本信息并判断其是否符合当前支持的范围;接下来读取数据长度,如果长度为-1则代表此次是一个PING消息,否则构造响应长度的缓冲并记录rpc调用次数;然后开始首次读取内容,每次最多读取64K字节直到读完指定长度;第一次读取出的请求内容是用户信息;接下来再次读取的内容就是请求内容了;最后将构造待解析请求对象放入队列并进行流控信息的标记,请求处理读取就结束了。吐槽:这段逻辑用了一个大的while(true)来处理多种情况,循环会被执行多次,但是每次循环执行的逻辑又不同,莫不如顺序流程可读性更高,还不容易出错。

5.4 请求处理线程:Handler的运行

代码起始位置:HBaseServer.java 1215行。

    线程的循环中,首先取出一个请求,对流控对象进行标记,然后开始对请求进行解析:解析中先读取请求id,如果版本符合,则会读取解压算法、生成解压后的输入流以及获取是否需要记录操作状态(profile);构造参数读取类实例(Region Sever中为org.apache.hadoop.hbase.ipc.HBaseRPC.Invocation),并使其解析请求参数,解析时先读取方法名,然后依次读出参数对象(遵循Writable接口的对象),并获取其类型;构造已解析请求对象,设置RPC压缩算法、连接版本、是否记录操作内容;送回解压对象。

    解析后,设置handler状态,设置请求上下文对象,然后开始调用call函数,此函数由子类提供实现,接下来分析Region Server中此函数的实现过程:首先通过构造时设置的实现类用反射获取调用方法对象;然后记录执行状态,如果需要记录操作状态的话则记录方法名和请求被处理前所消耗的时间;然后用反射调用具体函数;调用完成后再次记录执行信息;将返回对象封装为HbaseObjectWritable;如果响应时间过长或返回结果过长,则打印详细日志并记录监控信息;至此,基类中的实现结束。

    后续的处理中如果请求抛出异常,则进行记录;如果结果是可计算长度的,则将长度加上一个int和byte的长度(加上id和是否出现异常的标记长度),如果长度过长则将结果变为异常;最后将结果写入流,首先写入回话id,是否出现异常;如果版本允许对响应结果进行压缩则获得压缩流;然后将结果与操作状态(profile)或异常写入流中;将数据流赋值给已解析请求对象后,调用响应返回线程的处理响应方法。

    该方法中,首先标记流控;然后如果请求连接没有关闭的话,则将请求放入连接响应队列中;如果响应队列长度为1,则立即处理响应(见5.5)。

5.5 响应返回线程:Responder的运行

代码起始位置:HBaseServer.java 920行。

    一般请求第一次都是从上阶段的接口处开始首次返回响应的,所以这里我们先分析一个链接的首次返回(看代码中一个链接是不可以发送多次请求的)。此处首先拿出链接对应的Socket通道,将返回值写入通道中,每次最多写入64K;如果已写完则减少RPC计数,如果未写完且由Handler进入该函数则注册写入通道,由线程进一步处理。

    线程中,首先不断从刚注册的通道中获取选择器,并从通道中移除选择键;针对每个键,做以上处理;每隔900000毫秒,会从当前的通道中获取选择键,并遍历其中连接对象,遇到第一个请求收到时间至当前时间大于900000的请求则关闭其连接并跳出遍历。

    奇怪的是,看响应线程的处理,每个连接应当能处理多个请求的,但是在请求监听和读取中并没有看到相关处理。 看了下blame,之前每次发送响应请求不一定发送全部内容,所以丢给一个线程慢慢的发送后续结果,而且依据连接对象中记录rpc次数的变量来看,之前连接对象应该是可以复用的。

    整个RPC过程到此就为止了,过程中有各种流控对象,其实现比较简单。当增加流量时,先获取当前值,如果此值已经大于阈值则同步流控对象再次获取当前值并判断(double check的应用),如果依然大于阈值则wait;其他地方减少流量值时,会notifyall,此时wait退出;然后利用AtomicLong.compareAndSet接口写入新值,如果写入失败则重复此逻辑。此段逻辑做到了同步块范围最小,因此需要在多个地方进行检查,更多的可以百度”无锁编程”。

源码分析之灰太狼手札(四):HDFS IO

mopishv0 发表了文章 0 个评论 2102 次浏览 2015-09-08 15:20 来自相关话题

四 HDFS IO  4.1 L2缓存  4.1.1 构造 代码起始位置:Store 259行。     如3.1.3所述 ...查看全部
四 HDFS IO

 4.1 L2缓存

 4.1.1 构造


代码起始位置:Store 259行。

    如3.1.3所述构造CacheConfigBuilder实例时创建L2BucketCacheFactory实例(目前没有多种实现的配置),然后在build出CacheConfig的同时,利用构造L2Cache。此处需要注意一个配置,"hfile.l2.cacheblocksonwrite"(L2_CACHE_BLOCKS_ON_FLUSH_KEY),此配置默认为true,也就是说在flush的时候默认会将block写入缓存,对于SSD做缓存来说是致命的,需要配置为false。

    实例化L2Cache对象时,首先根据配置hfile.l2.bucketcache.ioengine(根据注释,只能有heap和offheap两个值)决定使用内存位置,如果配置为空(也是默认值),则认为禁用L2缓存;然后根据其配置获取对应内存区域的最大内存值;根据hfile.l2.bucketcache.size计算L2缓存大小(配置浮点数和整数的含义不同,前者为最大内存值的比例,后者为占用几MB内存);根据hfile.l2.bucketcache.writer.threads、hfile.l2.bucketcache.queue.length和hfile.l2.bucketcache.ioengine.errors.tolerated.duration配置缓存写入线程数、写入队列长度和发生异常时最长可容忍的时长;根据hfile.l2.bucketcache.bucket.sizes设置每个数据缓存层级大小(此处桶,也就是bucket代表一个缓存数据集合,其长度类似redis中内存按4K、8K、16K…的分级);构造数据缓存桶类BucketCache实例;构造L2缓存类L2BucketCache实例。

    L2BucketCache实际上是BucketCache的壳子,目的是为了提供L2Cache接口的具体实现;所以更复杂的初始化操作包含在BucketCache类中。首先,设置之前的参数,根据ioengine名构造具体的IOEngine对象(实现类为ByteBufferIOEngine);然后,如果做缓存最大只支持32T的校验;按照8K设置一个block的最小值得预设值;初始化存储管理类,其中主要包括缓存长度信息(其中包括该长度中的所有桶列表、有空余空间的桶列表、未缓存block的桶列表),数据桶信息并对其进行初始化(根据总长度和此层长度划分出缓存数组类似memcache的结构,这里指规划在总体缓存中的偏移量),按顺序每级缓存长度一个数据桶,多出的全部分给最后一级;为每个线程创建写入队列和标记信号对象;构造正要写入缓存的对象Map和缓存信息Map;构造写入线程;创建定时统计任务,此任务只负责打印缓存状态日志。

4.1.2 写操作

代码起始位置:L2Cache.java 45行。

    首先来看看接口中cacheRawBlock函数的注释:添加块到L2缓存。该块必须由一个可被写入到磁盘的字节数组来表示。可见,接口设计方面考虑到了写入磁盘的可能性。

接下来分析一下具体实现:首先,将hfile文件名和偏移量封装缓存键值对象;然后将键值和块内容的字节数组交给BucketCache对象处理;如果正要写入缓存Map中或缓存信息Map中有相关记录则直接退出函数;将字节数组、键值是否保存至内存(默认为false)和缓存序列号封装为缓存值对象并保存至正要写入缓存Map中;根据键值对象的哈希值选择并放入缓存写入队列;如果放入队列成功则增加缓存计数,增加缓存长度大小,保存hfile到缓存键值的映射关系;如果放入队列失败则等待50ms再次尝试,如果依然失败则从正要写入缓存Map中移除并增加失败计数。

进一步保存由WriterThread处理。线程中首先将队列中所有缓存对象取出;然后针对每个对象写入缓存:先找到合适的桶大小(缓存长度级别),然后尝试获取有空余空间的桶,如果无法获取则从有多个桶且有未缓存block的桶的缓存级别中拿来一个未缓存block的桶并重新初始化,从缓存数组中拿出一个元素的偏移量,根据桶的情况在修改长度层级中的有空余空间的桶列表和未缓存block的桶列表,将偏移量、长度、保存时间和是否在内存中封装为BucketCache.BucketEntry类对象实例,根据偏移量计算在总缓存(以ByteBuffer数组的形式引用)中的起始下标位置,然后依次写入缓存内容(此处做了较复杂的处理,猜测目的是为了适应buffer数组长度不能与缓存长度级别自适应的情况),至此数据已经写入缓存,如果期间发生异常需要及时释放缓存;写入正确后更新IO异常起始时间为-1;更新正要写入缓存的对象Map和缓存信息Map;如果已缓存总长度大于预设值(写死的95%)则开始回收缓存,见4.1.5。

这个过程和某些内存缓存存储的方案类似,不过区别是,这里总是拿一个完全空的桶,然后根据不同的长度桶中可存储的block个数不同。而有的内存存储中,是通过类似128K分裂为两个64K的桶,64K的桶适时的分裂为2个32K的桶来实现的。HBASE的方案好处是不需要进行整理,但是当某一级别的桶不够用时,有可能很多桶不满但是也不完全空。分裂的方案优点是存储空间灵活分配,但是有时需要整理,要不然会产生碎片。

4.1.3 读缓存

代码起始位置:L2Cache.java 35行。

    首先来看看接口中getRawBlock函数的注释:从L2缓存中取回block。该块由一个在磁盘中存储的字节数组来表示。

    如果读取时,记录仍在正要写入缓存的对象Map,则直接从其中获取数据,记录命中信息后返回;否则从缓存信息Map中获取缓存数据桶信息,根据信息中的长度和偏移量构造byte数组并从ioEngine对象中读取缓存数据;记录命中信息;如果已缓存信息中也没有命中,则记录未命中信息。

4.1.4 移除缓存

代码起始位置:L2Cache.java 53行。

    移除缓存以hfile为单位,首先从缓存索引中,拿出所有指定文件的缓存信息;然后将信息进行一次拷贝,用于生成缓存快照,此后可能会有新的缓存进入或移除,拷贝一次可防止并发导致的误操作;之后,针对每一个缓存信息:首先试图从正写入集合中移除;然后试图从已写入缓存集合中拿出对应缓存信息,然后获得缓存所在桶及对应缓存长度级别,释放桶中对应元素,之后各种标记。

    吐槽一下:虽然此段逻辑没有IOEngine的关系,L2缓存的逻辑层次到了内存IOEngine级别后较多,但是功能结构划分又不明确有待优化。

4.1.5 回收缓存

代码起始位置:BucketCache.java 743行。

    代码中首先根据DEFAULT_MIN_FACTOR计算每个缓存长度级别应当保留多少纯空的数据桶(最少1个),并计算出依次标准能释放出的存储量,如果总量小于等于0说明空间很充足不需要释放;然后打印诸多log;然后统计不同优先级SINGLE(首次加入缓存)、MULTI(被多次访问后)、MEMORY(默认未开启)归类(优先级依次提高);根据每个优先级的实际占用总和与DEFAULT_SINGLE_FACTOR、DEFAULT_MULTI_FACTOR和DEFAULT_MEMORY_FACTOR确定每个优先级多占了多少内存;从多占内存与(之前计算的可释放的存储量-已释放存储)/剩余优先级数量这两个值中取较小值作为此次回收内存的标准,开始释放本优先级内存;释放内存比较简单,即遍历根据之前统计各优先级占用存储时生成的优先队列,并释放移除的缓存(见4.1.4),直到达到此次释放标准或已遍历所有其中元素;最后,打印log释放锁。

4.1.6 关闭缓存

代码起始位置:L2Cache.java 58行。

    关闭缓存的逻辑比较简单:标记缓存关闭;关闭IOEngine(目前的内存IOEngine没有任何操作);关闭统计线程;中断缓存写入线程;清空正在写入缓存集合和已缓存集合。

4.2 HDFS写入类V2

4.2.1 构造


代码起始位置:StoreFile.java 892行。
 

源码分析之灰太狼手札(三):Master命令执行线程Worker

mopishv0 发表了文章 1 个评论 2001 次浏览 2015-09-08 15:17 来自相关话题

三 Master命令执行线程Worker 代码起始位置:HRegionServer 2094行。 Worker是region server执行master发来的指令的 ...查看全部
三 Master命令执行线程Worker

代码起始位置:HRegionServer 2094行。

Worker是region server执行master发来的指令的线程。Worker在HRegionServer构造时初始化,WorkerThread在HRegionServer线程run()函数之发送注册信息成功之后的线程初始化的逻辑中初始化并启动,WorkerThread以守护线程的方式启动,当发生未捕获到异常时,未捕获到异常处理器会结束region server进程;在HRegionServer线程run()函数之服务结束的逻辑中停止;只要线程处于活动状态(isAlive返回true),region server就会认为它状态正常。

接下来,按照MSG_REGION_OPEN、MSG_REGION_CLOSE、MSG_REGION_CLOSE_WITHOUT_REPORT、MSG_REGIONSERVER_QUIESCE、MSG_REGION_FLUSH、MSG_REGION_SPLIT、MSG_REGION_MAJOR_COMPACT与MSG_REGION_COMPACT、MSG_REGION_CF_MAJOR_COMPACT与MSG_REGION_CF_COMPACT的顺序进行分析。

3.1 MSG_REGION_OPEN

3.1.1 命令处理


代码起始位置:HRegionServer 2120行。

首先检测是否已获取到root表所在region或者当前需要open的region是否是root表。如果没有root表所在region信息则将命令重新放到toDo集合中,等待root表信息获取成功。如果root表信息已经获取成功,则创建并执行open region线程。线程主要调用openRegion方法。

方法中,首先将region正在open(HBaseEventType.RS2ZK_REGION_OPENING)的信息写入zookeeper;然后分配hlog;根据配置中hbase.hregion.impl的值创建HRegion实例并初始化;设置偏好节点(与flush有关、favoredNodes);对原有region的每个store文件则进行一次compact(创建compact任务,并放入线程池中执行);将region信息放入在线region集合并从opening region集合中移除;向master发送region已open信息;向zookeeper中写入region已open完成(HBaseEventType.RS2ZK_REGION_OPENED)。其中HRegion初始化过程比较复杂(构造,紧接着调用initialize方法),下面分析这个过程。

3.1.2 HRegion初始化

代码起始位置:HRegionServer 2257行。

HRegion构造函数:配置(这里的配置指设置对象引用)表文件路径、文件系统对象;获取key比较器(root、meta和普通表的比较器不同);配置HLog对象;初始化region用配置对象,并将表属性加入配置中;配置master传递过来的region信息;配置memstore flush监听器;根据hbase.hregion.memstore.percolumnfamilyflush.enabled配置是否允许根据单个familiy刷新memstore文件,开启此项后需要配合hb...store.percolumnfamilyflush.flush.size设置flush阈值;初始化region目录;根据hbase.hregion.keyvalue.timestamp.slop.millisecs设置key可容忍的最新的时间戳;根据hbase.hregion.memstore.flush.size设置flushSize;设置是否允许WAL;根据hbase.hregion.memstore.block.multiplier设置触发flush阈值是flushSize的多少倍;根据hbase.hregion.memstore.block.waitonblock设置flush时是否阻塞;初始化扫库任务集合。

HRegion.initialize方法:这个方法是初始化和open region的主要方法。方法开始时,设置任务监控对象,并为任务监控对象设置动态代理(但是动态代理什么都没有做,意义何在啊),任务监控对象保存在TaskAndWeakRefPair对象并存放到任务监控对象集合中,TaskAndWeakRefPair保存监控对象本身和本身的弱引用,当内存匮乏时,弱引用会较快失效,并在获取集合时会清理时导致监控对象失效从而使其从集合中移除,监控对象在initialize的过程中记录执行状态;然后在region目录中创建region描述文件(如果存在则忽略创建过程);删除原有region临时目录;

然后开始加载原有region存储文件。此段逻辑有一大段注释:加载所有region存储文件(HStores)。当replay log时我们不希望丢失任何数据,所以在replay时必须用谨慎的策略。对于每个存储文件(store),计算最大的已保存到文件中的log id(seqId)。在replay时,忽略小于等于log id的hlog。我们不能只从所有存储文件中最大的已保存到文件中的log id中选择最小值,因为多余的、不连贯的log id会产生不确定的问题(不是有版本控制和标记删除么?因为标记删除是可配的?)。

接下来看看具体逻辑是什么样的:首先,创建加载存储文件线程池,hbase.hstore.open.and.close.threads.max会限制最大线程数;然后对于每个列族(family),多线程地加载每个列族的存储文件并创建Store对象(包含StoreFile集合,StoreFile代表具体的region列族的数据文件);统计除bulk-loaded外的存储文件的最大的log id用于replay;统计包括bulk-loaded的存储文件的最大的log id,用于初始化region分配给新操作的log id值;统计最大时间戳(可以是用户指定的值)并+1,用于版本控制;设置每个Store对象的最后刷新时间为当前时间;然后开始replay log,将返回的log id和上面统计的log id取最大值+1返回函数并设置给HLog对象;replay完成后删除HLog文件。

这段逻辑中又有两部分比较复杂:构造Store对象和replay log。

3.1.3 创建Store对象

代码起始位置:HRegion 612行。

构造函数中首先调用自己的另一个进行基础设置的构造函数,首先设置region信息和store目录;设置列族信息和配置;根据hbase.hstore.majorcompaction.compression设置major compact压缩算法;根据普通列族、root列族、meta列族设置不同的compactor;初始化MemStore对象,初始化中主要是初始化多个集合;根据hbase.hregion.max.filesize设置存储文件最大长度;根据hbase.hstore.blockingStoreFiles设置最大compact文件数、hbase.hstore.close.check.interval设置每写入多少字节检测一次是否需要compact。

其中有一个十分重要的对象的初始化:CacheConfig cacheConf。这个对象负责block的缓存工作。一级缓存使用LruBlockCacheFactory的实例,二级缓存使用L2BucketCacheFactory的实例,两者都是单例的。首先构件CacheConfig工厂对象,它会获取上述两个类型实例并从配置文件和列族信息中读取多项配置(详见CacheConfig 567行)。然后,申请缓存并构建CacheConfig对象。

退出进行基础设置的构造函数后,多线程地加载列族目录中的数据文件(忽略长度为0的文件,生成StoreFile对象集合),主要是根据传入参数设置一些属性;如果文件是引用文件,设置引用文件实际路径;根据io.storefile.bloom.enabled决定是否使用布隆过滤器,然后获取文件更新时间。

加载数据文件后,根据hbase.peak.start.hour和hbase.peak.end.hour设置hbase尖峰时间;根据hbase.compactionmanager.class设置CompactionManager实现类;根据kvaggregator设置key value处理实现类KeyValueAggregator实例(类似key value处理钩子);根据compaction_hook设置compact钩子CompactionHook实现类。

3.1.4 replay log

代码起始位置:HRegion 647行。

首先,过滤region目录中符合正则表达式"-?[0-9]+"的文件,这些文件为HLog文件;然后对于每个HLog文件,进行replay,并返回最大log id。

replay中,首先创建任务监控对象(如3.1.2中所述)并根据hbase.regionserver.hlog.reader.impl创建HLog.Reader类实例。然后顺序处理每一条操作日志的每个key/value:检查region和familiy是否为要replay的region中的内容;检测操作日志中的log id是否大于store文件中的log id,这一步之所以要在此处判断是为了统计忽略了多少KeyValue;如果通过检测,则将KeyValue对加入MemStore中,加入后如果需要flush MemStore,则在处理当前操作日志完成后flush一次(会在MSG_REGION_FLUSH中具体分析);每个操作处理完成后,都需要向任务监控对象(具体replay数和忽略数)和master汇报(将正在处理open region放到2.3中所说的向master发送的消息集合outboundMsgs中)。

当replay操作完成时,标记任务监控对象;返回最后一次replay的操作的log id。

3.2 MSG_REGION_CLOSE和MSG_REGION_CLOSE_WITHOUT_REPORT

3.2.1 命令处理


代码起始位置:HRegionServer 2145行。

    此处的REPORT是指向zookeeper汇报,如果WITHOUT_REPORT则不对zookeeper节点做处理。接下来按照需要向zookeeper汇报的逻辑进行分析。

    命令处理直接调用closeRegion,函数中首先向zookeeper中写入region正在关闭事件(HBaseEventType.RS2ZK_REGION_CLOSING);然后获取region对象,调用region的close方法;从在线region集合和统计region log id的集合中移除region信息;向近期关闭的region列表中加入region信息(列表中只保存3个最近关闭的region);最后,向zookeeper中写入region已关闭事件(HBaseEventType.RS2ZK_REGION_CLOSED)。

3.2.2 region关闭

代码起始位置:HRegionServer 2402行。

region.close()是本地region关闭时的主要处理内容,函数注释内容为:关闭此HRegion。只要参数abort(退出进程)不为true就会flush缓存。关闭每个HStore并不再响应任何请求。此方法会消耗一些时间,因此不要在时间敏感的线程中调用此函数。返回值为所有组成HRegion的HStoreFile的列表。当此region此次不关闭或已经被关闭过时,返回null。

    接下来分析函数的代码逻辑:首先建立任务监控对象;然后获取splitLock可见关闭与split是互斥操作;然后获取写状态对象writestate锁,禁止写入操作;等待compact和flush结束(此处使用writestate.wait()阻塞,我觉得容易造成死锁或错误的阻塞,应该使用wait(long timeout));如果之前没有刷新过且MemStore中的数据缓存大于hbase.hregion.preclose.flush.size则进行一次flush MemStore,完成此操作后将要禁止读取操作。

然后获取扫库操作锁、获取split和关闭操作时使用的锁对象splitsAndClosesLock的写入锁(该锁将会阻塞部分读取操作);设置region closing标记为true;等待行锁释放;再次执行刷新MemStore;多线程地执行store.close()(分布式的关闭文件读取对象),关闭列族数据存储对象;向结果列表中加入已关闭的数据存储文件StoreFile对象;设置region closed标记为true。

3.3 MSG_REGIONSERVER_QUIESCE

    此命令要求关闭所有用户region,所以在关闭前排除meta表region即可(可是root表呢)。

3.4 MSG_REGION_FLUSH

3.4.1 预处理

代码起始位置:HRegionServer 2191行。

    首先翻译一下函数注释。当发生以下情况时,不会flush缓存:缓存为空;region已关闭;已有flush操作;该region不可写。此方法会产生一段时间的阻塞,所以不应该在时间敏感的线程中调用它。参数selectiveFlushRequest如果为true,则选择性的flush列族(memstore大小由列族指定),同时此功能需要在配置文件中开启。如果返回true,则代表缓存已flush。

    接下来分析代码逻辑:根据配置hbase.hregion.memstore.percolumnfamilyflush.enabled再次为参数赋值;创建任务监控对象;如果region已close或写入状态对象writestate中已标记flushing或不可写入则退出函数,否则标记flushing;获取splitsAndClosesLock读锁,使该操作与split和关闭region互斥;如果选择性flush参数为true则过滤需要flush的Store对象(执行master发送命令时,此参数为false),过滤条件比较简单,判断memstore大小是否大、于列族中的设置;然后调用internalFlushcache 开始具体的flush操作;flush后释放锁、去除flushing标记。

吐槽:写hbase的也会写这样的代码,值得吐槽一下return (store.getMemStoreSize() > this.columnfamilyMemstoreFlushSize) ? true : false;此处还有一点flushcache返回值代表是否已flush,但是返回值是由internalFlushcache函数返回的,该返回值的注释是region是否需要compact;另外操作中的锁对象变化比较复杂,应当考虑用状态模式替换各种操作下的region状态。

internalFlushcache函数有一大段注释。flush Memstore有一些棘手。Memstore中有很多已写入HLog的更新。Flush Memstore到磁盘的过程中,要尽可能多的处理读写请求。同时,为了容灾时能区分flush过的数据和需要重演到Memstore中的数据,HLog也必须能清楚的界定刷新时间。因此,flush处理分为三步:A.flush memstore中的数据至磁盘中并为log指出seqid(log id);B.根据上步中的seqid向log中写入” FLUSHCACHE-COMPLETE”消息;C.移除已flush过的冗余的memstore。此函数会阻塞一些时间。返回值为true代表需要进行compact。接下来将按照注释中的划分分析代码逻辑。

3.4.2 准备阶段

    代码起始位置:HRegion 1386行。

    虽然,函数注释中,将处理过程分为了三部分,但在代码中A开始前还是有一长段处理逻辑的。

    首先通过updatesLock禁止写入操作;然后获取需要flush和不需要flush的Store对象集合;获取两个集合中的最小的log id(这是一个记录在memstore中的变量);在HLog对象中将region对应的最小log id从firstSeqWrittenInCurrentMemstore(当前region未flush最小log id)移动到firstSeqWrittenInSnapshotMemstore中(当前region正在flush最小log id,如果flush中途失败其中的内容不会移回firstSeqWrittenInCurrentMemstore,但所有wal的log处理两个集合都是同时处理的,因此不必担心无法滚动日志的问题)并返回最小log id-1作为flush起始id;为每个Store对象生成StoreFlusher(没有复杂逻辑只是new一个对象);调用flusher.prepare(),其主要工作是生成memstore的快照;释放updatesLock写锁。

    flusher.prepare()的逻辑稍微复杂些,这里单独分析下。首先,获取并加写锁;然后,将Memstore中的key value集合引用付给快照变量,并重新构造一个集合付给原有变量;同样的,将最小log id、时间戳范围跟踪器、key value删除计数器和Memstore内存申请对象(其实只有一个scanner引用计数会被用到,为什么重新构造呢)都付给快照对象并重新构造;释放锁。

3.4.3 Step A & B & C

    代码起始位置:HRegion 1579行。

本步首先调用flusher.flushCache(status)。函数中首先获取需要读取的最小时间戳(包括列族设置和所有扫库对象);设置最大版本号并创建遍历对象(InternalScanner接口、StoreScanner类的实例),用于遍历刚刚创建的Memstore的快照,其中过滤器列表的参数内容是一个CollectionBackedScanner类实例。

    构造遍历对象中,首先调用自身的设置参数的构造函数,这个函数中会设置缓存、列族存储对象(Store)、扫描列(此次操作中不设置列)、设置回闪(撤销删除)时间、根据io.storefile.delete.column.bloom.enabled和use_delete_column_bloom_filter决定是否使用布隆过滤器;然后,创建Key Value过滤器(ScanQueryMatcher),主要是将start、end key、回闪时间、扫描列和最大版本数等参数;根据遍历器中的统计信息决定是否使用此过滤器(flush中使用的CollectionBackedScanner类实例直接返回true,即必然使用);定位遍历器到需要遍历的第一行。

    构造完Memstore中记录的遍历对象后,开始构造KEY VALUE写入对象。首先,通过连缀的方式构造写入对象构造工厂;然后,构造文件写入对象(StoreFile.Writer类)实例。构造实例时,首先将没有设置的关键参数设置为默认值并设置临时文件路径;然后,根据hfile.format.version创建不同版本的HFile Writer构造工厂实例并进行参数配置(包括压缩算法,在每次写入一个block后会对block进行压缩);创建输出流实例,如果文件系统对象为HDFS的DistributedFileSystem,则尝试传入上文所述的偏好节点(favoredNodes);然后创建不同版本的HFile Writer实例。之后是构造布隆过滤结果写入对象:根据io.storefile.bloom.enabled和之前的列族设置决定是否使用布隆过滤器;然后如果HFileWriter的版本为2,则根据io.storefile.delete.family.bloom.enabled和io.storefile.delete.column.bloom.enabled创建列族删除和列删除操作布隆过滤器。

    之后通过上述构造好的遍历对象遍历Memstore中的KeyValue对象,将其进入memstore的时间标记设置为0;向StoreFile.Writer中写入。写入逻辑比较复杂。首先是对每个布隆过滤器进行处理。由于KeyValue对象在Memstore中是有序的,所以每次比较是否为新key时,只需与上一个对象比较即可。如果为新key,生成布隆过滤器key和偏移量并将记录加入过滤器中。对于列族删除和列删除操作布隆过滤器也有相似处理逻辑。然后向HFileWriter中写入数据;更新记录时间戳范围。写入后计算flush字节大小。

    所有KeyValue写入后,写入StoreFile的meta信息,包括最大log id、是否是major compact生成以及时间戳范围;然后调用钩子函数(目前没有特殊处理InjectionEvent.STOREFILE_AFTER_WRITE_CLOSE的钩子);然后将生成的文件从临时目录转移至region文件目录中;调用处理InjectionEvent.STOREFILE_AFTER_RENAME事件的钩子函数;创建StoreFile文件对象,主要是设置一些属性;创建文件对象对应的reader,创建reader的过程中同样要根据不同版本创建不同的HFileReader实现类对象,然后再重新将meta信息和布隆过滤内容重新加载出来(为了顺便校验文件么,可是之前已经有相同的步骤校验文件了啊……之前的校验意义何在呢)。

    以上为flusher.flushCache(status)的逻辑。之后执行钩子函数internalPreFlushcacheCommit所返回的钩子(目前没有钩子)。然后逐个调用flusher.commit()完成flush。其中的逻辑一开始主要是更新监控和输出log;然后将新生成的StoreFile对象加入Store的有序列表中;将之前建立的快照丢弃;关闭之前的遍历对象;检测是否需要compact(与3.1.3中逻辑类似,与storefile数量和配置有关)。

    最后清空fluser集合并更新每个列族存储对象store的最后flush时间。步骤A到此结束。

    步骤B时,StoreFile已创建成功,此时开始标记HLog中完成flush的log id。先从firstSeqWrittenInSnapshotMemstore中将3.4.2中的log id移除;然后在regionServer中将该region已flush的最小log id。

    步骤C也比较简单,首先notfyall所有等待flush的线程。然后记录log和最近flush信息,然后返回是否需要compact。

3.5 MSG_REGION_SPLIT

3.5.1 获取split row key


代码起始位置:HRegionServer 2156行。

    处理请求前,先进行一次flush。然后标记进行手动split(force split),传入预设定的row key。然后针对每个列族存储对象(Store),判断其是否有引用的存储文件,如果有则不可split。然后,开始获取split的row key,如果之前请求中有设置,则会直接返回请求中的值,否则每个列族获取一次split row key,然后取最大列族(存储文件总长度)的中间row key为此次split的row key。

    列族中的中间row key的逻辑为:遍历所有存储文件对象,选取最长的存储文件,读取其中间row key(偏移量写在文件尾,读取时只需取出对应block,如果文件中没有则读取中间的block);然后,比较开始row key和结尾row key,不能相等,如果相等则不能split返回null,会选用其他region的中间row key。

    获取split的row key后,即向处理split的线程池中加入任务。

3.5.2 执行split

代码起始位置:SplitRequest 38行。

    该段逻辑主要包含在region.splitRegion(midKey)中。该函数注释为:Split HRegion为两个新region并关闭当前region。Split应当比较快速,因为不必重写store file,而是创建store file的按上下两部分分割的软链。

    接下来分析代码逻辑。首先,判断split row key是否与region的start key或end key相等,如果相等,则不需要split。如果不相等,在父region目录中建立splits目录;然后创建两个HRegionInfo对象,主要是设置一些基本信息;在生成的split目录中创建以两个region名命名的目录;然后开始关闭父region并获得该region所有store file对象,逻辑参见3.2.2;然后针对每个store file对象,以“文件名.父region名”为名生成软链文件。然后创建HRegion对象,逻辑见3.1.2第一段,但并没有让其初始化。然后将splits目录中的文件移至新生成的region目录中。

3.5.3 后续处理

代码起始位置:SplitRequest 53行。

    得到split出的两个region对象后,开始更新region信息,.META.更新-ROOT-表,其他表的region更新.META.表:将父表设置为下线和split;然后将父表信息从在线region集合和flush log id集合中移除;将对应表中父region的起始时间戳和实例位置置空,写入split后的子region信息;向对应表中写入子region信息;添加向master汇报split结束的消息;输出LOG。

3.6 MSG_REGION_MAJOR_COMPACT和MSG_REGION_COMPACT

3.6.1 构造compact处理线程----默认策略

代码起始位置:HRegionServer 2165行。

    命令处理中,首先将是否强制compact写入region的所有store对象中。然后针对每个store,创建并执行compact处理线程。接下来先分析线程构造的逻辑。

    逻辑首先判断region是否可写并加入读锁(可以与flush互斥)。如果当前有正在compact的文件,则获取其中最后一个,再次compact时将不包括其+1之前的文件。然后开始过滤需要compact的文件,在这段逻辑中只有设置了major compact并且上一步没有过滤掉文件才能被认作真正需要执行major compact。

如果设置了hbase.store.delete.expired.storefile为true和TTL并且有超时的storefile,则此次compact只对这些超时文件进行删除处理(如果为major compact,此处则不会检测ttl)。然后开始过滤大文件,找到首个长度小于hbase.hstore.compaction.max.size的文件在集合中的位置,然后将这些文件从compact文件列表中移除。然后开始判断是能成为major compact,需要符合三者之一:(1)有引用文件;(2) compact文件数小于hbase.hstore.compaction.max并且store file最小更新时间范围在major compact范围内(根据hbase.hregion.majorcompaction和hbase.hregion.majorcompaction.jitter计算)并且当前时间不是hbase尖峰时间(hbase.peak.*设置)并且当前并不是只有一个已major compact过的文件;(3) 被认作真正需要执行major compact。

如果不需要major compact,则开始过滤文件:如果hbase.hstore.compaction.exclude.bulk为true,则忽略通过bulk-loaded导入的文件。然后根据预定算法过滤文件,调用applyCompactionPolicy函数,这段逻辑的注释为:首先如果配置了不包括bulk-loaded导入的文件则过滤之,然后根据新文件比例(compactRatio)确定一个偏移值,使新文件也能满足这个标准。如果文件数大于最大compact文件数,则会在compact()函数中递归。考虑最老的文件,避免compact [end-threshold,end)范围内的文件。最后,文件成为所有文件的compact结果。代码逻辑中首先标记非尖峰时刻compact数,并根据是否为尖峰时期和hbase.hstore.compaction.ratio、hbase.hstore.compaction.ratio.offpeak得到文件过滤比值。然后以hbase.hstore.compaction.max为窗口,从后向前的计算窗口内文件长度总和(O(n)的算法)。然后根据hbase.hstore.compaction.min.size、hbase.hstore.compaction.min、文件过滤比值和窗口内总文件大小计算出起始compact窗口位置,从而过滤掉过大的文件(需要比较有经验才能设置好这些参数)。注释中给出了如下配图,比较能表达其思想。


1.png



如果过滤后文件数量小于hbase.hstore.compaction.min则将compact文件列表置空。如果大于hbase.hstore.compaction.max则只保留前hbase.hstore.compaction.max个文件。至此,已筛选出需要compact的文件。

筛选完文件后,将其加入正在compact文件列表中并重新排序。如果筛选出来的文件不等于总文件数,则不会进行major compact。然后根据hbase.hstore.blockingStoreFiles计算任务优先级,当前总文件越多,计算出的值越低。然后new出compact线程对象并释放读锁。

由于是手动触发操作,此处优先级会被置为1。然后线程对象会根据需要compact的总文件长度和hbase.regionserver.thread.compaction.throttle被放入不同的线程池中运行。

3.6.2 构造compact处理线程----按时间分层策略

代码起始位置:TierCompactionManager。

    按时间分层的compactionManager类TierCompactionManager覆盖了applyCompactionPolicy函数,封装了对应的配置类TierCompactionConfiguration(extends CompactionConfiguration)。其构造函数主要是加载配置,值得注意的是,他可以针对每个表和列族进行单独配置。

    然后是过滤逻辑,其代码注释为:首先如果配置了不包括bulk-loaded导入的文件则过滤之;将文件从老到新排列,然后选择一个适当的[start,end)范围。从老到新尝试每个file在列表中的索引值作为start,并定义对应的end (也是file在集合中的索引值)。当[start,end)为一个可接受值时停止。当[start,end)为一个可接受值时应当满足以下条件:(1) fileSize[start]最多为maxCompactSize;(2) 参与compact的文件数量最少为currentTier.minFilesToCompact;(3) fileSize[start]最多为minCompactSize或最多为剩余文件的配置的倍数(我觉得这段注释语法不通至少缺个of)。end值为endInTier[tierOf[start].endingInclusionTier],默认情况下currentTier.endingIndexForTier = currentTier(拜托,这种注释是给作者本人看的吧),因此默认情况下,end总是为currentTier的最后一个文件的索引值+1。默认情况下要确认不同层次的文件不会被同时选取。注释给出了示意图:

2.png


    接下来分析代码逻辑。该逻辑的重点是将文件按时间新旧分层compact。设文件列表数组为f[0, n-1],先求第i个文件与后续文件的长度和sum[i]。然后开始对文件分层并计算每层的end对应的f中的下标,处理过程为从配置中依次取出层对象进行以下处理:如果配置了hbase.hstore.compaction.(表名.列名或Default,以下忽略此段配置前缀).IsTierBoundaryFixed为true则代表使用类似crontab的方式配置(表达式为** .Tier.第几层.Boundary)分层并计算出该层的起始时间点。n向0的遍历f,从0开始遍历层,层数配置为**.NumCompactionTiers,由于end是开区间所以层的end下标从n开始也就是实际结束下标+1;判断每个文件的最小flush 时间是否符合当前层时间范围,如果用crontab则判断其是否大于等于计算出的值,否则判断其是否距现在小于等于**. Tier.第几层.MaxAgeInDisk并且小于等于当层文件最大值**. Tier.第几层.MaxSize。一旦不符合当前层的条件,则切换到下一层,否则标记f[i]属于当前层。

    接下来尝试能够进行compact的[start,end)。大多数情况**.IsRecentFirstOrder都会配置为true,而且为false时逻辑比较简单,直接从f[0]所在层尝试是否可以作为start,不行则start++。所以这里重点说下从最新的层开始尝试的策略。首次尝试时,首先获取首个层end值不与第0层end值相等的层,因为有的层中可能没有文件(所以这里需要用过while循环进行查找),找到的层的end即为首次可尝试的start。如果无法进行compact(后续分析判断标准),则在当层中+1如果start值等于当前层end值,则切到下一层,并找到第一个与当前层end值不相等的层的end值,将其作为start返回。此处需要说明的是因为每层的取值范围为左闭右开,所以每层的end实际上等于上一层的start。拿到start后,先获取start所在层的end值(f[i]对应的层上段逻辑已求出)。然后通过sum数组值获取当层除start外,其他文件总长度。如果满足以下条件,则为可进行compact的start和end值:(1) 当前文件小于等于hbase.hstore.compaction.max.size。(2) end与start之间的文件数大于等于**. Tier.当前层号.MinFilesToCompact。 (3) 当前文件长度小于等hbase.hstore.compaction.min.size或小于等于剩余文件总长度乘以**. Tier.当前层号.CompactionRatio。选取好start和end值之后会输出较详细的log,可用于推断正确性。最后截取并返回文件列表。

3.6.3 执行compact处理线程

代码起始位置:CompactSplitThread 168行。

    线程中的run函数逻辑比较简单,调用HRegion的compact方法,该方法完成了compact操作。操作完后,如果当前操作优先级为不为正值(即列族中的文件数compact前大于hbase.hstore.blockingStoreFiles),则继续请求一次该列族的compact,否则如果符合以下条件则进行split:compact优先级不为负数;region的所有列族文件中没有引用文件;region的某个列族文件总长度大于maxfilesize(可在表和hbase.hregion.max.filesize进行设置)。获取split key和split的逻辑见3.5。

    接下来分析一下compact的逻辑。首先,获取splitsAndClosesLock锁并确认当前region没有关闭;然后,增加正在compact的操作数;调用doRegionCompactionPrep函数,该函数没有实现,如果做预处理可以在此处修改;然后进入compact对应的store对象进行compact操作;最终,释放锁。

    store的compact操作过程为:获取待compact文件中的最大log id(记录在文件的meta信息中);初始化任务监控信息;获取待compact的文件中最小的flush时间,统计所有文件中的记录数总和;针对每个待compact的store file,生成扫描类StoreFileScanner对象实例,StoreFileScanner封装了HFileScanner的实现类包括ScannerV2、EncodedScannerV2和ScannerV1;如果此次compact为major compact并且配置了major compact压缩算法hbase.hstore.majorcompaction.compression,则使用此算法作为compact压缩算法,否则使用表中声明的算法;获取目前所有scanner的最小起始扫描时间(似乎只为满足接口,因为后续会忽略掉MVCC),并设置给threadlocal变量;根据是否为major compact和hbase.hstore.time.to.purge.deletes设定保留删除记录的时间范围;构造StoreScanner类实例,根据列族中设置的回闪时间设置回闪时间点,构造记录过滤类ScanQueryMatcher对象实例(设置各种属性),过滤掉所有scanner中最大记录时间小于TTL时间的scanner,构造记录堆排序对象;构造store file写入对象,写入目录为region的.tmp,过程见3.4.3;接着开始通过StoreScanner类实例遍历待compact store file中的记录,遍历的具体逻辑在scan操作时再做分析;对于每条记录如果有compact钩子则先进行钩子处理,然后向store file writer中追加记录;每写入hbase.hstore.close.check.interval长度的记录后,检测一次region是否依然可写入,如果不能写入则删除已写入的文件并抛出异常;最后,向store file writer中追加meta信息。然后,进行compact完成后的后续处理:校验生成的store file的合法性(详情见3.4.3);将文件移至列族目录中;构造store file对象,并为其创建reader对象(详情见3.4.3);为其设置表和列族信息;从正在compact store file文件集合和region的所有store file文件集合中移除此次已compact完成的文件;将新生成的store file加入region的store file集合中;通知所有store scanner,会使其扫库中断;调用STORESCANNER_COMPACTION_RACE事件;关闭compact完成的store file的读对象,清空其缓存,变比输入流;删除已compact完成的store file;重新统计region中的store file的总长度;至此store内部的compact处理完毕。

3.7 MSG_REGION_CF_MAJOR_COMPACT和MSG_REGION_CF_COMPACT

    首先,获取region,然后按照上面的处理过程,按列族生成compact处理线程并执行。至此,woker线程的处理分析完毕,如果这些处理失败,会将请求的重试次数+1,然后重新放回todo集合中,否则按照之前所述检测文件系统。

源码分析之灰太狼手札(二):HRegionServer线程run( )函数

mopishv0 发表了文章 0 个评论 1551 次浏览 2015-09-08 15:10 来自相关话题

二 HRegionServer线程run()函数 2.1 发送注册信息  代码起始位置:HRegionServer 705行。 ...查看全部
二 HRegionServer线程run()函数

2.1 发送注册信息 

代码起始位置:HRegionServer 705行。

run()函数首先进入第一阶段(自创名,由stopRequestedAtStageOne起名)。第一阶段是region server线程首次启动并向master注册,之后不断与master交互的阶段。

在函数开始时,首先重命名线程名,线程启动时首次重命名,后续都由reinitialize()重命名。之后只要stopRequestedAtStageOne为false就向master发送启动信息,调用reportForDuty,因为stopRequestedAtStageOne即可退出循环,因此在发送注册的逻辑中多次判断了此值,从而能多个等待的死循环中跳出注册信息的构造。

reportForDuty中首选获取master信息,调用getMaster,这是一个死循环逻辑,获取失败会睡一会儿(所有的睡一会儿操作都依赖于sleeper)。getMaster中也是一个死循环,它会先从zookeeper中获取master信息,此处链接zookeeper异常的话会强行退出进程。如果没有节点,则会立即continue(cpu会被吃满的啊……;拿到master地址后用过HBaseRPC.getProxy检查协议版本是否匹配(不匹配抛异常,吐槽一点,既然不匹配就抛异常,为什么又要缓存版本号呢,这样如果更新重启了还是照样会报版本不匹配的异常啊,意义何在啊……)、生成调用动态代理实例(由 org.apache.hadoop.hbase.ipc.HBaseRPC.Invoker.Invoker 进行封装),抛出异常时会睡一会儿。构造好HMasterRegionInterface实例后,保存到hbaseMaster和masterRef中,之所以要保存两份,我猜应当是应对不同锁级别需求的处理吧。然后getMaster会必然返回true,外围的死循环意义何在?

然后开始获取服务状态,统计内存使用,将自己的信息写入zookeeper,然后通知master本region server的启动信息并break出循环。如果zookeeper写失败,则会睡一会儿后继续循环。

退出reportForDuty后,会根据master返回的map初始化线程(这部分下次再分析)。如果maser没有响应则睡一会儿。如果睡一会儿后依然stopRequestedAtStageOne == false,则再次进入reportForDuty,如果一切没有变化则会在写zookeeper时失败(因为节点已经存在,返回false,不抛异常),然后陷入reportForDuty的第二个循环中。当stopRequestedAtStageOne被设置好后,退出循环但是因为没有向master发送请求,所以没有返回的map,因此不会重复进入初始化线程(也就是说,如果master有变化或zookeeper有变化,则会重走上述流程)。      

stopRequestedAtStageOne应当是在他处被设置成true的。由此引申出2个问题,stopRequestedAtStageOne何时被设置为true,master端处理regionServerStartup流程。

虽然明白编码不是一朝一夕的事儿,并且多人长期合作必然会导致部分没有意义的代码,但看到后依然还是会为浪费的时间吐槽,但不代表不能理解,详情参考《人月神话》。

2.2  Master响应regionServerStartup 

代码起始位置:HMaster 1270行。

上节说道HRegionServer线程run()函数之第一阶段中会向Master发送regionServerStartup请求,这次先说说master如何处理请求。

master端响应的主要目的检测region server启动信息是否有效并向region server返回一些必须参数。函数中首先从RPC请求中获取region server的host然后通过传递过来的serverInfo参数获取端口(为什么不都从serverInfo中获取呢?而且后续还会替换rs中的host,那前面的一系列host获取处理意义何在呢?不过0.90中这段逻辑已经被去掉了,可能真的没有意义吧)。regionServerStartup中,根据注释描述主要检测两种异常情况:在ZNODE失效前快速重启;region server正在被master做失效处理中。之后还要检测其是否在serverAddresstoServerInfo中,如果在则出发过期处理,下次请求时信息就应当已经serverAddresstoServerInfo中移除并在排队等待ProcessServerShutdown处理。那么代码中实现是如何的呢?

代码中首先从已知region server信息中查找是否已具有注册信息。如果有:判断两者的startcode(启动时间戳),注册信息小则返回YouAreDeadException异常,如果相等啥也不干。如果大,则进行region server过期处理:将其从已知region server信息中移除。并将其加入deadServers和getRegionServerOperationQueue做ProcessServerShutdown处理。如果没有:不作处理。之后如果regersion server正在做失效处理,则抛出YouAreDeadException。否则记录新regionserver信息:向集合中加入信息,创建zookeeper watcher。

检测完有效,会将master获取的regtion server的ip当做的hbase.regionserver.address、hbase.rootdir和fs.default.name返回给region server。

2.3 发送注册信息成功之后的线程初始化

代码起始位置:HRegionServer 714行。

上次分析了master响应regionServerStartup的流程,本次以该流程没有返回YouAreDeadException为假定,继续分析后续逻辑。返回YouAreDeadException的逻辑且听后续分解。

发送注册信息成功之后首先将master的返回值写入config;重新创建host信息;初始化mapred.task.id和fs.defaultFS;构造HDFS路径;构造HLog对象;构造metrics监控对象;如果需要则启动thrift服务;调用startServiceThreads启动各主要线程。

startServiceThreads的注释中说明:启动状态维护线程、客户请求处理线程(Server)、master请求响应线程(Worker)以及心跳检查(这里英文用的是租期检查)。构建一个未捕获到异常处理器,当线程中出现一个未捕获到异常时,退出region server。但这个处理器不会设置给所有的线程。客户请求处理线程是没有限制的,如果线程内出现了OOMException,它会等一会儿然后重试,与此同时,flush和compaction如果尝试运行的话,则会触发比较严重的状态,然后会运行关闭客户请求处理线程。心跳检查会同时运行,它是一个继承Chore的按时间间隔执行的线程,它按照自己的间隔停止机制,所以这个线程需要被机器停止。master请求响应线程会记录异常并退出。注释中说明了个线程的停止机制,写在这里当做参考,startServiceThreads的实现是怎样的呢?

startServiceThreads中首先构造一个未捕获到异常处理器(当有异常时,直接退出region server),将这个处理器设置给hlogRollers、workerThread、majorCompactionChecker并作为守护线程启动。然后启动心跳检测线程;启动客户端响应线程;启动多个splitLogWorker(负责Region恢复前,分发HLog中记录到具体Region目录的任务处理),数量由hbase.hregionserver.hlog.split.workers.num决定;构造扫库任务线程池scanPrefetchThreadPool。

至此发送注册信息成功之后的线程初始化就结束了,如果这阶段出现异常的话设置stopRequestedAtStageOne为true并抛出异常IOException。

2.4 与master的持续交互

代码起始位置:HRegionServer 724行。

目前看来StageOne比预想的复杂。初始化好各线程后,开始进入新的死循环逻辑:region server开始与master进行持续的交互。

循环中首先从zookeeper中获取root表所在的region server,如果没有变化则只获取一次。然后在大于消息发送间隔或有需要发送给master的消息(outboundMessages不为空)时,与master发生交互。在整理交互内容的过程中,首先收集内存使用情况和各region的负载情况。然后加入并去重需要发送给master的消息(此处也从HRegionServer的属性outboundMsgs获取消息)。然后向master发送请求regionServerReport,master也会有消息返回,返回的消息中可能包括对region server的操作指令。

消息发送完成后根据已发送的消息对本地待发送消息去重。此后,如果之前收到过关闭所有region的命令并且所有region已经关闭完成,此时会stopRequestedAtStageOne.set(true)并退出循环。

做完返回消息无关处理后,开始逐条处理返回消息。奇怪的是,每次处理消息都要将root表所在region server信息重置。之后根据返回消息的不同类型,将其分发到不同的处理队列或集合中,预计会由不同的线程进行处理。此处由swicth case处理,实质上个人认为既然已经使用了枚举类,将处理内容封装到枚举类中更好。目前包括的分发处理包括:MSG_REGIONSERVER_STOP、MSG_REGIONSERVER_QUIESCE(关闭所有用户region)、MSG_REGION_CLOSE、MSG_REGION_OPEN()、default,此段逻辑中的处理大都是去重然后放到toDo中,toDo中的消息后续由work线程处理。整个过程中有个restart一直起作用,并会在适时的时候通过stopRequestedAtStageOne判断是否需要退出死循环。

以上逻辑如果产生异常,会尝试将RemoteException转为普通异常。如果是YouAreDeadException则直接抛出,其他情况则尝试重试,每重试hbase.client.retries.number次会检测一次文件系统。检测文件系统有频率控制,会尝试访问根路径。如果访问错误,则会记录此轮首次错误时间,属性fsOk会被设为false,但不会重复检测,如果下次检测时fsOk依然为false并且此轮首次错误时间距现在超过hbase.regionserver.check.fs.abort.timeout则会结束region server进程。

如果顺利,会通过outboundMsgs的LinkedBlockingQueue.poll(long timeout, TimeUnit unit)方法睡一会儿,睡觉间隔的计算使得循环以固定速率执行(类似scheduleAtFixedRate)。睡醒后,会执行housekeeping方法(奇怪的命名),方法内容就是如果toDo(保存上述逻辑从master获取并需要执行的任务列表)中有MSG_REGION_OPEN,则在下一轮交互中加入已经对此作出响应的应答(注释中与此意义相同)。

至此,第一阶段分析完毕。如果此阶段发生异常,首先会检测是否是OOMException,如果是调用forceAbort结束region server进程,后续处理意义似乎也不大了,如果不是(大多是YouAreDeadException),调用abort结束region server进程。

2.5 服务结束

代码起始位置:HRegionServer 917行。

退出第一阶段后,HRegionServer线程即将结束,此处主要做一些通知master和清理、停止线程的工作。走入此流程的原因有三种:zookeeper得到异常或事件通知(其中会设置killed=true)、调用HRegionServer.abort(其中会设置abortRequested=true)和region server状态不健康。此段逻辑是用大量if区分不同原因导致的退出的处理,实质上可以用枚举集合或状态+组合模式划分代码。接下来先说说不同原因处理逻辑的特有部分吧。

zookeeper原因:kill所有hlogs线程。

调用HRegionServer.abort:不阻塞的向master汇报退出消息;close所有hlogs线程;关闭所有region;公有部分执行完后等待所有线程关闭,之后关闭文件系统。

region server状态不健康:不阻塞的向master汇报退出消息;关闭所有region;如果所有region关闭不成功的话,直接执行公有部分代码;closeAndDelete所有hlogs线程;通知master本region server正在重启(restartRequested)或退出;通知master关闭region;公有部分执行完后等待所有线程关闭,之后关闭文件系统。

接下来分析公有部分代码逻辑:首先停止各线程;然后停止向master的RPC代理;制空master对象;关闭zookeeper客户端。

此段逻辑比较简单,之后将对各线程的创建、运行和结束进行分析。

源码分析之灰太狼手札(一):HBase RegionServer

mopishv0 发表了文章 0 个评论 1725 次浏览 2015-09-08 15:06 来自相关话题

一 RegionServer启动和构造 1.1 启动      代码起始位置:HRegionServer 3730行。 ...查看全部
一 RegionServer启动和构造

1.1 启动 

    代码起始位置:HRegionServer 3730行。

启动RegionServer的主类是HRegionServer。

main函数中主要是设置config。doMain中将-D中的配置设置到config中。因此配置的优先级为,-D>hbase配置>hadoop配置。

之后判断命令内容,值得注意的是是不允许通过stop关闭RegionServer,需要通过bin/hbase-daemon.sh stop regionserver和Kill关闭。另外一个合法的命令就是start了。start中如果为LOCAL模式的话则不允许启动。HRegionServer的实例是通过反射构造出来的,类型可以配置到配置文件中。由于HRegionServer本身为Runable所以直接将生成的对象放到Thead中启动即可。启动的最后,替换了进程关闭钩子(原有钩子只在程序顺利启动前起效,会删除一些ZK连接、ZK路径和HDFS文件)。

启动流程到此就为止了。这部分比较简单,希望是个良好的开端。

1.2 HRegionServer构造函数 

    代码起始位置:HRegionServer 397行。

上篇说道HRegionServer是通过反射调用构造函数,这回我们来分析一下HRegionServer构造函数都做了什么。

首先通过配置构造machineName(ip或域名),如果"hbase.regionserver.dns.interface"值为"default",则使用hadoop中的自解析域名。如果"hbase.regionserver.dns.nameserver"为"default"则把他设成null再递归地调用一次获取machineName(意义何在啊……)。如果两者都不为"default"则根据"hbase.regionserver.dns.interface"配置的网卡名获取ip。machineName获取完成后拼接配置的端口号,生成HServerAddress对象(里面要将拼接好的字符串再解开,意义何在啊……),这里会将ip再次解析成域名。

然后设置一些变量的初始值:通过this.abortRequested = false;    this.fsOk = true;猜测,文件系统不可访问时的逻辑和文件系统正常的逻辑有可能搅在了一起;根据config对象的哈希值(^config中每个Entry的哈希值)生成ServerConnection对象(实现为org.apache.hadoop.hbase.client.HConnectionManager.TableServers,它的注释为 Encapsulates finding the servers for an HBase instance ,与处理客户端请求相关);从配置中读取重试次数、responseSizeLimit等值;生成定长sleep类Sleeper的实例;生成执行master命令的Worker类实例;生成Region开启关闭处理线程池实例;生成我猜是用来处理请求的线程池PreloadThreadPool类的实例。

在众多设置变量初始值的语句中,有一行reinitialize();想必这既是重中之重了。函数中先是设置了一些变量的值,这些值的生存周期(与服务共生死)应当与构造函数中的(与进程共生死)不同:构造RPC响应服务实例server用并获得临时端口号;初始化服务状态类HServerInfo对象;重命名线程名;初始化zookeeper客户端和监听;初始化workerThread(为上述Worker类实例创建线程);初始化flush memstore线程MemStoreFlusher类实例;初始化compact与split响应类CompactSplitThread(并非一个线程)实例;向配置更新监听类ConfigurationObserver对象注册通知compactSplitThread等对象;根据配置创建多个hlog滚动线程LogRoller类实例;初始化MajorCompactionChecker类实例(封装的线程);this.leases = new Leases,似乎与请求的超时处理有关。

此处有一个对象的初始化比较有意思reservedSpace,从代码引用上和注释上看,只有在退出时会被释放。其他时间并不会用到这个变量。猜测其设计目的是为了给OOM Exception出现时预留内存,从而使进程能够有足够的内存保证正常退出。正常退出对于保存关键信息的服务或存储服务来说十分重要,非正常退出极易丢失数据。

通过这次的分析,大致能够了解HRegionServer提供服务所依赖的线程了。下次计划分析run函数,后续再逐一分析各线程和zookeeper监听。

president_poll的代码在资料下载中没有找到

回复

fish 回复了问题 2 人关注 1 个回复 1512 次浏览 2017-02-18 22:37 来自相关话题

推荐个好的源码阅读工具

回复

mopishv0 回复了问题 4 人关注 3 个回复 2680 次浏览 2016-03-31 11:33 来自相关话题

hadoop 源码求解 RecordReader initialize方法如何调用

回复

VanquisherCsn 发起了问题 1 人关注 0 个回复 3288 次浏览 2015-09-16 10:37 来自相关话题

VersionInfo何时初始化的,怎么初始化的

回复

VanquisherCsn 发起了问题 1 人关注 0 个回复 1863 次浏览 2015-09-16 09:47 来自相关话题

Hadoop源码分析之心跳机制

唐半张 发表了文章 0 个评论 1872 次浏览 2015-09-30 11:05 来自相关话题

一、心跳机制 1、hadoop集群是master/slave模式,master包括Namenode和Jobtracker,slave包括Datanode和T ...查看全部
一、心跳机制
1、hadoop集群是master/slave模式,master包括Namenode和Jobtracker,slave包括Datanode和Tasktracker。
2、master启动的时候,会开一个ipc server在那里,等待slave心跳。
3、slave启动时,会连接master,并每隔3秒钟主动向master发送一个“心跳”,这个时间可 以通过”heartbeat.recheck.interval”属性来设置。将自己的状态信息告诉master,然后master也是通过这个心跳的返回值,向slave节点传达指令。
4、需要指出的是:namenode与datanode之间的通信,jobtracker与tasktracker之间的通信,都是通过“心跳”完成的。
二、Datanode、Namenode心跳源码分析
既然“心跳”是Datanode主动给Namenode发送的。那Datanode是怎么样发送的呢?下面贴出Datanode.class中的关键代码:
代码一:
/**
* 循环调用“发送心跳”方法,直到shutdown
* 调用远程Namenode的方法
*/
public void offerService() throws Exception {
•••
while (shouldRun) {
try {
long startTime = now();
// heartBeatInterval是在启动Datanode时根据配置文件设置的,是心跳间隔时间
if (startTime - lastHeartbeat > heartBeatInterval) {
lastHeartbeat = startTime;
//Datanode发送心跳
DatanodeCommand[] cmds = namenode.sendHeartbeat(dnRegistration,
data.getCapacity(),
data.getDfsUsed(),
data.getRemaining(),
xmitsInProgress.get(),
getXceiverCount());
myMetrics.addHeartBeat(now() - startTime);

if (!processCommand(cmds))
continue;
}

•••
}
} // while (shouldRun)
} // offerService

需要注意的是:发送心跳的对象并不是datanode,而是一个名为namenode的对象,难道在datanode端就直接有个namenode的引用吗?其实不然,我们来看看这个namenode吧:
代码二:
public DatanodeProtocol namenode = null;

namenode其实是一个DatanodeProtocol的引用,在对hadoop RPC机制分析的文章中我提到过,这是一个Datanode和Namenode通信的协议,其中有许多未实现的接口方法,sendHeartbeat()就是其中的一个。下面看看这个namenode对象是怎么被实例化的吧:
代码三:
this.namenode = (DatanodeProtocol)   
RPC.waitForProxy(DatanodeProtocol.class,
DatanodeProtocol.versionID,
nameNodeAddr,
conf);
其实这个namenode并不是Namenode的一个对象,而只是一个Datanode端对Namenode的代理对象,正是这个代理完成了“心跳”。代理的底层实现就是RPC机制了。

源码分析之灰太狼手札(六):HBase的客户端请求处理

mopishv0 发表了文章 0 个评论 3107 次浏览 2015-09-08 15:28 来自相关话题

六 客户端请求处理 6.1 Region相关操作     代码起始位置:HRegionInterface.java 57、79、85 ...查看全部
六 客户端请求处理

6.1 Region相关操作

    代码起始位置:HRegionInterface.java 57、79、85、90、98、104、110、116、124、133、141、149行。

    Region的相关操作都比较简单,基本上就是对集合信息的过滤与获取,看一眼就能了解主要逻辑了。HRegionInterface 的实现类是HRegionServer,接口方法对应的实现十分好找。因此,此处就忽略以上开始行数的api说明了。

6.2 getClosestRowBefore

    代码起始位置:HRegionInterface.java 71行。

    该函数获取符合参数的所有数据或一条最接近且小于参数row key的数据。代码中首先获取region对象;判断key值是否在该region的范围内;为锁对象splitsAndClosesLock加上读锁;根据列名获取store对象;构造获取最接近行的状态记录对象,其中会初始化key特征信息、删除记录集合、如果是meta表还会计算表名分隔符位置;再次获取store中的锁对象,获取读锁。在store中的整个查找过程中,由一个GetClosestRowBeforeTracker类对象负责记录候选记录。

首先,从memstore中获取最接近的记录,根据3.4中的逻辑,memstore中的数据保存在两个有序集合中(数据集合和用于flush的快照集合,集合中的记录按照key升序、按照进入memstore的时间降序),针对每个集合做相同的操作。先根据目标key获得大于等于此key的集合子集,对于其中不大于key值的记录(也就是只处理等于该值的记录了),如果遇到已经超过TTL的记录,则从原集合中删除;如果遇到删除操作,则先缓存删除操作,如果候选记录命中缓存的删除操作的key,则进一步判断是否时间戳相等或为删除列或列族,如果匹配则移除候选;如果遇到非删除操作,则先判断其是否命中之前的删除操作,然后判断其是否更接近且不大于目标key,如果是则替换候选值。如果找到能替换候选值的记录,则不再进一步查找。如果在大于等于目标值的集合中,没有找到候选记录,则从小于目标值的子集中开始查找,直至找到非目标表的记录或首个不如候选记录的记录,对于子集中的记录,处理过程和处理大于等于目标子集的逻辑一样(吐槽:甚至有点牵强附会,为了重用搞了一些不符合逻辑的代码)。

然后,遍历每个storefile对象。首先确定文件中的最后一个key值与目标值中较小的确定为起始key(奇怪的是这里还要判断是否不是目标表);构造扫描器(详细内容在第六章进行分析其构造过程);将扫描器seek到起始key(其中有一句逻辑似乎没用);接下来的逻辑和memstore中很像,先向后找直到不等于起始key,然后再向前找直到与目标表不相等或不优于候选值。

随着函数的退出,释放锁。拿到候选值后,进行一次get操作(get细节见后续分析)。

6.3 get

    代码起始位置:HRegionInterface.java 158、160行。

    多个get批量执行,实际上就是循环执行get,除去锁,我觉得批量get是可以优化的。接下来分析单个get请求的处理。

    获取到对应region后,如果有行锁,则延长行锁的租期,开始get处理。首先,如果get请求对象中有列族过滤需求则通过表描述对象先检测列族是否存在。否则,将所有现有列族添加到get请求对象中(吐槽:lock id并没用处)。

    接下来将get请求转化为scanner操作。首先根据get请求对象构造scan请求对象,逻辑比较简单,就是将各个参数对应的赋值;然后开始构造扫描器InternalScanner类对象,构造前后会获取newScannerLock读锁,此时首先依然会进行一次get请求已做过的列族合法性检测(吐槽:那之前的逻辑真是多余);

6.4 写入

6.4.1 客户端部分—Put


    代码起始位置0.98.13  HTable.java 983行。

    首先判断之前的异步flush是否出现异常,如果出现异常,则将put加入异步写入缓冲中并同步执行flushcommits(backgroundFlushCommits方法)。诡异的是,处理到此并没有直接返回,异常也没有处理,而是继续进入没有异常的逻辑:调研Put大小是否超过hbase.client.keyvalue.maxsize(默认值-1,TableConfiguration:76,HTable:996),然后再次将put加入写缓冲,如果当前缓冲中数据长度大于hbase.client.write.buffer(默认值2097152,,TableConfiguration:58,HTable:1001)则进行一次非阻塞flushcommits。最后如果设置autoflush,则进行一次阻塞flushcommits。在flushcommits中,将写入缓冲交给异步处理对象(AsyncProcess.submit)处理,根据是否阻塞决定是否等待处理完成,如果发生异常,则阻塞的等待剩余操作执行完毕,根据失败后是否清空缓冲(clearBufferOnFail) 确定是否将失败操作重新放入缓冲。最后,重新统计缓冲中的数据长度。

    代码起始位置0.98.13  AsyncProcess.java 285行。

    接下来分析AsyncProcess接收到put缓冲后,向RS发送请求前的处理过程。代码中先做流控处理,如果正在处理的任务(之前发送的,每个rs的一次rpc视作一个任务)数大于等于hbase.client.max.total.tasks(默认值100,AsyncProcess:239,AsyncProcess:323),则等待一个任务执行完成;然后遍历处理每个请求,首先根据rowkey找到所在region的HRegionLocation对象,然后按以下条件过滤当前不可操作的请求,被过滤掉的请求会保留在缓冲中,参与下次flushcommits:

  •  每个region正在处理的任务数大于等于hbase.client.max.perregion.tasks(默认值1 ,AsyncProcess:241,AsyncProcess:435);

  •  region所在rs已被判为不可用;

  • 判断当前正在处理的任务数加上当前准备发送的任务(为每个已知rs生成一个任务)是否小于hbase.client.max.total.tasks,否则rs判为不可用,不再为新遇到的rs生成任务;

  • 请求的目标rs的任务数大于等于hbase.client.max.perserver.tasks(默认值2 ,AsyncProcess:243,AsyncProcess:455),如果大于,rs判为不可用;



对于没被过滤的请求,会进行进一步封装并和HRegionLocation对象建立映射关系以便后续发送RPC(代码逻辑中有对于Increase和Append的预防重复提交的逻辑,但是这两类请求的批量请求处理,已不由AsyncProcess.submit函数处理,而是由AsyncProcess.submitAll处理,所以相关逻辑后续介绍)。所有请求整理完成后,进入发送请求部分。

6.4.2 客户端部分—批量Delete、batch

代码起始位置0.98.13  HTable.java 886行。

批量Delete直接调用batch。所以我们直接分析batch即可。可处理batch操作的有多个函数,这里我们分析的是public void batchCallback( final List actions, final Object[] results, final Batch.Callback callback),因为这些函数最终都是调用的该函数,而该函数调用的HConnection.processBatchCallback,这个接口的实现类为HConnectionImplementation。在HConnectionImplementation.processBatchCallback中,主要逻辑只有构造AsyncProcess对象并调用submitAll和waitUntilDone阻塞的发起RPC。submitAll逻辑和上面说的submit大体相似,区别为:

  • 没有请求过滤,所有请求都会被发送;

  • 为防止Increase和Append请求被重复处理,在hbase.client.nonces.enabled(默认值为true,HConnectionImplementation:761,)被设置为true会被加上Nonce。Nonce的本质是给请求附带一个随机数,rs可以根据此随机数防止重复提交(rs可以单方面禁用Nonce,也可以配置Nonce值缓存时间,详见服务端分析)。



6.4.3 客户端部分—发送批量请求

代码起始位置0.98.13  AsyncProcess.java 542行。

代码主要可以分为两部分:构造线程和线程执行内容。首先来看构造线程,代码中遍历之前构建好的HRegionLocation对象及其请求列表映射,对其中的每个元素,统计、缓存rs和region正在执行的任务数;构造对应的任务线程;当线程构造失败时,减去相应的任务数,并进入尝试重试逻辑。

在构造线程时,有一个默认关闭的特性:当hbase.client.backpressure.enabled(默认值为false,ServerStatisticTracker:63,AsyncProcess:629)为ture时,客户端会根据服务端压力和客户端配置的策略,生成带延时的Runnable对象,用以动态缓解服务端压力。

在重试逻辑中,先尝试根据异常重新加载region位置,RegionMovedException中带有region新位置信息,RegionTooBusyException和RegionOpeningException时位置不变,其他异常移除原有region位置缓存;根据hbase.client.retries.number(默认值为31,AsyncProcess:243,AsyncProcess:753)和hbase.client.pause(默认值100,AsyncProcess:239,AsyncProcess:753,并非单纯的阈值,而是根据重试次数计算得出总的超时时间)确定是否还可以重试;通过回调函数对象(此段逻辑中为ObjectResultFiller类对象)确定请求是否可以重试(此逻辑中总是可以重试);然后根据重试次数和hbase.client.pause计算重试前休眠时间,休眠后重新调用submit再次发起请求。

代码起始位置0.98.13  AsyncProcess.java 580行。

接下来我们来分析执行内容。由于请求已经按rs组织好,所以每个线程都只处理向一个rs发送的RPC。代码中首先构造封装过的Callable对象;然后RpcRetryingCaller类(如果hbase.client.backpressure.enabled配置为true,则使用其子类)对象进行调用,整个调用过程,超时时间为hbase.rpc.timeout(默认值为60000ms,AsyncProcess:235,RpcRetryingCaller:753)。如果出现异常调用上述重试逻辑,并终止处理;如果请求正常,则开始处理返回结果,主要是将结果转成batch请求回调函数可用的参数,然后调用对应的回调函数处理结果的组装;然后减小rs、region对应的执行中的任务数。

调用过程:重置超时时间,使其最小不小于两秒;判断rs是否在故障节点缓存中,如果存在抛出异常;如果对rs的客户端没有缓存,则构造实际的客户端(RpcClient.BlockingRpcChannelImplementation.BlockingRpcChannelImplementation类对象),然后包装进protobuf生成类BlockingStub;然后开始通过protobuf生成的各种Builder类组装请求,根据hbase.client.rpc.codec(默认值为空字符串,MultiServerCallable:133)确定请求值的组装位置(如果不为空则集中封装到cellblock,服务端可使用cellscanner遍历);通过protobuf发送multi函数的RPC请求;然后将返回的protobuf的MultiResponse转为自定义的MultiResponse类对象并返回。

6.4.4 客户端部分—Delete、Increase、Append、mutateRow、checkAnd**等

这几个方法实际上就是发起一次独立的rpc调用,过程类似上面所说的执行内容,单个KV请求发送mutate函数的RPC请求,多个则发送multi函数的RPC请求。

对于checkAnd**会额外传入条件参数,服务端会负责进行检测和处理。

6.4.5 客户端部分—根据rowkey获取HRegionLocation

代码起始位置0.98.13  HConnectionManager.HConnectionImplementation.java 1114行。

如果是获取meta表region的物理位置,则从zookeeper读取,超时时间为hbase.rpc.timeout配置值,节点为zookeeper.znode.parent(默认值为/hbase,ZooKeeperWatcher:321,ZooKeeperRegistry:58)和zookeeper.znode.metaserver(默认值为meta-region-server,ZooKeeperWatcher:324,ZooKeeperRegistry:58)两个配置的连接。如果无法读取,则会一直阻塞直到超时。

如果是读取用户表,则先从缓存中拿到rowkey最接近的记录,并比较确认是否是符合要求的记录;如果缓存中不存在,则从meta表中获取:首先获取meta表位置,然后向meta表所载的rs发送getRowOrBefore请求,然后根据返回的region状态,确定是否能当做正常返回结果,如果出现异常则最多重试hbase.client.retries.number次。

6.4.6 服务端部分—mutate 、multi函数响应

代码起始位置0.98.13 HRegionServer.java 3545行。

mutate可以看做是multi的简化版,处理过程不多,即并入multi的处理过程了,所以这里着重分析multi的实现。

首先检测rs是否正常运行中,然后遍历每个region的请求并获取相关HRegion对象,如果region不存在或正在打开,则抛出异常;接下来会有比较多的分支,主要由于不同请求都由一个函数相应造成:

1. checkAndMutate:串行处理请求,目前只支持put和delete,首先根据数据是否保存在cellblock,将protobuf中的信息转换为HBase的请求对象;然后判断region是否是只读状态,region是否因为memstore写满而无法继续相应写请求,region是否正在修复或关闭,进行协处理器处理;为正在处理的数据行加读锁并等待mvcc中到目前为止的写入操作处理结束;然后进行一次get处理,获取到相应的需要check的值并check;如果check通过则调用HRegion对象的mutateRows;最后释放读锁,并调用协处理器。该方法返回请求是否被处理。

2. mutateRow:目前只支持put和delete,直接调用HRegion对象的mutateRows。该方法返回region负载。

3.其他请求(包括get):串行的处理每个请求,对于get或协处理器,执行相应处理。对于写入请求,目前支持append、increment、put和delete,对于put和delete将请求放入集合mutations中稍后批量处理,对于append和increment,先将mutations中的请求处理完毕,然后再调用append或increment,然后将返回结果或异常组织为protobuf对象;最后批量处理put和delete请求(处理内容与mutateRows十分类似,只是顺序不同,代码不同)。这些方法返回CellScannable接口(一般是Result类)对象。

处理完成后,组装protobuf对象,然后返回结果。

6.4.6.1 mutateRows

代码起始位置0.98.13 HRegion.java 5054行。

接下来我们来看Region对象的mutateRows方法的处理过程,注释中将处理分为14步,第一部分预处理:检测rowkey是否符合region范围;检测region是否可写;检测region是否正在关闭;调用协处理器。 

第二、三部分,是批量加行锁:对于每一个rowkey,尝试获取行锁对象,如果行锁对象在其他线程存在,则最多等待hbase.rowlock.wait.duration(默认值30000,HRegion:602,HRegion:3698)毫秒;然后尝试加HRegion对象中updatesLock的读锁,最多等待Math.min{hbase.ipc.client.call.purge.timeout(默认值60000,HRegion:621,HRegion:6255), hbase.busy.wait.duration(默认值60000,HRegion:613,HRegion:6255)* Math.min[行锁个数, hbase.busy.wait.multiplier.max(默认值2,HRegion:615,HRegion:6255)]}毫秒。以上等待超时后,均抛出异常。

第四部分,预处理数据和WAL:对于put,检测列族是否存在,检测每个KeyValue的时间戳范围是否超过当前服务端时间加上hbase.hregion.keyvalue.timestamp.slop.millisecs(默认值Long.MAX_VALUE,HRegion:630,HRegion:3201),如果客户端没有给KeyValue设定时间戳,则设置为当前时间;对于delete,如果没有提供列族,则用所有列族补全,否则检测列族是否存在,对于每个KeyValue,如果删除操作是删除具体列中的数据,则需要先将列中最新的数据get出来,然后根据结果写入时间戳,其他删除操作的时间戳直接使用当前时间;如果没有跳过WAL,将处理过的KeyValue保存到WALEdit类的对象中;再次调用协处理器。

第五部分,获取mvcc值。第六部分,调用协处理器,并将协处理器生成的写入操作加入WALEdit对象。

第七部分,写入memstore:如果启用了tag,则先填充tag;然后给每个KeyValue填充mvcc值;然后将KeyValue对象传入列族对应的MemStore对象中;在MemStore构造中,根据hbase.hregion.memstore.mslab.enabled(默认值为true,MemStore:123,MemStore:230) 确定是否开启memstore内存池,如果开启,根据hbase.hregion.memstore.chunkpool.maxsize(默认值0.0,MemStoreChunkPool:185,MemStore:230)、memstorelimit的两个配置、hbase.hregion.memstore.mslab.chunksize(默认值2097152,MemStoreChunkPool:199,MemStore:230)以及hbase.hregion.memstore.chunkpool.initialsize(默认值0.0,MemStoreChunkPool:203,MemStore:230)确定是否开启和如何构造memstore chunk pool,根据hbase.hregion.memstore.mslab.max.allocation(默认值262144,MemStoreLAB:89,MemStore:230)确定内存池接收的对象最大长度;基于二者全开的情况下,当对象传入MemStore时,首先尝试获取当前空余chunk,如果没有则通过memstore chunk pool获取一个空闲的或者申请一个新的chunk对象,初始化并加入到chunk队列中;拿到chunk后,尝试基于当前偏移量申请占用内存,如果内存不足,则重新获取chunk重试;然后开始拷贝数据,重新构造KeyValue对象(值得注意的是,内存池这部分逻辑包含典型的利用CAS的无锁编程,如果CAS失败则会循环重试);然后保存到MemStore的内存中,更新memstore中数据的时间戳范围和总数据长度。

第八部分,写入hlog:首先获取seqId并进行封装;然后调用各种协处理,并把封装后的对象放入待写入列表中。

第九部分,释放updatesLock的读锁。第十部分,释放行锁。

第十一部分,根据region和操作中最高的Durability设定进行WAL同步。如果为同步写入,则通过同步结果通知线程等待写入线程WAL完成;写入线程所用的hdfs writer,每次rollhlog时重生成,实现类根据hbase.regionserver.hlog.writer.impl(默认值org.apache.hadoop.hbase.regionserver.wal.ProtobufLogWriter,HLogFactory:204,FSHLog:556)确定;对于写入线程,只要需要写入的seqid大于已写入id,线程就开始写入,写入前重新生成写入列表,然后将原列表中的内容写入writer缓冲中;写入完成后,选择一个空闲的sync线程,如果没有则按照最后的seqid取模取一个(其数量取决于hbase.hlog.asyncer.number(默认值为5,FSHLog:438,FSHLog:1158)),然后通知其已写入的最新的seqid;对于sync线程,每当已写入seqid大于已同步seqid,则会调用hdfs writer的对应同步方法,通知同步结果通知线程最新同步的seqid,然后检测是否需要rollhlog;同步结果通知结果线程每当最新同步seqid更新时,都会更新共享变量,然后唤醒阻塞线程。

第十二和十四部分,再次调用协处理器。第十三部分,更新mvcc最近写入完成的mvcc值。

最后,检测是否需要flush,如果需要则触发一次flush。

6.4.6.2 append和increment

代码起始位置0.98.13 HRegion.java 4421行。

append和increment处理逻辑非常类似,所以这里主要说明append的处理过程。代码中首先将protobuf类转为请求类对象;然后调用协处理器;然后如果服务端开启了hbase.regionserver.nonces.enabled(默认值true,HRegion:576,HRegion:4488)根据nonce获取之前的相同操作,如果没有,则将本操作放入共享集合继续及执行,如果有则等待之前的操作处理结束,根据上个操作的结果确定是否可继续执行;后续真正处理append的过程和6.4.6.1中类似,只不过是需要先通过get读取原值,然后再进一步处理得出新值;append处理完成后,向共享集合中写入结果,更新清理时间;最后调用协处理器。

6.4.7 简要调用关系图



3.png


 

源码分析之灰太狼手札(五):HBase RPC机制

mopishv0 发表了文章 0 个评论 2074 次浏览 2015-09-08 15:22 来自相关话题

五 HBase RPC机制     本章以region server为例讲解HBase的RPC机制。 5.1 初始化和启动 ...查看全部
五 HBase RPC机制

    本章以region server为例讲解HBase的RPC机制。

5.1 初始化和启动

代码起始位置:HRegionServer.java 506行。

    服务实例的创建简单粗暴,直接new出org.apache.hadoop.hbase.ipc.HBaseRPC.Server的实例,其父类为org.apache.hadoop.hbase.ipc.HBaseServer。

    父类构造函数的初始化流程为:初始化监听地址(域名或ip)、端口、配置对象、请求参数实现类类名、请求处理线程数(handler);设置socket发送缓冲长度为0;初始化请求队列;根据max(.small、large和无).callqueue.memory.size、hbase.server.multithrottler和small.queue.request.limit构造请求流控对象(流控逻辑后续分析);设置客户端最大空闲时间、每次最多回收连接数以及回收后预期空闲连接数;根据ipc.server.tcpnodelay和ipc.server.tcpkeepalive设置tcp连接处理方式(Nagle's Algorithm和keepalive?);根据ipc.server.response.queue.maxsize构造请求响应返回流控;构造请求监听线程和请求响应返回线程,请求监听线程构造较为复杂,下面对他的构造进行分析。

    首先,构造一个套接字通道实例(nio的ServerSocketChannel类)并设置为非阻塞模式;绑定socket监听地址和端口,并根据ipc.server.listen.queue.size指定侦听backlog长度;构造选择器实例(nio的Selector类)并向套接字通道实例注册;设置线程名,并指定为守护线程;ipc.server.reader.count构造指定个数的连接读取线程;Listener将得到的请求连接直接交给Reader线程读取,其构造比较简单:设置为守护线程并设置线程名,构造连接对象列表,构造一个未注册的选择器实例。

    至此,基类中的初始化就结束了,接下来分析Server中的初始化操作:设置方法调用实例(此处为HRegionServer实例)及其类型;设置每个请求是否都打印log(默认设置为false);根据hbase.ipc.warn.response.time设置响应过慢时间阈值;根据hbase.ipc.warn.response.size设置请求过多阈值;构造参数说明类。

    最后是线程的启动,比较简单,依次启动响应发送线程,监听线程和请求处理线程(handler)。

5.2 请求监听&分发线程:Listener的运行

代码起始位置:HBaseServer.java 407行。

    从选择器中获取有新socket连接的选择器键;通过每个选择器键获取ServerSocketChannel实例,并通过其尝试获取10次SocketChannel实例;获取后设置连接处理方式(Nagle's Algorithm和keepalive);如果获取成功,则创建请求连接实例,并将其分发给Reader线程、加到连接集合中;最后是输出日志。

    当running为false时,线程结束运行。线程首先关闭socket连接器和通道(调用对应的close函数);然后统计每个请求未响应的的总字节数,并将连接的响应队列清空;减少响应流控对象中的统计值(由于会有某些对象wait在此统计值,所以即使线程退出也要减少统计值从而notifyAll)。

5.3 请求读取线程:Reader的运行

代码起始位置:HBaseServer.java 636行。

    线程首次执行时,会从选择器中获取选择键(SelectionKey),由于没有进行过注册,所以必然没有办法得到结果,所以跳过接下来的处理,先进入连接注册逻辑;注册逻辑是对列表newConnections进行同步的,首先遍历其中元素,对于每个元素拿到其中的通道并注册选择器,向选择键中附加对应的连接对象,便利后清空列表;注册逻辑过后循环结束,如果进程没有终止则会继续循环;注册过的选择器,是能够获取选择键的(有请求时);处理过程中遍历获得的选择键集合,移除选择键后进行真正的请求读取处理。

    先判断选择键是否有效且可读;然后拿到刚刚附加到选择键中的连接对象;标记连接对象最后的访问时间;之后在连接对象中做进一步处理;处理中首先读取4个字节的HEADER信息(合法值为” hrpc”);然后读取1个字节的版本信息并判断其是否符合当前支持的范围;接下来读取数据长度,如果长度为-1则代表此次是一个PING消息,否则构造响应长度的缓冲并记录rpc调用次数;然后开始首次读取内容,每次最多读取64K字节直到读完指定长度;第一次读取出的请求内容是用户信息;接下来再次读取的内容就是请求内容了;最后将构造待解析请求对象放入队列并进行流控信息的标记,请求处理读取就结束了。吐槽:这段逻辑用了一个大的while(true)来处理多种情况,循环会被执行多次,但是每次循环执行的逻辑又不同,莫不如顺序流程可读性更高,还不容易出错。

5.4 请求处理线程:Handler的运行

代码起始位置:HBaseServer.java 1215行。

    线程的循环中,首先取出一个请求,对流控对象进行标记,然后开始对请求进行解析:解析中先读取请求id,如果版本符合,则会读取解压算法、生成解压后的输入流以及获取是否需要记录操作状态(profile);构造参数读取类实例(Region Sever中为org.apache.hadoop.hbase.ipc.HBaseRPC.Invocation),并使其解析请求参数,解析时先读取方法名,然后依次读出参数对象(遵循Writable接口的对象),并获取其类型;构造已解析请求对象,设置RPC压缩算法、连接版本、是否记录操作内容;送回解压对象。

    解析后,设置handler状态,设置请求上下文对象,然后开始调用call函数,此函数由子类提供实现,接下来分析Region Server中此函数的实现过程:首先通过构造时设置的实现类用反射获取调用方法对象;然后记录执行状态,如果需要记录操作状态的话则记录方法名和请求被处理前所消耗的时间;然后用反射调用具体函数;调用完成后再次记录执行信息;将返回对象封装为HbaseObjectWritable;如果响应时间过长或返回结果过长,则打印详细日志并记录监控信息;至此,基类中的实现结束。

    后续的处理中如果请求抛出异常,则进行记录;如果结果是可计算长度的,则将长度加上一个int和byte的长度(加上id和是否出现异常的标记长度),如果长度过长则将结果变为异常;最后将结果写入流,首先写入回话id,是否出现异常;如果版本允许对响应结果进行压缩则获得压缩流;然后将结果与操作状态(profile)或异常写入流中;将数据流赋值给已解析请求对象后,调用响应返回线程的处理响应方法。

    该方法中,首先标记流控;然后如果请求连接没有关闭的话,则将请求放入连接响应队列中;如果响应队列长度为1,则立即处理响应(见5.5)。

5.5 响应返回线程:Responder的运行

代码起始位置:HBaseServer.java 920行。

    一般请求第一次都是从上阶段的接口处开始首次返回响应的,所以这里我们先分析一个链接的首次返回(看代码中一个链接是不可以发送多次请求的)。此处首先拿出链接对应的Socket通道,将返回值写入通道中,每次最多写入64K;如果已写完则减少RPC计数,如果未写完且由Handler进入该函数则注册写入通道,由线程进一步处理。

    线程中,首先不断从刚注册的通道中获取选择器,并从通道中移除选择键;针对每个键,做以上处理;每隔900000毫秒,会从当前的通道中获取选择键,并遍历其中连接对象,遇到第一个请求收到时间至当前时间大于900000的请求则关闭其连接并跳出遍历。

    奇怪的是,看响应线程的处理,每个连接应当能处理多个请求的,但是在请求监听和读取中并没有看到相关处理。 看了下blame,之前每次发送响应请求不一定发送全部内容,所以丢给一个线程慢慢的发送后续结果,而且依据连接对象中记录rpc次数的变量来看,之前连接对象应该是可以复用的。

    整个RPC过程到此就为止了,过程中有各种流控对象,其实现比较简单。当增加流量时,先获取当前值,如果此值已经大于阈值则同步流控对象再次获取当前值并判断(double check的应用),如果依然大于阈值则wait;其他地方减少流量值时,会notifyall,此时wait退出;然后利用AtomicLong.compareAndSet接口写入新值,如果写入失败则重复此逻辑。此段逻辑做到了同步块范围最小,因此需要在多个地方进行检查,更多的可以百度”无锁编程”。

源码分析之灰太狼手札(四):HDFS IO

mopishv0 发表了文章 0 个评论 2102 次浏览 2015-09-08 15:20 来自相关话题

四 HDFS IO  4.1 L2缓存  4.1.1 构造 代码起始位置:Store 259行。     如3.1.3所述 ...查看全部
四 HDFS IO

 4.1 L2缓存

 4.1.1 构造


代码起始位置:Store 259行。

    如3.1.3所述构造CacheConfigBuilder实例时创建L2BucketCacheFactory实例(目前没有多种实现的配置),然后在build出CacheConfig的同时,利用构造L2Cache。此处需要注意一个配置,"hfile.l2.cacheblocksonwrite"(L2_CACHE_BLOCKS_ON_FLUSH_KEY),此配置默认为true,也就是说在flush的时候默认会将block写入缓存,对于SSD做缓存来说是致命的,需要配置为false。

    实例化L2Cache对象时,首先根据配置hfile.l2.bucketcache.ioengine(根据注释,只能有heap和offheap两个值)决定使用内存位置,如果配置为空(也是默认值),则认为禁用L2缓存;然后根据其配置获取对应内存区域的最大内存值;根据hfile.l2.bucketcache.size计算L2缓存大小(配置浮点数和整数的含义不同,前者为最大内存值的比例,后者为占用几MB内存);根据hfile.l2.bucketcache.writer.threads、hfile.l2.bucketcache.queue.length和hfile.l2.bucketcache.ioengine.errors.tolerated.duration配置缓存写入线程数、写入队列长度和发生异常时最长可容忍的时长;根据hfile.l2.bucketcache.bucket.sizes设置每个数据缓存层级大小(此处桶,也就是bucket代表一个缓存数据集合,其长度类似redis中内存按4K、8K、16K…的分级);构造数据缓存桶类BucketCache实例;构造L2缓存类L2BucketCache实例。

    L2BucketCache实际上是BucketCache的壳子,目的是为了提供L2Cache接口的具体实现;所以更复杂的初始化操作包含在BucketCache类中。首先,设置之前的参数,根据ioengine名构造具体的IOEngine对象(实现类为ByteBufferIOEngine);然后,如果做缓存最大只支持32T的校验;按照8K设置一个block的最小值得预设值;初始化存储管理类,其中主要包括缓存长度信息(其中包括该长度中的所有桶列表、有空余空间的桶列表、未缓存block的桶列表),数据桶信息并对其进行初始化(根据总长度和此层长度划分出缓存数组类似memcache的结构,这里指规划在总体缓存中的偏移量),按顺序每级缓存长度一个数据桶,多出的全部分给最后一级;为每个线程创建写入队列和标记信号对象;构造正要写入缓存的对象Map和缓存信息Map;构造写入线程;创建定时统计任务,此任务只负责打印缓存状态日志。

4.1.2 写操作

代码起始位置:L2Cache.java 45行。

    首先来看看接口中cacheRawBlock函数的注释:添加块到L2缓存。该块必须由一个可被写入到磁盘的字节数组来表示。可见,接口设计方面考虑到了写入磁盘的可能性。

接下来分析一下具体实现:首先,将hfile文件名和偏移量封装缓存键值对象;然后将键值和块内容的字节数组交给BucketCache对象处理;如果正要写入缓存Map中或缓存信息Map中有相关记录则直接退出函数;将字节数组、键值是否保存至内存(默认为false)和缓存序列号封装为缓存值对象并保存至正要写入缓存Map中;根据键值对象的哈希值选择并放入缓存写入队列;如果放入队列成功则增加缓存计数,增加缓存长度大小,保存hfile到缓存键值的映射关系;如果放入队列失败则等待50ms再次尝试,如果依然失败则从正要写入缓存Map中移除并增加失败计数。

进一步保存由WriterThread处理。线程中首先将队列中所有缓存对象取出;然后针对每个对象写入缓存:先找到合适的桶大小(缓存长度级别),然后尝试获取有空余空间的桶,如果无法获取则从有多个桶且有未缓存block的桶的缓存级别中拿来一个未缓存block的桶并重新初始化,从缓存数组中拿出一个元素的偏移量,根据桶的情况在修改长度层级中的有空余空间的桶列表和未缓存block的桶列表,将偏移量、长度、保存时间和是否在内存中封装为BucketCache.BucketEntry类对象实例,根据偏移量计算在总缓存(以ByteBuffer数组的形式引用)中的起始下标位置,然后依次写入缓存内容(此处做了较复杂的处理,猜测目的是为了适应buffer数组长度不能与缓存长度级别自适应的情况),至此数据已经写入缓存,如果期间发生异常需要及时释放缓存;写入正确后更新IO异常起始时间为-1;更新正要写入缓存的对象Map和缓存信息Map;如果已缓存总长度大于预设值(写死的95%)则开始回收缓存,见4.1.5。

这个过程和某些内存缓存存储的方案类似,不过区别是,这里总是拿一个完全空的桶,然后根据不同的长度桶中可存储的block个数不同。而有的内存存储中,是通过类似128K分裂为两个64K的桶,64K的桶适时的分裂为2个32K的桶来实现的。HBASE的方案好处是不需要进行整理,但是当某一级别的桶不够用时,有可能很多桶不满但是也不完全空。分裂的方案优点是存储空间灵活分配,但是有时需要整理,要不然会产生碎片。

4.1.3 读缓存

代码起始位置:L2Cache.java 35行。

    首先来看看接口中getRawBlock函数的注释:从L2缓存中取回block。该块由一个在磁盘中存储的字节数组来表示。

    如果读取时,记录仍在正要写入缓存的对象Map,则直接从其中获取数据,记录命中信息后返回;否则从缓存信息Map中获取缓存数据桶信息,根据信息中的长度和偏移量构造byte数组并从ioEngine对象中读取缓存数据;记录命中信息;如果已缓存信息中也没有命中,则记录未命中信息。

4.1.4 移除缓存

代码起始位置:L2Cache.java 53行。

    移除缓存以hfile为单位,首先从缓存索引中,拿出所有指定文件的缓存信息;然后将信息进行一次拷贝,用于生成缓存快照,此后可能会有新的缓存进入或移除,拷贝一次可防止并发导致的误操作;之后,针对每一个缓存信息:首先试图从正写入集合中移除;然后试图从已写入缓存集合中拿出对应缓存信息,然后获得缓存所在桶及对应缓存长度级别,释放桶中对应元素,之后各种标记。

    吐槽一下:虽然此段逻辑没有IOEngine的关系,L2缓存的逻辑层次到了内存IOEngine级别后较多,但是功能结构划分又不明确有待优化。

4.1.5 回收缓存

代码起始位置:BucketCache.java 743行。

    代码中首先根据DEFAULT_MIN_FACTOR计算每个缓存长度级别应当保留多少纯空的数据桶(最少1个),并计算出依次标准能释放出的存储量,如果总量小于等于0说明空间很充足不需要释放;然后打印诸多log;然后统计不同优先级SINGLE(首次加入缓存)、MULTI(被多次访问后)、MEMORY(默认未开启)归类(优先级依次提高);根据每个优先级的实际占用总和与DEFAULT_SINGLE_FACTOR、DEFAULT_MULTI_FACTOR和DEFAULT_MEMORY_FACTOR确定每个优先级多占了多少内存;从多占内存与(之前计算的可释放的存储量-已释放存储)/剩余优先级数量这两个值中取较小值作为此次回收内存的标准,开始释放本优先级内存;释放内存比较简单,即遍历根据之前统计各优先级占用存储时生成的优先队列,并释放移除的缓存(见4.1.4),直到达到此次释放标准或已遍历所有其中元素;最后,打印log释放锁。

4.1.6 关闭缓存

代码起始位置:L2Cache.java 58行。

    关闭缓存的逻辑比较简单:标记缓存关闭;关闭IOEngine(目前的内存IOEngine没有任何操作);关闭统计线程;中断缓存写入线程;清空正在写入缓存集合和已缓存集合。

4.2 HDFS写入类V2

4.2.1 构造


代码起始位置:StoreFile.java 892行。
 

源码分析之灰太狼手札(三):Master命令执行线程Worker

mopishv0 发表了文章 1 个评论 2001 次浏览 2015-09-08 15:17 来自相关话题

三 Master命令执行线程Worker 代码起始位置:HRegionServer 2094行。 Worker是region server执行master发来的指令的 ...查看全部
三 Master命令执行线程Worker

代码起始位置:HRegionServer 2094行。

Worker是region server执行master发来的指令的线程。Worker在HRegionServer构造时初始化,WorkerThread在HRegionServer线程run()函数之发送注册信息成功之后的线程初始化的逻辑中初始化并启动,WorkerThread以守护线程的方式启动,当发生未捕获到异常时,未捕获到异常处理器会结束region server进程;在HRegionServer线程run()函数之服务结束的逻辑中停止;只要线程处于活动状态(isAlive返回true),region server就会认为它状态正常。

接下来,按照MSG_REGION_OPEN、MSG_REGION_CLOSE、MSG_REGION_CLOSE_WITHOUT_REPORT、MSG_REGIONSERVER_QUIESCE、MSG_REGION_FLUSH、MSG_REGION_SPLIT、MSG_REGION_MAJOR_COMPACT与MSG_REGION_COMPACT、MSG_REGION_CF_MAJOR_COMPACT与MSG_REGION_CF_COMPACT的顺序进行分析。

3.1 MSG_REGION_OPEN

3.1.1 命令处理


代码起始位置:HRegionServer 2120行。

首先检测是否已获取到root表所在region或者当前需要open的region是否是root表。如果没有root表所在region信息则将命令重新放到toDo集合中,等待root表信息获取成功。如果root表信息已经获取成功,则创建并执行open region线程。线程主要调用openRegion方法。

方法中,首先将region正在open(HBaseEventType.RS2ZK_REGION_OPENING)的信息写入zookeeper;然后分配hlog;根据配置中hbase.hregion.impl的值创建HRegion实例并初始化;设置偏好节点(与flush有关、favoredNodes);对原有region的每个store文件则进行一次compact(创建compact任务,并放入线程池中执行);将region信息放入在线region集合并从opening region集合中移除;向master发送region已open信息;向zookeeper中写入region已open完成(HBaseEventType.RS2ZK_REGION_OPENED)。其中HRegion初始化过程比较复杂(构造,紧接着调用initialize方法),下面分析这个过程。

3.1.2 HRegion初始化

代码起始位置:HRegionServer 2257行。

HRegion构造函数:配置(这里的配置指设置对象引用)表文件路径、文件系统对象;获取key比较器(root、meta和普通表的比较器不同);配置HLog对象;初始化region用配置对象,并将表属性加入配置中;配置master传递过来的region信息;配置memstore flush监听器;根据hbase.hregion.memstore.percolumnfamilyflush.enabled配置是否允许根据单个familiy刷新memstore文件,开启此项后需要配合hb...store.percolumnfamilyflush.flush.size设置flush阈值;初始化region目录;根据hbase.hregion.keyvalue.timestamp.slop.millisecs设置key可容忍的最新的时间戳;根据hbase.hregion.memstore.flush.size设置flushSize;设置是否允许WAL;根据hbase.hregion.memstore.block.multiplier设置触发flush阈值是flushSize的多少倍;根据hbase.hregion.memstore.block.waitonblock设置flush时是否阻塞;初始化扫库任务集合。

HRegion.initialize方法:这个方法是初始化和open region的主要方法。方法开始时,设置任务监控对象,并为任务监控对象设置动态代理(但是动态代理什么都没有做,意义何在啊),任务监控对象保存在TaskAndWeakRefPair对象并存放到任务监控对象集合中,TaskAndWeakRefPair保存监控对象本身和本身的弱引用,当内存匮乏时,弱引用会较快失效,并在获取集合时会清理时导致监控对象失效从而使其从集合中移除,监控对象在initialize的过程中记录执行状态;然后在region目录中创建region描述文件(如果存在则忽略创建过程);删除原有region临时目录;

然后开始加载原有region存储文件。此段逻辑有一大段注释:加载所有region存储文件(HStores)。当replay log时我们不希望丢失任何数据,所以在replay时必须用谨慎的策略。对于每个存储文件(store),计算最大的已保存到文件中的log id(seqId)。在replay时,忽略小于等于log id的hlog。我们不能只从所有存储文件中最大的已保存到文件中的log id中选择最小值,因为多余的、不连贯的log id会产生不确定的问题(不是有版本控制和标记删除么?因为标记删除是可配的?)。

接下来看看具体逻辑是什么样的:首先,创建加载存储文件线程池,hbase.hstore.open.and.close.threads.max会限制最大线程数;然后对于每个列族(family),多线程地加载每个列族的存储文件并创建Store对象(包含StoreFile集合,StoreFile代表具体的region列族的数据文件);统计除bulk-loaded外的存储文件的最大的log id用于replay;统计包括bulk-loaded的存储文件的最大的log id,用于初始化region分配给新操作的log id值;统计最大时间戳(可以是用户指定的值)并+1,用于版本控制;设置每个Store对象的最后刷新时间为当前时间;然后开始replay log,将返回的log id和上面统计的log id取最大值+1返回函数并设置给HLog对象;replay完成后删除HLog文件。

这段逻辑中又有两部分比较复杂:构造Store对象和replay log。

3.1.3 创建Store对象

代码起始位置:HRegion 612行。

构造函数中首先调用自己的另一个进行基础设置的构造函数,首先设置region信息和store目录;设置列族信息和配置;根据hbase.hstore.majorcompaction.compression设置major compact压缩算法;根据普通列族、root列族、meta列族设置不同的compactor;初始化MemStore对象,初始化中主要是初始化多个集合;根据hbase.hregion.max.filesize设置存储文件最大长度;根据hbase.hstore.blockingStoreFiles设置最大compact文件数、hbase.hstore.close.check.interval设置每写入多少字节检测一次是否需要compact。

其中有一个十分重要的对象的初始化:CacheConfig cacheConf。这个对象负责block的缓存工作。一级缓存使用LruBlockCacheFactory的实例,二级缓存使用L2BucketCacheFactory的实例,两者都是单例的。首先构件CacheConfig工厂对象,它会获取上述两个类型实例并从配置文件和列族信息中读取多项配置(详见CacheConfig 567行)。然后,申请缓存并构建CacheConfig对象。

退出进行基础设置的构造函数后,多线程地加载列族目录中的数据文件(忽略长度为0的文件,生成StoreFile对象集合),主要是根据传入参数设置一些属性;如果文件是引用文件,设置引用文件实际路径;根据io.storefile.bloom.enabled决定是否使用布隆过滤器,然后获取文件更新时间。

加载数据文件后,根据hbase.peak.start.hour和hbase.peak.end.hour设置hbase尖峰时间;根据hbase.compactionmanager.class设置CompactionManager实现类;根据kvaggregator设置key value处理实现类KeyValueAggregator实例(类似key value处理钩子);根据compaction_hook设置compact钩子CompactionHook实现类。

3.1.4 replay log

代码起始位置:HRegion 647行。

首先,过滤region目录中符合正则表达式"-?[0-9]+"的文件,这些文件为HLog文件;然后对于每个HLog文件,进行replay,并返回最大log id。

replay中,首先创建任务监控对象(如3.1.2中所述)并根据hbase.regionserver.hlog.reader.impl创建HLog.Reader类实例。然后顺序处理每一条操作日志的每个key/value:检查region和familiy是否为要replay的region中的内容;检测操作日志中的log id是否大于store文件中的log id,这一步之所以要在此处判断是为了统计忽略了多少KeyValue;如果通过检测,则将KeyValue对加入MemStore中,加入后如果需要flush MemStore,则在处理当前操作日志完成后flush一次(会在MSG_REGION_FLUSH中具体分析);每个操作处理完成后,都需要向任务监控对象(具体replay数和忽略数)和master汇报(将正在处理open region放到2.3中所说的向master发送的消息集合outboundMsgs中)。

当replay操作完成时,标记任务监控对象;返回最后一次replay的操作的log id。

3.2 MSG_REGION_CLOSE和MSG_REGION_CLOSE_WITHOUT_REPORT

3.2.1 命令处理


代码起始位置:HRegionServer 2145行。

    此处的REPORT是指向zookeeper汇报,如果WITHOUT_REPORT则不对zookeeper节点做处理。接下来按照需要向zookeeper汇报的逻辑进行分析。

    命令处理直接调用closeRegion,函数中首先向zookeeper中写入region正在关闭事件(HBaseEventType.RS2ZK_REGION_CLOSING);然后获取region对象,调用region的close方法;从在线region集合和统计region log id的集合中移除region信息;向近期关闭的region列表中加入region信息(列表中只保存3个最近关闭的region);最后,向zookeeper中写入region已关闭事件(HBaseEventType.RS2ZK_REGION_CLOSED)。

3.2.2 region关闭

代码起始位置:HRegionServer 2402行。

region.close()是本地region关闭时的主要处理内容,函数注释内容为:关闭此HRegion。只要参数abort(退出进程)不为true就会flush缓存。关闭每个HStore并不再响应任何请求。此方法会消耗一些时间,因此不要在时间敏感的线程中调用此函数。返回值为所有组成HRegion的HStoreFile的列表。当此region此次不关闭或已经被关闭过时,返回null。

    接下来分析函数的代码逻辑:首先建立任务监控对象;然后获取splitLock可见关闭与split是互斥操作;然后获取写状态对象writestate锁,禁止写入操作;等待compact和flush结束(此处使用writestate.wait()阻塞,我觉得容易造成死锁或错误的阻塞,应该使用wait(long timeout));如果之前没有刷新过且MemStore中的数据缓存大于hbase.hregion.preclose.flush.size则进行一次flush MemStore,完成此操作后将要禁止读取操作。

然后获取扫库操作锁、获取split和关闭操作时使用的锁对象splitsAndClosesLock的写入锁(该锁将会阻塞部分读取操作);设置region closing标记为true;等待行锁释放;再次执行刷新MemStore;多线程地执行store.close()(分布式的关闭文件读取对象),关闭列族数据存储对象;向结果列表中加入已关闭的数据存储文件StoreFile对象;设置region closed标记为true。

3.3 MSG_REGIONSERVER_QUIESCE

    此命令要求关闭所有用户region,所以在关闭前排除meta表region即可(可是root表呢)。

3.4 MSG_REGION_FLUSH

3.4.1 预处理

代码起始位置:HRegionServer 2191行。

    首先翻译一下函数注释。当发生以下情况时,不会flush缓存:缓存为空;region已关闭;已有flush操作;该region不可写。此方法会产生一段时间的阻塞,所以不应该在时间敏感的线程中调用它。参数selectiveFlushRequest如果为true,则选择性的flush列族(memstore大小由列族指定),同时此功能需要在配置文件中开启。如果返回true,则代表缓存已flush。

    接下来分析代码逻辑:根据配置hbase.hregion.memstore.percolumnfamilyflush.enabled再次为参数赋值;创建任务监控对象;如果region已close或写入状态对象writestate中已标记flushing或不可写入则退出函数,否则标记flushing;获取splitsAndClosesLock读锁,使该操作与split和关闭region互斥;如果选择性flush参数为true则过滤需要flush的Store对象(执行master发送命令时,此参数为false),过滤条件比较简单,判断memstore大小是否大、于列族中的设置;然后调用internalFlushcache 开始具体的flush操作;flush后释放锁、去除flushing标记。

吐槽:写hbase的也会写这样的代码,值得吐槽一下return (store.getMemStoreSize() > this.columnfamilyMemstoreFlushSize) ? true : false;此处还有一点flushcache返回值代表是否已flush,但是返回值是由internalFlushcache函数返回的,该返回值的注释是region是否需要compact;另外操作中的锁对象变化比较复杂,应当考虑用状态模式替换各种操作下的region状态。

internalFlushcache函数有一大段注释。flush Memstore有一些棘手。Memstore中有很多已写入HLog的更新。Flush Memstore到磁盘的过程中,要尽可能多的处理读写请求。同时,为了容灾时能区分flush过的数据和需要重演到Memstore中的数据,HLog也必须能清楚的界定刷新时间。因此,flush处理分为三步:A.flush memstore中的数据至磁盘中并为log指出seqid(log id);B.根据上步中的seqid向log中写入” FLUSHCACHE-COMPLETE”消息;C.移除已flush过的冗余的memstore。此函数会阻塞一些时间。返回值为true代表需要进行compact。接下来将按照注释中的划分分析代码逻辑。

3.4.2 准备阶段

    代码起始位置:HRegion 1386行。

    虽然,函数注释中,将处理过程分为了三部分,但在代码中A开始前还是有一长段处理逻辑的。

    首先通过updatesLock禁止写入操作;然后获取需要flush和不需要flush的Store对象集合;获取两个集合中的最小的log id(这是一个记录在memstore中的变量);在HLog对象中将region对应的最小log id从firstSeqWrittenInCurrentMemstore(当前region未flush最小log id)移动到firstSeqWrittenInSnapshotMemstore中(当前region正在flush最小log id,如果flush中途失败其中的内容不会移回firstSeqWrittenInCurrentMemstore,但所有wal的log处理两个集合都是同时处理的,因此不必担心无法滚动日志的问题)并返回最小log id-1作为flush起始id;为每个Store对象生成StoreFlusher(没有复杂逻辑只是new一个对象);调用flusher.prepare(),其主要工作是生成memstore的快照;释放updatesLock写锁。

    flusher.prepare()的逻辑稍微复杂些,这里单独分析下。首先,获取并加写锁;然后,将Memstore中的key value集合引用付给快照变量,并重新构造一个集合付给原有变量;同样的,将最小log id、时间戳范围跟踪器、key value删除计数器和Memstore内存申请对象(其实只有一个scanner引用计数会被用到,为什么重新构造呢)都付给快照对象并重新构造;释放锁。

3.4.3 Step A & B & C

    代码起始位置:HRegion 1579行。

本步首先调用flusher.flushCache(status)。函数中首先获取需要读取的最小时间戳(包括列族设置和所有扫库对象);设置最大版本号并创建遍历对象(InternalScanner接口、StoreScanner类的实例),用于遍历刚刚创建的Memstore的快照,其中过滤器列表的参数内容是一个CollectionBackedScanner类实例。

    构造遍历对象中,首先调用自身的设置参数的构造函数,这个函数中会设置缓存、列族存储对象(Store)、扫描列(此次操作中不设置列)、设置回闪(撤销删除)时间、根据io.storefile.delete.column.bloom.enabled和use_delete_column_bloom_filter决定是否使用布隆过滤器;然后,创建Key Value过滤器(ScanQueryMatcher),主要是将start、end key、回闪时间、扫描列和最大版本数等参数;根据遍历器中的统计信息决定是否使用此过滤器(flush中使用的CollectionBackedScanner类实例直接返回true,即必然使用);定位遍历器到需要遍历的第一行。

    构造完Memstore中记录的遍历对象后,开始构造KEY VALUE写入对象。首先,通过连缀的方式构造写入对象构造工厂;然后,构造文件写入对象(StoreFile.Writer类)实例。构造实例时,首先将没有设置的关键参数设置为默认值并设置临时文件路径;然后,根据hfile.format.version创建不同版本的HFile Writer构造工厂实例并进行参数配置(包括压缩算法,在每次写入一个block后会对block进行压缩);创建输出流实例,如果文件系统对象为HDFS的DistributedFileSystem,则尝试传入上文所述的偏好节点(favoredNodes);然后创建不同版本的HFile Writer实例。之后是构造布隆过滤结果写入对象:根据io.storefile.bloom.enabled和之前的列族设置决定是否使用布隆过滤器;然后如果HFileWriter的版本为2,则根据io.storefile.delete.family.bloom.enabled和io.storefile.delete.column.bloom.enabled创建列族删除和列删除操作布隆过滤器。

    之后通过上述构造好的遍历对象遍历Memstore中的KeyValue对象,将其进入memstore的时间标记设置为0;向StoreFile.Writer中写入。写入逻辑比较复杂。首先是对每个布隆过滤器进行处理。由于KeyValue对象在Memstore中是有序的,所以每次比较是否为新key时,只需与上一个对象比较即可。如果为新key,生成布隆过滤器key和偏移量并将记录加入过滤器中。对于列族删除和列删除操作布隆过滤器也有相似处理逻辑。然后向HFileWriter中写入数据;更新记录时间戳范围。写入后计算flush字节大小。

    所有KeyValue写入后,写入StoreFile的meta信息,包括最大log id、是否是major compact生成以及时间戳范围;然后调用钩子函数(目前没有特殊处理InjectionEvent.STOREFILE_AFTER_WRITE_CLOSE的钩子);然后将生成的文件从临时目录转移至region文件目录中;调用处理InjectionEvent.STOREFILE_AFTER_RENAME事件的钩子函数;创建StoreFile文件对象,主要是设置一些属性;创建文件对象对应的reader,创建reader的过程中同样要根据不同版本创建不同的HFileReader实现类对象,然后再重新将meta信息和布隆过滤内容重新加载出来(为了顺便校验文件么,可是之前已经有相同的步骤校验文件了啊……之前的校验意义何在呢)。

    以上为flusher.flushCache(status)的逻辑。之后执行钩子函数internalPreFlushcacheCommit所返回的钩子(目前没有钩子)。然后逐个调用flusher.commit()完成flush。其中的逻辑一开始主要是更新监控和输出log;然后将新生成的StoreFile对象加入Store的有序列表中;将之前建立的快照丢弃;关闭之前的遍历对象;检测是否需要compact(与3.1.3中逻辑类似,与storefile数量和配置有关)。

    最后清空fluser集合并更新每个列族存储对象store的最后flush时间。步骤A到此结束。

    步骤B时,StoreFile已创建成功,此时开始标记HLog中完成flush的log id。先从firstSeqWrittenInSnapshotMemstore中将3.4.2中的log id移除;然后在regionServer中将该region已flush的最小log id。

    步骤C也比较简单,首先notfyall所有等待flush的线程。然后记录log和最近flush信息,然后返回是否需要compact。

3.5 MSG_REGION_SPLIT

3.5.1 获取split row key


代码起始位置:HRegionServer 2156行。

    处理请求前,先进行一次flush。然后标记进行手动split(force split),传入预设定的row key。然后针对每个列族存储对象(Store),判断其是否有引用的存储文件,如果有则不可split。然后,开始获取split的row key,如果之前请求中有设置,则会直接返回请求中的值,否则每个列族获取一次split row key,然后取最大列族(存储文件总长度)的中间row key为此次split的row key。

    列族中的中间row key的逻辑为:遍历所有存储文件对象,选取最长的存储文件,读取其中间row key(偏移量写在文件尾,读取时只需取出对应block,如果文件中没有则读取中间的block);然后,比较开始row key和结尾row key,不能相等,如果相等则不能split返回null,会选用其他region的中间row key。

    获取split的row key后,即向处理split的线程池中加入任务。

3.5.2 执行split

代码起始位置:SplitRequest 38行。

    该段逻辑主要包含在region.splitRegion(midKey)中。该函数注释为:Split HRegion为两个新region并关闭当前region。Split应当比较快速,因为不必重写store file,而是创建store file的按上下两部分分割的软链。

    接下来分析代码逻辑。首先,判断split row key是否与region的start key或end key相等,如果相等,则不需要split。如果不相等,在父region目录中建立splits目录;然后创建两个HRegionInfo对象,主要是设置一些基本信息;在生成的split目录中创建以两个region名命名的目录;然后开始关闭父region并获得该region所有store file对象,逻辑参见3.2.2;然后针对每个store file对象,以“文件名.父region名”为名生成软链文件。然后创建HRegion对象,逻辑见3.1.2第一段,但并没有让其初始化。然后将splits目录中的文件移至新生成的region目录中。

3.5.3 后续处理

代码起始位置:SplitRequest 53行。

    得到split出的两个region对象后,开始更新region信息,.META.更新-ROOT-表,其他表的region更新.META.表:将父表设置为下线和split;然后将父表信息从在线region集合和flush log id集合中移除;将对应表中父region的起始时间戳和实例位置置空,写入split后的子region信息;向对应表中写入子region信息;添加向master汇报split结束的消息;输出LOG。

3.6 MSG_REGION_MAJOR_COMPACT和MSG_REGION_COMPACT

3.6.1 构造compact处理线程----默认策略

代码起始位置:HRegionServer 2165行。

    命令处理中,首先将是否强制compact写入region的所有store对象中。然后针对每个store,创建并执行compact处理线程。接下来先分析线程构造的逻辑。

    逻辑首先判断region是否可写并加入读锁(可以与flush互斥)。如果当前有正在compact的文件,则获取其中最后一个,再次compact时将不包括其+1之前的文件。然后开始过滤需要compact的文件,在这段逻辑中只有设置了major compact并且上一步没有过滤掉文件才能被认作真正需要执行major compact。

如果设置了hbase.store.delete.expired.storefile为true和TTL并且有超时的storefile,则此次compact只对这些超时文件进行删除处理(如果为major compact,此处则不会检测ttl)。然后开始过滤大文件,找到首个长度小于hbase.hstore.compaction.max.size的文件在集合中的位置,然后将这些文件从compact文件列表中移除。然后开始判断是能成为major compact,需要符合三者之一:(1)有引用文件;(2) compact文件数小于hbase.hstore.compaction.max并且store file最小更新时间范围在major compact范围内(根据hbase.hregion.majorcompaction和hbase.hregion.majorcompaction.jitter计算)并且当前时间不是hbase尖峰时间(hbase.peak.*设置)并且当前并不是只有一个已major compact过的文件;(3) 被认作真正需要执行major compact。

如果不需要major compact,则开始过滤文件:如果hbase.hstore.compaction.exclude.bulk为true,则忽略通过bulk-loaded导入的文件。然后根据预定算法过滤文件,调用applyCompactionPolicy函数,这段逻辑的注释为:首先如果配置了不包括bulk-loaded导入的文件则过滤之,然后根据新文件比例(compactRatio)确定一个偏移值,使新文件也能满足这个标准。如果文件数大于最大compact文件数,则会在compact()函数中递归。考虑最老的文件,避免compact [end-threshold,end)范围内的文件。最后,文件成为所有文件的compact结果。代码逻辑中首先标记非尖峰时刻compact数,并根据是否为尖峰时期和hbase.hstore.compaction.ratio、hbase.hstore.compaction.ratio.offpeak得到文件过滤比值。然后以hbase.hstore.compaction.max为窗口,从后向前的计算窗口内文件长度总和(O(n)的算法)。然后根据hbase.hstore.compaction.min.size、hbase.hstore.compaction.min、文件过滤比值和窗口内总文件大小计算出起始compact窗口位置,从而过滤掉过大的文件(需要比较有经验才能设置好这些参数)。注释中给出了如下配图,比较能表达其思想。


1.png



如果过滤后文件数量小于hbase.hstore.compaction.min则将compact文件列表置空。如果大于hbase.hstore.compaction.max则只保留前hbase.hstore.compaction.max个文件。至此,已筛选出需要compact的文件。

筛选完文件后,将其加入正在compact文件列表中并重新排序。如果筛选出来的文件不等于总文件数,则不会进行major compact。然后根据hbase.hstore.blockingStoreFiles计算任务优先级,当前总文件越多,计算出的值越低。然后new出compact线程对象并释放读锁。

由于是手动触发操作,此处优先级会被置为1。然后线程对象会根据需要compact的总文件长度和hbase.regionserver.thread.compaction.throttle被放入不同的线程池中运行。

3.6.2 构造compact处理线程----按时间分层策略

代码起始位置:TierCompactionManager。

    按时间分层的compactionManager类TierCompactionManager覆盖了applyCompactionPolicy函数,封装了对应的配置类TierCompactionConfiguration(extends CompactionConfiguration)。其构造函数主要是加载配置,值得注意的是,他可以针对每个表和列族进行单独配置。

    然后是过滤逻辑,其代码注释为:首先如果配置了不包括bulk-loaded导入的文件则过滤之;将文件从老到新排列,然后选择一个适当的[start,end)范围。从老到新尝试每个file在列表中的索引值作为start,并定义对应的end (也是file在集合中的索引值)。当[start,end)为一个可接受值时停止。当[start,end)为一个可接受值时应当满足以下条件:(1) fileSize[start]最多为maxCompactSize;(2) 参与compact的文件数量最少为currentTier.minFilesToCompact;(3) fileSize[start]最多为minCompactSize或最多为剩余文件的配置的倍数(我觉得这段注释语法不通至少缺个of)。end值为endInTier[tierOf[start].endingInclusionTier],默认情况下currentTier.endingIndexForTier = currentTier(拜托,这种注释是给作者本人看的吧),因此默认情况下,end总是为currentTier的最后一个文件的索引值+1。默认情况下要确认不同层次的文件不会被同时选取。注释给出了示意图:

2.png


    接下来分析代码逻辑。该逻辑的重点是将文件按时间新旧分层compact。设文件列表数组为f[0, n-1],先求第i个文件与后续文件的长度和sum[i]。然后开始对文件分层并计算每层的end对应的f中的下标,处理过程为从配置中依次取出层对象进行以下处理:如果配置了hbase.hstore.compaction.(表名.列名或Default,以下忽略此段配置前缀).IsTierBoundaryFixed为true则代表使用类似crontab的方式配置(表达式为** .Tier.第几层.Boundary)分层并计算出该层的起始时间点。n向0的遍历f,从0开始遍历层,层数配置为**.NumCompactionTiers,由于end是开区间所以层的end下标从n开始也就是实际结束下标+1;判断每个文件的最小flush 时间是否符合当前层时间范围,如果用crontab则判断其是否大于等于计算出的值,否则判断其是否距现在小于等于**. Tier.第几层.MaxAgeInDisk并且小于等于当层文件最大值**. Tier.第几层.MaxSize。一旦不符合当前层的条件,则切换到下一层,否则标记f[i]属于当前层。

    接下来尝试能够进行compact的[start,end)。大多数情况**.IsRecentFirstOrder都会配置为true,而且为false时逻辑比较简单,直接从f[0]所在层尝试是否可以作为start,不行则start++。所以这里重点说下从最新的层开始尝试的策略。首次尝试时,首先获取首个层end值不与第0层end值相等的层,因为有的层中可能没有文件(所以这里需要用过while循环进行查找),找到的层的end即为首次可尝试的start。如果无法进行compact(后续分析判断标准),则在当层中+1如果start值等于当前层end值,则切到下一层,并找到第一个与当前层end值不相等的层的end值,将其作为start返回。此处需要说明的是因为每层的取值范围为左闭右开,所以每层的end实际上等于上一层的start。拿到start后,先获取start所在层的end值(f[i]对应的层上段逻辑已求出)。然后通过sum数组值获取当层除start外,其他文件总长度。如果满足以下条件,则为可进行compact的start和end值:(1) 当前文件小于等于hbase.hstore.compaction.max.size。(2) end与start之间的文件数大于等于**. Tier.当前层号.MinFilesToCompact。 (3) 当前文件长度小于等hbase.hstore.compaction.min.size或小于等于剩余文件总长度乘以**. Tier.当前层号.CompactionRatio。选取好start和end值之后会输出较详细的log,可用于推断正确性。最后截取并返回文件列表。

3.6.3 执行compact处理线程

代码起始位置:CompactSplitThread 168行。

    线程中的run函数逻辑比较简单,调用HRegion的compact方法,该方法完成了compact操作。操作完后,如果当前操作优先级为不为正值(即列族中的文件数compact前大于hbase.hstore.blockingStoreFiles),则继续请求一次该列族的compact,否则如果符合以下条件则进行split:compact优先级不为负数;region的所有列族文件中没有引用文件;region的某个列族文件总长度大于maxfilesize(可在表和hbase.hregion.max.filesize进行设置)。获取split key和split的逻辑见3.5。

    接下来分析一下compact的逻辑。首先,获取splitsAndClosesLock锁并确认当前region没有关闭;然后,增加正在compact的操作数;调用doRegionCompactionPrep函数,该函数没有实现,如果做预处理可以在此处修改;然后进入compact对应的store对象进行compact操作;最终,释放锁。

    store的compact操作过程为:获取待compact文件中的最大log id(记录在文件的meta信息中);初始化任务监控信息;获取待compact的文件中最小的flush时间,统计所有文件中的记录数总和;针对每个待compact的store file,生成扫描类StoreFileScanner对象实例,StoreFileScanner封装了HFileScanner的实现类包括ScannerV2、EncodedScannerV2和ScannerV1;如果此次compact为major compact并且配置了major compact压缩算法hbase.hstore.majorcompaction.compression,则使用此算法作为compact压缩算法,否则使用表中声明的算法;获取目前所有scanner的最小起始扫描时间(似乎只为满足接口,因为后续会忽略掉MVCC),并设置给threadlocal变量;根据是否为major compact和hbase.hstore.time.to.purge.deletes设定保留删除记录的时间范围;构造StoreScanner类实例,根据列族中设置的回闪时间设置回闪时间点,构造记录过滤类ScanQueryMatcher对象实例(设置各种属性),过滤掉所有scanner中最大记录时间小于TTL时间的scanner,构造记录堆排序对象;构造store file写入对象,写入目录为region的.tmp,过程见3.4.3;接着开始通过StoreScanner类实例遍历待compact store file中的记录,遍历的具体逻辑在scan操作时再做分析;对于每条记录如果有compact钩子则先进行钩子处理,然后向store file writer中追加记录;每写入hbase.hstore.close.check.interval长度的记录后,检测一次region是否依然可写入,如果不能写入则删除已写入的文件并抛出异常;最后,向store file writer中追加meta信息。然后,进行compact完成后的后续处理:校验生成的store file的合法性(详情见3.4.3);将文件移至列族目录中;构造store file对象,并为其创建reader对象(详情见3.4.3);为其设置表和列族信息;从正在compact store file文件集合和region的所有store file文件集合中移除此次已compact完成的文件;将新生成的store file加入region的store file集合中;通知所有store scanner,会使其扫库中断;调用STORESCANNER_COMPACTION_RACE事件;关闭compact完成的store file的读对象,清空其缓存,变比输入流;删除已compact完成的store file;重新统计region中的store file的总长度;至此store内部的compact处理完毕。

3.7 MSG_REGION_CF_MAJOR_COMPACT和MSG_REGION_CF_COMPACT

    首先,获取region,然后按照上面的处理过程,按列族生成compact处理线程并执行。至此,woker线程的处理分析完毕,如果这些处理失败,会将请求的重试次数+1,然后重新放回todo集合中,否则按照之前所述检测文件系统。

源码分析之灰太狼手札(二):HRegionServer线程run( )函数

mopishv0 发表了文章 0 个评论 1551 次浏览 2015-09-08 15:10 来自相关话题

二 HRegionServer线程run()函数 2.1 发送注册信息  代码起始位置:HRegionServer 705行。 ...查看全部
二 HRegionServer线程run()函数

2.1 发送注册信息 

代码起始位置:HRegionServer 705行。

run()函数首先进入第一阶段(自创名,由stopRequestedAtStageOne起名)。第一阶段是region server线程首次启动并向master注册,之后不断与master交互的阶段。

在函数开始时,首先重命名线程名,线程启动时首次重命名,后续都由reinitialize()重命名。之后只要stopRequestedAtStageOne为false就向master发送启动信息,调用reportForDuty,因为stopRequestedAtStageOne即可退出循环,因此在发送注册的逻辑中多次判断了此值,从而能多个等待的死循环中跳出注册信息的构造。

reportForDuty中首选获取master信息,调用getMaster,这是一个死循环逻辑,获取失败会睡一会儿(所有的睡一会儿操作都依赖于sleeper)。getMaster中也是一个死循环,它会先从zookeeper中获取master信息,此处链接zookeeper异常的话会强行退出进程。如果没有节点,则会立即continue(cpu会被吃满的啊……;拿到master地址后用过HBaseRPC.getProxy检查协议版本是否匹配(不匹配抛异常,吐槽一点,既然不匹配就抛异常,为什么又要缓存版本号呢,这样如果更新重启了还是照样会报版本不匹配的异常啊,意义何在啊……)、生成调用动态代理实例(由 org.apache.hadoop.hbase.ipc.HBaseRPC.Invoker.Invoker 进行封装),抛出异常时会睡一会儿。构造好HMasterRegionInterface实例后,保存到hbaseMaster和masterRef中,之所以要保存两份,我猜应当是应对不同锁级别需求的处理吧。然后getMaster会必然返回true,外围的死循环意义何在?

然后开始获取服务状态,统计内存使用,将自己的信息写入zookeeper,然后通知master本region server的启动信息并break出循环。如果zookeeper写失败,则会睡一会儿后继续循环。

退出reportForDuty后,会根据master返回的map初始化线程(这部分下次再分析)。如果maser没有响应则睡一会儿。如果睡一会儿后依然stopRequestedAtStageOne == false,则再次进入reportForDuty,如果一切没有变化则会在写zookeeper时失败(因为节点已经存在,返回false,不抛异常),然后陷入reportForDuty的第二个循环中。当stopRequestedAtStageOne被设置好后,退出循环但是因为没有向master发送请求,所以没有返回的map,因此不会重复进入初始化线程(也就是说,如果master有变化或zookeeper有变化,则会重走上述流程)。      

stopRequestedAtStageOne应当是在他处被设置成true的。由此引申出2个问题,stopRequestedAtStageOne何时被设置为true,master端处理regionServerStartup流程。

虽然明白编码不是一朝一夕的事儿,并且多人长期合作必然会导致部分没有意义的代码,但看到后依然还是会为浪费的时间吐槽,但不代表不能理解,详情参考《人月神话》。

2.2  Master响应regionServerStartup 

代码起始位置:HMaster 1270行。

上节说道HRegionServer线程run()函数之第一阶段中会向Master发送regionServerStartup请求,这次先说说master如何处理请求。

master端响应的主要目的检测region server启动信息是否有效并向region server返回一些必须参数。函数中首先从RPC请求中获取region server的host然后通过传递过来的serverInfo参数获取端口(为什么不都从serverInfo中获取呢?而且后续还会替换rs中的host,那前面的一系列host获取处理意义何在呢?不过0.90中这段逻辑已经被去掉了,可能真的没有意义吧)。regionServerStartup中,根据注释描述主要检测两种异常情况:在ZNODE失效前快速重启;region server正在被master做失效处理中。之后还要检测其是否在serverAddresstoServerInfo中,如果在则出发过期处理,下次请求时信息就应当已经serverAddresstoServerInfo中移除并在排队等待ProcessServerShutdown处理。那么代码中实现是如何的呢?

代码中首先从已知region server信息中查找是否已具有注册信息。如果有:判断两者的startcode(启动时间戳),注册信息小则返回YouAreDeadException异常,如果相等啥也不干。如果大,则进行region server过期处理:将其从已知region server信息中移除。并将其加入deadServers和getRegionServerOperationQueue做ProcessServerShutdown处理。如果没有:不作处理。之后如果regersion server正在做失效处理,则抛出YouAreDeadException。否则记录新regionserver信息:向集合中加入信息,创建zookeeper watcher。

检测完有效,会将master获取的regtion server的ip当做的hbase.regionserver.address、hbase.rootdir和fs.default.name返回给region server。

2.3 发送注册信息成功之后的线程初始化

代码起始位置:HRegionServer 714行。

上次分析了master响应regionServerStartup的流程,本次以该流程没有返回YouAreDeadException为假定,继续分析后续逻辑。返回YouAreDeadException的逻辑且听后续分解。

发送注册信息成功之后首先将master的返回值写入config;重新创建host信息;初始化mapred.task.id和fs.defaultFS;构造HDFS路径;构造HLog对象;构造metrics监控对象;如果需要则启动thrift服务;调用startServiceThreads启动各主要线程。

startServiceThreads的注释中说明:启动状态维护线程、客户请求处理线程(Server)、master请求响应线程(Worker)以及心跳检查(这里英文用的是租期检查)。构建一个未捕获到异常处理器,当线程中出现一个未捕获到异常时,退出region server。但这个处理器不会设置给所有的线程。客户请求处理线程是没有限制的,如果线程内出现了OOMException,它会等一会儿然后重试,与此同时,flush和compaction如果尝试运行的话,则会触发比较严重的状态,然后会运行关闭客户请求处理线程。心跳检查会同时运行,它是一个继承Chore的按时间间隔执行的线程,它按照自己的间隔停止机制,所以这个线程需要被机器停止。master请求响应线程会记录异常并退出。注释中说明了个线程的停止机制,写在这里当做参考,startServiceThreads的实现是怎样的呢?

startServiceThreads中首先构造一个未捕获到异常处理器(当有异常时,直接退出region server),将这个处理器设置给hlogRollers、workerThread、majorCompactionChecker并作为守护线程启动。然后启动心跳检测线程;启动客户端响应线程;启动多个splitLogWorker(负责Region恢复前,分发HLog中记录到具体Region目录的任务处理),数量由hbase.hregionserver.hlog.split.workers.num决定;构造扫库任务线程池scanPrefetchThreadPool。

至此发送注册信息成功之后的线程初始化就结束了,如果这阶段出现异常的话设置stopRequestedAtStageOne为true并抛出异常IOException。

2.4 与master的持续交互

代码起始位置:HRegionServer 724行。

目前看来StageOne比预想的复杂。初始化好各线程后,开始进入新的死循环逻辑:region server开始与master进行持续的交互。

循环中首先从zookeeper中获取root表所在的region server,如果没有变化则只获取一次。然后在大于消息发送间隔或有需要发送给master的消息(outboundMessages不为空)时,与master发生交互。在整理交互内容的过程中,首先收集内存使用情况和各region的负载情况。然后加入并去重需要发送给master的消息(此处也从HRegionServer的属性outboundMsgs获取消息)。然后向master发送请求regionServerReport,master也会有消息返回,返回的消息中可能包括对region server的操作指令。

消息发送完成后根据已发送的消息对本地待发送消息去重。此后,如果之前收到过关闭所有region的命令并且所有region已经关闭完成,此时会stopRequestedAtStageOne.set(true)并退出循环。

做完返回消息无关处理后,开始逐条处理返回消息。奇怪的是,每次处理消息都要将root表所在region server信息重置。之后根据返回消息的不同类型,将其分发到不同的处理队列或集合中,预计会由不同的线程进行处理。此处由swicth case处理,实质上个人认为既然已经使用了枚举类,将处理内容封装到枚举类中更好。目前包括的分发处理包括:MSG_REGIONSERVER_STOP、MSG_REGIONSERVER_QUIESCE(关闭所有用户region)、MSG_REGION_CLOSE、MSG_REGION_OPEN()、default,此段逻辑中的处理大都是去重然后放到toDo中,toDo中的消息后续由work线程处理。整个过程中有个restart一直起作用,并会在适时的时候通过stopRequestedAtStageOne判断是否需要退出死循环。

以上逻辑如果产生异常,会尝试将RemoteException转为普通异常。如果是YouAreDeadException则直接抛出,其他情况则尝试重试,每重试hbase.client.retries.number次会检测一次文件系统。检测文件系统有频率控制,会尝试访问根路径。如果访问错误,则会记录此轮首次错误时间,属性fsOk会被设为false,但不会重复检测,如果下次检测时fsOk依然为false并且此轮首次错误时间距现在超过hbase.regionserver.check.fs.abort.timeout则会结束region server进程。

如果顺利,会通过outboundMsgs的LinkedBlockingQueue.poll(long timeout, TimeUnit unit)方法睡一会儿,睡觉间隔的计算使得循环以固定速率执行(类似scheduleAtFixedRate)。睡醒后,会执行housekeeping方法(奇怪的命名),方法内容就是如果toDo(保存上述逻辑从master获取并需要执行的任务列表)中有MSG_REGION_OPEN,则在下一轮交互中加入已经对此作出响应的应答(注释中与此意义相同)。

至此,第一阶段分析完毕。如果此阶段发生异常,首先会检测是否是OOMException,如果是调用forceAbort结束region server进程,后续处理意义似乎也不大了,如果不是(大多是YouAreDeadException),调用abort结束region server进程。

2.5 服务结束

代码起始位置:HRegionServer 917行。

退出第一阶段后,HRegionServer线程即将结束,此处主要做一些通知master和清理、停止线程的工作。走入此流程的原因有三种:zookeeper得到异常或事件通知(其中会设置killed=true)、调用HRegionServer.abort(其中会设置abortRequested=true)和region server状态不健康。此段逻辑是用大量if区分不同原因导致的退出的处理,实质上可以用枚举集合或状态+组合模式划分代码。接下来先说说不同原因处理逻辑的特有部分吧。

zookeeper原因:kill所有hlogs线程。

调用HRegionServer.abort:不阻塞的向master汇报退出消息;close所有hlogs线程;关闭所有region;公有部分执行完后等待所有线程关闭,之后关闭文件系统。

region server状态不健康:不阻塞的向master汇报退出消息;关闭所有region;如果所有region关闭不成功的话,直接执行公有部分代码;closeAndDelete所有hlogs线程;通知master本region server正在重启(restartRequested)或退出;通知master关闭region;公有部分执行完后等待所有线程关闭,之后关闭文件系统。

接下来分析公有部分代码逻辑:首先停止各线程;然后停止向master的RPC代理;制空master对象;关闭zookeeper客户端。

此段逻辑比较简单,之后将对各线程的创建、运行和结束进行分析。

源码分析之灰太狼手札(一):HBase RegionServer

mopishv0 发表了文章 0 个评论 1725 次浏览 2015-09-08 15:06 来自相关话题

一 RegionServer启动和构造 1.1 启动      代码起始位置:HRegionServer 3730行。 ...查看全部
一 RegionServer启动和构造

1.1 启动 

    代码起始位置:HRegionServer 3730行。

启动RegionServer的主类是HRegionServer。

main函数中主要是设置config。doMain中将-D中的配置设置到config中。因此配置的优先级为,-D>hbase配置>hadoop配置。

之后判断命令内容,值得注意的是是不允许通过stop关闭RegionServer,需要通过bin/hbase-daemon.sh stop regionserver和Kill关闭。另外一个合法的命令就是start了。start中如果为LOCAL模式的话则不允许启动。HRegionServer的实例是通过反射构造出来的,类型可以配置到配置文件中。由于HRegionServer本身为Runable所以直接将生成的对象放到Thead中启动即可。启动的最后,替换了进程关闭钩子(原有钩子只在程序顺利启动前起效,会删除一些ZK连接、ZK路径和HDFS文件)。

启动流程到此就为止了。这部分比较简单,希望是个良好的开端。

1.2 HRegionServer构造函数 

    代码起始位置:HRegionServer 397行。

上篇说道HRegionServer是通过反射调用构造函数,这回我们来分析一下HRegionServer构造函数都做了什么。

首先通过配置构造machineName(ip或域名),如果"hbase.regionserver.dns.interface"值为"default",则使用hadoop中的自解析域名。如果"hbase.regionserver.dns.nameserver"为"default"则把他设成null再递归地调用一次获取machineName(意义何在啊……)。如果两者都不为"default"则根据"hbase.regionserver.dns.interface"配置的网卡名获取ip。machineName获取完成后拼接配置的端口号,生成HServerAddress对象(里面要将拼接好的字符串再解开,意义何在啊……),这里会将ip再次解析成域名。

然后设置一些变量的初始值:通过this.abortRequested = false;    this.fsOk = true;猜测,文件系统不可访问时的逻辑和文件系统正常的逻辑有可能搅在了一起;根据config对象的哈希值(^config中每个Entry的哈希值)生成ServerConnection对象(实现为org.apache.hadoop.hbase.client.HConnectionManager.TableServers,它的注释为 Encapsulates finding the servers for an HBase instance ,与处理客户端请求相关);从配置中读取重试次数、responseSizeLimit等值;生成定长sleep类Sleeper的实例;生成执行master命令的Worker类实例;生成Region开启关闭处理线程池实例;生成我猜是用来处理请求的线程池PreloadThreadPool类的实例。

在众多设置变量初始值的语句中,有一行reinitialize();想必这既是重中之重了。函数中先是设置了一些变量的值,这些值的生存周期(与服务共生死)应当与构造函数中的(与进程共生死)不同:构造RPC响应服务实例server用并获得临时端口号;初始化服务状态类HServerInfo对象;重命名线程名;初始化zookeeper客户端和监听;初始化workerThread(为上述Worker类实例创建线程);初始化flush memstore线程MemStoreFlusher类实例;初始化compact与split响应类CompactSplitThread(并非一个线程)实例;向配置更新监听类ConfigurationObserver对象注册通知compactSplitThread等对象;根据配置创建多个hlog滚动线程LogRoller类实例;初始化MajorCompactionChecker类实例(封装的线程);this.leases = new Leases,似乎与请求的超时处理有关。

此处有一个对象的初始化比较有意思reservedSpace,从代码引用上和注释上看,只有在退出时会被释放。其他时间并不会用到这个变量。猜测其设计目的是为了给OOM Exception出现时预留内存,从而使进程能够有足够的内存保证正常退出。正常退出对于保存关键信息的服务或存储服务来说十分重要,非正常退出极易丢失数据。

通过这次的分析,大致能够了解HRegionServer提供服务所依赖的线程了。下次计划分析run函数,后续再逐一分析各线程和zookeeper监听。