Kafka cluster runs normally and can deliver messages, but Storm gets Connection refused when accessing Kafka

Three machines; Kafka runs fine across two of them, but the topology gets a Connection refused error when accessing Kafka.
With the same port, topic, and node name, two ways of running give two different results:
1. Part of the configuration in the topology:
props.put("metadata.broker.list", "node1:9092");
String topicName = "mylog";
2. Running /usr/local/kafka/bin/kafka-console-producer.sh --broker-list node1:9092 --sync --topic mylog on Linux sends messages normally, and they can be viewed.
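Before digging into the Storm configuration, it can help to confirm that each machine running a Storm worker can actually open a TCP connection to the broker port; "Connection refused" from only some hosts usually points at a reachability or hostname-resolution difference rather than at Storm itself. A minimal sketch (the node1/9092 values come from the question; adjust to your environment):

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class BrokerCheck {
    // Attempt a plain TCP connect to host:port; true means the port accepted
    // the connection, false means refused, unreachable, or timed out.
    static boolean canConnect(String host, int port, int timeoutMs) {
        try (Socket s = new Socket()) {
            s.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // node1:9092 are the values from the question; run this on each
        // supervisor host to see which ones can reach the broker.
        System.out.println("node1:9092 reachable? " + canConnect("node1", 9092, 3000));
    }
}
```

Running this on every supervisor host quickly separates "broker is down" from "broker is only reachable from some machines".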

fish - Hadooper

Upvoted by: devinou

Is metadata.broker.list a configuration name that Storm even recognizes? The KafkaSpout in Storm is actually a Kafka consumer, and it is generally more reliable for a Kafka consumer to fetch the broker list from ZooKeeper, since brokers can change while the topology is running. Would something roughly like this work?
BrokerHosts brokerHosts = new ZkHosts("zk1:2181,zk2:2181");

String topic = "xxx";
String zkRoot = "yyy";
String spoutId = "mySpout";

SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, topic, zkRoot, spoutId);
// UserDefinedScheme is a placeholder for your own deserialization scheme
spoutConfig.scheme = new SchemeAsMultiScheme(new UserDefinedScheme());

TopologyBuilder builder = new TopologyBuilder();
// spoutNum: desired spout parallelism
builder.setSpout("spout", new KafkaSpout(spoutConfig), spoutNum);

wemike

Upvoted by:

My code is pretty much the same as in answer 2. I looked at the error message and stepped through the code; the error is raised on the connection made with the props.put("metadata.broker.list", "node1:9092") setting below:
    TopologyBuilder builder = new TopologyBuilder();

    // config kafka spout
    String topicName = "mylog";
    BrokerHosts hosts = new ZkHosts("node1:2181,node2:2181,node3:2181");
    SpoutConfig spoutConfig = new SpoutConfig(hosts, topicName, "/" + topicName, UUID.randomUUID().toString());
    spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
    KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
    // set kafka spout
    builder.setSpout("kafka_spout", kafkaSpout, 4);

    // set bolt
    builder.setBolt("filter", new FilterBolt(), 8).shuffleGrouping("kafka_spout");
    //builder.setBolt("filter", new FilterShellBolt(), 8).shuffleGrouping("kafka_spout");

    // set kafka bolt
    KafkaBolt kafka_bolt = new KafkaBolt()
                .withTopicSelector(new DefaultTopicSelector(topicName + "_ERROR"))
                .withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper());
    builder.setBolt("kafka_bolt", kafka_bolt, 2).shuffleGrouping("filter");

    Config conf = new Config();

    //set producer properties.
    Properties props = new Properties();
    props.put("metadata.broker.list", "node1:9092");
    props.put("request.required.acks", "1");
    props.put("serializer.class", "kafka.serializer.StringEncoder");
    conf.put(KafkaBolt.KAFKA_BROKER_PROPERTIES, props);

    conf.setNumWorkers(4);
    StormSubmitter.submitTopologyWithProgressBar("logfilter", conf, builder.createTopology());
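
One common cause of exactly this symptom (the console producer works on the Kafka host itself, while remote clients get Connection refused) is that the broker registers a hostname in ZooKeeper that other machines cannot resolve, or binds only to a local interface. In the Kafka 0.8.x line this is controlled by advertised.host.name / advertised.port in server.properties; this is a hedged guess based on the symptom, not something confirmed in the thread:

```properties
# /usr/local/kafka/config/server.properties  (Kafka 0.8.x-era settings)
# The host/port the broker registers in ZooKeeper and hands back to clients;
# they must be resolvable and reachable from every Storm worker host.
advertised.host.name=node1
advertised.port=9092
```

It is also worth checking that node1 resolves to the same address in /etc/hosts on every machine in the cluster, not just on the Kafka host.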
  

wemike

Upvoted by:

Oops, silly me. I can just debug this locally. Closing the thread.
