/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.internals.ChangelogRecordDeserializationHelper;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.ProcessorContextImpl;
import org.apache.kafka.streams.processor.internals.ProcessorNode;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.processor.internals.RecordCollector;
import org.apache.kafka.streams.processor.internals.StreamTask;
import org.apache.kafka.streams.processor.internals.Task;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.query.Position;
import org.apache.kafka.streams.state.internals.PositionSerde;
import org.apache.kafka.streams.state.internals.ThreadCache;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;

@RunWith(value=MockitoJUnitRunner.StrictStubs.class)
public class WriteConsistencyVectorTest {
    @Mock
    private StreamsConfig streamsConfig;
    @Mock
    private ProcessorStateManager stateManager;
    @Mock
    private RecordCollector recordCollector;
    @Mock
    private StreamTask task;
    @Mock
    private TaskId taskId;
    @Mock
    private StreamsMetricsImpl streamsMetrics;
    @Mock
    private ThreadCache threadCache;
    private ProcessorContextImpl context;
    private static final String KEY = "key";
    private static final Bytes KEY_BYTES = Bytes.wrap((byte[])"key".getBytes());
    private static final long VALUE = 42L;
    private static final byte[] VALUE_BYTES = String.valueOf(42L).getBytes();
    private static final long TIMESTAMP = 21L;
    private static final String REGISTERED_STORE_NAME = "registered-store";
    private static final TopicPartition CHANGELOG_PARTITION = new TopicPartition("store-changelog", 1);
    private static final String INPUT_TOPIC_NAME = "input-topic";
    private static final Integer INPUT_PARTITION = 0;
    private static final Long INPUT_OFFSET = 100L;

    @Before
    public void setup() {
        Mockito.when((Object)this.streamsConfig.originals()).thenReturn(Collections.singletonMap("__iq.consistency.offset.vector.enabled__", true));
        Mockito.when((Object)this.streamsConfig.values()).thenReturn(Collections.emptyMap());
        Mockito.when((Object)this.streamsConfig.getString("application.id")).thenReturn((Object)"add-id");
        Mockito.when((Object)this.stateManager.taskType()).thenReturn((Object)Task.TaskType.ACTIVE);
        Mockito.when((Object)this.stateManager.registeredChangelogPartitionFor(REGISTERED_STORE_NAME)).thenReturn((Object)CHANGELOG_PARTITION);
        this.context = new ProcessorContextImpl(this.taskId, this.streamsConfig, this.stateManager, this.streamsMetrics, this.threadCache);
        this.context.transitionToActive(this.task, null, null);
        this.context.setCurrentNode(new ProcessorNode("fake", (Processor)null, new HashSet<String>(Arrays.asList("LocalKeyValueStore", "LocalTimestampedKeyValueStore", "LocalWindowStore", "LocalTimestampedWindowStore", "LocalSessionStore"))));
    }

    @Test
    public void shouldSendConsistencyVectorToChangelogTopic() {
        Position position = Position.emptyPosition();
        position.withComponent(INPUT_TOPIC_NAME, INPUT_PARTITION.intValue(), INPUT_OFFSET.longValue());
        this.context.setRecordContext(new ProcessorRecordContext(-1L, INPUT_OFFSET.longValue(), INPUT_PARTITION.intValue(), INPUT_TOPIC_NAME, (Headers)new RecordHeaders()));
        RecordHeaders headers = new RecordHeaders();
        headers.add((Header)ChangelogRecordDeserializationHelper.CHANGELOG_VERSION_HEADER_RECORD_CONSISTENCY);
        headers.add((Header)new RecordHeader("c", PositionSerde.serialize((Position)position).array()));
        this.context.transitionToActive(this.task, this.recordCollector, null);
        this.context.logChange(REGISTERED_STORE_NAME, KEY_BYTES, VALUE_BYTES, 21L, position);
        ((RecordCollector)Mockito.verify((Object)this.recordCollector)).send(CHANGELOG_PARTITION.topic(), (Object)KEY_BYTES, (Object)VALUE_BYTES, (Headers)headers, Integer.valueOf(CHANGELOG_PARTITION.partition()), Long.valueOf(21L), (Serializer)InternalProcessorContext.BYTES_KEY_SERIALIZER, (Serializer)InternalProcessorContext.BYTEARRAY_VALUE_SERIALIZER, null, null);
    }
}

