package io.continual.services.processor.engine.library.services.bucketing;

import io.continual.services.processor.config.readers.ConfigLoadContext;
import io.continual.services.processor.engine.library.sources.JsonObjectStreamSource;
import io.continual.services.processor.engine.model.Message;
import io.continual.services.processor.engine.model.MessageProcessingContext;
import io.continual.services.processor.engine.model.Source;
import io.continual.services.processor.service.SimpleProcessingService;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.TimeZone;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/continual/services/processor/engine/library/services/bucketing/BucketingService.class */
public class BucketingService extends SimpleProcessingService {
    private final Period fSize;
    private final long fOffsetSeconds;
    private final HashMap<Long, HashMap<String, Message>> fSet;
    private final MessageBridge fBridge;
    private JsonObjectStreamSource fRptTo;
    private final String fRptToName;
    private long fLastTs;
    public static final String kHost = "host";
    public static final String kMetricName = "metric";
    public static final String kTimestamp = "timestamp";
    public static final String kValue = "value";
    public static final String kCount = "count";
    private static final Logger log = LoggerFactory.getLogger(BucketingService.class);

    /* loaded from: input_file:io/continual/services/processor/engine/library/services/bucketing/BucketingService$MessageBridge.class */
    public interface MessageBridge {
        long getTimestamp(Message message);

        Message cloneWithTime(long j, Message message);

        Message merge(Message message, Message message2);

        String getKey(Message message);
    }

    /* loaded from: input_file:io/continual/services/processor/engine/library/services/bucketing/BucketingService$Period.class */
    public enum Period {
        SECONDS { // from class: io.continual.services.processor.engine.library.services.bucketing.BucketingService.Period.1
            @Override // io.continual.services.processor.engine.library.services.bucketing.BucketingService.Period
            protected long getFraction() {
                return 1000L;
            }
        },
        MINUTES { // from class: io.continual.services.processor.engine.library.services.bucketing.BucketingService.Period.2
            @Override // io.continual.services.processor.engine.library.services.bucketing.BucketingService.Period
            protected long getFraction() {
                return SECONDS.getFraction() * 60;
            }
        },
        HOURS { // from class: io.continual.services.processor.engine.library.services.bucketing.BucketingService.Period.3
            @Override // io.continual.services.processor.engine.library.services.bucketing.BucketingService.Period
            protected long getFraction() {
                return MINUTES.getFraction() * 60;
            }
        },
        DAYS { // from class: io.continual.services.processor.engine.library.services.bucketing.BucketingService.Period.4
            @Override // io.continual.services.processor.engine.library.services.bucketing.BucketingService.Period
            protected long getFraction() {
                return HOURS.getFraction() * 24;
            }
        },
        WEEKS { // from class: io.continual.services.processor.engine.library.services.bucketing.BucketingService.Period.5
            @Override // io.continual.services.processor.engine.library.services.bucketing.BucketingService.Period
            protected long getFraction() {
                return DAYS.getFraction() * 7;
            }
        },
        MONTHS { // from class: io.continual.services.processor.engine.library.services.bucketing.BucketingService.Period.6
            @Override // io.continual.services.processor.engine.library.services.bucketing.BucketingService.Period
            protected long getFraction() {
                return -1L;
            }
        };

        protected abstract long getFraction();

        public static List<Long> getTimestampsBetween(Period period, long j, long j2) {
            long bucketTimestamp = getBucketTimestamp(j, period);
            long bucketTimestamp2 = getBucketTimestamp(j2, period);
            ArrayList arrayList = new ArrayList();
            if (bucketTimestamp < bucketTimestamp2) {
                if (period != MONTHS) {
                    long fraction = period.getFraction();
                    long j3 = bucketTimestamp;
                    while (true) {
                        long j4 = j3 + fraction;
                        if (j4 >= bucketTimestamp2) {
                            break;
                        }
                        arrayList.add(Long.valueOf(j4));
                        j3 = j4;
                    }
                } else {
                    Calendar calendar = Calendar.getInstance();
                    calendar.setTimeZone(TimeZone.getTimeZone("UTC"));
                    calendar.setTimeInMillis(bucketTimestamp);
                    calendar.add(2, 1);
                    while (calendar.getTimeInMillis() < bucketTimestamp2) {
                        arrayList.add(Long.valueOf(calendar.getTimeInMillis()));
                        calendar.add(2, 1);
                    }
                }
            }
            return arrayList;
        }

        public static long getBucketTimestamp(long j, Period period) {
            if (period != MONTHS) {
                long fraction = period.getFraction();
                return ((j / fraction) * fraction) + Math.round(0.5d * fraction);
            }
            Calendar calendar = Calendar.getInstance();
            calendar.setTimeInMillis(j);
            calendar.set(5, 15);
            calendar.set(11, 12);
            calendar.set(12, 0);
            calendar.set(13, 0);
            return calendar.getTimeInMillis();
        }

        public static Period readFrom(String str) {
            if (str == null) {
                return null;
            }
            return valueOf(str.trim().toUpperCase());
        }
    }

    /* loaded from: input_file:io/continual/services/processor/engine/library/services/bucketing/BucketingService$StdDataCombiner.class */
    public enum StdDataCombiner {
        AVERAGE,
        SUM
    }

    /* loaded from: input_file:io/continual/services/processor/engine/library/services/bucketing/BucketingService$StdMsgBridge.class */
    public static class StdMsgBridge implements MessageBridge {
        private final StdDataCombiner fCombiner;

        public StdMsgBridge(StdDataCombiner stdDataCombiner) {
            this.fCombiner = stdDataCombiner;
        }

        @Override // io.continual.services.processor.engine.library.services.bucketing.BucketingService.MessageBridge
        public long getTimestamp(Message message) {
            return message.getLong(BucketingService.kTimestamp, -1L);
        }

        @Override // io.continual.services.processor.engine.library.services.bucketing.BucketingService.MessageBridge
        public String getKey(Message message) {
            return message.getValueAsString(BucketingService.kMetricName);
        }

        @Override // io.continual.services.processor.engine.library.services.bucketing.BucketingService.MessageBridge
        public Message cloneWithTime(long j, Message message) {
            return message.m14clone().putValue(BucketingService.kTimestamp, j);
        }

        @Override // io.continual.services.processor.engine.library.services.bucketing.BucketingService.MessageBridge
        public Message merge(Message message, Message message2) {
            long j = message.getLong(BucketingService.kCount, 1L);
            double d = message.getDouble("value", 0.0d);
            long j2 = message2.getLong(BucketingService.kCount, 1L);
            double d2 = message2.getDouble("value", 0.0d);
            Message m14clone = message.m14clone();
            switch (this.fCombiner) {
                case AVERAGE:
                    double d3 = d * j;
                    double d4 = d2 * j2;
                    long j3 = j + j2;
                    return m14clone.putValue("value", j3 == 0 ? 0.0d : (d3 + d4) / j3).putValue(BucketingService.kCount, j3);
                case SUM:
                    return m14clone.putValue("value", d + d2);
                default:
                    return message;
            }
        }
    }

    public static long getBucketTimestamp(Date date, Period period) {
        return Period.getBucketTimestamp(date.getTime(), period);
    }

    public BucketingService(ConfigLoadContext configLoadContext, JSONObject jSONObject) {
        this.fLastTs = -1L;
        this.fSize = Period.readFrom(jSONObject.optString("period", Period.MINUTES.toString()));
        this.fOffsetSeconds = jSONObject.optLong("bucketTimeOffset", 0L);
        this.fBridge = new StdMsgBridge(StdDataCombiner.SUM);
        this.fSet = new HashMap<>();
        this.fRptTo = null;
        this.fRptToName = jSONObject.optString("reportTo");
    }

    public BucketingService(Period period, JsonObjectStreamSource jsonObjectStreamSource) {
        this(period, StdDataCombiner.SUM, jsonObjectStreamSource);
    }

    public BucketingService(Period period, MessageBridge messageBridge, JsonObjectStreamSource jsonObjectStreamSource) {
        this.fLastTs = -1L;
        this.fSize = period;
        this.fOffsetSeconds = 0L;
        this.fBridge = messageBridge;
        this.fSet = new HashMap<>();
        this.fRptTo = jsonObjectStreamSource;
        this.fRptToName = null;
    }

    public BucketingService(Period period, StdDataCombiner stdDataCombiner, JsonObjectStreamSource jsonObjectStreamSource) {
        this(period, new StdMsgBridge(stdDataCombiner), jsonObjectStreamSource);
    }

    @Override // io.continual.services.processor.service.SimpleProcessingService
    protected void onStopRequested() {
        close();
    }

    public synchronized void close() {
        flush();
        if (this.fRptTo != null) {
            try {
                this.fRptTo.close();
            } catch (IOException e) {
                log.warn("Problem closing bucket service target stream: " + e.getMessage());
            }
        }
    }

    public synchronized void add(MessageProcessingContext messageProcessingContext) {
        if (this.fRptTo == null && this.fRptToName != null) {
            Source source = messageProcessingContext.getSource(this.fRptToName);
            if (source instanceof JsonObjectStreamSource) {
                this.fRptTo = (JsonObjectStreamSource) source;
            }
        }
        Message message = messageProcessingContext.getMessage();
        long bucketTimestamp = Period.getBucketTimestamp(this.fBridge.getTimestamp(message), this.fSize) + (this.fOffsetSeconds * 1000);
        Message cloneWithTime = this.fBridge.cloneWithTime(bucketTimestamp, message);
        String key = this.fBridge.getKey(cloneWithTime);
        HashMap<String, Message> hashMap = this.fSet.get(Long.valueOf(bucketTimestamp));
        if (hashMap != null) {
            Message message2 = hashMap.get(key);
            hashMap.put(key, message2 == null ? cloneWithTime : this.fBridge.merge(message2, message));
        } else {
            flush();
            HashMap<String, Message> hashMap2 = new HashMap<>();
            hashMap2.put(key, cloneWithTime);
            this.fSet.put(Long.valueOf(bucketTimestamp), hashMap2);
        }
    }

    public synchronized List<Message> getBuckets() {
        LinkedList linkedList = new LinkedList();
        LinkedList linkedList2 = new LinkedList(this.fSet.keySet());
        Collections.sort(linkedList2);
        Iterator it = linkedList2.iterator();
        while (it.hasNext()) {
            HashMap<String, Message> hashMap = this.fSet.get((Long) it.next());
            LinkedList linkedList3 = new LinkedList(hashMap.keySet());
            Collections.sort(linkedList3);
            Iterator it2 = linkedList3.iterator();
            while (it2.hasNext()) {
                linkedList.add(hashMap.get((String) it2.next()));
            }
        }
        return linkedList;
    }

    private void flush() {
        if (this.fRptTo == null) {
            return;
        }
        LinkedList linkedList = new LinkedList(this.fSet.keySet());
        Collections.sort(linkedList);
        Iterator it = linkedList.iterator();
        while (it.hasNext()) {
            Long l = (Long) it.next();
            if (this.fLastTs < 0) {
                this.fLastTs = l.longValue();
            } else {
                Iterator<Long> it2 = Period.getTimestampsBetween(this.fSize, this.fLastTs, l.longValue()).iterator();
                while (it2.hasNext()) {
                    this.fRptTo.submit(Message.adoptJsonAsMessage(new JSONObject().put(kTimestamp, it2.next().longValue()).put("value", 0)).toJson());
                }
            }
            HashMap<String, Message> hashMap = this.fSet.get(l);
            this.fLastTs = l.longValue();
            LinkedList linkedList2 = new LinkedList(hashMap.keySet());
            Collections.sort(linkedList2);
            Iterator it3 = linkedList2.iterator();
            while (it3.hasNext()) {
                this.fRptTo.submit(hashMap.get((String) it3.next()).toJson());
            }
        }
        this.fSet.clear();
    }
}
