/*
 * Decompiled with CFR 0.152.
 */
package org.apache.shenyu.admin.listener.http;

import com.google.common.collect.Lists;
import jakarta.servlet.AsyncContext;
import jakarta.servlet.http.HttpServletRequest;
import jakarta.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.math.NumberUtils;
import org.apache.shenyu.admin.config.properties.HttpSyncProperties;
import org.apache.shenyu.admin.listener.AbstractDataChangedListener;
import org.apache.shenyu.admin.listener.ConfigDataCache;
import org.apache.shenyu.admin.model.event.instance.InstanceInfoReportEvent;
import org.apache.shenyu.admin.model.result.ShenyuAdminResult;
import org.apache.shenyu.admin.service.publish.InstanceInfoReportEventPublisher;
import org.apache.shenyu.admin.spring.SpringBeanUtils;
import org.apache.shenyu.common.concurrent.ShenyuThreadFactory;
import org.apache.shenyu.common.constant.HttpConstants;
import org.apache.shenyu.common.dto.AppAuthData;
import org.apache.shenyu.common.dto.DiscoverySyncData;
import org.apache.shenyu.common.dto.MetaData;
import org.apache.shenyu.common.dto.PluginData;
import org.apache.shenyu.common.dto.ProxySelectorData;
import org.apache.shenyu.common.dto.RuleData;
import org.apache.shenyu.common.dto.SelectorData;
import org.apache.shenyu.common.enums.ConfigGroupEnum;
import org.apache.shenyu.common.enums.DataEventTypeEnum;
import org.apache.shenyu.common.exception.ShenyuException;
import org.apache.shenyu.common.utils.GsonUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HttpLongPollingDataChangedListener
extends AbstractDataChangedListener {
    private static final Logger LOG = LoggerFactory.getLogger(HttpLongPollingDataChangedListener.class);
    private static final String X_REAL_IP = "X-Real-IP";
    private static final String X_FORWARDED_FOR = "X-Forwarded-For";
    private static final String X_FORWARDED_FOR_SPLIT_SYMBOL = ",";
    private static final String X_REAL_PORT = "X-Real-PORT";
    private static final String CLIENT_PORT_ZERO = "0";
    private final Map<String, BlockingQueue<LongPollingClient>> clientsMap = new ConcurrentHashMap<String, BlockingQueue<LongPollingClient>>();
    private final ScheduledExecutorService scheduler = new ScheduledThreadPoolExecutor(1, ShenyuThreadFactory.create((String)"long-polling", (boolean)true));
    private final HttpSyncProperties httpSyncProperties;

    public HttpLongPollingDataChangedListener(HttpSyncProperties httpSyncProperties) {
        this.httpSyncProperties = httpSyncProperties;
    }

    @Override
    protected void afterInitialize() {
        long syncInterval = this.httpSyncProperties.getRefreshInterval().toMillis();
        this.scheduler.scheduleWithFixedDelay(() -> {
            LOG.info("http sync strategy refresh config start.");
            try {
                super.refreshLocalCache();
                LOG.info("http sync strategy refresh config success.");
            }
            catch (Exception e) {
                LOG.error("http sync strategy refresh config error!", (Throwable)e);
            }
        }, syncInterval, syncInterval, TimeUnit.MILLISECONDS);
        LOG.info("http sync strategy refresh interval: {}ms", (Object)syncInterval);
    }

    public void doLongPolling(HttpServletRequest request, HttpServletResponse response) {
        List<ConfigGroupEnum> changedGroup = this.compareChangedGroup(request);
        String clientIp = HttpLongPollingDataChangedListener.getRemoteIp(request);
        String namespaceId = HttpLongPollingDataChangedListener.getNamespaceId(request);
        String bootstrapInfo = StringUtils.defaultString((String)request.getHeader("bootstrapInstanceInfo"), (String)"");
        String clientPort = StringUtils.defaultString((String)request.getHeader(X_REAL_PORT), (String)"");
        if (!CLIENT_PORT_ZERO.equals(clientPort)) {
            InstanceInfoReportEvent instanceInfoReportEvent = InstanceInfoReportEvent.builder().instanceIp(clientIp).instancePort(clientPort).instanceInfo(GsonUtils.getInstance().toJson((Object)bootstrapInfo)).instanceType("bootstrap").instanceState(1).namespaceId(namespaceId).build();
            SpringBeanUtils.getInstance().getBean(InstanceInfoReportEventPublisher.class).publish(instanceInfoReportEvent);
        }
        if (CollectionUtils.isNotEmpty(changedGroup)) {
            this.generateResponse(response, changedGroup);
            LOG.info("send response with the changed group, ip={}, group={}", (Object)clientIp, changedGroup);
            return;
        }
        LOG.debug("no changed group, ip={}, waiting for compare cache changed", (Object)clientIp);
        AsyncContext asyncContext = request.startAsync();
        asyncContext.setTimeout(0L);
        this.scheduler.execute(new LongPollingClient(asyncContext, clientIp, HttpConstants.SERVER_MAX_HOLD_TIMEOUT, namespaceId));
    }

    @Override
    protected void afterAppAuthChanged(List<AppAuthData> changed, DataEventTypeEnum eventType, String namespaceId) {
        this.scheduler.execute(new DataChangeTask(ConfigGroupEnum.APP_AUTH, namespaceId));
    }

    @Override
    protected void afterMetaDataChanged(List<MetaData> changed, DataEventTypeEnum eventType, String namespaceId) {
        this.scheduler.execute(new DataChangeTask(ConfigGroupEnum.META_DATA, namespaceId));
    }

    @Override
    protected void afterPluginChanged(List<PluginData> changed, DataEventTypeEnum eventType, String namespaceId) {
        this.scheduler.execute(new DataChangeTask(ConfigGroupEnum.PLUGIN, namespaceId));
    }

    @Override
    protected void afterRuleChanged(List<RuleData> changed, DataEventTypeEnum eventType, String namespaceId) {
        this.scheduler.execute(new DataChangeTask(ConfigGroupEnum.RULE, namespaceId));
    }

    @Override
    protected void afterSelectorChanged(List<SelectorData> changed, DataEventTypeEnum eventType, String namespaceId) {
        this.scheduler.execute(new DataChangeTask(ConfigGroupEnum.SELECTOR, namespaceId));
    }

    @Override
    protected void afterProxySelectorChanged(List<ProxySelectorData> changed, DataEventTypeEnum eventType, String namespaceId) {
        this.scheduler.execute(new DataChangeTask(ConfigGroupEnum.PROXY_SELECTOR, namespaceId));
    }

    @Override
    protected void afterDiscoveryUpstreamDataChanged(List<DiscoverySyncData> changed, DataEventTypeEnum eventType, String namespaceId) {
        this.scheduler.execute(new DataChangeTask(ConfigGroupEnum.DISCOVER_UPSTREAM, namespaceId));
    }

    private List<ConfigGroupEnum> compareChangedGroup(HttpServletRequest request) {
        ArrayList<ConfigGroupEnum> changedGroup = new ArrayList<ConfigGroupEnum>(ConfigGroupEnum.values().length);
        String namespaceId = HttpLongPollingDataChangedListener.getNamespaceId(request);
        for (ConfigGroupEnum group : ConfigGroupEnum.values()) {
            String[] params = StringUtils.split((String)request.getParameter(group.name()), (char)',');
            if (Objects.isNull(params) || params.length != 2) {
                throw new ShenyuException("group param invalid:" + request.getParameter(group.name()));
            }
            String clientMd5 = params[0];
            long clientModifyTime = NumberUtils.toLong((String)params[1]);
            ConfigDataCache serverCache = (ConfigDataCache)CACHE.get(HttpLongPollingDataChangedListener.buildCacheKey(namespaceId, group.name()));
            if (!this.checkCacheDelayAndUpdate(serverCache, clientMd5, clientModifyTime)) continue;
            changedGroup.add(group);
        }
        return changedGroup;
    }

    public static String buildCacheKey(String namespaceId, String group) {
        return namespaceId + "_" + group;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private boolean checkCacheDelayAndUpdate(ConfigDataCache serverCache, String clientMd5, long clientModifyTime) {
        if (StringUtils.equals((CharSequence)clientMd5, (CharSequence)serverCache.getMd5())) {
            return false;
        }
        long lastModifyTime = serverCache.getLastModifyTime();
        if (lastModifyTime >= clientModifyTime) {
            return true;
        }
        String configDataCacheKey = HttpLongPollingDataChangedListener.buildCacheKey(serverCache.getNamespaceId(), serverCache.getGroup());
        ConfigDataCache latest = (ConfigDataCache)CACHE.get(configDataCacheKey);
        if (latest != serverCache) {
            return !StringUtils.equals((CharSequence)clientMd5, (CharSequence)latest.getMd5());
        }
        HttpLongPollingDataChangedListener httpLongPollingDataChangedListener = this;
        synchronized (httpLongPollingDataChangedListener) {
            latest = (ConfigDataCache)CACHE.get(configDataCacheKey);
            if (latest != serverCache) {
                return !StringUtils.equals((CharSequence)clientMd5, (CharSequence)latest.getMd5());
            }
            super.refreshLocalCache();
            latest = (ConfigDataCache)CACHE.get(configDataCacheKey);
            return !StringUtils.equals((CharSequence)clientMd5, (CharSequence)latest.getMd5());
        }
    }

    private void generateResponse(HttpServletResponse response, List<ConfigGroupEnum> changedGroups) {
        try {
            response.setHeader("Pragma", "no-cache");
            response.setDateHeader("Expires", 0L);
            response.setHeader("Cache-Control", "no-cache,no-store");
            response.setContentType("application/json");
            response.setStatus(200);
            response.getWriter().println(GsonUtils.getInstance().toJson((Object)ShenyuAdminResult.success("success", changedGroups)));
        }
        catch (IOException ex) {
            LOG.error("Sending response failed.", (Throwable)ex);
        }
    }

    private static String getRemoteIp(HttpServletRequest request) {
        String xForwardedFor = request.getHeader(X_FORWARDED_FOR);
        if (!StringUtils.isBlank((CharSequence)xForwardedFor)) {
            return xForwardedFor.split(X_FORWARDED_FOR_SPLIT_SYMBOL)[0].trim();
        }
        String header = request.getHeader(X_REAL_IP);
        return StringUtils.isBlank((CharSequence)header) ? request.getRemoteAddr() : header;
    }

    private static String getNamespaceId(HttpServletRequest request) {
        String namespaceId = "649330b6-c2d7-4edc-be8e-8a54df9eb385";
        if (StringUtils.isNotEmpty((CharSequence)request.getParameter("namespaceId"))) {
            namespaceId = request.getParameter("namespaceId");
        }
        return namespaceId;
    }

    class LongPollingClient
    implements Runnable {
        private final Logger log = LoggerFactory.getLogger(LongPollingClient.class);
        private final AsyncContext asyncContext;
        private final String ip;
        private final long timeoutTime;
        private final String namespaceId;
        private Future<?> asyncTimeoutFuture;

        LongPollingClient(AsyncContext ac, String ip, long timeoutTime, String namespaceId) {
            this.asyncContext = ac;
            this.ip = ip;
            this.timeoutTime = timeoutTime;
            this.namespaceId = namespaceId;
        }

        @Override
        public void run() {
            try {
                BlockingQueue namespaceClients = HttpLongPollingDataChangedListener.this.clientsMap.getOrDefault(this.namespaceId, new ArrayBlockingQueue(1024));
                this.asyncTimeoutFuture = HttpLongPollingDataChangedListener.this.scheduler.schedule(() -> {
                    namespaceClients.remove(this);
                    List<ConfigGroupEnum> changedGroups = HttpLongPollingDataChangedListener.this.compareChangedGroup((HttpServletRequest)this.asyncContext.getRequest());
                    this.sendResponse(changedGroups);
                    this.log.debug("LongPollingClient {} ", (Object)GsonUtils.getInstance().toJson(changedGroups));
                }, this.timeoutTime, TimeUnit.MILLISECONDS);
                namespaceClients.add(this);
                HttpLongPollingDataChangedListener.this.clientsMap.put(this.namespaceId, namespaceClients);
            }
            catch (Exception ex) {
                this.log.error("add long polling client error", (Throwable)ex);
            }
        }

        void sendResponse(List<ConfigGroupEnum> changedGroups) {
            if (Objects.nonNull(this.asyncTimeoutFuture)) {
                this.asyncTimeoutFuture.cancel(false);
            }
            HttpLongPollingDataChangedListener.this.generateResponse((HttpServletResponse)this.asyncContext.getResponse(), changedGroups);
            this.asyncContext.complete();
        }
    }

    class DataChangeTask
    implements Runnable {
        private final ConfigGroupEnum groupKey;
        private final long changeTime = System.currentTimeMillis();
        private final String namespaceId;

        DataChangeTask(ConfigGroupEnum groupKey, String namespaceId) {
            this.groupKey = groupKey;
            this.namespaceId = namespaceId;
        }

        @Override
        public void run() {
            BlockingQueue<LongPollingClient> namespaceClients = HttpLongPollingDataChangedListener.this.clientsMap.get(this.namespaceId);
            if (CollectionUtils.isEmpty(namespaceClients)) {
                return;
            }
            if (namespaceClients.size() > HttpLongPollingDataChangedListener.this.httpSyncProperties.getNotifyBatchSize()) {
                ArrayList targetClients = new ArrayList(namespaceClients.size());
                namespaceClients.drainTo(targetClients);
                List partitionClients = Lists.partition(targetClients, (int)HttpLongPollingDataChangedListener.this.httpSyncProperties.getNotifyBatchSize());
                partitionClients.forEach(item -> HttpLongPollingDataChangedListener.this.scheduler.execute(() -> this.doRun((Collection<LongPollingClient>)item)));
            } else {
                this.doRun(namespaceClients);
            }
        }

        private void doRun(Collection<LongPollingClient> clients) {
            Iterator<LongPollingClient> iter = clients.iterator();
            while (iter.hasNext()) {
                LongPollingClient client = iter.next();
                iter.remove();
                client.sendResponse(Collections.singletonList(this.groupKey));
                LOG.info("send response with the changed group,ip={}, group={}, changeTime={}", new Object[]{client.ip, this.groupKey, this.changeTime});
            }
        }
    }
}

