package de.zalando.paradox.nakadi.consumer.partitioned.zk;

import com.google.common.base.Preconditions;
import de.zalando.paradox.nakadi.consumer.core.utils.ThrowableUtils;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import javax.annotation.concurrent.GuardedBy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.ACLBackgroundPathAndBytesable;
import org.apache.curator.framework.recipes.leader.LeaderSelector;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.utils.CloseableExecutorService;
import org.apache.curator.utils.ThreadUtils;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:de/zalando/paradox/nakadi/consumer/partitioned/zk/ZKConsumerLeader.class */
abstract class ZKConsumerLeader<T> {
    private static final Logger LOGGER = LoggerFactory.getLogger(ZKConsumerLeader.class);
    private final ZKMember member;
    private final ZKHolder zkHolder;
    private final String consumerName;

    @GuardedBy("this")
    private final Map<T, LeaderControl> keyToLeaderControl = new HashMap();
    private final ThreadFactory leaderSelectorThreadFactory;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:de/zalando/paradox/nakadi/consumer/partitioned/zk/ZKConsumerLeader$LeaderControl.class */
    public static class LeaderControl {
        private final LeaderSelector selector;
        private final CountDownLatch stop = new CountDownLatch(1);
        private volatile boolean leader;

        private LeaderControl(LeaderSelector leaderSelector) {
            this.selector = leaderSelector;
        }

        static LeaderControl of(LeaderSelector leaderSelector) {
            return new LeaderControl(leaderSelector);
        }

        void takeLeadership() throws InterruptedException {
            this.leader = true;
            this.stop.await();
        }

        void relinquishLeadership() {
            this.leader = false;
            this.selector.close();
            this.stop.countDown();
        }

        boolean isLeader() {
            return this.leader;
        }
    }

    /* loaded from: input_file:de/zalando/paradox/nakadi/consumer/partitioned/zk/ZKConsumerLeader$LeadershipChangedListener.class */
    interface LeadershipChangedListener<T> {
        void takeLeadership(T t, ZKMember zKMember);

        void relinquishLeadership(T t, ZKMember zKMember);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public ZKConsumerLeader(ZKHolder zKHolder, String str, ZKMember zKMember) {
        this.zkHolder = (ZKHolder) Objects.requireNonNull(zKHolder, "zkHolder must not be null");
        this.consumerName = (String) Objects.requireNonNull(str, "consumerName must not be null");
        this.member = (ZKMember) Objects.requireNonNull(zKMember, "member must not be null");
        this.leaderSelectorThreadFactory = ThreadUtils.newThreadFactory("LeaderSelector-" + str);
    }

    public abstract String getLeaderSelectorPath(T t);

    public abstract String getLeaderInfoPath(T t);

    /* JADX INFO: Access modifiers changed from: package-private */
    public synchronized void initGroupLeadership(final T t, final LeadershipChangedListener<T> leadershipChangedListener) throws Exception {
        if (this.keyToLeaderControl.containsKey(t)) {
            return;
        }
        LeaderSelector leaderSelector = new LeaderSelector(this.zkHolder.getCurator(), getLeaderSelectorPath(t), new CloseableExecutorService(Executors.newSingleThreadExecutor(this.leaderSelectorThreadFactory), true), new LeaderSelectorListenerAdapter() { // from class: de.zalando.paradox.nakadi.consumer.partitioned.zk.ZKConsumerLeader.1
            /* JADX WARN: Multi-variable type inference failed */
            public void takeLeadership(CuratorFramework curatorFramework) throws Exception {
                ZKConsumerLeader.LOGGER.info("Member [{}] took leadership for [{}]", ZKConsumerLeader.this.member.getMemberId(), t);
                try {
                    LeaderControl leaderControl = (LeaderControl) ZKConsumerLeader.this.keyToLeaderControl.get(t);
                    Preconditions.checkState((leaderControl == null || leaderControl.isLeader()) ? false : true, "Leader control for [%s] in incorrect state", new Object[]{t});
                    ZKConsumerLeader.this.setLeaderInfo(ZKConsumerLeader.this.getLeaderInfoPath(t), ZKConsumerLeader.this.member.getMemberId());
                    leadershipChangedListener.takeLeadership(t, ZKConsumerLeader.this.member);
                    leaderControl.takeLeadership();
                    ZKConsumerLeader.LOGGER.info("Member [{}] relinquished leadership for [{}]", ZKConsumerLeader.this.member.getMemberId(), t);
                    leadershipChangedListener.relinquishLeadership(t, ZKConsumerLeader.this.member);
                } catch (Throwable th) {
                    ZKConsumerLeader.LOGGER.info("Member [{}] relinquished leadership for [{}]", ZKConsumerLeader.this.member.getMemberId(), t);
                    leadershipChangedListener.relinquishLeadership(t, ZKConsumerLeader.this.member);
                    throw th;
                }
            }

            public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) {
                ZKConsumerLeader.LOGGER.info("Member [{}] connection state [{}] changed", ZKConsumerLeader.this.member.getMemberId(), t);
                super.stateChanged(curatorFramework, connectionState);
            }
        });
        leaderSelector.setId(this.member.getMemberId());
        LeaderControl of = LeaderControl.of(leaderSelector);
        this.keyToLeaderControl.put(t, of);
        LOGGER.info("Init member [{}] leadership for [{}]", this.member.getMemberId(), t);
        leaderSelector.autoRequeue();
        try {
            leaderSelector.start();
        } catch (Throwable th) {
            this.keyToLeaderControl.remove(t);
            of.relinquishLeadership();
            ThrowableUtils.throwException(th);
        }
    }

    public synchronized void closeGroupLeadership(T t) {
        LeaderControl remove = this.keyToLeaderControl.remove(t);
        if (null == remove) {
            LOGGER.trace("Could not close member [{}] leadership for [{}] because LeaderSelector was not found", this.member.getMemberId(), t);
        } else {
            LOGGER.info("Close member [{}] leadership for [{}]", this.member.getMemberId(), t);
            remove.relinquishLeadership();
        }
    }

    public synchronized void close() {
        LOGGER.info("Closing for member [{}]", this.member.getMemberId());
        this.keyToLeaderControl.values().forEach(leaderControl -> {
            try {
                leaderControl.relinquishLeadership();
            } catch (Exception e) {
                LOGGER.warn("Unexpected error while closing leader selector", e);
            }
        });
        this.keyToLeaderControl.clear();
    }

    public String getConsumerName() {
        return this.consumerName;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void setLeaderInfo(String str, String str2) throws Exception {
        CuratorFramework curator = this.zkHolder.getCurator();
        try {
            curator.setData().forPath(str, str2.getBytes("UTF-8"));
        } catch (KeeperException.NoNodeException e) {
            LOGGER.info("Set failed, no leader info node [{}]. Create new node", str);
            ((ACLBackgroundPathAndBytesable) curator.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT)).forPath(str);
            curator.setData().forPath(str, str2.getBytes("UTF-8"));
        }
    }
}
