Hbase 学习(一) hbase配置文件同步

最近在狂啃hadoop的书籍,这部《hbase:权威指南》就进入我的视野里面了,啃吧,因为是英文的书籍,有些个人理解不对的地方,欢迎各位拍砖。HDFS和Hbase配置同步
hbase的配置中有一些和hdfs关联的配置,当hdfs中修改了,但是hbase中修改了,hbase中是不会知道的,比如dfs.replication,有时候我们想增加备份的数量,在hdfs中设置为5了,但是hbase中默认为3,这样hbase还是只保存3份。
那么有什么方法可以使他们的配置文件同步,有三种方法:
(1)在hbase-env.sh的HBASE_CLASSPATH环境变量增加HADOOP_CONF_DIR。
(2)在${HBASE_HOME}/conf下放一份hadoop的配置文件hdfs-site.xml (or hadoop-site.xml)。
(3)直接在hbase-site.xml中添加。
从上述三种方法当中,目测是第一种方法比较靠谱,当然要同步配置文件还有别的方法,后续再进行介绍。Hbase配置文件同步的脚本
以下这两个脚本都可以实现集群的hbase配置文件同步,第二个还带有删除之前配置文件的方法,用的时候注意一些
#!/bin/bash 
# Rsyncs HBase files across all slaves. Must run on master.
Assumes
# all files are located in /usr/local
if [ "$#" != "2" ]; then
echo "usage: $(basename $0) <dir-name> <ln-name>"
echo " example: $(basename $0) hbase-0.1 hbase"
exit 1
fi
SRC_PATH="/usr/local/$1/conf/regionservers"
for srv in $(cat $SRC_PATH); do
echo "Sending command to $srv...";
rsync -vaz --exclude='logs/*' /usr/local/$1
$srv:/usr/local/
ssh $srv "rm -fR /usr/local/$2 ; ln -s /usr/local/$1
/usr/local/$2"
done
echo "done."



另一个脚本,同样的功能,这个更简单些
#!/bin/bash 
# Rsync's HBase config files across all region servers. Must
run on master.
for srv in $(cat /usr/local/hbase/conf/regionservers); do
echo "Sending command to $srv...";
rsync -vaz --delete --exclude='logs/*' /usr/local/hadoop/
$srv:/usr/local/hadoop/
rsync -vaz --delete --exclude='logs/*' /usr/local/hbase/
$srv:/usr/local/hbase/
done
echo "done."

 
 
继续阅读 »
最近在狂啃hadoop的书籍,这部《hbase:权威指南》就进入我的视野里面了,啃吧,因为是英文的书籍,有些个人理解不对的地方,欢迎各位拍砖。HDFS和Hbase配置同步
hbase的配置中有一些和hdfs关联的配置,当hdfs中修改了,但是hbase中修改了,hbase中是不会知道的,比如dfs.replication,有时候我们想增加备份的数量,在hdfs中设置为5了,但是hbase中默认为3,这样hbase还是只保存3份。
那么有什么方法可以使他们的配置文件同步,有三种方法:
(1)在hbase-env.sh的HBASE_CLASSPATH环境变量增加HADOOP_CONF_DIR。
(2)在${HBASE_HOME}/conf下放一份hadoop的配置文件hdfs-site.xml (or hadoop-site.xml)。
(3)直接在hbase-site.xml中添加。
从上述三种方法当中,目测是第一种方法比较靠谱,当然要同步配置文件还有别的方法,后续再进行介绍。Hbase配置文件同步的脚本
以下这两个脚本都可以实现集群的hbase配置文件同步,第二个还带有删除之前配置文件的方法,用的时候注意一些
#!/bin/bash 
# Rsyncs HBase files across all slaves. Must run on master.
Assumes
# all files are located in /usr/local
if [ "$#" != "2" ]; then
echo "usage: $(basename $0) <dir-name> <ln-name>"
echo " example: $(basename $0) hbase-0.1 hbase"
exit 1
fi
SRC_PATH="/usr/local/$1/conf/regionservers"
for srv in $(cat $SRC_PATH); do
echo "Sending command to $srv...";
rsync -vaz --exclude='logs/*' /usr/local/$1
$srv:/usr/local/
ssh $srv "rm -fR /usr/local/$2 ; ln -s /usr/local/$1
/usr/local/$2"
done
echo "done."



另一个脚本,同样的功能,这个更简单些
#!/bin/bash 
# Rsync's HBase config files across all region servers. Must
run on master.
for srv in $(cat /usr/local/hbase/conf/regionservers); do
echo "Sending command to $srv...";
rsync -vaz --delete --exclude='logs/*' /usr/local/hadoop/
$srv:/usr/local/hadoop/
rsync -vaz --delete --exclude='logs/*' /usr/local/hbase/
$srv:/usr/local/hbase/
done
echo "done."

 
  收起阅读 »

hadoop 参数

看《Hadoop:权威指南》的时候收集了书上写的一些需要优化的参数,记录了一下子,给大家分享一下吧。


1.mapred.task.timeout
任务超时时间,默认是10分钟

2.mapred.map.max.attempts mapred.reduce.max.attempts
默认任务失败重复次数为4

3.mapred.max.map.failures.percent mapred.reduce.map.failures.percent
不触发错误的失败的最大百分比

4.mapred.jobtracker.taskScheduler
作业调度算法设置,默认是FIFO

5.io.sort.mb io.sort.spill.percent
缓冲区大小默认为100MB,以及缓冲区阀值默认为0.8,超过80%就保存到硬盘

6.io.sort.factor
默认为10,一次只能合并10个溢出文件

7.mapred.compress.map.output
默认为false,不压缩输出文件
压缩算法由mapred.map.output.compression.codec指定

8.tracker.http.Threads
tasktracker用于默认为40的啦

9.mapred.reduce.parallel.copies
reduce复制map输出的线程数,默认是5个

10.mapred.inmem.merge.threshold
控制map输出阀值,如果reduce函数的内存需求不大,那么设置为0
mapred.job.reduce.input.buffer.percent设置为1,可以带来性能的提升。

11.io.sort.factor
合并因子,默认为10,意思是一次合并多少个Map输出
如果Map输出为50个文件,则每次把10个合并成一个文件,最后有5个中间文件。

12.mapred.child.java.opts
设置任务节点的内存大小

13.io.file.buffer.size
缓冲区,默认为4KB的缓冲区

14.mapred.map.tasks.speculative.execution mapred.reduce.tasks.speculative.execution
推测执行默认值为true,

15.mapred.job.reuse.jvm.num.tasks
默认值为1,指定作业的jvm执行任务的最大数,如果为-1,则同一作业中的任务可以共享一个JVM,数量不限

16.mapred.linerecordreader.maxlength
数据行长度的最大值,防止因为内存溢出导致的错误

17.SkipBadRecord开启跳跃模式,跳过失败的坏记录
mapred.map.max.attemps mapred.reduce.attemps 一次只能跳过一个错误记录

18.关闭安全模式
hadoop dfsadmin -safemode leave
继续阅读 »
看《Hadoop:权威指南》的时候收集了书上写的一些需要优化的参数,记录了一下子,给大家分享一下吧。


1.mapred.task.timeout
任务超时时间,默认是10分钟

2.mapred.map.max.attempts mapred.reduce.max.attempts
默认任务失败重复次数为4

3.mapred.max.map.failures.percent mapred.reduce.map.failures.percent
不触发错误的失败的最大百分比

4.mapred.jobtracker.taskScheduler
作业调度算法设置,默认是FIFO

5.io.sort.mb io.sort.spill.percent
缓冲区大小默认为100MB,以及缓冲区阀值默认为0.8,超过80%就保存到硬盘

6.io.sort.factor
默认为10,一次只能合并10个溢出文件

7.mapred.compress.map.output
默认为false,不压缩输出文件
压缩算法由mapred.map.output.compression.codec指定

8.tracker.http.Threads
tasktracker用于默认为40的啦

9.mapred.reduce.parallel.copies
reduce复制map输出的线程数,默认是5个

10.mapred.inmem.merge.threshold
控制map输出阀值,如果reduce函数的内存需求不大,那么设置为0
mapred.job.reduce.input.buffer.percent设置为1,可以带来性能的提升。

11.io.sort.factor
合并因子,默认为10,意思是一次合并多少个Map输出
如果Map输出为50个文件,则每次把10个合并成一个文件,最后有5个中间文件。

12.mapred.child.java.opts
设置任务节点的内存大小

13.io.file.buffer.size
缓冲区,默认为4KB的缓冲区

14.mapred.map.tasks.speculative.execution mapred.reduce.tasks.speculative.execution
推测执行默认值为true,

15.mapred.job.reuse.jvm.num.tasks
默认值为1,指定作业的jvm执行任务的最大数,如果为-1,则同一作业中的任务可以共享一个JVM,数量不限

16.mapred.linerecordreader.maxlength
数据行长度的最大值,防止因为内存溢出导致的错误

17.SkipBadRecord开启跳跃模式,跳过失败的坏记录
mapred.map.max.attemps mapred.reduce.attemps 一次只能跳过一个错误记录

18.关闭安全模式
hadoop dfsadmin -safemode leave 收起阅读 »

Hive Tuning(四) 从查询计划看hive.auto.convert.join的好处

今天我们来讲一下如何看懂Hive的查询计划。
hive的执行计划包括三部分
– Abstract syntax tree – 可以直接忽略
– Stage dependencies – 依赖
– Stage plans – hive如何执行任务的信息.

 

下面还是以一个案例作为说明


设置自动连接为false的话,要走5步。
 


4 Map Reduces tells you something is not right.

Stage: Stage-1
Map Reduce
Stage: Stage-2
Map Reduce
Stage: Stage-3
Map Reduce
Stage: Stage-4
Map Reduce


设置自动连接为true就只有4步

Only 2 Map Reduces

Stage: Stage-8
Map Reduce
Stage: Stage-4
Map Reduce


 


hive直接就加载了要做连接的表,client和path表,其中client表做了过滤,剩下的map/reduce是用来连接和排序的。


继续阅读 »
今天我们来讲一下如何看懂Hive的查询计划。
hive的执行计划包括三部分
– Abstract syntax tree – 可以直接忽略
– Stage dependencies – 依赖
– Stage plans – hive如何执行任务的信息.

 

下面还是以一个案例作为说明


设置自动连接为false的话,要走5步。
 


4 Map Reduces tells you something is not right.

Stage: Stage-1
Map Reduce
Stage: Stage-2
Map Reduce
Stage: Stage-3
Map Reduce
Stage: Stage-4
Map Reduce


设置自动连接为true就只有4步

Only 2 Map Reduces

Stage: Stage-8
Map Reduce
Stage: Stage-4
Map Reduce


 


hive直接就加载了要做连接的表,client和path表,其中client表做了过滤,剩下的map/reduce是用来连接和排序的。


收起阅读 »

Hive Tunning(二)优化存储

接着上一章我们讲的hive的连接策略,现在我们讲一下hive的数据存储。
下面是hive支持的数据存储格式,有我们常见的文本,JSON,XML,这里我们主要讲一下ORCFile。
Built-in Formats:
– ORCFile
– RCFile
– Avro
– Delimited Text
– Regular Expression
– S3 Logfile
– Typed Bytes
• 3
rd
-Party Addons:
– JSON
– XML

这种格式非常适合HDFS,它有以下的优点:
•高压缩
– 高压缩比.
– 字典编码.
•高性能
– 自带索引.
– 高效的精确查询.
• 灵活的数据模型
– 支持所有的hive类型,包括maps.


 


从图中可以看出,orc格式的文件存储大小仅为文本的30%左右,比gz格式的都小,采用zlib压缩的话,更小,仅有22%左右。
使用orc格式存储的方式很简单,在建表的时候STORED AS orc即可
CREATE TABLE sale (
id int, timestamp timestamp,
productsk int, storesk int,
amount decimal, state string
) STORED AS orc;


相关参数,自己看,不解释了。


不适用zlib压缩的话,查询速度更快,但是也大一些。
CREATE TABLE sale (
id int, timestamp timestamp,
productsk int, storesk int,
amount decimal, state string
) STORED AS orc tblproperties ("orc.compress"="NONE");


下面是加快hive查询的一些可以参考的方式:


(1)跳跃读取:采用分区Partition或者使用Skew,才用ORCFile二次排序。
(2)在连接字段上排序并且bucket,在连接小表的时候采用Broadcast joins。
(3)对经常使用的数据,增加备份因子,激活Short-Circuit Read,采用Tez。



当某个表很大的时候,我们往往要对其进行分区,比如按照时间来分区。
CREATE TABLE sale (
id int, amount decimal, ...
) partitioned by (xdate string, state string);
其中的xdate和state是不存在的列,你可以认为它们是虚拟列,虚拟列会在HDFS当中建立子目录,属于分区的记录会存在那个子文件夹中。
使用分区之后,在查询和插入的时候,就必须带有至少一个分区字段,否则查询将会失败。
INSERT INTO sale (xdate=‘2013-03-01’, state=‘CA’)
SELECT * FROM staging_table
WHERE xdate = ‘2013-03-01’ AND state = ‘CA’;
如果你想一次查出所有数据,不想受这个限制的话,你可以 hive.exec.dynamic.partition.mode参数置为nonstrict。
set hive.exec.dynamic.partition.mode=nonstrict;

INSERT INTO sale (xdate, state)
SELECT * FROM staging_table;
有时候插入数据的时候,我们需要重新排序,在select 语句里面把虚拟列也加上,这样会有排序的效果。
INSERT INTO sale (xdate, state=‘CA’)
SELECT
id, amount, other_stuff,
xdate, state
FROM staging_table
WHERE state = ‘CA’;


 

下面我们讲一下常用的hive查询调优
 

mapred.max.split.size和mapred.min.split.size
min 太大-> 太少mapper.
max 太小-> mapper太多.
Example:
– set mapred.max.split.size=100000000;
– set mapred.min.split.size=1000000;
当然也有个原则,当mappers出现抢占资源的时候,才调整这些参数。
 

– set io.sort.mb=100;

• All the time:
– set hive.optmize.mapjoin.mapreduce=true;
– set hive.optmize.bucketmapjoin=true;
– set hive.optmize.bucketmapjoin.sortedmerge=true;
– set hive.auto.convert.join=true;
– set hive.auto.convert.sortmerge.join=true;
– set hive.auto.convert.sortmerge.join.nocondi1onaltask=true;
• When bucketing data:
– set hive.enforce.bucketing=true;
– set hive.enforce.sortng=true;
• These and more are set by default in HDP 1.3(明显的广告词,说明HDP比较强大,已经给我们设置好了).
这些参数我们可以在hive-site.xml中查询到,我们也可以在shell中查询。
(1)查询所有的参数

(2)查询某一个参数

(3)修改参数

 


继续阅读 »
接着上一章我们讲的hive的连接策略,现在我们讲一下hive的数据存储。
下面是hive支持的数据存储格式,有我们常见的文本,JSON,XML,这里我们主要讲一下ORCFile。
Built-in Formats:
– ORCFile
– RCFile
– Avro
– Delimited Text
– Regular Expression
– S3 Logfile
– Typed Bytes
• 3
rd
-Party Addons:
– JSON
– XML

这种格式非常适合HDFS,它有以下的优点:
•高压缩
– 高压缩比.
– 字典编码.
•高性能
– 自带索引.
– 高效的精确查询.
• 灵活的数据模型
– 支持所有的hive类型,包括maps.


 


从图中可以看出,orc格式的文件存储大小仅为文本的30%左右,比gz格式的都小,采用zlib压缩的话,更小,仅有22%左右。
使用orc格式存储的方式很简单,在建表的时候STORED AS orc即可
CREATE TABLE sale (
id int, timestamp timestamp,
productsk int, storesk int,
amount decimal, state string
) STORED AS orc;


相关参数,自己看,不解释了。


不适用zlib压缩的话,查询速度更快,但是也大一些。
CREATE TABLE sale (
id int, timestamp timestamp,
productsk int, storesk int,
amount decimal, state string
) STORED AS orc tblproperties ("orc.compress"="NONE");


下面是加快hive查询的一些可以参考的方式:


(1)跳跃读取:采用分区Partition或者使用Skew,才用ORCFile二次排序。
(2)在连接字段上排序并且bucket,在连接小表的时候采用Broadcast joins。
(3)对经常使用的数据,增加备份因子,激活Short-Circuit Read,采用Tez。



当某个表很大的时候,我们往往要对其进行分区,比如按照时间来分区。
CREATE TABLE sale (
id int, amount decimal, ...
) partitioned by (xdate string, state string);
其中的xdate和state是不存在的列,你可以认为它们是虚拟列,虚拟列会在HDFS当中建立子目录,属于分区的记录会存在那个子文件夹中。
使用分区之后,在查询和插入的时候,就必须带有至少一个分区字段,否则查询将会失败。
INSERT INTO sale (xdate=‘2013-03-01’, state=‘CA’)
SELECT * FROM staging_table
WHERE xdate = ‘2013-03-01’ AND state = ‘CA’;
如果你想一次查出所有数据,不想受这个限制的话,你可以 hive.exec.dynamic.partition.mode参数置为nonstrict。
set hive.exec.dynamic.partition.mode=nonstrict;

INSERT INTO sale (xdate, state)
SELECT * FROM staging_table;
有时候插入数据的时候,我们需要重新排序,在select 语句里面把虚拟列也加上,这样会有排序的效果。
INSERT INTO sale (xdate, state=‘CA’)
SELECT
id, amount, other_stuff,
xdate, state
FROM staging_table
WHERE state = ‘CA’;


 

下面我们讲一下常用的hive查询调优
 

mapred.max.split.size和mapred.min.split.size
min 太大-> 太少mapper.
max 太小-> mapper太多.
Example:
– set mapred.max.split.size=100000000;
– set mapred.min.split.size=1000000;
当然也有个原则,当mappers出现抢占资源的时候,才调整这些参数。
 

– set io.sort.mb=100;

• All the time:
– set hive.optmize.mapjoin.mapreduce=true;
– set hive.optmize.bucketmapjoin=true;
– set hive.optmize.bucketmapjoin.sortedmerge=true;
– set hive.auto.convert.join=true;
– set hive.auto.convert.sortmerge.join=true;
– set hive.auto.convert.sortmerge.join.nocondi1onaltask=true;
• When bucketing data:
– set hive.enforce.bucketing=true;
– set hive.enforce.sortng=true;
• These and more are set by default in HDP 1.3(明显的广告词,说明HDP比较强大,已经给我们设置好了).
这些参数我们可以在hive-site.xml中查询到,我们也可以在shell中查询。
(1)查询所有的参数

(2)查询某一个参数

(3)修改参数

 


收起阅读 »

Hive Tuning(一) 连接策略

群里共享了一本hive调优的书记,名叫《Hive Tunning》,就忍不住开始看了,也顺便记录一下自己学到的东西,备忘!
首先,这是hive的数据摘要,别问我什么意思,我也没看懂。

好,我们正式开始,首先是连接的问题,我们都知道连接耗时长,但是连接无法避免,那hive又是怎么处理连接操作的呢?
下面是hive的连接策略


hive有三种类型的连接策略
(1)Shuffle Join : 这种类型的是通过map/reduce 来实现连接操作的,优点是不需要考虑数据的大小和分布,缺点是消耗大量的资源而且是最慢的。
(2)Broadcast Join:这种类型的方式是把一个小的表在所有节点中加载到内容当中,然后用mapper来扫描大表进行连接,速度非常快,但是其中一个表必须可以加载到内存当中。
(3)Sort-Merge-Bucket Join:mapper可以协同定位keys去进行高效的连接,速度很快,不需要考虑表的大小,但是数据必须先排序和整理。


Shuffle Join:


我们以这个销售订单这个例子来做演示,可以看到其中的图,它们是通过customer.id=order.cid来做连接的,首先Map把两个表中的数据处理成以连接字段为key,其他字段为value的作为输出,然后把两个表中id和cid相同的数据传递到同一个reducer中,从网络使用率上看是很奢侈的。


 

Broadcast Join:


这种方式比较复杂一点,首先它使用足够小的维度表来存放在所有的节点当中,单独扫描大表,然后根据模式匹配进行连接。


当两个表都很大的情况下:

第一步,首先按照连接字段排序,所有可能的匹配的都在硬盘的同一块区域。


第二步,把所有的值都移到同一个节点下面进行等值连接,不需要再进行shuffle。



Bucketing:
– Hash partition values into a configurable number of buckets.
– Usually coupled with sorting.
• Skews:
– Split values out into separate files.
– Used when certain values are frequently seen.
• Replication Factor:
– Increase replication factor to accelerate reads.
– Controlled at the HDFS layer.
• Sorting:
– Sort the values within given columns.
– Greatly accelerates query when used with ORCFilefilter pushdown.
这里就不解释了,自己看吧,这和下面的图是对应的,针对不同大小的表,hive有多种处理模式。

(1)小表,经常要用的数据,建议使用replication factor,可能是缓存的意思,具体是什么意思,等我清楚了再给大家解释。
(2)任意大小的表,有很多要精确查询的列,建议先按照最常使用的列进行排序再进行查询。
(3)大表但是又需要和另外的的大表做连接,建议先通过连接列做排序和bucket。
(4)大表,但只是利用到其中某些常用的值,可以把常用的值弄个单独的skew中。
(5)大表但是有一些自然边界,比如日期的,建议利用日期进行分区。
Map Join开启
我们可以启用连接自动转换来帮助我们转换,在执行语句之前设置一下即可。它是经过优化的Map Join,无reducer。
set hive.auto.convert.join=true;
set hive.auto.convert.join.noconditionaltask = true;
set hive.auto.convert.join.noconditionaltask.size = 10000;
Skew Join
真实数据中数据倾斜是一定的, hadoop 中默认是使用
hive.exec.reducers.bytes.per.reducer = 1000000000
也就是每个节点的reduce 默认是处理1G大小的数据,如果你的join 操作也产生了数据倾斜,那么你可以在hive 中设定
set hive.optimize.skewjoin = true;
set hive.skewjoin.key = skew_key_threshold (default = 100000)
hive 在运行的时候没有办法判断哪个key 会产生多大的倾斜,所以使用这个参数控制倾斜的阈值,如果超过这个值,新的值会发送给那些还没有达到的reduce, 一般可以设置成你
(处理的总记录数/reduce个数)的2-4倍都可以接受.
倾斜是经常会存在的,一般select 的层数超过2层,翻译成执行计划多于3个以上的mapreduce job 都很容易产生倾斜,建议每次运行比较复杂的sql 之前都可以设一下这个参数. 如果你不知道设置多少,可以就按官方默认的1个reduce 只处理1G 的算法,那么 skew_key_threshold = 1G/平均行长. 或者默认直接设成250000000 (差不多算平均行长4个字节)
Sort-Merge-Bucket Join
如果表已经排序并且已经bucketed,可以启用SMB joins
set hive.auto.convert.sortmerge.join=true;
set hive.optimize.bucketmapjoin = true;
set hive.optimize.bucketmapjoin.sortedmerge = true;
set hive.auto.convert.sortmerge.join.noconditionaltask=true;
set hive.auto.convert.sortmerge.join.bigtable.selection.policy = org.apache.hadoop.hive.ql.optimizer.TableSizeBasedBigTableSelectorForAutoSMJ;


继续阅读 »
群里共享了一本hive调优的书记,名叫《Hive Tunning》,就忍不住开始看了,也顺便记录一下自己学到的东西,备忘!
首先,这是hive的数据摘要,别问我什么意思,我也没看懂。

好,我们正式开始,首先是连接的问题,我们都知道连接耗时长,但是连接无法避免,那hive又是怎么处理连接操作的呢?
下面是hive的连接策略


hive有三种类型的连接策略
(1)Shuffle Join : 这种类型的是通过map/reduce 来实现连接操作的,优点是不需要考虑数据的大小和分布,缺点是消耗大量的资源而且是最慢的。
(2)Broadcast Join:这种类型的方式是把一个小的表在所有节点中加载到内容当中,然后用mapper来扫描大表进行连接,速度非常快,但是其中一个表必须可以加载到内存当中。
(3)Sort-Merge-Bucket Join:mapper可以协同定位keys去进行高效的连接,速度很快,不需要考虑表的大小,但是数据必须先排序和整理。


Shuffle Join:


我们以这个销售订单这个例子来做演示,可以看到其中的图,它们是通过customer.id=order.cid来做连接的,首先Map把两个表中的数据处理成以连接字段为key,其他字段为value的作为输出,然后把两个表中id和cid相同的数据传递到同一个reducer中,从网络使用率上看是很奢侈的。


 

Broadcast Join:


这种方式比较复杂一点,首先它使用足够小的维度表来存放在所有的节点当中,单独扫描大表,然后根据模式匹配进行连接。


当两个表都很大的情况下:

第一步,首先按照连接字段排序,所有可能的匹配的都在硬盘的同一块区域。


第二步,把所有的值都移到同一个节点下面进行等值连接,不需要再进行shuffle。



Bucketing:
– Hash partition values into a configurable number of buckets.
– Usually coupled with sorting.
• Skews:
– Split values out into separate files.
– Used when certain values are frequently seen.
• Replication Factor:
– Increase replication factor to accelerate reads.
– Controlled at the HDFS layer.
• Sorting:
– Sort the values within given columns.
– Greatly accelerates query when used with ORCFilefilter pushdown.
这里就不解释了,自己看吧,这和下面的图是对应的,针对不同大小的表,hive有多种处理模式。

(1)小表,经常要用的数据,建议使用replication factor,可能是缓存的意思,具体是什么意思,等我清楚了再给大家解释。
(2)任意大小的表,有很多要精确查询的列,建议先按照最常使用的列进行排序再进行查询。
(3)大表但是又需要和另外的的大表做连接,建议先通过连接列做排序和bucket。
(4)大表,但只是利用到其中某些常用的值,可以把常用的值弄个单独的skew中。
(5)大表但是有一些自然边界,比如日期的,建议利用日期进行分区。
Map Join开启
我们可以启用连接自动转换来帮助我们转换,在执行语句之前设置一下即可。它是经过优化的Map Join,无reducer。
set hive.auto.convert.join=true;
set hive.auto.convert.join.noconditionaltask = true;
set hive.auto.convert.join.noconditionaltask.size = 10000;
Skew Join
真实数据中数据倾斜是一定的, hadoop 中默认是使用
hive.exec.reducers.bytes.per.reducer = 1000000000
也就是每个节点的reduce 默认是处理1G大小的数据,如果你的join 操作也产生了数据倾斜,那么你可以在hive 中设定
set hive.optimize.skewjoin = true;
set hive.skewjoin.key = skew_key_threshold (default = 100000)
hive 在运行的时候没有办法判断哪个key 会产生多大的倾斜,所以使用这个参数控制倾斜的阈值,如果超过这个值,新的值会发送给那些还没有达到的reduce, 一般可以设置成你
(处理的总记录数/reduce个数)的2-4倍都可以接受.
倾斜是经常会存在的,一般select 的层数超过2层,翻译成执行计划多于3个以上的mapreduce job 都很容易产生倾斜,建议每次运行比较复杂的sql 之前都可以设一下这个参数. 如果你不知道设置多少,可以就按官方默认的1个reduce 只处理1G 的算法,那么 skew_key_threshold = 1G/平均行长. 或者默认直接设成250000000 (差不多算平均行长4个字节)
Sort-Merge-Bucket Join
如果表已经排序并且已经bucketed,可以启用SMB joins
set hive.auto.convert.sortmerge.join=true;
set hive.optimize.bucketmapjoin = true;
set hive.optimize.bucketmapjoin.sortedmerge = true;
set hive.auto.convert.sortmerge.join.noconditionaltask=true;
set hive.auto.convert.sortmerge.join.bigtable.selection.policy = org.apache.hadoop.hive.ql.optimizer.TableSizeBasedBigTableSelectorForAutoSMJ;


收起阅读 »

MapReduce 学习(一)

首先我们先来欣赏一下MapReduce的执行过程吧,如下图,自己看,不解释了。

Map 和 Reduce 的处理都是基于Key/Value来进行的,在Map中对文件的每一行进行处理,有两个输入参数,KeyInput,ValueInput,然后有两个输出,KeyOut,ValueOut,在Map执行之后有个Combiner,负责把多个Map传过来的Key相同的Value生成一个Iterable接口的集合,也可以自己指定一个Combiner,可以提高性能,要慎用,经过Combiner处理之后,就把处理过的内容传给Reduce,这是个一对一的过程,Reduce的输出也是KeyOut,ValueOut,最后是输出到文件,这里还有一个Partitiner,实现它可以把输出分别写到多个文件上,否则将会把所有reduce产生的文件输出到一个文件当中,好,我们来看一下下面这个图,大家就可以有一个更直观的感受了!



 

好啦,理论就讲到这里。


继续阅读 »
首先我们先来欣赏一下MapReduce的执行过程吧,如下图,自己看,不解释了。

Map 和 Reduce 的处理都是基于Key/Value来进行的,在Map中对文件的每一行进行处理,有两个输入参数,KeyInput,ValueInput,然后有两个输出,KeyOut,ValueOut,在Map执行之后有个Combiner,负责把多个Map传过来的Key相同的Value生成一个Iterable接口的集合,也可以自己指定一个Combiner,可以提高性能,要慎用,经过Combiner处理之后,就把处理过的内容传给Reduce,这是个一对一的过程,Reduce的输出也是KeyOut,ValueOut,最后是输出到文件,这里还有一个Partitiner,实现它可以把输出分别写到多个文件上,否则将会把所有reduce产生的文件输出到一个文件当中,好,我们来看一下下面这个图,大家就可以有一个更直观的感受了!



 

好啦,理论就讲到这里。


收起阅读 »

hbase 部署

hbase的部署相对于java来说就比较简单啦,主要过程如下:
  1.下载hbase最新的稳定版
  2.拷贝到相应的目录
  3.修改conf目录下的hbase-env.sh,设置java 和不适用内置的zookeeper
  export JAVA_HOME=/usr/java/jdk1.7.0_21/ export HBASE_MANAGES_ZK=false
  4.修改hbase-site.xml,添加以下内容
  

<property> 

  <name>hbase.rootdir</name>

  <value>hdfs://hadoop.Master:9000/hbase</value>

</property>

<property>

  <name>hbase.cluster.distributed</name>

  <value>true</value>

</property>

<property>

  <name>hbase.master</name>

  <value>192.168.1.133:60000</value>

</property>

<property>

  <name>hbase.zookeeper.quorum</name>

  <value>hadoop.Master,hadoop.SlaveT1,hadoop.SlaveT2</value>

</property>
View Code
 
  5.打开regionservers注册奴隶机器 hadoop.SlaveT1 hadoop.SlaveT2
  6.把hadoop.core.jar包替换掉hbase的lib目录下的jar包
  7.然后把相同的工作,在集群的其他机器上再重做一遍吧
继续阅读 »
hbase的部署相对于java来说就比较简单啦,主要过程如下:
  1.下载hbase最新的稳定版
  2.拷贝到相应的目录
  3.修改conf目录下的hbase-env.sh,设置java 和不适用内置的zookeeper
  export JAVA_HOME=/usr/java/jdk1.7.0_21/ export HBASE_MANAGES_ZK=false
  4.修改hbase-site.xml,添加以下内容
  

<property> 

  <name>hbase.rootdir</name>

  <value>hdfs://hadoop.Master:9000/hbase</value>

</property>

<property>

  <name>hbase.cluster.distributed</name>

  <value>true</value>

</property>

<property>

  <name>hbase.master</name>

  <value>192.168.1.133:60000</value>

</property>

<property>

  <name>hbase.zookeeper.quorum</name>

  <value>hadoop.Master,hadoop.SlaveT1,hadoop.SlaveT2</value>

</property>
View Code
 
  5.打开regionservers注册奴隶机器 hadoop.SlaveT1 hadoop.SlaveT2
  6.把hadoop.core.jar包替换掉hbase的lib目录下的jar包
  7.然后把相同的工作,在集群的其他机器上再重做一遍吧 收起阅读 »

java.lang.OutOfMemoryError: Java heap space 解决方法

从网上抄过来的,因为经常碰到这个问题,记录一下。
java.lang.OutOfMemoryError: Java heap space 解决方法
这个问题的根源是jvm虚拟机的默认Heap大小是64M,可以通过设置其最大和最小值来实现.设置的方法主要是几个.
1.可以在windows 更改系统环境变量 加上JAVA_OPTS=-Xms64m -Xmx512m
2,如果用的tomcat,在windows下,可以在
C:\tomcat5.5.9\bin\catalina.bat 中加上:
set JAVA_OPTS=-Xms64m -Xmx256m
位置在: rem Guess CATALINA_HOME if not defined 这行的下面加合适.
3.如果是linux系统 Linux 在{tomcat_home}/bin/catalina.sh的前面,加 set JAVA_OPTS='-Xms64 -Xmx512'
java.lang.OutOfMemoryError: Java heap space
使用Java程序从数据库中查询大量的数据时出现异常: java.lang.OutOfMemoryError: Java heap space
在JVM中如果98%的时间是用于GC且可用的 Heap size 不足2%的时候将抛出此异常信息。
JVM堆的设置是指java程序运行过程中JVM可以调配使用的内存空间的设置.JVM在启动的时候会自动设置Heap size的值,其初始空间(即-Xms)是物理内存的1/64,最大空间(-Xmx)是物理内存的1/4。可以利用JVM提供的-Xmn -Xms -Xmx等选项可进行设置。 例如:java -jar -Xmn16m -Xms64m -Xmx128m MyApp.jar 如果Heap Size设置偏小,除了这些异常信息外,还会发现程序的响应速度变慢了。GC占用了更多的时间,而应用分配到的执行时间较少。 Heap Size 最大不要超过可用物理内存的80%,一般的要将-Xms和-Xmx选项设置为相同,而-Xmn为1/4的-Xmx值。 Heap size的 -Xms -Xmn 设置不要超出物理内存的大小。否则会提示“Error occurred during initialization of VM Could not reserve enough space for object heap”。
转载地址:http://www.blogjava.net/liuwentao253/archive/2008/06/03/205466.html
继续阅读 »
从网上抄过来的,因为经常碰到这个问题,记录一下。
java.lang.OutOfMemoryError: Java heap space 解决方法
这个问题的根源是jvm虚拟机的默认Heap大小是64M,可以通过设置其最大和最小值来实现.设置的方法主要是几个.
1.可以在windows 更改系统环境变量 加上JAVA_OPTS=-Xms64m -Xmx512m
2,如果用的tomcat,在windows下,可以在
C:\tomcat5.5.9\bin\catalina.bat 中加上:
set JAVA_OPTS=-Xms64m -Xmx256m
位置在: rem Guess CATALINA_HOME if not defined 这行的下面加合适.
3.如果是linux系统 Linux 在{tomcat_home}/bin/catalina.sh的前面,加 set JAVA_OPTS='-Xms64 -Xmx512'
java.lang.OutOfMemoryError: Java heap space
使用Java程序从数据库中查询大量的数据时出现异常: java.lang.OutOfMemoryError: Java heap space
在JVM中如果98%的时间是用于GC且可用的 Heap size 不足2%的时候将抛出此异常信息。
JVM堆的设置是指java程序运行过程中JVM可以调配使用的内存空间的设置.JVM在启动的时候会自动设置Heap size的值,其初始空间(即-Xms)是物理内存的1/64,最大空间(-Xmx)是物理内存的1/4。可以利用JVM提供的-Xmn -Xms -Xmx等选项可进行设置。 例如:java -jar -Xmn16m -Xms64m -Xmx128m MyApp.jar 如果Heap Size设置偏小,除了这些异常信息外,还会发现程序的响应速度变慢了。GC占用了更多的时间,而应用分配到的执行时间较少。 Heap Size 最大不要超过可用物理内存的80%,一般的要将-Xms和-Xmx选项设置为相同,而-Xmn为1/4的-Xmx值。 Heap size的 -Xms -Xmn 设置不要超出物理内存的大小。否则会提示“Error occurred during initialization of VM Could not reserve enough space for object heap”。
转载地址:http://www.blogjava.net/liuwentao253/archive/2008/06/03/205466.html 收起阅读 »

线程池

学习java很久很久了,得有个5年了,但是从来都没有真正的走进java世界,希望从这篇文章开始,把自己对java的点点滴滴都记录下来。
  从java5开始,java就提供了名叫Executor framework的机制,主要是围绕着Executor接口, 它的接口 ExecutorService, 以及实现了这两个接口的ThreadPoolExecutor类来展开,这种机制把线程的执行和创建分离开了,你只需要创建一个线程,然后把线程丢给Executor,让它执行去吧。使用这个机制的另外一个好处是可以使用Callable接口,它类似于Runnable接口,但是有两个不一样的特性。
  • 它的主要方法是call(), 它可以携带一个返回值。
  • 当你发送了一个Callable对象给executor之后,你可以拿到一个实现了Future接口的对象,通过这个对象,你可以控制对象的状态以及Callable对象的结果。

  
  

public Server(){
executor=(ThreadPoolExecutor)Executors.newFixedThreadPool(5);
System.out.printf("Server: Task Count: %d\n",executor.
getTaskCount());
}

提交带返回值的任务。
public class FactorialCalculator implements Callable<Integer> {
private Integer number;
public FactorialCalculator(Integer number){
this.number=number;
}
@Override
public Integer call() throws Exception {
int result = 1;
if ((num==0)||(num==1)) {
result=1;
} else {
for (int i=2; i<=number; i++) {
result*=i;
TimeUnit.MILLISECONDS.sleep(20);
}
}
System.out.printf("%s: %d\n",Thread.currentThread().
getName(),result);
return result;
}

public class Main {
public static void main(String[] args) {
ThreadPoolExecutor executor=(ThreadPoolExecutor)Executors.
newFixedThreadPool(2);
List<Future<Integer>> resultList=new ArrayList<>();
Random random=new Random();
for (int i=0; i<10; i++){
Integer number= random.nextInt(10);
FactorialCalculator calculator=new
FactorialCalculator(number);
Future<Integer> result=executor.submit(calculator);
resultList.add(result);
}
do {
for (int i=0; i<resultList.size(); i++) {
Future<Integer> result=resultList.get(i);
System.out.printf("Main: Task %d: %s\n",i,result.
isDone());
}
try {
TimeUnit.MILLISECONDS.sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();
}
} while (executor.getCompletedTaskCount()<resultList.size());
System.out.printf("Main: Results\n");
for (int i=0; i<resultList.size(); i++) {
Future<Integer> result=resultList.get(i);
Integer number=null;
try {
number=result.get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
System.out.printf("Main: Task %d: %d\n",i,number);
}
executor.shutdown();
}
}

}
View Code
  
  在Future对象里面,我们可以通过result.isDone()方法来判断线程是否计算完毕。
  执行一堆任务,只返回第一个完成的任务。result = executor.invokeAny(taskList);
  执行全部,resultList=executor.invokeAll(taskList);
  推迟执行,executor.awaitTermination(1, TimeUnit.DAYS);
  把任务的执行和结果的处理分开,需要用到CompletionService, CompletionServicei有两个方法,take和poll,poll是如果没有,它就会立刻返回一个null,take是没有的话,会一直等待。
  当线程池调用了关闭之后,它需要等待当前所有进行中的线程结束才会完全关闭,在这个过程当中提交的线程,需要拒绝处理,我们需要实现一个RejectedExecutionHandler,重写它的rejectedExecution方法,然后听过executor的setRejectedExecutionHandler()方法来设置。
  

public class RejectedTaskController implements 
RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor
executor) {
System.out.printf("RejectedTaskController: The task %s has
been rejected\n",r.toString());
System.out.printf("RejectedTaskController: %s\n",executor.
toString());
System.out.printf("RejectedTaskController: Terminating:
%s\n",executor.isTerminating());
System.out.printf("RejectedTaksController: Terminated:
%s\n",executor.isTerminated());
}
View Code
  
  Fork/Join Framework
   Fork/Join Framwork的诞生是为了解决如下图这样的问题的
  
   我们可以利用ForkJoinPool,它是一个特殊的Executors。
ForkJoin 框架主要是有两个操作:
Fork:把一个任务分成几个小任务,然后执行
Join:等待小任务的完成,并生成一个新任务。
这里我把ForkJoin称为刀叉框架,刀叉框架和Executor框架不一样的地方在于work-stealing算法,不同于Executor框架,刀叉框架在等待子任务的完成之前就已经创建并开始运行Join方法,Join方法一直在检测任务是否完成并且开始运行。通过这样的方式,可以很好的利用runtime的优势,提高性能。
这样就有一些限制:
任务只能采用Fork和Join来作为同步机制,而不能采用别的同步机制,如果采用其他的机制,他们在同步操作的时候,不能执行别的任务。比如:当你在刀叉框架里面一个任务sleep的时候,别的正在执行的任务也会停止,直到该线程sleep结束。
刀叉框架的核心是两个类:
(1)ForkJoinPool,它实现了ExecutorService接口和work-stealing算法,通过它可以很好的管理正在运行的任务以及了解任务的信息。
(2)ForkJoinTask,任务的基类,它提供了fork()方法和join()方法来控制任务的状态。不同通常,你会实现他们的两个子类,不带返回结果的RecursiveAction和带返回结果的RecursiveTask。
继续阅读 »
学习java很久很久了,得有个5年了,但是从来都没有真正的走进java世界,希望从这篇文章开始,把自己对java的点点滴滴都记录下来。
  从java5开始,java就提供了名叫Executor framework的机制,主要是围绕着Executor接口, 它的接口 ExecutorService, 以及实现了这两个接口的ThreadPoolExecutor类来展开,这种机制把线程的执行和创建分离开了,你只需要创建一个线程,然后把线程丢给Executor,让它执行去吧。使用这个机制的另外一个好处是可以使用Callable接口,它类似于Runnable接口,但是有两个不一样的特性。
  • 它的主要方法是call(), 它可以携带一个返回值。
  • 当你发送了一个Callable对象给executor之后,你可以拿到一个实现了Future接口的对象,通过这个对象,你可以控制对象的状态以及Callable对象的结果。

  
  

public Server(){
executor=(ThreadPoolExecutor)Executors.newFixedThreadPool(5);
System.out.printf("Server: Task Count: %d\n",executor.
getTaskCount());
}

提交带返回值的任务。
public class FactorialCalculator implements Callable<Integer> {
private Integer number;
public FactorialCalculator(Integer number){
this.number=number;
}
@Override
public Integer call() throws Exception {
int result = 1;
if ((num==0)||(num==1)) {
result=1;
} else {
for (int i=2; i<=number; i++) {
result*=i;
TimeUnit.MILLISECONDS.sleep(20);
}
}
System.out.printf("%s: %d\n",Thread.currentThread().
getName(),result);
return result;
}

public class Main {
public static void main(String[] args) {
ThreadPoolExecutor executor=(ThreadPoolExecutor)Executors.
newFixedThreadPool(2);
List<Future<Integer>> resultList=new ArrayList<>();
Random random=new Random();
for (int i=0; i<10; i++){
Integer number= random.nextInt(10);
FactorialCalculator calculator=new
FactorialCalculator(number);
Future<Integer> result=executor.submit(calculator);
resultList.add(result);
}
do {
for (int i=0; i<resultList.size(); i++) {
Future<Integer> result=resultList.get(i);
System.out.printf("Main: Task %d: %s\n",i,result.
isDone());
}
try {
TimeUnit.MILLISECONDS.sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();
}
} while (executor.getCompletedTaskCount()<resultList.size());
System.out.printf("Main: Results\n");
for (int i=0; i<resultList.size(); i++) {
Future<Integer> result=resultList.get(i);
Integer number=null;
try {
number=result.get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
System.out.printf("Main: Task %d: %d\n",i,number);
}
executor.shutdown();
}
}

}
View Code
  
  在Future对象里面,我们可以通过result.isDone()方法来判断线程是否计算完毕。
  执行一堆任务,只返回第一个完成的任务。result = executor.invokeAny(taskList);
  执行全部,resultList=executor.invokeAll(taskList);
  推迟执行,executor.awaitTermination(1, TimeUnit.DAYS);
  把任务的执行和结果的处理分开,需要用到CompletionService, CompletionServicei有两个方法,take和poll,poll是如果没有,它就会立刻返回一个null,take是没有的话,会一直等待。
  当线程池调用了关闭之后,它需要等待当前所有进行中的线程结束才会完全关闭,在这个过程当中提交的线程,需要拒绝处理,我们需要实现一个RejectedExecutionHandler,重写它的rejectedExecution方法,然后听过executor的setRejectedExecutionHandler()方法来设置。
  

public class RejectedTaskController implements 
RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor
executor) {
System.out.printf("RejectedTaskController: The task %s has
been rejected\n",r.toString());
System.out.printf("RejectedTaskController: %s\n",executor.
toString());
System.out.printf("RejectedTaskController: Terminating:
%s\n",executor.isTerminating());
System.out.printf("RejectedTaksController: Terminated:
%s\n",executor.isTerminated());
}
View Code
  
  Fork/Join Framework
   Fork/Join Framwork的诞生是为了解决如下图这样的问题的
  
   我们可以利用ForkJoinPool,它是一个特殊的Executors。
ForkJoin 框架主要是有两个操作:
Fork:把一个任务分成几个小任务,然后执行
Join:等待小任务的完成,并生成一个新任务。
这里我把ForkJoin称为刀叉框架,刀叉框架和Executor框架不一样的地方在于work-stealing算法,不同于Executor框架,刀叉框架在等待子任务的完成之前就已经创建并开始运行Join方法,Join方法一直在检测任务是否完成并且开始运行。通过这样的方式,可以很好的利用runtime的优势,提高性能。
这样就有一些限制:
任务只能采用Fork和Join来作为同步机制,而不能采用别的同步机制,如果采用其他的机制,他们在同步操作的时候,不能执行别的任务。比如:当你在刀叉框架里面一个任务sleep的时候,别的正在执行的任务也会停止,直到该线程sleep结束。
刀叉框架的核心是两个类:
(1)ForkJoinPool,它实现了ExecutorService接口和work-stealing算法,通过它可以很好的管理正在运行的任务以及了解任务的信息。
(2)ForkJoinTask,任务的基类,它提供了fork()方法和join()方法来控制任务的状态。不同通常,你会实现他们的两个子类,不带返回结果的RecursiveAction和带返回结果的RecursiveTask。 收起阅读 »

hadoop 1.1.2和 hive 0.10 和hbase 0.94.9整合

今天弄了一下hive0.10和hbase0.94.9整合,需要设置的并不多,但是也遇到了一些问题。
  1.复制jar包
  拷贝hbase-0.94.9.jar,zookeeper-3.4.5.jar,protobuf-java-2.4.0a.jar到hive/lib下,删掉lib下面旧版的jar包。
  拷贝hbase-0.94.9.jar到所有hadoop节点的lib文件夹下面,拷贝hbase/confi的hbase-site.xml文件拷贝到所有的hadoop节点conf文件夹下。
  2.修改hive-site.xml配置文件,添加以下内容

<property>     
<name>hive.querylog.location</name>
<value>/usr/hive/logs</value>
</property>
<property>
<name>hive.aux.jars.path</name>
<value>file:///usr/hive/lib/hive-hbase-handler-0.10.0.jar,file:///usr/hive/lib/hbase-0.94.9.jar,file:///usr/hive/lib/zookeeper-3.4.5.jar,file:///usr/hive/lib/protobuf-java-2.4.0a.jar</value>
</property>
View Code
  
  
  3.启动hive,hive -hiveconf hbase.zookeeper.quorum=node1,node2,node3
   实际上只需要填一个即可,我只填了一个。
  4.开始测试,建一个表试验。
CREATE TABLE hbase_table1(key int, value1 string, value2 int, value3 int) 
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES (
"hbase.columns.mapping" = ":key,cf1:value1,cf1:value2,cf2:value3"
)TBLPROPERTIES("hbase.table.name" = "table1");
 
TBLPROPERTIES参数是可选的,如果不写的话,就默认是hive和hbase中的表名称一致
  5.打开hbase看看,使用describe “table1”来查询一下,发一个我真实建立的表吧。


hbase(main):001:0> describe "wdp"
DESCRIPTION ENABLED
'wdp', {NAME => 'cf', DATA_BLOCK_ENCODING => 'NONE' true
, BLOOMFILTER => 'NONE', REPLICATION_SCOPE => '0',
VERSIONS => '3', COMPRESSION => 'NONE', MIN_VERSION
S => '0', TTL => '2147483647', KEEP_DELETED_CELLS =
> 'false', BLOCKSIZE => '65536', IN_MEMORY => 'fals
e', ENCODE_ON_DISK => 'true', BLOCKCACHE => 'true'}
1 row(s) in 1.1980 seconds

hbase(main):002:0>
View Code
   好了,就这样啦,我还没插入数据测试呢,就先这样吧。
  最后发一个它官方的文档地址,想了解更多的到这个网站上面看看
  https://cwiki.apache.org/confluence/display/Hive/HBaseIntegration
继续阅读 »
今天弄了一下hive0.10和hbase0.94.9整合,需要设置的并不多,但是也遇到了一些问题。
  1.复制jar包
  拷贝hbase-0.94.9.jar,zookeeper-3.4.5.jar,protobuf-java-2.4.0a.jar到hive/lib下,删掉lib下面旧版的jar包。
  拷贝hbase-0.94.9.jar到所有hadoop节点的lib文件夹下面,拷贝hbase/confi的hbase-site.xml文件拷贝到所有的hadoop节点conf文件夹下。
  2.修改hive-site.xml配置文件,添加以下内容

<property>     
<name>hive.querylog.location</name>
<value>/usr/hive/logs</value>
</property>
<property>
<name>hive.aux.jars.path</name>
<value>file:///usr/hive/lib/hive-hbase-handler-0.10.0.jar,file:///usr/hive/lib/hbase-0.94.9.jar,file:///usr/hive/lib/zookeeper-3.4.5.jar,file:///usr/hive/lib/protobuf-java-2.4.0a.jar</value>
</property>
View Code
  
  
  3.启动hive,hive -hiveconf hbase.zookeeper.quorum=node1,node2,node3
   实际上只需要填一个即可,我只填了一个。
  4.开始测试,建一个表试验。
CREATE TABLE hbase_table1(key int, value1 string, value2 int, value3 int) 
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES (
"hbase.columns.mapping" = ":key,cf1:value1,cf1:value2,cf2:value3"
)TBLPROPERTIES("hbase.table.name" = "table1");
 
TBLPROPERTIES参数是可选的,如果不写的话,就默认是hive和hbase中的表名称一致
  5.打开hbase看看,使用describe “table1”来查询一下,发一个我真实建立的表吧。


hbase(main):001:0> describe "wdp"
DESCRIPTION ENABLED
'wdp', {NAME => 'cf', DATA_BLOCK_ENCODING => 'NONE' true
, BLOOMFILTER => 'NONE', REPLICATION_SCOPE => '0',
VERSIONS => '3', COMPRESSION => 'NONE', MIN_VERSION
S => '0', TTL => '2147483647', KEEP_DELETED_CELLS =
> 'false', BLOCKSIZE => '65536', IN_MEMORY => 'fals
e', ENCODE_ON_DISK => 'true', BLOCKCACHE => 'true'}
1 row(s) in 1.1980 seconds

hbase(main):002:0>
View Code
   好了,就这样啦,我还没插入数据测试呢,就先这样吧。
  最后发一个它官方的文档地址,想了解更多的到这个网站上面看看
  https://cwiki.apache.org/confluence/display/Hive/HBaseIntegration 收起阅读 »

hive thrift 开机启动

这个问题困扰我很久,之前redis的时候,也出现了这个问题,从网上找的thrift脚本没有一个好使的,最后通过修改/etc/rc.d/rc.local来执行一些非服务的命令,这样子就不需要像写服务那样写start,stop方法啦,不过修改这个配置文件要小心,命令里面不要包含阻塞式的命令,否则开机进不了界面,就悲剧了,我就这样玩挂了一次系统。
  经过一顿挣扎之后,终于找到解决的方法了。
  su - cenyuhai -c "hive --service hiveserver &"
  上面的这句命令的意思是用账号cenyuhai来执行 "hive --service hiverserver" 这个命令, 命令结束后的&意思是在后台运行,则该命令不会阻塞系统,否则就悲剧了,发生进不去系统的问题。
  分享结束,收工!
继续阅读 »
这个问题困扰我很久,之前redis的时候,也出现了这个问题,从网上找的thrift脚本没有一个好使的,最后通过修改/etc/rc.d/rc.local来执行一些非服务的命令,这样子就不需要像写服务那样写start,stop方法啦,不过修改这个配置文件要小心,命令里面不要包含阻塞式的命令,否则开机进不了界面,就悲剧了,我就这样玩挂了一次系统。
  经过一顿挣扎之后,终于找到解决的方法了。
  su - cenyuhai -c "hive --service hiveserver &"
  上面的这句命令的意思是用账号cenyuhai来执行 "hive --service hiverserver" 这个命令, 命令结束后的&意思是在后台运行,则该命令不会阻塞系统,否则就悲剧了,发生进不去系统的问题。
  分享结束,收工! 收起阅读 »

springMVC下的javascript调试

最近想弄一个hadoop的管理界面,所以在网上下了一个名为jeecg的快速开发平台,由于工作之后没有用过java做网站,遇到了好多小问题,其中一个就是现在要说的javascript脚本调试的问题。说来也奇怪,其实我也分辨不出来这到底是因为是springMVC,还是easy ui 给屏蔽掉的,找自己在自己的页面上的那段javascript脚本异常费劲,
可能这个真是是因为springMVC的缘故吧,因为右键出来的页面地址就是带有xxx.do这样的,而不是真是的jsp页面,所以看不到源码。这可怎么办呀,愁死我了,折腾一阵强有力的折腾之后,终于被我找到方法了,在三大浏览器上都找到了方法。。 都是误打误撞给装出来。
  第一个就是Chrome(只是略微描述),因为我的那个时间是通过点击开始的,我就给点击事件加了断点,一步一步的按下去,就进去了我的那个函数,之前也是找不到。
第二个出来的是IE10,因为我平常用习惯了IE,平时要调试个什么东西,我都是用的IE来调试。IE上其实也很容易找出来,比Chrome省事多了。
  首先打开页面,然后按F12放狗!
  
  然后别费劲找了,现在是找不到的,直接点击“开始调试”,然后在脚本列表里面就会出现动态脚本啦,我写的脚本就在里面,当然如果没出现的话,你可以先点击一下你要测试的功能。
  
  
  然后我们就可以搜索自己写的那段脚本在哪里了
  
   OK,然后给它打上断点即可,哈哈。
  
  第三个是火狐啦,火狐需要下载一个插件firebug,用自带的那个工具也不好找,但是用firebug的话,用那个搜索功能一搜索立马就能出来,实在是太强大了。
继续阅读 »
最近想弄一个hadoop的管理界面,所以在网上下了一个名为jeecg的快速开发平台,由于工作之后没有用过java做网站,遇到了好多小问题,其中一个就是现在要说的javascript脚本调试的问题。说来也奇怪,其实我也分辨不出来这到底是因为是springMVC,还是easy ui 给屏蔽掉的,找自己在自己的页面上的那段javascript脚本异常费劲,
可能这个真是是因为springMVC的缘故吧,因为右键出来的页面地址就是带有xxx.do这样的,而不是真是的jsp页面,所以看不到源码。这可怎么办呀,愁死我了,折腾一阵强有力的折腾之后,终于被我找到方法了,在三大浏览器上都找到了方法。。 都是误打误撞给装出来。
  第一个就是Chrome(只是略微描述),因为我的那个时间是通过点击开始的,我就给点击事件加了断点,一步一步的按下去,就进去了我的那个函数,之前也是找不到。
第二个出来的是IE10,因为我平常用习惯了IE,平时要调试个什么东西,我都是用的IE来调试。IE上其实也很容易找出来,比Chrome省事多了。
  首先打开页面,然后按F12放狗!
  
  然后别费劲找了,现在是找不到的,直接点击“开始调试”,然后在脚本列表里面就会出现动态脚本啦,我写的脚本就在里面,当然如果没出现的话,你可以先点击一下你要测试的功能。
  
  
  然后我们就可以搜索自己写的那段脚本在哪里了
  
   OK,然后给它打上断点即可,哈哈。
  
  第三个是火狐啦,火狐需要下载一个插件firebug,用自带的那个工具也不好找,但是用firebug的话,用那个搜索功能一搜索立马就能出来,实在是太强大了。 收起阅读 »

hadoop的调试

折腾hadoop的调试很久了,一直都没折腾对,查过很多资料,但是都没试出来,最终在不断地尝试当中调试出来了,所以想把这个过程记录下来,和大家分享一下。
调试分为两部分,MapReduce的调试和源码的调试。
MapReduce的调试很简单,首先要部署好hadoop,这个我就不说了,自己去百度。部署好之后,下载Hadoop对应的eclipse插件,有了这个插件之后,变得异常简单。
  这是我在网上下的hadoop1.1.2的eclipse插件的地址:http://download.csdn.net/detail/cenyuhaiwork/5716051
  下载完毕之后,把它放入eclipse目录的dropins文件夹中即可,然后重启eclipse。

  打开windows preferences 中,发现有Hadoop Map/Reduce就说明已经成功了。
打开window show View ,选择Map/Reduce Locations

编辑Map/Reduce Locations

修改成实际的Ip地址和端口即可。

  设置完毕,开始新建工程。
 
  点击File,新建工程,选择Map/Reduce Project.

新建工程之后,然后新建一个类,我们可以把工程里面的examples里面的WorkCount拿出来试验一下,直接点调试即可开始,就像我们正常调试程序一样。examples里面还提供了其他很多的例子,大家可以去看看挺好的。
  好了,现在我们开始进入hadoop源码的调试当中,在调试之前我们首先要把源码处理成不报错的状态。
把源码导入eclipse当中,源码可以在发布版的hadoop的src文件夹中找,然后我们导入jar,右键点击属性,选择java Build path,点击add jars,把发布版的lib文件夹中所有的jar包都导入。

然后我们再点击Source标签页,去掉多余的内容,我们只需要编译核心的几个目录即可,多编译因为缺少一些别的jar包报错,比如ant的。
先删掉原来的src目录,然后重新添加,我点Add Folder添加了一下目录,别的我就没有编译了。

 
然后再点击左侧的Java Compiler,选择子项Error/Warnning ,如下图所示,把Error改成Warning.

 
点击Ok,设置完毕,可以开始Build啦。
好,我们打开hadoop生产环境下的bin目录下的hadoop,我们以调试NameNode为例子,找到elif ["COMMAND" = "namenode"]这一段,在HADOOP_OPTS
的字符串后面,添加-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=8000。

其中suspend表示是否挂起等待调试连接,这里我没有让它挂起,有需要的朋友可以改成y,让它挂起,那样效果很明显,一直等着你连接。
这边就算设置完毕了。可以启动hadoop,命令行会出现,8000端口已经被监听的提示,然后在Eclipse这边打开NameNode的代码,打上断点,然后打开Debug Configurations 面板,在左侧选择Remote Java Application,如下图:

点击Debug,然后进入熟悉的debug页面就是成功啦!
分享结束,有什么不明白可以留言。
继续阅读 »
折腾hadoop的调试很久了,一直都没折腾对,查过很多资料,但是都没试出来,最终在不断地尝试当中调试出来了,所以想把这个过程记录下来,和大家分享一下。
调试分为两部分,MapReduce的调试和源码的调试。
MapReduce的调试很简单,首先要部署好hadoop,这个我就不说了,自己去百度。部署好之后,下载Hadoop对应的eclipse插件,有了这个插件之后,变得异常简单。
  这是我在网上下的hadoop1.1.2的eclipse插件的地址:http://download.csdn.net/detail/cenyuhaiwork/5716051
  下载完毕之后,把它放入eclipse目录的dropins文件夹中即可,然后重启eclipse。

  打开windows preferences 中,发现有Hadoop Map/Reduce就说明已经成功了。
打开window show View ,选择Map/Reduce Locations

编辑Map/Reduce Locations

修改成实际的Ip地址和端口即可。

  设置完毕,开始新建工程。
 
  点击File,新建工程,选择Map/Reduce Project.

新建工程之后,然后新建一个类,我们可以把工程里面的examples里面的WorkCount拿出来试验一下,直接点调试即可开始,就像我们正常调试程序一样。examples里面还提供了其他很多的例子,大家可以去看看挺好的。
  好了,现在我们开始进入hadoop源码的调试当中,在调试之前我们首先要把源码处理成不报错的状态。
把源码导入eclipse当中,源码可以在发布版的hadoop的src文件夹中找,然后我们导入jar,右键点击属性,选择java Build path,点击add jars,把发布版的lib文件夹中所有的jar包都导入。

然后我们再点击Source标签页,去掉多余的内容,我们只需要编译核心的几个目录即可,多编译因为缺少一些别的jar包报错,比如ant的。
先删掉原来的src目录,然后重新添加,我点Add Folder添加了一下目录,别的我就没有编译了。

 
然后再点击左侧的Java Compiler,选择子项Error/Warnning ,如下图所示,把Error改成Warning.

 
点击Ok,设置完毕,可以开始Build啦。
好,我们打开hadoop生产环境下的bin目录下的hadoop,我们以调试NameNode为例子,找到elif ["COMMAND" = "namenode"]这一段,在HADOOP_OPTS
的字符串后面,添加-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=8000。

其中suspend表示是否挂起等待调试连接,这里我没有让它挂起,有需要的朋友可以改成y,让它挂起,那样效果很明显,一直等着你连接。
这边就算设置完毕了。可以启动hadoop,命令行会出现,8000端口已经被监听的提示,然后在Eclipse这边打开NameNode的代码,打上断点,然后打开Debug Configurations 面板,在左侧选择Remote Java Application,如下图:

点击Debug,然后进入熟悉的debug页面就是成功啦!
分享结束,有什么不明白可以留言。 收起阅读 »

部署zookeeper集群

1.把zookeeper.tar.gz解压之后,移动到/usr目录下

2.首先要给zookeeper之间的每个节点的ssh设置无密码登陆

3.在zookeeper目录下编辑zoo.cfg,复制zoo_sample.cfg进行修改 最终配置内容如下:

tickTime=2000

initLimit=5

syncLimit=2

dataDir=/usr/zookeeper/data

dataLogDir=/usr/zookeeper/logs

clientPort=2181

server.1=hadoop.Master:2888:3888

server.2=hadoop.SlaveT1:2888:3888

server.3=hadoop.SlaveT2:2888:3888

这三个地址已经在/etc/hosts文件中进行了设置

hadoop.Master:192.168.1.133

hadoop.SlaveT1:192.168.1.134

hadoop.SlaveT2:192.168.1.135 3

在/usr/zookeeper/data目录下新建myid文件

在hadoop.Master设置为1

在hadoop.SlaveT1设置为2

在hadoop.SlaveT2设置为3

4.现在基本已经设置完了,启动之后,使用zkServer.sh status查看状态,结果告诉你,It's probably not running.没有运行,这是咋回事 原来是防火墙的问题,把防火墙都关闭了,就好了。

5.关闭防火墙 service iptables stop

6.分别启动这三个zookeeper

/usr/zookeeper/bin/zkServer.sh start

查看状态

/usr/zookeeper/bin/zkServer.sh stauts

第一个启动的会出现以下文字:

JMX enabled by default
Using config: /usr/zookeeper/bin/../conf/zoo.cfg
Mode: leader

之后启动的会出现以下文字:

JMX enabled by default
Using config: /usr/zookeeper/bin/../conf/zoo.cfg
Mode: followers

 
继续阅读 »
1.把zookeeper.tar.gz解压之后,移动到/usr目录下

2.首先要给zookeeper之间的每个节点的ssh设置无密码登陆

3.在zookeeper目录下编辑zoo.cfg,复制zoo_sample.cfg进行修改 最终配置内容如下:

tickTime=2000

initLimit=5

syncLimit=2

dataDir=/usr/zookeeper/data

dataLogDir=/usr/zookeeper/logs

clientPort=2181

server.1=hadoop.Master:2888:3888

server.2=hadoop.SlaveT1:2888:3888

server.3=hadoop.SlaveT2:2888:3888

这三个地址已经在/etc/hosts文件中进行了设置

hadoop.Master:192.168.1.133

hadoop.SlaveT1:192.168.1.134

hadoop.SlaveT2:192.168.1.135 3

在/usr/zookeeper/data目录下新建myid文件

在hadoop.Master设置为1

在hadoop.SlaveT1设置为2

在hadoop.SlaveT2设置为3

4.现在基本已经设置完了,启动之后,使用zkServer.sh status查看状态,结果告诉你,It's probably not running.没有运行,这是咋回事 原来是防火墙的问题,把防火墙都关闭了,就好了。

5.关闭防火墙 service iptables stop

6.分别启动这三个zookeeper

/usr/zookeeper/bin/zkServer.sh start

查看状态

/usr/zookeeper/bin/zkServer.sh stauts

第一个启动的会出现以下文字:

JMX enabled by default
Using config: /usr/zookeeper/bin/../conf/zoo.cfg
Mode: leader

之后启动的会出现以下文字:

JMX enabled by default
Using config: /usr/zookeeper/bin/../conf/zoo.cfg
Mode: followers

  收起阅读 »

使用Ant编译Hadoop工程报错

在win7用Ant编译hadoop工程的时候,遇到了一个报错,如下:
  org.eclipse.core.runtime.CoreException: D:\workspace\hadoop-1.1.2\build.xml:83: Execute failed: java.io.IOException: Cannot run program "sed"
  打开build.xml文件,找到sed,然后把注释掉即可,这段内容是为了替换苹果系统的空格的,但是。。Windows出错啦,也难怪,因为hadoop他们压根儿就没想让它在windows上面跑。
<exec executable="sed" inputstring="${os.name}"
outputproperty="nonspace.os">
<arg value="s/ /_/g"/>
</exec>
继续阅读 »
在win7用Ant编译hadoop工程的时候,遇到了一个报错,如下:
  org.eclipse.core.runtime.CoreException: D:\workspace\hadoop-1.1.2\build.xml:83: Execute failed: java.io.IOException: Cannot run program "sed"
  打开build.xml文件,找到sed,然后把注释掉即可,这段内容是为了替换苹果系统的空格的,但是。。Windows出错啦,也难怪,因为hadoop他们压根儿就没想让它在windows上面跑。
<exec executable="sed" inputstring="${os.name}"
outputproperty="nonspace.os">
<arg value="s/ /_/g"/>
</exec> 收起阅读 »

WF追忆

前一阵子学习了一下工作流,现在写个总结记录一下这个过程。要弄工作流,首先就要有个界面来画图,做web的,没办法,只能选择javascript和silverlight,找来找去,最后用了Shareidea的和Workflow11的界面,在此对他们表示感谢,界面是在Shareidea上面进行的修改,把Workflow11的很多东西也揉进来了,最后合成的一个杂交体。但是最后因为要玩hadoop,要清理磁盘空间,把工程给误删了,直到现在才发现。。我3个月的业余时间完成的代码全部被干掉了,已经无法挽回了,只能做一下追忆罢了,现在把残存的一些代码给发上来,算是纪念一下。


  1 /// <summary>
2 /// 解析工作流
3 /// </summary>
4 /// <param name="xml"></param>
5 /// <returns></returns>
6 private string AnalyzerWorkFlow(string xml)
7 {
8 #region 读取xml信息,生成linq to xml 信息
9
10 string xaml = string.Empty;
11 //把字符串解析成XElement
12 Byte[] b = System.Text.UTF8Encoding.UTF8.GetBytes(xml);
13 XElement element = XElement.Load(System.Xml.XmlReader.Create(new MemoryStream(b)));
14
15 #endregion
16
17 #region 保存模板
18
19 //模板信息
20 var template = new Template();
21 template.Name = element.Attribute(XName.Get("Name")).Value;
22 template.Guid = Guid.Parse(element.Attribute(XName.Get("UniqueID")).Value);
23 template.Version = element.Attribute(XName.Get("Version")).Value;
24 template.Templates = xml;
25 template.UpdateTime = DateTime.Now;
26
27 #endregion
28
29 #region 初始化变量
30
31 //获取所有的activity和rule
32 var nodes = from item in element.Descendants("Activity") select item;
33 var rules = from item in element.Descendants("Rule") select item;
34 //建立flowchart,从第一个节点开始遍历整个图
35 Flowchart flowchart = new Flowchart()
36 {
37 DisplayName = template.Name
38 };
39 IActivity activity;
40 FlowNode preStep = new FlowStep();
41 FlowNode nextStep;
42 FlowSwitch<string> flowSwitch;
43 XElement xele;
44 IEnumerable<XElement> linkedRules;
45 string uniqueId = string.Empty;
46 string activityId = string.Empty;
47 //实例化开始节点
48 var firstNode = nodes.First((node) => node.Attribute(XName.Get("Type")).Value == "INITIAL");
49 var startActivity = new StartActivity(firstNode.Attribute(XName.Get("UniqueID")).Value);
50 startActivity.DisplayName = firstNode.Attribute(XName.Get("ActivityName")).Value;
51 ((FlowStep)preStep).Action = startActivity;
52 flowchart.Nodes.Add(preStep);
53 flowchart.StartNode = preStep;
54 //设置一个栈,把东西put进去;设置一个字典,把创建过的活动id,以及位置记录进去
55 var stack = new Stack<IActivity>();
56 var dic = new Dictionary<string, int>();
57 stack.Push(startActivity);
58 dic.Add(startActivity.ActivityId, flowchart.Nodes.Count - 1);
59
60
61 #endregion
62
63 #region 遍历生成flowchart图形
64
65 while (stack.Count > 0)
66 {
67 activity = stack.Pop();
68 activityId = activity.ActivityId;
69 linkedRules = from rule in rules
70 where rule.Attribute(XName.Get("BeginActivityUniqueID")).Value == activityId
71 select rule;
72 //节点被清空之后,重新定位
73 if (preStep == null)
74 {
75 preStep = flowchart.Nodes[dic[activityId]];
76 }
77 //后续活动有多个
78 if (linkedRules.Count() > 1)
79 {
80 //条件自动判断路径活动

81 if (activity is ConditionActivity)
82 {
83 #region 判断节点,根据用户事先预置的条件自动判断选择下一步节点
84
85 string trueActivityId = ((ConditionActivity)activity).Property.ActivityForTrue;
86 string falseActivityId = ((ConditionActivity)activity).Property.ActivityForFalse;
87
88 //把false写在前面是因为栈是倒序的,遍历节点按照倒序的方式来遍历了,但是在生成xaml的时候,
89 //生成出来的xaml的条件中的true节点的后续节点在后面呢,还没建立,所以无法引用到后续的节点
90 //只有前面的节点先建立了,后面的节点才能使用前面的节点的引用
91 if (!dic.ContainsKey(falseActivityId))
92 {
93 xele = nodes.First((node) =>
94 node.Attribute(XName.Get("UniqueID")).Value == falseActivityId);
95 activity = GetActivityByType(xele);
96 nextStep = CreateFlowNodeByType(activity);
97 flowchart.Nodes.Add(nextStep);
98 dic.Add(falseActivityId, flowchart.Nodes.Count - 1);
99 ((FlowDecision)preStep).False = nextStep;
100 stack.Push(activity);
101 }
102 else
103 {
104 ((FlowDecision)preStep).False = flowchart.Nodes[(dic[falseActivityId])];
105 }
106
107 if (!dic.ContainsKey(trueActivityId))
108 {
109 xele = nodes.First((node) =>
110 node.Attribute(XName.Get("UniqueID")).Value == trueActivityId);
111 activity = GetActivityByType(xele);
112 nextStep = CreateFlowNodeByType(activity);
113 flowchart.Nodes.Add(nextStep);
114 dic.Add(trueActivityId, flowchart.Nodes.Count - 1);
115 ((FlowDecision)preStep).True = nextStep;
116 preStep = nextStep;
117 stack.Push(activity);
118 }
119 else
120 {
121 ((FlowDecision)preStep).True = flowchart.Nodes[(dic[trueActivityId])];
122 }
123
124 #endregion
125
126 }
127 //用户选择路径活动
128 else if (activity is ActiveActivity)
129 {
130 //后续活动类型为串行
131 if (((ActiveActivity)activity).Property.after_type == 1)
132 {
133 #region 串行活动,处理人选择下一步操作
134
135 flowSwitch = new FlowSwitch<string>();
136 flowSwitch.Expression = new VisualBasicValue<string>() { ExpressionText = "NextWay" };
137 flowSwitch.Default = flowchart.StartNode;
138 foreach (XElement linkedRule in linkedRules)
139 {
140 uniqueId = linkedRule.Attribute(XName.Get("EndActivityUniqueID")).Value;
141 if (!dic.ContainsKey(uniqueId))
142 {
143 xele = nodes.First((node) =>
144 node.Attribute(XName.Get("UniqueID")).Value == uniqueId);
145 activity = GetActivityByType(xele);
146 nextStep = CreateFlowNodeByType(activity);
147 flowchart.Nodes.Add(nextStep);
148 dic.Add(activity.ActivityId, flowchart.Nodes.Count() - 1);
149 flowSwitch.Cases.Add(uniqueId.ToLower(), nextStep);
150 preStep = nextStep;
151 stack.Push(activity);
152 }
153 else
154 {
155 flowSwitch.Cases.Add(uniqueId.ToLower(), flowchart.Nodes[(dic[uniqueId])]);
156 }
157 }
158 flowchart.Nodes.Add(flowSwitch);
159 //通过activityId找到节点在flowchart中的位置,然后设置它的next节点
160 ((FlowStep)flowchart.Nodes[dic[activityId]]).Next =
161 flowchart.Nodes[flowchart.Nodes.Count - 1];
162
163 #endregion
164
165 }
166 //后续活动类型为并行活动
167 else
168 {
169 #region 并行活动
170
171 var parallel = new Parallel();
172 parallel.CompletionCondition = false;
173 Switch<string> witch;
174 Sequence seq;
175 var currentCount = 0;
176 //取得汇合节点的id
177 var joinPointId = GetJoinPointId(rules, activityId);
178 foreach (XElement linkedRule in linkedRules)
179 {
180 uniqueId = linkedRule.Attribute(XName.Get("EndActivityUniqueID")).Value;
181 //如果连线直接连着
182 if (uniqueId == joinPointId) continue;
183 xele = nodes.First((node) =>
184 node.Attribute(XName.Get("UniqueID")).Value == uniqueId);
185 activity = GetActivityByType(xele);
186 currentCount = stack.Count;
187 seq = new Sequence();
188 seq.Activities.Add((Activity)activity);
189 stack.Push(activity);
190 while (stack.Count > currentCount)
191 {
192 activity = stack.Pop();
193 uniqueId = activity.ActivityId;
194 var seqRules = from rule in rules
195 where
196 rule.Attribute(XName.Get("BeginActivityUniqueID")).Value ==
197 uniqueId
198 select rule;
199 if (seqRules.Count() > 1)
200 {
201 witch = new Switch<string>();
202 witch.Expression = new VisualBasicValue<string>()
203 {
204 ExpressionText = "NextWay"
205 };
206 foreach (XElement seqRule in seqRules)
207 {
208 var caseId = seqRule.Attribute("EndActivityUniqueID").Value;
209 if (caseId != joinPointId)
210 {
211 xele = nodes.First((node) =>
212 node.Attribute(XName.Get("UniqueID")).Value == caseId);
213 activity = GetActivityByType(xele);
214 witch.Cases.Add(caseId.ToLower(), (Activity)activity);
215 stack.Push(activity);
216 }
217 }
218 seq.Activities.Add(witch);
219 }
220 else if (seqRules.Count() == 1)
221 {
222 uniqueId = seqRules.First().Attribute("EndActivityUniqueID").Value;
223 if (uniqueId != joinPointId)
224 {
225 xele = nodes.First((node) =>
226 node.Attribute(XName.Get("UniqueID")).Value == uniqueId);
227 activity = GetActivityByType(xele);
228 seq.Activities.Add((Activity)activity);
229 stack.Push(activity);
230 }
231 }
232 }
233 parallel.Branches.Add(seq);
234 }
235 //并行节点作为flowchart中的一个节点来处理
236 nextStep = new FlowStep();
237 ((FlowStep)nextStep).Action = parallel;
238 ((FlowStep)preStep).Next = nextStep;
239 flowchart.Nodes.Add(nextStep);
240 preStep = nextStep;
241 //处理完并行结构之后,添加汇合节点
242 xele = nodes.First((node) =>
243 node.Attribute(XName.Get("UniqueID")).Value == uniqueId);
244 activity = GetActivityByType(xele);
245 nextStep = CreateFlowNodeByType(activity);
246 flowchart.Nodes.Add(nextStep);
247 dic.Add(activity.ActivityId, flowchart.Nodes.Count() - 1);
248 ((FlowStep)preStep).Next = nextStep;
249 preStep = nextStep;
250 stack.Push(activity);
251
252 #endregion
253 }
254 }
255 }
256 //后续活动只有一个
257 else if (linkedRules.Count() == 1)
258 {
259 #region 后续只有一个活动节点
260
261 uniqueId = linkedRules.First().Attribute("EndActivityUniqueID").Value;
262
263 if (!dic.ContainsKey(uniqueId))
264 {
265 xele = nodes.First((node) => node.Attribute(XName.Get("UniqueID")).Value == uniqueId);
266 activity = GetActivityByType(xele);
267 nextStep = CreateFlowNodeByType(activity);
268 flowchart.Nodes.Add(nextStep);
269 dic.Add(activity.ActivityId, flowchart.Nodes.Count() - 1);
270 ((FlowStep)preStep).Next = nextStep;
271 preStep = nextStep;
272 stack.Push(activity);
273 }
274 else
275 {
276 //活动已存在,通过dic字典中记录的位置,将“前节点”的Next指针指向它
277 ((FlowStep)flowchart.Nodes[dic[activityId]]).Next =
278 flowchart.Nodes[dic[uniqueId]];
279 //((FlowStep)preStep).Next = flowchart.Nodes.ElementAt(dic[uniqueId]);
280 }
281
282 #endregion
283 }
284 //没有后续节点
285 else
286 {
287 //如果没有后续节点,则把“前节点”清空,然后重新定位前节点
288 preStep = null;
289 }
290
291 }
292
293 #endregion
294
295 #region 将图形转成xaml格式的文件,并且保存
296
297 try
298 {
299
300 xaml = GetXmlFromActiviyBuilder(flowchart);
301 //xaml = XamlServices.Save(flowchart);
302 template.XamlTemplate = xaml;
303 using (var scope = new System.Transactions.TransactionScope())
304 {
305 TemplateService.AddTemplate(template);
306 CreateRoad(rules, template.Guid);
307 scope.Complete();
308 }
309
310 }
311 catch (Exception ex)
312 {
313 xaml = ex.Message;
314 }
315
316 #endregion
317
318 return xaml;
319 }
320
321 #region 辅助函数
322
323 /// <summary>
324 /// 通过ActivityBuilder给添加一个传入参数,否则无法传值。
325 /// </summary>
326 /// <param name="flowchart"></param>
327 /// <returns></returns>
328 private string GetXmlFromActiviyBuilder(Flowchart flowchart)
329 {
330 ActivityBuilder ab = new ActivityBuilder();
331 ab.Implementation = flowchart;
332 ab.Properties.Add(new DynamicActivityProperty()
333 {
334 Name = "Entity",
335 Type = typeof(InOutArgument<object>)
336 });
337 ab.Properties.Add(new DynamicActivityProperty()
338 {
339 Name = "NextWay",
340 Type = typeof(InOutArgument<string>)
341 });
342 StringBuilder stringBuilder = new StringBuilder();
343 StringWriter stringWriter = new StringWriter(stringBuilder);
344 XamlSchemaContext xamlSchemaContext = new XamlSchemaContext();
345 XamlXmlWriter xamlXmlWriter = new XamlXmlWriter(stringWriter, xamlSchemaContext);
346 XamlWriter xamlWriter = ActivityXamlServices.CreateBuilderWriter(xamlXmlWriter);
347 XamlServices.Save(xamlWriter, ab);
348 return stringBuilder.ToString();
349 }
350
351
352 /// <summary>
353 /// 创建路径线路图,用于用户打开单据时候,生成操作按钮
354 /// </summary>
355 /// <param name="rules"></param>
356 /// <param name="templateId"></param>
357 private void CreateRoad(IEnumerable<XElement> rules, Guid templateId)
358 {
359 var roadList = new List<object>();
360 TemplateRoads road = null;
361 foreach (var rule in rules)
362 {
363 road = new TemplateRoads();
364 road.Id = Guid.NewGuid();
365 road.Source = rule.Attribute(XName.Get("BeginActivityUniqueID")).Value;
366 road.Target = rule.Attribute(XName.Get("EndActivityUniqueID")).Value;
367 road.Name = rule.Attribute(XName.Get("RuleName")).Value;
368 road.TemplateId = templateId;
369 //这个是控制他们的顺序的
370 road.Zindex = Convert.ToInt32(rule.Attribute(XName.Get("ZIndex")).Value);
371 roadList.Add(road);
372 }
373 DBHelper.WriteDataTableToDb(Common.FillDataTable(roadList));
374 }
375
376 /// <summary>
377 /// 创建路径线路图,用于用户打开单据时候,生成操作按钮
378 /// </summary>
379 /// <param name="activitys"></param>
380 /// <param name="templateId"></param>
381 private void CreateActivitys(IEnumerable<XElement> activitys, Guid templateId)
382 {
383 var roadList = new List<object>();
384 TemplateRoads road;
385 foreach (var activity in activitys)
386 {
387 road = new TemplateRoads();
388 road.Id = Guid.NewGuid();
389 road.Source = activity.Attribute(XName.Get("BeginActivityUniqueID")).Value;
390 road.Target = activity.Attribute(XName.Get("EndActivityUniqueID")).Value;
391 road.Name = activity.Attribute(XName.Get("RuleName")).Value;
392 road.TemplateId = templateId;
393 //这个是控制他们的顺序的
394 road.Zindex = Convert.ToInt32(activity.Attribute(XName.Get("ZIndex")).Value);
395 roadList.Add(road);
396 }
397 DBHelper.WriteDataTableToDb(Common.FillDataTable(roadList));
398 }
399
400
401 /// <summary>
402 /// 通过开始分叉节点的id,计算流程汇合的节点的id
403 /// </summary>
404 /// <param name="rules">所有的线路</param>
405 /// <param name="startNodeId">开始节点的id</param>
406 /// <returns></returns>
407 private string GetJoinPointId(IEnumerable<XElement> rules, string startNodeId)
408 {
409 var linkedRules = from rule in rules
410 where rule.Attribute(XName.Get("BeginActivityUniqueID")).Value == startNodeId
411 select rule;
412
413 if (linkedRules.Count() > 1)
414 {
415 var list = new List<IEnumerable<XElement>>();
416 var uniqueId = string.Empty;
417 for (int i = 0; i < linkedRules.Count(); i++)
418 {
419 uniqueId = linkedRules.ElementAt(i).Attribute(XName.Get("EndActivityUniqueID")).Value;
420 list.Add(GetAfterRules(rules, uniqueId).ToList());
421 }
422 //计算交集
423 IEnumerable<XElement> result = null;
424 foreach (IEnumerable<XElement> item in list)
425 {
426 if (result == null)
427 {
428 result = item;
429 }
430 else
431 {
432 result = result.Intersect(item);
433 }
434 }
435 if (result != null && result.Count() > 0)
436 {
437 return result.First().Attribute(XName.Get("BeginActivityUniqueID")).Value;
438 }
439
440
441 }
442
443 return null;
444
445
446 }
447
448
449 /// <summary>
450 /// 递归查找某个节点的后续连线
451 /// </summary>
452 /// <param name="rules">所有的线路</param>
453 /// <param name="startNodeId">开始节点</param>
454 /// <returns></returns>
455 private IEnumerable<XElement> GetAfterRules(IEnumerable<XElement> rules, string startNodeId)
456 {
457 var linkedRules = from rule in rules
458 where rule.Attribute(XName.Get("BeginActivityUniqueID")).Value == startNodeId
459 select rule;
460
461 return linkedRules.ToList().Concat(linkedRules.ToList().SelectMany(
462 t => GetAfterRules(rules, t.Attribute(XName.Get("EndActivityUniqueID")).Value)));
463 }
464
465
466
467 /// <summary>
468 /// 根据activity的类型,返回相应的FlowNode节点类型
469 /// </summary>
470 /// <param name="activity"></param>
471 /// <returns></returns>
472 private FlowNode CreateFlowNodeByType(IActivity activity)
473 {
474 if (activity is ConditionActivity)
475 {
476 return new FlowDecision() { Condition = activity as ConditionActivity };
477 }
478 else
479 {
480 return new FlowStep() { Action = (Activity)activity };
481 }
482 }
483
484 /// <summary>
485 /// 通过类型来创建活动
486 /// </summary>
487 /// <param name="element">节点元素</param>
488 /// <returns>返回对应的活动</returns>
489 private IActivity GetActivityByType(XElement element)
490 {
491 var uniqueId = element.Attribute(XName.Get("UniqueID")).Value;
492 var type = element.Attribute(XName.Get("Type")).Value;
493 //取得属性节点
494 var property = element.FirstNode.NextNode as XElement;
495 dynamic propertyObj = null;
496
497 switch (type)
498 {
499 case "INITIAL":
500 return new StartActivity(uniqueId);
501 case "COMPLETION":
502 return new EndActivity(uniqueId);
503 case "INTERACTION":
504 propertyObj = new WFActivityProperty();
505 XmlUtils.XMLToModel(property.ToString(), propertyObj);
506 return new ActiveActivity(uniqueId, propertyObj);
507 case "CONDITION":
508 propertyObj = new WFConditionProperty();
509 XmlUtils.XMLToModel(property.ToString(), propertyObj);
510 return new ConditionActivity(uniqueId, propertyObj);
511 default:
512 return null;
513 }
514
515 }
516
517 #endregion
View Code
 


   这是生成xaml的算法。还想说点什么,但是也没有代码了,说啥啊。。 无代码无真相。。 就说点关于自定义节点的问题吧,用flowchart来构图的话,会遇到一个问题,就是并行节点的处理,在我上面的算法当中,是把并行节点开始到并行结束节点之间的节点视作一个FlowNode,但是如果需要并行之后还有并行这些更复杂的工作流节点的话,可以考虑用NativeActivity,下面是我在写动态修改工作流实例的时候在官网上面找到的一些代码,它是一个并行节点的实现,我觉得是一个很重大的发现。

 1 public sealed class MyParallelActivity : NativeActivity
2 {
3 Collection<Activity> branches;
4
5 public Collection<Activity> Branches
6 {
7 get
8 {
9 if (this.branches == null)
10 {
11 this.branches = new Collection<Activity>();
12 }
13 return this.branches;
14 }
15 }
16
17 protected override void Execute(NativeActivityContext context)
18 {
19 foreach (Activity branch in this.Branches)
20 {
21 context.ScheduleActivity(branch);
22 }
23 }
24
25 protected override void OnCreateDynamicUpdateMap(NativeActivityUpdateMapMetadata metadata, Activity originalActivity)
26 {
27 metadata.AllowUpdateInsideThisActivity();
28 }
29
30 protected override void UpdateInstance(NativeActivityUpdateContext updateContext)
31 {
32 // look for new branches that need to be scheduled
33 foreach (Activity branch in this.Branches)
34 {
35 // NativeActivityUpdateContext.IsNewlyAdded looks for children not present in the original (pre-update) activity
36 if (updateContext.IsNewlyAdded(branch))
37 {
38 updateContext.ScheduleActivity(branch);
39 }
40 }
41 }
42 }
View Code
 
  注意Execute方法中的一句话:context.ScheduleActivity(branch); --->调度执行子活动,看到这一句之后,我确实是很兴奋的,因为之前也想过自己写一个完整的Activity,但是苦于不知道怎么执行它的下一个活动。所以如果想重新实现的朋友请继承NativeActivity来实现,因为除了原生的类型之后,WF只支持NativeActivity动态修改后面的流程。
再想想,还有什么没交代的。。。想到了一个,就是判断条件的,比如switch的这种开关的判断条件,它的判断条件可以是一个CodeActivity<string>,我们可以继承重写一个,然后就可以在Execute方法当中写判断的代码了,这里主要是要用到CodeDom来在运行时动态计算结果。就说这么多了,没代码,什么都讲不清楚了,说了也白说。
  这个故事告诉我,我需要一个保存代码的服务器了。。。
  最后把我残存的那一点代码放出来吧,在CSDN上下载http://download.csdn.net/detail/cenyuhaiwork/5670947
继续阅读 »
前一阵子学习了一下工作流,现在写个总结记录一下这个过程。要弄工作流,首先就要有个界面来画图,做web的,没办法,只能选择javascript和silverlight,找来找去,最后用了Shareidea的和Workflow11的界面,在此对他们表示感谢,界面是在Shareidea上面进行的修改,把Workflow11的很多东西也揉进来了,最后合成的一个杂交体。但是最后因为要玩hadoop,要清理磁盘空间,把工程给误删了,直到现在才发现。。我3个月的业余时间完成的代码全部被干掉了,已经无法挽回了,只能做一下追忆罢了,现在把残存的一些代码给发上来,算是纪念一下。


  1 /// <summary>
2 /// 解析工作流
3 /// </summary>
4 /// <param name="xml"></param>
5 /// <returns></returns>
6 private string AnalyzerWorkFlow(string xml)
7 {
8 #region 读取xml信息,生成linq to xml 信息
9
10 string xaml = string.Empty;
11 //把字符串解析成XElement
12 Byte[] b = System.Text.UTF8Encoding.UTF8.GetBytes(xml);
13 XElement element = XElement.Load(System.Xml.XmlReader.Create(new MemoryStream(b)));
14
15 #endregion
16
17 #region 保存模板
18
19 //模板信息
20 var template = new Template();
21 template.Name = element.Attribute(XName.Get("Name")).Value;
22 template.Guid = Guid.Parse(element.Attribute(XName.Get("UniqueID")).Value);
23 template.Version = element.Attribute(XName.Get("Version")).Value;
24 template.Templates = xml;
25 template.UpdateTime = DateTime.Now;
26
27 #endregion
28
29 #region 初始化变量
30
31 //获取所有的activity和rule
32 var nodes = from item in element.Descendants("Activity") select item;
33 var rules = from item in element.Descendants("Rule") select item;
34 //建立flowchart,从第一个节点开始遍历整个图
35 Flowchart flowchart = new Flowchart()
36 {
37 DisplayName = template.Name
38 };
39 IActivity activity;
40 FlowNode preStep = new FlowStep();
41 FlowNode nextStep;
42 FlowSwitch<string> flowSwitch;
43 XElement xele;
44 IEnumerable<XElement> linkedRules;
45 string uniqueId = string.Empty;
46 string activityId = string.Empty;
47 //实例化开始节点
48 var firstNode = nodes.First((node) => node.Attribute(XName.Get("Type")).Value == "INITIAL");
49 var startActivity = new StartActivity(firstNode.Attribute(XName.Get("UniqueID")).Value);
50 startActivity.DisplayName = firstNode.Attribute(XName.Get("ActivityName")).Value;
51 ((FlowStep)preStep).Action = startActivity;
52 flowchart.Nodes.Add(preStep);
53 flowchart.StartNode = preStep;
54 //设置一个栈,把东西put进去;设置一个字典,把创建过的活动id,以及位置记录进去
55 var stack = new Stack<IActivity>();
56 var dic = new Dictionary<string, int>();
57 stack.Push(startActivity);
58 dic.Add(startActivity.ActivityId, flowchart.Nodes.Count - 1);
59
60
61 #endregion
62
63 #region 遍历生成flowchart图形
64
65 while (stack.Count > 0)
66 {
67 activity = stack.Pop();
68 activityId = activity.ActivityId;
69 linkedRules = from rule in rules
70 where rule.Attribute(XName.Get("BeginActivityUniqueID")).Value == activityId
71 select rule;
72 //节点被清空之后,重新定位
73 if (preStep == null)
74 {
75 preStep = flowchart.Nodes[dic[activityId]];
76 }
77 //后续活动有多个
78 if (linkedRules.Count() > 1)
79 {
80 //条件自动判断路径活动

81 if (activity is ConditionActivity)
82 {
83 #region 判断节点,根据用户事先预置的条件自动判断选择下一步节点
84
85 string trueActivityId = ((ConditionActivity)activity).Property.ActivityForTrue;
86 string falseActivityId = ((ConditionActivity)activity).Property.ActivityForFalse;
87
88 //把false写在前面是因为栈是倒序的,遍历节点按照倒序的方式来遍历了,但是在生成xaml的时候,
89 //生成出来的xaml的条件中的true节点的后续节点在后面呢,还没建立,所以无法引用到后续的节点
90 //只有前面的节点先建立了,后面的节点才能使用前面的节点的引用
91 if (!dic.ContainsKey(falseActivityId))
92 {
93 xele = nodes.First((node) =>
94 node.Attribute(XName.Get("UniqueID")).Value == falseActivityId);
95 activity = GetActivityByType(xele);
96 nextStep = CreateFlowNodeByType(activity);
97 flowchart.Nodes.Add(nextStep);
98 dic.Add(falseActivityId, flowchart.Nodes.Count - 1);
99 ((FlowDecision)preStep).False = nextStep;
100 stack.Push(activity);
101 }
102 else
103 {
104 ((FlowDecision)preStep).False = flowchart.Nodes[(dic[falseActivityId])];
105 }
106
107 if (!dic.ContainsKey(trueActivityId))
108 {
109 xele = nodes.First((node) =>
110 node.Attribute(XName.Get("UniqueID")).Value == trueActivityId);
111 activity = GetActivityByType(xele);
112 nextStep = CreateFlowNodeByType(activity);
113 flowchart.Nodes.Add(nextStep);
114 dic.Add(trueActivityId, flowchart.Nodes.Count - 1);
115 ((FlowDecision)preStep).True = nextStep;
116 preStep = nextStep;
117 stack.Push(activity);
118 }
119 else
120 {
121 ((FlowDecision)preStep).True = flowchart.Nodes[(dic[trueActivityId])];
122 }
123
124 #endregion
125
126 }
127 //用户选择路径活动
128 else if (activity is ActiveActivity)
129 {
130 //后续活动类型为串行
131 if (((ActiveActivity)activity).Property.after_type == 1)
132 {
133 #region 串行活动,处理人选择下一步操作
134
135 flowSwitch = new FlowSwitch<string>();
136 flowSwitch.Expression = new VisualBasicValue<string>() { ExpressionText = "NextWay" };
137 flowSwitch.Default = flowchart.StartNode;
138 foreach (XElement linkedRule in linkedRules)
139 {
140 uniqueId = linkedRule.Attribute(XName.Get("EndActivityUniqueID")).Value;
141 if (!dic.ContainsKey(uniqueId))
142 {
143 xele = nodes.First((node) =>
144 node.Attribute(XName.Get("UniqueID")).Value == uniqueId);
145 activity = GetActivityByType(xele);
146 nextStep = CreateFlowNodeByType(activity);
147 flowchart.Nodes.Add(nextStep);
148 dic.Add(activity.ActivityId, flowchart.Nodes.Count() - 1);
149 flowSwitch.Cases.Add(uniqueId.ToLower(), nextStep);
150 preStep = nextStep;
151 stack.Push(activity);
152 }
153 else
154 {
155 flowSwitch.Cases.Add(uniqueId.ToLower(), flowchart.Nodes[(dic[uniqueId])]);
156 }
157 }
158 flowchart.Nodes.Add(flowSwitch);
159 //通过activityId找到节点在flowchart中的位置,然后设置它的next节点
160 ((FlowStep)flowchart.Nodes[dic[activityId]]).Next =
161 flowchart.Nodes[flowchart.Nodes.Count - 1];
162
163 #endregion
164
165 }
166 //后续活动类型为并行活动
167 else
168 {
169 #region 并行活动
170
171 var parallel = new Parallel();
172 parallel.CompletionCondition = false;
173 Switch<string> witch;
174 Sequence seq;
175 var currentCount = 0;
176 //取得汇合节点的id
177 var joinPointId = GetJoinPointId(rules, activityId);
178 foreach (XElement linkedRule in linkedRules)
179 {
180 uniqueId = linkedRule.Attribute(XName.Get("EndActivityUniqueID")).Value;
181 //如果连线直接连着
182 if (uniqueId == joinPointId) continue;
183 xele = nodes.First((node) =>
184 node.Attribute(XName.Get("UniqueID")).Value == uniqueId);
185 activity = GetActivityByType(xele);
186 currentCount = stack.Count;
187 seq = new Sequence();
188 seq.Activities.Add((Activity)activity);
189 stack.Push(activity);
190 while (stack.Count > currentCount)
191 {
192 activity = stack.Pop();
193 uniqueId = activity.ActivityId;
194 var seqRules = from rule in rules
195 where
196 rule.Attribute(XName.Get("BeginActivityUniqueID")).Value ==
197 uniqueId
198 select rule;
199 if (seqRules.Count() > 1)
200 {
201 witch = new Switch<string>();
202 witch.Expression = new VisualBasicValue<string>()
203 {
204 ExpressionText = "NextWay"
205 };
206 foreach (XElement seqRule in seqRules)
207 {
208 var caseId = seqRule.Attribute("EndActivityUniqueID").Value;
209 if (caseId != joinPointId)
210 {
211 xele = nodes.First((node) =>
212 node.Attribute(XName.Get("UniqueID")).Value == caseId);
213 activity = GetActivityByType(xele);
214 witch.Cases.Add(caseId.ToLower(), (Activity)activity);
215 stack.Push(activity);
216 }
217 }
218 seq.Activities.Add(witch);
219 }
220 else if (seqRules.Count() == 1)
221 {
222 uniqueId = seqRules.First().Attribute("EndActivityUniqueID").Value;
223 if (uniqueId != joinPointId)
224 {
225 xele = nodes.First((node) =>
226 node.Attribute(XName.Get("UniqueID")).Value == uniqueId);
227 activity = GetActivityByType(xele);
228 seq.Activities.Add((Activity)activity);
229 stack.Push(activity);
230 }
231 }
232 }
233 parallel.Branches.Add(seq);
234 }
235 //并行节点作为flowchart中的一个节点来处理
236 nextStep = new FlowStep();
237 ((FlowStep)nextStep).Action = parallel;
238 ((FlowStep)preStep).Next = nextStep;
239 flowchart.Nodes.Add(nextStep);
240 preStep = nextStep;
241 //处理完并行结构之后,添加汇合节点
242 xele = nodes.First((node) =>
243 node.Attribute(XName.Get("UniqueID")).Value == uniqueId);
244 activity = GetActivityByType(xele);
245 nextStep = CreateFlowNodeByType(activity);
246 flowchart.Nodes.Add(nextStep);
247 dic.Add(activity.ActivityId, flowchart.Nodes.Count() - 1);
248 ((FlowStep)preStep).Next = nextStep;
249 preStep = nextStep;
250 stack.Push(activity);
251
252 #endregion
253 }
254 }
255 }
256 //后续活动只有一个
257 else if (linkedRules.Count() == 1)
258 {
259 #region 后续只有一个活动节点
260
261 uniqueId = linkedRules.First().Attribute("EndActivityUniqueID").Value;
262
263 if (!dic.ContainsKey(uniqueId))
264 {
265 xele = nodes.First((node) => node.Attribute(XName.Get("UniqueID")).Value == uniqueId);
266 activity = GetActivityByType(xele);
267 nextStep = CreateFlowNodeByType(activity);
268 flowchart.Nodes.Add(nextStep);
269 dic.Add(activity.ActivityId, flowchart.Nodes.Count() - 1);
270 ((FlowStep)preStep).Next = nextStep;
271 preStep = nextStep;
272 stack.Push(activity);
273 }
274 else
275 {
276 //活动已存在,通过dic字典中记录的位置,将“前节点”的Next指针指向它
277 ((FlowStep)flowchart.Nodes[dic[activityId]]).Next =
278 flowchart.Nodes[dic[uniqueId]];
279 //((FlowStep)preStep).Next = flowchart.Nodes.ElementAt(dic[uniqueId]);
280 }
281
282 #endregion
283 }
284 //没有后续节点
285 else
286 {
287 //如果没有后续节点,则把“前节点”清空,然后重新定位前节点
288 preStep = null;
289 }
290
291 }
292
293 #endregion
294
295 #region 将图形转成xaml格式的文件,并且保存
296
297 try
298 {
299
300 xaml = GetXmlFromActiviyBuilder(flowchart);
301 //xaml = XamlServices.Save(flowchart);
302 template.XamlTemplate = xaml;
303 using (var scope = new System.Transactions.TransactionScope())
304 {
305 TemplateService.AddTemplate(template);
306 CreateRoad(rules, template.Guid);
307 scope.Complete();
308 }
309
310 }
311 catch (Exception ex)
312 {
313 xaml = ex.Message;
314 }
315
316 #endregion
317
318 return xaml;
319 }
320
321 #region 辅助函数
322
323 /// <summary>
324 /// 通过ActivityBuilder给添加一个传入参数,否则无法传值。
325 /// </summary>
326 /// <param name="flowchart"></param>
327 /// <returns></returns>
328 private string GetXmlFromActiviyBuilder(Flowchart flowchart)
329 {
330 ActivityBuilder ab = new ActivityBuilder();
331 ab.Implementation = flowchart;
332 ab.Properties.Add(new DynamicActivityProperty()
333 {
334 Name = "Entity",
335 Type = typeof(InOutArgument<object>)
336 });
337 ab.Properties.Add(new DynamicActivityProperty()
338 {
339 Name = "NextWay",
340 Type = typeof(InOutArgument<string>)
341 });
342 StringBuilder stringBuilder = new StringBuilder();
343 StringWriter stringWriter = new StringWriter(stringBuilder);
344 XamlSchemaContext xamlSchemaContext = new XamlSchemaContext();
345 XamlXmlWriter xamlXmlWriter = new XamlXmlWriter(stringWriter, xamlSchemaContext);
346 XamlWriter xamlWriter = ActivityXamlServices.CreateBuilderWriter(xamlXmlWriter);
347 XamlServices.Save(xamlWriter, ab);
348 return stringBuilder.ToString();
349 }
350
351
352 /// <summary>
353 /// 创建路径线路图,用于用户打开单据时候,生成操作按钮
354 /// </summary>
355 /// <param name="rules"></param>
356 /// <param name="templateId"></param>
357 private void CreateRoad(IEnumerable<XElement> rules, Guid templateId)
358 {
359 var roadList = new List<object>();
360 TemplateRoads road = null;
361 foreach (var rule in rules)
362 {
363 road = new TemplateRoads();
364 road.Id = Guid.NewGuid();
365 road.Source = rule.Attribute(XName.Get("BeginActivityUniqueID")).Value;
366 road.Target = rule.Attribute(XName.Get("EndActivityUniqueID")).Value;
367 road.Name = rule.Attribute(XName.Get("RuleName")).Value;
368 road.TemplateId = templateId;
369 //这个是控制他们的顺序的
370 road.Zindex = Convert.ToInt32(rule.Attribute(XName.Get("ZIndex")).Value);
371 roadList.Add(road);
372 }
373 DBHelper.WriteDataTableToDb(Common.FillDataTable(roadList));
374 }
375
376 /// <summary>
377 /// 创建路径线路图,用于用户打开单据时候,生成操作按钮
378 /// </summary>
379 /// <param name="activitys"></param>
380 /// <param name="templateId"></param>
381 private void CreateActivitys(IEnumerable<XElement> activitys, Guid templateId)
382 {
383 var roadList = new List<object>();
384 TemplateRoads road;
385 foreach (var activity in activitys)
386 {
387 road = new TemplateRoads();
388 road.Id = Guid.NewGuid();
389 road.Source = activity.Attribute(XName.Get("BeginActivityUniqueID")).Value;
390 road.Target = activity.Attribute(XName.Get("EndActivityUniqueID")).Value;
391 road.Name = activity.Attribute(XName.Get("RuleName")).Value;
392 road.TemplateId = templateId;
393 //这个是控制他们的顺序的
394 road.Zindex = Convert.ToInt32(activity.Attribute(XName.Get("ZIndex")).Value);
395 roadList.Add(road);
396 }
397 DBHelper.WriteDataTableToDb(Common.FillDataTable(roadList));
398 }
399
400
401 /// <summary>
402 /// 通过开始分叉节点的id,计算流程汇合的节点的id
403 /// </summary>
404 /// <param name="rules">所有的线路</param>
405 /// <param name="startNodeId">开始节点的id</param>
406 /// <returns></returns>
407 private string GetJoinPointId(IEnumerable<XElement> rules, string startNodeId)
408 {
409 var linkedRules = from rule in rules
410 where rule.Attribute(XName.Get("BeginActivityUniqueID")).Value == startNodeId
411 select rule;
412
413 if (linkedRules.Count() > 1)
414 {
415 var list = new List<IEnumerable<XElement>>();
416 var uniqueId = string.Empty;
417 for (int i = 0; i < linkedRules.Count(); i++)
418 {
419 uniqueId = linkedRules.ElementAt(i).Attribute(XName.Get("EndActivityUniqueID")).Value;
420 list.Add(GetAfterRules(rules, uniqueId).ToList());
421 }
422 //计算交集
423 IEnumerable<XElement> result = null;
424 foreach (IEnumerable<XElement> item in list)
425 {
426 if (result == null)
427 {
428 result = item;
429 }
430 else
431 {
432 result = result.Intersect(item);
433 }
434 }
435 if (result != null && result.Count() > 0)
436 {
437 return result.First().Attribute(XName.Get("BeginActivityUniqueID")).Value;
438 }
439
440
441 }
442
443 return null;
444
445
446 }
447
448
449 /// <summary>
450 /// 递归查找某个节点的后续连线
451 /// </summary>
452 /// <param name="rules">所有的线路</param>
453 /// <param name="startNodeId">开始节点</param>
454 /// <returns></returns>
455 private IEnumerable<XElement> GetAfterRules(IEnumerable<XElement> rules, string startNodeId)
456 {
457 var linkedRules = from rule in rules
458 where rule.Attribute(XName.Get("BeginActivityUniqueID")).Value == startNodeId
459 select rule;
460
461 return linkedRules.ToList().Concat(linkedRules.ToList().SelectMany(
462 t => GetAfterRules(rules, t.Attribute(XName.Get("EndActivityUniqueID")).Value)));
463 }
464
465
466
467 /// <summary>
468 /// 根据activity的类型,返回相应的FlowNode节点类型
469 /// </summary>
470 /// <param name="activity"></param>
471 /// <returns></returns>
472 private FlowNode CreateFlowNodeByType(IActivity activity)
473 {
474 if (activity is ConditionActivity)
475 {
476 return new FlowDecision() { Condition = activity as ConditionActivity };
477 }
478 else
479 {
480 return new FlowStep() { Action = (Activity)activity };
481 }
482 }
483
484 /// <summary>
485 /// 通过类型来创建活动
486 /// </summary>
487 /// <param name="element">节点元素</param>
488 /// <returns>返回对应的活动</returns>
489 private IActivity GetActivityByType(XElement element)
490 {
491 var uniqueId = element.Attribute(XName.Get("UniqueID")).Value;
492 var type = element.Attribute(XName.Get("Type")).Value;
493 //取得属性节点
494 var property = element.FirstNode.NextNode as XElement;
495 dynamic propertyObj = null;
496
497 switch (type)
498 {
499 case "INITIAL":
500 return new StartActivity(uniqueId);
501 case "COMPLETION":
502 return new EndActivity(uniqueId);
503 case "INTERACTION":
504 propertyObj = new WFActivityProperty();
505 XmlUtils.XMLToModel(property.ToString(), propertyObj);
506 return new ActiveActivity(uniqueId, propertyObj);
507 case "CONDITION":
508 propertyObj = new WFConditionProperty();
509 XmlUtils.XMLToModel(property.ToString(), propertyObj);
510 return new ConditionActivity(uniqueId, propertyObj);
511 default:
512 return null;
513 }
514
515 }
516
517 #endregion
View Code
 


   这是生成xaml的算法。还想说点什么,但是也没有代码了,说啥啊。。 无代码无真相。。 就说点关于自定义节点的问题吧,用flowchart来构图的话,会遇到一个问题,就是并行节点的处理,在我上面的算法当中,是把并行节点开始到并行结束节点之间的节点视作一个FlowNode,但是如果需要并行之后还有并行这些更复杂的工作流节点的话,可以考虑用NativeActivity,下面是我在写动态修改工作流实例的时候在官网上面找到的一些代码,它是一个并行节点的实现,我觉得是一个很重大的发现。

 1 public sealed class MyParallelActivity : NativeActivity
2 {
3 Collection<Activity> branches;
4
5 public Collection<Activity> Branches
6 {
7 get
8 {
9 if (this.branches == null)
10 {
11 this.branches = new Collection<Activity>();
12 }
13 return this.branches;
14 }
15 }
16
17 protected override void Execute(NativeActivityContext context)
18 {
19 foreach (Activity branch in this.Branches)
20 {
21 context.ScheduleActivity(branch);
22 }
23 }
24
25 protected override void OnCreateDynamicUpdateMap(NativeActivityUpdateMapMetadata metadata, Activity originalActivity)
26 {
27 metadata.AllowUpdateInsideThisActivity();
28 }
29
30 protected override void UpdateInstance(NativeActivityUpdateContext updateContext)
31 {
32 // look for new branches that need to be scheduled
33 foreach (Activity branch in this.Branches)
34 {
35 // NativeActivityUpdateContext.IsNewlyAdded looks for children not present in the original (pre-update) activity
36 if (updateContext.IsNewlyAdded(branch))
37 {
38 updateContext.ScheduleActivity(branch);
39 }
40 }
41 }
42 }
View Code
 
  注意Execute方法中的一句话:context.ScheduleActivity(branch); --->调度执行子活动,看到这一句之后,我确实是很兴奋的,因为之前也想过自己写一个完整的Activity,但是苦于不知道怎么执行它的下一个活动。所以如果想重新实现的朋友请继承NativeActivity来实现,因为除了原生的类型之后,WF只支持NativeActivity动态修改后面的流程。
再想想,还有什么没交代的。。。想到了一个,就是判断条件的,比如switch的这种开关的判断条件,它的判断条件可以是一个CodeActivity<string>,我们可以继承重写一个,然后就可以在Execute方法当中写判断的代码了,这里主要是要用到CodeDom来在运行时动态计算结果。就说这么多了,没代码,什么都讲不清楚了,说了也白说。
  这个故事告诉我,我需要一个保存代码的服务器了。。。
  最后把我残存的那一点代码放出来吧,在CSDN上下载http://download.csdn.net/detail/cenyuhaiwork/5670947收起阅读 »

利用WCF改进文件流传输的三种方式

WCF在跨域传输使用了两种模型的方法调用:一种是同步模型,这种模型显然对那些需要大量操作时间的方法调用(如从数据库中获取大量数据时)是一种痛苦的选择。另一种是异步模型的方法调用,这种模型是一种非阻塞方法,其方法调用期间并不等到方法调用结束获得结果才返回,而是方法调用一经开始就马上返回,程序可以继续向前执行,被调用方法和主程序同时执行,在调用方法结束才返回结果。显然这种模型给了我们很好的编程和使用体验。
基于WCF在普通的编码是以文本编码方式在信道之间传输信息的,这种编码会把所有的二进制信息以字节数组的形式存储,并以Base64进行编码,而Base64则是用三个字节来储存4 个字符信息。使得数据量增大约30%以上。在WCF中引入了一种专门针对数据流进行优化编码的MTOM模型。下面我们使用编码模型和调用模型三种方式来改写文件流的传输,以提高WCF应用程序的性能。
1、 MTOM模型:
这模型在于将SOAP消息编码成SOAP MT OM(消息传输优化机制)编码。这种编码是为那些包含大量的二进制数据的SOAP消息而做的,它是把数据流作为SOAP消息的附件而添加的。所以利用这种编码在传输信道之间传输可以显著提高传输性能。在WCF中MTOM模型的操作契约中只能使用单个Stream对象作为参数或者返回类型。

这种模型的特点如图所示:
1.1实现服务契约
服务契约是服务所支持的操作、使用的消息交换模式和每一则消息的格式,它控制消息被格式化的方式,在这里由于要使用MTOM编码消息,所以在操作契约中必须要以单一的Stream对象为输入输出参数。所以这儿我们把服务定义为如下的形式:
[ServiceContract]
public interface ISendStreamService
{
[OperationContract]
void SendStream(Stream stream);
//这个方法的是为了传递文件的参数而设的
[OperationContract]
void FileNameSetting(string filename, string destinationpath);
}
另外我们还定义了一个传输文件路径的名称的辅助方法:FileNameSetting();
1.2实现服务器方法
在上面定义了公共的接口后,接下来我们就实现接口的方法,主要的方法的目的是为了传输Stream对象,由于Stream是一个抽象类,所以这儿以文件流为操作对象来使用SendStream()这个方法。
public class SendStreamService : ISendStreamService
{
static FileStream outStream = null;
static int startLength;
static int fileLength;
static int maxBytesCount=4096;
static byte[] bytes = new byte[int maxBytesCount];
string filePath;
static string fileName;
public void SendStream(System.IO.Stream stream)
{
string file = filePath + "//" + fileName;
outStream = new FileStream(file, FileMode.OpenOrCreate, FileAccess.Write);
try {
while ((startLength = stream.Read(bytes, 0, int maxBytesCount)) > 0){
outStream.Write(bytes, 0, startLength);
fileLength += startLength;
}
}
catch (Exception e) { }
}
public void FileNameSetting(string filename,string destinationpath)
{
fileName = filename;
filePath = destinationpath;
}
}
1.3客户通过接口调用服务器方法
客户端调用服务器方法至少有三种,这里我们选择工厂方法来实现,System.ServiceMode.Channel.ChannelFactory<T>类是这个信道工厂类,它的方法CreateChannel()可以创建T的实例。
ISendStreamService proxy=new
ChannelFactory<ISendStreamService>(“WSHttpBinding_ISendStreamService”).Create-
Channel();
proxy.FileNameSetting(file.Substring (file.LastIndexOf ("\\")+1), filePath);
proxy.SendStream(inStream);
1.4服务器和客户端的配置信息
配置信息定义了双方通信的终结点、绑定、契约行为及其他的配置如安全,可靠性等。服务器的配置如:
<service behaviorConfiguration="SendStreamServiceBehavior"
name="SendStreamService">
<endpoint address=" http://localhost:5504/WebSite2/ISendStreamService "
binding="wsHttpBinding" bindingConfiguration="MTMOBinding"
contract="ISendStreamService">
</endpoint>
<bindings>
<wsHttpBinding>
<binding name="MTMOBinding" messageEncoding="Mtom">
</wsHttpBinding>
</bindings>
</service>
同样客户端的配置如:
<client>
<endpoint address="http://localhost:5504/WebSite2 ... ot%3B
binding="wsHttpBinding" bindingConfiguration="WSHttpBinding_ISendStreamService"
contract="ServiceReference1.ISendStreamService" name="WSHttpBinding_ISendStreamService">
</endpoint>
</client>
<bindings>
<wsHttpBinding>
<binding name="WSHttpBinding_ISendStreamService"
messageEncoding="Mtom" textEncoding="utf-8" >
</binding>
</wsHttpBinding>
</bindings>
注意:在这种方式下使用同步和异步方法没有明显的差别,后来我在分析了Windows Trace Viewer的消息包,发现在用异步方法时,整个过程只用两个消息来回,这就意味着第一次的SOAP包是在把SOAP消息加上MTOM编码的文件流作为附件一起发送的,在等待文件传输完成后才会返回一个加高消息给方法。也就是说异步方法IAsyncResult Begin*(params parameters,AsyncCallback callback,object state)是在发送第一个SOAP包,并等待服务器接收完第一个包后回应消息包才会返回的。由于在发送文件流时,因为文本字符始终不会超过一个SOAP包而必须等待。所在在这种编码方式下异步调用和同步调用没有差别。
2、 基于同步传输的异步回调模型:
同步传输是指方法在调用过程中一直阻塞到方法调用结束返回结果才会让程序继续向前执行,这种行为比较耗费资源,因为网络访问在等待方法完成的时间内是阻塞的。而且如果远程对象的调用时花费的时间会更长,所以这种时间的浪费让人是不可接受的,这在大文件传输中尤为明显。于是一种让方法的异步调用的机制便产生了。这种方法的内部处理中使用线程池中的一个线程接管这个调用,程序可以获得异步调用的返回信息而继续向前执行。
WCF编程模型中采用了一种让同步传输中使用异步回调的方式来提高应用程序的响应。具体是在每个操作契约中可以选择生成异步方法的调用,具体是在同步方法的前面加上
IAsyncResult Begin…..(params param,AsyncCallback,object state)形式表明这是一个异步调用。并且生成相应的void End……(IAsyncResult state)来返回结果。
2.1定义契约和实现相应的同步方法
这里在服务契约中定义了相应的同步方法,用这个调用FileStream类的同步方法Read()和Write()方法对文件进行读写操作,以实现将文件传输到服务的机器上。这里在服务契约中通过设置属性CallbackContract来实现客户端的回调功能。来其相应的代码如下:
[ServiceContract(CallbackContract = typeof(IUploadCallback))]
public interface IUploadFileService
{
//同步传输的接口
[OperationContract]
void FileUpload(string localFilePath, string netPath);
}
public interface IUploadCallback
{
[OperationContract(IsOneWay = true)]
void ReportFileUpload(int length);//这个回调函数是文件传输完成时发布一个通知
}
//实现文件读写的服务器方法
public class UploadFileService : IUploadFileService
{
static int startLength;
static int fileLength;
static int bytesLength;
static byte[] bytes;
static int maxLength=4096;
static FileStream inStream = null;
static FileStream outStream = null;
IUploadCallback client = null;
public void FileUpload(string localFilePath, string netPath)
{
//获得客户端代理的回调
client = OperationContext.Current.GetCallbackChannel<IUploadCallback>();
//得到原始文件名
string fileName = localFilePath.Substring(localFilePath.LastIndexOf("\\") + 1);
string netFile = netPath + fileName;
bytes = new byte[maxLength];//设置缓冲区
try
{
outStream = new FileStream(netFile, FileMode.OpenOrCreate, FileAccess.Write);
inStream = File.OpenRead(localFilePath);//打开文件读
int length;
while ((length = inStream.Read(bytes, 0, maxLength)) > 0)
{
fileLength += length;
outStream.Write(bytes, 0, length);
}
}
catch (Exception e)
{
Console.WriteLine("文件上传错误:" + e.Message);
inStream.Close();
outStream.Close();
}
finally
{
client.ReportFileUpload(fileLength);//使用回调报告文件的状态
inStream.Close();
outStream.Close();
}
}
2.2在客户端调用方法
在客户端调用BeginFileUpload()和EndFileUpload()方法来实现客户端的异步回调。并在这些方法完成后服务调用客户回调ReportFileUpload()报告给客户端相应的信息。
3、 基于异步传输的异步模型:
在同步方式处理中,文件传输的时间是和文件的长度密切相关的,对于一个大容量的文件传输,如果全部在主线程中执行,那么应用程序可能会等待很长的时间,因此我们给予文件流以异步方法读写的方法来实现性能的改进。这只调用了文件操作的异步处理。第二种模式一样这也是采用线程池来完成的。这实际上是利用了文件流的异步方法。
在这儿我们仍然使用第二种模型的WCF框架,只是我们这儿使用了FileStream对象BeginWrite();BeginRead()方法及相应的EndWrite();EndRead()方法。这儿我们只给出了服务器的方法实现:
public void AsyncFileUpload(string localFilePath, string netPath)
{
//获得客户端代理的回调
client = OperationContext.Current.GetCallbackChannel<IUploadCallback>();
//得到原始文件名
string fileName = localFilePath.Substring(localFilePath.LastIndexOf("\\") + 1);
string netFile = netPath + fileName;
bytes = new byte[maxLength];//设置缓冲区
try{
outStream = new
FileStream(netFile, FileMode.OpenOrCreate, FileAccess.Write);
inStream = File.OpenRead(localFilePath);//打开文件读
inStream.BeginRead(bytes, 0, maxLength, CallbackOnRead, null);
}
catch (Exception e){
inStream.Close();
outStream.Close();
}
}
void CallbackOnRead(IAsyncResult result)
{
int length =inStream.EndRead(result);
if (length >= 0){
fileLength += length;
outStream.BeginWrite(bytes, 0, length, CallbackOnWrite, null);
}
else{
client.ReportFileUpload(fileLength);//使用回调
inStream.Close();
outStream.Close();
}
}
void CallbackOnWrite(IAsyncResult result){
outStream.EndWrite(result);
inStream.BeginRead(bytes, 0, maxLength, CallbackOnRead, null);
}
通过以上的分析, 基于MTOM编码的文件流传输时,可以提高传输性能,而对于后两种方式的前提是必须是普通的文本消息编码才会有效果,才可以提高程序的响应性能。也就是说后两种方式只是一种提高WCF应用程序响应性能的方式,它的传输数据量会有明显的膨胀。具体设计中要看在传输效率和响应性能两者取舍来选取其一而用。
继续阅读 »
WCF在跨域传输使用了两种模型的方法调用:一种是同步模型,这种模型显然对那些需要大量操作时间的方法调用(如从数据库中获取大量数据时)是一种痛苦的选择。另一种是异步模型的方法调用,这种模型是一种非阻塞方法,其方法调用期间并不等到方法调用结束获得结果才返回,而是方法调用一经开始就马上返回,程序可以继续向前执行,被调用方法和主程序同时执行,在调用方法结束才返回结果。显然这种模型给了我们很好的编程和使用体验。
基于WCF在普通的编码是以文本编码方式在信道之间传输信息的,这种编码会把所有的二进制信息以字节数组的形式存储,并以Base64进行编码,而Base64则是用三个字节来储存4 个字符信息。使得数据量增大约30%以上。在WCF中引入了一种专门针对数据流进行优化编码的MTOM模型。下面我们使用编码模型和调用模型三种方式来改写文件流的传输,以提高WCF应用程序的性能。
1、 MTOM模型:
这模型在于将SOAP消息编码成SOAP MT OM(消息传输优化机制)编码。这种编码是为那些包含大量的二进制数据的SOAP消息而做的,它是把数据流作为SOAP消息的附件而添加的。所以利用这种编码在传输信道之间传输可以显著提高传输性能。在WCF中MTOM模型的操作契约中只能使用单个Stream对象作为参数或者返回类型。

这种模型的特点如图所示:
1.1实现服务契约
服务契约是服务所支持的操作、使用的消息交换模式和每一则消息的格式,它控制消息被格式化的方式,在这里由于要使用MTOM编码消息,所以在操作契约中必须要以单一的Stream对象为输入输出参数。所以这儿我们把服务定义为如下的形式:
[ServiceContract]
public interface ISendStreamService
{
[OperationContract]
void SendStream(Stream stream);
//这个方法的是为了传递文件的参数而设的
[OperationContract]
void FileNameSetting(string filename, string destinationpath);
}
另外我们还定义了一个传输文件路径的名称的辅助方法:FileNameSetting();
1.2实现服务器方法
在上面定义了公共的接口后,接下来我们就实现接口的方法,主要的方法的目的是为了传输Stream对象,由于Stream是一个抽象类,所以这儿以文件流为操作对象来使用SendStream()这个方法。
public class SendStreamService : ISendStreamService
{
static FileStream outStream = null;
static int startLength;
static int fileLength;
static int maxBytesCount=4096;
static byte[] bytes = new byte[int maxBytesCount];
string filePath;
static string fileName;
public void SendStream(System.IO.Stream stream)
{
string file = filePath + "//" + fileName;
outStream = new FileStream(file, FileMode.OpenOrCreate, FileAccess.Write);
try {
while ((startLength = stream.Read(bytes, 0, int maxBytesCount)) > 0){
outStream.Write(bytes, 0, startLength);
fileLength += startLength;
}
}
catch (Exception e) { }
}
public void FileNameSetting(string filename,string destinationpath)
{
fileName = filename;
filePath = destinationpath;
}
}
1.3客户通过接口调用服务器方法
客户端调用服务器方法至少有三种,这里我们选择工厂方法来实现,System.ServiceMode.Channel.ChannelFactory<T>类是这个信道工厂类,它的方法CreateChannel()可以创建T的实例。
ISendStreamService proxy=new
ChannelFactory<ISendStreamService>(“WSHttpBinding_ISendStreamService”).Create-
Channel();
proxy.FileNameSetting(file.Substring (file.LastIndexOf ("\\")+1), filePath);
proxy.SendStream(inStream);
1.4服务器和客户端的配置信息
配置信息定义了双方通信的终结点、绑定、契约行为及其他的配置如安全,可靠性等。服务器的配置如:
<service behaviorConfiguration="SendStreamServiceBehavior"
name="SendStreamService">
<endpoint address=" http://localhost:5504/WebSite2/ISendStreamService "
binding="wsHttpBinding" bindingConfiguration="MTMOBinding"
contract="ISendStreamService">
</endpoint>
<bindings>
<wsHttpBinding>
<binding name="MTMOBinding" messageEncoding="Mtom">
</wsHttpBinding>
</bindings>
</service>
同样客户端的配置如:
<client>
<endpoint address="http://localhost:5504/WebSite2 ... ot%3B
binding="wsHttpBinding" bindingConfiguration="WSHttpBinding_ISendStreamService"
contract="ServiceReference1.ISendStreamService" name="WSHttpBinding_ISendStreamService">
</endpoint>
</client>
<bindings>
<wsHttpBinding>
<binding name="WSHttpBinding_ISendStreamService"
messageEncoding="Mtom" textEncoding="utf-8" >
</binding>
</wsHttpBinding>
</bindings>
注意:在这种方式下使用同步和异步方法没有明显的差别,后来我在分析了Windows Trace Viewer的消息包,发现在用异步方法时,整个过程只用两个消息来回,这就意味着第一次的SOAP包是在把SOAP消息加上MTOM编码的文件流作为附件一起发送的,在等待文件传输完成后才会返回一个加高消息给方法。也就是说异步方法IAsyncResult Begin*(params parameters,AsyncCallback callback,object state)是在发送第一个SOAP包,并等待服务器接收完第一个包后回应消息包才会返回的。由于在发送文件流时,因为文本字符始终不会超过一个SOAP包而必须等待。所在在这种编码方式下异步调用和同步调用没有差别。
2、 基于同步传输的异步回调模型:
同步传输是指方法在调用过程中一直阻塞到方法调用结束返回结果才会让程序继续向前执行,这种行为比较耗费资源,因为网络访问在等待方法完成的时间内是阻塞的。而且如果远程对象的调用时花费的时间会更长,所以这种时间的浪费让人是不可接受的,这在大文件传输中尤为明显。于是一种让方法的异步调用的机制便产生了。这种方法的内部处理中使用线程池中的一个线程接管这个调用,程序可以获得异步调用的返回信息而继续向前执行。
WCF编程模型中采用了一种让同步传输中使用异步回调的方式来提高应用程序的响应。具体是在每个操作契约中可以选择生成异步方法的调用,具体是在同步方法的前面加上
IAsyncResult Begin…..(params param,AsyncCallback,object state)形式表明这是一个异步调用。并且生成相应的void End……(IAsyncResult state)来返回结果。
2.1定义契约和实现相应的同步方法
这里在服务契约中定义了相应的同步方法,用这个调用FileStream类的同步方法Read()和Write()方法对文件进行读写操作,以实现将文件传输到服务的机器上。这里在服务契约中通过设置属性CallbackContract来实现客户端的回调功能。来其相应的代码如下:
[ServiceContract(CallbackContract = typeof(IUploadCallback))]
public interface IUploadFileService
{
//同步传输的接口
[OperationContract]
void FileUpload(string localFilePath, string netPath);
}
public interface IUploadCallback
{
[OperationContract(IsOneWay = true)]
void ReportFileUpload(int length);//这个回调函数是文件传输完成时发布一个通知
}
//实现文件读写的服务器方法
public class UploadFileService : IUploadFileService
{
static int startLength;
static int fileLength;
static int bytesLength;
static byte[] bytes;
static int maxLength=4096;
static FileStream inStream = null;
static FileStream outStream = null;
IUploadCallback client = null;
public void FileUpload(string localFilePath, string netPath)
{
//获得客户端代理的回调
client = OperationContext.Current.GetCallbackChannel<IUploadCallback>();
//得到原始文件名
string fileName = localFilePath.Substring(localFilePath.LastIndexOf("\\") + 1);
string netFile = netPath + fileName;
bytes = new byte[maxLength];//设置缓冲区
try
{
outStream = new FileStream(netFile, FileMode.OpenOrCreate, FileAccess.Write);
inStream = File.OpenRead(localFilePath);//打开文件读
int length;
while ((length = inStream.Read(bytes, 0, maxLength)) > 0)
{
fileLength += length;
outStream.Write(bytes, 0, length);
}
}
catch (Exception e)
{
Console.WriteLine("文件上传错误:" + e.Message);
inStream.Close();
outStream.Close();
}
finally
{
client.ReportFileUpload(fileLength);//使用回调报告文件的状态
inStream.Close();
outStream.Close();
}
}
2.2在客户端调用方法
在客户端调用BeginFileUpload()和EndFileUpload()方法来实现客户端的异步回调。并在这些方法完成后服务调用客户回调ReportFileUpload()报告给客户端相应的信息。
3、 基于异步传输的异步模型:
在同步方式处理中,文件传输的时间是和文件的长度密切相关的,对于一个大容量的文件传输,如果全部在主线程中执行,那么应用程序可能会等待很长的时间,因此我们给予文件流以异步方法读写的方法来实现性能的改进。这只调用了文件操作的异步处理。第二种模式一样这也是采用线程池来完成的。这实际上是利用了文件流的异步方法。
在这儿我们仍然使用第二种模型的WCF框架,只是我们这儿使用了FileStream对象BeginWrite();BeginRead()方法及相应的EndWrite();EndRead()方法。这儿我们只给出了服务器的方法实现:
public void AsyncFileUpload(string localFilePath, string netPath)
{
//获得客户端代理的回调
client = OperationContext.Current.GetCallbackChannel<IUploadCallback>();
//得到原始文件名
string fileName = localFilePath.Substring(localFilePath.LastIndexOf("\\") + 1);
string netFile = netPath + fileName;
bytes = new byte[maxLength];//设置缓冲区
try{
outStream = new
FileStream(netFile, FileMode.OpenOrCreate, FileAccess.Write);
inStream = File.OpenRead(localFilePath);//打开文件读
inStream.BeginRead(bytes, 0, maxLength, CallbackOnRead, null);
}
catch (Exception e){
inStream.Close();
outStream.Close();
}
}
void CallbackOnRead(IAsyncResult result)
{
int length =inStream.EndRead(result);
if (length >= 0){
fileLength += length;
outStream.BeginWrite(bytes, 0, length, CallbackOnWrite, null);
}
else{
client.ReportFileUpload(fileLength);//使用回调
inStream.Close();
outStream.Close();
}
}
void CallbackOnWrite(IAsyncResult result){
outStream.EndWrite(result);
inStream.BeginRead(bytes, 0, maxLength, CallbackOnRead, null);
}
通过以上的分析, 基于MTOM编码的文件流传输时,可以提高传输性能,而对于后两种方式的前提是必须是普通的文本消息编码才会有效果,才可以提高程序的响应性能。也就是说后两种方式只是一种提高WCF应用程序响应性能的方式,它的传输数据量会有明显的膨胀。具体设计中要看在传输效率和响应性能两者取舍来选取其一而用。 收起阅读 »

MD5鉴定文件是否相同

由于诸多安全因素,需要对网上下载的一些文件进行完整性校验。比如,由于工作需要我下载了一个EMOS_1.5_i386.iso镜像文件(extmail邮件系统),需要对其进行MD5校验。为此,用C# 2005写了一个获取文件MD5码的简单程序。
用MD5进行文件校验,步骤如下:
1) 从文件发布单位那获取原始MD5码;
2) 用程序获取该文件的MD5码;
3) 对比1)和2)的MD5码是否一致;
从上可以看出,根据文件通过程序计算其MD5码是关键,下表所示为C#获取文件MD5码的代码。新建一个windows应用程序,在默认窗体form1中添加:
一个按钮”btnOpenFile”,click事件代码如下;
一个文本框”txtMD5”,显示文件的MD5码;
//选择文件
private void btnOpenFile_Click(object sender, EventArgs e)
{
using (OpenFileDialog dialog = new OpenFileDialog())
{
if (dialog.ShowDialog() == DialogResult.OK)
{
String fileName = dialog.FileName;
this.txtMD5.Text = "";
//this.txtSH1.Text = "";
//
this.txtMD5.Text = getMD5Hash(fileName);
//this.txtSH1.Text = GetMD5Hash(fileName);
}
}
}
//计算文件的MD5码
private string getMD5Hash(string pathName)
{
string strResult = "";
string strHashData = "";
 
byte[] arrbytHashValue;
System.IO.FileStream oFileStream = null;
 
System.Security.Cryptography.MD5CryptoServiceProvider oMD5Hasher =
new System.Security.Cryptography.MD5CryptoServiceProvider();
 
try
{
oFileStream = new System.IO.FileStream(pathName, System.IO.FileMode.Open,
System.IO.FileAccess.Read, System.IO.FileShare.ReadWrite));
arrbytHashValue = oMD5Hasher.ComputeHash(oFileStream);//计算指定Stream 对象的哈希值
oFileStream.Close();
//由以连字符分隔的十六进制对构成的String,其中每一对表示value 中对应的元素;例如“F-2C-4A”
strHashData = System.BitConverter.ToString(arrbytHashValue);
//替换-
strHashData = strHashData.Replace("-", "");
strResult = strHashData;
}
catch (System.Exception ex)
{
MessageBox.Show(ex.Message);
}
 
return strResult;
}
 
本demo的文件EMOS_1.5_i386.iso,其官方MD5代码为c8b4494715166118bd94dd2a39e640c4,程序得到的md5代码为c8b4494715166118bd94dd2a39e640c4。
继续阅读 »
由于诸多安全因素,需要对网上下载的一些文件进行完整性校验。比如,由于工作需要我下载了一个EMOS_1.5_i386.iso镜像文件(extmail邮件系统),需要对其进行MD5校验。为此,用C# 2005写了一个获取文件MD5码的简单程序。
用MD5进行文件校验,步骤如下:
1) 从文件发布单位那获取原始MD5码;
2) 用程序获取该文件的MD5码;
3) 对比1)和2)的MD5码是否一致;
从上可以看出,根据文件通过程序计算其MD5码是关键,下表所示为C#获取文件MD5码的代码。新建一个windows应用程序,在默认窗体form1中添加:
一个按钮”btnOpenFile”,click事件代码如下;
一个文本框”txtMD5”,显示文件的MD5码;
//选择文件
private void btnOpenFile_Click(object sender, EventArgs e)
{
using (OpenFileDialog dialog = new OpenFileDialog())
{
if (dialog.ShowDialog() == DialogResult.OK)
{
String fileName = dialog.FileName;
this.txtMD5.Text = "";
//this.txtSH1.Text = "";
//
this.txtMD5.Text = getMD5Hash(fileName);
//this.txtSH1.Text = GetMD5Hash(fileName);
}
}
}
//计算文件的MD5码
private string getMD5Hash(string pathName)
{
string strResult = "";
string strHashData = "";
 
byte[] arrbytHashValue;
System.IO.FileStream oFileStream = null;
 
System.Security.Cryptography.MD5CryptoServiceProvider oMD5Hasher =
new System.Security.Cryptography.MD5CryptoServiceProvider();
 
try
{
oFileStream = new System.IO.FileStream(pathName, System.IO.FileMode.Open,
System.IO.FileAccess.Read, System.IO.FileShare.ReadWrite));
arrbytHashValue = oMD5Hasher.ComputeHash(oFileStream);//计算指定Stream 对象的哈希值
oFileStream.Close();
//由以连字符分隔的十六进制对构成的String,其中每一对表示value 中对应的元素;例如“F-2C-4A”
strHashData = System.BitConverter.ToString(arrbytHashValue);
//替换-
strHashData = strHashData.Replace("-", "");
strResult = strHashData;
}
catch (System.Exception ex)
{
MessageBox.Show(ex.Message);
}
 
return strResult;
}
 
本demo的文件EMOS_1.5_i386.iso,其官方MD5代码为c8b4494715166118bd94dd2a39e640c4,程序得到的md5代码为c8b4494715166118bd94dd2a39e640c4。 收起阅读 »

修改表的主键

在家里创建表的时候经常会忘掉设置主键,不是一次两次了,每次都是浪费了不少时间才搞定,下面把这两句代码放上来,以示警惕!
  alter table Template alter column Guid uniqueidentifier not null
  alter table Template add constraint PK_template primary key (Guid)
继续阅读 »
在家里创建表的时候经常会忘掉设置主键,不是一次两次了,每次都是浪费了不少时间才搞定,下面把这两句代码放上来,以示警惕!
  alter table Template alter column Guid uniqueidentifier not null
  alter table Template add constraint PK_template primary key (Guid) 收起阅读 »

SqlServer 索引

什么是索引
拿汉语字典的目录页(索引)打比方:正如汉语字典中的汉字按页存放一样,SQL Server中的数据记录也是按页存放的,每页容量一般为4K 。为了加快查找的速度,汉语字(词)典一般都有按拼音、笔画、偏旁部首等排序的目录(索引),我们可以选择按拼音或笔画查找方式,快速查找到需要的字(词)。
同理,SQL Server允许用户在表中创建索引,指定按某列预先排序,从而大大提高查询速度。
• SQL Server中的数据也是按页( 4KB )存放
• 索引:是SQL Server编排数据的内部方法。它为SQL Server提供一种方法来编排查询数据 。
• 索引页:数据库中存储索引的数据页;索引页类似于汉语字(词)典中按拼音或笔画排序的目录页。
• 索引的作用:通过使用索引,可以大大提高数据库的检索速度,改善数据库性能。
 
索引类型
唯一索引:唯一索引不允许两行具有相同的索引值
主键索引:为表定义一个主键将自动创建主键索引,主键索引是唯一索引的特殊类型。主键索引要求主键中的每个值是唯一的,并且不能为空
聚集索引(Clustered):表中各行的物理顺序与键值的逻辑(索引)顺序相同,每个表只能有一个
非聚集索引(Non-clustered):非聚集索引指定表的逻辑顺序。数据存储在一个位置,索引存储在另一个位置,索引中包含指向数据存储位置的指针。可以有多个,小于249个
 
索引类型:再次用汉语字典打比方,希望大家能够明白聚集索引和非聚集索引这两个概念。
 
唯一索引:
唯一索引不允许两行具有相同的索引值。
如果现有数据中存在重复的键值,则大多数数据库都不允许将新创建的唯一索引与表一起保存。当新数据将使表中的键值重复时,数据库也拒绝接受此数据。例如,如果在stuInfo表中的学员员身份证号(stuID) 列上创建了唯一索引,则所有学员的身份证号不能重复。
提示:创建了唯一约束,将自动创建唯一索引。尽管唯一索引有助于找到信息,但为了获得最佳性能,建议使用主键约束或唯一约束。
 
主键索引:
在数据库关系图中为表定义一个主键将自动创建主键索引,主键索引是唯一索引的特殊类型。主键索引要求主键中的每个值是唯一的。当在查询中使用主键索引时,它还允许快速访问数据。
 
聚集索引(clustered index
在聚集索引中,表中各行的物理顺序与键值的逻辑(索引)顺序相同。表只能包含一个聚集索引。例如:汉语字(词)典默认按拼音排序编排字典中的每页页码。拼音字母a,b,c,d……x,y,z就是索引的逻辑顺序,而页码1,2,3……就是物理顺序。默认按拼音排序的字典,其索引顺序和逻辑顺序是一致的。即拼音顺序较后的字(词)对应的页码也较大。如拼音“ha”对应的字(词)页码就比拼音“ba” 对应的字(词)页码靠后。
 
非聚集索引(Non-clustered)
如果不是聚集索引,表中各行的物理顺序与键值的逻辑顺序不匹配。聚集索引比非聚集索引(nonclustered index)有更快的数据访问速度。例如,按笔画排序的索引就是非聚集索引,“1”画的字(词)对应的页码可能比“3”画的字(词)对应的页码大(靠后)。
提示:SQL Server中,一个表只能创建1个聚集索引,多个非聚集索引。设置某列为主键,该列就默认为聚集索引
 
如何创建索引
使用T-SQL语句创建索引的语法:
CREATE [UNIQUE] [CLUSTERED|NONCLUSTERED]
INDEX index_name
ON table_name (column_name…)
[WITH FILLFACTOR=x]
q UNIQUE表示唯一索引,可选
q CLUSTERED、NONCLUSTERED表示聚集索引还是非聚集索引,可选
q FILLFACTOR表示填充因子,指定一个0到100之间的值,该值指示索引页填满的空间所占的百分比
 
在stuMarks表的writtenExam列创建索引:
USE stuDB
GO
IF EXISTS (SELECT name FROM sysindexes
WHERE name = 'IX_writtenExam')
DROP INDEX stuMarks.IX_writtenExam
/*--笔试列创建非聚集索引:填充因子为30%--*/
CREATE NONCLUSTERED INDEX IX_writtenExam
ON stuMarks(writtenExam)
WITH FILLFACTOR= 30
GO
/*-----指定按索引 IX_writtenExam 查询----*/
SELECT * FROM stuMarks (INDEX=IX_writtenExam)
WHERE writtenExam BETWEEN 60 AND 90
虽然我们可以指定SQL Server按哪个索引进行数据查询,但一般不需要我们人工指定。SQL Server将会根据我们创建的索引,自动优化查询 。
 
索引的优缺点
• 优点
– 加快访问速度
– 加强行的唯一性
• 缺点
– 带索引的表在数据库中需要更多的存储空间
– 操纵数据的命令需要更长的处理时间,因为它们需要对索引进行更新
 
创建索引的指导原则
• 请按照下列标准选择建立索引的列。
– 该列用于频繁搜索
– 该列用于对数据进行排序
• 请不要使用下面的列创建索引:
– 列中仅包含几个不同的值。
– 表中仅包含几行。为小型表创建索引可能不太划算,因为SQL Server在索引中搜索数据所花的时间比在表中逐行搜索所花的时间更长
继续阅读 »
什么是索引
拿汉语字典的目录页(索引)打比方:正如汉语字典中的汉字按页存放一样,SQL Server中的数据记录也是按页存放的,每页容量一般为4K 。为了加快查找的速度,汉语字(词)典一般都有按拼音、笔画、偏旁部首等排序的目录(索引),我们可以选择按拼音或笔画查找方式,快速查找到需要的字(词)。
同理,SQL Server允许用户在表中创建索引,指定按某列预先排序,从而大大提高查询速度。
• SQL Server中的数据也是按页( 4KB )存放
• 索引:是SQL Server编排数据的内部方法。它为SQL Server提供一种方法来编排查询数据 。
• 索引页:数据库中存储索引的数据页;索引页类似于汉语字(词)典中按拼音或笔画排序的目录页。
• 索引的作用:通过使用索引,可以大大提高数据库的检索速度,改善数据库性能。
 
索引类型
唯一索引:唯一索引不允许两行具有相同的索引值
主键索引:为表定义一个主键将自动创建主键索引,主键索引是唯一索引的特殊类型。主键索引要求主键中的每个值是唯一的,并且不能为空
聚集索引(Clustered):表中各行的物理顺序与键值的逻辑(索引)顺序相同,每个表只能有一个
非聚集索引(Non-clustered):非聚集索引指定表的逻辑顺序。数据存储在一个位置,索引存储在另一个位置,索引中包含指向数据存储位置的指针。可以有多个,小于249个
 
索引类型:再次用汉语字典打比方,希望大家能够明白聚集索引和非聚集索引这两个概念。
 
唯一索引:
唯一索引不允许两行具有相同的索引值。
如果现有数据中存在重复的键值,则大多数数据库都不允许将新创建的唯一索引与表一起保存。当新数据将使表中的键值重复时,数据库也拒绝接受此数据。例如,如果在stuInfo表中的学员员身份证号(stuID) 列上创建了唯一索引,则所有学员的身份证号不能重复。
提示:创建了唯一约束,将自动创建唯一索引。尽管唯一索引有助于找到信息,但为了获得最佳性能,建议使用主键约束或唯一约束。
 
主键索引:
在数据库关系图中为表定义一个主键将自动创建主键索引,主键索引是唯一索引的特殊类型。主键索引要求主键中的每个值是唯一的。当在查询中使用主键索引时,它还允许快速访问数据。
 
聚集索引(clustered index
在聚集索引中,表中各行的物理顺序与键值的逻辑(索引)顺序相同。表只能包含一个聚集索引。例如:汉语字(词)典默认按拼音排序编排字典中的每页页码。拼音字母a,b,c,d……x,y,z就是索引的逻辑顺序,而页码1,2,3……就是物理顺序。默认按拼音排序的字典,其索引顺序和逻辑顺序是一致的。即拼音顺序较后的字(词)对应的页码也较大。如拼音“ha”对应的字(词)页码就比拼音“ba” 对应的字(词)页码靠后。
 
非聚集索引(Non-clustered)
如果不是聚集索引,表中各行的物理顺序与键值的逻辑顺序不匹配。聚集索引比非聚集索引(nonclustered index)有更快的数据访问速度。例如,按笔画排序的索引就是非聚集索引,“1”画的字(词)对应的页码可能比“3”画的字(词)对应的页码大(靠后)。
提示:SQL Server中,一个表只能创建1个聚集索引,多个非聚集索引。设置某列为主键,该列就默认为聚集索引
 
如何创建索引
使用T-SQL语句创建索引的语法:
CREATE [UNIQUE] [CLUSTERED|NONCLUSTERED]
INDEX index_name
ON table_name (column_name…)
[WITH FILLFACTOR=x]
q UNIQUE表示唯一索引,可选
q CLUSTERED、NONCLUSTERED表示聚集索引还是非聚集索引,可选
q FILLFACTOR表示填充因子,指定一个0到100之间的值,该值指示索引页填满的空间所占的百分比
 
在stuMarks表的writtenExam列创建索引:
USE stuDB
GO
IF EXISTS (SELECT name FROM sysindexes
WHERE name = 'IX_writtenExam')
DROP INDEX stuMarks.IX_writtenExam
/*--笔试列创建非聚集索引:填充因子为30%--*/
CREATE NONCLUSTERED INDEX IX_writtenExam
ON stuMarks(writtenExam)
WITH FILLFACTOR= 30
GO
/*-----指定按索引 IX_writtenExam 查询----*/
SELECT * FROM stuMarks (INDEX=IX_writtenExam)
WHERE writtenExam BETWEEN 60 AND 90
虽然我们可以指定SQL Server按哪个索引进行数据查询,但一般不需要我们人工指定。SQL Server将会根据我们创建的索引,自动优化查询 。
 
索引的优缺点
• 优点
– 加快访问速度
– 加强行的唯一性
• 缺点
– 带索引的表在数据库中需要更多的存储空间
– 操纵数据的命令需要更长的处理时间,因为它们需要对索引进行更新
 
创建索引的指导原则
• 请按照下列标准选择建立索引的列。
– 该列用于频繁搜索
– 该列用于对数据进行排序
• 请不要使用下面的列创建索引:
– 列中仅包含几个不同的值。
– 表中仅包含几行。为小型表创建索引可能不太划算,因为SQL Server在索引中搜索数据所花的时间比在表中逐行搜索所花的时间更长 收起阅读 »