/*
 * Decompiled with CFR 0.152.
 */
package org.apache.rocketmq.broker.latency;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.latency.FutureTaskExt;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.remoting.netty.RequestTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Periodically rejects requests that have been waiting too long in the broker's
 * send and pull thread pool queues, answering each of them with a "broker busy"
 * response instead of letting them be executed.
 *
 * <p>A single scheduled thread performs the scan. Each pass does two things:
 * <ol>
 *   <li>while the message store reports the OS page cache as busy, every task
 *       polled from the send queue is rejected immediately;</li>
 *   <li>tasks at the head of the send queue and of the pull queue are rejected
 *       when their time in queue has reached the limit configured in the broker
 *       config.</li>
 * </ol>
 */
public class BrokerFastFailure {
    private static final Logger log = LoggerFactory.getLogger("RocketmqBroker");

    /**
     * Response code sent with every rejection.
     * NOTE(review): presumably the remoting "system busy" code - confirm against
     * the remoting response code definitions.
     */
    private static final int BUSY_RESPONSE_CODE = 2;

    /** Delay before the first scan, in milliseconds. */
    private static final long INITIAL_DELAY_MILLIS = 1000L;

    /** Interval between two scans, in milliseconds. */
    private static final long SCAN_PERIOD_MILLIS = 10L;

    private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("BrokerFastFailureScheduledThread"));
    private final BrokerController brokerController;

    /**
     * @param brokerController controller that supplies the broker config, the
     *                         message store and the queues scanned by this class
     */
    public BrokerFastFailure(BrokerController brokerController) {
        this.brokerController = brokerController;
    }

    /**
     * Unwraps the {@link RequestTask} carried by a queued {@link FutureTaskExt}.
     *
     * <p>This method never throws: any failure to unwrap is logged and reported
     * as {@code null}.
     *
     * @param runnable the queued runnable, may be {@code null}
     * @return the wrapped request task, or {@code null} when {@code runnable} is
     *         {@code null} or is not a {@code FutureTaskExt} wrapping a
     *         {@code RequestTask}
     */
    public static RequestTask castRunnable(Runnable runnable) {
        if (runnable == null) {
            // Nothing to unwrap; previously the error handler below would itself
            // throw a NullPointerException on runnable.getClass().
            return null;
        }
        try {
            FutureTaskExt object = (FutureTaskExt) runnable;
            return (RequestTask) object.getRunnable();
        } catch (Throwable e) {
            log.error(String.format("castRunnable exception, %s", runnable.getClass().getName()), e);
            return null;
        }
    }

    /**
     * Schedules the periodic scan at a fixed rate. The scan body only runs while
     * fast failure is enabled in the broker config.
     */
    public void start() {
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                try {
                    if (BrokerFastFailure.this.brokerController.getBrokerConfig().isBrokerFastFailureEnable()) {
                        BrokerFastFailure.this.cleanExpiredRequest();
                    }
                } catch (Throwable t) {
                    // An exception escaping run() makes scheduleAtFixedRate suppress
                    // every later execution, so it must be contained here.
                    log.error("BrokerFastFailure scan failed, will retry on next schedule", t);
                }
            }
        }, INITIAL_DELAY_MILLIS, SCAN_PERIOD_MILLIS, TimeUnit.MILLISECONDS);
    }

    /**
     * Runs one scan: drains the send queue while the OS page cache is reported
     * busy, then rejects timed-out head tasks of the send and pull queues.
     */
    private void cleanExpiredRequest() {
        final BlockingQueue<Runnable> sendQueue = this.brokerController.getSendThreadPoolQueue();

        while (this.brokerController.getMessageStore().isOSPageCacheBusy()) {
            try {
                if (sendQueue.isEmpty()) {
                    break;
                }
                final Runnable runnable = sendQueue.poll(0L, TimeUnit.SECONDS);
                if (runnable == null) {
                    break;
                }
                final RequestTask rt = castRunnable(runnable);
                if (rt == null) {
                    // castRunnable has already logged the cause; the task is already
                    // out of the queue and cannot be answered, so move on.
                    log.warn("[PCBUSY_CLEAN_QUEUE]polled task is not a RequestTask, dropped without response: {}", runnable.getClass().getName());
                    continue;
                }
                rt.returnResponse(BUSY_RESPONSE_CODE, String.format("[PCBUSY_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: %sms, size of queue: %d", System.currentTimeMillis() - rt.getCreateTimestamp(), sendQueue.size()));
            } catch (InterruptedException e) {
                // Keep the interrupt visible to the executor and stop draining.
                Thread.currentThread().interrupt();
                break;
            } catch (Throwable t) {
                // Best effort: one failed rejection must not stop the drain. The
                // failing task has already been polled, so the loop still progresses.
                log.warn("[PCBUSY_CLEAN_QUEUE]failed to reject queued request", t);
            }
        }

        cleanExpiredRequestInQueue(sendQueue, this.brokerController.getBrokerConfig().getWaitTimeMillsInSendQueue());
        cleanExpiredRequestInQueue(this.brokerController.getPullThreadPoolQueue(), this.brokerController.getBrokerConfig().getWaitTimeMillsInPullQueue());
    }

    /**
     * Rejects tasks from the head of {@code blockingQueue} for as long as the head
     * task has waited at least {@code maxWaitTimeMillsInQueue} milliseconds.
     *
     * <p>The scan stops at the first head task that is missing, cannot be unwrapped
     * by {@link #castRunnable(Runnable)}, is already marked as stopped, or has not
     * waited long enough. A task is only answered if this method itself managed to
     * remove it from the queue. An unexpected failure while inspecting the queue
     * ends the current pass (it is logged) instead of being retried in a tight loop.
     *
     * @param blockingQueue           queue to scan; a {@code null} queue is ignored
     * @param maxWaitTimeMillsInQueue maximum tolerated time in queue, in milliseconds
     */
    void cleanExpiredRequestInQueue(BlockingQueue<Runnable> blockingQueue, long maxWaitTimeMillsInQueue) {
        if (blockingQueue == null) {
            return;
        }
        try {
            while (!blockingQueue.isEmpty()) {
                final Runnable runnable = blockingQueue.peek();
                if (runnable == null) {
                    return;
                }
                final RequestTask rt = castRunnable(runnable);
                if (rt == null || rt.isStopRun()) {
                    return;
                }
                final long behind = System.currentTimeMillis() - rt.getCreateTimestamp();
                if (behind < maxWaitTimeMillsInQueue) {
                    return;
                }
                if (!blockingQueue.remove(runnable)) {
                    // The task left the queue between peek and remove; look at the new head.
                    continue;
                }
                rejectTimedOutRequest(rt, behind, blockingQueue.size());
            }
        } catch (Throwable t) {
            // Do not retry immediately: the same head would fail again and spin this
            // thread. The next scheduled scan will try again.
            log.warn("[TIMEOUT_CLEAN_QUEUE]failed to scan queue for timed-out requests", t);
        }
    }

    /**
     * Marks an already-dequeued task as stopped and answers it with the busy
     * response. Failures are logged and not propagated, so the caller can carry on
     * with the next task.
     */
    private void rejectTimedOutRequest(RequestTask rt, long behind, int queueSize) {
        try {
            rt.setStopRun(true);
            rt.returnResponse(BUSY_RESPONSE_CODE, String.format("[TIMEOUT_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: %sms, size of queue: %d", behind, queueSize));
        } catch (Throwable t) {
            log.warn("[TIMEOUT_CLEAN_QUEUE]failed to reject timed-out request", t);
        }
    }

    /**
     * Shuts down the scan executor; no further scans are scheduled afterwards.
     */
    public void shutdown() {
        this.scheduledExecutorService.shutdown();
    }
}

