package org.apache.beam.runners.direct;

import com.google.auto.value.AutoValue;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.apache.beam.repackaged.direct_java.runners.core.KeyedWorkItem;
import org.apache.beam.repackaged.direct_java.runners.core.KeyedWorkItems;
import org.apache.beam.repackaged.direct_java.runners.core.StateNamespace;
import org.apache.beam.repackaged.direct_java.runners.core.StateNamespaces;
import org.apache.beam.repackaged.direct_java.runners.core.StateTags;
import org.apache.beam.repackaged.direct_java.runners.core.TimerInternals;
import org.apache.beam.repackaged.direct_java.runners.local.StructuralKey;
import org.apache.beam.runners.direct.DirectExecutionContext;
import org.apache.beam.runners.direct.ParDoMultiOverrideFactory;
import org.apache.beam.runners.direct.StepTransformResult;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v20_0.com.google.common.cache.CacheBuilder;
import org.apache.beam.vendor.guava.v20_0.com.google.common.cache.CacheLoader;
import org.apache.beam.vendor.guava.v20_0.com.google.common.cache.LoadingCache;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Lists;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.class */
public final class StatefulParDoEvaluatorFactory<K, InputT, OutputT> implements TransformEvaluatorFactory {
    private final LoadingCache<AppliedPTransformOutputKeyAndWindow<K, InputT, OutputT>, Runnable> cleanupRegistry;
    private final ParDoEvaluatorFactory<KV<K, InputT>, OutputT> delegateFactory;

    /* JADX INFO: Access modifiers changed from: package-private */
    @AutoValue
    /* loaded from: input_file:org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory$AppliedPTransformOutputKeyAndWindow.class */
    public static abstract class AppliedPTransformOutputKeyAndWindow<K, InputT, OutputT> {
        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract AppliedPTransform<PCollection<? extends KeyedWorkItem<K, KV<K, InputT>>>, PCollectionTuple, ParDoMultiOverrideFactory.StatefulParDo<K, InputT, OutputT>> getTransform();

        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract StructuralKey<K> getKey();

        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract BoundedWindow getWindow();

        static <K, InputT, OutputT> AppliedPTransformOutputKeyAndWindow<K, InputT, OutputT> create(AppliedPTransform<PCollection<? extends KeyedWorkItem<K, KV<K, InputT>>>, PCollectionTuple, ParDoMultiOverrideFactory.StatefulParDo<K, InputT, OutputT>> appliedPTransform, StructuralKey<K> structuralKey, BoundedWindow boundedWindow) {
            return new AutoValue_StatefulParDoEvaluatorFactory_AppliedPTransformOutputKeyAndWindow(appliedPTransform, structuralKey, boundedWindow);
        }
    }

    /* loaded from: input_file:org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory$CleanupSchedulingLoader.class */
    private class CleanupSchedulingLoader extends CacheLoader<AppliedPTransformOutputKeyAndWindow<K, InputT, OutputT>, Runnable> {
        private final EvaluationContext evaluationContext;

        public CleanupSchedulingLoader(EvaluationContext evaluationContext) {
            this.evaluationContext = evaluationContext;
        }

        @Override // org.apache.beam.vendor.guava.v20_0.com.google.common.cache.CacheLoader
        public Runnable load(AppliedPTransformOutputKeyAndWindow<K, InputT, OutputT> appliedPTransformOutputKeyAndWindow) {
            String stepName = this.evaluationContext.getStepName(appliedPTransformOutputKeyAndWindow.getTransform());
            HashMap hashMap = new HashMap();
            for (Map.Entry<TupleTag<?>, PValue> entry : appliedPTransformOutputKeyAndWindow.getTransform().getOutputs().entrySet()) {
                hashMap.put(entry.getKey(), (PCollection) entry.getValue());
            }
            WindowingStrategy<?, ?> windowingStrategy = ((PCollection) hashMap.get(appliedPTransformOutputKeyAndWindow.getTransform().getTransform().getMainOutputTag())).getWindowingStrategy();
            BoundedWindow window = appliedPTransformOutputKeyAndWindow.getWindow();
            DoFn<KV<K, InputT>, OutputT> doFn = appliedPTransformOutputKeyAndWindow.getTransform().getTransform().getDoFn();
            DoFnSignature signature = DoFnSignatures.getSignature(doFn.getClass());
            DirectExecutionContext.DirectStepContext stepContext = this.evaluationContext.getExecutionContext(appliedPTransformOutputKeyAndWindow.getTransform(), appliedPTransformOutputKeyAndWindow.getKey()).getStepContext(stepName);
            StateNamespace window2 = StateNamespaces.window(windowingStrategy.getWindowFn().windowCoder(), window);
            Runnable runnable = () -> {
                for (DoFnSignature.StateDeclaration stateDeclaration : signature.stateDeclarations().values()) {
                    try {
                        stepContext.stateInternals().state(window2, StateTags.tagForSpec(stateDeclaration.id(), (StateSpec) stateDeclaration.field().get(doFn))).clear();
                    } catch (IllegalAccessException e) {
                        throw new RuntimeException(String.format("Error accessing %s for %s", StateSpec.class.getName(), doFn.getClass().getName()), e);
                    }
                }
                StatefulParDoEvaluatorFactory.this.cleanupRegistry.invalidate(appliedPTransformOutputKeyAndWindow);
            };
            this.evaluationContext.scheduleAfterWindowExpiration(appliedPTransformOutputKeyAndWindow.getTransform(), window, windowingStrategy, runnable);
            return runnable;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory$StatefulParDoEvaluator.class */
    public static class StatefulParDoEvaluator<K, InputT> implements TransformEvaluator<KeyedWorkItem<K, KV<K, InputT>>> {
        private final DoFnLifecycleManagerRemovingTransformEvaluator<KV<K, InputT>> delegateEvaluator;

        public StatefulParDoEvaluator(DoFnLifecycleManagerRemovingTransformEvaluator<KV<K, InputT>> doFnLifecycleManagerRemovingTransformEvaluator) {
            this.delegateEvaluator = doFnLifecycleManagerRemovingTransformEvaluator;
        }

        @Override // org.apache.beam.runners.direct.TransformEvaluator
        public void processElement(WindowedValue<KeyedWorkItem<K, KV<K, InputT>>> windowedValue) throws Exception {
            Iterator<WindowedValue<KV<K, InputT>>> it = windowedValue.getValue().elementsIterable().iterator();
            while (it.hasNext()) {
                this.delegateEvaluator.processElement(it.next());
            }
            for (TimerInternals.TimerData timerData : windowedValue.getValue().timersIterable()) {
                Preconditions.checkState(timerData.getNamespace() instanceof StateNamespaces.WindowNamespace, "Expected Timer %s to be in a %s, but got %s", timerData, StateNamespaces.WindowNamespace.class.getSimpleName(), timerData.getNamespace().getClass().getName());
                this.delegateEvaluator.onTimer(timerData, ((StateNamespaces.WindowNamespace) timerData.getNamespace()).getWindow());
            }
        }

        @Override // org.apache.beam.runners.direct.TransformEvaluator
        public TransformResult<KeyedWorkItem<K, KV<K, InputT>>> finishBundle() throws Exception {
            TransformResult<KV<K, InputT>> finishBundle = this.delegateEvaluator.finishBundle();
            StepTransformResult.Builder<InputT> addOutput = StepTransformResult.withHold(finishBundle.getTransform(), finishBundle.getWatermarkHold()).withTimerUpdate(finishBundle.getTimerUpdate()).withState(finishBundle.getState()).withMetricUpdates(finishBundle.getLogicalMetricUpdates()).addOutput(Lists.newArrayList(finishBundle.getOutputBundles()));
            for (WindowedValue<KV<K, InputT>> windowedValue : finishBundle.getUnprocessedElements()) {
                addOutput.addUnprocessedElements(windowedValue.withValue(KeyedWorkItems.elementsWorkItem(windowedValue.getValue().getKey(), Collections.singleton(windowedValue))));
            }
            return addOutput.build();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public StatefulParDoEvaluatorFactory(EvaluationContext evaluationContext, PipelineOptions pipelineOptions) {
        this.delegateFactory = new ParDoEvaluatorFactory<>(evaluationContext, ParDoEvaluator.defaultRunnerFactory(), new CacheLoader<AppliedPTransform<?, ?, ?>, DoFnLifecycleManager>() { // from class: org.apache.beam.runners.direct.StatefulParDoEvaluatorFactory.1
            @Override // org.apache.beam.vendor.guava.v20_0.com.google.common.cache.CacheLoader
            public DoFnLifecycleManager load(AppliedPTransform<?, ?, ?> appliedPTransform) throws Exception {
                return DoFnLifecycleManager.of(((ParDoMultiOverrideFactory.StatefulParDo) appliedPTransform.getTransform()).getDoFn());
            }
        }, pipelineOptions);
        this.cleanupRegistry = (LoadingCache<AppliedPTransformOutputKeyAndWindow<K, InputT, OutputT>, Runnable>) CacheBuilder.newBuilder().weakValues().build(new CleanupSchedulingLoader(evaluationContext));
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Override // org.apache.beam.runners.direct.TransformEvaluatorFactory
    public <T> TransformEvaluator<T> forApplication(AppliedPTransform<?, ?, ?> appliedPTransform, CommittedBundle<?> committedBundle) throws Exception {
        return createEvaluator(appliedPTransform, committedBundle);
    }

    @Override // org.apache.beam.runners.direct.TransformEvaluatorFactory
    public void cleanup() throws Exception {
        this.delegateFactory.cleanup();
    }

    private TransformEvaluator<KeyedWorkItem<K, KV<K, InputT>>> createEvaluator(AppliedPTransform<PCollection<? extends KeyedWorkItem<K, KV<K, InputT>>>, PCollectionTuple, ParDoMultiOverrideFactory.StatefulParDo<K, InputT, OutputT>> appliedPTransform, CommittedBundle<KeyedWorkItem<K, KV<K, InputT>>> committedBundle) throws Exception {
        if (DoFnSignatures.getSignature(appliedPTransform.getTransform().getDoFn().getClass()).stateDeclarations().size() > 0) {
            Iterator<WindowedValue<KeyedWorkItem<K, KV<K, InputT>>>> it = committedBundle.getElements().iterator();
            while (it.hasNext()) {
                Iterator<? extends BoundedWindow> it2 = it.next().getWindows().iterator();
                while (it2.hasNext()) {
                    this.cleanupRegistry.get(AppliedPTransformOutputKeyAndWindow.create(appliedPTransform, committedBundle.getKey(), it2.next()));
                }
            }
        }
        return new StatefulParDoEvaluator(this.delegateFactory.createEvaluator(appliedPTransform, committedBundle.getPCollection(), committedBundle.getKey(), appliedPTransform.getTransform().getSideInputs(), appliedPTransform.getTransform().getMainOutputTag(), appliedPTransform.getTransform().getAdditionalOutputTags().getAll(), appliedPTransform.getTransform().getSchemaInformation()));
    }
}
