/*
 * Decompiled with CFR 0.152.
 */
package org.opensearch.dataprepper.plugins.geoip.processor;

import io.micrometer.core.instrument.Counter;
import java.net.InetAddress;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.opensearch.dataprepper.expression.ExpressionEvaluator;
import org.opensearch.dataprepper.logging.DataPrepperMarkers;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException;
import org.opensearch.dataprepper.model.processor.AbstractProcessor;
import org.opensearch.dataprepper.model.processor.Processor;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.plugins.geoip.GeoIPDatabase;
import org.opensearch.dataprepper.plugins.geoip.GeoIPField;
import org.opensearch.dataprepper.plugins.geoip.exception.EngineFailureException;
import org.opensearch.dataprepper.plugins.geoip.exception.EnrichFailedException;
import org.opensearch.dataprepper.plugins.geoip.extension.GeoIPProcessorService;
import org.opensearch.dataprepper.plugins.geoip.extension.api.GeoIPDatabaseReader;
import org.opensearch.dataprepper.plugins.geoip.extension.api.GeoIpConfigSupplier;
import org.opensearch.dataprepper.plugins.geoip.processor.BatchGeoIPDatabaseReader;
import org.opensearch.dataprepper.plugins.geoip.processor.EntryConfig;
import org.opensearch.dataprepper.plugins.geoip.processor.GeoIPProcessorConfig;
import org.opensearch.dataprepper.plugins.geoip.processor.GeoInetAddress;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Processor plugin registered under the name {@code geoip}.
 *
 * <p>For every event in a batch (optionally filtered by the configured "when" condition) the
 * processor walks the configured {@link EntryConfig} entries, reads an IP address string from the
 * entry's source key, looks up geo data for it through a {@link GeoIPDatabaseReader} and writes the
 * resulting map under the entry's target key. Failures are reflected as event tags and counters;
 * the events themselves are always returned.
 */
@DataPrepperPlugin(name = "geoip", pluginType = Processor.class, pluginConfigurationType = GeoIPProcessorConfig.class)
public class GeoIPProcessor extends AbstractProcessor<Record<Event>, Record<Event>> {
    private static final Logger LOG = LoggerFactory.getLogger(GeoIPProcessor.class);

    // Metric names registered with PluginMetrics in the constructor.
    static final String GEO_IP_EVENTS_PROCESSED = "eventsProcessed";
    static final String GEO_IP_EVENTS_SUCCEEDED = "eventsSucceeded";
    static final String GEO_IP_EVENTS_FAILED = "eventsFailed";
    static final String GEO_IP_EVENTS_FAILED_ENGINE_EXCEPTION = "eventsFailedEngineException";
    static final String GEO_IP_EVENTS_FAILED_IP_NOT_FOUND = "eventsFailedLocationNotFound";

    private final Counter geoIpEventsProcessed;
    private final Counter geoIpEventsSucceeded;
    private final Counter geoIpEventsFailed;
    private final Counter geoIpEventsFailedEngineException;
    private final Counter geoIpEventsFailedIPNotFound;
    private final GeoIPProcessorConfig geoIPProcessorConfig;
    private final List<String> tagsOnEngineFailure;
    private final List<String> tagsOnIPNotFound;
    private final List<String> tagsOnInvalidIP;
    private final GeoIPProcessorService geoIPProcessorService;
    private final ExpressionEvaluator expressionEvaluator;
    // Per-entry lookups computed once at construction so they are not rebuilt for every event.
    private final Map<EntryConfig, Collection<GeoIPField>> entryFieldsMap;
    final Map<EntryConfig, Collection<GeoIPDatabase>> entryDatabaseMap;

    /**
     * Creates the processor, validating the configuration and registering its counters.
     *
     * @param pluginMetrics       source of the counters listed in the {@code GEO_IP_*} constants
     * @param geoIPProcessorConfig processor configuration (entries, tags, optional "when" condition)
     * @param geoIpConfigSupplier supplies the {@link GeoIPProcessorService}
     * @param expressionEvaluator evaluates the optional "when" condition
     * @throws InvalidPluginConfigurationException if a "when" condition is configured but is not a
     *                                             valid expression statement
     * @throws IllegalStateException               if the supplier provides no
     *                                             {@link GeoIPProcessorService}
     */
    @DataPrepperPluginConstructor
    public GeoIPProcessor(PluginMetrics pluginMetrics, GeoIPProcessorConfig geoIPProcessorConfig, GeoIpConfigSupplier geoIpConfigSupplier, ExpressionEvaluator expressionEvaluator) {
        super(pluginMetrics);

        // Reject an unparsable condition up front instead of failing on every event later.
        String configuredCondition = geoIPProcessorConfig.getWhenCondition();
        if (configuredCondition != null && !expressionEvaluator.isValidExpressionStatement(configuredCondition)) {
            throw new InvalidPluginConfigurationException(String.format("geoip_when \"%s\" is not a valid expression statement. See https://opensearch.org/docs/latest/data-prepper/pipelines/expression-syntax/ for valid expression syntax", geoIPProcessorConfig.getWhenCondition()));
        }

        this.geoIPProcessorService = geoIpConfigSupplier.getGeoIPProcessorService()
                .orElseThrow(() -> new IllegalStateException("geoip_service configuration is required when using geoip processor."));
        this.geoIPProcessorConfig = geoIPProcessorConfig;
        this.tagsOnEngineFailure = geoIPProcessorConfig.getTagsOnEngineFailure();
        this.tagsOnIPNotFound = geoIPProcessorConfig.getTagsOnIPNotFound();
        this.tagsOnInvalidIP = geoIPProcessorConfig.getTagsOnNoValidIp();
        this.expressionEvaluator = expressionEvaluator;

        this.geoIpEventsProcessed = pluginMetrics.counter(GEO_IP_EVENTS_PROCESSED);
        this.geoIpEventsSucceeded = pluginMetrics.counter(GEO_IP_EVENTS_SUCCEEDED);
        this.geoIpEventsFailed = pluginMetrics.counter(GEO_IP_EVENTS_FAILED);
        this.geoIpEventsFailedEngineException = pluginMetrics.counter(GEO_IP_EVENTS_FAILED_ENGINE_EXCEPTION);
        this.geoIpEventsFailedIPNotFound = pluginMetrics.counter(GEO_IP_EVENTS_FAILED_IP_NOT_FOUND);

        // Order matters: both helpers read geoIPProcessorConfig, and the database map is derived
        // from the fields map.
        this.entryFieldsMap = this.populateGeoIPFields();
        this.entryDatabaseMap = this.populateGeoIPDatabases();
    }

    /**
     * Enriches the given batch in place and returns the same collection.
     *
     * <p>A reader is obtained from the service for the duration of the batch and closed afterwards.
     * Any exception escaping the batch is logged and swallowed, so the records are returned even if
     * processing stopped part-way through; records after the failing one are then left untouched.
     *
     * @param records events to enrich
     * @return the same {@code records} instance
     */
    public Collection<Record<Event>> doExecute(Collection<Record<Event>> records) {
        try (BatchGeoIPDatabaseReader batchReader = BatchGeoIPDatabaseReader.decorate(this.geoIPProcessorService.getGeoIPDatabaseReader())) {
            this.processRecords(records, batchReader);
        } catch (Exception e) {
            LOG.error(DataPrepperMarkers.NOISY, "Encountered exception in geoip processor.", e);
        }
        return records;
    }

    /**
     * Runs every configured entry against every record that passes
     * {@link #checkConditionAndDatabaseReader}, then applies tags and metrics once per event.
     *
     * @param records             events to enrich
     * @param geoIPDatabaseReader reader used for lookups; may be {@code null}, in which case every
     *                            event that passes the condition is tagged as an engine failure
     */
    private void processRecords(Collection<Record<Event>> records, GeoIPDatabaseReader geoIPDatabaseReader) {
        for (Record<Event> eventRecord : records) {
            Event event = eventRecord.getData();
            String whenCondition = this.geoIPProcessorConfig.getWhenCondition();
            if (this.checkConditionAndDatabaseReader(geoIPDatabaseReader, event, whenCondition)) {
                continue;
            }

            // Outcome flags accumulate across all entries of this event; a single failing entry
            // marks the whole event as failed while the remaining entries are still attempted.
            boolean eventSucceeded = true;
            boolean ipNotFound = false;
            boolean invalidIp = false;
            boolean engineFailure = false;

            for (EntryConfig entry : this.geoIPProcessorConfig.getEntries()) {
                String source = entry.getSource();
                Collection<GeoIPField> fields = this.entryFieldsMap.get(entry);
                Collection<GeoIPDatabase> databases = this.entryDatabaseMap.get(entry);

                String ipAddress = null;
                try {
                    ipAddress = event.get(source, String.class);
                } catch (Exception e) {
                    // ipAddress stays null, so the "no address" branch below also flags invalidIp.
                    eventSucceeded = false;
                    ipNotFound = true;
                    LOG.error(DataPrepperMarkers.EVENT, "Failed to get IP address from [{}] in event: [{}]. Caused by:[{}]", source, event, e.getMessage());
                }

                if (ipAddress == null || ipAddress.isEmpty()) {
                    // Nothing to look up for this entry.
                    eventSucceeded = false;
                    invalidIp = true;
                    continue;
                }

                try {
                    Optional<InetAddress> usableAddress = GeoInetAddress.usableInetFromString(ipAddress);
                    if (usableAddress.isPresent()) {
                        Map<String, Object> geoData = geoIPDatabaseReader.getGeoData(usableAddress.get(), fields, databases);
                        if (geoData.isEmpty()) {
                            ipNotFound = true;
                            eventSucceeded = false;
                        } else {
                            eventRecord.getData().put(entry.getTarget(), geoData);
                        }
                    } else {
                        // The string did not yield an address GeoInetAddress considers usable.
                        invalidIp = true;
                        eventSucceeded = false;
                    }
                } catch (EnrichFailedException e) {
                    ipNotFound = true;
                    eventSucceeded = false;
                    // Logged twice on purpose: once under the EVENT marker including the event,
                    // once without the event content.
                    LOG.error(DataPrepperMarkers.EVENT, "IP address not found in database for IP: [{}] in event: [{}]. Caused by:[{}]", ipAddress, event, e.getMessage());
                    LOG.error("IP address not found in database for IP: [{}]. Caused by:[{}]", ipAddress, e.getMessage());
                } catch (EngineFailureException e) {
                    engineFailure = true;
                    eventSucceeded = false;
                    LOG.error(DataPrepperMarkers.EVENT, "Failed to get Geo data for event: [{}] for the IP address [{}]. Caused by:{}", event, ipAddress, e.getMessage());
                    LOG.error("Failed to get Geo data for the IP address [{}]. Caused by:{}", ipAddress, e.getMessage());
                }
            }

            this.updateTagsAndMetrics(event, eventSucceeded, ipNotFound, engineFailure, invalidIp);
        }
    }

    /**
     * Applies the failure tags matching the given flags and bumps the corresponding counters.
     * Exactly one of the succeeded/failed counters is incremented per call.
     *
     * @param event          event whose metadata receives the tags
     * @param eventSucceeded {@code true} if every entry was enriched
     * @param ipNotFound     at least one entry had no geo data (or its source could not be read)
     * @param engineFailure  at least one lookup raised an {@link EngineFailureException}
     * @param invalidIp      at least one entry had a missing, empty or unusable address
     */
    private void updateTagsAndMetrics(Event event, boolean eventSucceeded, boolean ipNotFound, boolean engineFailure, boolean invalidIp) {
        // Invalid addresses are tagged only; there is no dedicated counter for them.
        if (invalidIp) {
            event.getMetadata().addTags(this.tagsOnInvalidIP);
        }
        if (ipNotFound) {
            event.getMetadata().addTags(this.tagsOnIPNotFound);
            this.geoIpEventsFailedIPNotFound.increment();
        }
        if (engineFailure) {
            event.getMetadata().addTags(this.tagsOnEngineFailure);
            this.geoIpEventsFailedEngineException.increment();
        }

        if (eventSucceeded) {
            this.geoIpEventsSucceeded.increment();
        } else {
            this.geoIpEventsFailed.increment();
        }
    }

    /**
     * Decides whether an event must be skipped before any entry is processed.
     *
     * <p>An event whose "when" condition evaluates to false is skipped silently (no counter is
     * touched). Otherwise the processed counter is incremented; if the reader is missing or
     * expired the event is tagged with the engine-failure tags, counted as failed and skipped.
     *
     * @param geoIPDatabaseReader reader for the current batch, possibly {@code null}
     * @param event               event under consideration
     * @param whenCondition       configured condition, or {@code null} when none is set
     * @return {@code true} if the caller should skip this event, {@code false} to enrich it
     */
    private boolean checkConditionAndDatabaseReader(GeoIPDatabaseReader geoIPDatabaseReader, Event event, String whenCondition) {
        boolean conditionRejectsEvent = whenCondition != null
                && !this.expressionEvaluator.evaluateConditional(whenCondition, event);
        if (conditionRejectsEvent) {
            return true;
        }

        this.geoIpEventsProcessed.increment();

        boolean readerUnavailable = geoIPDatabaseReader == null || geoIPDatabaseReader.isExpired();
        if (readerUnavailable) {
            event.getMetadata().addTags(this.tagsOnEngineFailure);
            this.geoIpEventsFailed.increment();
            return true;
        }
        return false;
    }

    /**
     * Builds the entry-to-fields lookup from each entry's {@code getGeoIPFields()}.
     *
     * @return map keyed by the configured entries
     */
    private Map<EntryConfig, Collection<GeoIPField>> populateGeoIPFields() {
        return this.geoIPProcessorConfig.getEntries()
                .stream()
                .collect(Collectors.toMap(Function.identity(), EntryConfig::getGeoIPFields));
    }

    /**
     * Builds the entry-to-databases lookup by asking {@link GeoIPDatabase} which databases are
     * needed for the fields of each entry. Requires {@code entryFieldsMap} to be populated.
     *
     * @return map keyed by the configured entries
     */
    private Map<EntryConfig, Collection<GeoIPDatabase>> populateGeoIPDatabases() {
        Map<EntryConfig, Collection<GeoIPDatabase>> databasesByEntry = new HashMap<>();
        for (EntryConfig entry : this.geoIPProcessorConfig.getEntries()) {
            Collection<GeoIPField> fieldsForEntry = this.entryFieldsMap.get(entry);
            databasesByEntry.put(entry, GeoIPDatabase.selectDatabasesForFields(fieldsForEntry));
        }

        // Guarded so the joined string is only built when debug logging is on.
        if (LOG.isDebugEnabled()) {
            for (Collection<GeoIPDatabase> selectedDatabases : databasesByEntry.values()) {
                String databaseNames = selectedDatabases.stream()
                        .map(Enum::name)
                        .collect(Collectors.joining(", "));
                LOG.debug("Entry configuration using databases: {}", databaseNames);
            }
        }
        return databasesByEntry;
    }

    /** No preparation is required before shutdown. */
    public void prepareForShutdown() {
    }

    /**
     * Shuts down the {@link GeoIPProcessorService} and reports readiness.
     *
     * <p>NOTE(review): this readiness query has a side effect (service shutdown) and will repeat it
     * on every call; confirm that {@code GeoIPProcessorService.shutdown()} tolerates repeated calls
     * or consider moving it to {@link #shutdown()}.
     *
     * @return always {@code true}
     */
    public boolean isReadyForShutdown() {
        this.geoIPProcessorService.shutdown();
        return true;
    }

    /** Nothing to release here; the service is shut down in {@link #isReadyForShutdown()}. */
    public void shutdown() {
    }
}

