/*
 * Decompiled with CFR 0.152.
 */
package org.apache.tez.runtime.library.shuffle.common;

import com.google.common.base.Preconditions;
import java.io.Closeable;
import java.io.DataInputStream;
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLConnection;
import java.security.GeneralSecurityException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;
import javax.crypto.SecretKey;
import javax.net.ssl.HttpsURLConnection;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.security.ssl.SSLFactory;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
import org.apache.tez.runtime.library.common.security.SecureShuffleUtils;
import org.apache.tez.runtime.library.common.shuffle.impl.ShuffleHeader;
import org.apache.tez.runtime.library.shuffle.common.DiskFetchedInput;
import org.apache.tez.runtime.library.shuffle.common.FetchResult;
import org.apache.tez.runtime.library.shuffle.common.FetchedInput;
import org.apache.tez.runtime.library.shuffle.common.FetchedInputAllocator;
import org.apache.tez.runtime.library.shuffle.common.FetcherCallback;
import org.apache.tez.runtime.library.shuffle.common.MemoryFetchedInput;
import org.apache.tez.runtime.library.shuffle.common.ShuffleUtils;

public class Fetcher
implements Callable<FetchResult> {
    private static final Log LOG = LogFactory.getLog(Fetcher.class);
    private static final int UNIT_CONNECT_TIMEOUT = 60000;
    private static final AtomicInteger fetcherIdGen = new AtomicInteger(0);
    private CompressionCodec codec;
    private int connectionTimeout;
    private int readTimeout;
    private boolean ifileReadAhead = true;
    private int ifileReadAheadLength = 0x400000;
    private final SecretKey shuffleSecret;
    private final FetcherCallback fetcherCallback;
    private final FetchedInputAllocator inputManager;
    private final ApplicationId appId;
    private static boolean sslShuffle = false;
    private static SSLFactory sslFactory;
    private static boolean sslFactoryInited;
    private final int fetcherIdentifier;
    private List<InputAttemptIdentifier> srcAttempts;
    private String host;
    private int port;
    private int partition;
    private final Map<String, InputAttemptIdentifier> pathToAttemptMap;
    private LinkedHashSet<InputAttemptIdentifier> remaining;
    private URL url;
    private String encHash;
    private String msgToEncode;

    private Fetcher(FetcherCallback fetcherCallback, FetchedInputAllocator inputManager, ApplicationId appId, SecretKey shuffleSecret, Configuration conf) {
        this.fetcherCallback = fetcherCallback;
        this.inputManager = inputManager;
        this.shuffleSecret = shuffleSecret;
        this.appId = appId;
        this.pathToAttemptMap = new HashMap<String, InputAttemptIdentifier>();
        this.fetcherIdentifier = fetcherIdGen.getAndIncrement();
    }

    @Override
    public FetchResult call() throws Exception {
        DataInputStream input;
        HttpURLConnection connection;
        if (this.srcAttempts.size() == 0) {
            return new FetchResult(this.host, this.port, this.partition, this.srcAttempts);
        }
        for (InputAttemptIdentifier in : this.srcAttempts) {
            this.pathToAttemptMap.put(in.getPathComponent(), in);
        }
        this.remaining = new LinkedHashSet<InputAttemptIdentifier>(this.srcAttempts);
        try {
            connection = this.connectToShuffleHandler(this.host, this.port, this.partition, this.srcAttempts);
        }
        catch (IOException e) {
            Iterator leftIter = this.remaining.iterator();
            while (leftIter.hasNext()) {
                this.fetcherCallback.fetchFailed(this.host, (InputAttemptIdentifier)leftIter.next(), true);
            }
            return new FetchResult(this.host, this.port, this.partition, this.remaining);
        }
        try {
            input = new DataInputStream(connection.getInputStream());
            this.validateConnectionResponse(connection, this.url, this.msgToEncode, this.encHash);
        }
        catch (IOException e) {
            InputAttemptIdentifier firstAttempt = this.srcAttempts.get(0);
            LOG.warn((Object)("Fetch Failure from host while connecting: " + this.host + ", attempt: " + firstAttempt + " Informing ShuffleManager: "), (Throwable)e);
            this.fetcherCallback.fetchFailed(this.host, firstAttempt, false);
            return new FetchResult(this.host, this.port, this.partition, this.remaining);
        }
        Object[] failedInputs = null;
        while (!this.remaining.isEmpty() && failedInputs == null) {
            failedInputs = this.fetchInputs(input);
        }
        if (failedInputs != null && failedInputs.length > 0) {
            LOG.warn((Object)("copyInputs failed for tasks " + Arrays.toString(failedInputs)));
            for (Object left : failedInputs) {
                this.fetcherCallback.fetchFailed(this.host, (InputAttemptIdentifier)left, false);
            }
        }
        IOUtils.cleanup((Log)LOG, (Closeable[])new Closeable[]{input});
        if (failedInputs == null && !this.remaining.isEmpty()) {
            throw new IOException("server didn't return all expected map outputs: " + this.remaining.size() + " left.");
        }
        return new FetchResult(this.host, this.port, this.partition, this.remaining);
    }

    private InputAttemptIdentifier[] fetchInputs(DataInputStream input) {
        FetchedInput fetchedInput = null;
        InputAttemptIdentifier srcAttemptId = null;
        long decompressedLength = -1L;
        long compressedLength = -1L;
        try {
            long startTime = System.currentTimeMillis();
            int responsePartition = -1;
            String pathComponent = null;
            try {
                ShuffleHeader header = new ShuffleHeader();
                header.readFields(input);
                pathComponent = header.getMapId();
                srcAttemptId = this.pathToAttemptMap.get(pathComponent);
                compressedLength = header.getCompressedLength();
                decompressedLength = header.getUncompressedLength();
                responsePartition = header.getPartition();
            }
            catch (IllegalArgumentException e) {
                LOG.warn((Object)"Invalid src id ", (Throwable)e);
                return this.remaining.toArray(new InputAttemptIdentifier[this.remaining.size()]);
            }
            if (!this.verifySanity(compressedLength, decompressedLength, responsePartition, srcAttemptId, pathComponent)) {
                if (srcAttemptId == null) {
                    LOG.warn((Object)("Was expecting " + this.getNextRemainingAttempt() + " but got null"));
                    srcAttemptId = this.getNextRemainingAttempt();
                }
                assert (srcAttemptId != null);
                return new InputAttemptIdentifier[]{srcAttemptId};
            }
            if (LOG.isDebugEnabled()) {
                LOG.debug((Object)("header: " + srcAttemptId + ", len: " + compressedLength + ", decomp len: " + decompressedLength));
            }
            fetchedInput = this.inputManager.allocate(decompressedLength, compressedLength, srcAttemptId);
            LOG.info((Object)("fetcher about to shuffle output of srcAttempt " + fetchedInput.getInputAttemptIdentifier() + " decomp: " + decompressedLength + " len: " + compressedLength + " to " + (Object)((Object)fetchedInput.getType())));
            if (fetchedInput.getType() == FetchedInput.Type.MEMORY) {
                ShuffleUtils.shuffleToMemory((MemoryFetchedInput)fetchedInput, input, (int)decompressedLength, (int)compressedLength, this.codec, this.ifileReadAhead, this.ifileReadAheadLength, LOG);
            } else {
                ShuffleUtils.shuffleToDisk((DiskFetchedInput)fetchedInput, input, compressedLength, LOG);
            }
            long endTime = System.currentTimeMillis();
            this.fetcherCallback.fetchSucceeded(this.host, srcAttemptId, fetchedInput, compressedLength, endTime - startTime);
            this.remaining.remove(srcAttemptId);
            return null;
        }
        catch (IOException ioe) {
            if (srcAttemptId == null || fetchedInput == null) {
                LOG.info((Object)("fetcher failed to read map header" + srcAttemptId + " decomp: " + decompressedLength + ", " + compressedLength), (Throwable)ioe);
                if (srcAttemptId == null) {
                    return this.remaining.toArray(new InputAttemptIdentifier[this.remaining.size()]);
                }
                return new InputAttemptIdentifier[]{srcAttemptId};
            }
            LOG.warn((Object)("Failed to shuffle output of " + srcAttemptId + " from " + this.host), (Throwable)ioe);
            try {
                fetchedInput.abort();
            }
            catch (IOException e) {
                LOG.info((Object)("Failure to cleanup fetchedInput: " + fetchedInput));
            }
            return new InputAttemptIdentifier[]{srcAttemptId};
        }
    }

    private boolean verifySanity(long compressedLength, long decompressedLength, int fetchPartition, InputAttemptIdentifier srcAttemptId, String pathComponent) {
        if (compressedLength < 0L || decompressedLength < 0L) {
            LOG.warn((Object)(" invalid lengths in input header -> headerPathComponent: " + pathComponent + ", nextRemainingSrcAttemptId: " + this.getNextRemainingAttempt() + ", mappedSrcAttemptId: " + srcAttemptId + " len: " + compressedLength + ", decomp len: " + decompressedLength));
            return false;
        }
        if (fetchPartition != this.partition) {
            LOG.warn((Object)(" data for the wrong reduce -> headerPathComponent: " + pathComponent + "nextRemainingSrcAttemptId: " + this.getNextRemainingAttempt() + ", mappedSrcAttemptId: " + srcAttemptId + " len: " + compressedLength + " decomp len: " + decompressedLength + " for reduce " + fetchPartition));
            return false;
        }
        if (!this.remaining.contains(srcAttemptId)) {
            LOG.warn((Object)("Invalid input. Received output for headerPathComponent: " + pathComponent + "nextRemainingSrcAttemptId: " + this.getNextRemainingAttempt() + ", mappedSrcAttemptId: " + srcAttemptId));
            return false;
        }
        return true;
    }

    private InputAttemptIdentifier getNextRemainingAttempt() {
        if (this.remaining.size() > 0) {
            return (InputAttemptIdentifier)this.remaining.iterator().next();
        }
        return null;
    }

    private HttpURLConnection connectToShuffleHandler(String host, int port, int partition, List<InputAttemptIdentifier> inputs) throws IOException {
        try {
            this.url = this.constructInputURL(host, port, partition, inputs);
            HttpURLConnection connection = this.openConnection(this.url);
            this.msgToEncode = SecureShuffleUtils.buildMsgFrom(this.url);
            this.encHash = SecureShuffleUtils.hashFromString(this.msgToEncode, this.shuffleSecret);
            connection.addRequestProperty("UrlHash", this.encHash);
            connection.setReadTimeout(this.readTimeout);
            connection.addRequestProperty("name", "mapreduce");
            connection.addRequestProperty("version", "1.0.0");
            this.connect(connection, this.connectionTimeout);
            return connection;
        }
        catch (IOException e) {
            LOG.warn((Object)("Failed to connect to " + host + " with " + this.srcAttempts.size() + " inputs"), (Throwable)e);
            throw e;
        }
    }

    private void validateConnectionResponse(HttpURLConnection connection, URL url, String msgToEncode, String encHash) throws IOException {
        int rc = connection.getResponseCode();
        if (rc != 200) {
            throw new IOException("Got invalid response code " + rc + " from " + url + ": " + connection.getResponseMessage());
        }
        if (!"mapreduce".equals(connection.getHeaderField("name")) || !"1.0.0".equals(connection.getHeaderField("version"))) {
            throw new IOException("Incompatible shuffle response version");
        }
        String replyHash = connection.getHeaderField("ReplyHash");
        if (replyHash == null) {
            throw new IOException("security validation of TT Map output failed");
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug((Object)("url=" + msgToEncode + ";encHash=" + encHash + ";replyHash=" + replyHash));
        }
        SecureShuffleUtils.verifyReply(replyHash, encHash, this.shuffleSecret);
        LOG.info((Object)("for url=" + msgToEncode + " sent hash and receievd reply"));
    }

    protected HttpURLConnection openConnection(URL url) throws IOException {
        HttpURLConnection conn = (HttpURLConnection)url.openConnection();
        if (sslShuffle) {
            HttpsURLConnection httpsConn = (HttpsURLConnection)conn;
            try {
                httpsConn.setSSLSocketFactory(sslFactory.createSSLSocketFactory());
            }
            catch (GeneralSecurityException ex) {
                throw new IOException(ex);
            }
            httpsConn.setHostnameVerifier(sslFactory.getHostnameVerifier());
        }
        return conn;
    }

    private void connect(URLConnection connection, int connectionTimeout) throws IOException {
        int unit = 0;
        if (connectionTimeout < 0) {
            throw new IOException("Invalid timeout [timeout = " + connectionTimeout + " ms]");
        }
        if (connectionTimeout > 0) {
            unit = Math.min(60000, connectionTimeout);
        }
        connection.setConnectTimeout(unit);
        while (true) {
            try {
                connection.connect();
            }
            catch (IOException ioe) {
                if ((connectionTimeout -= unit) == 0) {
                    throw ioe;
                }
                if (connectionTimeout >= unit) continue;
                unit = connectionTimeout;
                connection.setConnectTimeout(unit);
                continue;
            }
            break;
        }
    }

    private URL constructInputURL(String host, int port, int partition, List<InputAttemptIdentifier> inputs) throws MalformedURLException {
        StringBuilder url = ShuffleUtils.constructBaseURIForShuffleHandler(host, port, partition, this.appId);
        boolean first = true;
        for (InputAttemptIdentifier input : inputs) {
            if (first) {
                first = false;
                url.append(input.getPathComponent());
                continue;
            }
            url.append(",").append(input.getPathComponent());
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug((Object)("InputFetch URL for: " + host + " : " + url.toString()));
        }
        return new URL(url.toString());
    }

    public int hashCode() {
        return this.fetcherIdentifier;
    }

    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (obj == null) {
            return false;
        }
        if (this.getClass() != obj.getClass()) {
            return false;
        }
        Fetcher other = (Fetcher)obj;
        return this.fetcherIdentifier == other.fetcherIdentifier;
    }

    public static class FetcherBuilder {
        private Fetcher fetcher;
        private boolean workAssigned = false;

        public FetcherBuilder(FetcherCallback fetcherCallback, FetchedInputAllocator inputManager, ApplicationId appId, SecretKey shuffleSecret, Configuration conf) {
            this.fetcher = new Fetcher(fetcherCallback, inputManager, appId, shuffleSecret, conf);
        }

        public FetcherBuilder setCompressionParameters(CompressionCodec codec) {
            this.fetcher.codec = codec;
            return this;
        }

        public FetcherBuilder setConnectionParameters(int connectionTimeout, int readTimeout) {
            this.fetcher.connectionTimeout = connectionTimeout;
            this.fetcher.readTimeout = readTimeout;
            return this;
        }

        public FetcherBuilder setIFileParams(boolean readAhead, int readAheadBytes) {
            this.fetcher.ifileReadAhead = readAhead;
            this.fetcher.ifileReadAheadLength = readAheadBytes;
            return this;
        }

        public FetcherBuilder assignWork(String host, int port, int partition, List<InputAttemptIdentifier> inputs) {
            this.fetcher.host = host;
            this.fetcher.port = port;
            this.fetcher.partition = partition;
            this.fetcher.srcAttempts = inputs;
            this.workAssigned = true;
            return this;
        }

        public Fetcher build() {
            Preconditions.checkState((this.workAssigned ? 1 : 0) != 0, (Object)"Cannot build a fetcher withot assigning work to it");
            return this.fetcher;
        }
    }
}

