/*
 * Decompiled with CFR 0.152.
 */
package com.taobao.arthas.core.distribution.impl;

import com.alibaba.arthas.deps.org.slf4j.Logger;
import com.alibaba.arthas.deps.org.slf4j.LoggerFactory;
import com.taobao.arthas.core.command.model.InputStatusModel;
import com.taobao.arthas.core.command.model.MessageModel;
import com.taobao.arthas.core.command.model.ResultModel;
import com.taobao.arthas.core.distribution.DistributorOptions;
import com.taobao.arthas.core.distribution.ResultConsumer;
import com.taobao.arthas.core.distribution.SharingResultDistributor;
import com.taobao.arthas.core.shell.session.Session;
import com.taobao.arthas.core.shell.system.Job;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Distributes command results to every registered {@link ResultConsumer}.
 *
 * <p>Producers hand results to {@link #appendResult(ResultModel)}, which places them on a small
 * bounded queue. A dedicated thread (started in the constructor) drains that queue, records each
 * result in an internal shared buffer, and forwards it to all registered consumers. When a new
 * consumer is added, the contents of the shared buffer are replayed to it first.
 */
public class SharingResultDistributorImpl implements SharingResultDistributor {
    private static final Logger logger = LoggerFactory.getLogger(SharingResultDistributorImpl.class);

    /** Capacity of the hand-off queue between producers and the distributor thread. */
    private static final int PENDING_QUEUE_CAPACITY = 10;
    /** Maximum time, in milliseconds, to wait on the pending queue when offering or polling. */
    private static final long QUEUE_WAIT_MILLIS = 100L;

    private final List<ResultConsumer> consumers = new CopyOnWriteArrayList<ResultConsumer>();
    private final BlockingQueue<ResultModel> pendingResultQueue =
            new ArrayBlockingQueue<ResultModel>(PENDING_QUEUE_CAPACITY);
    private final Session session;
    private final Thread distributorThread;
    private volatile boolean running;
    private final AtomicInteger consumerNumGenerator = new AtomicInteger(0);
    private final SharingResultConsumerImpl sharingResultConsumer = new SharingResultConsumerImpl();

    /**
     * Creates the distributor and immediately starts its "ResultDistributor" thread.
     *
     * @param session the session whose foreground job is interrupted when results cannot be
     *                delivered
     */
    public SharingResultDistributorImpl(Session session) {
        this.session = session;
        this.running = true;
        this.distributorThread = new Thread(new DistributorTask(), "ResultDistributor");
        this.distributorThread.start();
    }

    /**
     * Queues a result for distribution, waiting at most {@value #QUEUE_WAIT_MILLIS} ms for space.
     *
     * <p>If the queue is still full after the wait, the oldest pending result is discarded, the
     * given result is dropped, and the session's foreground job (if any) is interrupted.
     *
     * <p>If the calling thread is interrupted, a single non-blocking enqueue is attempted and the
     * thread's interrupt status is restored so the caller can still observe the interruption.
     *
     * @param result the result to distribute
     */
    @Override
    public void appendResult(ResultModel result) {
        try {
            if (!this.pendingResultQueue.offer(result, QUEUE_WAIT_MILLIS, TimeUnit.MILLISECONDS)) {
                // Make room for the notice that interruptJob() enqueues.
                this.pendingResultQueue.poll();
                this.interruptJob("result queue is full: " + this.pendingResultQueue.size());
            }
        } catch (InterruptedException e) {
            // The timed offer refuses to run while the interrupt flag is set; fall back to a
            // non-blocking attempt so the result is not lost merely because of the interrupt.
            this.pendingResultQueue.offer(result);
            // Do not swallow the interruption: the calling thread must be able to see it.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Interrupts the session's foreground job, if there is one, and enqueues a message describing
     * why. Does nothing when the session has no foreground job.
     *
     * @param message the reason for the interruption
     */
    private void interruptJob(String message) {
        Job job = this.session.getForegroundJob();
        if (job == null) {
            return;
        }
        String notice = message + ", current job was interrupted.";
        logger.warn("{} job id: {}", notice, (Object) job.id());
        job.interrupt();
        // Best effort: if the queue is full the notice is silently dropped.
        this.pendingResultQueue.offer(new MessageModel(notice));
    }

    /**
     * Body of the distributor thread: drains the pending queue until {@link #close()} is called.
     *
     * <p>Each result is stored in the shared buffer and then passed to every consumer. If no
     * consumer reported itself healthy (including when there are no consumers at all), the
     * foreground job is interrupted.
     */
    private void distribute() {
        while (this.running) {
            try {
                ResultModel result = this.pendingResultQueue.poll(QUEUE_WAIT_MILLIS, TimeUnit.MILLISECONDS);
                if (result == null) {
                    continue;
                }
                this.sharingResultConsumer.appendResult(result);

                boolean anyHealthy = false;
                // for-each uses the list's snapshot iterator, so a concurrent removeConsumer()
                // cannot cause an out-of-range index or a skipped element mid-iteration.
                for (ResultConsumer consumer : this.consumers) {
                    if (consumer.isHealthy()) {
                        anyHealthy = true;
                    }
                    consumer.appendResult(result);
                }
                if (!anyHealthy) {
                    this.interruptJob("all consumers are unhealthy");
                }
            } catch (Throwable e) {
                // Keep the distributor thread alive whatever a consumer throws.
                logger.warn("distribute result failed: " + e.getMessage(), e);
            }
        }
    }

    /**
     * Signals the distributor thread to stop. The thread exits after its current poll/dispatch
     * cycle completes; this method does not wait for it.
     */
    @Override
    public void close() {
        this.running = false;
    }

    /**
     * Registers a consumer: assigns it a generated id, replays the buffered results to it, and
     * then adds it to the list of live consumers.
     *
     * @param consumer the consumer to register
     */
    @Override
    public void addConsumer(ResultConsumer consumer) {
        int consumerNo = this.consumerNumGenerator.incrementAndGet();
        String consumerId = UUID.randomUUID().toString().replace("-", "") + "_" + consumerNo;
        consumer.setConsumerId(consumerId);
        this.sharingResultConsumer.copyTo(consumer);
        this.consumers.add(consumer);
    }

    /**
     * Unregisters the consumer and closes it.
     *
     * @param consumer the consumer to remove
     */
    @Override
    public void removeConsumer(ResultConsumer consumer) {
        this.consumers.remove(consumer);
        consumer.close();
    }

    /**
     * Returns the live list of registered consumers (not a copy).
     *
     * @return the registered consumers
     */
    @Override
    public List<ResultConsumer> getConsumers() {
        return this.consumers;
    }

    /**
     * Looks up a registered consumer by id.
     *
     * @param consumerId the id to search for
     * @return the matching consumer, or {@code null} if none matches
     */
    @Override
    public ResultConsumer getConsumer(String consumerId) {
        if (consumerId == null) {
            return null;
        }
        for (ResultConsumer consumer : this.consumers) {
            if (consumerId.equals(consumer.getConsumerId())) {
                return consumer;
            }
        }
        return null;
    }

    /**
     * Internal consumer that buffers recent results so they can be replayed to consumers that
     * register later. The buffer holds at most {@code DistributorOptions.resultQueueSize}
     * results; the oldest are evicted when it is full. The most recent {@link InputStatusModel}
     * is kept separately rather than in the buffer.
     */
    private static class SharingResultConsumerImpl implements ResultConsumer {
        private final BlockingQueue<ResultModel> resultQueue =
                new ArrayBlockingQueue<ResultModel>(DistributorOptions.resultQueueSize);
        private final ReentrantLock queueLock = new ReentrantLock();
        private InputStatusModel lastInputStatus;

        private SharingResultConsumerImpl() {
        }

        /**
         * Records a result. An {@link InputStatusModel} replaces the remembered input status;
         * any other result is appended to the buffer, evicting the oldest entries if needed.
         *
         * @param result the result to record
         * @return always {@code true}
         */
        @Override
        public boolean appendResult(ResultModel result) {
            this.queueLock.lock();
            try {
                if (result instanceof InputStatusModel) {
                    this.lastInputStatus = (InputStatusModel) result;
                    return true;
                }
                // Evict from the head until the new result fits.
                while (!this.resultQueue.offer(result)) {
                    this.resultQueue.poll();
                }
                return true;
            } finally {
                this.queueLock.unlock();
            }
        }

        /**
         * Replays every buffered result, followed by the last input status (if any), to the
         * given consumer. The buffer itself is left unchanged.
         *
         * @param consumer the consumer that receives the buffered results
         */
        public void copyTo(ResultConsumer consumer) {
            this.queueLock.lock();
            try {
                for (ResultModel result : this.resultQueue) {
                    consumer.appendResult(result);
                }
                if (this.lastInputStatus != null) {
                    consumer.appendResult(this.lastInputStatus);
                }
            } finally {
                this.queueLock.unlock();
            }
        }

        /** Not supported by this internal buffer; always returns {@code null}. */
        @Override
        public List<ResultModel> pollResults() {
            return null;
        }

        /** Always returns {@code 0}. */
        @Override
        public long getLastAccessTime() {
            return 0L;
        }

        /** No-op. */
        @Override
        public void close() {
        }

        /** Always returns {@code false}. */
        @Override
        public boolean isClosed() {
            return false;
        }

        /** Always returns {@code false}. */
        @Override
        public boolean isPolling() {
            return false;
        }

        /** Returns the fixed id {@code "shared-consumer"}. */
        @Override
        public String getConsumerId() {
            return "shared-consumer";
        }

        /** No-op: the id of this consumer is fixed. */
        @Override
        public void setConsumerId(String consumerId) {
        }

        /** Always returns {@code true}. */
        @Override
        public boolean isHealthy() {
            return true;
        }
    }

    /** Runnable executed by the distributor thread; delegates to {@link #distribute()}. */
    private class DistributorTask implements Runnable {
        private DistributorTask() {
        }

        @Override
        public void run() {
            SharingResultDistributorImpl.this.distribute();
        }
    }
}

