/*
 * Decompiled with CFR 0.152.
 */
package org.elasticsearch.xpack.ml.job.process.logging;

import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.time.Duration;
import java.util.Deque;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.Message;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.bytes.CompositeBytesReference;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContent;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.xpack.ml.job.process.logging.CppLogMessage;

public class CppLogMessageHandler
implements Closeable {
    private static final Logger LOGGER = Loggers.getLogger(CppLogMessageHandler.class);
    private static final int DEFAULT_READBUF_SIZE = 1024;
    private static final int DEFAULT_ERROR_STORE_SIZE = 5;
    private final String jobId;
    private final InputStream inputStream;
    private final int readBufSize;
    private final int errorStoreSize;
    private final Deque<String> errorStore;
    private final CountDownLatch pidLatch;
    private final CountDownLatch cppCopyrightLatch;
    private volatile boolean hasLogStreamEnded;
    private volatile boolean seenFatalError;
    private volatile long pid;
    private volatile String cppCopyright;

    public CppLogMessageHandler(String jobId, InputStream inputStream) {
        this(inputStream, jobId, 1024, 5);
    }

    CppLogMessageHandler(InputStream inputStream, String jobId, int readBufSize, int errorStoreSize) {
        this.jobId = jobId;
        this.inputStream = Objects.requireNonNull(inputStream);
        this.readBufSize = readBufSize;
        this.errorStoreSize = errorStoreSize;
        this.errorStore = ConcurrentCollections.newDeque();
        this.pidLatch = new CountDownLatch(1);
        this.cppCopyrightLatch = new CountDownLatch(1);
        this.hasLogStreamEnded = false;
    }

    @Override
    public void close() throws IOException {
        this.inputStream.close();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void tailStream() throws IOException {
        try {
            XContent xContent = XContentFactory.xContent((XContentType)XContentType.JSON);
            Object bytesRef = null;
            byte[] readBuf = new byte[this.readBufSize];
            int bytesRead = this.inputStream.read(readBuf);
            while (bytesRead != -1) {
                bytesRef = bytesRef == null ? new BytesArray(readBuf, 0, bytesRead) : new CompositeBytesReference(new BytesReference[]{bytesRef, new BytesArray(readBuf, 0, bytesRead)});
                bytesRef = this.parseMessages(xContent, (BytesReference)bytesRef);
                readBuf = new byte[this.readBufSize];
                bytesRead = this.inputStream.read(readBuf);
            }
        }
        finally {
            this.hasLogStreamEnded = true;
        }
    }

    public boolean hasLogStreamEnded() {
        return this.hasLogStreamEnded;
    }

    public boolean seenFatalError() {
        return this.seenFatalError;
    }

    public long getPid(Duration timeout) throws TimeoutException {
        if (this.pid == 0L) {
            try {
                this.pidLatch.await(timeout.toMillis(), TimeUnit.MILLISECONDS);
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            if (this.pid == 0L) {
                throw new TimeoutException("Timed out waiting for C++ process PID");
            }
        }
        return this.pid;
    }

    public String getCppCopyright(Duration timeout) throws TimeoutException {
        if (this.cppCopyright == null) {
            try {
                this.cppCopyrightLatch.await(timeout.toMillis(), TimeUnit.MILLISECONDS);
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            if (this.cppCopyright == null) {
                throw new TimeoutException("Timed out waiting for C++ process copyright");
            }
        }
        return this.cppCopyright;
    }

    public String getErrors() {
        String[] errorSnapshot = this.errorStore.toArray(new String[0]);
        StringBuilder errors = new StringBuilder();
        for (String error : errorSnapshot) {
            errors.append(error).append('\n');
        }
        return errors.toString();
    }

    private BytesReference parseMessages(XContent xContent, BytesReference bytesRef) {
        int nextMarker;
        byte marker = xContent.streamSeparator();
        int from = 0;
        while ((nextMarker = CppLogMessageHandler.findNextMarker(marker, bytesRef, from)) != -1) {
            if (nextMarker > from) {
                this.parseMessage(xContent, bytesRef.slice(from, nextMarker - from));
            }
            from = nextMarker + 1;
        }
        if (from >= bytesRef.length()) {
            return null;
        }
        return bytesRef.slice(from, bytesRef.length() - from);
    }

    private void parseMessage(XContent xContent, BytesReference bytesRef) {
        try {
            XContentParser parser = xContent.createParser(NamedXContentRegistry.EMPTY, bytesRef);
            CppLogMessage msg = (CppLogMessage)((Object)CppLogMessage.PARSER.apply(parser, null));
            Level level = Level.getLevel((String)msg.getLevel());
            if (level == null) {
                level = Level.WARN;
            } else if (level.isMoreSpecificThan(Level.ERROR)) {
                this.storeError(msg.getMessage());
                if (level.isMoreSpecificThan(Level.FATAL)) {
                    this.seenFatalError = true;
                }
            }
            long latestPid = msg.getPid();
            if (this.pid != latestPid) {
                this.pid = latestPid;
                this.pidLatch.countDown();
            }
            String latestMessage = msg.getMessage();
            if (this.cppCopyright == null && latestMessage.contains("Copyright")) {
                this.cppCopyright = latestMessage;
                this.cppCopyrightLatch.countDown();
            }
            if (this.jobId != null) {
                LOGGER.log(level, "[{}] [{}/{}] [{}@{}] {}", (Object)this.jobId, (Object)msg.getLogger(), (Object)latestPid, (Object)msg.getFile(), (Object)msg.getLine(), (Object)latestMessage);
            } else {
                LOGGER.log(level, "[{}/{}] [{}@{}] {}", (Object)msg.getLogger(), (Object)latestPid, (Object)msg.getFile(), (Object)msg.getLine(), (Object)latestMessage);
            }
        }
        catch (IOException e) {
            if (this.jobId != null) {
                LOGGER.warn((Message)new ParameterizedMessage("[{}] Failed to parse C++ log message: {}", new Object[]{this.jobId, bytesRef.utf8ToString()}), (Throwable)e);
            }
            LOGGER.warn((Message)new ParameterizedMessage("Failed to parse C++ log message: {}", new Object[]{bytesRef.utf8ToString()}), (Throwable)e);
        }
    }

    private void storeError(String error) {
        if (Strings.isNullOrEmpty((String)error) || this.errorStoreSize <= 0) {
            return;
        }
        if (this.errorStore.size() >= this.errorStoreSize) {
            this.errorStore.removeFirst();
        }
        this.errorStore.offerLast(error);
    }

    private static int findNextMarker(byte marker, BytesReference bytesRef, int from) {
        for (int i = from; i < bytesRef.length(); ++i) {
            if (bytesRef.get(i) != marker) continue;
            return i;
        }
        return -1;
    }
}

