package kr.jm.utils.flow.processor;

import java.util.Objects;
import java.util.concurrent.Executor;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.function.BiConsumer;
import kr.jm.utils.flow.TransformerInterface;
import kr.jm.utils.flow.publisher.JMSubmissionPublisher;
import kr.jm.utils.flow.subscriber.JMSubscriberBuilder;
import kr.jm.utils.helper.JMLog;
import kr.jm.utils.helper.JMThread;

/* loaded from: input_file:kr/jm/utils/flow/processor/JMConcurrentTransformProcessor.class */
public class JMConcurrentTransformProcessor<T, R> extends JMTransformProcessor<T, R> implements AutoCloseable {
    private SubmissionPublisher<R> submissionPublisher;

    public JMConcurrentTransformProcessor(TransformerInterface<T, R> transformerInterface) {
        this(Flow.defaultBufferSize(), transformerInterface);
    }

    public JMConcurrentTransformProcessor(int i, TransformerInterface<T, R> transformerInterface) {
        this((Executor) null, i, transformerInterface);
    }

    public JMConcurrentTransformProcessor(Executor executor, int i, TransformerInterface<T, R> transformerInterface) {
        this(executor, i, getSingleInputPublisherBiConsumer(transformerInterface));
    }

    public JMConcurrentTransformProcessor(BiConsumer<T, JMSubmissionPublisher<? super R>> biConsumer) {
        this(Flow.defaultBufferSize(), biConsumer);
    }

    public JMConcurrentTransformProcessor(int i, BiConsumer<T, JMSubmissionPublisher<? super R>> biConsumer) {
        this((Executor) null, i, biConsumer);
    }

    public JMConcurrentTransformProcessor(Executor executor, int i, BiConsumer<T, JMSubmissionPublisher<? super R>> biConsumer) {
        super(biConsumer);
        this.submissionPublisher = new SubmissionPublisher<>(executor == null ? JMThread.getCommonPool() : executor, i);
        SubmissionPublisher<R> submissionPublisher = this.submissionPublisher;
        Objects.requireNonNull(submissionPublisher);
        super.subscribe(JMSubscriberBuilder.build(submissionPublisher::submit));
    }

    @Override // kr.jm.utils.flow.processor.JMTransformProcessor, java.util.concurrent.Flow.Publisher
    public void subscribe(Flow.Subscriber<? super R> subscriber) {
        JMLog.info(this.log, "subscribeWith", subscriber);
        this.submissionPublisher.subscribe(subscriber);
    }

    @Override // java.lang.AutoCloseable
    public void close() {
        JMLog.info(this.log, "close");
        this.submissionPublisher.close();
    }
}
