package org.joyqueue.broker.kafka.coordinator.transaction.synchronizer;

import java.util.Set;
import org.apache.commons.collections.CollectionUtils;
import org.joyqueue.broker.cluster.ClusterNameService;
import org.joyqueue.broker.kafka.config.KafkaConfig;
import org.joyqueue.broker.kafka.coordinator.transaction.TransactionIdManager;
import org.joyqueue.broker.kafka.coordinator.transaction.domain.TransactionMarker;
import org.joyqueue.broker.kafka.coordinator.transaction.domain.TransactionMetadata;
import org.joyqueue.broker.kafka.coordinator.transaction.domain.TransactionOffset;
import org.joyqueue.broker.kafka.coordinator.transaction.domain.TransactionPrepare;
import org.joyqueue.broker.kafka.coordinator.transaction.domain.TransactionState;
import org.joyqueue.broker.kafka.coordinator.transaction.log.TransactionLog;
import org.joyqueue.network.transport.session.session.TransportSessionManager;
import org.joyqueue.toolkit.service.Service;
import org.joyqueue.toolkit.time.SystemClock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/joyqueue/broker/kafka/coordinator/transaction/synchronizer/TransactionSynchronizer.class */
/**
 * Entry point that combines the transaction log with the commit and abort
 * synchronizers: every operation either records something in the
 * {@link TransactionLog} or delegates to {@link TransactionCommitSynchronizer} /
 * {@link TransactionAbortSynchronizer}.
 *
 * <p>The two synchronizers are created in {@link #validate()} and started/stopped in
 * {@link #doStart()} / {@link #doStop()}.
 * NOTE(review): these three methods are presumably lifecycle hooks invoked by
 * {@link Service} (validate before start) — confirm against the Service base class.
 */
public class TransactionSynchronizer extends Service {
    protected static final Logger logger = LoggerFactory.getLogger(TransactionSynchronizer.class);

    // Collaborators injected through the constructor; never reassigned afterwards.
    private final KafkaConfig config;
    private final TransactionIdManager transactionIdManager;
    private final TransactionLog transactionLog;
    private final TransportSessionManager sessionManager;
    private final ClusterNameService clusterNameService;

    // Created in validate(), so they stay null until that method has run.
    private TransactionCommitSynchronizer transactionCommitSynchronizer;
    private TransactionAbortSynchronizer transactionAbortSynchronizer;

    /**
     * Stores the collaborators; the commit/abort synchronizers are not created here
     * but in {@link #validate()}.
     *
     * @param kafkaConfig             configuration handed to both synchronizers
     * @param transactionIdManager    id manager handed to both synchronizers
     * @param transactionLog          log that receives prepares, offsets and markers
     * @param transportSessionManager session manager handed to both synchronizers
     * @param clusterNameService      name service handed to the commit synchronizer
     */
    public TransactionSynchronizer(KafkaConfig kafkaConfig, TransactionIdManager transactionIdManager, TransactionLog transactionLog, TransportSessionManager transportSessionManager, ClusterNameService clusterNameService) {
        this.config = kafkaConfig;
        this.transactionIdManager = transactionIdManager;
        this.transactionLog = transactionLog;
        this.sessionManager = transportSessionManager;
        this.clusterNameService = clusterNameService;
    }

    /**
     * Builds the commit and abort synchronizers from the injected collaborators.
     *
     * @throws Exception declared by the signature; nothing in this body throws a checked exception itself
     */
    protected void validate() throws Exception {
        this.transactionCommitSynchronizer = new TransactionCommitSynchronizer(this.config, this.sessionManager, this.transactionIdManager, this.clusterNameService);
        this.transactionAbortSynchronizer = new TransactionAbortSynchronizer(this.config, this.sessionManager, this.transactionIdManager);
    }

    /**
     * Starts the commit synchronizer, then the abort synchronizer.
     *
     * @throws Exception if either synchronizer fails to start
     */
    protected void doStart() throws Exception {
        this.transactionCommitSynchronizer.start();
        this.transactionAbortSynchronizer.start();
    }

    /**
     * Stops the commit synchronizer, then the abort synchronizer. Each is skipped when
     * it is still {@code null}, i.e. when {@link #validate()} has not created it.
     */
    protected void doStop() {
        if (this.transactionCommitSynchronizer != null) {
            this.transactionCommitSynchronizer.stop();
        }
        if (this.transactionAbortSynchronizer != null) {
            this.transactionAbortSynchronizer.stop();
        }
    }

    /**
     * Writes the given prepare records to the transaction log under the transaction's
     * app and id.
     *
     * @param transactionMetadata transaction whose app and id key the log write
     * @param set                 prepare records to write
     * @return the result reported by {@code TransactionLog.batchWrite}
     * @throws Exception if the log write fails
     */
    public boolean prepare(TransactionMetadata transactionMetadata, Set<TransactionPrepare> set) throws Exception {
        return this.transactionLog.batchWrite(transactionMetadata.getApp(), transactionMetadata.getId(), set);
    }

    /**
     * Writes a {@link TransactionState#PREPARE_COMMIT} marker for the transaction.
     *
     * @param transactionMetadata transaction the marker is written for
     * @param set                 not used by this method
     * @return the result of the marker write
     * @throws Exception if the log write fails
     */
    public boolean prepareCommit(TransactionMetadata transactionMetadata, Set<TransactionPrepare> set) throws Exception {
        return writeMarker(transactionMetadata, TransactionState.PREPARE_COMMIT);
    }

    /**
     * Runs {@link #tryCommit} and, only when it succeeds, writes a
     * {@link TransactionState#COMPLETE_COMMIT} marker.
     *
     * @param transactionMetadata transaction being committed
     * @param set                 prepare records to commit
     * @param set2                offsets to commit
     * @return {@code false} if {@code tryCommit} failed (no marker is written then);
     *         otherwise the result of the marker write
     * @throws Exception if a synchronizer call or the log write fails
     */
    public boolean commit(TransactionMetadata transactionMetadata, Set<TransactionPrepare> set, Set<TransactionOffset> set2) throws Exception {
        // Short-circuit: the marker is only written after a successful tryCommit.
        return tryCommit(transactionMetadata, set, set2)
                && writeMarker(transactionMetadata, TransactionState.COMPLETE_COMMIT);
    }

    /**
     * Commits the prepare records and then the offsets through the commit synchronizer.
     * A {@code null} or empty collection is skipped; offsets are only attempted when the
     * prepare step did not fail.
     *
     * @param transactionMetadata transaction being committed
     * @param set                 prepare records to commit, may be {@code null} or empty
     * @param set2                offsets to commit, may be {@code null} or empty
     * @return {@code true} when every attempted step reported success (including the
     *         case where nothing had to be committed), {@code false} otherwise
     * @throws Exception if a synchronizer call fails
     */
    public boolean tryCommit(TransactionMetadata transactionMetadata, Set<TransactionPrepare> set, Set<TransactionOffset> set2) throws Exception {
        if (CollectionUtils.isNotEmpty(set)
                && !this.transactionCommitSynchronizer.commitPrepare(transactionMetadata, set)) {
            // Prepare commit failed: do not touch the offsets.
            return false;
        }
        if (CollectionUtils.isNotEmpty(set2)) {
            return this.transactionCommitSynchronizer.commitOffsets(transactionMetadata, set2);
        }
        return true;
    }

    /**
     * Writes a {@link TransactionState#PREPARE_ABORT} marker for the transaction.
     *
     * @param transactionMetadata transaction the marker is written for
     * @param set                 not used by this method
     * @return the result of the marker write
     * @throws Exception if the log write fails
     */
    public boolean prepareAbort(TransactionMetadata transactionMetadata, Set<TransactionPrepare> set) throws Exception {
        return writeMarker(transactionMetadata, TransactionState.PREPARE_ABORT);
    }

    /**
     * Runs {@link #tryAbort} and, only when it succeeds, writes a
     * {@link TransactionState#COMPLETE_ABORT} marker.
     *
     * @param transactionMetadata transaction being aborted
     * @param set                 prepare records to abort
     * @return {@code false} if {@code tryAbort} failed (no marker is written then);
     *         otherwise the result of the marker write
     * @throws Exception if the synchronizer call or the log write fails
     */
    public boolean abort(TransactionMetadata transactionMetadata, Set<TransactionPrepare> set) throws Exception {
        return tryAbort(transactionMetadata, set)
                && writeMarker(transactionMetadata, TransactionState.COMPLETE_ABORT);
    }

    /**
     * Delegates the abort of the given prepare records to the abort synchronizer.
     * Unlike {@link #tryCommit}, the collection is passed through without an emptiness check.
     *
     * @param transactionMetadata transaction being aborted
     * @param set                 prepare records to abort
     * @return the result reported by the abort synchronizer
     * @throws Exception if the synchronizer call fails
     */
    public boolean tryAbort(TransactionMetadata transactionMetadata, Set<TransactionPrepare> set) throws Exception {
        return this.transactionAbortSynchronizer.abort(transactionMetadata, set);
    }

    /**
     * Writes the given offsets to the transaction log under the transaction's app and id.
     *
     * @param transactionMetadata transaction whose app and id key the log write
     * @param set                 offsets to write
     * @return the result reported by {@code TransactionLog.batchWrite}
     * @throws Exception if the log write fails
     */
    public boolean commitOffset(TransactionMetadata transactionMetadata, Set<TransactionOffset> set) throws Exception {
        return this.transactionLog.batchWrite(transactionMetadata.getApp(), transactionMetadata.getId(), set);
    }

    /**
     * Builds a marker for the given state and writes it to the transaction log under
     * the transaction's app and id.
     *
     * @param transactionMetadata transaction the marker describes
     * @param transactionState    state recorded in the marker
     * @return the result reported by {@code TransactionLog.write}
     * @throws Exception if the log write fails
     */
    protected boolean writeMarker(TransactionMetadata transactionMetadata, TransactionState transactionState) throws Exception {
        TransactionMarker marker = convertMarker(transactionMetadata, transactionState);
        return this.transactionLog.write(transactionMetadata.getApp(), transactionMetadata.getId(), marker);
    }

    /**
     * Creates a {@link TransactionMarker} from the transaction's app, id, producer id,
     * producer epoch, epoch and timeout, the requested state, and the current
     * {@link SystemClock#now()} value as the last constructor argument.
     *
     * @param transactionMetadata source of the marker's identifying fields
     * @param transactionState    state to record in the marker
     * @return the newly created marker
     */
    protected TransactionMarker convertMarker(TransactionMetadata transactionMetadata, TransactionState transactionState) {
        return new TransactionMarker(
                transactionMetadata.getApp(),
                transactionMetadata.getId(),
                transactionMetadata.getProducerId(),
                transactionMetadata.getProducerEpoch(),
                transactionMetadata.getEpoch(),
                transactionState,
                transactionMetadata.getTimeout(),
                SystemClock.now());
    }
}
