/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iotdb.tool.tsfile.subscription;

import java.io.File;
import java.io.IOException;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import org.apache.commons.lang3.StringUtils;
import org.apache.iotdb.commons.utils.FileUtils;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.rpc.subscription.exception.SubscriptionException;
import org.apache.iotdb.session.subscription.consumer.ISubscriptionTablePullConsumer;
import org.apache.iotdb.session.subscription.consumer.table.SubscriptionTablePullConsumer;
import org.apache.iotdb.session.subscription.consumer.table.SubscriptionTablePullConsumerBuilder;
import org.apache.iotdb.session.subscription.model.Topic;
import org.apache.iotdb.session.subscription.payload.SubscriptionMessage;
import org.apache.iotdb.session.subscription.payload.SubscriptionTsFileHandler;
import org.apache.iotdb.tool.tsfile.subscription.AbstractSubscriptionTsFile;
import org.apache.iotdb.tool.tsfile.subscription.CommonParam;

public class SubscriptionTableTsFile
extends AbstractSubscriptionTsFile {
    @Override
    public void createTopics(String topicName) throws IoTDBConnectionException, StatementExecutionException {
        Properties properties;
        block6: {
            block5: {
                properties = new Properties();
                properties.put("mode", "snapshot");
                properties.put("format", "TsFileHandler");
                properties.put("strict", (Object)false);
                properties.put("loose-range", "");
                if (StringUtils.isNotBlank((CharSequence)commonParam.getStartTime())) {
                    properties.put("start-time", commonParam.getStartTime());
                }
                if (StringUtils.isNotBlank((CharSequence)commonParam.getEndTime())) {
                    properties.put("end-time", commonParam.getEndTime());
                }
                if (StringUtils.isNotBlank((CharSequence)commonParam.getDatabase())) break block5;
                if (!StringUtils.isNotBlank((CharSequence)commonParam.getTable())) break block6;
            }
            properties.put("database", commonParam.getDatabase());
            properties.put("table", commonParam.getTable());
        }
        commonParam.getTableSubs().createTopic(topicName, properties);
    }

    @Override
    public void doClean() throws Exception {
        List<ISubscriptionTablePullConsumer> pullTableConsumers = commonParam.getPullTableConsumers();
        for (int i = commonParam.getStartIndex(); i < pullTableConsumers.size(); ++i) {
            SubscriptionTablePullConsumer consumer = (SubscriptionTablePullConsumer)pullTableConsumers.get(i);
            String path = commonParam.getTargetDir() + File.separator + consumer.getConsumerGroupId() + File.separator + consumer.getConsumerId();
            File file = new File(path);
            if (!file.exists()) continue;
            FileUtils.deleteFileOrDirectory((File)file);
        }
        for (Topic topic : CommonParam.getTableSubs().getTopics()) {
            try {
                commonParam.getTableSubs().dropTopicIfExists(topic.getTopicName());
            }
            catch (Exception exception) {}
        }
        commonParam.getTableSubs().close();
    }

    @Override
    public void createConsumers(String groupId) {
        CommonParam.setPullTableConsumers(new ArrayList<ISubscriptionTablePullConsumer>(CommonParam.getConsumerCount()));
        for (int i = commonParam.getStartIndex(); i < commonParam.getConsumerCount(); ++i) {
            commonParam.getPullTableConsumers().add(new SubscriptionTablePullConsumerBuilder().host(commonParam.getSrcHost()).port(Integer.valueOf(commonParam.getSrcPort())).consumerId("consumer_" + i).consumerGroupId(groupId).autoCommit(true).autoCommitIntervalMs(5000L).fileSaveDir(commonParam.getTargetDir()).build());
        }
        commonParam.getPullTableConsumers().removeIf(consumer -> {
            try {
                consumer.open();
                return false;
            }
            catch (SubscriptionException e) {
                return true;
            }
        });
        CommonParam.setConsumerCount(commonParam.getPullTableConsumers().size());
    }

    @Override
    public void subscribe(String topicName) throws IoTDBConnectionException, StatementExecutionException {
        List<ISubscriptionTablePullConsumer> pullTableConsumers = commonParam.getPullTableConsumers();
        for (int i = 0; i < pullTableConsumers.size(); ++i) {
            try {
                pullTableConsumers.get(i).subscribe(topicName);
                continue;
            }
            catch (Exception e) {
                e.printStackTrace(System.out);
            }
        }
    }

    @Override
    public void consumerPoll(ExecutorService executor, final String topicName) {
        List<ISubscriptionTablePullConsumer> pullTableConsumers = commonParam.getPullTableConsumers();
        for (int i = commonParam.getStartIndex(); i < pullTableConsumers.size(); ++i) {
            final SubscriptionTablePullConsumer consumer = (SubscriptionTablePullConsumer)pullTableConsumers.get(i);
            executor.submit(new Runnable(){

                @Override
                public void run() {
                    String consumerGroupId = consumer.getConsumerGroupId();
                    while (!consumer.allTopicMessagesHaveBeenConsumed()) {
                        try {
                            for (SubscriptionMessage message : consumer.poll(10000L)) {
                                SubscriptionTsFileHandler handler = message.getTsFileHandler();
                                AbstractSubscriptionTsFile.ioTPrinter.println(handler.getFile().getName());
                                try {
                                    handler.moveFile(Paths.get(AbstractSubscriptionTsFile.commonParam.getTargetDir() + File.separator + consumerGroupId, handler.getPath().getFileName().toString()));
                                }
                                catch (IOException e) {
                                    throw new RuntimeException(e);
                                }
                                AbstractSubscriptionTsFile.commonParam.getCountFile().incrementAndGet();
                            }
                        }
                        catch (Exception e) {
                            e.printStackTrace(System.out);
                        }
                    }
                    consumer.unsubscribe(topicName);
                }
            });
        }
    }
}

