/*
 * Decompiled with CFR 0.152.
 */
package com.volcengine.service.tls.consumer;

import com.volcengine.model.tls.consumer.CheckpointInfo;
import com.volcengine.model.tls.consumer.ConsumeShard;
import com.volcengine.model.tls.consumer.ConsumerConfig;
import com.volcengine.model.tls.consumer.ConsumerStatus;
import com.volcengine.model.tls.exception.LogException;
import com.volcengine.model.tls.pb.PutLogRequest;
import com.volcengine.model.tls.request.ConsumeLogsRequest;
import com.volcengine.model.tls.request.DescribeCheckpointRequest;
import com.volcengine.model.tls.request.DescribeCursorRequest;
import com.volcengine.model.tls.response.CommonResponse;
import com.volcengine.model.tls.response.ConsumeLogsResponse;
import com.volcengine.model.tls.response.DescribeCheckpointResponse;
import com.volcengine.model.tls.response.DescribeCursorResponse;
import com.volcengine.service.tls.TLSLogClient;
import com.volcengine.service.tls.consumer.CheckpointTracker;
import com.volcengine.service.tls.consumer.ConsumerImpl;
import com.volcengine.service.tls.consumer.ConsumerUtil;
import com.volcengine.service.tls.consumer.LogProcessor;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

public class LogConsumer {
    private static final Log LOG = LogFactory.getLog(LogConsumer.class);
    public static final int BACK_OFF_CODE = 429;
    private ConsumerConfig consumerConfig;
    private TLSLogClient tlsClient;
    private LogProcessor logProcessor;
    private ExecutorService executor;
    private ConsumerStatus status;
    private final ConsumeShard shard;
    private String nextCheckpoint;
    private PutLogRequest.LogGroupList currLogGroupList;
    private LocalDateTime lastBackoffTime;
    private ReentrantReadWriteLock statusLock;
    private CheckpointTracker checkpointTracker;
    private volatile boolean isTaskFinished;

    public LogConsumer(ConsumerImpl consumer, ConsumeShard consumeShard) {
        this.consumerConfig = consumer.consumerConfig;
        this.tlsClient = consumer.tlsClient;
        this.logProcessor = consumer.logProcessor;
        this.executor = consumer.executor;
        this.status = ConsumerStatus.PENDING;
        this.shard = consumeShard;
        this.statusLock = new ReentrantReadWriteLock();
        this.checkpointTracker = new CheckpointTracker(consumer, consumeShard);
    }

    public void run() {
        switch (this.loadStatus()) {
            case PENDING: {
                this.setStatus(ConsumerStatus.INITIALIZING);
                this.executor.execute(() -> {
                    this.isTaskFinished = false;
                    try {
                        this.init();
                        this.setStatus(ConsumerStatus.READY_TO_FETCH);
                    }
                    catch (Exception e) {
                        LOG.error((Object)"Init log consumer failed!", (Throwable)e);
                        this.setStatus(ConsumerStatus.PENDING);
                    }
                    finally {
                        this.isTaskFinished = true;
                    }
                });
                break;
            }
            case READY_TO_FETCH: {
                this.setStatus(ConsumerStatus.FETCHING);
                this.executor.execute(() -> {
                    block12: {
                        this.isTaskFinished = false;
                        try {
                            this.fetchData();
                            this.setStatus(ConsumerStatus.READY_TO_CONSUME);
                        }
                        catch (LogException e) {
                            if (e.getErrorMessage().contains("ConsumerHeartbeatExpired")) {
                                try {
                                    this.checkpointTracker.uploadCheckpoint();
                                }
                                catch (Exception ex) {
                                    LOG.error((Object)"Upload checkpoint failed when consumer expired.", (Throwable)ex);
                                }
                                this.checkpointTracker.stop();
                                this.setStatus(ConsumerStatus.WAIT_FOR_RESTART);
                                break block12;
                            }
                            if (e.getHttpCode() == 429) {
                                this.setStatus(ConsumerStatus.BACKOFF);
                            } else {
                                LOG.error((Object)"Fetch log data failed!", (Throwable)e);
                                this.setStatus(ConsumerStatus.READY_TO_FETCH);
                            }
                        }
                        catch (Exception e) {
                            LOG.error((Object)"Fetch data failed!", (Throwable)e);
                            this.setStatus(ConsumerStatus.READY_TO_FETCH);
                        }
                        finally {
                            this.isTaskFinished = true;
                        }
                    }
                });
                break;
            }
            case READY_TO_CONSUME: {
                this.setStatus(ConsumerStatus.CONSUMING);
                this.executor.execute(() -> {
                    this.isTaskFinished = false;
                    try {
                        this.consume();
                        this.setStatus(ConsumerStatus.READY_TO_FETCH);
                    }
                    catch (Exception e) {
                        LOG.error((Object)"consume error.", (Throwable)e);
                        this.setStatus(ConsumerStatus.READY_TO_CONSUME);
                    }
                    finally {
                        this.isTaskFinished = true;
                    }
                });
                break;
            }
            case BACKOFF: {
                if (this.backoff()) {
                    this.setStatus(ConsumerStatus.BACKOFF);
                    break;
                }
                this.setStatus(ConsumerStatus.READY_TO_FETCH);
                break;
            }
        }
    }

    public void setStatus(ConsumerStatus status) {
        this.statusLock.writeLock().lock();
        this.status = status;
        this.statusLock.writeLock().unlock();
    }

    public ConsumerStatus loadStatus() {
        this.statusLock.readLock().lock();
        ConsumerStatus status = this.status;
        this.statusLock.readLock().unlock();
        return status;
    }

    public void stop() {
        int times = 0;
        while (!this.isTaskFinished && times++ < this.consumerConfig.getStopTimeout()) {
            ConsumerUtil.sleep(1000L);
            LOG.debug((Object)("LogConsumer stop failed " + (Object)((Object)this.status) + " times: " + times));
        }
        this.checkpointTracker.stop();
        try {
            this.checkpointTracker.uploadCheckpoint();
        }
        catch (Exception ex) {
            LOG.error((Object)"Upload checkpoint failed when received stop signal.", (Throwable)ex);
        }
    }

    private void init() throws LogException {
        CommonResponse resp;
        Object req;
        this.checkpointTracker.start();
        String projectID = this.consumerConfig.getProjectID();
        String topicID = this.shard.getTopicID();
        int shardID = this.shard.getShardID();
        String consumerGroupName = this.consumerConfig.getConsumerGroupName();
        try {
            req = new DescribeCheckpointRequest(projectID, topicID, shardID, consumerGroupName);
            resp = this.tlsClient.describeCheckPoint((DescribeCheckpointRequest)req);
            if (!((DescribeCheckpointResponse)resp).getCheckpoint().isEmpty()) {
                this.nextCheckpoint = ((DescribeCheckpointResponse)resp).getCheckpoint();
                return;
            }
        }
        catch (LogException e) {
            LOG.error((Object)"Initializing log consumer failed in getting checkpoint.");
            throw e;
        }
        try {
            req = new DescribeCursorRequest(topicID, shardID, this.consumerConfig.getConsumeFrom());
            resp = this.tlsClient.describeCursor((DescribeCursorRequest)req);
            this.nextCheckpoint = ((DescribeCursorResponse)resp).getCursor();
        }
        catch (LogException e) {
            LOG.error((Object)"Initializing log consumer failed in getting cursor.");
            throw e;
        }
    }

    private void fetchData() throws LogException {
        this.lastBackoffTime = LocalDateTime.now();
        ConsumeLogsRequest req = new ConsumeLogsRequest();
        req.setTopicId(this.shard.getTopicID());
        req.setShardId(this.shard.getShardID());
        req.setCursor(this.nextCheckpoint);
        req.setLogGroupCount(this.consumerConfig.getMaxFetchLogGroupCount());
        req.setCompression(this.consumerConfig.getCompressType());
        req.setConsumerGroupName(this.consumerConfig.getConsumerGroupName());
        req.setConsumerName(this.consumerConfig.getConsumerName());
        req.setOrigin(this.consumerConfig.isOrigin());
        ConsumeLogsResponse resp = this.tlsClient.consumeLogs(req);
        this.currLogGroupList = resp.getLogGroupList();
        this.nextCheckpoint = resp.getXTlsCursor();
    }

    private void consume() {
        if (this.currLogGroupList == null || this.currLogGroupList.getLogGroupsCount() == 0) {
            return;
        }
        this.logProcessor.process(this.shard.getTopicID(), this.shard.getShardID(), this.currLogGroupList);
        this.checkpointTracker.setCheckpoint(new CheckpointInfo(this.nextCheckpoint, this.shard));
    }

    private boolean backoff() {
        Duration duration = Duration.between(this.lastBackoffTime, LocalDateTime.now());
        return duration.compareTo(Duration.ofSeconds(5L)) <= 0;
    }
}

