/*
 * Decompiled with CFR 0.152.
 */
package org.apache.cassandra.db.commitlog;

import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.cassandra.db.commitlog.CommitLog;
import org.apache.cassandra.db.commitlog.CommitLogSegment;
import org.apache.cassandra.utils.concurrent.WaitQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractCommitLogService {
    private static final long LAG_REPORT_INTERVAL = TimeUnit.MINUTES.toMillis(5L);
    private final Thread thread;
    private volatile boolean shutdown = false;
    protected volatile long lastSyncedAt = System.currentTimeMillis();
    private final AtomicLong written = new AtomicLong(0L);
    protected final AtomicLong pending = new AtomicLong(0L);
    protected final WaitQueue syncComplete = new WaitQueue();
    private final Semaphore haveWork = new Semaphore(1);
    private static final Logger logger = LoggerFactory.getLogger(AbstractCommitLogService.class);

    AbstractCommitLogService(final CommitLog commitLog, String name, final long pollIntervalMillis) {
        if (pollIntervalMillis < 1L) {
            throw new IllegalArgumentException(String.format("Commit log flush interval must be positive: %dms", pollIntervalMillis));
        }
        Runnable runnable = new Runnable(){

            @Override
            public void run() {
                long firstLagAt = 0L;
                long totalSyncDuration = 0L;
                long syncExceededIntervalBy = 0L;
                int lagCount = 0;
                int syncCount = 0;
                boolean run = true;
                while (run) {
                    try {
                        run = !AbstractCommitLogService.this.shutdown;
                        long syncStarted = System.currentTimeMillis();
                        commitLog.sync(AbstractCommitLogService.this.shutdown);
                        AbstractCommitLogService.this.lastSyncedAt = syncStarted;
                        AbstractCommitLogService.this.syncComplete.signalAll();
                        long now = System.currentTimeMillis();
                        long sleep = syncStarted + pollIntervalMillis - now;
                        if (sleep < 0L) {
                            if (firstLagAt == 0L) {
                                firstLagAt = now;
                                lagCount = 0;
                                syncCount = 0;
                                totalSyncDuration = syncExceededIntervalBy = (long)0;
                            }
                            syncExceededIntervalBy -= sleep;
                            ++lagCount;
                        }
                        ++syncCount;
                        totalSyncDuration += now - syncStarted;
                        if (firstLagAt > 0L && now - firstLagAt >= LAG_REPORT_INTERVAL) {
                            logger.warn(String.format("Out of %d commit log syncs over the past %ds with average duration of %.2fms, %d have exceeded the configured commit interval by an average of %.2fms", syncCount, (now - firstLagAt) / 1000L, (double)totalSyncDuration / (double)syncCount, lagCount, (double)syncExceededIntervalBy / (double)lagCount));
                            firstLagAt = 0L;
                        }
                        if (sleep < 0L || !run) continue;
                        try {
                            AbstractCommitLogService.this.haveWork.tryAcquire(sleep, TimeUnit.MILLISECONDS);
                        }
                        catch (InterruptedException e) {
                            throw new AssertionError();
                        }
                    }
                    catch (Throwable t) {
                        if (!CommitLog.handleCommitError("Failed to persist commits to disk", t)) break;
                        try {
                            AbstractCommitLogService.this.haveWork.tryAcquire(pollIntervalMillis, TimeUnit.MILLISECONDS);
                        }
                        catch (InterruptedException e) {
                            throw new AssertionError();
                        }
                    }
                }
            }
        };
        this.thread = new Thread(runnable, name);
        this.thread.start();
    }

    public void finishWriteFor(CommitLogSegment.Allocation alloc) {
        this.maybeWaitForSync(alloc);
        this.written.incrementAndGet();
    }

    protected abstract void maybeWaitForSync(CommitLogSegment.Allocation var1);

    public WaitQueue.Signal requestExtraSync() {
        WaitQueue.Signal signal = this.syncComplete.register();
        this.haveWork.release(1);
        return signal;
    }

    public void shutdown() {
        this.shutdown = true;
        this.haveWork.release(1);
    }

    public void awaitTermination() throws InterruptedException {
        this.thread.join();
    }

    public long getCompletedTasks() {
        return this.written.get();
    }

    public long getPendingTasks() {
        return this.pending.get();
    }
}

