package com.flipkart.gojira.queuedsender;

import com.flipkart.gojira.core.injectors.GuiceInjector;
import com.flipkart.gojira.models.TestData;
import com.flipkart.gojira.models.TestDataType;
import com.flipkart.gojira.models.TestRequestData;
import com.flipkart.gojira.models.TestResponseData;
import com.flipkart.gojira.serde.SerdeHandlerRepository;
import com.flipkart.gojira.sinkstore.handlers.SinkHandler;
import com.leansoft.bigqueue.BigQueueImpl;
import com.leansoft.bigqueue.IBigQueue;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.nio.file.attribute.PosixFilePermissions;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/flipkart/gojira/queuedsender/TestQueuedSenderImpl.class */
/**
 * {@link TestQueuedSender} that buffers serialized {@link TestData} in an {@link IBigQueue}
 * created under the configured path, and forwards it to the {@link SinkHandler} from a
 * background task.
 *
 * <p>{@link #setup()} schedules two periodic tasks on a two-thread scheduler: one drains the
 * queue into the sink, the other runs the queue's {@code gc()}. {@link #send(TestData)} only
 * enqueues; the actual sink write happens on the scheduler.
 */
public class TestQueuedSenderImpl extends TestQueuedSender {
    private static final Logger LOGGER = LoggerFactory.getLogger(TestQueuedSenderImpl.class.getSimpleName());

    /** Delay, in seconds, before the first run of each background task. */
    private static final long INITIAL_DELAY_SECONDS = 20L;

    /** How long, in seconds, {@link #shutdown()} waits for interrupted tasks before giving up on closing the queue. */
    private static final long TERMINATION_WAIT_SECONDS = 5L;

    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);
    private IBigQueue messageQueue;

    /**
     * Periodic task that dequeues every pending message and writes it to the sink.
     *
     * <p>A message is removed from the queue before it is written, so a message whose
     * deserialization or sink write fails is logged and dropped; the remaining messages are
     * retried on the next scheduled run.
     */
    private static final class MessageSenderThread implements Runnable {
        private final IBigQueue messageQueue;

        public MessageSenderThread(IBigQueue iBigQueue) {
            this.messageQueue = iBigQueue;
        }

        @Override // java.lang.Runnable
        public void run() {
            // The whole loop (including the isEmpty() check) sits inside the try: an exception
            // escaping run() would make the scheduler cancel all future executions of this task.
            try {
                while (!this.messageQueue.isEmpty()) {
                    LOGGER.info("There are messages in the hyperion message queue. Sender invoked.");
                    byte[] payload = this.messageQueue.dequeue();
                    if (payload == null) {
                        break;
                    }
                    TestData testData = (TestData) ((SerdeHandlerRepository) GuiceInjector.getInjector().getInstance(SerdeHandlerRepository.class)).getTestDataSerdeHandler().deserialize(payload, TestData.class);
                    LOGGER.info("TestData with id: {} send for DataStore write.", testData.getId());
                    ((SinkHandler) GuiceInjector.getInjector().getInstance(SinkHandler.class)).write(testData.getId(), payload);
                }
            } catch (Exception e) {
                // Stop this run on the first failure; the next scheduled run picks up the rest.
                LOGGER.error("Could not send message: ", e);
            }
        }
    }

    /** Periodic task that invokes {@code gc()} on the queue and logs how long it took. */
    private static final class TestQueueCleaner implements Runnable {
        private final IBigQueue messageQueue;

        private TestQueueCleaner(IBigQueue iBigQueue) {
            this.messageQueue = iBigQueue;
        }

        @Override // java.lang.Runnable
        public void run() {
            long startedAt = System.currentTimeMillis();
            try {
                this.messageQueue.gc();
                LOGGER.info("Ran GC on queue. Took: {} milliseconds", System.currentTimeMillis() - startedAt);
            } catch (Exception e) {
                // Catch unchecked exceptions too: anything escaping run() would silently cancel
                // every subsequent scheduled GC.
                LOGGER.error("Could not perform GC on hyperion message queue: ", e);
            }
        }
    }

    /**
     * Creates the queue directory, opens the queue and schedules the sender and cleaner tasks.
     *
     * <p>Both tasks first run after {@value #INITIAL_DELAY_SECONDS} seconds and then repeat
     * with the configured queue purge interval (in seconds).
     *
     * @throws Exception if the directory or the queue cannot be created
     */
    @Override // com.flipkart.gojira.queuedsender.TestQueuedSender
    public void setup() throws Exception {
        String queueDir = this.testQueuedSenderConfig.getPath();
        try {
            Files.createDirectories(Paths.get(queueDir), PosixFilePermissions.asFileAttribute(PosixFilePermissions.fromString("rwxrwxrwx")));
        } catch (UnsupportedOperationException e) {
            // Non-POSIX file systems cannot apply the permission attribute; create the
            // directory with the platform defaults instead of failing setup.
            LOGGER.warn("POSIX file permissions are not supported for {}; creating directory with default permissions.", queueDir);
            Files.createDirectories(Paths.get(queueDir));
        }
        this.messageQueue = new BigQueueImpl(queueDir, "gojira-messages");

        long intervalSeconds = this.testQueuedSenderConfig.getQueuePurgeInterval();
        this.scheduler.scheduleWithFixedDelay(new MessageSenderThread(this.messageQueue), INITIAL_DELAY_SECONDS, intervalSeconds, TimeUnit.SECONDS);
        this.scheduler.scheduleAtFixedRate(new TestQueueCleaner(this.messageQueue), INITIAL_DELAY_SECONDS, intervalSeconds, TimeUnit.SECONDS);
    }

    /**
     * Waits until the queue has been drained, then stops the background tasks and closes the
     * queue.
     *
     * <p>The wait polls once per second and has no upper bound: it lasts as long as messages
     * remain in the queue. The scheduler is always stopped, even when the wait is interrupted.
     * If {@link #setup()} was never called there is no queue to drain and only the scheduler is
     * stopped.
     *
     * @throws Exception if the waiting thread is interrupted or closing the queue fails
     */
    @Override // com.flipkart.gojira.queuedsender.TestQueuedSender
    public void shutdown() throws Exception {
        IBigQueue queue = this.messageQueue;
        try {
            while (queue != null && !queue.isEmpty()) {
                Thread.sleep(1000L);
            }
        } finally {
            this.scheduler.shutdownNow();
            if (queue != null) {
                // Only close once no task can still be touching the queue.
                if (this.scheduler.awaitTermination(TERMINATION_WAIT_SECONDS, TimeUnit.SECONDS)) {
                    queue.close();
                } else {
                    LOGGER.warn("Background tasks did not stop within {} seconds; leaving message queue open.", TERMINATION_WAIT_SECONDS);
                }
            }
        }
    }

    /**
     * Serializes {@code testData} and enqueues it for asynchronous delivery to the sink.
     *
     * <p>If the queue already holds at least the configured number of entries, the data is
     * not enqueued; an error is logged and the method returns normally.
     *
     * @param testData the test data to enqueue
     * @throws IllegalStateException if {@link #setup()} has not been called
     * @throws Exception if serialization or the enqueue fails
     */
    @Override // com.flipkart.gojira.queuedsender.TestQueuedSender
    public <T extends TestDataType> void send(TestData<TestRequestData<T>, TestResponseData<T>, T> testData) throws Exception {
        IBigQueue queue = this.messageQueue;
        if (queue == null) {
            throw new IllegalStateException("setup() must be called before send()");
        }
        if (queue.size() >= this.testQueuedSenderConfig.getQueueSize().longValue()) {
            LOGGER.error("messageQueue size greater than {} testData.id {}", this.testQueuedSenderConfig.getQueueSize(), testData.getId());
            return;
        }
        queue.enqueue(((SerdeHandlerRepository) GuiceInjector.getInjector().getInstance(SerdeHandlerRepository.class)).getTestDataSerdeHandler().serialize(testData));
        // Log only after the enqueue succeeded so the log never claims a message that was lost.
        LOGGER.info("TestData with id: {} enqueued.", testData.getId());
    }
}
