package net.cnri.microservices;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;

/* loaded from: input_file:net/cnri/microservices/EventConsumingKeyedCompleter.class */
public abstract class EventConsumingKeyedCompleter<EventT, ResultT> implements KeyedCompleter<ResultT> {
    private final ConcurrentMap<String, CompletableFuture<ResultT>> waiters = new ConcurrentHashMap();
    private final Collection<MultithreadedKafkaConsumer> messageConsumers = new ArrayList();

    public EventConsumingKeyedCompleter(int i, Collection<String> collection, String str, Map<String, String> map, String str2, Alerter alerter, ExecutorService executorService) {
        for (int i2 = 0; i2 < i; i2++) {
            this.messageConsumers.add(new MultithreadedKafkaConsumer(collection, str, map, str2, alerter, executorService));
        }
    }

    @Override // net.cnri.microservices.KeyedCompleter
    public void init() {
        Iterator<MultithreadedKafkaConsumer> it = this.messageConsumers.iterator();
        while (it.hasNext()) {
            it.next().start(this::consume);
        }
    }

    @Override // net.cnri.microservices.KeyedCompleter
    public void shutdown() {
        Iterator<MultithreadedKafkaConsumer> it = this.messageConsumers.iterator();
        while (it.hasNext()) {
            it.next().shutdown();
        }
    }

    private void consume(String str) {
        EventT deserialize = deserialize(str);
        if (filter(deserialize)) {
            CompletableFuture<ResultT> remove = this.waiters.remove(getKey(deserialize));
            if (remove == null) {
                return;
            }
            remove.complete(getResult(deserialize));
        }
    }

    protected abstract EventT deserialize(String str);

    protected boolean filter(EventT eventt) {
        return true;
    }

    protected abstract String getKey(EventT eventt);

    protected abstract ResultT getResult(EventT eventt);

    @Override // net.cnri.microservices.KeyedCompleter
    public CompletableFuture<ResultT> register(String str) {
        ensureStarted();
        CompletableFuture<ResultT> completableFuture = new CompletableFuture<>();
        if (this.waiters.putIfAbsent(str, completableFuture) != null) {
            throw new IllegalStateException("Already waiting on " + str);
        }
        return completableFuture;
    }

    private void ensureStarted() {
        Iterator<MultithreadedKafkaConsumer> it = this.messageConsumers.iterator();
        while (it.hasNext()) {
            it.next().ensureStarted();
        }
    }

    @Override // net.cnri.microservices.KeyedCompleter
    public void clear(String str) {
        this.waiters.remove(str);
    }
}
