/*
 * Decompiled with CFR 0.152.
 */
package org.terracotta.masterworker;

import java.util.UUID;
import java.util.concurrent.ExecutorService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.terracotta.masterworker.WorkMessage;
import org.terracotta.masterworker.Worker;
import org.terracotta.masterworker.cluster.ClusterState;
import org.terracotta.message.pipe.Pipe;
import org.terracotta.message.pipe.PipeProcessor;
import org.terracotta.message.topology.Topology;

/*
 * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
 */
public abstract class AbstractWorker<T>
implements Worker<T> {
    private static final transient Logger logger = LoggerFactory.getLogger(AbstractWorker.class);
    private final ClusterState clusterState;
    private final String workerId;
    private final Topology<WorkMessage<T>, String> topology;
    private final ExecutorService workThreadPool;
    private Pipe<WorkMessage<T>> workPipe;
    private PipeProcessor<WorkMessage<T>> workPipeProcessor;

    public AbstractWorker(String prefix, Topology<WorkMessage<T>, String> topology, ExecutorService workThreadPool) {
        this.workerId = prefix != null && prefix.length() > 0 ? prefix + "-" + UUID.randomUUID().toString() : UUID.randomUUID().toString();
        this.topology = topology;
        this.workThreadPool = workThreadPool;
        this.clusterState = ClusterState.getOrCreateInstance(topology.getName());
    }

    @Override
    public final void start() throws Exception {
        logger.info("Starting worker for routing id {}", (Object)this.workerId);
        this.onStart();
        this.workPipe = this.topology.getOrCreatePipeFor((Object)this.workerId);
        this.workPipeProcessor = new WorkPipeListener(this.workPipe);
        this.workPipeProcessor.start();
        this.initClusterState();
    }

    @Override
    public final void stop() {
        logger.info("Stopping worker for routing id {}", (Object)this.workerId);
        this.clearClusterState();
        this.workPipeProcessor.stop();
        this.workThreadPool.shutdown();
        this.onStop();
    }

    protected final void doReply(WorkMessage<T> workMessage) {
        try {
            String replyPipeID = workMessage.getReplyPipeRoutingID();
            logger.debug("Replying work to pipe {}", (Object)replyPipeID);
            Pipe replyPipe = this.topology.getPipeFor((Object)replyPipeID);
            if (replyPipe != null) {
                replyPipe.put(workMessage);
            } else {
                logger.warn("Reply pipe {} not found!", (Object)replyPipeID);
            }
        }
        catch (InterruptedException ex) {
            logger.error("Exception while putting work into reply pipe {}!", (Object)workMessage.getReplyPipeRoutingID());
            logger.error(ex.getMessage(), (Throwable)ex);
        }
    }

    protected void onStart() throws Exception {
    }

    protected void onStop() {
    }

    protected abstract void doExecute(WorkMessage<T> var1);

    private void initClusterState() {
        this.clusterState.addWorkerForNode(this.workerId, this.clusterState.getCurrentNodeId());
    }

    private void clearClusterState() {
        this.clusterState.removeWorkerForNode(this.workerId, this.clusterState.getCurrentNodeId());
    }

    /*
     * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
     */
    private class WorkPipeListener
    extends PipeProcessor<WorkMessage<T>> {
        public WorkPipeListener(Pipe<WorkMessage<T>> workPipe) {
            super(workPipe, false);
        }

        public boolean event(final WorkMessage<T> workMessage) throws Exception {
            logger.debug("Executing work {} in pipe {}", workMessage.getWorkObject(), (Object)AbstractWorker.this.workerId);
            AbstractWorker.this.workThreadPool.submit(new Runnable(){

                public void run() {
                    AbstractWorker.this.doExecute(workMessage);
                }
            });
            return true;
        }
    }
}

