package org.apache.flink.connector.base.source.reader.fetcher;

import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.class */
/**
 * A {@link Runnable} that drives a {@link SplitReader} by repeatedly executing
 * {@link SplitFetcherTask}s on a single thread.
 *
 * <p>On every iteration (see {@link #runOnce()}) the fetcher either runs its fetch task, when no
 * other task is queued and at least one split is assigned, or blocks on the task queue until a
 * task (for example one created by {@link #addSplits(List)}) becomes available.
 *
 * <p>{@link #addSplits(List)}, {@link #shutdown()} and {@link #wakeUp(boolean)} may be invoked
 * while the loop is running; the wake-up handshake is coordinated through the monitor of the
 * {@code wakeUp} flag.
 *
 * @param <E> the type of the elements carried by the fetched record batches
 * @param <SplitT> the type of the splits handled by the split reader
 */
public class SplitFetcher<E, SplitT extends SourceSplit> implements Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(SplitFetcher.class);

    /**
     * No-op marker task. It is put into the task queue only to unblock a fetcher thread that is
     * waiting in {@code taskQueue.take()}; it is never treated as a real running task.
     */
    private static final SplitFetcherTask WAKEUP_TASK = new DummySplitFetcherTask("WAKEUP_TASK");

    /** Identifier of this fetcher; only used in log and error messages. */
    private final int id;

    /** Queue handed to the fetch task together with the split reader. */
    private final BlockingQueue<RecordsWithSplitIds<E>> elementsQueue;

    private final SplitReader<E, SplitT> splitReader;

    /** Callback executed exactly once when {@link #run()} exits, normally or exceptionally. */
    private final Runnable shutdownHook;

    /** Created lazily in {@link #run()} because it needs the fetcher thread. */
    private FetchTask<E, SplitT> fetchTask;

    private volatile Thread runningThread;

    /** The task picked for the current {@link #runOnce()} iteration, or {@code null} in between. */
    private volatile SplitFetcherTask runningTask = null;

    private final BlockingDeque<SplitFetcherTask> taskQueue = new LinkedBlockingDeque<>();
    private final Queue<SplitsChange<SplitT>> splitChanges = new LinkedList<>();
    private final Map<String, SplitT> assignedSplits = new HashMap<>();

    /** Snapshot computed by {@link #updateIsIdle()}; starts out as idle. */
    private volatile boolean isIdle = true;

    /** Wake-up request flag; its monitor also guards the wake-up handshake. */
    private final AtomicBoolean wakeUp = new AtomicBoolean(false);

    private final AtomicBoolean closed = new AtomicBoolean(false);

    /** A named task that does nothing; {@link #run()} always reports "not finished". */
    private static class DummySplitFetcherTask implements SplitFetcherTask {
        private final String name;

        private DummySplitFetcherTask(String name) {
            this.name = name;
        }

        @Override
        public boolean run() throws InterruptedException {
            return false;
        }

        @Override
        public void wakeUp() {
            // Nothing to wake up.
        }

        @Override
        public String toString() {
            return this.name;
        }
    }

    /**
     * Creates a fetcher. The fetcher does nothing until {@link #run()} is invoked.
     *
     * @param id identifier used in log and error messages
     * @param elementsQueue queue passed on to the fetch task
     * @param splitReader reader passed on to the fetch task and to split-assignment tasks
     * @param shutdownHook callback invoked when {@link #run()} exits
     */
    public SplitFetcher(
            int id,
            BlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
            SplitReader<E, SplitT> splitReader,
            Runnable shutdownHook) {
        this.id = id;
        this.elementsQueue = elementsQueue;
        this.splitReader = splitReader;
        this.shutdownHook = shutdownHook;
    }

    /**
     * Runs the fetcher loop on the calling thread until {@link #shutdown()} has been called or an
     * exception escapes from {@link #runOnce()}. In both cases the shutdown hook is executed
     * exactly once before this method returns or rethrows.
     */
    @Override
    public void run() {
        LOG.info("Starting split fetcher {}", this.id);
        try {
            this.runningThread = Thread.currentThread();
            // The callback receives the ids of finished splits: drop them from the assignment
            // and refresh the idle snapshot.
            this.fetchTask = new FetchTask<>(
                    this.splitReader,
                    this.elementsQueue,
                    finishedSplitIds -> {
                        finishedSplitIds.forEach(this.assignedSplits::remove);
                        updateIsIdle();
                    },
                    this.runningThread);
            while (!this.closed.get()) {
                runOnce();
            }
        } finally {
            // Clear any pending interrupt before the hook runs (presumably so that the hook
            // itself is not affected by an interrupt aimed at the fetch loop).
            Thread.interrupted();
            this.shutdownHook.run();
            LOG.info("Split fetcher {} exited.", this.id);
        }
    }

    /**
     * Executes a single iteration of the fetcher loop: picks a task, runs it unless a wake-up or
     * interrupt is pending, re-enqueues it if it did not finish, and finally clears the wake-up
     * state.
     *
     * @throws RuntimeException if the thread is interrupted while neither a shutdown nor a
     *     wake-up has been requested
     */
    void runOnce() {
        try {
            if (shouldRunFetchTask()) {
                this.runningTask = this.fetchTask;
            } else {
                // Blocks until a task (possibly WAKEUP_TASK) is available.
                this.runningTask = this.taskQueue.take();
            }
            LOG.debug("Prepare to run {}", this.runningTask);
            // Skip the task if an interrupt or a wake-up arrived in the meantime. The order of
            // the checks matters: Thread.interrupted() also clears the interrupt flag.
            if (!Thread.interrupted() && !this.wakeUp.get() && this.runningTask.run()) {
                LOG.debug("Finished running task {}", this.runningTask);
                // A finished task must not be re-enqueued below.
                this.runningTask = null;
            }
        } catch (InterruptedException e) {
            if (this.closed.get()) {
                // Shutting down: leave the remaining state untouched and let run() exit.
                return;
            }
            if (!this.wakeUp.get()) {
                throw new RuntimeException(
                        String.format(
                                "SplitFetcher thread %d interrupted while polling the records",
                                this.id),
                        e);
            }
            LOG.debug("Split fetcher has been waken up.");
        }
        // A non-null running task at this point did not finish and may need to run again.
        maybeEnqueueTask(this.runningTask);
        // Same monitor as wakeUp(boolean), so a concurrent wake-up sees either the task of this
        // iteration or a fully reset state.
        synchronized (this.wakeUp) {
            this.runningTask = null;
            Thread.interrupted();
            this.wakeUp.set(false);
            LOG.debug("Cleaned wakeup flag.");
        }
    }

    /**
     * Schedules a task that assigns the given splits, refreshes the idle snapshot and wakes up
     * the currently running task, if any.
     *
     * @param splitsToAdd the splits to hand to the split-assignment task
     */
    public void addSplits(List<SplitT> splitsToAdd) {
        // NOTE(review): raw AddSplitsTask kept as in the original; add type arguments if the
        // class turns out to be generic.
        maybeEnqueueTask(
                new AddSplitsTask(this.splitReader, splitsToAdd, this.splitChanges, this.assignedSplits));
        updateIsIdle();
        wakeUp(true);
    }

    /**
     * Requests the fetcher loop to stop. Only the first call has an effect; it marks the fetcher
     * as closed and wakes up the running task or the blocked fetcher thread.
     */
    public void shutdown() {
        if (this.closed.compareAndSet(false, true)) {
            LOG.info("Shutting down split fetcher {}", this.id);
            wakeUp(false);
        }
    }

    /** Returns the live (not copied) map of currently assigned splits, keyed by a string id. */
    Map<String, SplitT> assignedSplits() {
        return this.assignedSplits;
    }

    /** Returns the idle snapshot taken by the most recent {@link #updateIsIdle()} call. */
    public boolean isIdle() {
        return this.isIdle;
    }

    /**
     * Returns {@code true} when no task is queued and at least one split is assigned, i.e. when
     * the next iteration should run the fetch task instead of polling the task queue.
     */
    boolean shouldRunFetchTask() {
        return this.taskQueue.isEmpty() && !this.assignedSplits.isEmpty();
    }

    /**
     * Sets the wake-up flag and wakes up whatever the fetcher is currently doing.
     *
     * @param taskOnly if {@code true}, only a currently running task is woken up; if
     *     {@code false} and no real task is running, a marker task is additionally enqueued to
     *     unblock a fetcher thread waiting on the task queue
     */
    void wakeUp(boolean taskOnly) {
        synchronized (this.wakeUp) {
            this.wakeUp.set(true);
            // Read the volatile field once so the check and the call act on the same task.
            SplitFetcherTask currentTask = this.runningTask;
            if (isRunningTask(currentTask)) {
                LOG.debug("Waking up running task {}", currentTask);
                currentTask.wakeUp();
            } else if (!taskOnly) {
                LOG.debug("Waking up fetcher thread.");
                this.taskQueue.add(WAKEUP_TASK);
            }
        }
    }

    /**
     * Puts the task at the head of the task queue unless the fetcher is closed, the task is
     * {@code null} or the wake-up marker, or the task is the fetch task (which is selected by
     * {@link #shouldRunFetchTask()} rather than through the queue).
     *
     * @throws RuntimeException if the queue rejects the task
     */
    private void maybeEnqueueTask(SplitFetcherTask task) {
        if (this.closed.get() || !isRunningTask(task) || task == this.fetchTask) {
            return;
        }
        if (!this.taskQueue.offerFirst(task)) {
            throw new RuntimeException(
                    "The task queue is full. This is only theoretically possible when really bad thing happens.");
        }
        // Log only after the task has really been put into the queue.
        LOG.debug("Enqueued task {}", task);
    }

    /** A task counts as "running" when it is neither {@code null} nor the wake-up marker. */
    private boolean isRunningTask(SplitFetcherTask task) {
        return task != null && task != WAKEUP_TASK;
    }

    /** Recomputes the idle snapshot: idle means no queued task, no split change, no split. */
    private void updateIsIdle() {
        this.isIdle =
                this.taskQueue.isEmpty()
                        && this.splitChanges.isEmpty()
                        && this.assignedSplits.isEmpty();
    }
}
