/*
 * Decompiled with CFR 0.152.
 */
package org.apache.skywalking.apm.plugin.kafka;

import java.lang.reflect.Method;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.skywalking.apm.agent.core.context.CarrierItem;
import org.apache.skywalking.apm.agent.core.context.ContextCarrier;
import org.apache.skywalking.apm.agent.core.context.ContextManager;
import org.apache.skywalking.apm.agent.core.context.ContextSnapshot;
import org.apache.skywalking.apm.agent.core.context.tag.Tags;
import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
import org.apache.skywalking.apm.agent.core.context.trace.SpanLayer;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
import org.apache.skywalking.apm.network.trace.component.Component;
import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;
import org.apache.skywalking.apm.plugin.kafka.CallbackCache;

public class KafkaProducerInterceptor
implements InstanceMethodsAroundInterceptor {
    public static final String OPERATE_NAME_PREFIX = "Kafka/";
    public static final String PRODUCER_OPERATE_NAME_SUFFIX = "/Producer";

    public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, MethodInterceptResult result) throws Throwable {
        ContextSnapshot snapshot;
        ContextCarrier contextCarrier = new ContextCarrier();
        ProducerRecord record = (ProducerRecord)allArguments[0];
        String topicName = record.topic();
        AbstractSpan activeSpan = ContextManager.createExitSpan((String)(OPERATE_NAME_PREFIX + topicName + PRODUCER_OPERATE_NAME_SUFFIX), (ContextCarrier)contextCarrier, (String)((String)objInst.getSkyWalkingDynamicField()));
        Tags.MQ_BROKER.set(activeSpan, (String)objInst.getSkyWalkingDynamicField());
        Tags.MQ_TOPIC.set(activeSpan, topicName);
        SpanLayer.asMQ((AbstractSpan)activeSpan);
        activeSpan.setComponent((Component)ComponentsDefine.KAFKA_PRODUCER);
        CarrierItem next = contextCarrier.items();
        while (next.hasNext()) {
            next = next.next();
            record.headers().add(next.getHeadKey(), next.getHeadValue().getBytes());
        }
        EnhancedInstance callbackInstance = (EnhancedInstance)allArguments[1];
        if (null != callbackInstance && null != (snapshot = ContextManager.capture())) {
            CallbackCache cache = new CallbackCache();
            cache.setSnapshot(snapshot);
            callbackInstance.setSkyWalkingDynamicField((Object)cache);
        }
    }

    public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Object ret) throws Throwable {
        ContextManager.stopSpan();
        return ret;
    }

    public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Throwable t) {
    }
}

