package cascading.cascade;

import cascading.flow.Flow;
import cascading.flow.FlowElement;
import cascading.tap.CompositeTap;
import cascading.tap.Hfs;
import cascading.tap.Tap;
import cascading.tuple.TupleEntryCollector;
import cascading.tuple.TupleEntryIterator;
import cascading.util.Util;
import java.beans.ConstructorProperties;
import java.io.IOException;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.ListIterator;
import java.util.Map;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.log4j.Logger;
import org.jgrapht.Graphs;
import org.jgrapht.graph.SimpleDirectedGraph;
import org.jgrapht.traverse.TopologicalOrderIterator;
import org.slf4j.Marker;

/* loaded from: input_file:cascading/cascade/CascadeConnector.class */
public class CascadeConnector {
    /** Class-wide log4j logger; used for the debug traces emitted while building the flow graph. */
    private static final Logger LOG = Logger.getLogger(CascadeConnector.class);
    /** Properties passed to every Cascade this connector creates; remains null when the no-arg constructor is used. */
    private Map<Object, Object> properties;

    /* loaded from: input_file:cascading/cascade/CascadeConnector$RootTap.class */
    /**
     * Inert {@link Tap} implementation: it has no path, reports that nothing exists,
     * refuses to create or delete anything, and cannot be opened for reading or writing.
     * Nothing else in this file references it.
     */
    static class RootTap extends Tap {
        private static final long serialVersionUID = 1L;

        RootTap() {
        }

        // ---- path information: there is none ----

        /** @return always {@code null}; this tap has no path */
        @Override
        public Path getPath() {
            return null;
        }

        /** @return always {@code false} */
        @Override
        public boolean pathExists(JobConf jobConf) throws IOException {
            return false;
        }

        /** @return always {@code 0} */
        @Override
        public long getPathModified(JobConf jobConf) throws IOException {
            return 0;
        }

        // ---- path mutation: always reported as not performed ----

        /** @return always {@code false}; nothing is created */
        @Override
        public boolean makeDirs(JobConf jobConf) throws IOException {
            return false;
        }

        /** @return always {@code false}; nothing is deleted */
        @Override
        public boolean deletePath(JobConf jobConf) throws IOException {
            return false;
        }

        // ---- data access: not available ----

        /** @return always {@code null}; there is nothing to read */
        @Override
        public TupleEntryIterator openForRead(JobConf jobConf) throws IOException {
            return null;
        }

        /** @return always {@code null}; there is nothing to write to */
        @Override
        public TupleEntryCollector openForWrite(JobConf jobConf) throws IOException {
            return null;
        }
    }

    /** Creates a connector without properties; the {@code properties} field is left {@code null}. */
    public CascadeConnector() {
    }

    /**
     * Creates a connector that passes the given properties to each Cascade it builds.
     *
     * @param map property map; stored by reference (not copied) and handed to the
     *            Cascade constructor by {@code connect}
     */
    @ConstructorProperties({"properties"})
    public CascadeConnector(Map<Object, Object> map) {
        this.properties = map;
    }

    /**
     * Connects the given flows into a Cascade without a caller-supplied name;
     * the name is derived from the flow names.
     *
     * @param collection flows to connect
     * @return the resulting Cascade
     */
    public Cascade connect(Collection<Flow> collection) {
        Flow[] flows = collection.toArray(new Flow[collection.size()]);
        return connect((String) null, flows);
    }

    /**
     * Connects the given flows into a Cascade with the given name.
     *
     * @param str        cascade name; when {@code null} a name is derived from the flow names
     * @param collection flows to connect
     * @return the resulting Cascade
     */
    public Cascade connect(String str, Collection<Flow> collection) {
        Flow[] flows = collection.toArray(new Flow[collection.size()]);
        return connect(str, flows);
    }

    /**
     * Connects the given flows into a Cascade without a caller-supplied name;
     * the name is derived from the flow names.
     *
     * @param flowArr flows to connect
     * @return the resulting Cascade
     */
    public Cascade connect(Flow... flowArr) {
        final String noName = null;
        return connect(noName, flowArr);
    }

    /**
     * Connects the given flows into a Cascade.
     * <p>
     * Flow names are checked for uniqueness first. A graph of tap identifiers
     * (edges carry the flow that joins a source to a sink) is then built, the
     * flow-to-flow dependency graph is derived from it, and that graph is
     * checked for cycles before the Cascade is created.
     *
     * @param str     cascade name; when {@code null} the flow names joined with "+" are used
     * @param flowArr flows to connect
     * @return the resulting Cascade
     * @throws CascadeException on duplicate flow names, on a flow whose source and sink
     *                          resolve to the same identifier, or when cycles are detected
     */
    public Cascade connect(String str, Flow... flowArr) {
        verifyUniqueFlowNames(flowArr);

        String cascadeName;
        if (str != null) {
            cascadeName = str;
        } else {
            cascadeName = makeName(flowArr);
        }

        // vertices are tap identifiers, edges are holders of the flow joining them
        SimpleDirectedGraph<String, Flow.FlowHolder> tapGraph =
                new SimpleDirectedGraph<String, Flow.FlowHolder>(Flow.FlowHolder.class);
        // vertices are flows, edges are plain ordinals
        SimpleDirectedGraph<Flow, Integer> flowGraph =
                new SimpleDirectedGraph<Flow, Integer>(Integer.class);

        makeTapGraph(tapGraph, flowArr);
        makeFlowGraph(flowGraph, tapGraph);
        verifyNoCycles(flowGraph);

        return new Cascade(cascadeName, this.properties, flowGraph, tapGraph);
    }

    /**
     * Ensures no two flows share a name.
     *
     * @param flowArr flows to check
     * @throws CascadeException naming the first duplicate encountered
     */
    private void verifyUniqueFlowNames(Flow[] flowArr) {
        HashSet<String> seenNames = new HashSet<>();
        for (Flow flow : flowArr) {
            String name = flow.getName();
            // add() returns false when the name was already present
            if (!seenNames.add(name)) {
                throw new CascadeException("all flow names must be unique, found duplicate: " + name);
            }
        }
    }

    /**
     * Derives a cascade name by joining the flow names, in order, with "+".
     *
     * @param flowArr flows whose names are joined
     * @return the joined name
     */
    private String makeName(Flow[] flowArr) {
        String[] names = new String[flowArr.length];
        for (int i = 0; i < flowArr.length; i++) {
            names[i] = flowArr[i].getName();
        }
        // Plain separator literal; previously borrowed from an unrelated SLF4J constant of the same value.
        return Util.join(names, "+");
    }

    /**
     * Checks that a topological traversal reaches every flow in the graph.
     *
     * @param simpleDirectedGraph flow dependency graph to check
     * @throws CascadeException when the traversal visits fewer flows than the graph holds,
     *                          which is taken as a sign of cycles
     */
    private void verifyNoCycles(SimpleDirectedGraph<Flow, Integer> simpleDirectedGraph) {
        HashSet<Flow> visited = new HashSet<>();
        TopologicalOrderIterator<Flow, Integer> walker = new TopologicalOrderIterator<>(simpleDirectedGraph);
        while (walker.hasNext()) {
            visited.add(walker.next());
        }
        if (visited.size() != simpleDirectedGraph.vertexSet().size()) {
            throw new CascadeException("there are likely cycles in the set of given flows, topological iterator cannot traverse flows with cycles");
        }
    }

    /**
     * Populates the tap graph: for each flow, every source and sink (with composite
     * taps expanded into their children) becomes a vertex keyed by its full path, and
     * an edge holding the flow is added from each source to each sink.
     *
     * @param simpleDirectedGraph tap graph to fill
     * @param flowArr             flows contributing taps
     */
    private void makeTapGraph(SimpleDirectedGraph<String, Flow.FlowHolder> simpleDirectedGraph, Flow[] flowArr) {
        for (Flow flow : flowArr) {
            LinkedList<Tap> sources = new LinkedList<>(flow.getSourcesCollection());
            LinkedList<Tap> sinks = new LinkedList<>(flow.getSinksCollection());
            unwrapCompositeTaps(sources);
            unwrapCompositeTaps(sinks);

            // register all vertices before wiring any edge between them
            for (Tap source : sources) {
                simpleDirectedGraph.addVertex(getFullPath(flow, source));
            }
            for (Tap sink : sinks) {
                simpleDirectedGraph.addVertex(getFullPath(flow, sink));
            }

            // full cross product: every source feeds every sink of this flow
            for (Tap source : sources) {
                for (Tap sink : sinks) {
                    addEdgeFor(simpleDirectedGraph, flow, source, sink);
                }
            }
        }
    }

    /**
     * Adds an edge carrying the flow's holder from the source tap's path to the sink tap's path.
     *
     * @param simpleDirectedGraph tap graph receiving the edge
     * @param flow                flow that reads {@code tap} and writes {@code tap2}
     * @param tap                 source tap
     * @param tap2                sink tap
     * @throws CascadeException when the graph rejects the edge with an
     *                          {@link IllegalArgumentException}; the rejection is kept as the cause
     */
    private void addEdgeFor(SimpleDirectedGraph<String, Flow.FlowHolder> simpleDirectedGraph, Flow flow, Tap tap, Tap tap2) {
        try {
            simpleDirectedGraph.addEdge(getFullPath(flow, tap), getFullPath(flow, tap2), flow.getHolder());
        } catch (IllegalArgumentException e) {
            // chain the graph's exception so its own message is not lost
            throw new CascadeException("no loops allowed in cascade, flow: " + flow.getName() + ", source: " + tap + ", sink: " + tap2, e);
        }
    }

    /**
     * Resolves the graph key for a tap: the qualified path for {@link Hfs} taps
     * (resolved against the flow's job configuration), the tap identifier otherwise.
     *
     * @param flow flow supplying the job configuration
     * @param tap  tap to resolve
     * @return the string used as this tap's vertex in the tap graph
     * @throws CascadeException when the qualified path cannot be obtained; the
     *                          {@link IOException} is kept as the cause
     */
    private String getFullPath(Flow flow, Tap tap) {
        if (!(tap instanceof Hfs)) {
            return tap.getIdentifier();
        }
        // evaluated up front, as before, even though the qualified path replaces it
        String identifier = tap.getIdentifier();
        try {
            identifier = ((Hfs) tap).getQualifiedPath(flow.getJobConf()).toString();
        } catch (IOException e) {
            // chain the I/O failure so the underlying reason stays visible
            throw new CascadeException("could not get fully qualified path for: " + tap, e);
        }
        return identifier;
    }

    /**
     * Replaces, in place, every {@link CompositeTap} in the list with its child taps.
     * Children are inserted at the cursor and the cursor is stepped back over each one,
     * so they are visited by the same loop and composite children are expanded as well.
     *
     * @param linkedList taps to expand; modified in place
     */
    private void unwrapCompositeTaps(LinkedList<Tap> linkedList) {
        ListIterator<Tap> cursor = linkedList.listIterator();
        while (cursor.hasNext()) {
            Tap candidate = cursor.next();
            if (!(candidate instanceof CompositeTap)) {
                continue;
            }
            cursor.remove();
            for (Tap child : ((CompositeTap) candidate).getChildTaps()) {
                cursor.add(child);
                // step back so the freshly inserted child is the next element examined
                cursor.previous();
            }
        }
    }

    /**
     * Derives the flow dependency graph from the tap graph. For each tap-graph edge
     * {@code a -> b}, the flow on that edge becomes a vertex, and every flow on an edge
     * arriving at {@code a} gets an edge to it (if not already present). Flow-graph
     * edges are numbered with a running counter.
     *
     * @param simpleDirectedGraph  flow graph to fill
     * @param simpleDirectedGraph2 populated tap graph
     * @throws CascadeException when the flow graph refuses a new edge
     */
    private void makeFlowGraph(SimpleDirectedGraph<Flow, Integer> simpleDirectedGraph, SimpleDirectedGraph<String, Flow.FlowHolder> simpleDirectedGraph2) {
        int nextEdgeId = 0;
        for (String source : simpleDirectedGraph2.vertexSet()) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("handling flow source: " + source);
            }
            for (String sink : Graphs.successorListOf(simpleDirectedGraph2, source)) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("handling flow path: " + source + " -> " + sink);
                }
                // the flow that reads 'source' and writes 'sink'
                Flow consumer = simpleDirectedGraph2.getEdge(source, sink).flow;
                simpleDirectedGraph.addVertex(consumer);

                // every flow with an edge into 'source' must precede the consumer
                for (Flow.FlowHolder incoming : simpleDirectedGraph2.incomingEdgesOf(source)) {
                    Flow producer = incoming.flow;
                    simpleDirectedGraph.addVertex(producer);
                    if (simpleDirectedGraph.getEdge(producer, consumer) != null) {
                        continue;
                    }
                    boolean added = simpleDirectedGraph.addEdge(producer, consumer, Integer.valueOf(nextEdgeId++));
                    if (!added) {
                        throw new CascadeException("unable to add path between: " + producer.getName() + " and: " + consumer.getName());
                    }
                }
            }
        }
    }
}
