package org.apache.flink.streaming.runtime.operators.sink;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import org.apache.flink.api.common.operators.ProcessingTimeService;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableList;
import org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperatorTestBase;
import org.apache.flink.streaming.runtime.operators.sink.deprecated.TestSinkV2;

@Deprecated
/* loaded from: input_file:org/apache/flink/streaming/runtime/operators/sink/SinkV2SinkWriterOperatorDeprecatedTest.class */
class SinkV2SinkWriterOperatorDeprecatedTest extends SinkWriterOperatorTestBase {

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/runtime/operators/sink/SinkV2SinkWriterOperatorDeprecatedTest$SnapshottingBufferingSinkWriter.class */
    public static class SnapshottingBufferingSinkWriter extends TestSinkV2.DefaultStatefulSinkWriter {
        public static final int NOT_SNAPSHOTTED = -1;
        long lastCheckpointId;
        boolean endOfInput;

        private SnapshottingBufferingSinkWriter() {
            this.lastCheckpointId = -1L;
            this.endOfInput = false;
        }

        @Override // org.apache.flink.streaming.runtime.operators.sink.deprecated.TestSinkV2.DefaultCommittingSinkWriter, org.apache.flink.streaming.runtime.operators.sink.deprecated.TestSinkV2.DefaultSinkWriter
        public void flush(boolean z) throws IOException, InterruptedException {
            this.endOfInput = z;
        }

        @Override // org.apache.flink.streaming.runtime.operators.sink.deprecated.TestSinkV2.DefaultStatefulSinkWriter
        public List<String> snapshotState(long j) throws IOException {
            this.lastCheckpointId = j;
            return super.snapshotState(j);
        }

        @Override // org.apache.flink.streaming.runtime.operators.sink.deprecated.TestSinkV2.DefaultCommittingSinkWriter
        public Collection<String> prepareCommit() {
            if (!this.endOfInput) {
                return ImmutableList.of();
            }
            List<String> list = this.elements;
            this.elements = new ArrayList();
            return list;
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/runtime/operators/sink/SinkV2SinkWriterOperatorDeprecatedTest$TimeBasedBufferingSinkWriter.class */
    private static class TimeBasedBufferingSinkWriter extends TestSinkV2.DefaultCommittingSinkWriter<Integer> implements ProcessingTimeService.ProcessingTimeCallback {
        private final List<String> cachedCommittables;
        private ProcessingTimeService processingTimeService;

        private TimeBasedBufferingSinkWriter() {
            this.cachedCommittables = new ArrayList();
        }

        @Override // org.apache.flink.streaming.runtime.operators.sink.deprecated.TestSinkV2.DefaultSinkWriter
        public void write(Integer num, SinkWriter.Context context) {
            this.cachedCommittables.add(Tuple3.of(num, context.timestamp(), Long.valueOf(context.currentWatermark())).toString());
        }

        public void onProcessingTime(long j) {
            this.elements.addAll(this.cachedCommittables);
            this.cachedCommittables.clear();
            this.processingTimeService.registerTimer(j + 1000, this);
        }

        @Override // org.apache.flink.streaming.runtime.operators.sink.deprecated.TestSinkV2.DefaultSinkWriter
        public void init(Sink.InitContext initContext) {
            this.processingTimeService = initContext.getProcessingTimeService();
            this.processingTimeService.registerTimer(1000L, this);
        }
    }

    SinkV2SinkWriterOperatorDeprecatedTest() {
    }

    @Override // org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperatorTestBase
    SinkWriterOperatorTestBase.SinkAndSuppliers sinkWithoutCommitter() {
        TestSinkV2.DefaultSinkWriter defaultSinkWriter = new TestSinkV2.DefaultSinkWriter();
        return new SinkWriterOperatorTestBase.SinkAndSuppliers(org.apache.flink.streaming.runtime.operators.sink.deprecated.TestSinkV2.newBuilder().setWriter(defaultSinkWriter).build(), () -> {
            return defaultSinkWriter.elements;
        }, () -> {
            return defaultSinkWriter.watermarks;
        }, () -> {
            return -1L;
        }, TestSinkV2.StringSerializer::new);
    }

    @Override // org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperatorTestBase
    SinkWriterOperatorTestBase.SinkAndSuppliers sinkWithCommitter() {
        TestSinkV2.DefaultCommittingSinkWriter defaultCommittingSinkWriter = new TestSinkV2.DefaultCommittingSinkWriter();
        return new SinkWriterOperatorTestBase.SinkAndSuppliers(org.apache.flink.streaming.runtime.operators.sink.deprecated.TestSinkV2.newBuilder().setWriter(defaultCommittingSinkWriter).setDefaultCommitter().build(), () -> {
            return defaultCommittingSinkWriter.elements;
        }, () -> {
            return defaultCommittingSinkWriter.watermarks;
        }, () -> {
            return -1L;
        }, TestSinkV2.StringSerializer::new);
    }

    @Override // org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperatorTestBase
    SinkWriterOperatorTestBase.SinkAndSuppliers sinkWithTimeBasedWriter() {
        TimeBasedBufferingSinkWriter timeBasedBufferingSinkWriter = new TimeBasedBufferingSinkWriter();
        return new SinkWriterOperatorTestBase.SinkAndSuppliers(org.apache.flink.streaming.runtime.operators.sink.deprecated.TestSinkV2.newBuilder().setWriter(timeBasedBufferingSinkWriter).setDefaultCommitter().build(), () -> {
            return timeBasedBufferingSinkWriter.elements;
        }, () -> {
            return timeBasedBufferingSinkWriter.watermarks;
        }, () -> {
            return -1L;
        }, TestSinkV2.StringSerializer::new);
    }

    @Override // org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperatorTestBase
    SinkWriterOperatorTestBase.SinkAndSuppliers sinkWithSnapshottingWriter(boolean z, String str) {
        SnapshottingBufferingSinkWriter snapshottingBufferingSinkWriter = new SnapshottingBufferingSinkWriter();
        TestSinkV2.Builder withPostCommitTopology = org.apache.flink.streaming.runtime.operators.sink.deprecated.TestSinkV2.newBuilder().setWriter(snapshottingBufferingSinkWriter).setDefaultCommitter().setWithPostCommitTopology(true);
        if (z) {
            withPostCommitTopology.setWriterState(true);
        }
        if (str != null) {
            withPostCommitTopology.setCompatibleStateNames(str);
        }
        return new SinkWriterOperatorTestBase.SinkAndSuppliers(withPostCommitTopology.build(), () -> {
            return snapshottingBufferingSinkWriter.elements;
        }, () -> {
            return snapshottingBufferingSinkWriter.watermarks;
        }, () -> {
            return snapshottingBufferingSinkWriter.lastCheckpointId;
        }, () -> {
            return new TestSinkV2.StringSerializer();
        });
    }
}
