/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.kstream.internals;

import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.ValueJoinerWithKey;
import org.apache.kafka.streams.kstream.internals.KStreamKTableJoin;
import org.apache.kafka.streams.kstream.internals.KTableValueGetter;
import org.apache.kafka.streams.processor.api.ContextualProcessor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.api.RecordMetadata;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class KStreamKTableJoinProcessor<K1, K2, V1, V2, VOut>
extends ContextualProcessor<K1, V1, K1, VOut> {
    private static final Logger LOG = LoggerFactory.getLogger(KStreamKTableJoin.class);
    private final KTableValueGetter<K2, V2> valueGetter;
    private final KeyValueMapper<? super K1, ? super V1, ? extends K2> keyMapper;
    private final ValueJoinerWithKey<? super K1, ? super V1, ? super V2, ? extends VOut> joiner;
    private final boolean leftJoin;
    private Sensor droppedRecordsSensor;

    KStreamKTableJoinProcessor(KTableValueGetter<K2, V2> valueGetter, KeyValueMapper<? super K1, ? super V1, ? extends K2> keyMapper, ValueJoinerWithKey<? super K1, ? super V1, ? super V2, ? extends VOut> joiner, boolean leftJoin) {
        this.valueGetter = valueGetter;
        this.keyMapper = keyMapper;
        this.joiner = joiner;
        this.leftJoin = leftJoin;
    }

    @Override
    public void init(ProcessorContext<K1, VOut> context) {
        super.init(context);
        StreamsMetricsImpl metrics = (StreamsMetricsImpl)context.metrics();
        this.droppedRecordsSensor = TaskMetrics.droppedRecordsSensor(Thread.currentThread().getName(), context.taskId().toString(), metrics);
        this.valueGetter.init(context);
    }

    @Override
    public void process(Record<K1, V1> record) {
        K2 mappedKey = this.keyMapper.apply(record.key(), record.value());
        if (mappedKey == null || record.value() == null) {
            if (this.context().recordMetadata().isPresent()) {
                RecordMetadata recordMetadata = this.context().recordMetadata().get();
                LOG.warn("Skipping record due to null join key or value. topic=[{}] partition=[{}] offset=[{}]", new Object[]{recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset()});
            } else {
                LOG.warn("Skipping record due to null join key or value. Topic, partition, and offset not known.");
            }
            this.droppedRecordsSensor.record();
        } else {
            ValueAndTimestamp<V2> valueAndTimestamp2 = this.valueGetter.isVersioned() ? this.valueGetter.get(mappedKey, record.timestamp()) : this.valueGetter.get(mappedKey);
            V2 value2 = ValueAndTimestamp.getValueOrNull(valueAndTimestamp2);
            if (this.leftJoin || value2 != null) {
                this.context().forward(record.withValue(this.joiner.apply(record.key(), record.value(), value2)));
            }
        }
    }

    @Override
    public void close() {
        this.valueGetter.close();
    }
}

