/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.operators.sink;

import java.util.ArrayList;
import java.util.List;
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.operators.ProcessingTimeService;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperatorTestBase;
import org.apache.flink.streaming.runtime.operators.sink.deprecated.TestSinkV2;

/**
 * Supplies the sink fixtures required by {@link SinkWriterOperatorTestBase}, building each of them
 * from the {@link TestSinkV2} helper that lives in the {@code deprecated} test package.
 *
 * <p>Every factory method below overrides a hook of the base class and returns an
 * {@link InspectableSink} that wraps a freshly built {@code TestSinkV2<Integer>}.
 */
@Deprecated
class SinkV2SinkWriterOperatorDeprecatedTest extends SinkWriterOperatorTestBase {

    /**
     * Builds a sink that only has a {@link TestSinkV2.DefaultSinkWriter}; no committer is
     * configured on the builder.
     *
     * @return the wrapped sink
     */
    @Override
    InspectableSink sinkWithoutCommitter() {
        TestSinkV2.DefaultSinkWriter<Integer> sinkWriter = new TestSinkV2.DefaultSinkWriter<>();
        return new InspectableSink(TestSinkV2.<Integer>newBuilder().setWriter(sinkWriter).build());
    }

    /**
     * Builds a sink with the builder's default committer and a
     * {@link TestSinkV2.DefaultCommittingSinkWriter}.
     *
     * @return the wrapped sink
     */
    @Override
    InspectableSink sinkWithCommitter() {
        TestSinkV2.DefaultCommittingSinkWriter<Integer> sinkWriter =
                new TestSinkV2.DefaultCommittingSinkWriter<>();
        return new InspectableSink(
                TestSinkV2.<Integer>newBuilder()
                        .setDefaultCommitter()
                        .setWriter(sinkWriter)
                        .build());
    }

    /**
     * Builds a sink with the builder's default committer whose writer is a
     * {@link TimeBasedBufferingSinkWriter}, i.e. records are buffered until a processing-time
     * callback moves them into the writer's element list.
     *
     * @return the wrapped sink
     */
    @Override
    InspectableSink sinkWithTimeBasedWriter() {
        TimeBasedBufferingSinkWriter sinkWriter = new TimeBasedBufferingSinkWriter();
        return new InspectableSink(
                TestSinkV2.<Integer>newBuilder()
                        .setWriter(sinkWriter)
                        .setDefaultCommitter()
                        .build());
    }

    /**
     * Builds a sink with the default committer, the post-commit topology switched on and a
     * {@link TestSinkV2.DefaultStatefulSinkWriter}.
     *
     * @param withState when {@code true}, {@code setWriterState(true)} is applied to the builder;
     *     otherwise the builder's writer-state setting is left untouched
     * @param stateName forwarded to {@code setCompatibleStateNames} when non-null; {@code null}
     *     skips that call
     * @return the wrapped sink
     */
    @Override
    InspectableSink sinkWithState(boolean withState, String stateName) {
        TestSinkV2.DefaultStatefulSinkWriter<Integer> sinkWriter =
                new TestSinkV2.DefaultStatefulSinkWriter<>();
        TestSinkV2.Builder<Integer> builder =
                TestSinkV2.<Integer>newBuilder()
                        .setDefaultCommitter()
                        .setWithPostCommitTopology(true)
                        .setWriter(sinkWriter);
        if (withState) {
            builder.setWriterState(true);
        }
        if (stateName != null) {
            builder.setCompatibleStateNames(stateName);
        }
        return new InspectableSink(builder.build());
    }

    /**
     * Wrapper around a {@code TestSinkV2<Integer>} whose accessors read directly from the writer
     * returned by the sink's {@code getWriter()}.
     */
    static class InspectableSink
            extends SinkWriterOperatorTestBase.AbstractInspectableSink<TestSinkV2<Integer>> {

        /**
         * @param sink the sink to wrap; handed to the superclass constructor
         */
        InspectableSink(TestSinkV2<Integer> sink) {
            super(sink);
        }

        /**
         * @return the {@code lastCheckpointId} field of the sink's writer
         */
        @Override
        public long getLastCheckpointId() {
            return getSink().getWriter().lastCheckpointId;
        }

        /**
         * @return the writer's {@code elements} list itself (not a copy)
         */
        @Override
        public List<String> getRecordsOfCurrentCheckpoint() {
            return getSink().getWriter().elements;
        }

        /**
         * @return the writer's {@code watermarks} list itself (not a copy)
         */
        @Override
        public List<Watermark> getWatermarks() {
            return getSink().getWriter().watermarks;
        }

        /**
         * Reads the record count from the writer.
         *
         * @return the value of the writer's {@code getRecordCount()}
         * @throws ClassCastException if the wrapped sink's writer is not a
         *     {@link TestSinkV2.DefaultStatefulSinkWriter}
         */
        @Override
        public int getRecordCountFromState() {
            // Only the stateful writer exposes getRecordCount(), hence the downcast.
            return ((TestSinkV2.DefaultStatefulSinkWriter<?>) getSink().getWriter())
                    .getRecordCount();
        }
    }

    /**
     * Writer that holds incoming records in a private buffer and only publishes them to the
     * inherited {@code elements} list when its processing-time callback runs. After each callback
     * it registers itself again, so flushing repeats for as long as timers keep firing.
     */
    private static class TimeBasedBufferingSinkWriter
            extends TestSinkV2.DefaultCommittingSinkWriter<Integer>
            implements ProcessingTimeService.ProcessingTimeCallback {

        /**
         * Value used both for the first timer registration and as the offset added to the
         * callback time when re-registering. NOTE(review): presumably milliseconds of processing
         * time - confirm against {@code ProcessingTimeService#registerTimer}.
         */
        private static final long FLUSH_INTERVAL = 1000L;

        /** Records accepted by {@link #write} that have not been flushed yet. */
        private final List<String> cachedCommittables = new ArrayList<>();

        /** Assigned in {@link #init}; used to re-register this writer after every callback. */
        private ProcessingTimeService processingTimeService;

        /**
         * Buffers the string form of the {@code (element, timestamp, currentWatermark)} triple
         * instead of adding it to {@code elements} right away.
         *
         * @param element the record to buffer
         * @param context supplies the timestamp and current watermark stored with the record
         */
        @Override
        public void write(Integer element, SinkWriter.Context context) {
            cachedCommittables.add(
                    Tuple3.of(element, context.timestamp(), context.currentWatermark()).toString());
        }

        /**
         * Moves everything buffered so far into {@code elements}, empties the buffer and registers
         * this writer for another callback at {@code time + FLUSH_INTERVAL}.
         *
         * @param time the time value passed in by the timer service for this callback
         */
        @Override
        public void onProcessingTime(long time) {
            elements.addAll(cachedCommittables);
            cachedCommittables.clear();
            processingTimeService.registerTimer(time + FLUSH_INTERVAL, this);
        }

        /**
         * Stores the processing-time service obtained from the context and registers the first
         * timer.
         *
         * @param context source of the {@link ProcessingTimeService}
         */
        @Override
        public void init(Sink.InitContext context) {
            processingTimeService = context.getProcessingTimeService();
            processingTimeService.registerTimer(FLUSH_INTERVAL, this);
        }
    }
}

