I've run into a problem while using Gobblin. Teacher, please take a look.

Goal: import the Kafka topic search-query into HDFS.

gobblin-env.sh is configured as follows:

GOBBLIN_WORK_DIR=/tmp/gobblin/work_dir
HADOOP_HOME=/etc/hadoop/conf
HADOOP_BIN_DIR=/usr/bin/
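
A note on these values (an assumption about a typical packaged-Hadoop layout, not taken from this machine): HADOOP_HOME normally points at the Hadoop installation root, while /etc/hadoop/conf is the configuration directory, usually exported separately as HADOOP_CONF_DIR:

HADOOP_HOME=/usr/lib/hadoop
HADOOP_CONF_DIR=/etc/hadoop/conf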

 

kafka-search-query.pull is configured as follows:

job.name=PullFromKafka
job.group=Kafka
job.description=Kafka Extractor for Gobblin
job.lock.enabled=false

source.class=gobblin.source.extractor.extract.kafka.KafkaAvroSource

extract.namespace=gobblin.extract.kafka

writer.destination.type=HDFS
writer.output.format=AVRO
writer.fs.uri=hdfs://SU-1:8020/

data.publisher.type=gobblin.publisher.TimePartitionedDataPublisher

topic.whitelist=search-query
bootstrap.with.offset=earliest

kafka.brokers=SU-2:9092,SU-3:9092,SU-4:9092

writer.partition.level=hourly
writer.partition.pattern=YYYY/MM/dd/HH
writer.builder.class=gobblin.writer.AvroTimePartitionedWriterBuilder
writer.file.path.type=tablename
writer.partition.column.name=header.time


writer.staging.dir=${env:GOBBLIN_WORK_DIR}/task-staging

extract.limit.enabled=true
extract.limit.type=time
extract.limit.time.limit=15
extract.limit.time.limit.timeunit=minutes 

How it is run:

sudo -u hdfs ./bin/gobblin-mapreduce.sh --conf conf/kafka-search-query.pull --workdir /tmp/gobblin/work_dir

It fails with the following error:

SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/home/git_project/gobblin-dist/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/lib/zookeeper/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
WARN [KafkaSource] Previous offset for partition search-query:0 does not exist. This partition will start from the earliest offset: 0
WARN [KafkaSource] Previous offset for partition search-query:1 does not exist. This partition will start from the earliest offset: 0
WARN [KafkaSource] Avg event size for partition search-query:0 not available, using default size 1024
WARN [KafkaSource] Avg event size for partition search-query:1 not available, using default size 1024
WARN [UserGroupInformation] PriviledgedActionException as:hdfs (auth:SIMPLE) cause:org.apache.hadoop.fs.UnsupportedFileSystemException: No AbstractFileSystem for scheme: null
WARN [UserGroupInformation] PriviledgedActionException as:hdfs (auth:SIMPLE) cause:org.apache.hadoop.fs.UnsupportedFileSystemException: No AbstractFileSystem for scheme: null
ERROR [AbstractJobLauncher] Failed to launch and run job job_PullFromKafka_1439536783133: org.apache.hadoop.fs.UnsupportedFileSystemException: No AbstractFileSystem for scheme: null
org.apache.hadoop.fs.UnsupportedFileSystemException: No AbstractFileSystem for scheme: null
        at org.apache.hadoop.fs.AbstractFileSystem.createFileSystem(AbstractFileSystem.java:152)
        at org.apache.hadoop.fs.AbstractFileSystem.get(AbstractFileSystem.java:240)
        at org.apache.hadoop.fs.FileContext$2.run(FileContext.java:332)
        at org.apache.hadoop.fs.FileContext$2.run(FileContext.java:329)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:415)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
        at org.apache.hadoop.fs.FileContext.getAbstractFileSystem(FileContext.java:329)
        at org.apache.hadoop.fs.FileContext.getFileContext(FileContext.java:443)
        at org.apache.hadoop.mapred.YARNRunner.createApplicationSubmissionContext(YARNRunner.java:360)
        at org.apache.hadoop.mapred.YARNRunner.submitJob(YARNRunner.java:285)
        at org.apache.hadoop.mapreduce.JobSubmitter.submitJobInternal(JobSubmitter.java:432)
        at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1285)
        at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1282)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:415)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
        at org.apache.hadoop.mapreduce.Job.submit(Job.java:1282)
        at gobblin.runtime.mapreduce.MRJobLauncher.runWorkUnits(MRJobLauncher.java:200)
        at gobblin.runtime.AbstractJobLauncher.launchJob(AbstractJobLauncher.java:258)
        at gobblin.runtime.mapreduce.CliMRJobLauncher.run(CliMRJobLauncher.java:60)
        at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
        at gobblin.runtime.mapreduce.CliMRJobLauncher.main(CliMRJobLauncher.java:133)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.hadoop.util.RunJar.main(RunJar.java:212)
Exception in thread "main" java.lang.IllegalArgumentException: Missing required property writer.staging.dir
        at com.google.common.base.Preconditions.checkArgument(Preconditions.java:93)
        at gobblin.util.WriterUtils.getWriterStagingDir(WriterUtils.java:55)
        at gobblin.util.JobLauncherUtils.cleanStagingData(JobLauncherUtils.java:172)
        at gobblin.runtime.AbstractJobLauncher.cleanupStagingData(AbstractJobLauncher.java:460)
        at gobblin.runtime.AbstractJobLauncher.launchJob(AbstractJobLauncher.java:285)
        at gobblin.runtime.mapreduce.CliMRJobLauncher.run(CliMRJobLauncher.java:60)
        at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
        at gobblin.runtime.mapreduce.CliMRJobLauncher.main(CliMRJobLauncher.java:133)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.hadoop.util.RunJar.main(RunJar.java:212)
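
For what it's worth, a hedged reading of the first exception: "No AbstractFileSystem for scheme: null" is thrown when a URI with no scheme reaches YARN's FileContext, and one common cause is that fs.defaultFS never makes it into the job's Configuration, e.g. because the Hadoop conf directory is not on the classpath. A minimal core-site.xml carrying the scheme would look like the sketch below (hdfs://SU-1:8020 is copied from writer.fs.uri above; whether this file is actually visible to the job is what needs checking):

<configuration>
  <property>
    <name>fs.defaultFS</name>
    <value>hdfs://SU-1:8020</value>
  </property>
</configuration>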

   

 

I have already configured writer.staging.dir as follows:

writer.staging.dir=${env:GOBBLIN_WORK_DIR}/task-staging
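
One thing worth ruling out (an assumption on my side, not verified on this cluster): ${env:GOBBLIN_WORK_DIR} can only resolve if GOBBLIN_WORK_DIR is set in the environment of the process that parses the .pull file, and sudo resets the environment by default. Two ways around that:

# preserve the variable across sudo (subject to sudoers policy)
export GOBBLIN_WORK_DIR=/tmp/gobblin/work_dir
sudo -E -u hdfs ./bin/gobblin-mapreduce.sh --conf conf/kafka-search-query.pull --workdir /tmp/gobblin/work_dir

# or hardcode the path in the .pull file and skip substitution entirely
writer.staging.dir=/tmp/gobblin/work_dir/task-staging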

 

Teacher, please help me take a look.

存在 - one-sentence intro

I want to debug Gobblin. Looking at gobblin-mapreduce.sh, the launch command is:

$HADOOP_BIN_DIR/hadoop jar \
        $FWDIR_LIB/gobblin-runtime.jar \
        gobblin.runtime.mapreduce.CliMRJobLauncher \
        -D mapreduce.user.classpath.first=true \
        -D mapreduce.job.user.classpath.first=true \
        $JT_COMMAND \
        $FS_COMMAND \
        -libjars $LIBJARS \
        -sysconfig $GOBBLIN_CONFIG_FILE \
        -jobconfig $JOB_CONFIG_FILE

Where do I add a JDWP option like -agentlib:jdwp=transport=dt_socket.address=8004,server=y,supend=y?
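
A sketch of an alternative that avoids editing any script, assuming the hadoop launcher honors HADOOP_CLIENT_OPTS the way stock distributions do (note the corrected syntax: commas between JDWP sub-options, and suspend spelled in full):

export HADOOP_CLIENT_OPTS="-agentlib:jdwp=transport=dt_socket,address=8004,server=y,suspend=y"
sudo -E -u hdfs ./bin/gobblin-mapreduce.sh --conf conf/kafka-search-query.pull --workdir /tmp/gobblin/work_dir

With server=y,suspend=y the JVM waits on port 8004 until a debugger attaches.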

zp0824 - study hard, improve every day

Add it directly to the $HADOOP_BIN_DIR/hadoop script; that's enough for debugging.

存在 - one-sentence intro

In /usr/lib/hadoop/bin/hadoop I added the following:

HADOOP_OPTS="$HADOOP_OPTS -Dhadoop.security.logger=${HADOOP_SECURITY_LOGGER:-INFO,NullAppender} -agentlib:jdwp=transport=dt_socket.address=8004,server=y,supend=n"

After starting the program, it reported this error:

[root@SU-4 bin]# sudo -u hdfs ./gobblin-mapreduce.sh --conf kafka-search-query.pull --workdir /tmp/gobblin/work_dir
ERROR: JDWP option syntax error: -agentlib:jdwp=transport=dt_socket.address=8004,server=y,supend=n

zp0824 - study hard, improve every day

Use -agentlib:jdwp=transport=dt_socket,address=8004,server=y,supend=n. Note that the separator after dt_socket is a comma, not a period.

存在 - one-sentence intro

# Always respect HADOOP_OPTS and HADOOP_CLIENT_OPTS
HADOOP_OPTS="$HADOOP_OPTS $HADOOP_CLIENT_OPTS -agentlib:jdwp=transport=dt_socket,address=8004,server=y,supend=n"

[root@SU-4 bin]# sudo -u hdfs ./gobblin-mapreduce.sh --conf kafka-search-query.pull --workdir /tmp/gobblin/work_dir
ERROR: JDWP option syntax error: -agentlib:jdwp=transport=dt_socket,address=8004,server=y,supend=n

存在 - one-sentence intro

Teacher, you could try it on my machine.

存在 - one-sentence intro

I'll send you the machine's address and password over QQ.

zp0824 - study hard, improve every day

I'll take another look for you tonight when I'm back; I can't log into the machine right now. It looks like the writer.staging.dir property is not being passed through, so you could trace that quickly. As for where to add the JDWP option (-agentlib:jdwp=transport=dt_socket.address=8004,server=y,supend=y), just skim through the $HADOOP_BIN_DIR/hadoop file.
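
A quick way to confirm the job file at least reaches the launcher (a sketch; the variable names come from the gobblin-mapreduce.sh excerpt quoted earlier): add an echo just before the hadoop invocation in the script:

echo "sysconfig=$GOBBLIN_CONFIG_FILE jobconfig=$JOB_CONFIG_FILE"

If jobconfig prints empty or points at the wrong path, kafka-search-query.pull never reaches CliMRJobLauncher.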

zp0824 - study hard, improve every day

ERROR: JDWP option syntax error: -agentlib:jdwp=transport=dt_socket,address=8004,server=y,supend=n
In this option, supend is missing the letter "s": it should be suspend.
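
Putting both corrections together (a comma after dt_socket, suspend spelled in full), the line in /usr/lib/hadoop/bin/hadoop should read:

HADOOP_OPTS="$HADOOP_OPTS $HADOOP_CLIENT_OPTS -agentlib:jdwp=transport=dt_socket,address=8004,server=y,suspend=n"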

zp0824 - study hard, improve every day

I stepped through the script a bit, and it seems the program is not loading the kafka-search-query.pull file. The source code will need a closer look; you can start by debugging with JDWP.
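
Once the JVM is listening with server=y,suspend=y, one way to step through is plain jdb (a sketch; the breakpoint target is taken from the stack trace above):

jdb -attach 8004
> stop in gobblin.runtime.AbstractJobLauncher.launchJob
> cont

From that breakpoint you can inspect whether the properties from kafka-search-query.pull made it into the job state.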

存在 - one-sentence intro

OK

zp0824 - study hard, improve every day

Camus has been superseded by the Gobblin project (https://github.com/linkedin/gobblin); going forward, Camus is unlikely to see further updates.
