/*
 * Decompiled with CFR 0.152.
 */
package org.eclipse.hono.service.resourcelimits;

import io.vertx.core.Future;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.web.client.HttpResponse;
import io.vertx.ext.web.client.WebClient;
import io.vertx.ext.web.client.predicate.ResponsePredicate;
import io.vertx.ext.web.codec.BodyCodec;
import java.time.Duration;
import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.YearMonth;
import java.time.ZoneOffset;
import java.time.temporal.ChronoUnit;
import java.time.temporal.TemporalAdjusters;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Supplier;
import org.eclipse.hono.cache.CacheProvider;
import org.eclipse.hono.cache.ExpiringValueCache;
import org.eclipse.hono.service.metric.MetricsTags;
import org.eclipse.hono.service.resourcelimits.PrometheusBasedResourceLimitChecksConfig;
import org.eclipse.hono.service.resourcelimits.ResourceLimitChecks;
import org.eclipse.hono.util.DataVolume;
import org.eclipse.hono.util.DataVolumePeriod;
import org.eclipse.hono.util.TenantObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Resource limit checks that compare the limits configured for a tenant with
 * usage figures obtained by running instant queries against the HTTP API of a
 * Prometheus server.
 * <p>
 * All checks are best effort: if the query fails or the response cannot be
 * interpreted, the corresponding limit is reported as <em>not</em> reached.
 * Values that are expensive to compute are kept in an optional cache for the
 * number of seconds returned by the configuration's cache timeout.
 */
public final class PrometheusBasedResourceLimitChecks implements ResourceLimitChecks {

    private static final String CONNECTIONS_METRIC_NAME = "hono.connections.authenticated".replace(".", "_");
    private static final String MESSAGES_PAYLOAD_SIZE_METRIC_NAME = String.format("%s_bytes_sum", "hono.messages.payload".replace(".", "_"));
    private static final String COMMANDS_PAYLOAD_SIZE_METRIC_NAME = String.format("%s_bytes_sum", "hono.commands.payload".replace(".", "_"));
    private static final Logger log = LoggerFactory.getLogger(PrometheusBasedResourceLimitChecks.class);
    private static final String QUERY_URI = "/api/v1/query";
    private static final String LIMITS_CACHE_NAME = "resource-limits";

    private final WebClient client;
    private final PrometheusBasedResourceLimitChecksConfig config;
    /** Cache for computed limits and query results; {@code null} if no cache provider was given. */
    private final ExpiringValueCache<Object, Object> limitsCache;

    /**
     * Creates new checks.
     *
     * @param webClient The client to use for querying the Prometheus server.
     * @param config The configuration providing the server's host, port and the cache timeout.
     * @param cacheProvider The provider to obtain the limits cache from, or {@code null}
     *                      if values should not be cached.
     * @throws NullPointerException if the web client or the configuration is {@code null}.
     */
    public PrometheusBasedResourceLimitChecks(WebClient webClient, PrometheusBasedResourceLimitChecksConfig config, CacheProvider cacheProvider) {
        this.client = Objects.requireNonNull(webClient);
        this.config = Objects.requireNonNull(config);
        this.limitsCache = Optional.ofNullable(cacheProvider).map(provider -> provider.getCache(LIMITS_CACHE_NAME)).orElse(null);
    }

    /**
     * Checks whether the number of connections reported by Prometheus for the
     * tenant has reached the tenant's configured maximum.
     *
     * @param tenant The tenant to check.
     * @return A future that succeeds with {@code true} if the current number of
     *         connections is not below the configured maximum, and with
     *         {@code false} otherwise. {@code false} is also reported if no
     *         limit is configured (no resource limits, or a maximum of -1) or
     *         if the query fails.
     * @throws NullPointerException if the tenant is {@code null}.
     */
    @Override
    public Future<Boolean> isConnectionLimitReached(TenantObject tenant) {
        Objects.requireNonNull(tenant);
        if (tenant.getResourceLimits() == null) {
            log.trace("No connection limits configured for the tenant [{}]", tenant.getTenantId());
            return Future.succeededFuture(Boolean.FALSE);
        }
        long maxConnections = tenant.getResourceLimits().getMaxConnections();
        log.trace("connection limit for tenant [{}] is [{}]", tenant.getTenantId(), maxConnections);
        if (maxConnections == -1L) {
            // -1 marks the limit as unlimited
            return Future.succeededFuture(Boolean.FALSE);
        }
        String queryParams = String.format("sum(%s{tenant=\"%s\"})", CONNECTIONS_METRIC_NAME, tenant.getTenantId());
        return executeQuery(queryParams)
                .map(currentConnections -> {
                    if (currentConnections < maxConnections) {
                        return Boolean.FALSE;
                    }
                    log.trace("connection limit exceeded [tenant: {}, current connections: {}, max-connections: {}]",
                            tenant.getTenantId(), currentConnections, maxConnections);
                    return Boolean.TRUE;
                })
                // best effort: a failing query must not block connections
                .otherwise(Boolean.FALSE);
    }

    /**
     * Checks whether accepting a message of the given size would exceed the
     * data volume configured for the tenant in the current accounting period.
     *
     * @param tenant The tenant to check.
     * @param payloadSize The size of the message payload in bytes.
     * @return A future that succeeds with {@code true} if the bytes already
     *         consumed plus the payload size exceed the allowed maximum, and
     *         with {@code false} otherwise. {@code false} is also reported if
     *         no usable data volume configuration exists (no limits, a maximum
     *         of -1, no effective-since date, a missing or unknown period
     *         mode), if the payload size is not positive, if the computed
     *         usage period is not positive or if the query fails.
     * @throws NullPointerException if the tenant is {@code null}.
     */
    @Override
    public Future<Boolean> isMessageLimitReached(TenantObject tenant, long payloadSize) {
        Objects.requireNonNull(tenant);
        if (tenant.getResourceLimits() == null || tenant.getResourceLimits().getDataVolume() == null) {
            log.trace("No message limits configured for the tenant [{}]", tenant.getTenantId());
            return Future.succeededFuture(Boolean.FALSE);
        }
        DataVolume dataVolumeConfig = tenant.getResourceLimits().getDataVolume();
        long maxBytes = dataVolumeConfig.getMaxBytes();
        Instant effectiveSince = dataVolumeConfig.getEffectiveSince();
        // the period is optional: a missing period yields mode UNKNOWN and 0 days
        // instead of a NullPointerException
        DataVolumePeriod period = dataVolumeConfig.getPeriod();
        PeriodMode periodMode = PeriodMode.from(Optional.ofNullable(period).map(DataVolumePeriod::getMode).orElse(null));
        long periodInDays = Optional.ofNullable(period).map(DataVolumePeriod::getNoOfDays).orElse(0).intValue();
        log.trace("message limit config for tenant [{}] are [{}:{}, {}:{}, {}:{}, {}:{}]",
                tenant.getTenantId(), "max-bytes", maxBytes, "effective-since", effectiveSince,
                "mode", periodMode, "no-of-days", periodInDays);
        if (maxBytes == -1L || effectiveSince == null || periodMode == PeriodMode.UNKNOWN || payloadSize <= 0L) {
            return Future.succeededFuture(Boolean.FALSE);
        }

        long allowedMaxBytes = getOrAddToCache(
                this.limitsCache,
                String.format("%s_allowed_max_bytes", tenant.getTenantId()),
                () -> calculateDataVolume(
                        OffsetDateTime.ofInstant(effectiveSince, ZoneOffset.UTC),
                        OffsetDateTime.now(ZoneOffset.UTC),
                        periodMode,
                        maxBytes));
        long dataUsagePeriod = getOrAddToCache(
                this.limitsCache,
                String.format("%s_data_usage_period", tenant.getTenantId()),
                () -> calculateDataUsagePeriod(
                        OffsetDateTime.ofInstant(effectiveSince, ZoneOffset.UTC),
                        OffsetDateTime.now(ZoneOffset.UTC),
                        periodMode,
                        periodInDays));
        if (dataUsagePeriod <= 0L) {
            return Future.succeededFuture(Boolean.FALSE);
        }

        String forwarded = MetricsTags.ProcessingOutcome.FORWARDED.asTag().getValue();
        String unprocessable = MetricsTags.ProcessingOutcome.UNPROCESSABLE.asTag().getValue();
        // sums the payload bytes of messages and commands over the usage period;
        // the "or <other metric>*0" parts supply a zero when one of the series is absent
        String queryParams = String.format(
                "floor(sum(increase(%s{status=~\"%s|%s\", tenant=\"%s\"} [%sd]) or %s*0) + sum(increase(%s{status=~\"%s|%s\", tenant=\"%s\"} [%sd]) or %s*0))",
                MESSAGES_PAYLOAD_SIZE_METRIC_NAME, forwarded, unprocessable, tenant.getTenantId(), dataUsagePeriod,
                COMMANDS_PAYLOAD_SIZE_METRIC_NAME,
                COMMANDS_PAYLOAD_SIZE_METRIC_NAME, forwarded, unprocessable, tenant.getTenantId(), dataUsagePeriod,
                MESSAGES_PAYLOAD_SIZE_METRIC_NAME);
        String key = String.format("%s_bytes_consumed", tenant.getTenantId());

        return getBytesConsumed(key, queryParams)
                .map(bytesConsumed -> {
                    if (bytesConsumed + payloadSize <= allowedMaxBytes) {
                        return Boolean.FALSE;
                    }
                    log.trace("Data limit exceeded for the tenant [{}]. [consumed bytes:{}, allowed max-bytes:{}, {}:{}, {}:{}, {}:{}]",
                            tenant.getTenantId(), bytesConsumed, allowedMaxBytes, "effective-since", effectiveSince,
                            "mode", periodMode, "no-of-days", periodInDays);
                    return Boolean.TRUE;
                })
                // best effort: a failing query must not block messages
                .otherwise(Boolean.FALSE);
    }

    /**
     * Gets the number of bytes consumed, either from the cache or, if there is
     * no cached value, by running the query and caching its result.
     *
     * @param key The cache key.
     * @param query The query to run if no value is cached.
     * @return A future holding the number of bytes consumed.
     */
    private Future<Long> getBytesConsumed(String key, String query) {
        if (this.limitsCache != null) {
            Object cachedValue = this.limitsCache.get(key);
            if (cachedValue != null) {
                return Future.succeededFuture((Long) cachedValue);
            }
        }
        return executeQuery(query).map(bytesConsumed -> Long.valueOf(addToCache(this.limitsCache, key, bytesConsumed)));
    }

    /**
     * Runs an instant query against the configured Prometheus server.
     *
     * @param query The query expression.
     * @return A future that succeeds with the value extracted from the response
     *         and fails if the request fails, the response status is not 200 or
     *         the response cannot be processed.
     */
    private Future<Long> executeQuery(String query) {
        Future<Long> result = Future.future();
        log.trace("running query [{}] against Prometheus backend [http://{}:{}{}]",
                query, this.config.getHost(), this.config.getPort(), QUERY_URI);
        this.client.get(this.config.getPort(), this.config.getHost(), QUERY_URI)
                .addQueryParam("query", query)
                .expect(ResponsePredicate.SC_OK)
                .as(BodyCodec.jsonObject())
                .send(sendAttempt -> {
                    if (sendAttempt.failed()) {
                        log.debug("error fetching result from Prometheus: {}", sendAttempt.cause().getMessage());
                        result.fail(sendAttempt.cause());
                        return;
                    }
                    HttpResponse<JsonObject> response = sendAttempt.result();
                    Long value;
                    try {
                        value = extractLongValue(response.body());
                    } catch (RuntimeException e) {
                        // e.g. a response without a body; fail instead of leaving the future pending forever
                        result.fail(e);
                        return;
                    }
                    result.complete(value);
                });
        return result;
    }

    /**
     * Extracts the value from a query response.
     * <p>
     * The value is expected at {@code data.result[0].value[1]} as a string
     * holding an integer, with {@code result} containing exactly one entry and
     * {@code value} exactly two.
     *
     * @param response The response body.
     * @return The parsed value, or 0 if the response has status {@code error}
     *         or does not have the expected structure.
     * @throws NullPointerException if the response is {@code null}.
     */
    private Long extractLongValue(JsonObject response) {
        Objects.requireNonNull(response);
        try {
            String status = response.getString("status");
            if ("error".equals(status)) {
                log.debug("error while executing query [status: {}, error type: {}, error: {}]",
                        status, response.getString("errorType"), response.getString("error"));
                return 0L;
            }
            JsonObject data = response.getJsonObject("data", new JsonObject());
            JsonArray result = data.getJsonArray("result");
            if (result != null && result.size() == 1) {
                JsonObject sample = result.getJsonObject(0);
                JsonArray valueArray = sample == null ? null : sample.getJsonArray("value");
                if (valueArray != null && valueArray.size() == 2) {
                    String value = valueArray.getString(1);
                    if (value != null && !value.isEmpty()) {
                        return Long.parseLong(value);
                    }
                }
            }
            log.debug("received malformed response from Prometheus server: {}", response.encodePrettily());
        } catch (Exception e) {
            // covers type mismatches in the JSON structure and non-numeric values
            log.debug("received malformed response from Prometheus server: {}", response.encodePrettily(), e);
        }
        return 0L;
    }

    /**
     * Calculates the data volume allowed for the current accounting period.
     * <p>
     * In {@code MONTHLY} mode, if the target date lies in the month in which
     * the limit became effective and that was not the first day of the month,
     * the maximum is pro-rated by the number of days (inclusive) from the
     * effective date to the end of that month, rounded up to a whole byte.
     * In all other cases the configured maximum is returned unchanged.
     *
     * @param effectiveSince The point in time from which the limit is effective.
     * @param targetDateTime The point in time to calculate the volume for.
     * @param mode The accounting period mode.
     * @param maxBytes The configured maximum number of bytes per period.
     * @return The allowed number of bytes.
     */
    long calculateDataVolume(OffsetDateTime effectiveSince, OffsetDateTime targetDateTime, PeriodMode mode, long maxBytes) {
        if (mode == PeriodMode.MONTHLY
                && maxBytes > 0L
                && !targetDateTime.isBefore(effectiveSince)
                && YearMonth.from(targetDateTime).equals(YearMonth.from(effectiveSince))
                && effectiveSince.getDayOfMonth() != 1) {
            OffsetDateTime lastDayOfMonth = effectiveSince.with(TemporalAdjusters.lastDayOfMonth());
            long daysBetween = ChronoUnit.DAYS.between(effectiveSince, lastDayOfMonth) + 1L;
            long daysInMonth = lastDayOfMonth.getDayOfMonth();
            // ceil(daysBetween * maxBytes / daysInMonth) in pure integer arithmetic:
            // dividing first keeps the intermediate values at or below maxBytes
            // (daysBetween <= daysInMonth), so very large limits cannot overflow,
            // and the remainder term performs the rounding up that a plain
            // integer division would silently drop.
            long wholePart = (maxBytes / daysInMonth) * daysBetween;
            long remainderPart = ((maxBytes % daysInMonth) * daysBetween + daysInMonth - 1L) / daysInMonth;
            return wholePart + remainderPart;
        }
        return maxBytes;
    }

    /**
     * Calculates the number of days of the current accounting period that have
     * elapsed up to the target date, i.e. the time range to query usage for.
     * <ul>
     * <li>{@code DAYS}: the inclusive number of days since the effective date
     * modulo the period length, or the full period length if the remainder is
     * zero; 0 if either number is not positive.</li>
     * <li>{@code MONTHLY}: the inclusive number of days since the effective
     * date if the target date lies in the month in which the limit became
     * effective and that was not the first day of the month; otherwise the
     * target date's day of month.</li>
     * <li>any other mode: 0.</li>
     * </ul>
     *
     * @param effectiveSince The point in time from which the limit is effective.
     * @param targetDateTime The point in time to calculate the period for.
     * @param mode The accounting period mode.
     * @param periodInDays The length of the period in days (used in {@code DAYS} mode only).
     * @return The usage period in days.
     */
    long calculateDataUsagePeriod(OffsetDateTime effectiveSince, OffsetDateTime targetDateTime, PeriodMode mode, long periodInDays) {
        long inclusiveDaysBetween = ChronoUnit.DAYS.between(effectiveSince, targetDateTime) + 1L;
        switch (mode) {
            case DAYS: {
                if (inclusiveDaysBetween > 0L && periodInDays > 0L) {
                    long dataUsagePeriodInDays = inclusiveDaysBetween % periodInDays;
                    return dataUsagePeriodInDays == 0L ? periodInDays : dataUsagePeriodInDays;
                }
                return 0L;
            }
            case MONTHLY: {
                boolean sameMonth = YearMonth.from(targetDateTime).equals(YearMonth.from(effectiveSince));
                if (sameMonth && effectiveSince.getDayOfMonth() != 1) {
                    return inclusiveDaysBetween;
                }
                return targetDateTime.getDayOfMonth();
            }
            default:
                return 0L;
        }
    }

    /**
     * Gets a value from the cache or, if the cache is {@code null} or holds no
     * value for the key, obtains it from the supplier and puts it to the cache.
     *
     * @param cache The cache to use, may be {@code null}.
     * @param key The cache key.
     * @param valueSupplier The supplier of the value to use on a cache miss.
     * @return The cached or newly supplied value.
     */
    private long getOrAddToCache(ExpiringValueCache<Object, Object> cache, String key, Supplier<Long> valueSupplier) {
        if (cache != null) {
            Object cachedValue = cache.get(key);
            if (cachedValue != null) {
                return (Long) cachedValue;
            }
        }
        return addToCache(cache, key, valueSupplier.get());
    }

    /**
     * Puts a value to the cache, if there is one, using the configured cache
     * timeout (in seconds) as its lifetime.
     *
     * @param cache The cache to use, may be {@code null}.
     * @param key The cache key.
     * @param result The value to cache.
     * @return The given value.
     */
    private long addToCache(ExpiringValueCache<Object, Object> cache, String key, long result) {
        if (cache != null) {
            cache.put(key, Long.valueOf(result), Duration.ofSeconds(this.config.getCacheTimeout()));
        }
        return result;
    }

    /**
     * The mode of the accounting period that a data volume limit applies to.
     */
    protected enum PeriodMode {
        DAYS("days"),
        MONTHLY("monthly"),
        UNKNOWN("unknown");

        private final String mode;

        PeriodMode(String mode) {
            this.mode = mode;
        }

        /**
         * Gets the mode matching a given name, ignoring case.
         *
         * @param value The name of the mode, may be {@code null}.
         * @return The matching mode, or {@link #UNKNOWN} if the value is
         *         {@code null} or matches no mode.
         */
        static PeriodMode from(String value) {
            if (value != null) {
                for (PeriodMode candidate : PeriodMode.values()) {
                    if (value.equalsIgnoreCase(candidate.value())) {
                        return candidate;
                    }
                }
            }
            return UNKNOWN;
        }

        /**
         * Gets the name of this mode.
         *
         * @return The name.
         */
        String value() {
            return this.mode;
        }
    }
}

