/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.io.influxdb.v2;

import com.influxdb.client.InfluxDBClient;
import com.influxdb.client.WriteApiBlocking;
import com.influxdb.client.domain.WritePrecision;
import com.influxdb.client.write.Point;
import java.util.List;
import java.util.Map;
import lombok.Generated;
import org.apache.avro.util.Utf8;
import org.apache.pulsar.client.api.SchemaSerializationException;
import org.apache.pulsar.client.api.schema.Field;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.core.SinkContext;
import org.apache.pulsar.io.influxdb.BatchSink;
import org.apache.pulsar.io.influxdb.v2.InfluxDBClientBuilder;
import org.apache.pulsar.io.influxdb.v2.InfluxDBClientBuilderImpl;
import org.apache.pulsar.io.influxdb.v2.InfluxDBSinkConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class InfluxDBSink
extends BatchSink<Point, GenericRecord> {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(InfluxDBSink.class);
    private WritePrecision writePrecision;
    protected InfluxDBClientBuilder influxDBClientBuilder = new InfluxDBClientBuilderImpl();
    private InfluxDBClient influxDBClient;
    private WriteApiBlocking writeApi;

    public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception {
        InfluxDBSinkConfig influxDBSinkConfig = InfluxDBSinkConfig.load(config, sinkContext);
        influxDBSinkConfig.validate();
        super.init(influxDBSinkConfig.getBatchTimeMs(), influxDBSinkConfig.getBatchSize());
        this.influxDBClient = this.influxDBClientBuilder.build(influxDBSinkConfig);
        this.writeApi = this.influxDBClient.getWriteApiBlocking();
        this.writePrecision = WritePrecision.fromValue((String)influxDBSinkConfig.getPrecision().toLowerCase());
    }

    @Override
    protected final Point buildPoint(Record<GenericRecord> record) {
        Object columnsField;
        long timestamp;
        GenericRecord genericRecord = (GenericRecord)record.getValue();
        Object measurementField = genericRecord.getField("measurement");
        if (null == measurementField) {
            throw new SchemaSerializationException("measurement is a required field.");
        }
        String measurement = (String)measurementField;
        Object timestampField = this.getFiled(genericRecord, "timestamp");
        if (null == timestampField) {
            timestamp = System.currentTimeMillis();
        } else if (timestampField instanceof Number) {
            timestamp = ((Number)timestampField).longValue();
        } else if (timestampField instanceof String) {
            timestamp = Long.parseLong((String)timestampField);
        } else {
            throw new SchemaSerializationException("Invalid timestamp field");
        }
        Point point = Point.measurement((String)measurement).time(Long.valueOf(timestamp), this.writePrecision);
        Object tagsField = this.getFiled(genericRecord, "tags");
        if (null != tagsField) {
            if (tagsField instanceof GenericRecord) {
                GenericRecord tagsRecord = (GenericRecord)tagsField;
                for (Field field : tagsRecord.getFields()) {
                    String fieldName = field.getName();
                    Object value2 = tagsRecord.getField(field);
                    point.addTag(fieldName, (String)value2);
                }
            } else if (Map.class.isAssignableFrom(tagsField.getClass())) {
                Map tagsMap = (Map)tagsField;
                tagsMap.forEach((key, value) -> point.addTag(key.toString(), value.toString()));
            } else {
                throw new SchemaSerializationException("Unknown type for 'tags'");
            }
        }
        if ((columnsField = genericRecord.getField("fields")) instanceof GenericRecord) {
            GenericRecord columnsRecord = (GenericRecord)columnsField;
            for (Field field : columnsRecord.getFields()) {
                String fieldName = field.getName();
                Object value3 = columnsRecord.getField(field);
                this.addPointField(point, fieldName, value3);
            }
        } else if (Map.class.isAssignableFrom(columnsField.getClass())) {
            Map columnsMap = (Map)columnsField;
            columnsMap.forEach((key, value) -> this.addPointField(point, key.toString(), value));
        } else {
            throw new SchemaSerializationException("Unknown type for 'fields'");
        }
        return point;
    }

    @Override
    protected void writePoints(List<Point> points) throws Exception {
        this.writeApi.writePoints(points);
    }

    @Override
    public void close() throws Exception {
        super.close();
        if (null != this.influxDBClient) {
            this.influxDBClient.close();
        }
    }

    private void addPointField(Point point, String fieldName, Object value) throws SchemaSerializationException {
        if (value instanceof Number) {
            point.addField(fieldName, (Number)value);
        } else if (value instanceof Boolean) {
            point.addField(fieldName, ((Boolean)value).booleanValue());
        } else if (value instanceof String) {
            point.addField(fieldName, (String)value);
        } else if (value instanceof Utf8) {
            point.addField(fieldName, value.toString());
        } else {
            throw new SchemaSerializationException("Unknown value type for field " + fieldName + ". Type: " + String.valueOf(value.getClass()));
        }
    }
}

