package org.apache.flink.statefun.flink.core.httpfn;

import java.io.IOException;
import java.time.Duration;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;
import okhttp3.Call;
import okhttp3.Callback;
import okhttp3.Response;
import okio.Timeout;
import org.apache.flink.kinesis.shaded.org.apache.http.HttpStatus;
import org.apache.flink.statefun.flink.core.backpressure.BoundedExponentialBackoff;
import org.apache.flink.statefun.flink.core.metrics.RemoteInvocationMetrics;
import org.apache.flink.statefun.flink.core.reqreply.ToFunctionRequestSummary;
import org.apache.flink.util.function.RunnableWithException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/statefun/flink/core/httpfn/RetryingCallback.class */
/**
 * An OkHttp {@link Callback} that re-enqueues a call when it fails with an {@link IOException} or
 * with a retryable HTTP status, and publishes the final outcome through a {@link
 * CompletableFuture}.
 *
 * <p>Before each retry {@link BoundedExponentialBackoff#applyNow()} is consulted; when it returns
 * {@code false} the future is completed exceptionally instead of retrying.
 * NOTE(review): {@code applyNow()} presumably blocks the calling (OkHttp dispatcher) thread for the
 * next backoff interval and returns {@code false} once the total time budget is used up - confirm
 * against {@code BoundedExponentialBackoff}.
 */
final class RetryingCallback implements Callback {
    /** First argument handed to the {@link BoundedExponentialBackoff} created per callback. */
    private static final Duration INITIAL_BACKOFF_DURATION = Duration.ofMillis(10);

    /**
     * Status codes below 500 that are retried. Every code of 500 and above is retried as well,
     * see {@link #onResponseUnsafe(Call, Response)}.
     */
    private static final Set<Integer> RETRYABLE_HTTP_CODES = new HashSet<>(Arrays.asList(HttpStatus.SC_CONFLICT, HttpStatus.SC_METHOD_FAILURE, HttpStatus.SC_REQUEST_TIMEOUT, 429, 499, 500));

    private static final Logger LOG = LoggerFactory.getLogger(RetryingCallback.class);

    private final CompletableFuture<Response> resultFuture = new CompletableFuture<>();
    private final BoundedExponentialBackoff backoff;
    private final ToFunctionRequestSummary requestSummary;
    private final RemoteInvocationMetrics metrics;
    private final BooleanSupplier isShutdown;

    /** {@link System#nanoTime()} reading taken when the current attempt was enqueued. */
    private long requestStarted;

    /**
     * Creates a callback for a single logical request.
     *
     * @param toFunctionRequestSummary summary of the request, only used in log messages
     * @param remoteInvocationMetrics sink for failure counts and per-attempt latency
     * @param timeout its {@code timeoutNanos()} value is passed as the second argument of the
     *     backoff (presumably the upper bound on the total retry time - confirm)
     * @param booleanSupplier reports whether the owner is shutting down; must not be {@code null}
     * @throws NullPointerException if {@code booleanSupplier} is {@code null}
     */
    public RetryingCallback(ToFunctionRequestSummary toFunctionRequestSummary, RemoteInvocationMetrics remoteInvocationMetrics, Timeout timeout, BooleanSupplier booleanSupplier) {
        this.backoff = new BoundedExponentialBackoff(INITIAL_BACKOFF_DURATION, duration(timeout));
        this.requestSummary = toFunctionRequestSummary;
        this.metrics = remoteInvocationMetrics;
        this.isShutdown = Objects.requireNonNull(booleanSupplier);
    }

    /**
     * Returns the future that is completed with the first successful response, or completed
     * exceptionally when the request is given up.
     */
    public CompletableFuture<Response> future() {
        return this.resultFuture;
    }

    /**
     * Records the start time of the attempt and enqueues {@code call} with this callback.
     *
     * @param call the call to execute asynchronously
     */
    public void attachToCall(Call call) {
        this.requestStarted = System.nanoTime();
        call.enqueue(this);
    }

    @Override // okhttp3.Callback
    public void onFailure(Call call, IOException iOException) {
        tryWithFuture(() -> {
            onFailureUnsafe(call, iOException);
        });
    }

    @Override // okhttp3.Callback
    public void onResponse(Call call, Response response) {
        tryWithFuture(() -> {
            onResponseUnsafe(call, response);
        });
    }

    /**
     * Handles a transport level failure: gives up immediately during shutdown, otherwise logs,
     * counts the failure and retries.
     *
     * @throws IllegalStateException if shutting down or if no further retry is allowed
     */
    private void onFailureUnsafe(Call call, IOException iOException) {
        if (this.isShutdown.getAsBoolean()) {
            throw new IllegalStateException("An exception caught during shutdown.", iOException);
        }
        // A trailing Throwable argument is logged as the exception, not as a placeholder value.
        LOG.warn("Retriable exception caught while trying to deliver a message: {}", this.requestSummary, iOException);
        this.metrics.remoteInvocationFailures();
        if (!retryAfterApplyingBackoff(call)) {
            throw new IllegalStateException("Maximal request time has elapsed. Last cause is attached", iOException);
        }
    }

    /**
     * Handles a received response: completes the future on success, retries on a retryable status
     * and fails otherwise.
     *
     * @throws IllegalStateException on a non-retryable status or if no further retry is allowed
     */
    private void onResponseUnsafe(Call call, Response response) {
        if (response.isSuccessful()) {
            if (!this.resultFuture.complete(response)) {
                // The future was already completed elsewhere, so nobody will consume (and close)
                // this response.
                response.close();
            }
            return;
        }
        final int code = response.code();
        // A failed response is never handed to the caller, so it must be closed here; closing it
        // before the backoff is applied also avoids holding the connection while waiting.
        response.close();
        final boolean retryable = RETRYABLE_HTTP_CODES.contains(code) || code >= 500;
        if (!retryable) {
            throw new IllegalStateException("Non successful HTTP response code " + code);
        }
        if (!retryAfterApplyingBackoff(call)) {
            throw new IllegalStateException("Maximal request time has elapsed. Last known error is: invalid HTTP response code " + code);
        }
    }

    /**
     * Applies the backoff and, if it permits another attempt, enqueues a clone of {@code call}.
     *
     * @return {@code true} if a new attempt was enqueued, {@code false} if the backoff refused
     */
    private boolean retryAfterApplyingBackoff(Call call) {
        if (!this.backoff.applyNow()) {
            return false;
        }
        // An executed Call cannot be enqueued again, hence the clone.
        attachToCall(call.clone());
        return true;
    }

    /**
     * Reports the latency of the finished attempt and runs {@code runnableWithException}; anything
     * thrown by either step completes the future exceptionally instead of propagating to OkHttp.
     */
    private void tryWithFuture(RunnableWithException runnableWithException) {
        try {
            endTimingRequest();
            runnableWithException.run();
        } catch (Throwable th) {
            this.resultFuture.completeExceptionally(th);
        }
    }

    /** Converts the okio timeout into a {@link Duration} using its nanosecond value. */
    private static Duration duration(Timeout timeout) {
        return Duration.ofNanos(timeout.timeoutNanos());
    }

    /** Reports the time elapsed since {@link #attachToCall(Call)}, in milliseconds. */
    private void endTimingRequest() {
        final long elapsedNanos = System.nanoTime() - this.requestStarted;
        this.metrics.remoteInvocationLatency(TimeUnit.NANOSECONDS.toMillis(elapsedNanos));
    }
}
