package storm.kafka;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import kafka.javaapi.consumer.SimpleConsumer;

/* loaded from: input_file:storm/kafka/DynamicPartitionConnections.class */
/**
 * Keeps one {@link SimpleConsumer} per {@link HostPort} and tracks which
 * partition ids are currently registered against each of them.
 *
 * <p>A consumer is created the first time a partition is registered for a
 * host and is closed when the last partition registered for that host is
 * unregistered, or when {@link #close()} is called.
 *
 * <p>This class performs no synchronization of its own.
 */
public class DynamicPartitionConnections {
    /** Open connections keyed by the host/port they were created for. */
    Map<HostPort, ConnectionInfo> _connections = new HashMap<HostPort, ConnectionInfo>();
    /** Supplies the socket timeout and buffer size used when creating consumers. */
    SpoutConfig _config;

    /** A consumer together with the set of partition ids registered on it. */
    static class ConnectionInfo {
        SimpleConsumer consumer;
        Set<Integer> partitions = new HashSet<Integer>();

        public ConnectionInfo(SimpleConsumer simpleConsumer) {
            this.consumer = simpleConsumer;
        }
    }

    /**
     * @param spoutConfig configuration whose {@code socketTimeoutMs} and
     *                    {@code bufferSizeBytes} are passed to every consumer
     *                    this instance creates
     */
    public DynamicPartitionConnections(SpoutConfig spoutConfig) {
        this._config = spoutConfig;
    }

    /**
     * Records partition {@code i} as registered on {@code hostPort}, creating
     * the consumer for that host if none exists yet.
     *
     * @param hostPort host/port to connect to
     * @param i        partition id to register
     * @return the consumer shared by all partitions registered on {@code hostPort}
     */
    public SimpleConsumer register(HostPort hostPort, int i) {
        ConnectionInfo info = this._connections.get(hostPort);
        if (info == null) {
            SimpleConsumer consumer = new SimpleConsumer(
                    hostPort.host,
                    hostPort.port,
                    this._config.socketTimeoutMs,
                    this._config.bufferSizeBytes);
            info = new ConnectionInfo(consumer);
            this._connections.put(hostPort, info);
        }
        info.partitions.add(Integer.valueOf(i));
        return info.consumer;
    }

    /**
     * Removes partition {@code i} from {@code hostPort}. When no partitions
     * remain registered on that host, its consumer is closed and the
     * connection entry is dropped.
     *
     * <p>Does nothing if {@code hostPort} has no registered connection.
     *
     * @param hostPort host/port the partition was registered on
     * @param i        partition id to unregister
     */
    public void unregister(HostPort hostPort, int i) {
        ConnectionInfo info = this._connections.get(hostPort);
        if (info == null) {
            // Never registered, or already fully unregistered: nothing to release.
            return;
        }
        info.partitions.remove(Integer.valueOf(i));
        if (info.partitions.isEmpty()) {
            // Only forget the connection once nobody uses it. Dropping the entry
            // while other partitions are still registered would orphan the open
            // consumer: close() could no longer reach it, and the next register()
            // for this host would open a duplicate connection.
            info.consumer.close();
            this._connections.remove(hostPort);
        }
    }

    /** Closes every tracked consumer and forgets all registrations. */
    public void close() {
        for (ConnectionInfo info : this._connections.values()) {
            info.consumer.close();
        }
        this._connections.clear();
    }
}
