package de.zalando.paradox.nakadi.consumer.core.http;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import de.zalando.paradox.nakadi.consumer.core.utils.ThrowableUtils;
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.slf4j.Logger;
import rx.Observable;
import rx.Scheduler;
import rx.Subscription;
import rx.functions.Action1;
import rx.schedulers.Schedulers;

/* loaded from: input_file:de/zalando/paradox/nakadi/consumer/core/http/HttpReactiveReceiver.class */
public class HttpReactiveReceiver implements Closeable {
    private final Logger log;
    private final AtomicBoolean running;
    private Subscription subscription;
    private final HttpReactiveHandler httpReactiveHandler;
    private final Scheduler scheduler;

    public HttpReactiveReceiver(HttpReactiveHandler httpReactiveHandler) {
        this.running = new AtomicBoolean(false);
        this.httpReactiveHandler = httpReactiveHandler;
        this.log = httpReactiveHandler.getLogger(getClass());
        this.scheduler = Schedulers.io();
    }

    @VisibleForTesting
    HttpReactiveReceiver(HttpReactiveHandler httpReactiveHandler, Scheduler scheduler) {
        this.running = new AtomicBoolean(false);
        this.httpReactiveHandler = httpReactiveHandler;
        this.log = httpReactiveHandler.getLogger(getClass());
        this.scheduler = scheduler;
    }

    public void init() {
        this.log.info("Starting HTTP event receiver");
        if (!this.running.compareAndSet(false, true)) {
            this.log.info("HTTP reactive receiver is already running");
            return;
        }
        this.httpReactiveHandler.init();
        this.subscription = handleRestart(handleSubscription(this.httpReactiveHandler.createRequest().subscribeOn(this.scheduler).unsubscribeOn(this.scheduler))).subscribe(getAction(), th -> {
            this.log.error("Subscription handler error [{}] / [{}] ", new Object[]{th.getClass().getName(), ExceptionUtils.getMessage(th), th});
        }, () -> {
            this.log.info("Subscription handler completed");
        });
    }

    private <T> Observable<T> handleSubscription(Observable<T> observable) {
        return observable.doOnSubscribe(() -> {
            this.log.debug("Handler subscription started");
            this.httpReactiveHandler.onStarted();
        }).doOnUnsubscribe(() -> {
            this.log.debug("Handler subscription finished");
            this.httpReactiveHandler.onFinished();
        });
    }

    private <T> Observable<T> handleRestart(Observable<T> observable) {
        return observable.retryWhen(observable2 -> {
            return observable2.compose(zipWithFlatMap("retry"));
        }).repeatWhen(observable3 -> {
            return observable3.compose(zipWithFlatMap("repeat"));
        });
    }

    private <T> Observable.Transformer<T, Long> zipWithFlatMap(String str) {
        return observable -> {
            return observable.zipWith(Observable.range(1, Integer.MAX_VALUE), (obj, num) -> {
                if (obj instanceof Throwable) {
                    this.log.warn("Exception [{}]", ExceptionUtils.getMessage((Throwable) obj));
                }
                return num;
            }).flatMap(num2 -> {
                long retryAfterMillis = this.httpReactiveHandler.getRetryAfterMillis();
                Preconditions.checkArgument(retryAfterMillis > 0, "RetryAfterMillis must be greater than 0");
                this.log.debug("Restart after [{}] running [{}] reason [{}] attempt : [{}]", new Object[]{Long.valueOf(retryAfterMillis), Boolean.valueOf(this.running.get()), str, num2});
                return Observable.timer(retryAfterMillis, TimeUnit.MILLISECONDS);
            }).takeUntil(l -> {
                return Boolean.valueOf(!this.running.get());
            });
        };
    }

    private Action1<HttpResponseChunk> getAction() {
        return httpResponseChunk -> {
            if (!this.running.get()) {
                this.log.error("Receiving payload but not running");
                return;
            }
            try {
                if (httpResponseChunk.getStatusCode() == 200) {
                    this.log.trace("Chunk response event [{}]", httpResponseChunk.getContent());
                    this.httpReactiveHandler.onResponse(httpResponseChunk.getContent());
                } else {
                    this.log.error("Chunk response error [{}] / [{}]", Integer.valueOf(httpResponseChunk.getStatusCode()), httpResponseChunk.getContent());
                    this.httpReactiveHandler.onErrorResponse(httpResponseChunk.getStatusCode(), httpResponseChunk.getContent());
                }
            } catch (Throwable th) {
                this.log.error("Unexpected handler error [{}]", ExceptionUtils.getMessage(th));
                ThrowableUtils.throwException(th);
            }
        };
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        this.log.info("Stopping HTTP event receiver");
        if (!this.running.compareAndSet(true, false)) {
            this.log.debug("HTTP reactive receiver is already stopped");
            return;
        }
        if (null != this.subscription) {
            try {
                this.subscription.unsubscribe();
            } finally {
                this.subscription = null;
            }
        }
        this.httpReactiveHandler.close();
    }

    public boolean isRunning() {
        return this.running.get();
    }

    public boolean isSubscribed() {
        Subscription subscription = this.subscription;
        return (null == subscription || subscription.isUnsubscribed()) ? false : true;
    }
}
