/*
 * Decompiled with CFR 0.152.
 */
package org.apache.atlas.notification.spool;

import com.google.common.annotations.VisibleForTesting;
import java.io.DataOutput;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.channels.OverlappingFileLockException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.atlas.AtlasException;
import org.apache.atlas.notification.spool.Archiver;
import org.apache.atlas.notification.spool.FileOperations;
import org.apache.atlas.notification.spool.SpoolConfiguration;
import org.apache.atlas.notification.spool.SpoolUtils;
import org.apache.atlas.notification.spool.models.IndexRecord;
import org.apache.atlas.notification.spool.models.IndexRecords;
import org.apache.atlas.notification.spool.utils.local.FileLockedReadWrite;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class IndexManagement {
    /** Logger shared by this class and its nested index helper classes. */
    private static final Logger LOG = LoggerFactory.getLogger(IndexManagement.class);
    /** Retry limit; NOTE(review): appears intended for the wait loop in IndexWriter.stop() - confirm. */
    private static final int MAX_RETRY_ATTEMPTS = 3;
    /** Spool settings (directories, index files, timeouts) this instance operates on. */
    private final SpoolConfiguration config;
    /** Index file access; assigned by performInit(), null before that or if performInit() failed. */
    private IndexFileManager indexFileManager;
    /** Queue of records waiting to be published; assigned by performInit(). */
    private IndexReader indexReader;
    /** Tracks the spool file currently being written; assigned by performInit(). */
    private IndexWriter indexWriter;

    /**
     * Stores the configuration only; nothing is created until {@link #init()} is called.
     *
     * @param config spool configuration used by all later operations
     */
    public IndexManagement(SpoolConfiguration config) {
        this.config = config;
    }

    /**
     * Obtains (creating if needed, via {@code SpoolUtils}) the spool directory, the archive
     * directory, the index file and the index-done file, then builds the index helpers through
     * {@link #performInit(String, String)}.
     *
     * <p>The spool directory resolved by {@code SpoolUtils} is written back into the configuration
     * as an absolute path before the remaining locations are processed.
     *
     * @throws IOException declared for callers; NOTE(review): presumably raised by the
     *         {@code SpoolUtils} helpers - confirm
     * @throws AtlasException if any of the four locations could not be obtained (the helper
     *         returned {@code null})
     */
    public void init() throws IOException, AtlasException {
        String sourceName = this.config.getSourceName();

        File configuredSpoolDir = this.config.getSpoolDir();
        File spoolDir = SpoolUtils.getCreateDirectoryWithPermissionCheck(configuredSpoolDir, this.config.getUser());
        if (spoolDir == null) {
            // Report the configured location: the resolved one is null and cannot be dereferenced.
            throw notFoundOrInaccessible(sourceName, configuredSpoolDir);
        }
        this.config.setSpoolDir(spoolDir.getAbsolutePath());

        File configuredArchiveDir = this.config.getArchiveDir();
        File archiveDir = SpoolUtils.getCreateDirectory(configuredArchiveDir);
        if (archiveDir == null) {
            throw notFoundOrInaccessible(sourceName, configuredArchiveDir);
        }

        File configuredIndexFile = this.config.getIndexFile();
        File indexFile = SpoolUtils.getCreateFile(configuredIndexFile, sourceName);
        if (indexFile == null) {
            throw notFoundOrInaccessible(sourceName, configuredIndexFile);
        }

        File configuredIndexDoneFile = this.config.getIndexDoneFile();
        File indexDoneFile = SpoolUtils.getCreateFile(configuredIndexDoneFile, sourceName);
        if (indexDoneFile == null) {
            throw notFoundOrInaccessible(sourceName, configuredIndexDoneFile);
        }

        this.performInit(indexFile.getAbsolutePath(), sourceName);
    }

    /** Builds the "not found or inaccessible" exception for a configured location (null-safe). */
    private static AtlasException notFoundOrInaccessible(String sourceName, File location) {
        String path = (location != null) ? location.getAbsolutePath() : "null";
        return new AtlasException(String.format("%s: %s not found or inaccessible!", sourceName, path));
    }

    /**
     * Creates the index file manager, reader and writer from the current configuration.
     *
     * <p>Failures are logged (including the cause) rather than propagated, so after a failure the
     * helper fields may still be {@code null}.
     *
     * @param indexFilePath path of the index file; used only in the error message
     * @param source source name handed to the helper objects
     */
    @VisibleForTesting
    void performInit(String indexFilePath, String source) {
        try {
            File spoolDir = this.config.getSpoolDir();
            File archiveDir = this.config.getArchiveDir();
            File indexFile = this.config.getIndexFile();
            File indexDoneFile = this.config.getIndexDoneFile();

            this.indexFileManager = new IndexFileManager(source, indexFile, indexDoneFile, archiveDir, this.config.getMaxArchiveFiles());
            this.indexReader = new IndexReader(source, this.indexFileManager, this.config.getRetryDestinationMS());
            this.indexWriter = new IndexWriter(source, this.config, this.indexFileManager, this.indexReader, spoolDir, archiveDir, this.config.getFileRolloverSec());
        } catch (Exception e) {
            // Pass the exception as the trailing argument so the cause and stack trace are logged.
            LOG.error("{}: init: Failed! Error loading records from index file: {}", this.config.getSourceName(), indexFilePath, e);
        }
    }

    /**
     * Tells whether there is still spooled work outstanding.
     *
     * @return {@code true} if the publish queue is non-empty, or the writer's current record is in
     *         write-in-progress state, or the record last handed out by the reader has status
     *         {@code "READ_IN_PROGRESS"}
     */
    public boolean isPending() {
        if (!this.indexReader.isEmpty()) {
            return true;
        }

        // Snapshot each record once so the null check and the status check see the same object.
        IndexRecord beingWritten = this.indexWriter.getCurrent();
        if (beingWritten != null && beingWritten.isStatusWriteInProgress()) {
            return true;
        }

        IndexRecord beingRead = this.indexReader.currentIndexRecord;
        // Compare by value: the status may not be the same String instance as the literal.
        return beingRead != null && "READ_IN_PROGRESS".equals(beingRead.getStatus());
    }

    /**
     * Returns the output for the current spool file, as provided by
     * {@code IndexWriter.getCreateWriter()} (which rolls over an expired file and creates or
     * reopens a spool file when no writer is open).
     *
     * @return the writer for the current spool file
     * @throws IOException if the spool file output cannot be obtained
     */
    public synchronized DataOutput getSpoolWriter() throws IOException {
        final DataOutput spoolOutput = indexWriter.getCreateWriter();
        return spoolOutput;
    }

    /** Flags on the index writer that a spool-file write is in progress. */
    public void setSpoolWriteInProgress() {
        final boolean inProgress = true;
        indexWriter.setFileWriteInProgress(inProgress);
    }

    /** Clears the index writer's write-in-progress flag. */
    public void resetSpoolWriteInProgress() {
        final boolean inProgress = false;
        indexWriter.setFileWriteInProgress(inProgress);
    }

    /**
     * Records a failed attempt for the record most recently handed out by {@link #next()};
     * delegates to the index reader, which does nothing when there is no such record.
     */
    public void updateFailedAttempt() {
        indexReader.updateFailedAttempt();
    }

    /**
     * Takes the next record from the publish queue, waiting up to the reader's configured retry
     * interval.
     *
     * @return the next record, or {@code null} if none became available in time
     * @throws InterruptedException if interrupted while waiting on the queue
     */
    public IndexRecord next() throws InterruptedException {
        final IndexRecord polled = indexReader.next();
        return polled;
    }

    /**
     * Reports how many records are waiting in the publish queue.
     *
     * @return the current queue size
     */
    public int getQueueSize() {
        final int queued = indexReader.size();
        return queued;
    }

    /**
     * Marks the record as done and removes it from the index (via the reader), then lets the
     * writer roll over the current spool file if it is due.
     *
     * @param indexRecord the record that has been fully processed
     */
    public void removeAsDone(IndexRecord indexRecord) {
        indexReader.removeAsDone(indexRecord);
        indexWriter.rolloverIfNeeded();
    }

    /** Stops the index writer, which flushes and closes the open spool file, if any. */
    public void stop() {
        indexWriter.stop();
    }

    /** Asks the index writer to close and queue the current spool file if its rollover time has passed. */
    public void rolloverSpoolFileIfNeeded() {
        indexWriter.rolloverIfNeeded();
    }

    /**
     * Exposes the index file manager for tests.
     *
     * @return the manager created by {@code performInit}, or {@code null} if it has not been created
     */
    @VisibleForTesting
    IndexFileManager getIndexFileManager() {
        final IndexFileManager manager = indexFileManager;
        return manager;
    }

    /**
     * Rewrites the index-file entry of the given record and logs the value of
     * {@code record.getLine()}.
     *
     * @param record the record whose index entry should be updated
     */
    public void update(IndexRecord record) {
        indexFileManager.updateIndex(record);

        final Object line = (Object)record.getLine();
        LOG.info("this.indexFileManager.updateIndex: {}", line);
    }

    /**
     * Flushes the currently open spool file, if there is one.
     *
     * @throws IOException if flushing fails
     */
    public void flushSpoolWriter() throws IOException {
        indexWriter.flushCurrent();
    }

    /**
     * Reads and modifies the index file and the index-done file of one source, and hands
     * completed records to the {@link Archiver}.
     */
    static class IndexFileManager {
        private final String source;
        private final File indexDoneFile;
        private final File indexFile;
        private final Archiver archiver;
        private final FileOperations fileOperations;

        /**
         * @param source source name, used in log messages and passed to the helpers
         * @param indexFile file holding the records that are not done yet
         * @param indexDoneFile file to which completed records are appended
         * @param archiveFolder folder handed to the archiver
         * @param maxArchiveFiles archive limit handed to the archiver
         */
        public IndexFileManager(String source, File indexFile, File indexDoneFile, File archiveFolder, int maxArchiveFiles) {
            this.source = source;
            this.indexFile = indexFile;
            this.indexDoneFile = indexDoneFile;
            this.archiver = new Archiver(source, indexDoneFile, archiveFolder, maxArchiveFiles);
            this.fileOperations = new FileOperations(SpoolUtils.getEmptyRecordForWriting(), source);
        }

        /**
         * Loads the index file and returns its records as a new list.
         *
         * @return a fresh, mutable list of the records; empty if nothing could be loaded
         */
        public List<IndexRecord> getRecords() {
            IndexRecords records = this.loadRecords(this.indexFile);
            if (records == null) {
                // Same null tolerance as getFirstWriteInProgressRecord().
                return new ArrayList<IndexRecord>();
            }
            return new ArrayList<IndexRecord>(records.getRecords().values());
        }

        /**
         * Deletes the entry with the given id from the given file.
         *
         * @param file file to modify
         * @param id id of the entry to delete
         */
        public synchronized void delete(File file, String id) {
            this.fileOperations.delete(file, id);
        }

        /**
         * Finds the first record in the index file whose status is write-in-progress.
         *
         * @return that record, or {@code null} if there is none or nothing could be loaded
         */
        public synchronized IndexRecord getFirstWriteInProgressRecord() {
            IndexRecords records = this.loadRecords(this.indexFile);
            if (records == null) {
                return null;
            }
            for (IndexRecord record : records.getRecords().values()) {
                if (record.isStatusWriteInProgress()) {
                    LOG.info("IndexFileManager.getFirstWriteInProgressRecord(source={}): current file={}", this.source, record.getPath());
                    return record;
                }
            }
            return null;
        }

        /**
         * Moves a record from the index file to the done file (which also archives it), and
         * compacts the index file once no records remain in it.
         *
         * @param record the completed record
         */
        public synchronized void remove(IndexRecord record) {
            this.delete(this.indexFile, record.getId());
            this.appendToDoneFile(record);

            IndexRecords remaining = this.loadRecords(this.indexFile);
            // Null-guarded like the other readers; a failed load simply skips compaction.
            if (remaining != null && remaining.size() == 0) {
                LOG.info("IndexFileManager.remove(source={}): All done!", this.source);
                this.compactFile(this.indexFile);
            }
        }

        /**
         * Appends the serialized record to the index file.
         *
         * @param record record to append
         */
        public void appendToIndexFile(IndexRecord record) {
            this.fileOperations.append(this.indexFile, SpoolUtils.getRecordForWriting(record));
        }

        /**
         * Replaces the index-file entry that has the record's id with the record's current
         * serialized form.
         *
         * @param record record whose entry is rewritten
         */
        public void updateIndex(IndexRecord record) {
            this.fileOperations.update(this.indexFile, record.getId(), SpoolUtils.getRecordForWriting(record));
        }

        /** Compacts the file; the "done" message is logged even if compaction throws. */
        private void compactFile(File file) {
            LOG.info("IndexFileManager.compactFile(source={}): compacting file {}", this.source, file.getAbsolutePath());
            try {
                this.fileOperations.compact(file);
            } finally {
                LOG.info("IndexFileManager.compactFile(source={}): done compacting file {}", this.source, file.getAbsolutePath());
            }
        }

        /** Appends the serialized record to the done file, then passes the record to the archiver. */
        private void appendToDoneFile(IndexRecord indexRecord) {
            String json = SpoolUtils.getRecordForWriting(indexRecord);
            this.fileOperations.append(this.indexDoneFile, json);
            this.archiver.archive(indexRecord);
        }

        /**
         * Loads the raw entries of the file and converts them into records.
         *
         * @param file file to read
         * @return the records built by {@code SpoolUtils.createRecords}
         */
        @VisibleForTesting
        IndexRecords loadRecords(File file) {
            String[] items = this.fileOperations.load(file);
            return SpoolUtils.createRecords(items);
        }

        /** @return the index-done file */
        @VisibleForTesting
        File getDoneFile() {
            return this.indexDoneFile;
        }

        /** @return the index file */
        @VisibleForTesting
        File getIndexFile() {
            return this.indexFile;
        }

        /**
         * Creates a record for the given path and appends it to the index file.
         *
         * @param path spool file path for the new record
         * @return the newly created record
         */
        @VisibleForTesting
        IndexRecord add(String path) {
            IndexRecord record = new IndexRecord(path);
            this.appendToIndexFile(record);
            return record;
        }
    }

    static class IndexReader {
        private final String source;
        private final BlockingQueue<IndexRecord> blockingQueue;
        private final IndexFileManager indexFileManager;
        private final long retryDestinationMS;
        private IndexRecord currentIndexRecord;

        public IndexReader(String source, IndexFileManager indexFileManager, long retryDestinationMS) {
            this.source = source;
            this.blockingQueue = new LinkedBlockingQueue<IndexRecord>();
            this.retryDestinationMS = retryDestinationMS;
            this.indexFileManager = indexFileManager;
            List<IndexRecord> records = indexFileManager.getRecords();
            records.stream().forEach(x -> this.addIfStatus((IndexRecord)x, "READ_IN_PROGRESS"));
            records.stream().forEach(x -> this.addIfStatus((IndexRecord)x, "PENDING"));
        }

        private void addIfStatus(IndexRecord record, String status) {
            if (record != null && record.getStatus().equals(status)) {
                if (!SpoolUtils.fileExists(record)) {
                    LOG.error("IndexReader.addIfStatus(source={}): file {} not found!", (Object)this.source, (Object)record.getPath());
                } else {
                    this.addToPublishQueue(record);
                }
            }
        }

        public void addToPublishQueue(IndexRecord record) {
            try {
                if (!this.blockingQueue.contains(record)) {
                    this.blockingQueue.add(record);
                }
            }
            catch (OverlappingFileLockException lockException) {
                LOG.warn("{}: {}: Someone else has locked the file.", (Object)this.source, (Object)record.getPath());
            }
        }

        public IndexRecord next() throws InterruptedException {
            this.currentIndexRecord = this.blockingQueue.poll(this.retryDestinationMS, TimeUnit.MILLISECONDS);
            if (this.currentIndexRecord != null) {
                this.currentIndexRecord.setStatus("READ_IN_PROGRESS");
            }
            return this.currentIndexRecord;
        }

        public int size() {
            return this.blockingQueue.size();
        }

        public boolean isEmpty() {
            return this.blockingQueue.isEmpty();
        }

        public void updateFailedAttempt() {
            if (this.currentIndexRecord != null) {
                this.currentIndexRecord.updateFailedAttempt();
                this.indexFileManager.updateIndex(this.currentIndexRecord);
            }
        }

        public void removeAsDone(IndexRecord indexRecord) {
            indexRecord.setDone();
            this.indexFileManager.remove(indexRecord);
        }
    }

    static class IndexWriter {
        private final String source;
        private final SpoolConfiguration config;
        private final File spoolFolder;
        private final File archiveFolder;
        private final int rollOverTimeout;
        private final IndexFileManager indexFileManager;
        private final IndexReader indexReader;
        private final FileLockedReadWrite fileLockedReadWrite;
        private IndexRecord currentIndexRecord;
        private DataOutput currentWriter;
        private boolean fileWriteInProgress;

        public IndexWriter(String source, SpoolConfiguration config, IndexFileManager indexFileManager, IndexReader indexReader, File spoolFolder, File archiveFolder, int rollOverTimeout) {
            this.source = source;
            this.config = config;
            this.indexFileManager = indexFileManager;
            this.indexReader = indexReader;
            this.spoolFolder = spoolFolder;
            this.archiveFolder = archiveFolder;
            this.rollOverTimeout = rollOverTimeout;
            this.fileLockedReadWrite = new FileLockedReadWrite(source);
            this.setCurrent(indexFileManager.getFirstWriteInProgressRecord());
        }

        public void setCurrent(IndexRecord indexRecord) {
            this.currentIndexRecord = indexRecord;
        }

        public IndexRecord getCurrent() {
            return this.currentIndexRecord;
        }

        private void setCurrentWriter(File file) throws IOException {
            this.currentWriter = this.fileLockedReadWrite.getOutput(file);
        }

        public synchronized DataOutput getWriter() {
            return this.currentWriter;
        }

        public synchronized DataOutput getCreateWriter() throws IOException {
            this.rolloverIfNeeded();
            if (this.getCurrent() == null) {
                IndexRecord record = new IndexRecord("");
                String filePath = SpoolUtils.getSpoolFilePath(this.config, this.spoolFolder.toString(), this.archiveFolder.toString(), record.getId());
                record.setPath(filePath);
                this.indexFileManager.appendToIndexFile(record);
                this.setCurrent(record);
                LOG.info("IndexWriter.getCreateWriter(source={}): Creating new spool file. File: {}", (Object)this.source, (Object)filePath);
                this.setCurrentWriter(new File(filePath));
            } else if (this.currentWriter == null) {
                LOG.info("IndexWriter.getCreateWriter(source={}): Opening existing file for append: File: {}", (Object)this.source, (Object)this.currentIndexRecord.getPath());
                this.setCurrentWriter(new File(this.currentIndexRecord.getPath()));
            }
            return this.currentWriter;
        }

        public synchronized void rolloverIfNeeded() {
            if (this.currentWriter != null && this.shouldRolloverSpoolFile()) {
                LOG.info("IndexWriter.rolloverIfNeeded(source={}): Rolling over. Closing File: {}", (Object)this.config.getSourceName(), (Object)this.currentIndexRecord.getPath());
                this.fileLockedReadWrite.close();
                this.currentWriter = null;
                this.currentIndexRecord.setStatusPending();
                this.indexFileManager.updateIndex(this.currentIndexRecord);
                LOG.info("IndexWriter.rolloverIfNeeded(source={}): Adding file to queue. File: {}", (Object)this.config.getSourceName(), (Object)this.currentIndexRecord.getPath());
                this.indexReader.addToPublishQueue(this.currentIndexRecord);
                this.currentIndexRecord = null;
            }
        }

        private boolean shouldRolloverSpoolFile() {
            return this.currentIndexRecord != null && System.currentTimeMillis() - this.currentIndexRecord.getCreated() > (long)this.rollOverTimeout;
        }

        void flushCurrent() throws IOException {
            DataOutput pw = this.getWriter();
            if (pw != null) {
                this.fileLockedReadWrite.flush();
            }
        }

        public void setFileWriteInProgress(boolean val) {
            this.fileWriteInProgress = val;
        }

        public boolean isWriteInProgress() {
            return this.fileWriteInProgress;
        }

        public void stop() {
            block9: {
                LOG.info("==> IndexWriter.stop(source={})", (Object)this.config.getSourceName());
                try {
                    DataOutput out = this.getWriter();
                    if (out == null) break block9;
                    this.flushCurrent();
                    for (int i = 0; i < 3; ++i) {
                        if (this.isWriteInProgress()) {
                            try {
                                TimeUnit.SECONDS.sleep(i);
                                continue;
                            }
                            catch (InterruptedException e) {
                                LOG.error("IndexWriter.stop(source={}): Interrupted!", (Object)this.config.getSourceName(), (Object)e);
                            }
                        } else {
                            LOG.info("IndexWriter.stop(source={}): Closing open file.", (Object)this.config.getSourceName());
                            this.fileLockedReadWrite.close();
                            this.currentIndexRecord.setStatusPending();
                            this.indexFileManager.updateIndex(this.currentIndexRecord);
                        }
                        break;
                    }
                }
                catch (FileNotFoundException e) {
                    LOG.error("IndexWriter.stop(source={}): File not found! {}", new Object[]{this.config.getSourceName(), this.getCurrent().getPath(), e});
                }
                catch (IOException exception) {
                    LOG.error("IndexWriter.stop(source={}): Error accessing file: {}", new Object[]{this.config.getSourceName(), this.getCurrent().getPath(), exception});
                }
                catch (Exception exception) {
                    LOG.error("IndexWriter.stop(source={}): Error closing spool file.", (Object)this.config.getSourceName(), (Object)exception);
                }
            }
            LOG.info("<== IndexWriter.stop(source={})", (Object)this.config.getSourceName());
        }
    }
}

