flume + kafka + storm + mysql data flow

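Today I finally got the flume + kafka + storm + mysql data flow working end to end. It is only a simple test example, but a lot can be built on top of this pipeline.
First, a quick look at the architecture of each tool; diagrams explain it best: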
[Figure: Flume architecture diagram]


[Figure: Kafka architecture diagram]



[Figure: Storm architecture diagram]


[Figure: architecture of the flume + kafka + storm + mysql data flow we use]
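To make the roles in this pipeline concrete, here is a toy, single-process model of the flow: Flume publishes log lines to a Kafka topic, the KafkaSpout reads them as tuples, and the RDBMSDumperBolt writes each tuple as a row. Everything here is an in-memory stand-in (a queue for the topic, a list for the MySQL table); the class and method names are hypothetical and none of them are real Flume, Kafka, Storm or MySQL APIs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Toy in-memory model of the data flow. None of these methods are real
// Flume, Kafka, Storm or MySQL APIs; they only mirror each stage's role.
class PipelineSketch {

    // Plays the Kafka topic (e.g. "flume_kafka"): an ordered message buffer.
    static final BlockingQueue<String> topic = new LinkedBlockingQueue<String>();

    // Plays the MySQL table: one element per inserted row.
    static final List<String> table = new ArrayList<String>();

    // "Flume" stage: collect log lines and publish each one to the topic.
    static void flumeToKafka(List<String> logLines) {
        topic.addAll(logLines);
    }

    // "KafkaSpout" stage: fetch the next message, or null if none is pending.
    static String spoutNextTuple() {
        return topic.poll();
    }

    // "RDBMSDumperBolt" stage: persist each tuple as one row.
    static void dumperBoltExecute(String tuple) {
        table.add(tuple);
    }

    public static void main(String[] args) {
        flumeToKafka(Arrays.asList("hello", "storm", "kafka"));
        String tuple;
        // Drain the topic the way a spout would, handing each tuple to the bolt.
        while ((tuple = spoutNextTuple()) != null) {
            dumperBoltExecute(tuple);
        }
        System.out.println(table); // prints [hello, storm, kafka]
    }
}
```

In the real deployment each stage runs in its own process (or cluster), and the Kafka topic is what decouples the Flume producer from the Storm consumer, so either side can be restarted without losing buffered messages.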

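Next, the configuration from Kafka to Storm.
All of this is done in Java code, using the KafkaSpout and RDBMSDumperBolt classes (later these can be packaged as utility classes and uploaded to the cluster).
For the Storm job we wrote a KafkaStormRdbms class, configured as follows.
First, set the MySQL connection parameters: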
```java
import java.util.ArrayList;

// Inside KafkaStormRdbms (e.g. in its main method):

// Target MySQL table and its schema
ArrayList<String> columnNames = new ArrayList<String>();
ArrayList<String> columnTypes = new ArrayList<String>();
String tableName = "stormTestTable_01";
// Note: if the RDBMS table does not need a primary key, set 'primaryKey' to "N/A";
// otherwise set it to the name of the tuple field to be treated as the primary key
String primaryKey = "N/A";
// Replace $hostname with the host name of the MySQL server
String rdbmsUrl = "jdbc:mysql://$hostname:3306/fuqingwuDB";
String rdbmsUserName = "fuqingwu";
String rdbmsPassword = "password";

// Add the column names and their respective types to the two lists (same order)
columnNames.add("word");
columnTypes.add("varchar (100)");
```
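The two lists are parallel: the i-th entry of `columnNames` is typed by the i-th entry of `columnTypes`. Below is a minimal sketch of how a dumper bolt could turn them, together with the `primaryKey` "N/A" convention, into SQL. `DumperSqlSketch`, `createTableSql` and `insertSql` are hypothetical names; this is an assumption about the general approach, not the actual source of RDBMSDumperBolt.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical helper: maps the parallel columnNames / columnTypes lists and the
// primaryKey "N/A" convention to SQL strings. It is NOT RDBMSDumperBolt's real code.
class DumperSqlSketch {

    static String createTableSql(String tableName, List<String> columnNames,
                                 List<String> columnTypes, String primaryKey) {
        if (columnNames.size() != columnTypes.size()) {
            throw new IllegalArgumentException("each column needs exactly one type");
        }
        StringBuilder sql = new StringBuilder("CREATE TABLE IF NOT EXISTS ")
                .append(tableName).append(" (");
        for (int i = 0; i < columnNames.size(); i++) {
            if (i > 0) sql.append(", ");
            sql.append(columnNames.get(i)).append(' ').append(columnTypes.get(i));
        }
        // "N/A" means: create the table without a primary key.
        if (!"N/A".equals(primaryKey)) {
            sql.append(", PRIMARY KEY (").append(primaryKey).append(')');
        }
        return sql.append(')').toString();
    }

    // One "?" placeholder per column, to be bound from the tuple's values in order.
    static String insertSql(String tableName, List<String> columnNames) {
        String placeholders = String.join(", ", Collections.nCopies(columnNames.size(), "?"));
        return "INSERT INTO " + tableName + " (" + String.join(", ", columnNames)
                + ") VALUES (" + placeholders + ")";
    }

    public static void main(String[] args) {
        List<String> names = Arrays.asList("word");
        List<String> types = Arrays.asList("varchar (100)");
        // prints CREATE TABLE IF NOT EXISTS stormTestTable_01 (word varchar (100))
        System.out.println(createTableSql("stormTestTable_01", names, types, "N/A"));
        // prints INSERT INTO stormTestTable_01 (word) VALUES (?)
        System.out.println(insertSql("stormTestTable_01", names));
    }
}
```

Using `?` placeholders (bound through a JDBC `PreparedStatement`) rather than concatenating tuple values into the SQL string keeps arbitrary message text from breaking or injecting into the INSERT.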



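Configure the KafkaSpout and the topology:
```java
import java.util.ArrayList;
import java.util.List;

import backtype.storm.topology.TopologyBuilder;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;
// RDBMSDumperBolt comes from the storm-rdbms utility classes mentioned above

TopologyBuilder builder = new TopologyBuilder();

// Kafka broker host(s), 1 partition per host, topic "flume_kafka";
// consumer offsets are kept in ZooKeeper under zkRoot "/root" with consumer id "id"
List<String> hosts = new ArrayList<String>();
hosts.add("hadoop01");
SpoutConfig spoutConf = SpoutConfig.fromHostStrings(hosts, 1, "flume_kafka", "/root", "id");
// Deserialize each Kafka message as a string
spoutConf.scheme = new StringScheme();
// -2 = start from the earliest offset Kafka still retains (-1 would mean the latest)
spoutConf.forceStartOffsetTime(-2);

// ZooKeeper ensemble where the spout stores its consumer offsets
spoutConf.zkServers = new ArrayList<String>();
spoutConf.zkServers.add("hadoop01");
spoutConf.zkPort = 2181;

// Set the spout for the topology
builder.setSpout("spout", new KafkaSpout(spoutConf), 1);

// Dump the stream data into the RDBMS table
RDBMSDumperBolt dumperBolt = new RDBMSDumperBolt(primaryKey, tableName, columnNames, columnTypes, rdbmsUrl, rdbmsUserName, rdbmsPassword);
builder.setBolt("dumperBolt", dumperBolt, 1).shuffleGrouping("spout");
```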
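The `-2` passed to `forceStartOffsetTime` is one of Kafka's sentinel values: `-2` (Kafka's `OffsetRequest.EarliestTime`) means the earliest offset still retained, `-1` (`OffsetRequest.LatestTime`) means the latest. A minimal sketch of what that choice means for one partition; `StartOffsetSketch` and `resolveStartOffset` are hypothetical and are not part of storm-kafka, and timestamp-based lookups are deliberately left out.

```java
// Hypothetical helper, not part of storm-kafka: models what the start-offset
// setting means for a single partition, using Kafka's sentinel values.
class StartOffsetSketch {

    static final long EARLIEST_TIME = -2L; // same value as Kafka's OffsetRequest.EarliestTime
    static final long LATEST_TIME = -1L;   // same value as Kafka's OffsetRequest.LatestTime

    /**
     * @param startOffsetTime the value passed to forceStartOffsetTime
     * @param earliestOffset  oldest offset the broker still retains for the partition
     * @param logEndOffset    offset the next produced message will receive
     */
    static long resolveStartOffset(long startOffsetTime, long earliestOffset, long logEndOffset) {
        if (startOffsetTime == EARLIEST_TIME) {
            return earliestOffset; // replay every message still on disk
        }
        if (startOffsetTime == LATEST_TIME) {
            return logEndOffset;   // skip history, read only newly produced messages
        }
        throw new IllegalArgumentException("timestamp lookups are not modelled in this sketch");
    }

    public static void main(String[] args) {
        // A partition that still retains offsets 100..249 (the next write goes to 250)
        System.out.println(resolveStartOffset(-2, 100, 250)); // prints 100
        System.out.println(resolveStartOffset(-1, 100, 250)); // prints 250
    }
}
```

Starting from the earliest offset is convenient for a test like this one, since messages Flume wrote before the topology started are still picked up. The trade-off: with `primaryKey` set to "N/A", any messages the spout replays could end up as duplicate rows in `stormTestTable_01`.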
