package org.vitej.core.protocol.websocket;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.subjects.BehaviorSubject;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import javax.annotation.Nullable;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
import okhttp3.WebSocket;
import okhttp3.WebSocketListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.vitej.core.protocol.ProtocolHelper;
import org.vitej.core.protocol.RpcService;
import org.vitej.core.protocol.methods.response.Response;
import org.vitej.core.protocol.methods.response.SubscribeResponse;
import org.vitej.core.protocol.websocket.events.Notification;

/* loaded from: input_file:org/vitej/core/protocol/websocket/WebSocketService.class */
public class WebSocketService implements RpcService {
    private static final Logger log = LoggerFactory.getLogger(WebSocketService.class);
    public static final String DEFAULT_URL = "ws://127.0.0.1:41420";
    private final String url;
    private OkHttpClient client;
    private WebSocket ws;
    private final ObjectMapper objectMapper;
    private Map<Long, WebSocketRequest<?>> requestForId;
    private Map<Long, WebSocketSubscription<?>> subscriptionRequestForId;
    private Map<String, WebSocketSubscription<?>> subscriptionForId;
    private final ScheduledExecutorService executor;
    static final long REQUEST_TIMEOUT = 60;

    public WebSocketService() {
        this(DEFAULT_URL);
    }

    public WebSocketService(String str) {
        this(str, ProtocolHelper.getClient());
    }

    public WebSocketService(String str, OkHttpClient okHttpClient) {
        this.ws = null;
        this.objectMapper = ProtocolHelper.getObjectMapper();
        this.requestForId = new ConcurrentHashMap();
        this.subscriptionRequestForId = new ConcurrentHashMap();
        this.subscriptionForId = new ConcurrentHashMap();
        this.executor = Executors.newScheduledThreadPool(1);
        this.url = str;
        this.client = okHttpClient;
    }

    public void connect() {
        connect(str -> {
        }, th -> {
        }, () -> {
        });
    }

    public void connect(final Consumer<String> consumer, final Consumer<Throwable> consumer2, final Runnable runnable) {
        close();
        this.ws = this.client.newWebSocket(new Request.Builder().url(this.url).build(), new WebSocketListener() { // from class: org.vitej.core.protocol.websocket.WebSocketService.1
            public void onMessage(WebSocket webSocket, String str) {
                try {
                    WebSocketService.this.onWebSocketMessage(str);
                    consumer.accept(str);
                } catch (IOException e) {
                    WebSocketService.log.error("Failed to deal with WebSocket message, {}", str, e);
                    throw new RuntimeException("Failed to deal with WebSocket message", e);
                }
            }

            public void onClosed(WebSocket webSocket, int i, String str) {
                WebSocketService.log.warn("WebSocket closed {} {}", Integer.valueOf(i), str);
                runnable.run();
            }

            public void onFailure(WebSocket webSocket, Throwable th, @Nullable Response response) {
                WebSocketService.log.error("WebSocket error", th);
                consumer2.accept(th);
            }
        });
    }

    @Override // org.vitej.core.protocol.RpcService
    public <T extends org.vitej.core.protocol.methods.response.Response> T send(org.vitej.core.protocol.methods.request.Request request, Class<T> cls) throws IOException {
        try {
            return sendAsync(request, cls).get();
        } catch (InterruptedException e) {
            Thread.interrupted();
            throw new IOException("Interrupted WebSocket request", e);
        } catch (ExecutionException e2) {
            if (e2.getCause() instanceof IOException) {
                throw ((IOException) e2.getCause());
            }
            throw new RuntimeException("Unexpected exception", e2.getCause());
        }
    }

    @Override // org.vitej.core.protocol.RpcService
    public <T extends org.vitej.core.protocol.methods.response.Response> CompletableFuture<T> sendAsync(org.vitej.core.protocol.methods.request.Request request, Class<T> cls) {
        CompletableFuture<T> completableFuture = new CompletableFuture<>();
        long id = request.getId();
        this.requestForId.put(Long.valueOf(id), new WebSocketRequest<>(completableFuture, cls));
        try {
            this.ws.send(this.objectMapper.writeValueAsString(request));
            this.executor.schedule(() -> {
                closeRequest(id, new IOException(String.format("Request %d timed out", Long.valueOf(id))));
            }, REQUEST_TIMEOUT, TimeUnit.SECONDS);
        } catch (IOException e) {
            closeRequest(id, e);
        }
        return completableFuture;
    }

    @Override // org.vitej.core.protocol.RpcService
    public <T extends Notification<?>> Flowable<T> subscribe(org.vitej.core.protocol.methods.request.Request request) {
        BehaviorSubject create = BehaviorSubject.create();
        this.subscriptionRequestForId.put(Long.valueOf(request.getId()), new WebSocketSubscription<>(create, request.getResponseType()));
        try {
            send(request, SubscribeResponse.class);
        } catch (IOException e) {
            log.error("Failed to subscribe to RPC events with request id {}", Long.valueOf(request.getId()));
            create.onError(e);
        }
        return create.doOnDispose(() -> {
            String subscriptionId = getSubscriptionId(create);
            if (subscriptionId != null) {
                this.subscriptionForId.remove(subscriptionId);
            }
        }).toFlowable(BackpressureStrategy.BUFFER);
    }

    @Override // org.vitej.core.protocol.RpcService
    public void close() {
        if (this.ws != null) {
            try {
                this.ws.close(1000, "WebSocket closed");
            } catch (Exception e) {
                log.error("Close WebSocket failed", e);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void onWebSocketMessage(String str) throws IOException {
        try {
            JsonNode readTree = this.objectMapper.readTree(str);
            if (isReply(readTree)) {
                long replyId = getReplyId(readTree);
                WebSocketRequest<?> remove = this.requestForId.remove(Long.valueOf(replyId));
                if (remove == null) {
                    log.error("Received reply for unexpected request id: {}", Long.valueOf(replyId));
                    return;
                }
                try {
                    Object convertValue = this.objectMapper.convertValue(readTree, remove.getResponseType());
                    if (convertValue instanceof SubscribeResponse) {
                        WebSocketSubscription<?> webSocketSubscription = this.subscriptionRequestForId.get(Long.valueOf(replyId));
                        SubscribeResponse subscribeResponse = (SubscribeResponse) convertValue;
                        if (subscribeResponse.hasError()) {
                            Response.Error error = subscribeResponse.getError();
                            log.error("Subscription request returned error: {}", error.getMessage());
                            webSocketSubscription.getSubject().onError(new IOException(String.format("Subscription request failed with error: %s", error.getMessage())));
                        } else {
                            this.subscriptionForId.put(subscribeResponse.getSubscriptionId(), new WebSocketSubscription<>(webSocketSubscription.getSubject(), webSocketSubscription.getResponseType()));
                        }
                    }
                    remove.getOnReply().complete(convertValue);
                } catch (IllegalArgumentException e) {
                    remove.getOnReply().completeExceptionally(new IOException(String.format("Failed to parse '%s' as type %s", str, remove.getResponseType()), e));
                }
            } else {
                if (!isSubscriptionEvent(readTree)) {
                    throw new IOException("Unknown message type");
                }
                String extractSubscriptionId = extractSubscriptionId(readTree);
                WebSocketSubscription<?> webSocketSubscription2 = this.subscriptionForId.get(extractSubscriptionId);
                if (webSocketSubscription2 != null) {
                    webSocketSubscription2.getSubject().onNext(this.objectMapper.convertValue(readTree, webSocketSubscription2.getResponseType()));
                } else {
                    log.warn("No subscriber for WebSocket event with subscription id {}", extractSubscriptionId);
                }
            }
        } catch (IOException e2) {
            throw new IOException("failed to parse WebSocket message", e2);
        }
    }

    private void closeRequest(long j, Exception exc) {
        if (this.requestForId.containsKey(Long.valueOf(j))) {
            CompletableFuture<?> onReply = this.requestForId.get(Long.valueOf(j)).getOnReply();
            this.requestForId.remove(Long.valueOf(j));
            onReply.completeExceptionally(exc);
        }
    }

    private <T extends Notification<?>> String getSubscriptionId(BehaviorSubject<T> behaviorSubject) {
        return (String) this.subscriptionForId.entrySet().stream().filter(entry -> {
            return ((WebSocketSubscription) entry.getValue()).getSubject() == behaviorSubject;
        }).map((v0) -> {
            return v0.getKey();
        }).findFirst().orElse(null);
    }

    private String extractSubscriptionId(JsonNode jsonNode) {
        return jsonNode.get("params").get("subscription").asText();
    }

    private boolean isSubscriptionEvent(JsonNode jsonNode) {
        return jsonNode.has("method");
    }

    private boolean isReply(JsonNode jsonNode) {
        return jsonNode.has("id");
    }

    private long getReplyId(JsonNode jsonNode) throws IOException {
        JsonNode jsonNode2 = jsonNode.get("id");
        if (jsonNode2 == null) {
            throw new IOException("'id' field is missing in the reply");
        }
        if (jsonNode2.isIntegralNumber()) {
            return jsonNode2.longValue();
        }
        throw new IOException(String.format("'id' expected to be long, but it is: '%s'", jsonNode2.asText()));
    }
}
