package xin.manong.weapon.base.kafka;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:xin/manong/weapon/base/kafka/KafkaConsumer.class */
public class KafkaConsumer implements Runnable {
    private static final Logger logger = LoggerFactory.getLogger(KafkaConsumer.class);
    private boolean running = false;
    private String name;
    private org.apache.kafka.clients.consumer.KafkaConsumer<byte[], byte[]> consumer;
    private KafkaRecordProcessor processor;
    private KafkaConsumeConfig config;
    private Thread consumeThread;

    public KafkaConsumer(String str, KafkaConsumeConfig kafkaConsumeConfig, KafkaRecordProcessor kafkaRecordProcessor) {
        this.name = str;
        this.config = kafkaConsumeConfig;
        this.processor = kafkaRecordProcessor;
    }

    public boolean start() {
        logger.info("kafka consumer[{}] is starting ...", this.name);
        Properties properties = new Properties();
        properties.put("bootstrap.servers", this.config.servers);
        properties.put("group.id", this.config.groupId);
        properties.put("enable.auto.commit", "false");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        if (this.config.maxFetchWaitTimeMs != null && this.config.maxFetchWaitTimeMs.longValue() > 0) {
            properties.put("fetch.max.wait.ms", String.valueOf(this.config.maxFetchWaitTimeMs));
        }
        if (this.config.authConfig != null) {
            properties.put(KafkaAuthConfig.SECURITY_PROTOCOL, this.config.authConfig.securityProtocol);
            properties.put(KafkaAuthConfig.SASL_MECHANISM, this.config.authConfig.saslMechanism);
            properties.put(KafkaAuthConfig.SASL_JAAS_CONFIG, this.config.authConfig.saslJaasConfig);
        }
        this.consumer = new org.apache.kafka.clients.consumer.KafkaConsumer<>(properties);
        this.running = true;
        this.consumeThread = new Thread(this, this.name);
        this.consumeThread.start();
        logger.info("kafka consumer[{}] has been started", this.name);
        return true;
    }

    public void stop() {
        logger.info("kafka consumer[{}] is stopping ...", this.name);
        this.running = false;
        if (this.consumeThread != null && this.consumeThread.isAlive()) {
            this.consumeThread.interrupt();
        }
        try {
            if (this.consumeThread != null) {
                this.consumeThread.join();
            }
        } catch (InterruptedException e) {
            logger.error(e.getMessage(), e);
        }
        if (this.consumer != null) {
            this.consumer.close();
        }
        logger.info("kafka consumer[{}] has been stopped", this.name);
    }

    @Override // java.lang.Runnable
    public void run() {
        this.consumer.subscribe(this.config.topics);
        while (this.running) {
            try {
                ConsumerRecords poll = this.consumer.poll(Duration.ofSeconds(3L));
                if (poll != null && !poll.isEmpty()) {
                    for (TopicPartition topicPartition : poll.partitions()) {
                        for (ConsumerRecord<byte[], byte[]> consumerRecord : poll.records(topicPartition)) {
                            this.processor.process(consumerRecord);
                            this.consumer.commitAsync(Collections.singletonMap(topicPartition, new OffsetAndMetadata(consumerRecord.offset() + 1)), (map, exc) -> {
                                if (exc == null) {
                                    return;
                                }
                                map.forEach((topicPartition2, offsetAndMetadata) -> {
                                    logger.warn("commit failed for topic[{}], partition[{}] and offset[{}]", new Object[]{topicPartition2.topic(), Integer.valueOf(topicPartition2.partition()), Long.valueOf(offsetAndMetadata.offset())});
                                });
                            });
                        }
                    }
                }
            } catch (Throwable th) {
                logger.error("process kafka message failed");
                logger.error(th.getMessage(), th);
            }
        }
    }
}
