package co.cask.hydrator.plugin.realtime.source;

import co.cask.cdap.api.annotation.Description;
import co.cask.cdap.api.annotation.Name;
import co.cask.cdap.api.annotation.Plugin;
import co.cask.cdap.api.data.format.StructuredRecord;
import co.cask.cdap.api.data.schema.Schema;
import co.cask.cdap.api.plugin.PluginConfig;
import co.cask.cdap.etl.api.Emitter;
import co.cask.cdap.etl.api.PipelineConfigurer;
import co.cask.cdap.etl.api.realtime.RealtimeContext;
import co.cask.cdap.etl.api.realtime.RealtimeSource;
import co.cask.cdap.etl.api.realtime.SourceState;
import co.cask.hydrator.plugin.common.Properties;
import com.amazon.sqs.javamessaging.SQSConnection;
import com.amazon.sqs.javamessaging.SQSConnectionFactory;
import com.amazonaws.regions.Region;
import com.amazonaws.regions.Regions;
import javax.annotation.Nullable;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Name("AmazonSQS")
@Description("Amazon Simple Queue Service real-time source: emits a record with a field 'body' of type String.")
@Plugin(type = "realtimesource")
/* loaded from: input_file:lib/core-plugins-1.2.0.jar:co/cask/hydrator/plugin/realtime/source/SqsSource.class */
public class SqsSource extends RealtimeSource<StructuredRecord> {
    private static final String REGION_DESCRIPTION = "Region where the queue is located.";
    private static final String ACCESSKEY_DESCRIPTION = "Access Key of the AWS (Amazon Web Services) account to use.";
    private static final String ACCESSID_DESCRIPTION = "Access ID of the AWS (Amazon Web Services) account to use.";
    private static final String QUEUENAME_DESCRIPTION = "Name of the queue.";
    private static final String ENDPOINT_DESCRIPTION = "Endpoint of the SQS server to connect to. Omit this field to connect to AWS (Amazon Web Services).";
    private static final int MAX_MESSAGE_COUNT = 20;
    private static final int TIMEOUT_LENGTH = 1000;
    private final SqsConfig config;
    private SQSConnectionFactory connectionFactory;
    private MessageConsumer consumer;
    private Session session;
    private SQSConnection connection;
    private static final Logger LOG = LoggerFactory.getLogger(SqsSource.class);
    private static final Schema DEFAULT_SCHEMA = Schema.recordOf("event", Schema.Field.of(Properties.Stream.DEFAULT_BODY_FIELD, Schema.of(Schema.Type.STRING)));

    /* loaded from: input_file:lib/core-plugins-1.2.0.jar:co/cask/hydrator/plugin/realtime/source/SqsSource$SqsConfig.class */
    public static class SqsConfig extends PluginConfig {

        @Name("region")
        @Description(SqsSource.REGION_DESCRIPTION)
        private String region;

        @Name(Properties.S3.ACCESS_KEY)
        @Description(SqsSource.ACCESSKEY_DESCRIPTION)
        private String accessKey;

        @Name(Properties.S3.ACCESS_ID)
        @Description(SqsSource.ACCESSID_DESCRIPTION)
        private String accessID;

        @Name("queueName")
        @Description(SqsSource.QUEUENAME_DESCRIPTION)
        private String queueName;

        @Name("endpoint")
        @Description(SqsSource.ENDPOINT_DESCRIPTION)
        @Nullable
        private String endpoint;

        public SqsConfig(String str, String str2, String str3, String str4, @Nullable String str5) {
            this.region = str;
            this.accessID = str3;
            this.accessKey = str2;
            this.queueName = str4;
            this.endpoint = str5;
        }
    }

    public SqsSource(SqsConfig sqsConfig) {
        this.config = sqsConfig;
    }

    public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
        super.configurePipeline(pipelineConfigurer);
        pipelineConfigurer.getStageConfigurer().setOutputSchema(DEFAULT_SCHEMA);
    }

    public void initialize(RealtimeContext realtimeContext) {
        try {
            super.initialize(realtimeContext);
            SQSConnectionFactory.Builder withRegion = SQSConnectionFactory.builder().withRegion(Region.getRegion(Regions.fromName(this.config.region)));
            this.connectionFactory = this.config.endpoint == null ? withRegion.build() : withRegion.withEndpoint(this.config.endpoint).build();
            this.connection = this.connectionFactory.createConnection(this.config.accessID, this.config.accessKey);
            this.session = this.connection.createSession(false, 1);
            this.consumer = this.session.createConsumer(this.session.createQueue(this.config.queueName));
            this.connection.start();
        } catch (Exception e) {
            if (this.session != null) {
                try {
                    this.session.close();
                } catch (Exception e2) {
                    LOG.warn("Exception when closing session", (Throwable) e2);
                }
            }
            if (this.connection != null) {
                try {
                    this.connection.close();
                } catch (Exception e3) {
                    LOG.warn("Exception when closing connection", (Throwable) e3);
                }
            }
            if (this.consumer != null) {
                try {
                    this.consumer.close();
                } catch (Exception e4) {
                    LOG.warn("Exception when closing consumer", (Throwable) e4);
                }
            }
            LOG.error("Failed to connect to SQS");
            throw new IllegalStateException("Could not connect to SQS.");
        }
    }

    public SourceState poll(Emitter<StructuredRecord> emitter, SourceState sourceState) throws Exception {
        int i = 0;
        while (true) {
            Message receive = this.consumer.receive(1000L);
            if (receive == null || i >= 20) {
                break;
            }
            String text = ((TextMessage) receive).getText();
            if (text.isEmpty()) {
                receive.acknowledge();
            } else {
                emitter.emit(StructuredRecord.builder(DEFAULT_SCHEMA).set(Properties.Stream.DEFAULT_BODY_FIELD, text).build());
                receive.acknowledge();
                i++;
            }
        }
        return sourceState;
    }

    public void destroy() {
        try {
            this.consumer.close();
            this.session.close();
            this.connection.close();
        } catch (Exception e) {
            throw new RuntimeException("Exception on closing SQS connection: " + e.getMessage(), e);
        }
    }
}
