/*
 * Decompiled with CFR 0.152.
 */
package com.marklogic.hub.step.impl;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.fasterxml.jackson.dataformat.csv.CsvSchema;
import com.marklogic.client.DatabaseClient;
import com.marklogic.client.datamovement.JacksonCSVSplitter;
import com.marklogic.client.dataservices.IOEndpoint;
import com.marklogic.client.dataservices.InputCaller;
import com.marklogic.client.io.Format;
import com.marklogic.client.io.InputStreamHandle;
import com.marklogic.client.io.JacksonHandle;
import com.marklogic.client.io.marker.BufferableHandle;
import com.marklogic.hub.DatabaseKind;
import com.marklogic.hub.HubClient;
import com.marklogic.hub.HubProject;
import com.marklogic.hub.dataservices.BulkUtil;
import com.marklogic.hub.dataservices.StepService;
import com.marklogic.hub.error.DataHubConfigurationException;
import com.marklogic.hub.flow.Flow;
import com.marklogic.hub.impl.HubClientImpl;
import com.marklogic.hub.step.RunStepResponse;
import com.marklogic.hub.step.StepDefinition;
import com.marklogic.hub.step.StepItemCompleteListener;
import com.marklogic.hub.step.StepItemFailureListener;
import com.marklogic.hub.step.StepRunner;
import com.marklogic.hub.step.StepStatusListener;
import com.marklogic.hub.step.impl.FileCollector;
import com.marklogic.hub.step.impl.IngestionStepDefinitionImpl;
import com.marklogic.hub.step.impl.StepMetrics;
import com.marklogic.hub.step.impl.StepRunnerUtil;
import com.marklogic.hub.util.DiskQueue;
import com.marklogic.hub.util.json.JSONObject;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.io.Writer;
import java.net.FileNameMap;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URLConnection;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.io.FilenameUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.SystemUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.StreamUtils;

public class WriteStepRunner
implements StepRunner {
    private static final int MAX_ERROR_MESSAGES = 10;
    private static final Pattern percentPattern = Pattern.compile("%", 16);
    private Flow flow;
    private int batchSize;
    private int threadCount;
    private String destinationDatabase;
    private int previousPercentComplete;
    protected long csvFilesProcessed;
    private String currentCsvFile;
    private Map<String, Object> combinedOptions;
    private boolean stopOnFailure = false;
    private String jobId;
    protected final Logger logger = LoggerFactory.getLogger(this.getClass());
    private String step = "1";
    private static final String DATE_TIME_FORMAT_PATTERN = "yyyy-MM-dd'T'HH:mm:ss.SSS";
    private static final String END_LINE = "\r\n";
    private final List<StepItemCompleteListener> stepItemCompleteListeners = new ArrayList<StepItemCompleteListener>();
    private final List<StepItemFailureListener> stepItemFailureListeners = new ArrayList<StepItemFailureListener>();
    private final List<StepStatusListener> stepStatusListeners = new ArrayList<StepStatusListener>();
    private final HubClient hubClient;
    private final HubProject hubProject;
    private Thread runningThread = null;
    protected String inputFilePath = null;
    protected String outputCollections;
    protected String outputPermissions;
    protected String outputFormat;
    protected String inputFileType;
    protected String inputMimeType;
    protected String outputURIReplacement;
    protected String outputURIPrefix;
    protected String separator = null;
    protected String boundary = "===dataHubIngestion===";
    protected AtomicBoolean isStopped = new AtomicBoolean(false);
    private IngestionStepDefinitionImpl stepDef;
    private Map<String, Object> stepConfig = new HashMap<String, Object>();
    private final FileNameMap fileNameMap = URLConnection.getFileNameMap();

    public WriteStepRunner(HubClient hubClient, HubProject hubProject) {
        this.hubClient = hubClient;
        this.hubProject = hubProject;
    }

    @Override
    public StepRunner withFlow(Flow flow) {
        this.flow = flow;
        return this;
    }

    @Override
    public StepRunner withStep(String step) {
        this.step = step;
        return this;
    }

    @Override
    public StepRunner withJobId(String jobId) {
        this.jobId = jobId;
        return this;
    }

    public StepRunner withStepDefinition(StepDefinition stepDefinition) {
        this.stepDef = (IngestionStepDefinitionImpl)stepDefinition;
        return this;
    }

    @Override
    public StepRunner withBatchSize(int batchSize) {
        this.batchSize = batchSize;
        return this;
    }

    @Override
    public StepRunner withThreadCount(int threadCount) {
        this.threadCount = threadCount;
        return this;
    }

    public StepRunner withDestinationDatabase(String destinationDatabase) {
        this.destinationDatabase = destinationDatabase;
        return this;
    }

    @Override
    public StepRunner withStopOnFailure(boolean stopOnFailure) {
        this.stopOnFailure = stopOnFailure;
        return this;
    }

    @Override
    public StepRunner withRuntimeOptions(Map<String, Object> runtimeOptions) {
        if (this.flow == null) {
            throw new DataHubConfigurationException("Flow has to be set before setting options");
        }
        this.combinedOptions = StepRunnerUtil.makeCombinedOptions(this.flow, this.stepDef, this.step, runtimeOptions);
        return this;
    }

    @Override
    public StepRunner withStepConfig(Map<String, Object> stepConfig) {
        this.stepConfig = stepConfig;
        return this;
    }

    @Override
    public StepRunner onItemComplete(StepItemCompleteListener listener) {
        this.stepItemCompleteListeners.add(listener);
        return this;
    }

    @Override
    public StepRunner onItemFailed(StepItemFailureListener listener) {
        this.stepItemFailureListeners.add(listener);
        return this;
    }

    @Override
    public StepRunner onStatusChanged(StepStatusListener listener) {
        this.stepStatusListeners.add(listener);
        return this;
    }

    @Override
    public void awaitCompletion() {
        try {
            this.awaitCompletion(Long.MAX_VALUE, TimeUnit.DAYS);
        }
        catch (InterruptedException | TimeoutException e) {
            this.logger.info("WriteStepRunner awaitCompletion failed!", (Throwable)e);
        }
    }

    @Override
    public void awaitCompletion(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException {
        if (this.runningThread != null) {
            this.runningThread.join(unit.convert(timeout, unit));
            if (this.runningThread.getState() != Thread.State.TERMINATED) {
                this.isStopped.set(true);
                this.runningThread.interrupt();
                throw new TimeoutException("Timeout occurred after " + timeout + " " + (Object)((Object)unit));
            }
        }
    }

    @Override
    public int getBatchSize() {
        return this.batchSize;
    }

    private boolean jobOutputIsEnabled() {
        if (this.combinedOptions != null && this.combinedOptions.containsKey("disableJobOutput")) {
            return !Boolean.parseBoolean(this.combinedOptions.get("disableJobOutput").toString());
        }
        return true;
    }

    @Override
    public RunStepResponse run() {
        Collection<String> uris;
        if (this.combinedOptions == null) {
            this.combinedOptions = new HashMap<String, Object>();
        }
        this.runningThread = null;
        RunStepResponse runStepResponse = StepRunnerUtil.createStepResponse(this.flow, this.step, this.jobId);
        this.loadStepRunnerParameters();
        if ("csv".equalsIgnoreCase(this.inputFileType)) {
            this.combinedOptions.put("inputFileType", "csv");
        }
        this.combinedOptions.put("flow", this.flow.getName());
        if (this.jobOutputIsEnabled()) {
            StepService.on(this.hubClient.getJobsClient()).startStep(this.jobId, this.step, this.flow.getName(), new ObjectMapper().valueToTree(this.combinedOptions));
        }
        runStepResponse.setStepStartTime(StepRunnerUtil.getCurrentTimeWithTimeZone());
        try {
            uris = this.runFileCollector();
        }
        catch (Exception e) {
            runStepResponse.setCounts(0L, 0L, 0L, 0L, 0L).withStatus("failed step " + this.step);
            StringWriter errors = new StringWriter();
            e.printStackTrace(new PrintWriter(errors));
            runStepResponse.withStepOutput(errors.toString());
            if (this.jobOutputIsEnabled()) {
                JsonNode jobDoc = StepService.on(this.hubClient.getJobsClient()).finishStep(this.jobId, this.step, "failed step " + this.step, (JsonNode)runStepResponse.toObjectNode());
                try {
                    return StepRunnerUtil.getResponse(jobDoc, this.step);
                }
                catch (Exception ex) {
                    this.logger.info("Failed to update jobs document.", (Throwable)ex);
                }
            }
            runStepResponse.setStepEndTime(StepRunnerUtil.getCurrentTimeWithTimeZone());
            return runStepResponse;
        }
        return this.runIngester(runStepResponse, uris);
    }

    @Override
    public RunStepResponse run(Collection<String> uris) {
        this.runningThread = null;
        RunStepResponse runStepResponse = StepRunnerUtil.createStepResponse(this.flow, this.step, this.jobId);
        if (this.jobOutputIsEnabled()) {
            StepService.on(this.hubClient.getJobsClient()).startStep(this.jobId, this.step, this.flow.getName(), new ObjectMapper().valueToTree(this.combinedOptions));
        }
        runStepResponse.setStepStartTime(StepRunnerUtil.getCurrentTimeWithTimeZone());
        return this.runIngester(runStepResponse, uris);
    }

    @Override
    public void stop() {
        this.isStopped.set(true);
    }

    protected void loadStepRunnerParameters() {
        JsonNode comboOptions;
        try {
            comboOptions = JSONObject.readInput(JSONObject.writeValueAsString(this.combinedOptions));
        }
        catch (IOException e) {
            throw new RuntimeException(e);
        }
        JSONObject obj = new JSONObject(comboOptions);
        if (obj.getArrayString("collections", false) != null) {
            this.outputCollections = StringUtils.join(obj.getArrayString("collections", false), (String)",");
        }
        if (obj.getString("permissions") != null) {
            this.outputPermissions = obj.getString("permissions");
        }
        if (obj.getString("targetDatabase") != null) {
            this.withDestinationDatabase(obj.getString("targetDatabase"));
        }
        if (obj.getString("outputFormat") != null) {
            this.outputFormat = obj.getString("outputFormat");
        }
        ObjectMapper mapper = new ObjectMapper();
        HashMap fileLocations = new HashMap();
        if (this.stepDef.getFileLocations() != null) {
            Map stepDefFileLocation = (Map)mapper.convertValue((Object)this.stepDef.getFileLocations(), Map.class);
            fileLocations.putAll(stepDefFileLocation);
        }
        if (this.flow.getStep(this.step).getFileLocations() != null) {
            Map stepFileLocation = (Map)mapper.convertValue((Object)this.flow.getStep(this.step).getFileLocations(), Map.class);
            fileLocations.putAll(stepFileLocation);
        }
        if (this.stepConfig.get("batchSize") != null) {
            this.batchSize = Integer.parseInt(this.stepConfig.get("batchSize").toString());
        }
        if (this.stepConfig.get("threadCount") != null) {
            this.threadCount = Integer.parseInt(this.stepConfig.get("threadCount").toString());
        }
        if (this.stepConfig.get("fileLocations") != null) {
            fileLocations.putAll((Map)this.stepConfig.get("fileLocations"));
        }
        if (!fileLocations.isEmpty()) {
            this.inputFilePath = (String)fileLocations.get("inputFilePath");
            this.inputFileType = (String)fileLocations.get("inputFileType");
            this.outputURIReplacement = (String)fileLocations.get("outputURIReplacement");
            this.outputURIPrefix = (String)fileLocations.get("outputURIPrefix");
            if (this.inputFileType.equalsIgnoreCase("csv") && fileLocations.get("separator") != null) {
                this.separator = (String)fileLocations.get("separator");
                if (!"\t".equals(this.separator)) {
                    this.separator = this.separator.trim();
                }
            }
        }
        if (this.separator != null && this.separator.equalsIgnoreCase("\\t")) {
            this.separator = "\t";
        }
        if (this.stepConfig.get("stopOnFailure") != null) {
            this.withStopOnFailure(Boolean.parseBoolean(this.stepConfig.get("stopOnFailure").toString()));
        }
        if (StringUtils.isNotEmpty((CharSequence)this.outputURIReplacement)) {
            if (this.outputURIPrefix != null) {
                throw new RuntimeException("'outputURIPrefix' and 'outputURIReplacement' cannot be set simultaneously");
            }
        } else if (this.outputURIPrefix == null) {
            this.outputURIPrefix = "";
        }
        if (this.inputFilePath == null || this.inputFileType == null) {
            throw new RuntimeException("File path and type cannot be empty");
        }
    }

    protected Path determineInputFilePath(String inputFilePath) {
        Path dirPath = Paths.get(inputFilePath, new String[0]);
        if (dirPath.isAbsolute()) {
            return dirPath;
        }
        if (this.hubProject != null) {
            String projectDirString = this.hubProject.getProjectDirString();
            return new File(projectDirString, dirPath.toString()).toPath().toAbsolutePath();
        }
        Path inputPath = new File(dirPath.toString()).toPath().toAbsolutePath();
        this.logger.info("No HubProject available to resolve relative inputFilePath; will ingest from: " + inputPath);
        return inputPath;
    }

    private Collection<String> runFileCollector() {
        this.stepStatusListeners.forEach(listener -> listener.onStatusChange(this.jobId, 0, "running step " + this.step, 0L, 0L, "fetching files"));
        DiskQueue<String> uris = !this.isStopped.get() ? new FileCollector(this.inputFileType, ((HubClientImpl)this.hubClient).getHubClientConfig()).run(this.determineInputFilePath(this.inputFilePath)) : null;
        return uris;
    }

    private RunStepResponse runIngester(RunStepResponse runStepResponse, Collection<String> uris) {
        this.stepStatusListeners.forEach(listener -> listener.onStatusChange(runStepResponse.getJobId(), 0, "running step " + this.step, 0L, 0L, "starting step execution"));
        if (uris == null || uris.isEmpty()) {
            String stepStatus = this.isStopped.get() ? "canceled step " + this.step : "completed step " + this.step;
            this.stepStatusListeners.forEach(listener -> listener.onStatusChange(runStepResponse.getJobId(), 100, stepStatus, 0L, 0L, stepStatus.contains("completed step ") ? "provided file path returned 0 items" : "job was stopped"));
            runStepResponse.setCounts(0L, 0L, 0L, 0L, 0L);
            runStepResponse.withStatus(stepStatus);
            if (this.jobOutputIsEnabled()) {
                JsonNode jobDoc = StepService.on(this.hubClient.getJobsClient()).finishStep(this.jobId, this.step, stepStatus, (JsonNode)runStepResponse.toObjectNode());
                try {
                    return StepRunnerUtil.getResponse(jobDoc, this.step);
                }
                catch (Exception ex) {
                    return runStepResponse;
                }
            }
            runStepResponse.setStepEndTime(StepRunnerUtil.getCurrentTimeWithTimeZone());
            return runStepResponse;
        }
        int inputCount = uris.size();
        StepMetrics stepMetrics = new StepMetrics(inputCount, (int)Math.ceil((double)inputCount / (double)this.batchSize));
        DatabaseClient client = this.destinationDatabase.equals(this.hubClient.getDbName(DatabaseKind.FINAL)) ? this.hubClient.getFinalClient() : this.hubClient.getStagingClient();
        String apiPath = "ml-modules/root/data-hub/data-services/stepRunner/processIngestBatch.api";
        ObjectNode endpointConstants = new ObjectMapper().createObjectNode();
        endpointConstants.put("jobId", this.jobId);
        endpointConstants.put("stepNumber", this.step);
        endpointConstants.put("flowName", this.flow.getName());
        JsonNode optionsNode = WriteStepRunner.jsonToNode(this.combinedOptions);
        endpointConstants.set("options", optionsNode);
        ErrorListener errorListener = new ErrorListener(this, stepMetrics, this.stopOnFailure, optionsNode.path("retryLimit").asInt(0));
        switch (this.inputFileType.toLowerCase()) {
            case "xml": {
                this.inputMimeType = "application/xml";
                break;
            }
            case "json": 
            case "csv": {
                this.inputMimeType = "application/json";
                break;
            }
            case "text": {
                this.inputMimeType = "text/plain";
                break;
            }
            default: {
                this.inputMimeType = "application/octet-stream";
            }
        }
        this.runningThread = new Thread(() -> {
            InputCaller.BulkInputCaller<InputStreamHandle> bulkCaller = BulkUtil.runInputCaller(client, apiPath, endpointConstants, runStepResponse.toObjectNode(), this.threadCount, this.batchSize, errorListener);
            AtomicLong count = new AtomicLong(0L);
            Stream<ConcurrentMap<Long, List<InputStreamHandle>>> inputHandles = uris.stream().map(uri -> this.toInputStreamHandleList(new File((String)uri))).flatMap(Collection::stream);
            Collection resultingBatches = inputHandles.collect(Collectors.groupingByConcurrent(inputStreamHandle -> count.getAndIncrement() / (long)this.batchSize)).values();
            InputStreamHandle[] handleArray = new InputStreamHandle[]{};
            for (List batch : resultingBatches) {
                bulkCaller.acceptAll((Object[])batch.toArray(handleArray));
                stepMetrics.getSuccessfulBatches().incrementAndGet();
                stepMetrics.getSuccessfulEvents().addAndGet(batch.size());
                this.runStatusListener(stepMetrics);
            }
            bulkCaller.awaitCompletion();
            stepMetrics.getSuccessfulBatches().set((long)Math.ceil((double)inputCount / (double)this.batchSize) - stepMetrics.getFailedBatchesCount());
            stepMetrics.getSuccessfulEvents().set((long)inputCount - Math.min(stepMetrics.getFailedEventsCount(), (long)inputCount));
            String stepStatus = stepMetrics.getFailedEventsCount() > 0L && this.stopOnFailure ? "stop on error in step " + this.step : (this.isStopped.get() ? "canceled step " + this.step : (stepMetrics.getFailedEventsCount() > 0L && stepMetrics.getSuccessfulEventsCount() > 0L ? "completed with errors step " + this.step : (stepMetrics.getFailedEventsCount() == 0L && stepMetrics.getSuccessfulEventsCount() > 0L ? "completed step " + this.step : "failed step " + this.step)));
            this.stepStatusListeners.forEach(listener -> listener.onStatusChange(runStepResponse.getJobId(), 100, stepStatus, stepMetrics.getSuccessfulEventsCount(), stepMetrics.getFailedEventsCount(), "Ingestion completed"));
            runStepResponse.setCounts(stepMetrics.getSuccessfulEventsCount() + stepMetrics.getFailedEventsCount(), count.get(), stepMetrics.getFailedEventsCount(), stepMetrics.getSuccessfulBatchesCount(), stepMetrics.getFailedBatchesCount());
            runStepResponse.withStatus(stepStatus);
            if (!errorListener.getThrowables().isEmpty()) {
                runStepResponse.withStepOutput(errorListener.getThrowables().stream().filter(Objects::nonNull).map(Throwable::toString).filter(Objects::nonNull).collect(Collectors.toList()));
                errorListener.getThrowables().clear();
            }
            if (this.jobOutputIsEnabled()) {
                JsonNode jobDoc = null;
                try {
                    jobDoc = StepService.on(this.hubClient.getJobsClient()).finishStep(this.jobId, this.step, stepStatus, (JsonNode)runStepResponse.toObjectNode());
                }
                catch (Exception e) {
                    this.logger.error("Unable to update job document, cause: " + e.getMessage());
                }
                if (jobDoc != null) {
                    try {
                        RunStepResponse tempResp = StepRunnerUtil.getResponse(jobDoc, this.step);
                        runStepResponse.setStepStartTime(tempResp.getStepStartTime());
                        runStepResponse.setStepEndTime(tempResp.getStepEndTime());
                    }
                    catch (Exception ex) {
                        this.logger.error("Unable to update step response, cause: " + ex.getMessage());
                    }
                }
            } else {
                runStepResponse.setStepEndTime(StepRunnerUtil.getCurrentTimeWithTimeZone());
            }
        });
        this.runningThread.start();
        return runStepResponse;
    }

    private InputStreamHandle processCsv(JacksonHandle jacksonHandle, File file) {
        ObjectMapper mapper = jacksonHandle.getMapper();
        JsonNode originalContent = jacksonHandle.get();
        ObjectNode node = mapper.createObjectNode();
        if (this.outputFormat != null && this.outputFormat.equalsIgnoreCase("xml")) {
            node.putObject("content").set("root", originalContent);
        } else {
            node.set("content", originalContent);
        }
        node.put("file", file.getAbsolutePath());
        InputStreamHandle inputStreamHandle = null;
        try {
            inputStreamHandle = this.toInputStreamHandleWithInputStream(this.generateUriForCsv(file.getParent(), SystemUtils.OS_NAME.toLowerCase()), new ByteArrayInputStream(node.toPrettyString().getBytes(StandardCharsets.UTF_8)));
        }
        catch (IllegalStateException e) {
            this.logger.error("WriteBatcher has been stopped", (Throwable)e);
        }
        if (!file.getAbsolutePath().equalsIgnoreCase(this.currentCsvFile)) {
            this.currentCsvFile = file.getAbsolutePath();
            ++this.csvFilesProcessed;
        }
        return inputStreamHandle;
    }

    protected String generateUriForCsv(String parentPath, String os) {
        String uri;
        if (this.outputURIPrefix != null) {
            try {
                uri = percentPattern.matcher(WriteStepRunner.generateAndEncodeURI(this.outputURIPrefix)).replaceAll("%%");
            }
            catch (URISyntaxException e) {
                throw new RuntimeException(e);
            }
        }
        uri = parentPath;
        if (os.contains("windows")) {
            uri = "/" + FilenameUtils.separatorsToUnix((String)StringUtils.replaceOnce((String)uri, (String)":", (String)""));
        }
        try {
            uri = percentPattern.matcher(WriteStepRunner.generateAndEncodeURI(this.outputURIReplace(uri))).replaceAll("%%");
        }
        catch (URISyntaxException e) {
            throw new RuntimeException(e);
        }
        uri = uri + "/";
        return String.format(uri + "%s." + ("xml".equalsIgnoreCase(this.outputFormat) ? "xml" : "json"), UUID.randomUUID());
    }

    private String fileUri(File file) throws URISyntaxException {
        String uri;
        if (this.outputURIPrefix != null) {
            uri = this.getPrefixedEncodedURI(file.getName());
        } else {
            uri = file.getAbsolutePath();
            if (SystemUtils.OS_NAME.toLowerCase().contains("windows")) {
                uri = "/" + FilenameUtils.separatorsToUnix((String)StringUtils.replaceOnce((String)uri, (String)":", (String)""));
            }
            uri = WriteStepRunner.generateAndEncodeURI(this.outputURIReplace(uri));
        }
        return uri;
    }

    private boolean isDelimitedFile(String fileType) {
        return fileType.equalsIgnoreCase("csv") || fileType.equalsIgnoreCase("tsv") || fileType.equalsIgnoreCase("psv");
    }

    private String getSeparator(String extension) {
        if (this.separator != null) {
            return this.separator;
        }
        switch (extension.toLowerCase()) {
            case "csv": {
                return ",";
            }
            case "tsv": {
                return "\t";
            }
            case "psv": {
                return "|";
            }
        }
        return null;
    }

    private List<InputStreamHandle> toInputStreamHandleList(File file) {
        ArrayList<InputStreamHandle> inputStreamHandleList;
        block19: {
            inputStreamHandleList = new ArrayList<InputStreamHandle>();
            try (FileInputStream docStream = new FileInputStream(file);){
                String extension = FilenameUtils.getExtension((String)file.getName());
                if (this.isDelimitedFile(this.inputFileType) || this.isDelimitedFile(extension)) {
                    String separator = this.getSeparator(extension);
                    assert (separator != null);
                    CsvSchema schema = CsvSchema.emptySchema().withHeader().withColumnSeparator(separator.charAt(0));
                    JacksonCSVSplitter splitter = new JacksonCSVSplitter().withCsvSchema(schema);
                    try {
                        if (!this.isStopped.get()) {
                            Stream contentStream = splitter.split((InputStream)docStream);
                            contentStream.forEach(jacksonHandle -> inputStreamHandleList.add(this.processCsv((JacksonHandle)jacksonHandle, file)));
                        }
                        break block19;
                    }
                    catch (Exception e) {
                        throw new RuntimeException(e);
                    }
                }
                inputStreamHandleList.add(this.toInputStreamHandleWithInputStream(this.fileUri(file), docStream));
            }
            catch (IOException | URISyntaxException e) {
                throw new RuntimeException(e);
            }
        }
        return inputStreamHandleList;
    }

    protected InputStreamHandle toInputStreamHandleWithInputStream(String uri, InputStream inputStream) {
        Charset charset = StandardCharsets.UTF_8;
        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
        PrintWriter writer = new PrintWriter((Writer)new OutputStreamWriter((OutputStream)outputStream, charset), true);
        writer.append("--").append(this.boundary).append(END_LINE);
        writer.append("Content-Disposition: form-data; name=\"uri\"").append(END_LINE);
        writer.append("Content-Type: text/plain; charset=").append(String.valueOf(charset)).append(END_LINE);
        writer.append(END_LINE);
        writer.append(uri).append(END_LINE);
        writer.append("--").append(this.boundary).append(END_LINE);
        writer.append("Content-Disposition: form-data; name=\"fileName\"; filename=\"").append(uri).append("\"").append(END_LINE);
        writer.append("Content-Type: ").append(this.inputMimeType).append(END_LINE);
        writer.append("Content-Transfer-Encoding: binary").append(END_LINE);
        writer.append(END_LINE);
        writer.flush();
        try {
            StreamUtils.copy((InputStream)inputStream, (OutputStream)outputStream);
        }
        catch (IOException e) {
            throw new RuntimeException(e);
        }
        writer.append(END_LINE);
        writer.flush();
        writer.append(END_LINE).flush();
        writer.append("--").append(this.boundary).append("--").append(END_LINE);
        writer.close();
        return new InputStreamHandle((InputStream)new ByteArrayInputStream(outputStream.toByteArray())).withFormat(Format.BINARY);
    }

    protected String getPrefixedEncodedURI(String filename) throws URISyntaxException {
        return WriteStepRunner.generateAndEncodeURI(this.outputURIPrefix + filename);
    }

    private static String generateAndEncodeURI(String path) throws URISyntaxException {
        URI uri = new URI(null, null, null, 0, path, null, null);
        return uri.toString();
    }

    private String outputURIReplace(String uri) {
        if (StringUtils.isNotEmpty((CharSequence)this.outputURIReplacement)) {
            String replacement;
            int i;
            String[] replace = this.outputURIReplacement.split(",");
            if (replace.length % 2 != 0) {
                throw new IllegalArgumentException("Invalid argument for URI replacement: " + this.outputURIReplacement);
            }
            int replaceLength = replace.length;
            for (i = 0; i < replaceLength - 1; ++i) {
                if ((replacement = replace[++i].trim()).startsWith("'") && replacement.endsWith("'")) continue;
                throw new IllegalArgumentException("Invalid argument for URI replacement: " + this.outputURIReplacement);
            }
            for (i = 0; i < replaceLength - 1; i += 2) {
                replacement = replace[i + 1].trim();
                replacement = replacement.substring(1, replacement.length() - 1);
                uri = uri.replaceAll(replace[i], replacement);
            }
        }
        return uri;
    }

    protected void runStatusListener(StepMetrics stepMetrics) {
        double uriSize = stepMetrics.getTotalEventsCount();
        double batchCount = stepMetrics.getTotalBatchesCount();
        long totalRunBatches = stepMetrics.getSuccessfulBatchesCount() + stepMetrics.getFailedBatchesCount();
        if ("csv".equalsIgnoreCase(this.inputFileType)) {
            int percentComplete = (int)((double)this.csvFilesProcessed / uriSize * 100.0);
            if (percentComplete != this.previousPercentComplete && percentComplete % 2 == 0) {
                this.previousPercentComplete = percentComplete;
                this.stepStatusListeners.forEach(listener -> listener.onStatusChange(this.jobId, percentComplete, "running step " + this.step, stepMetrics.getSuccessfulEventsCount(), stepMetrics.getFailedEventsCount(), "Ingesting"));
            }
        } else {
            int percentComplete = (int)((double)totalRunBatches / batchCount * 100.0);
            if (percentComplete != this.previousPercentComplete && percentComplete % 5 == 0) {
                this.previousPercentComplete = percentComplete;
                this.stepStatusListeners.forEach(listener -> listener.onStatusChange(this.jobId, percentComplete, "running step " + this.step, stepMetrics.getSuccessfulEventsCount(), stepMetrics.getFailedEventsCount(), "Ingesting"));
            }
        }
    }

    private static JsonNode jsonToNode(Map<String, Object> map) {
        ObjectMapper objectMapper = new ObjectMapper();
        return (JsonNode)objectMapper.convertValue(map, JsonNode.class);
    }

    static class ErrorListener
    implements InputCaller.BulkInputCaller.ErrorListener {
        WriteStepRunner writeStepRunner;
        StepMetrics stepMetrics;
        static final List<Throwable> throwables = new ArrayList<Throwable>();
        StepStatusListener[] stepStatusListeners = null;
        int retryLimit;
        boolean stopOnFailure;

        public ErrorListener(WriteStepRunner writeStepRunner, StepMetrics stepMetrics, boolean stopOnFailure, int retryLimit) {
            this.writeStepRunner = writeStepRunner;
            this.stepMetrics = stepMetrics;
            this.stopOnFailure = stopOnFailure;
            this.retryLimit = retryLimit;
        }

        public List<Throwable> getThrowables() {
            return throwables;
        }

        public ErrorListener withStepListeners(StepStatusListener ... stepStatusListeners) {
            this.stepStatusListeners = stepStatusListeners;
            return this;
        }

        public IOEndpoint.BulkIOEndpointCaller.ErrorDisposition processError(int retryCount, Throwable throwable, IOEndpoint.CallContext callContext, BufferableHandle[] input) {
            if (retryCount < this.retryLimit) {
                return IOEndpoint.BulkIOEndpointCaller.ErrorDisposition.RETRY;
            }
            this.stepMetrics.getFailedBatches().incrementAndGet();
            this.stepMetrics.getSuccessfulBatches().decrementAndGet();
            long failedEvents = input.length;
            this.stepMetrics.getSuccessfulEvents().set(this.stepMetrics.getSuccessfulEventsCount() - failedEvents);
            this.stepMetrics.getFailedEvents().addAndGet(failedEvents);
            this.writeStepRunner.runStatusListener(this.stepMetrics);
            if (throwable != null) {
                throwables.add(throwable);
            }
            return this.stopOnFailure ? IOEndpoint.BulkIOEndpointCaller.ErrorDisposition.STOP_ALL_CALLS : IOEndpoint.BulkIOEndpointCaller.ErrorDisposition.SKIP_CALL;
        }
    }
}

