/*
 * Decompiled with CFR 0.152.
 */
package org.apache.geode.management.internal.cli.commands;

import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.geode.annotations.VisibleForTesting;
import org.apache.geode.cache.configuration.CacheConfig;
import org.apache.geode.cache.configuration.DeclarableType;
import org.apache.geode.cache.execute.Function;
import org.apache.geode.cache.wan.GatewaySender;
import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.serialization.KnownVersion;
import org.apache.geode.logging.internal.log4j.api.LogService;
import org.apache.geode.management.DistributedSystemMXBean;
import org.apache.geode.management.cli.CliMetaData;
import org.apache.geode.management.cli.SingleGfshCommand;
import org.apache.geode.management.internal.cli.AbstractCliAroundInterceptor;
import org.apache.geode.management.internal.cli.GfshParseResult;
import org.apache.geode.management.internal.cli.functions.GatewaySenderCreateFunction;
import org.apache.geode.management.internal.cli.functions.GatewaySenderFunctionArgs;
import org.apache.geode.management.internal.cli.result.model.ResultModel;
import org.apache.geode.management.internal.functions.CliFunctionResult;
import org.apache.geode.management.internal.security.ResourceOperation;
import org.apache.geode.security.ResourcePermission;
import org.apache.logging.log4j.Logger;
import org.springframework.shell.standard.ShellMethod;
import org.springframework.shell.standard.ShellOption;

public class CreateGatewaySenderCommand
extends SingleGfshCommand {
    private static final Logger logger = LogService.getLogger();
    private static final int MBEAN_CREATION_WAIT_TIME = 10000;

    @ShellMethod(value="Create the Gateway Sender on a member or members.", key={"create gateway-sender"})
    @CliMetaData(relatedTopic={"WAN"}, interceptor="org.apache.geode.management.internal.cli.commands.CreateGatewaySenderCommand$Interceptor")
    @ResourceOperation(resource=ResourcePermission.Resource.CLUSTER, operation=ResourcePermission.Operation.MANAGE, target=ResourcePermission.Target.GATEWAY)
    public ResultModel createGatewaySender(@ShellOption(value={"id"}, help="Id of the GatewaySender.") String id, @ShellOption(value={"remote-distributed-system-id"}, help="Id of the remote distributed system to which the sender will send events.") Integer remoteDistributedSystemId, @ShellOption(value={"group", "groups"}, help="Group(s) of members on which to create the Gateway Sender.") String[] onGroups, @ShellOption(value={"member", "members"}, help="Name/Id of the member on which to create the Gateway Sender.") String[] onMember, @ShellOption(value={"group-transaction-events"}, defaultValue="false", help="Ensure that all the events of a transaction are sent in the same batch, i.e., they are never spread across different batches.\nOnly allowed on serial gateway senders with 1 dispatcher thread or on parallel ones.\nNote that in order to work for a transaction, the regions to which the transaction \nevents belong must be replicated by the same set of senders with this flag enabled.") boolean groupTransactionEvents, @ShellOption(value={"parallel"}, defaultValue="false", help="Whether this is Parallel GatewaySender.") boolean parallel, @ShellOption(value={"manual-start"}, defaultValue="false", help="Whether manual start is to be enabled or the sender will start automatically after creation.\nDeprecated: Since Geode 1.4. Manual start of senders is deprecated and will be removed in a later release.") Boolean manualStart, @ShellOption(value={"socket-buffer-size"}, help="The buffer size of the socket connection between this GatewaySender and its receiving GatewayReceiver.") Integer socketBufferSize, @ShellOption(value={"socket-read-timeout"}, help="The amount of time in milliseconds that a socket read between a sending GatewaySender and its receiving GatewayReceiver will block.") Integer socketReadTimeout, @ShellOption(value={"enable-batch-conflation"}, defaultValue="false", help="Whether batch conflation is to be enabled for a GatewaySender.") Boolean enableBatchConflation, @ShellOption(value={"batch-size"}, help="The batch size for the GatewaySender.") Integer batchSize, @ShellOption(value={"batch-time-interval"}, help="The batch time interval for the GatewaySender.") Integer batchTimeInterval, @ShellOption(value={"enable-persistence"}, defaultValue="false", help="Whether persistence is to be enabled for the GatewaySender.") Boolean enablePersistence, @ShellOption(value={"disk-store-name"}, help="The disk store name to be configured for overflow or persistence.") String diskStoreName, @ShellOption(value={"disk-synchronous"}, defaultValue="true", help="Whether writes to the disk in case of persistence are synchronous.") Boolean diskSynchronous, @ShellOption(value={"maximum-queue-memory"}, help="The maximum amount of memory (in MB) for a GatewaySender's queue.") Integer maxQueueMemory, @ShellOption(value={"alert-threshold"}, help="The alert threshold for entries in a GatewaySender's queue.") Integer alertThreshold, @ShellOption(value={"dispatcher-threads"}, help="The number of dispatcher threads working for this GatewaySender. When dispatcher threads is set to > 1, appropriate order policy is required to be set.") Integer dispatcherThreads, @ShellOption(value={"order-policy"}, help="The order policy followed while dispatching the events to remote distributed system. Order policy is set only when dispatcher threads are > 1. Possible values are 'THREAD', 'KEY', 'PARTITION'.") GatewaySender.OrderPolicy orderPolicy, @ShellOption(value={"gateway-event-filter"}, help="The list of fully qualified class names of GatewayEventFilters (separated by comma) to be associated with the GatewaySender. This serves as a callback for users to filter out events before dispatching to remote distributed system. e.g gateway-event-filter=com.user.filters.MyFilter1,com.user.filters.MyFilters2") String[] gatewayEventFilters, @ShellOption(value={"gateway-transport-filter"}, help="The fully qualified class name of GatewayTransportFilter to be added to the GatewaySender. ") String[] gatewayTransportFilter, @ShellOption(value={"enforce-threads-connect-same-receiver"}, defaultValue="false", help="Whether or not the sender threads have to verify the receiver member id to verify if they are connected to the same server.") Boolean enforceThreadsConnectSameReceiver) {
        CacheConfig.GatewaySender configuration = this.buildConfiguration(id, remoteDistributedSystemId, parallel, manualStart, socketBufferSize, socketReadTimeout, enableBatchConflation, batchSize, batchTimeInterval, enablePersistence, diskStoreName, diskSynchronous, maxQueueMemory, alertThreshold, dispatcherThreads, orderPolicy == null ? null : orderPolicy.name(), gatewayEventFilters, gatewayTransportFilter, groupTransactionEvents, enforceThreadsConnectSameReceiver);
        GatewaySenderFunctionArgs gatewaySenderFunctionArgs = new GatewaySenderFunctionArgs(configuration);
        Set<DistributedMember> membersToCreateGatewaySenderOn = this.getMembers(onGroups, onMember);
        if (!this.verifyAllCurrentVersion(membersToCreateGatewaySenderOn)) {
            return ResultModel.createError("Gateway Sender cannot be created until all members are the current version");
        }
        List<CliFunctionResult> gatewaySenderCreateResults = this.executeAndGetFunctionResult((Function<?>)GatewaySenderCreateFunction.INSTANCE, gatewaySenderFunctionArgs, membersToCreateGatewaySenderOn);
        ResultModel resultModel = ResultModel.createMemberStatusResult(gatewaySenderCreateResults);
        resultModel.setConfigObject(configuration);
        if (!this.waitForGatewaySenderMBeanCreation(id, membersToCreateGatewaySenderOn)) {
            resultModel.addInfo().addLine("Did not complete waiting for GatewaySenderMBean proxy creation");
        }
        return resultModel;
    }

    @VisibleForTesting
    boolean waitForGatewaySenderMBeanCreation(String id, Set<DistributedMember> membersToCreateGatewaySenderOn) {
        DistributedSystemMXBean dsMXBean = this.getManagementService().getDistributedSystemMXBean();
        return this.poll(10000L, TimeUnit.MILLISECONDS, () -> membersToCreateGatewaySenderOn.stream().allMatch(m -> CreateGatewaySenderCommand.gatewaySenderBeanExists(dsMXBean, m.getName(), id)));
    }

    static boolean gatewaySenderBeanExists(DistributedSystemMXBean dsMXBean, String member, String id) {
        try {
            dsMXBean.fetchGatewaySenderObjectName(member, id);
            return true;
        }
        catch (Exception e) {
            if (!e.getMessage().toLowerCase().contains("not found")) {
                logger.warn("Unable to retrieve GatewaySender ObjectName for member: {}, id: {} - {}", (Object)member, (Object)id, (Object)e.getMessage());
            }
            return false;
        }
    }

    @Override
    public boolean updateConfigForGroup(String group, CacheConfig config, Object configObject) {
        config.getGatewaySenders().add((CacheConfig.GatewaySender)configObject);
        return true;
    }

    private boolean verifyAllCurrentVersion(Set<DistributedMember> members) {
        return members.stream().allMatch(member -> ((InternalDistributedMember)member).getVersion().equals(KnownVersion.CURRENT));
    }

    private CacheConfig.GatewaySender buildConfiguration(String id, Integer remoteDSId, Boolean parallel, Boolean manualStart, Integer socketBufferSize, Integer socketReadTimeout, Boolean enableBatchConflation, Integer batchSize, Integer batchTimeInterval, Boolean enablePersistence, String diskStoreName, Boolean diskSynchronous, Integer maxQueueMemory, Integer alertThreshold, Integer dispatcherThreads, String orderPolicy, String[] gatewayEventFilters, String[] gatewayTransportFilters, Boolean groupTransactionEvents, Boolean enforceThreadsConnectSameReceiver) {
        CacheConfig.GatewaySender sender = new CacheConfig.GatewaySender();
        sender.setId(id);
        sender.setRemoteDistributedSystemId(this.int2string(remoteDSId));
        sender.setParallel(parallel);
        sender.setManualStart(manualStart);
        sender.setSocketBufferSize(this.int2string(socketBufferSize));
        sender.setSocketReadTimeout(this.int2string(socketReadTimeout));
        sender.setEnableBatchConflation(enableBatchConflation);
        sender.setBatchSize(this.int2string(batchSize));
        sender.setBatchTimeInterval(this.int2string(batchTimeInterval));
        sender.setEnablePersistence(enablePersistence);
        sender.setDiskStoreName(diskStoreName);
        sender.setDiskSynchronous(diskSynchronous);
        sender.setMaximumQueueMemory(this.int2string(maxQueueMemory));
        sender.setAlertThreshold(this.int2string(alertThreshold));
        sender.setDispatcherThreads(this.int2string(dispatcherThreads));
        sender.setOrderPolicy(orderPolicy);
        sender.setGroupTransactionEvents(groupTransactionEvents);
        if (gatewayEventFilters != null) {
            sender.getGatewayEventFilters().addAll(this.stringsToDeclarableTypes(gatewayEventFilters));
        }
        if (gatewayTransportFilters != null) {
            sender.getGatewayTransportFilters().addAll(this.stringsToDeclarableTypes(gatewayTransportFilters));
        }
        sender.setEnforceThreadsConnectSameReceiver(enforceThreadsConnectSameReceiver);
        return sender;
    }

    private List<DeclarableType> stringsToDeclarableTypes(String[] objects) {
        return Arrays.stream(objects).map(fullyQualifiedClassName -> {
            DeclarableType thisFilter = new DeclarableType();
            thisFilter.setClassName(fullyQualifiedClassName);
            return thisFilter;
        }).collect(Collectors.toList());
    }

    private String int2string(Integer i) {
        return Optional.ofNullable(i).map(String::valueOf).orElse(null);
    }

    public static class Interceptor
    extends AbstractCliAroundInterceptor {
        @Override
        public ResultModel preExecution(GfshParseResult parseResult) {
            Boolean parallel = (Boolean)parseResult.getParamValue("parallel");
            GatewaySender.OrderPolicy orderPolicy = (GatewaySender.OrderPolicy)parseResult.getParamValue("order-policy");
            Integer dispatcherThreads = (Integer)parseResult.getParamValue("dispatcher-threads");
            Boolean groupTransactionEvents = (Boolean)parseResult.getParamValue("group-transaction-events");
            Boolean batchConflationEnabled = (Boolean)parseResult.getParamValue("enable-batch-conflation");
            Boolean enforceThreadsConnectSameReceiver = (Boolean)parseResult.getParamValue("enforce-threads-connect-same-receiver");
            if (!parallel.booleanValue() && dispatcherThreads != null && dispatcherThreads > 1 && orderPolicy == null) {
                return ResultModel.createError("Must specify --order-policy when --dispatcher-threads is larger than 1.");
            }
            if (parallel.booleanValue() && orderPolicy == GatewaySender.OrderPolicy.THREAD) {
                return ResultModel.createError("Parallel Gateway Sender can not be created with THREAD OrderPolicy");
            }
            if (!parallel.booleanValue() && dispatcherThreads != null && dispatcherThreads > 1 && groupTransactionEvents.booleanValue()) {
                return ResultModel.createError("Serial Gateway Sender cannot be created with --group-transaction-events when --dispatcher-threads is greater than 1.");
            }
            if (groupTransactionEvents.booleanValue() && batchConflationEnabled.booleanValue()) {
                return ResultModel.createError("Gateway Sender cannot be created with both --group-transaction-events and --enable-batch-conflation.");
            }
            if (parallel.booleanValue() && enforceThreadsConnectSameReceiver.booleanValue()) {
                return ResultModel.createError("Option --enforce-threads-connect-same-receiver only applies to serial gateway senders.");
            }
            return ResultModel.createInfo("");
        }
    }
}

