package org.apache.beam.runners.fnexecution.state;

import java.util.ArrayList;
import java.util.EnumMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.fnexecution.control.ProcessBundleDescriptors;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.fn.stream.DataStreams;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.vendor.grpc.v1p13p1.com.google.protobuf.ByteString;
import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.sdk.v2.sdk.extensions.protobuf.ByteStringCoder;

/* loaded from: input_file:org/apache/beam/runners/fnexecution/state/StateRequestHandlers.class */
public class StateRequestHandlers {

    /**
     * Callback interface for one piece of bag user state, addressed by a key and a window.
     *
     * <p>Within this file it is driven by the adapter created through
     * {@code forBagUserStateHandlerFactory}, which maps GET, APPEND and CLEAR state requests onto
     * {@link #get}, {@link #append} and {@link #clear} respectively.
     *
     * <p>The type is annotated {@code @ThreadSafe}: implementations are expected to tolerate
     * concurrent invocation.
     *
     * @param <K> type of the state key
     * @param <V> type of the values held in the bag
     * @param <W> type of the window the state is scoped to
     */
    @ThreadSafe
    public interface BagUserStateHandler<K, V, W extends BoundedWindow> {
        /**
         * Reads the state for the given key and window.
         *
         * <p>The adapter in this file concatenates the returned elements into the data field of the
         * GET response. Presumably the result is the current content of the bag; the contract for
         * missing state is not visible here.
         *
         * @param k the state key
         * @param w the window the state belongs to
         * @return the elements to send back to the requester
         */
        Iterable<V> get(K k, W w);

        /**
         * Adds the elements produced by {@code it} to the state for the given key and window.
         *
         * <p>The adapter in this file passes a single-element iterator holding the payload of the
         * APPEND request.
         *
         * @param k the state key
         * @param w the window the state belongs to
         * @param it the elements to append
         */
        void append(K k, W w, Iterator<V> it);

        /**
         * Clears the state for the given key and window; invoked for CLEAR requests.
         *
         * @param k the state key
         * @param w the window the state belongs to
         */
        void clear(K k, W w);
    }

    /**
     * Factory that supplies the {@link BagUserStateHandler} responsible for one user state of one
     * PTransform.
     *
     * <p>The type is annotated {@code @ThreadSafe}: implementations are expected to tolerate
     * concurrent invocation.
     */
    @ThreadSafe
    public interface BagUserStateHandlerFactory {
        /**
         * Creates the handler for the given user state.
         *
         * <p>The adapter in this file calls this with the transform id, user state id, key coder,
         * value coder and window coder of a {@code BagUserStateSpec}, in that order, and caches the
         * result per spec.
         *
         * @param str id of the PTransform that owns the user state
         * @param str2 id of the user state
         * @param coder coder for the state key
         * @param coder2 coder for the state values
         * @param coder3 coder for the window
         * @return the handler serving requests for that user state
         */
        <K, V, W extends BoundedWindow> BagUserStateHandler<K, V, W> forUserState(String str, String str2, Coder<K> coder, Coder<V> coder2, Coder<W> coder3);

        /**
         * Returns a factory that rejects every request.
         *
         * @return a factory whose {@link #forUserState} always throws
         *     {@link UnsupportedOperationException}
         */
        static BagUserStateHandlerFactory unsupported() {
            return new BagUserStateHandlerFactory() {
                @Override
                public <K, V, W extends BoundedWindow> BagUserStateHandler<K, V, W> forUserState(String pTransformId, String userStateId, Coder<K> keyCoder, Coder<V> valueCoder, Coder<W> windowCoder) {
                    // The message talks about user state (not side inputs) so that the failure
                    // points at the feature that is actually unsupported.
                    throw new UnsupportedOperationException(String.format("The %s does not support handling user state for PTransform %s with user state id %s.", BagUserStateHandler.class.getSimpleName(), pTransformId, userStateId));
                }
            };
        }
    }

    /* loaded from: input_file:org/apache/beam/runners/fnexecution/state/StateRequestHandlers$ByteStringStateRequestHandlerToBagUserStateHandlerFactoryAdapter.class */
    static class ByteStringStateRequestHandlerToBagUserStateHandlerFactoryAdapter implements StateRequestHandler {
        private final ProcessBundleDescriptors.ExecutableProcessBundleDescriptor processBundleDescriptor;
        private final BagUserStateHandlerFactory handlerFactory;
        private final ConcurrentHashMap<ProcessBundleDescriptors.BagUserStateSpec, BagUserStateHandler> cache = new ConcurrentHashMap<>();

        ByteStringStateRequestHandlerToBagUserStateHandlerFactoryAdapter(ProcessBundleDescriptors.ExecutableProcessBundleDescriptor executableProcessBundleDescriptor, BagUserStateHandlerFactory bagUserStateHandlerFactory) {
            this.processBundleDescriptor = executableProcessBundleDescriptor;
            this.handlerFactory = bagUserStateHandlerFactory;
        }

        @Override // org.apache.beam.runners.fnexecution.state.StateRequestHandler
        public CompletionStage<BeamFnApi.StateResponse.Builder> handle(BeamFnApi.StateRequest stateRequest) throws Exception {
            try {
                Preconditions.checkState(BeamFnApi.StateKey.TypeCase.BAG_USER_STATE.equals(stateRequest.getStateKey().getTypeCase()), "Unsupported %s type %s, expected %s", BeamFnApi.StateRequest.class.getSimpleName(), stateRequest.getStateKey().getTypeCase(), BeamFnApi.StateKey.TypeCase.BAG_USER_STATE);
                BeamFnApi.StateKey.BagUserState bagUserState = stateRequest.getStateKey().getBagUserState();
                ProcessBundleDescriptors.BagUserStateSpec bagUserStateSpec = this.processBundleDescriptor.getBagUserStateSpecs().get(bagUserState.getPtransformId()).get(bagUserState.getUserStateId());
                Preconditions.checkState(bagUserStateSpec.keyCoder() instanceof ByteStringCoder, "This %s only supports the %s as the key coder.", BagUserStateHandlerFactory.class.getSimpleName(), ByteStringCoder.class.getSimpleName());
                Preconditions.checkState(bagUserStateSpec.valueCoder() instanceof ByteStringCoder, "This %s only supports the %s as the value coder.", BagUserStateHandlerFactory.class.getSimpleName(), ByteStringCoder.class.getSimpleName());
                BagUserStateHandler computeIfAbsent = this.cache.computeIfAbsent(bagUserStateSpec, this::createHandler);
                ByteString key = bagUserState.getKey();
                BoundedWindow boundedWindow = (BoundedWindow) bagUserStateSpec.windowCoder().decode(bagUserState.getWindow().newInput());
                switch (stateRequest.getRequestCase()) {
                    case GET:
                        return handleGetRequest(stateRequest, key, boundedWindow, computeIfAbsent);
                    case APPEND:
                        return handleAppendRequest(stateRequest, key, boundedWindow, computeIfAbsent);
                    case CLEAR:
                        return handleClearRequest(stateRequest, key, boundedWindow, computeIfAbsent);
                    default:
                        throw new Exception(String.format("Unsupported request type %s for user state.", stateRequest.getRequestCase()));
                }
            } catch (Exception e) {
                CompletableFuture completableFuture = new CompletableFuture();
                completableFuture.completeExceptionally(e);
                return completableFuture;
            }
        }

        private static <W extends BoundedWindow> CompletionStage<BeamFnApi.StateResponse.Builder> handleGetRequest(BeamFnApi.StateRequest stateRequest, ByteString byteString, W w, BagUserStateHandler<ByteString, ByteString, W> bagUserStateHandler) {
            Preconditions.checkState(stateRequest.getGet().getContinuationToken().isEmpty(), "Continuation tokens are unsupported.");
            return CompletableFuture.completedFuture(BeamFnApi.StateResponse.newBuilder().setId(stateRequest.getId()).setGet(BeamFnApi.StateGetResponse.newBuilder().setData(ByteString.copyFrom(bagUserStateHandler.get(byteString, w)))));
        }

        private static <W extends BoundedWindow> CompletionStage<BeamFnApi.StateResponse.Builder> handleAppendRequest(BeamFnApi.StateRequest stateRequest, ByteString byteString, W w, BagUserStateHandler<ByteString, ByteString, W> bagUserStateHandler) {
            bagUserStateHandler.append(byteString, w, ImmutableList.of(stateRequest.getAppend().getData()).iterator());
            return CompletableFuture.completedFuture(BeamFnApi.StateResponse.newBuilder().setId(stateRequest.getId()).setAppend(BeamFnApi.StateAppendResponse.getDefaultInstance()));
        }

        private static <W extends BoundedWindow> CompletionStage<BeamFnApi.StateResponse.Builder> handleClearRequest(BeamFnApi.StateRequest stateRequest, ByteString byteString, W w, BagUserStateHandler<ByteString, ByteString, W> bagUserStateHandler) {
            bagUserStateHandler.clear(byteString, w);
            return CompletableFuture.completedFuture(BeamFnApi.StateResponse.newBuilder().setId(stateRequest.getId()).setClear(BeamFnApi.StateClearResponse.getDefaultInstance()));
        }

        private <K, V, W extends BoundedWindow> BagUserStateHandler<K, V, W> createHandler(ProcessBundleDescriptors.BagUserStateSpec bagUserStateSpec) {
            return this.handlerFactory.forUserState(bagUserStateSpec.transformId(), bagUserStateSpec.userStateId(), bagUserStateSpec.keyCoder(), bagUserStateSpec.valueCoder(), bagUserStateSpec.windowCoder());
        }
    }

    /**
     * Callback interface that serves the contents of one side input.
     *
     * <p>Within this file it is driven by the adapter created through
     * {@code forSideInputHandlerFactory}, which answers GET requests for multimap side inputs by
     * calling {@link #get} and encoding each returned element with {@link #resultCoder()}.
     *
     * <p>The type is annotated {@code @ThreadSafe}: implementations are expected to tolerate
     * concurrent invocation.
     *
     * @param <V> type of the elements returned for a lookup
     * @param <W> type of the window the side input is scoped to
     */
    @ThreadSafe
    public interface SideInputHandler<V, W extends BoundedWindow> {
        /**
         * Looks up the side input elements for a key and window.
         *
         * <p>The adapter in this file passes the raw bytes of the request's multimap key and the
         * window decoded with the side input spec's window coder.
         *
         * @param bArr the key, as bytes
         * @param w the window to read from
         * @return the elements to send back to the requester
         */
        Iterable<V> get(byte[] bArr, W w);

        /**
         * Returns the coder used to encode the elements produced by {@link #get}.
         *
         * @return the coder for result elements
         */
        Coder<V> resultCoder();
    }

    /**
     * Factory that supplies the {@link SideInputHandler} responsible for one side input of one
     * PTransform.
     *
     * <p>The type is annotated {@code @ThreadSafe}: implementations are expected to tolerate
     * concurrent invocation.
     */
    @ThreadSafe
    public interface SideInputHandlerFactory {
        /**
         * Creates the handler for the given side input.
         *
         * <p>The adapter in this file calls this with the transform id, side input id, access
         * pattern, element coder and window coder of a {@code SideInputSpec}, in that order.
         *
         * @param str id of the PTransform that consumes the side input
         * @param str2 id of the side input
         * @param functionSpec the access pattern of the side input
         * @param coder coder for the side input elements
         * @param coder2 coder for the window
         * @return the handler serving requests for that side input
         */
        <T, V, W extends BoundedWindow> SideInputHandler<V, W> forSideInput(String str, String str2, RunnerApi.FunctionSpec functionSpec, Coder<T> coder, Coder<W> coder2);

        /**
         * Returns a factory that rejects every request.
         *
         * @return a factory whose {@link #forSideInput} always throws
         *     {@link UnsupportedOperationException}
         */
        static SideInputHandlerFactory unsupported() {
            return new SideInputHandlerFactory() {
                @Override
                public <T, V, W extends BoundedWindow> SideInputHandler<V, W> forSideInput(String pTransformId, String sideInputId, RunnerApi.FunctionSpec accessPattern, Coder<T> elementCoder, Coder<W> windowCoder) {
                    final String handlerName = SideInputHandler.class.getSimpleName();
                    final String message = String.format("The %s does not support handling sides inputs for PTransform %s with side input id %s.", handlerName, pTransformId, sideInputId);
                    throw new UnsupportedOperationException(message);
                }
            };
        }
    }

    /**
     * {@link StateRequestHandler} that routes each request to the handler registered for the type
     * of the request's state key.
     *
     * <p>Requests whose state key type has no registered handler complete exceptionally with an
     * {@link IllegalStateException}.
     */
    static class StateKeyTypeDelegatingStateRequestHandler implements StateRequestHandler {
        /** Handlers by state key type; the map passed to the constructor is used as-is. */
        private final EnumMap<BeamFnApi.StateKey.TypeCase, StateRequestHandler> handlers;

        StateKeyTypeDelegatingStateRequestHandler(EnumMap<BeamFnApi.StateKey.TypeCase, StateRequestHandler> enumMap) {
            this.handlers = enumMap;
        }

        /**
         * Delegates to the handler registered for the request's state key type.
         *
         * @param stateRequest the request to route
         * @return whatever the selected handler returns, or an exceptionally completed stage when
         *     no handler is registered for the type
         * @throws Exception if the selected handler throws
         */
        @Override // org.apache.beam.runners.fnexecution.state.StateRequestHandler
        public CompletionStage<BeamFnApi.StateResponse.Builder> handle(BeamFnApi.StateRequest stateRequest) throws Exception {
            StateRequestHandler delegate = this.handlers.getOrDefault(stateRequest.getStateKey().getTypeCase(), this::handlerNotFound);
            return delegate.handle(stateRequest);
        }

        /** Fallback handler: fails the request, naming the state key type that had no handler. */
        private CompletionStage<BeamFnApi.StateResponse.Builder> handlerNotFound(BeamFnApi.StateRequest stateRequest) {
            CompletableFuture<BeamFnApi.StateResponse.Builder> failed = new CompletableFuture<>();
            failed.completeExceptionally(new IllegalStateException(String.format("No %s registered for state key type %s.", StateRequestHandler.class.getSimpleName(), stateRequest.getStateKey().getTypeCase())));
            return failed;
        }
    }

    /**
     * Adapts a {@link SideInputHandlerFactory} to the {@link StateRequestHandler} interface for
     * multimap side inputs.
     *
     * <p>Handlers obtained from the factory are cached per {@code SideInputSpec}. Only GET requests
     * are supported. Every failure while processing a request is reported through an exceptionally
     * completed stage rather than thrown.
     */
    static class StateRequestHandlerToSideInputHandlerFactoryAdapter implements StateRequestHandler {
        /** Side input specs by PTransform id, then by side input id. */
        private final Map<String, Map<String, ProcessBundleDescriptors.SideInputSpec>> sideInputSpecs;
        private final SideInputHandlerFactory sideInputHandlerFactory;
        /** Handlers created so far, keyed by the spec they were created for. */
        private final ConcurrentHashMap<ProcessBundleDescriptors.SideInputSpec, SideInputHandler> cache = new ConcurrentHashMap<>();

        StateRequestHandlerToSideInputHandlerFactoryAdapter(Map<String, Map<String, ProcessBundleDescriptors.SideInputSpec>> map, SideInputHandlerFactory sideInputHandlerFactory) {
            this.sideInputSpecs = map;
            this.sideInputHandlerFactory = sideInputHandlerFactory;
        }

        /**
         * Validates the request, resolves (and caches) the matching handler, and serves the request
         * if it is a GET.
         *
         * @param stateRequest the request; its state key must be of type
         *     {@code MULTIMAP_SIDE_INPUT}
         * @return a stage holding the response builder, or an exceptionally completed stage when
         *     the request is invalid, unsupported, or its handling fails
         */
        @Override // org.apache.beam.runners.fnexecution.state.StateRequestHandler
        // The cache and the spec map hold raw types, so the hand-off to the typed helper is unchecked.
        @SuppressWarnings({"unchecked", "rawtypes"})
        public CompletionStage<BeamFnApi.StateResponse.Builder> handle(BeamFnApi.StateRequest stateRequest) throws Exception {
            try {
                Preconditions.checkState(BeamFnApi.StateKey.TypeCase.MULTIMAP_SIDE_INPUT.equals(stateRequest.getStateKey().getTypeCase()), "Unsupported %s type %s, expected %s", BeamFnApi.StateRequest.class.getSimpleName(), stateRequest.getStateKey().getTypeCase(), BeamFnApi.StateKey.TypeCase.MULTIMAP_SIDE_INPUT);
                BeamFnApi.StateKey.MultimapSideInput sideInputKey = stateRequest.getStateKey().getMultimapSideInput();
                ProcessBundleDescriptors.SideInputSpec spec = lookupSpec(sideInputKey);
                SideInputHandler handler = this.cache.computeIfAbsent(spec, this::createHandler);
                switch (stateRequest.getRequestCase()) {
                    case GET:
                        return handleGetRequest(stateRequest, spec, handler);
                    default:
                        // Side inputs are read-only here: APPEND, CLEAR and anything else is rejected.
                        throw new Exception(String.format("Unsupported request type %s for side input.", stateRequest.getRequestCase()));
                }
            } catch (Exception e) {
                return failed(e);
            }
        }

        /**
         * Finds the spec registered for the PTransform / side input named in the request.
         *
         * @throws IllegalStateException if no spec is registered for that combination
         */
        private ProcessBundleDescriptors.SideInputSpec lookupSpec(BeamFnApi.StateKey.MultimapSideInput sideInputKey) {
            Map<String, ProcessBundleDescriptors.SideInputSpec> specsForTransform = this.sideInputSpecs.get(sideInputKey.getPtransformId());
            ProcessBundleDescriptors.SideInputSpec spec = specsForTransform == null ? null : specsForTransform.get(sideInputKey.getSideInputId());
            // Fail with the offending ids instead of a bare NullPointerException further down.
            Preconditions.checkState(spec != null, "No side input spec found for PTransform %s with side input id %s.", sideInputKey.getPtransformId(), sideInputKey.getSideInputId());
            return spec;
        }

        /** Wraps {@code cause} in an already-failed stage. */
        private static CompletionStage<BeamFnApi.StateResponse.Builder> failed(Throwable cause) {
            CompletableFuture<BeamFnApi.StateResponse.Builder> future = new CompletableFuture<>();
            future.completeExceptionally(cause);
            return future;
        }

        /**
         * Serves a GET request: decodes the window, asks the handler for the matching elements and
         * returns them encoded with the handler's result coder, one delimited element at a time.
         * Continuation tokens are rejected with an {@link IllegalStateException}.
         *
         * @throws Exception if decoding the window or encoding an element fails
         */
        private <V, W extends BoundedWindow> CompletionStage<BeamFnApi.StateResponse.Builder> handleGetRequest(BeamFnApi.StateRequest stateRequest, ProcessBundleDescriptors.SideInputSpec sideInputSpec, SideInputHandler<V, W> sideInputHandler) throws Exception {
            Preconditions.checkState(stateRequest.getGet().getContinuationToken().isEmpty(), "Continuation tokens are unsupported.");
            BeamFnApi.StateKey.MultimapSideInput sideInputKey = stateRequest.getStateKey().getMultimapSideInput();
            // The spec is stored raw, so the decoded window has to be cast to the handler's window type.
            @SuppressWarnings("unchecked")
            W window = (W) sideInputSpec.windowCoder().decode(sideInputKey.getWindow().newInput());
            Iterable<V> values = sideInputHandler.get(sideInputKey.getKey().toByteArray(), window);
            ArrayList<ByteString> encodedChunks = new ArrayList<>();
            // Closing the stream hands any buffered bytes to the chunk list, so the response is
            // assembled only after the try block has completed.
            try (DataStreams.ElementDelimitedOutputStream outbound = DataStreams.outbound(encodedChunks::add)) {
                for (V value : values) {
                    sideInputHandler.resultCoder().encode(value, outbound);
                    outbound.delimitElement();
                }
            }
            BeamFnApi.StateResponse.Builder response = BeamFnApi.StateResponse.newBuilder();
            response.setId(stateRequest.getId());
            response.setGet(BeamFnApi.StateGetResponse.newBuilder().setData(ByteString.copyFrom(encodedChunks)).build());
            return CompletableFuture.completedFuture(response);
        }

        /** Asks the factory for the handler of {@code sideInputSpec}; used to fill the cache. */
        private <V, W extends BoundedWindow> SideInputHandler<V, W> createHandler(ProcessBundleDescriptors.SideInputSpec sideInputSpec) {
            return this.sideInputHandlerFactory.forSideInput(sideInputSpec.transformId(), sideInputSpec.sideInputId(), sideInputSpec.accessPattern(), sideInputSpec.elementCoder(), sideInputSpec.windowCoder());
        }
    }

    /**
     * Creates a {@link StateRequestHandler} that forwards each request to the handler registered
     * in {@code enumMap} for the type of the request's state key.
     *
     * <p>A request whose state key type has no entry results in a stage completed exceptionally
     * with an {@link IllegalStateException}.
     *
     * @param enumMap handlers keyed by state key type; the map is used as-is, not copied
     * @return the delegating handler
     */
    public static StateRequestHandler delegateBasedUponType(EnumMap<BeamFnApi.StateKey.TypeCase, StateRequestHandler> enumMap) {
        final StateRequestHandler delegatingHandler = new StateKeyTypeDelegatingStateRequestHandler(enumMap);
        return delegatingHandler;
    }

    /**
     * Creates a {@link StateRequestHandler} that answers multimap side input requests using
     * handlers obtained from {@code sideInputHandlerFactory}.
     *
     * @param map side input specs keyed by PTransform id, then by side input id
     * @param sideInputHandlerFactory factory used to create one handler per side input spec
     * @return the adapting handler
     */
    public static StateRequestHandler forSideInputHandlerFactory(Map<String, Map<String, ProcessBundleDescriptors.SideInputSpec>> map, SideInputHandlerFactory sideInputHandlerFactory) {
        final StateRequestHandler sideInputAdapter = new StateRequestHandlerToSideInputHandlerFactoryAdapter(map, sideInputHandlerFactory);
        return sideInputAdapter;
    }

    /**
     * Creates a {@link StateRequestHandler} that answers bag user state requests using handlers
     * obtained from {@code bagUserStateHandlerFactory}.
     *
     * @param executableProcessBundleDescriptor descriptor whose bag user state specs are used to
     *     resolve incoming requests
     * @param bagUserStateHandlerFactory factory used to create one handler per user state spec
     * @return the adapting handler
     */
    public static StateRequestHandler forBagUserStateHandlerFactory(ProcessBundleDescriptors.ExecutableProcessBundleDescriptor executableProcessBundleDescriptor, BagUserStateHandlerFactory bagUserStateHandlerFactory) {
        final StateRequestHandler bagUserStateAdapter = new ByteStringStateRequestHandlerToBagUserStateHandlerFactoryAdapter(executableProcessBundleDescriptor, bagUserStateHandlerFactory);
        return bagUserStateAdapter;
    }
}
