package co.cask.cdap.batch.stream;

import co.cask.cdap.api.app.AbstractApplication;
import co.cask.cdap.api.dataset.lib.KeyValueTable;
import co.cask.cdap.proto.id.NamespaceId;
import co.cask.cdap.test.ApplicationManager;
import co.cask.cdap.test.StreamManager;
import co.cask.cdap.test.XSlowTests;
import co.cask.cdap.test.base.TestFrameworkTestBase;
import com.google.common.base.Charsets;
import java.io.File;
import java.util.concurrent.TimeUnit;
import org.junit.Assert;
import org.junit.Test;
import org.junit.experimental.categories.Category;

/**
 * Tests that deploy an application, write a fixed set of events to a stream, run one of the
 * application's MapReduce programs, and then check the contents of the {@code "results"} dataset.
 *
 * <p>Each test sends the decimal strings {@code "0"} .. {@code "49"} to the stream and expects the
 * {@code "results"} {@link KeyValueTable} to map the UTF-8 bytes of every such string to itself.
 */
@Category(XSlowTests.class)
public class BatchStreamIntegrationTestRun extends TestFrameworkTestBase {

    /** Number of events written to the stream and, afterwards, looked up in the results table. */
    private static final int EVENT_COUNT = 50;

    @Test
    public void testStreamBatch() throws Exception {
        submitAndVerifyStreamBatchJob(TestBatchStreamIntegrationApp.class, "s_1", "StreamTestBatch", 300);
    }

    @Test
    public void testStreamBatchIdDecoder() throws Exception {
        submitAndVerifyStreamBatchJob(TestBatchStreamIntegrationApp.class, "s_1", "StreamTestBatchIdDecoder", 300);
    }

    @Test
    public void testNoMapperStreamInput() throws Exception {
        submitAndVerifyStreamBatchJob(NoMapperApp.class, "nomapper", "NoMapperMapReduce", 120);
    }

    @Test
    public void testNoMapperOtherStreamInput() throws Exception {
        submitAndVerifyStreamOtherNamespaceBatchJob(
            NoMapperStreamSpaceApp.class,
            NoMapperStreamSpaceApp.INPUTSTREAMSPACE,
            "nomapper",
            "NoMapperMapReduce",
            120);
    }

    /**
     * Deploys {@code appClass}, then feeds the named stream and verifies the MapReduce output.
     *
     * @param appClass application class to deploy
     * @param streamName name of the stream the events are written to
     * @param mapReduceName name of the MapReduce program to start
     * @param timeoutSeconds how long to wait for the MapReduce program to finish, in seconds
     */
    private void submitAndVerifyStreamBatchJob(
            Class<? extends AbstractApplication> appClass,
            String streamName,
            String mapReduceName,
            int timeoutSeconds) throws Exception {
        // The stream manager is looked up before the application is deployed.
        StreamManager stream = getStreamManager(streamName);
        ApplicationManager app = deployApplication(appClass, new File[0]);
        verifyStreamBatchJob(stream, app, mapReduceName, timeoutSeconds);
    }

    /**
     * Same flow as {@link #submitAndVerifyStreamBatchJob}, except that the stream is looked up in a
     * freshly created namespace. The application is deployed twice: once into that namespace and
     * once through the overload that takes no namespace; the MapReduce program is started from the
     * latter deployment.
     *
     * @param appClass application class to deploy
     * @param namespace name of the namespace to create and in which the stream is looked up
     * @param streamName name of the stream the events are written to
     * @param mapReduceName name of the MapReduce program to start
     * @param timeoutSeconds how long to wait for the MapReduce program to finish, in seconds
     */
    private void submitAndVerifyStreamOtherNamespaceBatchJob(
            Class<? extends AbstractApplication> appClass,
            String namespace,
            String streamName,
            String mapReduceName,
            int timeoutSeconds) throws Exception {
        NamespaceId streamNamespace = new NamespaceId(namespace);
        createNamespace(streamNamespace.toId());
        deployApplication(streamNamespace.toId(), appClass, new File[0]);

        StreamManager stream = getStreamManager(streamNamespace.toId(), streamName);
        ApplicationManager app = deployApplication(appClass, new File[0]);
        verifyStreamBatchJob(stream, app, mapReduceName, timeoutSeconds);
    }

    /**
     * Writes the test events, runs the MapReduce program to completion and checks the results.
     *
     * @param streamManager handle used to write events to the input stream
     * @param applicationManager handle of the deployed application owning the MapReduce program
     * @param mapReduceName name of the MapReduce program to start
     * @param timeoutSeconds how long to wait for the MapReduce program to finish, in seconds
     */
    private void verifyStreamBatchJob(
            StreamManager streamManager,
            ApplicationManager applicationManager,
            String mapReduceName,
            int timeoutSeconds) throws Exception {
        sendEvents(streamManager);
        applicationManager.getMapReduceManager(mapReduceName)
            .start()
            .waitForFinish(timeoutSeconds, TimeUnit.SECONDS);
        assertResults((KeyValueTable) getDataset("results").get());
    }

    /** Sends the decimal string of every index in {@code [0, EVENT_COUNT)} to the stream. */
    private void sendEvents(StreamManager streamManager) throws Exception {
        for (int event = 0; event < EVENT_COUNT; event++) {
            streamManager.send(String.valueOf(event));
        }
    }

    /** Asserts that every expected key is stored in {@code results} with itself as the value. */
    private void assertResults(KeyValueTable results) throws Exception {
        for (int event = 0; event < EVENT_COUNT; event++) {
            byte[] expected = String.valueOf(event).getBytes(Charsets.UTF_8);
            Assert.assertArrayEquals(expected, results.read(expected));
        }
    }
}
