package com.katanox.tabour.integration.sqs.core.consumer;

import com.amazonaws.services.sqs.model.Message;
import com.katanox.tabour.config.EventPollerProperties;
import com.katanox.tabour.exception.ExceptionHandler;
import com.katanox.tabour.integration.sqs.config.SqsConfiguration;
import java.nio.charset.Charset;
import java.util.Iterator;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import kotlin.Metadata;
import kotlin.jvm.internal.Intrinsics;
import kotlin.text.Charsets;
import mu.KLogger;
import org.jetbrains.annotations.NotNull;

/* compiled from: SqsEventPoller.kt */
@Metadata(mv = {1, 4, 2}, bv = {1, 0, 3}, k = 1, d1 = {"��J\n\u0002\u0018\u0002\n\u0002\u0010��\n��\n\u0002\u0010\u000e\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0010\u0002\n��\n\u0002\u0018\u0002\n\u0002\b\u0006\u0018��2\u00020\u0001BE\u0012\u0006\u0010\u0002\u001a\u00020\u0003\u0012\u0006\u0010\u0004\u001a\u00020\u0005\u0012\u0006\u0010\u0006\u001a\u00020\u0007\u0012\u0006\u0010\b\u001a\u00020\t\u0012\u0006\u0010\n\u001a\u00020\u000b\u0012\u0006\u0010\f\u001a\u00020\r\u0012\u0006\u0010\u000e\u001a\u00020\u000f\u0012\u0006\u0010\u0010\u001a\u00020\u0011¢\u0006\u0002\u0010\u0012J\u0010\u0010\u0013\u001a\u00020\u00142\u0006\u0010\u0015\u001a\u00020\u0016H\u0002J\u0010\u0010\u0017\u001a\u00020\u00142\u0006\u0010\u0018\u001a\u00020\u0016H\u0002J\b\u0010\u0019\u001a\u00020\u0014H\u0002J\u0006\u0010\u001a\u001a\u00020\u0014J\u0006\u0010\u001b\u001a\u00020\u0014R\u000e\u0010\u0006\u001a\u00020\u0007X\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\u0004\u001a\u00020\u0005X\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\u0010\u001a\u00020\u0011X\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\n\u001a\u00020\u000bX\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\u0002\u001a\u00020\u0003X\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\b\u001a\u00020\tX\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\f\u001a\u00020\rX\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\u000e\u001a\u00020\u000fX\u0082\u0004¢\u0006\u0002\n��¨\u0006\u001c"}, d2 = {"Lcom/katanox/tabour/integration/sqs/core/consumer/SqsEventPoller;", "", "name", "", "eventHandler", "Lcom/katanox/tabour/integration/sqs/core/consumer/SqsEventHandler;", "eventFetcher", "Lcom/katanox/tabour/integration/sqs/core/consumer/SqsEventFetcher;", "pollerThreadPool", "Ljava/util/concurrent/ScheduledThreadPoolExecutor;", "handlerThreadPool", "Ljava/util/concurrent/ThreadPoolExecutor;", "pollingProperties", "Lcom/katanox/tabour/config/EventPollerProperties;", "sqsConfiguration", "Lcom/katanox/tabour/integration/sqs/config/SqsConfiguration;", "exceptionHandler", "Lcom/katanox/tabour/exception/ExceptionHandler;", "(Ljava/lang/String;Lcom/katanox/tabour/integration/sqs/core/consumer/SqsEventHandler;Lcom/katanox/tabour/integration/sqs/core/consumer/SqsEventFetcher;Ljava/util/concurrent/ScheduledThreadPoolExecutor;Ljava/util/concurrent/ThreadPoolExecutor;Lcom/katanox/tabour/config/EventPollerProperties;Lcom/katanox/tabour/integration/sqs/config/SqsConfiguration;Lcom/katanox/tabour/exception/ExceptionHandler;)V", "acknowledgeMessage", "", "message", "Lcom/amazonaws/services/sqs/model/Message;", "handleMessage", "sqsMessage", "pollMessages", "start", "stop", "tabour"})
/* loaded from: input_file:com/katanox/tabour/integration/sqs/core/consumer/SqsEventPoller.class */
public final class SqsEventPoller {
    private final String name;
    private final SqsEventHandler eventHandler;
    private final SqsEventFetcher eventFetcher;
    private final ScheduledThreadPoolExecutor pollerThreadPool;
    private final ThreadPoolExecutor handlerThreadPool;
    private final EventPollerProperties pollingProperties;
    private final SqsConfiguration sqsConfiguration;
    private final ExceptionHandler exceptionHandler;

    public final void start() {
        KLogger kLogger;
        KLogger kLogger2;
        kLogger = SqsEventPollerKt.logger;
        kLogger.info("starting SqsMessagePoller");
        int corePoolSize = this.pollerThreadPool.getCorePoolSize();
        for (int i = 0; i < corePoolSize; i++) {
            kLogger2 = SqsEventPollerKt.logger;
            kLogger2.info("starting SqsMessagePoller ({}) - thread {}", this.name, Integer.valueOf(i));
            this.pollerThreadPool.scheduleWithFixedDelay(new Runnable() { // from class: com.katanox.tabour.integration.sqs.core.consumer.SqsEventPoller$start$1
                @Override // java.lang.Runnable
                public final void run() {
                    SqsEventPoller.this.pollMessages();
                }
            }, this.pollingProperties.getPollDelay().getSeconds(), this.pollingProperties.getPollDelay().getSeconds(), TimeUnit.SECONDS);
        }
    }

    public final void stop() {
        KLogger kLogger;
        kLogger = SqsEventPollerKt.logger;
        kLogger.info("stopping SqsMessagePoller");
        this.pollerThreadPool.shutdownNow();
        this.handlerThreadPool.shutdownNow();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public final void pollMessages() {
        KLogger kLogger;
        try {
            Iterator<Message> it = this.eventFetcher.fetchMessages().iterator();
            while (it.hasNext()) {
                handleMessage(it.next());
            }
        } catch (Exception e) {
            kLogger = SqsEventPollerKt.logger;
            kLogger.error("error fetching messages from queue {}:", this.eventHandler.getSqsQueueUrl(), e);
        }
    }

    private final void handleMessage(final Message message) {
        KLogger kLogger;
        kLogger = SqsEventPollerKt.logger;
        kLogger.info("Received message ID {}", message.getMessageId());
        final String body = message.getBody();
        this.handlerThreadPool.submit(new Runnable() { // from class: com.katanox.tabour.integration.sqs.core.consumer.SqsEventPoller$handleMessage$1
            @Override // java.lang.Runnable
            public final void run() {
                ExceptionHandler exceptionHandler;
                SqsEventHandler sqsEventHandler;
                SqsEventHandler sqsEventHandler2;
                SqsEventHandler sqsEventHandler3;
                SqsEventHandler sqsEventHandler4;
                KLogger kLogger2;
                SqsEventHandler sqsEventHandler5;
                try {
                    try {
                        sqsEventHandler3 = SqsEventPoller.this.eventHandler;
                        String str = body;
                        Intrinsics.checkNotNullExpressionValue(str, "message");
                        Charset charset = Charsets.UTF_8;
                        if (str == null) {
                            throw new NullPointerException("null cannot be cast to non-null type java.lang.String");
                        }
                        byte[] bytes = str.getBytes(charset);
                        Intrinsics.checkNotNullExpressionValue(bytes, "(this as java.lang.String).getBytes(charset)");
                        sqsEventHandler3.onBeforeHandle(bytes);
                        sqsEventHandler4 = SqsEventPoller.this.eventHandler;
                        String str2 = body;
                        Intrinsics.checkNotNullExpressionValue(str2, "message");
                        Charset charset2 = Charsets.UTF_8;
                        if (str2 == null) {
                            throw new NullPointerException("null cannot be cast to non-null type java.lang.String");
                        }
                        byte[] bytes2 = str2.getBytes(charset2);
                        Intrinsics.checkNotNullExpressionValue(bytes2, "(this as java.lang.String).getBytes(charset)");
                        sqsEventHandler4.handle(bytes2);
                        SqsEventPoller.this.acknowledgeMessage(message);
                        kLogger2 = SqsEventPollerKt.logger;
                        kLogger2.debug("message {} processed successfully - message has been deleted from SQS", message.getMessageId());
                        sqsEventHandler5 = SqsEventPoller.this.eventHandler;
                        String str3 = body;
                        Intrinsics.checkNotNullExpressionValue(str3, "message");
                        Charset charset3 = Charsets.UTF_8;
                        if (str3 == null) {
                            throw new NullPointerException("null cannot be cast to non-null type java.lang.String");
                        }
                        byte[] bytes3 = str3.getBytes(charset3);
                        Intrinsics.checkNotNullExpressionValue(bytes3, "(this as java.lang.String).getBytes(charset)");
                        sqsEventHandler5.onAfterHandle(bytes3);
                    } catch (Exception e) {
                        exceptionHandler = SqsEventPoller.this.exceptionHandler;
                        switch (exceptionHandler.handleException(message, e)) {
                            case DELETE:
                                SqsEventPoller.this.acknowledgeMessage(message);
                                break;
                        }
                        sqsEventHandler = SqsEventPoller.this.eventHandler;
                        String str4 = body;
                        Intrinsics.checkNotNullExpressionValue(str4, "message");
                        Charset charset4 = Charsets.UTF_8;
                        if (str4 == null) {
                            throw new NullPointerException("null cannot be cast to non-null type java.lang.String");
                        }
                        byte[] bytes4 = str4.getBytes(charset4);
                        Intrinsics.checkNotNullExpressionValue(bytes4, "(this as java.lang.String).getBytes(charset)");
                        sqsEventHandler.onAfterHandle(bytes4);
                    }
                } catch (Throwable th) {
                    sqsEventHandler2 = SqsEventPoller.this.eventHandler;
                    String str5 = body;
                    Intrinsics.checkNotNullExpressionValue(str5, "message");
                    Charset charset5 = Charsets.UTF_8;
                    if (str5 == null) {
                        throw new NullPointerException("null cannot be cast to non-null type java.lang.String");
                    }
                    byte[] bytes5 = str5.getBytes(charset5);
                    Intrinsics.checkNotNullExpressionValue(bytes5, "(this as java.lang.String).getBytes(charset)");
                    sqsEventHandler2.onAfterHandle(bytes5);
                    throw th;
                }
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    public final void acknowledgeMessage(Message message) {
        this.sqsConfiguration.amazonSQSAsync().deleteMessage(this.eventHandler.getSqsQueueUrl(), message.getReceiptHandle());
    }

    public SqsEventPoller(@NotNull String str, @NotNull SqsEventHandler sqsEventHandler, @NotNull SqsEventFetcher sqsEventFetcher, @NotNull ScheduledThreadPoolExecutor scheduledThreadPoolExecutor, @NotNull ThreadPoolExecutor threadPoolExecutor, @NotNull EventPollerProperties eventPollerProperties, @NotNull SqsConfiguration sqsConfiguration, @NotNull ExceptionHandler exceptionHandler) {
        Intrinsics.checkNotNullParameter(str, "name");
        Intrinsics.checkNotNullParameter(sqsEventHandler, "eventHandler");
        Intrinsics.checkNotNullParameter(sqsEventFetcher, "eventFetcher");
        Intrinsics.checkNotNullParameter(scheduledThreadPoolExecutor, "pollerThreadPool");
        Intrinsics.checkNotNullParameter(threadPoolExecutor, "handlerThreadPool");
        Intrinsics.checkNotNullParameter(eventPollerProperties, "pollingProperties");
        Intrinsics.checkNotNullParameter(sqsConfiguration, "sqsConfiguration");
        Intrinsics.checkNotNullParameter(exceptionHandler, "exceptionHandler");
        this.name = str;
        this.eventHandler = sqsEventHandler;
        this.eventFetcher = sqsEventFetcher;
        this.pollerThreadPool = scheduledThreadPoolExecutor;
        this.handlerThreadPool = threadPoolExecutor;
        this.pollingProperties = eventPollerProperties;
        this.sqsConfiguration = sqsConfiguration;
        this.exceptionHandler = exceptionHandler;
    }
}
