package org.apache.beam.sdk.extensions.sql.impl.rel;

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.beam.repackaged.sql.org.apache.calcite.plan.RelOptCluster;
import org.apache.beam.repackaged.sql.org.apache.calcite.plan.RelTraitSet;
import org.apache.beam.repackaged.sql.org.apache.calcite.rel.RelNode;
import org.apache.beam.repackaged.sql.org.apache.calcite.rel.core.CorrelationId;
import org.apache.beam.repackaged.sql.org.apache.calcite.rel.core.Join;
import org.apache.beam.repackaged.sql.org.apache.calcite.rel.core.JoinRelType;
import org.apache.beam.repackaged.sql.org.apache.calcite.rex.RexCall;
import org.apache.beam.repackaged.sql.org.apache.calcite.rex.RexFieldAccess;
import org.apache.beam.repackaged.sql.org.apache.calcite.rex.RexInputRef;
import org.apache.beam.repackaged.sql.org.apache.calcite.rex.RexLiteral;
import org.apache.beam.repackaged.sql.org.apache.calcite.rex.RexNode;
import org.apache.beam.repackaged.sql.org.apache.calcite.util.Pair;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.extensions.sql.BeamSqlSeekableTable;
import org.apache.beam.sdk.extensions.sql.impl.transform.BeamJoinTransforms;
import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.SchemaCoder;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.IncompatibleWindowException;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Optional;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList;
import org.joda.time.Duration;

/* loaded from: input_file:org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.class */
public class BeamJoinRel extends Join implements BeamRelNode {

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel$ExtractJoinKeys.class */
    public class ExtractJoinKeys extends PTransform<PCollectionList<Row>, PCollectionList<KV<Row, Row>>> {
        static final /* synthetic */ boolean $assertionsDisabled;

        private ExtractJoinKeys() {
        }

        @Override // org.apache.beam.sdk.transforms.PTransform
        public PCollectionList<KV<Row, Row>> expand(PCollectionList<Row> pCollectionList) {
            BeamRelNode beamRelInput = BeamSqlRelUtils.getBeamRelInput(BeamJoinRel.this.left);
            Schema schema = CalciteUtils.toSchema(BeamJoinRel.this.left.getRowType());
            Schema schema2 = CalciteUtils.toSchema(BeamJoinRel.this.right.getRowType());
            if (!$assertionsDisabled && pCollectionList.size() != 2) {
                throw new AssertionError();
            }
            PCollection<Row> pCollection = pCollectionList.get(0);
            PCollection<Row> pCollection2 = pCollectionList.get(1);
            int fieldCount = beamRelInput.getRowType().getFieldCount();
            List extractJoinRexNodes = BeamJoinRel.this.extractJoinRexNodes();
            Schema schema3 = (Schema) extractJoinRexNodes.stream().map(pair -> {
                return BeamJoinRel.getFieldBasedOnRexNode(schema, (RexNode) pair.getKey(), 0);
            }).collect(Schema.toSchema());
            Schema schema4 = (Schema) extractJoinRexNodes.stream().map(pair2 -> {
                return BeamJoinRel.getFieldBasedOnRexNode(schema2, (RexNode) pair2.getValue(), fieldCount);
            }).collect(Schema.toSchema());
            SchemaCoder<Row> of = SchemaCoder.of(schema3);
            PCollection coder = ((PCollection) ((PCollection) pCollection.apply("left_TimestampCombiner", Window.configure().withTimestampCombiner(TimestampCombiner.EARLIEST))).apply("left_ExtractJoinFields", MapElements.via((SimpleFunction) new BeamJoinTransforms.ExtractJoinFields(true, extractJoinRexNodes, schema3, 0)))).setCoder(KvCoder.of(of, pCollection.getCoder()));
            return PCollectionList.of(coder).and(((PCollection) ((PCollection) pCollection2.apply("right_TimestampCombiner", Window.configure().withTimestampCombiner(TimestampCombiner.EARLIEST))).apply("right_ExtractJoinFields", MapElements.via((SimpleFunction) new BeamJoinTransforms.ExtractJoinFields(false, extractJoinRexNodes, schema4, fieldCount)))).setCoder(KvCoder.of(of, pCollection2.getCoder())));
        }

        static {
            $assertionsDisabled = !BeamJoinRel.class.desiredAssertionStatus();
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel$SideInputJoin.class */
    private class SideInputJoin extends PTransform<PCollectionList<Row>, PCollection<Row>> {
        private SideInputJoin() {
        }

        @Override // org.apache.beam.sdk.transforms.PTransform
        public PCollection<Row> expand(PCollectionList<Row> pCollectionList) {
            Schema schema = CalciteUtils.toSchema(BeamJoinRel.this.left.getRowType());
            Schema schema2 = CalciteUtils.toSchema(BeamJoinRel.this.right.getRowType());
            PCollectionList pCollectionList2 = (PCollectionList) pCollectionList.apply(new ExtractJoinKeys());
            return BeamJoinRel.this.sideInputJoin(pCollectionList2.get(0), pCollectionList2.get(1), schema, schema2);
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel$SideInputLookupJoin.class */
    private class SideInputLookupJoin extends PTransform<PCollectionList<Row>, PCollection<Row>> {
        private SideInputLookupJoin() {
        }

        @Override // org.apache.beam.sdk.transforms.PTransform
        public PCollection<Row> expand(PCollectionList<Row> pCollectionList) {
            Schema schema = CalciteUtils.toSchema(BeamJoinRel.this.getRowType());
            BeamRelNode beamRelInput = BeamSqlRelUtils.getBeamRelInput(BeamJoinRel.this.getInput(((Integer) BeamJoinRel.this.seekableInputIndex().get()).intValue()));
            BeamRelNode beamRelInput2 = BeamSqlRelUtils.getBeamRelInput(BeamJoinRel.this.getInput(((Integer) BeamJoinRel.this.nonSeekableInputIndex().get()).intValue()));
            int fieldCount = ((Integer) BeamJoinRel.this.nonSeekableInputIndex().get()).intValue() == 0 ? 0 : CalciteUtils.toSchema(beamRelInput.getRowType()).getFieldCount();
            int fieldCount2 = ((Integer) BeamJoinRel.this.seekableInputIndex().get()).intValue() == 0 ? 0 : CalciteUtils.toSchema(beamRelInput2.getRowType()).getFieldCount();
            BeamIOSourceRel beamIOSourceRel = (BeamIOSourceRel) beamRelInput;
            return ((PCollection) pCollectionList.get(0).apply("join_as_lookup", new BeamJoinTransforms.JoinAsLookup(BeamJoinRel.this.condition, (BeamSqlSeekableTable) beamIOSourceRel.getBeamSqlTable(), CalciteUtils.toSchema(beamIOSourceRel.getRowType()), schema, fieldCount, fieldCount2))).setRowSchema(schema);
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel$StandardJoin.class */
    private class StandardJoin extends PTransform<PCollectionList<Row>, PCollection<Row>> {
        private StandardJoin() {
        }

        @Override // org.apache.beam.sdk.transforms.PTransform
        public PCollection<Row> expand(PCollectionList<Row> pCollectionList) {
            Schema schema = CalciteUtils.toSchema(BeamJoinRel.this.left.getRowType());
            Schema schema2 = CalciteUtils.toSchema(BeamJoinRel.this.right.getRowType());
            PCollectionList pCollectionList2 = (PCollectionList) pCollectionList.apply(new ExtractJoinKeys());
            PCollection pCollection = pCollectionList2.get(0);
            PCollection pCollection2 = pCollectionList2.get(1);
            try {
                pCollection.getWindowingStrategy().getWindowFn().verifyCompatibility(pCollection2.getWindowingStrategy().getWindowFn());
                BeamJoinRel.this.verifySupportedTrigger(pCollection);
                BeamJoinRel.this.verifySupportedTrigger(pCollection2);
                return BeamJoinRel.this.standardJoin(pCollection, pCollection2, schema, schema2);
            } catch (IncompatibleWindowException e) {
                throw new IllegalArgumentException("WindowFns must match for a bounded-vs-bounded/unbounded-vs-unbounded join.", e);
            }
        }
    }

    public BeamJoinRel(RelOptCluster relOptCluster, RelTraitSet relTraitSet, RelNode relNode, RelNode relNode2, RexNode rexNode, Set<CorrelationId> set, JoinRelType joinRelType) {
        super(relOptCluster, relTraitSet, relNode, relNode2, rexNode, set, joinRelType);
    }

    @Override // org.apache.beam.repackaged.sql.org.apache.calcite.rel.core.Join
    public Join copy(RelTraitSet relTraitSet, RexNode rexNode, RelNode relNode, RelNode relNode2, JoinRelType joinRelType, boolean z) {
        return new BeamJoinRel(getCluster(), relTraitSet, relNode, relNode2, rexNode, this.variablesSet, joinRelType);
    }

    @Override // org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode
    public List<RelNode> getPCollectionInputs() {
        return isSideInputLookupJoin() ? ImmutableList.of(BeamSqlRelUtils.getBeamRelInput(getInputs().get(nonSeekableInputIndex().get().intValue()))) : super.getPCollectionInputs();
    }

    @Override // org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode
    public PTransform<PCollectionList<Row>, PCollection<Row>> buildPTransform() {
        if (isSideInputLookupJoin()) {
            return new SideInputLookupJoin();
        }
        if (!isSideInputJoin()) {
            return new StandardJoin();
        }
        if (this.joinType == JoinRelType.FULL) {
            throw new UnsupportedOperationException("FULL OUTER JOIN is not supported when join a bounded table with an unbounded table.");
        }
        BeamRelNode beamRelInput = BeamSqlRelUtils.getBeamRelInput(this.left);
        BeamRelNode beamRelInput2 = BeamSqlRelUtils.getBeamRelInput(this.right);
        if ((this.joinType == JoinRelType.LEFT && beamRelInput.isBounded() == PCollection.IsBounded.BOUNDED) || (this.joinType == JoinRelType.RIGHT && beamRelInput2.isBounded() == PCollection.IsBounded.BOUNDED)) {
            throw new UnsupportedOperationException("LEFT side of an OUTER JOIN must be Unbounded table.");
        }
        return new SideInputJoin();
    }

    private boolean isSideInputJoin() {
        BeamRelNode beamRelInput = BeamSqlRelUtils.getBeamRelInput(this.left);
        BeamRelNode beamRelInput2 = BeamSqlRelUtils.getBeamRelInput(this.right);
        return (beamRelInput.isBounded() == PCollection.IsBounded.BOUNDED && beamRelInput2.isBounded() == PCollection.IsBounded.UNBOUNDED) || (beamRelInput.isBounded() == PCollection.IsBounded.UNBOUNDED && beamRelInput2.isBounded() == PCollection.IsBounded.BOUNDED);
    }

    private boolean isSideInputLookupJoin() {
        return seekableInputIndex().isPresent() && nonSeekableInputIndex().isPresent();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public Optional<Integer> seekableInputIndex() {
        return seekable(BeamSqlRelUtils.getBeamRelInput(this.left)) ? Optional.of(0) : seekable(BeamSqlRelUtils.getBeamRelInput(this.right)) ? Optional.of(1) : Optional.absent();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public Optional<Integer> nonSeekableInputIndex() {
        return !seekable(BeamSqlRelUtils.getBeamRelInput(this.left)) ? Optional.of(0) : !seekable(BeamSqlRelUtils.getBeamRelInput(this.right)) ? Optional.of(1) : Optional.absent();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public <T> void verifySupportedTrigger(PCollection<T> pCollection) {
        WindowingStrategy<?, ?> windowingStrategy = pCollection.getWindowingStrategy();
        if (PCollection.IsBounded.UNBOUNDED.equals(pCollection.isBounded()) && !triggersOncePerWindow(windowingStrategy)) {
            throw new UnsupportedOperationException("Joining unbounded PCollections is currently only supported for non-global windows with triggers that are known to produce output once per window,such as the default trigger with zero allowed lateness. In these cases Beam can guarantee it joins all input elements once per window. " + windowingStrategy + " is not supported");
        }
    }

    private boolean triggersOncePerWindow(WindowingStrategy windowingStrategy) {
        return !(windowingStrategy.getWindowFn() instanceof GlobalWindows) && (windowingStrategy.getTrigger() instanceof DefaultTrigger) && Duration.ZERO.equals(windowingStrategy.getAllowedLateness());
    }

    /* JADX INFO: Access modifiers changed from: private */
    public PCollection<Row> standardJoin(PCollection<KV<Row, Row>> pCollection, PCollection<KV<Row, Row>> pCollection2, Schema schema, Schema schema2) {
        PCollection innerJoin;
        switch (this.joinType) {
            case LEFT:
                Schema buildNullSchema = buildNullSchema(schema2);
                innerJoin = org.apache.beam.sdk.extensions.joinlibrary.Join.leftOuterJoin(pCollection, setValueCoder(pCollection2, SchemaCoder.of(buildNullSchema)), Row.nullRow(buildNullSchema));
                break;
            case RIGHT:
                Schema buildNullSchema2 = buildNullSchema(schema);
                innerJoin = org.apache.beam.sdk.extensions.joinlibrary.Join.rightOuterJoin(setValueCoder(pCollection, SchemaCoder.of(buildNullSchema2)), pCollection2, Row.nullRow(buildNullSchema2));
                break;
            case FULL:
                Schema buildNullSchema3 = buildNullSchema(schema);
                Schema buildNullSchema4 = buildNullSchema(schema2);
                innerJoin = org.apache.beam.sdk.extensions.joinlibrary.Join.fullOuterJoin(setValueCoder(pCollection, SchemaCoder.of(buildNullSchema3)), setValueCoder(pCollection2, SchemaCoder.of(buildNullSchema4)), Row.nullRow(buildNullSchema3), Row.nullRow(buildNullSchema4));
                break;
            case INNER:
            default:
                innerJoin = org.apache.beam.sdk.extensions.joinlibrary.Join.innerJoin(pCollection, pCollection2);
                break;
        }
        Schema schema3 = CalciteUtils.toSchema(getRowType());
        return ((PCollection) innerJoin.apply("JoinParts2WholeRow", MapElements.via((SimpleFunction) new BeamJoinTransforms.JoinParts2WholeRow(schema3)))).setRowSchema(schema3);
    }

    public PCollection<Row> sideInputJoin(PCollection<KV<Row, Row>> pCollection, PCollection<KV<Row, Row>> pCollection2, Schema schema, Schema schema2) {
        PCollection<KV<Row, Row>> valueCoder;
        Row nullRow;
        boolean z = pCollection.isBounded() == PCollection.IsBounded.BOUNDED;
        JoinRelType joinRelType = (!z || this.joinType == JoinRelType.INNER) ? this.joinType : JoinRelType.LEFT;
        PCollection<KV<Row, Row>> pCollection3 = z ? pCollection2 : pCollection;
        PCollection<KV<Row, Row>> pCollection4 = z ? pCollection : pCollection2;
        if (z) {
            Schema buildNullSchema = buildNullSchema(schema);
            valueCoder = setValueCoder(pCollection4, SchemaCoder.of(buildNullSchema));
            nullRow = Row.nullRow(buildNullSchema);
        } else {
            Schema buildNullSchema2 = buildNullSchema(schema2);
            valueCoder = setValueCoder(pCollection4, SchemaCoder.of(buildNullSchema2));
            nullRow = Row.nullRow(buildNullSchema2);
        }
        return sideInputJoinHelper(joinRelType, pCollection3, valueCoder, nullRow, z);
    }

    private PCollection<Row> sideInputJoinHelper(JoinRelType joinRelType, PCollection<KV<Row, Row>> pCollection, PCollection<KV<Row, Row>> pCollection2, Row row, boolean z) {
        PCollectionView<?> pCollectionView = (PCollectionView) pCollection2.apply(View.asMultimap());
        Schema schema = CalciteUtils.toSchema(getRowType());
        return ((PCollection) pCollection.apply(ParDo.of(new BeamJoinTransforms.SideInputJoinDoFn(joinRelType, row, pCollectionView, z, schema)).withSideInputs(pCollectionView))).setRowSchema(schema);
    }

    private Schema buildNullSchema(Schema schema) {
        Schema.Builder builder = Schema.builder();
        builder.addFields((List<Schema.Field>) schema.getFields().stream().map(field -> {
            return field.withNullable(true);
        }).collect(Collectors.toList()));
        return builder.build();
    }

    private static <K, V> PCollection<KV<K, V>> setValueCoder(PCollection<KV<K, V>> pCollection, Coder<V> coder) {
        return pCollection.setCoder(KvCoder.of(((KvCoder) pCollection.getCoder()).getKeyCoder(), coder));
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static Schema.Field getFieldBasedOnRexNode(Schema schema, RexNode rexNode, int i) {
        if (rexNode instanceof RexInputRef) {
            return schema.getField(((RexInputRef) rexNode).getIndex() - i);
        }
        if (rexNode instanceof RexFieldAccess) {
            return getFieldBasedOnRexFieldAccess(schema, (RexFieldAccess) rexNode, i);
        }
        throw new UnsupportedOperationException("Does not support " + rexNode.getType() + " in JOIN.");
    }

    private static Schema.Field getFieldBasedOnRexFieldAccess(Schema schema, RexFieldAccess rexFieldAccess, int i) {
        ArrayDeque arrayDeque = new ArrayDeque();
        arrayDeque.push(rexFieldAccess);
        RexFieldAccess rexFieldAccess2 = rexFieldAccess;
        while (rexFieldAccess2.getReferenceExpr() instanceof RexFieldAccess) {
            rexFieldAccess2 = (RexFieldAccess) rexFieldAccess2.getReferenceExpr();
            arrayDeque.push(rexFieldAccess2);
        }
        if (!(rexFieldAccess2.getReferenceExpr() instanceof RexInputRef)) {
            throw new UnsupportedOperationException("Does not support " + rexFieldAccess2.getReferenceExpr().getType() + " in JOIN.");
        }
        Schema.Field field = schema.getField(((RexInputRef) rexFieldAccess2.getReferenceExpr()).getIndex() - i);
        while (true) {
            Schema.Field field2 = field;
            if (arrayDeque.size() <= 0) {
                return field2;
            }
            field = field2.getType().getRowSchema().getField(((RexFieldAccess) arrayDeque.pop()).getField().getIndex());
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public List<Pair<RexNode, RexNode>> extractJoinRexNodes() {
        if ((this.condition instanceof RexLiteral) && ((Boolean) ((RexLiteral) this.condition).getValue()).booleanValue()) {
            throw new UnsupportedOperationException("CROSS JOIN is not supported!");
        }
        RexCall rexCall = (RexCall) this.condition;
        ArrayList arrayList = new ArrayList();
        if ("AND".equals(rexCall.getOperator().getName())) {
            Iterator<RexNode> it = rexCall.getOperands().iterator();
            while (it.hasNext()) {
                arrayList.add(extractJoinPairOfRexNodes((RexCall) it.next()));
            }
        } else {
            if (!"=".equals(rexCall.getOperator().getName())) {
                throw new UnsupportedOperationException("Operator " + rexCall.getOperator().getName() + " is not supported in join condition");
            }
            arrayList.add(extractJoinPairOfRexNodes(rexCall));
        }
        return arrayList;
    }

    private Pair<RexNode, RexNode> extractJoinPairOfRexNodes(RexCall rexCall) {
        if (!rexCall.getOperator().getName().equals("=")) {
            throw new UnsupportedOperationException("Non equi-join is not supported");
        }
        if (isIllegalJoinConjunctionClause(rexCall)) {
            throw new UnsupportedOperationException("Only support column reference or struct field access in conjunction clause");
        }
        return getColumnIndex(rexCall.getOperands().get(0)) < getColumnIndex(rexCall.getOperands().get(1)) ? new Pair<>(rexCall.getOperands().get(0), rexCall.getOperands().get(1)) : new Pair<>(rexCall.getOperands().get(1), rexCall.getOperands().get(0));
    }

    private boolean isIllegalJoinConjunctionClause(RexCall rexCall) {
        return (((rexCall.getOperands().get(0) instanceof RexInputRef) || (rexCall.getOperands().get(0) instanceof RexFieldAccess)) && ((rexCall.getOperands().get(1) instanceof RexInputRef) || (rexCall.getOperands().get(1) instanceof RexFieldAccess))) ? false : true;
    }

    private int getColumnIndex(RexNode rexNode) {
        if (rexNode instanceof RexInputRef) {
            return ((RexInputRef) rexNode).getIndex();
        }
        if (rexNode instanceof RexFieldAccess) {
            return getColumnIndex(((RexFieldAccess) rexNode).getReferenceExpr());
        }
        throw new UnsupportedOperationException("Cannot get column index from " + rexNode.getType());
    }

    private boolean seekable(BeamRelNode beamRelNode) {
        return (beamRelNode instanceof BeamIOSourceRel) && (((BeamIOSourceRel) beamRelNode).getBeamSqlTable() instanceof BeamSqlSeekableTable);
    }
}
