/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.repackaged.direct_java.runners.core.construction;

import com.google.auto.value.AutoValue;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;
import org.apache.beam.model.jobmanagement.v1.ArtifactApi;
import org.apache.beam.model.jobmanagement.v1.LegacyArtifactStagingServiceGrpc;
import org.apache.beam.repackaged.direct_java.runners.core.construction.AutoValue_ArtifactServiceStager_StagedFile;
import org.apache.beam.repackaged.direct_java.runners.core.construction.AutoValue_ArtifactServiceStager_StagingResult;
import org.apache.beam.sdk.util.MoreFutures;
import org.apache.beam.sdk.util.ThrowingSupplier;
import org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.ByteString;
import org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Channel;
import org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.StreamObserver;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.hash.Hasher;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.hash.Hashing;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.ListeningExecutorService;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.MoreExecutors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ArtifactServiceStager {
    private static final int DEFAULT_BUFFER_SIZE = 0x200000;
    private static final Logger LOG = LoggerFactory.getLogger(ArtifactServiceStager.class);
    private final int bufferSize;
    private final LegacyArtifactStagingServiceGrpc.LegacyArtifactStagingServiceStub stub;
    private final LegacyArtifactStagingServiceGrpc.LegacyArtifactStagingServiceBlockingStub blockingStub;
    private final ListeningExecutorService executorService = MoreExecutors.listeningDecorator((ExecutorService)Executors.newCachedThreadPool());

    public static ArtifactServiceStager overChannel(Channel channel) {
        return ArtifactServiceStager.overChannel(channel, 0x200000);
    }

    static ArtifactServiceStager overChannel(Channel channel, int bufferSize) {
        return new ArtifactServiceStager(channel, bufferSize);
    }

    private ArtifactServiceStager(Channel channel, int bufferSize) {
        this.stub = LegacyArtifactStagingServiceGrpc.newStub((Channel)channel);
        this.blockingStub = LegacyArtifactStagingServiceGrpc.newBlockingStub((Channel)channel);
        this.bufferSize = bufferSize;
    }

    public String stage(String stagingSessionToken, Collection<StagedFile> files) throws IOException, InterruptedException {
        HashMap<StagedFile, CompletionStage> futures = new HashMap<StagedFile, CompletionStage>();
        LOG.info("Staging {} files (token: {})", (Object)files.size(), (Object)stagingSessionToken);
        for (StagedFile file : files) {
            futures.put(file, MoreFutures.supplyAsync((ThrowingSupplier)new StagingCallable(stagingSessionToken, file), (ExecutorService)this.executorService));
        }
        CompletionStage<StagingResult> stagingResult = MoreFutures.allAsList(futures.values()).thenApply(ignored -> new ExtractStagingResultsCallable(futures).call());
        return this.stageManifest(stagingSessionToken, stagingResult);
    }

    private String stageManifest(String stagingSessionToken, CompletionStage<StagingResult> stagingFuture) throws InterruptedException {
        try {
            StagingResult stagingResult = (StagingResult)MoreFutures.get(stagingFuture);
            if (stagingResult.isSuccess()) {
                LOG.info("Staged {} files (token: {})", (Object)stagingResult.getMetadata().size(), (Object)stagingSessionToken);
                ArtifactApi.Manifest manifest = ArtifactApi.Manifest.newBuilder().addAllArtifact(stagingResult.getMetadata()).build();
                ArtifactApi.CommitManifestResponse response = this.blockingStub.commitManifest(ArtifactApi.CommitManifestRequest.newBuilder().setStagingSessionToken(stagingSessionToken).setManifest(manifest).build());
                return response.getRetrievalToken();
            }
            RuntimeException failure = new RuntimeException(String.format("Failed to stage %s files: %s", stagingResult.getFailures().size(), stagingResult.getFailures().keySet()));
            for (Throwable t : stagingResult.getFailures().values()) {
                failure.addSuppressed(t);
            }
            throw failure;
        }
        catch (ExecutionException e) {
            throw new RuntimeException(e);
        }
    }

    @AutoValue
    static abstract class StagingResult {
        StagingResult() {
        }

        static StagingResult success(Set<ArtifactApi.ArtifactMetadata> metadata) {
            return new AutoValue_ArtifactServiceStager_StagingResult(metadata, Collections.emptyMap());
        }

        static StagingResult failure(Map<StagedFile, Throwable> failures) {
            return new AutoValue_ArtifactServiceStager_StagingResult(null, failures);
        }

        boolean isSuccess() {
            return this.getMetadata() != null;
        }

        @Nullable
        abstract Set<ArtifactApi.ArtifactMetadata> getMetadata();

        abstract Map<StagedFile, Throwable> getFailures();
    }

    @AutoValue
    public static abstract class StagedFile {
        public static StagedFile of(File file, String stagingName) {
            return new AutoValue_ArtifactServiceStager_StagedFile(file, stagingName);
        }

        public abstract File getFile();

        public abstract String getStagingName();
    }

    private static class ExtractStagingResultsCallable
    implements Callable<StagingResult> {
        private final Map<StagedFile, CompletionStage<ArtifactApi.ArtifactMetadata>> futures;

        private ExtractStagingResultsCallable(Map<StagedFile, CompletionStage<ArtifactApi.ArtifactMetadata>> futures) {
            this.futures = futures;
        }

        @Override
        public StagingResult call() {
            HashSet<ArtifactApi.ArtifactMetadata> metadata = new HashSet<ArtifactApi.ArtifactMetadata>();
            HashMap<StagedFile, Throwable> failures = new HashMap<StagedFile, Throwable>();
            for (Map.Entry<StagedFile, CompletionStage<ArtifactApi.ArtifactMetadata>> stagedFileResult : this.futures.entrySet()) {
                try {
                    metadata.add((ArtifactApi.ArtifactMetadata)MoreFutures.get(stagedFileResult.getValue()));
                }
                catch (ExecutionException ee) {
                    failures.put(stagedFileResult.getKey(), ee.getCause());
                }
                catch (InterruptedException ie) {
                    throw new AssertionError("This should never happen. All of the futures are complete by construction", ie);
                }
            }
            if (failures.isEmpty()) {
                return StagingResult.success(metadata);
            }
            return StagingResult.failure(failures);
        }
    }

    private class StagingCallable
    implements ThrowingSupplier<ArtifactApi.ArtifactMetadata> {
        private final String stagingSessionToken;
        private final StagedFile file;

        private StagingCallable(String stagingSessionToken, StagedFile file) {
            this.stagingSessionToken = stagingSessionToken;
            this.file = file;
        }

        public ArtifactApi.ArtifactMetadata get() throws Exception {
            PutArtifactResponseObserver responseObserver = new PutArtifactResponseObserver();
            StreamObserver requestObserver = ArtifactServiceStager.this.stub.putArtifact((StreamObserver)responseObserver);
            ArtifactApi.ArtifactMetadata metadata = ArtifactApi.ArtifactMetadata.newBuilder().setName(this.file.getStagingName()).build();
            ArtifactApi.PutArtifactMetadata putMetadata = ArtifactApi.PutArtifactMetadata.newBuilder().setMetadata(metadata).setStagingSessionToken(this.stagingSessionToken).build();
            requestObserver.onNext((Object)ArtifactApi.PutArtifactRequest.newBuilder().setMetadata(putMetadata).build());
            Hasher hasher = Hashing.sha256().newHasher();
            FileChannel channel = new FileInputStream(this.file.getFile()).getChannel();
            ByteBuffer readBuffer = ByteBuffer.allocate(ArtifactServiceStager.this.bufferSize);
            while (!responseObserver.isTerminal() && channel.position() < channel.size()) {
                readBuffer.clear();
                channel.read(readBuffer);
                readBuffer.flip();
                ByteString chunk = ByteString.copyFrom((ByteBuffer)readBuffer);
                hasher.putBytes(chunk.toByteArray());
                readBuffer.rewind();
                ArtifactApi.PutArtifactRequest request = ArtifactApi.PutArtifactRequest.newBuilder().setData(ArtifactApi.ArtifactChunk.newBuilder().setData(chunk).build()).build();
                requestObserver.onNext((Object)request);
            }
            requestObserver.onCompleted();
            responseObserver.awaitTermination();
            if (responseObserver.err.get() != null) {
                throw new RuntimeException((Throwable)responseObserver.err.get());
            }
            return metadata.toBuilder().setSha256(hasher.hash().toString()).build();
        }

        private class PutArtifactResponseObserver
        implements StreamObserver<ArtifactApi.PutArtifactResponse> {
            private final CountDownLatch completed = new CountDownLatch(1);
            private final AtomicReference<Throwable> err = new AtomicReference<Object>(null);

            private PutArtifactResponseObserver() {
            }

            public void onNext(ArtifactApi.PutArtifactResponse value) {
            }

            public void onError(Throwable t) {
                this.err.set(t);
                this.completed.countDown();
                throw new RuntimeException(t);
            }

            public void onCompleted() {
                this.completed.countDown();
            }

            public boolean isTerminal() {
                return this.completed.getCount() == 0L;
            }

            public void awaitTermination() throws InterruptedException {
                this.completed.await();
            }
        }
    }
}

