package de.zalando.paradox.nakadi.consumer.core.http.okhttp;

import com.google.common.base.Stopwatch;
import de.zalando.paradox.nakadi.consumer.core.AuthorizationValueProvider;
import de.zalando.paradox.nakadi.consumer.core.http.HttpGetRequest;
import de.zalando.paradox.nakadi.consumer.core.http.HttpResponseChunk;
import de.zalando.paradox.nakadi.consumer.core.utils.ThrowableUtils;
import java.net.URL;
import java.util.HashMap;
import java.util.Map;
import java.util.Scanner;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import javax.annotation.Nullable;
import okhttp3.Call;
import okhttp3.Headers;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.functions.Action1;
import rx.functions.Func0;
import rx.functions.Func1;

/* loaded from: input_file:de/zalando/paradox/nakadi/consumer/core/http/okhttp/RxHttpRequest.class */
public class RxHttpRequest {
    private static final Logger LOGGER = LoggerFactory.getLogger(RxHttpRequest.class);
    private static final String AUTHORIZATION_HEADER = "Authorization";
    static final String BATCH_SPLITTER = "\n";
    private final OkHttpClient client;
    private final AuthorizationValueProvider authorizationValueProvider;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:de/zalando/paradox/nakadi/consumer/core/http/okhttp/RxHttpRequest$HttpCall.class */
    public static class HttpCall {
        private final Call call;
        private final Response response;
        private final long threadId = Thread.currentThread().getId();

        HttpCall(Call call, Response response) {
            this.call = call;
            this.response = response;
        }

        void dispose() {
            if (this.threadId == Thread.currentThread().getId()) {
                if (null != this.response.body()) {
                    try {
                        this.response.body().close();
                        return;
                    } catch (Throwable th) {
                        RxHttpRequest.LOGGER.error("Dispose error request [{}]", this.call.request(), th);
                        return;
                    }
                }
                return;
            }
            try {
                if (this.call.isCanceled()) {
                    RxHttpRequest.LOGGER.warn("Already cancelled request [{}]", this.call.request());
                } else {
                    RxHttpRequest.LOGGER.info("Cancel request [{}]", this.call.request());
                    this.call.cancel();
                }
            } catch (Throwable th2) {
                RxHttpRequest.LOGGER.error("Cancel error request [{}]", this.call.request(), th2);
            }
        }
    }

    public RxHttpRequest(long j, @Nullable AuthorizationValueProvider authorizationValueProvider) {
        this.authorizationValueProvider = authorizationValueProvider;
        this.client = new OkHttpClient.Builder().readTimeout(j, TimeUnit.MILLISECONDS).build();
    }

    public Observable<HttpResponseChunk> createRequest(HttpGetRequest httpGetRequest) {
        Func0 func0 = () -> {
            Request request = null;
            try {
                request = getRequest(httpGetRequest.getUrl(), withAuthorization(httpGetRequest.getHeaders()));
                LOGGER.info("Request [{}]", request);
                Call newCall = this.client.newCall(request);
                Response execute = newCall.execute();
                LOGGER.debug("Received response with code [{}] and headers [{}]", Integer.valueOf(execute.code()), execute.headers());
                return new HttpCall(newCall, execute);
            } catch (Throwable th) {
                LOGGER.error("Encountered error while making request [{}] [{}]", request, ExceptionUtils.getMessage(th));
                ThrowableUtils.throwException(th);
                return null;
            }
        };
        Func1 func1 = httpCall -> {
            Scanner useDelimiter = new Scanner(httpCall.response.body().source().inputStream(), "UTF-8").useDelimiter(BATCH_SPLITTER);
            Stream stream = StreamSupport.stream(Spliterators.spliterator(useDelimiter, Long.MAX_VALUE, 272), false);
            useDelimiter.getClass();
            Stream stream2 = (Stream) stream.onClose(useDelimiter::close);
            int code = httpCall.response.code();
            Observable just = code != 200 ? Observable.just(new HttpResponseChunk(code, "")) : Observable.empty();
            stream2.getClass();
            return Observable.from(stream2::iterator).map(str -> {
                if (StringUtils.isNotEmpty(str)) {
                    return new HttpResponseChunk(code, str);
                }
                LOGGER.warn("Received empty content");
                return new HttpResponseChunk(code, "");
            }).switchIfEmpty(just);
        };
        Action1 action1 = httpCall2 -> {
            LOGGER.debug("Dispose request [{}]", httpGetRequest.getUrl());
            httpCall2.dispose();
        };
        AtomicReference atomicReference = new AtomicReference();
        return Observable.using(func0, func1, action1, true).doOnSubscribe(() -> {
            atomicReference.set(Stopwatch.createStarted());
        }).doOnTerminate(() -> {
            Stopwatch stopwatch = (Stopwatch) atomicReference.get();
            LOGGER.info("Processed in [{}] [{}]", null != stopwatch ? stopwatch.stop().toString() : "?", httpGetRequest.getUrl());
        });
    }

    private Map<String, String> withAuthorization(Map<String, String> map) {
        if (null == this.authorizationValueProvider || map.containsKey(AUTHORIZATION_HEADER)) {
            return map;
        }
        HashMap hashMap = new HashMap(map);
        String str = this.authorizationValueProvider.get();
        LOGGER.debug("Authorization header value '{}'", str);
        hashMap.put(AUTHORIZATION_HEADER, str);
        return hashMap;
    }

    private static Request getRequest(URL url, Map<String, String> map) {
        return new Request.Builder().url(url).headers(Headers.of(map)).build();
    }
}
