/*
 * Decompiled with CFR 0.152.
 */
package io.camunda.zeebe.broker.transport.commandapi;

import io.camunda.zeebe.broker.Loggers;
import io.camunda.zeebe.broker.transport.AsyncApiRequestHandler;
import io.camunda.zeebe.broker.transport.ErrorResponseWriter;
import io.camunda.zeebe.broker.transport.commandapi.CommandApiRequestReader;
import io.camunda.zeebe.broker.transport.commandapi.CommandApiResponseWriter;
import io.camunda.zeebe.logstreams.log.LogAppendEntry;
import io.camunda.zeebe.logstreams.log.LogStreamWriter;
import io.camunda.zeebe.logstreams.log.WriteContext;
import io.camunda.zeebe.protocol.impl.record.RecordMetadata;
import io.camunda.zeebe.protocol.impl.record.UnifiedRecordValue;
import io.camunda.zeebe.protocol.record.ErrorCode;
import io.camunda.zeebe.protocol.record.ExecuteCommandRequestDecoder;
import io.camunda.zeebe.protocol.record.RecordType;
import io.camunda.zeebe.protocol.record.ValueType;
import io.camunda.zeebe.protocol.record.intent.Intent;
import io.camunda.zeebe.scheduler.future.ActorFuture;
import io.camunda.zeebe.scheduler.future.CompletableActorFuture;
import io.camunda.zeebe.util.Either;
import java.util.HashMap;
import java.util.Map;
import org.agrona.collections.Int2ObjectHashMap;
import org.slf4j.Logger;

final class CommandApiRequestHandler
extends AsyncApiRequestHandler<CommandApiRequestReader, CommandApiResponseWriter> {
    private static final Logger LOG = Loggers.TRANSPORT_LOGGER;
    private final Int2ObjectHashMap<LogStreamWriter> leadingStreams = new Int2ObjectHashMap();
    private boolean isDiskSpaceAvailable = true;
    private final Map<Integer, Boolean> processingPaused = new HashMap<Integer, Boolean>();

    CommandApiRequestHandler() {
        super(CommandApiRequestReader::new, CommandApiResponseWriter::new);
    }

    @Override
    protected ActorFuture<Either<ErrorResponseWriter, CommandApiResponseWriter>> handleAsync(int partitionId, long requestId, CommandApiRequestReader requestReader, CommandApiResponseWriter responseWriter, ErrorResponseWriter errorWriter) {
        return CompletableActorFuture.completed(this.handle(partitionId, requestId, requestReader, responseWriter, errorWriter));
    }

    public void onRecovered(int partitionId) {
        this.actor.run(() -> this.processingPaused.put(partitionId, false));
    }

    public void onPaused(int partitionId) {
        this.actor.run(() -> this.processingPaused.put(partitionId, true));
    }

    public void onResumed(int partitionId) {
        this.actor.run(() -> this.processingPaused.put(partitionId, false));
    }

    private Either<ErrorResponseWriter, CommandApiResponseWriter> handle(int partitionId, long requestId, CommandApiRequestReader requestReader, CommandApiResponseWriter responseWriter, ErrorResponseWriter errorWriter) {
        return this.handleExecuteCommandRequest(partitionId, requestId, requestReader, responseWriter, errorWriter);
    }

    private Either<ErrorResponseWriter, CommandApiResponseWriter> handleExecuteCommandRequest(int partitionId, long requestId, CommandApiRequestReader reader, CommandApiResponseWriter responseWriter, ErrorResponseWriter errorWriter) {
        if (!this.isDiskSpaceAvailable) {
            return Either.left((Object)errorWriter.outOfDiskSpace(partitionId));
        }
        if (this.processingPaused.getOrDefault(partitionId, false).booleanValue()) {
            return Either.left((Object)errorWriter.internalError("Processing paused for partition '%s'", partitionId));
        }
        ExecuteCommandRequestDecoder command = reader.getMessageDecoder();
        LogStreamWriter logStreamWriter = (LogStreamWriter)this.leadingStreams.get(partitionId);
        ValueType valueType = command.valueType();
        Intent intent = Intent.fromProtocolValue((ValueType)valueType, (short)command.intent());
        UnifiedRecordValue value = reader.value();
        long operationReference = command.operationReference();
        RecordMetadata metadata = reader.metadata();
        metadata.requestId(requestId);
        metadata.requestStreamId(partitionId);
        metadata.recordType(RecordType.COMMAND);
        metadata.intent(intent);
        metadata.valueType(valueType);
        metadata.operationReference(operationReference);
        if (logStreamWriter == null) {
            errorWriter.partitionLeaderMismatch(partitionId);
            return Either.left((Object)errorWriter);
        }
        if (value == null) {
            errorWriter.unsupportedMessage(valueType.name(), CommandApiRequestReader.RECORDS_BY_TYPE.keySet().toArray());
            return Either.left((Object)errorWriter);
        }
        try {
            return this.writeCommand(command.key(), metadata, value, logStreamWriter, errorWriter, partitionId).map(b -> responseWriter).mapLeft(failure -> errorWriter);
        }
        catch (Exception error) {
            String errorMessage = "Failed to write client request to partition '%d', %s".formatted(partitionId, error);
            LOG.error(errorMessage);
            return Either.left((Object)errorWriter.internalError(errorMessage, new Object[0]));
        }
    }

    private Either<ErrorResponseWriter, Boolean> writeCommand(long key, RecordMetadata metadata, UnifiedRecordValue value, LogStreamWriter logStreamWriter, ErrorResponseWriter errorWriter, int partitionId) {
        LogAppendEntry appendEntry = key != ExecuteCommandRequestDecoder.keyNullValue() ? LogAppendEntry.of((long)key, (RecordMetadata)metadata, (UnifiedRecordValue)value) : LogAppendEntry.of((RecordMetadata)metadata, (UnifiedRecordValue)value);
        if (logStreamWriter.canWriteEvents(1, appendEntry.getLength())) {
            return logStreamWriter.tryWrite(WriteContext.userCommand((Intent)metadata.getIntent()), appendEntry).map(ignore -> true).mapLeft(error -> errorWriter.mapWriteError(partitionId, (LogStreamWriter.WriteFailure)error));
        }
        return Either.left((Object)errorWriter.errorCode(ErrorCode.MALFORMED_REQUEST).errorMessage("Request size is above configured maxMessageSize."));
    }

    void addPartition(int partitionId, LogStreamWriter logStreamWriter) {
        this.actor.submit(() -> this.leadingStreams.put(partitionId, (Object)logStreamWriter));
    }

    void removePartition(int partitionId) {
        this.actor.submit(() -> this.leadingStreams.remove(partitionId));
    }

    void onDiskSpaceNotAvailable() {
        this.actor.submit(() -> {
            this.isDiskSpaceAvailable = false;
            LOG.debug("Broker is out of disk space. All client requests will be rejected");
        });
    }

    void onDiskSpaceAvailable() {
        this.actor.submit(() -> {
            this.isDiskSpaceAvailable = true;
        });
    }
}

