/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.io.influxdb;

import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import lombok.Generated;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.pulsar.client.api.schema.Field;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.core.Sink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class BatchSink<T, R>
implements Sink<R> {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(BatchSink.class);
    private int batchSize;
    private List<Record<R>> incomingList;
    private ScheduledExecutorService flushExecutor;

    protected void init(long batchTimeMs, int batchSize) {
        this.batchSize = batchSize;
        this.incomingList = Lists.newArrayList();
        this.flushExecutor = Executors.newSingleThreadScheduledExecutor();
        this.flushExecutor.scheduleAtFixedRate(this::flush, batchTimeMs, batchTimeMs, TimeUnit.MILLISECONDS);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public final void write(Record<R> record) {
        int currentSize;
        BatchSink batchSink = this;
        synchronized (batchSink) {
            if (null != record) {
                this.incomingList.add(record);
            }
            currentSize = this.incomingList.size();
        }
        if (currentSize >= this.batchSize) {
            this.flushExecutor.execute(this::flush);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void flush() {
        List<Record<R>> toFlushList;
        BatchSink batchSink = this;
        synchronized (batchSink) {
            if (this.incomingList.isEmpty()) {
                return;
            }
            toFlushList = this.incomingList;
            this.incomingList = Lists.newArrayList();
        }
        ArrayList points = Lists.newArrayListWithExpectedSize((int)toFlushList.size());
        if (CollectionUtils.isNotEmpty(toFlushList)) {
            for (Record<R> record : toFlushList) {
                try {
                    points.add(this.buildPoint(record));
                }
                catch (Exception e) {
                    record.fail();
                    toFlushList.remove(record);
                    log.warn("Record flush thread was exception ", (Throwable)e);
                }
            }
        }
        try {
            if (CollectionUtils.isNotEmpty((Collection)points)) {
                this.writePoints(points);
            }
            toFlushList.forEach(Record::ack);
            points.clear();
            toFlushList.clear();
        }
        catch (Exception e) {
            toFlushList.forEach(Record::fail);
            log.error("InfluxDB write batch data exception ", (Throwable)e);
        }
    }

    public void close() throws Exception {
        if (null != this.flushExecutor) {
            this.flushExecutor.shutdown();
        }
    }

    protected Object getFiled(GenericRecord record, String fieldName) {
        List fields = record.getFields();
        Field fieldMatch = fields.stream().filter(field -> fieldName.equals(field.getName())).findAny().orElse(null);
        if (null != fieldMatch) {
            return record.getField(fieldMatch);
        }
        return null;
    }

    protected abstract T buildPoint(Record<R> var1) throws Exception;

    protected abstract void writePoints(List<T> var1) throws Exception;
}

