package co.cask.microservice;

import co.cask.cdap.api.Resources;
import co.cask.cdap.api.TxRunnable;
import co.cask.cdap.api.common.Bytes;
import co.cask.cdap.api.data.DatasetContext;
import co.cask.cdap.api.dataset.lib.CloseableIterator;
import co.cask.cdap.api.messaging.TopicNotFoundException;
import co.cask.cdap.api.metrics.Metrics;
import co.cask.cdap.api.worker.AbstractWorker;
import co.cask.cdap.api.worker.WorkerContext;
import co.cask.microservice.api.Channel;
import co.cask.microservice.api.ChannelType;
import co.cask.microservice.api.Endpoints;
import co.cask.microservice.api.Microservice;
import co.cask.microservice.api.MicroserviceConfiguration;
import co.cask.microservice.api.MicroserviceContext;
import co.cask.microservice.api.MicroserviceDefinition;
import co.cask.microservice.api.MicroserviceException;
import co.cask.microservice.channel.ChannelEvent;
import co.cask.microservice.channel.InboundChannelManager;
import co.cask.microservice.channel.OutboundChannelManager;
import co.cask.microservice.channel.sqs.SQSInboundChannelManager;
import co.cask.microservice.channel.sqs.SQSOutboundChannelManager;
import co.cask.microservice.channel.tms.TMSInboundChannelManager;
import co.cask.microservice.channel.tms.TMSOutboundChannelManager;
import com.amazonaws.util.StringUtils;
import com.google.common.base.Joiner;
import com.google.common.base.Throwables;
import com.google.gson.Gson;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.tephra.TransactionFailureException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/microservice/MicroserviceWorker.class */
public class MicroserviceWorker extends AbstractWorker {
    /** Class-wide SLF4J logger. */
    private static final Logger LOG = LoggerFactory.getLogger(MicroserviceWorker.class);
    /** Serializes the definition into the worker properties in configure() and parses it back in initialize(). */
    private static final Gson GSON = new Gson();
    /** Plugin identifier: given to the constructor, stored under Constants.ID, re-read in initialize() and used to instantiate the plugin. */
    private String pluginId;
    /** Microservice definition: given to the constructor and re-created from the "config" property in initialize(). */
    private MicroserviceDefinition definition;
    /** Plugin instance created in initialize(); started, driven and stopped by run(). */
    private Microservice service;
    /** Context handed to the service when it is started; built in initialize(). */
    private MicroserviceContext microserviceContext;
    /** NOTE(review): never assigned in this class; presumably injected by the CDAP runtime - confirm. */
    private Metrics metrics;
    /** One manager per configured inbound channel, in configuration order; consumeLoop() visits them round-robin. */
    private List<InboundChannelManager> inboundManagers;
    /** Outbound channel managers keyed by channel name. */
    private Map<String, OutboundChannelManager> outboundManagers;
    /** Value of getEthreshold(); a loop sets the stop flag once its MicroserviceException count exceeds this. */
    private int errorThreshold;
    /** Second argument passed to poll(); taken from the endpoints' fetch value, 100 when that value is 0. */
    private int fetchSize;
    /** Shutdown flag: created (false) at the end of initialize(), set by stop() and by the loops on too many errors. */
    private AtomicBoolean stop;
    /**
     * Creates a worker for one microservice.
     *
     * @param str identifier of the plugin implementing the microservice; kept as the plugin id
     * @param microserviceDefinition definition of the microservice (id, description, configuration)
     */
    public MicroserviceWorker(String str, MicroserviceDefinition microserviceDefinition) {
        definition = microserviceDefinition;
        pluginId = str;
    }

    /**
     * Declares the worker to the platform: name, description, resources, instance count and the
     * properties (plugin id and JSON-serialized definition) that initialize() reads back.
     *
     * <p>A configured value of 0 for instances, vcores or memory is replaced by 1, 1 and 512
     * respectively.
     *
     * @throws IllegalArgumentException if the definition has neither inbound nor outbound channels
     */
    @Override
    protected void configure() {
        super.configure();
        MicroserviceConfiguration config = definition.getConfiguration();

        int instances = config.getInstances();
        if (instances == 0) {
            instances = 1;
        }
        int vCores = config.getVCores();
        if (vCores == 0) {
            vCores = 1;
        }
        int memory = config.getMemory();
        if (memory == 0) {
            memory = 512;
        }

        // Short-circuit on purpose: the outbound list is only inspected when the inbound list is empty.
        Endpoints endpoints = config.getEndpoints();
        if (endpoints.getInbound().size() == 0 && endpoints.getOutbound().size() == 0) {
            throw new IllegalArgumentException(
                "Microservice currently requires at least one inbound or outbound channel. ");
        }

        // These properties are the only state that survives from configure time to runtime.
        Map<String, String> workerProperties = new HashMap<>();
        workerProperties.put(Constants.ID, pluginId);
        workerProperties.put("config", GSON.toJson(definition));

        setName(Microservice.TYPE);
        setDescription(definition.getDescription());
        setResources(new Resources(memory, vCores));
        setInstances(instances);
        setProperties(workerProperties);

        LOG.info("Microservice '{}' configured to use {} instances, with {} vcores and {}MB memory.",
                 definition.getId(), instances, vCores, memory);
    }

    /**
     * Prepares the worker for execution.
     *
     * <p>Restores the plugin id and definition from the worker specification properties written by
     * configure(), instantiates the plugin, builds the microservice context, creates and
     * initializes one channel manager per configured inbound and outbound channel, and finally
     * creates the stop flag.
     *
     * @param workerContext runtime context supplied by the platform
     * @throws IllegalArgumentException if a channel has a type other than TMS or SQS
     * @throws Exception if the superclass, plugin instantiation or a channel manager fails
     */
    @Override
    public void initialize(WorkerContext workerContext) throws Exception {
        super.initialize(workerContext);

        Map<String, String> properties = getContext().getSpecification().getProperties();
        this.pluginId = properties.get(Constants.ID);
        this.definition = GSON.fromJson(properties.get("config"), MicroserviceDefinition.class);
        this.errorThreshold = this.definition.getConfiguration().getEthreshold();
        this.service = (Microservice) workerContext.newPluginInstance(this.pluginId);
        this.microserviceContext = new MicroserviceContext(workerContext, this.metrics, this.definition);
        LOG.info("Starting microservice '{}'", this.definition.getId());

        Endpoints endpoints = this.definition.getConfiguration().getEndpoints();
        // A fetch size of 0 means "not configured"; fall back to 100.
        this.fetchSize = endpoints.getFetch() == 0 ? 100 : endpoints.getFetch();

        this.inboundManagers = new ArrayList<>();
        for (Channel channel : endpoints.getInbound()) {
            InboundChannelManager manager = createInboundManager(channel);
            manager.initialize();
            this.inboundManagers.add(manager);
            LOG.info("'{}' microservice binding to inbound channel '{}' of type '{}' with properties : {}",
                     this.definition.getId(), channel.getName(), channel.getType(), channel.getProperties());
        }

        this.outboundManagers = new HashMap<>();
        for (Channel channel : endpoints.getOutbound()) {
            OutboundChannelManager manager = createOutboundManager(channel);
            manager.initialize();
            this.outboundManagers.put(channel.getName(), manager);
            LOG.info("'{}' microservice binding to outbound channel '{}' of type '{}' with properties : {}",
                     this.definition.getId(), channel.getName(), channel.getType(), channel.getProperties());
        }

        this.stop = new AtomicBoolean(false);
    }

    /** Builds the (not yet initialized) inbound manager matching the channel's type. */
    private InboundChannelManager createInboundManager(Channel channel) throws Exception {
        switch (channel.getType()) {
            case TMS:
                return new TMSInboundChannelManager(channel, getContext().getMessageFetcher(),
                                                    getContext().getAdmin(), getContext().getNamespace());
            case SQS:
                return new SQSInboundChannelManager(channel);
            default:
                throw unsupportedChannelType();
        }
    }

    /** Builds the (not yet initialized) outbound manager matching the channel's type. */
    private OutboundChannelManager createOutboundManager(Channel channel) throws Exception {
        switch (channel.getType()) {
            case TMS:
                return new TMSOutboundChannelManager(channel, getContext().getMessagePublisher());
            case SQS:
                return new SQSOutboundChannelManager(channel);
            default:
                throw unsupportedChannelType();
        }
    }

    /** Creates the exception reported for a channel type this worker cannot bind to. */
    private static IllegalArgumentException unsupportedChannelType() {
        return new IllegalArgumentException(
            String.format("Unsupported Channel Type. Supported types are %s",
                          Joiner.on(StringUtils.COMMA_SEPARATOR).join((Object[]) ChannelType.values())));
    }

    /**
     * Starts the service, runs the consume loop (when inbound channels exist) or otherwise the
     * produce loop (when outbound channels exist), and stops the service afterwards.
     *
     * <p>Once the service has been started it is always stopped, even when a loop terminates with
     * an exception. Failures are logged and never propagate out of this method.
     */
    @Override
    public void run() {
        boolean started = false;
        try {
            this.service.start(this.microserviceContext);
            started = true;
            if (!this.inboundManagers.isEmpty()) {
                consumeLoop();
            } else if (!this.outboundManagers.isEmpty()) {
                produceLoop();
            } else {
                LOG.error("Microservice configuration didn't specify either inbound or outbound channels. Terminating.");
            }
        } catch (MicroserviceException e) {
            // The exception must be the extra trailing argument (no placeholder for it),
            // otherwise SLF4J formats it into the message and drops the stack trace.
            LOG.error("Microservice '{}' unexpectedly terminated.", this.definition.getId(), e);
        } catch (Exception e) {
            LOG.error("Exception occurred!", e);
        } finally {
            // Only stop what was actually started; a failed start() leaves nothing to shut down.
            if (started) {
                stopService();
            }
        }
    }

    /** Stops the started service, logging (rather than propagating) any failure to do so. */
    private void stopService() {
        LOG.info("Stopping microservice '{}'", this.definition.getId());
        try {
            this.service.stop();
            LOG.info("Stopped microservice '{}'", this.definition.getId());
        } catch (Exception e) {
            LOG.error("Microservice '{}' failed to stop cleanly.", this.definition.getId(), e);
        }
    }

    /**
     * Requests shutdown: delegates to the superclass, then raises the stop flag that the
     * consume and produce loops check before each iteration.
     */
    @Override
    public void stop() {
        super.stop();
        // Checked by both loops at the top of every pass.
        stop.set(true);
    }

    /**
     * Polls the inbound channels round-robin until the stop flag is set.
     *
     * <p>For each polled event this instance owns, the event is handed to the service
     * (process() when outbound channels exist, consume() otherwise), any emitted data is
     * published, the message is acknowledged and the stored offset is advanced. Events owned by
     * another instance only advance the offset.
     *
     * <p>A MicroserviceException is logged and counted; once the count exceeds the error
     * threshold the stop flag is set and the loop ends. A missing topic or an I/O error is
     * recorded as a metric and the loop moves on to the next poll.
     *
     * @throws Exception any failure other than the ones handled above
     */
    private void consumeLoop() throws Exception {
        int errorCount = 0;
        int nextManager = 0;
        int instanceId = getContext().getInstanceId();
        int instanceCount = getContext().getInstanceCount();
        // With exactly one outbound channel the emitter gets that channel's name; otherwise null.
        String soleOutbound = this.outboundManagers.size() == 1
            ? this.outboundManagers.keySet().iterator().next()
            : null;
        OutboundEmitter emitter = new OutboundEmitter(soleOutbound);

        while (!this.stop.get()) {
            try {
                InboundChannelManager manager = this.inboundManagers.get(nextManager);
                nextManager = (nextManager + 1) % this.inboundManagers.size();
                String channelName = manager.getChannel().getName();
                String offset = getOffset(this.definition.getId(), channelName, instanceId);

                try (CloseableIterator<ChannelEvent> events = manager.poll(offset, this.fetchSize)) {
                    while (events.hasNext()) {
                        ChannelEvent event = events.next();
                        String messageId = event.getContext().getMessageId();

                        if (!ownsEvent(event, instanceId, instanceCount)) {
                            // Another instance handles this message; just move past it.
                            setOffset(this.definition.getId(), channelName, instanceId, messageId);
                            continue;
                        }

                        try {
                            if (!this.outboundManagers.isEmpty()) {
                                this.service.process(event.getData(), event.getContext(), emitter);
                                // NOTE(review): this formats the Channel object, while the ".out"
                                // metrics use the channel name - confirm which is intended.
                                this.metrics.count(getMetricName(String.format("%s.in", manager.getChannel())), 1);
                                publishEmitted(emitter);
                            } else {
                                this.service.consume(event.getData(), event.getContext());
                            }
                            manager.acknowledge(messageId);
                            setOffset(this.definition.getId(), channelName, instanceId, messageId);
                        } catch (MicroserviceException e) {
                            LOG.warn(e.getMessage(), e);
                            // Drop whatever the failed event emitted so it is not published
                            // together with the next event's output.
                            emitter.clear();
                            if (errorCount > this.errorThreshold) {
                                this.stop.set(true);
                                this.metrics.count(getMetricName("error"), 1);
                                break;
                            }
                            errorCount++;
                        }
                    }
                }
            } catch (TopicNotFoundException e) {
                this.metrics.count(getMetricName("topic.not.found"), 1);
            } catch (IOException e) {
                this.metrics.count(getMetricName("io.error"), 1);
                LOG.error(e.getMessage(), e);
            }
        }
    }

    /**
     * Decides whether this worker instance is responsible for the event: SQS events are always
     * taken; all others are partitioned by the hash of the message id.
     */
    private static boolean ownsEvent(ChannelEvent event, int instanceId, int instanceCount) {
        if (ChannelType.SQS.equals(event.getContext().getChannel().getType())) {
            return true;
        }
        int hash = event.getContext().getMessageId().hashCode();
        // Math.abs(Integer.MIN_VALUE) is still negative, which would match no instance at all;
        // map that single value to bucket 0 and keep every other assignment unchanged.
        int bucket = hash == Integer.MIN_VALUE ? 0 : Math.abs(hash);
        return bucket % instanceCount == instanceId;
    }

    /** Publishes everything buffered in the emitter, counts it per channel, then empties the emitter. */
    private void publishEmitted(OutboundEmitter emitter) throws Exception {
        for (Map.Entry<String, List<byte[]>> emitted : emitter.getData().entrySet()) {
            this.outboundManagers.get(emitted.getKey()).publish(emitted.getValue());
            this.metrics.count(getMetricName(String.format("%s.out", emitted.getKey())), emitted.getValue().size());
        }
        emitter.clear();
    }

    /**
     * Repeatedly asks the service to produce data and publishes whatever it emitted, until the
     * stop flag is set.
     *
     * <p>A MicroserviceException is logged and counted; once the count exceeds the error
     * threshold the stop flag is set, an "error" metric is recorded and the loop ends.
     *
     * @throws Exception any failure other than MicroserviceException
     */
    private void produceLoop() throws Exception {
        int errorCount = 0;
        // With exactly one outbound channel the emitter gets that channel's name; otherwise null.
        String soleOutbound = this.outboundManagers.size() == 1
            ? this.outboundManagers.keySet().iterator().next()
            : null;
        OutboundEmitter emitter = new OutboundEmitter(soleOutbound);

        while (!this.stop.get()) {
            try {
                this.service.produce(emitter);
                for (Map.Entry<String, List<byte[]>> emitted : emitter.getData().entrySet()) {
                    this.outboundManagers.get(emitted.getKey()).publish(emitted.getValue());
                    this.metrics.count(getMetricName(String.format("%s.out", emitted.getKey())), emitted.getValue().size());
                }
                emitter.clear();
            } catch (MicroserviceException e) {
                // Same reporting as the consume loop; previously the failure was dropped silently.
                LOG.warn(e.getMessage(), e);
                // Discard output of the failed round so it is not re-published on the next one.
                emitter.clear();
                if (errorCount > this.errorThreshold) {
                    this.stop.set(true);
                    this.metrics.count(getMetricName("error"), 1);
                    return;
                }
                errorCount++;
            }
        }
    }

    /**
     * Prefixes a metric suffix with the microservice id.
     *
     * @param str metric suffix, e.g. "error"
     * @return the id and the suffix joined by a dot
     */
    private String getMetricName(String str) {
        String id = this.definition.getId();
        return id + "." + str;
    }

    /**
     * Builds the dataset row key under which an offset is stored.
     *
     * <p>The decompiler widened this method from private because the anonymous transaction
     * classes in getOffset() and setOffset() call it.
     *
     * @param str first key component (callers pass the microservice id)
     * @param str2 second key component (callers pass the channel name)
     * @param i third key component (callers pass the worker instance id)
     * @return the bytes of the three components joined by colons
     */
    public byte[] getKey(String str, String str2, int i) {
        String compositeKey = String.format("%s:%s:%d", str, str2, i);
        return Bytes.toBytes(compositeKey);
    }

    /**
     * Reads, inside a transaction, the offset stored for the given key components.
     *
     * @param str microservice id
     * @param str2 channel name
     * @param i worker instance id
     * @return the stored offset, or null when nothing is stored under the key
     * @throws RuntimeException (via Throwables.propagate) when the transaction fails
     */
    private String getOffset(final String str, final String str2, final int i) {
        // One-element array: lets the anonymous class hand its result back to this method.
        final String[] result = new String[1];
        try {
            getContext().execute(new TxRunnable() {
                @Override
                public void run(DatasetContext datasetContext) throws Exception {
                    // NOTE(review): the dataset's static type is not visible here (decompiled
                    // source); it evidently offers byte[] read(byte[]) - confirm the dataset class.
                    byte[] stored = datasetContext.getDataset(Microservice.TYPE).read(MicroserviceWorker.this.getKey(str, str2, i));
                    if (stored == null) {
                        return;
                    }
                    result[0] = Bytes.toString(stored);
                }
            });
            return result[0];
        } catch (TransactionFailureException txFailure) {
            throw Throwables.propagate(txFailure);
        }
    }

    /**
     * Stores, inside a transaction, the given offset under the key built from the other arguments.
     *
     * @param str microservice id
     * @param str2 channel name
     * @param i worker instance id
     * @param str3 offset to store (callers pass a message id)
     * @throws RuntimeException (via Throwables.propagate) when the transaction fails
     */
    private void setOffset(final String str, final String str2, final int i, final String str3) {
        TxRunnable writeOffset = new TxRunnable() {
            @Override
            public void run(DatasetContext datasetContext) throws Exception {
                // Key and value are computed inside the transaction, after the dataset lookup.
                datasetContext.getDataset(Microservice.TYPE).write(MicroserviceWorker.this.getKey(str, str2, i), Bytes.toBytes(str3));
            }
        };
        try {
            getContext().execute(writeOffset);
        } catch (TransactionFailureException txFailure) {
            throw Throwables.propagate(txFailure);
        }
    }
}
