/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kudu.flume.sink;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import java.util.List;
import java.util.Map;
import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.FlumeException;
import org.apache.flume.Sink;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.instrumentation.SinkCounter;
import org.apache.flume.sink.AbstractSink;
import org.apache.kudu.annotations.InterfaceAudience;
import org.apache.kudu.annotations.InterfaceStability;
import org.apache.kudu.client.KuduClient;
import org.apache.kudu.client.KuduSession;
import org.apache.kudu.client.KuduTable;
import org.apache.kudu.client.Operation;
import org.apache.kudu.client.OperationResponse;
import org.apache.kudu.client.SessionConfiguration;
import org.apache.kudu.flume.sink.KuduEventProducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@InterfaceAudience.Public
@InterfaceStability.Evolving
public class KuduSink
extends AbstractSink
implements Configurable {
    private static final Logger logger = LoggerFactory.getLogger(KuduSink.class);
    private static final Long DEFAULT_BATCH_SIZE = 100L;
    private static final Long DEFAULT_TIMEOUT_MILLIS = 30000L;
    private static final String DEFAULT_KUDU_EVENT_PRODUCER = "org.apache.kudu.flume.sink.SimpleKuduEventProducer";
    private static final boolean DEFAULT_IGNORE_DUPLICATE_ROWS = true;
    private String masterAddresses;
    private String tableName;
    private long batchSize;
    private long timeoutMillis;
    private boolean ignoreDuplicateRows;
    private KuduTable table;
    private KuduSession session;
    private KuduClient client;
    private KuduEventProducer eventProducer;
    private String eventProducerType;
    private Context producerContext;
    private SinkCounter sinkCounter;

    public KuduSink() {
        this(null);
    }

    @InterfaceAudience.Private
    @VisibleForTesting
    public KuduSink(KuduClient kuduClient) {
        this.client = kuduClient;
    }

    public void start() {
        Preconditions.checkState((this.table == null && this.session == null ? 1 : 0) != 0, (Object)"Please call stop before calling start on an old instance.");
        if (this.client == null) {
            this.client = new KuduClient.KuduClientBuilder(this.masterAddresses).build();
        }
        this.session = this.client.newSession();
        this.session.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH);
        this.session.setTimeoutMillis(this.timeoutMillis);
        this.session.setIgnoreAllDuplicateRows(this.ignoreDuplicateRows);
        try {
            this.table = this.client.openTable(this.tableName);
        }
        catch (Exception e) {
            this.sinkCounter.incrementConnectionFailedCount();
            String msg = String.format("Could not open table '%s' from Kudu", this.tableName);
            logger.error(msg, (Throwable)e);
            throw new FlumeException(msg, (Throwable)e);
        }
        super.start();
        this.sinkCounter.incrementConnectionCreatedCount();
        this.sinkCounter.start();
    }

    public void stop() {
        try {
            if (this.client != null) {
                this.client.shutdown();
            }
            this.client = null;
            this.table = null;
            this.session = null;
        }
        catch (Exception e) {
            throw new FlumeException("Error closing client.", (Throwable)e);
        }
        this.sinkCounter.incrementConnectionClosedCount();
        this.sinkCounter.stop();
    }

    public void configure(Context context) {
        this.masterAddresses = context.getString("masterAddresses");
        this.tableName = context.getString("tableName");
        this.batchSize = context.getLong("batchSize", DEFAULT_BATCH_SIZE);
        this.timeoutMillis = context.getLong("timeoutMillis", DEFAULT_TIMEOUT_MILLIS);
        this.ignoreDuplicateRows = context.getBoolean("ignoreDuplicateRows", Boolean.valueOf(true));
        this.eventProducerType = context.getString("producer");
        Preconditions.checkNotNull((Object)this.masterAddresses, (Object)"Master address cannot be empty, please specify 'masterAddresses' in configuration file");
        Preconditions.checkNotNull((Object)this.tableName, (Object)"Table name cannot be empty, please specify 'tableName' in configuration file");
        if (this.eventProducerType == null || this.eventProducerType.isEmpty()) {
            this.eventProducerType = DEFAULT_KUDU_EVENT_PRODUCER;
            logger.info("No Kudu event producer defined, will use default");
        }
        this.producerContext = new Context();
        this.producerContext.putAll((Map)context.getSubProperties("producer."));
        try {
            Class<?> clazz = Class.forName(this.eventProducerType);
            this.eventProducer = (KuduEventProducer)clazz.newInstance();
            this.eventProducer.configure(this.producerContext);
        }
        catch (Exception e) {
            logger.error("Could not instantiate Kudu event producer.", (Throwable)e);
            Throwables.propagate((Throwable)e);
        }
        this.sinkCounter = new SinkCounter(this.getName());
    }

    public KuduClient getClient() {
        return this.client;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public Sink.Status process() throws EventDeliveryException {
        block17: {
            if (this.session.hasPendingOperations()) {
                return Sink.Status.BACKOFF;
            }
            Channel channel = this.getChannel();
            txn.begin();
            try (Transaction txn = channel.getTransaction();){
                Sink.Status status;
                Event event;
                long txnEventCount;
                for (txnEventCount = 0L; txnEventCount < this.batchSize && (event = channel.take()) != null; ++txnEventCount) {
                    this.eventProducer.initialize(event, this.table);
                    List<Operation> operations = this.eventProducer.getOperations();
                    for (Operation o : operations) {
                        this.session.apply(o);
                    }
                }
                logger.debug("Flushing {} events", (Object)txnEventCount);
                List responses = this.session.flush();
                if (responses != null) {
                    for (OperationResponse response : responses) {
                        if (!response.hasRowError()) continue;
                        throw new EventDeliveryException("Failed to flush one or more changes. Transaction rolled back: " + response.getRowError().toString());
                    }
                }
                if (txnEventCount == 0L) {
                    this.sinkCounter.incrementBatchEmptyCount();
                } else if (txnEventCount == this.batchSize) {
                    this.sinkCounter.incrementBatchCompleteCount();
                } else {
                    this.sinkCounter.incrementBatchUnderflowCount();
                }
                txn.commit();
                if (txnEventCount == 0L) {
                    status = Sink.Status.BACKOFF;
                    return status;
                }
                this.sinkCounter.addToEventDrainSuccessCount(txnEventCount);
                status = Sink.Status.READY;
                return status;
            }
        }
        return Sink.Status.BACKOFF;
    }

    @InterfaceAudience.Private
    @VisibleForTesting
    KuduEventProducer getEventProducer() {
        return this.eventProducer;
    }
}

