/*
 * Decompiled with CFR 0.152.
 */
package org.axonframework.axonserver.connector.processor;

import io.axoniq.axonserver.connector.control.ControlChannel;
import io.axoniq.axonserver.connector.control.ProcessorInstructionHandler;
import io.axoniq.axonserver.grpc.control.EventProcessorInfo;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
import org.axonframework.axonserver.connector.AxonServerConfiguration;
import org.axonframework.axonserver.connector.AxonServerConnectionManager;
import org.axonframework.axonserver.connector.processor.TrackingEventProcessorInfoMessage;
import org.axonframework.config.EventProcessingConfiguration;
import org.axonframework.eventhandling.EventProcessor;
import org.axonframework.eventhandling.SubscribingEventProcessor;
import org.axonframework.eventhandling.TrackingEventProcessor;
import org.axonframework.lifecycle.StartHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class EventProcessorControlService {
    private static final Logger logger = LoggerFactory.getLogger(EventProcessorControlService.class);
    private static final String SUBSCRIBING_EVENT_PROCESSOR_MODE = "Subscribing";
    private static final String UNKNOWN_EVENT_PROCESSOR_MODE = "Unknown";
    private final AxonServerConnectionManager axonServerConnectionManager;
    private final EventProcessingConfiguration eventProcessingConfiguration;
    private final String context;

    public EventProcessorControlService(AxonServerConnectionManager axonServerConnectionManager, EventProcessingConfiguration eventProcessingConfiguration, AxonServerConfiguration axonServerConfiguration) {
        this(axonServerConnectionManager, eventProcessingConfiguration, axonServerConfiguration.getContext());
    }

    public EventProcessorControlService(AxonServerConnectionManager axonServerConnectionManager, EventProcessingConfiguration eventProcessingConfiguration, String context) {
        this.axonServerConnectionManager = axonServerConnectionManager;
        this.eventProcessingConfiguration = eventProcessingConfiguration;
        this.context = context;
    }

    @StartHandler(phase=0x40000009)
    public void start() {
        if (this.axonServerConnectionManager != null && this.eventProcessingConfiguration != null) {
            ControlChannel controlChannel = this.axonServerConnectionManager.getConnection(this.context).controlChannel();
            this.eventProcessingConfiguration.eventProcessors().forEach((name, processor) -> controlChannel.registerEventProcessor(name, this.infoSupplier((EventProcessor)processor), (ProcessorInstructionHandler)new AxonProcessorInstructionHandler((EventProcessor)processor, (String)name)));
        }
    }

    public Supplier<EventProcessorInfo> infoSupplier(EventProcessor processor) {
        if (processor instanceof TrackingEventProcessor) {
            return () -> TrackingEventProcessorInfoMessage.describe((TrackingEventProcessor)processor);
        }
        if (processor instanceof SubscribingEventProcessor) {
            return () -> this.subscribingProcessorInfo(processor);
        }
        return () -> this.unknownProcessorTypeInfo(processor);
    }

    private EventProcessorInfo subscribingProcessorInfo(EventProcessor eventProcessor) {
        return EventProcessorInfo.newBuilder().setProcessorName(eventProcessor.getName()).setMode(SUBSCRIBING_EVENT_PROCESSOR_MODE).build();
    }

    private EventProcessorInfo unknownProcessorTypeInfo(EventProcessor eventProcessor) {
        return EventProcessorInfo.newBuilder().setProcessorName(eventProcessor.getName()).setMode(UNKNOWN_EVENT_PROCESSOR_MODE).build();
    }

    private static class AxonProcessorInstructionHandler
    implements ProcessorInstructionHandler {
        private final EventProcessor processor;
        private final String name;

        public AxonProcessorInstructionHandler(EventProcessor processor, String name) {
            this.processor = processor;
            this.name = name;
        }

        public CompletableFuture<Boolean> releaseSegment(int segmentId) {
            try {
                if (!(this.processor instanceof TrackingEventProcessor)) {
                    logger.info("Release segment requested for processor [{}] which is not a Tracking Event Processor", (Object)this.name);
                    return CompletableFuture.completedFuture(false);
                }
                ((TrackingEventProcessor)this.processor).releaseSegment(segmentId);
            }
            catch (Exception e) {
                return this.exceptionallyCompletedFuture(e);
            }
            return CompletableFuture.completedFuture(true);
        }

        public CompletableFuture<Boolean> splitSegment(int segmentId) {
            try {
                if (!(this.processor instanceof TrackingEventProcessor)) {
                    logger.info("Split segment requested for processor [{}] which is not a Tracking Event Processor", (Object)this.name);
                    return CompletableFuture.completedFuture(false);
                }
                return ((TrackingEventProcessor)this.processor).splitSegment(segmentId).thenApply(result -> {
                    if (Boolean.TRUE.equals(result)) {
                        logger.info("Successfully split segment [{}] of processor [{}]", (Object)segmentId, (Object)this.name);
                    } else {
                        logger.warn("Was not able to split segment [{}] for processor [{}]", (Object)segmentId, (Object)this.name);
                    }
                    return result;
                });
            }
            catch (Exception e) {
                return this.exceptionallyCompletedFuture(e);
            }
        }

        public CompletableFuture<Boolean> mergeSegment(int segmentId) {
            try {
                if (!(this.processor instanceof TrackingEventProcessor)) {
                    logger.warn("Merge segment request received for processor [{}] which is not a Tracking Event Processor", (Object)this.name);
                    return CompletableFuture.completedFuture(false);
                }
                return ((TrackingEventProcessor)this.processor).mergeSegment(segmentId).thenApply(result -> {
                    if (Boolean.TRUE.equals(result)) {
                        logger.info("Successfully merged segment [{}] of processor [{}]", (Object)segmentId, (Object)this.name);
                    } else {
                        logger.warn("Was not able to merge segment [{}] for processor [{}]", (Object)segmentId, (Object)this.name);
                    }
                    return result;
                });
            }
            catch (Exception e) {
                return this.exceptionallyCompletedFuture(e);
            }
        }

        public CompletableFuture<Void> pauseProcessor() {
            try {
                this.processor.shutDown();
                return CompletableFuture.completedFuture(null);
            }
            catch (Exception e) {
                return this.exceptionallyCompletedFuture(e);
            }
        }

        public CompletableFuture<Void> startProcessor() {
            try {
                this.processor.start();
                return CompletableFuture.completedFuture(null);
            }
            catch (Exception e) {
                return this.exceptionallyCompletedFuture(e);
            }
        }

        private <T> CompletableFuture<T> exceptionallyCompletedFuture(Exception e) {
            CompletableFuture future = new CompletableFuture();
            future.completeExceptionally(e);
            return future;
        }
    }
}

