package org.apache.flink.statefun.flink.core.state;

import java.util.Objects;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.statefun.flink.core.common.KeyBy;
import org.apache.flink.statefun.flink.core.di.Inject;
import org.apache.flink.statefun.flink.core.di.Label;
import org.apache.flink.statefun.flink.core.types.DynamicallyRegisteredTypes;
import org.apache.flink.statefun.sdk.Address;
import org.apache.flink.statefun.sdk.FunctionType;
import org.apache.flink.statefun.sdk.state.Accessor;
import org.apache.flink.statefun.sdk.state.AppendingBufferAccessor;
import org.apache.flink.statefun.sdk.state.PersistedAppendingBuffer;
import org.apache.flink.statefun.sdk.state.PersistedTable;
import org.apache.flink.statefun.sdk.state.PersistedValue;
import org.apache.flink.statefun.sdk.state.TableAccessor;

/* loaded from: input_file:org/apache/flink/statefun/flink/core/state/FlinkState.class */
/**
 * {@link State} implementation that backs persisted values, tables and appending buffers with
 * Flink state handles obtained from the {@link RuntimeContext}.
 *
 * <p>Every handle is registered under a name derived from the owning function type and the
 * state's own name, see {@link #flinkStateName(FunctionType, String)}. The key of the underlying
 * {@link KeyedStateBackend} is switched through {@link #setCurrentKey(Address)}.
 */
public final class FlinkState implements State {
    private final RuntimeContext runtimeContext;
    private final KeyedStateBackend<Object> keyedStateBackend;
    private final DynamicallyRegisteredTypes types;

    /**
     * Creates a new instance from its injected collaborators.
     *
     * @param runtimeContext source of the Flink value/map/list state handles
     * @param keyedStateBackend backend whose current key is set by {@link #setCurrentKey(Address)}
     * @param dynamicallyRegisteredTypes registry used to resolve the type information of state types
     * @throws NullPointerException if any argument is {@code null}
     */
    @Inject
    public FlinkState(@Label("runtime-context") RuntimeContext runtimeContext, @Label("keyed-state-backend") KeyedStateBackend<Object> keyedStateBackend, DynamicallyRegisteredTypes dynamicallyRegisteredTypes) {
        // requireNonNull is generic, so no casts are needed; the messages name the missing dependency.
        this.runtimeContext = Objects.requireNonNull(runtimeContext, "runtimeContext");
        this.keyedStateBackend = Objects.requireNonNull(keyedStateBackend, "keyedStateBackend");
        this.types = Objects.requireNonNull(dynamicallyRegisteredTypes, "dynamicallyRegisteredTypes");
    }

    /**
     * Creates an accessor for a single persisted value.
     *
     * @param functionType the function type that owns the state; used to build the Flink state name
     * @param persistedValue the declaration supplying the state name, value type and expiration
     * @param <T> the type of the stored value
     * @return an accessor wrapping the Flink value state registered for {@code persistedValue}
     */
    @Override
    public <T> Accessor<T> createFlinkStateAccessor(FunctionType functionType, PersistedValue<T> persistedValue) {
        String stateName = flinkStateName(functionType, persistedValue.name());
        ValueStateDescriptor<T> descriptor = new ValueStateDescriptor<>(stateName, this.types.registerType(persistedValue.type()));
        // The expiration must be applied to the descriptor before the handle is requested.
        ExpirationUtil.configureStateTtl(descriptor, persistedValue.expiration());
        return new FlinkValueAccessor<>(this.runtimeContext.getState(descriptor));
    }

    /**
     * Creates an accessor for a persisted key/value table.
     *
     * @param functionType the function type that owns the state; used to build the Flink state name
     * @param persistedTable the declaration supplying the state name, key/value types and expiration
     * @param <K> the type of the table keys
     * @param <V> the type of the table values
     * @return an accessor wrapping the Flink map state registered for {@code persistedTable}
     */
    @Override
    public <K, V> TableAccessor<K, V> createFlinkStateTableAccessor(FunctionType functionType, PersistedTable<K, V> persistedTable) {
        String stateName = flinkStateName(functionType, persistedTable.name());
        MapStateDescriptor<K, V> descriptor = new MapStateDescriptor<>(stateName, this.types.registerType(persistedTable.keyType()), this.types.registerType(persistedTable.valueType()));
        ExpirationUtil.configureStateTtl(descriptor, persistedTable.expiration());
        return new FlinkTableAccessor<>(this.runtimeContext.getMapState(descriptor));
    }

    /**
     * Creates an accessor for a persisted appending buffer.
     *
     * @param functionType the function type that owns the state; used to build the Flink state name
     * @param persistedAppendingBuffer the declaration supplying the state name, element type and
     *     expiration
     * @param <E> the type of the buffered elements
     * @return an accessor wrapping the Flink list state registered for
     *     {@code persistedAppendingBuffer}
     */
    @Override
    public <E> AppendingBufferAccessor<E> createFlinkStateAppendingBufferAccessor(FunctionType functionType, PersistedAppendingBuffer<E> persistedAppendingBuffer) {
        String stateName = flinkStateName(functionType, persistedAppendingBuffer.name());
        ListStateDescriptor<E> descriptor = new ListStateDescriptor<>(stateName, this.types.registerType(persistedAppendingBuffer.elementType()));
        ExpirationUtil.configureStateTtl(descriptor, persistedAppendingBuffer.expiration());
        return new FlinkAppendingBufferAccessor<>(this.runtimeContext.getListState(descriptor));
    }

    /**
     * Sets the current key of the keyed state backend to the key derived from {@code address} by
     * {@link KeyBy#apply}.
     *
     * @param address the address whose key becomes the backend's current key
     */
    @Override
    public void setCurrentKey(Address address) {
        this.keyedStateBackend.setCurrentKey(KeyBy.apply(address));
    }

    /**
     * Builds the name under which a piece of state is registered with Flink.
     *
     * @param functionType the function type that owns the state
     * @param str the state name as declared by the function
     * @return {@code "<namespace>.<name>.<str>"}, where namespace and name come from
     *     {@code functionType}
     */
    public static String flinkStateName(FunctionType functionType, String str) {
        return String.format("%s.%s.%s", functionType.namespace(), functionType.name(), str);
    }
}
