package org.joyqueue.broker.kafka.coordinator.transaction.log;

import com.google.common.collect.Lists;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import org.apache.commons.collections.CollectionUtils;
import org.joyqueue.broker.buffer.Serializer;
import org.joyqueue.broker.consumer.Consume;
import org.joyqueue.broker.consumer.model.PullResult;
import org.joyqueue.broker.kafka.config.KafkaConfig;
import org.joyqueue.broker.kafka.coordinator.transaction.domain.TransactionDomain;
import org.joyqueue.broker.kafka.coordinator.transaction.helper.TransactionSerializer;
import org.joyqueue.broker.producer.Produce;
import org.joyqueue.exception.JoyQueueCode;
import org.joyqueue.message.BrokerMessage;
import org.joyqueue.network.session.Consumer;
import org.joyqueue.network.session.Producer;
import org.joyqueue.toolkit.network.IpUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/joyqueue/broker/kafka/coordinator/transaction/log/TransactionLogSegment.class */
public class TransactionLogSegment {
    protected static final Logger logger = LoggerFactory.getLogger(TransactionLogSegment.class);
    private final byte[] LOCAL_IP = IpUtil.getLocalIp().getBytes();
    private KafkaConfig config;
    private String topic;
    private short partition;
    private Produce produce;
    private Consume consume;
    private Consumer consumer;
    private Producer producer;

    public TransactionLogSegment(KafkaConfig kafkaConfig, String str, short s, Produce produce, Consume consume, Producer producer, Consumer consumer) {
        this.config = kafkaConfig;
        this.topic = str;
        this.partition = s;
        this.produce = produce;
        this.consume = consume;
        this.producer = producer;
        this.consumer = consumer;
    }

    public long getIndex() {
        long ackIndex = this.consume.getAckIndex(this.consumer, this.partition);
        if (ackIndex < 0) {
            ackIndex = 0;
        }
        return ackIndex;
    }

    public List<TransactionDomain> read(long j, int i) throws Exception {
        List<ByteBuffer> doRead = doRead(this.partition, j, i);
        if (CollectionUtils.isEmpty(doRead)) {
            return Collections.emptyList();
        }
        ArrayList newArrayListWithCapacity = Lists.newArrayListWithCapacity(doRead.size());
        Iterator<ByteBuffer> it = doRead.iterator();
        while (it.hasNext()) {
            newArrayListWithCapacity.add(TransactionSerializer.deserialize(it.next()));
        }
        return newArrayListWithCapacity;
    }

    public void saveIndex(long j) throws Exception {
        this.consume.setAckIndex(this.consumer, this.partition, j);
    }

    protected List<ByteBuffer> doRead(short s, long j, int i) throws Exception {
        PullResult message = this.consume.getMessage(this.consumer, s, j, i);
        if (!message.getCode().equals(JoyQueueCode.SUCCESS)) {
            logger.error("read transaction log exception, partition: {}, index: {}, count: {}", new Object[]{Short.valueOf(s), Long.valueOf(j), Integer.valueOf(i), message.getCode()});
            return Collections.emptyList();
        }
        ArrayList newArrayListWithCapacity = Lists.newArrayListWithCapacity(message.getBuffers().size());
        Iterator it = message.getBuffers().iterator();
        while (it.hasNext()) {
            newArrayListWithCapacity.add(Serializer.readBrokerMessage((ByteBuffer) it.next()).getBody());
        }
        return newArrayListWithCapacity;
    }

    public boolean batchWrite(String str, String str2, List<byte[]> list) throws Exception {
        ArrayList newArrayListWithCapacity = Lists.newArrayListWithCapacity(list.size());
        Iterator<byte[]> it = list.iterator();
        while (it.hasNext()) {
            newArrayListWithCapacity.add(convertMessage(it.next()));
        }
        this.produce.putMessage(this.producer, newArrayListWithCapacity, this.config.getTransactionLogWriteQosLevel(), this.config.getTransactionSyncTimeout());
        return true;
    }

    public boolean write(String str, String str2, byte[] bArr) throws Exception {
        this.produce.putMessage(this.producer, Lists.newArrayList(new BrokerMessage[]{convertMessage(bArr)}), this.config.getTransactionLogWriteQosLevel(), this.config.getTransactionSyncTimeout());
        return true;
    }

    protected BrokerMessage convertMessage(byte[] bArr) {
        BrokerMessage brokerMessage = new BrokerMessage();
        brokerMessage.setTopic(this.topic);
        brokerMessage.setApp(this.producer.getApp());
        brokerMessage.setBody(bArr);
        brokerMessage.setClientIp(this.LOCAL_IP);
        brokerMessage.setPartition(this.partition);
        return brokerMessage;
    }

    public short getPartition() {
        return this.partition;
    }
}
