/*
 * Decompiled with CFR 0.152.
 */
package com.netflix.astyanax.shaded.org.apache.cassandra.streaming;

import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.netflix.astyanax.shaded.org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
import com.netflix.astyanax.shaded.org.apache.cassandra.config.DatabaseDescriptor;
import com.netflix.astyanax.shaded.org.apache.cassandra.db.ColumnFamilyStore;
import com.netflix.astyanax.shaded.org.apache.cassandra.db.Keyspace;
import com.netflix.astyanax.shaded.org.apache.cassandra.db.RowPosition;
import com.netflix.astyanax.shaded.org.apache.cassandra.dht.AbstractBounds;
import com.netflix.astyanax.shaded.org.apache.cassandra.dht.Range;
import com.netflix.astyanax.shaded.org.apache.cassandra.dht.Token;
import com.netflix.astyanax.shaded.org.apache.cassandra.gms.ApplicationState;
import com.netflix.astyanax.shaded.org.apache.cassandra.gms.EndpointState;
import com.netflix.astyanax.shaded.org.apache.cassandra.gms.IEndpointStateChangeSubscriber;
import com.netflix.astyanax.shaded.org.apache.cassandra.gms.VersionedValue;
import com.netflix.astyanax.shaded.org.apache.cassandra.io.sstable.Component;
import com.netflix.astyanax.shaded.org.apache.cassandra.io.sstable.Descriptor;
import com.netflix.astyanax.shaded.org.apache.cassandra.io.sstable.SSTableReader;
import com.netflix.astyanax.shaded.org.apache.cassandra.metrics.StreamingMetrics;
import com.netflix.astyanax.shaded.org.apache.cassandra.streaming.ConnectionHandler;
import com.netflix.astyanax.shaded.org.apache.cassandra.streaming.ProgressInfo;
import com.netflix.astyanax.shaded.org.apache.cassandra.streaming.SessionInfo;
import com.netflix.astyanax.shaded.org.apache.cassandra.streaming.StreamConnectionFactory;
import com.netflix.astyanax.shaded.org.apache.cassandra.streaming.StreamReceiveTask;
import com.netflix.astyanax.shaded.org.apache.cassandra.streaming.StreamRequest;
import com.netflix.astyanax.shaded.org.apache.cassandra.streaming.StreamResultFuture;
import com.netflix.astyanax.shaded.org.apache.cassandra.streaming.StreamSummary;
import com.netflix.astyanax.shaded.org.apache.cassandra.streaming.StreamTask;
import com.netflix.astyanax.shaded.org.apache.cassandra.streaming.StreamTransferTask;
import com.netflix.astyanax.shaded.org.apache.cassandra.streaming.messages.CompleteMessage;
import com.netflix.astyanax.shaded.org.apache.cassandra.streaming.messages.FileMessageHeader;
import com.netflix.astyanax.shaded.org.apache.cassandra.streaming.messages.IncomingFileMessage;
import com.netflix.astyanax.shaded.org.apache.cassandra.streaming.messages.OutgoingFileMessage;
import com.netflix.astyanax.shaded.org.apache.cassandra.streaming.messages.PrepareMessage;
import com.netflix.astyanax.shaded.org.apache.cassandra.streaming.messages.ReceivedMessage;
import com.netflix.astyanax.shaded.org.apache.cassandra.streaming.messages.RetryMessage;
import com.netflix.astyanax.shaded.org.apache.cassandra.streaming.messages.SessionFailedMessage;
import com.netflix.astyanax.shaded.org.apache.cassandra.streaming.messages.StreamMessage;
import com.netflix.astyanax.shaded.org.apache.cassandra.utils.FBUtilities;
import com.netflix.astyanax.shaded.org.apache.cassandra.utils.Pair;
import java.io.IOException;
import java.net.InetAddress;
import java.net.Socket;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Manages the streaming exchange with a single peer.
 *
 * <p>A session collects the {@link StreamRequest}s to send to the peer and the
 * {@link StreamTransferTask}s / {@link StreamReceiveTask}s (keyed by column family id) that
 * are in flight, dispatches every incoming {@link StreamMessage} to the matching handler
 * method, and tracks its progress through the {@link State} values:
 * <ul>
 *   <li>{@link State#INITIALIZED} on construction;</li>
 *   <li>{@link State#PREPARING} from {@link #onInitializationComplete()} and
 *       {@link #prepare(Collection, Collection)};</li>
 *   <li>{@link State#STREAMING} once file messages start being handed to the handler;</li>
 *   <li>{@link State#WAIT_COMPLETE} when only one of the two sides has signalled completion;</li>
 *   <li>{@link State#COMPLETE} or {@link State#FAILED} once the session has been closed.</li>
 * </ul>
 *
 * <p>{@link #init(StreamResultFuture)} must be called before the session is started or
 * closed, because closing and progress reporting dereference the result future.
 */
public class StreamSession implements IEndpointStateChangeSubscriber {
    private static final Logger logger = LoggerFactory.getLogger(StreamSession.class);

    /** Shared pool on which {@link #start()} runs the connection set-up of every session. */
    private static final DebuggableThreadPoolExecutor streamExecutor = DebuggableThreadPoolExecutor.createWithFixedPoolSize("StreamConnectionEstablisher", FBUtilities.getAvailableProcessors());

    /** Address of the peer this session streams with; used for logging and progress reports. */
    public final InetAddress peer;
    /** Address actually handed to the connection factory; may differ from {@link #peer}. */
    public final InetAddress connecting;

    private StreamResultFuture streamResult;
    private final Set<StreamRequest> requests = Sets.newConcurrentHashSet();
    /** Outgoing tasks, keyed by column family id. */
    private final Map<UUID, StreamTransferTask> transfers = new ConcurrentHashMap<UUID, StreamTransferTask>();
    /** Incoming tasks, keyed by column family id. */
    private final Map<UUID, StreamReceiveTask> receivers = new ConcurrentHashMap<UUID, StreamReceiveTask>();
    private final StreamingMetrics metrics;
    private final StreamConnectionFactory factory;
    public final ConnectionHandler handler;

    /** Number of retries requested so far via {@link #doRetry(FileMessageHeader, Throwable)}. */
    private int retries;
    /** Flipped exactly once by {@link #closeSession(State)} so the close logic runs only once. */
    private final AtomicBoolean isAborted = new AtomicBoolean(false);
    private volatile State state = State.INITIALIZED;
    /** Whether a {@link CompleteMessage} has already been handed to the handler. */
    private volatile boolean completeSent = false;

    /**
     * Creates a session in the {@link State#INITIALIZED} state.
     *
     * @param peer       address of the peer
     * @param connecting address used to open connections and to look up the streaming metrics
     * @param factory    factory used by {@link #createConnection()}
     */
    public StreamSession(InetAddress peer, InetAddress connecting, StreamConnectionFactory factory) {
        this.peer = peer;
        this.connecting = connecting;
        this.factory = factory;
        this.handler = new ConnectionHandler(this);
        this.metrics = StreamingMetrics.get(connecting);
    }

    /**
     * @return the plan id of the bound result future, or {@code null} if
     *         {@link #init(StreamResultFuture)} has not been called yet
     */
    public UUID planId() {
        return this.streamResult == null ? null : this.streamResult.planId;
    }

    /**
     * @return the description of the bound result future, or {@code null} if
     *         {@link #init(StreamResultFuture)} has not been called yet
     */
    public String description() {
        return this.streamResult == null ? null : this.streamResult.description;
    }

    /**
     * Binds this session to the result future that receives its progress and completion events.
     *
     * @param streamResult the future to notify
     */
    public void init(StreamResultFuture streamResult) {
        this.streamResult = streamResult;
    }

    /**
     * Starts the session. With no requests and no transfers the session is closed as
     * {@link State#COMPLETE} immediately; otherwise connection initiation is scheduled on the
     * shared executor and an {@link IOException} raised there is routed to
     * {@link #onError(Throwable)}.
     */
    public void start() {
        if (this.requests.isEmpty() && this.transfers.isEmpty()) {
            logger.info("[Stream #{}] Session does not have any tasks.", (Object)this.planId());
            this.closeSession(State.COMPLETE);
            return;
        }
        streamExecutor.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    logger.info("[Stream #{}] Starting streaming to {}{}", new Object[]{StreamSession.this.planId(), StreamSession.this.peer, StreamSession.this.peer.equals(StreamSession.this.connecting) ? "" : " through " + StreamSession.this.connecting});
                    StreamSession.this.handler.initiate();
                    StreamSession.this.onInitializationComplete();
                }
                catch (IOException e) {
                    StreamSession.this.onError(e);
                }
            }
        });
    }

    /**
     * Opens a socket to {@link #connecting} through the configured factory.
     *
     * @return the socket returned by the factory
     * @throws IOException if the factory fails to create the connection
     */
    public Socket createConnection() throws IOException {
        assert (this.factory != null);
        return this.factory.createConnection(this.connecting);
    }

    /**
     * Records a request that is sent to the peer in the prepare message.
     *
     * @param keyspace       keyspace of the requested data
     * @param ranges         token ranges being requested
     * @param columnFamilies column families being requested
     */
    public void addStreamRequest(String keyspace, Collection<Range<Token>> ranges, Collection<String> columnFamilies) {
        this.requests.add(new StreamRequest(keyspace, ranges, columnFamilies));
    }

    /**
     * Registers, as outgoing transfers, the sstable sections of the given column families that
     * fall within the given ranges.
     *
     * @param keyspace       keyspace to read from
     * @param ranges         token ranges to transfer; normalized before use
     * @param columnFamilies column family names; when empty, every column family store of the
     *                       keyspace is used
     * @param flushTables    when {@code true}, the selected stores are flushed (and the flushes
     *                       awaited) before sstables are selected
     */
    public void addTransferRanges(String keyspace, Collection<Range<Token>> ranges, Collection<String> columnFamilies, boolean flushTables) {
        Keyspace ks = Keyspace.open(keyspace);
        Set<ColumnFamilyStore> stores = new HashSet<ColumnFamilyStore>();
        if (columnFamilies.isEmpty()) {
            stores.addAll(ks.getColumnFamilyStores());
        } else {
            for (String cf : columnFamilies) {
                stores.add(ks.getColumnFamilyStore(cf));
            }
        }
        if (flushTables) {
            this.flushSSTables(stores);
        }
        List<Range<Token>> normalizedRanges = Range.normalize(ranges);
        List<SSTableStreamingSections> sections = this.getSSTableSectionsForRanges(normalizedRanges, stores);
        try {
            this.addTransferFiles(sections);
        }
        finally {
            // addTransferFiles removes every entry it has dealt with, so whatever is still in
            // the list (only possible if it threw part-way) has its reference released here.
            for (SSTableStreamingSections release : sections) {
                release.sstable.releaseReference();
            }
        }
    }

    /**
     * Marks as referenced the sstables of each store that intersect the given ranges and pairs
     * every sstable with its file positions and estimated key count for those ranges.
     *
     * <p>If anything fails, the sstables collected so far are released before the failure is
     * rethrown.
     */
    private List<SSTableStreamingSections> getSSTableSectionsForRanges(Collection<Range<Token>> ranges, Collection<ColumnFamilyStore> stores) {
        // The row bounds depend only on the ranges, so build them once for all stores.
        List<AbstractBounds<RowPosition>> rowBoundsList = new ArrayList<AbstractBounds<RowPosition>>(ranges.size());
        for (Range<Token> range : ranges) {
            rowBoundsList.add(range.toRowBounds());
        }
        List<SSTableReader> sstables = new ArrayList<SSTableReader>();
        try {
            for (ColumnFamilyStore cfStore : stores) {
                ColumnFamilyStore.ViewFragment view = cfStore.markReferenced(rowBoundsList);
                sstables.addAll(view.sstables);
            }
            List<SSTableStreamingSections> sections = new ArrayList<SSTableStreamingSections>(sstables.size());
            for (SSTableReader sstable : sstables) {
                sections.add(new SSTableStreamingSections(sstable, sstable.getPositionsForRanges(ranges), sstable.estimatedKeysForRanges(ranges)));
            }
            return sections;
        }
        catch (Throwable t) {
            SSTableReader.releaseReferences(sstables);
            throw t;
        }
    }

    /**
     * Adds each entry to the transfer task of its column family, creating the task on first use.
     *
     * <p>Entries are removed from {@code sstableDetails} as they are processed, so the
     * collection's iterator must support {@code remove()}. An entry with no sections is not
     * transferred; its sstable reference is released instead.
     *
     * @param sstableDetails sstables and sections to transfer; emptied by this call
     */
    public void addTransferFiles(Collection<SSTableStreamingSections> sstableDetails) {
        Iterator<SSTableStreamingSections> iter = sstableDetails.iterator();
        while (iter.hasNext()) {
            SSTableStreamingSections details = iter.next();
            if (details.sections.isEmpty()) {
                // Nothing to stream from this sstable: drop the reference taken on it.
                details.sstable.releaseReference();
                iter.remove();
                continue;
            }
            UUID cfId = details.sstable.metadata.cfId;
            StreamTransferTask task = this.transfers.get(cfId);
            if (task == null) {
                task = new StreamTransferTask(this, cfId);
                this.transfers.put(cfId, task);
            }
            task.addTransferFile(details.sstable, details.estimatedKeys, details.sections);
            iter.remove();
        }
    }

    /**
     * Closes the session once. The first call records {@code finalState}, aborts every
     * receive and transfer task when that state is {@link State#FAILED}, closes the handler
     * and notifies the result future; later calls do nothing.
     */
    private synchronized void closeSession(State finalState) {
        if (this.isAborted.compareAndSet(false, true)) {
            this.state(finalState);
            if (finalState == State.FAILED) {
                for (StreamTask task : Iterables.concat(this.receivers.values(), this.transfers.values())) {
                    task.abort();
                }
            }
            this.handler.close();
            this.streamResult.handleSessionComplete(this);
        }
    }

    /**
     * Sets the current state.
     *
     * @param newState the state to record
     */
    public void state(State newState) {
        this.state = newState;
    }

    /**
     * @return the current state
     */
    public State state() {
        return this.state;
    }

    /**
     * @return {@code true} if the current state is {@link State#COMPLETE}
     */
    public boolean isSuccess() {
        return this.state == State.COMPLETE;
    }

    /**
     * Dispatches an incoming message to the handler method for its type. Message types not
     * listed below are ignored.
     *
     * @param message the message received from the peer
     */
    public void messageReceived(StreamMessage message) {
        switch (message.type) {
            case PREPARE: {
                PrepareMessage msg = (PrepareMessage)message;
                this.prepare(msg.requests, msg.summaries);
                break;
            }
            case FILE: {
                this.receive((IncomingFileMessage)message);
                break;
            }
            case RECEIVED: {
                ReceivedMessage received = (ReceivedMessage)message;
                this.received(received.cfId, received.sequenceNumber);
                break;
            }
            case RETRY: {
                RetryMessage retry = (RetryMessage)message;
                this.retry(retry.cfId, retry.sequenceNumber);
                break;
            }
            case COMPLETE: {
                this.complete();
                break;
            }
            case SESSION_FAILED: {
                this.sessionFailed();
                break;
            }
        }
    }

    /**
     * Called once the handler has been initiated: moves to {@link State#PREPARING} and sends a
     * prepare message carrying this session's requests and transfer summaries. When there are
     * no requests, file streaming starts right away.
     */
    public void onInitializationComplete() {
        this.state(State.PREPARING);
        PrepareMessage prepare = new PrepareMessage();
        prepare.requests.addAll(this.requests);
        for (StreamTransferTask task : this.transfers.values()) {
            prepare.summaries.add(task.getSummary());
        }
        this.handler.sendMessage(prepare);
        if (this.requests.isEmpty()) {
            this.startStreamingFiles();
        }
    }

    /**
     * Logs the error, sends a {@link SessionFailedMessage} if the outgoing connection is up,
     * and closes the session as {@link State#FAILED}.
     *
     * @param e the error that occurred
     */
    public void onError(Throwable e) {
        logger.error("[Stream #" + this.planId() + "] Streaming error occurred", e);
        if (this.handler.isOutgoingConnected()) {
            this.handler.sendMessage(new SessionFailedMessage());
        }
        this.closeSession(State.FAILED);
    }

    /**
     * Handles a prepare message: registers transfers for every request (flushing first),
     * registers a receive task for every summary that announces files, answers with this
     * side's transfer summaries when the peer requested anything, and then either completes
     * (no tasks at all) or starts streaming files.
     *
     * @param requests  ranges the peer asks this node to send
     * @param summaries summaries of what the peer is going to send
     */
    public void prepare(Collection<StreamRequest> requests, Collection<StreamSummary> summaries) {
        this.state(State.PREPARING);
        for (StreamRequest request : requests) {
            this.addTransferRanges(request.keyspace, request.ranges, request.columnFamilies, true);
        }
        for (StreamSummary summary : summaries) {
            this.prepareReceiving(summary);
        }
        if (!requests.isEmpty()) {
            PrepareMessage prepare = new PrepareMessage();
            for (StreamTransferTask task : this.transfers.values()) {
                prepare.summaries.add(task.getSummary());
            }
            this.handler.sendMessage(prepare);
        }
        if (!this.maybeCompleted()) {
            this.startStreamingFiles();
        }
    }

    /**
     * Accounts the header's size in the outgoing-bytes metrics and, if the transfer task for
     * the column family is still registered, schedules a 12-hour timeout for the sequence
     * number.
     *
     * @param header header of the file message that was sent
     */
    public void fileSent(FileMessageHeader header) {
        long headerSize = header.size();
        StreamingMetrics.totalOutgoingBytes.inc(headerSize);
        this.metrics.outgoingBytes.inc(headerSize);
        StreamTransferTask task = this.transfers.get(header.cfId);
        if (task != null) {
            task.scheduleTimeout(header.sequenceNumber, 12L, TimeUnit.HOURS);
        }
    }

    /**
     * Handles an incoming file: updates the incoming-bytes metrics, acknowledges the file with
     * a {@link ReceivedMessage}, and passes the sstable to the receive task of its column
     * family.
     *
     * <p>NOTE(review): throws {@link NullPointerException} if no receive task is registered
     * for the column family id; confirm callers guarantee one exists.
     *
     * @param message the received file message
     */
    public void receive(IncomingFileMessage message) {
        long headerSize = message.header.size();
        StreamingMetrics.totalIncomingBytes.inc(headerSize);
        this.metrics.incomingBytes.inc(headerSize);
        this.handler.sendMessage(new ReceivedMessage(message.header.cfId, message.header.sequenceNumber));
        this.receivers.get(message.header.cfId).received(message.sstable);
    }

    /**
     * Reports a progress event for the DATA component of the given sstable to the result future.
     *
     * @param desc      descriptor of the sstable being streamed
     * @param direction direction of the transfer
     * @param bytes     bytes processed so far
     * @param total     total bytes expected
     */
    public void progress(Descriptor desc, ProgressInfo.Direction direction, long bytes, long total) {
        ProgressInfo progress = new ProgressInfo(this.peer, desc.filenameFor(Component.DATA), direction, bytes, total);
        this.streamResult.handleProgress(progress);
    }

    /**
     * Handles the peer's acknowledgement of a file by completing that sequence number on the
     * transfer task of the column family.
     *
     * <p>NOTE(review): throws {@link NullPointerException} if the transfer task is no longer
     * registered, unlike {@link #fileSent(FileMessageHeader)} which checks for {@code null}.
     *
     * @param cfId           column family id
     * @param sequenceNumber sequence number of the acknowledged file
     */
    public void received(UUID cfId, int sequenceNumber) {
        this.transfers.get(cfId).complete(sequenceNumber);
    }

    /**
     * Handles a retry request by re-creating the file message for the sequence number and
     * sending it again.
     *
     * @param cfId           column family id
     * @param sequenceNumber sequence number of the file to resend
     */
    public void retry(UUID cfId, int sequenceNumber) {
        OutgoingFileMessage message = this.transfers.get(cfId).createMessageForRetry(sequenceNumber);
        this.handler.sendMessage(message);
    }

    /**
     * Handles the peer's {@link CompleteMessage}. If this side is already in
     * {@link State#WAIT_COMPLETE}, a complete message is sent (unless one already was) and the
     * session is closed as {@link State#COMPLETE}; otherwise the session just moves to
     * {@link State#WAIT_COMPLETE}.
     */
    public synchronized void complete() {
        if (this.state == State.WAIT_COMPLETE) {
            if (!this.completeSent) {
                this.handler.sendMessage(new CompleteMessage());
                this.completeSent = true;
            }
            this.closeSession(State.COMPLETE);
        } else {
            this.state(State.WAIT_COMPLETE);
        }
    }

    /**
     * Handles the peer's {@link SessionFailedMessage} by closing the session as
     * {@link State#FAILED}.
     */
    public synchronized void sessionFailed() {
        this.closeSession(State.FAILED);
    }

    /**
     * Asks the peer to resend a file, or fails the session once the retry count exceeds
     * {@link DatabaseDescriptor#getMaxStreamingRetries()}.
     *
     * @param header header of the file whose transfer failed
     * @param e      the failure that triggered the retry
     */
    public void doRetry(FileMessageHeader header, Throwable e) {
        logger.warn("[Stream #" + this.planId() + "] Retrying for following error", e);
        ++this.retries;
        if (this.retries > DatabaseDescriptor.getMaxStreamingRetries()) {
            this.onError(new IOException("Too many retries for " + header, e));
        } else {
            this.handler.sendMessage(new RetryMessage(header.cfId, header.sequenceNumber));
        }
    }

    /**
     * @return a snapshot of this session: both addresses, the summaries of the tasks that are
     *         currently registered, and the current state
     */
    public SessionInfo getSessionInfo() {
        List<StreamSummary> receivingSummaries = Lists.newArrayList();
        for (StreamTask receiver : this.receivers.values()) {
            receivingSummaries.add(receiver.getSummary());
        }
        List<StreamSummary> transferSummaries = Lists.newArrayList();
        for (StreamTask transfer : this.transfers.values()) {
            transferSummaries.add(transfer.getSummary());
        }
        return new SessionInfo(this.peer, this.connecting, receivingSummaries, transferSummaries, this.state);
    }

    /**
     * Unregisters a finished receive task and checks whether the whole session is done.
     *
     * @param completedTask the task that finished
     */
    public synchronized void taskCompleted(StreamReceiveTask completedTask) {
        this.receivers.remove(completedTask.cfId);
        this.maybeCompleted();
    }

    /**
     * Unregisters a finished transfer task and checks whether the whole session is done.
     *
     * @param completedTask the task that finished
     */
    public synchronized void taskCompleted(StreamTransferTask completedTask) {
        this.transfers.remove(completedTask.cfId);
        this.maybeCompleted();
    }

    /** No-op. */
    @Override
    public void onJoin(InetAddress endpoint, EndpointState epState) {
    }

    /** No-op. */
    @Override
    public void beforeChange(InetAddress endpoint, EndpointState currentState, ApplicationState newStateKey, VersionedValue newValue) {
    }

    /** No-op. */
    @Override
    public void onChange(InetAddress endpoint, ApplicationState state, VersionedValue value) {
    }

    /** No-op. */
    @Override
    public void onAlive(InetAddress endpoint, EndpointState state) {
    }

    /** No-op. */
    @Override
    public void onDead(InetAddress endpoint, EndpointState state) {
    }

    /**
     * Closes the session as {@link State#FAILED}.
     *
     * <p>NOTE(review): {@code endpoint} is not compared with {@link #peer}; confirm this
     * callback is only delivered for this session's peer.
     */
    @Override
    public void onRemove(InetAddress endpoint) {
        this.closeSession(State.FAILED);
    }

    /**
     * Closes the session as {@link State#FAILED}.
     *
     * <p>NOTE(review): {@code endpoint} is not compared with {@link #peer}; confirm this
     * callback is only delivered for this session's peer.
     */
    @Override
    public void onRestart(InetAddress endpoint, EndpointState epState) {
        this.closeSession(State.FAILED);
    }

    /**
     * Checks whether no receive and no transfer task is left. If so, a
     * {@link CompleteMessage} is sent and the session either closes as
     * {@link State#COMPLETE} (when already in {@link State#WAIT_COMPLETE}) or moves to
     * {@link State#WAIT_COMPLETE}.
     *
     * @return {@code true} if no tasks remain
     */
    private boolean maybeCompleted() {
        boolean completed = this.receivers.isEmpty() && this.transfers.isEmpty();
        if (completed) {
            if (this.state == State.WAIT_COMPLETE) {
                if (!this.completeSent) {
                    this.handler.sendMessage(new CompleteMessage());
                    this.completeSent = true;
                }
                this.closeSession(State.COMPLETE);
            } else {
                this.handler.sendMessage(new CompleteMessage());
                this.completeSent = true;
                this.state(State.WAIT_COMPLETE);
            }
        }
        return completed;
    }

    /** Forces a flush on every store and waits for all of the resulting futures. */
    private void flushSSTables(Iterable<ColumnFamilyStore> stores) {
        List<Future<?>> flushes = new ArrayList<Future<?>>();
        for (ColumnFamilyStore cfs : stores) {
            flushes.add(cfs.forceFlush());
        }
        FBUtilities.waitOnFutures(flushes);
    }

    /** Registers a receive task for the summary's column family, unless it announces no files. */
    private void prepareReceiving(StreamSummary summary) {
        if (summary.files > 0) {
            this.receivers.put(summary.cfId, new StreamReceiveTask(this, summary.cfId, summary.files, summary.totalSize));
        }
    }

    /**
     * Notifies the result future that the session is prepared, moves to
     * {@link State#STREAMING}, and hands each transfer task's file messages to the handler.
     * A task with no file messages is marked completed immediately.
     */
    private void startStreamingFiles() {
        this.streamResult.handleSessionPrepared(this);
        this.state(State.STREAMING);
        for (StreamTransferTask task : this.transfers.values()) {
            Collection<OutgoingFileMessage> messages = task.getFileMessages();
            if (messages.size() > 0) {
                this.handler.sendMessages(messages);
                continue;
            }
            this.taskCompleted(task);
        }
    }

    /** An sstable together with the file sections of it to stream and an estimated key count. */
    public static class SSTableStreamingSections {
        public final SSTableReader sstable;
        /** Pairs of file positions delimiting each section to stream. */
        public final List<Pair<Long, Long>> sections;
        public final long estimatedKeys;

        public SSTableStreamingSections(SSTableReader sstable, List<Pair<Long, Long>> sections, long estimatedKeys) {
            this.sstable = sstable;
            this.sections = sections;
            this.estimatedKeys = estimatedKeys;
        }
    }

    /** Lifecycle states of a {@link StreamSession}. */
    public static enum State {
        INITIALIZED,
        PREPARING,
        STREAMING,
        WAIT_COMPLETE,
        COMPLETE,
        FAILED
    }
}

