/*
 * Decompiled with CFR 0.152.
 */
package org.opensearch.dataprepper.plugins.mongo.documentdb;

import com.google.common.base.Preconditions;
import io.micrometer.core.instrument.Counter;
import java.util.ArrayList;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.function.Function;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.plugin.PluginConfigObserver;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator;
import org.opensearch.dataprepper.plugins.mongo.configuration.CollectionConfig;
import org.opensearch.dataprepper.plugins.mongo.configuration.MongoDBSourceConfig;
import org.opensearch.dataprepper.plugins.mongo.export.ExportScheduler;
import org.opensearch.dataprepper.plugins.mongo.export.ExportWorker;
import org.opensearch.dataprepper.plugins.mongo.export.MongoDBExportPartitionSupplier;
import org.opensearch.dataprepper.plugins.mongo.stream.StreamScheduler;
import org.opensearch.dataprepper.plugins.mongo.utils.DocumentDBSourceAggregateMetrics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Creates and submits the export and stream tasks for a {@link MongoDBSourceConfig}, and
 * re-creates them on a fresh executor when an updated configuration carries a different
 * username or password than the configuration currently in use.
 *
 * <p>NOTE(review): the mutable fields below are neither synchronized nor volatile. This assumes
 * callers serialize {@link #initialize}, {@link #update} and {@link #shutdown} -- confirm.
 */
public class MongoTasksRefresher implements PluginConfigObserver<MongoDBSourceConfig> {
    private static final Logger LOG = LoggerFactory.getLogger(MongoTasksRefresher.class);
    /** Name of the counter incremented each time a credential change is detected. */
    static final String CREDENTIALS_CHANGED = "credentialsChanged";
    /** Name of the counter incremented each time re-creating the tasks fails. */
    static final String EXECUTOR_REFRESH_ERRORS = "executorRefreshErrors";

    private final EnhancedSourceCoordinator sourceCoordinator;
    private final PluginMetrics pluginMetrics;
    private final AcknowledgementSetManager acknowledgementSetManager;
    private final Buffer<Record<Event>> buffer;
    /** Produces an executor for the requested number of tasks. */
    private final Function<Integer, ExecutorService> executorServiceFunction;
    private final Counter credentialsChangeCounter;
    private final Counter executorRefreshErrorsCounter;
    private final String s3PathPrefix;
    private final DocumentDBSourceAggregateMetrics documentDBAggregateMetrics;

    // State replaced on every successful (re)build of the tasks.
    private MongoDBExportPartitionSupplier currentMongoDBExportPartitionSupplier;
    private MongoDBSourceConfig currentMongoDBSourceConfig;
    private ExecutorService currentExecutor;

    /**
     * Stores the collaborators and registers the two counters on {@code pluginMetrics}.
     *
     * @param buffer buffer handed to the export worker and the stream scheduler
     * @param sourceCoordinator coordinator handed to every task created by this class
     * @param pluginMetrics metrics registry used for the counters and handed to the tasks
     * @param acknowledgementSetManager handed to the export worker and the stream scheduler
     * @param executorServiceFunction maps a task count to the executor that will run the tasks
     * @param s3PathPrefix S3 path prefix handed to the tasks; must not be {@code null}
     * @param documentDBAggregateMetrics aggregate metrics handed to the tasks
     * @throws IllegalArgumentException if {@code s3PathPrefix} is {@code null}
     */
    public MongoTasksRefresher(Buffer<Record<Event>> buffer, EnhancedSourceCoordinator sourceCoordinator, PluginMetrics pluginMetrics, AcknowledgementSetManager acknowledgementSetManager, Function<Integer, ExecutorService> executorServiceFunction, String s3PathPrefix, DocumentDBSourceAggregateMetrics documentDBAggregateMetrics) {
        this.sourceCoordinator = sourceCoordinator;
        this.pluginMetrics = pluginMetrics;
        this.acknowledgementSetManager = acknowledgementSetManager;
        this.buffer = buffer;
        this.executorServiceFunction = executorServiceFunction;
        this.credentialsChangeCounter = pluginMetrics.counter(CREDENTIALS_CHANGED);
        this.executorRefreshErrorsCounter = pluginMetrics.counter(EXECUTOR_REFRESH_ERRORS);
        Preconditions.checkArgument(s3PathPrefix != null, "S3 path prefix must not be null");
        this.s3PathPrefix = s3PathPrefix;
        this.documentDBAggregateMetrics = documentDBAggregateMetrics;
    }

    /**
     * Records {@code sourceConfig} as the current configuration and starts the tasks for it.
     *
     * <p>The configuration is recorded before the tasks are built, so it stays recorded even if
     * building the tasks throws.
     *
     * @param sourceConfig configuration to start the tasks from
     */
    public void initialize(MongoDBSourceConfig sourceConfig) {
        currentMongoDBSourceConfig = sourceConfig;
        refreshJobs(sourceConfig);
    }

    /**
     * Restarts the tasks when the username or password in {@code pluginConfig} differs from the
     * current configuration; does nothing otherwise.
     *
     * <p>On a credential change the current executor (if any) is stopped with
     * {@link ExecutorService#shutdownNow()}, new tasks are submitted to a new executor and
     * {@code pluginConfig} becomes the current configuration. Any exception raised while doing so
     * is logged and counted under {@link #EXECUTOR_REFRESH_ERRORS} rather than propagated; in
     * that case the current configuration is left unchanged.
     *
     * @param pluginConfig the updated configuration
     * @throws NullPointerException if no current configuration has been recorded yet (the
     *         comparison against it happens outside the guarded section)
     */
    @Override
    public void update(MongoDBSourceConfig pluginConfig) {
        final MongoDBSourceConfig.AuthenticationConfig newAuthConfig = pluginConfig.getAuthenticationConfig();
        if (!basicAuthChanged(newAuthConfig)) {
            return;
        }
        credentialsChangeCounter.increment();
        try {
            // The executor is absent when the initial task build failed after the configuration
            // was recorded. Skip the shutdown in that case, as shutdown() does, so the refresh
            // can still start the tasks instead of failing on a null executor every time.
            if (currentExecutor != null) {
                currentExecutor.shutdownNow();
            }
            refreshJobs(pluginConfig);
            currentMongoDBSourceConfig = pluginConfig;
        } catch (Exception e) {
            executorRefreshErrorsCounter.increment();
            LOG.error("Refreshing executor failed.", e);
        }
    }

    /**
     * Builds the tasks required by {@code pluginConfig}, obtains an executor sized to the number
     * of tasks and submits them to it.
     *
     * <p>An export scheduler and an export worker are created when at least one collection has
     * export enabled; a stream scheduler is created when at least one collection has stream
     * enabled. The executor function is invoked even when no task was created.
     */
    private void refreshJobs(MongoDBSourceConfig pluginConfig) {
        final ArrayList<Runnable> runnables = new ArrayList<>();
        if (pluginConfig.getCollections().stream().anyMatch(CollectionConfig::isExport)) {
            currentMongoDBExportPartitionSupplier = new MongoDBExportPartitionSupplier(pluginConfig, sourceCoordinator, documentDBAggregateMetrics);
            runnables.add(new ExportScheduler(sourceCoordinator, currentMongoDBExportPartitionSupplier, pluginMetrics));
            runnables.add(new ExportWorker(sourceCoordinator, buffer, pluginMetrics, acknowledgementSetManager, pluginConfig, s3PathPrefix, documentDBAggregateMetrics));
        }
        if (pluginConfig.getCollections().stream().anyMatch(CollectionConfig::isStream)) {
            runnables.add(new StreamScheduler(sourceCoordinator, buffer, acknowledgementSetManager, pluginConfig, s3PathPrefix, pluginMetrics, documentDBAggregateMetrics));
        }
        currentExecutor = executorServiceFunction.apply(runnables.size());
        runnables.forEach(currentExecutor::submit);
    }

    /**
     * Returns {@code true} when the username or the password of {@code newAuthConfig} is not
     * equal to that of the current configuration.
     */
    private boolean basicAuthChanged(MongoDBSourceConfig.AuthenticationConfig newAuthConfig) {
        final MongoDBSourceConfig.AuthenticationConfig currentAuthConfig = currentMongoDBSourceConfig.getAuthenticationConfig();
        return !Objects.equals(currentAuthConfig.getUsername(), newAuthConfig.getUsername())
                || !Objects.equals(currentAuthConfig.getPassword(), newAuthConfig.getPassword());
    }

    /**
     * Stops the current executor with {@link ExecutorService#shutdownNow()}; does nothing when no
     * executor has been created.
     */
    public void shutdown() {
        if (currentExecutor != null) {
            LOG.info("shutdown down export worker and stream worker");
            currentExecutor.shutdownNow();
        }
    }
}

