package de.zalando.paradox.nakadi.consumer.partitioned.zk;

import com.google.common.base.Preconditions;
import de.zalando.paradox.nakadi.consumer.core.domain.EventType;
import de.zalando.paradox.nakadi.consumer.core.domain.NakadiPartition;
import de.zalando.paradox.nakadi.consumer.partitioned.zk.ConsumerPartitionRebalanceStrategy;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:de/zalando/paradox/nakadi/consumer/partitioned/zk/ZKLeaderConsumerPartitionRebalanceStrategy.class */
/**
 * Rebalance strategy that spreads the partitions of an event type over the currently known
 * members in round-robin order.
 *
 * <p>For each event type the strategy remembers the last partition collection and the last member
 * map handed to it. On {@link #rebalance} the partition identifiers are sorted, the member
 * identifiers are ordered by the configured {@link MembersOrdering}, and the partition at sorted
 * index {@code i} is attributed to the member at index {@code i % memberCount}. The callback then
 * receives the partitions attributed to this instance's member and, separately, all remaining
 * partitions.
 *
 * <p>Rebalance runs for the same event type are serialized through a per-event-type lock. The
 * setters do not take that lock; they only replace the stored reference in a concurrent map. The
 * supplied collections are stored as given (no defensive copy), so callers should not mutate them
 * afterwards.
 */
public class ZKLeaderConsumerPartitionRebalanceStrategy implements ConsumerPartitionRebalanceStrategy {
    private static final Logger LOGGER = LoggerFactory.getLogger(ZKLeaderConsumerPartitionRebalanceStrategy.class);

    /** Last known partitions per event type. */
    private final Map<EventType, Collection<NakadiPartition>> eventNakadiPartitions = new ConcurrentHashMap<>();

    /** Last known members (keyed by member ID) per event type. */
    private final Map<EventType, Map<String, ZKMember>> eventCurrentMembers = new ConcurrentHashMap<>();

    /** Lazily created lock per event type, guarding {@link #rebalance}. */
    private final Map<EventType, Lock> eventLocks = new ConcurrentHashMap<>();

    /** The member on whose behalf partitions are selected. */
    private final ZKMember member;

    private final MembersOrdering membersOrdering;

    /**
     * Defines the order in which members take part in the round-robin distribution.
     */
    @FunctionalInterface
    public interface MembersOrdering {
        /**
         * Returns the IDs of the given members in distribution order.
         *
         * @param members current members keyed by member ID
         * @return ordered member IDs; must not be empty when {@code members} is not empty
         */
        String[] getOrderedMembersIds(Map<String, ZKMember> members);
    }

    /**
     * Creates a strategy that uses {@code ZKMembersOrderings.MEMBERS_ID_ORDER} to order members.
     *
     * @param member the member this strategy instance selects partitions for
     * @throws NullPointerException if {@code member} is {@code null}
     */
    public ZKLeaderConsumerPartitionRebalanceStrategy(ZKMember member) {
        this(member, ZKMembersOrderings.MEMBERS_ID_ORDER);
    }

    /**
     * Creates a strategy with an explicit members ordering.
     *
     * @param member the member this strategy instance selects partitions for
     * @param membersOrdering ordering applied to the current members before distribution
     * @throws NullPointerException if any argument is {@code null}
     */
    public ZKLeaderConsumerPartitionRebalanceStrategy(ZKMember member, MembersOrdering membersOrdering) {
        this.member = Objects.requireNonNull(member, "member must not be null");
        this.membersOrdering = Objects.requireNonNull(membersOrdering, "membersOrdering must not be null");
    }

    /**
     * Replaces the stored partitions of the given event type.
     *
     * @param eventType event type the partitions belong to
     * @param nakadiPartitions partitions to remember for the next rebalance
     * @throws NullPointerException if any argument is {@code null}
     */
    @Override
    public void setNakadiPartitions(EventType eventType, Collection<NakadiPartition> nakadiPartitions) {
        Objects.requireNonNull(eventType, "eventType must not be null");
        // ConcurrentHashMap rejects null values anyway; fail with a descriptive message instead.
        Objects.requireNonNull(nakadiPartitions, "nakadiPartitions must not be null");
        this.eventNakadiPartitions.put(eventType, nakadiPartitions);
    }

    /**
     * Replaces the stored members of the given event type.
     *
     * @param eventType event type the members belong to
     * @param currentMembers members keyed by member ID to remember for the next rebalance
     * @throws NullPointerException if any argument is {@code null}
     */
    @Override
    public void setCurrentMembers(EventType eventType, Map<String, ZKMember> currentMembers) {
        Objects.requireNonNull(eventType, "eventType must not be null");
        // ConcurrentHashMap rejects null values anyway; fail with a descriptive message instead.
        Objects.requireNonNull(currentMembers, "currentMembers must not be null");
        this.eventCurrentMembers.put(eventType, currentMembers);
    }

    /**
     * Computes the partition split for the given event type and reports it to the callback.
     *
     * <p>Nothing is reported (only an info log is written) while no partitions are known for the
     * event type or this instance's member is not among the current members. The callback is
     * invoked while the per-event-type lock is held.
     *
     * @param eventType event type to rebalance
     * @param resultCallback receives the event type, the partitions attributed to this member and
     *     the remaining partitions
     * @throws NullPointerException if any argument is {@code null}
     * @throws IllegalStateException if the members ordering yields no member IDs, or if two
     *     partitions share the same partition identifier
     */
    @Override
    public void rebalance(EventType eventType, ConsumerPartitionRebalanceStrategy.ResultCallback resultCallback) {
        Objects.requireNonNull(eventType, "eventType must not be null");
        Objects.requireNonNull(resultCallback, "resultCallback must not be null");
        doWithLock(eventType, () -> doRebalance(eventType, resultCallback));
    }

    /** Performs the actual distribution; expected to run under the event type's lock. */
    private void doRebalance(EventType eventType, ConsumerPartitionRebalanceStrategy.ResultCallback resultCallback) {
        final Collection<NakadiPartition> partitions =
                this.eventNakadiPartitions.getOrDefault(eventType, Collections.emptyList());
        final Map<String, ZKMember> members =
                this.eventCurrentMembers.getOrDefault(eventType, Collections.emptyMap());
        final String memberId = this.member.getMemberId();

        LOGGER.debug("Rebalance input for [{}] member [{}] , all members [{}] , nakadi partitions [{}]",
                eventType, memberId, members.keySet(), partitions);

        if (partitions.isEmpty() || !members.containsKey(memberId)) {
            LOGGER.info("More data is required to invoke rebalance for [{}] member", memberId);
            return;
        }

        // This member is the only one known: everything is attributed to it.
        if (members.size() == 1) {
            resultCallback.rebalancePartitions(eventType, partitions, Collections.emptyList());
            return;
        }

        // toMap fails with IllegalStateException on duplicate partition identifiers.
        final Map<String, NakadiPartition> partitionsById = partitions.stream()
                .collect(Collectors.toMap(NakadiPartition::getPartition, Function.identity()));
        final String[] sortedPartitionIds = partitionsById.keySet().toArray(new String[0]);
        Arrays.sort(sortedPartitionIds);

        final String[] orderedMemberIds = this.membersOrdering.getOrderedMembersIds(members);
        // Guard the modulo below: an empty ordering would otherwise surface as "/ by zero".
        Preconditions.checkState(orderedMemberIds.length > 0,
                "Members ordering returned no member ids for [%s]", eventType);

        final Collection<NakadiPartition> ownPartitions = new ArrayList<>();
        final Collection<NakadiPartition> otherPartitions = new ArrayList<>();
        for (int i = 0; i < sortedPartitionIds.length; i++) {
            // Keys were taken from partitionsById itself, so the lookup always succeeds.
            final NakadiPartition partition = partitionsById.get(sortedPartitionIds[i]);
            final String ownerId = orderedMemberIds[i % orderedMemberIds.length];
            if (ownerId.equals(memberId)) {
                ownPartitions.add(partition);
            } else {
                otherPartitions.add(partition);
            }
        }
        resultCallback.rebalancePartitions(eventType, ownPartitions, otherPartitions);
    }

    /** Runs the action while holding the lock of the given event type; always releases the lock. */
    private void doWithLock(EventType eventType, Runnable action) {
        final Lock lock = getLock(eventType);
        lock.lock();
        try {
            action.run();
        } finally {
            lock.unlock();
        }
    }

    /** Returns the lock of the given event type, creating it atomically on first use. */
    private Lock getLock(EventType eventType) {
        return this.eventLocks.computeIfAbsent(eventType, key -> new ReentrantLock());
    }
}
