package de.zalando.paradox.nakadi.consumer.partitioned.zk;

import com.google.common.annotations.VisibleForTesting;
import de.zalando.paradox.nakadi.consumer.core.domain.EventType;
import de.zalando.paradox.nakadi.consumer.core.domain.EventTypeCursor;
import de.zalando.paradox.nakadi.consumer.core.domain.EventTypePartition;
import de.zalando.paradox.nakadi.consumer.core.domain.EventTypePartitions;
import de.zalando.paradox.nakadi.consumer.core.domain.NakadiPartition;
import de.zalando.paradox.nakadi.consumer.core.http.handlers.EventErrorHandler;
import de.zalando.paradox.nakadi.consumer.core.utils.LoggingUtils;
import de.zalando.paradox.nakadi.consumer.core.utils.ThrowableUtils;
import de.zalando.paradox.nakadi.consumer.partitioned.zk.ConsumerPartitionRebalanceStrategy;
import de.zalando.paradox.nakadi.consumer.partitioned.zk.ZKConsumerGroupMember;
import de.zalando.paradox.nakadi.consumer.partitioned.zk.ZKConsumerLeader;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.json.JSONException;

/* loaded from: input_file:de/zalando/paradox/nakadi/consumer/partitioned/zk/ZKLeaderConsumerPartitionCoordinator.class */
public class ZKLeaderConsumerPartitionCoordinator extends AbstractZKConsumerPartitionCoordinator {
    private final ZKMember member;
    private final ConcurrentMap<EventType, ZKGroupMember> eventTypeToGroupMember;
    private final ZKConsumerGroupMember consumerGroupMember;
    private final ConsumerPartitionRebalanceStrategy rebalancer;
    private final AtomicBoolean running;
    private final ZKConsumerPartitionLeader consumerPartitionLeader;
    private final ConsumerPartitionRebalanceStrategy.ResultCallback rebalanceResultCallback;

    private ConsumerPartitionRebalanceStrategy.ResultCallback getResultCallbackHandler() {
        return (eventType, collection, collection2) -> {
            if (this.running.get()) {
                this.log.debug("Rebalance [{}], assign [{}], revoke [{}]", new Object[]{eventType.getName(), getPartitions(collection), getPartitions(collection2)});
                collection2.forEach(nakadiPartition -> {
                    try {
                        this.consumerPartitionLeader.closeGroupLeadership(EventTypePartition.of(eventType, nakadiPartition.getPartition()));
                    } finally {
                        revokePartition(eventType, nakadiPartition.getPartition());
                    }
                });
                collection.forEach(nakadiPartition2 -> {
                    try {
                        this.consumerPartitionLeader.initGroupLeadership(EventTypePartition.of(eventType, nakadiPartition2.getPartition()), new ZKConsumerLeader.LeadershipChangedListener<EventTypePartition>() { // from class: de.zalando.paradox.nakadi.consumer.partitioned.zk.ZKLeaderConsumerPartitionCoordinator.1
                            @Override // de.zalando.paradox.nakadi.consumer.partitioned.zk.ZKConsumerLeader.LeadershipChangedListener
                            public void takeLeadership(EventTypePartition eventTypePartition, ZKMember zKMember) {
                                ZKLeaderConsumerPartitionCoordinator.this.log.info("Member [{}] takes leadership [{}]", zKMember.getMemberId(), eventTypePartition);
                                ZKLeaderConsumerPartitionCoordinator.this.assignPartition(eventTypePartition.getEventType(), nakadiPartition2, ZKLeaderConsumerPartitionCoordinator.this.getOffsetSelector(eventTypePartition.getEventType()));
                            }

                            @Override // de.zalando.paradox.nakadi.consumer.partitioned.zk.ZKConsumerLeader.LeadershipChangedListener
                            public void relinquishLeadership(EventTypePartition eventTypePartition, ZKMember zKMember) {
                                ZKLeaderConsumerPartitionCoordinator.this.log.info("Member [{}] relinquishes leadership [{}]", zKMember.getMemberId(), eventTypePartition);
                                ZKLeaderConsumerPartitionCoordinator.this.revokePartition(eventTypePartition.getEventType(), eventTypePartition.getPartition());
                            }
                        });
                    } catch (Exception e) {
                        ThrowableUtils.throwException(e);
                    }
                });
            }
        };
    }

    public ZKLeaderConsumerPartitionCoordinator(ZKHolder zKHolder, String str, List<EventErrorHandler> list) {
        super(LoggingUtils.getLogger(ZKLeaderConsumerPartitionCoordinator.class, str), zKHolder, str, list);
        this.eventTypeToGroupMember = new ConcurrentHashMap();
        this.running = new AtomicBoolean(true);
        this.rebalanceResultCallback = getResultCallbackHandler();
        this.member = ZKMember.of(UUID.randomUUID().toString());
        this.consumerGroupMember = new ZKConsumerGroupMember(zKHolder, str, this.member);
        this.rebalancer = new ZKLeaderConsumerPartitionRebalanceStrategy(this.member);
        this.consumerPartitionLeader = new ZKConsumerPartitionLeader(zKHolder, str, this.member);
    }

    @VisibleForTesting
    ZKLeaderConsumerPartitionCoordinator(ZKHolder zKHolder, String str, ZKMember zKMember, ZKConsumerGroupMember zKConsumerGroupMember, ConsumerPartitionRebalanceStrategy consumerPartitionRebalanceStrategy, ZKConsumerPartitionLeader zKConsumerPartitionLeader, List<EventErrorHandler> list) {
        super(LoggingUtils.getLogger(ZKLeaderConsumerPartitionCoordinator.class, str), zKHolder, str, list);
        this.eventTypeToGroupMember = new ConcurrentHashMap();
        this.running = new AtomicBoolean(true);
        this.rebalanceResultCallback = getResultCallbackHandler();
        this.member = (ZKMember) Objects.requireNonNull(zKMember);
        this.consumerGroupMember = (ZKConsumerGroupMember) Objects.requireNonNull(zKConsumerGroupMember);
        this.rebalancer = (ConsumerPartitionRebalanceStrategy) Objects.requireNonNull(consumerPartitionRebalanceStrategy);
        this.consumerPartitionLeader = (ZKConsumerPartitionLeader) Objects.requireNonNull(zKConsumerPartitionLeader);
    }

    public void close() {
        if (!this.running.compareAndSet(true, false)) {
            this.log.warn("Coordinator for member [{}] is already closed", this.member.getMemberId());
            return;
        }
        this.log.info("Closing coordinator for member [{}]", this.member.getMemberId());
        this.eventTypeToGroupMember.entrySet().forEach(entry -> {
            ((ZKGroupMember) entry.getValue()).close();
        });
        this.eventTypeToGroupMember.clear();
        this.consumerPartitionLeader.close();
    }

    public void init() {
        if (this.running.compareAndSet(false, true)) {
            this.log.info("Init coordinator for member [{}]", this.member.getMemberId());
        } else {
            this.log.info("Coordinator for member [{}] is already running", this.member.getMemberId());
        }
    }

    public void rebalance(EventTypePartitions eventTypePartitions, Collection<NakadiPartition> collection) {
        if (this.running.get()) {
            rebalance0(eventTypePartitions, collection);
        } else {
            this.log.warn("ConsumerPartitionCoordinator is not running.");
        }
    }

    private void rebalance0(EventTypePartitions eventTypePartitions, Collection<NakadiPartition> collection) {
        revokePartitions(eventTypePartitions.getEventType(), getPartitionsToRevoke(eventTypePartitions, collection));
        this.rebalancer.setNakadiPartitions(eventTypePartitions.getEventType(), collection);
        this.rebalancer.rebalance(eventTypePartitions.getEventType(), this.rebalanceResultCallback);
        joinGroup(eventTypePartitions.getEventType());
    }

    public void finished(EventTypePartition eventTypePartition) {
        this.log.info("Close group leadership on finished [{}]", eventTypePartition);
        try {
            this.consumerPartitionLeader.closeGroupLeadership(eventTypePartition);
        } finally {
            revokePartition(eventTypePartition.getEventType(), eventTypePartition.getPartition());
        }
    }

    private void joinGroup(EventType eventType) {
        if (this.eventTypeToGroupMember.containsKey(eventType)) {
            return;
        }
        ZKGroupMember newGroupMember = this.consumerGroupMember.newGroupMember(eventType, newGroupChangedListener());
        if (null == this.eventTypeToGroupMember.putIfAbsent(eventType, newGroupMember)) {
            try {
                this.log.info("Member [{}] is joining group for event type [{}] ", this.member.getMemberId(), eventType);
                newGroupMember.start();
            } catch (Throwable th) {
                leaveGroup(eventType);
                ThrowableUtils.throwException(th);
            }
        }
    }

    private ZKConsumerGroupMember.GroupChangedListener newGroupChangedListener() {
        return new ZKConsumerGroupMember.GroupChangedListener() { // from class: de.zalando.paradox.nakadi.consumer.partitioned.zk.ZKLeaderConsumerPartitionCoordinator.2
            @Override // de.zalando.paradox.nakadi.consumer.partitioned.zk.ZKConsumerGroupMember.GroupChangedListener
            public void memberAdded(EventType eventType, String str) {
                ZKLeaderConsumerPartitionCoordinator.this.log.info("[{}] member [{}] joined group for event type [{}]", new Object[]{ZKLeaderConsumerPartitionCoordinator.this.member.getMemberId().equals(str) ? "This" : "Other", str, eventType, str});
                onGroupChanged(eventType);
            }

            @Override // de.zalando.paradox.nakadi.consumer.partitioned.zk.ZKConsumerGroupMember.GroupChangedListener
            public void memberRemoved(EventType eventType, String str) {
                ZKLeaderConsumerPartitionCoordinator.this.log.info("[{}] member [{}] left group for event type [{}]", new Object[]{ZKLeaderConsumerPartitionCoordinator.this.member.getMemberId().equals(str) ? "This" : "Other", str, eventType, str});
                onGroupChanged(eventType);
            }

            private Map<String, ZKMember> getCurrentMembers(EventType eventType) {
                ZKGroupMember zKGroupMember = (ZKGroupMember) ZKLeaderConsumerPartitionCoordinator.this.eventTypeToGroupMember.get(eventType);
                if (null == zKGroupMember) {
                    return Collections.emptyMap();
                }
                return (Map) zKGroupMember.getCurrentMembers().entrySet().stream().collect(Collectors.toMap((v0) -> {
                    return v0.getKey();
                }, entry -> {
                    try {
                        return ZKMember.fromByteJson((byte[]) entry.getValue());
                    } catch (JSONException e) {
                        ZKLeaderConsumerPartitionCoordinator.this.log.error("Cannot decode ZK data of member [{}]", entry.getKey(), e);
                        return null;
                    }
                }));
            }

            private void onGroupChanged(EventType eventType) {
                ZKLeaderConsumerPartitionCoordinator.this.rebalancer.setCurrentMembers(eventType, getCurrentMembers(eventType));
                ZKLeaderConsumerPartitionCoordinator.this.rebalancer.rebalance(eventType, ZKLeaderConsumerPartitionCoordinator.this.rebalanceResultCallback);
            }
        };
    }

    private void leaveGroup(EventType eventType) {
        ZKGroupMember remove = this.eventTypeToGroupMember.remove(eventType);
        if (null != remove) {
            this.log.info("Member [{}] is leaving group for event type [{}]", this.member.getMemberId(), eventType);
            remove.close();
        }
    }

    @Override // de.zalando.paradox.nakadi.consumer.partitioned.zk.AbstractZKConsumerPartitionCoordinator
    public /* bridge */ /* synthetic */ Optional getAdminService() {
        return super.getAdminService();
    }

    @Override // de.zalando.paradox.nakadi.consumer.partitioned.zk.AbstractZKConsumerPartitionCoordinator
    public /* bridge */ /* synthetic */ String getConsumerName() {
        return super.getConsumerName();
    }

    @Override // de.zalando.paradox.nakadi.consumer.partitioned.zk.AbstractZKConsumerPartitionCoordinator
    public /* bridge */ /* synthetic */ void setDeleteUnavailableCursors(boolean z) {
        super.setDeleteUnavailableCursors(z);
    }

    @Override // de.zalando.paradox.nakadi.consumer.partitioned.zk.AbstractZKConsumerPartitionCoordinator
    public /* bridge */ /* synthetic */ void setStartNewestAvailableOffset(boolean z) {
        super.setStartNewestAvailableOffset(z);
    }

    @Override // de.zalando.paradox.nakadi.consumer.partitioned.zk.AbstractZKConsumerPartitionCoordinator
    public /* bridge */ /* synthetic */ void error(int i, String str, EventTypePartition eventTypePartition) {
        super.error(i, str, eventTypePartition);
    }

    @Override // de.zalando.paradox.nakadi.consumer.partitioned.zk.AbstractZKConsumerPartitionCoordinator
    public /* bridge */ /* synthetic */ void error(String str, Throwable th, EventTypePartition eventTypePartition, @Nullable String str2, String str3) {
        super.error(str, th, eventTypePartition, str2, str3);
    }

    @Override // de.zalando.paradox.nakadi.consumer.partitioned.zk.AbstractZKConsumerPartitionCoordinator
    public /* bridge */ /* synthetic */ void flush(EventTypePartition eventTypePartition) {
        super.flush(eventTypePartition);
    }

    @Override // de.zalando.paradox.nakadi.consumer.partitioned.zk.AbstractZKConsumerPartitionCoordinator
    public /* bridge */ /* synthetic */ void commit(EventTypeCursor eventTypeCursor) {
        super.commit(eventTypeCursor);
    }
}
