package net.openhft.chronicle.engine.pubsub;

import com.vaadin.shared.ui.ui.UIConstants;
import net.openhft.chronicle.core.Jvm;
import net.openhft.chronicle.engine.api.pubsub.InvalidSubscriberException;
import net.openhft.chronicle.engine.api.pubsub.Publisher;
import net.openhft.chronicle.engine.api.pubsub.Subscriber;
import net.openhft.chronicle.engine.api.pubsub.TopicPublisher;
import net.openhft.chronicle.engine.api.pubsub.TopicSubscriber;
import net.openhft.chronicle.engine.api.tree.Asset;
import net.openhft.chronicle.engine.api.tree.AssetNotFoundException;
import net.openhft.chronicle.engine.api.tree.RequestContext;
import net.openhft.chronicle.engine.server.internal.PublisherHandler;
import net.openhft.chronicle.engine.server.internal.TopicPublisherHandler;
import net.openhft.chronicle.network.connection.AbstractAsyncSubscription;
import net.openhft.chronicle.network.connection.AbstractStatelessClient;
import net.openhft.chronicle.network.connection.CoreFields;
import net.openhft.chronicle.network.connection.TcpChannelHub;
import net.openhft.chronicle.wire.ValueIn;
import net.openhft.chronicle.wire.WireIn;
import net.openhft.chronicle.wire.WireOut;
import net.openhft.chronicle.wire.Wires;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

/* loaded from: input_file:WEB-INF/lib/chronicle-engine-1.13.21.jar:net/openhft/chronicle/engine/pubsub/RemoteTopicPublisher.class */
public class RemoteTopicPublisher<T, M> extends AbstractStatelessClient<TopicPublisherHandler.EventId> implements TopicPublisher<T, M> {
    final Class<T> topicClass;
    final Class<M> messageClass;

    public RemoteTopicPublisher(@NotNull RequestContext requestContext, @NotNull Asset asset) throws AssetNotFoundException {
        super((TcpChannelHub) asset.findView(TcpChannelHub.class), 0L, toUri(requestContext, "topicPublisher"));
        this.topicClass = requestContext.topicType();
        this.messageClass = requestContext.messageType();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public RemoteTopicPublisher(@NotNull RequestContext requestContext, @NotNull Asset asset, String str) throws AssetNotFoundException {
        super((TcpChannelHub) asset.findView(TcpChannelHub.class), 0L, toUri(requestContext, str));
        this.topicClass = requestContext.keyType();
        this.messageClass = requestContext.valueType();
    }

    private static String toUri(@NotNull RequestContext requestContext, String str) {
        StringBuilder sb = new StringBuilder(requestContext.fullName() + "?view=" + str);
        if (requestContext.keyType() != String.class) {
            sb.append("&keyType=").append(requestContext.keyType().getName());
        }
        if (requestContext.valueType() != String.class) {
            sb.append("&valueType=").append(requestContext.valueType().getName());
        }
        if (requestContext.dontPersist()) {
            sb.append("&dontPersist=").append(requestContext.dontPersist());
        }
        return sb.toString();
    }

    @Override // net.openhft.chronicle.engine.api.pubsub.TopicPublisher
    public void publish(@NotNull T t, @NotNull M m) {
        checkTopic(t);
        checkMessage(m);
        sendEventAsync(TopicPublisherHandler.EventId.publish, valueOut -> {
            valueOut.marshallable(wireOut -> {
                wireOut.write(TopicPublisherHandler.Params.topic).object(t);
                wireOut.write(TopicPublisherHandler.Params.message).object(m);
            });
        }, true);
    }

    private void checkTopic(@Nullable Object obj) {
        if (obj == null) {
            throw new NullPointerException("topic can not be null");
        }
    }

    private void checkMessage(@Nullable Object obj) {
        if (obj == null) {
            throw new NullPointerException("message can not be null");
        }
    }

    @Override // net.openhft.chronicle.engine.api.pubsub.TopicPublisher
    public void registerTopicSubscriber(@NotNull final TopicSubscriber<T, M> topicSubscriber) throws AssetNotFoundException {
        if (this.hub.outBytesLock().isHeldByCurrentThread()) {
            throw new IllegalStateException("Cannot view map while debugging");
        }
        this.hub.subscribe(new AbstractAsyncSubscription(this.hub, this.csp, "Remote Topic publisher register subscribe") { // from class: net.openhft.chronicle.engine.pubsub.RemoteTopicPublisher.1
            @Override // net.openhft.chronicle.network.connection.AbstractAsyncSubscription
            public void onSubscribe(@NotNull WireOut wireOut) {
                wireOut.writeEventName(PublisherHandler.EventId.registerSubscriber).text("");
            }

            @Override // net.openhft.chronicle.network.connection.AsyncSubscription
            public void onConsumer(@NotNull WireIn wireIn) {
                TopicSubscriber topicSubscriber2 = topicSubscriber;
                wireIn.readDocument(null, wireIn2 -> {
                    StringBuilder acquireStringBuilder = Wires.acquireStringBuilder();
                    ValueIn readEventName = wireIn2.readEventName(acquireStringBuilder);
                    if (TopicPublisherHandler.EventId.onEndOfSubscription.contentEquals(acquireStringBuilder)) {
                        topicSubscriber2.onEndOfSubscription();
                        RemoteTopicPublisher.this.hub.unsubscribe(tid());
                    } else if (CoreFields.reply.contentEquals(acquireStringBuilder)) {
                        readEventName.marshallable(wireIn2 -> {
                            try {
                                RemoteTopicPublisher.this.onEvent(wireIn2.read(() -> {
                                    return "topic";
                                }).object(RemoteTopicPublisher.this.topicClass), wireIn2.read(() -> {
                                    return UIConstants.ATTRIBUTE_NOTIFICATION_MESSAGE;
                                }).object(RemoteTopicPublisher.this.messageClass), topicSubscriber2);
                            } catch (InvalidSubscriberException e) {
                                throw Jvm.rethrow(e);
                            }
                        });
                    }
                });
            }
        });
    }

    @Override // net.openhft.chronicle.engine.api.pubsub.TopicPublisher
    public void unregisterTopicSubscriber(@NotNull TopicSubscriber<T, M> topicSubscriber) {
        throw new UnsupportedOperationException("todo");
    }

    @Override // net.openhft.chronicle.engine.api.pubsub.TopicPublisher
    public Publisher<M> publisher(@NotNull T t) {
        throw new UnsupportedOperationException("tood");
    }

    @Override // net.openhft.chronicle.engine.api.pubsub.TopicPublisher
    public void registerSubscriber(@NotNull T t, @NotNull Subscriber<M> subscriber) {
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void onEvent(T t, @Nullable M m, @NotNull TopicSubscriber<T, M> topicSubscriber) throws InvalidSubscriberException {
        if (m != null) {
            topicSubscriber.onMessage(t, m);
        }
    }
}
