package com.adsoul.redjob.queue;

import com.adsoul.redjob.AbstractDao;
import com.adsoul.redjob.worker.Execution;
import com.adsoul.redjob.worker.json.ExecutionRedisSerializer;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import javax.annotation.PostConstruct;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;

/* loaded from: input_file:com/adsoul/redjob/queue/FifoDaoImpl.class */
public class FifoDaoImpl extends AbstractDao implements FifoDao {
    public static final String ID = "id";
    public static final String QUEUES = "queues";
    public static final String QUEUE = "queue";
    public static final String JOBS = "jobs";
    public static final String INFLIGHT = "inflight";
    private ExecutionRedisSerializer executions;
    private RedisTemplate<String, String> redis;

    @Override // com.adsoul.redjob.AbstractDao
    @PostConstruct
    public void afterPropertiesSet() {
        super.afterPropertiesSet();
        Assert.notNull(this.executions, "Precondition violated: executions != null.");
        this.redis = new RedisTemplate<>();
        this.redis.setConnectionFactory(this.connectionFactory);
        this.redis.setKeySerializer(this.strings);
        this.redis.setValueSerializer(this.strings);
        this.redis.setHashKeySerializer(this.strings);
        this.redis.setHashValueSerializer(this.executions);
        this.redis.afterPropertiesSet();
    }

    @Override // com.adsoul.redjob.queue.FifoDao
    public Execution enqueue(String str, Object obj, boolean z) {
        return (Execution) this.redis.execute(redisConnection -> {
            Long incr = redisConnection.incr(key(ID));
            Execution execution = new Execution(this.namespace, str, incr.longValue(), obj);
            redisConnection.sAdd(key(QUEUES), (byte[][]) new byte[]{value(str)});
            byte[] value = value(incr.longValue());
            byte[] value2 = value(execution);
            if (this.log.isDebugEnabled()) {
                this.log.debug("Enqueuing: {}", new String(value2, StandardCharsets.UTF_8));
            }
            redisConnection.hSet(key(JOBS), value, value2);
            if (z) {
                redisConnection.lPush(key(QUEUE, str), (byte[][]) new byte[]{value});
            } else {
                redisConnection.rPush(key(QUEUE, str), (byte[][]) new byte[]{value});
            }
            return execution;
        });
    }

    @Override // com.adsoul.redjob.queue.FifoDao
    public boolean dequeue(String str, long j) {
        return ((Boolean) this.redis.execute(redisConnection -> {
            byte[] value = value(j);
            Long lRem = redisConnection.lRem(key(QUEUE, str), 0L, value);
            redisConnection.hDel(key(JOBS), (byte[][]) new byte[]{value});
            return Boolean.valueOf(lRem != null && lRem.longValue() > 0);
        })).booleanValue();
    }

    @Override // com.adsoul.redjob.queue.FifoDao
    public Execution get(long j) {
        return (Execution) this.redis.execute(redisConnection -> {
            byte[] hGet = redisConnection.hGet(key(JOBS), value(j));
            if (hGet == null) {
                return null;
            }
            return parseExecution(hGet);
        });
    }

    @Override // com.adsoul.redjob.queue.FifoDao
    public void update(Execution execution) {
        this.redis.execute(redisConnection -> {
            byte[] value = value(execution.getId());
            if (!redisConnection.hSet(key(JOBS), value, value(execution)).booleanValue()) {
                return null;
            }
            redisConnection.hDel(key(JOBS), (byte[][]) new byte[]{value});
            return null;
        });
    }

    @Override // com.adsoul.redjob.queue.FifoDao
    public List<Execution> getQueued(String str) {
        return (List) this.redis.execute(redisConnection -> {
            List lRange = redisConnection.lRange(key(QUEUE, str), 0L, -1L);
            if (CollectionUtils.isEmpty(lRange)) {
                return Collections.emptyList();
            }
            List hMGet = redisConnection.hMGet(key(JOBS), (byte[][]) lRange.toArray((Object[]) new byte[lRange.size()]));
            if (CollectionUtils.isEmpty(hMGet)) {
                return Collections.emptyList();
            }
            Assert.isTrue(hMGet.size() == lRange.size(), "Precondition violated: Redis response has the expected length.");
            return (List) hMGet.stream().map(this::parseExecution).filter((v0) -> {
                return Objects.nonNull(v0);
            }).collect(Collectors.toList());
        });
    }

    @Override // com.adsoul.redjob.queue.FifoDao
    public List<Execution> getAll() {
        return (List) this.redis.execute(redisConnection -> {
            Map hGetAll = redisConnection.hGetAll(key(JOBS));
            return CollectionUtils.isEmpty(hGetAll) ? Collections.emptyList() : (List) hGetAll.values().stream().map(this::parseExecution).filter((v0) -> {
                return Objects.nonNull(v0);
            }).collect(Collectors.toList());
        });
    }

    @Override // com.adsoul.redjob.queue.FifoDao
    public int cleanUp() {
        return ((Integer) this.redis.execute(redisConnection -> {
            Map hGetAll = redisConnection.hGetAll(key(JOBS));
            if (CollectionUtils.isEmpty(hGetAll)) {
                return 0;
            }
            byte[][] bArr = (byte[][]) hGetAll.entrySet().stream().filter(entry -> {
                return tryParseExecution((byte[]) entry.getValue()) == null;
            }).map((v0) -> {
                return v0.getKey();
            }).toArray(i -> {
                return new byte[i];
            });
            if (bArr.length == 0) {
                return 0;
            }
            redisConnection.hDel(key(JOBS), bArr);
            return Integer.valueOf(bArr.length);
        })).intValue();
    }

    private Execution tryParseExecution(byte[] bArr) {
        try {
            return parseExecution(bArr);
        } catch (Exception e) {
            return null;
        }
    }

    @Override // com.adsoul.redjob.queue.FifoDao
    public Execution pop(String str, String str2) {
        return (Execution) this.redis.execute(redisConnection -> {
            byte[] lPop = redisConnection.lPop(key(QUEUE, str));
            if (lPop == null) {
                return null;
            }
            redisConnection.lPush(key(INFLIGHT, str2, str), (byte[][]) new byte[]{lPop});
            byte[] hGet = redisConnection.hGet(key(JOBS), lPop);
            if (hGet == null) {
                return null;
            }
            return parseExecution(hGet);
        });
    }

    @Override // com.adsoul.redjob.queue.FifoDao
    public void removeInflight(String str, String str2) {
        this.redis.execute(redisConnection -> {
            redisConnection.lPop(key(INFLIGHT, str2, str));
            return null;
        });
    }

    @Override // com.adsoul.redjob.queue.FifoDao
    public void restoreInflight(String str, String str2) {
        this.redis.execute(redisConnection -> {
            byte[] lPop = redisConnection.lPop(key(INFLIGHT, str2, str));
            if (lPop == null) {
                return null;
            }
            redisConnection.lPush(key(QUEUE, str), (byte[][]) new byte[]{lPop});
            return null;
        });
    }

    @Override // com.adsoul.redjob.queue.FifoDao
    public List<Execution> getInflight(String str, String str2) {
        return (List) this.redis.execute(redisConnection -> {
            List lRange = redisConnection.lRange(key(INFLIGHT, str2, str), 0L, -1L);
            if (CollectionUtils.isEmpty(lRange)) {
                return Collections.emptyList();
            }
            List hMGet = redisConnection.hMGet(key(JOBS), (byte[][]) lRange.toArray((Object[]) new byte[lRange.size()]));
            if (CollectionUtils.isEmpty(hMGet)) {
                return Collections.emptyList();
            }
            Assert.isTrue(hMGet.size() == lRange.size(), "Precondition violated: Redis response has the expected length.");
            return (List) hMGet.stream().map(this::parseExecution).filter((v0) -> {
                return Objects.nonNull(v0);
            }).collect(Collectors.toList());
        });
    }

    protected byte[] value(Execution execution) {
        return this.executions.serialize(execution);
    }

    protected Execution parseExecution(byte[] bArr) {
        return this.executions.m11deserialize(bArr);
    }

    public ExecutionRedisSerializer getExecutions() {
        return this.executions;
    }

    public void setExecutions(ExecutionRedisSerializer executionRedisSerializer) {
        this.executions = executionRedisSerializer;
    }
}
