package de.zalando.paradox.nakadi.consumer.core.partitioned.impl;

import de.zalando.paradox.nakadi.consumer.core.domain.EventType;
import de.zalando.paradox.nakadi.consumer.core.domain.EventTypeCursor;
import de.zalando.paradox.nakadi.consumer.core.domain.EventTypePartition;
import de.zalando.paradox.nakadi.consumer.core.domain.EventTypePartitions;
import de.zalando.paradox.nakadi.consumer.core.domain.NakadiPartition;
import de.zalando.paradox.nakadi.consumer.core.http.handlers.EventErrorHandler;
import de.zalando.paradox.nakadi.consumer.core.partitioned.PartitionCommitCallback;
import de.zalando.paradox.nakadi.consumer.core.utils.ThrowableUtils;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:de/zalando/paradox/nakadi/consumer/core/partitioned/impl/SimplePartitionCoordinator.class */
/**
 * Partition coordinator that keeps no offset state of its own.
 *
 * <p>Commits are forwarded straight to the commit callback registered for the partition, flushes are
 * only logged, and newly assigned partitions start from an offset chosen by
 * {@link #setStartNewestAvailableOffset(boolean)}.
 *
 * <p>A new instance is created in the running state, so {@link #init()} is only required to
 * re-enable the coordinator after {@link #close()}.
 */
public class SimplePartitionCoordinator extends AbstractPartitionCoordinator {
    /** Offset literal used when consumption should not start at the newest available offset. */
    private static final String BEGIN_OFFSET = "BEGIN";

    /** {@code true} while the coordinator accepts rebalance requests. */
    private final AtomicBoolean running;

    /** When {@code true}, assigned partitions start at their newest available offset. */
    private boolean startNewestAvailableOffset;

    /** Handlers notified about recoverable errors; never {@code null}. */
    private final List<EventErrorHandler> eventErrorHandlerList;

    /** Creates a coordinator without any error handlers. */
    public SimplePartitionCoordinator() {
        this(Collections.emptyList());
    }

    /**
     * Creates a coordinator that notifies the given handlers about recoverable errors.
     *
     * @param list handlers to notify; {@code null} is treated as "no handlers" so that a missing
     *             list cannot surface later as a {@link NullPointerException} while an error is
     *             being reported
     */
    public SimplePartitionCoordinator(List<EventErrorHandler> list) {
        super(LoggerFactory.getLogger(SimplePartitionCoordinator.class));
        this.running = new AtomicBoolean(true);
        this.startNewestAvailableOffset = true;
        this.eventErrorHandlerList = list != null ? list : Collections.<EventErrorHandler>emptyList();
    }

    /**
     * Switches the coordinator to the running state if it is currently closed; otherwise only logs
     * that it is already running.
     */
    @Override // de.zalando.paradox.nakadi.consumer.core.partitioned.PartitionCoordinator
    public void init() {
        if (this.running.compareAndSet(false, true)) {
            this.log.info("Init coordinator");
        } else {
            this.log.info("Coordinator is already running");
        }
    }

    /**
     * Switches the coordinator to the closed state if it is currently running; otherwise only logs
     * a warning that it is already closed.
     */
    @Override // de.zalando.paradox.nakadi.consumer.core.partitioned.PartitionCoordinator
    public void close() {
        if (this.running.compareAndSet(true, false)) {
            this.log.info("Closing coordinator");
        } else {
            this.log.warn("Coordinator is already closed");
        }
    }

    /**
     * Revokes and then assigns partitions for the event type of {@code eventTypePartitions}, using
     * the revoke/assign helpers inherited from the superclass. Does nothing except log a warning
     * when the coordinator is not running.
     *
     * @param eventTypePartitions event type together with its partitions, as passed to the inherited
     *                            helpers
     * @param collection          partitions passed to the inherited helpers (presumably the
     *                            partitions currently reported by Nakadi; confirm against the
     *                            superclass)
     */
    @Override // de.zalando.paradox.nakadi.consumer.core.partitioned.PartitionCoordinator
    public void rebalance(EventTypePartitions eventTypePartitions, Collection<NakadiPartition> collection) {
        if (!this.running.get()) {
            this.log.warn("Coordinator is not running.");
            return;
        }
        final EventType eventType = eventTypePartitions.getEventType();
        // Revocation is performed before assignment, in this order.
        revokePartitions(eventType, getPartitionsToRevoke(eventTypePartitions, collection));
        assignPartitions(eventType, getPartitionsToAssign(eventTypePartitions, collection), collection,
                getOffsetSelector(eventType));
    }

    /**
     * Builds the function that picks the start cursor for a partition of {@code eventType}.
     *
     * <p>The {@code startNewestAvailableOffset} flag is read each time the returned function is
     * applied, not when it is created.
     */
    private Function<NakadiPartition, EventTypeCursor> getOffsetSelector(EventType eventType) {
        return partition -> {
            final String offset;
            if (this.startNewestAvailableOffset) {
                offset = partition.getNewestAvailableOffset();
            } else {
                offset = BEGIN_OFFSET;
                this.log.warn("Using oldest available offset [{}] without persistent storage.", offset);
            }
            return EventTypeCursor.of(EventTypePartition.of(eventType, partition.getPartition()), offset);
        };
    }

    /**
     * Logs the cursor and reports it as committed to the commit callback of its partition, if a
     * callback is available.
     *
     * @param eventTypeCursor cursor to commit
     */
    @Override // de.zalando.paradox.nakadi.consumer.core.partitioned.PartitionOffsetManagement
    public void commit(EventTypeCursor eventTypeCursor) {
        this.log.debug("Commit {} ", eventTypeCursor);
        final PartitionCommitCallback callback = getPartitionCommitCallback(eventTypeCursor.getEventTypePartition());
        if (callback != null) {
            callback.onCommitComplete(eventTypeCursor);
        }
    }

    /**
     * Only logs the partition at debug level.
     *
     * @param eventTypePartition partition being flushed
     */
    @Override // de.zalando.paradox.nakadi.consumer.core.partitioned.PartitionOffsetManagement
    public void flush(EventTypePartition eventTypePartition) {
        this.log.debug("Flush {} ", eventTypePartition);
    }

    /**
     * Reports an exception raised for a partition.
     *
     * <p>Exceptions classified as unrecoverable by {@link ThrowableUtils} are logged and handed to
     * {@code ThrowableUtils.throwException}. All other exceptions are logged together with the
     * throwable and then passed to every registered {@link EventErrorHandler}. Logging happens
     * before the handlers run so the error is recorded even if a handler itself throws; an exception
     * thrown by a handler still propagates to the caller.
     *
     * @param th                 the reported exception
     * @param eventTypePartition partition the exception relates to
     * @param str                passed through unchanged to the handlers (presumably the cursor;
     *                           confirm against {@code EventErrorHandler})
     * @param str2               passed through unchanged to the handlers (presumably the raw event;
     *                           confirm against {@code EventErrorHandler})
     */
    @Override // de.zalando.paradox.nakadi.consumer.core.partitioned.PartitionOffsetManagement
    public void error(Throwable th, EventTypePartition eventTypePartition, String str, String str2) {
        if (ThrowableUtils.isUnrecoverableException(th)) {
            this.log.error("Error [{}] reason [{}]", eventTypePartition, ExceptionUtils.getMessage(th));
            ThrowableUtils.throwException(th);
            return;
        }
        // The throwable is a third argument beyond the two placeholders.
        this.log.error("Error [{}] reason [{}]", eventTypePartition, ExceptionUtils.getMessage(th), th);
        for (EventErrorHandler handler : this.eventErrorHandlerList) {
            handler.onError(th, eventTypePartition, str, str2);
        }
    }

    /**
     * Logs an error described by a numeric code and a text for the given partition.
     *
     * @param i                  numeric code written to the log (presumably a status code; confirm
     *                           against the caller)
     * @param str                text written to the log next to the code
     * @param eventTypePartition partition the error relates to
     */
    @Override // de.zalando.paradox.nakadi.consumer.core.partitioned.PartitionOffsetManagement
    public void error(int i, String str, EventTypePartition eventTypePartition) {
        this.log.error("Error [{}] code [{} / {}]", eventTypePartition, i, str);
    }

    /**
     * Chooses where newly assigned partitions start.
     *
     * @param z {@code true} to start at the newest available offset of each partition,
     *          {@code false} to start at {@code "BEGIN"}
     */
    public void setStartNewestAvailableOffset(boolean z) {
        this.startNewestAvailableOffset = z;
    }
}
