Java|lang包源码解读之ProcessBuilder

ProcessBuilder代码注释解读 ?在正式解读ProcessBuilder源码之前,首先来解读一下类中的源码注释。

/** * This class is used to create operating system processes. * * 【Java|lang包源码解读之ProcessBuilder】Each {@code ProcessBuilder} instance manages a collection * of process attributes.The {@link #start()} method creates a new * {@link Process} instance with those attributes.The {@link * #start()} method can be invoked repeatedly from the same instance * to create new subprocesses with identical or related attributes. * * Each process builder manages these process attributes: * *
    * *
  • a command, a list of strings which signifies the * external program file to be invoked and its arguments, if any. * Which string lists represent a valid operating system command is * system-dependent.For example, it is common for each conceptual * argument to be an element in this list, but there are operating * systems where programs are expected to tokenize command line * strings themselves - on such a system a Java implementation might * require commands to contain exactly two elements. * *
  • an environment, which is a system-dependent mapping from * variables to values.The initial value is a copy of * the environment of the current process (see {@link System#getenv()}). * *
  • a working directory.The default value is the current * working directory of the current process, usually the directory * named by the system property {@code user.dir}. * *
  • name="redirect-input">a source of standard input. * By default, the subprocess reads input from a pipe.Java code * can access this pipe via the output stream returned by * {@link Process#getOutputStream()}.However, standard input may * be redirected to another source using * {@link #redirectInput(Redirect) redirectInput}. * In this case, {@link Process#getOutputStream()} will return a * null output stream, for which: * *
      *
    • the {@link OutputStream#write(int) write} methods always * throw {@code IOException} *
    • the {@link OutputStream#close() close} method does nothing *
    * *
  • name="redirect-output">a destination for standard output * and standard error.By default, the subprocess writes standard * output and standard error to pipes.Java code can access these pipes * via the input streams returned by {@link Process#getInputStream()} and * {@link Process#getErrorStream()}.However, standard output and * standard error may be redirected to other destinations using * {@link #redirectOutput(Redirect) redirectOutput} and * {@link #redirectError(Redirect) redirectError}. * In this case, {@link Process#getInputStream()} and/or * {@link Process#getErrorStream()} will return a null input * stream, for which: * *
      *
    • the {@link InputStream#read() read} methods always return * {@code -1} *
    • the {@link InputStream#available() available} method always returns * {@code 0} *
    • the {@link InputStream#close() close} method does nothing *
    * *
  • a redirectErrorStream property.Initially, this property * is {@code false}, meaning that the standard output and error * output of a subprocess are sent to two separate streams, which can * be accessed using the {@link Process#getInputStream()} and {@link * Process#getErrorStream()} methods. * * If the value is set to {@code true}, then: * *
      *
    • standard error is merged with the standard output and always sent * to the same destination (this makes it easier to correlate error * messages with the corresponding output) *
    • the common destination of standard error and standard output can be * redirected using * {@link #redirectOutput(Redirect) redirectOutput} *
    • any redirection set by the * {@link #redirectError(Redirect) redirectError} * method is ignored when creating a subprocess *
    • the stream returned from {@link Process#getErrorStream()} will * always be a "#redirect-output">null input stream *
    * *
* * Modifying a process builder's attributes will affect processes * subsequently started by that object's {@link #start()} method, but * will never affect previously started processes or the Java process * itself. * * Most error checking is performed by the {@link #start()} method. * It is possible to modify the state of an object so that {@link * #start()} will fail.For example, setting the command attribute to * an empty list will not throw an exception unless {@link #start()} * is invoked. * * Note that this class is not synchronized. * If multiple threads access a {@code ProcessBuilder} instance * concurrently, and at least one of the threads modifies one of the * attributes structurally, it must be synchronized externally. */

?注释中关于ProcessBuilder的核心信息解释如下:
- ProcessBuilder用来创建一个操作系统进程。ProcessBuilder实例管理着操作系统属性集。ProcessBuilder中的start()方法利用这些属性集创建一个新的Process的实例。ProcessBuilder的start()方法能够被相同的ProcessBuilder实例快速地调用来创建具有相同或者相关操作系统属性的子进程。每个process builder管理着这些进程属性。
  • 操作系统命令,由字符串列表构成,这些列表指明了能被调用的外部程序文件,如果有参数的话,还会指明参数信息。这些表达有效的操作系统命令字符串列表是与系统相关的。例如,很常见的是每个概念性的参数是这个列表中的一个元素,但是在操作系统中,程序是被期望用来标记命令行字符串本身。在这样的系统中,一个用Java实现的程序可能需要命令来精确地包含两个元素。
  • ProcessBuilder中的environment属性是与系统相关的环境变量值映射。它的初始值是当前进程中的environment属性的拷贝值。
    ProcessBuilder中的working directory属性,它的默认值是当前进程的当前工作目录。通常这个目录用系统属性user.dir命名。
  • redirect-input表示标准输入的源头。默认情况下,子进程从管道中读取这个输入。Java程序能够通过Process.getOutputStream()这个方法返回的输出流访问这个管道。然而,标准的输入能够通过redirectInput(Redirect) redirectInput方法被重定向到其它源头。在这种情况下, Process.getOutputStream()方法将返回一个空的输出流。
  • redirect-output表示标准输出和错误输出的目的地。默认情况下,子进程往管道中写标准输出和错误输出。Java程序能够通过 Process.getInputStream()和Process.getErrorStream()这个方法返回的输入流访问这个管道。然而,标准输出和错误输出能够通过redirectOutput(Redirect) redirectOutput和redirectError(Redirect) redirectError方法被重定向到其它目的地。在这种情况下, Process.getInputStream()和Process.getErrorStream()方法将返回一个空的输入流。
  • ProcessBuilder中的redirectErrorStream属性,这个属性的默认值是false,意味着子进程的标准输出和错误输出是被发送到两个分开的流中。这两个流能通过Process#getInputStream()和Process#getErrorStream()方法访问到。如果redirectErrorStream属性的值被设为true,标准错误输出流将会合并到标准输出流中。并且被发送到同一个地方。这使得更容易收集错误信息通过相关的输出流。这个被发送的地方可以用redirectOutput()方法进行重定向。redirectError方法被忽略当创建一个子进程的时候。Process#getErrorStream()方法的返回的流将会是一个空的输入流。
  • 修改一个process builder的属性值会影响它的start()方法启动的进程。但是不会影响之前启动的进程或者java进程。
  • 大部分的错误检查机制都是通过start()方法来执行的。或许可以通过修改object的状态来使得start()方法执行失败。例如,将命令熟悉设为空列表,在调用start()方法的时候,将会抛出异常。
  • 非常需要注意的是,ProcessBuilder不是线程同步的。当多线程并发访问ProcessBuilder的时候,如果至少有一个线程在修改ProcessBuilder中的某个属性的时候,这个操作必须被同步。
ProcessBuilder源码解读 ?这个代码注释可以说已经提供了很多关于ProcessBuilder的核心信息。首先看一下ProcessBuilder的类信息如下:
public final class ProcessBuilder { private List command; //字符串组成的操作系统命令集 private File directory; private Map environment; private boolean redirectErrorStream; private Redirect[] redirects; ........ }

?这是个常量类,有以上四个属性,主要你关注有个command列表,这个是传入的命令字符串集。然后对于command有三个重载的处理command()函数如下:
public ProcessBuilder command(List command) { if (command == null) throw new NullPointerException(); this.command = command; return this; } public ProcessBuilder command(String... command) { this.command = new ArrayList<>(command.length); for (String arg : command) this.command.add(arg); return this; }public List command() { return command; }

?这三个重构函数其实就是对List command属性的处理;重点关注的是ProcessBuilder中的 start()方法,它会开启一个进程通过使用 process builder的属性集。开启的进程会调用command命令列表和相关参数。这个函数会检测command的正确性以及做系统安全性检测。
?源码如下:
//首先它的返回值是一个Process实例 public Process start() throws IOException { //首先会对命令字符串列表做参数的正确性校验以及系统安全性校验 String[] cmdarray = command.toArray(new String[command.size()]); cmdarray = cmdarray.clone(); for (String arg : cmdarray) if (arg == null) throw new NullPointerException(); String prog = cmdarray[0]; //系统安全性检测 SecurityManager security = System.getSecurityManager(); if (security != null) { security.checkExec(prog); } //对工作目录做空指针判断处理 String dir = directory == null ? null : directory.toString(); for (int i = 1; i < cmdarray.length; i++) { if (cmdarray[i].indexOf('\u0000') >= 0) { throw new IOException("invalid null character in command"); } } try { //最后掉用ProcessImpl.start()方法返回一个Process实例 return ProcessImpl.start(cmdarray, environment, dir, redirects, redirectErrorStream); } catch (IOException | IllegalArgumentException e) { String exceptionInfo = ": " + e.getMessage(); Throwable cause = e; if ((e instanceof IOException) && security != null) { try { security.checkRead(prog); } catch (AccessControlException ace) { exceptionInfo = ""; cause = ace; } } throw new IOException( "Cannot run program \"" + prog + "\"" + (dir == null ? "" : " (in directory \"" + dir + "\")") + exceptionInfo, cause); } }

?如上面的代码注释所写,首先看一下start() 返回值Process类,这是一个抽象类,ProcessBuilder的start()方法以及Runtime.exec(String[],String[],File) Runtime.exec()方法会返回一个Process的子类的一个实例。这个实例可以用来控制进程以及获得进程的信息。它提供了六个有关进程操作的方法如下所示。Process类源码如下:
package java.lang; import java.io.*; public abstract class Process {//返回连接子进程正常输入的输出流 abstract public OutputStream getOutputStream(); //返回连接子进程输出的输入流 abstract public InputStream getInputStream(); //返回连接子进程异常输出的输入流 abstract public InputStream getErrorStream(); //促使当前线程等待,直至只当进程已经结束。子进程结束时函数立即返回 abstract public int waitFor() throws InterruptedException; //返回子进程结束时候的退出值 abstract public int exitValue(); //杀死子进程 abstract public void destroy(); }

?我们在回到ProcessBuilder的start()方法最后的返回值:
return ProcessImpl.start(cmdarray, environment, dir, redirects, redirectErrorStream);

?它调用了ProcessImpl的start()方法。看一下ProcessImpl,从类名上看他是Process的实现,再看看ProcessImpl的start()方法实现:
final class ProcessImpl extends Process {...... static Process start(String cmdarray[], java.util.Map environment, String dir, ProcessBuilder.Redirect[] redirects, boolean redirectErrorStream) throws IOException {....... //忽略前面的一些有关输入输出流操作信息,它会调用自己的构造函数,返回一个ProcessImpl实例 return new ProcessImpl(cmdarray, envblock, dir, stdHandles, redirectErrorStream); } finally {try { if (f0 != null) f0.close(); } finally { try { if (f1 != null) f1.close(); } finally { if (f2 != null) f2.close(); } } }} }

?关于ProcessImpl构造函数实现,略复杂,暂时不做分析。至此ProcessBuilder类方法调用过程已经完成解读。了解了这些过程以及原理,对于实际生产环境使用ProcessBuilder基本上已经满足需求。下面我们来分享一个最近在开发修改阿里巴巴开源Zeus过程中的一次ProcessBuilder使用经历,可谓是一次刻骨铭心的使用经历,一个让我纠结了整整了几天的bug.
ProcessBuilder实战 ?关于开源Zeus是什么,前面的博客中已经做过介绍。开源Zeus中一直存在这样的一个bug,Zeus中杀死脚本任务的时候,并不能完全杀死任务产生的子进程,我们跟踪Zeus源码中杀死任务ProcessJob的源码一下:
/** * 通过操作系统创建进程Process的Job任务 * @author zhoufang * */ public abstract class ProcessJob extends AbstractJob implements Job {.......public void cancel(){ try { new CancelHadoopJob(jobContext).run(); } catch (Exception e1) { log(e1); } //强制kill 进程 if (process != null) { log("WARN Attempting to kill the process "); try { process.destroy(); int pid=getProcessId(); Runtime.getRuntime().exec("sudo kill -9 "+"-"+ pid); } catch (Exception e) { log(e); } finally{ process=null; } } } ....... }

源码中通过java调用shell命令 sudo kill -9 pid命令来杀死任务产生的进程,了解Linux相关命令的人都会知道,这命令并不能杀死进程树中的所有进程,也就是进程id为pid下的子进程并不能被杀死。通过下面的复杂的shell命令
sudo sh -c "cd; pstree pid -p | grep -o '([0-9]*)' | awk -F'[()]' '{print \$2}' | xargs kill -9"

来杀死任务进程及其子进程,本以为做完shell脚本的替换就能很快完成任务,由于Zeus打包上服务器测试很麻烦,于是我单独在服务器上写了Java程序来调用这个Shell脚本,原先的测试代码如下:
package base; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; public class LinuxCommandTest {public static void main(String[] args) throws IOException {Process process = null; StringBuffer command = new StringBuffer(); //args[0]传入进程pid String commands = "sudo sh -c \"cd; pstree " + args[0] +" -p | grep -o \"([0-9]*)\" | awk -F\"[()]\" '{print $2}' | xargs kill -9"; System.out.println(command); ProcessBuilder processBuilder = new ProcessBuilder(commands); try { process = processBuilder.start(); } catch (IOException e) { e.printStackTrace(); } String result = null; String errorresult = null; InputStream in = process.getInputStream(); //得到命令执行的流 BufferedReader br = new BufferedReader(new InputStreamReader(in)); InputStream error = process.getErrorStream(); //得到命令执行的错误流 BufferedReader errorbr = new BufferedReader(new InputStreamReader(error)); String lineStr; while ((lineStr = br.readLine()) != null) { result = lineStr; } br.close(); in.close(); System.out.println("==============================result" + result); while ((lineStr = errorbr.readLine()) != null) { errorresult = lineStr; } errorbr.close(); error.close(); System.out.println("==============================errorresult" + errorresult); try { final int status = process.waitFor(); //阻塞,直到上述命令执行完 } catch (InterruptedException e) { e.printStackTrace(); } finally { System.out.println("执行结束"); }} }

?代码上传到服务器,同时起一个sqoop导数据脚本,因为sqoop到数据脚本任务会产生复杂的java进程树,很适合做进程树测试的例子。服务器上编译执行执行LinuxCommandTest类,发现传入sqoop进程ID,程序会报出异常,而用shell原始命令执行,进程树会被杀干净。这就很奇怪了,难道是Java程序无法调用shell吗,我对人生产生了怀疑。。。。代码执行操作如下截图1:Java|lang包源码解读之ProcessBuilder
文章图片

图1 看似毫无破绽的调用过程,这个时候,我们需要仔细的阅读jdk源码注释了,看看ProcessBuilder构造函数中对命令字符串列表command的注释,containing the program and its arguments包含命令和参数。
/** * Constructs a process builder with the specified operating * system program and arguments.This constructor does not * make a copy of the {@code command} list.Subsequent * updates to the list will be reflected in the state of the * process builder.It is not checked whether * {@code command} corresponds to a valid operating system * command. * * @paramcommand the list containing the program and its arguments * @throws NullPointerException if the argument is null */ public ProcessBuilder(List command) { if (command == null) throw new NullPointerException(); this.command = command; }/** * Constructs a process builder with the specified operating * system program and arguments.This is a convenience * constructor that sets the process builder's command to a string * list containing the same strings as the {@code command} * array, in the same order.It is not checked whether * {@code command} corresponds to a valid operating system * command. * * @param command a string array containing the program and its arguments */ public ProcessBuilder(String... command) { this.command = new ArrayList<>(command.length); for (String arg : command) this.command.add(arg); }

?也就是说,传给ProcessBuilder构造shell指令应该将命令和命令所需传入的参数分开,而我们之前的调用方式是将整个杀死进程的脚本一次性传入,所以调用过程会报异常
java.io.IOException: Cannot run program "sudo sh -c "cd; pstree 23764 -p | grep -o '([0-9]*)' | awk -F'[()]' '{print \$2}' | xargs kill -9"": error=2, No such file or directory at java.lang.ProcessBuilder.start(ProcessBuilder.java:1047) at LinuxCommandTest.main(LinuxCommandTest.java:24) Caused by: java.io.IOException: error=2, No such file or directory at java.lang.UNIXProcess.forkAndExec(Native Method) at java.lang.UNIXProcess.(UNIXProcess.java:187) at java.lang.ProcessImpl.start(ProcessImpl.java:130) at java.lang.ProcessBuilder.start(ProcessBuilder.java:1028) ... 1 more Exception in thread "main" java.lang.NullPointerException at LinuxCommandTest.main(LinuxCommandTest.java:30)

?再看自己看一下刚刚那个复杂的kill进程树的shell脚本
sudo sh -c "cd; pstree pid -p | grep -o '([0-9]*)' | awk -F'[()]' '{print \$2}' | xargs kill -9"

sudo是shell的命令,后面的传入的相当于都是shell的参数,因此我们需要把shell命令拆开为不同的部分传入到ProcessBuilder的构造函数中,类似于如下的一个数组
sudo sh -c "cd; pstree pid -p | grep -o '([0-9]*)' | awk -F'[()]' '{print \$2}' | xargs kill -9" 拆分如下: String st = "cd; pstree " + args[0] + " -p | grep -o '([0-9]*)' | awk -F'[()]' '{print $2}' | xargs kill -9"; String[] commands = {"sudo", "sh", "-c", st};

?修改代码调用如下:
package base; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; /** * @author 凌霄 * @time 2017/10/17 * @desc */ public class LinuxCommandTest {public static void main(String[] args) throws IOException {Process process = null; StringBuffer command = new StringBuffer(); String st = "cd; pstree " + args[0] + " -p | grep -o '([0-9]*)' | awk -F'[()]' '{print $2}' | xargs kill -9"; String[] commands = {"sudo", "sh", "-c", st}; ProcessBuilder processBuilder = new ProcessBuilder(commands); try { process = processBuilder.start(); } catch (IOException e) { e.printStackTrace(); } String result = null; String errorresult = null; InputStream in = process.getInputStream(); //得到命令执行的流 BufferedReader br = new BufferedReader(new InputStreamReader(in)); InputStream error = process.getErrorStream(); //得到命令执行的错误流 BufferedReader errorbr = new BufferedReader(new InputStreamReader(error)); String lineStr; while ((lineStr = br.readLine()) != null) { result = lineStr; } br.close(); in.close(); System.out.println("==============================result" + result); while ((lineStr = errorbr.readLine()) != null) { errorresult = lineStr; } errorbr.close(); error.close(); System.out.println("==============================errorresult" + errorresult); try { final int status = process.waitFor(); //阻塞,直到上述命令执行完 } catch (InterruptedException e) { e.printStackTrace(); } finally { System.out.println("执行结束"); }} }

重新修改后调用,完美的解决了shell脚本
sudo sh -c "cd; pstree pid -p | grep -o '([0-9]*)' | awk -F'[()]' '{print \$2}' | xargs kill -9"

调用,能够杀死sqoop脚本产生的进程及其子进程。
完成这项脚本调用测试,接下来我们就可以修改Zeus中的杀死任务源码如下:
public abstract class ProcessJob extends AbstractJob implements Job {......public void cancel(){ try { new CancelHadoopJob(jobContext).run(); } catch (Exception e1) { log(e1); } //强制kill 进程 if (process != null) { log("WARN Attempting to kill the process "); try { process.destroy(); int pid=getProcessId(); String st = "sudo sh -c \"cd; pstree "+pid +" -p | grep -o '([0-9]*)' | awk -F'[()]' '{print \\$2}' | xargs kill -9\""; String[] commands = {"sudo", "sh", "-c", st}; ProcessBuilder processBuilder = new ProcessBuilder(commands); try { process = processBuilder.start(); } catch (Exception e) { log(e); } } catch (Exception e) { log(e); } finally{ process=null; } } }...... }

?虽然修改只是短短的几行代码,但是对于一个分布式项目的源码修改过程的远远不止改完这几行代码这么简单,同时对这个复杂的shell命令的反复调试修改与最后的命令参数拆分,可谓是煞费苦心的调试。后面的分布式集成测试还是相当的需要花费时间和很多测试用例来测试代码逻辑的正确性。

    推荐阅读