package de.zalando.paradox.nakadi.consumer.core.http.requests;

import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.base.Preconditions;
import de.zalando.paradox.nakadi.consumer.core.ConsumerConfig;
import de.zalando.paradox.nakadi.consumer.core.domain.EventType;
import de.zalando.paradox.nakadi.consumer.core.domain.EventTypeCursor;
import de.zalando.paradox.nakadi.consumer.core.domain.EventTypePartition;
import de.zalando.paradox.nakadi.consumer.core.domain.EventTypePartitions;
import de.zalando.paradox.nakadi.consumer.core.domain.NakadiPartition;
import de.zalando.paradox.nakadi.consumer.core.http.HttpReactiveHandler;
import de.zalando.paradox.nakadi.consumer.core.http.HttpReactiveReceiver;
import de.zalando.paradox.nakadi.consumer.core.http.HttpResponseChunk;
import de.zalando.paradox.nakadi.consumer.core.http.okhttp.RxHttpRequest;
import de.zalando.paradox.nakadi.consumer.core.partitioned.PartitionCoordinator;
import de.zalando.paradox.nakadi.consumer.core.partitioned.PartitionRebalanceListener;
import de.zalando.paradox.nakadi.consumer.core.utils.LoggingUtils;
import de.zalando.paradox.nakadi.consumer.core.utils.ThrowableUtils;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.slf4j.Logger;
import rx.Observable;

/* loaded from: input_file:de/zalando/paradox/nakadi/consumer/core/http/requests/HttpGetPartitionsHandler.class */
/**
 * Reactive handler for the "get partitions" request of a single event type.
 *
 * <p>Each successful response is parsed into a list of {@link NakadiPartition}s and handed to the
 * configured {@link PartitionCoordinator} together with the partitions this handler currently has
 * receivers for. The handler registers itself as the coordinator's {@link PartitionRebalanceListener}
 * and, in reaction to the callbacks, starts or stops one {@link HttpReactiveReceiver} per partition.
 */
public class HttpGetPartitionsHandler implements HttpReactiveHandler, PartitionRebalanceListener, Closeable {

    /** How long to wait before re-checking a receiver that reports running-but-unsubscribed. */
    private static final long UNSUBSCRIBED_GRACE_MILLIS = 200L;

    private final ConsumerConfig config;
    private final Logger log;
    private final String baseUri;
    private final EventType eventType;
    private final PartitionCoordinator coordinator;
    private final HttpGetPartitions httpGetPartitions;

    /** Receivers currently owned by this handler, keyed by partition name. */
    private final ConcurrentMap<String, HttpReactiveReceiver> partitionToReceiver = new ConcurrentHashMap<>();

    /** Guards against double registration / double unregistration at the coordinator. */
    private final AtomicBoolean rebalanceRegistered = new AtomicBoolean(false);

    /**
     * Creates a handler for the event type, Nakadi URL and partition coordinator found in the
     * given configuration.
     *
     * @param consumerConfig consumer configuration this handler reads all its settings from
     */
    public HttpGetPartitionsHandler(ConsumerConfig consumerConfig) {
        this.config = consumerConfig;
        this.log = LoggingUtils.getLogger(getClass(), consumerConfig.getEventType());
        this.httpGetPartitions = new HttpGetPartitions(consumerConfig.getNakadiUrl(), consumerConfig.getEventType());
        this.baseUri = consumerConfig.getNakadiUrl();
        this.eventType = consumerConfig.getEventType();
        this.coordinator = consumerConfig.getPartitionCoordinator();
    }

    /**
     * Registers this handler as rebalance listener for its event type. Repeated calls without an
     * intervening {@link #close()} register only once.
     */
    @Override // de.zalando.paradox.nakadi.consumer.core.http.HttpReactiveHandler
    public void init() {
        if (this.rebalanceRegistered.compareAndSet(false, true)) {
            this.coordinator.registerRebalanceListener(this.eventType, this);
        }
    }

    /**
     * Revokes every partition this handler currently has a receiver for and unregisters the
     * rebalance listener. Does nothing if the handler is not currently registered.
     */
    @Override // de.zalando.paradox.nakadi.consumer.core.http.HttpReactiveHandler
    public void close() {
        if (!this.rebalanceRegistered.compareAndSet(true, false)) {
            return;
        }
        try {
            List<EventTypePartition> ownedPartitions = this.partitionToReceiver.keySet().stream()
                    .map(partition -> EventTypePartition.of(this.eventType, partition))
                    .collect(Collectors.toList());
            this.log.info("Handler close revokes partitions [{}]", ownedPartitions);
            onPartitionsRevoked(ownedPartitions);
        } finally {
            // Unregister even if stopping a receiver failed unexpectedly.
            this.coordinator.unregisterRebalanceListener(this.eventType);
        }
    }

    /**
     * Returns a logger for the given class, tagged with "partitions" and this handler's event type.
     *
     * @param cls class the logger is requested for
     * @return the logger obtained from {@link LoggingUtils}
     */
    @Override // de.zalando.paradox.nakadi.consumer.core.http.HttpReactiveHandler
    public Logger getLogger(Class<?> cls) {
        return LoggingUtils.getLogger(cls, "partitions", this.eventType);
    }

    /**
     * Parses the response body as a partition list and, if parsing yields a list, asks the
     * coordinator to rebalance against the partitions currently held by this handler.
     *
     * @param str raw response body
     */
    @Override // de.zalando.paradox.nakadi.consumer.core.http.HttpReactiveHandler
    public void onResponse(String str) {
        this.log.trace("ResultCallback [{}]", str);
        getPartitions(str).ifPresent(partitions ->
                this.coordinator.rebalance(
                        EventTypePartitions.of(this.eventType, this.partitionToReceiver.keySet()), partitions));
    }

    /**
     * Deserializes the response body into a list of partitions.
     *
     * @param str raw response body
     * @return the parsed partitions, or empty if the body could not be parsed or was a JSON
     *         {@code null}
     */
    private Optional<List<NakadiPartition>> getPartitions(String str) {
        try {
            List<NakadiPartition> parsed = this.config.getObjectMapper()
                    .readValue(str, new TypeReference<ArrayList<NakadiPartition>>() { });
            // readValue yields null for a literal JSON null; treat that as "nothing to rebalance"
            // instead of failing with a NullPointerException in Optional.of.
            return Optional.ofNullable(parsed);
        } catch (IOException e) {
            this.log.error("Error while parsing partition information", e);
            return Optional.empty();
        }
    }

    /**
     * Starts (or, if necessary, restarts) a receiver for every assigned cursor.
     *
     * @param collection cursors of the partitions assigned to this handler
     */
    @Override // de.zalando.paradox.nakadi.consumer.core.partitioned.PartitionRebalanceListener
    public void onPartitionsAssigned(Collection<EventTypeCursor> collection) {
        this.log.trace("onPartitionsAssigned [{}]", collection);
        collection.forEach(this::startReceiver);
    }

    /**
     * Stops the receiver of every revoked partition.
     *
     * @param collection partitions taken away from this handler
     */
    @Override // de.zalando.paradox.nakadi.consumer.core.partitioned.PartitionRebalanceListener
    public void onPartitionsRevoked(Collection<EventTypePartition> collection) {
        this.log.trace("onPartitionsRevoked [{}]", collection);
        collection.forEach(this::stopReceiver);
    }

    /**
     * Stops every receiver that reports running-but-unsubscribed both before and after a short
     * grace period.
     */
    @Override // de.zalando.paradox.nakadi.consumer.core.partitioned.PartitionRebalanceListener
    public void onPartitionsHealthCheck() {
        this.log.trace("onPartitionsHealthCheck");
        this.partitionToReceiver.forEach((partition, receiver) -> {
            if (!isRunningUnsubscribed(receiver)) {
                return;
            }
            EventTypePartition eventTypePartition = EventTypePartition.of(this.eventType, partition);
            this.log.warn("Receiver for partition [{}] is running but unsubscribed", eventTypePartition);
            awaitUnsubscribedGracePeriod();
            if (isRunningUnsubscribed(receiver)) {
                this.log.warn("Force stop receiver for partition [{}]", eventTypePartition);
                stopReceiver(eventTypePartition);
            }
        });
    }

    /**
     * Removes the receiver registered for the partition, if any, and closes it. A failure while
     * closing is logged and not propagated.
     *
     * @param eventTypePartition partition whose receiver should be stopped
     * @return the receiver that was removed, or {@code null} if none was registered
     * @throws IllegalArgumentException if the partition belongs to a different event type
     */
    private HttpReactiveReceiver stopReceiver(EventTypePartition eventTypePartition) {
        checkEventTypePartition(eventTypePartition);
        HttpReactiveReceiver removed = this.partitionToReceiver.remove(eventTypePartition.getPartition());
        if (removed != null) {
            try {
                this.log.info("Stopping receiver for partition [{}]", eventTypePartition);
                removed.close();
                this.log.info("Receiver for partition [{}] stopped", eventTypePartition);
            } catch (Exception e) {
                this.log.error("Stopping receiver for partition [{}] failed due to [{}]", eventTypePartition, ExceptionUtils.getMessage(e));
            }
        }
        return removed;
    }

    /**
     * Ensures a receiver exists for the cursor's partition. If none is registered, a new one is
     * created. If one is registered and stays running-but-unsubscribed across the grace period,
     * it is stopped and replaced.
     *
     * @param eventTypeCursor cursor to consume from
     * @throws IllegalArgumentException if the cursor belongs to a different event type
     */
    private void startReceiver(EventTypeCursor eventTypeCursor) {
        EventTypePartition eventTypePartition = eventTypeCursor.getEventTypePartition();
        checkEventTypePartition(eventTypePartition);
        HttpReactiveReceiver existing = this.partitionToReceiver.get(eventTypePartition.getPartition());
        if (existing == null) {
            newReceiver(eventTypeCursor);
            return;
        }
        if (!isRunningUnsubscribed(existing)) {
            return;
        }
        this.log.warn("Receiver for cursor [{}] is running but unsubscribed", eventTypeCursor);
        awaitUnsubscribedGracePeriod();
        if (!isRunningUnsubscribed(existing)) {
            return;
        }
        this.log.warn("Force restart receiver for cursor [{}]", eventTypeCursor);
        // Only replace the receiver if the one we just stopped is the one we inspected.
        if (existing == stopReceiver(eventTypeCursor.getEventTypePartition())) {
            newReceiver(eventTypeCursor);
        }
    }

    /**
     * Creates a receiver for the cursor and initializes it if it could be registered for the
     * partition. If creation or initialization fails, the failure is logged, the receiver is
     * closed and its registration is removed again.
     *
     * @param eventTypeCursor cursor to consume from
     * @throws IllegalArgumentException if the cursor belongs to a different event type
     */
    private void newReceiver(EventTypeCursor eventTypeCursor) {
        checkEventTypePartition(eventTypeCursor.getEventTypePartition());
        String partition = eventTypeCursor.getEventTypePartition().getPartition();
        HttpReactiveReceiver receiver = null;
        try {
            receiver = new HttpReactiveReceiver(new HttpGetEventsHandler(this.baseUri, eventTypeCursor, this.config));
            if (this.partitionToReceiver.putIfAbsent(partition, receiver) == null) {
                this.log.info("Starting receiver for cursor [{}]", eventTypeCursor);
                receiver.init();
                this.log.info("Receiver started for cursor [{}]", eventTypeCursor);
            }
        } catch (Exception e) {
            this.log.error("Cannot start receiver for cursor [{}] due to [{}]", eventTypeCursor, ExceptionUtils.getMessage(e));
            if (receiver != null) {
                try {
                    receiver.close();
                } catch (IOException closeFailure) {
                    this.log.error("Stopping receiver for cursor [{}] failed due to [{}]", eventTypeCursor, ExceptionUtils.getMessage(closeFailure));
                } finally {
                    // Remove only our own registration: an unconditional remove(partition) could
                    // evict a receiver that another thread registered for the same partition.
                    this.partitionToReceiver.remove(partition, receiver);
                }
            }
        }
    }

    /**
     * Tells whether the receiver reports itself as running while not being subscribed.
     *
     * @param receiver receiver to inspect
     * @return {@code true} if {@code isRunning()} is true and {@code isSubscribed()} is false
     */
    private static boolean isRunningUnsubscribed(HttpReactiveReceiver receiver) {
        return receiver.isRunning() && !receiver.isSubscribed();
    }

    /**
     * Sleeps for {@link #UNSUBSCRIBED_GRACE_MILLIS} so that a receiver gets a chance to subscribe
     * before it is considered stuck. On interruption the interrupt flag is restored before the
     * exception is passed to {@link ThrowableUtils#throwException}.
     */
    private static void awaitUnsubscribedGracePeriod() {
        try {
            Thread.sleep(UNSUBSCRIBED_GRACE_MILLIS);
        } catch (InterruptedException e) {
            // Keep the interruption visible to callers further up the stack.
            // NOTE(review): throwException presumably rethrows e - confirm against ThrowableUtils.
            Thread.currentThread().interrupt();
            ThrowableUtils.throwException(e);
        }
    }

    /**
     * Verifies that the partition belongs to the event type served by this handler.
     *
     * @param eventTypePartition partition to verify
     * @throws IllegalArgumentException if the event types differ
     */
    private void checkEventTypePartition(EventTypePartition eventTypePartition) {
        Preconditions.checkArgument(this.eventType.equals(eventTypePartition.getEventType()),
                "Event type mismatch [%s]/[%s]", this.eventType, eventTypePartition.getEventType());
    }

    /**
     * Traces an error response; no further action is taken here.
     *
     * @param i   status code of the error response
     * @param str body of the error response
     */
    @Override // de.zalando.paradox.nakadi.consumer.core.http.HttpReactiveHandler
    public void onErrorResponse(int i, String str) {
        this.log.trace("Error result [{} / {}]", i, str);
    }

    /** Traces the start notification. */
    @Override // de.zalando.paradox.nakadi.consumer.core.http.HttpReactiveHandler
    public void onStarted() {
        this.log.trace("Started");
    }

    /** Traces the finish notification. */
    @Override // de.zalando.paradox.nakadi.consumer.core.http.HttpReactiveHandler
    public void onFinished() {
        this.log.trace("Finished");
    }

    /**
     * Returns the configured partitions retry delay, increased by a random amount below the
     * configured random bound when that bound is positive.
     *
     * @return delay in milliseconds
     */
    @Override // de.zalando.paradox.nakadi.consumer.core.http.HttpReactiveHandler
    public long getRetryAfterMillis() {
        long retryAfterMillis = this.config.getPartitionsRetryAfterMillis();
        long randomBoundMillis = this.config.getPartitionsRetryRandomMillis();
        if (randomBoundMillis > 0) {
            return retryAfterMillis + ThreadLocalRandom.current().nextLong(randomBoundMillis);
        }
        return retryAfterMillis;
    }

    /**
     * Builds the observable request for the partitions endpoint, using the configured partitions
     * timeout and authorization value provider.
     *
     * @return observable emitting the response chunks of the partitions request
     */
    @Override // de.zalando.paradox.nakadi.consumer.core.http.HttpReactiveHandler
    public Observable<HttpResponseChunk> createRequest() {
        return new RxHttpRequest(this.config.getPartitionsTimeoutMillis(), this.config.getAuthorizationValueProvider()).createRequest(this.httpGetPartitions);
    }
}
