package org.apache.beam.repackaged.direct_java.runners.core.construction.expansion;

import com.google.auto.service.AutoService;
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.Method;
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.ServiceLoader;
import java.util.Set;
import java.util.UUID;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.apache.beam.model.expansion.v1.ExpansionApi;
import org.apache.beam.model.expansion.v1.ExpansionServiceGrpc;
import org.apache.beam.model.pipeline.v1.ExternalTransforms;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.repackaged.direct_java.runners.core.construction.BeamUrns;
import org.apache.beam.repackaged.direct_java.runners.core.construction.CoderTranslation;
import org.apache.beam.repackaged.direct_java.runners.core.construction.Environments;
import org.apache.beam.repackaged.direct_java.runners.core.construction.JavaReadViaImpulse;
import org.apache.beam.repackaged.direct_java.runners.core.construction.PipelineTranslation;
import org.apache.beam.repackaged.direct_java.runners.core.construction.RehydratedComponents;
import org.apache.beam.repackaged.direct_java.runners.core.construction.SdkComponents;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.expansion.ExternalTransformRegistrar;
import org.apache.beam.sdk.transforms.ExternalTransformBuilder;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.PDone;
import org.apache.beam.sdk.values.PInput;
import org.apache.beam.sdk.values.POutput;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.vendor.grpc.v1p13p1.com.google.protobuf.ProtocolStringList;
import org.apache.beam.vendor.grpc.v1p13p1.io.grpc.Server;
import org.apache.beam.vendor.grpc.v1p13p1.io.grpc.ServerBuilder;
import org.apache.beam.vendor.grpc.v1p13p1.io.grpc.stub.StreamObserver;
import org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v20_0.com.google.common.base.CaseFormat;
import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Converter;
import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/beam/repackaged/direct_java/runners/core/construction/expansion/ExpansionService.class */
public class ExpansionService extends ExpansionServiceGrpc.ExpansionServiceImplBase implements AutoCloseable {
    /** Logger used by {@code expand} to report the transform being expanded and its result. */
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) ExpansionService.class);
    /**
     * Transform providers keyed by URN. Populated once, when the instance is created, from
     * {@code loadRegisteredTransforms()} and consulted by {@code expand} to resolve the requested URN.
     */
    private Map<String, TransformProvider> registeredTransforms = loadRegisteredTransforms();

    /* loaded from: input_file:org/apache/beam/repackaged/direct_java/runners/core/construction/expansion/ExpansionService$ExpansionServiceRegistrar.class */
    /**
     * Extension point through which transform providers are contributed to the expansion service.
     *
     * <p>Implementations are discovered with {@link ServiceLoader} when an {@code ExpansionService}
     * loads its registered transforms.
     */
    public interface ExpansionServiceRegistrar {
        /**
         * Returns the providers contributed by this registrar. The keys are the strings that
         * {@code ExpansionService.expand} matches against the URN of the requested transform.
         */
        Map<String, TransformProvider> knownTransforms();
    }

    @AutoService(ExpansionServiceRegistrar.class)
    /* loaded from: input_file:org/apache/beam/repackaged/direct_java/runners/core/construction/expansion/ExpansionService$ExternalTransformRegistrarLoader.class */
    public static class ExternalTransformRegistrarLoader implements ExpansionServiceRegistrar {
        /**
         * Creates one {@link TransformProvider} per builder class advertised by every
         * {@link ExternalTransformRegistrar} found through {@link ServiceLoader}.
         *
         * <p>Each provider is lazy: the configuration payload is parsed and the transform is built
         * only when the provider is asked for a transform. Any failure while doing so is rethrown as
         * a {@link RuntimeException} that names the URN and the offending spec.
         *
         * @return an immutable map from URN to provider
         */
        @Override
        public Map<String, TransformProvider> knownTransforms() {
            // The builder has to be typed: with a raw builder, put(Object, Object) gives the lambda
            // below no functional-interface target type and the method does not compile.
            ImmutableMap.Builder<String, TransformProvider> providers = ImmutableMap.builder();
            for (ExternalTransformRegistrar registrar : ServiceLoader.load(ExternalTransformRegistrar.class)) {
                for (Map.Entry<String, Class<? extends ExternalTransformBuilder>> entry : registrar.knownBuilders().entrySet()) {
                    String urn = entry.getKey();
                    Class<? extends ExternalTransformBuilder> builderClass = entry.getValue();
                    providers.put(urn, spec -> {
                        try {
                            ExternalTransforms.ExternalConfigurationPayload payload =
                                    ExternalTransforms.ExternalConfigurationPayload.parseFrom(spec.getPayload());
                            return translate(payload, builderClass);
                        } catch (Exception e) {
                            throw new RuntimeException(String.format("Failed to build transform %s from spec %s", urn, spec), e);
                        }
                    });
                }
            }
            return providers.build();
        }

        /**
         * Turns an external configuration payload into a transform using the given builder class.
         *
         * <p>Steps: instantiate the builder's configuration object, fill it from the payload, then
         * hand it to the builder.
         *
         * @param externalConfigurationPayload the encoded configuration values
         * @param cls the builder class that knows how to construct the transform
         * @return the transform produced by the builder
         * @throws IllegalStateException if {@code cls} is not an {@link ExternalTransformBuilder}
         * @throws Exception if configuration creation, population, or the build call fails
         */
        private static PTransform translate(ExternalTransforms.ExternalConfigurationPayload externalConfigurationPayload, Class<? extends ExternalTransformBuilder> cls) throws Exception {
            final boolean isBuilder = ExternalTransformBuilder.class.isAssignableFrom(cls);
            Preconditions.checkState(isBuilder, "Provided identifier %s is not an ExternalTransformBuilder.", cls.getName());

            final Object configuration = initConfiguration(cls);
            populateConfiguration(configuration, externalConfigurationPayload);
            return buildTransform(cls, configuration);
        }

        /**
         * Instantiates the configuration object expected by the builder's {@code buildExternal} method.
         *
         * <p>The public methods of {@code cls} are scanned for {@code buildExternal}; the first one
         * whose single parameter type is not {@link Object} determines the configuration class, which
         * is created through its no-argument constructor.
         *
         * @param cls the builder class to inspect
         * @return a fresh configuration instance
         * @throws IllegalStateException if a {@code buildExternal} method does not take exactly one parameter
         * @throws RuntimeException if no suitable {@code buildExternal} method exists
         * @throws Exception if the configuration class cannot be instantiated
         */
        private static Object initConfiguration(Class<? extends ExternalTransformBuilder> cls) throws Exception {
            for (Method candidate : cls.getMethods()) {
                if (!candidate.getName().equals("buildExternal")) {
                    continue;
                }
                Preconditions.checkState(candidate.getParameterCount() == 1, "Build method for ExternalTransformBuilder %s must have exactly one parameter, but had %s parameters.", (Object) cls.getSimpleName(), candidate.getParameterCount());
                Class<?> configurationType = candidate.getParameterTypes()[0];
                if (Object.class.equals(configurationType)) {
                    // Presumably the type-erased overload of buildExternal; keep looking for the typed one.
                    continue;
                }
                return configurationType.getDeclaredConstructor().newInstance();
            }
            throw new RuntimeException("Couldn't find build method on ExternalTransformBuilder.");
        }

        /**
         * Copies every value of the payload into the configuration object through its setters.
         *
         * <p>For each entry the key is converted from lower_underscore to lowerCamel, the value's
         * coder is resolved from its URN list, and the decoded value is passed to the public setter
         * {@code set<FieldName>} whose parameter type is the raw type of the coder's encoded type.
         *
         * @param obj the configuration object to populate
         * @param externalConfigurationPayload the payload holding the encoded values
         * @throws IllegalArgumentException if a value carries no coder URN
         * @throws RuntimeException if the configuration class has no matching setter
         * @throws Exception if resolving the coder, decoding, or invoking the setter fails
         */
        @VisibleForTesting
        static void populateConfiguration(Object obj, ExternalTransforms.ExternalConfigurationPayload externalConfigurationPayload) throws Exception {
            Converter<String, String> toCamelCase = CaseFormat.LOWER_UNDERSCORE.converterTo(CaseFormat.LOWER_CAMEL);
            for (Map.Entry<String, ExternalTransforms.ConfigValue> field : externalConfigurationPayload.getConfigurationMap().entrySet()) {
                ExternalTransforms.ConfigValue configValue = field.getValue();
                String fieldName = toCamelCase.convert(field.getKey());

                List<String> coderUrns = configValue.getCoderUrnList();
                Preconditions.checkArgument(coderUrns.size() > 0, "No Coder URN provided.");
                Coder coder = resolveCoder(coderUrns);
                Class<?> fieldType = coder.getEncodedTypeDescriptor().getRawType();

                String setterName = "set" + Character.toUpperCase(fieldName.charAt(0)) + fieldName.substring(1);
                Method setter;
                try {
                    setter = obj.getClass().getMethod(setterName, fieldType);
                } catch (NoSuchMethodException e) {
                    throw new RuntimeException(String.format("The configuration class %s is missing a setter %s for %s", obj.getClass(), setterName, fieldName), e);
                }
                // The value is decoded only after the setter has been found.
                setter.invoke(obj, coder.decode(configValue.getPayload().newInput()));
            }
        }

        /**
         * Builds a {@link Coder} from a flattened list of coder URNs.
         *
         * <p>The URNs are consumed front to back by {@code buildProto}, which registers component
         * coders in a scratch components builder; the resulting proto is then rehydrated against
         * those components.
         *
         * @param list the coder URNs, outermost coder first
         * @return the coder described by the URNs
         * @throws IllegalArgumentException if {@code list} is empty
         * @throws Exception if the coder cannot be translated from its proto form
         */
        private static Coder resolveCoder(List<String> list) throws Exception {
            Preconditions.checkArgument(!list.isEmpty(), "No Coder URN provided.");
            RunnerApi.Components.Builder scratchComponents = RunnerApi.Components.newBuilder();
            Deque<String> remainingUrns = new ArrayDeque<>(list);
            RunnerApi.Coder coderProto = buildProto(remainingUrns, scratchComponents);
            RehydratedComponents rehydrated = RehydratedComponents.forComponents(scratchComponents.build());
            return CoderTranslation.fromProto(coderProto, rehydrated);
        }

        /**
         * Recursively assembles a coder proto from the URNs at the head of the deque.
         *
         * <p>The first URN becomes the coder's spec. An ITERABLE coder consumes one nested coder
         * and a KV coder consumes two; each nested coder is stored in {@code builder} under a random
         * UUID and referenced by that id. Any other URN yields a coder without components.
         *
         * @param deque the remaining URNs; consumed elements are removed
         * @param builder the components builder that receives nested coders
         * @return the coder proto for the consumed URNs
         * @throws IllegalArgumentException if {@code deque} is empty
         */
        private static RunnerApi.Coder buildProto(Deque<String> deque, RunnerApi.Components.Builder builder) {
            Preconditions.checkArgument(!deque.isEmpty(), "No URNs left to construct coder from");
            String urn = deque.pop();
            RunnerApi.FunctionSpec functionSpec = RunnerApi.FunctionSpec.newBuilder().setUrn(urn).build();
            RunnerApi.Coder.Builder coder = RunnerApi.Coder.newBuilder().setSpec(functionSpec);

            if (urn.equals(BeamUrns.getUrn(RunnerApi.StandardCoders.Enum.ITERABLE))) {
                RunnerApi.Coder element = buildProto(deque, builder);
                String elementId = UUID.randomUUID().toString();
                builder.putCoders(elementId, element);
                coder.addComponentCoderIds(elementId);
            } else if (urn.equals(BeamUrns.getUrn(RunnerApi.StandardCoders.Enum.KV))) {
                // Both nested coders are parsed before either one is registered.
                RunnerApi.Coder keyCoder = buildProto(deque, builder);
                RunnerApi.Coder valueCoder = buildProto(deque, builder);
                String keyId = UUID.randomUUID().toString();
                String valueId = UUID.randomUUID().toString();
                builder.putCoders(keyId, keyCoder);
                builder.putCoders(valueId, valueCoder);
                coder.addComponentCoderIds(keyId);
                coder.addComponentCoderIds(valueId);
            }
            return coder.build();
        }

        /**
         * Instantiates the builder reflectively and calls its {@code buildExternal} method.
         *
         * <p>Both the no-argument constructor and the method are made accessible before use. The
         * method is looked up with the runtime class of {@code obj} as its only parameter type.
         *
         * @param cls the builder class
         * @param obj the populated configuration object
         * @return the transform returned by {@code buildExternal}
         * @throws Exception if construction, lookup, or invocation fails
         */
        private static PTransform buildTransform(Class<? extends ExternalTransformBuilder> cls, Object obj) throws Exception {
            Constructor<? extends ExternalTransformBuilder> constructor = cls.getDeclaredConstructor();
            constructor.setAccessible(true);
            ExternalTransformBuilder builderInstance = constructor.newInstance();

            Method buildExternal = cls.getMethod("buildExternal", obj.getClass());
            buildExternal.setAccessible(true);
            Object built = buildExternal.invoke(builderInstance, obj);
            return (PTransform) built;
        }
    }

    /* loaded from: input_file:org/apache/beam/repackaged/direct_java/runners/core/construction/expansion/ExpansionService$TransformProvider.class */
    public interface TransformProvider<InputT extends PInput, OutputT extends POutput> {
        /**
         * Creates the input value for the transform from the named input collections.
         *
         * <ul>
         *   <li>no inputs: the pipeline's begin value;
         *   <li>exactly one input: that collection itself;
         *   <li>otherwise: a {@link PCollectionTuple} with one entry per input, tagged by its name.
         * </ul>
         *
         * <p>The result is cast to {@code InputT} without a runtime check, so a provider whose
         * {@code InputT} does not match the shape chosen above fails later, when the value is used.
         *
         * @param pipeline the pipeline the input belongs to
         * @param map the input collections keyed by local name
         * @return the input to apply the transform to
         */
        @SuppressWarnings({"unchecked", "rawtypes"})
        default InputT createInput(Pipeline pipeline, Map<String, PCollection<?>> map) {
            if (map.isEmpty()) {
                // Explicit cast: PBegin is not statically an InputT.
                return (InputT) pipeline.begin();
            }
            if (map.size() == 1) {
                return (InputT) Iterables.getOnlyElement(map.values());
            }
            PCollectionTuple tuple = PCollectionTuple.empty(pipeline);
            for (Map.Entry<String, PCollection<?>> entry : map.entrySet()) {
                // Raw TupleTag: the element type of a wildcard PCollection cannot be named here.
                tuple = tuple.and(new TupleTag(entry.getKey()), entry.getValue());
            }
            return (InputT) tuple;
        }

        /**
         * Returns the transform for the given function spec. {@code apply} calls this with the spec
         * it was given and applies the returned transform to the value produced by {@code createInput}.
         */
        PTransform<InputT, OutputT> getTransform(RunnerApi.FunctionSpec functionSpec);

        /**
         * Flattens the output of the transform into a map of named collections.
         *
         * <ul>
         *   <li>{@link PDone}: an empty map;
         *   <li>{@link PCollection}: a single entry named {@code "output"};
         *   <li>{@link PCollectionTuple}: one entry per tag, keyed by the tag's {@code toString()};
         *   <li>{@link PCollectionList}: one entry per element, keyed {@code "output_<index>"}.
         * </ul>
         *
         * @param outputt the value returned by applying the transform
         * @return the output collections keyed by name
         * @throws UnsupportedOperationException if the output is of any other type
         */
        default Map<String, PCollection<?>> extractOutputs(OutputT outputt) {
            if (outputt instanceof PDone) {
                return Collections.emptyMap();
            }
            if (outputt instanceof PCollection) {
                return ImmutableMap.of("output", (PCollection<?>) outputt);
            }
            if (outputt instanceof PCollectionTuple) {
                return ((PCollectionTuple) outputt).getAll().entrySet().stream()
                        .collect(Collectors.toMap(entry -> entry.getKey().toString(), Map.Entry::getValue));
            }
            if (outputt instanceof PCollectionList) {
                PCollectionList<?> listOutput = (PCollectionList<?>) outputt;
                // The value mapper previously referenced an undefined variable; it must read from the list.
                return IntStream.range(0, listOutput.size())
                        .boxed()
                        .collect(Collectors.toMap(index -> "output_" + index, listOutput::get));
            }
            throw new UnsupportedOperationException("Unknown output type: " + outputt.getClass());
        }

        /**
         * Applies this provider's transform to the given inputs and returns its named outputs.
         *
         * <p>The input is created first, then the transform is obtained from the spec, applied
         * under the given name, and its result flattened with {@code extractOutputs}.
         *
         * @param pipeline the pipeline to apply the transform in
         * @param str the name to apply the transform under
         * @param functionSpec the spec describing the transform
         * @param map the input collections keyed by local name
         * @return the output collections keyed by name
         */
        default Map<String, PCollection<?>> apply(Pipeline pipeline, String str, RunnerApi.FunctionSpec functionSpec, Map<String, PCollection<?>> map) {
            InputT input = createInput(pipeline, map);
            PTransform<InputT, OutputT> transform = getTransform(functionSpec);
            OutputT output = Pipeline.applyTransform(str, input, transform);
            return extractOutputs(output);
        }
    }

    /**
     * Collects the transform providers of every {@link ExpansionServiceRegistrar} found through
     * {@link ServiceLoader} into one immutable map.
     *
     * @return the merged providers keyed by URN
     */
    private Map<String, TransformProvider> loadRegisteredTransforms() {
        ImmutableMap.Builder<String, TransformProvider> transforms = ImmutableMap.builder();
        for (ExpansionServiceRegistrar registrar : ServiceLoader.load(ExpansionServiceRegistrar.class)) {
            transforms.putAll(registrar.knownTransforms());
        }
        return transforms.build();
    }

    /**
     * Expands the transform named in the request and returns it together with the components it
     * introduced.
     *
     * <p>The request's components are rehydrated into a fresh pipeline, the registered provider for
     * the transform's URN is applied to the request's inputs, and the pipeline is translated back to
     * proto using the request's namespace as id prefix. The single root transform that was not
     * already present in the request is returned as the expanded transform (its unique name set to
     * its id) and removed from the returned components.
     *
     * @param expansionRequest the expansion request
     * @return the expanded transform and its components
     * @throws UnsupportedOperationException if no provider is registered for the URN
     * @throws RuntimeException if an input collection cannot be rehydrated
     */
    @VisibleForTesting
    ExpansionApi.ExpansionResponse expand(ExpansionApi.ExpansionRequest expansionRequest) {
        RunnerApi.PTransform requested = expansionRequest.getTransform();
        String urn = requested.getSpec().getUrn();
        LOG.info("Expanding '{}' with URN '{}'", requested.getUniqueName(), urn);
        LOG.debug("Full transform: {}", requested);

        // Remembered so the newly created root transform can be told apart after translation.
        Set<String> existingTransformIds = expansionRequest.getComponents().getTransformsMap().keySet();
        Pipeline pipeline = Pipeline.create();
        RehydratedComponents rehydrated = RehydratedComponents.forComponents(expansionRequest.getComponents()).withPipeline(pipeline);
        Map<String, PCollection<?>> inputs = requested.getInputsMap().entrySet().stream()
                .collect(Collectors.toMap(Map.Entry::getKey, input -> {
                    try {
                        return rehydrated.getPCollection(input.getValue());
                    } catch (IOException e) {
                        throw new RuntimeException(e);
                    }
                }));

        TransformProvider provider = this.registeredTransforms.get(urn);
        if (provider == null) {
            throw new UnsupportedOperationException("Unknown urn: " + urn);
        }
        provider.apply(pipeline, requested.getUniqueName(), requested.getSpec(), inputs);

        SdkComponents sdkComponents = rehydrated.getSdkComponents().withNewIdPrefix(expansionRequest.getNamespace());
        sdkComponents.registerEnvironment(Environments.JAVA_SDK_HARNESS_ENVIRONMENT);
        pipeline.replaceAll(ImmutableList.of(JavaReadViaImpulse.boundedOverride()));
        RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(pipeline, sdkComponents);

        List<String> newRootIds = pipelineProto.getRootTransformIdsList().stream()
                .filter(id -> !existingTransformIds.contains(id))
                .collect(Collectors.toList());
        // Exactly one new root transform is expected; anything else makes getOnlyElement throw.
        String expandedId = Iterables.getOnlyElement(newRootIds);

        RunnerApi.Components components = pipelineProto.getComponents();
        RunnerApi.PTransform expanded = components.getTransformsOrThrow(expandedId).toBuilder().setUniqueName(expandedId).build();
        LOG.debug("Expanded to {}", expanded);
        return ExpansionApi.ExpansionResponse.newBuilder()
                .setComponents(components.toBuilder().removeTransforms(expandedId))
                .setTransform(expanded)
                .build();
    }

    /**
     * gRPC entry point: expands the request and streams the single response to the observer.
     *
     * <p>On a {@link RuntimeException} the observer is notified through {@code onError} and the
     * exception is rethrown to the caller.
     *
     * @param expansionRequest the expansion request
     * @param streamObserver the observer receiving the response or the error
     */
    @Override
    public void expand(ExpansionApi.ExpansionRequest expansionRequest, StreamObserver<ExpansionApi.ExpansionResponse> streamObserver) {
        try {
            final ExpansionApi.ExpansionResponse response = expand(expansionRequest);
            streamObserver.onNext(response);
            streamObserver.onCompleted();
        } catch (RuntimeException failure) {
            streamObserver.onError(failure);
            // NOTE(review): rethrowing after onError may make the gRPC runtime try to close the
            // call a second time - confirm this is intended.
            throw failure;
        }
    }

    /** Does nothing; this method only exists to satisfy {@link AutoCloseable}. */
    @Override // java.lang.AutoCloseable
    public void close() throws Exception {
    }

    /**
     * Starts a gRPC server hosting a new {@link ExpansionService} on the given port and blocks
     * until the server terminates.
     *
     * @param strArr command-line arguments; the first one is the port to listen on
     * @throws IllegalArgumentException if no port argument is given, or (as
     *     {@link NumberFormatException}) if it is not an integer
     * @throws Exception if the server cannot be started or waiting for termination is interrupted
     */
    public static void main(String[] strArr) throws Exception {
        // Fail with a usage hint instead of a bare ArrayIndexOutOfBoundsException.
        if (strArr == null || strArr.length == 0) {
            throw new IllegalArgumentException("Missing port argument. Usage: ExpansionService <port>");
        }
        int port = Integer.parseInt(strArr[0]);
        System.out.println("Starting expansion service at localhost:" + port);
        Server server = ServerBuilder.forPort(port).addService(new ExpansionService()).build();
        server.start();
        server.awaitTermination();
    }
}
