/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.samza.metrics;

import java.math.BigInteger;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.beam.runners.samza.metrics.SamzaTransformMetricRegistry;
import org.apache.beam.runners.samza.runtime.KeyedTimerData;
import org.apache.beam.runners.samza.runtime.Op;
import org.apache.beam.runners.samza.runtime.OpEmitter;
import org.apache.beam.runners.samza.util.PipelineJsonRenderer;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.samza.config.Config;
import org.apache.samza.context.Context;
import org.apache.samza.operators.Scheduler;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class SamzaOutputMetricOp<@UnknownKeyFor T>
implements Op<T, T, Void> {
    protected final @UnknownKeyFor @NonNull @Initialized String transformFullName;
    protected final @UnknownKeyFor @NonNull @Initialized SamzaTransformMetricRegistry samzaTransformMetricRegistry;
    protected final @UnknownKeyFor @NonNull @Initialized String pValue;
    private final @UnknownKeyFor @NonNull @Initialized AtomicLong count;
    private final @UnknownKeyFor @NonNull @Initialized AtomicReference<@UnknownKeyFor @NonNull @Initialized BigInteger> sumOfTimestamps;
    protected transient @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized String> transformInputs;
    protected transient @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized String> transformOutputs;
    protected transient @UnknownKeyFor @NonNull @Initialized String task;
    private static final @UnknownKeyFor @NonNull @Initialized Logger LOG = LoggerFactory.getLogger(SamzaOutputMetricOp.class);

    public SamzaOutputMetricOp(@NonNull @UnknownKeyFor @Initialized String pValue, @NonNull @UnknownKeyFor @Initialized String transformFullName, @NonNull @UnknownKeyFor @Initialized SamzaTransformMetricRegistry samzaTransformMetricRegistry) {
        this.transformFullName = transformFullName;
        this.samzaTransformMetricRegistry = samzaTransformMetricRegistry;
        this.pValue = pValue;
        this.count = new AtomicLong(0L);
        this.sumOfTimestamps = new AtomicReference<BigInteger>(BigInteger.ZERO);
    }

    @Override
    public void open(@UnknownKeyFor @NonNull @Initialized Config config, @UnknownKeyFor @NonNull @Initialized Context context, @UnknownKeyFor @NonNull @Initialized Scheduler<@UnknownKeyFor @NonNull @Initialized KeyedTimerData<@UnknownKeyFor @Nullable @Initialized Void>> timerRegistry, @UnknownKeyFor @NonNull @Initialized OpEmitter<T> emitter) {
        Map.Entry<List<String>, List<String>> transformInputOutput = PipelineJsonRenderer.getTransformIOMap(config).get(this.transformFullName);
        this.transformInputs = transformInputOutput != null ? transformInputOutput.getKey() : new ArrayList();
        this.transformOutputs = transformInputOutput != null ? transformInputOutput.getValue() : new ArrayList();
        this.task = context.getTaskContext().getTaskModel().getTaskName().getTaskName();
        this.samzaTransformMetricRegistry.register(this.transformFullName, this.pValue, context);
    }

    @Override
    public void processElement(@UnknownKeyFor @NonNull @Initialized WindowedValue<T> inputElement, @UnknownKeyFor @NonNull @Initialized OpEmitter<T> emitter) {
        this.count.incrementAndGet();
        this.sumOfTimestamps.updateAndGet(sum -> sum.add(BigInteger.valueOf(System.nanoTime())));
        this.samzaTransformMetricRegistry.getTransformMetrics().getTransformOutputThroughput(this.transformFullName).inc();
        emitter.emitElement(inputElement);
    }

    @Override
    public void processWatermark(@UnknownKeyFor @NonNull @Initialized Instant watermark, @UnknownKeyFor @NonNull @Initialized OpEmitter<T> emitter) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Processing Output Watermark for Transform: {} Count: {} SumOfTimestamps: {} for Watermark: {} for Task: {}", new Object[]{this.transformFullName, this.count.get(), this.sumOfTimestamps.get().longValue(), watermark.getMillis(), this.task});
        }
        if (this.count.get() > 0L) {
            long avg = Math.floorDiv(this.sumOfTimestamps.get().longValue(), this.count.get());
            this.samzaTransformMetricRegistry.updateArrivalTimeMap(this.transformFullName, this.pValue, watermark.getMillis(), avg);
            this.samzaTransformMetricRegistry.emitLatencyMetric(this.transformFullName, this.transformInputs, this.transformOutputs, watermark.getMillis(), this.task);
        }
        this.samzaTransformMetricRegistry.getTransformMetrics().getTransformWatermarkProgress(this.transformFullName).set((Object)watermark.getMillis());
        this.count.set(0L);
        this.sumOfTimestamps.set(BigInteger.ZERO);
        emitter.emitWatermark(watermark);
    }

    @VisibleForTesting
    void init(@UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized String> transformInputs, @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized String> transformOutputs) {
        this.transformInputs = transformInputs;
        this.transformOutputs = transformOutputs;
    }
}

