/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.common.network;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.nio.channels.UnresolvedAddressException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.memory.MemoryPool;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.Count;
import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.metrics.stats.Meter;
import org.apache.kafka.common.metrics.stats.SampledStat;
import org.apache.kafka.common.network.ChannelBuilder;
import org.apache.kafka.common.network.ChannelState;
import org.apache.kafka.common.network.DelayedResponseAuthenticationException;
import org.apache.kafka.common.network.KafkaChannel;
import org.apache.kafka.common.network.NetworkReceive;
import org.apache.kafka.common.network.Selectable;
import org.apache.kafka.common.network.Send;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;

public class Selector
implements Selectable,
AutoCloseable {
    public static final long NO_IDLE_TIMEOUT_MS = -1L;
    public static final int NO_FAILED_AUTHENTICATION_DELAY = 0;
    private final Logger log;
    private final java.nio.channels.Selector nioSelector;
    private final Map<String, KafkaChannel> channels;
    private final Set<KafkaChannel> explicitlyMutedChannels;
    private boolean outOfMemory;
    private final List<Send> completedSends;
    private final List<NetworkReceive> completedReceives;
    private final Map<KafkaChannel, Deque<NetworkReceive>> stagedReceives;
    private final Set<SelectionKey> immediatelyConnectedKeys;
    private final Map<String, KafkaChannel> closingChannels;
    private Set<SelectionKey> keysWithBufferedRead;
    private final Map<String, ChannelState> disconnected;
    private final List<String> connected;
    private final List<String> failedSends;
    private final Time time;
    private final SelectorMetrics sensors;
    private final ChannelBuilder channelBuilder;
    private final int maxReceiveSize;
    private final boolean recordTimePerConnection;
    private final IdleExpiryManager idleExpiryManager;
    private final LinkedHashMap<String, DelayedAuthenticationFailureClose> delayedClosingChannels;
    private final MemoryPool memoryPool;
    private final long lowMemThreshold;
    private final int failedAuthenticationDelayMs;
    private boolean madeReadProgressLastPoll = true;

    public Selector(int maxReceiveSize, long connectionMaxIdleMs, int failedAuthenticationDelayMs, Metrics metrics, Time time, String metricGrpPrefix, Map<String, String> metricTags, boolean metricsPerConnection, boolean recordTimePerConnection, ChannelBuilder channelBuilder, MemoryPool memoryPool, LogContext logContext) {
        try {
            this.nioSelector = java.nio.channels.Selector.open();
        }
        catch (IOException e) {
            throw new KafkaException(e);
        }
        this.maxReceiveSize = maxReceiveSize;
        this.time = time;
        this.channels = new HashMap<String, KafkaChannel>();
        this.explicitlyMutedChannels = new HashSet<KafkaChannel>();
        this.outOfMemory = false;
        this.completedSends = new ArrayList<Send>();
        this.completedReceives = new ArrayList<NetworkReceive>();
        this.stagedReceives = new HashMap<KafkaChannel, Deque<NetworkReceive>>();
        this.immediatelyConnectedKeys = new HashSet<SelectionKey>();
        this.closingChannels = new HashMap<String, KafkaChannel>();
        this.keysWithBufferedRead = new HashSet<SelectionKey>();
        this.connected = new ArrayList<String>();
        this.disconnected = new HashMap<String, ChannelState>();
        this.failedSends = new ArrayList<String>();
        this.sensors = new SelectorMetrics(metrics, metricGrpPrefix, metricTags, metricsPerConnection);
        this.channelBuilder = channelBuilder;
        this.recordTimePerConnection = recordTimePerConnection;
        this.idleExpiryManager = connectionMaxIdleMs < 0L ? null : new IdleExpiryManager(time, connectionMaxIdleMs);
        this.memoryPool = memoryPool;
        this.lowMemThreshold = (long)(0.1 * (double)this.memoryPool.size());
        this.log = logContext.logger(Selector.class);
        this.failedAuthenticationDelayMs = failedAuthenticationDelayMs;
        this.delayedClosingChannels = failedAuthenticationDelayMs > 0 ? new LinkedHashMap() : null;
    }

    public Selector(int maxReceiveSize, long connectionMaxIdleMs, Metrics metrics, Time time, String metricGrpPrefix, Map<String, String> metricTags, boolean metricsPerConnection, boolean recordTimePerConnection, ChannelBuilder channelBuilder, MemoryPool memoryPool, LogContext logContext) {
        this(maxReceiveSize, connectionMaxIdleMs, 0, metrics, time, metricGrpPrefix, metricTags, metricsPerConnection, recordTimePerConnection, channelBuilder, memoryPool, logContext);
    }

    public Selector(int maxReceiveSize, long connectionMaxIdleMs, int failedAuthenticationDelayMs, Metrics metrics, Time time, String metricGrpPrefix, Map<String, String> metricTags, boolean metricsPerConnection, ChannelBuilder channelBuilder, LogContext logContext) {
        this(maxReceiveSize, connectionMaxIdleMs, failedAuthenticationDelayMs, metrics, time, metricGrpPrefix, metricTags, metricsPerConnection, false, channelBuilder, MemoryPool.NONE, logContext);
    }

    public Selector(int maxReceiveSize, long connectionMaxIdleMs, Metrics metrics, Time time, String metricGrpPrefix, Map<String, String> metricTags, boolean metricsPerConnection, ChannelBuilder channelBuilder, LogContext logContext) {
        this(maxReceiveSize, connectionMaxIdleMs, 0, metrics, time, metricGrpPrefix, metricTags, metricsPerConnection, channelBuilder, logContext);
    }

    public Selector(long connectionMaxIdleMS, Metrics metrics, Time time, String metricGrpPrefix, ChannelBuilder channelBuilder, LogContext logContext) {
        this(-1, connectionMaxIdleMS, metrics, time, metricGrpPrefix, Collections.emptyMap(), true, channelBuilder, logContext);
    }

    public Selector(long connectionMaxIdleMS, int failedAuthenticationDelayMs, Metrics metrics, Time time, String metricGrpPrefix, ChannelBuilder channelBuilder, LogContext logContext) {
        this(-1, connectionMaxIdleMS, failedAuthenticationDelayMs, metrics, time, metricGrpPrefix, Collections.emptyMap(), true, channelBuilder, logContext);
    }

    @Override
    public void connect(String id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize) throws IOException {
        this.ensureNotRegistered(id);
        SocketChannel socketChannel = SocketChannel.open();
        try {
            this.configureSocketChannel(socketChannel, sendBufferSize, receiveBufferSize);
            boolean connected = this.doConnect(socketChannel, address);
            SelectionKey key = this.registerChannel(id, socketChannel, 8);
            if (connected) {
                this.log.debug("Immediately connected to node {}", (Object)id);
                this.immediatelyConnectedKeys.add(key);
                key.interestOps(0);
            }
        }
        catch (IOException | RuntimeException e) {
            socketChannel.close();
            throw e;
        }
    }

    protected boolean doConnect(SocketChannel channel, InetSocketAddress address) throws IOException {
        try {
            return channel.connect(address);
        }
        catch (UnresolvedAddressException e) {
            throw new IOException("Can't resolve address: " + address, e);
        }
    }

    private void configureSocketChannel(SocketChannel socketChannel, int sendBufferSize, int receiveBufferSize) throws IOException {
        socketChannel.configureBlocking(false);
        Socket socket = socketChannel.socket();
        socket.setKeepAlive(true);
        if (sendBufferSize != -1) {
            socket.setSendBufferSize(sendBufferSize);
        }
        if (receiveBufferSize != -1) {
            socket.setReceiveBufferSize(receiveBufferSize);
        }
        socket.setTcpNoDelay(true);
    }

    public void register(String id, SocketChannel socketChannel) throws IOException {
        this.ensureNotRegistered(id);
        this.registerChannel(id, socketChannel, 1);
        this.sensors.connectionCreated.record();
    }

    private void ensureNotRegistered(String id) {
        if (this.channels.containsKey(id)) {
            throw new IllegalStateException("There is already a connection for id " + id);
        }
        if (this.closingChannels.containsKey(id)) {
            throw new IllegalStateException("There is already a connection for id " + id + " that is still being closed");
        }
    }

    private SelectionKey registerChannel(String id, SocketChannel socketChannel, int interestedOps) throws IOException {
        SelectionKey key = socketChannel.register(this.nioSelector, interestedOps);
        KafkaChannel channel = this.buildAndAttachKafkaChannel(socketChannel, id, key);
        this.channels.put(id, channel);
        if (this.idleExpiryManager != null) {
            this.idleExpiryManager.update(channel.id(), this.time.nanoseconds());
        }
        return key;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private KafkaChannel buildAndAttachKafkaChannel(SocketChannel socketChannel, String id, SelectionKey key) throws IOException {
        try {
            KafkaChannel channel = this.channelBuilder.buildChannel(id, key, this.maxReceiveSize, this.memoryPool);
            key.attach(channel);
            return channel;
        }
        catch (Exception e) {
            try {
                socketChannel.close();
            }
            finally {
                key.cancel();
            }
            throw new IOException("Channel could not be created for socket " + socketChannel, e);
        }
    }

    @Override
    public void wakeup() {
        this.nioSelector.wakeup();
    }

    @Override
    public void close() {
        ArrayList<String> connections = new ArrayList<String>(this.channels.keySet());
        for (String id : connections) {
            this.close(id);
        }
        try {
            this.nioSelector.close();
        }
        catch (IOException | SecurityException e) {
            this.log.error("Exception closing nioSelector:", (Throwable)e);
        }
        this.sensors.close();
        this.channelBuilder.close();
    }

    @Override
    public void send(Send send) {
        block4: {
            String connectionId = send.destination();
            KafkaChannel channel = this.openOrClosingChannelOrFail(connectionId);
            if (this.closingChannels.containsKey(connectionId)) {
                this.failedSends.add(connectionId);
            } else {
                try {
                    channel.setSend(send);
                }
                catch (Exception e) {
                    channel.state(ChannelState.FAILED_SEND);
                    this.failedSends.add(connectionId);
                    this.close(channel, CloseMode.DISCARD_NO_NOTIFY);
                    if (e instanceof CancelledKeyException) break block4;
                    this.log.error("Unexpected exception during send, closing connection {} and rethrowing exception {}", (Object)connectionId, (Object)e);
                    throw e;
                }
            }
        }
    }

    @Override
    public void poll(long timeout) throws IOException {
        boolean dataInBuffers;
        if (timeout < 0L) {
            throw new IllegalArgumentException("timeout should be >= 0");
        }
        boolean madeReadProgressLastCall = this.madeReadProgressLastPoll;
        this.clear();
        boolean bl = dataInBuffers = !this.keysWithBufferedRead.isEmpty();
        if (this.hasStagedReceives() || !this.immediatelyConnectedKeys.isEmpty() || madeReadProgressLastCall && dataInBuffers) {
            timeout = 0L;
        }
        if (!this.memoryPool.isOutOfMemory() && this.outOfMemory) {
            this.log.trace("Broker no longer low on memory - unmuting incoming sockets");
            for (KafkaChannel channel : this.channels.values()) {
                if (!channel.isInMutableState() || this.explicitlyMutedChannels.contains(channel)) continue;
                channel.maybeUnmute();
            }
            this.outOfMemory = false;
        }
        long startSelect = this.time.nanoseconds();
        int numReadyKeys = this.select(timeout);
        long endSelect = this.time.nanoseconds();
        this.sensors.selectTime.record(endSelect - startSelect, this.time.milliseconds());
        if (numReadyKeys > 0 || !this.immediatelyConnectedKeys.isEmpty() || dataInBuffers) {
            Set<SelectionKey> readyKeys = this.nioSelector.selectedKeys();
            if (dataInBuffers) {
                this.keysWithBufferedRead.removeAll(readyKeys);
                Set<SelectionKey> toPoll = this.keysWithBufferedRead;
                this.keysWithBufferedRead = new HashSet<SelectionKey>();
                this.pollSelectionKeys(toPoll, false, endSelect);
            }
            this.pollSelectionKeys(readyKeys, false, endSelect);
            readyKeys.clear();
            this.pollSelectionKeys(this.immediatelyConnectedKeys, true, endSelect);
            this.immediatelyConnectedKeys.clear();
        } else {
            this.madeReadProgressLastPoll = true;
        }
        long endIo = this.time.nanoseconds();
        this.sensors.ioTime.record(endIo - endSelect, this.time.milliseconds());
        this.completeDelayedChannelClose(endIo);
        this.maybeCloseOldestConnection(endSelect);
        this.addToCompletedReceives();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void pollSelectionKeys(Set<SelectionKey> selectionKeys, boolean isImmediatelyConnected, long currentTimeNanos) {
        for (SelectionKey key : this.determineHandlingOrder(selectionKeys)) {
            KafkaChannel channel = this.channel(key);
            long channelStartTimeNanos = this.recordTimePerConnection ? this.time.nanoseconds() : 0L;
            boolean sendFailed = false;
            this.sensors.maybeRegisterConnectionMetrics(channel.id());
            if (this.idleExpiryManager != null) {
                this.idleExpiryManager.update(channel.id(), currentTimeNanos);
            }
            try {
                if (isImmediatelyConnected || key.isConnectable()) {
                    if (!channel.finishConnect()) continue;
                    this.connected.add(channel.id());
                    this.sensors.connectionCreated.record();
                    SocketChannel socketChannel = (SocketChannel)key.channel();
                    this.log.debug("Created socket with SO_RCVBUF = {}, SO_SNDBUF = {}, SO_TIMEOUT = {} to node {}", new Object[]{socketChannel.socket().getReceiveBufferSize(), socketChannel.socket().getSendBufferSize(), socketChannel.socket().getSoTimeout(), channel.id()});
                }
                if (channel.isConnected() && !channel.ready()) {
                    try {
                        channel.prepare();
                    }
                    catch (AuthenticationException e) {
                        this.sensors.failedAuthentication.record();
                        throw e;
                    }
                    if (channel.ready()) {
                        this.sensors.successfulAuthentication.record();
                    }
                }
                this.attemptRead(key, channel);
                if (channel.hasBytesBuffered()) {
                    this.keysWithBufferedRead.add(key);
                }
                if (channel.ready() && key.isWritable()) {
                    Send send;
                    try {
                        send = channel.write();
                    }
                    catch (Exception e) {
                        sendFailed = true;
                        throw e;
                    }
                    if (send != null) {
                        this.completedSends.add(send);
                        this.sensors.recordBytesSent(channel.id(), send.size());
                    }
                }
                if (key.isValid()) continue;
                this.close(channel, CloseMode.GRACEFUL);
            }
            catch (Exception e) {
                String desc = channel.socketDescription();
                if (e instanceof IOException) {
                    this.log.debug("Connection with {} disconnected", (Object)desc, (Object)e);
                } else if (e instanceof AuthenticationException) {
                    this.log.debug("Connection with {} disconnected due to authentication exception", (Object)desc, (Object)e);
                } else {
                    this.log.warn("Unexpected error from {}; closing connection", (Object)desc, (Object)e);
                }
                if (e instanceof DelayedResponseAuthenticationException) {
                    this.maybeDelayCloseOnAuthenticationFailure(channel);
                    continue;
                }
                this.close(channel, sendFailed ? CloseMode.NOTIFY_ONLY : CloseMode.GRACEFUL);
            }
            finally {
                this.maybeRecordTimePerConnection(channel, channelStartTimeNanos);
            }
        }
    }

    private Collection<SelectionKey> determineHandlingOrder(Set<SelectionKey> selectionKeys) {
        if (!this.outOfMemory && this.memoryPool.availableMemory() < this.lowMemThreshold) {
            ArrayList<SelectionKey> shuffledKeys = new ArrayList<SelectionKey>(selectionKeys);
            Collections.shuffle(shuffledKeys);
            return shuffledKeys;
        }
        return selectionKeys;
    }

    private void attemptRead(SelectionKey key, KafkaChannel channel) throws IOException {
        if (channel.ready() && (key.isReadable() || channel.hasBytesBuffered()) && !this.hasStagedReceive(channel) && !this.explicitlyMutedChannels.contains(channel)) {
            NetworkReceive networkReceive;
            while ((networkReceive = channel.read()) != null) {
                this.madeReadProgressLastPoll = true;
                this.addToStagedReceives(channel, networkReceive);
            }
            if (channel.isMute()) {
                this.outOfMemory = true;
            } else {
                this.madeReadProgressLastPoll = true;
            }
        }
    }

    private void maybeRecordTimePerConnection(KafkaChannel channel, long startTimeNanos) {
        if (this.recordTimePerConnection) {
            channel.addNetworkThreadTimeNanos(this.time.nanoseconds() - startTimeNanos);
        }
    }

    @Override
    public List<Send> completedSends() {
        return this.completedSends;
    }

    @Override
    public List<NetworkReceive> completedReceives() {
        return this.completedReceives;
    }

    @Override
    public Map<String, ChannelState> disconnected() {
        return this.disconnected;
    }

    @Override
    public List<String> connected() {
        return this.connected;
    }

    @Override
    public void mute(String id) {
        KafkaChannel channel = this.openOrClosingChannelOrFail(id);
        this.mute(channel);
    }

    private void mute(KafkaChannel channel) {
        channel.mute();
        this.explicitlyMutedChannels.add(channel);
    }

    @Override
    public void unmute(String id) {
        KafkaChannel channel = this.openOrClosingChannelOrFail(id);
        this.unmute(channel);
    }

    private void unmute(KafkaChannel channel) {
        if (channel.maybeUnmute()) {
            this.explicitlyMutedChannels.remove(channel);
        }
    }

    @Override
    public void muteAll() {
        for (KafkaChannel channel : this.channels.values()) {
            this.mute(channel);
        }
    }

    @Override
    public void unmuteAll() {
        for (KafkaChannel channel : this.channels.values()) {
            this.unmute(channel);
        }
    }

    void completeDelayedChannelClose(long currentTimeNanos) {
        DelayedAuthenticationFailureClose delayedClose;
        if (this.delayedClosingChannels == null) {
            return;
        }
        while (!this.delayedClosingChannels.isEmpty() && (delayedClose = this.delayedClosingChannels.values().iterator().next()).tryClose(currentTimeNanos)) {
        }
    }

    private void maybeCloseOldestConnection(long currentTimeNanos) {
        String connectionId;
        KafkaChannel channel;
        if (this.idleExpiryManager == null) {
            return;
        }
        Map.Entry<String, Long> expiredConnection = this.idleExpiryManager.pollExpiredConnection(currentTimeNanos);
        if (expiredConnection != null && (channel = this.channels.get(connectionId = expiredConnection.getKey())) != null) {
            if (this.log.isTraceEnabled()) {
                this.log.trace("About to close the idle connection from {} due to being idle for {} millis", (Object)connectionId, (Object)((currentTimeNanos - expiredConnection.getValue()) / 1000L / 1000L));
            }
            channel.state(ChannelState.EXPIRED);
            this.close(channel, CloseMode.GRACEFUL);
        }
    }

    private void clear() {
        this.completedSends.clear();
        this.completedReceives.clear();
        this.connected.clear();
        this.disconnected.clear();
        Iterator<Map.Entry<String, KafkaChannel>> it = this.closingChannels.entrySet().iterator();
        while (it.hasNext()) {
            KafkaChannel kafkaChannel = it.next().getValue();
            Deque<NetworkReceive> deque = this.stagedReceives.get(kafkaChannel);
            boolean sendFailed = this.failedSends.remove(kafkaChannel.id());
            if (deque != null && !deque.isEmpty() && !sendFailed) continue;
            this.doClose(kafkaChannel, true);
            it.remove();
        }
        for (String string : this.failedSends) {
            this.disconnected.put(string, ChannelState.FAILED_SEND);
        }
        this.failedSends.clear();
        this.madeReadProgressLastPoll = false;
    }

    private int select(long timeoutMs) throws IOException {
        if (timeoutMs < 0L) {
            throw new IllegalArgumentException("timeout should be >= 0");
        }
        if (timeoutMs == 0L) {
            return this.nioSelector.selectNow();
        }
        return this.nioSelector.select(timeoutMs);
    }

    @Override
    public void close(String id) {
        KafkaChannel channel = this.channels.get(id);
        if (channel != null) {
            channel.state(ChannelState.LOCAL_CLOSE);
            this.close(channel, CloseMode.DISCARD_NO_NOTIFY);
        } else {
            KafkaChannel closingChannel = this.closingChannels.remove(id);
            if (closingChannel != null) {
                this.doClose(closingChannel, false);
            }
        }
    }

    private void maybeDelayCloseOnAuthenticationFailure(KafkaChannel channel) {
        DelayedAuthenticationFailureClose delayedClose = new DelayedAuthenticationFailureClose(channel, this.failedAuthenticationDelayMs);
        if (this.delayedClosingChannels != null) {
            this.delayedClosingChannels.put(channel.id(), delayedClose);
        } else {
            delayedClose.closeNow();
        }
    }

    private void handleCloseOnAuthenticationFailure(KafkaChannel channel) {
        try {
            channel.completeCloseOnAuthenticationFailure();
        }
        catch (Exception e) {
            this.log.error("Exception handling close on authentication failure node {}", (Object)channel.id(), (Object)e);
        }
        finally {
            this.close(channel, CloseMode.GRACEFUL);
        }
    }

    private void close(KafkaChannel channel, CloseMode closeMode) {
        channel.disconnect();
        this.connected.remove(channel.id());
        Deque<NetworkReceive> deque = this.stagedReceives.get(channel);
        if (closeMode == CloseMode.GRACEFUL && deque != null && !deque.isEmpty()) {
            this.closingChannels.put(channel.id(), channel);
            this.log.debug("Tracking closing connection {} to process outstanding requests", (Object)channel.id());
        } else {
            this.doClose(channel, closeMode.notifyDisconnect);
        }
        this.channels.remove(channel.id());
        if (this.delayedClosingChannels != null) {
            this.delayedClosingChannels.remove(channel.id());
        }
        if (this.idleExpiryManager != null) {
            this.idleExpiryManager.remove(channel.id());
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void doClose(KafkaChannel channel, boolean notifyDisconnect) {
        SelectionKey key = channel.selectionKey();
        try {
            this.immediatelyConnectedKeys.remove(key);
            this.keysWithBufferedRead.remove(key);
            channel.close();
        }
        catch (IOException e) {
            this.log.error("Exception closing connection to node {}:", (Object)channel.id(), (Object)e);
        }
        finally {
            key.cancel();
            key.attach(null);
        }
        this.sensors.connectionClosed.record();
        this.stagedReceives.remove(channel);
        this.explicitlyMutedChannels.remove(channel);
        if (notifyDisconnect) {
            this.disconnected.put(channel.id(), channel.state());
        }
    }

    @Override
    public boolean isChannelReady(String id) {
        KafkaChannel channel = this.channels.get(id);
        return channel != null && channel.ready();
    }

    private KafkaChannel openOrClosingChannelOrFail(String id) {
        KafkaChannel channel = this.channels.get(id);
        if (channel == null) {
            channel = this.closingChannels.get(id);
        }
        if (channel == null) {
            throw new IllegalStateException("Attempt to retrieve channel for which there is no connection. Connection id " + id + " existing connections " + this.channels.keySet());
        }
        return channel;
    }

    public List<KafkaChannel> channels() {
        return new ArrayList<KafkaChannel>(this.channels.values());
    }

    public KafkaChannel channel(String id) {
        return this.channels.get(id);
    }

    public KafkaChannel closingChannel(String id) {
        return this.closingChannels.get(id);
    }

    private KafkaChannel channel(SelectionKey key) {
        return (KafkaChannel)key.attachment();
    }

    private boolean hasStagedReceive(KafkaChannel channel) {
        return this.stagedReceives.containsKey(channel);
    }

    private boolean hasStagedReceives() {
        for (KafkaChannel channel : this.stagedReceives.keySet()) {
            if (channel.isMute()) continue;
            return true;
        }
        return false;
    }

    private void addToStagedReceives(KafkaChannel channel, NetworkReceive receive) {
        if (!this.stagedReceives.containsKey(channel)) {
            this.stagedReceives.put(channel, new ArrayDeque());
        }
        Deque<NetworkReceive> deque = this.stagedReceives.get(channel);
        deque.add(receive);
    }

    private void addToCompletedReceives() {
        if (!this.stagedReceives.isEmpty()) {
            Iterator<Map.Entry<KafkaChannel, Deque<NetworkReceive>>> iter = this.stagedReceives.entrySet().iterator();
            while (iter.hasNext()) {
                Map.Entry<KafkaChannel, Deque<NetworkReceive>> entry = iter.next();
                KafkaChannel channel = entry.getKey();
                if (this.explicitlyMutedChannels.contains(channel)) continue;
                Deque<NetworkReceive> deque = entry.getValue();
                this.addToCompletedReceives(channel, deque);
                if (!deque.isEmpty()) continue;
                iter.remove();
            }
        }
    }

    private void addToCompletedReceives(KafkaChannel channel, Deque<NetworkReceive> stagedDeque) {
        NetworkReceive networkReceive = stagedDeque.poll();
        this.completedReceives.add(networkReceive);
        this.sensors.recordBytesReceived(channel.id(), networkReceive.size());
    }

    public Set<SelectionKey> keys() {
        return new HashSet<SelectionKey>(this.nioSelector.keys());
    }

    public int numStagedReceives(KafkaChannel channel) {
        Deque<NetworkReceive> deque = this.stagedReceives.get(channel);
        return deque == null ? 0 : deque.size();
    }

    boolean isOutOfMemory() {
        return this.outOfMemory;
    }

    boolean isMadeReadProgressLastPoll() {
        return this.madeReadProgressLastPoll;
    }

    Map<?, ?> delayedClosingChannels() {
        return this.delayedClosingChannels;
    }

    private static class IdleExpiryManager {
        private final Map<String, Long> lruConnections;
        private final long connectionsMaxIdleNanos;
        private long nextIdleCloseCheckTime;

        public IdleExpiryManager(Time time, long connectionsMaxIdleMs) {
            this.connectionsMaxIdleNanos = connectionsMaxIdleMs * 1000L * 1000L;
            this.lruConnections = new LinkedHashMap<String, Long>(16, 0.75f, true);
            this.nextIdleCloseCheckTime = time.nanoseconds() + this.connectionsMaxIdleNanos;
        }

        public void update(String connectionId, long currentTimeNanos) {
            this.lruConnections.put(connectionId, currentTimeNanos);
        }

        public Map.Entry<String, Long> pollExpiredConnection(long currentTimeNanos) {
            if (currentTimeNanos <= this.nextIdleCloseCheckTime) {
                return null;
            }
            if (this.lruConnections.isEmpty()) {
                this.nextIdleCloseCheckTime = currentTimeNanos + this.connectionsMaxIdleNanos;
                return null;
            }
            Map.Entry<String, Long> oldestConnectionEntry = this.lruConnections.entrySet().iterator().next();
            Long connectionLastActiveTime = oldestConnectionEntry.getValue();
            this.nextIdleCloseCheckTime = connectionLastActiveTime + this.connectionsMaxIdleNanos;
            if (currentTimeNanos > this.nextIdleCloseCheckTime) {
                return oldestConnectionEntry;
            }
            return null;
        }

        public void remove(String connectionId) {
            this.lruConnections.remove(connectionId);
        }
    }

    private class DelayedAuthenticationFailureClose {
        private final KafkaChannel channel;
        private final long endTimeNanos;
        private boolean closed;

        public DelayedAuthenticationFailureClose(KafkaChannel channel, int delayMs) {
            this.channel = channel;
            this.endTimeNanos = Selector.this.time.nanoseconds() + (long)delayMs * 1000L * 1000L;
            this.closed = false;
        }

        public final boolean tryClose(long currentTimeNanos) {
            if (this.endTimeNanos <= currentTimeNanos) {
                this.closeNow();
            }
            return this.closed;
        }

        public final void closeNow() {
            if (this.closed) {
                throw new IllegalStateException("Attempt to close a channel that has already been closed");
            }
            Selector.this.handleCloseOnAuthenticationFailure(this.channel);
            this.closed = true;
        }
    }

    private class SelectorMetrics {
        private final Metrics metrics;
        private final String metricGrpPrefix;
        private final Map<String, String> metricTags;
        private final boolean metricsPerConnection;
        public final Sensor connectionClosed;
        public final Sensor connectionCreated;
        public final Sensor successfulAuthentication;
        public final Sensor failedAuthentication;
        public final Sensor bytesTransferred;
        public final Sensor bytesSent;
        public final Sensor bytesReceived;
        public final Sensor selectTime;
        public final Sensor ioTime;
        private final List<MetricName> topLevelMetricNames = new ArrayList<MetricName>();
        private final List<Sensor> sensors = new ArrayList<Sensor>();

        public SelectorMetrics(Metrics metrics, String metricGrpPrefix, Map<String, String> metricTags, boolean metricsPerConnection) {
            this.metrics = metrics;
            this.metricGrpPrefix = metricGrpPrefix;
            this.metricTags = metricTags;
            this.metricsPerConnection = metricsPerConnection;
            String metricGrpName = metricGrpPrefix + "-metrics";
            StringBuilder tagsSuffix = new StringBuilder();
            for (Map.Entry<String, String> tag : metricTags.entrySet()) {
                tagsSuffix.append(tag.getKey());
                tagsSuffix.append("-");
                tagsSuffix.append(tag.getValue());
            }
            this.connectionClosed = this.sensor("connections-closed:" + tagsSuffix, new Sensor[0]);
            this.connectionClosed.add(this.createMeter(metrics, metricGrpName, metricTags, "connection-close", "connections closed"));
            this.connectionCreated = this.sensor("connections-created:" + tagsSuffix, new Sensor[0]);
            this.connectionCreated.add(this.createMeter(metrics, metricGrpName, metricTags, "connection-creation", "new connections established"));
            this.successfulAuthentication = this.sensor("successful-authentication:" + tagsSuffix, new Sensor[0]);
            this.successfulAuthentication.add(this.createMeter(metrics, metricGrpName, metricTags, "successful-authentication", "connections with successful authentication"));
            this.failedAuthentication = this.sensor("failed-authentication:" + tagsSuffix, new Sensor[0]);
            this.failedAuthentication.add(this.createMeter(metrics, metricGrpName, metricTags, "failed-authentication", "connections with failed authentication"));
            this.bytesTransferred = this.sensor("bytes-sent-received:" + tagsSuffix, new Sensor[0]);
            this.bytesTransferred.add(this.createMeter(metrics, metricGrpName, metricTags, new Count(), "network-io", "network operations (reads or writes) on all connections"));
            this.bytesSent = this.sensor("bytes-sent:" + tagsSuffix, this.bytesTransferred);
            this.bytesSent.add(this.createMeter(metrics, metricGrpName, metricTags, "outgoing-byte", "outgoing bytes sent to all servers"));
            this.bytesSent.add(this.createMeter(metrics, metricGrpName, metricTags, new Count(), "request", "requests sent"));
            MetricName metricName = metrics.metricName("request-size-avg", metricGrpName, "The average size of requests sent.", metricTags);
            this.bytesSent.add(metricName, new Avg());
            metricName = metrics.metricName("request-size-max", metricGrpName, "The maximum size of any request sent.", metricTags);
            this.bytesSent.add(metricName, new Max());
            this.bytesReceived = this.sensor("bytes-received:" + tagsSuffix, this.bytesTransferred);
            this.bytesReceived.add(this.createMeter(metrics, metricGrpName, metricTags, "incoming-byte", "bytes read off all sockets"));
            this.bytesReceived.add(this.createMeter(metrics, metricGrpName, metricTags, new Count(), "response", "responses received"));
            this.selectTime = this.sensor("select-time:" + tagsSuffix, new Sensor[0]);
            this.selectTime.add(this.createMeter(metrics, metricGrpName, metricTags, new Count(), "select", "times the I/O layer checked for new I/O to perform"));
            metricName = metrics.metricName("io-wait-time-ns-avg", metricGrpName, "The average length of time the I/O thread spent waiting for a socket ready for reads or writes in nanoseconds.", metricTags);
            this.selectTime.add(metricName, new Avg());
            this.selectTime.add(this.createIOThreadRatioMeter(metrics, metricGrpName, metricTags, "io-wait", "waiting"));
            this.ioTime = this.sensor("io-time:" + tagsSuffix, new Sensor[0]);
            metricName = metrics.metricName("io-time-ns-avg", metricGrpName, "The average length of time for I/O per select call in nanoseconds.", metricTags);
            this.ioTime.add(metricName, new Avg());
            this.ioTime.add(this.createIOThreadRatioMeter(metrics, metricGrpName, metricTags, "io", "doing I/O"));
            metricName = metrics.metricName("connection-count", metricGrpName, "The current number of active connections.", metricTags);
            this.topLevelMetricNames.add(metricName);
            this.metrics.addMetric(metricName, (config, now) -> Selector.this.channels.size());
        }

        private Meter createMeter(Metrics metrics, String groupName, Map<String, String> metricTags, SampledStat stat, String baseName, String descriptiveName) {
            MetricName rateMetricName = metrics.metricName(baseName + "-rate", groupName, String.format("The number of %s per second", descriptiveName), metricTags);
            MetricName totalMetricName = metrics.metricName(baseName + "-total", groupName, String.format("The total number of %s", descriptiveName), metricTags);
            if (stat == null) {
                return new Meter(rateMetricName, totalMetricName);
            }
            return new Meter(stat, rateMetricName, totalMetricName);
        }

        private Meter createMeter(Metrics metrics, String groupName, Map<String, String> metricTags, String baseName, String descriptiveName) {
            return this.createMeter(metrics, groupName, metricTags, null, baseName, descriptiveName);
        }

        private Meter createIOThreadRatioMeter(Metrics metrics, String groupName, Map<String, String> metricTags, String baseName, String action) {
            MetricName rateMetricName = metrics.metricName(baseName + "-ratio", groupName, String.format("The fraction of time the I/O thread spent %s", action), metricTags);
            MetricName totalMetricName = metrics.metricName(baseName + "time-total", groupName, String.format("The total time the I/O thread spent %s", action), metricTags);
            return new Meter(TimeUnit.NANOSECONDS, rateMetricName, totalMetricName);
        }

        private Sensor sensor(String name, Sensor ... parents) {
            Sensor sensor = this.metrics.sensor(name, parents);
            this.sensors.add(sensor);
            return sensor;
        }

        public void maybeRegisterConnectionMetrics(String connectionId) {
            String nodeRequestName;
            Sensor nodeRequest;
            if (!connectionId.isEmpty() && this.metricsPerConnection && (nodeRequest = this.metrics.getSensor(nodeRequestName = "node-" + connectionId + ".bytes-sent")) == null) {
                String metricGrpName = this.metricGrpPrefix + "-node-metrics";
                LinkedHashMap<String, String> tags = new LinkedHashMap<String, String>(this.metricTags);
                tags.put("node-id", "node-" + connectionId);
                nodeRequest = this.sensor(nodeRequestName, new Sensor[0]);
                nodeRequest.add(this.createMeter(this.metrics, metricGrpName, tags, "outgoing-byte", "outgoing bytes"));
                nodeRequest.add(this.createMeter(this.metrics, metricGrpName, tags, new Count(), "request", "requests sent"));
                MetricName metricName = this.metrics.metricName("request-size-avg", metricGrpName, "The average size of requests sent.", tags);
                nodeRequest.add(metricName, new Avg());
                metricName = this.metrics.metricName("request-size-max", metricGrpName, "The maximum size of any request sent.", tags);
                nodeRequest.add(metricName, new Max());
                String nodeResponseName = "node-" + connectionId + ".bytes-received";
                Sensor nodeResponse = this.sensor(nodeResponseName, new Sensor[0]);
                nodeResponse.add(this.createMeter(this.metrics, metricGrpName, tags, "incoming-byte", "incoming bytes"));
                nodeResponse.add(this.createMeter(this.metrics, metricGrpName, tags, new Count(), "response", "responses received"));
                String nodeTimeName = "node-" + connectionId + ".latency";
                Sensor nodeRequestTime = this.sensor(nodeTimeName, new Sensor[0]);
                metricName = this.metrics.metricName("request-latency-avg", metricGrpName, tags);
                nodeRequestTime.add(metricName, new Avg());
                metricName = this.metrics.metricName("request-latency-max", metricGrpName, tags);
                nodeRequestTime.add(metricName, new Max());
            }
        }

        public void recordBytesSent(String connectionId, long bytes) {
            String nodeRequestName;
            Sensor nodeRequest;
            long now = Selector.this.time.milliseconds();
            this.bytesSent.record(bytes, now);
            if (!connectionId.isEmpty() && (nodeRequest = this.metrics.getSensor(nodeRequestName = "node-" + connectionId + ".bytes-sent")) != null) {
                nodeRequest.record(bytes, now);
            }
        }

        public void recordBytesReceived(String connection, int bytes) {
            String nodeRequestName;
            Sensor nodeRequest;
            long now = Selector.this.time.milliseconds();
            this.bytesReceived.record(bytes, now);
            if (!connection.isEmpty() && (nodeRequest = this.metrics.getSensor(nodeRequestName = "node-" + connection + ".bytes-received")) != null) {
                nodeRequest.record(bytes, now);
            }
        }

        public void close() {
            for (MetricName metricName : this.topLevelMetricNames) {
                this.metrics.removeMetric(metricName);
            }
            for (Sensor sensor : this.sensors) {
                this.metrics.removeSensor(sensor.name());
            }
        }
    }

    private static enum CloseMode {
        GRACEFUL(true),
        NOTIFY_ONLY(true),
        DISCARD_NO_NOTIFY(false);

        boolean notifyDisconnect;

        private CloseMode(boolean notifyDisconnect) {
            this.notifyDisconnect = notifyDisconnect;
        }
    }
}

