/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.file.sink.compactor.operator;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.file.sink.FileSinkCommittable;
import org.apache.flink.connector.file.sink.compactor.FileCompactStrategy;
import org.apache.flink.connector.file.sink.compactor.FileCompactor;
import org.apache.flink.connector.file.sink.compactor.operator.CompactService;
import org.apache.flink.connector.file.sink.compactor.operator.CompactorRequest;
import org.apache.flink.connector.file.sink.compactor.operator.CompactorRequestSerializer;
import org.apache.flink.core.io.SimpleVersionedSerialization;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputSerializer;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
import org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

@Internal
public class CompactorOperator
extends AbstractStreamOperator<CommittableMessage<FileSinkCommittable>>
implements OneInputStreamOperator<CompactorRequest, CommittableMessage<FileSinkCommittable>>,
BoundedOneInput,
CheckpointListener {
    private static final long SUBMITTED_ID = -1L;
    static final ListStateDescriptor<byte[]> REMAINING_REQUESTS_RAW_STATES_DESC = new ListStateDescriptor("remaining_requests_raw_state", (TypeSerializer)BytePrimitiveArraySerializer.INSTANCE);
    private final FileCompactStrategy strategy;
    private final SimpleVersionedSerializer<FileSinkCommittable> committableSerializer;
    private final FileCompactor fileCompactor;
    private final BucketWriter<?, String> bucketWriter;
    private transient CompactService compactService;
    private List<CompactorRequest> collectingRequests = new ArrayList<CompactorRequest>();
    private final TreeMap<Long, List<CompactorRequest>> checkpointRequests = new TreeMap();
    private final List<Tuple2<CompactorRequest, CompletableFuture<Iterable<FileSinkCommittable>>>> compactingRequests = new LinkedList<Tuple2<CompactorRequest, CompletableFuture<Iterable<FileSinkCommittable>>>>();
    private ListState<Map<Long, List<CompactorRequest>>> remainingRequestsState;

    public CompactorOperator(FileCompactStrategy strategy, SimpleVersionedSerializer<FileSinkCommittable> committableSerializer, FileCompactor fileCompactor, BucketWriter<?, String> bucketWriter) {
        this.strategy = strategy;
        this.committableSerializer = committableSerializer;
        this.fileCompactor = fileCompactor;
        this.bucketWriter = bucketWriter;
    }

    public void open() throws Exception {
        super.open();
        this.compactService = new CompactService(this.strategy.getNumCompactThreads(), this.fileCompactor, this.bucketWriter);
        this.compactService.open();
        this.submitUntil(-1L);
    }

    public void processElement(StreamRecord<CompactorRequest> element) throws Exception {
        this.collectingRequests.add((CompactorRequest)element.getValue());
    }

    public void endInput() throws Exception {
        this.checkpointRequests.put(Long.MAX_VALUE, this.collectingRequests);
        this.collectingRequests = new ArrayList<CompactorRequest>();
        this.submitUntil(Long.MAX_VALUE);
        assert (this.checkpointRequests.isEmpty());
        this.getAllTasksFuture().join();
        this.emitCompacted(null);
        assert (this.compactingRequests.isEmpty());
    }

    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        super.notifyCheckpointComplete(checkpointId);
        this.submitUntil(checkpointId);
    }

    public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
        this.emitCompacted(checkpointId);
    }

    public void snapshotState(StateSnapshotContext context) throws Exception {
        super.snapshotState(context);
        this.checkpointRequests.put(context.getCheckpointId(), this.collectingRequests);
        this.collectingRequests = new ArrayList<CompactorRequest>();
        HashMap<Long, List<CompactorRequest>> requests = new HashMap<Long, List<CompactorRequest>>(this.checkpointRequests);
        requests.computeIfAbsent(-1L, id -> new ArrayList()).addAll(this.compactingRequests.stream().map(r -> (CompactorRequest)r.f0).collect(Collectors.toList()));
        this.remainingRequestsState.update(Collections.singletonList(requests));
    }

    public void initializeState(StateInitializationContext context) throws Exception {
        super.initializeState(context);
        this.remainingRequestsState = new SimpleVersionedListState(context.getOperatorStateStore().getListState(REMAINING_REQUESTS_RAW_STATES_DESC), (SimpleVersionedSerializer)new RemainingRequestsSerializer(new CompactorRequestSerializer(this.committableSerializer)));
        Iterable stateRemaining = (Iterable)this.remainingRequestsState.get();
        if (stateRemaining != null) {
            for (Map requests : stateRemaining) {
                for (Map.Entry e : requests.entrySet()) {
                    List list = this.checkpointRequests.computeIfAbsent((Long)e.getKey(), (Function<Long, List<CompactorRequest>>)((Function<Long, List>)id -> new ArrayList()));
                    list.addAll((Collection)e.getValue());
                }
            }
        }
    }

    public void close() throws Exception {
        if (this.compactService != null) {
            this.compactService.close();
        }
    }

    private void submitUntil(long checkpointId) {
        NavigableMap<Long, List<CompactorRequest>> canSubmit = this.checkpointRequests.subMap(Long.MIN_VALUE, true, checkpointId, true);
        for (Map.Entry requestEntry : canSubmit.entrySet()) {
            for (CompactorRequest request : (List)requestEntry.getValue()) {
                CompletableFuture<Iterable<FileSinkCommittable>> resultFuture = new CompletableFuture<Iterable<FileSinkCommittable>>();
                this.compactingRequests.add((Tuple2<CompactorRequest, CompletableFuture<Iterable<FileSinkCommittable>>>)new Tuple2((Object)request, resultFuture));
                this.compactService.submit(request, resultFuture);
            }
        }
        canSubmit.clear();
    }

    private void emitCompacted(@Nullable Long checkpointId) throws Exception {
        ArrayList<FileSinkCommittable> compacted = new ArrayList<FileSinkCommittable>();
        Iterator<Tuple2<CompactorRequest, CompletableFuture<Iterable<FileSinkCommittable>>>> iter = this.compactingRequests.iterator();
        while (iter.hasNext()) {
            Tuple2<CompactorRequest, CompletableFuture<Iterable<FileSinkCommittable>>> compacting = iter.next();
            CompletableFuture future = (CompletableFuture)compacting.f1;
            if (!future.isDone()) continue;
            iter.remove();
            for (FileSinkCommittable c : (Iterable)future.get()) {
                compacted.add(c);
            }
        }
        if (compacted.isEmpty()) {
            return;
        }
        CommittableSummary summary = new CommittableSummary(this.getRuntimeContext().getTaskInfo().getIndexOfThisSubtask(), this.getRuntimeContext().getTaskInfo().getNumberOfParallelSubtasks(), checkpointId, compacted.size(), compacted.size(), 0);
        this.output.collect((Object)new StreamRecord((Object)summary));
        for (FileSinkCommittable c : compacted) {
            CommittableWithLineage comm = new CommittableWithLineage((Object)c, checkpointId, this.getRuntimeContext().getTaskInfo().getIndexOfThisSubtask());
            this.output.collect((Object)new StreamRecord((Object)comm));
        }
    }

    @VisibleForTesting
    public CompletableFuture<?> getAllTasksFuture() {
        return CompletableFuture.allOf((CompletableFuture[])this.compactingRequests.stream().map(r -> (CompletableFuture)r.f1).toArray(CompletableFuture[]::new));
    }

    static class RemainingRequestsSerializer
    implements SimpleVersionedSerializer<Map<Long, List<CompactorRequest>>> {
        private static final int MAGIC_NUMBER = -1454981501;
        private final CompactorRequestSerializer requestSerializer;

        RemainingRequestsSerializer(CompactorRequestSerializer requestSerializer) {
            this.requestSerializer = requestSerializer;
        }

        public int getVersion() {
            return 1;
        }

        public byte[] serialize(Map<Long, List<CompactorRequest>> remainingRequests) throws IOException {
            DataOutputSerializer out = new DataOutputSerializer(256);
            out.writeInt(-1454981501);
            this.serializeV1(remainingRequests, out);
            return out.getCopyOfBuffer();
        }

        public Map<Long, List<CompactorRequest>> deserialize(int version, byte[] serialized) throws IOException {
            DataInputDeserializer in = new DataInputDeserializer(serialized);
            switch (version) {
                case 1: {
                    RemainingRequestsSerializer.validateMagicNumber((DataInputView)in);
                    return this.deserializeV1(in);
                }
            }
            throw new IOException("Unrecognized version or corrupt state: " + version);
        }

        private void serializeV1(Map<Long, List<CompactorRequest>> request, DataOutputSerializer out) throws IOException {
            out.writeInt(request.size());
            for (Map.Entry<Long, List<CompactorRequest>> e : request.entrySet()) {
                out.writeLong(e.getKey().longValue());
                SimpleVersionedSerialization.writeVersionAndSerializeList((SimpleVersionedSerializer)this.requestSerializer, e.getValue(), (DataOutputView)out);
            }
        }

        private Map<Long, List<CompactorRequest>> deserializeV1(DataInputDeserializer in) throws IOException {
            int size = in.readInt();
            HashMap<Long, List<CompactorRequest>> requestMap = new HashMap<Long, List<CompactorRequest>>(size);
            for (int i = 0; i < size; ++i) {
                long cpId = in.readLong();
                List requests = SimpleVersionedSerialization.readVersionAndDeserializeList((SimpleVersionedSerializer)this.requestSerializer, (DataInputView)in);
                requestMap.put(cpId, requests);
            }
            return requestMap;
        }

        private static void validateMagicNumber(DataInputView in) throws IOException {
            int magicNumber = in.readInt();
            if (magicNumber != -1454981501) {
                throw new IOException(String.format("Corrupt data: Unexpected magic number %08X", magicNumber));
            }
        }
    }
}

