/*
 * Decompiled with CFR 0.152.
 */
package com.marklogic.client.dataservices.impl;

import com.marklogic.client.DatabaseClient;
import com.marklogic.client.SessionState;
import com.marklogic.client.dataservices.InputOutputEndpoint;
import com.marklogic.client.dataservices.impl.IOEndpointImpl;
import com.marklogic.client.dataservices.impl.InputOutputCallerImpl;
import com.marklogic.client.io.marker.JSONWriteHandle;
import java.io.InputStream;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class InputOutputEndpointImpl
extends IOEndpointImpl
implements InputOutputEndpoint {
    private static Logger logger = LoggerFactory.getLogger(InputOutputEndpointImpl.class);
    private InputOutputCallerImpl caller;
    private int batchSize;

    public InputOutputEndpointImpl(DatabaseClient client, JSONWriteHandle apiDecl) {
        this(client, new InputOutputCallerImpl(apiDecl));
    }

    private InputOutputEndpointImpl(DatabaseClient client, InputOutputCallerImpl caller) {
        super(client, caller);
        this.caller = caller;
        this.batchSize = this.initBatchSize(caller);
    }

    private InputOutputCallerImpl getCaller() {
        return this.caller;
    }

    private int getBatchSize() {
        return this.batchSize;
    }

    @Override
    public InputStream[] call(InputStream[] input) {
        return this.call(null, null, null, input);
    }

    @Override
    public InputStream[] call(InputStream endpointState, SessionState session, InputStream workUnit, InputStream[] input) {
        this.checkAllowedArgs(endpointState, session, workUnit);
        return this.getCaller().arrayCall(this.getClient(), endpointState, session, workUnit, input);
    }

    @Override
    public InputOutputEndpoint.BulkInputOutputCaller bulkCaller() {
        return new BulkInputOutputCallerImpl(this, this.getBatchSize());
    }

    static final class BulkInputOutputCallerImpl
    extends IOEndpointImpl.BulkIOEndpointCallerImpl
    implements InputOutputEndpoint.BulkInputOutputCaller {
        private InputOutputEndpointImpl endpoint;
        private int batchSize;
        private LinkedBlockingQueue<InputStream> queue;
        private Consumer<InputStream> outputListener;

        BulkInputOutputCallerImpl(InputOutputEndpointImpl endpoint, int batchSize) {
            super(endpoint);
            this.endpoint = endpoint;
            this.batchSize = batchSize;
            this.queue = new LinkedBlockingQueue();
        }

        private InputOutputEndpointImpl getEndpoint() {
            return this.endpoint;
        }

        private int getBatchSize() {
            return this.batchSize;
        }

        private LinkedBlockingQueue<InputStream> getQueue() {
            return this.queue;
        }

        private Consumer<InputStream> getOutputListener() {
            return this.outputListener;
        }

        @Override
        public void setOutputListener(Consumer<InputStream> listener) {
            this.outputListener = listener;
        }

        @Override
        public void accept(InputStream input) {
            if (this.getOutputListener() == null) {
                throw new IllegalStateException("Must configure output consumer before providing input");
            }
            boolean hasBatch = this.queueInput(input, this.getQueue(), this.getBatchSize());
            if (hasBatch) {
                this.processInput();
            }
        }

        @Override
        public void acceptAll(InputStream[] input) {
            if (this.getOutputListener() == null) {
                throw new IllegalStateException("Must configure output consumer before providing input");
            }
            boolean hasBatch = this.queueAllInput(input, this.getQueue(), this.getBatchSize());
            if (hasBatch) {
                this.processInput();
            }
        }

        private void processInput() {
            logger.trace("input endpoint running endpoint={} count={} state={}", new Object[]{this.getEndpointPath(), this.getCallCount(), this.getEndpointState()});
            InputStream[] output = null;
            try {
                output = this.getEndpoint().getCaller().arrayCall(this.getClient(), this.getEndpointState(), this.getSession(), this.getWorkUnit(), this.getInputBatch(this.getQueue(), this.getBatchSize()));
            }
            catch (Throwable throwable) {
                throw new RuntimeException("error while calling " + this.getEndpoint().getEndpointPath(), throwable);
            }
            this.incrementCallCount();
            this.processOutputBatch(output, this.getOutputListener());
        }

        @Override
        public void awaitCompletion() {
            if (this.getQueue() == null) {
                return;
            }
            while (!this.getQueue().isEmpty()) {
                this.processInput();
            }
        }
    }
}

