package _ss_com.streamsets.datacollector.runner;

import _ss_com.com.google.common.annotations.VisibleForTesting;
import _ss_com.com.google.common.base.Preconditions;
import _ss_com.com.google.common.util.concurrent.RateLimiter;
import _ss_com.streamsets.datacollector.config.StageType;
import _ss_com.streamsets.datacollector.record.RecordImpl;
import com.streamsets.pipeline.api.Record;
import com.streamsets.pipeline.api.impl.Utils;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.annotation.Nullable;

/* loaded from: input_file:_ss_com/streamsets/datacollector/runner/FullPipeBatch.class */
/**
 * {@link PipeBatch} implementation that keeps the records of every lane of the current batch in an
 * in-memory map keyed by lane name.
 *
 * <p>Stages register their output lanes via {@link #startStage(StagePipe)}, publish records via
 * {@link #completeStage(BatchMakerImpl, EventSink)}, and downstream pipes consume (and thereby remove)
 * lane contents via {@link #getBatch(Pipe)}. The class also tracks input/output record counters and,
 * optionally, a per-stage output snapshot.
 *
 * <p>No synchronization is performed by this class.
 */
public class FullPipeBatch implements PipeBatch {
    private final SourceOffsetTracker offsetTracker;
    private final int batchSize;
    /** Lane name to records currently sitting in that lane; a {@code null} value marks a lane that was registered but not yet filled. */
    private final Map<String, List<Record>> fullPayload = new HashMap<String, List<Record>>();
    /** Instance names of the stages already started in this batch. */
    private final Set<String> processedStages = new HashSet<String>();
    /** Per-stage output snapshots, or {@code null} when snapshotting is disabled. */
    private final List<StageOutput> stageOutputSnapshot;
    private final ErrorSink errorSink;
    private String newOffset;
    private int inputRecords;
    private int outputRecords;
    private RateLimiter rateLimiter;

    /**
     * Creates an empty batch.
     *
     * @param offsetTracker tracker used to read, update and commit the source offset
     * @param batchSize value reported by {@link #getBatchSize()} and handed to the batch maker of source stages
     * @param snapshotStagesOutput when {@code true}, a snapshot of every stage output is collected
     */
    public FullPipeBatch(SourceOffsetTracker offsetTracker, int batchSize, boolean snapshotStagesOutput) {
        this.offsetTracker = offsetTracker;
        this.batchSize = batchSize;
        // Explicit type argument: a diamond inside a conditional expression does not infer from the target on older compilers.
        this.stageOutputSnapshot = snapshotStagesOutput ? new ArrayList<StageOutput>() : null;
        this.errorSink = new ErrorSink();
    }

    /** Returns the live (not copied) lane-to-records map. */
    @VisibleForTesting
    Map<String, List<Record>> getFullPayload() {
        return fullPayload;
    }

    /** Returns the batch size given at construction time. */
    @Override
    public int getBatchSize() {
        return batchSize;
    }

    /** Returns the offset currently reported by the offset tracker. */
    @Override
    public String getPreviousOffset() {
        return offsetTracker.getOffset();
    }

    /**
     * Remembers the new offset locally and forwards it to the offset tracker.
     *
     * @param offset the new offset
     */
    @Override
    public void setNewOffset(String offset) {
        this.newOffset = offset;
        offsetTracker.setOffset(offset);
    }

    /**
     * Sets the rate limiter handed to every batch maker created by {@link #startStage(StagePipe)}.
     *
     * @param rateLimiter the limiter to use, may be {@code null}
     */
    public void setRateLimiter(@Nullable RateLimiter rateLimiter) {
        this.rateLimiter = rateLimiter;
    }

    /**
     * Builds the input batch for a pipe by removing all of its input lanes from the payload and
     * concatenating their records in lane order. For TARGET and EXECUTOR stages the number of
     * records is added to the output-record counter.
     *
     * @param pipe the pipe about to be executed
     * @return a batch holding the records of all input lanes of the pipe
     * @throws NullPointerException if an input lane is absent from the payload or has no records assigned
     */
    @Override
    public BatchImpl getBatch(Pipe pipe) {
        List<Record> records = new ArrayList<Record>();
        for (String inputLane : pipe.getInputLanes()) {
            // Fail with the lane name instead of an anonymous NPE raised from inside addAll(null).
            List<Record> laneRecords = Preconditions.checkNotNull(
                fullPayload.remove(inputLane),
                Utils.formatL("Stream '{}' does not exist", new Object[]{inputLane}));
            records.addAll(laneRecords);
        }
        if (pipe.getStage().getDefinition().getType().isOneOf(StageType.TARGET, StageType.EXECUTOR)) {
            outputRecords += records.size();
        }
        return new BatchImpl(pipe.getStage().getInfo().getInstanceName(), offsetTracker.getOffset(), records);
    }

    /**
     * Marks a stage as started, registers its output lanes in the payload with a {@code null}
     * placeholder, and creates the batch maker the stage will write to.
     *
     * @param stagePipe the stage pipe being started
     * @return a batch maker configured with the current rate limiter
     * @throws IllegalStateException if the stage was already started in this batch
     */
    @Override
    public BatchMakerImpl startStage(StagePipe stagePipe) {
        String instanceName = stagePipe.getStage().getInfo().getInstanceName();
        Preconditions.checkState(
            !processedStages.contains(instanceName),
            Utils.formatL("The stage '{}' has been processed already", new Object[]{instanceName}));
        processedStages.add(instanceName);
        for (String outputLane : stagePipe.getOutputLanes()) {
            fullPayload.put(outputLane, null);
        }
        boolean isSource = stagePipe.getStage().getDefinition().getType() == StageType.SOURCE;
        // Third argument: batch size for sources, Integer.MAX_VALUE otherwise (presumably a record cap — confirm in BatchMakerImpl).
        int makerLimit = isSource ? getBatchSize() : Integer.MAX_VALUE;
        BatchMakerImpl batchMaker = new BatchMakerImpl(stagePipe, stageOutputSnapshot != null, makerLimit);
        batchMaker.setRateLimiter(rateLimiter);
        return batchMaker;
    }

    /**
     * Assigns an empty record list to every output lane of the given pipe.
     *
     * @param pipe the pipe being skipped
     */
    @Override
    public void skipStage(Pipe pipe) {
        for (String outputLane : pipe.getOutputLanes()) {
            fullPayload.put(outputLane, Collections.<Record>emptyList());
        }
    }

    /**
     * Publishes the output of a finished stage into the payload, updates the record counters,
     * records a snapshot when enabled, and finally publishes the stage's event records.
     *
     * @param batchMaker the batch maker the stage wrote its output to
     * @param eventSink sink holding the event records produced by the stage
     */
    @Override
    public void completeStage(BatchMakerImpl batchMaker, EventSink eventSink) {
        StagePipe stagePipe = batchMaker.getStagePipe();
        StageType stageType = stagePipe.getStage().getDefinition().getType();
        String instanceName = stagePipe.getStage().getInfo().getInstanceName();

        if (stageType == StageType.SOURCE) {
            inputRecords += batchMaker.getSize();
        }

        // The pipe's lane names and the stage-configuration lane names are paired by index.
        Map<String, List<Record>> stageOutput = batchMaker.getStageOutput();
        List<String> configuredLanes = stagePipe.getStage().getConfiguration().getOutputLanes();
        List<String> pipeLanes = stagePipe.getOutputLanes();
        for (int i = 0; i < configuredLanes.size(); i++) {
            fullPayload.put(pipeLanes.get(i), stageOutput.get(configuredLanes.get(i)));
        }

        if (stageOutputSnapshot != null) {
            stageOutputSnapshot.add(new StageOutput(instanceName, batchMaker.getStageOutputSnapshot(), errorSink));
        }
        if (stageType.isOneOf(StageType.TARGET, StageType.EXECUTOR)) {
            // Records this stage sent to error were counted as output in getBatch(); take them back out.
            outputRecords -= errorSink.getErrorRecords(instanceName).size();
        }
        completeStage(stagePipe, eventSink);
    }

    /**
     * Publishes the event records of a stage into its event lane; does nothing unless the stage
     * pipe has exactly one event lane.
     *
     * @param stagePipe the finished stage pipe
     * @param eventSink sink holding the event records produced by the stage
     */
    @Override
    public void completeStage(StagePipe stagePipe, EventSink eventSink) {
        List<String> eventLanes = stagePipe.getEventLanes();
        if (eventLanes.size() == 1) {
            fullPayload.put(eventLanes.get(0), eventSink.getEventRecords());
        }
    }

    /** Asks the offset tracker to commit the offset. */
    @Override
    public void commitOffset() {
        offsetTracker.commitOffset();
    }

    /**
     * Looks up the current contents of the requested lanes without removing them.
     *
     * @param pipeLanes the lanes to look up
     * @return a new map from each requested lane to its records; the value is {@code null} for lanes
     *     that are absent or not yet filled
     */
    @Override
    public Map<String, List<Record>> getLaneOutputRecords(List<String> pipeLanes) {
        Map<String, List<Record>> laneRecords = new HashMap<String, List<Record>>();
        for (String lane : pipeLanes) {
            laneRecords.put(lane, fullPayload.get(lane));
        }
        return laneRecords;
    }

    /** Returns a new map with the same keys whose values are cloned copies of the given record lists. */
    private Map<String, List<Record>> createSnapshot(Map<String, List<Record>> output) {
        Map<String, List<Record>> snapshot = new HashMap<String, List<Record>>();
        for (Map.Entry<String, List<Record>> entry : output.entrySet()) {
            snapshot.put(entry.getKey(), createCopy(entry.getValue()));
        }
        return snapshot;
    }

    /**
     * Replaces the execution of a stage with a pre-computed output: the stage is marked as started
     * and each of its output lanes is filled from the given stage output, looked up by the lane name
     * with its postfix removed.
     *
     * @param stagePipe the stage pipe whose output is overridden
     * @param stageOutput the output to use in place of running the stage
     * @throws IllegalStateException if the stage was already started in this batch
     */
    @Override
    public void overrideStageOutput(StagePipe stagePipe, StageOutput stageOutput) {
        startStage(stagePipe);
        for (String pipeLane : stagePipe.getOutputLanes()) {
            String stageLane = LaneResolver.removePostFixFromLane(pipeLane);
            fullPayload.put(pipeLane, stageOutput.getOutput().get(stageLane));
        }
        if (stageOutputSnapshot != null) {
            stageOutputSnapshot.add(new StageOutput(
                stageOutput.getInstanceName(),
                createSnapshot(stageOutput.getOutput()),
                stageOutput.getErrorRecords(),
                stageOutput.getStageErrors()));
        }
    }

    /** Returns the internal snapshot list, or {@code null} when snapshotting is disabled. */
    @Override
    public List<StageOutput> getSnapshotsOfAllStagesOutput() {
        return stageOutputSnapshot;
    }

    /** Returns the error sink shared by all stages of this batch. */
    @Override
    public ErrorSink getErrorSink() {
        return errorSink;
    }

    /**
     * Moves the records of one lane to another lane; the source lane is removed from the payload.
     *
     * @param inputLane the lane to take the records from
     * @param outputLane the lane to store the records under
     * @throws NullPointerException if the source lane is absent or has no records assigned
     */
    @Override
    public void moveLane(String inputLane, String outputLane) {
        List<Record> records = Preconditions.checkNotNull(
            fullPayload.remove(inputLane),
            Utils.formatL("Stream '{}' does not exist", new Object[]{inputLane}));
        fullPayload.put(outputLane, records);
    }

    /**
     * Removes a lane from the payload and stores an independent cloned copy of its records under
     * each of the given destination lanes.
     *
     * @param inputLane the lane to take the records from
     * @param outputLanes the lanes that each receive a copy of the records
     * @throws NullPointerException if the source lane is absent or has no records assigned
     */
    @Override
    public void moveLaneCopying(String inputLane, List<String> outputLanes) {
        List<Record> records = Preconditions.checkNotNull(
            fullPayload.remove(inputLane),
            Utils.formatL("Stream '{}' does not exist", new Object[]{inputLane}));
        // NOTE(review): this method used to call checkNotNull on a boxed containsKey(...) result with the
        // message "Lane '{}' does not exist"; a boxed Boolean is never null, so that check could not fail.
        // It is intentionally not turned into an enforced check here because it is not visible from this
        // class whether destination lanes are always registered beforehand — confirm against the callers.
        for (String outputLane : outputLanes) {
            fullPayload.put(outputLane, createCopy(records));
        }
    }

    /** Returns a new list holding a clone of every record of the given list, in the same order. */
    private List<Record> createCopy(List<Record> records) {
        List<Record> copies = new ArrayList<Record>(records.size());
        for (Record record : records) {
            // m644clone() looks like a decompiler-assigned name for RecordImpl's clone method — TODO confirm.
            copies.add(((RecordImpl) record).m644clone());
        }
        return copies;
    }

    /**
     * Merges several lanes into one: the source lanes are removed from the payload and their records
     * are appended, in the given lane order, to a fresh list stored under the destination lane.
     *
     * @param lanes the lanes to merge
     * @param to the lane that receives the merged records
     * @throws IllegalStateException if any of the source lanes is not present in the payload
     * @throws NullPointerException if a source lane is present but has no records assigned
     */
    @Override
    public void combineLanes(List<String> lanes, String to) {
        List<String> missingLanes = new ArrayList<String>(lanes);
        missingLanes.removeAll(fullPayload.keySet());
        Preconditions.checkState(
            missingLanes.isEmpty(),
            Utils.formatL("Lanes '{}' does not exist", new Object[]{missingLanes}));

        List<Record> combined = new ArrayList<Record>();
        fullPayload.put(to, combined);
        for (String lane : lanes) {
            List<Record> laneRecords = Preconditions.checkNotNull(
                fullPayload.remove(lane),
                Utils.formatL("Stream '{}' does not exist", new Object[]{lane}));
            combined.addAll(laneRecords);
        }
    }

    /** Returns the number of records produced by SOURCE stages completed so far in this batch. */
    @Override
    public int getInputRecords() {
        return inputRecords;
    }

    /**
     * Returns the number of records handed to TARGET/EXECUTOR stages, minus the error records
     * subtracted when those stages completed.
     */
    @Override
    public int getOutputRecords() {
        return outputRecords;
    }

    /** Returns the total number of error records reported by the error sink. */
    @Override
    public int getErrorRecords() {
        return errorSink.getTotalErrorRecords();
    }

    /** Returns the total number of error messages reported by the error sink. */
    @Override
    public int getErrorMessages() {
        return errorSink.getTotalErrorMessages();
    }

    /** Returns a short diagnostic description of this batch. */
    @Override
    public String toString() {
        Object[] args = new Object[] {
            offsetTracker.getOffset(),
            newOffset,
            Integer.valueOf(batchSize),
            Boolean.valueOf(stageOutputSnapshot != null),
            Integer.valueOf(errorSink.size())
        };
        // The closing quote/bracket were previously transposed ("'{}]'"), producing an unbalanced string.
        return Utils.format(
            "PipeBatch[previousOffset='{}' currentOffset='{}' batchSize='{}' keepSnapshot='{}' errorRecords='{}']",
            args);
    }
}
