package backtype.storm.spout;

import backtype.storm.drpc.PrepareRequest;
import backtype.storm.generated.ShellComponent;
import backtype.storm.task.TopologyContext;
import backtype.storm.utils.ShellProcess;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import org.apache.log4j.Logger;
import org.json.simple.JSONObject;

/* loaded from: input_file:backtype/storm/spout/ShellSpout.class */
/**
 * Spout whose behaviour is delegated to an external subprocess.
 *
 * <p>The subprocess is started from the command line supplied at construction time and is
 * driven through a {@link ShellProcess}: each spout callback writes one JSON request
 * ({@code next}, {@code ack} or {@code fail}) and then reads replies until the subprocess
 * answers with a {@code sync} command.
 */
public class ShellSpout implements ISpout {
    public static Logger LOG = Logger.getLogger(ShellSpout.class);
    private SpoutOutputCollector _collector;
    private String[] _command;
    private ShellProcess _process;
    // Request objects are created lazily on first use and reused for every later call.
    private JSONObject _next;
    private JSONObject _ack;
    private JSONObject _fail;

    /**
     * Builds a spout from a {@link ShellComponent}, using its execution command followed by
     * its script as the subprocess command line.
     *
     * @param shellComponent component describing the command and script to run
     */
    public ShellSpout(ShellComponent shellComponent) {
        this(shellComponent.get_execution_command(), shellComponent.get_script());
    }

    /**
     * Builds a spout that will run the given command line.
     *
     * @param strArr the command and its arguments, handed to {@link ShellProcess} in
     *               {@link #open}
     */
    public ShellSpout(String... strArr) {
        this._command = strArr;
    }

    /**
     * Creates the {@link ShellProcess}, remembers the collector and launches the subprocess.
     *
     * @param map                  configuration forwarded to the subprocess launch
     * @param topologyContext      context forwarded to the subprocess launch
     * @param spoutOutputCollector collector used for tuples emitted by the subprocess
     * @throws RuntimeException if launching fails with an {@link IOException}; the message
     *                          carries the process's error string
     */
    @Override
    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        this._process = new ShellProcess(this._command);
        this._collector = spoutOutputCollector;
        try {
            LOG.info("Launched subprocess with pid " + this._process.launch(map, topologyContext));
        } catch (IOException e) {
            throw new RuntimeException("Error when launching multilang subprocess\n" + this._process.getErrorsString(), e);
        }
    }

    /**
     * Destroys the subprocess. Does nothing when {@link #open} has not created one, so that
     * closing an unopened spout does not fail with a {@link NullPointerException}.
     */
    @Override
    public void close() {
        if (this._process != null) {
            this._process.destroy();
        }
    }

    /** Sends a {@code next} command to the subprocess and handles its replies. */
    @Override
    public void nextTuple() {
        if (this._next == null) {
            this._next = new JSONObject();
            this._next.put("command", "next");
        }
        querySubprocess(this._next);
    }

    /**
     * Sends an {@code ack} command to the subprocess and handles its replies.
     *
     * @param obj message id, sent under the key {@code PrepareRequest.ID_STREAM}
     */
    @Override
    public void ack(Object obj) {
        if (this._ack == null) {
            this._ack = new JSONObject();
            this._ack.put("command", "ack");
        }
        this._ack.put(PrepareRequest.ID_STREAM, obj);
        querySubprocess(this._ack);
    }

    /**
     * Sends a {@code fail} command to the subprocess and handles its replies.
     *
     * @param obj message id, sent under the key {@code PrepareRequest.ID_STREAM}
     */
    @Override
    public void fail(Object obj) {
        if (this._fail == null) {
            this._fail = new JSONObject();
            this._fail.put("command", "fail");
        }
        this._fail.put(PrepareRequest.ID_STREAM, obj);
        querySubprocess(this._fail);
    }

    /**
     * Writes one request to the subprocess and processes its replies until a {@code sync}
     * command arrives. {@code log} replies are logged, {@code emit} replies are forwarded to
     * the collector, and any other command is ignored.
     *
     * @param obj request to send
     * @throws RuntimeException if communication fails with an {@link IOException}, or if a
     *                          reply has no string {@code "command"} field
     */
    private void querySubprocess(Object obj) {
        try {
            this._process.writeMessage(obj);
            while (true) {
                JSONObject reply = this._process.readMessage();
                String command = commandOf(reply);
                if (command.equals("sync")) {
                    return;
                }
                if (command.equals("log")) {
                    // Concatenate the raw value: same text for a string payload, and a
                    // non-string payload is logged instead of failing on a cast.
                    LOG.info("Shell msg: " + reply.get("msg"));
                } else if (command.equals("emit")) {
                    handleEmit(reply);
                }
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Extracts the {@code "command"} field of a reply, failing with a descriptive error
     * (rather than a bare NullPointerException/ClassCastException) when it is unusable.
     */
    private static String commandOf(JSONObject reply) {
        Object command = (reply == null) ? null : reply.get("command");
        if (!(command instanceof String)) {
            throw new RuntimeException(
                    "Malformed message from multilang subprocess (missing or non-string \"command\"): " + reply);
        }
        return (String) command;
    }

    /**
     * Forwards one {@code emit} reply to the collector. Without a {@code "task"} field the
     * tuple is emitted normally and the collector's return value is written back to the
     * subprocess; with one, the tuple is emitted directly to that task and nothing is
     * written back.
     */
    private void handleEmit(JSONObject reply) throws IOException {
        String stream = (String) reply.get("stream");
        if (stream == null) {
            stream = "default";
        }
        Long task = (Long) reply.get("task");
        @SuppressWarnings("unchecked")
        List<Object> tuple = (List) reply.get("tuple");
        Object messageId = reply.get(PrepareRequest.ID_STREAM);
        if (task == null) {
            this._process.writeMessage(this._collector.emit(stream, tuple, messageId));
        } else {
            this._collector.emitDirect((int) task.longValue(), stream, tuple, messageId);
        }
    }

    /** No-op. */
    @Override
    public void activate() {
    }

    /** No-op. */
    @Override
    public void deactivate() {
    }
}
