/*
 * Decompiled with CFR 0.152.
 */
package com.vesoft.nebula.client.storage.scan;

import com.facebook.thrift.TException;
import com.vesoft.nebula.DataSet;
import com.vesoft.nebula.client.graph.data.HostAddress;
import com.vesoft.nebula.client.meta.MetaManager;
import com.vesoft.nebula.client.storage.GraphStorageConnection;
import com.vesoft.nebula.client.storage.StorageConnPool;
import com.vesoft.nebula.client.storage.data.ScanStatus;
import com.vesoft.nebula.client.storage.scan.PartScanInfo;
import com.vesoft.nebula.client.storage.scan.PartScanQueue;
import com.vesoft.nebula.client.storage.scan.ScanEdgeResult;
import com.vesoft.nebula.client.storage.scan.ScanResultIterator;
import com.vesoft.nebula.storage.PartitionResult;
import com.vesoft.nebula.storage.ScanEdgeRequest;
import com.vesoft.nebula.storage.ScanEdgeResponse;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ScanEdgeResultIterator
extends ScanResultIterator {
    private static final Logger LOGGER = LoggerFactory.getLogger(ScanEdgeResultIterator.class);
    private final ScanEdgeRequest request;
    private ExecutorService threadPool = null;

    private ScanEdgeResultIterator(MetaManager metaManager, StorageConnPool pool, Set<PartScanInfo> partScanInfoList, List<HostAddress> addresses, ScanEdgeRequest request, String spaceName, String labelName, boolean partSuccess) {
        super(metaManager, pool, new PartScanQueue(partScanInfoList), addresses, spaceName, labelName, partSuccess);
        this.request = request;
    }

    public ScanEdgeResult next() throws Exception {
        if (!this.hasNext()) {
            throw new IllegalAccessException("iterator has no more data");
        }
        List<DataSet> results = Collections.synchronizedList(new ArrayList(this.addresses.size()));
        List<Exception> exceptions = Collections.synchronizedList(new ArrayList(this.addresses.size()));
        CountDownLatch countDownLatch = new CountDownLatch(this.addresses.size());
        AtomicInteger existSuccess = new AtomicInteger(0);
        this.threadPool = Executors.newFixedThreadPool(this.addresses.size());
        for (HostAddress addr : this.addresses) {
            this.threadPool.submit(() -> {
                ScanEdgeResponse response;
                GraphStorageConnection connection;
                ScanEdgeRequest partRequest = new ScanEdgeRequest(this.request);
                PartScanInfo partInfo = this.partScanQueue.getPart(addr);
                if (partInfo == null) {
                    countDownLatch.countDown();
                    existSuccess.addAndGet(1);
                    return;
                }
                try {
                    connection = this.pool.getStorageConnection(new HostAddress(addr.getHost(), addr.getPort()));
                }
                catch (Exception e) {
                    LOGGER.error("get storage client error, ", (Throwable)e);
                    exceptions.add(e);
                    countDownLatch.countDown();
                    return;
                }
                partRequest.setPart_id(partInfo.getPart());
                partRequest.setCursor(partInfo.getCursor());
                try {
                    response = connection.scanEdge(partRequest);
                }
                catch (TException e) {
                    LOGGER.error(String.format("Scan edgeRow failed for %s", e.getMessage()), (Throwable)e);
                    exceptions.add(e);
                    this.partScanQueue.dropPart(partInfo);
                    countDownLatch.countDown();
                    return;
                }
                if (response == null) {
                    this.handleNullResponse(partInfo, exceptions);
                    countDownLatch.countDown();
                    return;
                }
                if (this.isSuccessful(response)) {
                    this.handleSucceedResult(existSuccess, response, partInfo);
                    results.add(response.getEdge_data());
                }
                if (response.getResult() != null) {
                    this.handleFailedResult(response, partInfo, exceptions);
                } else {
                    this.handleNullResult(partInfo, exceptions);
                }
                this.pool.release(new HostAddress(addr.getHost(), addr.getPort()), connection);
                countDownLatch.countDown();
            });
        }
        try {
            countDownLatch.await();
            this.threadPool.shutdown();
        }
        catch (InterruptedException interruptedE) {
            LOGGER.error("scan interrupted:", (Throwable)interruptedE);
            throw interruptedE;
        }
        if (this.partSuccess) {
            boolean bl = this.hasNext = this.partScanQueue.size() > 0;
            if (existSuccess.get() == 0) {
                this.throwExceptions(exceptions);
            }
            ScanStatus status = exceptions.size() > 0 ? ScanStatus.PART_SUCCESS : ScanStatus.ALL_SUCCESS;
            return new ScanEdgeResult(results, status);
        }
        boolean bl = this.hasNext = this.partScanQueue.size() > 0 && exceptions.isEmpty();
        if (!exceptions.isEmpty()) {
            this.throwExceptions(exceptions);
        }
        boolean success = existSuccess.get() == this.addresses.size();
        List<DataSet> finalResults = success ? results : null;
        return new ScanEdgeResult(finalResults, ScanStatus.ALL_SUCCESS);
    }

    private boolean isSuccessful(ScanEdgeResponse response) {
        return response.result.failed_parts.size() == 0;
    }

    private void handleSucceedResult(AtomicInteger existSuccess, ScanEdgeResponse response, PartScanInfo partInfo) {
        existSuccess.addAndGet(1);
        if (!response.has_next) {
            this.partScanQueue.dropPart(partInfo);
        } else {
            partInfo.setCursor(response.getNext_cursor());
        }
    }

    private void handleFailedResult(ScanEdgeResponse response, PartScanInfo partInfo, List<Exception> exceptions) {
        for (PartitionResult partResult : response.getResult().getFailed_parts()) {
            if (partResult.code == -11) {
                this.freshLeader(this.spaceName, partInfo.getPart(), partResult.getLeader());
                partInfo.setLeader(this.getLeader(partResult.getLeader()));
                continue;
            }
            int code = partResult.getCode();
            LOGGER.error(String.format("part scan failed, error code=%d", code));
            this.partScanQueue.dropPart(partInfo);
            exceptions.add(new Exception(String.format("part scan, error code=%d", code)));
        }
    }

    public static class ScanEdgeResultBuilder {
        MetaManager metaManager;
        StorageConnPool pool;
        Set<PartScanInfo> partScanInfoList;
        List<HostAddress> addresses;
        ScanEdgeRequest request;
        String spaceName;
        String edgeName;
        boolean partSuccess = false;

        public ScanEdgeResultBuilder withMetaClient(MetaManager metaManager) {
            this.metaManager = metaManager;
            return this;
        }

        public ScanEdgeResultBuilder withPool(StorageConnPool pool) {
            this.pool = pool;
            return this;
        }

        public ScanEdgeResultBuilder withPartScanInfo(Set<PartScanInfo> partScanInfoList) {
            this.partScanInfoList = partScanInfoList;
            return this;
        }

        public ScanEdgeResultBuilder withAddresses(List<HostAddress> addresses) {
            this.addresses = addresses;
            return this;
        }

        public ScanEdgeResultBuilder withRequest(ScanEdgeRequest request) {
            this.request = request;
            return this;
        }

        public ScanEdgeResultBuilder withSpaceName(String spaceName) {
            this.spaceName = spaceName;
            return this;
        }

        public ScanEdgeResultBuilder withEdgeName(String edgeName) {
            this.edgeName = edgeName;
            return this;
        }

        public ScanEdgeResultBuilder withPartSuccess(boolean partSuccess) {
            this.partSuccess = partSuccess;
            return this;
        }

        public ScanEdgeResultIterator build() {
            return new ScanEdgeResultIterator(this.metaManager, this.pool, this.partScanInfoList, this.addresses, this.request, this.spaceName, this.edgeName, this.partSuccess);
        }
    }
}

