package org.apache.flink.statefun.flink.core.jsonmodule;

import com.google.protobuf.Message;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonPointer;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.statefun.flink.common.json.NamespaceNamePair;
import org.apache.flink.statefun.flink.common.json.Selectors;
import org.apache.flink.statefun.flink.core.protorouter.AutoRoutableProtobufRouter;
import org.apache.flink.statefun.flink.io.kafka.ProtobufKafkaIngressTypes;
import org.apache.flink.statefun.flink.io.kinesis.PolyglotKinesisIOTypes;
import org.apache.flink.statefun.flink.io.spi.JsonIngressSpec;
import org.apache.flink.statefun.sdk.IngressType;
import org.apache.flink.statefun.sdk.io.IngressIdentifier;
import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule;

/* loaded from: input_file:org/apache/flink/statefun/flink/core/jsonmodule/IngressJsonEntity.class */
/**
 * A {@link JsonEntity} that binds every ingress listed under {@code /ingresses} of the given JSON
 * node to a {@link StatefulFunctionModule.Binder}.
 *
 * <p>For each ingress node, a {@link JsonIngressSpec} is bound using the id and type read from the
 * node's {@code /ingress/meta/id} and {@code /ingress/meta/type} fields. Ingresses whose type is
 * one of the routable Protobuf types (Kafka or Kinesis) additionally get an
 * {@link AutoRoutableProtobufRouter} bound to them.
 */
final class IngressJsonEntity implements JsonEntity {
    /** Location, within the node handed to {@link #bind}, of the list of ingress definitions. */
    private static final JsonPointer INGRESS_SPECS_POINTER = JsonPointer.compile("/ingresses");

    /** JSON pointers, evaluated against a single ingress node, to its metadata fields. */
    private static final class MetaPointers {
        private static final JsonPointer ID = JsonPointer.compile("/ingress/meta/id");
        private static final JsonPointer TYPE = JsonPointer.compile("/ingress/meta/type");

        private MetaPointers() {
        }
    }

    /**
     * Binds all ingresses found at {@code /ingresses} in {@code jsonNode}.
     *
     * @param binder the binder that receives the ingress specs and, where applicable, the routers
     * @param jsonNode the JSON node from which the ingress list is selected
     * @param formatVersion not consulted by this entity
     */
    @Override // org.apache.flink.statefun.flink.core.jsonmodule.JsonEntity
    public void bind(StatefulFunctionModule.Binder binder, JsonNode jsonNode, FormatVersion formatVersion) {
        Selectors.listAt(jsonNode, INGRESS_SPECS_POINTER).forEach(ingressNode -> {
            IngressIdentifier<Message> id = ingressId(ingressNode);
            IngressType type = ingressType(ingressNode);

            // Diamond (not a raw type) so the spec carries the identifier's Message type argument.
            binder.bindIngress(new JsonIngressSpec<>(type, id, ingressNode));

            // Routable Protobuf ingresses get a router bound alongside the spec itself.
            if (isAutoRoutableIngress(type)) {
                binder.bindIngressRouter(id, new AutoRoutableProtobufRouter());
            }
        });
    }

    /**
     * Reads the ingress type from the text at {@code /ingress/meta/type} of the given ingress node.
     *
     * @param jsonNode a single ingress node
     * @return the ingress type built from the parsed namespace and name
     */
    private static IngressType ingressType(JsonNode jsonNode) {
        NamespaceNamePair typeName = NamespaceNamePair.from(Selectors.textAt(jsonNode, MetaPointers.TYPE));
        return new IngressType(typeName.namespace(), typeName.name());
    }

    /**
     * Reads the ingress identifier from the text at {@code /ingress/meta/id} of the given ingress
     * node.
     *
     * @param jsonNode a single ingress node
     * @return an identifier for {@link Message} values, built from the parsed namespace and name
     */
    private static IngressIdentifier<Message> ingressId(JsonNode jsonNode) {
        NamespaceNamePair idName = NamespaceNamePair.from(Selectors.textAt(jsonNode, MetaPointers.ID));
        return new IngressIdentifier<>(Message.class, idName.namespace(), idName.name());
    }

    /**
     * Tells whether the given type is one of the routable Protobuf ingress types (Kafka or Kinesis).
     *
     * @param ingressType the type to test
     * @return {@code true} if the type equals either routable Protobuf ingress type
     */
    private static boolean isAutoRoutableIngress(IngressType ingressType) {
        return ingressType.equals(ProtobufKafkaIngressTypes.ROUTABLE_PROTOBUF_KAFKA_INGRESS_TYPE)
                || ingressType.equals(PolyglotKinesisIOTypes.ROUTABLE_PROTOBUF_KINESIS_INGRESS_TYPE);
    }
}
