/*
 * Decompiled with CFR 0.152.
 */
package com.liferay.petra.process.local;

import com.liferay.petra.concurrent.AsyncBroker;
import com.liferay.petra.concurrent.DefaultNoticeableFuture;
import com.liferay.petra.concurrent.NoticeableFuture;
import com.liferay.petra.io.ClassLoaderObjectInputStream;
import com.liferay.petra.io.unsync.UnsyncBufferedInputStream;
import com.liferay.petra.io.unsync.UnsyncByteArrayOutputStream;
import com.liferay.petra.process.ProcessCallable;
import com.liferay.petra.process.ProcessChannel;
import com.liferay.petra.process.ProcessConfig;
import com.liferay.petra.process.ProcessException;
import com.liferay.petra.process.ProcessExecutor;
import com.liferay.petra.process.ProcessLog;
import com.liferay.petra.process.TerminationProcessException;
import com.liferay.petra.process.local.AsyncBrokerThreadLocal;
import com.liferay.petra.process.local.LocalProcessChannel;
import com.liferay.petra.process.local.LocalProcessLauncher;
import com.liferay.petra.process.local.LocalProcessLog;
import com.liferay.petra.process.local.ResultProcessCallable;
import com.liferay.petra.string.StringBundler;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.StreamCorruptedException;
import java.io.WriteAbortedException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.nio.file.attribute.FileAttribute;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.function.Consumer;

/**
 * {@link ProcessExecutor} that runs a {@link ProcessCallable} in a JVM forked
 * on the local machine.
 *
 * <p>
 * The child is started as
 * {@code <java executable> -cp <bootstrap class path> <arguments...> LocalProcessLauncher}.
 * The callable is serialized to the child's standard input, and a daemon
 * "reactor" thread deserializes whatever the child writes to its standard
 * output until that stream ends.
 * </p>
 */
public class LocalProcessExecutor implements ProcessExecutor {

    /**
     * Forks a subprocess, ships {@code processCallable} to it and returns a
     * channel bound to the running subprocess.
     *
     * @param  processConfig supplies the java executable, class paths, JVM
     *         arguments, optional environment, log consumer and the class
     *         loader used to deserialize objects piped back by the child
     * @param  processCallable the callable to send to the subprocess
     * @return a channel wrapping the reactor future, the object output stream
     *         connected to the child and the async broker
     * @throws ProcessException if starting the process or writing the
     *         bootstrap data fails with an {@link IOException}
     */
    @Override
    public <T extends Serializable> ProcessChannel<T> execute(ProcessConfig processConfig, ProcessCallable<T> processCallable) throws ProcessException {
        try {
            List<String> arguments = processConfig.getArguments();

            // java executable, "-cp", class path and launcher class name => + 4
            List<String> commands = new ArrayList<>(arguments.size() + 4);
            commands.add(processConfig.getJavaExecutable());
            commands.add("-cp");
            commands.add(processConfig.getBootstrapClassPath());
            commands.addAll(arguments);
            commands.add(LocalProcessLauncher.class.getName());

            ProcessBuilder processBuilder = new ProcessBuilder(commands);
            Map<String, String> environment = processConfig.getEnvironment();
            if (environment != null) {
                // A configured environment replaces the inherited one entirely
                Map<String, String> currentEnvironment = processBuilder.environment();
                currentEnvironment.clear();
                currentEnvironment.putAll(environment);
            }

            Process process = processBuilder.start();
            try {
                return _attach(process, processConfig, processCallable, arguments);
            }
            catch (IOException | RuntimeException e) {
                // Bootstrapping failed (e.g. the callable is not serializable).
                // Nobody else holds a reference to the child, so kill it here
                // instead of leaking a JVM blocked on its standard input.
                process.destroy();
                throw e;
            }
        }
        catch (IOException ioe) {
            throw new ProcessException(ioe);
        }
    }

    /**
     * Writes the bootstrap data and the callable to the started process,
     * starts the reactor thread and builds the channel.
     *
     * @throws IOException if writing to the process' standard input fails
     */
    private static <T extends Serializable> ProcessChannel<T> _attach(Process process, ProcessConfig processConfig, ProcessCallable<T> processCallable, List<String> arguments) throws IOException {
        ObjectOutputStream bootstrapObjectOutputStream = new ObjectOutputStream(process.getOutputStream());
        bootstrapObjectOutputStream.writeObject(processCallable.toString());
        bootstrapObjectOutputStream.writeObject(processConfig.getRuntimeClassPath());

        // The callable travels in a second object stream nested in the
        // bootstrap one; flushing the outer stream flushes both.
        ObjectOutputStream objectOutputStream = new ObjectOutputStream(bootstrapObjectOutputStream);
        objectOutputStream.writeObject(processCallable);
        objectOutputStream.flush();

        AsyncBroker<Long, Serializable> asyncBroker = new AsyncBroker<>();
        SubprocessReactor<T> subprocessReactor = new SubprocessReactor<>(process, processConfig.getProcessLogConsumer(), processConfig.getReactClassLoader(), asyncBroker);
        NoticeableFuture<T> noticeableFuture = _submit(_buildThreadName(processCallable, arguments), subprocessReactor);

        // Cancelling the future kills the subprocess
        noticeableFuture.addFutureListener(future -> {
            if (future.isCancelled()) {
                process.destroy();
            }
        });

        return new LocalProcessChannel<>(noticeableFuture, objectOutputStream, asyncBroker);
    }

    /**
     * Builds the reactor thread name:
     * {@code <callable>[<arg1> <arg2> ...]-}. An empty argument list yields
     * {@code <callable>[]-}.
     */
    private static String _buildThreadName(ProcessCallable<?> processCallable, List<String> arguments) {
        StringBundler sb = new StringBundler(arguments.size() * 2 + 4);
        sb.append(processCallable);
        sb.append("[");
        for (String argument : arguments) {
            sb.append(argument);
            sb.append(" ");
        }
        if (arguments.isEmpty()) {
            // Nothing to overwrite; replacing the last element here would
            // clobber the opening bracket.
            sb.append("]");
        }
        else {
            // Turn the trailing separator into the closing bracket
            sb.setStringAt("]", sb.index() - 1);
        }
        sb.append("-");
        return sb.toString();
    }

    /**
     * Runs {@code callable} on a new daemon thread named {@code threadName}
     * and returns the future tracking it.
     */
    private static <T> NoticeableFuture<T> _submit(String threadName, Callable<T> callable) {
        DefaultNoticeableFuture<T> defaultNoticeableFuture = new DefaultNoticeableFuture<>(callable);
        Thread thread = new Thread(defaultNoticeableFuture, threadName);
        thread.setDaemon(true);
        thread.start();
        return defaultNoticeableFuture;
    }

    /**
     * Reads the objects a subprocess pipes back on its standard output until
     * the stream ends, then waits for the subprocess and produces its result.
     *
     * <p>
     * The read loop only ever terminates with an exception (normally the
     * {@link EOFException} raised when the child closes its output). Once the
     * subprocess has exited with code 0, a previously received
     * {@link ResultProcessCallable} takes precedence over that exception and
     * its {@code call()} outcome becomes the outcome of the reactor.
     * </p>
     */
    private static class SubprocessReactor<T extends Serializable> implements Callable<T> {
        private final AsyncBroker<Long, Serializable> _asyncBroker;
        private final Process _process;
        private final Consumer<ProcessLog> _processLogConsumer;
        private final ClassLoader _reactClassLoader;

        private SubprocessReactor(Process process, Consumer<ProcessLog> processLogConsumer, ClassLoader reactClassLoader, AsyncBroker<Long, Serializable> asyncBroker) {
            this._process = process;
            this._processLogConsumer = processLogConsumer;
            this._reactClassLoader = reactClassLoader;
            this._asyncBroker = asyncBroker;
        }

        /**
         * @return the value produced by the received result callable
         * @throws TerminationProcessException if the subprocess exits with a
         *         non-zero code
         * @throws ProcessException if the wait for the subprocess is
         *         interrupted, the stream is corrupted, or the stream ends
         *         without a result callable having been received
         * @throws Exception whatever the result callable throws, or any other
         *         failure that aborted the read loop when no result callable
         *         was received
         */
        @Override
        public T call() throws Exception {
            ProcessCallable<?> resultProcessCallable = null;
            AsyncBrokerThreadLocal.setAsyncBroker(this._asyncBroker);
            UnsyncBufferedInputStream unsyncBufferedInputStream = new UnsyncBufferedInputStream(this._process.getInputStream());
            try {
                ClassLoaderObjectInputStream objectInputStream = _openObjectInputStream(unsyncBufferedInputStream);
                while (true) {
                    Object obj;
                    try {
                        obj = objectInputStream.readObject();
                    }
                    catch (WriteAbortedException wae) {
                        this._processLogConsumer.accept(new LocalProcessLog(ProcessLog.Level.WARN, "Caught a write aborted exception", wae));
                        continue;
                    }
                    if (!(obj instanceof ProcessCallable)) {
                        this._processLogConsumer.accept(new LocalProcessLog(ProcessLog.Level.INFO, "Received a nonprocess callable piping back " + obj, null));
                        continue;
                    }
                    ProcessCallable<?> processCallable = (ProcessCallable<?>)obj;
                    if (processCallable instanceof ResultProcessCallable) {
                        // Remember the result; it is only evaluated after the
                        // subprocess has terminated.
                        resultProcessCallable = processCallable;
                        continue;
                    }
                    try {
                        Object returnValue = processCallable.call();
                        this._processLogConsumer.accept(new LocalProcessLog(ProcessLog.Level.DEBUG, StringBundler.concat("Invoked generic process callable ", processCallable, " with return value ", returnValue), null));
                    }
                    catch (Throwable t) {
                        // A failing callable must not stop the piping
                        this._processLogConsumer.accept(new LocalProcessLog(ProcessLog.Level.ERROR, "Unable to invoke generic process callable", t));
                    }
                }
            }
            catch (StreamCorruptedException sce) {
                // Save whatever is left in the stream for post-mortem analysis
                Path path = Files.createTempFile("corrupted-stream-dump-", ".log");
                this._processLogConsumer.accept(new LocalProcessLog(ProcessLog.Level.ERROR, "Dumping content of corrupted object input stream to " + path.toAbsolutePath(), sce));
                Files.copy(unsyncBufferedInputStream, path, StandardCopyOption.REPLACE_EXISTING);
                throw new ProcessException("Corrupted object input stream", sce);
            }
            catch (EOFException eofe) {
                throw new ProcessException("Subprocess piping back ended prematurely", eofe);
            }
            catch (Throwable t) {
                this._processLogConsumer.accept(new LocalProcessLog(ProcessLog.Level.ERROR, "Abort subprocess piping", t));
                throw t;
            }
            finally {
                try {
                    _awaitTermination();
                }
                finally {
                    // Clean up even when the wait above throws
                    AsyncBrokerThreadLocal.removeAsyncBroker();
                }
                if (resultProcessCallable != null) {
                    // Deliberately supersedes the exception that ended the
                    // read loop: end of stream is the normal way out.
                    @SuppressWarnings("unchecked")
                    T result = (T)resultProcessCallable.call();
                    return result;
                }
            }
        }

        /**
         * Opens the object input stream on top of {@code inputStream},
         * skipping one byte at a time past anything that precedes a valid
         * object stream header. Skipped bytes are reported as a single WARN
         * log once the header is found.
         *
         * @throws IOException if the stream fails or ends before a valid
         *         header is found
         */
        private ClassLoaderObjectInputStream _openObjectInputStream(UnsyncBufferedInputStream inputStream) throws IOException {
            UnsyncByteArrayOutputStream leadingBytes = new UnsyncByteArrayOutputStream();
            while (true) {
                try {
                    // The stream header is 4 bytes; be ready to rewind them
                    inputStream.mark(4);
                    ClassLoaderObjectInputStream objectInputStream = new ClassLoaderObjectInputStream(inputStream, this._reactClassLoader);
                    if (leadingBytes.size() > 0) {
                        this._processLogConsumer.accept(new LocalProcessLog(ProcessLog.Level.WARN, "Found corrupt leading log " + leadingBytes.toString(), null));
                    }
                    return objectInputStream;
                }
                catch (StreamCorruptedException sce) {
                    // Bad header: rewind, set one byte aside and retry from
                    // the next position.
                    inputStream.reset();
                    leadingBytes.write(inputStream.read());
                }
            }
        }

        /**
         * Blocks until the subprocess exits.
         *
         * @throws TerminationProcessException if the exit code is not 0
         * @throws ProcessException if the wait is interrupted, in which case
         *         the subprocess is destroyed
         */
        private void _awaitTermination() throws ProcessException {
            try {
                int exitCode = this._process.waitFor();
                if (exitCode != 0) {
                    throw new TerminationProcessException(exitCode);
                }
            }
            catch (InterruptedException ie) {
                this._process.destroy();
                throw new ProcessException("Forcibly killed subprocess on interruption", ie);
            }
        }
    }
}

