/*
 * Decompiled with CFR 0.152.
 */
package org.tikv.txn;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tikv.common.BytePairWrapper;
import org.tikv.common.ByteWrapper;
import org.tikv.common.TiSession;
import org.tikv.common.codec.KeyUtils;
import org.tikv.common.exception.GrpcException;
import org.tikv.common.exception.TiBatchWriteException;
import org.tikv.common.region.RegionManager;
import org.tikv.common.region.TiRegion;
import org.tikv.common.region.TiStore;
import org.tikv.common.util.BackOffFunction;
import org.tikv.common.util.BackOffer;
import org.tikv.common.util.ConcreteBackOffer;
import org.tikv.common.util.Pair;
import org.tikv.kvproto.Kvrpcpb;
import org.tikv.shade.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.tikv.shade.com.google.protobuf.ByteString;
import org.tikv.txn.TxnKVClient;
import org.tikv.txn.type.BatchKeys;
import org.tikv.txn.type.ClientRPCResult;
import org.tikv.txn.type.GroupKeyResult;

public class TwoPhaseCommitter
implements AutoCloseable {
    private static final int WRITE_BUFFER_SIZE = 32768;
    private static final int TXN_COMMIT_BATCH_SIZE = 786432;
    private static final long DEFAULT_BATCH_WRITE_LOCK_TTL = 3600000L;
    private static final Logger LOG = LoggerFactory.getLogger(TwoPhaseCommitter.class);
    private final long startTs;
    private final long lockTTL;
    private final boolean retryCommitSecondaryKeys;
    private final TxnKVClient kvClient;
    private final RegionManager regionManager;
    private final long txnPrewriteBatchSize;
    private final long txnCommitBatchSize;
    private final int writeBufferSize;
    private final int writeThreadPerTask;
    private final int prewriteMaxRetryTimes;
    private final ExecutorService executorService;

    public TwoPhaseCommitter(TiSession session, long startTime) {
        this(session, startTime, 3600000L);
    }

    public TwoPhaseCommitter(TiSession session, long startTime, long lockTTL) {
        this(session, startTime, lockTTL, 786432L, 786432L, 32768, 1, true, 3, TwoPhaseCommitter.createExecutorService(32768));
    }

    TwoPhaseCommitter(TiSession session, long startTime, long lockTTL, long txnPrewriteBatchSize, long txnCommitBatchSize, int writeBufferSize, int writeThreadPerTask, boolean retryCommitSecondaryKeys, int prewriteMaxRetryTimes, ExecutorService executorService) {
        this.kvClient = session.createTxnClient();
        this.regionManager = this.kvClient.getRegionManager();
        this.startTs = startTime;
        this.lockTTL = lockTTL;
        this.retryCommitSecondaryKeys = retryCommitSecondaryKeys;
        this.txnPrewriteBatchSize = txnPrewriteBatchSize;
        this.txnCommitBatchSize = txnCommitBatchSize;
        this.writeBufferSize = writeBufferSize;
        this.writeThreadPerTask = writeThreadPerTask;
        this.prewriteMaxRetryTimes = prewriteMaxRetryTimes;
        this.executorService = executorService;
    }

    private static ExecutorService createExecutorService(int size) {
        return Executors.newFixedThreadPool(size, new ThreadFactoryBuilder().setNameFormat("2pc-pool-%d").setDaemon(true).build());
    }

    @Override
    public void close() throws Exception {
        if (this.executorService != null) {
            this.executorService.shutdownNow();
        }
    }

    public void prewritePrimaryKey(BackOffer backOffer, byte[] primaryKey, byte[] value) throws TiBatchWriteException {
        this.doPrewritePrimaryKeyWithRetry(backOffer, ByteString.copyFrom(primaryKey), ByteString.copyFrom(value));
    }

    private void doPrewritePrimaryKeyWithRetry(BackOffer backOffer, ByteString key, ByteString value) throws TiBatchWriteException {
        long lockTTL;
        Pair<TiRegion, TiStore> pair = this.regionManager.getRegionStorePairByKey(key, backOffer);
        TiRegion tiRegion = (TiRegion)pair.first;
        TiStore store = (TiStore)pair.second;
        Kvrpcpb.Mutation mutation = !value.isEmpty() ? Kvrpcpb.Mutation.newBuilder().setKey(key).setValue(value).setOp(Kvrpcpb.Op.Put).build() : Kvrpcpb.Mutation.newBuilder().setKey(key).setOp(Kvrpcpb.Op.Del).build();
        List<Kvrpcpb.Mutation> mutationList = Collections.singletonList(mutation);
        ClientRPCResult prewriteResult = this.kvClient.prewrite(backOffer, mutationList, key, lockTTL = this.getTxnLockTTL(this.startTs), this.startTs, tiRegion, store);
        if (!prewriteResult.isSuccess() && !prewriteResult.isRetry()) {
            throw new TiBatchWriteException("prewrite primary key error", prewriteResult.getException());
        }
        if (prewriteResult.isRetry()) {
            try {
                backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoRegionMiss, new GrpcException(String.format("Txn prewrite primary key failed, regionId=%s", tiRegion.getId()), prewriteResult.getException()));
                this.doPrewritePrimaryKeyWithRetry(backOffer, key, value);
            }
            catch (GrpcException e) {
                String errorMsg = String.format("Txn prewrite primary key error, re-split commit failed, regionId=%s, detail=%s", tiRegion.getId(), e.getMessage());
                throw new TiBatchWriteException(errorMsg, e);
            }
        }
        LOG.info("prewrite primary key {} successfully", (Object)KeyUtils.formatBytes(key));
    }

    public void commitPrimaryKey(BackOffer backOffer, byte[] key, long commitTs) throws TiBatchWriteException {
        this.doCommitPrimaryKeyWithRetry(backOffer, ByteString.copyFrom(key), commitTs);
    }

    private void doCommitPrimaryKeyWithRetry(BackOffer backOffer, ByteString key, long commitTs) throws TiBatchWriteException {
        Pair<TiRegion, TiStore> pair = this.regionManager.getRegionStorePairByKey(key, backOffer);
        TiRegion tiRegion = (TiRegion)pair.first;
        TiStore store = (TiStore)pair.second;
        ByteString[] keys = new ByteString[]{key};
        ClientRPCResult commitResult = this.kvClient.commit(backOffer, keys, this.startTs, commitTs, tiRegion, store);
        if (!commitResult.isSuccess()) {
            if (!commitResult.isRetry()) {
                throw new TiBatchWriteException("commit primary key error", commitResult.getException());
            }
            backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoRegionMiss, new GrpcException(String.format("Txn commit primary key failed, regionId=%s", tiRegion.getId()), commitResult.getException()));
            this.doCommitPrimaryKeyWithRetry(backOffer, key, commitTs);
        }
        LOG.info("commit primary key {} successfully", (Object)KeyUtils.formatBytes(key));
    }

    public void prewriteSecondaryKeys(byte[] primaryKey, final Iterator<BytePairWrapper> pairs, int maxBackOfferMS) throws TiBatchWriteException {
        Iterator<Pair<ByteString, ByteString>> byteStringKeys = new Iterator<Pair<ByteString, ByteString>>(){

            @Override
            public boolean hasNext() {
                return pairs.hasNext();
            }

            @Override
            public Pair<ByteString, ByteString> next() {
                BytePairWrapper pair = (BytePairWrapper)pairs.next();
                return new Pair<ByteString, ByteString>(ByteString.copyFrom(pair.getKey()), ByteString.copyFrom(pair.getValue()));
            }
        };
        this.doPrewriteSecondaryKeys(ByteString.copyFrom(primaryKey), byteStringKeys, maxBackOfferMS);
    }

    private void doPrewriteSecondaryKeys(ByteString primaryKey, Iterator<Pair<ByteString, ByteString>> pairs, int maxBackOfferMS) throws TiBatchWriteException {
        try {
            int taskBufferSize = this.writeThreadPerTask * 2;
            int totalSize = 0;
            int cnt = 0;
            ExecutorCompletionService<Void> completionService = new ExecutorCompletionService<Void>(this.executorService);
            while (pairs.hasNext()) {
                int size;
                ByteString[] keyBytes = new ByteString[this.writeBufferSize];
                ByteString[] valueBytes = new ByteString[this.writeBufferSize];
                for (size = 0; size < this.writeBufferSize && pairs.hasNext(); ++size) {
                    Pair<ByteString, ByteString> pair = pairs.next();
                    keyBytes[size] = (ByteString)pair.first;
                    valueBytes[size] = (ByteString)pair.second;
                }
                int curSize = size;
                if (++cnt > taskBufferSize) {
                    completionService.take().get();
                }
                ConcreteBackOffer backOffer = ConcreteBackOffer.newCustomBackOff(maxBackOfferMS);
                completionService.submit(() -> {
                    this.doPrewriteSecondaryKeysInBatchesWithRetry(backOffer, primaryKey, keyBytes, valueBytes, curSize, 0);
                    return null;
                });
                totalSize += size;
            }
            for (int i = 0; i < Math.min(taskBufferSize, cnt); ++i) {
                completionService.take().get();
            }
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new TiBatchWriteException("Current thread interrupted.", e);
        }
        catch (ExecutionException e) {
            throw new TiBatchWriteException("Execution exception met.", e);
        }
    }

    private void doPrewriteSecondaryKeysInBatchesWithRetry(BackOffer backOffer, ByteString primaryKey, ByteString[] keys, ByteString[] values, int size, int level) throws TiBatchWriteException {
        if (keys == null || keys.length == 0 || values == null || values.length == 0 || size <= 0) {
            return;
        }
        LinkedHashMap<ByteString, Kvrpcpb.Mutation> mutations = new LinkedHashMap<ByteString, Kvrpcpb.Mutation>();
        for (int i = 0; i < size; ++i) {
            ByteString key = keys[i];
            ByteString value = values[i];
            Kvrpcpb.Mutation mutation = !value.isEmpty() ? Kvrpcpb.Mutation.newBuilder().setKey(key).setValue(value).setOp(Kvrpcpb.Op.Put).build() : Kvrpcpb.Mutation.newBuilder().setKey(key).setOp(Kvrpcpb.Op.Del).build();
            mutations.put(key, mutation);
        }
        GroupKeyResult groupResult = this.groupKeysByRegion(keys, size, backOffer);
        LinkedList<BatchKeys> batchKeyList = new LinkedList<BatchKeys>();
        Map<Pair<TiRegion, TiStore>, List<ByteString>> groupKeyMap = groupResult.getGroupsResult();
        for (Map.Entry<Pair<TiRegion, TiStore>, List<ByteString>> entry : groupKeyMap.entrySet()) {
            TiRegion tiRegion = (TiRegion)entry.getKey().first;
            TiStore store = (TiStore)entry.getKey().second;
            this.appendBatchBySize(batchKeyList, tiRegion, store, entry.getValue(), true, mutations);
        }
        for (BatchKeys batchKeys : batchKeyList) {
            TiRegion currentRegion;
            TiRegion oldRegion = batchKeys.getRegion();
            if (oldRegion.equals(currentRegion = this.regionManager.getRegionByKey(oldRegion.getStartKey(), backOffer))) {
                this.doPrewriteSecondaryKeySingleBatchWithRetry(backOffer, primaryKey, batchKeys, mutations);
                continue;
            }
            if (level > this.prewriteMaxRetryTimes) {
                throw new TiBatchWriteException(String.format("> max retry number %s, oldRegion=%s, currentRegion=%s", this.prewriteMaxRetryTimes, oldRegion, currentRegion));
            }
            LOG.info(String.format("oldRegion=%s != currentRegion=%s, will re-fetch region info and retry", oldRegion, currentRegion));
            this.retryPrewriteBatch(backOffer, primaryKey, batchKeys, mutations, level <= 0 ? 1 : level + 1);
        }
    }

    private void retryPrewriteBatch(BackOffer backOffer, ByteString primaryKey, BatchKeys batchKeys, Map<ByteString, Kvrpcpb.Mutation> mutations, int level) {
        int size = batchKeys.getKeys().size();
        ByteString[] keyBytes = new ByteString[size];
        ByteString[] valueBytes = new ByteString[size];
        int i = 0;
        Iterator<ByteString> iterator = batchKeys.getKeys().iterator();
        while (iterator.hasNext()) {
            ByteString k;
            keyBytes[i] = k = iterator.next();
            valueBytes[i] = mutations.get(k).getValue();
            ++i;
        }
        this.doPrewriteSecondaryKeysInBatchesWithRetry(backOffer, primaryKey, keyBytes, valueBytes, size, level);
    }

    private void doPrewriteSecondaryKeySingleBatchWithRetry(BackOffer backOffer, ByteString primaryKey, BatchKeys batchKeys, Map<ByteString, Kvrpcpb.Mutation> mutations) throws TiBatchWriteException {
        LOG.debug("start prewrite secondary key, row={}, size={}KB, regionId={}", new Object[]{batchKeys.getKeys().size(), Float.valueOf(batchKeys.getSizeInKB()), batchKeys.getRegion().getId()});
        List<ByteString> keyList = batchKeys.getKeys();
        int batchSize = keyList.size();
        ArrayList<Kvrpcpb.Mutation> mutationList = new ArrayList<Kvrpcpb.Mutation>(batchSize);
        for (ByteString key : keyList) {
            mutationList.add(mutations.get(key));
        }
        int txnSize = batchKeys.getKeys().size();
        long lockTTL = this.getTxnLockTTL(this.startTs, txnSize);
        ClientRPCResult prewriteResult = this.kvClient.prewrite(backOffer, mutationList, primaryKey, lockTTL, this.startTs, batchKeys.getRegion(), batchKeys.getStore());
        if (!prewriteResult.isSuccess() && !prewriteResult.isRetry()) {
            throw new TiBatchWriteException("prewrite secondary key error", prewriteResult.getException());
        }
        if (prewriteResult.isRetry()) {
            LOG.info("prewrite secondary key fail, will backoff and retry");
            try {
                backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoRegionMiss, new GrpcException(String.format("Txn prewrite secondary key SingleBatch failed, regionId=%s", batchKeys.getRegion().getId()), prewriteResult.getException()));
                this.retryPrewriteBatch(backOffer, primaryKey, batchKeys, mutations, 0);
            }
            catch (GrpcException e) {
                String errorMsg = String.format("Txn prewrite secondary key SingleBatch error, re-split commit failed, regionId=%s, detail=%s", batchKeys.getRegion().getId(), e.getMessage());
                throw new TiBatchWriteException(errorMsg, e);
            }
        }
        LOG.debug("prewrite secondary key successfully, row={}, size={}KB, regionId={}", new Object[]{batchKeys.getKeys().size(), Float.valueOf(batchKeys.getSizeInKB()), batchKeys.getRegion().getId()});
    }

    private void appendBatchBySize(List<BatchKeys> batchKeyList, TiRegion tiRegion, TiStore store, List<ByteString> keys, boolean sizeIncludeValue, Map<ByteString, Kvrpcpb.Mutation> mutations) {
        long commitBatchSize;
        long l = commitBatchSize = sizeIncludeValue ? this.txnPrewriteBatchSize : this.txnCommitBatchSize;
        if (keys == null) {
            return;
        }
        int len = keys.size();
        int start = 0;
        while (start < len) {
            int end;
            int sizeInBytes = 0;
            for (end = start; end < len && (long)sizeInBytes < commitBatchSize; ++end) {
                sizeInBytes = sizeIncludeValue ? (int)((long)sizeInBytes + this.keyValueSize(keys.get(end), mutations)) : (int)((long)sizeInBytes + this.keySize(keys.get(end)));
            }
            BatchKeys batchKeys = new BatchKeys(tiRegion, store, keys.subList(start, end), sizeInBytes);
            batchKeyList.add(batchKeys);
            start = end;
        }
    }

    private long keyValueSize(ByteString key, Map<ByteString, Kvrpcpb.Mutation> mutations) {
        long size = key.size();
        Kvrpcpb.Mutation mutation = mutations.get(key);
        if (mutation != null) {
            size += (long)mutation.getValue().toByteArray().length;
        }
        return size;
    }

    private long keySize(ByteString key) {
        return key.size();
    }

    public void commitSecondaryKeys(final Iterator<ByteWrapper> keys, long commitTs, int commitBackOfferMS) throws TiBatchWriteException {
        Iterator<ByteString> byteStringKeys = new Iterator<ByteString>(){

            @Override
            public boolean hasNext() {
                return keys.hasNext();
            }

            @Override
            public ByteString next() {
                return ByteString.copyFrom(((ByteWrapper)keys.next()).getBytes());
            }
        };
        this.doCommitSecondaryKeys(byteStringKeys, commitTs, commitBackOfferMS);
    }

    private void doCommitSecondaryKeys(Iterator<ByteString> keys, long commitTs, int commitBackOfferMS) throws TiBatchWriteException {
        try {
            int taskBufferSize = this.writeThreadPerTask * 2;
            int totalSize = 0;
            int cnt = 0;
            ExecutorCompletionService<Void> completionService = new ExecutorCompletionService<Void>(this.executorService);
            while (keys.hasNext()) {
                int size;
                ByteString[] keyBytes = new ByteString[this.writeBufferSize];
                for (size = 0; size < this.writeBufferSize && keys.hasNext(); ++size) {
                    keyBytes[size] = keys.next();
                }
                int curSize = size;
                if (++cnt > taskBufferSize) {
                    completionService.take().get();
                }
                ConcreteBackOffer backOffer = ConcreteBackOffer.newCustomBackOff(commitBackOfferMS);
                completionService.submit(() -> {
                    this.doCommitSecondaryKeysWithRetry(backOffer, keyBytes, curSize, commitTs);
                    return null;
                });
                totalSize += size;
            }
            for (int i = 0; i < Math.min(taskBufferSize, cnt); ++i) {
                completionService.take().get();
            }
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new TiBatchWriteException("Current thread interrupted.", e);
        }
        catch (ExecutionException e) {
            throw new TiBatchWriteException("Execution exception met.", e);
        }
    }

    private void doCommitSecondaryKeysWithRetry(BackOffer backOffer, ByteString[] keys, int size, long commitTs) throws TiBatchWriteException {
        if (keys == null || keys.length == 0 || size <= 0) {
            return;
        }
        GroupKeyResult groupResult = this.groupKeysByRegion(keys, size, backOffer);
        ArrayList<BatchKeys> batchKeyList = new ArrayList<BatchKeys>();
        Map<Pair<TiRegion, TiStore>, List<ByteString>> groupKeyMap = groupResult.getGroupsResult();
        for (Map.Entry<Pair<TiRegion, TiStore>, List<ByteString>> entry : groupKeyMap.entrySet()) {
            TiRegion tiRegion = (TiRegion)entry.getKey().first;
            TiStore store = (TiStore)entry.getKey().second;
            this.appendBatchBySize(batchKeyList, tiRegion, store, entry.getValue(), false, null);
        }
        for (BatchKeys batchKeys : batchKeyList) {
            this.doCommitSecondaryKeySingleBatchWithRetry(backOffer, batchKeys, commitTs);
        }
    }

    private void doCommitSecondaryKeySingleBatchWithRetry(BackOffer backOffer, BatchKeys batchKeys, long commitTs) throws TiBatchWriteException {
        LOG.info("start commit secondary key, row={}, size={}KB, regionId={}", new Object[]{batchKeys.getKeys().size(), Float.valueOf(batchKeys.getSizeInKB()), batchKeys.getRegion().getId()});
        List<ByteString> keysCommit = batchKeys.getKeys();
        ByteString[] keys = new ByteString[keysCommit.size()];
        keysCommit.toArray(keys);
        ClientRPCResult commitResult = this.kvClient.commit(backOffer, keys, this.startTs, commitTs, batchKeys.getRegion(), batchKeys.getStore());
        if (this.retryCommitSecondaryKeys && commitResult.isRetry()) {
            this.doCommitSecondaryKeysWithRetry(backOffer, keys, keysCommit.size(), commitTs);
        } else if (!commitResult.isSuccess()) {
            String error = String.format("Txn commit secondary key error, regionId=%s", batchKeys.getRegion());
            LOG.warn(error);
            throw new TiBatchWriteException("commit secondary key error", commitResult.getException());
        }
        LOG.info("commit {} rows successfully, size={}KB, regionId={}", new Object[]{batchKeys.getKeys().size(), Float.valueOf(batchKeys.getSizeInKB()), batchKeys.getRegion().getId()});
    }

    private GroupKeyResult groupKeysByRegion(ByteString[] keys, int size, BackOffer backOffer) throws TiBatchWriteException {
        HashMap<Pair<TiRegion, TiStore>, List<ByteString>> groups = new HashMap<Pair<TiRegion, TiStore>, List<ByteString>>();
        try {
            for (int index = 0; index < size; ++index) {
                ByteString key = keys[index];
                Pair<TiRegion, TiStore> pair = this.regionManager.getRegionStorePairByKey(key, backOffer);
                if (pair == null) continue;
                groups.computeIfAbsent(pair, e -> new ArrayList()).add(key);
            }
        }
        catch (Exception e2) {
            throw new TiBatchWriteException("Txn groupKeysByRegion error", e2);
        }
        GroupKeyResult result = new GroupKeyResult();
        result.setGroupsResult(groups);
        return result;
    }

    private long getTxnLockTTL(long startTime) {
        return this.lockTTL;
    }

    private long getTxnLockTTL(long startTime, int txnSize) {
        return this.lockTTL;
    }
}

