/*
 * Decompiled with CFR 0.152.
 */
package org.axonframework.axonserver.connector.processor;

import io.axoniq.axonserver.connector.AxonServerConnection;
import io.axoniq.axonserver.connector.admin.AdminChannel;
import io.axoniq.axonserver.connector.control.ControlChannel;
import io.axoniq.axonserver.connector.control.ProcessorInstructionHandler;
import io.axoniq.axonserver.grpc.control.EventProcessorInfo;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.axonframework.axonserver.connector.AxonServerConfiguration;
import org.axonframework.axonserver.connector.AxonServerConnectionManager;
import org.axonframework.axonserver.connector.processor.StreamingEventProcessorInfoMessage;
import org.axonframework.common.FutureUtils;
import org.axonframework.common.transaction.TransactionManager;
import org.axonframework.configuration.Configuration;
import org.axonframework.eventhandling.processors.EventProcessor;
import org.axonframework.eventhandling.processors.streaming.StreamingEventProcessor;
import org.axonframework.eventhandling.processors.streaming.token.store.TokenStore;
import org.axonframework.eventhandling.processors.subscribing.SubscribingEventProcessor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class EventProcessorControlService {
    private static final Logger logger = LoggerFactory.getLogger(EventProcessorControlService.class);
    private static final String SUBSCRIBING_EVENT_PROCESSOR_MODE = "Subscribing";
    private static final String UNKNOWN_EVENT_PROCESSOR_MODE = "Unknown";
    protected final AxonServerConnectionManager axonServerConnectionManager;
    protected final Configuration eventProcessingConfiguration;
    protected final String context;
    protected final Map<String, AxonServerConfiguration.Eventhandling.ProcessorSettings> processorConfig;

    public EventProcessorControlService(AxonServerConnectionManager axonServerConnectionManager, Configuration eventProcessingConfiguration, AxonServerConfiguration axonServerConfiguration) {
        this(axonServerConnectionManager, eventProcessingConfiguration, axonServerConfiguration.getContext(), axonServerConfiguration.getEventhandling().getProcessors());
    }

    public EventProcessorControlService(AxonServerConnectionManager axonServerConnectionManager, Configuration eventProcessingConfiguration, String context, Map<String, AxonServerConfiguration.Eventhandling.ProcessorSettings> processorConfig) {
        this.axonServerConnectionManager = axonServerConnectionManager;
        this.eventProcessingConfiguration = eventProcessingConfiguration;
        this.context = context;
        this.processorConfig = processorConfig;
    }

    public void start() {
        if (this.axonServerConnectionManager == null || this.eventProcessingConfiguration == null) {
            return;
        }
        Map eventProcessors = (Map)this.eventProcessingConfiguration.getComponent(Map.class);
        AxonServerConnection connection = this.axonServerConnectionManager.getConnection(this.context);
        this.registerInstructionHandlers(connection, eventProcessors);
        this.setLoadBalancingStrategies(connection, eventProcessors.keySet());
    }

    private void setLoadBalancingStrategies(AxonServerConnection connection, Set<String> processorNames) {
        AdminChannel adminChannel = connection.adminChannel();
        Map<String, String> strategiesPerProcessor = this.processorConfig.entrySet().stream().filter(entry -> {
            if (!processorNames.contains(entry.getKey())) {
                logger.info("Event Processor [{}] is not a registered. Please check the name or register the Event Processor", entry.getKey());
                return false;
            }
            return true;
        }).collect(Collectors.toMap(Map.Entry::getKey, entry -> ((AxonServerConfiguration.Eventhandling.ProcessorSettings)entry.getValue()).getLoadBalancingStrategy()));
        strategiesPerProcessor.forEach((processorName, strategy) -> {
            Optional<String> optionalIdentifier = this.tokenStoreIdentifierFor((String)processorName);
            if (!optionalIdentifier.isPresent()) {
                logger.warn("Cannot find token store identifier for processor [{}]. Load balancing cannot be configured without this identifier.", processorName);
                return;
            }
            String tokenStoreIdentifier = optionalIdentifier.get();
            adminChannel.loadBalanceEventProcessor(processorName, tokenStoreIdentifier, strategy).whenComplete((r, e) -> {
                if (e == null) {
                    logger.debug("Successfully requested to load balance processor [{}] with strategy [{}].", processorName, strategy);
                    return;
                }
                logger.warn("Requesting to load balance processor [{}] with strategy [{}] failed.", new Object[]{processorName, strategy, e});
            });
            if (this.processorConfig.get(processorName).isAutomaticBalancing()) {
                adminChannel.setAutoLoadBalanceStrategy(processorName, tokenStoreIdentifier, strategy).whenComplete((r, e) -> {
                    if (e == null) {
                        logger.debug("Successfully requested to automatically balance processor [{}] with strategy [{}].", processorName, strategy);
                        return;
                    }
                    logger.warn("Requesting to automatically balance processor [{}] with strategy [{}] failed.", new Object[]{processorName, strategy, e});
                });
            }
        });
    }

    private Optional<String> tokenStoreIdentifierFor(String processorName) {
        TokenStore tokenStore = (TokenStore)this.eventProcessingConfiguration.getComponent(TokenStore.class);
        return (Optional)((TransactionManager)this.eventProcessingConfiguration.getComponent(TransactionManager.class)).fetchInTransaction(() -> (Optional)FutureUtils.joinAndUnwrap((CompletableFuture)tokenStore.retrieveStorageIdentifier(null)));
    }

    private void registerInstructionHandlers(AxonServerConnection connection, Map<String, EventProcessor> eventProcessors) {
        ControlChannel controlChannel = connection.controlChannel();
        eventProcessors.forEach((name, processor) -> controlChannel.registerEventProcessor(name, this.infoSupplier((EventProcessor)processor), (ProcessorInstructionHandler)new AxonProcessorInstructionHandler((EventProcessor)processor, (String)name)));
    }

    protected Supplier<EventProcessorInfo> infoSupplier(EventProcessor processor) {
        if (processor instanceof StreamingEventProcessor) {
            return () -> StreamingEventProcessorInfoMessage.describe((StreamingEventProcessor)processor);
        }
        if (processor instanceof SubscribingEventProcessor) {
            return () -> this.subscribingProcessorInfo(processor);
        }
        return () -> this.unknownProcessorTypeInfo(processor);
    }

    private EventProcessorInfo subscribingProcessorInfo(EventProcessor eventProcessor) {
        return EventProcessorInfo.newBuilder().setProcessorName(eventProcessor.name()).setMode(SUBSCRIBING_EVENT_PROCESSOR_MODE).setIsStreamingProcessor(false).build();
    }

    private EventProcessorInfo unknownProcessorTypeInfo(EventProcessor eventProcessor) {
        return EventProcessorInfo.newBuilder().setProcessorName(eventProcessor.name()).setMode(UNKNOWN_EVENT_PROCESSOR_MODE).setIsStreamingProcessor(false).build();
    }

    protected static class AxonProcessorInstructionHandler
    implements ProcessorInstructionHandler {
        private final EventProcessor processor;
        private final String name;

        public AxonProcessorInstructionHandler(EventProcessor processor, String name) {
            this.processor = processor;
            this.name = name;
        }

        public CompletableFuture<Boolean> releaseSegment(int segmentId) {
            try {
                if (!(this.processor instanceof StreamingEventProcessor)) {
                    logger.info("Release segment requested for processor [{}] which is not a Streaming Event Processor", (Object)this.name);
                    return CompletableFuture.completedFuture(false);
                }
                return ((StreamingEventProcessor)this.processor).releaseSegment(segmentId).thenApply(r -> true);
            }
            catch (Exception e) {
                return this.exceptionallyCompletedFuture(e);
            }
        }

        public CompletableFuture<Boolean> splitSegment(int segmentId) {
            try {
                if (!(this.processor instanceof StreamingEventProcessor)) {
                    logger.info("Split segment requested for processor [{}] which is not a Streaming Event Processor", (Object)this.name);
                    return CompletableFuture.completedFuture(false);
                }
                return ((StreamingEventProcessor)this.processor).splitSegment(segmentId).thenApply(result -> {
                    if (Boolean.TRUE.equals(result)) {
                        logger.info("Successfully split segment [{}] of processor [{}]", (Object)segmentId, (Object)this.name);
                    } else {
                        logger.warn("Was not able to split segment [{}] for processor [{}]", (Object)segmentId, (Object)this.name);
                    }
                    return result;
                });
            }
            catch (Exception e) {
                return this.exceptionallyCompletedFuture(e);
            }
        }

        public CompletableFuture<Boolean> mergeSegment(int segmentId) {
            try {
                if (!(this.processor instanceof StreamingEventProcessor)) {
                    logger.warn("Merge segment request received for processor [{}] which is not a Streaming Event Processor", (Object)this.name);
                    return CompletableFuture.completedFuture(false);
                }
                return ((StreamingEventProcessor)this.processor).mergeSegment(segmentId).thenApply(result -> {
                    if (Boolean.TRUE.equals(result)) {
                        logger.info("Successfully merged segment [{}] of processor [{}]", (Object)segmentId, (Object)this.name);
                    } else {
                        logger.warn("Was not able to merge segment [{}] for processor [{}]", (Object)segmentId, (Object)this.name);
                    }
                    return result;
                });
            }
            catch (Exception e) {
                return this.exceptionallyCompletedFuture(e);
            }
        }

        public CompletableFuture<Void> pauseProcessor() {
            return this.processor.shutdown();
        }

        public CompletableFuture<Void> startProcessor() {
            return this.processor.start();
        }

        private <T> CompletableFuture<T> exceptionallyCompletedFuture(Exception e) {
            CompletableFuture future = new CompletableFuture();
            future.completeExceptionally(e);
            return future;
        }
    }
}

