/*
 * Decompiled with CFR 0.152.
 */
package org.apache.inlong.dataproxy.sink.common;

import com.google.protobuf.ByteString;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.inlong.common.enums.InlongCompressType;
import org.apache.inlong.common.enums.MessageWrapType;
import org.apache.inlong.dataproxy.config.pojo.IdTopicConfig;
import org.apache.inlong.dataproxy.sink.common.EventHandler;
import org.apache.inlong.dataproxy.sink.mq.BatchPackProfile;
import org.apache.inlong.sdk.commons.protocol.ProxyEvent;
import org.apache.inlong.sdk.commons.protocol.ProxySdk;
import org.apache.inlong.sdk.commons.utils.GzipUtils;
import org.xerial.snappy.Snappy;

public class DefaultEventHandler
implements EventHandler {
    @Override
    public Map<String, String> parseHeader(IdTopicConfig idConfig, BatchPackProfile profile, String nodeId, InlongCompressType compressType) {
        HashMap<String, String> headers = new HashMap<String, String>();
        headers.put("msgEnType", MessageWrapType.INLONG_MSG_V1.getStrId());
        headers.put("version", MessageWrapType.INLONG_MSG_V1.getStrId());
        headers.put("inlongGroupId", profile.getInlongGroupId());
        headers.put("inlongStreamId", profile.getInlongStreamId());
        headers.put("proxyName", nodeId);
        headers.put("packTime", String.valueOf(System.currentTimeMillis()));
        headers.put("msgCount", String.valueOf(profile.getEvents().size()));
        headers.put("srcLength", String.valueOf(profile.getSize()));
        headers.put("compressType", String.valueOf(compressType.getName()));
        return headers;
    }

    @Override
    public byte[] parseBody(IdTopicConfig idConfig, BatchPackProfile profile, InlongCompressType compressType) throws IOException {
        List<ProxyEvent> events = profile.getEvents();
        ProxySdk.MessageObjs.Builder objs = ProxySdk.MessageObjs.newBuilder();
        for (ProxyEvent event : events) {
            ProxySdk.MessageObj.Builder builder = ProxySdk.MessageObj.newBuilder();
            builder.setMsgTime(event.getMsgTime());
            builder.setSourceIp(event.getSourceIp());
            event.getHeaders().forEach((key, value) -> builder.addParams(ProxySdk.MapFieldEntry.newBuilder().setKey(key).setValue(value)));
            builder.setBody(ByteString.copyFrom((byte[])event.getBody()));
            objs.addMsgs(builder.build());
        }
        byte[] srcBytes = objs.build().toByteArray();
        byte[] compressBytes = null;
        switch (compressType) {
            case INLONG_SNAPPY: {
                compressBytes = Snappy.compress((byte[])srcBytes);
                break;
            }
            case INLONG_GZ: {
                compressBytes = GzipUtils.compress((byte[])srcBytes);
                break;
            }
            default: {
                compressBytes = srcBytes;
            }
        }
        return compressBytes;
    }
}

