package de.zalando.paradox.nakadi.consumer.core.partitioned.impl;

import com.google.common.base.Preconditions;
import com.google.common.collect.Sets;
import de.zalando.paradox.nakadi.consumer.core.domain.EventType;
import de.zalando.paradox.nakadi.consumer.core.domain.EventTypeCursor;
import de.zalando.paradox.nakadi.consumer.core.domain.EventTypePartition;
import de.zalando.paradox.nakadi.consumer.core.domain.EventTypePartitions;
import de.zalando.paradox.nakadi.consumer.core.domain.NakadiPartition;
import de.zalando.paradox.nakadi.consumer.core.partitioned.PartitionCommitCallback;
import de.zalando.paradox.nakadi.consumer.core.partitioned.PartitionCommitCallbackProvider;
import de.zalando.paradox.nakadi.consumer.core.partitioned.PartitionCoordinator;
import de.zalando.paradox.nakadi.consumer.core.partitioned.PartitionRebalanceListener;
import de.zalando.paradox.nakadi.consumer.core.partitioned.PartitionRebalanceListenerProvider;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.slf4j.Logger;

/* loaded from: input_file:de/zalando/paradox/nakadi/consumer/core/partitioned/impl/AbstractPartitionCoordinator.class */
/**
 * Base class for partition coordinators.
 *
 * <p>Keeps one {@link PartitionRebalanceListener} per {@link EventType} and one
 * {@link PartitionCommitCallback} per {@link EventTypePartition} in concurrent maps, and offers
 * helpers that compute partition differences and forward assign/revoke notifications to the
 * listener registered for the event type.
 */
public abstract class AbstractPartitionCoordinator implements PartitionCoordinator, PartitionCommitCallbackProvider, PartitionRebalanceListenerProvider {
    /** Logger supplied by the concrete coordinator. */
    protected final Logger log;
    private final ConcurrentMap<EventType, PartitionRebalanceListener> rebalanceListeners = new ConcurrentHashMap<>();
    private final ConcurrentMap<EventTypePartition, PartitionCommitCallback> commitCallbacks = new ConcurrentHashMap<>();

    /**
     * Creates the coordinator.
     *
     * @param logger logger used for all messages emitted by this base class
     */
    public AbstractPartitionCoordinator(Logger logger) {
        this.log = logger;
    }

    /**
     * Registers the rebalance listener for an event type.
     *
     * @throws IllegalStateException if a listener is already registered for {@code eventType}
     */
    @Override // de.zalando.paradox.nakadi.consumer.core.partitioned.PartitionCoordinator
    public void registerRebalanceListener(EventType eventType, PartitionRebalanceListener partitionRebalanceListener) {
        this.log.info("Register PartitionRebalanceListener for [{}]", eventType);
        // putIfAbsent returns the previous mapping; anything non-null means a duplicate registration.
        Preconditions.checkState(null == this.rebalanceListeners.putIfAbsent(eventType, partitionRebalanceListener),
                "PartitionRebalanceListener for [%s] already registered", eventType);
    }

    /**
     * Removes the rebalance listener of an event type; logs a warning when none was registered.
     */
    @Override // de.zalando.paradox.nakadi.consumer.core.partitioned.PartitionCoordinator
    public void unregisterRebalanceListener(EventType eventType) {
        this.log.info("Unregister PartitionRebalanceListener for [{}]", eventType);
        if (null == this.rebalanceListeners.remove(eventType)) {
            this.log.warn("PartitionRebalanceListener for [{}] is already unregistered", eventType);
        }
    }

    /**
     * Registers the commit callback for a partition.
     *
     * @throws IllegalStateException if a callback is already registered for {@code eventTypePartition}
     */
    @Override // de.zalando.paradox.nakadi.consumer.core.partitioned.PartitionCoordinator
    public void registerCommitCallback(EventTypePartition eventTypePartition, PartitionCommitCallback partitionCommitCallback) {
        Preconditions.checkState(null == this.commitCallbacks.putIfAbsent(eventTypePartition, partitionCommitCallback),
                "PartitionCommitCallback for [%s] already registered", eventTypePartition);
    }

    /**
     * Removes the commit callback of a partition; logs a warning when none was registered.
     */
    @Override // de.zalando.paradox.nakadi.consumer.core.partitioned.PartitionCoordinator
    public void unregisterCommitCallback(EventTypePartition eventTypePartition) {
        if (null == this.commitCallbacks.remove(eventTypePartition)) {
            // SLF4J substitutes "{}" only; the previous "%s" placeholder was logged verbatim.
            this.log.warn("PartitionCommitCallback for [{}] is already unregistered", eventTypePartition);
        }
    }

    /**
     * Revokes the given partition once its processing has finished.
     *
     * @throws IllegalStateException if no rebalance listener is registered for the partition's event type
     */
    @Override // de.zalando.paradox.nakadi.consumer.core.partitioned.PartitionCoordinator
    public void finished(EventTypePartition eventTypePartition) {
        this.log.info("Revoke partition on finished [{}]", eventTypePartition);
        revokePartition(eventTypePartition.getEventType(), eventTypePartition.getPartition());
    }

    /**
     * Returns the partition names found in {@code collection} that are not in
     * {@code eventTypePartitions}.
     */
    public Set<String> getPartitionsToAssign(EventTypePartitions eventTypePartitions, Collection<NakadiPartition> collection) {
        return Sets.difference(getPartitions(collection), eventTypePartitions.getPartitions());
    }

    /**
     * Returns the partition names found in {@code eventTypePartitions} that are not in
     * {@code collection}.
     */
    public Set<String> getPartitionsToRevoke(EventTypePartitions eventTypePartitions, Collection<NakadiPartition> collection) {
        return Sets.difference(eventTypePartitions.getPartitions(), getPartitions(collection));
    }

    /**
     * Collects the partition names of the given partitions into a set.
     */
    protected Set<String> getPartitions(Collection<NakadiPartition> collection) {
        return collection.stream().map(NakadiPartition::getPartition).collect(Collectors.toSet());
    }

    /**
     * Revokes a single partition; delegates to {@link #revokePartitions(EventType, Set)}.
     */
    protected void revokePartition(EventType eventType, String str) {
        revokePartitions(eventType, Collections.singleton(str));
    }

    /**
     * Notifies the listener of {@code eventType} that the given partitions are revoked.
     * Does nothing when {@code set} is empty.
     *
     * @throws IllegalStateException if no rebalance listener is registered for {@code eventType}
     */
    public void revokePartitions(EventType eventType, Set<String> set) {
        if (set.isEmpty()) {
            return;
        }
        PartitionRebalanceListener listener = requireRebalanceListener(eventType);
        List<EventTypePartition> revoked = set.stream()
                .map(partition -> EventTypePartition.of(eventType, partition))
                .collect(Collectors.toList());
        listener.onPartitionsRevoked(revoked);
    }

    /**
     * Converts one partition to a cursor with {@code function} and reports it as assigned to the
     * listener of {@code eventType}.
     *
     * @throws IllegalStateException if no rebalance listener is registered for {@code eventType}
     */
    protected void assignPartition(EventType eventType, NakadiPartition nakadiPartition, Function<NakadiPartition, EventTypeCursor> function) {
        // The cursor is computed before the listener check, matching the original evaluation order.
        EventTypeCursor cursor = function.apply(nakadiPartition);
        requireRebalanceListener(eventType).onPartitionsAssigned(Collections.singleton(cursor));
    }

    /**
     * Reports the partitions of {@code collection} whose names are in {@code set} as assigned,
     * after converting each to a cursor with {@code function}.
     *
     * <p>When {@code set} is empty nothing is assigned; instead the listener, if one is
     * registered, receives {@code onPartitionsHealthCheck()}.
     *
     * @throws IllegalStateException if {@code set} is non-empty and no rebalance listener is
     *         registered for {@code eventType}
     */
    public void assignPartitions(EventType eventType, Set<String> set, Collection<NakadiPartition> collection, Function<NakadiPartition, EventTypeCursor> function) {
        if (set.isEmpty()) {
            // A missing listener is tolerated on this path: there is nothing to deliver.
            PartitionRebalanceListener listener = this.rebalanceListeners.get(eventType);
            if (null != listener) {
                listener.onPartitionsHealthCheck();
            }
            return;
        }
        List<EventTypeCursor> cursors = collection.stream()
                .filter(nakadiPartition -> set.contains(nakadiPartition.getPartition()))
                .map(function)
                .collect(Collectors.toList());
        requireRebalanceListener(eventType).onPartitionsAssigned(cursors);
    }

    /**
     * Returns the commit callback registered for the partition, or {@code null} if there is none.
     */
    @Override // de.zalando.paradox.nakadi.consumer.core.partitioned.PartitionCommitCallbackProvider
    public PartitionCommitCallback getPartitionCommitCallback(EventTypePartition eventTypePartition) {
        return this.commitCallbacks.get(eventTypePartition);
    }

    /**
     * Returns the rebalance listener registered for the event type, or {@code null} if there is none.
     */
    @Override // de.zalando.paradox.nakadi.consumer.core.partitioned.PartitionRebalanceListenerProvider
    public PartitionRebalanceListener getPartitionRebalanceListener(EventType eventType) {
        return this.rebalanceListeners.get(eventType);
    }

    /**
     * Looks up the rebalance listener of {@code eventType}, failing when none is registered.
     *
     * @throws IllegalStateException if no rebalance listener is registered for {@code eventType}
     */
    private PartitionRebalanceListener requireRebalanceListener(EventType eventType) {
        PartitionRebalanceListener listener = this.rebalanceListeners.get(eventType);
        Preconditions.checkState(null != listener, "PartitionRebalanceListener for [%s] is not registered", eventType);
        return listener;
    }
}
