package de.zalando.paradox.nakadi.consumer.core.http.requests;

import de.zalando.paradox.nakadi.consumer.core.ConsumerConfig;
import de.zalando.paradox.nakadi.consumer.core.domain.EventTypeCursor;
import de.zalando.paradox.nakadi.consumer.core.http.HttpReactiveHandler;
import de.zalando.paradox.nakadi.consumer.core.http.HttpResponseChunk;
import de.zalando.paradox.nakadi.consumer.core.http.handlers.ResponseHandler;
import de.zalando.paradox.nakadi.consumer.core.http.okhttp.RxHttpRequest;
import de.zalando.paradox.nakadi.consumer.core.partitioned.PartitionCommitCallback;
import de.zalando.paradox.nakadi.consumer.core.partitioned.PartitionCoordinator;
import de.zalando.paradox.nakadi.consumer.core.utils.LoggingUtils;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import rx.Observable;

/* loaded from: input_file:de/zalando/paradox/nakadi/consumer/core/http/requests/HttpGetEventsHandler.class */
public class HttpGetEventsHandler implements HttpReactiveHandler, PartitionCommitCallback {
    private final Logger log;
    private final ConsumerConfig config;
    private final EventTypeCursor startCursor;
    private HttpGetEvents httpGetEvents;
    private PartitionCoordinator coordinator;
    private ResponseHandler responseHandler;
    private final AtomicBoolean callbackRegistered = new AtomicBoolean(false);

    public HttpGetEventsHandler(String str, EventTypeCursor eventTypeCursor, ConsumerConfig consumerConfig) {
        this.log = LoggingUtils.getLogger(getClass(), eventTypeCursor.getEventTypePartition());
        this.startCursor = eventTypeCursor;
        this.config = consumerConfig;
        this.httpGetEvents = new HttpGetEvents(str, eventTypeCursor, consumerConfig.getEventStreamConfig());
        this.coordinator = consumerConfig.getPartitionCoordinator();
        this.responseHandler = consumerConfig.getResponseHandlerFactory().get(eventTypeCursor.getEventTypePartition(), consumerConfig.getObjectMapper());
    }

    @Override // de.zalando.paradox.nakadi.consumer.core.http.HttpReactiveHandler
    public Logger getLogger(Class<?> cls) {
        return LoggingUtils.getLogger(getClass(), "events", this.startCursor.getEventTypePartition());
    }

    @Override // de.zalando.paradox.nakadi.consumer.core.http.HttpReactiveHandler
    public void onResponse(String str) {
        this.log.trace("ResultCallback : [{}]", str);
        this.responseHandler.onResponse(str);
    }

    @Override // de.zalando.paradox.nakadi.consumer.core.http.HttpReactiveHandler
    public void onErrorResponse(int i, String str) {
        this.log.trace("Error result [{} / {}]", Integer.valueOf(i), str);
        this.coordinator.error(i, str, this.startCursor.getEventTypePartition());
    }

    @Override // de.zalando.paradox.nakadi.consumer.core.http.HttpReactiveHandler
    public void onStarted() {
        this.log.trace("Started");
    }

    @Override // de.zalando.paradox.nakadi.consumer.core.http.HttpReactiveHandler
    public void onFinished() {
        this.log.trace("Finished");
        try {
            this.coordinator.flush(this.startCursor.getEventTypePartition());
        } finally {
            this.coordinator.finished(this.startCursor.getEventTypePartition());
        }
    }

    @Override // de.zalando.paradox.nakadi.consumer.core.http.HttpReactiveHandler
    public void init() {
        if (this.callbackRegistered.compareAndSet(false, true)) {
            this.coordinator.registerCommitCallback(this.startCursor.getEventTypePartition(), this);
        }
    }

    @Override // de.zalando.paradox.nakadi.consumer.core.http.HttpReactiveHandler
    public void close() {
        if (this.callbackRegistered.compareAndSet(true, false)) {
            this.coordinator.unregisterCommitCallback(this.startCursor.getEventTypePartition());
        }
    }

    @Override // de.zalando.paradox.nakadi.consumer.core.http.HttpReactiveHandler
    public long getRetryAfterMillis() {
        return this.config.getEventsRetryRandomMillis() > 0 ? this.config.getEventsRetryAfterMillis() + ThreadLocalRandom.current().nextLong(this.config.getEventsRetryRandomMillis()) : this.config.getEventsRetryAfterMillis();
    }

    @Override // de.zalando.paradox.nakadi.consumer.core.http.HttpReactiveHandler
    public Observable<HttpResponseChunk> createRequest() {
        return new RxHttpRequest(this.config.getEventsTimeoutMillis(), this.config.getAuthorizationValueProvider()).createRequest(this.httpGetEvents);
    }

    @Override // de.zalando.paradox.nakadi.consumer.core.partitioned.PartitionCommitCallback
    public void onCommitComplete(EventTypeCursor eventTypeCursor) {
        this.log.trace("onCommitComplete");
        this.httpGetEvents.setOffset(eventTypeCursor.getOffset());
    }
}
