/*
 * Decompiled with CFR 0.152.
 */
package org.apache.inlong.dataproxy.channel;

import com.google.common.base.Preconditions;
import java.util.Date;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.flume.ChannelException;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.Transaction;
import org.apache.flume.channel.AbstractChannel;
import org.apache.inlong.dataproxy.channel.ProxyTransaction;
import org.apache.inlong.dataproxy.utils.BufferQueue;
import org.apache.inlong.sdk.commons.protocol.ProxyEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BufferQueueChannel
extends AbstractChannel {
    public static final Logger LOG = LoggerFactory.getLogger(BufferQueueChannel.class);
    public static final String KEY_MAX_BUFFERQUEUE_COUNT = "maxBufferQueueCount";
    public static final int DEFAULT_MAX_BUFFERQUEUE_COUNT = 131072;
    public static final String KEY_MAX_BUFFERQUEUE_SIZE_KB = "maxBufferQueueSizeKb";
    public static final int DEFAULT_MAX_BUFFERQUEUE_SIZE_KB = 131072;
    public static final String KEY_RELOADINTERVAL = "reloadInterval";
    private Context context;
    private int maxBufferQueueCount;
    private Semaphore countSemaphore;
    private int maxBufferQueueSizeKb;
    private BufferQueue<ProxyEvent> bufferQueue;
    private ThreadLocal<ProxyTransaction> currentTransaction = new ThreadLocal();
    protected Timer channelTimer;
    private AtomicLong takeCounter = new AtomicLong(0L);
    private AtomicLong putCounter = new AtomicLong(0L);

    public void put(Event event) throws ChannelException {
        if (event instanceof ProxyEvent) {
            this.putCounter.incrementAndGet();
            int eventSize = event.getBody().length;
            this.countSemaphore.acquireUninterruptibly();
            this.bufferQueue.acquire(eventSize);
            ProxyTransaction transaction = this.currentTransaction.get();
            Preconditions.checkState((transaction != null ? 1 : 0) != 0, (Object)"No transaction exists for this thread");
            ProxyEvent profile = (ProxyEvent)event;
            transaction.doPut(profile);
        }
    }

    public Event take() throws ChannelException {
        ProxyEvent event = this.bufferQueue.pollRecord();
        if (event != null) {
            ProxyTransaction transaction = this.currentTransaction.get();
            Preconditions.checkState((transaction != null ? 1 : 0) != 0, (Object)"No transaction exists for this thread");
            transaction.doTake(event);
            this.takeCounter.incrementAndGet();
        }
        return event;
    }

    public Transaction getTransaction() {
        ProxyTransaction newTransaction = new ProxyTransaction(this.countSemaphore, this.bufferQueue);
        this.currentTransaction.set(newTransaction);
        return newTransaction;
    }

    public void start() {
        super.start();
        try {
            this.setReloadTimer();
        }
        catch (Exception e) {
            LOG.error(e.getMessage(), (Throwable)e);
        }
    }

    protected void setReloadTimer() {
        this.channelTimer = new Timer(true);
        long reloadInterval = this.context.getLong(KEY_RELOADINTERVAL, Long.valueOf(60000L));
        TimerTask channelTask = new TimerTask(){

            @Override
            public void run() {
                LOG.info("queueSize:{},availablePermits:{},maxBufferQueueCount:{},availablePermits:{},put:{},take:{}", new Object[]{BufferQueueChannel.this.bufferQueue.size(), BufferQueueChannel.this.bufferQueue.availablePermits(), BufferQueueChannel.this.maxBufferQueueCount, BufferQueueChannel.this.countSemaphore.availablePermits(), BufferQueueChannel.this.putCounter.getAndSet(0L), BufferQueueChannel.this.takeCounter.getAndSet(0L)});
            }
        };
        this.channelTimer.schedule(channelTask, new Date(System.currentTimeMillis() + reloadInterval), reloadInterval);
    }

    public void configure(Context context) {
        this.context = context;
        this.maxBufferQueueCount = context.getInteger(KEY_MAX_BUFFERQUEUE_COUNT, Integer.valueOf(131072));
        this.countSemaphore = new Semaphore(this.maxBufferQueueCount, true);
        this.maxBufferQueueSizeKb = context.getInteger(KEY_MAX_BUFFERQUEUE_SIZE_KB, Integer.valueOf(131072));
        this.bufferQueue = new BufferQueue(this.maxBufferQueueSizeKb);
    }
}

