/*
 * Decompiled with CFR 0.152.
 */
package com.ververica.cdc.runtime.operators.schema.coordinator;

import com.ververica.cdc.common.annotation.Internal;
import com.ververica.cdc.common.event.CreateTableEvent;
import com.ververica.cdc.common.event.SchemaChangeEvent;
import com.ververica.cdc.common.event.TableId;
import com.ververica.cdc.common.sink.MetadataApplier;
import com.ververica.cdc.runtime.operators.schema.coordinator.SchemaManager;
import com.ververica.cdc.runtime.operators.schema.event.CoordinationResponseUtils;
import com.ververica.cdc.runtime.operators.schema.event.ReleaseUpstreamResponse;
import com.ververica.cdc.runtime.operators.schema.event.SchemaChangeRequest;
import com.ververica.cdc.runtime.operators.schema.event.SchemaChangeResponse;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import javax.annotation.concurrent.NotThreadSafe;
import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
@NotThreadSafe
public class SchemaRegistryRequestHandler {
    private static final Logger LOG = LoggerFactory.getLogger(SchemaRegistryRequestHandler.class);
    private final MetadataApplier metadataApplier;
    private final Set<Integer> activeSinkWriters;
    private final SchemaManager schemaManager;
    private final List<PendingSchemaChange> pendingSchemaChanges;
    private final Set<Integer> flushedSinkWriters;

    public SchemaRegistryRequestHandler(MetadataApplier metadataApplier, SchemaManager schemaManager) {
        this.metadataApplier = metadataApplier;
        this.activeSinkWriters = new HashSet<Integer>();
        this.flushedSinkWriters = new HashSet<Integer>();
        this.pendingSchemaChanges = new LinkedList<PendingSchemaChange>();
        this.schemaManager = schemaManager;
    }

    private void applySchemaChange(TableId tableId, SchemaChangeEvent changeEvent) {
        LOG.debug("Apply schema change {} to table {}.", (Object)changeEvent, (Object)tableId);
        this.metadataApplier.applySchemaChange(changeEvent);
    }

    public CompletableFuture<CoordinationResponse> handleSchemaChangeRequest(SchemaChangeRequest request) {
        if (this.pendingSchemaChanges.isEmpty()) {
            LOG.info("Received schema change event request from table {}. Start to buffer requests for others.", (Object)request.getTableId().toString());
            if (request.getSchemaChangeEvent() instanceof CreateTableEvent && this.schemaManager.schemaExists(request.getTableId())) {
                return CompletableFuture.completedFuture(CoordinationResponseUtils.wrap(new SchemaChangeResponse(false)));
            }
            CompletableFuture<CoordinationResponse> response = CompletableFuture.completedFuture(CoordinationResponseUtils.wrap(new SchemaChangeResponse(true)));
            this.schemaManager.applySchemaChange(request.getSchemaChangeEvent());
            this.pendingSchemaChanges.add(new PendingSchemaChange(request, response));
            this.pendingSchemaChanges.get(0).startToWaitForReleaseRequest();
            return response;
        }
        LOG.info("There are already processing requests. Wait for processing.");
        CompletableFuture<CoordinationResponse> response = new CompletableFuture<CoordinationResponse>();
        this.pendingSchemaChanges.add(new PendingSchemaChange(request, response));
        return response;
    }

    public CompletableFuture<CoordinationResponse> handleReleaseUpstreamRequest() {
        CompletableFuture<CoordinationResponse> response = this.pendingSchemaChanges.get(0).getResponseFuture();
        if (response.isDone()) {
            this.startNextSchemaChangeRequest();
        } else {
            this.pendingSchemaChanges.get(0).receiveReleaseRequest();
        }
        return response;
    }

    public void registerSinkWriter(int sinkSubtask) {
        LOG.info("Register sink subtask {}.", (Object)sinkSubtask);
        this.activeSinkWriters.add(sinkSubtask);
    }

    public void flushSuccess(TableId tableId, int sinkSubtask) {
        this.flushedSinkWriters.add(sinkSubtask);
        if (this.flushedSinkWriters.equals(this.activeSinkWriters)) {
            LOG.info("All sink subtask have flushed for table {}. Start to apply schema change.", (Object)tableId.toString());
            PendingSchemaChange waitFlushSuccess = this.pendingSchemaChanges.get(0);
            this.applySchemaChange(tableId, waitFlushSuccess.getChangeRequest().getSchemaChangeEvent());
            waitFlushSuccess.getResponseFuture().complete(CoordinationResponseUtils.wrap(new ReleaseUpstreamResponse()));
            if (RequestStatus.RECEIVED_RELEASE_REQUEST.equals((Object)waitFlushSuccess.getStatus())) {
                this.startNextSchemaChangeRequest();
            }
        }
    }

    private void startNextSchemaChangeRequest() {
        this.pendingSchemaChanges.remove(0);
        this.flushedSinkWriters.clear();
        while (!this.pendingSchemaChanges.isEmpty()) {
            PendingSchemaChange pendingSchemaChange = this.pendingSchemaChanges.get(0);
            SchemaChangeRequest request = pendingSchemaChange.changeRequest;
            if (request.getSchemaChangeEvent() instanceof CreateTableEvent && this.schemaManager.schemaExists(request.getTableId())) {
                pendingSchemaChange.getResponseFuture().complete(CoordinationResponseUtils.wrap(new SchemaChangeResponse(false)));
                this.pendingSchemaChanges.remove(0);
                continue;
            }
            this.schemaManager.applySchemaChange(request.getSchemaChangeEvent());
            pendingSchemaChange.getResponseFuture().complete(CoordinationResponseUtils.wrap(new SchemaChangeResponse(true)));
            pendingSchemaChange.startToWaitForReleaseRequest();
            break;
        }
    }

    static enum RequestStatus {
        PENDING,
        WAIT_RELEASE_REQUEST,
        RECEIVED_RELEASE_REQUEST;

    }

    private static class PendingSchemaChange {
        private final SchemaChangeRequest changeRequest;
        private CompletableFuture<CoordinationResponse> responseFuture;
        private RequestStatus status;

        public PendingSchemaChange(SchemaChangeRequest changeRequest, CompletableFuture<CoordinationResponse> responseFuture) {
            this.changeRequest = changeRequest;
            this.responseFuture = responseFuture;
            this.status = RequestStatus.PENDING;
        }

        public SchemaChangeRequest getChangeRequest() {
            return this.changeRequest;
        }

        public CompletableFuture<CoordinationResponse> getResponseFuture() {
            return this.responseFuture;
        }

        public RequestStatus getStatus() {
            return this.status;
        }

        public void startToWaitForReleaseRequest() {
            if (!this.responseFuture.isDone()) {
                throw new IllegalStateException("Cannot start to wait for flush success before the SchemaChangeRequest is done.");
            }
            this.responseFuture = new CompletableFuture();
            this.status = RequestStatus.WAIT_RELEASE_REQUEST;
        }

        public void receiveReleaseRequest() {
            this.status = RequestStatus.RECEIVED_RELEASE_REQUEST;
        }
    }
}

