/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import org.apache.flink.annotation.Internal;
import org.apache.flink.runtime.state.AbstractChannelStateHandle;
import org.apache.flink.runtime.state.StateObject;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.util.Preconditions;

@Internal
public abstract class AbstractMergedChannelStateHandle<Info, ChannelHandle extends AbstractChannelStateHandle<Info>>
implements StateObject {
    private static final long serialVersionUID = 1L;
    protected final int subtaskIndex;
    protected final StreamStateHandle delegate;
    protected final long size;
    protected final byte[] serializedChannelOffsets;
    private volatile transient Collection<ChannelHandle> unmergedHandles;

    protected AbstractMergedChannelStateHandle(int subtaskIndex, StreamStateHandle delegate, long size, byte[] serializedChannelOffsets) {
        this.subtaskIndex = subtaskIndex;
        this.delegate = delegate;
        this.size = size;
        this.serializedChannelOffsets = serializedChannelOffsets;
    }

    protected AbstractMergedChannelStateHandle(Collection<ChannelHandle> handles) {
        Preconditions.checkState(handles != null && handles.size() > 0);
        Iterator<ChannelHandle> iterator = handles.iterator();
        AbstractChannelStateHandle handle = (AbstractChannelStateHandle)iterator.next();
        this.subtaskIndex = handle.getSubtaskIndex();
        this.delegate = handle.getDelegate();
        long sizeAcc = 0L;
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        DataOutputStream dos = new DataOutputStream(bos);
        try {
            dos.writeInt(handles.size());
        }
        catch (IOException e) {
            throw new IllegalStateException("Can not serialize Channel offsets !", e);
        }
        sizeAcc += handle.getStateSize();
        this.serializeChannelOffsets(handle, dos);
        while (iterator.hasNext()) {
            handle = (AbstractChannelStateHandle)iterator.next();
            Preconditions.checkState(this.subtaskIndex == handle.getSubtaskIndex(), "All channel state handles to be converted to merged channel state handles must have the same subtask index !");
            Preconditions.checkState(this.delegate == handle.getDelegate(), "All channel state handles to be converted to merged channel state handles must have the same delegate StreamStateHandle !");
            sizeAcc += handle.getStateSize();
            this.serializeChannelOffsets(handle, dos);
        }
        this.size = sizeAcc;
        this.serializedChannelOffsets = bos.toByteArray();
        this.unmergedHandles = Collections.unmodifiableCollection(handles);
    }

    @Override
    public void discardState() throws Exception {
        this.delegate.discardState();
    }

    @Override
    public long getStateSize() {
        return this.size;
    }

    public int getSubtaskIndex() {
        return this.subtaskIndex;
    }

    public StreamStateHandle getDelegate() {
        return this.delegate;
    }

    public byte[] getSerializedChannelOffsets() {
        return this.serializedChannelOffsets;
    }

    public List<Info> getInfos() {
        this.ensureCacheUnmergedHandles();
        return this.unmergedHandles.stream().map(h -> h.getInfo()).collect(Collectors.toList());
    }

    protected abstract void writeInfo(Info var1, DataOutputStream var2) throws IOException;

    protected abstract Info readInfo(DataInputStream var1) throws IOException;

    protected abstract ChannelHandle createUnmergedHandle(int var1, StreamStateHandle var2, Info var3, long var4, List<Long> var6);

    protected Collection<ChannelHandle> getUnmergedHandles() {
        this.ensureCacheUnmergedHandles();
        return this.unmergedHandles;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void ensureCacheUnmergedHandles() {
        if (this.unmergedHandles == null) {
            AbstractMergedChannelStateHandle abstractMergedChannelStateHandle = this;
            synchronized (abstractMergedChannelStateHandle) {
                if (this.unmergedHandles == null) {
                    try {
                        this.restoreAndCacheUnmergedHandles();
                    }
                    catch (IOException e) {
                        throw new IllegalStateException("Corrupted serializedChannelOffsets !", e);
                    }
                }
            }
        }
    }

    private void serializeChannelOffsets(ChannelHandle handle, DataOutputStream dos) {
        try {
            this.writeInfo(((AbstractChannelStateHandle)handle).getInfo(), dos);
            dos.writeLong(((AbstractChannelStateHandle)handle).getStateSize());
            dos.writeInt(((AbstractChannelStateHandle)handle).getOffsets().size());
            for (Long offset : ((AbstractChannelStateHandle)handle).getOffsets()) {
                dos.writeLong(offset);
            }
        }
        catch (IOException e) {
            throw new IllegalStateException("Can not serialize Channel offsets !", e);
        }
    }

    private void restoreAndCacheUnmergedHandles() throws IOException {
        DataInputStream dis = new DataInputStream(new ByteArrayInputStream(this.serializedChannelOffsets));
        int remainHandle = dis.readInt();
        ArrayList<ChannelHandle> handles = new ArrayList<ChannelHandle>(remainHandle);
        while (remainHandle-- > 0) {
            Info info = this.readInfo(dis);
            long stateSize = dis.readLong();
            int remainOffset = dis.readInt();
            ArrayList<Long> offsets = new ArrayList<Long>(remainOffset);
            while (remainOffset-- > 0) {
                offsets.add(dis.readLong());
            }
            handles.add(this.createUnmergedHandle(this.subtaskIndex, this.delegate, info, stateSize, offsets));
        }
        this.unmergedHandles = Collections.unmodifiableCollection(handles);
    }

    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || this.getClass() != o.getClass()) {
            return false;
        }
        AbstractMergedChannelStateHandle that = (AbstractMergedChannelStateHandle)o;
        return this.subtaskIndex == that.subtaskIndex && this.delegate.equals(that.delegate) && this.size == that.size && Arrays.equals(this.serializedChannelOffsets, that.serializedChannelOffsets);
    }

    public int hashCode() {
        return Objects.hash(this.subtaskIndex, this.delegate, this.size, Arrays.hashCode(this.serializedChannelOffsets));
    }

    public String toString() {
        return "AbstractMergedChannelStateHandle{subtaskIndex=" + this.subtaskIndex + ", delegate=" + String.valueOf(this.delegate) + ", serializedChannelOffsets=" + Arrays.toString(this.serializedChannelOffsets) + ", size=" + this.size + "}";
    }
}

