/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.cloud.stream.app.hdfs.dataset.sink;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import javax.annotation.PreDestroy;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.kitesdk.data.PartitionStrategy;
import org.springframework.beans.factory.FactoryBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.app.hdfs.dataset.sink.HdfsDatasetSinkProperties;
import org.springframework.cloud.stream.binding.InputBindingLifecycle;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.context.SmartLifecycle;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Primary;
import org.springframework.data.hadoop.store.StoreException;
import org.springframework.data.hadoop.store.dataset.DatasetDefinition;
import org.springframework.data.hadoop.store.dataset.DatasetOperations;
import org.springframework.data.hadoop.store.dataset.DatasetRepositoryFactory;
import org.springframework.data.hadoop.store.dataset.DatasetTemplate;
import org.springframework.expression.EvaluationContext;
import org.springframework.expression.Expression;
import org.springframework.expression.spel.SpelEvaluationException;
import org.springframework.expression.spel.SpelParseException;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.expression.spel.support.StandardEvaluationContext;
import org.springframework.integration.aggregator.CorrelationStrategy;
import org.springframework.integration.aggregator.DefaultAggregatingMessageGroupProcessor;
import org.springframework.integration.aggregator.ExpressionEvaluatingCorrelationStrategy;
import org.springframework.integration.aggregator.MessageCountReleaseStrategy;
import org.springframework.integration.aggregator.ReleaseStrategy;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.config.AggregatorFactoryBean;
import org.springframework.integration.store.MessageGroupStore;
import org.springframework.integration.store.MessageGroupStoreReaper;
import org.springframework.integration.store.SimpleMessageStore;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.util.StringUtils;

@org.springframework.context.annotation.Configuration
@EnableScheduling
@EnableBinding(value={Sink.class})
@EnableConfigurationProperties(value={HdfsDatasetSinkProperties.class})
public class HdfsDatasetSinkConfiguration {
    private static final Log logger = LogFactory.getLog(HdfsDatasetSinkConfiguration.class);
    @Autowired
    private HdfsDatasetSinkProperties properties;

    @Bean
    public MessageChannel toSink() {
        return new DirectChannel();
    }

    @Bean
    @Primary
    @ServiceActivator(inputChannel="input")
    FactoryBean<MessageHandler> aggregatorFactoryBean(MessageChannel toSink, MessageGroupStore messageGroupStore) {
        AggregatorFactoryBean aggregatorFactoryBean = new AggregatorFactoryBean();
        aggregatorFactoryBean.setCorrelationStrategy((CorrelationStrategy)new ExpressionEvaluatingCorrelationStrategy("payload.getClass().name"));
        aggregatorFactoryBean.setReleaseStrategy((ReleaseStrategy)new MessageCountReleaseStrategy(this.properties.getBatchSize()));
        aggregatorFactoryBean.setMessageStore(messageGroupStore);
        aggregatorFactoryBean.setProcessorBean((Object)new DefaultAggregatingMessageGroupProcessor());
        aggregatorFactoryBean.setExpireGroupsUponCompletion(Boolean.valueOf(true));
        aggregatorFactoryBean.setSendPartialResultOnExpiry(Boolean.valueOf(true));
        aggregatorFactoryBean.setOutputChannel(toSink);
        return aggregatorFactoryBean;
    }

    @Bean
    @ServiceActivator(inputChannel="toSink")
    public MessageHandler datasetSinkMessageHandler(final DatasetOperations datasetOperations) {
        return new MessageHandler(){

            public void handleMessage(Message<?> message) throws MessagingException {
                Object payload = message.getPayload();
                if (!(payload instanceof Collection)) {
                    throw new IllegalStateException("Expected a collection of POJOs but received " + message.getPayload().getClass().getName());
                }
                Collection payloads = (Collection)payload;
                logger.debug((Object)("Writing a collection of {} POJOs" + payloads.size()));
                datasetOperations.write((Collection)message.getPayload());
            }
        };
    }

    @Bean
    MessageGroupStore messageGroupStore() {
        SimpleMessageStore messageGroupStore = new SimpleMessageStore();
        messageGroupStore.setTimeoutOnIdle(true);
        messageGroupStore.setCopyOnGet(false);
        return messageGroupStore;
    }

    @Bean
    MessageGroupStoreReaper messageGroupStoreReaper(MessageGroupStore messageStore, InputBindingLifecycle inputBindingLifecycle) {
        MessageGroupStoreReaper messageGroupStoreReaper = new MessageGroupStoreReaper(messageStore);
        messageGroupStoreReaper.setPhase(inputBindingLifecycle.getPhase() - 1);
        messageGroupStoreReaper.setTimeout(this.properties.getIdleTimeout());
        messageGroupStoreReaper.setAutoStartup(true);
        messageGroupStoreReaper.setExpireOnDestroy(true);
        return messageGroupStoreReaper;
    }

    @Bean
    ReaperTask reaperTask() {
        return new ReaperTask();
    }

    @Bean
    FsShutdown fsShutdown(InputBindingLifecycle inputBindingLifecycle) {
        return new FsShutdown(inputBindingLifecycle.getPhase() - 2);
    }

    @Bean
    public DatasetOperations datasetOperations(DatasetRepositoryFactory datasetRepositoryFactory, DatasetDefinition datasetDefinition) {
        return new DatasetTemplate(datasetRepositoryFactory, datasetDefinition);
    }

    @Bean
    public DatasetRepositoryFactory datasetRepositoryFactory(Configuration configuration) {
        DatasetRepositoryFactory datasetRepositoryFactory = new DatasetRepositoryFactory();
        Configuration moduleConfiguration = new Configuration(configuration);
        moduleConfiguration.setBoolean("fs.automatic.close", false);
        if (StringUtils.hasText((String)this.properties.getFsUri())) {
            moduleConfiguration.set("fs.defaultFS", this.properties.getFsUri());
        }
        datasetRepositoryFactory.setConf(moduleConfiguration);
        datasetRepositoryFactory.setBasePath(this.properties.getDirectory());
        datasetRepositoryFactory.setNamespace(this.properties.getNamespace());
        return datasetRepositoryFactory;
    }

    @Bean
    public DatasetDefinition datasetDefinition() {
        DatasetDefinition datasetDefinition = new DatasetDefinition(this.properties.isAllowNullValues(), this.properties.getFormat());
        if (StringUtils.hasText((String)this.properties.getPartitionPath())) {
            datasetDefinition.setPartitionStrategy(HdfsDatasetSinkConfiguration.parsePartitionExpression(this.properties.getPartitionPath()));
        }
        if (this.properties.getWriterCacheSize() > 0) {
            datasetDefinition.setWriterCacheSize(Integer.valueOf(this.properties.getWriterCacheSize()));
        }
        if (StringUtils.hasText((String)this.properties.getCompressionType())) {
            datasetDefinition.setCompressionType(this.properties.getCompressionType());
        }
        return datasetDefinition;
    }

    private static PartitionStrategy parsePartitionExpression(String expression) {
        List<String> expressions = Arrays.asList(expression.split("/"));
        SpelExpressionParser parser = new SpelExpressionParser();
        PartitionStrategy.Builder psb = new PartitionStrategy.Builder();
        StandardEvaluationContext ctx = new StandardEvaluationContext((Object)psb);
        for (String expr : expressions) {
            try {
                Expression e = parser.parseExpression(expr);
                psb = (PartitionStrategy.Builder)e.getValue((EvaluationContext)ctx, PartitionStrategy.Builder.class);
            }
            catch (SpelParseException spe) {
                if (!expr.trim().endsWith(")")) {
                    throw new StoreException("Invalid partitioning expression '" + expr + "' -  did you forget the closing parenthesis?", (Throwable)spe);
                }
                throw new StoreException("Invalid partitioning expression '" + expr + "'!", (Throwable)spe);
            }
            catch (SpelEvaluationException see) {
                throw new StoreException("Invalid partitioning expression '" + expr + "' - failed evaluation!", (Throwable)see);
            }
            catch (NullPointerException npe) {
                throw new StoreException("Invalid partitioning expression '" + expr + "' - was evaluated to null!", (Throwable)npe);
            }
        }
        return psb.build();
    }

    public static class FsShutdown
    implements SmartLifecycle {
        private int phase;
        private volatile boolean running = true;

        public FsShutdown(int phase) {
            this.phase = phase;
        }

        public boolean isAutoStartup() {
            return true;
        }

        public void stop(Runnable runnable) {
            this.stop();
            if (runnable != null) {
                runnable.run();
            }
        }

        public void start() {
        }

        public void stop() {
            try {
                FileSystem.closeAll();
                logger.info((Object)"Closing the Hadoop FileSystem");
            }
            catch (IOException e) {
                logger.error((Object)"Unable to close Hadoop FileSystem", (Throwable)e);
            }
            this.running = false;
        }

        public boolean isRunning() {
            return this.running;
        }

        public int getPhase() {
            return this.phase;
        }
    }

    public static class ReaperTask {
        @Autowired
        MessageGroupStoreReaper messageGroupStoreReaper;

        @Scheduled(fixedRate=1000L)
        public void reap() {
            this.messageGroupStoreReaper.run();
        }

        @PreDestroy
        public void beforeDestroy() {
            this.reap();
        }
    }
}

