package _ss_com.streamsets.datacollector.event.handler.remote;

import _ss_com.streamsets.datacollector.execution.PipelineState;
import _ss_com.streamsets.datacollector.execution.StateEventListener;
import _ss_com.streamsets.datacollector.util.Configuration;
import _ss_com.streamsets.datacollector.util.PipelineException;
import _ss_com.streamsets.dc.execution.manager.standalone.ThreadUsage;
import com.streamsets.pipeline.api.impl.Utils;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import javax.inject.Inject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:_ss_com/streamsets/datacollector/event/handler/remote/RemoteStateEventListener.class */
/**
 * {@link StateEventListener} that buffers state changes of remote pipelines in a
 * bounded queue until they are collected via {@link #getPipelineStateEvents()}.
 *
 * <p>The queue is only created by {@link #init()}; that method must be called
 * before {@link #onStateChange}, {@link #getPipelineStateEvents()} or
 * {@link #clear()}, otherwise those methods fail with a
 * {@link NullPointerException}.
 */
public class RemoteStateEventListener implements StateEventListener {
    private static final Logger LOG = LoggerFactory.getLogger(RemoteStateEventListener.class);
    private static final String REMOTE_EVENTS_QUEUE_CAPACITY = "remote.events.queue.capacity";
    private static final int REMOTE_EVENTS_QUEUE_CAPACITY_DEFAULT = 1000;
    private final int capacity;
    private BlockingQueue<PipelineState> pipelineStateQueue;

    /**
     * Reads the queue capacity from the {@code remote.events.queue.capacity}
     * configuration key, falling back to
     * {@link #REMOTE_EVENTS_QUEUE_CAPACITY_DEFAULT} when the key is not set.
     *
     * @param configuration configuration to read the queue capacity from
     */
    @Inject
    public RemoteStateEventListener(Configuration configuration) {
        this.capacity = configuration.get(REMOTE_EVENTS_QUEUE_CAPACITY, REMOTE_EVENTS_QUEUE_CAPACITY_DEFAULT);
    }

    /**
     * Creates a fresh, empty event queue with the configured capacity. Any queue
     * created by a previous call is replaced.
     */
    public void init() {
        this.pipelineStateQueue = new ArrayBlockingQueue<>(this.capacity);
    }

    /**
     * Queues {@code pipelineState2} when its attributes flag it as a remote
     * pipeline; states without that flag are ignored.
     *
     * <p>The state is offered without blocking: when the queue is full the event
     * is dropped and a warning is logged.
     *
     * @param pipelineState  not used by this listener (presumably the previous state)
     * @param pipelineState2 state whose attributes are inspected and which is queued
     * @param str            not used by this listener
     * @param threadUsage    not used by this listener
     * @throws PipelineException declared by the interface; not thrown by this implementation
     */
    @Override
    public void onStateChange(PipelineState pipelineState, PipelineState pipelineState2, String str, ThreadUsage threadUsage) throws PipelineException {
        Object remoteFlag = pipelineState2.getAttributes().get(RemoteDataCollector.IS_REMOTE_PIPELINE);
        // Boolean.TRUE.equals is null-safe and does not throw if the attribute
        // holds something other than a Boolean.
        if (!Boolean.TRUE.equals(remoteFlag)) {
            return;
        }
        if (this.pipelineStateQueue.offer(pipelineState2)) {
            LOG.debug(
                "Adding status event for remote pipeline: '{}' in status: '{}'",
                pipelineState2.getName(),
                pipelineState2.getStatus()
            );
        } else {
            LOG.warn(
                "Cannot add status event for remote pipeline: '{}' in status: '{}'; Queue for storing pipeline state events is full",
                pipelineState2.getName(),
                pipelineState2.getStatus()
            );
        }
    }

    /**
     * Removes all currently queued states and returns at most one state per
     * pipeline name.
     *
     * <p>When several states were queued for the same name, the one queued last
     * is kept. The iteration order of the returned collection is unspecified.
     *
     * @return the most recently queued state for each pipeline name; empty when
     *         nothing was queued
     */
    public Collection<PipelineState> getPipelineStateEvents() {
        Collection<PipelineState> drained = new ArrayList<>();
        this.pipelineStateQueue.drainTo(drained);
        // Later entries overwrite earlier ones, so only the last state per name survives.
        HashMap<String, PipelineState> latestByName = new HashMap<>();
        for (PipelineState state : drained) {
            latestByName.put(state.getName(), state);
        }
        return latestByName.values();
    }

    /** Discards every queued state without returning it. */
    public void clear() {
        this.pipelineStateQueue.clear();
    }
}
