
pp4j's Introduction

PP4J

While the standard Java libraries support multithreading-based concurrency extensively, they do not support effective multiprocessing out of the box. In most situations, multithreading is more performant; however, because threads share the address space of their parent process, care needs to be taken to ensure that they can safely run concurrently. Multiprocessing guarantees that execution units have their own address spaces and that no data is exchanged between them without explicit inter-process communication. This may be useful if a Java application has to a) execute non-thread-safe or non-reentrant code concurrently or b) invoke native code via JNI/JNA without the risk of crashing the main JVM. PP4J (Process Pool for Java) is a multiprocessing library for Java that provides a flexible API and process executor implementations to help satisfy the above requirements.

Java Process Pool

PP4J includes a Java process pool implementation that uses JVM instances to execute tasks in separate processes. This class, JavaProcessPoolExecutor, implements the JavaProcessExecutorService interface which extends the ExecutorService interface. This allows it to be used similarly to the standard Java thread pools with the only difference that the tasks submitted must implement the Serializable interface. This implicit requirement enables the pool to serialize and encode the tasks before sending them to the JVM instances for execution. The JVM instances return the results of the tasks—or the exceptions thrown—the same way, which requires the return values of the tasks to be serializable as well.
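The serialize-and-encode step described above can be illustrated with the JDK alone. The following is a minimal sketch, not PP4J's actual wire format: the class names TaskCodecSketch and DoublingTask are hypothetical, and the choice of Base64 as the text encoding is an assumption made for illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.Base64;
import java.util.concurrent.Callable;

// Hypothetical illustration; none of these names belong to PP4J.
final class TaskCodecSketch {

  /** A task that is both callable and serializable, as the pool requires. */
  static final class DoublingTask implements Callable<Long>, Serializable {
    private static final long serialVersionUID = 1L;
    private final long input;

    DoublingTask(long input) {
      this.input = input;
    }

    @Override
    public Long call() {
      return input * 2;
    }
  }

  /** Serializes the task and encodes the bytes as one line of text (Base64 is an assumption). */
  static String encode(Serializable task) {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(task);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return Base64.getEncoder().encodeToString(bytes.toByteArray());
  }

  /** Decodes the text and deserializes the object, as a receiving JVM would. */
  static Object decode(String encoded) {
    byte[] bytes = Base64.getDecoder().decode(encoded);
    try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
      return in.readObject();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    } catch (ClassNotFoundException e) {
      throw new IllegalStateException(e);
    }
  }

  /** Sends the task through the codec and runs the decoded copy. */
  static long roundTrip(DoublingTask task) {
    @SuppressWarnings("unchecked")
    Callable<Long> copy = (Callable<Long>) decode(encode(task));
    try {
      return copy.call();
    } catch (Exception e) {
      throw new IllegalStateException(e);
    }
  }
}
```

The return value would travel back the same way, which is why it has to be serializable too.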

JavaProcessConfig jvmConfig = new SimpleJavaProcessConfig(JVMType.CLIENT, 2, 8, 256);
JavaProcessExecutorService jvmPool = new JavaProcessPoolExecutor(new JavaProcessManagerFactory<>(jvmConfig), 10, 20, 2);

The code snippet above demonstrates the construction of a JavaProcessPoolExecutor instance. The first argument of the constructor is a JavaProcessManagerFactory instance that is responsible for creating the process managers for the pool's processes. The process manager factory's constructor takes an instance of the JavaProcessConfig interface, which defines the settings to use for the JVMs. These settings include the architecture, type, minimum heap size, maximum heap size, and stack size of the JVM. The config also allows for the specification of the Java application launcher command if a simple java does not suffice, and for the definition of additional class paths to load classes from. Other, optional arguments of the process manager factory's constructor include a serializable Runnable task that is executed in every Java process on startup, a wrap-up task of the same type that is executed in every process before it is terminated, and the timeout value of the Java processes, which specifies after how many milliseconds of idleness the processes should be terminated.

The first argument after the process manager factory is the minimum size of the pool. This is the minimum number of JVM instances the process pool will strive to maintain, even if the submission queue is empty. The second argument is the maximum size. The number of JVM instances maintained by the pool is guaranteed to never exceed this value. The third argument is the reserve size. This is the minimum number of available, i.e. idle, JVM instances the pool will strive to maintain at all times. It is important to note that the constructor of JavaProcessPoolExecutor blocks until the minimum number of JVM processes have successfully started up.

Specifying a startup task negatively affects the startup times of the processes; however, it may significantly reduce initial submission execution delays by ensuring that the JVM instances load some of the required classes beforehand. Moreover, the JVM config can be used to limit the heap sizes of the JVM processes, thus enabling the running of a great number of them without taking up too much RAM.
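To make the interplay of the minimum, maximum, and reserve sizes concrete, here is a small sketch of one plausible reading of the rules above. It is an assumption for illustration only, not PP4J's actual sizing algorithm; the class PoolSizingSketch and the method processesToStart are hypothetical names.

```java
// Hypothetical illustration of the sizing rules; not PP4J's implementation.
final class PoolSizingSketch {

  /**
   * Returns how many additional processes a pool would start so that it holds at least
   * minSize processes and at least reserveSize idle ones, without exceeding maxSize.
   *
   * @param total the number of processes currently in the pool
   * @param busy the number of those processes currently executing a submission
   */
  static int processesToStart(int minSize, int maxSize, int reserveSize, int total, int busy) {
    if (minSize < 0 || maxSize < 1 || maxSize < minSize
        || reserveSize < 0 || reserveSize > maxSize) {
      throw new IllegalArgumentException("Inconsistent pool size parameters");
    }
    // Enough processes for the busy ones plus the idle reserve, but never fewer than the minimum.
    int wanted = Math.max(minSize, busy + reserveSize);
    // The maximum size is a hard cap.
    int capped = Math.min(wanted, maxSize);
    return Math.max(0, capped - total);
  }
}
```

With the values from the snippet above (10, 20, 2), an empty pool would start 10 processes, a pool of 10 busy processes would start 2 more to restore the reserve, and a pool of 20 would start none.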

Random rand = new Random();
List<Future<Long>> futures = new ArrayList<>();
for (int i = 0; i < 10; i++) {
  futures.add(jvmPool.submit((Callable<Long> & Serializable) () -> {
    Thread.sleep(1000);
    return rand.nextLong();
  }));
}
for (Future<Long> future : futures) {
  System.out.println(future.get());
}
jvmPool.shutdown();
jvmPool.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);

Lambda expressions may be defined as serializable by casting them to an intersection of types as shown in the above snippet. Although somewhat slower than multithreading, the execution of the code above still takes only a few milliseconds longer than the desired one second. This example also demonstrates the primary difference between multithreading and multiprocessing, i.e. processes have their own address spaces as opposed to threads. The rand object is captured by the lambda and serialized along with it, so every process deserializes its own copy in the same state. As a result, all the invocations of the nextLong method of rand return the same value.
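The effect can be reproduced without any child processes, since java.util.Random is serializable and carries its internal state with it. The sketch below uses only the JDK; the class RandomCopySketch is a hypothetical name, and the in-memory copies merely stand in for the copies each JVM in the pool would receive.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.util.Random;

// Hypothetical illustration; the copies stand in for the ones each pooled JVM would receive.
final class RandomCopySketch {

  /** Serializes the generator once and asks several independent copies for their first value. */
  static long[] firstValuesOfCopies(Random original, int copies) {
    byte[] snapshot = serialize(original);
    long[] values = new long[copies];
    for (int i = 0; i < copies; i++) {
      // Each copy starts from the same serialized state, like each process in the pool.
      values[i] = deserialize(snapshot).nextLong();
    }
    return values;
  }

  private static byte[] serialize(Random random) {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(random);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return bytes.toByteArray();
  }

  private static Random deserialize(byte[] snapshot) {
    try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(snapshot))) {
      return (Random) in.readObject();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    } catch (ClassNotFoundException e) {
      throw new IllegalStateException(e);
    }
  }
}
```

Every element of the returned array is identical, whereas ten threads sharing one Random instance would each advance the same generator and see different values.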

While it is possible to achieve good performance using the Java process pool, the overhead of starting up Java processes and getting the JVMs in high gear can be quite significant. The usage of native processes, whenever possible, allows for superior performance. If the objective is the parallel execution of non-thread-safe or non-reentrant native code, pools of native processes are almost always a better choice. They might require the writing of an executable wrapper program, but they eliminate the need for JNI/JNA and their performance exceeds that of Java process pools. The following section introduces the flexible API and process pool implementation that JavaProcessPoolExecutor is built upon.

Process Pool

The high-level design diagram below sums up the mechanics of the core process pool of the PP4J library. This process pool maintains a number of processes that implement a communication protocol over their standard streams (possibly to expose methods of a native library). It also accepts textual command submissions that it then delegates to available processes. These submissions honour the communication protocol as well and are responsible for handling the responses of the processes they have been delegated to. Through callback methods, the submissions notify their submitters when the processes are done processing them. The pool may also adjust its size dynamically to maintain its throughput. It does so via process managers that may or may not need to communicate with the processes. To explain how such a process pool can be set up, the following sections introduce the library's base API.

[Figure: high-level design diagram of the PP4J process pool]

All process pools of PP4J implement the ProcessExecutorService interface. The standard process pool, ProcessPoolExecutor, communicates with the processes via their standard streams. Instances of this process pool can be created by invoking the constructor directly. The first parameter of the constructor is an implementation of the ProcessManagerFactory functional interface for creating new instances of an implementation of the ProcessManager interface. These instances are responsible for specifying the processes and handling their startup and possibly termination. Other parameters include the minimum and maximum size of the pool and its reserve size. The size of the pool is always kept between the minimum pool size and the maximum pool size (both inclusive).

Once the process pool is initialized, it accepts commands in the form of Submission instances which contain one or more Command instances. The submission is assigned to any one of the available processes in the pool. While executing a submission, the process cannot accept further submissions. The commands allow for communication with a process via its standard in and standard out/error streams. The implementation of the Command interface specifies the instruction to send to the process's standard in and handles the output generated by the process as a response to the instruction. Moreover, the implementation also determines when the instruction may be considered processed and therefore when the process is ready for the next instruction. The PP4J library also provides some standard implementations of the ProcessManager, Submission, and Command interfaces to allow for the concise definition of process pooling systems for typical situations.

ProcessManagerFactory processManagerFactory = () -> new SimpleProcessManager(new ProcessBuilder("test.exe"),
    Charset.defaultCharset(),
    (outputLine, startupOutputStore) -> "hi".equals(outputLine),
    60000L,
    () -> new SimpleSubmission<>(new SimpleCommand("start", (outputLine, commandOutputStore) -> "ok".equals(outputLine))),
    () -> new SimpleSubmission<>(new SimpleCommand("stop", (outputLine, commandOutputStore) -> "bye".equals(outputLine))));
ProcessExecutorService pool = new ProcessPoolExecutor(processManagerFactory, 10, 50, 5);

In the example above, a process pool for instances of a program called "test.exe" is created. Every time the pool starts a new process, it waits until the message "hi" is output to the process's standard out, signaling that it has started up. By default, if the process outputs something to its standard error stream, the process manager considers the startup unsuccessful and throws a FailedStartupException, which results in the shutdown of the pool. This behaviour can be configured by defining a predicate to handle the standard error output of the process during startup.

If everything goes to plan, after a successful startup, the manager sends the instruction "start" to the process's standard in. The instruction "start" has the process perform some startup activities before it outputs "ok". Once this message is output to the process's standard out, the manager considers the process ready for submissions. By default, when something is output to the process's standard error stream during command execution, a FailedCommandException is thrown. This behaviour can be overridden by specifying an additional predicate for the standard error stream as the third argument of the SimpleCommand constructor. Throwing a FailedCommandException from these predicates or from the isCompleted method (when implementing the Command interface directly) is indicative of the completion of the command and results in the abortion of the execution of the submission.

Whenever the process needs to be terminated (either due to timing out or cancellation after the execution of a submission), the pool tries to terminate the process in an orderly way by sending it the "stop" instruction. If the response to this is "bye", the process is considered terminated. However, if something is printed to the process's standard error stream in response to the "stop" instruction, the process is killed forcibly.

As specified by the fourth argument of the constructor of SimpleProcessManager, processes in the pool are terminated after 1 minute of idleness. The pool's minimum size is 10, its maximum size is 50, and its reserve size is 5.

List<Future<?>> futures = new ArrayList<>();
for (int i = 0; i < 30; i++) {
  Thread.sleep(100);
  Submission<?> submission = new SimpleSubmission<>(new SimpleCommand("process 5",
      (outputLine, commandOutputStore) -> "ready".equals(outputLine)));
  futures.add(pool.submit(submission, true));
}
pool.shutdown();
pool.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);

Once the pool is initialized, it is sent 30 instructions within 3 seconds. The instruction "process 5" has the process sleep for 5 seconds, printing "in progress" to its standard out every second except for the 5th second, when it prints "ready". This final output signals the completion of the execution of the command. The submissions above also result in the termination of the executing processes, as denoted by the second, boolean parameter of the submit method of the pool. As the pool receives the submissions, it manages its size according to its minimum, maximum, and reserve size parameters. After the execution of the submissions, the pool is shut down in the same way as a Java thread pool.
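The role of the completion predicate in the "process 5" exchange can be sketched with the JDK alone by replaying the expected output from memory instead of a real process. This is an illustration under stated assumptions, not PP4J code: the class CommandCompletionSketch and the method readUntilCompleted are hypothetical, and storing each line before testing the predicate is an assumed ordering.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiPredicate;

// Hypothetical illustration; a StringReader stands in for a process's standard out.
final class CommandCompletionSketch {

  /**
   * Reads output line by line until the predicate reports that the command has completed.
   * The predicate receives the current line and the lines stored so far.
   */
  static List<String> readUntilCompleted(
      BufferedReader processOutput, BiPredicate<String, List<String>> isCompleted) {
    List<String> outputStore = new ArrayList<>();
    try {
      String line;
      while ((line = processOutput.readLine()) != null) {
        outputStore.add(line); // assumed ordering: store first, then test for completion
        if (isCompleted.test(line, outputStore)) {
          return outputStore;
        }
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    throw new IllegalStateException("Output ended before the command completed");
  }

  /** Replays the output described above: four progress lines followed by "ready". */
  static List<String> replayProcessFive() {
    String simulated = "in progress\nin progress\nin progress\nin progress\nready\nleftover\n";
    return readUntilCompleted(
        new BufferedReader(new StringReader(simulated)),
        (outputLine, commandOutputStore) -> "ready".equals(outputLine));
  }
}
```

Reading stops at "ready", so the trailing "leftover" line is never consumed; anything the process prints afterwards would belong to the next command.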

PP4J also allows for submissions to have return values. The result of a submission can be set, usually while processing the output of its commands, through the setResult method of the AbstractSubmission class. However, to be able to invoke the setResult method of the submission from the command implementations, the AbstractSubmission class must be extended and the commands must be defined in the implementation of the getCommands method. To make the definition of submissions with a return value simpler, the SimpleSubmission class has special constructors that take the result object as a parameter. As shown in the code snippet below, this result parameter can be mutated conveniently in the command definitions without the need for a reference to the submission instance. Finally, the result can be accessed through the Future instance returned by the process pool's submit method or directly by invoking the getResult method of the submission itself after its execution is completed.

ProcessManagerFactory processManagerFactory = () -> new SimpleProcessManager(new ProcessBuilder("bash"),
    Charset.defaultCharset());
ProcessExecutorService pool = new ProcessPoolExecutor(processManagerFactory, 10, 10, 0);
List<Future<AtomicReference<String>>> futures = new ArrayList<>();
for (int i = 0; i < 10; i++) {
  AtomicReference<String> result = new AtomicReference<>();
  Command command = new SimpleCommand("echo user:$USER", (outputLine, commandOutputStore) -> {
    if (outputLine.startsWith("user:")) {
      result.set(outputLine.substring(5));
      return true;
    }
    return false;
  });
  Submission<AtomicReference<String>> submission = new SimpleSubmission<>(command, result);
  futures.add(pool.submit(submission));
}
for (Future<AtomicReference<String>> future : futures) {
  System.out.println(future.get().get());
}
pool.shutdown();
pool.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);

The above examples demonstrate both the flexibility of the API and the effective usage of some of the default implementations of the ProcessManager, Submission, and Command interfaces for the concise definition of process pools and tasks to parallelize.

Process Executor

Besides the process pools, PP4J also provides a standard implementation of the ProcessExecutor interface, SimpleProcessExecutor, for running single processes without pooling. As presented below, this allows for the synchronous execution of submissions in a single separate process with ease.

ProcessManager processManager = new SimpleProcessManager(new ProcessBuilder("cmd.exe"), Charset.defaultCharset());
SimpleCommand command = new SimpleCommand("netstat & echo netstat done",
    (outputLine, commandOutputStore) -> "netstat done".equals(outputLine));
SimpleSubmission<?> submission = new SimpleSubmission<>(command);
try (SimpleProcessExecutor executor = new SimpleProcessExecutor(processManager)) {
  executor.start();
  executor.execute(submission);
  System.out.println(command.getCommandOutputStore().getJointStandardOutLines());
}

Logging

PP4J uses SLF4J for logging, which can be bound to different logging frameworks, such as Logback, that usually allow for the configuration of both the format and granularity of logging. This can be really helpful when setting up complex process pools or executors to understand the behaviour of the managed processes and submissions.

pp4j's People

Contributors

torrescd, viktorc

pp4j's Issues

Question about updateSubmissionQueue()

Hi again,

Firstly, thank you again for helping me on the previous topic about JavaProcess.exit()!

Still on that topic, I actually had another question...

While running some tests, I realized that in some rare cases, when there is a crash in the C++ part, it may terminate the java.exe process without throwing any exception to the Java part (I suppose it depends on how badly the memory is corrupted...).

The problem is that in such a situation, I can't throw any exception to cancel the task "Submission", and the issue I now have comes from this part of the code in InternalProcessExecutor.updateSubmissionQueue(InternalSubmission<?> submission):
// If the execute method failed and there was no exception thrown, put the submission back into the queue at the front.
submission.setThread(null);
submissionQueue.addFirst(submission);

Because of this, and because the Submission is not considered finished, it is rerun and the process terminates again, and it is rerun again, and so on forever.

Actually, I am not sure what should be done here, because I understand what was the purpose of this part of code.
However, I think it would help me if I could configure the ProcessExecutor behavior to cancel Submission and raise an exception if for any reason the Process that was running it terminates with an error code.

What do you think ?

Thank you !

Using Modules for new Java Process (for JavaFX, for example)

Hi,

is it possible to include modules / module paths in created java processes?

I'm trying to spawn JavaFX applications in their own processes to encapsulate them, but have a small GUI just to give a visual indicator of their progress. But since JavaFX is now separated from the JDK, it needs to be added as modules... Without that, the ClassLoader can't find the JavaFX Application class...

See here for an example: https://blog.idrsolutions.com/using-javafx-with-java-11/

When I create a simple JavaProcessPool new JavaProcessPoolExecutor(new JavaProcessManagerFactory<>(new SimpleJavaProcessConfig()), ... ), the classpath of the calling application will be in the child process as well, but this isn't true for modules.

I have printed the module path in the calling app and in the child app (System.getProperty("jdk.module.path")): for the calling app, it contains JavaFX; for the child, it's just null.

It would be great if the modules from the calling app could be added to the child process's module path (--module-path ... and --add-modules ...). Or is there any workaround to make this happen?

Any thoughts? :-)

Pool fails to process submissions > 16

Hi,

I've built a MatlabExecutor class that starts a static pool as follows:

static {
		JavaProcessOptions jvmConfig = new SimpleJavaProcessOptions(JVMArch.BIT_64, JVMType.CLIENT, 128, 256, 256, 5*60*1000);
		
		try {
			jvmPool = new JavaProcessPoolExecutor(jvmConfig,4, 8, 1, null, true);
			System.out.println("Started jvmPool");
		} catch (InterruptedException e) {
			logger.error(e,e);
		}
	}

It will eventually be a servlet, so it has a doPost(request, response) method, but at present it just runs via a test that calls the MatlabExecutor wrapped in a Runnable, e.g.

	@Test
	public void shouldProcessCalc() throws ServletException, IOException {
	
		for(int x=0;x<16;x++) {
			Runnable run = new Runnable() {
				
				@Override
				public void run() {
					MockHttpServletRequest request = new MockHttpServletRequest();
					request.setContent(requestJson.getBytes());
					MockHttpServletResponse response = new MockHttpServletResponse();
					try {
						new ModelScoreExecutor().doPost(request, response);
						logger.debug("Response Status: {}",response.getStatus());
						String responseStr = response.getContentAsString();
						logger.debug("Response Content: {}",responseStr);
					} catch (ServletException | IOException e) {
						logger.error(e,e);
					}
				}
			};
			new Thread(run).start();
		}
}

the important part of the doPost method is:

			String input = IOUtils.toString(request.getReader());
			Json json = Json.read(input);

			java.util.concurrent.Future<String> future =  jvmPool.submit(new Calc(json),true);

			String out = future.get(1, TimeUnit.MINUTES);		
			response.setStatus(HttpServletResponse.SC_OK);
			

Calc.java is:

	public String call() throws Exception {
		String p1 = json.at("rhs").at(0).asString();
		String p2 = json.at("rhs").at(1).asString();

		List<Object> p = MatlabUtil.getModelScoreInputFromRhs(json);
//makes a JNI call to native matlab calculation
		double[][] s = MatlabUtil.executeModelScore(new CPFunctions_MCR.ModelScore(), p1, p2, p);
		return MatlabUtil.convert2Lhs(s);

	}

So now the problems.

  1. If I run the above with "jvmPool.submit(new Calc(json), false);" then I get about 8 successful calcs, and then concurrent exceptions and stream corruptions. It seems like the submissions are not truly atomic and the JVM or streams are left in a bad state.
    Ideally I want to reuse JVMs for efficiency to avoid startup delays.
ERROR com.cp.matlab.servlet.MatlabExecutor.doPost(MatlabExecutor.java:141) - java.util.concurrent.ExecutionException: java.io.StreamCorruptedException: invalid type code: E0
java.util.concurrent.ExecutionException: java.io.StreamCorruptedException: invalid type code: E0
	at net.viktorc.pp4j.impl.JavaProcessPoolExecutor$JavaSubmission.getResult(JavaProcessPoolExecutor.java:452) ~[pp4j-2.2.jar:?]
	at net.viktorc.pp4j.impl.JavaProcessPoolExecutor$JavaSubmission.getResult(JavaProcessPoolExecutor.java:409) ~[pp4j-2.2.jar:?]
	at net.viktorc.pp4j.impl.ProcessPoolExecutor$InternalSubmission.getResult(ProcessPoolExecutor.java:471) ~[pp4j-2.2.jar:?]
	at net.viktorc.pp4j.impl.ProcessPoolExecutor$InternalSubmissionFuture.get(ProcessPoolExecutor.java:569) ~[pp4j-2.2.jar:?]
	at net.viktorc.pp4j.impl.JavaProcessPoolExecutor$CastFuture.get(JavaProcessPoolExecutor.java:502) ~[pp4j-2.2.jar:?]
	at com.cp.matlab.servlet.MatlabExecutor.doPost(MatlabExecutor.java:130) [classes/:?]
	at com.cp.matlab.servlet.CalcExecutorTest$1.run(CalcExecutorTest.java:65) [test-classes/:?]
  2. If I use "jvmPool.submit(new Calc(json), true);" then it works, but not all submissions are completed. I typically get 15 of 16 completed, but if I send 24 submissions I also get 15 completed. The rest don't show errors, but are never processed. Something in the submission queue maybe?

It's possible there is a better way to do this; I'm open to suggestions.

How to kill a process?

For me, one reason to use separate processes is to be able to terminate them, for example if they are running too long or aren't responding anymore. The child process's code doesn't always behave well with respect to thread interrupts, so I need to have the big gun and kill the process.

In PP4J it seems to be possible in AbstractProcessExecutor, but for the JavaProcessPool, I don't see any way to get there... Cancelling the returned Future doesn't have any effect on the process.

Question about task submit and ProcessPool

Hi,

Firstly, congratulations on this great library, it is very helpful. I just had a question about it (not an issue)!

I use this library to call some C++ code and I was wondering whether it is possible to terminate a process in which a crash happened while running a task, so that it won't be used anymore for new tasks.
The goal is to stop using a process in which unmanaged C++ memory may be corrupted even if the crash exception was caught correctly.

For example:
jvmPool.submit((Callable<Long> & Serializable) () -> {
  SomeCppFunctionCall(); // A CRASH HAPPENS HERE
  return rand.nextLong();
});

Is there a way to "tell" the ProcessPool to terminate the current process in which the crash happened?

Thank you !

Can we use PP4J inside a web application deployed on tomcat?

Hi,

I am trying to use PP4J as part of a web application deployed on Tomcat. I am initializing the JVM pool as part of Spring bean initialization. I see that it gets stuck on the initialization and doesn't allow the application to be deployed. Do you know why that happens?

Can you shed some light on that?

thanks,
Pradeep B.
