/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.gateway.workflow;

import java.util.concurrent.TimeUnit;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.runtime.rest.RestClient;
import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
import org.apache.flink.runtime.rest.messages.MessageHeaders;
import org.apache.flink.runtime.rest.messages.MessageParameters;
import org.apache.flink.runtime.rest.messages.RequestBody;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.gateway.rest.header.materializedtable.scheduler.CreateEmbeddedSchedulerWorkflowHeaders;
import org.apache.flink.table.gateway.rest.header.materializedtable.scheduler.DeleteEmbeddedSchedulerWorkflowHeaders;
import org.apache.flink.table.gateway.rest.header.materializedtable.scheduler.ResumeEmbeddedSchedulerWorkflowHeaders;
import org.apache.flink.table.gateway.rest.header.materializedtable.scheduler.SuspendEmbeddedSchedulerWorkflowHeaders;
import org.apache.flink.table.gateway.rest.message.materializedtable.scheduler.CreateEmbeddedSchedulerWorkflowRequestBody;
import org.apache.flink.table.gateway.rest.message.materializedtable.scheduler.CreateEmbeddedSchedulerWorkflowResponseBody;
import org.apache.flink.table.gateway.rest.message.materializedtable.scheduler.EmbeddedSchedulerWorkflowRequestBody;
import org.apache.flink.table.gateway.rest.message.materializedtable.scheduler.ResumeEmbeddedSchedulerWorkflowRequestBody;
import org.apache.flink.table.gateway.workflow.EmbeddedRefreshHandler;
import org.apache.flink.table.gateway.workflow.EmbeddedRefreshHandlerSerializer;
import org.apache.flink.table.workflow.CreatePeriodicRefreshWorkflow;
import org.apache.flink.table.workflow.CreateRefreshWorkflow;
import org.apache.flink.table.workflow.DeleteRefreshWorkflow;
import org.apache.flink.table.workflow.ModifyRefreshWorkflow;
import org.apache.flink.table.workflow.ResumeRefreshWorkflow;
import org.apache.flink.table.workflow.SuspendRefreshWorkflow;
import org.apache.flink.table.workflow.WorkflowException;
import org.apache.flink.table.workflow.WorkflowScheduler;
import org.apache.flink.util.concurrent.Executors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@PublicEvolving
public class EmbeddedWorkflowScheduler
implements WorkflowScheduler<EmbeddedRefreshHandler> {
    private static final Logger LOG = LoggerFactory.getLogger(EmbeddedWorkflowScheduler.class);
    private final Configuration configuration;
    private final String restAddress;
    private final int port;
    private RestClient restClient;

    public EmbeddedWorkflowScheduler(Configuration configuration) {
        this.configuration = configuration;
        this.restAddress = (String)configuration.get(RestOptions.ADDRESS);
        this.port = (Integer)configuration.get(RestOptions.PORT);
    }

    public void open() throws WorkflowException {
        try {
            this.restClient = new RestClient(this.configuration, Executors.directExecutor());
        }
        catch (Exception e) {
            throw new WorkflowException("Could not create RestClient to connect to embedded scheduler.", (Throwable)e);
        }
    }

    public void close() throws WorkflowException {
        if (this.restClient != null) {
            this.restClient.closeAsync();
        }
    }

    public EmbeddedRefreshHandlerSerializer getRefreshHandlerSerializer() {
        return EmbeddedRefreshHandlerSerializer.INSTANCE;
    }

    public EmbeddedRefreshHandler createRefreshWorkflow(CreateRefreshWorkflow createRefreshWorkflow) throws WorkflowException {
        if (createRefreshWorkflow instanceof CreatePeriodicRefreshWorkflow) {
            CreateEmbeddedSchedulerWorkflowResponseBody responseBody;
            CreatePeriodicRefreshWorkflow periodicRefreshWorkflow = (CreatePeriodicRefreshWorkflow)createRefreshWorkflow;
            ObjectIdentifier materializedTableIdentifier = periodicRefreshWorkflow.getMaterializedTableIdentifier();
            CreateEmbeddedSchedulerWorkflowRequestBody requestBody = new CreateEmbeddedSchedulerWorkflowRequestBody(materializedTableIdentifier.asSerializableString(), periodicRefreshWorkflow.getCronExpression(), periodicRefreshWorkflow.getInitConfig(), periodicRefreshWorkflow.getExecutionConfig(), periodicRefreshWorkflow.getRestEndpointUrl());
            try {
                responseBody = (CreateEmbeddedSchedulerWorkflowResponseBody)this.restClient.sendRequest(this.restAddress, this.port, (MessageHeaders)CreateEmbeddedSchedulerWorkflowHeaders.getInstance(), (MessageParameters)EmptyMessageParameters.getInstance(), (RequestBody)requestBody).get(30L, TimeUnit.SECONDS);
            }
            catch (Exception e) {
                LOG.error("Failed to create periodic refresh workflow for materialized table {}.", (Object)materializedTableIdentifier, (Object)e);
                throw new WorkflowException(String.format("Failed to create periodic refresh workflow for materialized table %s.", materializedTableIdentifier), (Throwable)e);
            }
            return new EmbeddedRefreshHandler(responseBody.getWorkflowName(), responseBody.getWorkflowGroup());
        }
        LOG.error("Unsupported create refresh workflow type {}.", (Object)createRefreshWorkflow.getClass().getSimpleName());
        throw new WorkflowException(String.format("Unsupported create refresh workflow type %s.", createRefreshWorkflow.getClass().getSimpleName()));
    }

    public void modifyRefreshWorkflow(ModifyRefreshWorkflow<EmbeddedRefreshHandler> modifyRefreshWorkflow) throws WorkflowException {
        EmbeddedRefreshHandler embeddedRefreshHandler = (EmbeddedRefreshHandler)modifyRefreshWorkflow.getRefreshHandler();
        if (modifyRefreshWorkflow instanceof SuspendRefreshWorkflow) {
            EmbeddedSchedulerWorkflowRequestBody suspendRequestBody = new EmbeddedSchedulerWorkflowRequestBody(embeddedRefreshHandler.getWorkflowName(), embeddedRefreshHandler.getWorkflowGroup());
            try {
                this.restClient.sendRequest(this.restAddress, this.port, (MessageHeaders)SuspendEmbeddedSchedulerWorkflowHeaders.getInstance(), (MessageParameters)EmptyMessageParameters.getInstance(), (RequestBody)suspendRequestBody).get(30L, TimeUnit.SECONDS);
            }
            catch (Exception e) {
                LOG.error("Failed to suspend refresh workflow {}.", (Object)embeddedRefreshHandler.asSummaryString(), (Object)e);
                throw new WorkflowException(String.format("Failed to suspend refresh workflow %s.", embeddedRefreshHandler.asSummaryString()), (Throwable)e);
            }
        } else if (modifyRefreshWorkflow instanceof ResumeRefreshWorkflow) {
            ResumeEmbeddedSchedulerWorkflowRequestBody requestBody = new ResumeEmbeddedSchedulerWorkflowRequestBody(embeddedRefreshHandler.getWorkflowName(), embeddedRefreshHandler.getWorkflowGroup(), ((ResumeRefreshWorkflow)modifyRefreshWorkflow).getDynamicOptions());
            try {
                this.restClient.sendRequest(this.restAddress, this.port, (MessageHeaders)ResumeEmbeddedSchedulerWorkflowHeaders.getInstance(), (MessageParameters)EmptyMessageParameters.getInstance(), (RequestBody)requestBody).get(30L, TimeUnit.SECONDS);
            }
            catch (Exception e) {
                LOG.error("Failed to resume refresh workflow {}.", (Object)embeddedRefreshHandler.asSummaryString(), (Object)e);
                throw new WorkflowException(String.format("Failed to resume refresh workflow %s.", embeddedRefreshHandler.asSummaryString()), (Throwable)e);
            }
        } else {
            LOG.error("Unsupported modify refresh workflow type {}.", (Object)modifyRefreshWorkflow.getClass().getSimpleName());
            throw new WorkflowException(String.format("Unsupported modify refresh workflow type %s.", modifyRefreshWorkflow.getClass().getSimpleName()));
        }
    }

    public void deleteRefreshWorkflow(DeleteRefreshWorkflow<EmbeddedRefreshHandler> deleteRefreshWorkflow) throws WorkflowException {
        EmbeddedRefreshHandler embeddedRefreshHandler = (EmbeddedRefreshHandler)deleteRefreshWorkflow.getRefreshHandler();
        EmbeddedSchedulerWorkflowRequestBody requestBody = new EmbeddedSchedulerWorkflowRequestBody(embeddedRefreshHandler.getWorkflowName(), embeddedRefreshHandler.getWorkflowGroup());
        try {
            this.restClient.sendRequest(this.restAddress, this.port, (MessageHeaders)DeleteEmbeddedSchedulerWorkflowHeaders.getInstance(), (MessageParameters)EmptyMessageParameters.getInstance(), (RequestBody)requestBody).get(30L, TimeUnit.SECONDS);
        }
        catch (Exception e) {
            LOG.error("Failed to delete refresh workflow {}.", (Object)embeddedRefreshHandler.asSummaryString(), (Object)e);
            throw new WorkflowException(String.format("Failed to delete refresh workflow %s.", embeddedRefreshHandler.asSummaryString()), (Throwable)e);
        }
    }
}

