package backtype.storm.transactional;

import backtype.storm.coordination.BatchOutputCollectorImpl;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.FailedException;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.transactional.ICommitterTransactionalSpout;
import backtype.storm.transactional.ITransactionalSpout;
import backtype.storm.tuple.Tuple;
import java.math.BigInteger;
import java.util.Map;
import java.util.TreeMap;
import org.apache.log4j.Logger;

/* loaded from: input_file:backtype/storm/transactional/TransactionalSpoutBatchExecutor.class */
/**
 * Bolt that wraps an {@link ITransactionalSpout} and runs its emitter in response to
 * incoming tuples.
 *
 * <p>Each incoming tuple carries a {@link TransactionAttempt} in field 0. Tuples arriving on
 * {@link TransactionalSpoutCoordinator#TRANSACTION_COMMIT_STREAM_ID} are treated as commit
 * requests; tuples on any other stream are treated as requests to emit a batch.
 */
public class TransactionalSpoutBatchExecutor implements IRichBolt {
    public static Logger LOG = Logger.getLogger(TransactionalSpoutBatchExecutor.class);

    /** Wrapper around the bolt's output collector; created in {@link #prepare}. */
    BatchOutputCollectorImpl _collector;

    /** The wrapped spout; supplies the emitter, output fields and component configuration. */
    ITransactionalSpout _spout;

    /** Emitter obtained from the wrapped spout in {@link #prepare}. */
    ITransactionalSpout.Emitter _emitter;

    /**
     * Attempts whose batch has been emitted here and that have not yet been committed or
     * cleaned up, keyed by transaction id.
     */
    TreeMap<BigInteger, TransactionAttempt> _activeTransactions = new TreeMap<BigInteger, TransactionAttempt>();

    /**
     * Creates an executor for the given spout.
     *
     * @param iTransactionalSpout the spout whose emitter this bolt will run
     */
    public TransactionalSpoutBatchExecutor(ITransactionalSpout iTransactionalSpout) {
        _spout = iTransactionalSpout;
    }

    /**
     * Wraps the supplied collector and asks the spout for its emitter.
     *
     * @param map             configuration passed through to {@code getEmitter}
     * @param topologyContext context passed through to {@code getEmitter}
     * @param outputCollector collector wrapped in a {@link BatchOutputCollectorImpl}
     */
    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        _collector = new BatchOutputCollectorImpl(outputCollector);
        _emitter = _spout.getEmitter(map, topologyContext);
    }

    /**
     * Dispatches the tuple to the commit path or the batch-emit path depending on its source
     * stream. A {@link FailedException} raised on either path is logged and the tuple is failed.
     *
     * @param tuple input tuple; field 0 must be a {@link TransactionAttempt}
     */
    @Override
    public void execute(Tuple tuple) {
        // The cast sits outside the try block, so a malformed tuple propagates its exception.
        final TransactionAttempt attempt = (TransactionAttempt) tuple.getValue(0);
        try {
            final boolean commitRequested =
                    tuple.getSourceStreamId().equals(TransactionalSpoutCoordinator.TRANSACTION_COMMIT_STREAM_ID);
            if (commitRequested) {
                commitIfActive(attempt, tuple);
            } else {
                emitAndTrack(attempt, tuple);
            }
        } catch (FailedException e) {
            // The same message is logged for failures on the commit path as well.
            LOG.warn("Failed to emit batch for transaction", e);
            _collector.fail(tuple);
        }
    }

    /**
     * Emits the batch for the attempt, records the attempt as active, acks the tuple, and then
     * discards state for transactions below the id carried in field 2 (when that field is set).
     */
    private void emitAndTrack(TransactionAttempt attempt, Tuple tuple) {
        // Field 1 is handed to the emitter untouched; its meaning is defined by the spout.
        _emitter.emitBatch(attempt, tuple.getValue(1), _collector);
        _activeTransactions.put(attempt.getTransactionId(), attempt);
        _collector.ack(tuple);

        final BigInteger cleanupBound = (BigInteger) tuple.getValue(2);
        if (cleanupBound == null) {
            return;
        }
        // NOTE(review): the tuple is already acked here; if cleanupBefore throws
        // FailedException, execute() will also call fail() on it - confirm this is intended.
        // headMap is exclusive of the bound, so only strictly smaller ids are dropped.
        _activeTransactions.headMap(cleanupBound).clear();
        _emitter.cleanupBefore(cleanupBound);
    }

    /**
     * Commits the attempt when it equals the attempt currently tracked for its transaction id;
     * otherwise fails the tuple without committing.
     */
    private void commitIfActive(TransactionAttempt attempt, Tuple tuple) {
        final TransactionAttempt tracked = _activeTransactions.get(attempt.getTransactionId());
        if (!attempt.equals(tracked)) {
            _collector.fail(tuple);
            return;
        }
        // Commit requests assume the emitter is a committer emitter; the cast enforces that.
        ((ICommitterTransactionalSpout.Emitter) _emitter).commit(attempt);
        _activeTransactions.remove(attempt.getTransactionId());
        _collector.ack(tuple);
    }

    /** Closes the emitter obtained in {@link #prepare}. */
    @Override
    public void cleanup() {
        _emitter.close();
    }

    /**
     * Delegates output field declaration to the wrapped spout.
     *
     * @param outputFieldsDeclarer declarer forwarded to the spout
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        _spout.declareOutputFields(outputFieldsDeclarer);
    }

    /**
     * Delegates to the wrapped spout.
     *
     * @return whatever the wrapped spout reports as its component configuration
     */
    @Override
    public Map<String, Object> getComponentConfiguration() {
        return _spout.getComponentConfiguration();
    }
}
