package org.apache.flink.streaming.runtime.operators.sink;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.flink.api.connector.sink.Sink;
import org.apache.flink.api.connector.sink.SinkWriter;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperatorTestBase;
import org.apache.flink.streaming.runtime.operators.sink.TestSink;

/* loaded from: input_file:org/apache/flink/streaming/runtime/operators/sink/WithAdapterSinkWriterOperatorTest.class */
class WithAdapterSinkWriterOperatorTest extends SinkWriterOperatorTestBase {

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/runtime/operators/sink/WithAdapterSinkWriterOperatorTest$SnapshottingBufferingSinkWriter.class */
    public static class SnapshottingBufferingSinkWriter extends TestSink.DefaultSinkWriter<Integer> {
        public static final int NOT_SNAPSHOTTED = -1;
        long lastCheckpointId;

        private SnapshottingBufferingSinkWriter() {
            this.lastCheckpointId = -1L;
        }

        @Override // org.apache.flink.streaming.runtime.operators.sink.TestSink.DefaultSinkWriter
        public List<String> snapshotState(long j) {
            this.lastCheckpointId = j;
            return this.elements;
        }

        @Override // org.apache.flink.streaming.runtime.operators.sink.TestSink.DefaultSinkWriter
        void restoredFrom(List<String> list) {
            this.elements = new ArrayList(list);
        }

        @Override // org.apache.flink.streaming.runtime.operators.sink.TestSink.DefaultSinkWriter
        public List<String> prepareCommit(boolean z) {
            if (!z) {
                return Collections.emptyList();
            }
            List<String> list = this.elements;
            this.elements = new ArrayList();
            return list;
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/runtime/operators/sink/WithAdapterSinkWriterOperatorTest$TimeBasedBufferingSinkWriter.class */
    private static class TimeBasedBufferingSinkWriter extends TestSink.DefaultSinkWriter<Integer> implements Sink.ProcessingTimeService.ProcessingTimeCallback {
        private final List<String> cachedCommittables;

        private TimeBasedBufferingSinkWriter() {
            this.cachedCommittables = new ArrayList();
        }

        @Override // org.apache.flink.streaming.runtime.operators.sink.TestSink.DefaultSinkWriter
        public void write(Integer num, SinkWriter.Context context) {
            this.cachedCommittables.add(Tuple3.of(num, context.timestamp(), Long.valueOf(context.currentWatermark())).toString());
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        @Override // org.apache.flink.streaming.runtime.operators.sink.TestSink.DefaultSinkWriter
        public void setProcessingTimerService(Sink.ProcessingTimeService processingTimeService) {
            super.setProcessingTimerService(processingTimeService);
            this.processingTimerService.registerProcessingTimer(1000L, this);
        }

        public void onProcessingTime(long j) {
            this.elements.addAll(this.cachedCommittables);
            this.cachedCommittables.clear();
            this.processingTimerService.registerProcessingTimer(j + 1000, this);
        }
    }

    WithAdapterSinkWriterOperatorTest() {
    }

    @Override // org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperatorTestBase
    SinkWriterOperatorTestBase.SinkAndSuppliers sinkWithoutCommitter() {
        TestSink.DefaultSinkWriter defaultSinkWriter = new TestSink.DefaultSinkWriter();
        return new SinkWriterOperatorTestBase.SinkAndSuppliers(TestSink.newBuilder().setWriter(defaultSinkWriter).build().asV2(), () -> {
            return defaultSinkWriter.elements;
        }, () -> {
            return defaultSinkWriter.watermarks;
        }, () -> {
            return -1L;
        }, () -> {
            return new TestSink.StringCommittableSerializer();
        });
    }

    @Override // org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperatorTestBase
    SinkWriterOperatorTestBase.SinkAndSuppliers sinkWithCommitter() {
        TestSink.DefaultSinkWriter defaultSinkWriter = new TestSink.DefaultSinkWriter();
        return new SinkWriterOperatorTestBase.SinkAndSuppliers(TestSink.newBuilder().setWriter(defaultSinkWriter).setDefaultCommitter().build().asV2(), () -> {
            return defaultSinkWriter.elements;
        }, () -> {
            return defaultSinkWriter.watermarks;
        }, () -> {
            return -1L;
        }, () -> {
            return new TestSink.StringCommittableSerializer();
        });
    }

    @Override // org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperatorTestBase
    SinkWriterOperatorTestBase.SinkAndSuppliers sinkWithTimeBasedWriter() {
        TimeBasedBufferingSinkWriter timeBasedBufferingSinkWriter = new TimeBasedBufferingSinkWriter();
        return new SinkWriterOperatorTestBase.SinkAndSuppliers(TestSink.newBuilder().setWriter(timeBasedBufferingSinkWriter).setDefaultCommitter().build().asV2(), () -> {
            return timeBasedBufferingSinkWriter.elements;
        }, () -> {
            return timeBasedBufferingSinkWriter.watermarks;
        }, () -> {
            return -1L;
        }, () -> {
            return new TestSink.StringCommittableSerializer();
        });
    }

    @Override // org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperatorTestBase
    SinkWriterOperatorTestBase.SinkAndSuppliers sinkWithSnapshottingWriter(boolean z, String str) {
        SnapshottingBufferingSinkWriter snapshottingBufferingSinkWriter = new SnapshottingBufferingSinkWriter();
        TestSink.Builder defaultCommitter = TestSink.newBuilder().setWriter(snapshottingBufferingSinkWriter).setDefaultCommitter();
        if (z) {
            defaultCommitter.withWriterState();
        }
        if (str != null) {
            defaultCommitter.setCompatibleStateNames(str);
        }
        return new SinkWriterOperatorTestBase.SinkAndSuppliers(defaultCommitter.build().asV2(), () -> {
            return snapshottingBufferingSinkWriter.elements;
        }, () -> {
            return snapshottingBufferingSinkWriter.watermarks;
        }, () -> {
            return snapshottingBufferingSinkWriter.lastCheckpointId;
        }, () -> {
            return new TestSink.StringCommittableSerializer();
        });
    }
}
