package backtype.storm.coordination;

import backtype.storm.coordination.CoordinatedBolt;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.FailedException;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;
import backtype.storm.utils.Utils;
import java.util.HashMap;
import java.util.Map;
import org.apache.log4j.Logger;

/* loaded from: input_file:backtype/storm/coordination/BatchBoltExecutor.class */
public class BatchBoltExecutor implements IRichBolt, CoordinatedBolt.FinishedCallback, CoordinatedBolt.TimeoutCallback {
    public static Logger LOG = Logger.getLogger(BatchBoltExecutor.class);
    byte[] _boltSer;
    Map<Object, IBatchBolt> _openTransactions;
    Map _conf;
    TopologyContext _context;
    BatchOutputCollectorImpl _collector;

    public BatchBoltExecutor(IBatchBolt iBatchBolt) {
        this._boltSer = Utils.serialize(iBatchBolt);
    }

    @Override // backtype.storm.task.IBolt
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this._conf = map;
        this._context = topologyContext;
        this._collector = new BatchOutputCollectorImpl(outputCollector);
        this._openTransactions = new HashMap();
    }

    @Override // backtype.storm.task.IBolt
    public void execute(Tuple tuple) {
        try {
            getBatchBolt(tuple.getValue(0)).execute(tuple);
            this._collector.ack(tuple);
        } catch (FailedException e) {
            LOG.error("Failed to process tuple in batch", e);
            this._collector.fail(tuple);
        }
    }

    @Override // backtype.storm.task.IBolt
    public void cleanup() {
    }

    @Override // backtype.storm.coordination.CoordinatedBolt.FinishedCallback
    public void finishedId(Object obj) {
        IBatchBolt batchBolt = getBatchBolt(obj);
        this._openTransactions.remove(obj);
        batchBolt.finishBatch();
    }

    @Override // backtype.storm.coordination.CoordinatedBolt.TimeoutCallback
    public void timeoutId(Object obj) {
        this._openTransactions.remove(obj);
    }

    @Override // backtype.storm.topology.IComponent
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        newTransactionalBolt().declareOutputFields(outputFieldsDeclarer);
    }

    @Override // backtype.storm.topology.IComponent
    public Map<String, Object> getComponentConfiguration() {
        return newTransactionalBolt().getComponentConfiguration();
    }

    private IBatchBolt getBatchBolt(Object obj) {
        IBatchBolt iBatchBolt = this._openTransactions.get(obj);
        if (iBatchBolt == null) {
            iBatchBolt = newTransactionalBolt();
            iBatchBolt.prepare(this._conf, this._context, this._collector, obj);
            this._openTransactions.put(obj, iBatchBolt);
        }
        return iBatchBolt;
    }

    private IBatchBolt newTransactionalBolt() {
        return (IBatchBolt) Utils.deserialize(this._boltSer);
    }
}
