package org.apache.flink.statefun.flink.core.functions;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.runtime.state.KeyedStateFunction;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.statefun.flink.core.httpfn.HttpFunctionProvider;
import org.apache.flink.statefun.flink.core.httpfn.StateSpec;
import org.apache.flink.statefun.flink.core.state.FlinkState;
import org.apache.flink.statefun.sdk.FunctionType;
import org.apache.flink.statefun.sdk.StatefulFunctionProvider;

/* loaded from: input_file:org/apache/flink/statefun/flink/core/functions/RemoteFunctionStateMigrator.class */
/**
 * Moves the state of remote (HTTP) functions out of a single multiplexed {@link MapState} and into
 * one {@link ValueState} per declared state name.
 *
 * <p>For every key visited, each entry of the map state registered under the legacy name
 * {@value #LEGACY_MUX_STATE_NAME} is copied into the value state whose handle is registered under
 * the entry's key, and the map state is then cleared.
 */
final class RemoteFunctionStateMigrator implements KeyedStateFunction<String, MapState<String, byte[]>> {
    /** State name under which the multiplexed map state is looked up. */
    private static final String LEGACY_MUX_STATE_NAME = "states";

    /** Value state handles, indexed by the state name declared in the function's {@link StateSpec}s. */
    private final Map<String, ValueState<byte[]>> demuxValueStates;

    /**
     * Runs the migration for every entry of {@code map} whose provider is an {@link
     * HttpFunctionProvider}; all other entries are skipped.
     *
     * @param map function providers keyed by function type
     * @param keyedStateBackend backend holding both the multiplexed and the per-name states
     * @param typeInformation type information for the keys of the multiplexed map state
     * @param typeInformation2 type information for the state values (raw bytes)
     * @throws RuntimeException if migrating the state of any remote function type fails
     */
    public static void apply(Map<FunctionType, StatefulFunctionProvider> map, KeyedStateBackend<String> keyedStateBackend, TypeInformation<String> typeInformation, TypeInformation<byte[]> typeInformation2) throws Exception {
        map.entrySet().stream()
                .filter(RemoteFunctionStateMigrator::isRemoteFunctionProvider)
                .forEach(entry -> migrateRemoteFunctionState(entry, keyedStateBackend, typeInformation, typeInformation2));
    }

    /** Returns {@code true} when the entry's provider is an {@link HttpFunctionProvider}. */
    private static boolean isRemoteFunctionProvider(Map.Entry<FunctionType, StatefulFunctionProvider> entry) {
        return entry.getValue() instanceof HttpFunctionProvider;
    }

    /**
     * Migrates the multiplexed state of a single remote function type across all keys of the backend.
     *
     * @param entry function type together with its provider; the provider must be an {@link
     *     HttpFunctionProvider}
     * @param keyedStateBackend backend to read the multiplexed state from and write value states to
     * @param typeInformation type information for the keys of the multiplexed map state
     * @param typeInformation2 type information for the state values (raw bytes)
     * @throws RuntimeException wrapping the underlying failure if the migration cannot be completed
     */
    public static void migrateRemoteFunctionState(Map.Entry<FunctionType, StatefulFunctionProvider> entry, KeyedStateBackend<String> keyedStateBackend, TypeInformation<String> typeInformation, TypeInformation<byte[]> typeInformation2) {
        FunctionType key = entry.getKey();
        try {
            HttpFunctionProvider provider = (HttpFunctionProvider) entry.getValue();
            List<StateSpec> stateSpecs = provider.getFunctionSpec(key).states();
            Map<String, ValueState<byte[]>> handles =
                    demuxValueStateHandles(stateSpecs, key, keyedStateBackend, typeInformation2);

            keyedStateBackend.applyToAllKeys(
                    VoidNamespace.INSTANCE,
                    VoidNamespaceSerializer.INSTANCE,
                    multiplexedStateDescriptor(key, typeInformation, typeInformation2),
                    new RemoteFunctionStateMigrator(handles));
        } catch (Exception e) {
            // Keep the original exception as the cause so the root failure stays diagnosable.
            throw new RuntimeException("Error migrating multiplexed state for remote function type " + key, e);
        }
    }

    /**
     * @param demuxValueStates value state handles keyed by state name; must not be {@code null}
     */
    private RemoteFunctionStateMigrator(Map<String, ValueState<byte[]>> demuxValueStates) {
        this.demuxValueStates = Objects.requireNonNull(demuxValueStates);
    }

    /**
     * Copies every entry of the multiplexed state into the value state registered under the same
     * name, then clears the multiplexed state.
     *
     * @param str the key currently being processed (unused)
     * @param mapState the multiplexed state for that key, mapping state name to value bytes
     * @throws Exception if reading from or writing to the state fails
     */
    @Override
    public void process(String str, MapState<String, byte[]> mapState) throws Exception {
        for (Map.Entry<String, byte[]> entry : mapState.entries()) {
            ValueState<byte[]> valueState = this.demuxValueStates.get(entry.getKey());
            // Entries with no registered handle are not copied; they are discarded by the clear() below.
            if (valueState != null) {
                valueState.update(entry.getValue());
            }
        }
        mapState.clear();
    }

    /**
     * Obtains (creating if necessary) one value state handle per state spec.
     *
     * @return a map from each spec's name to its value state handle
     * @throws Exception if the backend fails to provide a state handle
     */
    private static Map<String, ValueState<byte[]>> demuxValueStateHandles(List<StateSpec> stateSpecs, FunctionType functionType, KeyedStateBackend<String> keyedStateBackend, TypeInformation<byte[]> valueTypeInfo) throws Exception {
        Map<String, ValueState<byte[]>> handles = new HashMap<>(stateSpecs.size());
        for (StateSpec stateSpec : stateSpecs) {
            ValueState<byte[]> handle = keyedStateBackend.getOrCreateKeyedState(
                    VoidNamespaceSerializer.INSTANCE,
                    demuxValueStateDescriptor(functionType, stateSpec, valueTypeInfo));
            handles.put(stateSpec.name(), handle);
        }
        return handles;
    }

    /** Builds the descriptor of the value state for the given function type and state spec. */
    private static ValueStateDescriptor<byte[]> demuxValueStateDescriptor(FunctionType functionType, StateSpec stateSpec, TypeInformation<byte[]> valueTypeInfo) {
        return new ValueStateDescriptor<>(FlinkState.flinkStateName(functionType, stateSpec.name()), valueTypeInfo);
    }

    /** Builds the descriptor of the multiplexed map state for the given function type. */
    private static MapStateDescriptor<String, byte[]> multiplexedStateDescriptor(FunctionType functionType, TypeInformation<String> keyTypeInfo, TypeInformation<byte[]> valueTypeInfo) {
        return new MapStateDescriptor<>(FlinkState.flinkStateName(functionType, LEGACY_MUX_STATE_NAME), keyTypeInfo, valueTypeInfo);
    }
}
