package org.apache.beam.runners.direct;

import java.io.Closeable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
import org.apache.beam.repackaged.direct_java.runners.core.construction.PTransformTranslation;
import org.apache.beam.repackaged.direct_java.runners.core.metrics.MetricUpdates;
import org.apache.beam.repackaged.direct_java.runners.core.metrics.MetricsContainerImpl;
import org.apache.beam.sdk.metrics.MetricsEnvironment;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v20_0.com.google.common.base.MoreObjects;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/beam/runners/direct/DirectTransformExecutor.class */
class DirectTransformExecutor<T> implements TransformExecutor {
    private static final Logger LOG = LoggerFactory.getLogger(DirectTransformExecutor.class);
    private final TransformEvaluatorRegistry evaluatorRegistry;
    private final Iterable<? extends ModelEnforcementFactory> modelEnforcements;
    private final AppliedPTransform<?, ?, ?> transform;
    private final CommittedBundle<T> inputBundle;
    private final CompletionCallback onComplete;
    private final TransformExecutorService transformEvaluationState;
    private final EvaluationContext context;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/beam/runners/direct/DirectTransformExecutor$Factory.class */
    /**
     * Creates {@link DirectTransformExecutor} instances that all share one evaluation context,
     * one evaluator registry and one table of model-enforcement factories.
     *
     * <p>The enforcement table is looked up with the string returned by
     * {@code PTransformTranslation.urnForTransform} for the transform being executed.
     */
    public static class Factory implements TransformExecutorFactory {
        private final EvaluationContext context;
        private final TransformEvaluatorRegistry registry;
        private final Map<String, Collection<ModelEnforcementFactory>> transformEnforcements;

        /**
         * @param evaluationContext context handed to every executor this factory creates
         * @param transformEvaluatorRegistry registry handed to every executor this factory creates
         * @param map enforcement factories to apply, keyed by the lookup string described on the class
         */
        public Factory(EvaluationContext evaluationContext, TransformEvaluatorRegistry transformEvaluatorRegistry, Map<String, Collection<ModelEnforcementFactory>> map) {
            this.context = evaluationContext;
            this.registry = transformEvaluatorRegistry;
            this.transformEnforcements = map;
        }

        /**
         * Builds an executor for one bundle / transform pair.
         *
         * <p>When the enforcement table has no entry (or a {@code null} entry) for the transform,
         * the executor is created with an empty enforcement list.
         *
         * @param committedBundle input bundle forwarded to the executor
         * @param appliedPTransform transform to execute; also used to select the enforcements
         * @param completionCallback callback forwarded to the executor
         * @param transformExecutorService service forwarded to the executor
         * @return a new {@link DirectTransformExecutor}
         */
        @Override // org.apache.beam.runners.direct.TransformExecutorFactory
        public TransformExecutor create(CommittedBundle<?> committedBundle, AppliedPTransform<?, ?, ?> appliedPTransform, CompletionCallback completionCallback, TransformExecutorService transformExecutorService) {
            String urn = PTransformTranslation.urnForTransform(appliedPTransform.getTransform());
            // Typed local instead of a raw (Collection) cast so the compiler checks the element type.
            Collection<ModelEnforcementFactory> enforcements = MoreObjects.firstNonNull(
                    this.transformEnforcements.get(urn),
                    Collections.<ModelEnforcementFactory>emptyList());
            // Diamond lets T be inferred from the wildcard-typed bundle without a raw constructor call.
            return new DirectTransformExecutor<>(this.context, this.registry, enforcements, committedBundle, appliedPTransform, completionCallback, transformExecutorService);
        }
    }

    /**
     * Stores the collaborators needed to execute one transform over one input bundle.
     * No work is performed here; execution happens in {@link #run()}.
     *
     * @param evaluationContext context whose metrics receive this executor's updates
     * @param transformEvaluatorRegistry registry asked for the evaluator of the transform
     * @param iterable factories producing the model enforcements applied during execution
     * @param committedBundle input bundle to process
     * @param appliedPTransform transform to execute
     * @param completionCallback callback notified of results, empty work, exceptions and errors
     * @param transformExecutorService service notified when this executor has finished
     */
    @VisibleForTesting
    DirectTransformExecutor(EvaluationContext evaluationContext, TransformEvaluatorRegistry transformEvaluatorRegistry, Iterable<? extends ModelEnforcementFactory> iterable, CommittedBundle<T> committedBundle, AppliedPTransform<?, ?, ?> appliedPTransform, CompletionCallback completionCallback, TransformExecutorService transformExecutorService) {
        // Assignments follow the parameter order.
        this.context = evaluationContext;
        this.evaluatorRegistry = transformEvaluatorRegistry;
        this.modelEnforcements = iterable;
        this.inputBundle = committedBundle;
        this.transform = appliedPTransform;
        this.onComplete = completionCallback;
        this.transformEvaluationState = transformExecutorService;
    }

    /**
     * Executes the transform over the input bundle.
     *
     * <p>Steps, all performed while a metrics container named after the transform's full name is
     * installed as the scoped container:
     * <ol>
     *   <li>create one {@link ModelEnforcement} per configured factory;</li>
     *   <li>ask the registry for an evaluator; if there is none, report
     *       {@code handleEmpty} and stop;</li>
     *   <li>process every element, then finish the bundle.</li>
     * </ol>
     *
     * <p>Whatever the outcome, the container's cumulative metrics are committed and the executor
     * service is told this executor is complete exactly once, from the {@code finally} block.
     *
     * @throws RuntimeException if execution throws an exception; checked exceptions are wrapped,
     *     runtime exceptions are rethrown as-is, after {@code handleException} has been notified
     * @throws Error if execution throws an error; it is logged, reported through
     *     {@code handleError} and rethrown
     */
    @Override // java.lang.Runnable
    public void run() {
        MetricsContainerImpl metricsContainer = new MetricsContainerImpl(this.transform.getFullName());
        // try-with-resources closes the metrics scope on every exit path (including the early
        // return) and records a failing close() as a suppressed exception instead of letting it
        // replace the primary one.
        try (Closeable metricsScope = MetricsEnvironment.scopedMetricsContainer(metricsContainer)) {
            Collection<ModelEnforcement<T>> enforcements = new ArrayList<>();
            for (ModelEnforcementFactory enforcementFactory : this.modelEnforcements) {
                ModelEnforcement<T> enforcement = enforcementFactory.forBundle(this.inputBundle, this.transform);
                enforcements.add(enforcement);
            }

            TransformEvaluator<T> evaluator = this.evaluatorRegistry.forApplication(this.transform, this.inputBundle);
            if (evaluator == null) {
                // Nothing to evaluate for this bundle.
                this.onComplete.handleEmpty(this.transform);
                return;
            }

            processElements(evaluator, metricsContainer, enforcements);
            finishBundle(evaluator, metricsContainer, enforcements);
        } catch (Exception e) {
            this.onComplete.handleException(this.inputBundle, e);
            if (e instanceof RuntimeException) {
                throw (RuntimeException) e;
            }
            throw new RuntimeException(e);
        } catch (Error err) {
            LOG.error("Error occurred within {}", this, err);
            this.onComplete.handleError(err);
            throw err;
        } finally {
            // Single place for end-of-step bookkeeping: doing it here only avoids committing the
            // metrics and signalling completion twice on the success path.
            this.context.getMetrics().commitPhysical(this.inputBundle, metricsContainer.getCumulative());
            this.transformEvaluationState.complete(this);
        }
    }

    /**
     * Feeds every element of the input bundle to the evaluator.
     *
     * <p>For each element: all enforcements see it via {@code beforeElement}, the evaluator
     * processes it, any pending metric updates are pushed to the context and marked committed,
     * and finally all enforcements see it via {@code afterElement}. Does nothing when there is
     * no input bundle.
     *
     * @param transformEvaluator evaluator that processes each element
     * @param metricsContainerImpl container polled for metric updates after each element
     * @param collection enforcements invoked around each element
     * @throws Exception if the evaluator or an enforcement throws
     */
    private void processElements(TransformEvaluator<T> transformEvaluator, MetricsContainerImpl metricsContainerImpl, Collection<ModelEnforcement<T>> collection) throws Exception {
        if (this.inputBundle == null) {
            return;
        }
        for (WindowedValue<T> element : this.inputBundle.getElements()) {
            for (ModelEnforcement<T> enforcement : collection) {
                enforcement.beforeElement(element);
            }

            transformEvaluator.processElement(element);

            // Publish whatever the element produced, if anything, before the "after" hooks run.
            MetricUpdates pendingUpdates = metricsContainerImpl.getUpdates();
            if (pendingUpdates != null) {
                this.context.getMetrics().updatePhysical(this.inputBundle, pendingUpdates);
                metricsContainerImpl.commitUpdates();
            }

            for (ModelEnforcement<T> enforcement : collection) {
                enforcement.afterElement(element);
            }
        }
    }

    /**
     * Finishes the bundle and reports its result.
     *
     * <p>The evaluator's result is combined with the container's cumulative metrics, handed to
     * the completion callback, and then passed, together with the committed outputs, to each
     * enforcement's {@code afterFinish}.
     *
     * @param transformEvaluator evaluator whose bundle is being finished
     * @param metricsContainerImpl container supplying the cumulative metrics attached to the result
     * @param collection enforcements notified once the result has been handled
     * @return the evaluator's result with the metric updates attached
     * @throws Exception if the evaluator, the callback or an enforcement throws
     */
    private TransformResult<T> finishBundle(TransformEvaluator<T> transformEvaluator, MetricsContainerImpl metricsContainerImpl, Collection<ModelEnforcement<T>> collection) throws Exception {
        TransformResult<T> result =
                transformEvaluator
                        .finishBundle()
                        .withLogicalMetricUpdates(metricsContainerImpl.getCumulative());
        CommittedResult committed = this.onComplete.handleResult(this.inputBundle, result);
        for (ModelEnforcement<T> enforcement : collection) {
            // Outputs are fetched per enforcement, exactly as often as there are enforcements.
            enforcement.afterFinish(this.inputBundle, result, committed.getOutputs());
        }
        return result;
    }
}
