/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.engine.server.metrics;

import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.lang3.StringUtils;
import org.apache.seatunnel.api.common.metrics.Counter;
import org.apache.seatunnel.api.common.metrics.Meter;
import org.apache.seatunnel.api.common.metrics.MetricsContext;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.common.constants.PluginType;

public class ConnectorMetricsCalcContext {
    private final MetricsContext metricsContext;
    private final PluginType type;
    private Counter count;
    private final Map<String, Counter> countPerTable = new ConcurrentHashMap<String, Counter>();
    private Meter QPS;
    private final Map<String, Meter> QPSPerTable = new ConcurrentHashMap<String, Meter>();
    private Counter bytes;
    private final Map<String, Counter> bytesPerTable = new ConcurrentHashMap<String, Counter>();
    private Meter bytesPerSeconds;
    private final Map<String, Meter> bytesPerSecondsPerTable = new ConcurrentHashMap<String, Meter>();

    public ConnectorMetricsCalcContext(MetricsContext metricsContext, PluginType type, boolean isMulti, List<TablePath> tables) {
        this.metricsContext = metricsContext;
        this.type = type;
        this.initializeMetrics(isMulti, tables);
    }

    private void initializeMetrics(boolean isMulti, List<TablePath> tables) {
        if (this.type.equals((Object)PluginType.SINK)) {
            this.initializeMetrics(isMulti, tables, "SinkWriteCount", "SinkWriteQPS", "SinkWriteBytes", "SinkWriteBytesPerSeconds");
        } else if (this.type.equals((Object)PluginType.SOURCE)) {
            this.initializeMetrics(isMulti, tables, "SourceReceivedCount", "SourceReceivedQPS", "SourceReceivedBytes", "SourceReceivedBytesPerSeconds");
        }
    }

    private void initializeMetrics(boolean isMulti, List<TablePath> tables, String countName, String qpsName, String bytesName, String bytesPerSecondsName) {
        this.count = this.metricsContext.counter(countName);
        this.QPS = this.metricsContext.meter(qpsName);
        this.bytes = this.metricsContext.counter(bytesName);
        this.bytesPerSeconds = this.metricsContext.meter(bytesPerSecondsName);
        if (isMulti) {
            tables.forEach(tablePath -> {
                this.countPerTable.put(tablePath.getFullName(), this.metricsContext.counter(countName + "#" + tablePath.getFullName()));
                this.QPSPerTable.put(tablePath.getFullName(), this.metricsContext.meter(qpsName + "#" + tablePath.getFullName()));
                this.bytesPerTable.put(tablePath.getFullName(), this.metricsContext.counter(bytesName + "#" + tablePath.getFullName()));
                this.bytesPerSecondsPerTable.put(tablePath.getFullName(), this.metricsContext.meter(bytesPerSecondsName + "#" + tablePath.getFullName()));
            });
        }
    }

    public void updateMetrics(Object data, String tableId) {
        this.count.inc();
        this.QPS.markEvent();
        if (data instanceof SeaTunnelRow) {
            SeaTunnelRow row = (SeaTunnelRow)data;
            this.bytes.inc((long)row.getBytesSize());
            this.bytesPerSeconds.markEvent((long)row.getBytesSize());
            if (StringUtils.isNotBlank((CharSequence)tableId)) {
                String tableName = TablePath.of((String)tableId).getFullName();
                this.processMetrics(this.countPerTable, Counter.class, tableName, "SinkWriteCount", "SourceReceivedCount", Counter::inc);
                this.processMetrics(this.bytesPerTable, Counter.class, tableName, "SinkWriteBytes", "SourceReceivedBytes", counter -> counter.inc((long)row.getBytesSize()));
                this.processMetrics(this.QPSPerTable, Meter.class, tableName, "SinkWriteQPS", "SourceReceivedQPS", Meter::markEvent);
                this.processMetrics(this.bytesPerSecondsPerTable, Meter.class, tableName, "SinkWriteBytesPerSeconds", "SourceReceivedBytesPerSeconds", meter -> meter.markEvent((long)row.getBytesSize()));
            }
        }
    }

    private <T> void processMetrics(Map<String, T> metricMap, Class<T> cls, String tableName, String sinkMetric, String sourceMetric, MetricProcessor<T> processor) {
        T metric = metricMap.get(tableName);
        if (Objects.nonNull(metric)) {
            processor.process(metric);
        } else {
            String metricName = PluginType.SINK.equals((Object)this.type) ? sinkMetric + "#" + tableName : sourceMetric + "#" + tableName;
            T newMetric = this.createMetric(this.metricsContext, metricName, cls);
            processor.process(newMetric);
            metricMap.put(tableName, newMetric);
        }
    }

    private <T> T createMetric(MetricsContext metricsContext, String metricName, Class<T> metricClass) {
        if (metricClass == Counter.class) {
            return metricClass.cast(metricsContext.counter(metricName));
        }
        if (metricClass == Meter.class) {
            return metricClass.cast(metricsContext.meter(metricName));
        }
        throw new IllegalArgumentException("Unsupported metric class: " + metricClass.getName());
    }

    @FunctionalInterface
    static interface MetricProcessor<T> {
        public void process(T var1);
    }
}

