/*
 * Decompiled with CFR 0.152.
 */
package io.camunda.zeebe.broker.transport.commandapi;

import io.camunda.zeebe.broker.Loggers;
import io.camunda.zeebe.broker.PartitionListener;
import io.camunda.zeebe.broker.system.configuration.QueryApiCfg;
import io.camunda.zeebe.broker.system.monitoring.DiskSpaceUsageListener;
import io.camunda.zeebe.broker.transport.commandapi.CommandApiRequestHandler;
import io.camunda.zeebe.broker.transport.commandapi.CommandApiService;
import io.camunda.zeebe.broker.transport.commandapi.CommandResponseWriterImpl;
import io.camunda.zeebe.broker.transport.queryapi.QueryApiRequestHandler;
import io.camunda.zeebe.engine.state.QueryService;
import io.camunda.zeebe.logstreams.log.LogStream;
import io.camunda.zeebe.logstreams.log.LogStreamWriter;
import io.camunda.zeebe.scheduler.Actor;
import io.camunda.zeebe.scheduler.ActorSchedulingService;
import io.camunda.zeebe.scheduler.future.ActorFuture;
import io.camunda.zeebe.scheduler.future.CompletableActorFuture;
import io.camunda.zeebe.stream.api.CommandResponseWriter;
import io.camunda.zeebe.transport.RequestHandler;
import io.camunda.zeebe.transport.RequestType;
import io.camunda.zeebe.transport.ServerOutput;
import io.camunda.zeebe.transport.ServerTransport;
import org.agrona.collections.IntHashSet;

public final class CommandApiServiceImpl
extends Actor
implements PartitionListener,
DiskSpaceUsageListener,
CommandApiService {
    private final ServerTransport serverTransport;
    private final CommandApiRequestHandler commandHandler;
    private final QueryApiRequestHandler queryHandler;
    private final IntHashSet leadPartitions = new IntHashSet();
    private final ActorSchedulingService scheduler;

    public CommandApiServiceImpl(ServerTransport serverTransport, ActorSchedulingService scheduler, QueryApiCfg queryApiCfg) {
        this.serverTransport = serverTransport;
        this.scheduler = scheduler;
        this.commandHandler = new CommandApiRequestHandler();
        this.queryHandler = new QueryApiRequestHandler(queryApiCfg);
    }

    public String getName() {
        return "CommandApiService";
    }

    protected void onActorStarting() {
        this.scheduler.submitActor((Actor)this.queryHandler);
        this.scheduler.submitActor((Actor)this.commandHandler);
    }

    protected void onActorClosing() {
        for (Integer leadPartition : this.leadPartitions) {
            this.removeLeaderHandlers(leadPartition);
        }
        this.leadPartitions.clear();
        this.actor.runOnCompletion(this.commandHandler.closeAsync(), (ok, error) -> {
            if (error != null) {
                Loggers.TRANSPORT_LOGGER.error("Error closing command api request handler", error);
            }
        });
        this.actor.runOnCompletion(this.queryHandler.closeAsync(), (ok, error) -> {
            if (error != null) {
                Loggers.TRANSPORT_LOGGER.warn("Failed to close query API request handler", error);
            }
        });
    }

    @Override
    public ActorFuture<Void> onBecomingFollower(int partitionId, long term) {
        return this.removeLeaderHandlersAsync(partitionId);
    }

    @Override
    public ActorFuture<Void> onBecomingLeader(int partitionId, long term, LogStream logStream, QueryService queryService) {
        CompletableActorFuture future = new CompletableActorFuture();
        this.actor.call(() -> {
            this.leadPartitions.add(partitionId);
            this.queryHandler.addPartition(partitionId, queryService);
            this.serverTransport.subscribe(partitionId, RequestType.QUERY, (RequestHandler)this.queryHandler);
            LogStreamWriter logStreamWriter = logStream.newLogStreamWriter();
            this.commandHandler.addPartition(partitionId, logStreamWriter);
            this.serverTransport.subscribe(partitionId, RequestType.COMMAND, (RequestHandler)this.commandHandler);
            future.complete(null);
        });
        return future;
    }

    @Override
    public ActorFuture<Void> onBecomingInactive(int partitionId, long term) {
        return this.removeLeaderHandlersAsync(partitionId);
    }

    private ActorFuture<Void> removeLeaderHandlersAsync(int partitionId) {
        return this.actor.call(() -> this.removeLeaderHandlers(partitionId));
    }

    private void removeLeaderHandlers(int partitionId) {
        this.commandHandler.removePartition(partitionId);
        this.queryHandler.removePartition(partitionId);
        this.cleanLeadingPartition(partitionId);
    }

    private void cleanLeadingPartition(int partitionId) {
        this.leadPartitions.remove(partitionId);
        this.removeForPartitionId(partitionId);
    }

    private void removeForPartitionId(int partitionId) {
        this.serverTransport.unsubscribe(partitionId, RequestType.COMMAND);
        this.serverTransport.unsubscribe(partitionId, RequestType.QUERY);
    }

    @Override
    public CommandResponseWriter newCommandResponseWriter() {
        return new CommandResponseWriterImpl((ServerOutput)this.serverTransport);
    }

    @Override
    public void onRecovered(int partitionId) {
        this.commandHandler.onRecovered(partitionId);
    }

    @Override
    public void onPaused(int partitionId) {
        this.commandHandler.onPaused(partitionId);
    }

    @Override
    public void onResumed(int partitionId) {
        this.commandHandler.onResumed(partitionId);
    }

    @Override
    public void onDiskSpaceNotAvailable() {
        this.actor.run(this.commandHandler::onDiskSpaceNotAvailable);
    }

    @Override
    public void onDiskSpaceAvailable() {
        this.actor.run(this.commandHandler::onDiskSpaceAvailable);
    }
}

