package org.apache.beam.runners.flink.translation.wrappers.streaming.stableinput;

import java.util.Iterator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;

/* loaded from: input_file:org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/KeyedBufferingElementsHandler.class */
public class KeyedBufferingElementsHandler implements BufferingElementsHandler {
    private final KeyedStateBackend backend;
    private final ListStateDescriptor<BufferedElement> stateDescriptor;

    /* JADX INFO: Access modifiers changed from: package-private */
    public static KeyedBufferingElementsHandler create(KeyedStateBackend keyedStateBackend, ListStateDescriptor<BufferedElement> listStateDescriptor) {
        return new KeyedBufferingElementsHandler(keyedStateBackend, listStateDescriptor);
    }

    private KeyedBufferingElementsHandler(KeyedStateBackend keyedStateBackend, ListStateDescriptor<BufferedElement> listStateDescriptor) {
        this.backend = keyedStateBackend;
        this.stateDescriptor = listStateDescriptor;
    }

    @Override // org.apache.beam.runners.flink.translation.wrappers.streaming.stableinput.BufferingElementsHandler
    public void buffer(BufferedElement bufferedElement) {
        try {
            ((ListState) this.backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, this.stateDescriptor)).add(bufferedElement);
        } catch (Exception e) {
            throw new RuntimeException("Failed to buffer element in state backend." + bufferedElement, e);
        }
    }

    @Override // org.apache.beam.runners.flink.translation.wrappers.streaming.stableinput.BufferingElementsHandler
    public Stream<BufferedElement> getElements() {
        return this.backend.getKeys(this.stateDescriptor.getName(), VoidNamespace.INSTANCE).flatMap(obj -> {
            try {
                this.backend.setCurrentKey(obj);
                return StreamSupport.stream(((ListState) this.backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, this.stateDescriptor)).get().spliterator(), false);
            } catch (Exception e) {
                throw new RuntimeException("Failed to retrieve buffered element from state backend.", e);
            }
        });
    }

    @Override // org.apache.beam.runners.flink.translation.wrappers.streaming.stableinput.BufferingElementsHandler
    public void clear() {
        try {
            Iterator it = ((List) this.backend.getKeys(this.stateDescriptor.getName(), VoidNamespace.INSTANCE).collect(Collectors.toList())).iterator();
            while (it.hasNext()) {
                this.backend.setCurrentKey(it.next());
                ((ListState) this.backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, this.stateDescriptor)).clear();
            }
        } catch (Exception e) {
            throw new RuntimeException("Failed to clear buffered element state", e);
        }
    }
}
