package com.adsoul.redjob.queue;

import com.adsoul.redjob.worker.AbstractWorker;
import com.adsoul.redjob.worker.Execution;
import com.adsoul.redjob.worker.WorkerDaoImpl;
import com.adsoul.redjob.worker.events.JobStale;
import com.adsoul.redjob.worker.events.WorkerError;
import com.adsoul.redjob.worker.events.WorkerFailure;
import com.adsoul.redjob.worker.events.WorkerNext;
import com.adsoul.redjob.worker.events.WorkerPause;
import com.adsoul.redjob.worker.events.WorkerPoll;
import com.adsoul.redjob.worker.events.WorkerStart;
import com.adsoul.redjob.worker.events.WorkerStopped;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.PostConstruct;
import org.slf4j.MDC;
import org.springframework.dao.InvalidDataAccessApiUsageException;
import org.springframework.data.redis.RedisConnectionFailureException;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;

/* loaded from: input_file:com/adsoul/redjob/queue/AbstractQueueWorker.class */
public abstract class AbstractQueueWorker extends AbstractWorker<QueueWorkerState> implements Runnable, QueueWorker {
    public static final int RESTART_DELAY_MS = 5000;
    private List<String> queues;
    private final Object lock;
    private volatile Long executionId;
    private Thread thread;
    protected final AtomicBoolean pause;

    public AbstractQueueWorker() {
        super(new QueueWorkerState());
        this.lock = new Object();
        this.executionId = null;
        this.pause = new AtomicBoolean(false);
    }

    @Override // com.adsoul.redjob.worker.AbstractWorker
    @PostConstruct
    public void afterPropertiesSet() throws Exception {
        Assert.notEmpty(this.queues, "Precondition violated: queues not empty.");
        super.afterPropertiesSet();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // com.adsoul.redjob.worker.AbstractWorker
    public String createName() {
        return super.createName() + ":" + StringUtils.collectionToCommaDelimitedString(this.queues);
    }

    @Override // com.adsoul.redjob.worker.Worker
    public void start() {
        this.thread = new Thread(this, getName());
        this.thread.start();
    }

    @Override // com.adsoul.redjob.worker.AbstractWorker, com.adsoul.redjob.worker.Worker
    public void stop() {
        super.stop();
        pause(false);
    }

    @Override // com.adsoul.redjob.worker.AbstractWorker
    public void destroy() {
        super.destroy();
        if (this.thread != null) {
            try {
                this.thread.interrupt();
                Thread.yield();
                if (this.thread.isAlive()) {
                    this.log.info("Waiting for worker {} to shutdown.", this.name);
                    this.thread.join();
                    this.log.info("Worker {} has been shut down.", this.name);
                }
                this.thread = null;
            } catch (InterruptedException e) {
            }
        }
    }

    @Override // java.lang.Runnable
    public void run() {
        try {
            try {
                MDC.put(WorkerDaoImpl.WORKER, getName());
                this.log.info("Starting worker {}.", getName());
                ((QueueWorkerState) this.state).setQueues(this.queues);
                doRun();
                if (!((QueueWorkerState) this.state).isFailed()) {
                    this.log.info("Stopped worker {}.", getName());
                    setWorkerState((v0) -> {
                        v0.stopped();
                    }, new WorkerStopped(this));
                    this.workerDao.stop(this.name);
                }
            } catch (Throwable th) {
                this.log.error("Uncaught exception in worker. Worker {} stopped.", this.name, th);
                setWorkerState((v0) -> {
                    v0.failed();
                }, new WorkerError(this, th));
                if (!((QueueWorkerState) this.state).isFailed()) {
                    this.log.info("Stopped worker {}.", getName());
                    setWorkerState((v0) -> {
                        v0.stopped();
                    }, new WorkerStopped(this));
                    this.workerDao.stop(this.name);
                }
            }
        } catch (Throwable th2) {
            if (!((QueueWorkerState) this.state).isFailed()) {
                this.log.info("Stopped worker {}.", getName());
                setWorkerState((v0) -> {
                    v0.stopped();
                }, new WorkerStopped(this));
                this.workerDao.stop(this.name);
            }
            throw th2;
        }
    }

    protected void doRun() throws Throwable {
        while (this.run.get()) {
            try {
                this.workerDao.ping();
                setWorkerState((v0) -> {
                    v0.start();
                }, new WorkerStart(this));
                startup();
                poll();
            } catch (RedisConnectionFailureException e) {
                this.log.warn("Worker {} failed to connect to Redis. Restarting in {} ms.", getName(), Integer.valueOf(RESTART_DELAY_MS));
                if (!((QueueWorkerState) this.state).isFailed()) {
                    setWorkerState((v0) -> {
                        v0.failed();
                    }, new WorkerFailure(this));
                }
                Thread.sleep(5000L);
            }
        }
    }

    protected void startup() throws Throwable {
        Iterator<String> it = this.queues.iterator();
        while (it.hasNext()) {
            restoreInflight(it.next());
        }
    }

    @Override // com.adsoul.redjob.queue.QueueWorker
    public void pause(boolean z) {
        synchronized (this.pause) {
            this.pause.set(z);
            this.pause.notifyAll();
        }
    }

    private void blockWhilePaused() throws InterruptedException {
        synchronized (this.pause) {
            while (this.pause.get()) {
                try {
                    this.log.info("Pausing worker {}.", this.name);
                    setWorkerState((v0) -> {
                        v0.pause();
                    }, new WorkerPause(this));
                    this.pause.wait();
                    this.log.info("Resuming worker {}.", this.name);
                    setWorkerState((v0) -> {
                        v0.start();
                    }, new WorkerStart(this));
                } catch (Throwable th) {
                    this.log.info("Resuming worker {}.", this.name);
                    setWorkerState((v0) -> {
                        v0.start();
                    }, new WorkerStart(this));
                    throw th;
                }
            }
        }
    }

    protected void poll() throws InterruptedException {
        while (this.run.get()) {
            blockWhilePaused();
            try {
                pollQueues();
            } catch (InterruptedException e) {
                Thread.interrupted();
                this.log.debug("Thread has been interrupted.");
            } catch (Throwable th) {
                this.log.error("Polling queues for jobs failed.", th);
            }
        }
    }

    protected void pollQueues() throws Throwable {
        Iterator<String> it = this.queues.iterator();
        while (it.hasNext()) {
            String next = it.next();
            try {
                MDC.put(FifoDaoImpl.QUEUE, next);
                WorkerPoll workerPoll = new WorkerPoll(this, next);
                this.eventBus.publishEvent(workerPoll);
                if (workerPoll.isVeto()) {
                    this.log.debug("Queue poll vetoed.");
                } else if (pollQueue(next)) {
                    return;
                }
                this.eventBus.publishEvent(new WorkerNext(this, next));
                MDC.remove(FifoDaoImpl.QUEUE);
            } finally {
                this.eventBus.publishEvent(new WorkerNext(this, next));
                MDC.remove(FifoDaoImpl.QUEUE);
            }
        }
        Thread.sleep(this.emptyQueuesSleepMillis);
    }

    /* JADX WARN: Finally extract failed */
    protected boolean pollQueue(String str) throws Throwable {
        Execution doPollQueue = doPollQueue(str);
        if (doPollQueue == null) {
            this.log.debug("Queue is empty.");
            return false;
        }
        Assert.state(doPollQueue.getQueue().equals(str), "Pre-condition violated: Queue is consistent.");
        try {
            try {
                try {
                    synchronized (this.lock) {
                        this.executionId = Long.valueOf(doPollQueue.getId());
                    }
                    MDC.put("execution", Long.toString(doPollQueue.getId()));
                    MDC.put("job", doPollQueue.getJob().getClass().getSimpleName());
                    boolean process = process(doPollQueue);
                    synchronized (this.lock) {
                        this.executionId = null;
                    }
                    try {
                        if (process) {
                            restoreInflight(str);
                        } else {
                            removeInflight(str);
                        }
                        MDC.remove("job");
                        MDC.remove("execution");
                        return true;
                    } catch (Throwable th) {
                        MDC.remove("job");
                        MDC.remove("execution");
                        throw th;
                    }
                } catch (Throwable th2) {
                    synchronized (this.lock) {
                        this.executionId = null;
                        try {
                            if (0 != 0) {
                                restoreInflight(str);
                            } else {
                                removeInflight(str);
                            }
                            MDC.remove("job");
                            MDC.remove("execution");
                            throw th2;
                        } catch (Throwable th3) {
                            MDC.remove("job");
                            MDC.remove("execution");
                            throw th3;
                        }
                    }
                }
            } catch (InvalidDataAccessApiUsageException e) {
                this.log.error("Job processing failed: {}", e.getMessage());
                synchronized (this.lock) {
                    this.executionId = null;
                    try {
                        if (0 != 0) {
                            restoreInflight(str);
                        } else {
                            removeInflight(str);
                        }
                        MDC.remove("job");
                        MDC.remove("execution");
                        return true;
                    } catch (Throwable th4) {
                        MDC.remove("job");
                        MDC.remove("execution");
                        throw th4;
                    }
                }
            }
        } catch (Throwable th5) {
            this.log.error("Job processing failed.", th5);
            synchronized (this.lock) {
                this.executionId = null;
                try {
                    if (0 != 0) {
                        restoreInflight(str);
                    } else {
                        removeInflight(str);
                    }
                    MDC.remove("job");
                    MDC.remove("execution");
                    return true;
                } catch (Throwable th6) {
                    MDC.remove("job");
                    MDC.remove("execution");
                    throw th6;
                }
            }
        }
    }

    @Override // com.adsoul.redjob.queue.QueueWorker
    public void stop(long j) {
        synchronized (this.lock) {
            if (this.executionId.longValue() == j) {
                getExecutionStrategy().stop();
                return;
            }
            try {
                stopStale(j);
            } catch (Throwable th) {
                this.log.error("Failed to stop execution {}.", Long.valueOf(j), th);
            }
        }
    }

    private void stopStale(long j) throws Throwable {
        Execution execution = get(j);
        if (execution == null || !execution.isRunning()) {
            return;
        }
        String worker = execution.getWorker();
        if (worker == null || worker.equals(this.name) || !this.workerDao.names().contains(worker)) {
            execution.stop();
            this.eventBus.publishEvent(new JobStale(this, execution));
            update(execution);
        }
    }

    protected abstract Execution get(long j) throws Throwable;

    protected abstract Execution doPollQueue(String str) throws Throwable;

    protected abstract void removeInflight(String str) throws Throwable;

    protected abstract void restoreInflight(String str) throws Throwable;

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // com.adsoul.redjob.worker.AbstractWorker
    public void startExecution(Execution execution) throws Throwable {
        super.startExecution(execution);
        update(execution);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // com.adsoul.redjob.worker.AbstractWorker
    public void stopExecution(Execution execution) throws Throwable {
        Execution execution2 = get(this.executionId.longValue());
        if (execution2 != null) {
            super.stopExecution(execution2);
            update(execution2);
        }
    }

    @Override // com.adsoul.redjob.queue.QueueWorker
    public List<String> getQueues() {
        return this.queues;
    }

    public void setQueues(String... strArr) {
        setQueues(Arrays.asList(strArr));
    }

    public void setQueues(List<String> list) {
        this.queues = list;
    }
}
