package io.continual.services.processor.engine.library.sources;

import io.continual.builder.Builder;
import io.continual.services.processor.config.readers.ConfigLoadContext;
import io.continual.services.processor.engine.model.Message;
import io.continual.services.processor.engine.model.MessageAndRouting;
import io.continual.services.processor.engine.model.StreamProcessingContext;
import java.io.IOException;
import java.util.ArrayList;
import org.json.JSONObject;

/* loaded from: input_file:io/continual/services/processor/engine/library/sources/JsonObjectStreamSource.class */
/**
 * A source fed programmatically: callers push JSON objects in through
 * {@link #submit(JSONObject)} and the objects are later handed out, oldest first,
 * as default-routed messages from
 * {@link #internalGetNextMessage(StreamProcessingContext)}.
 *
 * <p>The configuration may carry an optional integer {@code "skip"} setting
 * (default 0); that many initial submissions are dropped instead of buffered.</p>
 *
 * <p>Every method that touches the buffer or the skip counter is
 * {@code synchronized} on this instance.</p>
 */
public class JsonObjectStreamSource extends BasicSource {
    /** Submitted objects that have not been handed out yet, in arrival order. */
    private final ArrayList<JSONObject> fPending;

    /** Number of further submissions to discard before buffering starts. */
    private int fSkip;

    /**
     * Builds the source from its JSON configuration.
     *
     * @param configLoadContext the load context; not used by this constructor
     * @param jSONObject the configuration, passed on to the superclass and read for
     *        the optional {@code "skip"} count
     * @throws Builder.BuildFailure declared for the builder contract
     */
    public JsonObjectStreamSource(ConfigLoadContext configLoadContext, JSONObject jSONObject) throws Builder.BuildFailure {
        super(jSONObject);
        this.fPending = new ArrayList<>();
        this.fSkip = jSONObject.optInt("skip", 0);
    }

    /**
     * Offers one JSON object to this source.
     *
     * <p>While the skip counter is still positive the object is discarded and the
     * counter decremented; otherwise the object is appended to the pending buffer
     * and one thread waiting on this object's monitor (if any) is woken.</p>
     *
     * @param jSONObject the object to enqueue
     * @throws IllegalStateException if {@link #isEof()} already reports true
     */
    public synchronized void submit(JSONObject jSONObject) {
        if (isEof()) {
            throw new IllegalStateException("Added JSON msg after close.");
        }

        // Still inside the configured skip window: consume one skip and drop the object.
        if (this.fSkip > 0) {
            this.fSkip--;
            return;
        }

        this.fPending.add(jSONObject);
        // No wait() appears in this class; presumably a consumer waits on this monitor elsewhere.
        notify();
    }

    /**
     * Reports end-of-stream only once the pending buffer has been drained AND the
     * superclass also reports EOF. The superclass is not consulted while objects
     * are still buffered.
     *
     * @return {@code true} when nothing is pending and {@code super.isEof()} is true
     */
    @Override
    public synchronized boolean isEof() {
        if (!this.fPending.isEmpty()) {
            return false;
        }
        return super.isEof();
    }

    /**
     * Removes the oldest pending object, if any, and wraps it as a default-routed message.
     * This method does not block when the buffer is empty.
     *
     * @param streamProcessingContext the processing context; not used here
     * @return the next message, or {@code null} when nothing is pending
     */
    @Override
    protected synchronized MessageAndRouting internalGetNextMessage(StreamProcessingContext streamProcessingContext) throws IOException, InterruptedException {
        if (this.fPending.isEmpty()) {
            return null;
        }

        final JSONObject oldest = this.fPending.remove(0);
        return makeDefRoutingMessage(Message.adoptJsonAsMessage(oldest));
    }
}
