package org.apache.beam.repackaged.direct_java.runners.fnexecution.environment;

import java.time.Duration;
import java.util.concurrent.TimeoutException;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.fnexecution.v1.BeamFnExternalWorkerPoolGrpc;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.repackaged.direct_java.runners.core.construction.BeamUrns;
import org.apache.beam.repackaged.direct_java.runners.fnexecution.GrpcFnServer;
import org.apache.beam.repackaged.direct_java.runners.fnexecution.artifact.ArtifactRetrievalService;
import org.apache.beam.repackaged.direct_java.runners.fnexecution.control.ControlClientPool;
import org.apache.beam.repackaged.direct_java.runners.fnexecution.control.FnApiControlClientPoolService;
import org.apache.beam.repackaged.direct_java.runners.fnexecution.control.InstructionRequestHandler;
import org.apache.beam.repackaged.direct_java.runners.fnexecution.environment.EnvironmentFactory;
import org.apache.beam.repackaged.direct_java.runners.fnexecution.logging.GrpcLoggingService;
import org.apache.beam.repackaged.direct_java.runners.fnexecution.provisioning.StaticGrpcProvisionService;
import org.apache.beam.repackaged.direct_java.sdk.fn.IdGenerator;
import org.apache.beam.repackaged.direct_java.sdk.fn.channel.ManagedChannelFactory;
import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/beam/repackaged/direct_java/runners/fnexecution/environment/ExternalEnvironmentFactory.class */
/**
 * An {@link EnvironmentFactory} for environments whose URN is the standard {@code EXTERNAL}
 * environment URN.
 *
 * <p>Creating an environment sends a {@code NotifyRunnerAvailableRequest} (carrying a fresh worker
 * id and the descriptors of the control, logging, artifact and provision servers) to the endpoint
 * named in the environment's {@code ExternalPayload}, and then waits for a client with that worker
 * id to become available from the {@link ControlClientPool.Source}.
 */
public class ExternalEnvironmentFactory implements EnvironmentFactory {
    private static final Logger LOG = LoggerFactory.getLogger(ExternalEnvironmentFactory.class);

    /** Timeout for a single {@code take} attempt on the client source; the wait is retried. */
    private static final Duration CLIENT_WAIT_TIMEOUT = Duration.ofMinutes(2L);

    private final GrpcFnServer<FnApiControlClientPoolService> controlServiceServer;
    private final GrpcFnServer<GrpcLoggingService> loggingServiceServer;
    private final GrpcFnServer<ArtifactRetrievalService> retrievalServiceServer;
    private final GrpcFnServer<StaticGrpcProvisionService> provisioningServiceServer;
    private final IdGenerator idGenerator;
    private final ControlClientPool.Source clientSource;

    private ExternalEnvironmentFactory(
            GrpcFnServer<FnApiControlClientPoolService> controlServiceServer,
            GrpcFnServer<GrpcLoggingService> loggingServiceServer,
            GrpcFnServer<ArtifactRetrievalService> retrievalServiceServer,
            GrpcFnServer<StaticGrpcProvisionService> provisioningServiceServer,
            IdGenerator idGenerator,
            ControlClientPool.Source clientSource) {
        this.controlServiceServer = controlServiceServer;
        this.loggingServiceServer = loggingServiceServer;
        this.retrievalServiceServer = retrievalServiceServer;
        this.provisioningServiceServer = provisioningServiceServer;
        this.idGenerator = idGenerator;
        this.clientSource = clientSource;
    }

    /**
     * Builds a factory from the servers it advertises to workers.
     *
     * @param grpcFnServer control service server
     * @param grpcFnServer2 logging service server
     * @param grpcFnServer3 artifact retrieval service server
     * @param grpcFnServer4 provisioning service server
     * @param source source from which connected clients are taken by worker id
     * @param idGenerator generator for worker ids
     * @return a new factory holding the given collaborators
     */
    public static ExternalEnvironmentFactory create(GrpcFnServer<FnApiControlClientPoolService> grpcFnServer, GrpcFnServer<GrpcLoggingService> grpcFnServer2, GrpcFnServer<ArtifactRetrievalService> grpcFnServer3, GrpcFnServer<StaticGrpcProvisionService> grpcFnServer4, ControlClientPool.Source source, IdGenerator idGenerator) {
        // Note the argument order: the constructor takes the id generator before the source.
        return new ExternalEnvironmentFactory(
                grpcFnServer, grpcFnServer2, grpcFnServer3, grpcFnServer4, idGenerator, source);
    }

    /**
     * Creates a {@link RemoteEnvironment} for the given {@code EXTERNAL} environment.
     *
     * @param environment the environment; its URN must equal the standard EXTERNAL URN and its
     *     payload must parse as an {@code ExternalPayload}
     * @return a remote environment exposing {@code environment} and the handler obtained for the
     *     newly generated worker id
     * @throws IllegalStateException if the URN is not the EXTERNAL URN (raised by {@code checkState})
     * @throws RuntimeException if the notify response carries a non-empty error, or if the thread
     *     is interrupted while waiting for the client
     * @throws Exception any other failure from payload parsing, the RPC, or the client source
     */
    @Override
    public RemoteEnvironment createEnvironment(final RunnerApi.Environment environment) throws Exception {
        String actualUrn = environment.getUrn();
        String externalUrn = BeamUrns.getUrn(RunnerApi.StandardEnvironments.Environments.EXTERNAL);
        Preconditions.checkState(
                actualUrn.equals(externalUrn),
                "The passed environment does not contain an ExternalPayload.");

        final RunnerApi.ExternalPayload payload =
                RunnerApi.ExternalPayload.parseFrom(environment.getPayload());
        final String workerId = idGenerator.getId();

        BeamFnApi.NotifyRunnerAvailableRequest request =
                BeamFnApi.NotifyRunnerAvailableRequest.newBuilder()
                        .setWorkerId(workerId)
                        .setControlEndpoint(controlServiceServer.getApiServiceDescriptor())
                        .setLoggingEndpoint(loggingServiceServer.getApiServiceDescriptor())
                        .setArtifactEndpoint(retrievalServiceServer.getApiServiceDescriptor())
                        .setProvisionEndpoint(provisioningServiceServer.getApiServiceDescriptor())
                        .putAllParams(payload.getParamsMap())
                        .build();

        LOG.debug("Requesting worker ID {}", workerId);

        // NOTE(review): the channel created here is not kept in a field or shut down in this
        // method — confirm whether its lifecycle is managed elsewhere.
        BeamFnApi.NotifyRunnerAvailableResponse response =
                BeamFnExternalWorkerPoolGrpc.newBlockingStub(
                                ManagedChannelFactory.createDefault().forDescriptor(payload.getEndpoint()))
                        .notifyRunnerAvailable(request);

        String error = response.getError();
        if (!error.isEmpty()) {
            throw new RuntimeException(error);
        }

        final InstructionRequestHandler handler = awaitClient(workerId, payload);

        return new RemoteEnvironment() {
            @Override
            public RunnerApi.Environment getEnvironment() {
                return environment;
            }

            @Override
            public InstructionRequestHandler getInstructionRequestHandler() {
                return handler;
            }
        };
    }

    /**
     * Blocks until the client source yields a non-null handler for {@code workerId}.
     *
     * <p>Each attempt waits up to {@link #CLIENT_WAIT_TIMEOUT}; a timeout is logged at info level
     * and the attempt is repeated, so there is no overall deadline.
     */
    private InstructionRequestHandler awaitClient(String workerId, RunnerApi.ExternalPayload payload) throws Exception {
        while (true) {
            try {
                InstructionRequestHandler candidate = clientSource.take(workerId, CLIENT_WAIT_TIMEOUT);
                if (candidate != null) {
                    return candidate;
                }
            } catch (InterruptedException interrupted) {
                // Restore the interrupt flag before surfacing the failure as unchecked.
                Thread.currentThread().interrupt();
                throw new RuntimeException(interrupted);
            } catch (TimeoutException stillWaiting) {
                // Deliberately not propagated: keep waiting and report progress.
                LOG.info("Still waiting for startup of environment from {} for worker id {}", payload.getEndpoint().getUrl(), workerId);
            }
        }
    }

    /** {@link EnvironmentFactory.Provider} that produces {@link ExternalEnvironmentFactory} instances. */
    public static class Provider implements EnvironmentFactory.Provider {
        /**
         * Delegates to {@link ExternalEnvironmentFactory#create}, passing the pool's source.
         *
         * @param grpcFnServer control service server
         * @param grpcFnServer2 logging service server
         * @param grpcFnServer3 artifact retrieval service server
         * @param grpcFnServer4 provisioning service server
         * @param controlClientPool pool whose {@code getSource()} supplies connected clients
         * @param idGenerator generator for worker ids
         */
        @Override
        public EnvironmentFactory createEnvironmentFactory(GrpcFnServer<FnApiControlClientPoolService> grpcFnServer, GrpcFnServer<GrpcLoggingService> grpcFnServer2, GrpcFnServer<ArtifactRetrievalService> grpcFnServer3, GrpcFnServer<StaticGrpcProvisionService> grpcFnServer4, ControlClientPool controlClientPool, IdGenerator idGenerator) {
            ControlClientPool.Source source = controlClientPool.getSource();
            return ExternalEnvironmentFactory.create(
                    grpcFnServer, grpcFnServer2, grpcFnServer3, grpcFnServer4, source, idGenerator);
        }
    }
}
