spark集群中运行jar包的问题

用java写了远程执行shell命令的程序,在spark集群上用spark-submit方式执行jar包。但是,jar包执行完成后返回值为空,因此java程序这边一直以为是执行错误了,请问如何获得jar包的运行结果?因为配置了spark集群和hadoop集群,还配置了日志并存放到hdfs上,后来查看日志,发现println的日志都在里面,而且因为是集群运行的,日志语句在slave节点中的一个日志上。
远程执行的shell语句如下:
/usr/local/spark-2.2.0-bin-hadoop2.7/bin/spark-submit --class MLexamples.PredRFR --master yarn --deploy-mode cluster --driver-memory 1G --executor-memory 1G --executor-cores 1  /usr/local/spark-2.2.0-bin-hadoop2.7/execSparkJar/model.jar

jane3von

赞同来自: 小象老师

网上找了下,使用下面这个方法可以获取返回值!因为我的需求是阻塞式的,就是一定要执行完成这个jar包之后才能执行别的代码,因此,还是用下面这种方法实现!   import java.io.BufferedReader;   import java.io.IOException;   import java.io.InputStream;   import java.io.InputStreamReader;   import java.io.UnsupportedEncodingException;   import org.apache.commons.lang.StringUtils;   import org.slf4j.Logger; import org.slf4j.LoggerFactory; import ch.ethz.ssh2.ChannelCondition; import ch.ethz.ssh2.Connection;   import ch.ethz.ssh2.Session;   import ch.ethz.ssh2.StreamGobbler;    public String execShell_new(String cmd){          String result="";            try {               if(login()){                    Session sess= conn.openSession();//打开一个会话                    sess.execCommand(cmd);//执行命令                    InputStream stdout = sess.getStdout();                    InputStream stderr = sess.getStderr();                                                      byte[] buffer = new byte[100];                                    while (true) {                        if ((stdout.available() == 0)) {                          int conditions = sess.waitForCondition(ChannelCondition.STDOUT_DATA |                                 ChannelCondition.STDERR_DATA | ChannelCondition.EOF, 1000*5);                            if ((conditions & ChannelCondition.TIMEOUT) != 0) {                                logger.debug("execShell_new: time out break !");                                break;//超时后退出循环,要保证超时时间内,脚本可以运行完成                            }                          if ((conditions & ChannelCondition.EOF) != 0) {                            if ((conditions & (ChannelCondition.STDOUT_DATA |                                     ChannelCondition.STDERR_DATA)) == 0) {                                    logger.debug("execShell_new: break !");                                    break;                                }                            }                        }                      while (stdout.available() > 0) {                        int len = stdout.read(buffer);                        if (len > 0){                                //System.err.write(buffer, 0, len);                                result += new String(buffer, 0, len);                                logger.debug("execShell_new: result={}",result);                          }                      }                                                        while (stderr.available() > 0) {                            int len = stderr.read(buffer);                            if (len > 0){                                //System.err.write(buffer, 0, len);                                result += new String(buffer, 0, len);                                logger.debug("execShell_new: result={}",result);                          }                        }                    }                                      logger.debug("远程执行shll脚本或者命令的结果为:result={}",result);              }            } catch (IOException e) {                e.printStackTrace();                logger.error("远程执行shll脚本或者命令 异常!msg={}",e);          }            return result;                }

要回复问题请先登录注册