package org.apache.samza.system.eventhub.producer;

import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.samza.SamzaException;
import org.apache.samza.config.Config;
import org.apache.samza.config.StreamConfig;
import org.apache.samza.metrics.Counter;
import org.apache.samza.metrics.MetricsRegistry;
import org.apache.samza.metrics.SamzaHistogram;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemProducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/samza/system/eventhub/producer/AsyncSystemProducer.class */
/**
 * Base class for {@link SystemProducer} implementations whose sends complete asynchronously.
 *
 * <p>Subclasses implement {@link #sendAsync(String, OutgoingMessageEnvelope)}; this class keeps track
 * of the returned futures, records latency/error metrics, and makes {@link #flush(String)} block until
 * every tracked send has completed.
 *
 * <p>Failure handling is "latched": the first exception observed in a send callback is stored and is
 * rethrown (wrapped in a {@link SamzaException}) by every subsequent {@link #send} or {@link #flush}.
 * Nothing in this class ever resets it.
 *
 * <p>{@link #send} and {@link #flush} are synchronized on the producer instance. Send callbacks run on
 * whatever thread completes the future returned by {@code sendAsync}.
 */
public abstract class AsyncSystemProducer implements SystemProducer {
    /** Config key template (formatted with the system name) holding the list of stream ids. */
    public static final String CONFIG_STREAM_LIST = "systems.%s.stream.list";

    private static final String SEND_ERRORS = "sendErrors";
    private static final String SEND_CALLBACK_LATENCY = "sendCallbackLatency";
    private static final String SEND_LATENCY = "sendLatency";

    /** Metrics group name used for the metrics aggregated over all streams. */
    protected static final String AGGREGATE = "aggregate";

    private static final Logger LOG = LoggerFactory.getLogger(AsyncSystemProducer.class.getName());
    private static final long DEFAULT_FLUSH_TIMEOUT_MILLIS = Duration.ofMinutes(1).toMillis();

    /** Guards the one-time creation of the static aggregate metrics below. */
    private static final Object AGGREGATE_METRICS_LOCK = new Object();

    // The aggregate metrics are shared by every producer instance in the JVM and are registered with
    // the MetricsRegistry of whichever instance is started first. They are volatile so that instances
    // started on other threads see the fully initialized values.
    private static volatile Counter aggSendErrors = null;
    private static volatile SamzaHistogram aggSendLatency = null;
    private static volatile SamzaHistogram aggSendCallbackLatency = null;

    /** Stream ids read from the {@link #CONFIG_STREAM_LIST} config entry of this system. */
    protected final List<String> streamIds;

    /** Maps each stream's physical name (as resolved by {@link StreamConfig}) back to its stream id. */
    protected final Map<String, String> physicalToStreamIds;

    protected final MetricsRegistry metricsRegistry;

    /** Futures of all sends issued since the last successful {@link #flush(String)}. */
    protected final Set<CompletableFuture<Void>> pendingFutures = ConcurrentHashMap.newKeySet();

    /** First exception seen by a send callback; once set it is never cleared. */
    private final AtomicReference<Throwable> sendExceptionOnCallback = new AtomicReference<>(null);

    // Per-stream metrics keyed by stream id; populated in start() and only read afterwards.
    private final Map<String, SamzaHistogram> sendLatency = new HashMap<>();
    private final Map<String, SamzaHistogram> sendCallbackLatency = new HashMap<>();
    private final Map<String, Counter> sendErrors = new HashMap<>();

    /**
     * Reads the stream list of the given system from the config and builds the physical-name to
     * stream-id lookup.
     *
     * @param str             system name, substituted into {@link #CONFIG_STREAM_LIST}
     * @param config          job configuration
     * @param metricsRegistry registry in which {@link #start()} creates the metrics
     */
    public AsyncSystemProducer(String str, Config config, MetricsRegistry metricsRegistry) {
        StreamConfig streamConfig = new StreamConfig(config);
        this.streamIds = config.getList(String.format(CONFIG_STREAM_LIST, str));
        this.physicalToStreamIds = this.streamIds.stream()
                .collect(Collectors.toMap(streamConfig::getPhysicalName, Function.identity()));
        this.metricsRegistry = metricsRegistry;
    }

    /**
     * Dispatches the envelope through {@link #sendAsync(String, OutgoingMessageEnvelope)} and tracks the
     * returned future until the next flush.
     *
     * @param str                     passed through unchanged to {@code sendAsync}
     * @param outgoingMessageEnvelope message to send; its system stream name selects the metrics to update
     * @throws SamzaException if an earlier send already failed in its callback
     */
    public synchronized void send(String str, OutgoingMessageEnvelope outgoingMessageEnvelope) {
        checkForSendCallbackErrors("Received exception on message send");

        String physicalName = outgoingMessageEnvelope.getSystemStream().getStream();
        // Falls back to the physical name itself when no stream id is mapped to it.
        String streamId = this.physicalToStreamIds.getOrDefault(physicalName, physicalName);

        long sendStartMillis = System.currentTimeMillis();
        CompletableFuture<Void> sendResult = sendAsync(str, outgoingMessageEnvelope);
        long sendReturnedMillis = System.currentTimeMillis();

        // Track the future and attach the callback before touching any metrics: the message has
        // already been handed off, so a metrics problem must not make flush() forget about it.
        this.pendingFutures.add(sendResult);
        sendResult.handle((result, error) -> {
            if (error != null) {
                // Latch the failure first so that it is reported by the next send/flush even if the
                // metric updates below throw (an exception thrown here is otherwise unobserved).
                this.sendExceptionOnCallback.compareAndSet(null, error);
                LOG.error("Send message to event hub: {} failed with exception: ", streamId, error);
            }
            long callbackMillis = System.currentTimeMillis() - sendReturnedMillis;
            this.sendCallbackLatency.get(streamId).update(callbackMillis);
            aggSendCallbackLatency.update(callbackMillis);
            if (error != null) {
                this.sendErrors.get(streamId).inc();
                aggSendErrors.inc();
            }
            return result;
        });

        long sendMillis = sendReturnedMillis - sendStartMillis;
        this.sendLatency.get(streamId).update(sendMillis);
        aggSendLatency.update(sendMillis);
    }

    /**
     * Creates the per-stream metrics for every configured stream id and, once per JVM, the aggregate
     * metrics shared by all producer instances.
     */
    public void start() {
        for (String streamId : this.streamIds) {
            this.sendCallbackLatency.put(streamId, new SamzaHistogram(this.metricsRegistry, streamId, SEND_CALLBACK_LATENCY));
            this.sendLatency.put(streamId, new SamzaHistogram(this.metricsRegistry, streamId, SEND_LATENCY));
            this.sendErrors.put(streamId, this.metricsRegistry.newCounter(streamId, SEND_ERRORS));
        }

        // Locked so that producers started concurrently create the shared metrics exactly once and
        // never observe them partially initialized.
        synchronized (AGGREGATE_METRICS_LOCK) {
            if (aggSendLatency == null) {
                aggSendLatency = new SamzaHistogram(this.metricsRegistry, AGGREGATE, SEND_LATENCY);
                aggSendCallbackLatency = new SamzaHistogram(this.metricsRegistry, AGGREGATE, SEND_CALLBACK_LATENCY);
                aggSendErrors = this.metricsRegistry.newCounter(AGGREGATE, SEND_ERRORS);
            }
        }
    }

    /**
     * Blocks until every tracked send has completed, for at most one minute, then forgets them.
     *
     * @param str not used by this implementation
     * @throws SamzaException if a send callback reported a failure, if waiting timed out or was
     *                        interrupted, or if any pending send completed exceptionally
     */
    public synchronized void flush(String str) {
        LOG.info("Trying to flush pending {} sends.", countIncompleteSends());
        checkForSendCallbackErrors("Received exception on message send.");

        // A zero-length seed array lets toArray size the result itself, so the array can never end
        // up with trailing nulls (which allOf would reject).
        CompletableFuture<Void> allSends =
                CompletableFuture.allOf(this.pendingFutures.toArray(new CompletableFuture[0]));
        try {
            allSends.get(DEFAULT_FLUSH_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // Restore the interrupt status so code further up the stack can still react to it.
            Thread.currentThread().interrupt();
            throw flushFailure(e);
        } catch (ExecutionException | TimeoutException e) {
            throw flushFailure(e);
        }

        this.pendingFutures.clear();
        checkForSendCallbackErrors("Sending one or more of the messages failed during flush.");
    }

    /**
     * Starts sending the envelope and returns a future that completes when the send has finished.
     *
     * @param str                     value the caller passed to {@link #send}
     * @param outgoingMessageEnvelope message to send
     * @return future tracked by this class until the next flush
     */
    public abstract CompletableFuture<Void> sendAsync(String str, OutgoingMessageEnvelope outgoingMessageEnvelope);

    /**
     * Throws if any send callback has reported a failure so far.
     *
     * @param str message used for both the log entry and the thrown exception
     * @throws SamzaException wrapping the first callback failure, if there is one
     */
    protected void checkForSendCallbackErrors(String str) {
        Throwable failure = this.sendExceptionOnCallback.get();
        if (failure != null) {
            LOG.error(str, failure);
            throw new SamzaException(str, failure);
        }
    }

    /** Counts the tracked sends that have not completed yet. */
    private long countIncompleteSends() {
        return this.pendingFutures.stream().filter(pending -> !pending.isDone()).count();
    }

    /** Logs a flush failure and returns the exception for the caller to throw. */
    private SamzaException flushFailure(Exception cause) {
        String message = String.format("Flush failed with error. Total pending sends %d", countIncompleteSends());
        LOG.error(message, cause);
        return new SamzaException(message, cause);
    }
}
