package de.zalando.paradox.nakadi.consumer.partitioned.zk;

import de.zalando.paradox.nakadi.consumer.core.domain.EventTypeCursor;
import de.zalando.paradox.nakadi.consumer.core.domain.EventTypePartition;
import de.zalando.paradox.nakadi.consumer.core.http.handlers.EventErrorHandler;
import de.zalando.paradox.nakadi.consumer.core.partitioned.PartitionCommitCallback;
import de.zalando.paradox.nakadi.consumer.core.partitioned.PartitionCommitCallbackProvider;
import de.zalando.paradox.nakadi.consumer.core.partitioned.PartitionOffsetManagement;
import de.zalando.paradox.nakadi.consumer.core.partitioned.PartitionRebalanceListener;
import de.zalando.paradox.nakadi.consumer.core.partitioned.PartitionRebalanceListenerProvider;
import de.zalando.paradox.nakadi.consumer.core.utils.ThrowableUtils;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.regex.Pattern;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:de/zalando/paradox/nakadi/consumer/partitioned/zk/ZKConsumerSyncOffsetManagement.class */
class ZKConsumerSyncOffsetManagement implements PartitionOffsetManagement {
    private static final int PRECONDITION_FAILED_HTTP_CODE = 412;
    private volatile boolean deleteUnavailableCursors;
    private final PartitionCommitCallbackProvider commitCallbackProvider;
    private final PartitionRebalanceListenerProvider rebalanceListenerProvider;
    private final ZKConsumerOffset consumerOffset;
    private final List<EventErrorHandler> eventErrorHandlers;
    private static final Pattern CURSOR_NOT_AVAILABLE = Pattern.compile("offset \\S+ for partition \\S+ is unavailable");
    private static final Logger LOGGER = LoggerFactory.getLogger(ZKConsumerSyncOffsetManagement.class);

    /* JADX INFO: Access modifiers changed from: package-private */
    public ZKConsumerSyncOffsetManagement(@Nonnull ZKConsumerOffset zKConsumerOffset, @Nonnull PartitionCommitCallbackProvider partitionCommitCallbackProvider, @Nonnull PartitionRebalanceListenerProvider partitionRebalanceListenerProvider, @Nonnull List<EventErrorHandler> list) {
        this.commitCallbackProvider = partitionCommitCallbackProvider;
        this.rebalanceListenerProvider = partitionRebalanceListenerProvider;
        this.consumerOffset = zKConsumerOffset;
        this.eventErrorHandlers = (List) Objects.requireNonNull(list);
    }

    public void commit(EventTypeCursor eventTypeCursor) {
        LOGGER.debug("Commit [{}] ", eventTypeCursor);
        try {
            this.consumerOffset.setOffset(eventTypeCursor);
        } catch (Exception e) {
            ThrowableUtils.throwException(e);
        }
        PartitionCommitCallback partitionCommitCallback = this.commitCallbackProvider.getPartitionCommitCallback(eventTypeCursor.getEventTypePartition());
        if (null != partitionCommitCallback) {
            partitionCommitCallback.onCommitComplete(eventTypeCursor);
        }
    }

    public void flush(EventTypePartition eventTypePartition) {
        LOGGER.debug("Flush [{}] ", eventTypePartition);
    }

    public void error(String str, Throwable th, EventTypePartition eventTypePartition, @Nullable String str2, String str3) {
        if (ThrowableUtils.isUnrecoverableException(th)) {
            LOGGER.error("Error [{}] reason [{}]", eventTypePartition, ExceptionUtils.getMessage(th));
            ThrowableUtils.throwException(th);
        } else {
            this.eventErrorHandlers.forEach(eventErrorHandler -> {
                eventErrorHandler.onError(str, th, eventTypePartition, str2, str3);
            });
            LOGGER.error("Error [{}] reason [{}] raw event [{}] ", new Object[]{eventTypePartition, ExceptionUtils.getMessage(th), str3, th});
        }
    }

    public void error(int i, String str, EventTypePartition eventTypePartition) {
        LOGGER.error("Consumer [{}] error [{}] / [{}] for [{}] ", new Object[]{this.consumerOffset.getConsumerName(), Integer.valueOf(i), str, eventTypePartition});
        if (this.deleteUnavailableCursors && i == PRECONDITION_FAILED_HTTP_CODE && CURSOR_NOT_AVAILABLE.matcher(str).find()) {
            String offsetPath = this.consumerOffset.getOffsetPath(eventTypePartition.getName(), eventTypePartition.getPartition());
            try {
                LOGGER.warn("Delete consumer offset [{}] due to error [{}]", offsetPath, str);
                this.consumerOffset.delOffset(offsetPath);
                PartitionRebalanceListener partitionRebalanceListener = this.rebalanceListenerProvider.getPartitionRebalanceListener(eventTypePartition.getEventType());
                if (null != partitionRebalanceListener) {
                    LOGGER.warn("Trying to stop consumer [{}] partition [{}]", this.consumerOffset.getConsumerName(), eventTypePartition);
                    partitionRebalanceListener.onPartitionsRevoked(Collections.singleton(eventTypePartition));
                }
            } catch (Exception e) {
                ThrowableUtils.throwException(e);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void setDeleteUnavailableCursors(boolean z) {
        this.deleteUnavailableCursors = z;
    }
}
