package org.apache.flink.streaming.connectors.kafka.internals;

import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import kafka.api.OffsetRequest;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.ErrorMapping;
import kafka.common.TopicAndPartition;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.consumer.SimpleConsumer;
import org.apache.flink.annotation.Internal;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.PropertiesUtil;
import org.apache.kafka.common.Node;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
@Internal
/* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.class */
public class SimpleConsumerThread<T> extends Thread {
    private static final Logger LOG = LoggerFactory.getLogger(SimpleConsumerThread.class);
    private static final KafkaTopicPartitionState<TopicAndPartition> MARKER = Kafka08Fetcher.MARKER;
    private final Kafka08Fetcher<T> owner;
    private final KeyedDeserializationSchema<T> deserializer;
    private final List<KafkaTopicPartitionState<TopicAndPartition>> partitions;
    private final Node broker;
    private final ClosableBlockingQueue<KafkaTopicPartitionState<TopicAndPartition>> newPartitionsQueue;
    private final ClosableBlockingQueue<KafkaTopicPartitionState<TopicAndPartition>> unassignedPartitions;
    private final ExceptionProxy errorHandler;
    private final long invalidOffsetBehavior;
    private volatile boolean running = true;
    private volatile SimpleConsumer consumer;
    private final int soTimeout;
    private final int minBytes;
    private final int maxWait;
    private final int fetchSize;
    private final int bufferSize;
    private final int reconnectLimit;

    public SimpleConsumerThread(Kafka08Fetcher<T> kafka08Fetcher, ExceptionProxy exceptionProxy, Properties properties, Node node, List<KafkaTopicPartitionState<TopicAndPartition>> list, ClosableBlockingQueue<KafkaTopicPartitionState<TopicAndPartition>> closableBlockingQueue, KeyedDeserializationSchema<T> keyedDeserializationSchema, long j) {
        this.owner = kafka08Fetcher;
        this.errorHandler = exceptionProxy;
        this.broker = node;
        checkAllPartitionsHaveDefinedStartingOffsets(list);
        this.partitions = list;
        this.deserializer = (KeyedDeserializationSchema) Objects.requireNonNull(keyedDeserializationSchema);
        this.unassignedPartitions = (ClosableBlockingQueue) Objects.requireNonNull(closableBlockingQueue);
        this.newPartitionsQueue = new ClosableBlockingQueue<>();
        this.invalidOffsetBehavior = j;
        this.soTimeout = PropertiesUtil.getInt(properties, "socket.timeout.ms", 30000);
        this.minBytes = PropertiesUtil.getInt(properties, "fetch.min.bytes", 1);
        this.maxWait = PropertiesUtil.getInt(properties, "fetch.wait.max.ms", 100);
        this.fetchSize = PropertiesUtil.getInt(properties, "fetch.message.max.bytes", 1048576);
        this.bufferSize = PropertiesUtil.getInt(properties, "socket.receive.buffer.bytes", 65536);
        this.reconnectLimit = PropertiesUtil.getInt(properties, "flink.simple-consumer-reconnectLimit", 3);
    }

    public ClosableBlockingQueue<KafkaTopicPartitionState<TopicAndPartition>> getNewPartitionsQueue() {
        return this.newPartitionsQueue;
    }

    /* JADX WARN: Code restructure failed: missing block: B:164:0x05e0, code lost:
    
        if (r9.newPartitionsQueue.close() != false) goto L117;
     */
    /* JADX WARN: Code restructure failed: missing block: B:166:0x05ed, code lost:
    
        throw new java.lang.Exception("Bug: Cleanly leaving fetcher thread without having a closed queue.");
     */
    /* JADX WARN: Code restructure failed: missing block: B:169:0x05f2, code lost:
    
        if (r9.consumer == null) goto L139;
     */
    /* JADX WARN: Code restructure failed: missing block: B:170:0x065e, code lost:
    
        return;
     */
    /* JADX WARN: Code restructure failed: missing block: B:172:0x05f5, code lost:
    
        r9.consumer.close();
     */
    /* JADX WARN: Code restructure failed: missing block: B:174:?, code lost:
    
        return;
     */
    /* JADX WARN: Code restructure failed: missing block: B:175:0x05ff, code lost:
    
        r11 = move-exception;
     */
    /* JADX WARN: Code restructure failed: missing block: B:176:0x0600, code lost:
    
        org.apache.flink.streaming.connectors.kafka.internals.SimpleConsumerThread.LOG.error("Error while closing the Kafka simple consumer", r11);
     */
    /* JADX WARN: Code restructure failed: missing block: B:177:?, code lost:
    
        return;
     */
    @Override // java.lang.Thread, java.lang.Runnable
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    public void run() {
        /*
            Method dump skipped, instructions count: 1631
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: org.apache.flink.streaming.connectors.kafka.internals.SimpleConsumerThread.run():void");
    }

    public void cancel() {
        this.running = false;
        if (this.consumer != null) {
            this.consumer.close();
        }
        interrupt();
    }

    private static void requestAndSetSpecificTimeOffsetsFromKafka(SimpleConsumer simpleConsumer, List<KafkaTopicPartitionState<TopicAndPartition>> list, long j) throws IOException {
        HashMap hashMap = new HashMap();
        Iterator<KafkaTopicPartitionState<TopicAndPartition>> it = list.iterator();
        while (it.hasNext()) {
            hashMap.put(it.next().getKafkaPartitionHandle(), new PartitionOffsetRequestInfo(j, 1));
        }
        requestAndSetOffsetsFromKafka(simpleConsumer, list, hashMap);
    }

    private static void requestAndSetEarliestOrLatestOffsetsFromKafka(SimpleConsumer simpleConsumer, List<KafkaTopicPartitionState<TopicAndPartition>> list) throws Exception {
        HashMap hashMap = new HashMap();
        for (KafkaTopicPartitionState<TopicAndPartition> kafkaTopicPartitionState : list) {
            if (kafkaTopicPartitionState.getOffset() == OffsetRequest.EarliestTime() || kafkaTopicPartitionState.getOffset() == OffsetRequest.LatestTime()) {
                hashMap.put(kafkaTopicPartitionState.getKafkaPartitionHandle(), new PartitionOffsetRequestInfo(kafkaTopicPartitionState.getOffset(), 1));
            }
        }
        requestAndSetOffsetsFromKafka(simpleConsumer, list, hashMap);
    }

    private static void requestAndSetOffsetsFromKafka(SimpleConsumer simpleConsumer, List<KafkaTopicPartitionState<TopicAndPartition>> list, Map<TopicAndPartition, PartitionOffsetRequestInfo> map) throws IOException {
        int i = 0;
        while (true) {
            OffsetResponse offsetsBefore = simpleConsumer.getOffsetsBefore(new kafka.javaapi.OffsetRequest(map, OffsetRequest.CurrentVersion(), simpleConsumer.clientId()));
            if (!offsetsBefore.hasError()) {
                for (KafkaTopicPartitionState<TopicAndPartition> kafkaTopicPartitionState : list) {
                    if (map.containsKey(kafkaTopicPartitionState.getKafkaPartitionHandle())) {
                        kafkaTopicPartitionState.setOffset(offsetsBefore.offsets(kafkaTopicPartitionState.getTopic(), kafkaTopicPartitionState.getPartition())[0] - 1);
                    }
                }
                return;
            }
            StringBuilder sb = new StringBuilder();
            for (KafkaTopicPartitionState<TopicAndPartition> kafkaTopicPartitionState2 : list) {
                short errorCode = offsetsBefore.errorCode(kafkaTopicPartitionState2.getTopic(), kafkaTopicPartitionState2.getPartition());
                if (errorCode != ErrorMapping.NoError()) {
                    sb.append("\nException for topic=").append(kafkaTopicPartitionState2.getTopic()).append(" partition=").append(kafkaTopicPartitionState2.getPartition()).append(": ").append(ExceptionUtils.stringifyException(ErrorMapping.exceptionFor(errorCode)));
                }
            }
            i++;
            if (i >= 3) {
                throw new IOException("Unable to get last offset for partitions " + list + ": " + sb.toString());
            }
            LOG.warn("Unable to get last offset for partitions: Exception(s): {}", sb);
        }
    }

    private static void checkAllPartitionsHaveDefinedStartingOffsets(List<KafkaTopicPartitionState<TopicAndPartition>> list) {
        Iterator<KafkaTopicPartitionState<TopicAndPartition>> it = list.iterator();
        while (it.hasNext()) {
            if (!it.next().isOffsetDefined()) {
                throw new IllegalArgumentException("SimpleConsumerThread received a partition with undefined starting offset");
            }
        }
    }
}
