/*
 * Decompiled with CFR 0.152.
 */
package org.apache.atlas.notification.spool;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.atlas.AtlasException;
import org.apache.atlas.hook.FailedMessagesLogger;
import org.apache.atlas.notification.AbstractNotification;
import org.apache.atlas.notification.NotificationConsumer;
import org.apache.atlas.notification.NotificationException;
import org.apache.atlas.notification.NotificationInterface;
import org.apache.atlas.notification.spool.IndexManagement;
import org.apache.atlas.notification.spool.Publisher;
import org.apache.atlas.notification.spool.SpoolConfiguration;
import org.apache.atlas.notification.spool.Spooler;
import org.apache.commons.configuration.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link NotificationInterface} implementation that wraps an {@link AbstractNotification}
 * handler and falls back to a file-backed spool when direct delivery is not possible.
 *
 * <p>Messages are handed to the {@link Spooler} when the spool has been initialized
 * successfully and either earlier spooled messages are still pending or the publisher reports
 * the destination as down. Otherwise they are sent straight to the wrapped handler; if that
 * fails and {@link #init(String, Object)} has been attempted, the destination is marked down
 * and the messages are spooled instead.
 *
 * <p>A {@link Publisher} runs on a daemon thread that is started by a successful
 * {@link #init(String, Object)} and joined in {@link #close()}.
 */
public class AtlasFileSpool
implements NotificationInterface {
    private static final Logger LOG = LoggerFactory.getLogger(AtlasFileSpool.class);
    private final AbstractNotification notificationHandler;
    private final SpoolConfiguration config;
    private final IndexManagement indexManagement;
    private final Spooler spooler;
    private final Publisher publisher;
    /** Thread running {@link #publisher}; stays {@code null} until {@link #startPublisher()} runs. */
    private Thread publisherThread;
    /**
     * Tri-state initialization flag: {@code null} = no completed init attempt,
     * {@code true} = init succeeded, {@code false} = init failed with an {@link AtlasException}.
     */
    private Boolean initDone = null;
    private String currentUser;

    /**
     * Builds the spool components around the given notification handler.
     *
     * @param configuration       configuration passed to {@link SpoolConfiguration}
     * @param notificationHandler handler used for direct delivery; its simple class name is
     *                            passed to {@link SpoolConfiguration} as well
     */
    public AtlasFileSpool(Configuration configuration, AbstractNotification notificationHandler) {
        this.notificationHandler = notificationHandler;
        this.config = new SpoolConfiguration(configuration, notificationHandler.getClass().getSimpleName());
        this.indexManagement = new IndexManagement(this.config);
        this.spooler = new Spooler(this.config, this.indexManagement);
        this.publisher = new Publisher(this.config, this.indexManagement, notificationHandler);
    }

    /**
     * Initializes index management, optionally wires a failed-messages logger into the spooler
     * and starts the publisher thread. Does nothing beyond logging when a previous call already
     * recorded an outcome.
     *
     * @param source               source name forwarded to the spool configuration
     * @param failedMessagesLogger used only when it is a {@link FailedMessagesLogger}
     */
    @Override
    public void init(String source, Object failedMessagesLogger) {
        LOG.debug("==> AtlasFileSpool.init(source={})", source);
        if (!this.isInitDone()) {
            try {
                this.config.setSource(source, this.currentUser);
                LOG.info("{}: Initialization: Starting...", this.config.getSourceName());
                this.indexManagement.init();
                if (failedMessagesLogger instanceof FailedMessagesLogger) {
                    this.spooler.setFailedMessagesLogger((FailedMessagesLogger) failedMessagesLogger);
                }
                this.startPublisher();
                this.initDone = true;
            }
            catch (AtlasException exception) {
                LOG.error("AtlasFileSpool(source={}): initialization failed", this.config.getSourceName(), exception);
                this.initDone = false;
            }
            catch (Throwable t) {
                // initDone is left unset here, so a later init() call runs the sequence again.
                // NOTE(review): presumably intentional to allow a retry - confirm.
                LOG.error("AtlasFileSpool(source={}): initialization failed, unknown error", this.config.getSourceName(), t);
            }
        } else {
            LOG.debug("AtlasFileSpool.init(): initialization already done. initDone={}", this.initDone);
        }
        LOG.debug("<== AtlasFileSpool.init(source={})", source);
    }

    /**
     * Records the user locally (it is passed to the spool configuration during
     * {@link #init(String, Object)}) and forwards it to the wrapped handler.
     *
     * @param user user name to record
     */
    @Override
    public void setCurrentUser(String user) {
        this.notificationHandler.setCurrentUser(user);
        this.currentUser = user;
    }

    /**
     * Not implemented by the file spool; logs a warning.
     *
     * @return always {@code null}
     */
    @Override
    public <T> List<NotificationConsumer<T>> createConsumers(NotificationInterface.NotificationType notificationType, int numConsumers) {
        LOG.warn("AtlasFileSpool.createConsumers(): not implemented");
        return null;
    }

    /**
     * Varargs convenience overload; delegates to {@link #send(NotificationInterface.NotificationType, List)}.
     *
     * @param type     notification type
     * @param messages messages to send
     * @throws NotificationException if delivery fails and the messages could not be spooled
     */
    @Override
    public <T> void send(NotificationInterface.NotificationType type, T ... messages) throws NotificationException {
        this.send(type, Arrays.asList(messages));
    }

    /**
     * @return always {@code true}
     */
    @Override
    public boolean isReady(NotificationInterface.NotificationType type) {
        return true;
    }

    /**
     * Serializes the messages and delivers them either to the spooler or directly to the
     * wrapped notification handler.
     *
     * <p>If direct delivery throws and initialization has been attempted, the destination is
     * marked down and the messages are spooled; the delivery exception is then logged but not
     * propagated. If initialization has not been attempted, the exception is logged and
     * rethrown.
     *
     * @param type     notification type
     * @param messages messages to send
     * @throws NotificationException if direct delivery fails while the spool is unavailable,
     *                               or if spooling itself fails
     */
    @Override
    public <T> void send(NotificationInterface.NotificationType type, List<T> messages) throws NotificationException {
        List<String> serializedMessages = this.getSerializedMessages(messages);
        boolean useSpool = this.hasInitSucceeded()
                && (this.indexManagement.isPending() || this.publisher.isDestinationDown());
        if (useSpool) {
            LOG.debug("AtlasFileSpool.send(): sending to spooler");
            this.spooler.sendInternal(type, serializedMessages);
            return;
        }

        LOG.debug("AtlasFileSpool.send(): sending to notificationHandler");
        try {
            this.notificationHandler.sendInternal(type, serializedMessages);
        }
        catch (Exception e) {
            if (!this.isInitDone()) {
                LOG.warn("AtlasFileSpool.send(): failed in sending to notificationHandler. Not sending to spool, as it is not initialized.", e);
                throw e;
            }
            // The spool takes over: the message is not lost, so the failure must not be
            // reported to the caller as well.
            // NOTE(review): this branch is also taken when init failed (initDone == false);
            // confirm whether hasInitSucceeded() is the intended guard.
            LOG.info("AtlasFileSpool.send(): failed in sending to notificationHandler. Sending to spool", e);
            this.publisher.setDestinationDown();
            this.spooler.sendInternal(type, serializedMessages);
        }
    }

    /**
     * Converts each message into its serialized form(s) via
     * {@link AbstractNotification#createNotificationMessages}.
     *
     * @param messages messages to serialize
     * @return list that the serialized messages were appended to
     */
    private <T> List<String> getSerializedMessages(List<T> messages) {
        List<String> serializedMessages = new ArrayList<String>(messages.size());
        for (T message : messages) {
            AbstractNotification.createNotificationMessages(message, serializedMessages);
        }
        return serializedMessages;
    }

    /**
     * Puts the spooler and publisher into drain mode, stops index management and waits for the
     * publisher thread to finish, if one was started.
     */
    @Override
    public void close() {
        try {
            this.spooler.setDrain();
            this.publisher.setDrain();
            this.indexManagement.stop();
            // The thread only exists after startPublisher() ran; close() may be called
            // without a successful init().
            Thread runningPublisher = this.publisherThread;
            if (runningPublisher != null) {
                runningPublisher.join();
            }
        }
        catch (InterruptedException e) {
            LOG.error("Interrupted! source={}", this.config.getSourceName(), e);
            // Restore the interrupt status so callers further up the stack can observe it.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Starts the publisher on a daemon thread that uses this class's class loader as its
     * context class loader.
     */
    private void startPublisher() {
        this.publisherThread = new Thread(this.publisher);
        this.publisherThread.setDaemon(true);
        this.publisherThread.setContextClassLoader(this.getClass().getClassLoader());
        this.publisherThread.start();
        LOG.info("{}: publisher started!", this.config.getSourceName());
    }

    /**
     * @return {@code true} once an init attempt has recorded an outcome (success or failure)
     */
    private boolean isInitDone() {
        return this.initDone != null;
    }

    /**
     * @return {@code true} only when an init attempt completed successfully
     */
    private boolean hasInitSucceeded() {
        return Boolean.TRUE.equals(this.initDone);
    }
}

