package com.adsoul.redjob.channel;

import com.adsoul.redjob.queue.FifoDaoImpl;
import com.adsoul.redjob.worker.AbstractWorker;
import com.adsoul.redjob.worker.Execution;
import com.adsoul.redjob.worker.WorkerDaoImpl;
import com.adsoul.redjob.worker.events.WorkerError;
import com.adsoul.redjob.worker.events.WorkerStart;
import com.adsoul.redjob.worker.events.WorkerStopped;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import org.slf4j.MDC;
import org.springframework.dao.InvalidDataAccessApiUsageException;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;

/* loaded from: input_file:com/adsoul/redjob/channel/ChannelWorker.class */
public class ChannelWorker extends AbstractWorker<ChannelWorkerState> {
    private ChannelDao channelDao;
    private List<String> channels;
    private RedisMessageListenerContainer listenerContainer;
    private final MessageListener listener;
    private final ReadWriteLock active;

    public ChannelWorker() {
        super(new ChannelWorkerState());
        this.listener = this::receive;
        this.active = new ReentrantReadWriteLock();
    }

    @Override // com.adsoul.redjob.worker.AbstractWorker
    @PostConstruct
    public void afterPropertiesSet() throws Exception {
        Assert.notNull(this.channels, "Precondition violated: channels != null.");
        Assert.notNull(this.channelDao, "Precondition violated: channelDao != null.");
        super.afterPropertiesSet();
    }

    @Override // com.adsoul.redjob.worker.Worker
    public void start() {
        this.log.info("Starting worker {}.", getName());
        this.log.info("Listening to channels {}.", StringUtils.collectionToCommaDelimitedString(this.channels));
        Stream<String> stream = this.channels.stream();
        ChannelDao channelDao = this.channelDao;
        Objects.requireNonNull(channelDao);
        List list = (List) stream.map(channelDao::mo1getTopic).collect(Collectors.toList());
        synchronized (this.listenerContainer) {
            this.listenerContainer.addMessageListener(this.listener, list);
        }
        ((ChannelWorkerState) this.state).setChannels((Collection) list.stream().map((v0) -> {
            return v0.getTopic();
        }).collect(Collectors.toSet()));
        setWorkerState((v0) -> {
            v0.start();
        }, new WorkerStart(this));
    }

    @Override // com.adsoul.redjob.worker.AbstractWorker, com.adsoul.redjob.worker.Worker
    public void stop() {
        super.stop();
        synchronized (this.listenerContainer) {
            this.listenerContainer.removeMessageListener(this.listener);
        }
        try {
            this.active.writeLock().lock();
            setWorkerState((v0) -> {
                v0.stopped();
            }, new WorkerStopped(this));
            this.workerDao.stop(this.name);
        } finally {
            this.active.writeLock().unlock();
            this.log.info("Stopped worker {}.", getName());
        }
    }

    @Override // com.adsoul.redjob.worker.AbstractWorker
    @PreDestroy
    public void destroy() {
        stop();
    }

    private void receive(Message message, byte[] bArr) {
        try {
            try {
                this.active.readLock().lock();
                MDC.put(WorkerDaoImpl.WORKER, getName());
                String channel = this.channelDao.getChannel(message);
                MDC.put(FifoDaoImpl.QUEUE, channel);
                Execution execution = this.channelDao.getExecution(message);
                if (execution == null) {
                    this.log.warn("Failed to deserialize job execution.");
                    this.active.readLock().unlock();
                    MDC.remove("job");
                    MDC.remove("execution");
                    MDC.remove(FifoDaoImpl.QUEUE);
                    MDC.remove(WorkerDaoImpl.WORKER);
                    return;
                }
                MDC.put("execution", Long.toString(execution.getId()));
                MDC.put("job", execution.getJob().getClass().getSimpleName());
                process(channel, execution);
                this.active.readLock().unlock();
                MDC.remove("job");
                MDC.remove("execution");
                MDC.remove(FifoDaoImpl.QUEUE);
                MDC.remove(WorkerDaoImpl.WORKER);
            } catch (InvalidDataAccessApiUsageException e) {
                this.log.error("Uncaught exception in worker: {}", e.getMessage());
                this.eventBus.publishEvent(new WorkerError(this, e));
                this.active.readLock().unlock();
                MDC.remove("job");
                MDC.remove("execution");
                MDC.remove(FifoDaoImpl.QUEUE);
                MDC.remove(WorkerDaoImpl.WORKER);
            } catch (Throwable th) {
                this.log.error("Uncaught exception in worker.", th);
                this.eventBus.publishEvent(new WorkerError(this, th));
                this.active.readLock().unlock();
                MDC.remove("job");
                MDC.remove("execution");
                MDC.remove(FifoDaoImpl.QUEUE);
                MDC.remove(WorkerDaoImpl.WORKER);
            }
        } catch (Throwable th2) {
            this.active.readLock().unlock();
            MDC.remove("job");
            MDC.remove("execution");
            MDC.remove(FifoDaoImpl.QUEUE);
            MDC.remove(WorkerDaoImpl.WORKER);
            throw th2;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // com.adsoul.redjob.worker.AbstractWorker
    public String createName() {
        return super.createName() + ":" + StringUtils.collectionToCommaDelimitedString(this.channels);
    }

    public List<String> getChannels() {
        return this.channels;
    }

    public void setChannels(String... strArr) {
        setChannels(Arrays.asList(strArr));
    }

    public void setChannels(List<String> list) {
        this.channels = list;
    }

    public ChannelDao getChannelDao() {
        return this.channelDao;
    }

    public void setChannelDao(ChannelDao channelDao) {
        this.channelDao = channelDao;
    }

    public RedisMessageListenerContainer getListenerContainer() {
        return this.listenerContainer;
    }

    public void setListenerContainer(RedisMessageListenerContainer redisMessageListenerContainer) {
        this.listenerContainer = redisMessageListenerContainer;
    }
}
