/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.cloud.stream.app.gpfdist.sink;

import com.codahale.metrics.Meter;
import java.util.Date;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.reactivestreams.Processor;
import org.springframework.cloud.stream.app.gpfdist.sink.AbstractGpfdistMessageHandler;
import org.springframework.cloud.stream.app.gpfdist.sink.GpfdistServer;
import org.springframework.cloud.stream.app.gpfdist.sink.support.GreenplumLoad;
import org.springframework.cloud.stream.app.gpfdist.sink.support.NetworkUtils;
import org.springframework.cloud.stream.app.gpfdist.sink.support.RuntimeContext;
import org.springframework.data.hadoop.util.net.HostInfoDiscovery;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandlingException;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.util.StringUtils;
import org.springframework.util.concurrent.SettableListenableFuture;
import reactor.Environment;
import reactor.core.processor.RingBufferProcessor;
import reactor.io.buffer.Buffer;

/**
 * Message handler that forwards {@code String} payloads into a reactive {@link Processor}
 * which is handed to a {@link GpfdistServer}, and that optionally runs a repeating
 * {@link GreenplumLoad} task on a {@link TaskScheduler} while the handler is started.
 *
 * <p>The load task is only scheduled when a {@link GreenplumLoad} has been set via
 * {@link #setGreenplumLoad(GreenplumLoad)}; in that case a scheduler must also be provided via
 * {@link #setSqlTaskScheduler(TaskScheduler)}.
 */
public class GpfdistMessageHandler extends AbstractGpfdistMessageHandler {
    private final Log log = LogFactory.getLog(GpfdistMessageHandler.class);
    private final int port;
    private final int flushCount;
    private final int flushTime;
    private final int batchTimeout;
    private final int batchCount;
    private final int batchPeriod;
    private final String delimiter;
    private GreenplumLoad greenplumLoad;
    private Processor<Buffer, Buffer> processor;
    private GpfdistServer gpfdistServer;
    private TaskScheduler sqlTaskScheduler;
    private final TaskFuture taskFuture = new TaskFuture();
    private int rateInterval = 0;
    private Meter meter = null;
    private int meterCount = 0;
    private final HostInfoDiscovery hostInfoDiscovery;

    /**
     * Creates a new handler.
     *
     * @param port port passed to the {@link GpfdistServer}
     * @param flushCount flush count passed to the {@link GpfdistServer}
     * @param flushTime flush time passed to the {@link GpfdistServer}
     * @param batchTimeout batch timeout passed to the {@link GpfdistServer}; also added (as
     *        seconds) to the time {@link #doStop()} waits for the load task to finish
     * @param batchCount batch count passed to the {@link GpfdistServer}
     * @param batchPeriod pause, in seconds, between two consecutive load runs
     * @param delimiter text appended to every payload; {@code null} or empty disables appending
     * @param hostInfoDiscovery used to resolve the local address advertised in the gpfdist URI
     */
    public GpfdistMessageHandler(int port, int flushCount, int flushTime, int batchTimeout, int batchCount, int batchPeriod, String delimiter, HostInfoDiscovery hostInfoDiscovery) {
        this.port = port;
        this.flushCount = flushCount;
        this.flushTime = flushTime;
        this.batchTimeout = batchTimeout;
        this.batchCount = batchCount;
        this.batchPeriod = batchPeriod;
        this.delimiter = StringUtils.hasLength(delimiter) ? delimiter : null;
        this.hostInfoDiscovery = hostInfoDiscovery;
    }

    /**
     * Wraps the {@code String} payload (plus the delimiter, when one is configured) in a
     * {@link Buffer} and pushes it into the processor. When a meter is active, every
     * {@code rateInterval}-th message marks the meter and logs the current rates.
     *
     * @param message the message to write
     * @throws MessageHandlingException if the payload is not a {@code String}
     */
    @Override
    protected void doWrite(Message<?> message) throws Exception {
        Object payload = message.getPayload();
        if (!(payload instanceof String)) {
            throw new MessageHandlingException(message, "message not a String");
        }
        String data = (String) payload;
        if (this.delimiter != null) {
            this.processor.onNext(Buffer.wrap(data + this.delimiter));
        } else {
            this.processor.onNext(Buffer.wrap(data));
        }
        // Read both values once and re-check the interval so a stale meter can never
        // lead to a modulo-by-zero.
        Meter activeMeter = this.meter;
        int interval = this.rateInterval;
        if (activeMeter != null && interval > 0 && this.meterCount++ % interval == 0) {
            activeMeter.mark((long) interval);
            this.log.info("METER: 1 minute rate = " + activeMeter.getOneMinuteRate() + " mean rate = " + activeMeter.getMeanRate());
        }
    }

    /**
     * Runs the parent initialization, makes sure the reactor {@link Environment} exists and
     * creates the ring buffer processor that {@link #doWrite(Message)} publishes into.
     */
    @Override
    protected void onInit() throws Exception {
        super.onInit();
        Environment.initializeIfEmpty().assignErrorJournal();
        this.processor = RingBufferProcessor.create(false);
    }

    /**
     * Starts the gpfdist protocol listener and, when a {@link GreenplumLoad} is configured,
     * schedules the repeating load task.
     *
     * @throws IllegalStateException if a {@link GreenplumLoad} is set but no scheduler is
     * @throws RuntimeException if the protocol listener cannot be started
     */
    @Override
    protected void doStart() {
        // Fail before the listener is started: the scheduler is dereferenced unconditionally
        // below whenever a load is configured, so a missing one would otherwise surface as a
        // NullPointerException with the server already running.
        if (this.greenplumLoad != null && this.sqlTaskScheduler == null) {
            throw new IllegalStateException("sqlTaskScheduler must be set when greenplumLoad is set");
        }
        try {
            this.log.info("Creating gpfdist protocol listener on port=" + this.port);
            this.gpfdistServer = new GpfdistServer(this.processor, this.port, this.flushCount, this.flushTime, this.batchTimeout, this.batchCount);
            this.gpfdistServer.start();
            this.log.info("gpfdist protocol listener running on port=" + this.gpfdistServer.getLocalPort());
        }
        catch (Exception e) {
            throw new RuntimeException("Error starting protocol listener", e);
        }
        if (this.greenplumLoad != null) {
            this.log.info("Scheduling gpload task with batchPeriod=" + this.batchPeriod);
            final RuntimeContext context = new RuntimeContext(NetworkUtils.getGPFDistUri(this.hostInfoDiscovery.getHostInfo().getAddress(), this.gpfdistServer.getLocalPort()));
            this.sqlTaskScheduler.schedule(new FutureTask<Object>(new Runnable(){

                @Override
                public void run() {
                    boolean taskValue = true;
                    try {
                        while (!GpfdistMessageHandler.this.taskFuture.interrupted) {
                            try {
                                GpfdistMessageHandler.this.greenplumLoad.load(context);
                            }
                            catch (Exception e) {
                                // A failed load must not end the loop; the next period retries.
                                GpfdistMessageHandler.this.log.error("Error in load", e);
                            }
                            // Multiply as long so large periods cannot overflow int.
                            Thread.sleep(GpfdistMessageHandler.this.batchPeriod * 1000L);
                        }
                    }
                    catch (InterruptedException e) {
                        // Restore the flag so the owning executor can observe the interrupt.
                        Thread.currentThread().interrupt();
                        taskValue = false;
                    }
                    catch (Exception e) {
                        GpfdistMessageHandler.this.log.error("Load task terminated unexpectedly", e);
                        taskValue = false;
                    }
                    GpfdistMessageHandler.this.taskFuture.set(taskValue);
                }
            }, null), new Date());
        } else {
            this.log.info("Skipping gpload tasks because greenplumLoad is not set");
        }
    }

    /**
     * Asks the load task (if any) to finish and waits up to
     * {@code batchTimeout + batchPeriod + 2} seconds for it, then completes the processor and
     * stops the protocol listener. Failures during shutdown are logged, not rethrown.
     */
    @Override
    protected void doStop() {
        if (this.greenplumLoad != null) {
            this.taskFuture.interruptTask();
            try {
                long now = System.currentTimeMillis();
                long waitSeconds = (long) this.batchTimeout + this.batchPeriod + 2;
                Boolean value = this.taskFuture.get(waitSeconds, TimeUnit.SECONDS);
                this.log.info("Stopping, got future value " + value + " from task which took " + (System.currentTimeMillis() - now) + "ms");
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                this.log.warn("Got error from task wait value which may indicate trouble", e);
            }
            catch (Exception e) {
                this.log.warn("Got error from task wait value which may indicate trouble", e);
            }
        }
        // Shut the two resources down independently so that a failure completing the
        // processor does not leave the listener running.
        try {
            if (this.processor != null) {
                this.processor.onComplete();
            }
        }
        catch (Exception e) {
            this.log.warn("Error shutting down protocol listener", e);
        }
        try {
            if (this.gpfdistServer != null) {
                this.gpfdistServer.stop();
            }
        }
        catch (Exception e) {
            this.log.warn("Error shutting down protocol listener", e);
        }
    }

    /**
     * Sets the scheduler used to run the load task.
     *
     * @param sqlTaskScheduler the scheduler; required when a {@link GreenplumLoad} is set
     */
    public void setSqlTaskScheduler(TaskScheduler sqlTaskScheduler) {
        this.sqlTaskScheduler = sqlTaskScheduler;
    }

    /**
     * Sets the load to execute periodically; when left unset no load task is scheduled.
     *
     * @param greenplumLoad the load to run, or {@code null} to disable loading
     */
    public void setGreenplumLoad(GreenplumLoad greenplumLoad) {
        this.greenplumLoad = greenplumLoad;
    }

    /**
     * Sets how many messages pass between two rate log lines. A positive value installs a
     * fresh {@link Meter}; zero or a negative value disables metering.
     *
     * @param rateInterval number of messages per rate report, or {@code <= 0} to disable
     */
    public void setRateInterval(int rateInterval) {
        this.rateInterval = rateInterval;
        // Drop any previous meter when disabling so doWrite never divides by a zero interval.
        this.meter = rateInterval > 0 ? new Meter() : null;
    }

    /**
     * Future completed by the load task with {@code true} on a normal exit of its loop and
     * {@code false} otherwise; it also carries the flag that asks the loop to stop.
     */
    private static class TaskFuture
    extends SettableListenableFuture<Boolean> {
        // Written by the stopping thread and polled by the scheduler thread, hence volatile.
        volatile boolean interrupted = false;

        private TaskFuture() {
        }

        // NOTE(review): presumably overrides SettableListenableFuture#interruptTask — confirm
        // against the Spring version in use before adding @Override.
        protected void interruptTask() {
            this.interrupted = true;
        }
    }
}

