package _ss_com.streamsets.datacollector.event.handler.remote;

import _ss_com.com.google.common.annotations.VisibleForTesting;
import _ss_com.streamsets.datacollector.callback.CallbackInfo;
import _ss_com.streamsets.datacollector.config.PipelineConfiguration;
import _ss_com.streamsets.datacollector.config.RuleDefinitions;
import _ss_com.streamsets.datacollector.config.dto.ValidationStatus;
import _ss_com.streamsets.datacollector.event.dto.WorkerInfo;
import _ss_com.streamsets.datacollector.event.handler.DataCollector;
import _ss_com.streamsets.datacollector.execution.Manager;
import _ss_com.streamsets.datacollector.execution.PipelineState;
import _ss_com.streamsets.datacollector.execution.PipelineStateStore;
import _ss_com.streamsets.datacollector.execution.PipelineStatus;
import _ss_com.streamsets.datacollector.execution.PreviewOutput;
import _ss_com.streamsets.datacollector.execution.PreviewStatus;
import _ss_com.streamsets.datacollector.execution.Previewer;
import _ss_com.streamsets.datacollector.execution.Runner;
import _ss_com.streamsets.datacollector.execution.manager.PipelineManagerException;
import _ss_com.streamsets.datacollector.store.PipelineStoreTask;
import _ss_com.streamsets.datacollector.util.ContainerError;
import _ss_com.streamsets.datacollector.util.PipelineException;
import _ss_com.streamsets.datacollector.validation.Issues;
import com.streamsets.pipeline.api.ExecutionMode;
import com.streamsets.pipeline.api.StageException;
import com.streamsets.pipeline.api.impl.Utils;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import javax.inject.Inject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:_ss_com/streamsets/datacollector/event/handler/remote/RemoteDataCollector.class */
public class RemoteDataCollector implements DataCollector {
    public static final String IS_REMOTE_PIPELINE = "IS_REMOTE_PIPELINE";
    private static final String NAME_AND_REV_SEPARATOR = "::";
    private static final Logger LOG = LoggerFactory.getLogger(RemoteDataCollector.class);
    private final Manager manager;
    private final PipelineStoreTask pipelineStore;
    private final List<String> validatorIdList = new ArrayList();
    private final PipelineStateStore pipelineStateStore;
    private final RemoteStateEventListener stateEventListener;

    @Inject
    public RemoteDataCollector(Manager manager, PipelineStoreTask pipelineStoreTask, PipelineStateStore pipelineStateStore, RemoteStateEventListener remoteStateEventListener) {
        this.manager = manager;
        this.pipelineStore = pipelineStoreTask;
        this.pipelineStateStore = pipelineStateStore;
        this.stateEventListener = remoteStateEventListener;
    }

    public void init() {
        this.stateEventListener.init();
        this.manager.addStateEventListener(this.stateEventListener);
        this.pipelineStore.registerStateListener(this.stateEventListener);
    }

    private void validateIfRemote(String str, String str2, String str3) throws PipelineException {
        if (!this.manager.isRemotePipeline(str, str2)) {
            throw new PipelineException(ContainerError.CONTAINER_01100, str3, str);
        }
    }

    @Override // _ss_com.streamsets.datacollector.event.handler.DataCollector
    public void start(String str, String str2, String str3) throws PipelineException, StageException {
        validateIfRemote(str2, str3, "START");
        this.manager.getRunner(str, str2, str3).start();
    }

    @Override // _ss_com.streamsets.datacollector.event.handler.DataCollector
    public void stop(String str, String str2, String str3) throws PipelineException {
        validateIfRemote(str2, str3, "STOP");
        this.manager.getRunner(str, str2, str3).stop();
    }

    @Override // _ss_com.streamsets.datacollector.event.handler.DataCollector
    public void delete(String str, String str2) throws PipelineException {
        validateIfRemote(str, str2, "DELETE");
        this.pipelineStore.delete(str);
        this.pipelineStore.deleteRules(str);
    }

    @Override // _ss_com.streamsets.datacollector.event.handler.DataCollector
    public void deleteHistory(String str, String str2, String str3) throws PipelineException {
        validateIfRemote(str2, str3, "DELETE_HISTORY");
        this.manager.getRunner(str, str2, str3).deleteHistory();
    }

    @Override // _ss_com.streamsets.datacollector.event.handler.DataCollector
    public void savePipeline(String str, String str2, String str3, String str4, PipelineConfiguration pipelineConfiguration, RuleDefinitions ruleDefinitions) throws PipelineException {
        UUID uuid;
        boolean z = false;
        Iterator<PipelineState> it = this.manager.getPipelines().iterator();
        while (true) {
            if (!it.hasNext()) {
                break;
            } else if (it.next().getName().equals(str2)) {
                z = true;
                break;
            }
        }
        if (z) {
            validateIfRemote(str2, str3, "SAVE");
            uuid = this.pipelineStore.getInfo(str2).getUuid();
            ruleDefinitions.setUuid(this.pipelineStore.retrieveRules(str2, str3).getUuid());
        } else {
            uuid = this.pipelineStore.create(str, str2, str4, true).getUuid();
        }
        pipelineConfiguration.setUuid(uuid);
        this.pipelineStore.save(str, str2, str3, str4, pipelineConfiguration);
        this.pipelineStore.storeRules(str2, str3, ruleDefinitions);
    }

    @Override // _ss_com.streamsets.datacollector.event.handler.DataCollector
    public void savePipelineRules(String str, String str2, RuleDefinitions ruleDefinitions) throws PipelineException {
        validateIfRemote(str, str2, "SAVE_RULES");
        this.pipelineStore.getInfo(str);
        ruleDefinitions.setUuid(this.pipelineStore.retrieveRules(str, str2).getUuid());
        this.pipelineStore.storeRules(str, str2, ruleDefinitions);
    }

    @Override // _ss_com.streamsets.datacollector.event.handler.DataCollector
    public void resetOffset(String str, String str2, String str3) throws PipelineException, PipelineManagerException {
        validateIfRemote(str2, str3, "RESET_OFFSET");
        this.manager.getRunner(str, str2, str3).resetOffset();
    }

    @Override // _ss_com.streamsets.datacollector.event.handler.DataCollector
    public void validateConfigs(String str, String str2, String str3) throws PipelineException {
        Previewer createPreviewer = this.manager.createPreviewer(str, str2, str3);
        validateIfRemote(str2, str3, "VALIDATE_CONFIGS");
        createPreviewer.validateConfigs(1000L);
        this.validatorIdList.add(createPreviewer.getId());
    }

    @Override // _ss_com.streamsets.datacollector.event.handler.DataCollector
    public void stopAndDelete(String str, String str2, String str3) throws PipelineException, StageException {
        validateIfRemote(str2, str3, "STOP_AND_DELETE");
        PipelineState state = this.pipelineStateStore.getState(str2, str3);
        if (state.getStatus().isActive()) {
            this.manager.getRunner(str, str2, str3).stop();
        }
        long currentTimeMillis = System.currentTimeMillis();
        while (state.getStatus().isActive() && System.currentTimeMillis() - currentTimeMillis < 10000) {
            try {
                Thread.sleep(500L);
                state = this.pipelineStateStore.getState(str2, str3);
            } catch (InterruptedException e) {
                throw new IllegalStateException("Interrupted while waiting for pipeline to stop " + e, e);
            }
        }
        if (state.getStatus().isActive()) {
            this.pipelineStateStore.saveState(str, str2, str3, PipelineStatus.STOPPED, "Stopping pipeline forcefully as we are performing a delete afterwards", state.getAttributes(), state.getExecutionMode(), state.getMetrics(), state.getRetryAttempt(), state.getNextRetryTimeStamp());
        }
        delete(str2, str3);
    }

    @Override // _ss_com.streamsets.datacollector.event.handler.DataCollector
    public List<PipelineAndValidationStatus> getRemotePipelinesWithChanges() throws PipelineException {
        PipelineState pipelineState;
        ArrayList arrayList = new ArrayList();
        for (PipelineState pipelineState2 : this.stateEventListener.getPipelineStateEvents()) {
            String name = pipelineState2.getName();
            String rev = pipelineState2.getRev();
            boolean z = pipelineState2.getExecutionMode() != ExecutionMode.STANDALONE;
            List<WorkerInfo> arrayList2 = new ArrayList();
            if (this.pipelineStore.hasPipeline(name)) {
                Runner runner = this.manager.getRunner(pipelineState2.getUser(), name, rev);
                pipelineState = runner.getState();
                if (z) {
                    arrayList2 = getWorkers(runner.getSlaveCallbackList());
                }
            } else {
                pipelineState = pipelineState2;
            }
            arrayList.add(new PipelineAndValidationStatus(name, rev, true, pipelineState.getStatus(), pipelineState.getMessage(), arrayList2, z));
        }
        return arrayList;
    }

    private List<WorkerInfo> getWorkers(Collection<CallbackInfo> collection) {
        ArrayList arrayList = new ArrayList();
        for (CallbackInfo callbackInfo : collection) {
            WorkerInfo workerInfo = new WorkerInfo();
            workerInfo.setWorkerURL(callbackInfo.getSdcURL());
            workerInfo.setWorkerId(callbackInfo.getSlaveSdcId());
            arrayList.add(workerInfo);
        }
        return arrayList;
    }

    @Override // _ss_com.streamsets.datacollector.event.handler.DataCollector
    public Collection<PipelineAndValidationStatus> getPipelines() throws PipelineException {
        List<PipelineState> pipelines = this.manager.getPipelines();
        this.stateEventListener.clear();
        HashMap hashMap = new HashMap();
        for (PipelineState pipelineState : pipelines) {
            String name = pipelineState.getName();
            String rev = pipelineState.getRev();
            String user = pipelineState.getUser();
            boolean z = this.manager.isRemotePipeline(name, rev);
            if (z || this.manager.isPipelineActive(name, rev)) {
                ArrayList arrayList = new ArrayList();
                boolean z2 = pipelineState.getExecutionMode() != ExecutionMode.STANDALONE;
                if (z2) {
                    for (CallbackInfo callbackInfo : this.manager.getRunner(user, name, rev).getSlaveCallbackList()) {
                        WorkerInfo workerInfo = new WorkerInfo();
                        workerInfo.setWorkerURL(callbackInfo.getSdcURL());
                        workerInfo.setWorkerId(callbackInfo.getSlaveSdcId());
                        arrayList.add(workerInfo);
                    }
                }
                hashMap.put(getNameAndRevString(name, rev), new PipelineAndValidationStatus(name, rev, z, pipelineState.getStatus(), pipelineState.getMessage(), arrayList, z2));
            }
        }
        setValidationStatus(hashMap);
        return hashMap.values();
    }

    private void setValidationStatus(Map<String, PipelineAndValidationStatus> map) {
        ArrayList arrayList = new ArrayList();
        for (String str : this.validatorIdList) {
            Previewer previewer = this.manager.getPreviewer(str);
            ValidationStatus validationStatus = null;
            Issues issues = null;
            String str2 = null;
            if (previewer != null) {
                PreviewStatus status = previewer.getStatus();
                switch (status) {
                    case INVALID:
                        validationStatus = ValidationStatus.INVALID;
                        break;
                    case TIMING_OUT:
                    case TIMED_OUT:
                        validationStatus = ValidationStatus.TIMED_OUT;
                        break;
                    case VALID:
                        validationStatus = ValidationStatus.VALID;
                        break;
                    case VALIDATING:
                        validationStatus = ValidationStatus.VALIDATING;
                        break;
                    case VALIDATION_ERROR:
                        validationStatus = ValidationStatus.VALIDATION_ERROR;
                        break;
                    default:
                        LOG.warn(Utils.format("Unrecognized validation state: '{}'", new Object[]{status}));
                        break;
                }
                if (!status.isActive()) {
                    PreviewOutput output = previewer.getOutput();
                    issues = output.getIssues();
                    str2 = output.getMessage();
                    arrayList.add(str);
                }
            } else {
                LOG.warn(Utils.format("Previewer is null for id: '{}'", new Object[]{str}));
            }
            PipelineAndValidationStatus pipelineAndValidationStatus = map.get(getNameAndRevString(previewer.getName(), previewer.getRev()));
            if (pipelineAndValidationStatus == null) {
                LOG.warn("Preview pipeline: '{}'::'{}' doesn't exist", previewer.getName(), previewer.getRev());
            } else {
                pipelineAndValidationStatus.setValidationStatus(validationStatus);
                pipelineAndValidationStatus.setIssues(issues);
                pipelineAndValidationStatus.setMessage(str2);
            }
        }
        Iterator it = arrayList.iterator();
        while (it.hasNext()) {
            this.validatorIdList.remove((String) it.next());
        }
    }

    private String getNameAndRevString(String str, String str2) {
        return str + NAME_AND_REV_SEPARATOR + str2;
    }

    @VisibleForTesting
    List<String> getValidatorList() {
        return this.validatorIdList;
    }
}
