Hive

Hive

beeline 连接spark-thriftServer,insert语句报错

回复

啊啊啊吧 发起了问题 1 人关注 0 个回复 1149 次浏览 2020-07-20 15:10 来自相关话题

hive 不能drop表问题

回复

幻想症khe 发起了问题 1 人关注 0 个回复 1307 次浏览 2020-05-29 11:30 来自相关话题

sqoop将hive中user_action导入mysql中出错 java.lang.ClassNotFoundException: user_action

piedpiper5e638482573c9 回复了问题 4 人关注 2 个回复 2450 次浏览 2020-03-24 18:53 来自相关话题

hive上使用Python udf ,报stream closed错误

桦渐2019 回复了问题 3 人关注 3 个回复 4715 次浏览 2020-02-18 17:55 来自相关话题

hive3.x版本,用户不能创建表

回复

幻想症khe 发起了问题 1 人关注 0 个回复 1869 次浏览 2019-11-02 15:59 来自相关话题

sqoop使用create-hive-table命令 是否有办法直接创建分区hive表?

回复

时生君 发起了问题 1 人关注 0 个回复 1785 次浏览 2019-03-10 17:41 来自相关话题

请教一个参数 serialization.format

回复

那小子真帅 发起了问题 1 人关注 0 个回复 5220 次浏览 2018-09-11 15:59 来自相关话题

hive的load数据分布及如何提交分布事务相关问题

回复

奔跑的鱼 发起了问题 1 人关注 0 个回复 1521 次浏览 2018-07-20 17:50 来自相关话题

hive 如何多行转多列,并且行(转换前的)能对应到指定的列(转换后)

回复

开心就好_kxjh 发起了问题 0 人关注 0 个回复 3860 次浏览 2018-07-13 22:30 来自相关话题

请问oozie里执行tez引擎的hive,如何配置?

回复

兔撕鸡大爷 发起了问题 1 人关注 0 个回复 1258 次浏览 2018-03-15 15:44 来自相关话题

hive on spark执行完job还一直是running状态, 哪位知道怎么解决

回复

兔撕鸡大爷 发起了问题 1 人关注 0 个回复 1767 次浏览 2018-03-15 15:44 来自相关话题

hive中有 a&get;b?a:b这样的函数吗?不要case when

回复

兔撕鸡大爷 发起了问题 1 人关注 0 个回复 1030 次浏览 2018-03-15 15:43 来自相关话题

请教个问题:hive上怎么实现匹配上更新,没匹配上插入.最好是在现有表上操作

史晓江 回复了问题 2 人关注 1 个回复 1333 次浏览 2018-03-15 15:43 来自相关话题

问一下,HIVE 0.13已经有了IN  和 EXISTS,那 LEFT SEMI JOIN 这种用法存在的意义是什么。

史晓江 回复了问题 2 人关注 1 个回复 1140 次浏览 2018-03-15 15:43 来自相关话题

hive2.1,hawq怎么批量导入hdfs.上的数据?

回复

兔撕鸡大爷 发起了问题 1 人关注 0 个回复 1056 次浏览 2018-03-15 15:42 来自相关话题

请问hive中python编写udf时,需要配置什么参数吗?是需要配置什么参数还是需要赋权吗?

回复

兔撕鸡大爷 发起了问题 1 人关注 0 个回复 1150 次浏览 2018-03-15 15:42 来自相关话题

这个sql怎么优化

回复

兔撕鸡大爷 发起了问题 1 人关注 0 个回复 1108 次浏览 2018-03-15 15:41 来自相关话题

条新动态, 点击查看
fish

fish 回答了问题 • 2015-10-22 09:54 • 18 个回复 不感兴趣

hive server2启动报错

[size=14]Cannot find hadoop installation: $HADOOP_HOME or $HADOOP_PREFIX must be set or hadoop must be in the path[/size] [size=14... 显示全部 »
[size=14]Cannot find hadoop installation: $HADOOP_HOME or $HADOOP_PREFIX must be set or hadoop must be in the path[/size] [size=14]是执行什么的时候报的错?[/size]
这是apache hive1.1.1版本的一个问题:[url]https://issues.apache.org/jira/browse/HIVE-10831[/url] 将你机器上的hive拷贝到我的环境中执行也会发生同样的问题。   换用apache hi... 显示全部 »
这是apache hive1.1.1版本的一个问题:[url]https://issues.apache.org/jira/browse/HIVE-10831[/url] 将你机器上的hive拷贝到我的环境中执行也会发生同样的问题。   换用apache hive 1.2.1 或者 CDH版本的就可以正常工作。

beeline 连接spark-thriftServer,insert语句报错

回复

啊啊啊吧 发起了问题 1 人关注 0 个回复 1149 次浏览 2020-07-20 15:10 来自相关话题

hive 不能drop表问题

回复

幻想症khe 发起了问题 1 人关注 0 个回复 1307 次浏览 2020-05-29 11:30 来自相关话题

sqoop将hive中user_action导入mysql中出错 java.lang.ClassNotFoundException: user_action

回复

piedpiper5e638482573c9 回复了问题 4 人关注 2 个回复 2450 次浏览 2020-03-24 18:53 来自相关话题

hive上使用Python udf ,报stream closed错误

回复

桦渐2019 回复了问题 3 人关注 3 个回复 4715 次浏览 2020-02-18 17:55 来自相关话题

hive3.x版本,用户不能创建表

回复

幻想症khe 发起了问题 1 人关注 0 个回复 1869 次浏览 2019-11-02 15:59 来自相关话题

sqoop使用create-hive-table命令 是否有办法直接创建分区hive表?

回复

时生君 发起了问题 1 人关注 0 个回复 1785 次浏览 2019-03-10 17:41 来自相关话题

请教一个参数 serialization.format

回复

那小子真帅 发起了问题 1 人关注 0 个回复 5220 次浏览 2018-09-11 15:59 来自相关话题

hive的load数据分布及如何提交分布事务相关问题

回复

奔跑的鱼 发起了问题 1 人关注 0 个回复 1521 次浏览 2018-07-20 17:50 来自相关话题

hive 如何多行转多列,并且行(转换前的)能对应到指定的列(转换后)

回复

开心就好_kxjh 发起了问题 0 人关注 0 个回复 3860 次浏览 2018-07-13 22:30 来自相关话题

请问oozie里执行tez引擎的hive,如何配置?

回复

兔撕鸡大爷 发起了问题 1 人关注 0 个回复 1258 次浏览 2018-03-15 15:44 来自相关话题

hive on spark执行完job还一直是running状态, 哪位知道怎么解决

回复

兔撕鸡大爷 发起了问题 1 人关注 0 个回复 1767 次浏览 2018-03-15 15:44 来自相关话题

hive中有 a&get;b?a:b这样的函数吗?不要case when

回复

兔撕鸡大爷 发起了问题 1 人关注 0 个回复 1030 次浏览 2018-03-15 15:43 来自相关话题

请教个问题:hive上怎么实现匹配上更新,没匹配上插入.最好是在现有表上操作

回复

史晓江 回复了问题 2 人关注 1 个回复 1333 次浏览 2018-03-15 15:43 来自相关话题

问一下,HIVE 0.13已经有了IN  和 EXISTS,那 LEFT SEMI JOIN 这种用法存在的意义是什么。

回复

史晓江 回复了问题 2 人关注 1 个回复 1140 次浏览 2018-03-15 15:43 来自相关话题

hive2.1,hawq怎么批量导入hdfs.上的数据?

回复

兔撕鸡大爷 发起了问题 1 人关注 0 个回复 1056 次浏览 2018-03-15 15:42 来自相关话题

请问hive中python编写udf时,需要配置什么参数吗?是需要配置什么参数还是需要赋权吗?

回复

兔撕鸡大爷 发起了问题 1 人关注 0 个回复 1150 次浏览 2018-03-15 15:42 来自相关话题

这个sql怎么优化

回复

兔撕鸡大爷 发起了问题 1 人关注 0 个回复 1108 次浏览 2018-03-15 15:41 来自相关话题

Impala与Hive的比较

唐半张 发表了文章 0 个评论 2379 次浏览 2015-10-10 09:25 来自相关话题

1. Impala架构        Impala是Cloudera在受到Google的Dremel启发下开发的实时交互SQL大数据查询工具,Impala没有再使用缓慢的Hive+MapReduce批处理,而是通过使用与商用并行关系数据库中类似的分 ...查看全部
1. Impala架构
       Impala是Cloudera在受到Google的Dremel启发下开发的实时交互SQL大数据查询工具,Impala没有再使用缓慢的Hive+MapReduce批处理,而是通过使用与商用并行关系数据库中类似的分布式查询引擎(由Query Planner、Query Coordinator和Query Exec Engine三部分组成),可以直接从HDFS或HBase中用SELECT、JOIN和统计函数查询数据,从而大大降低了延迟。其架构如图 1所示,Impala主要由Impalad, State Store和CLI组成。

  Impalad: 与DataNode运行在同一节点上,由Impalad进程表示,它接收客户端的查询请求(接收查询请求的Impalad为Coordinator,Coordinator通过JNI调用java前端解释SQL查询语句,生成查询计划树,再通过调度器把执行计划分发给具有相应数据的其它Impalad进行执行),读写数据,并行执行查询,并把结果通过网络流式的传送回给Coordinator,由Coordinator返回给客户端。同时Impalad也与State Store保持连接,用于确定哪个Impalad是健康和可以接受新的工作。在Impalad中启动三个ThriftServer: beeswax_server(连接客户端),hs2_server(借用Hive元数据), be_server(Impalad内部使用)和一个ImpalaServer服务。

        Impala State Store: 跟踪集群中的Impalad的健康状态及位置信息,由statestored进程表示,它通过创建多个线程来处理Impalad的注册订阅和与各Impalad保持心跳连接,各Impalad都会缓存一份State Store中的信息,当State Store离线后(Impalad发现State Store处于离线时,会进入recovery模式,反复注册,当State Store重新加入集群后,自动恢复正常,更新缓存数据)因为Impalad有State Store的缓存仍然可以工作,但会因为有些Impalad失效了,而已缓存数据无法更新,导致把执行计划分配给了失效的Impalad,导致查询失败。

        CLI: 提供给用户查询使用的命令行工具(Impala Shell使用python实现),同时Impala还提供了Hue,JDBC, ODBC使用接口。

2. 与Hive的关系
        Impala与Hive都是构建在Hadoop之上的数据查询工具各有不同的侧重适应面,但从客户端使用来看Impala与Hive有很多的共同之处,如数据表元数据、ODBC/JDBC驱动、SQL语法、灵活的文件格式、存储资源池等。Impala与Hive在Hadoop中的关系如图 2所示。Hive适合于长时间的批处理查询分析,而Impala适合于实时交互式SQL查询,Impala给数据分析人员提供了快速实验、验证想法的大数据分析工具。可以先使用hive进行数据转换处理,之后使用Impala在Hive处理后的结果数据集上进行快速的数据分析。

3. Impala的查询处理过程
        Impalad分为Java前端与C++处理后端,接受客户端连接的Impalad即作为这次查询的Coordinator,Coordinator通过JNI调用Java前端对用户的查询SQL进行分析生成执行计划树,不同的操作对应不用的PlanNode, 如:SelectNode, ScanNode, SortNode, AggregationNode, HashJoinNode等等。

        执行计划树的每个原子操作由一个PlanFragment表示,通常一条查询语句由多个Plan Fragment组成, Plan Fragment 0表示执行树的根,汇聚结果返回给用户,执行树的叶子结点一般是Scan操作,分布式并行执行。

        Java前端产生的执行计划树以Thrift数据格式返回给Impala C++后端(Coordinator)(执行计划分为多个阶段,每一个阶段叫做一个PlanFragment,每一个PlanFragment在执行时可以由多个Impalad实例并行执行(有些PlanFragment只能由一个Impalad实例执行,如聚合操作),整个执行计划为一执行计划树),由Coordinator根据执行计划,数据存储信息(Impala通过libhdfs与HDFS进行交互。通过hdfsGetHosts方法获得文件数据块所在节点的位置信息),通过调度器(现在只有simple-scheduler, 使用round-robin算法)Coordinator::Exec对生成的执行计划树分配给相应的后端执行器Impalad执行(查询会使用LLVM进行代码生成,编译,执行。对于使用LLVM如何提高性能这里有说明),通过调用GetNext()方法获取计算结果,如果是insert语句,则将计算结果通过libhdfs写回HDFS当所有输入数据被消耗光,执行结束,之后注销此次查询服务。

        Impala的查询处理流程大概如图3所示:

下面以一个SQL查询语句为例分析Impala的查询处理流程。如select sum(id), count(id), avg(id) from customer_small  group by id; 以此语句生成的计划为:
PLAN FRAGMENT 0
PARTITION: UNPARTITIONED
4:EXCHANGE
tuple ids: 1
PLAN FRAGMENT 1
PARTITION: HASH_PARTITIONED:
STREAM DATA SINK
EXCHANGE ID: 4
UNPARTITIONED
3:AGGREGATE
| output: SUM(), SUM()
| group by:
| tuple ids: 1
|
2:EXCHANGE
tuple ids: 1
PLAN FRAGMENT 2
PARTITION: RANDOM
STREAM DATA SINK
EXCHANGE ID: 2
HASH_PARTITIONED:
1:AGGREGATE
| output: SUM(id), COUNT(id)
| group by: id
| tuple ids: 1
|
0:SCAN HDFS
table=default.customer_small #partitions=1 size=193B
tuple ids: 0

 执行行计划树如图 4所示, 绿色的部分为可以分布式并行执行:

 
4. Impala相对于Hive所使用的优化技术
1、没有使用MapReduce进行并行计算,虽然MapReduce是非常好的并行计算框架,但它更多的面向批处理模式,而不是面向交互式的SQL执行。与MapReduce相比:Impala把整个查询分成一执行计划树,而不是一连串的MapReduce任务,在分发执行计划后,Impala使用拉式获取数据的方式获取结果,把结果数据组成按执行树流式传递汇集,减少的了把中间结果写入磁盘的步骤,再从磁盘读取数据的开销。Impala使用服务的方式避免每次执行查询都需要启动的开销,即相比Hive没了MapReduce启动时间。

2、使用LLVM产生运行代码,针对特定查询生成特定代码,同时使用Inline的方式减少函数调用的开销,加快执行效率。

3、充分利用可用的硬件指令(SSE4.2)。

4、更好的IO调度,Impala知道数据块所在的磁盘位置能够更好的利用多磁盘的优势,同时Impala支持直接数据块读取和本地代码计算checksum。

5、通过选择合适的数据存储格式可以得到最好的性能(Impala支持多种存储格式)。

6、最大使用内存,中间结果不写磁盘,及时通过网络以stream的方式传递。

5. Impala与Hive的异同
数据存储:使用相同的存储数据池都支持把数据存储于HDFS, HBase。

元数据:两者使用相同的元数据。

SQL解释处理:比较相似都是通过词法分析生成执行计划。

执行计划
Hive: 依赖于MapReduce执行框架,执行计划分成map->shuffle->reduce->map->shuffle->reduce…的模型。如果一个Query会被编译成多轮MapReduce,则会有更多的写中间结果。由于MapReduce执行框架本身的特点,过多的中间过程会增加整个Query的执行时间。
Impala: 把执行计划表现为一棵完整的执行计划树,可以更自然地分发执行计划到各个Impalad执行查询,而不用像Hive那样把它组合成管道型的map->reduce模式,以此保证Impala有更好的并发性和避免不必要的中间sort与shuffle。

数据流
Hive: 采用推的方式,每一个计算节点计算完成后将数据主动推给后续节点。
Impala: 采用拉的方式,后续节点通过getNext主动向前面节点要数据,以此方式数据可以流式的返回给客户端,且只要有1条数据被处理完,就可以立即展现出来,而不用等到全部处理完成,更符合SQL交互式查询使用。

内存使用
Hive: 在执行过程中如果内存放不下所有数据,则会使用外存,以保证Query能顺序执行完。每一轮MapReduce结束,中间结果也会写入HDFS中,同样由于MapReduce执行架构的特性,shuffle过程也会有写本地磁盘的操作。
Impala: 在遇到内存放不下数据时,当前版本1.0.1是直接返回错误,而不会利用外存,以后版本应该会进行改进。这使用得Impala目前处理Query会受到一定的限制,最好还是与Hive配合使用。Impala在多个阶段之间利用网络传输数据,在执行过程不会有写磁盘的操作(insert除外)。

调度
Hive: 任务调度依赖于Hadoop的调度策略。
Impala: 调度由自己完成,目前只有一种调度器simple-schedule,它会尽量满足数据的局部性,扫描数据的进程尽量靠近数据本身所在的物理机器。调度器目前还比较简单,在SimpleScheduler::GetBackend中可以看到,现在还没有考虑负载,网络IO状况等因素进行调度。但目前Impala已经有对执行过程的性能统计分析,应该以后版本会利用这些统计信息进行调度吧。

容错
Hive: 依赖于Hadoop的容错能力。
Impala: 在查询过程中,没有容错逻辑,如果在执行过程中发生故障,则直接返回错误(这与Impala的设计有关,因为Impala定位于实时查询,一次查询失败,再查一次就好了,再查一次的成本很低)。但从整体来看,Impala是能很好的容错,所有的Impalad是对等的结构,用户可以向任何一个Impalad提交查询,如果一个Impalad失效,其上正在运行的所有Query都将失败,但用户可以重新提交查询由其它Impalad代替执行,不会影响服务。对于State Store目前只有一个,但当State Store失效,也不会影响服务,每个Impalad都缓存了State Store的信息,只是不能再更新集群状态,有可能会把执行任务分配给已经失效的Impalad执行,导致本次Query失败。

适用面
Hive: 复杂的批处理查询任务,数据转换任务。
Impala:实时数据分析,因为不支持UDF,能处理的问题域有一定的限制,与Hive配合使用,对Hive的结果数据集进行实时分析。

6. Impala的优缺点
优点
  • 支持SQL查询,快速查询大数据。
  • 可以对已有数据进行查询,减少数据的加载,转换。
  • 多种存储格式可以选择(Parquet, Text, Avro, RCFile, SequeenceFile)。
  • 可以与Hive配合使用。
缺点
  • 不支持用户定义函数UDF。
  • 不支持text域的全文搜索。
  • 不支持Transforms。
  • 不支持查询期的容错。
  • 对内存要求高。

Hive安装之本地独立模式

唐半张 发表了文章 0 个评论 1785 次浏览 2015-10-09 10:27 来自相关话题

本地独立模式:在本地安装mysql数据库,然后hive将元数据存放在mysql中。实验前提条件:已经成功安装了hadoop集群(不管是伪分布式还是完全分布式)。 本人实验的hadoop版本为1.2.1,其安装目录为/usr/hadoop-1.2.1 ...查看全部
本地独立模式:在本地安装mysql数据库,然后hive将元数据存放在mysql中。实验前提条件:已经成功安装了hadoop集群(不管是伪分布式还是完全分布式)。
本人实验的hadoop版本为1.2.1,其安装目录为/usr/hadoop-1.2.1 ,并且已经设置了
HADOOP_HOME环境变量。具体步骤如下:
1、下载并解压 apache-hive-0.13.1-bin.tar.gz 到 /usr/hive-0.12.1 目录下,如下图:2014-7-9 23:21 上传下载附件 (33.99 KB)
2、启动 hadoop 集群 ,如下图:2014-7-9 23:23 上传下载附件 (37.45 KB)
3、启动hive,并验证是否可用,如下图:2014-7-9 23:26 上传下载附件 (13.4 KB)通过hive创建表x之后,到hadoop的hdfs中查看其生成的目录 结构:2014-7-9 23:26 上传下载附件 (13.79 KB)
4、通过yum安装 mysql 和 mysql-server,如下图:2014-7-9 23:29 上传下载附件 (29.33 KB)。。。。。。2014-7-9 23:29 上传下载附件 (19.46 KB)
5、启动mysql数据库,如下图:2014-7-9 23:31 上传下载附件 (31.56 KB)2014-7-9 23:31 上传下载附件 (16.59 KB)由于是第一次启动,打印的信息多了一些。
6、创建hive所使用的database,并创建hive用户,授予hive数据库的所有权限,其密码为“123456”,并且只能在本地连接,如下图:2014-7-9 23:34 上传下载附件 (37.49 KB)
7、通过新创建的hive用户登陆,测试用户是否创建成功,如下图:2014-7-9 23:35 上传下载附件 (46.86 KB)从打印的信息,可以看出用户hive创建成功。
8、复制 hive-0.13.1 目录下conf目录中的hive-default.xml.template 之后 并重命名为 hive-site.xml,其中只需要保留与本次实验相关的配置即可,如下所示: javax.jdo.option.ConnectionURL jdbc:mysql://localhost/hive?createDatabaseIfNotExist=true JDBC connect string for a JDBC metastore javax.jdo.option.ConnectionDriverName com.mysql.jdbc.Driver Driver class name for a JDBC metastore javax.jdo.option.ConnectionUserName hive username to use against metastore database javax.jdo.option.ConnectionPassword 123456 password to use against metastore database 复制代码
9、将连接mysql数据库需要的jar包,放到hive-0.13.1目录的lib目录下。
10、启动hive,如下图:2014-7-9 23:46 上传下载附件 (8.3 KB)
11、使用hive帐户登陆到mysql数据库,查看是否生成了相关表,如下图:2014-7-9 23:48 上传下载附件 (13.08 KB)
12、到此,Hive安装之本地独立模式安装成功。PS:1、第9步连接mysql数据库需要的jar包下载:mysql-connector-java-5.1.12.zip

order by && sort by && distribute by && cluster by 区分

唐半张 发表了文章 0 个评论 1944 次浏览 2015-10-09 09:37 来自相关话题

order by 1、order by会对输入做全局排序,因此只有一个reducer(多个reducer无法保证全局有序);     只有一个reducer,会导致当输入规模较大时,需要较长的计算时间,速度会非常慢; 2、h ...查看全部
order by
1、order by会对输入做全局排序,因此只有一个reducer(多个reducer无法保证全局有序);
    只有一个reducer,会导致当输入规模较大时,需要较长的计算时间,速度会非常慢;
2、hive.mapred.mode对order by的影响
当hive.mapred.mode=nonstrict时,order by和数据库中的order by功能一致,按照某一项或者几项排序输出;
当hive.mapred.mode=strtict时,hive的order by必须要制定limit,否则执行会报错。
设置hive.mapred.mode的值
    set hive.mapred.mode=nonstrict; (default value)
    set hive.mapred.mode=strict;
    hive> select * from test order by id;
    FAILED: Error in semantic analysis: 1:28 In strict mode, if ORDER BY is specified, LIMIT must also be specified. Error encountered near token 'id'
    报错原因:在order by状态下所有数据会到一台服务器进行reducer操作,也就是说只有一个reducer,如果在数据量大的情况下会出现无法输出结果的情况,如果进行limit n,那只有 n * map number条记录而已。只有一个reduce也可以处理过来。
    sort by
    1、可以有多个reduce task;
    2、sort by不受hive.mapred.mode是否是strict、nostrict的影响;
    3、sort by的数据只能保证在同一reducer中的数据可以按照指定字段排序;
    4、使用sort by你可以指定执行的reduce的个数(set mapred.reduce.tasks=),对输出的数据再执行归并排序,即可以得到全部结果;
    distribute by
    1、按照指定的字段对数据进行划分到不同的输出reduce文件中(可以指定map到reduce端的分发key, 这样可以充分利用hadoop资源, 在多个reduce中局部按需要排序的字段进行排序)
    insert overwrite local directory '/home/hadoop/out' select * from test order by name distribute by length(name);
    此方法会根据name的长度划分到不同的reduce中,最终输出到不同的文件中;
    2、length是内建函数,也可以指定其他的函数或者使用自定义函数;
    3、distribute by与sort by连用,可发挥很好的作用:
    举例:
    select s.ymd, s.symbol, s.price_close from stocks s distribute by s.symbol sort by s.symbol asc, s.ymd desc;
    按照symbol指定到reduce,每个reduce中按照symbol升序排序,当symbol相同时,按照ymd降序;
    cluster by
    1、cluster by除了具有distribute by的功能外还兼具sort by功能;
    2、但是排序只能倒序,不能指定排序规则为asc或者desc;
    3、当distributed by col1与sort by col1连用,且跟随的字段相同时,可使用cluster by简写。
    举例:
    select s.ymd, s.symbol, s.price_close from stocks s cluster by s.symbol;
 

Hive的数据类型

唐半张 发表了文章 0 个评论 2020 次浏览 2015-10-09 09:36 来自相关话题

Hive的内置数据类型可以分为两大类:(1)、基础数据类型;(2)、复杂数据类型。其中,基础数据类型包括: TINYINT,SMALLINT,INT,BIGINT,BOOLEAN,FLOAT,DOUBLE,STRING,BINARY,TIMESTA ...查看全部
Hive的内置数据类型可以分为两大类:(1)、基础数据类型;(2)、复杂数据类型。其中,基础数据类型包括:
TINYINT,SMALLINT,INT,BIGINT,BOOLEAN,FLOAT,DOUBLE,STRING,BINARY,TIMESTAMP,DECIMAL,CHAR,VARCHAR,DATE。下面的表格列出这些基础类型所占的字节以及从什么版本开始支持这些类型。数据类型所占字节开始支持版本TINYINT1byte,-128 ~ 127SMALLINT2byte,-32,768 ~ 32,767INT4byte,-2,147,483,648 ~ 2,147,483,647BIGINT8byte,-9,223,372,036,854,775,808 ~ 9,223,372,036,854,775,807BOOLEANFLOAT4byte单精度DOUBLE8byte双精度STRINGBINARY从Hive0.8.0开始支持TIMESTAMP从Hive0.8.0开始支持DECIMAL从Hive0.11.0开始支持CHAR从Hive0.13.0开始支持VARCHAR从Hive0.12.0开始支持DATE从Hive0.12.0开始支持
复杂类型包括ARRAY,MAP,STRUCT,UNION,这些复杂类型是由基础类型组成的。
ARRAY :ARRAY类型是由一系列相同数据类型元素组成的,这些元素可以通过下标来访问比如有一个ARRAY类型的变量fruits,它是由['apple','orange','mango']组成,那么可以通过fruits[1]来访问orange; 
MAP :MAP包含key->value键值对,可以通过key来访问元素。比如”userlist”是一个map类型,其中username是key;password是value,那么我们可以通过userlist['username']来得到这个用户对应的password; 
STRUCT :STRUCT可以包含不同数据类型的元素。这些元素可以通过点的方式来得到,比如user是一个STRUCT类型,那么可以通过user.address得到这个用户的地址。 
UNION : UNIONTYPE ,他是从Hive 0.7.0开始支持的。

Hive自定义UDF和聚合函数UDAF

唐半张 发表了文章 0 个评论 1921 次浏览 2015-10-07 09:40 来自相关话题

Hive是一种构建在Hadoop上的数据仓库,Hive把SQL查询转换为一系列在Hadoop集群中运行的MapReduce作业,是MapReduce更高层次的抽象,不用编写具体的MapReduce方法。Hive将数据组织为表,这就使得HDFS上的数据有了结构, ...查看全部
Hive是一种构建在Hadoop上的数据仓库,Hive把SQL查询转换为一系列在Hadoop集群中运行的MapReduce作业,是MapReduce更高层次的抽象,不用编写具体的MapReduce方法。Hive将数据组织为表,这就使得HDFS上的数据有了结构,元数据即表的模式,都存储在名为metastore的数据库中。
可以在hive的外壳环境中直接使用dfs访问hadoop的文件系统命令。
       Hive可以允许用户编写自己定义的函数UDF,来在查询中使用。Hive中有3种UDF:
       UDF:操作单个数据行,产生单个数据行;
       UDAF:操作多个数据行,产生一个数据行。
       UDTF:操作一个数据行,产生多个数据行一个表作为输出。
用户构建的UDF使用过程如下:
      第一步:继承UDF或者UDAF或者UDTF,实现特定的方法。
      第二步:将写好的类打包为jar。如hivefirst.jar.
      第三步:进入到Hive外壳环境中,利用add jar /home/hadoop/hivefirst.jar.注册该jar文件
      第四步:为该类起一个别名,create temporary function mylength as 'com.whut.StringLength';这里注意UDF只是为这个Hive会话临时定义的。
      第五步:在select中使用mylength(); 
自定义UDF
packagewhut;
importorg.apache.commons.lang.StringUtils;
importorg.apache.hadoop.hive.ql.exec.UDF;
importorg.apache.hadoop.io.Text;
//UDF是作用于单个数据行,产生一个数据行
//用户必须要继承UDF,且必须至少实现一个evalute方法,该方法并不在UDF中
//但是Hive会检查用户的UDF是否拥有一个evalute方法
publicclassStrip extendsUDF{
privateText result=newText();
//自定义方法
publicText evaluate(Text str)
{
if(str==null)
returnnull;
result.set(StringUtils.strip(str.toString()));
returnresult;
}
publicText evaluate(Text str,String stripChars)
{
if(str==null)
returnnull;
result.set(StringUtils.strip(str.toString(),stripChars));
returnresult;
}
}

注意事项:
   1,一个用户UDF必须继承org.apache.hadoop.hive.ql.exec.UDF;
   2,一个UDF必须要包含有evaluate()方法,但是该方法并不存在于UDF中。evaluate的参数个数以及类型都是用户自己定义的。在使用的时候,Hive会调用UDF的evaluate()方法。 
自定义UDAF
该UDAF主要是找到最大值
 
packagewhut;
importorg.apache.hadoop.hive.ql.exec.UDAF;
importorg.apache.hadoop.hive.ql.exec.UDAFEvaluator;
importorg.apache.hadoop.io.IntWritable;
//UDAF是输入多个数据行,产生一个数据行
//用户自定义的UDAF必须是继承了UDAF,且内部包含多个实现了exec的静态类
publicclassMaxiNumber extendsUDAF{
publicstaticclassMaxiNumberIntUDAFEvaluator implementsUDAFEvaluator{
//最终结果
privateIntWritable result;
//负责初始化计算函数并设置它的内部状态,result是存放最终结果的
@Override
publicvoidinit() {
result=null;
}
//每次对一个新值进行聚集计算都会调用iterate方法
publicbooleaniterate(IntWritable value)
{
if(value==null)
returnfalse;
if(result==null)
result=newIntWritable(value.get());
else
result.set(Math.max(result.get(), value.get()));
returntrue;
}

//Hive需要部分聚集结果的时候会调用该方法
//会返回一个封装了聚集计算当前状态的对象
publicIntWritable terminatePartial()
{
returnresult;
}
//合并两个部分聚集值会调用这个方法
publicbooleanmerge(IntWritable other)
{
returniterate(other);
}
//Hive需要最终聚集结果时候会调用该方法
publicIntWritable terminate()
{
returnresult;
}
}
}

注意事项:
    1,用户的UDAF必须继承了org.apache.hadoop.hive.ql.exec.UDAF;
    2,用户的UDAF必须包含至少一个实现了org.apache.hadoop.hive.ql.exec的静态类,诸如常见的实现了 UDAFEvaluator。
    3,一个计算函数必须实现的5个方法的具体含义如下:
    init():主要是负责初始化计算函数并且重设其内部状态,一般就是重设其内部字段。一般在静态类中定义一个内部字段来存放最终的结果。
   iterate():每一次对一个新值进行聚集计算时候都会调用该方法,计算函数会根据聚集计算结果更新内部状态。当输入值合法或者正确计算了,则就返回true。
   terminatePartial():Hive需要部分聚集结果的时候会调用该方法,必须要返回一个封装了聚集计算当前状态的对象。
   merge():Hive进行合并一个部分聚集和另一个部分聚集的时候会调用该方法。
   terminate():Hive最终聚集结果的时候就会调用该方法。计算函数需要把状态作为一个值返回给用户。

Hive的优化

唐半张 发表了文章 0 个评论 1881 次浏览 2015-10-06 10:01 来自相关话题

Hive的优化策略及优化控制选项主要有以下几个方面: (1)列裁剪 需设置参数hive.optimize.cp=true (2)分区裁剪 需设置参数hive.optimize.pruner=true ...查看全部
Hive的优化策略及优化控制选项主要有以下几个方面:
(1)列裁剪
需设置参数hive.optimize.cp=true
(2)分区裁剪
需设置参数hive.optimize.pruner=true
(3)Joni操作
应该将条目少的表/子查询放在Join操作符的左边。原因是Join操作的Reduce阶段,Join操作符左边表中的内容会被加载到内存中,将条目少的表放在坐标可以有效减少内存溢出的几率。
(4)Map Join操作
需要设置的相关数据hive.join.emit.inter-1,hive.mapjoin.size.key,hive.map-join.cache.numrows。
(5)Group By操作
注意两点。一是Map端部分聚合。二是有数据倾斜时进行负载均衡。
(6)合并小文件
通过合并Map和Reduce的结果文件来消除小文件影响。需要设定的参数:
hive.merge.mapfiles=true,是否合并Map输入文件默认为true。
hive.merge.mapredfiles=false,设定是否合并Reduce输出文件,默认为false。
hive.merge.size.per.task=256*1000*1000,设定合并文件的大小,默认为256000000。

hive QL(HQL)简明指南

唐半张 发表了文章 0 个评论 2002 次浏览 2015-09-30 11:18 来自相关话题

1. 基本数据类型 tinyint , smallint, int, bigint, float, double, boolean: true/false, string 2. 基础运算符与函数 A IS NULL      ...查看全部
1. 基本数据类型
tinyint , smallint, int, bigint, float, double, boolean: true/false, string
2. 基础运算符与函数
A IS NULL         空
A IS NOT NULL     非空    
A LIKE B     模糊匹配
A RLIKE B     正则表达式匹配
A REGEXP B  正则表达式匹配
3. 类型转换
cast(expr as
例如:
cast('1' as BIGINT)  将字符串'1'转化成bigint型
4. 日期函数
返回值类型     名称                                        描述
string     from_unixtime(int unixtime)                    将时间戳(unix epoch秒数)转换为日期时间字符串,例如from_unixtime(0)="1970-01-01 00:00:00"
bigint     unix_timestamp()                                获得当前时间戳
bigint     unix_timestamp(string date)                    获得date表示的时间戳
bigint     to_date(string timestamp)                      返回日期字符串,例如to_date("1970-01-01 00:00:00") = "1970-01-01"
string     year(string date)                              返回年,例如year("1970-01-01 00:00:00") = 1970,year("1970-01-01") = 1970
int     month(string date)     
int     day(string date) dayofmonth(date)     
int     hour(string date)     
int     minute(string date)     
int     second(string date)     
int     weekofyear(string date)     
int     datediff(string enddate, string startdate)        返回enddate和startdate的天数的差,例如datediff('2009-03-01', '2009-02-27') = 2
int     date_add(string startdate, int days)              加days天数到startdate: date_add('2008-12-31', 1) = '2009-01-01'
int     date_sub(string startdate, int days)              减days天数到startdate: date_sub('2008-12-31', 1) = '2008-12-30'
5. 条件函数
返回值类型     名称                                                      描述
-     if(boolean testCondition, T valueTrue, T valueFalseOrNull)        当testCondition为真时返回valueTrue,testCondition为假或NULL时返回valueFalseOrNull
-     COALESCE(T v1, T v2, ...)                                        返回列表中的第一个非空元素,如果列表元素都为空则返回NULL
-     CASE a WHEN b THEN c [WHEN d THEN e]* [ELSE f] END                a = b,返回c;a = d,返回e;否则返回f
-     CASE WHEN a THEN b [WHEN c THEN d]* [ELSE e] END                  a 为真,返回b;c为真,返回d;否则e
例如:
(
case 
when category = '1512' then reserve_price > cast(1000 as double)
when category = '1101' then reserve_price > cast(2500 as double)
else reserve_price > cast(10 as double)
end
)
6. 常用字符串函数
返回值类型      名称                                                             描述
int     length(string A)                                                          返回字符串长度
string     reverse(string A)                                                      反转字符串
string     concat(string A, string B...)                                          合并字符串,例如concat('foo', 'bar')='foobar'。注意这一函数可以接受任意个数的参数
string     substr(string A, int start) substring(string A, int start)              返回子串,例如substr('foobar', 4)='bar',详见 [4]
string     substr(string A, int start, int len) substring(string A, int start, int len)     返回限定长度的子串,例如substr('foobar', 4, 1)='b',详见[5]
string     upper(string A) ucase(string A)                                        转换为大写
string     lower(string A) lcase(string A)                                        转换为小写
string     trim(string A)     
string     ltrim(string A)     
string     rtrim(string A) 
string     regexp_extract(string subject, string pattern, int intex)            返回使用正则表达式提取的子字串。
                                                                           例如,regexp_extract('foothebar', 'foo(.*?)(bar)', 2)='bar'。注意使用特殊字符的规则:
                                                                           使用'\s'代表的是字符's';空白字符需要使用'\\s',以此类推。
string     space(int n)                                                          返回一个包含n个空格的字符串
string     repeat(string str, int n)                                            重复str字符串n遍
string     ascii(string str)                                                    返回str中第一个字符的ascii码
string     lpad(string str, int len, string pad)                                左端补齐str到长度为len。补齐的字符串由pad指定。
string     rpad(string str, int len, string pad)                                右端补齐str到长度为len。补齐的字符串由pad指定。
array     split(string str, string pat)                                        返回使用pat作为正则表达式分割str字符串的列表。例如,split('foobar', 'o')[2] = 'bar'。                                                                          
7. 创建表
CREATE TABLE IF NOT EXISTS table_name
(
--field def
)PARTITIONED BY (pt string) 
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'
STORED AS TEXTFILE
LOCATION '...';
注意:如果不是外表部,drop table的时候会将HDFS上文件删除。
8. 创建外部表
CREATE EXTERNAL TABLE dm_all_cpv_assoc (
 --field def
 )
 PARTITIONED BY (pt string)
 ROW FORMAT DELIMITED
 FIELDS TERMINATED BY '\1'  字段分隔符
 LINES TERMINATED BY '\2'  行分隔符
 STORED AS TEXTFILE        
 LOCATION '...';
注意:在删除外部表的时候,不会删除HDFS上的关联文件。
9. 添加分区
ALTER TABLE table_name ADD PARTITION (dt='2008-08-08', country='us')
location '/path/to/us/part080808' PARTITION (dt='2008-08-09', country='us')
location '/path/to/us/part080809';
10. 删除分区
ALTER TABLE table_name DROP PARTITION (dt='2008-08-08', country='us');
11. 导入数据
a. insert overwrite table table_name partition (pt = '20110323000000')
select ... from ...
b. LOAD DATA LOCAL INPATH 'test.dat' OVERWRITE INTO table yahoo_music partition (pt=xxx);
12. 查询数据
SELECT, JOIN, LIMIT
13. 添加UDF
add jar /home/hive/jar/my_udf.jar;
create temporary function sys_date as 'com.taobao.hive.udf.UDFDateSysdate';
14. 设置reducer数量
限制最大reducer数:set hive.exec.reducers.max=15;
设置固定的reducer数:set mapred.reduce.tasks=15;

hive中的时间处理函数

唐半张 发表了文章 0 个评论 2396 次浏览 2015-09-30 11:15 来自相关话题

日期函数UNIX时间戳转日期函数: from_unixtime语法:   from_unixtime(bigint unixtime[, string format]) 返回值: string 说明: 转化UNIX时间戳(从1970-0 ...查看全部
日期函数UNIX时间戳转日期函数: from_unixtime语法:   from_unixtime(bigint unixtime[, string format])
返回值: string
说明: 转化UNIX时间戳(从1970-01-01 00:00:00 UTC到指定时间的秒数)到当前时区的时间格式
举例:
hive>   select from_unixtime(1323308943,’yyyyMMdd’) from dual;
20111208
获取当前UNIX时间戳函数: unix_timestamp语法:   unix_timestamp()
返回值:   bigint
说明: 获得当前时区的UNIX时间戳
举例:
hive>   select unix_timestamp() from dual;
1323309615
日期转UNIX时间戳函数: unix_timestamp语法:   unix_timestamp(string date)
返回值:   bigint
说明: 转换格式为“yyyy-MM-dd HH:mm:ss“的日期到UNIX时间戳。如果转化失败,则返回0。
举例:
hive>   select unix_timestamp(’2011-12-07 13:01:03′) from dual;
1323234063
指定格式日期转UNIX时间戳函数: unix_timestamp语法:   unix_timestamp(string date, string pattern)
返回值:   bigint
说明: 转换pattern格式的日期到UNIX时间戳。如果转化失败,则返回0。
举例:
hive>   select unix_timestamp(’20111207 13:01:03′,’yyyyMMdd HH:mm:ss’) from dual;
1323234063
日期时间转日期函数: to_date语法:   to_date(string timestamp)
返回值:   string
说明: 返回日期时间字段中的日期部分。
举例:
hive>   select to_date(’2011-12-08 10:03:01′) from dual;
2011-12-08
日期转年函数: year语法:   year(string date)
返回值: int
说明: 返回日期中的年。
举例:
hive>   select year(’2011-12-08 10:03:01′) from dual;
2011
hive>   select year(’2012-12-08′) from dual;
2012
日期转月函数: month语法: month   (string date)
返回值: int
说明: 返回日期中的月份。
举例:
hive>   select month(’2011-12-08 10:03:01′) from dual;
12
hive>   select month(’2011-08-08′) from dual;
8
日期转天函数: day语法: day   (string date)
返回值: int
说明: 返回日期中的天。
举例:
hive>   select day(’2011-12-08 10:03:01′) from dual;
8
hive>   select day(’2011-12-24′) from dual;
24
日期转小时函数: hour语法: hour   (string date)
返回值: int
说明: 返回日期中的小时。
举例:
hive>   select hour(’2011-12-08 10:03:01′) from dual;
10
日期转分钟函数: minute语法: minute   (string date)
返回值: int
说明: 返回日期中的分钟。
举例:
hive>   select minute(’2011-12-08 10:03:01′) from dual;
3
日期转秒函数: second语法: second   (string date)
返回值: int
说明: 返回日期中的秒。
举例:
hive>   select second(’2011-12-08 10:03:01′) from dual;
1
日期转周函数: weekofyear语法:   weekofyear (string date)
返回值: int
说明: 返回日期在当前的周数。
举例:
hive>   select weekofyear(’2011-12-08 10:03:01′) from dual;
49
日期比较函数: datediff语法:   datediff(string enddate, string startdate)
返回值: int
说明: 返回结束日期减去开始日期的天数。
举例:
hive>   select datediff(’2012-12-08′,’2012-05-09′) from dual;
213
日期增加函数: date_add语法:   date_add(string startdate, int days)
返回值: string
说明: 返回开始日期startdate增加days天后的日期。
举例:
hive>   select date_add(’2012-12-08′,10) from dual;
2012-12-18
日期减少函数: date_sub语法:   date_sub (string startdate, int days)
返回值: string
说明: 返回开始日期startdate减少days天后的日期。
举例:
hive>   select date_sub(’2012-12-08′,10) from dual;
2012-11-28
 

Hadoop Hive与Hbase整合

唐半张 发表了文章 0 个评论 1821 次浏览 2015-09-29 11:08 来自相关话题

用hbase做数据库,但由于hbase没有类sql查询方式,所以操作和计算数据非常不方便,于是整合hive,让hive支撑在hbase数据库层面 的 hql查询.hive也即 做数据仓库  1. 基于Hadoop+Hive架构对海量数 ...查看全部
用hbase做数据库,但由于hbase没有类sql查询方式,所以操作和计算数据非常不方便,于是整合hive,让hive支撑在hbase数据库层面 的 hql查询.hive也即 做数据仓库 

1. 基于Hadoop+Hive架构对海量数据进行查询:http://blog.csdn.net/kunshan_shenbin/article/details/7105319
2. HBase 0.90.5 + Hadoop 1.0.0 集成:http://blog.csdn.net/kunshan_shenbin/article/details/7209990 
本文的目的是要讲述如何让Hbase和Hive能互相访问,让Hadoop/Hbase/Hive协同工作,合为一体。 
本文测试步骤主要参考自:http://running.iteye.com/blog/898399 
当然,这边博文也是按照官网的步骤来的:http://wiki.apache.org/hadoop/Hive/HBaseIntegration 
1. 拷贝hbase-0.90.5.jar和zookeeper-3.3.2.jar到hive/lib下。 
注意:如何hive/lib下已经存在这两个文件的其他版本(例如zookeeper-3.3.1.jar),建议删除后使用hbase下的相关版本。 
2. 修改hive/conf下hive-site.xml文件,在底部添加如下内容: 
[html] view plaincopy



hive.querylog.location
/usr/local/hive/logs



hive.aux.jars.path
file:///usr/local/hive/lib/hive-hbase-handler-0.8.0.jar,file:///usr/local/hive/lib/hbase-0.90.5.jar,file:///usr/local/hive/lib/zookeeper-3.3.2.jar

注意:如果hive-site.xml不存在则自行创建,或者把hive-default.xml.template文件改名后使用。 
具体请参见:http://blog.csdn.net/kunshan_shenbin/article/details/7210020 

3. 拷贝hbase-0.90.5.jar到所有hadoop节点(包括master)的hadoop/lib下。 
4. 拷贝hbase/conf下的hbase-site.xml文件到所有hadoop节点(包括master)的hadoop/conf下。 
注意,hbase-site.xml文件配置信息参照:http://blog.csdn.net/kunshan_shenbin/article/details/7209990 
注意,如果3,4两步跳过的话,运行hive时很可能出现如下错误: 
[html] view plaincopy
org.apache.hadoop.hbase.ZooKeeperConnectionException: HBase is able to connect to ZooKeeper but the connection closes immediately.
This could be a sign that the server has too many connections (30is thedefault). Consider inspecting your ZK server logsforthat error and
then make sure you are reusing HBaseConfiguration as often as you can. See HTable's javadocformore information. at org.apache.hadoop.
hbase.zookeeper.ZooKeeperWatcher.
参考:http://blog.sina.com.cn/s/blog_410d18710100vlbq.html 

现在可以尝试启动Hive了。 
单节点启动: > bin/hive -hiveconf hbase.master=master:60000
集群启动: > bin/hive -hiveconf hbase.zookeeper.quorum=slave
如何hive-site.xml文件中没有配置hive.aux.jars.path,则可以按照如下方式启动。 
> bin/hive --auxpath /usr/local/hive/lib/hive-hbase-handler-0.8.0.jar, /usr/local/hive/lib/hbase-0.90.5.jar, /usr/local/hive/lib/zookeeper-3.3.2.jar -hiveconf hbase.zookeeper.quorum=slave
接下来可以做一些测试了。 
1.创建hbase识别的数据库: 
[sql] view plaincopy 
CREATE TABLE hbase_table_1(key int, value string)  
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'  
WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,cf1:val")  
TBLPROPERTIES ("hbase.table.name" = "xyz");  
hbase.table.name 定义在hbase的table名称 
hbase.columns.mapping 定义在hbase的列族 
2.使用sql导入数据 
a) 新建hive的数据表 
[sql] view plaincopy 
hive> CREATE TABLE pokes (foo INT, bar STRING);  
b) 批量插入数据 
[sql] view plaincopy 
hive> LOAD DATA LOCAL INPATH'./examples/files/kv1.txt'OVERWRITE INTO TABLE
pokes;  
c) 使用sql导入hbase_table_1 
[sql] view plaincopy 
hive> INSERT OVERWRITE TABLE hbase_table_1 SELECT * FROM pokes WHERE foo=86;  
3. 查看数据 
[sql] view plaincopy 
hive> select * from  hbase_table_1;  
这时可以登录Hbase去查看数据了. 
> /usr/local/hbase/bin/hbase shell 
hbase(main):001:0> describe 'xyz'   
hbase(main):002:0> scan 'xyz'   
hbase(main):003:0> put 'xyz','100','cf1:val','www.360buy.com' 
这时在Hive中可以看到刚才在Hbase中插入的数据了。 
hive> select * from hbase_table_1 
4. hive访问已经存在的hbase 
使用CREATE EXTERNAL TABLE 
[sql] view plaincopy 
CREATE EXTERNAL TABLE hbase_table_2(key int, value string)  
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'  
WITH SERDEPROPERTIES ("hbase.columns.mapping" = "cf1:val")  
TBLPROPERTIES("hbase.table.name" = "some_existing_table");  
多列和多列族(Multiple Columns and Families) 
1.创建数据库 
Java代码  
CREATE TABLE hbase_table_2(key int, value1 string, value2 int, value3 int)   
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'  
WITH SERDEPROPERTIES (  
"hbase.columns.mapping" = ":key,a:b,a:c,d:e"  
);  

2.插入数据 
Java代码  
INSERT OVERWRITE TABLE hbase_table_2 SELECT foo, bar, foo+1, foo+2   
FROM pokes WHERE foo=98 OR foo=100;  


这个有3个hive的列(value1和value2,value3),2个hbase的列族(a,d) 
Hive的2列(value1和value2)对应1个hbase的列族(a,在hbase的列名称b,c),hive的另外1列(value3)对应列(e)位于列族(d) 

3.登录hbase查看结构 
Java代码  
hbase(main):003:0> describe"hbase_table_2" 
DESCRIPTION ENABLED
{NAME =>'hbase_table_2', FAMILIES => [{NAME =>'a', COMPRESSION => 'Ntrue
ONE', VERSIONS => '3', TTL => '2147483647', BLOCKSIZE => '65536', IN_M
EMORY =>'false', BLOCKCACHE =>'true'}, {NAME =>'d', COMPRESSION =>
'NONE', VERSIONS =>'3', TTL =>'2147483647', BLOCKSIZE =>'65536', IN
_MEMORY =>'false', BLOCKCACHE =>'true'}]}
1row(s) in1.0630seconds
4.查看hbase的数据 
Java代码  
hbase(main):004:0> scan'hbase_table_2' 
ROW COLUMN+CELL
100 column=a:b, timestamp=1297695262015, value=val_100
100 column=a:c, timestamp=1297695262015, value=101
100 column=d:e, timestamp=1297695262015, value=102
98 column=a:b, timestamp=1297695242675, value=val_98
98 column=a:c, timestamp=1297695242675, value=99
98 column=d:e, timestamp=1297695242675, value=100
2row(s) in0.0380seconds
在hive中查看 
Java代码  
hive> select * from hbase_table_2; 
OK
100 val_100101 102
98 val_98 99 100
Time taken:3.238seconds

hive 数据导入mysql

夕阳丶一抹红颜 发表了文章 0 个评论 2520 次浏览 2015-09-22 11:44 来自相关话题

下面是我将hive中数据导入到mysql中的方案,但这套方案性能速度上不是很好,想听听大家有没有更好些的方案? 目前我是在hive中有一张page_visit表,每天产生1500w数据,然后执行指定时间段内(一般三天,大概四、五千万行 ...查看全部
下面是我将hive中数据导入到mysql中的方案,但这套方案性能速度上不是很好,想听听大家有没有更好些的方案?

目前我是在hive中有一张page_visit表,每天产生1500w数据,然后执行指定时间段内(一般三天,大概四、五千万行)的数据:

insert overwrite table archive_seller_by_geo_per_day partition(partDate, shard) select sellerTokenID, partDate archiveDate, countryCode, country, countryState, countryCity, count(*) pv, count(distinct ip) uv, partDate, shard from page_visit where partDate >= ${THREE_DAY[1]} and partDate <= ${THREE_DAY[3]} group by sellerTokenID, countryCode, country, countryState, countryCity, partDate, shard having sellerTokenID>0;

到archive_seller_by_geo_per_day中的数据有1000万行,然后通过三步将archive_seller_by_geo_per_day中数据导入到mysql中:

第一步:通过hadoop fs -get 将archive_seller_by_geo_per_day的HDFS文件导入本地
hadoop fs -get "/hive/archive_seller_by_geo_per_day/partdate=${day}/shard=${sd}/*" "${mast_get}partdate=${day}/shard=${sd}"

第二步:将本地文件load到mysql数据库 hive_archive_seller_by_geo_per_day 表(临时表)中:
${SHARDDB[$sd]} <<< "LOAD DATA LOCAL INFILE '${archive_file}' INTO TABLE hive_archive_seller_by_geo_per_day CHARACTER SET utf8 fields terminated by ',' lines terminated by '\n';"

第三步:再将hive_archive_seller_by_geo_per_day表插入和更新到mysql的正式表 archive_seller_by_geo_per_day 中:
${SHARDDB[$sd]} <<< "INSERT INTO archive_seller_by_geo_per_day (sellerTokenID, archiveDate, countryCode, country, countryState, countryCity, pv, uv) SELECT sellerTokenID, archiveDate,countryCode,country,countryState,countryCity,pv, uv from hive_archive_seller_by_geo_per_day where archiveDate=${day} ON DUPLICATE KEY UPDATE pv = VALUES(pv), uv = VALUES(uv);"

Hive Tunning 补充 关于bucket

cenyuhai 发表了文章 0 个评论 1773 次浏览 2015-09-11 14:40 来自相关话题

在前面的几篇文章当中一直有一个概念bucketing不清楚到底是怎么回事。   网友南京-李先森给了他收集的一些资料,如下:   Buckets 对指定列计算 hash,根据 hash 值切分数据,目的是为了并行,每一个 Bucket ...查看全部
在前面的几篇文章当中一直有一个概念bucketing不清楚到底是怎么回事。
  网友南京-李先森给了他收集的一些资料,如下:
  Buckets 对指定列计算 hash,根据 hash 值切分数据,目的是为了并行,每一个 Bucket 对应一个文件。如将 user 列分散至 32 个 bucket,首先对 user 列的值计算 hash,对应 hash 值为 0 的 HDFS 目录为:/ warehouse /xiaojun/dt =20100801/ctry=US/part-00000;hash 值为 20 的 HDFS 目录为:/ warehouse /xiaojun/dt =20100801/ctry=US/part-00020
  这段描述是说用了bucket之后的,那为什么要用bucket,没说,本着认真负责的态度,我从网上搜索到了Oreilly《Programming.Hive》这本书,然后在里面找到了答案,现在发出来和大家分享一下。
  首先回顾一下分区,分区是切分数据的一种比较方便的方法,比较常用的就是按照日期来进行切分,bucket(中文意思就是篮子,可以放鸡蛋,哈哈)其实也是一种切分数据的方法。
  假设我们有一张日志表,我们需要按照日期和用户id来分区,目的是为了加快查询谁哪天干了什么,如下:
 CREATE TABLE weblog (url STRING, source_ip STRING)
> PARTITIONED BY (dt STRING, user_id INT);

  但是这里面用user_id去切分的话,就会产生很多很多的分区了,这些分区可大可小,这个数量是文件系统所不能承受的。
  在这种情况下,我们既想加快查询速度,又避免出现如此多的小分区,篮子(bucket)就出现了。
  具体的用法是:
  
CREATE TABLE weblog (user_id INT, url STRING, source_ip STRING)
> PARTITIONED BY (dt STRING)
> CLUSTERED BY (user_id) INTO 96 BUCKETS;

  首先按照日期分区,分区结束之后再按照user_id把日志放在96个篮子,这样同一个用户的所有日志都会在同一个篮子里面,并且一个篮子里面有好多用户的日志。
  然后我们在插入数据的时候就要注意了,我们一定要设置hive.enforce.bucketing为true。
  
hive> SET hive.enforce.bucketing = true;
hive> FROM raw_logs
> INSERT OVERWRITE TABLE weblog
> PARTITION (dt='2009-02-25')
> SELECT user_id, url, source_ip WHERE dt='2009-02-25';

  
  
  到此,bucket介绍完毕!

Hbase 学习(十一)使用hive往hbase当中导入数据

cenyuhai 发表了文章 0 个评论 1983 次浏览 2015-09-11 14:38 来自相关话题

 我们可以有很多方式可以把数据导入到hbase当中,比如说用map-reduce,使用TableOutputFormat这个类,但是这种方式不是最优的方式。   Bulk的方式直接生成HFiles,写入到文件系统当中,这种方式的效率很高。 ...查看全部
 我们可以有很多方式可以把数据导入到hbase当中,比如说用map-reduce,使用TableOutputFormat这个类,但是这种方式不是最优的方式。
  Bulk的方式直接生成HFiles,写入到文件系统当中,这种方式的效率很高。
  一般的步骤有两步
  (1)使用ImportTsv或者import工具或者自己写程序用hive/pig生成HFiles
  (2)用completebulkload把HFiles加载到hdfs上
  ImportTsv能把用Tab分隔的数据很方便的导入到hbase当中,但还有很多数据不是用Tab分隔的 下面我们介绍如何使用hive来导入数据到hbase当中。
  
  
  1.准备输入内容
  a.创建一个tables.ddl文件
  

-- pagecounts data comes from http://dumps.wikimedia.org/other/
pagecounts-raw/
-- documented http://www.mediawiki.org/wiki/Analytics/Wikistats
-- define an external table over raw pagecounts data
CREATE TABLE IF NOT EXISTS pagecounts (projectcode STRING, pagename
STRING, pageviews STRING, bytes STRING)
ROW FORMAT
DELIMITED FIELDS TERMINATED BY ' '
LINES TERMINATED BY '\n'
STORED AS TEXTFILE
LOCATION '/tmp/wikistats';
-- create a view, building a custom hbase rowkey
CREATE VIEW IF NOT EXISTS pgc (rowkey, pageviews, bytes) AS
SELECT concat_ws('/',
projectcode,
concat_ws('/',
pagename,
regexp_extract(INPUT__FILE__NAME, 'pagecounts-(\\d{8}-\\d{6})\
\..*$', 1))),
pageviews, bytes
FROM pagecounts;
-- create a table to hold the input split partitions
CREATE EXTERNAL TABLE IF NOT EXISTS hbase_splits(partition STRING)
ROW FORMAT
SERDE 'org.apache.hadoop.hive.serde2.binarysortable.
BinarySortableSerDe'
STORED AS
INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.
HiveNullValueSequenceFileOutputFormat'
LOCATION '/tmp/hbase_splits_out';
-- create a location to store the resulting HFiles
CREATE TABLE hbase_hfiles(rowkey STRING, pageviews STRING, bytes STRING)
STORED AS
INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.hbase.HiveHFileOutputFormat'
TBLPROPERTIES('hfile.family.path' = '/tmp/hbase_hfiles/w');
View Code
  b.创建HFils分隔文件,例子:sample.hql
  

-- prepate range partitioning of hfiles
ADD JAR /usr/lib/hive/lib/hive-contrib-0.11.0.1.3.0.0-104.jar;
SET mapred.reduce.tasks=1;
CREATE TEMPORARY FUNCTION row_seq AS 'org.apache.hadoop.hive.contrib.udf.
UDFRowSequence';
-- input file contains ~4mm records. Sample it so as to produce 5 input
splits.
INSERT OVERWRITE TABLE hbase_splits
SELECT rowkey FROM
(SELECT rowkey, row_seq() AS seq FROM pgc
TABLESAMPLE(BUCKET 1 OUT OF 10000 ON rowkey) s
ORDER BY rowkey
LIMIT 400) x
WHERE (seq % 100) = 0
ORDER BY rowkey
LIMIT 4;
-- after this is finished, combined the splits file:
dfs -cp /tmp/hbase_splits_out/* /tmp/hbase_splits;
View Code
  
  c.创建hfiles.hql
  

ADD JAR /usr/lib/hbase/hbase-0.94.6.1.3.0.0-104-security.jar;
ADD JAR /usr/lib/hive/lib/hive-hbase-handler-0.11.0.1.3.0.0-104.jar;
SET mapred.reduce.tasks=5;
SET hive.mapred.partitioner=org.apache.hadoop.mapred.lib.
TotalOrderPartitioner;
SET total.order.partitioner.path=/tmp/hbase_splits;
-- generate hfiles using the splits ranges
INSERT OVERWRITE TABLE hbase_hfiles
SELECT * FROM pgc
CLUSTER BY rowkey;
View Code
  
  2.导入数据
  注意:/$Path_to_Input_Files_on_Hive_Client是hive客户端的数据存储目录
  
mkdir /$Path_to_Input_Files_on_Hive_Client/wikistats
wget http://dumps.wikimedia.org/other/pagecounts-raw/2008/2008-10/
pagecounts-20081001-000000.gz
hadoop fs -mkdir /$Path_to_Input_Files_on_Hive_Client/wikistats
hadoop fs -put pagecounts-20081001-000000.
gz /$Path_to_Input_Files_on_Hive_Client/wikistats/

 
  3.创建必要的表
  注意:$HCATALOG_USER是HCatalog服务的用户(默认是hcat)
$HCATALOG_USER-f /$Path_to_Input_Files_on_Hive_Client/tables.ddl

  执行之后,我们会看到如下的提示:
  
OK
Time taken: 1.886 seconds
OK
Time taken: 0.654 seconds
OK
Time taken: 0.047 seconds
OK
Time taken: 0.115 seconds


  
  4.确认表已经正确创建
  执行以下语句
  
$HIVE_USER-e "select * from pagecounts limit 10;"

  
  执行之后,我们会看到如下的提示:
  
...
OK
aa Main_Page 4 41431
aa Special:ListUsers 1 5555
aa Special:Listusers 1 1052

  再执行
  
$HIVE_USER-e "select * from pgc limit 10;"

  执行之后,我们会看到如下的提示:
  
...
OK
aa/Main_Page/20081001-000000 4 41431
aa/Special:ListUsers/20081001-000000 1 5555
aa/Special:Listusers/20081001-000000 1 1052
...

  
  5.生成HFiles分隔文件
  
$HIVE_USER-f /$Path_to_Input_Files_on_Hive_Client/sample.hql
hadoop fs -ls /$Path_to_Input_Files_on_Hive_Client/hbase_splits

  
  为了确认,执行以下命令
hadoop jar /usr/lib/hadoop/contrib/streaming/hadoop-streaming-1.2.0.1.
3.0.0-104.jar -libjars /usr/lib/hive/lib/hive-exec-0.11.0.1.3.0.0-104.
jar -input /tmp/hbase_splits -output /tmp/hbase_splits_txt -inputformat
SequenceFileAsTextInputFormat

  执行之后,我们会看到如下的提示:
  
...
INFO streaming.StreamJob: Output: /tmp/hbase_splits_txt

  再执行这一句
  
hadoop fs -cat /tmp/hbase_splits_txt/*

  执行之后,我们会看到类似这样的结果
  
1 61 66 2e 71 2f 4d 61 69 6e 5f 50 61 67 65 2f 32 30 30 38 31 30 30 31 2d 30
30 30 30 30 30 00 (null)
01 61 66 2f 31 35 35 30 2f 32 30 30 38 31 30 30 31 2d 30 30 30 30 30 30 00
(null)
01 61 66 2f 32 38 5f 4d 61 61 72 74 2f 32 30 30 38 31 30 30 31 2d 30 30 30
30 30 30 00 (null)
01 61 66 2f 42 65 65 6c 64 3a 31 30 30 5f 31 38 33 30 2e 4a 50 47 2f 32 30
30 38 31 30 30 31 2d 30 30 30 30 30 30 00 (null)


  
  7.生成HFiles
HADOOP_CLASSPATH=/usr/lib/hbase/hbase-0.94.6.1.3.0.0-104-security.jar hive -f /$Path_to_Input_Files_on_Hive_Client/hfiles.hql

  
  以上内容是hdp的用户手册中推荐的方式,然后我顺便也从网上把最后的一步的命令格式给找出来了
  
hadoop jar hbase-VERSION.jar completebulkload /user/todd/myoutput mytable

 
  

Hive Tuning(五) 标准调优清单

cenyuhai 发表了文章 0 个评论 1654 次浏览 2015-09-11 14:28 来自相关话题

=mediumHive的标准调优清单,我们可以对照着来做我们的查询优化! =medium ...查看全部
=mediumHive的标准调优清单,我们可以对照着来做我们的查询优化!
=medium




Hive Tunning(三) 最佳实践

cenyuhai 发表了文章 0 个评论 1690 次浏览 2015-09-11 14:28 来自相关话题

=medium在上一讲的基础上,我们来做来一个实际的例子来展示如何在实操中进行高效的hive查询作业。=medium(1)首先我们建立一个表 =mediumCREATE EXTERNAL TABLE pos_staging( txnid ...查看全部
=medium在上一讲的基础上,我们来做来一个实际的例子来展示如何在实操中进行高效的hive查询作业。=medium(1)首先我们建立一个表
=mediumCREATE EXTERNAL TABLE pos_staging(
txnid STRING,
txntime STRING,
givenname STRING,
lastname STRING,
postalcode STRING,
storeid STRING,
indl STRING,
productid STRING,
purchaseamount FLOAT,
creditcard STRING
)ROW FORMAT DELIMITED FIELDS TERMINATED BY '|'
LOCATION '/user/hdfs/staging_data/pos_staging';
=medium我们建立一张外部表是为了初始化或者加载mapreduce或者pig作业产生的元数据,然后我们自己建立一张优化的表。=medium(2)建立调优表的时候,我们就要考虑使用哪种分区模式,比如按时间分区。
=medium


=medium以下是两个关于动态分区的参数:
=medium所有节点的动态分区的最大数以及每个节点的动态分区的最大数
=mediumhive.exec.max.dynamic.partitions=1000
hive.exec.max.dynamic.partitions.pernode=100



=medium(3)建立调优表
=mediumCREATE TABLE fact_pos
(
txnid STRING,
txntime STRING,
givenname STRING,
lastname STRING,
postalcode STRING,
storeidSTRING,
indl STRING,
productid STRING,
=mediumpurchaseamountFLOAT,
creditcardSTRING
) PARTITIONED BY (part_dt STRING)!
CLUSTERED BY (txnid)
SORTED BY (txnid)
INTO 24 BUCKETS
STORED AS ORC tblproperties("orc.compress"="SNAPPY");
=mediumCLUSTERED 和SORTED 使用都是同一个字段,它就是连接的时候需要使用的字段。
=mediumBUCKETS也出现了,前面一直不理解的概念,现在出现了还分了24个。



=medium(4)把数据插入到调优表中
=mediumFROM pos_staging
INSERT OVERWRITE TABLE fact_pos
PARTITION (part_dt)
SELECT
txnid,
txntime,
givenname,
lastname,
postalcode,
storeid,
indl,
productid,
purchaseamount,
creditcard,
concat(year(txntime),month(txntime)) as part_dt
SORT BY productid;
=medium语句中使用了前面教的自动分区的语句,按照年月自动分区。



=medium
=mediumhadoop fs-setrep-R –w 5 /apps/hive/warehouse/fact_pos
=medium上面的命令当中是个hdfs中存数的fact_pos表增加备份,因为hdfs的数据是存得很分散的,增加备份因为会使得节点上的数据增多,然后查询的时候,hive
=medium从本地直接就可以获取到的数据的几率提高,增快查询速度。
=medium当然考虑到空间的问题,可以减少一下备份的数量。
=medium上述流程我们也可以把它放到oozie中自动执行。



=medium
=medium。。。又一个熟悉的词出现了。
=medium在hdfs-site.xml或者Ambari settings for HDFS, 设置完要重启。
dfs.block.local-path-access.user=hdfs
dfs.client.read.shortcircuit=true
dfs.client.read.shortcircuit.skip.checksum=false
=medium
=medium开启了这个东东有什么作用呢?当数据块在本地的时候,它可以不需要开启一个端口来读,可以直接访问,就像图中的闪电那样。



=medium(5)执行查询
=mediumset hive.mapred.reduce.tasks.speculative.execution=false;
set io.sort.mb=300;
set mapreduce.reduce.input.limit=-1;
select productid, ROUND(SUM(purchaseamount),2) as total
from fact_pos
where part_dt between ‘201210’ and ‘201212’
group by productid
order by total desc
limit 100;
=medium查询之前先对查询设置相应的运行参数。


Hive Tuning(四) 从查询计划看hive.auto.convert.join的好处

cenyuhai 发表了文章 0 个评论 2617 次浏览 2015-09-11 14:23 来自相关话题

今天我们来讲一下如何看懂Hive的查询计划。 hive的执行计划包括三部分 – Abstract syntax tree – 可以直接忽略 – Stage dependencies – 依赖 – Stage pl ...查看全部
今天我们来讲一下如何看懂Hive的查询计划。
hive的执行计划包括三部分
– Abstract syntax tree – 可以直接忽略
– Stage dependencies – 依赖
– Stage plans – hive如何执行任务的信息.

 


下面还是以一个案例作为说明



设置自动连接为false的话,要走5步。
 



4 Map Reduces tells you something is not right.

Stage: Stage-1
Map Reduce
Stage: Stage-2
Map Reduce
Stage: Stage-3
Map Reduce
Stage: Stage-4
Map Reduce



设置自动连接为true就只有4步

Only 2 Map Reduces

Stage: Stage-8
Map Reduce
Stage: Stage-4
Map Reduce



 



hive直接就加载了要做连接的表,client和path表,其中client表做了过滤,剩下的map/reduce是用来连接和排序的。


Hive Tunning(二)优化存储

cenyuhai 发表了文章 0 个评论 1965 次浏览 2015-09-11 14:22 来自相关话题

接着上一章我们讲的hive的连接策略,现在我们讲一下hive的数据存储。 下面是hive支持的数据存储格式,有我们常见的文本,JSON,XML,这里我们主要讲一下ORCFile。 Built-in Formats: – OR ...查看全部
接着上一章我们讲的hive的连接策略,现在我们讲一下hive的数据存储。
下面是hive支持的数据存储格式,有我们常见的文本,JSON,XML,这里我们主要讲一下ORCFile。
Built-in Formats:
– ORCFile
– RCFile
– Avro
– Delimited Text
– Regular Expression
– S3 Logfile
– Typed Bytes
• 3
rd
-Party Addons:
– JSON
– XML


这种格式非常适合HDFS,它有以下的优点:
•高压缩
– 高压缩比.
– 字典编码.
•高性能
– 自带索引.
– 高效的精确查询.
• 灵活的数据模型
– 支持所有的hive类型,包括maps.


 





从图中可以看出,orc格式的文件存储大小仅为文本的30%左右,比gz格式的都小,采用zlib压缩的话,更小,仅有22%左右。
使用orc格式存储的方式很简单,在建表的时候STORED AS orc即可
CREATE TABLE sale (
id int, timestamp timestamp,
productsk int, storesk int,
amount decimal, state string
) STORED AS orc;



相关参数,自己看,不解释了。



不适用zlib压缩的话,查询速度更快,但是也大一些。
CREATE TABLE sale (
id int, timestamp timestamp,
productsk int, storesk int,
amount decimal, state string
) STORED AS orc tblproperties ("orc.compress"="NONE");



下面是加快hive查询的一些可以参考的方式:



(1)跳跃读取:采用分区Partition或者使用Skew,才用ORCFile二次排序。
(2)在连接字段上排序并且bucket,在连接小表的时候采用Broadcast joins。
(3)对经常使用的数据,增加备份因子,激活Short-Circuit Read,采用Tez。




当某个表很大的时候,我们往往要对其进行分区,比如按照时间来分区。
CREATE TABLE sale (
id int, amount decimal, ...
) partitioned by (xdate string, state string);
其中的xdate和state是不存在的列,你可以认为它们是虚拟列,虚拟列会在HDFS当中建立子目录,属于分区的记录会存在那个子文件夹中。
使用分区之后,在查询和插入的时候,就必须带有至少一个分区字段,否则查询将会失败。
INSERT INTO sale (xdate=‘2013-03-01’, state=‘CA’)
SELECT * FROM staging_table
WHERE xdate = ‘2013-03-01’ AND state = ‘CA’;
如果你想一次查出所有数据,不想受这个限制的话,你可以 hive.exec.dynamic.partition.mode参数置为nonstrict。
set hive.exec.dynamic.partition.mode=nonstrict;

INSERT INTO sale (xdate, state)
SELECT * FROM staging_table;
有时候插入数据的时候,我们需要重新排序,在select 语句里面把虚拟列也加上,这样会有排序的效果。
INSERT INTO sale (xdate, state=‘CA’)
SELECT
id, amount, other_stuff,
xdate, state
FROM staging_table
WHERE state = ‘CA’;


 


下面我们讲一下常用的hive查询调优
 

mapred.max.split.size和mapred.min.split.size
min 太大-> 太少mapper.
max 太小-> mapper太多.
Example:
– set mapred.max.split.size=100000000;
– set mapred.min.split.size=1000000;
当然也有个原则,当mappers出现抢占资源的时候,才调整这些参数。
 

– set io.sort.mb=100;

• All the time:
– set hive.optmize.mapjoin.mapreduce=true;
– set hive.optmize.bucketmapjoin=true;
– set hive.optmize.bucketmapjoin.sortedmerge=true;
– set hive.auto.convert.join=true;
– set hive.auto.convert.sortmerge.join=true;
– set hive.auto.convert.sortmerge.join.nocondi1onaltask=true;
• When bucketing data:
– set hive.enforce.bucketing=true;
– set hive.enforce.sortng=true;
• These and more are set by default in HDP 1.3(明显的广告词,说明HDP比较强大,已经给我们设置好了).
这些参数我们可以在hive-site.xml中查询到,我们也可以在shell中查询。
(1)查询所有的参数

(2)查询某一个参数

(3)修改参数

 


Hive Tuning(一) 连接策略

cenyuhai 发表了文章 0 个评论 1979 次浏览 2015-09-11 14:21 来自相关话题

群里共享了一本hive调优的书记,名叫《Hive Tunning》,就忍不住开始看了,也顺便记录一下自己学到的东西,备忘! 首先,这是hive的数据摘要,别问我什么意思,我也没看懂。 好,我们正式开始,首先是 ...查看全部
群里共享了一本hive调优的书记,名叫《Hive Tunning》,就忍不住开始看了,也顺便记录一下自己学到的东西,备忘!
首先,这是hive的数据摘要,别问我什么意思,我也没看懂。


好,我们正式开始,首先是连接的问题,我们都知道连接耗时长,但是连接无法避免,那hive又是怎么处理连接操作的呢?
下面是hive的连接策略



hive有三种类型的连接策略
(1)Shuffle Join : 这种类型的是通过map/reduce 来实现连接操作的,优点是不需要考虑数据的大小和分布,缺点是消耗大量的资源而且是最慢的。
(2)Broadcast Join:这种类型的方式是把一个小的表在所有节点中加载到内容当中,然后用mapper来扫描大表进行连接,速度非常快,但是其中一个表必须可以加载到内存当中。
(3)Sort-Merge-Bucket Join:mapper可以协同定位keys去进行高效的连接,速度很快,不需要考虑表的大小,但是数据必须先排序和整理。



Shuffle Join:



我们以这个销售订单这个例子来做演示,可以看到其中的图,它们是通过customer.id=order.cid来做连接的,首先Map把两个表中的数据处理成以连接字段为key,其他字段为value的作为输出,然后把两个表中id和cid相同的数据传递到同一个reducer中,从网络使用率上看是很奢侈的。


 


Broadcast Join:



这种方式比较复杂一点,首先它使用足够小的维度表来存放在所有的节点当中,单独扫描大表,然后根据模式匹配进行连接。



当两个表都很大的情况下:

第一步,首先按照连接字段排序,所有可能的匹配的都在硬盘的同一块区域。



第二步,把所有的值都移到同一个节点下面进行等值连接,不需要再进行shuffle。




Bucketing:
– Hash partition values into a configurable number of buckets.
– Usually coupled with sorting.
• Skews:
– Split values out into separate files.
– Used when certain values are frequently seen.
• Replication Factor:
– Increase replication factor to accelerate reads.
– Controlled at the HDFS layer.
• Sorting:
– Sort the values within given columns.
– Greatly accelerates query when used with ORCFilefilter pushdown.
这里就不解释了,自己看吧,这和下面的图是对应的,针对不同大小的表,hive有多种处理模式。

(1)小表,经常要用的数据,建议使用replication factor,可能是缓存的意思,具体是什么意思,等我清楚了再给大家解释。
(2)任意大小的表,有很多要精确查询的列,建议先按照最常使用的列进行排序再进行查询。
(3)大表但是又需要和另外的的大表做连接,建议先通过连接列做排序和bucket。
(4)大表,但只是利用到其中某些常用的值,可以把常用的值弄个单独的skew中。
(5)大表但是有一些自然边界,比如日期的,建议利用日期进行分区。
Map Join开启
我们可以启用连接自动转换来帮助我们转换,在执行语句之前设置一下即可。它是经过优化的Map Join,无reducer。
set hive.auto.convert.join=true;
set hive.auto.convert.join.noconditionaltask = true;
set hive.auto.convert.join.noconditionaltask.size = 10000;
Skew Join
真实数据中数据倾斜是一定的, hadoop 中默认是使用
hive.exec.reducers.bytes.per.reducer = 1000000000
也就是每个节点的reduce 默认是处理1G大小的数据,如果你的join 操作也产生了数据倾斜,那么你可以在hive 中设定
set hive.optimize.skewjoin = true;
set hive.skewjoin.key = skew_key_threshold (default = 100000)
hive 在运行的时候没有办法判断哪个key 会产生多大的倾斜,所以使用这个参数控制倾斜的阈值,如果超过这个值,新的值会发送给那些还没有达到的reduce, 一般可以设置成你
(处理的总记录数/reduce个数)的2-4倍都可以接受.
倾斜是经常会存在的,一般select 的层数超过2层,翻译成执行计划多于3个以上的mapreduce job 都很容易产生倾斜,建议每次运行比较复杂的sql 之前都可以设一下这个参数. 如果你不知道设置多少,可以就按官方默认的1个reduce 只处理1G 的算法,那么 skew_key_threshold = 1G/平均行长. 或者默认直接设成250000000 (差不多算平均行长4个字节)
Sort-Merge-Bucket Join
如果表已经排序并且已经bucketed,可以启用SMB joins
set hive.auto.convert.sortmerge.join=true;
set hive.optimize.bucketmapjoin = true;
set hive.optimize.bucketmapjoin.sortedmerge = true;
set hive.auto.convert.sortmerge.join.noconditionaltask=true;
set hive.auto.convert.sortmerge.join.bigtable.selection.policy = org.apache.hadoop.hive.ql.optimizer.TableSizeBasedBigTableSelectorForAutoSMJ;


hadoop 1.1.2和 hive 0.10 和hbase 0.94.9整合

cenyuhai 发表了文章 0 个评论 1592 次浏览 2015-09-11 14:17 来自相关话题

今天弄了一下hive0.10和hbase0.94.9整合,需要设置的并不多,但是也遇到了一些问题。   1.复制jar包   拷贝hbase-0.94.9.jar,zookeeper-3.4.5.jar,protobuf-java-2. ...查看全部
今天弄了一下hive0.10和hbase0.94.9整合,需要设置的并不多,但是也遇到了一些问题。
  1.复制jar包
  拷贝hbase-0.94.9.jar,zookeeper-3.4.5.jar,protobuf-java-2.4.0a.jar到hive/lib下,删掉lib下面旧版的jar包。
  拷贝hbase-0.94.9.jar到所有hadoop节点的lib文件夹下面,拷贝hbase/confi的hbase-site.xml文件拷贝到所有的hadoop节点conf文件夹下。
  2.修改hive-site.xml配置文件,添加以下内容

     
hive.querylog.location
/usr/hive/logs


hive.aux.jars.path
file:///usr/hive/lib/hive-hbase-handler-0.10.0.jar,file:///usr/hive/lib/hbase-0.94.9.jar,file:///usr/hive/lib/zookeeper-3.4.5.jar,file:///usr/hive/lib/protobuf-java-2.4.0a.jar
View Code
  
  
  3.启动hive,hive -hiveconf hbase.zookeeper.quorum=node1,node2,node3
   实际上只需要填一个即可,我只填了一个。
  4.开始测试,建一个表试验。
CREATE TABLE hbase_table1(key int, value1 string, value2 int, value3 int) 
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES (
"hbase.columns.mapping" = ":key,cf1:value1,cf1:value2,cf2:value3"
)TBLPROPERTIES("hbase.table.name" = "table1");
 
TBLPROPERTIES参数是可选的,如果不写的话,就默认是hive和hbase中的表名称一致
  5.打开hbase看看,使用describe “table1”来查询一下,发一个我真实建立的表吧。


hbase(main):001:0> describe "wdp"
DESCRIPTION ENABLED
'wdp', {NAME => 'cf', DATA_BLOCK_ENCODING => 'NONE' true
, BLOOMFILTER => 'NONE', REPLICATION_SCOPE => '0',
VERSIONS => '3', COMPRESSION => 'NONE', MIN_VERSION
S => '0', TTL => '2147483647', KEEP_DELETED_CELLS =
> 'false', BLOCKSIZE => '65536', IN_MEMORY => 'fals
e', ENCODE_ON_DISK => 'true', BLOCKCACHE => 'true'}
1 row(s) in 1.1980 seconds

hbase(main):002:0>
View Code
   好了,就这样啦,我还没插入数据测试呢,就先这样吧。
  最后发一个它官方的文档地址,想了解更多的到这个网站上面看看
  https://cwiki.apache.org/confluence/display/Hive/HBaseIntegration

hive thrift 开机启动

cenyuhai 发表了文章 0 个评论 1603 次浏览 2015-09-11 14:16 来自相关话题

这个问题困扰我很久,之前redis的时候,也出现了这个问题,从网上找的thrift脚本没有一个好使的,最后通过修改/etc/rc.d/rc.local来执行一些非服务的命令,这样子就不需要像写服务那样写start,stop方法啦,不过修改这个配置文件要小心,命 ...查看全部
这个问题困扰我很久,之前redis的时候,也出现了这个问题,从网上找的thrift脚本没有一个好使的,最后通过修改/etc/rc.d/rc.local来执行一些非服务的命令,这样子就不需要像写服务那样写start,stop方法啦,不过修改这个配置文件要小心,命令里面不要包含阻塞式的命令,否则开机进不了界面,就悲剧了,我就这样玩挂了一次系统。
  经过一顿挣扎之后,终于找到解决的方法了。
  su - cenyuhai -c "hive --service hiveserver &"
  上面的这句命令的意思是用账号cenyuhai来执行 "hive --service hiverserver" 这个命令, 命令结束后的&意思是在后台运行,则该命令不会阻塞系统,否则就悲剧了,发生进不去系统的问题。
  分享结束,收工!
Hive是Hadoop上事实的、功能最强大的SQL和元数据标准。也是Hadoop的MR, Spark, Tez作业的常用提交工具。