HDFS

HDFS

Sqoop从mysql导入数据HDFS出错

回复

糖分5f1035014acd6 发起了问题 1 人关注 0 个回复 1371 次浏览 2020-07-16 19:12 来自相关话题

Sqoop从mysql导入数据HDFS出错

回复

糖分5f1035014acd6 发起了问题 1 人关注 0 个回复 1265 次浏览 2020-07-16 19:11 来自相关话题

启动HA高可用的时候,namenode和datanode启动后马上挂掉。

回复

阿陈 发起了问题 1 人关注 0 个回复 1166 次浏览 2019-05-01 16:03 来自相关话题

hadoop2.9运行自带wordcount时出错?

坏脾气先森 回复了问题 3 人关注 2 个回复 1826 次浏览 2018-08-22 10:59 来自相关话题

sqoop从mysql导入hdfs的时候,报错ERROR manager.SqlManager

Bangle 回复了问题 2 人关注 1 个回复 6021 次浏览 2018-06-03 13:14 来自相关话题

请问可以修改 hdfs文件的副本数,实现副本的增加减少吗?

银河系管理员 回复了问题 2 人关注 1 个回复 1470 次浏览 2018-02-09 10:41 来自相关话题

kafka接收很多消息,怎么写到hdfs里比较好呢 ?

回复

封尘 发起了问题 1 人关注 0 个回复 1215 次浏览 2018-02-09 10:37 来自相关话题

请问spark输出到hdfs怎样改成mapreduce的输出格式?

回复

JVMer 发起了问题 1 人关注 0 个回复 1003 次浏览 2018-02-08 16:33 来自相关话题

java读取HDFS上的文件报错

jane3von 回复了问题 2 人关注 2 个回复 3153 次浏览 2018-01-05 15:52 来自相关话题

Apache的Hadoop中HDFS方式挂载--fuse

Sam_Hadoop 回复了问题 3 人关注 2 个回复 2597 次浏览 2017-12-28 15:34 来自相关话题

正在部署多机hdfs ha+yarn,启动namenode时遇到以下错误

xinchai 回复了问题 3 人关注 2 个回复 8715 次浏览 2017-10-14 16:38 来自相关话题

hadoop fuse-hdfs挂载

脸脸 回复了问题 2 人关注 9 个回复 3041 次浏览 2017-08-18 11:26 来自相关话题

mapreduce执行程序HDFS路径找不到

fish 回复了问题 2 人关注 7 个回复 4274 次浏览 2017-08-05 23:32 来自相关话题

文件占用空间异常

fish 回复了问题 2 人关注 1 个回复 1338 次浏览 2017-06-27 18:23 来自相关话题

Hadoop MapReduce 性能优化

李志博 回复了问题 3 人关注 3 个回复 2958 次浏览 2017-06-09 16:56 来自相关话题

hadoop执行wordcount报错

fish 回复了问题 3 人关注 2 个回复 1273 次浏览 2017-05-07 13:26 来自相关话题

浏览器输入master:50070,HDFS界面打不开,本地已经配置了hosts

alexpan 回复了问题 4 人关注 4 个回复 6587 次浏览 2017-04-20 10:53 来自相关话题

条新动态, 点击查看
两种方式: 1. [size=14]java -cp target/kafka_hdfs-0.0.1-SNAPSHOT-jar-with-dependencies.jar:/etc/hadoop/conf cn.chinahadoop.kafka.hadoop... 显示全部 »
两种方式: 1. [size=14]java -cp target/kafka_hdfs-0.0.1-SNAPSHOT-jar-with-dependencies.jar:/etc/hadoop/conf cn.chinahadoop.kafka.hadoop_consumer.TestHadoopConsumer[/size] [size=14]2. java -cp target/kafka_hdfs-0.0.1-SNAPSHOT.jar:`/mnt/xkhadoop/hadoop-2.6.0-cdh5.4.0/bin/hadoop classpath` cn.chinahadoop.kafka.hadoop_consumer.TestHadoopConsumer[/size]    
那谁知道这个“垃圾清理”机制所需要的策略是什么? 可以自己根据策略通过hadoop fs -du,hadoop fs -ls,hadoop fs -rm的组合完成你的工具。
那谁知道这个“垃圾清理”机制所需要的策略是什么? 可以自己根据策略通过hadoop fs -du,hadoop fs -ls,hadoop fs -rm的组合完成你的工具。

hadoop机架感知脚本修改之后需要重启namenode么

link 回复了问题 1 人关注 4 个回复 4451 次浏览 2015-11-23 09:54 来自相关话题

Sqoop从mysql导入数据HDFS出错

回复

糖分5f1035014acd6 发起了问题 1 人关注 0 个回复 1371 次浏览 2020-07-16 19:12 来自相关话题

Sqoop从mysql导入数据HDFS出错

回复

糖分5f1035014acd6 发起了问题 1 人关注 0 个回复 1265 次浏览 2020-07-16 19:11 来自相关话题

启动HA高可用的时候,namenode和datanode启动后马上挂掉。

回复

阿陈 发起了问题 1 人关注 0 个回复 1166 次浏览 2019-05-01 16:03 来自相关话题

hadoop2.9运行自带wordcount时出错?

回复

坏脾气先森 回复了问题 3 人关注 2 个回复 1826 次浏览 2018-08-22 10:59 来自相关话题

sqoop从mysql导入hdfs的时候,报错ERROR manager.SqlManager

回复

Bangle 回复了问题 2 人关注 1 个回复 6021 次浏览 2018-06-03 13:14 来自相关话题

请问可以修改 hdfs文件的副本数,实现副本的增加减少吗?

回复

银河系管理员 回复了问题 2 人关注 1 个回复 1470 次浏览 2018-02-09 10:41 来自相关话题

kafka接收很多消息,怎么写到hdfs里比较好呢 ?

回复

封尘 发起了问题 1 人关注 0 个回复 1215 次浏览 2018-02-09 10:37 来自相关话题

请问spark输出到hdfs怎样改成mapreduce的输出格式?

回复

JVMer 发起了问题 1 人关注 0 个回复 1003 次浏览 2018-02-08 16:33 来自相关话题

java读取HDFS上的文件报错

回复

jane3von 回复了问题 2 人关注 2 个回复 3153 次浏览 2018-01-05 15:52 来自相关话题

Apache的Hadoop中HDFS方式挂载--fuse

回复

Sam_Hadoop 回复了问题 3 人关注 2 个回复 2597 次浏览 2017-12-28 15:34 来自相关话题

正在部署多机hdfs ha+yarn,启动namenode时遇到以下错误

回复

xinchai 回复了问题 3 人关注 2 个回复 8715 次浏览 2017-10-14 16:38 来自相关话题

hadoop fuse-hdfs挂载

回复

脸脸 回复了问题 2 人关注 9 个回复 3041 次浏览 2017-08-18 11:26 来自相关话题

mapreduce执行程序HDFS路径找不到

回复

fish 回复了问题 2 人关注 7 个回复 4274 次浏览 2017-08-05 23:32 来自相关话题

文件占用空间异常

回复

fish 回复了问题 2 人关注 1 个回复 1338 次浏览 2017-06-27 18:23 来自相关话题

Hadoop MapReduce 性能优化

回复

李志博 回复了问题 3 人关注 3 个回复 2958 次浏览 2017-06-09 16:56 来自相关话题

hadoop执行wordcount报错

回复

fish 回复了问题 3 人关注 2 个回复 1273 次浏览 2017-05-07 13:26 来自相关话题

浏览器输入master:50070,HDFS界面打不开,本地已经配置了hosts

回复

alexpan 回复了问题 4 人关注 4 个回复 6587 次浏览 2017-04-20 10:53 来自相关话题

Hadoop 2.0 部署单机HDFS+YARN——示例

wangxiaolei 发表了文章 0 个评论 3402 次浏览 2015-11-25 14:36 来自相关话题

准备Linux 虚拟机环境 1、用Oracle VM VirtualBox 虚拟Linux 系统 虚拟好的Linux 操作系统:ubuntu-14.04-desktop-i386 准备好:hadoop-2.2.0.tar.g ...查看全部
准备Linux 虚拟机环境
1、用Oracle VM VirtualBox 虚拟Linux 系统
虚拟好的Linux 操作系统:ubuntu-14.04-desktop-i386
准备好:hadoop-2.2.0.tar.gz jdk-6u45-linux-i586.bin

图片1.jpg


2、设置hosts 文件
对linux 虚机的hosts 进行设置sudo vi /etc/hosts
加入127.0.0.1 YARN001

图片2.jpg


3、安装jdk 和hadoop
拥有执行权限: chmod +x jdk-6u45-linux-i586.bin
解压JDK:./jdk-6u45-linux-i586.bin
解压成功后,ls 查看下

图片3.jpg


解压hadoop 的安装包:tar -zxvf hadoop-2.2.0.tar.gz
解压成功后,ls 查看下

图片4.jpg


修改hadoop 的配置文件
1、配置文件列表图例展示

图片5.jpg


2、修改hadoop-env.sh 文件
进入hadoop 目录下: cd hadoop-2.2.0/ ,输入命令vi etc/hadoop/hadoop-env.sh
也可以使用ftp 工具传输到本地,用编辑器编辑此文件。
export JAVA_HOME=/home/wangxiaolei/hadoop/jdk1.6.0_45

图片6.jpg


3、修改mapred-site.xml 文件
重命名mapred-site.xml.template 为mapred-site.xml
输入命令vi etc/hadoop/mapred-site.xml
添加下面的配置[code=Xml]

mapreduce.framework.name
yarn

[/code]4、修改core-site.xml 文件
输入命令vi etc/hadoop/core-site.xml[code=Xml]

fs.default.name
hdfs://YARN001:8020

[/code]5、修改core-site.xml 文件
输入命令vi etc/hadoop/hdfs-site.xml
注意:1、单机版副本数dfs.replication 的值默认是3 这里写为1
2、dfs.namenode.name.dir 和dfs.datanode.data.dir 的默认值,
在hadoop 安装目录下的tmp 目录下。
3、这里修改为非tmp 目录,此目录无需存在。
它是在启动hadoop 时目录是自动创建的。[code=Xml]

dfs.replication
1


dfs.namenode.name.dir
/home/wangxiaolei/hadoop/dfs/name


dfs.datanode.data.dir
/home/wangxiaolei/hadoop/dfs/data

[/code]6、修改yarn-site.xml 文件
输入命令vi etc/hadoop/yarn-site.xml[code=Xml]

yarn.nodemanager.aux-services
mapreduce_shuffle

[/code]部署HDFS+YARN
1、格式化NameNode
第一次搭建环境,需要格式化
输入命令bin/hadoop namenode -format
完成后,查看/home/wangxiaolei/hadoop 发现自动创建的目录文件

图片7.jpg


2、启动namenode
输入命令sbin/hadoop-daemon.sh start namenode

图片8.jpg


3、启动datanode
输入命令sbin/hadoop-daemon.sh start datanode

图片9.jpg


4、验证HDFS 是否启动成功
在本机上配置host
192.168.1.122 yarn001
然后在浏览器窗口输入http://yarn001:50070

图片10.jpg


bin/hadoop fs -mkdir /home
bin/hadoop fs -mkdir /home/wangxiaolei

图片11.jpg


5、启动yarn
输入命令sbin/start-yarn.sh

图片12.jpg


6、验证yarn 是否启动成功
6.1 使用jps 查看进程
这里没有配置jdk 的环境变量,所以要指定jps 的存放目录。

图片13.jpg


6.2 在web 界面查看
在本机浏览器中输入http://yarn001:8088

图片14.jpg


6.3 此时就可以跑MapReduce 程序了。
bin/hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.2.0.jar pi 2 100

7、关闭yarn
输入命令sbin/stop-yarn.sh

图片15.jpg


8、关闭HDFS
输入命令sbin/stop-dfs.sh

图片16.jpg


最后使用jps 查看下进程

图片17.jpg


单机版部署HDFS+YARN 顺利完成!

Hadoop2.2.0中HDFS的高可用性实现原理

唐半张 发表了文章 0 个评论 1606 次浏览 2015-10-08 10:35 来自相关话题

在Hadoop2.0.0之前,NameNode(NN)在HDFS集群中存在单点故障(single point of failure),每一个集群中存在一个NameNode,如果NN所在的机器出现了故障,那么将导致整个集群无法利用,直到NN重启或者在另一台主机上 ...查看全部
Hadoop2.0.0之前,NameNode(NN)在HDFS集群中存在单点故障(single point of failure),每一个集群中存在一个NameNode,如果NN所在的机器出现了故障,那么将导致整个集群无法利用,直到NN重启或者在另一台主机上启动NN守护线程。
主要在两方面影响了HDFS的可用性:
(1)、在不可预测的情况下,如果NN所在的机器崩溃了,整个集群将无法利用,直到NN被重新启动;
(2)、在可预知的情况下,比如NN所在的机器硬件或者软件需要升级,将导致集群宕机。
HDFS的高可用性将通过在同一个集群中运行两个NN(active NN & standby NN)来解决上面两个问题,这种方案允许在机器破溃或者机器维护快速地启用一个新的NN来恢复故障。
在典型的HA集群中,通常有两台不同的机器充当NN。在任何时间,只有一台机器处于Active状态;另一台机器是处于Standby状态。Active NN负责集群中所有客户端的操作;而Standby NN主要用于备用,它主要维持足够的状态,如果必要,可以提供快速的故障恢复。
为了让Standby NN的状态和Active NN保持同步,即元数据保持一致,它们都将会和JournalNodes守护进程通信。当Active NN执行任何有关命名空间的修改,它需要持久化到一半以上的JournalNodes上(通过edits log持久化存储),而Standby NN负责观察edits log的变化,它能够读取从JNs中读取edits信息,并更新其内部的命名空间。一旦Active NN出现故障,Standby NN将会保证从JNs中读出了全部的Edits,然后切换成Active状态。Standby NN读取全部的edits可确保发生故障转移之前,是和Active NN拥有完全同步的命名空间状态。
为了提供快速的故障恢复,Standby NN也需要保存集群中各个文件块的存储位置。为了实现这个,集群中所有的Database将配置好Active NN和Standby NN的位置,并向它们发送块文件所在的位置及心跳,如下图所示:

Hadoop中HDFS高可用性实现
   在任何时候,集群中只有一个NN处于Active 状态是极其重要的。否则,在两个Active NN的状态下NameSpace状态将会出现分歧,这将会导致数据的丢失及其它不正确的结果。为了保证这种情况不会发生,在任何时间,JNs只允许一个NN充当writer。在故障恢复期间,将要变成Active 状态的NN将取得writer的角色,并阻止另外一个NN继续处于Active状态。
为了部署HA集群,你需要准备以下事项:
(1)、NameNode machines:运行Active NN和Standby NN的机器需要相同的硬件配置;
(2)、JournalNode machines:也就是运行JN的机器。JN守护进程相对来说比较轻量,所以这些守护进程可以可其他守护线程(比如NN,YARN ResourceManager)运行在同一台机器上。在一个集群中,最少要运行3个JN守护进程,这将使得系统有一定的容错能力。当然,你也可以运行3个以上的JN,但是为了增加系统的容错能力,你应该运行奇数个JN(3、5、7等),当运行N个JN,系统将最多容忍(N-1)/2个JN崩溃。
在HA集群中,Standby NN也执行namespace状态的checkpoints,所以不必要运行Secondary NN、CheckpointNode和BackupNode;事实上,运行这些守护进程是错误的。
 

HDFS的JavaAPI操作

唐半张 发表了文章 0 个评论 1522 次浏览 2015-10-08 10:21 来自相关话题

package hdfs; import static org.junit.Assert.fail; import java.util.Arrays; import org.apac ...查看全部
package hdfs;

import static org.junit.Assert.fail;

import java.util.Arrays;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.junit.Test;

public class TestHdfs {

@Test
public void test() {
fail("Not yet implemented");
}

//上传本地文件到HDFS
@Test
public void testUpload() throws Exception{

Configuration conf = new Configuration();
conf.addResource(new Path("D:\\myeclipse\\Hadoop\\hadoopEx\\src\\conf\\hadoop.xml"));

FileSystem hdfs = FileSystem.get(conf);
Path src = new Path("F:\\lzp\\T.txt");
Path dst = new Path("/");
hdfs.copyFromLocalFile(src, dst);

System.out.println("Upload to " + conf.get("fs.default.name"));
FileStatus files[] = hdfs.listStatus(dst);
for(FileStatus file : files){
System.out.println(file.getPath());
}
}

//创建HDFS文件
@Test
public void testCreate() throws Exception{

Configuration conf = new Configuration();
conf.addResource(new Path("D:\\myeclipse\\Hadoop\\hadoopEx\\src\\conf\\hadoop.xml"));

byte[] buff = "hello world!".getBytes();

FileSystem hdfs = FileSystem.get(conf);
Path dst = new Path("/test");
FSDataOutputStream outputStream = null;
try{
outputStream = hdfs.create(dst);
outputStream.write(buff,0,buff.length);
}catch(Exception e){
e.printStackTrace();

}finally{
if(outputStream != null){
outputStream.close();
}
}

FileStatus files[] = hdfs.listStatus(dst);
for(FileStatus file : files){
System.out.println(file.getPath());
}
}

//重命名HDFS文件
@Test
public void testRename() throws Exception{

Configuration conf = new Configuration();
conf.addResource(new Path("D:\\myeclipse\\Hadoop\\hadoopEx\\src\\conf\\hadoop.xml"));


FileSystem hdfs = FileSystem.get(conf);
Path dst = new Path("/");

Path frpath = new Path("/test");
Path topath = new Path("/test1");

hdfs.rename(frpath, topath);

FileStatus files[] = hdfs.listStatus(dst);
for(FileStatus file : files){
System.out.println(file.getPath());
}
}

//刪除HDFS文件
@Test
public void testDel() throws Exception{

Configuration conf = new Configuration();
conf.addResource(new Path("D:\\myeclipse\\Hadoop\\hadoopEx\\src\\conf\\hadoop.xml"));


FileSystem hdfs = FileSystem.get(conf);
Path dst = new Path("/");

Path topath = new Path("/test1");

boolean ok = hdfs.delete(topath,false);
System.out.println( ok ? "删除成功" : "删除失败");

FileStatus files[] = hdfs.listStatus(dst);
for(FileStatus file : files){
System.out.println(file.getPath());
}
}

//查看HDFS文件的最后修改时间
@Test
public void testgetModifyTime() throws Exception{

Configuration conf = new Configuration();
conf.addResource(new Path("D:\\myeclipse\\Hadoop\\hadoopEx\\src\\conf\\hadoop.xml"));


FileSystem hdfs = FileSystem.get(conf);
Path dst = new Path("/");

FileStatus files[] = hdfs.listStatus(dst);
for(FileStatus file : files){
System.out.println(file.getPath() +"\t" + file.getModificationTime());
}
}

//查看HDFS文件是否存在
@Test
public void testExists() throws Exception{

Configuration conf = new Configuration();
conf.addResource(new Path("D:\\myeclipse\\Hadoop\\hadoopEx\\src\\conf\\hadoop.xml"));


FileSystem hdfs = FileSystem.get(conf);
Path dst = new Path("/T.txt");

boolean ok = hdfs.exists(dst);
System.out.println( ok ? "文件存在" : "文件不存在");
}

//查看某个文件在HDFS集群的位置
@Test
public void testFileBlockLocation() throws Exception{

Configuration conf = new Configuration();
conf.addResource(new Path("D:\\myeclipse\\Hadoop\\hadoopEx\\src\\conf\\hadoop.xml"));


FileSystem hdfs = FileSystem.get(conf);
Path dst = new Path("/T.txt");

FileStatus fileStatus = hdfs.getFileStatus(dst);
BlockLocation[] blockLocations =hdfs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen());
for(BlockLocation block : blockLocations){
System.out.println(Arrays.toString(block.getHosts())+ "\t" + Arrays.toString(block.getNames()));
}
}

//获取HDFS集群上所有节点名称
@Test
public void testGetHostName() throws Exception{

Configuration conf = new Configuration();
conf.addResource(new Path("D:\\myeclipse\\Hadoop\\hadoopEx\\src\\conf\\hadoop.xml"));


DistributedFileSystem hdfs = (DistributedFileSystem)FileSystem.get(conf);
DatanodeInfo[] dataNodeStats = hdfs.getDataNodeStats();

for(DatanodeInfo dataNode : dataNodeStats){
System.out.println(dataNode.getHostName() + "\t" + dataNode.getName());
}
}

}

利用SQOOP将数据从数据库导入到HDFS

唐半张 发表了文章 0 个评论 1516 次浏览 2015-10-08 10:04 来自相关话题

基本使用 [size=0.9em]如下面这个shell脚本: [size=0.9em]#Oracle的连接字符串,其中包含了Oracle的地址,SID,和端口号 CONNECTURL=jdbcracle:thin20.135 ...查看全部
基本使用
[size=0.9em]如下面这个shell脚本:
[size=0.9em]#Oracle的连接字符串,其中包含了Oracle的地址,SID,和端口号
CONNECTURL=jdbcracle:thin20.135.60.21:1521WRAC2
#使用的用户名
ORACLENAME=kkaa
#使用的密码
ORACLEPASSWORD=kkaa123
#需要从Oracle中导入的表名
oralceTableName=tt
#需要从Oracle中导入的表中的字段名
columns=AREA_ID,TEAM_NAME
#将Oracle中的数据导入到HDFS后的存放路径
hdfsPath=apps/as/hive/$oralceTableName

#执行导入逻辑。将Oracle中的数据导入到HDFS中
sqoop import --append --connect $CONNECTURL --username $ORACLENAME --password $ORACLEPASSWORD --target-dir $hdfsPath  --num-mappers 1 --table $oralceTableName --columns $columns --fields-terminated-by '\001'
[size=0.9em]执行这个脚本之后,导入程序就完成了。
[size=0.9em]接下来,用户可以自己创建外部表,将外部表的路径和HDFS中存放Oracle数据的路径对应上即可。
[size=0.9em]注意:这个程序导入到HDFS中的数据是文本格式,所以在创建Hive外部表的时候,不需要指定文件的格式为RCFile,而使用默认的TextFile即可。数据间的分隔符为'\001'。如果多次导入同一个表中的数据,数据以append的形式插入到HDFS目录中。并行导入
[size=0.9em]假设有这样这个sqoop命令,需要将Oracle中的数据导入到HDFS中:
[size=0.9em]sqoop import --append --connect $CONNECTURL --username $ORACLENAME --password $ORACLEPASSWORD --target-dir $hdfsPath  --m 1 --table $oralceTableName --columns $columns --fields-terminated-by '\001'  --where "data_desc='2011-02-26'"
[size=0.9em]请注意,在这个命令中,有一个参数“-m”,代表的含义是使用多少个并行,这个参数的值是1,说明没有开启并行功能。
[size=0.9em]现在,我们可以将“-m”参数的值调大,使用并行导入的功能,如下面这个命令:
[size=0.9em]sqoop import --append --connect $CONNECTURL --username $ORACLENAME --password $ORACLEPASSWORD --target-dir $hdfsPath  --m 4 --table $oralceTableName --columns $columns --fields-terminated-by '\001'  --where "data_desc='2011-02-26'"
[size=0.9em]一般来说,Sqoop就会开启4个进程,同时进行数据的导入操作。
[size=0.9em]但是,如果从Oracle中导入的表没有主键,那么会出现如下的错误提示:
[size=0.9em]ERROR tool.ImportTool: Error during import: No primary key could be found for table creater_user.popt_cas_redirect_his. Please specify one with --split-by or perform a sequential import with '-m 1'.
[size=0.9em]在这种情况下,为了更好的使用Sqoop的并行导入功能,我们就需要从原理上理解Sqoop并行导入的实现机制。
[size=0.9em]如果需要并行导入的Oracle表的主键是id,并行的数量是4,那么Sqoop首先会执行如下一个查询:
[size=0.9em]select max(id) as max, select min(id) as min from table [where 如果指定了where子句];
[size=0.9em]通过这个查询,获取到需要拆分字段(id)的最大值和最小值,假设分别是1和1000。
[size=0.9em]然后,Sqoop会根据需要并行导入的数量,进行拆分查询,比如上面的这个例子,并行导入将拆分为如下4条SQL同时执行:
[size=0.9em]select * from table where 0 <= id < 250;
[size=0.9em]select * from table where 250 <= id < 500;
[size=0.9em]select * from table where 500 <= id < 750;
[size=0.9em]select * from table where 750 <= id < 1000;
[size=0.9em]注意,这个拆分的字段需要是整数。
[size=0.9em]从上面的例子可以看出,如果需要导入的表没有主键,我们应该如何手动选取一个合适的拆分字段,以及选择合适的并行数。
[size=0.9em]再举一个实际的例子来说明:
[size=0.9em]我们要从Oracle中导入creater_user.popt_cas_redirect_his。
[size=0.9em]这个表没有主键,所以我们需要手动选取一个合适的拆分字段。
[size=0.9em]首先看看这个表都有哪些字段:
[size=0.9em]然后,我假设ds_name字段是一个可以选取的拆分字段,然后执行下面的sql去验证我的想法:
[size=0.9em]select min(ds_name), max(ds_name) from creater_user.popt_cas_redirect_his where data_desc='2011-02-26'
[size=0.9em]发现结果不理想,min和max的值都是相等的。所以这个字段不合适作为拆分字段。
[size=0.9em]再测试一下另一个字段:CLIENTIP
select min(CLIENTIP), max(CLIENTIP) from creater_user.popt_cas_redirect_his where data_desc='2011-02-26'
[size=0.9em]这个结果还是不错的。所以我们使用CLIENTIP字段作为拆分字段。
[size=0.9em]所以,我们使用如下命令并行导入:
[size=0.9em]sqoop import --append --connect $CONNECTURL --username $ORACLENAME --password $ORACLEPASSWORD --target-dir $hdfsPath  --m 12 --split-by CLIENTIP --table $oralceTableName --columns $columns --fields-terminated-by '\001'  --where "data_desc='2011-02-26'"
[size=0.9em]这次执行这个命令,可以看到,消耗的时间为:20mins, 35sec,导入了33,222,896条数据。
[size=0.9em]另外,如果觉得这种拆分不能很好满足我们的需求,可以同时执行多个Sqoop命令,然后在where的参数后面指定拆分的规则。如:
[size=0.9em]sqoop import --append --connect $CONNECTURL --username $ORACLENAME --password $ORACLEPASSWORD --target-dir $hdfsPath  --m 1 --table $oralceTableName --columns $columns --fields-terminated-by '\001'  --where "data_desc='2011-02-26' logtime<10:00:00"
[size=0.9em]sqoop import --append --connect $CONNECTURL --username $ORACLENAME --password $ORACLEPASSWORD --target-dir $hdfsPath  --m 1 --table $oralceTableName --columns $columns --fields-terminated-by '\001'  --where "data_desc='2011-02-26' logtime>=10:00:00"
[size=0.9em]从而达到并行导入的目的。

Flume进阶-将日志文件写入HDFS

唐半张 发表了文章 0 个评论 2644 次浏览 2015-10-06 10:15 来自相关话题

配置将指定目录下的日志文件写入HDFS 步骤 1:通过hadoop创建目录/flume/log 2:复制hadoop-core-1.1.1.jar到flume/lib 3:配置一个hdfs.conf如下 ...查看全部
配置将指定目录下的日志文件写入HDFS
步骤
1:通过hadoop创建目录/flume/log
2:复制hadoop-core-1.1.1.jar到flume/lib
3:配置一个hdfs.conf如下
agent1.sources = spooldirSource
agent1.channels = memoryChannel
agent1.sinks = hdfsSink

agent1.sources.spooldirSource.type=spooldir
agent1.sources.spooldirSource.spoolDir=/tmp/flume
agent1.sources.spooldirSource.channels=memoryChannel

agent1.sinks.hdfsSink.type=hdfs
agent1.sinks.hdfsSink.hdfs.path=hdfs://pg2:9000/flume/log
agent1.sinks.hdfsSink.filePrefix=log-
agent1.sinks.hdfsSink.channel=memoryChannel

agent1.channels.memoryChannel.type=memory
agent1.channels.memoryChannel.capacity=100
4:启动
[root@pg1 apache-flume-1.4.0-bin]# ./bin/flume-ng agent -n agent1 -c conf -f conf/hdfs.conf                                                                                                                                                   Info: Sourcing environment configuration script /flume/apache-flume-1.4.0-bin/conf/flume-env.sh
Info: Including Hadoop libraries found via (/hadoop/hadoop-1.1.1/bin/hadoop) for HDFS access
Info: Excluding /hadoop/hadoop-1.1.1/libexec/../lib/slf4j-api-1.4.3.jar from classpath
Info: Excluding /hadoop/hadoop-1.1.1/libexec/../lib/slf4j-log4j12-1.4.3.jar from classpath
Info: Including HBASE libraries found via (/hbase/hbase-0.94.4/bin/hbase) for HBASE access
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/flume/tools/GetJavaProperty
Caused by: java.lang.ClassNotFoundException: org.apache.flume.tools.GetJavaProperty
 at java.net.URLClassLoader$1.run(URLClassLoader.java:202)
 at java.security.AccessController.doPrivileged(Native Method)
 at java.net.URLClassLoader.findClass(URLClassLoader.java:190)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:306)
 at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:301)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:247)
Could not find the main class: org.apache.flume.tools.GetJavaProperty.  Program will exit.
Info: Excluding /hbase/hbase-0.94.4/lib/slf4j-api-1.4.3.jar from classpath
Info: Excluding /hbase/hbase-0.94.4/lib/slf4j-log4j12-1.4.3.jar from classpath
Info: Excluding /hadoop/hadoop-1.1.1/libexec/../lib/slf4j-api-1.4.3.jar from classpath
Info: Excluding /hadoop/hadoop-1.1.1/libexec/../lib/slf4j-log4j12-1.4.3.jar from classpath
 
报错内容org/apache/flume/tools/GetJavaProperty在该包中。
flume-ng-core-1.4.0.jar
5:测试
[root@pg1 conf]# cd /tmp/flume
[root@pg1 flume]# ls
[root@pg1 flume]# echo "Test hello flume write data to hdfs">test123.txt
[root@pg1 flume]# ls
test123.txt.COMPLETED 
[root@pg1 flume]#
可以看到已经被处理
6:查看Flume日志 $FLUME_HOME/logs下
2013 08:19:42,859 INFO  [conf-file-poller-0] (org.apache.flume.node.Application.startAllComponents:145)  - Starting Channel memoryChannelemoryChannel type memory
2013 08:19:42,916 INFO  [lifecycleSupervisor-1-0] (org.apache.flume.instrumentation.MonitoredCounterGroup.register:110)  - Monitoried counter group for type: CHANNEL, name: memoryChannel, registered successfully.
2013 08:19:42,917 INFO  [lifecycleSupervisor-1-0] (org.apache.flume.instrumentation.MonitoredCounterGroup.start:94)  - Component type: CHANNEL, name: memoryChannel started
2013 08:19:42,917 INFO  [conf-file-poller-0] (org.apache.flume.node.Application.startAllComponents:173)  - Starting Sink hdfsSinkdfsSink, type: hdfs
2013 08:19:42,918 INFO  [conf-file-poller-0] (org.apache.flume.node.Application.startAllComponents:184)  - Starting Source spooldirSourcee
2013 08:19:42,921 INFO  [lifecycleSupervisor-1-1] (org.apache.flume.instrumentation.MonitoredCounterGroup.register:110)  - Monitoried counter group for type: SINK, name: hdfsSink, registered successfully.
2013 08:19:42,921 INFO  [lifecycleSupervisor-1-1] (org.apache.flume.instrumentation.MonitoredCounterGroup.start:94)  - Component type: SINK, name: hdfsSink startedce=EventDrivenSourceRunner: { source:org.apache.flume.source.Spooll
7:查看HDFS文件

Sqoop从Oracle导入到HDFS

唐半张 发表了文章 0 个评论 2623 次浏览 2015-09-30 10:57 来自相关话题

#Oracle的连接字符串,其中包含了Oracle的地址,SID,和端口号 CONNECTURL=jdbcracle:thin20.135.60.21:1521WRAC2 #使用的用户名 ORACLENAME=kkaa ...查看全部
#Oracle的连接字符串,其中包含了Oracle的地址,SID,和端口号
CONNECTURL=jdbcracle:thin20.135.60.21:1521WRAC2
#使用的用户名
ORACLENAME=kkaa
#使用的密码
ORACLEPASSWORD=kkaa123
#需要从Oracle中导入的表名
oralceTableName=tt
#需要从Oracle中导入的表中的字段名
columns=AREA_ID,TEAM_NAME
#将Oracle中的数据导入到HDFS后的存放路径
hdfsPath=apps/as/hive/$oralceTableName

#执行导入逻辑。将Oracle中的数据导入到HDFS中
sqoop import --append --connect $CONNECTURL --username $ORACLENAME --password $ORACLEPASSWORD --target-dir $hdfsPath  --num-mappers 1 --table $oralceTableName --columns $columns --fields-terminated-by '\001'
执行这个脚本之后,导入程序就完成了。 
接下来,用户可以自己创建外部表,将外部表的路径和HDFS中存放Oracle数据的路径对应上即可。 
注意:这个程序导入到HDFS中的数据是文本格式,所以在创建Hive外部表的时候,不需要指定文件的格式为RCFile,而使用默认的TextFile即可。数据间的分隔符为'\001'。如果多次导入同一个表中的数据,数据以append的形式插入到HDFS目录中。

Flume-ng HDFS sink原理解析

唐半张 发表了文章 0 个评论 2126 次浏览 2015-09-30 10:18 来自相关话题

HDFS sink主要处理过程在process方法: //循环batchSize次或者Channel为空 for (txnEventCount = 0; txnEventCount < batchSize; txnEventCount+ ...查看全部
HDFS sink主要处理过程在process方法:
//循环batchSize次或者Channel为空
for (txnEventCount = 0; txnEventCount < batchSize; txnEventCount++) {
//该方法会调用BasicTransactionSemantics的具体实现
Event event = channel.take();
if (event == null) {
break;
}
......
//sfWriter是一个LRU缓存,缓存对文件Handler,最大打开文件由参数maxopenfiles控制
BucketWriter bucketWriter = sfWriters.get(lookupPath);
// 如果不存在,则构造一个缓存
if (bucketWriter == null) {
//通过HDFSWriterFactory根据filetype生成一个hdfswriter,由参数hdfs.Filetype控制;eg:HDFSDataStream
HDFSWriter hdfsWriter = writerFactory.getWriter(fileType);
//idleCallback会在bucketWriter flush完毕后从LRU中删除;
bucketWriter = new BucketWriter(rollInterval, rollSize, rollCount,
batchSize, context, realPath, realName, inUsePrefix, inUseSuffix,
suffix, codeC, compType, hdfsWriter, timedRollerPool,
proxyTicket, sinkCounter, idleTimeout, idleCallback,
lookupPath, callTimeout, callTimeoutPool);
sfWriters.put(lookupPath, bucketWriter);
}
......
// track一个事务内的bucket
if (!writers.contains(bucketWriter)) {
writers.add(bucketWriter);
}
// 写数据到HDFS;
bucketWriter.append(event);->
open();//如果底层支持append,则通过open接口打开;否则create接口
//判断是否进行日志切换
//根据复制的副本书和目标副本数做对比,如果不满足则doRotate=false
if (doRotate) {
close();
open();
}
HDFSWriter.append(event);
if (batchCounter == batchSize) {//如果达到batchSize行进行一次flush
flush();->
doFlush()->
HDFSWriter.sync()->
FSDataoutputStream.flush/sync
}
// 提交事务之前,刷新所有的bucket
for (BucketWriter bucketWriter : writers) {
bucketWriter.flush();
}
transaction.commit();
这里,无论是BucketWriter执行append,sync还是rename等操作都是提交到一个后台线程池进行异步处理:callWithTimeout,这个线程池的大小是由hdfs.threadsize来设置;

用sqoop进行mysql和hdfs系统间的数据互导

唐半张 发表了文章 0 个评论 2131 次浏览 2015-09-30 09:39 来自相关话题

下载: http://www.apache.org/dyn/closer.cgi/sqoop/ [zhouhh@Hadoop48 ~]$ wget http://labs.renren.com/apache-mirror/sqoop/1 ...查看全部
下载:
http://www.apache.org/dyn/closer.cgi/sqoop/
[zhouhh@Hadoop48 ~]$ wget http://labs.renren.com/apache-mirror/sqoop/1.4.1-incubating/sqoop-1.4.1-incubating__hadoop-1.0.0.tar.gz
最新用户手册
http://sqoop.apache.org/docs/1.4.1-incubating/SqoopUserGuide.html
 
一、从HBase库中直接导出到mysql中?
一开始我想从HBase库中直接导出到mysql中。
在mysql中创建一个库和表
mysql> create database toplists;
Query OK, 1 row affected (0.06 sec)
mysql> use toplists
Database changed
mysql> create table t1(id int not null primary key, name varchar(255),value int);
Query OK, 0 rows affected (0.10 sec)

hbase(main):011:0> scan 't1'
ROW COLUMN+CELL
1001 column=info:count, timestamp=1340265059531, value=724988
1009 column=info:count, timestamp=1340265059533, value=108051
...
total column=info:count, timestamp=1340265059534, value=833039
total_user_count column=info:, timestamp=1340266656307, value=154516
11 row(s) in 0.0420 seconds

[zhouhh@Hadoop48 ~]$ sqoop list-tables --connect jdbc:mysql://localhost/toplists --username root
java.lang.RuntimeException: Could not load db driver class: com.mysql.jdbc.Driver
at org.apache.sqoop.manager.SqlManager.makeConnection(SqlManager.java:657)
at org.apache.sqoop.manager.GenericJdbcManager.getConnection(GenericJdbcManager.java:52)
at org.apache.sqoop.manager.SqlManager.execute(SqlManager.java:473)
at org.apache.sqoop.manager.SqlManager.execute(SqlManager.java:496)
at org.apache.sqoop.manager.SqlManager.getColumnTypesForRawQuery(SqlManager.java:194)
at org.apache.sqoop.manager.SqlManager.getColumnTypes(SqlManager.java:178)
at org.apache.sqoop.manager.ConnManager.getColumnTypes(ConnManager.java:114)
at org.apache.sqoop.orm.ClassWriter.getColumnTypes(ClassWriter.java:1235)
at org.apache.sqoop.orm.ClassWriter.generate(ClassWriter.java:1060)
at org.apache.sqoop.tool.CodeGenTool.generateORM(CodeGenTool.java:82)
at org.apache.sqoop.tool.ExportTool.exportTable(ExportTool.java:64)
at org.apache.sqoop.tool.ExportTool.run(ExportTool.java:97)
at org.apache.sqoop.Sqoop.run(Sqoop.java:145)
at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:65)
at org.apache.sqoop.Sqoop.runSqoop(Sqoop.java:181)
at org.apache.sqoop.Sqoop.runTool(Sqoop.java:220)
at org.apache.sqoop.Sqoop.runTool(Sqoop.java:229)
at org.apache.sqoop.Sqoop.main(Sqoop.java:238)
at com.cloudera.sqoop.Sqoop.main(Sqoop.java:57)

需下载 MySQL JDBC Connector 库,并将其复制到$SQOOP_HOME/lib
下载mysql jdbc连接库
地址:http://www.mysql.com/downloads/connector/j/
[zhouhh@Hadoop48 ~]$ wget http://dev.mysql.com/get/Downloads/Connector-J/mysql-connector-java-5.1.21.tar.gz/from/http://cdn.mysql.com/
[zhouhh@Hadoop48 mysql-connector-java-5.1.21]$ cp mysql-connector-java-5.1.21-bin.jar ../sqoop/lib/.
[zhouhh@Hadoop48 ~]$ sqoop list-tables --connect jdbc:mysql://localhost/toplists --username root
t1

[zhouhh@Hadoop48 ~]$ sqoop-export --connect jdbc:mysql://localhost/toplists --username root --table t1 --export-dir /hbase

java.io.IOException: com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link failure

The last packet sent successfully to the server was 0 milliseconds ago. The driver has not received any packets from the server.
at org.apache.sqoop.mapreduce.ExportOutputFormat.getRecordWriter(ExportOutputFormat.java:79)
at org.apache.hadoop.mapred.MapTask$NewDirectOutputCollector.(MapTask.java:628)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:753)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:370)
at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1121)
at org.apache.hadoop.mapred.Child.main(Child.java:249)
Caused by: com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link failure

这是可能由jdbc版本引起的,换成5.1.18
[zhouhh@Hadoop48 ~]$ sqoop-export --connect jdbc:mysql://localhost:3306/toplists --username root --table t1 --export-dir /hbase

Error initializing attempt_201206271529_0006_r_000000_0:
org.apache.hadoop.util.DiskChecker$DiskErrorException: Could not find any valid local directory for ttprivate/taskTracker/zhouhh/jobcache/job_201206271529_0006/jobToken
at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.getLocalPathForWrite(LocalDirAllocator.java:381)
at org.apache.hadoop.fs.LocalDirAllocator.getLocalPathForWrite(LocalDirAllocator.java:146)
at org.apache.hadoop.fs.LocalDirAllocator.getLocalPathForWrite(LocalDirAllocator.java:127)
at org.apache.hadoop.mapred.TaskTracker.localizeJobTokenFile(TaskTracker.java:4271)
at org.apache.hadoop.mapred.TaskTracker.initializeJob(TaskTracker.java:1177)
at org.apache.hadoop.mapred.TaskTracker.localizeJob(TaskTracker.java:1118)
at org.apache.hadoop.mapred.TaskTracker$5.run(TaskTracker.java:2430)
at java.lang.Thread.run(Thread.java:722)

DiskErrorException ,定位半天,发现是另一台机器的空间满了,在mapreduce运行时会引起该异常。
[zhouhh@Hadoop46 ~]$ df
Filesystem 1K-blocks Used Available Use% Mounted on
/dev/sda3 28337624 26877184 0 100% /
[zhouhh@Hadoop48 ~]$ sqoop-export --connect jdbc:mysql://192.168.10.48:3306/toplists --username root --table t1 --export-dir /hbase
Caused by: java.sql.SQLException: null, message from server: "Host 'Hadoop47' is not allowed to connect to this MySQL server"

这是权限问题,设置授权:
mysql> GRANT ALL PRIVILEGES ON *.* TO '%'@'%';#允许所有用户查看和修改databaseName数据库模式的内容,否则别的IP连不上本MYSQL
Query OK, 0 rows affected (0.06 sec)

这是测试,所以权限没有限制。实际工作环境需谨慎授权。
[zhouhh@Hadoop48 ~]$ sqoop-export --connect jdbc:mysql://192.168.10.48:3306/toplists --username root --table t1 --export-dir /hbase
Note: /tmp/sqoop-zhouhh/compile/fa1d1c042030b0ec8537c7a4cd02aab3/t1.java uses or overrides a deprecated API.
Note: Recompile with -Xlint:deprecation for details.
java.lang.NumberFormatException: For input string: "7"
at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
at java.lang.Integer.parseInt(Integer.java:481)
at java.lang.Integer.valueOf(Integer.java:582)
at t1.__loadFromFields(t1.java:218)
at t1.parse(t1.java:170)
at org.apache.sqoop.mapreduce.TextExportMapper.map(TextExportMapper.java:77)
at org.apache.sqoop.mapreduce.TextExportMapper.map(TextExportMapper.java:36)
at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:144)
at org.apache.sqoop.mapreduce.AutoProgressMapper.run(AutoProgressMapper.java:183)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:764)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:370)
at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1121)
at org.apache.hadoop.mapred.Child.main(Child.java:249)

这是由于/hbase是hbase的库表,根本不是可以导的格式,所以报错。
[zhouhh@Hadoop48 ~]$ sqoop-export --connect jdbc:mysql://192.168.10.48:3306/toplists --username root --table t1 --export-dir /hbase/t1
[zhouhh@Hadoop48 ~]$ sqoop-export --verbose --connect jdbc:mysql://192.168.10.48:3306/toplists --username root --table t1 --update-key id --input-fields-terminated-by '\t' --export-dir /hbase/t1
Note: /tmp/sqoop-zhouhh/compile/8ce6556eb13b3000550a9c864eaa6820/t1.java uses or overrides a deprecated API.
Note: Recompile with -Xlint:deprecation for details.
[zhouhh@Hadoop48 ~]$

但将导出目录指到/hbase/t1表中,导出不会报错,而mysql中没有数据。后面才了解到,sqoop没有直接从hbase中将表导出到mysql的办法。必须先将hbase导出成平面文件,或者导出到hive中,才可以用sqoop将数据导出到mysql。
 二、从mysql中导到hdfs。
创建mysql表,将其导入到hdfs
mysql> create table test(id int not null primary key auto_increment,name varchar(64) not null,price decimal(10,2), cdate date,version int,comment varchar(255));
Query OK, 0 rows affected (0.10 sec)
mysql> insert into test values(null,'iphone',3900.00,'2012-7-18',1,'8g');
Query OK, 1 row affected (0.04 sec)
mysql> insert into test values(null,'ipad',3200.00,'2012-7-16',2,'16g');
Query OK, 1 row affected (0.00 sec)
mysql> select * from test;
+----+--------+---------+------------+---------+---------+
| id | name | price | cdate | version | comment |
+----+--------+---------+------------+---------+---------+
| 1 | iphone | 3900.00 | 2012-07-18 | 1 | 8g |
| 2 | ipad | 3200.00 | 2012-07-16 | 2 | 16g |
+----+--------+---------+------------+---------+---------+
2 rows in set (0.00 sec)

导入:
[zhouhh@Hadoop48 ~]$ sqoop import --connect jdbc:mysql://Hadoop48/toplists --table test -m 1
java.lang.RuntimeException: com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: Access denied for user ''@'Hadoop48' to database 'toplists'
at org.apache.sqoop.manager.CatalogQueryManager.getColumnNames(CatalogQueryManager.java:162)

给空用户授权
mysql> GRANT ALL PRIVILEGES ON *.* TO ''@'%';
[zhouhh@Hadoop48 ~]$ sqoop import --connect jdbc:mysql://Hadoop48/toplists --username root --table test -m 1

12/07/18 11:10:16 INFO manager.MySQLManager: Preparing to use a MySQL streaming resultset.
12/07/18 11:10:16 INFO tool.CodeGenTool: Beginning code generation
12/07/18 11:10:16 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM `index_mapping` AS t LIMIT 1
12/07/18 11:10:16 INFO orm.CompilationManager: HADOOP_HOME is /home/zhoulei/hadoop-1.0.0/libexec/..
注: /tmp/sqoop-zhoulei/compile/2b04bdabb7043e4f75b215d72f65388e/index_mapping.java使用或覆盖了已过时的 API。
注: 有关详细信息, 请使用 -Xlint:deprecation 重新编译。
12/07/18 11:10:18 INFO orm.CompilationManager: Writing jar file: /tmp/sqoop-zhoulei/compile/2b04bdabb7043e4f75b215d72f65388e/index_mapping.jar
12/07/18 11:10:18 WARN manager.MySQLManager: It looks like you are importing from mysql.
12/07/18 11:10:18 WARN manager.MySQLManager: This transfer can be faster! Use the --direct
12/07/18 11:10:18 WARN manager.MySQLManager: option to exercise a MySQL-specific fast path.
12/07/18 11:10:18 INFO manager.MySQLManager: Setting zero DATETIME behavior to convertToNull (mysql)
12/07/18 11:10:25 INFO mapreduce.ImportJobBase: Beginning import of index_mapping
12/07/18 11:10:26 INFO mapred.JobClient: Running job: job_201207101344_0519
12/07/18 11:10:27 INFO mapred.JobClient: map 0% reduce 0%
12/07/18 11:10:40 INFO mapred.JobClient: map 100% reduce 0%
12/07/18 11:10:45 INFO mapred.JobClient: Job complete: job_201207101344_0519
12/07/18 11:10:45 INFO mapred.JobClient: Counters: 18
12/07/18 11:10:45 INFO mapred.JobClient: Job Counters
12/07/18 11:10:45 INFO mapred.JobClient: SLOTS_MILLIS_MAPS=12083
12/07/18 11:10:45 INFO mapred.JobClient: Total time spent by all reduces waiting after reserving slots (ms)=0
12/07/18 11:10:45 INFO mapred.JobClient: Total time spent by all maps waiting after reserving slots (ms)=0
12/07/18 11:10:45 INFO mapred.JobClient: Launched map tasks=1
12/07/18 11:10:45 INFO mapred.JobClient: SLOTS_MILLIS_REDUCES=0
12/07/18 11:10:45 INFO mapred.JobClient: File Output Format Counters
12/07/18 11:10:45 INFO mapred.JobClient: Bytes Written=28
12/07/18 11:10:45 INFO mapred.JobClient: FileSystemCounters
12/07/18 11:10:45 INFO mapred.JobClient: HDFS_BYTES_READ=87
12/07/18 11:10:45 INFO mapred.JobClient: FILE_BYTES_WRITTEN=30396
12/07/18 11:10:45 INFO mapred.JobClient: HDFS_BYTES_WRITTEN=28
12/07/18 11:10:45 INFO mapred.JobClient: File Input Format Counters
12/07/18 11:10:45 INFO mapred.JobClient: Bytes Read=0
12/07/18 11:10:45 INFO mapred.JobClient: Map-Reduce Framework
12/07/18 11:10:45 INFO mapred.JobClient: Map input records=2
12/07/18 11:10:45 INFO mapred.JobClient: Physical memory (bytes) snapshot=79167488
12/07/18 11:10:45 INFO mapred.JobClient: Spilled Records=0
12/07/18 11:10:45 INFO mapred.JobClient: CPU time spent (ms)=340
12/07/18 11:10:45 INFO mapred.JobClient: Total committed heap usage (bytes)=56623104
12/07/18 11:10:45 INFO mapred.JobClient: Virtual memory (bytes) snapshot=955785216
12/07/18 11:10:45 INFO mapred.JobClient: Map output records=2
12/07/18 11:10:45 INFO mapred.JobClient: SPLIT_RAW_BYTES=87
12/07/18 11:10:45 INFO mapreduce.ImportJobBase: Transferred 28 bytes in 20.2612 seconds (1.382 bytes/sec)
12/07/18 11:10:45 INFO mapreduce.ImportJobBase: Retrieved 2 records.

检查数据是否导入
[zhouhh@Hadoop48 ~]$ fs -cat /user/zhouhh/test/part-m-00000
1,iphone,3900.00,2012-07-18,1,8g
2,ipad,3200.00,2012-07-16,2,16g

[zhouhh@Hadoop48 ~]$ fs -cat test/part-m-00000
1,iphone,3900.00,2012-07-18,1,8g
2,ipad,3200.00,2012-07-16,2,16g

 三、从hdfs导出到mysql
清空表
mysql> delete from test;
Query OK, 2 rows affected (0.00 sec)

mysql> select * from test;
Empty set (0.00 sec)

导出
[zhouhh@Hadoop48 ~]$ sqoop-export --connect jdbc:mysql://192.168.10.48:3306/toplists --username root --table test --export-dir test
Note: /tmp/sqoop-zhouhh/compile/7adaaa7ffe5f49ed9d794b1be8a9a983/test.java uses or overrides a deprecated API.
Note: Recompile with -Xlint:deprecation for details.

导出时,–connect,–table, –export-dir是必须设置的。其中toplists是库名,–table是该库下的表名。 –export-dir是要导出的HDFS平面文件位置。如果不是绝对路径,指/user/username/datadir
检查mysql表
mysql> select * from test;
+----+--------+---------+------------+---------+---------+
| id | name | price | cdate | version | comment |
+----+--------+---------+------------+---------+---------+
| 1 | iphone | 3900.00 | 2012-07-18 | 1 | 8g |
| 2 | ipad | 3200.00 | 2012-07-16 | 2 | 16g |
+----+--------+---------+------------+---------+---------+
2 rows in set (0.00 sec)

可见导出成功。
 四、不执行mapreduce,但生成导入代码
[zhouhh@Hadoop48 ~]$ sqoop codegen --connect jdbc:mysql://192.168.10.48:3306/toplists --username root --table test --class-name Mycodegen
Note: /tmp/sqoop-zhouhh/compile/104b871487669b89dcd5b9b2c61f905f/Mycodegen.java uses or overrides a deprecated API.
Note: Recompile with -Xlint:deprecation for details.

[zhouhh@Hadoop48 ~]$ sqoop help codegen
usage: sqoop codegen [GENERIC-ARGS] [TOOL-ARGS]

sqoop导入时,可以加选择语句,以过滤和综合多表,用–query.也可以只加条件,用–where。这样可以不必每次导入整张表。 如 –where ‘id > 1000′
示例,采用join选择多表数据:
sqoop import –query ‘select a.*,b.* from a join b on (a.id == b.id) where $conditions’ -m 1 –target-dir /usr/foo/joinresults
 五、将mysql表导入到HBase
虽然目前,sqoop没有将HBase直接导入mysql的办法,但将mysql直接导入HBase是可以的。需指定–hbase-table,用–hbase-create-table来自动在HBase中创建表。–column-family指定列族名。–hbase-row-key指定rowkey对应的mysql的键。
[zhouhh@Hadoop48 ~]$ sqoop import –connect jdbc:mysql://Hadoop48/toplists –table test –hbase-table a –column-family name –hbase-row-key id –hbase-create-table –username ‘root’
检查hbase被导入的表:
hbase(main):002:0> scan 'a'
ROW COLUMN+CELL
1 column=name:cdate, timestamp=1342601695952, value=2012-07-18
1 column=name:comment, timestamp=1342601695952, value=8g
1 column=name:name, timestamp=1342601695952, value=iphone
1 column=name:price, timestamp=1342601695952, value=3900.00
1 column=name:version, timestamp=1342601695952, value=1
2 column=name:cdate, timestamp=1342601695952, value=2012-07-16
2 column=name:comment, timestamp=1342601695952, value=16g
2 column=name:name, timestamp=1342601695952, value=ipad
2 column=name:price, timestamp=1342601695952, value=3200.00
2 column=name:version, timestamp=1342601695952, value=2
2 row(s) in 0.2370 seconds

关于导入的一致性:建议停止mysql表的写入再导入到HDFS或HIVE,否则,mapreduce可能会丢失新增的数据。
关于效率:mysql直接模式(–direct)导入的方式效率高。但不支持大对象数据,类型为CLOB或BLOB的列。用JDBC效率较低,但有专用API可以支持CLOB及BLOB。六、从HBase导出数据到Mysql
目前没有直接的导出命令。但有两个方法可以将HBase数据导出到mysql。
其一,将HBase导出成HDFS平面文件,再导出到mysql.
其二,将HBase数据导出到HIVE,再导出到mysql,参见后续文章《从hive将数据导出到mysql

HDFS体系结构简介及优缺点

唐半张 发表了文章 0 个评论 2015 次浏览 2015-09-29 10:49 来自相关话题

1      HDFS体系结构简介及优缺点 1.1体系结构简介           HDFS是一个主/从(Mater/Slave)体系结构,从最终用户的角度来看,它就像传统的文件系统一样,可以通过目录路径对文件执行CRUD(Create ...查看全部
1      HDFS体系结构简介及优缺点
1.1体系结构简介
          HDFS是一个主/从(Mater/Slave)体系结构,从最终用户的角度来看,它就像传统的文件系统一样,可以通过目录路径对文件执行CRUD(Create、Read、Update和Delete)操作。但由于分布式存储的性质,HDFS集群拥有一个NameNode和一些DataNode。NameNode管理文件系统的元数据,DataNode存储实际的数据。客户端通过同NameNode和DataNodes的交互访问文件系统。客户端联系NameNode以获取文件的元数据,而真正的文件I/O操作是直接和DataNode进行交互的。
1.1.1       NameNode 
          NameNode可以看作是分布式文件系统中的管理者,主要负责管理文件系统的命名空间、集群配置信息和存储块的复制等。NameNode会将文件系统的Meta-data存储在内存中,这些信息主要包括了文件信息、每一个文件对应的文件块的信息和每一个文件块在DataNode的信息等。l  Masterl  管理HDFS的名称空间l  管理数据块映射信息l  配置副本策略l  处理客户端读写请求
  1.1.2       Secondary namenode 
     并非NameNode的热备;
     辅助NameNode,分担其工作量;
     定期合并fsimage和fsedits,推送给NameNode;
     在紧急情况下,可辅助恢复NameNode。

1.1.3       DataNode 
      DataNode是文件存储的基本单元,它将Block存储在本地文件系统中,保存了Block的Meta-data,同时周期性地将所有存在的Block信息发送给NameNode。 
       Slavel  
       存储实际的数据块
       执行数据块读/写

1.1.4       Client     
       文件切分
      与NameNode交互,获取文件位置信息;
      与DataNode交互,读取或者写入数据;
      管理HDFS;
      访问HDFS。 
1.1.5       文件写入
     1)  Client向NameNode发起文件写入的请求。
     2)  NameNode根据文件大小和文件块配置情况,返回给Client它所管理部分DataNode的信息。
     3)  Client将文件划分为多个Block,根据DataNode的地址信息,按顺序写入到每一个DataNode块中。 
1.1.6       文件读取
     1)  Client向NameNode发起文件读取的请求。
     2)  NameNode返回文件存储的DataNode的信息。
     3)  Client读取文件信息。 
         HDFS典型的部署是在一个专门的机器上运行NameNode,集群中的其他机器各运行一个DataNode;也可以在运行NameNode的机器上同时运行DataNode,或者一台机器上运行多个DataNode。一个集群只有一个NameNode的设计大大简化了系统架构。 
1.2优点 
1.2.1       处理超大文件
            这里的超大文件通常是指百MB、设置数百TB大小的文件。目前在实际应用中,HDFS已经能用来存储管理PB级的数据了。 
1.2.2       流式的访问数据
           HDFS的设计建立在更多地响应"一次写入、多次读写"任务的基础上。这意味着一个数据集一旦由数据源生成,就会被复制分发到不同的存储节点中,然后响应各种各样的数据分析任务请求。在多数情况下,分析任务都会涉及数据集中的大部分数据,也就是说,对HDFS来说,请求读取整个数据集要比读取一条记录更加高效。 
1.2.3       运行于廉价的商用机器集群上
           Hadoop设计对硬件需求比较低,只须运行在低廉的商用硬件集群上,而无需昂贵的高可用性机器上。廉价的商用机也就意味着大型集群中出现节点故障情况的概率非常高。这就要求设计HDFS时要充分考虑数据的可靠性,安全性及高可用性。 
1.3     缺点 
1.3.1       不适合低延迟数据访问
           如果要处理一些用户要求时间比较短的低延迟应用请求,则HDFS不适合。HDFS是为了处理大型数据集分析任务的,主要是为达到高的数据吞吐量而设计的,这就可能要求以高延迟作为代价。
          改进策略:对于那些有低延时要求的应用程序,HBase是一个更好的选择。通过上层数据管理项目来尽可能地弥补这个不足。在性能上有了很大的提升,它的口号就是goes real time。使用缓存或多master设计可以降低client的数据请求压力,以减少延时。还有就是对HDFS系统内部的修改,这就得权衡大吞吐量与低延时了,HDFS不是万能的银弹。 
1.3.2       无法高效存储大量小文件 
            因为Namenode把文件系统的元数据放置在内存中,所以文件系统所能容纳的文件数目是由Namenode的内存大小来决定。一般来说,每一个文件、文件夹和Block需要占据150字节左右的空间,所以,如果你有100万个文件,每一个占据一个Block,你就至少需要300MB内存。当前来说,数百万的文件还是可行的,当扩展到数十亿时,对于当前的硬件水平来说就没法实现了。还有一个问题就是,因为Map task的数量是由splits来决定的,所以用MR处理大量的小文件时,就会产生过多的Maptask,线程管理开销将会增加作业时间。举个例子,处理10000M的文件,若每个split为1M,那就会有10000个Maptasks,会有很大的线程开销;若每个split为100M,则只有100个Maptasks,每个Maptask将会有更多的事情做,而线程的管理开销也将减小很多。
          改进策略:要想让HDFS能处理好小文件,有不少方法。利用SequenceFile、MapFile、Har等方式归档小文件,这个方法的原理就是把小文件归档起来管理,HBase就是基于此的。对于这种方法,如果想找回原来的小文件内容,那就必须得知道与归档文件的映射关系。横向扩展,一个Hadoop集群能管理的小文件有限,那就把几个Hadoop集群拖在一个虚拟服务器后面,形成一个大的Hadoop集群。google也是这么干过的。多Master设计,这个作用显而易见了。正在研发中的GFS II也要改为分布式多Master设计,还支持Master的Failover,而且Block大小改为1M,有意要调优处理小文件啊。附带个Alibaba DFS的设计,也是多Master设计,它把Metadata的映射存储和管理分开了,由多个Metadata存储节点和一个查询Master节点组成。 
1.3.3       不支持多用户写入及任意修改文件
             在HDFS的一个文件中只有一个写入者,而且写操作只能在文件末尾完成,即只能执行追加操作。目前HDFS还不支持多个用户对同一文件的写操作,以及在文件任意位置进行修改。

关于HDFS副本放置策略

唐半张 发表了文章 0 个评论 1675 次浏览 2015-09-29 10:42 来自相关话题

         副本的存放是HDFS可靠性和性能的关键. 优化的副本存放策略是HDFS区分于其他大部分分布式文件系统的重要特性. 这种特性需要做大量的调优, 并需要经验的积累. HDFS采用一种称为机架感知(rack-aware)的策略来改进数据的可靠性、可 ...查看全部
         副本的存放是HDFS可靠性和性能的关键优化的副本存放策略是HDFS区分于其他大部分分布式文件系统的重要特性这种特性需要做大量的调优并需要经验的积累. HDFS采用一种称为机架感知(rack-aware)的策略来改进数据的可靠性、可用性和网络带宽的利用率目前实现的副本存放策略只是在这个方向上的第一步实现这个策略的短期目标是验证它在生产环境下的有效性观察它的行为为实现更先进的策略打下测试和研究的基础大型HDFS实例一般运行在跨越多个机架的计算机组成的集群上不同机架上的两台机器之间的通讯需要经过交换机在大多数情况下同一个机架内的两台机器间的带宽会比不同机架的两台机器间的带宽大.
  
         通过一个机架感知的过程, Namenode可以确定每个Datanode所属的机架id. 一个简单但没有优化的策略就是将副本存放在不同的机架上这样可以有效防止当整个机架失效时数据的丢失并且允许读数据的时候充分利用多个机架的带宽这种策略设置可以将副本均匀分布在集群中有利于当组件失效情况下的负载均衡但是因为这种策略的一个写操作需要传输数据块到多个机架这增加了写的代价.
  
         
在大多数情况下副本系数默认为3, HDFS的存放策略是将第一个副本存放和客户端相同的节点上(如果该客户端不在Hadoop集群中,则任选一机架的节点放置第一个副本), 第二个副本放在和第一个副本不同机架的任一节点上,最后一个副本放在和第二个副本相同机架的不同节点上这种策略减少了机架间的数据传输这就提高了写操作的效率机架的错误远远比节点的错误少所以这个策略不会影响到数据的可靠性和可用性与此同时因为数据块只放在两个(不是三个)不同的机架上所以此策略减少了读取数据时需要的网络传输总带宽在这种策略下副本并不是均匀分布在不同的机架 三分之一的副本在一个节点上三分之二的副本在一个机架上其他副本均匀分布在剩下的机架中这一策略在不损害数据可靠性和读取性能的情况下改进了写的性能.

HDFS操作可视化的问题

唐半张 发表了文章 0 个评论 1863 次浏览 2015-09-28 10:40 来自相关话题

在安装好Hadoop集群之后,为验证HDFS文件相关操作功能,本人对目前常用的三种方法进行了验证,给出自己的实验环境,一些感受和想法。(1)搭建的实验环境: Hadoop 版本:Apache 0.20.2。 Eclipse版本:3.5. ...查看全部
在安装好Hadoop集群之后,为验证HDFS文件相关操作功能,本人对目前常用的三种方法进行了验证,给出自己的实验环境,一些感受和想法。(1)搭建的实验环境:
Hadoop 版本:Apache 0.20.2。
Eclipse版本:3.5.2。
Java版本:OpenJDK 1.6.0_17。

(2)感受
向HDFS上传文件,可以采用shell命令,eclipse Plugin和调用HDFS API等方法。其中shell命令方式最直接的方法,上手是最快的,适合对linux比较熟悉的人员使用。而通过在Eclipse中使用hadoop-eclipse-plugin操作文件系统简单易行,用户界面友好,但效率一般。调用HDFS API效率最高,但是环境配置比较复杂,本人尝试使用Java 调用HDFS API,在Qt中调用 C libhdfs等,在花了大量的时间后,可以实现,但感觉使用还是比较复杂,因此我就想有没有一种比较好的HDFS可视化的方法,

经过搜索,发现有实现的:http://blog.csdn.net/aperson111/article/details/8056019,但是还是限于比较简单的界面。

HDFS的运行原理

唐半张 发表了文章 0 个评论 1593 次浏览 2015-09-28 10:25 来自相关话题

简介 HDFS(Hadoop Distributed File System )Hadoop分布式文件系统。是根据google发表的论文翻版的。论文为GFS(Google File System)Google 文件系统(中文,英文)。 ...查看全部
简介
HDFS(Hadoop Distributed File System )Hadoop分布式文件系统。是根据google发表的论文翻版的。论文为GFS(Google File System)Google 文件系统(中文英文)。
HDFS有很多特点
    ① 保存多个副本,且提供容错机制,副本丢失或宕机自动恢复。默认存3份。
    ② 运行在廉价的机器上。
    ③ 适合大数据的处理。多大?多小?HDFS默认会将文件分割成block,64M为1个block。然后将block按键值对存储在HDFS上,并将键值对的映射存到内存中。如果小文件太多,那内存的负担会很重。

如上图所示,HDFS也是按照Master和Slave的结构。分NameNode、SecondaryNameNode、DataNode这几个角色。
NameNode:是Master节点,是大领导。管理数据块映射;处理客户端的读写请求;配置副本策略;管理HDFS的名称空间;
SecondaryNameNode:是一个小弟,分担大哥namenode的工作量;是NameNode的冷备份;合并fsimage和fsedits然后再发给namenode。
DataNode:Slave节点,奴隶,干活的。负责存储client发来的数据块block;执行数据块的读写操作。
热备份:b是a的热备份,如果a坏掉。那么b马上运行代替a的工作。
冷备份:b是a的冷备份,如果a坏掉。那么b不能马上代替a工作。但是b上存储a的一些信息,减少a坏掉之后的损失。
fsimage:元数据镜像文件(文件系统的目录树。)
edits:元数据的操作日志(针对文件系统做的修改操作记录)
namenode内存中存储的是=fsimage+edits。
SecondaryNameNode负责定时默认1小时,从namenode上,获取fsimage和edits来进行合并,然后再发送给namenode。减少namenode的工作量。
工作原理
写操作:

有一个文件FileA,100M大小。Client将FileA写入到HDFS上。
HDFS按默认配置。
HDFS分布在三个机架上Rack1,Rack2,Rack3。
 
a. Client将FileA按64M分块。分成两块,block1和Block2;
b. Client向nameNode发送写数据请求,如图蓝色虚线①------>。
c. NameNode节点,记录block信息。并返回可用的DataNode,如粉色虚线②--------->。
    Block1: host2,host1,host3
    Block2: host7,host8,host4
    原理:
        NameNode具有RackAware机架感知功能,这个可以配置。
        若client为DataNode节点,那存储block时,规则为:副本1,同client的节点上;副本2,不同机架节点上;副本3,同第二个副本机架的另一个节点上;其他副本随机挑选。
        若client不为DataNode节点,那存储block时,规则为:副本1,随机选择一个节点上;副本2,不同副本1,机架上;副本3,同副本2相同的另一个节点上;其他副本随机挑选。
d. client向DataNode发送block1;发送过程是以流式写入。
    流式写入过程,
       1>将64M的block1按64k的package划分;
        2>然后将第一个package发送给host2;
        3>host2接收完后,将第一个package发送给host1,同时client想host2发送第二个package;
        4>host1接收完第一个package后,发送给host3,同时接收host2发来的第二个package。
        5>以此类推,如图红线实线所示,直到将block1发送完毕。
        6>host2,host1,host3向NameNode,host2向Client发送通知,说“消息发送完了”。如图粉红颜色实线所示。
        7>client收到host2发来的消息后,向namenode发送消息,说我写完了。这样就真完成了。如图黄色粗实线
        8>发送完block1后,再向host7,host8,host4发送block2,如图蓝色实线所示。
        9>发送完block2后,host7,host8,host4向NameNode,host7向Client发送通知,如图浅绿色实线所示。
        10>client向NameNode发送消息,说我写完了,如图黄色粗实线。。。这样就完毕了。
分析,通过写过程,我们可以了解到:
    写1T文件,我们需要3T的存储,3T的网络流量贷款。
    在执行读或写的过程中,NameNode和DataNode通过HeartBeat进行保存通信,确定DataNode活着。如果发现DataNode死掉了,就将死掉的DataNode上的数据,放到其他节点去。读取时,要读其他节点去。
    挂掉一个节点,没关系,还有其他节点可以备份;甚至,挂掉某一个机架,也没关系;其他机架上,也有备份。
 
工作原理
写操作:

有一个文件FileA,100M大小。Client将FileA写入到HDFS上。
HDFS按默认配置。
HDFS分布在三个机架上Rack1,Rack2,Rack3。
 
a. Client将FileA按64M分块。分成两块,block1和Block2;
b. Client向nameNode发送写数据请求,如图蓝色虚线①------>。
c. NameNode节点,记录block信息。并返回可用的DataNode,如粉色虚线②--------->。
    Block1: host2,host1,host3
    Block2: host7,host8,host4
    原理:
        NameNode具有RackAware机架感知功能,这个可以配置。
        若client为DataNode节点,那存储block时,规则为:副本1,同client的节点上;副本2,不同机架节点上;副本3,同第二个副本机架的另一个节点上;其他副本随机挑选。
        若client不为DataNode节点,那存储block时,规则为:副本1,随机选择一个节点上;副本2,不同副本1,机架上;副本3,同副本2相同的另一个节点上;其他副本随机挑选。
d. client向DataNode发送block1;发送过程是以流式写入。
    流式写入过程,
       1>将64M的block1按64k的package划分;
        2>然后将第一个package发送给host2;
        3>host2接收完后,将第一个package发送给host1,同时client想host2发送第二个package;
        4>host1接收完第一个package后,发送给host3,同时接收host2发来的第二个package。
        5>以此类推,如图红线实线所示,直到将block1发送完毕。
        6>host2,host1,host3向NameNode,host2向Client发送通知,说“消息发送完了”。如图粉红颜色实线所示。
        7>client收到host2发来的消息后,向namenode发送消息,说我写完了。这样就真完成了。如图黄色粗实线
        8>发送完block1后,再向host7,host8,host4发送block2,如图蓝色实线所示。
        9>发送完block2后,host7,host8,host4向NameNode,host7向Client发送通知,如图浅绿色实线所示。
        10>client向NameNode发送消息,说我写完了,如图黄色粗实线。。。这样就完毕了。
分析,通过写过程,我们可以了解到:
    写1T文件,我们需要3T的存储,3T的网络流量贷款。
    在执行读或写的过程中,NameNode和DataNode通过HeartBeat进行保存通信,确定DataNode活着。如果发现DataNode死掉了,就将死掉的DataNode上的数据,放到其他节点去。读取时,要读其他节点去。
    挂掉一个节点,没关系,还有其他节点可以备份;甚至,挂掉某一个机架,也没关系;其他机架上,也有备份。
 
HDFS中常用到的命令
1、hadoop fshadoop fs -ls /hadoop fs -lsr
hadoop fs -mkdir /user/hadoop
hadoop fs -put a.txt /user/hadoop/
hadoop fs -get /user/hadoop/a.txt /
hadoop fs -cp src dst
hadoop fs -mv src dst
hadoop fs -cat /user/hadoop/a.txt
hadoop fs -rm /user/hadoop/a.txt
hadoop fs -rmr /user/hadoop/a.txt
hadoop fs -text /user/hadoop/a.txt
hadoop fs -copyFromLocal localsrc dst 与hadoop fs -put功能类似。
hadoop fs -moveFromLocal localsrc dst 将本地文件上传到hdfs,同时删除本地文件。
2、hadoop fsadminhadoop dfsadmin -report
hadoop dfsadmin -safemode enter | leave | get | wait
hadoop dfsadmin -setBalancerBandwidth 1000
3、hadoop fsck
4、start-balancer.sh
 

hadoop 创建hdfs文件夹

唐半张 发表了文章 0 个评论 2594 次浏览 2015-09-25 11:02 来自相关话题

sudo addgroup Hadoop#添加一个hadoop组 sudo usermod -a -G hadoop larry#将当前用户加入到hadoop组 sudo gedit etc/sudoers#将hadoop组加入到sud ...查看全部
sudo addgroup Hadoop#添加一个hadoop组
sudo usermod -a -G hadoop larry#将当前用户加入到hadoop组
sudo gedit etc/sudoers#将hadoop组加入到sudoer
在root ALL=(ALL) ALL后 hadoop ALL=(ALL) ALL
修改hadoop目录的权限
sudo chown -R larry:hadoop /home/larry/hadoop<所有者:组 文件>
sudo chmod -R 755 /home/larry/hadoop
修改hdfs的权限
sudo bin/hadoop dfs -chmod -R 755 /
sudo bin/hadoop dfs -ls /
修改hdfs文件的所有者
sudo bin/hadoop fs -chown -R larry /
sudo bin/hadoop dfsadmin -safemode leave #解除hadoop的安全模式
hadoop fs -copyFromLocal URI#拷贝本地文件到hdfs

hadoop fs -cat file:///file3 /user/hadoop/file4#将路径指定文件的内容输出到stdout

hadoop fs -chgrp [-R] GROUP URI#改变文件的所属组

hadoop fs -chmod [-R] 755 URI#改变用户访问权限

hadoop fs -chown [-R] [OWNER][:[GROUP]] URI [URI ]#修改文件的所有者

hadoop fs -copyToLocal URI localdst#拷贝hdfs文件到本地

hadoop fs -cp URI [URI …] #拷贝hdfs文件到其它目录

hadoop fs -du URI [URI …]#显示目录中所有文件的大小

hadoop fs -getmerge [addnl]#合并文件到本地目录
更多Hadoop相关信息见Hadoop 专题页面

分布式计算框架有哪些

唐半张 发表了文章 0 个评论 2167 次浏览 2015-09-25 10:59 来自相关话题

在SIP项目设计的过程中,对于它庞大的日志在开始时就考虑使用任务分解的多线程处理模式来分析统计,在我从前写的文章《Tiger Concurrent Practice --日志分析并行分解设计与实现》中有所提到。但是由于统计的内容暂时还是十分简单,所以就采用Me ...查看全部
在SIP项目设计的过程中,对于它庞大的日志在开始时就考虑使用任务分解的多线程处理模式来分析统计,在我从前写的文章《Tiger Concurrent Practice --日志分析并行分解设计与实现》中有所提到。但是由于统计的内容暂时还是十分简单,所以就采用Memcache作为计数器,结合MySQL就完成了访问控制以及统计的工作。然而未来,对于海量日志分析的工作,还是需要有所准备。现在最火的技术词汇莫过于“云计算”,在Open API日益盛行的今天,互联网应用的数据将会越来越有价值,如何去分析这些数据,挖掘其内在价值,就需要分布式计算来支撑海量数据的分析工作。
回过头来看,早先那种多线程,多任务分解的日志分析设计,其实是分布式计算的一个单机版缩略,如何将这种单机的工作进行分拆,变成协同工作的集群,其实就是分布式计算框架设计所涉及的。在去年参加BEA大会的时候,BEA和VMWare合作采用虚拟机来构建集群,无非就是希望使得计算机硬件能够类似于应用程序中资源池的资源,使用者无需关心资源的分配情况,从而最大化了硬件资源的使用价值。分布式计算也是如此,具体的计算任务交由哪一台机器执行,执行后由谁来汇总,这都由分布式框架的Master来抉择,而使用者只需简单地将待分析内容提供给分布式计算系统作为输入,就可以得到分布式计算后的结果。
Hadoop是Apache开源组织的一个分布式计算开源框架,在很多大型网站上都已经得到了应用,如亚马逊、Facebook和Yahoo等等。对于我来说,最近的一个使用点就是服务集成平台的日志分析。服务集成平台的日志量将会很大,而这也正好符合了分布式计算的适用场景(日志分析和索引建立就是两大应用场景)。
当前没有正式确定使用,所以也是自己业余摸索,后续所写的相关内容,都是一个新手的学习过程,难免会有一些错误,只是希望记录下来可以分享给更多志同道合的朋友。什么是Hadoop?
搞什么东西之前,第一步是要知道What(是什么),然后是Why(为什么),最后才是How(怎么做)。但很多开发的朋友在做了多年项目以后,都习惯是先How,然后What,最后才是Why,这样只会让自己变得浮躁,同时往往会将技术误用于不适合的场景。
 
Hadoop框架中最核心的设计就是:MapReduce和HDFS。MapReduce的思想是由Google的一篇论文所提及而被广为流传的,简单的一句话解释MapReduce就是“任务的分解与结果的汇总”。HDFS是Hadoop分布式文件系统(Hadoop Distributed File System)的缩写,为分布式计算存储提供了底层支持。
MapReduce从它名字上来看就大致可以看出个缘由,两个动词Map和Reduce,“Map(展开)”就是将一个任务分解成为多个任务,“Reduce”就是将分解后多任务处理的结果汇总起来,得出最后的分析结果。这不是什么新思想,其实在前面提到的多线程,多任务的设计就可以找到这种思想的影子。不论是现实社会,还是在程序设计中,一项工作往往可以被拆分成为多个任务,任务之间的关系可以分为两种:一种是不相关的任务,可以并行执行;另一种是任务之间有相互的依赖,先后顺序不能够颠倒,这类任务是无法并行处理的。回到大学时期,教授上课时让大家去分析关键路径,无非就是找最省时的任务分解执行方式。在分布式系统中,机器集群就可以看作硬件资源池,将并行的任务拆分,然后交由每一个空闲机器资源去处理,能够极大地提高计算效率,同时这种资源无关性,对于计算集群的扩展无疑提供了最好的设计保证。(其实我一直认为Hadoop的卡通图标不应该是一个小象,应该是蚂蚁,分布式计算就好比蚂蚁吃大象,廉价的机器群可以匹敌任何高性能的计算机,纵向扩展的曲线始终敌不过横向扩展的斜线)。任务分解处理以后,那就需要将处理以后的结果再汇总起来,这就是Reduce要做的工作。
图1:MapReduce结构示意图
上图就是MapReduce大致的结构图,在Map前还可能会对输入的数据有Split(分割)的过程,保证任务并行效率,在Map之后还会有Shuffle(混合)的过程,对于提高Reduce的效率以及减小数据传输的压力有很大的帮助。后面会具体提及这些部分的细节。
HDFS是分布式计算的存储基石,Hadoop的分布式文件系统和其他分布式文件系统有很多类似的特质。分布式文件系统基本的几个特点:
  1. 对于整个集群有单一的命名空间。
  2. 数据一致性。适合一次写入多次读取的模型,客户端在文件没有被成功创建之前无法看到文件存在。
  3. 文件会被分割成多个文件块,每个文件块被分配存储到数据节点上,而且根据配置会由复制文件块来保证数据的安全性。


图2:HDFS结构示意图
上图中展现了整个HDFS三个重要角色:NameNode、DataNode和Client。NameNode可以看作是分布式文件系统中的管理者,主要负责管理文件系统的命名空间、集群配置信息和存储块的复制等。NameNode会将文件系统的Meta-data存储在内存中,这些信息主要包括了文件信息、每一个文件对应的文件块的信息和每一个文件块在DataNode的信息等。DataNode是文件存储的基本单元,它将Block存储在本地文件系统中,保存了Block的Meta-data,同时周期性地将所有存在的Block信息发送给NameNode。Client就是需要获取分布式文件系统文件的应用程序。这里通过三个操作来说明他们之间的交互关系。
文件写入:
  1. Client向NameNode发起文件写入的请求。
  2. NameNode根据文件大小和文件块配置情况,返回给Client它所管理部分DataNode的信息。
  3. Client将文件划分为多个Block,根据DataNode的地址信息,按顺序写入到每一个DataNode块中。

文件读取:
  1. Client向NameNode发起文件读取的请求。
  2. NameNode返回文件存储的DataNode的信息。
  3. Client读取文件信息。

文件Block复制:
  1. NameNode发现部分文件的Block不符合最小复制数或者部分DataNode失效。
  2. 通知DataNode相互复制Block。
  3. DataNode开始直接相互复制。

最后再说一下HDFS的几个设计特点(对于框架设计值得借鉴):
  1. Block的放置:默认不配置。一个Block会有三份备份,一份放在NameNode指定的DataNode,另一份放在与指定DataNode非同一Rack上的DataNode,最后一份放在与指定DataNode同一Rack上的DataNode上。备份无非就是为了数据安全,考虑同一Rack的失败情况以及不同Rack之间数据拷贝性能问题就采用这种配置方式。
  2. 心跳检测DataNode的健康状况,如果发现问题就采取数据备份的方式来保证数据的安全性。
  3. 数据复制(场景为DataNode失败、需要平衡DataNode的存储利用率和需要平衡DataNode数据交互压力等情况):这里先说一下,使用HDFS的balancer命令,可以配置一个Threshold来平衡每一个DataNode磁盘利用率。例如设置了Threshold为10%,那么执行balancer命令的时候,首先统计所有DataNode的磁盘利用率的均值,然后判断如果某一个DataNode的磁盘利用率超过这个均值Threshold以上,那么将会把这个DataNode的block转移到磁盘利用率低的DataNode,这对于新节点的加入来说十分有用。
  4. 数据交验:采用CRC32作数据交验。在文件Block写入的时候除了写入数据还会写入交验信息,在读取的时候需要交验后再读入。
  5. NameNode是单点:如果失败的话,任务处理信息将会纪录在本地文件系统和远端的文件系统中。
  6. 数据管道性的写入:当客户端要写入文件到DataNode上,首先客户端读取一个Block然后写到第一个DataNode上,然后由第一个DataNode传递到备份的DataNode上,一直到所有需要写入这个Block的NataNode都成功写入,客户端才会继续开始写下一个Block。
  7. 安全模式:在分布式文件系统启动的时候,开始的时候会有安全模式,当分布式文件系统处于安全模式的情况下,文件系统中的内容不允许修改也不允许删除,直到安全模式结束。安全模式主要是为了系统启动的时候检查各个DataNode上数据块的有效性,同时根据策略必要的复制或者删除部分数据块。运行期通过命令也可以进入安全模式。在实践过程中,系统启动的时候去修改和删除文件也会有安全模式不允许修改的出错提示,只需要等待一会儿即可。

下面综合MapReduce和HDFS来看Hadoop的结构:

图3:Hadoop结构示意图
在Hadoop的系统中,会有一台Master,主要负责NameNode的工作以及JobTracker的工作。JobTracker的主要职责就是启动、跟踪和调度各个Slave的任务执行。还会有多台Slave,每一台Slave通常具有DataNode的功能并负责TaskTracker的工作。TaskTracker根据应用要求来结合本地数据执行Map任务以及Reduce任务。
说到这里,就要提到分布式计算最重要的一个设计点:Moving Computation is Cheaper than Moving Data。就是在分布式处理中,移动数据的代价总是高于转移计算的代价。简单来说就是分而治之的工作,需要将数据也分而存储,本地任务处理本地数据然后归总,这样才会保证分布式计算的高效性。为什么要选择Hadoop?
说完了What,简单地说一下Why。官方网站已经给了很多的说明,这里就大致说一下其优点及使用的场景(没有不好的工具,只用不适用的工具,因此选择好场景才能够真正发挥分布式计算的作用):
  1. 可扩展:不论是存储的可扩展还是计算的可扩展都是Hadoop的设计根本。
  2. 经济:框架可以运行在任何普通的PC上。
  3. 可靠:分布式文件系统的备份恢复机制以及MapReduce的任务监控保证了分布式处理的可靠性。
  4. 高效:分布式文件系统的高效数据交互实现以及MapReduce结合Local Data处理的模式,为高效处理海量的信息作了基础准备。

使用场景:个人觉得最适合的就是海量数据的分析,其实Google最早提出MapReduce也就是为了海量数据分析。同时HDFS最早是为了搜索引擎实现而开发的,后来才被用于分布式计算框架中。海量数据被分割于多个节点,然后由每一个节点并行计算,将得出的结果归并到输出。同时第一阶段的输出又可以作为下一阶段计算的输入,因此可以想象到一个树状结构的分布式计算图,在不同阶段都有不同产出,同时并行和串行结合的计算也可以很好地在分布式集群的资源下得以高效的处理。

hdfs创建文件

夕阳丶一抹红颜 发表了文章 0 个评论 1546 次浏览 2015-09-22 11:45 来自相关话题

public void create(String src, FsPermission masked, String clientName,                      boolean overwrite, short replica ...查看全部
public void create(String src, FsPermission masked, String clientName,
                     boolean overwrite, short replication, long blockSize) throws IOException;

它把請求交给了FSNamesystem。FSNamesystem相当于一本花名册,记录着文件系统的一些元数据,比如从文件名映射到block列表。创建文件要在它这里先“注册”一下。注册的过程有一系列的检查,包括是否处于“安全模式”(系统启动时需要检查数据完整性,不允许写操作),用户是否有操作权限等。然后取得文件的INode(包含文件元数据的类),使用LeaseManager来检查避免并发写文件。

这里专门说一下 Lease Management(租约管理)。每个需要写文件的客户端要先得到对该文件的一个Lease,它就相当于一把锁,控制并发的写操作。既然是租约,便有使用期限,客户端需要定期地续约,以告知服务器它还在写这个文件。如果超过了期限而没有续约(renew),那么此Lease将作废,即该写操作结束。这时,LeaseManager使用一个Lease Recovery Algorithm,清理好这个失效的Lease[2]。

继续说创建文件。在一些检查之后,开始了实质性的操作:

1. 产生一个GenerationStamp,全局的一个时间戳,用于记录block的版本信息,保证备份数据的一致性[1]。

2. 创建一个新的INodeFileUnderConstruction,它表示一个正在创建的文件,它的isUnderConstruction方法返回 true。将它加到相应的目录下,调用INodeDirectory的addChild方法,它用ArrayList来管理该目录下的文件。注意这些信息还只是在内存里。

3. 使用日志记录下此次操作:

   fsImage.getEditLog().logOpenFile(path, newNode)

FSImage 处理checkpoint以及记录命名空间的修改,后者主要信赖FSEditLog进行,它将newNode的詳細信息都写入文件"edits"里。文件 fsimage含有整个文件系统的信息,比如文件到block的映射,但它并不是实时更新的,最近的更改会临时写入edits文件里。NameNode在启动的时候会将edits里记录的改变都合并进来。并且SecondaryNameNode定期地也会扫描合并两个文件,一来用于备份,二来可以在下次重启时作为一个更新的起点,加快启动,这点以后再继续分析。注意,这些文件默认存在NameNode所在机器上的"${hadoop.tmp.dir}/dfs/name"下,可以配置属性"dfs.name.dir"来更改[3]。

hadoop hdfs配置dfs.replication

夕阳丶一抹红颜 发表了文章 0 个评论 2027 次浏览 2015-09-22 11:42 来自相关话题

dfs.data.dir指的是datanode上数据存放的目录,配置多个可能是因为一个目录下面挂的硬盘不够用,所以多加了几个目录 repication是配置hdfs中数据存放的份数,也就是备份数,防止数据丢失的 ...查看全部
dfs.data.dir指的是datanode上数据存放的目录,配置多个可能是因为一个目录下面挂的硬盘不够用,所以多加了几个目录
repication是配置hdfs中数据存放的份数,也就是备份数,防止数据丢失的

hdfs平衡分布

cenyuhai 发表了文章 0 个评论 1441 次浏览 2015-09-11 14:40 来自相关话题

这篇文章是从网上看到的,觉得很好就收藏了,但是最终不知道出处了。 Hadoop的HDFS集群非常容易出现机器与机器之间磁盘利用率不平衡的情况,比如集群中添加新的数据节点。当HDFS出现不平衡状况的时候,将引发很多问题,比如MR程序无法很好地利用本地 ...查看全部
这篇文章是从网上看到的,觉得很好就收藏了,但是最终不知道出处了。
Hadoop的HDFS集群非常容易出现机器与机器之间磁盘利用率不平衡的情况,比如集群中添加新的数据节点。当HDFS出现不平衡状况的时候,将引发很多问题,比如MR程序无法很好地利用本地计算的优势,机器之间无法达到更好的网络带宽使用率,机器磁盘无法利用等等。可见,保证HDFS中的数据平衡是非常重要的。
在Hadoop中,包含一个Balancer程序,通过运行这个程序,可以使得HDFS集群达到一个平衡的状态,使用这个程序的命令如下:
sh $HADOOP_HOME/bin/start-balancer.sh –t 10%
这个命令中-t参数后面跟的是HDFS达到平衡状态的磁盘使用率偏差值。如果机器与机器之间磁盘使用率偏差小于10%,那么我们就认为HDFS集群已经达到了平衡的状态。
Hadoop的开发人员在开发Balancer程序的时候,遵循了以下几点原则:
1. 在执行数据重分布的过程中,必须保证数据不能出现丢失,不能改变数据的备份数,不能改变每一个rack中所具备的block数量。
2. 系统管理员可以通过一条命令启动数据重分布程序或者停止数据重分布程序。
3. Block在移动的过程中,不能暂用过多的资源,如网络带宽。
4. 数据重分布程序在执行的过程中,不能影响name node的正常工作。
基于这些基本点,目前Hadoop数据重分布程序实现的逻辑流程如下图所示:

Rebalance程序作为一个独立的进程与name node进行分开执行。
1 Rebalance Server从Name Node中获取所有的Data Node情况:每一个Data Node磁盘使用情况。
2 Rebalance Server计算哪些机器需要将数据移动,哪些机器可以接受移动的数据。并且从Name Node中获取需要移动的数据分布情况。
3 Rebalance Server计算出来可以将哪一台机器的block移动到另一台机器中去。
4,5,6 需要移动block的机器将数据移动的目的机器上去,同时删除自己机器上的block数据。
7 Rebalance Server获取到本次数据移动的执行结果,并继续执行这个过程,一直没有数据可以移动或者HDFS集群以及达到了平衡的标准为止。
Hadoop现有的这种Balancer程序工作的方式在绝大多数情况中都是非常适合的。
现在我们设想这样一种情况:
1 数据是3份备份。
2 HDFS由2个rack组成。
3 2个rack中的机器磁盘配置不同,第一个rack中每一台机器的磁盘空间为1TB,第二个rack中每一台机器的磁盘空间为10TB。
4 现在大多数数据的2份备份都存储在第一个rack中。
在这样的一种情况下,HDFS级群中的数据肯定是不平衡的。现在我们运行Balancer程序,但是会发现运行结束以后,整个HDFS集群中的数据依旧不平衡:rack1中的磁盘剩余空间远远小于rack2。
这是因为Balance程序的开发原则1导致的。
简单的说,就是在执行Balancer程序的时候,不会将数据中一个rack移动到另一个rack中,所以就导致了Balancer程序永远无法平衡HDFS集群的情况。
针对于这种情况,可以采取2中方案:
1 继续使用现有的Balancer程序,但是修改rack中的机器分布。将磁盘空间小的机器分叉到不同的rack中去。
2 修改Balancer程序,允许改变每一个rack中所具备的block数量,将磁盘空间告急的rack中存放的block数量减少,或者将其移动到其他磁盘空间富余的rack中去。

nfs挂载hdfs,实现云存储

cenyuhai 发表了文章 0 个评论 1513 次浏览 2015-09-11 14:39 来自相关话题

本来不知道nfs是啥,因为群里的Harry童鞋有个问题,如何把本地目录挂载到hdfs上,搞什么云存储,说那么巧就是那么巧,HDP支持nfs,然后我就照着文档的说明去做,最后弄出来了。   1.修改机器上的hdfs-default.xml ...查看全部
本来不知道nfs是啥,因为群里的Harry童鞋有个问题,如何把本地目录挂载到hdfs上,搞什么云存储,说那么巧就是那么巧,HDP支持nfs,然后我就照着文档的说明去做,最后弄出来了。
  1.修改机器上的hdfs-default.xml
   vi /share/lib/hadoop/conf/hdfs-default.xml
  如果没有hdfs-default就找hdfs-site.xml 设置为如下内容,hdp的默认值是0
  

  dfs.access.time.precision
  3600000

  2.修改hdfs-site.xml,HDP的默认值比较大,4096的,这里就不修改了
  

  dfs.datanode.max.xcievers
  1024

  3.设置nf3的临时存储目录
  

dfs.nfs3.dump.dir
/tmp/.hdfs-nfs

  4.nfs本身也有portmap和nfs3等东西,但是我们不用它们的,我们用hadoop本身自带的,分别执行以下的命令
hadoop portmap
hadoop nfs3

  或者
  
hadoop-daemon.sh start portmap
hadoop-daemon.sh start nfs3

  使用上面的命令启动的,可以用以下的命令停止
  
hadoop-daemon.sh stop nfs3
hadoop-daemon.sh stop portmap

  5.安装nfs yum install nfs-utils
  6.然后分别执行以下两句命令检查一下
  rpcinfo -p 127.0.0.1
program vers proto port
100005 1 tcp 4242 mountd
100005 2 udp 4242 mountd
100005 2 tcp 4242 mountd
100000 2 tcp 111 portmapper
100000 2 udp 111 portmapper
100005 3 udp 4242 mountd


  showmount -e 127.0.0.1
  
Exports list on $nfs_server_ip:
/ (everyone)

  7.把本地的home/cenyuhai/nfs和hdfs做映射,做完映射之后,hdfs上的目录会出现在本地的/home/cenyuhai/nfs目录中
  
mount -t nfs -o vers=3,proto=tcp,nolock 127.0.0.1:/ /home/cenyuhai/nfs

  然后我们可以看到nfs下面出现了这些目录,它们就是我在hdfs上面的目录。
 

源码分析之灰太狼手札(四):HDFS IO

mopishv0 发表了文章 0 个评论 1918 次浏览 2015-09-08 15:20 来自相关话题

四 HDFS IO  4.1 L2缓存  4.1.1 构造 代码起始位置:Store 259行。     如3.1.3所述构造CacheConfigBuil ...查看全部
四 HDFS IO

 4.1 L2缓存

 4.1.1 构造


代码起始位置:Store 259行。

    如3.1.3所述构造CacheConfigBuilder实例时创建L2BucketCacheFactory实例(目前没有多种实现的配置),然后在build出CacheConfig的同时,利用构造L2Cache。此处需要注意一个配置,"hfile.l2.cacheblocksonwrite"(L2_CACHE_BLOCKS_ON_FLUSH_KEY),此配置默认为true,也就是说在flush的时候默认会将block写入缓存,对于SSD做缓存来说是致命的,需要配置为false。

    实例化L2Cache对象时,首先根据配置hfile.l2.bucketcache.ioengine(根据注释,只能有heap和offheap两个值)决定使用内存位置,如果配置为空(也是默认值),则认为禁用L2缓存;然后根据其配置获取对应内存区域的最大内存值;根据hfile.l2.bucketcache.size计算L2缓存大小(配置浮点数和整数的含义不同,前者为最大内存值的比例,后者为占用几MB内存);根据hfile.l2.bucketcache.writer.threads、hfile.l2.bucketcache.queue.length和hfile.l2.bucketcache.ioengine.errors.tolerated.duration配置缓存写入线程数、写入队列长度和发生异常时最长可容忍的时长;根据hfile.l2.bucketcache.bucket.sizes设置每个数据缓存层级大小(此处桶,也就是bucket代表一个缓存数据集合,其长度类似redis中内存按4K、8K、16K…的分级);构造数据缓存桶类BucketCache实例;构造L2缓存类L2BucketCache实例。

    L2BucketCache实际上是BucketCache的壳子,目的是为了提供L2Cache接口的具体实现;所以更复杂的初始化操作包含在BucketCache类中。首先,设置之前的参数,根据ioengine名构造具体的IOEngine对象(实现类为ByteBufferIOEngine);然后,如果做缓存最大只支持32T的校验;按照8K设置一个block的最小值得预设值;初始化存储管理类,其中主要包括缓存长度信息(其中包括该长度中的所有桶列表、有空余空间的桶列表、未缓存block的桶列表),数据桶信息并对其进行初始化(根据总长度和此层长度划分出缓存数组类似memcache的结构,这里指规划在总体缓存中的偏移量),按顺序每级缓存长度一个数据桶,多出的全部分给最后一级;为每个线程创建写入队列和标记信号对象;构造正要写入缓存的对象Map和缓存信息Map;构造写入线程;创建定时统计任务,此任务只负责打印缓存状态日志。

4.1.2 写操作

代码起始位置:L2Cache.java 45行。

    首先来看看接口中cacheRawBlock函数的注释:添加块到L2缓存。该块必须由一个可被写入到磁盘的字节数组来表示。可见,接口设计方面考虑到了写入磁盘的可能性。

接下来分析一下具体实现:首先,将hfile文件名和偏移量封装缓存键值对象;然后将键值和块内容的字节数组交给BucketCache对象处理;如果正要写入缓存Map中或缓存信息Map中有相关记录则直接退出函数;将字节数组、键值是否保存至内存(默认为false)和缓存序列号封装为缓存值对象并保存至正要写入缓存Map中;根据键值对象的哈希值选择并放入缓存写入队列;如果放入队列成功则增加缓存计数,增加缓存长度大小,保存hfile到缓存键值的映射关系;如果放入队列失败则等待50ms再次尝试,如果依然失败则从正要写入缓存Map中移除并增加失败计数。

进一步保存由WriterThread处理。线程中首先将队列中所有缓存对象取出;然后针对每个对象写入缓存:先找到合适的桶大小(缓存长度级别),然后尝试获取有空余空间的桶,如果无法获取则从有多个桶且有未缓存block的桶的缓存级别中拿来一个未缓存block的桶并重新初始化,从缓存数组中拿出一个元素的偏移量,根据桶的情况在修改长度层级中的有空余空间的桶列表和未缓存block的桶列表,将偏移量、长度、保存时间和是否在内存中封装为BucketCache.BucketEntry类对象实例,根据偏移量计算在总缓存(以ByteBuffer数组的形式引用)中的起始下标位置,然后依次写入缓存内容(此处做了较复杂的处理,猜测目的是为了适应buffer数组长度不能与缓存长度级别自适应的情况),至此数据已经写入缓存,如果期间发生异常需要及时释放缓存;写入正确后更新IO异常起始时间为-1;更新正要写入缓存的对象Map和缓存信息Map;如果已缓存总长度大于预设值(写死的95%)则开始回收缓存,见4.1.5。

这个过程和某些内存缓存存储的方案类似,不过区别是,这里总是拿一个完全空的桶,然后根据不同的长度桶中可存储的block个数不同。而有的内存存储中,是通过类似128K分裂为两个64K的桶,64K的桶适时的分裂为2个32K的桶来实现的。HBASE的方案好处是不需要进行整理,但是当某一级别的桶不够用时,有可能很多桶不满但是也不完全空。分裂的方案优点是存储空间灵活分配,但是有时需要整理,要不然会产生碎片。

4.1.3 读缓存

代码起始位置:L2Cache.java 35行。

    首先来看看接口中getRawBlock函数的注释:从L2缓存中取回block。该块由一个在磁盘中存储的字节数组来表示。

    如果读取时,记录仍在正要写入缓存的对象Map,则直接从其中获取数据,记录命中信息后返回;否则从缓存信息Map中获取缓存数据桶信息,根据信息中的长度和偏移量构造byte数组并从ioEngine对象中读取缓存数据;记录命中信息;如果已缓存信息中也没有命中,则记录未命中信息。

4.1.4 移除缓存

代码起始位置:L2Cache.java 53行。

    移除缓存以hfile为单位,首先从缓存索引中,拿出所有指定文件的缓存信息;然后将信息进行一次拷贝,用于生成缓存快照,此后可能会有新的缓存进入或移除,拷贝一次可防止并发导致的误操作;之后,针对每一个缓存信息:首先试图从正写入集合中移除;然后试图从已写入缓存集合中拿出对应缓存信息,然后获得缓存所在桶及对应缓存长度级别,释放桶中对应元素,之后各种标记。

    吐槽一下:虽然此段逻辑没有IOEngine的关系,L2缓存的逻辑层次到了内存IOEngine级别后较多,但是功能结构划分又不明确有待优化。

4.1.5 回收缓存

代码起始位置:BucketCache.java 743行。

    代码中首先根据DEFAULT_MIN_FACTOR计算每个缓存长度级别应当保留多少纯空的数据桶(最少1个),并计算出依次标准能释放出的存储量,如果总量小于等于0说明空间很充足不需要释放;然后打印诸多log;然后统计不同优先级SINGLE(首次加入缓存)、MULTI(被多次访问后)、MEMORY(默认未开启)归类(优先级依次提高);根据每个优先级的实际占用总和与DEFAULT_SINGLE_FACTOR、DEFAULT_MULTI_FACTOR和DEFAULT_MEMORY_FACTOR确定每个优先级多占了多少内存;从多占内存与(之前计算的可释放的存储量-已释放存储)/剩余优先级数量这两个值中取较小值作为此次回收内存的标准,开始释放本优先级内存;释放内存比较简单,即遍历根据之前统计各优先级占用存储时生成的优先队列,并释放移除的缓存(见4.1.4),直到达到此次释放标准或已遍历所有其中元素;最后,打印log释放锁。

4.1.6 关闭缓存

代码起始位置:L2Cache.java 58行。

    关闭缓存的逻辑比较简单:标记缓存关闭;关闭IOEngine(目前的内存IOEngine没有任何操作);关闭统计线程;中断缓存写入线程;清空正在写入缓存集合和已缓存集合。

4.2 HDFS写入类V2

4.2.1 构造


代码起始位置:StoreFile.java 892行。