package org.apache.flink.statefun.flink.core.translation;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.statefun.flink.common.UnimplementedTypeInfo;
import org.apache.flink.statefun.flink.core.StatefulFunctionsConfig;
import org.apache.flink.statefun.flink.core.StatefulFunctionsUniverse;
import org.apache.flink.statefun.flink.core.message.Message;
import org.apache.flink.statefun.flink.core.message.RoutableMessage;
import org.apache.flink.statefun.flink.core.types.StaticallyRegisteredTypes;
import org.apache.flink.statefun.sdk.io.IngressIdentifier;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/* loaded from: input_file:org/apache/flink/statefun/flink/core/translation/Sources.class */
final class Sources {
    private final DataStream<Message> sourceUnion;

    private Sources(DataStream<Message> dataStream) {
        this.sourceUnion = dataStream;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static Sources create(StreamExecutionEnvironment streamExecutionEnvironment, StatefulFunctionsUniverse statefulFunctionsUniverse, StatefulFunctionsConfig statefulFunctionsConfig) {
        return new Sources(union(dataStreamToEnvelopStream(statefulFunctionsUniverse, sourceFunctionToDataStream(streamExecutionEnvironment, ingressToSourceFunction(statefulFunctionsUniverse)), statefulFunctionsConfig).values()));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static Sources create(StaticallyRegisteredTypes staticallyRegisteredTypes, Iterable<DataStream<RoutableMessage>> iterable) {
        TypeInformation registerType = staticallyRegisteredTypes.registerType(Message.class);
        ArrayList arrayList = new ArrayList();
        for (DataStream<RoutableMessage> dataStream : iterable) {
            dataStream.getTransformation().setOutputType(registerType);
            arrayList.add(dataStream);
        }
        return new Sources(union(arrayList));
    }

    private static Map<IngressIdentifier<?>, DataStream<Message>> dataStreamToEnvelopStream(StatefulFunctionsUniverse statefulFunctionsUniverse, Map<IngressIdentifier<?>, DataStream<?>> map, StatefulFunctionsConfig statefulFunctionsConfig) {
        return new RouterTranslator(statefulFunctionsUniverse, statefulFunctionsConfig).translate(map);
    }

    private static Map<IngressIdentifier<?>, DataStream<?>> sourceFunctionToDataStream(StreamExecutionEnvironment streamExecutionEnvironment, Map<IngressIdentifier<?>, DecoratedSource> map) {
        HashMap hashMap = new HashMap();
        map.forEach((ingressIdentifier, decoratedSource) -> {
            DataStreamSource addSource = streamExecutionEnvironment.addSource(decoratedSource.source);
            addSource.name(decoratedSource.name);
            addSource.uid(decoratedSource.uid);
            eraseTypeInformation(addSource.getTransformation());
            hashMap.put(ingressIdentifier, addSource);
        });
        return hashMap;
    }

    private static void eraseTypeInformation(Transformation<?> transformation) {
        transformation.setOutputType(new UnimplementedTypeInfo());
    }

    private static Map<IngressIdentifier<?>, DecoratedSource> ingressToSourceFunction(StatefulFunctionsUniverse statefulFunctionsUniverse) {
        return new IngressToSourceFunctionTranslator(statefulFunctionsUniverse).translate();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public DataStream<Message> unionStream() {
        return this.sourceUnion;
    }

    private static <T> DataStream<T> union(Collection<DataStream<T>> collection) {
        if (collection.isEmpty()) {
            throw new IllegalStateException("There are no routers defined.");
        }
        int size = collection.size();
        Iterator<DataStream<T>> it2 = collection.iterator();
        if (size == 1) {
            return it2.next();
        }
        DataStream<T> next = it2.next();
        DataStream[] dataStreamArr = new DataStream[size - 1];
        for (int i = 0; i < size - 1; i++) {
            dataStreamArr[i] = it2.next();
        }
        return next.union(dataStreamArr);
    }
}
