/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.cloud.stream.app.twitterstream.source;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.LineNumberReader;
import java.net.URI;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.springframework.beans.DirectFieldAccessor;
import org.springframework.http.HttpMethod;
import org.springframework.http.HttpStatus;
import org.springframework.http.client.ClientHttpRequest;
import org.springframework.http.client.ClientHttpRequestFactory;
import org.springframework.http.client.ClientHttpResponse;
import org.springframework.http.client.HttpComponentsClientHttpRequestFactory;
import org.springframework.http.client.SimpleClientHttpRequestFactory;
import org.springframework.integration.endpoint.MessageProducerSupport;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.social.twitter.api.impl.TwitterTemplate;
import org.springframework.util.StringUtils;
import org.springframework.web.client.DefaultResponseErrorHandler;
import org.springframework.web.client.HttpStatusCodeException;
import org.springframework.web.client.RequestCallback;
import org.springframework.web.client.ResponseErrorHandler;
import org.springframework.web.client.ResponseExtractor;
import org.springframework.web.client.RestTemplate;

public abstract class AbstractTwitterInboundChannelAdapter
extends MessageProducerSupport {
    private static final AtomicInteger instance = new AtomicInteger();
    private final TwitterTemplate twitter;
    private final ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
    private final Object monitor = new Object();
    private final AtomicBoolean running = new AtomicBoolean(false);
    private final AtomicInteger linearBackOff = new AtomicInteger(250);
    private final AtomicInteger httpErrorBackOff = new AtomicInteger(5000);
    private final AtomicInteger rateLimitBackOff = new AtomicInteger(60000);

    protected AbstractTwitterInboundChannelAdapter(TwitterTemplate twitter) {
        this.twitter = twitter;
        this.twitter.getRestTemplate().setErrorHandler((ResponseErrorHandler)new DefaultResponseErrorHandler());
        this.setPhase(Integer.MAX_VALUE);
    }

    public void setReadTimeout(int millis) {
        ClientHttpRequestFactory f = this.getRequestFactory();
        if (f instanceof SimpleClientHttpRequestFactory) {
            ((SimpleClientHttpRequestFactory)f).setReadTimeout(millis);
        } else {
            ((HttpComponentsClientHttpRequestFactory)f).setReadTimeout(millis);
        }
    }

    public void setConnectTimeout(int millis) {
        ClientHttpRequestFactory f = this.getRequestFactory();
        if (f instanceof SimpleClientHttpRequestFactory) {
            ((SimpleClientHttpRequestFactory)f).setConnectTimeout(millis);
        } else {
            ((HttpComponentsClientHttpRequestFactory)f).setConnectTimeout(millis);
        }
    }

    protected void onInit() {
        this.taskExecutor.setThreadNamePrefix("twitterSource-" + instance.incrementAndGet() + "-");
        this.taskExecutor.initialize();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void doStart() {
        Object object = this.monitor;
        synchronized (object) {
            if (this.running.get()) {
                return;
            }
            this.running.set(true);
            this.taskExecutor.execute((Runnable)new StreamReadingTask());
        }
    }

    protected void doStop() {
        this.running.set(false);
        this.taskExecutor.getThreadPoolExecutor().shutdownNow();
        try {
            if (!this.taskExecutor.getThreadPoolExecutor().awaitTermination(10L, TimeUnit.SECONDS)) {
                this.logger.error((Object)"Reader task failed to stop");
            }
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    protected abstract URI buildUri();

    protected abstract void doSendLine(String var1);

    private ClientHttpRequestFactory getRequestFactory() {
        DirectFieldAccessor f = new DirectFieldAccessor((Object)this.twitter.getRestTemplate().getRequestFactory());
        Object requestFactory = f.getPropertyValue("requestFactory");
        return (ClientHttpRequestFactory)requestFactory;
    }

    private void resetBackOffs() {
        this.linearBackOff.set(250);
        this.rateLimitBackOff.set(60000);
        this.httpErrorBackOff.set(5000);
    }

    private void waitLinearBackoff() {
        int millis = this.linearBackOff.get();
        this.logger.warn((Object)("Exception while reading stream, waiting for " + millis + " ms before restarting"));
        this.wait(millis);
        if (millis < 16000) {
            this.linearBackOff.set(millis + 250);
        }
    }

    private void waitRateLimitBackoff() {
        int millis = this.rateLimitBackOff.get();
        this.logger.warn((Object)("Rate limit error, waiting for " + millis / 1000 + " seconds before restarting"));
        this.wait(millis);
        this.rateLimitBackOff.set(millis * 2);
    }

    private void waitHttpErrorBackoff() {
        int millis = this.httpErrorBackOff.get();
        this.logger.warn((Object)("Http error, waiting for " + millis / 1000 + " seconds before restarting"));
        this.wait(millis);
        if (millis < 320000) {
            this.httpErrorBackOff.set(millis * 2);
        }
    }

    protected void wait(int millis) {
        try {
            Thread.sleep(millis);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            if (!this.running.get()) {
                return;
            }
            throw new IllegalStateException(e);
        }
    }

    protected class StreamReadingTask
    implements Runnable {
        protected StreamReadingTask() {
        }

        @Override
        public void run() {
            while (AbstractTwitterInboundChannelAdapter.this.running.get()) {
                try {
                    this.readStream(AbstractTwitterInboundChannelAdapter.this.twitter.getRestTemplate());
                }
                catch (HttpStatusCodeException sce) {
                    if (sce.getStatusCode() == HttpStatus.UNAUTHORIZED) {
                        AbstractTwitterInboundChannelAdapter.this.logger.error((Object)("Twitter authentication failed: " + sce.getMessage()));
                        AbstractTwitterInboundChannelAdapter.this.running.set(false);
                        continue;
                    }
                    if (sce.getStatusCode() == HttpStatus.TOO_MANY_REQUESTS) {
                        AbstractTwitterInboundChannelAdapter.this.waitRateLimitBackoff();
                        continue;
                    }
                    AbstractTwitterInboundChannelAdapter.this.waitHttpErrorBackoff();
                }
                catch (Exception e) {
                    AbstractTwitterInboundChannelAdapter.this.logger.warn((Object)"Exception while reading stream.", (Throwable)e);
                    AbstractTwitterInboundChannelAdapter.this.waitLinearBackoff();
                }
            }
        }

        private void readStream(RestTemplate restTemplate) {
            restTemplate.execute(AbstractTwitterInboundChannelAdapter.this.buildUri(), HttpMethod.GET, new RequestCallback(){

                public void doWithRequest(ClientHttpRequest request) throws IOException {
                }
            }, (ResponseExtractor)new ResponseExtractor<String>(){

                /*
                 * WARNING - Removed try catching itself - possible behaviour change.
                 */
                public String extractData(ClientHttpResponse response) throws IOException {
                    InputStream inputStream = response.getBody();
                    try (BufferedReader reader = null;){
                        reader = new LineNumberReader(new InputStreamReader(inputStream));
                        AbstractTwitterInboundChannelAdapter.this.resetBackOffs();
                        while (AbstractTwitterInboundChannelAdapter.this.running.get()) {
                            String line = ((LineNumberReader)reader).readLine();
                            if (!StringUtils.hasText((String)line)) {
                                break;
                            }
                            AbstractTwitterInboundChannelAdapter.this.doSendLine(line);
                        }
                    }
                    return null;
                }
            });
        }
    }
}

