package gr.ilsp.fmc.workflows;

import bixo.config.DefaultFetchJobPolicy;
import bixo.config.FetcherPolicy;
import bixo.config.UserAgent;
import bixo.datum.FetchedDatum;
import bixo.datum.StatusDatum;
import bixo.datum.UrlStatus;
import bixo.fetcher.SimpleHttpFetcher;
import bixo.operations.FixedScoreGenerator;
import bixo.operations.UrlLengthener;
import bixo.pipes.FetchPipe;
import bixo.robots.RobotUtils;
import bixo.urls.BaseUrlFilter;
import bixo.urls.SimpleUrlNormalizer;
import cascading.flow.Flow;
import cascading.flow.FlowConnector;
import cascading.pipe.CoGroup;
import cascading.pipe.Each;
import cascading.pipe.Every;
import cascading.pipe.GroupBy;
import cascading.pipe.Pipe;
import cascading.pipe.cogroup.LeftJoin;
import cascading.pipe.cogroup.RightJoin;
import cascading.scheme.SequenceFile;
import cascading.tap.Hfs;
import cascading.tap.Tap;
import cascading.tuple.Fields;
import cascading.tuple.TupleEntry;
import com.bixolabs.cascading.BaseSplitter;
import com.bixolabs.cascading.HadoopUtils;
import com.bixolabs.cascading.SplitterAssembly;
import gr.ilsp.fmc.classifier.Classifier;
import gr.ilsp.fmc.datums.ClassifierDatum;
import gr.ilsp.fmc.datums.CrawlDbDatum;
import gr.ilsp.fmc.datums.ExtendedParsedDatum;
import gr.ilsp.fmc.datums.ExtendedUrlDatum;
import gr.ilsp.fmc.main.SimpleCrawlHFS;
import gr.ilsp.fmc.main.SimpleCrawlHFSOptions;
import gr.ilsp.fmc.operations.CreateCrawlDbDatumFromUrlFunction;
import gr.ilsp.fmc.operations.CreateUrlDatumFromCrawlDbFunction;
import gr.ilsp.fmc.operations.CreateUrlDatumFromStatusFunction;
import gr.ilsp.fmc.operations.ExtendedNormalizeUrlFunction;
import gr.ilsp.fmc.operations.MakeDistinctCrawlDbFunction;
import gr.ilsp.fmc.operations.SelectUrlOnlyFunction;
import gr.ilsp.fmc.parser.ExtendedUrlFilter;
import gr.ilsp.fmc.parser.RobotRulesParser;
import gr.ilsp.fmc.parser.SimpleNoLinksParser;
import gr.ilsp.fmc.pipes.ClassifierPipe;
import gr.ilsp.fmc.pipes.ExtendedParsePipe;
import gr.ilsp.fmc.utils.CrawlConfig;
import java.io.Serializable;
import java.net.InetAddress;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Properties;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.log4j.Logger;

/* loaded from: input_file:gr/ilsp/fmc/workflows/SimpleCrawlHFSWorkflow.class */
/**
 * Builds the Cascading flow for one crawl loop of the HFS-based crawler.
 *
 * <p>The flow does the following:
 * <ul>
 *   <li>Reads crawl-DB records from the input directory.</li>
 *   <li>Selects the URLs to fetch in this run.</li>
 *   <li>Fetches, parses and classifies them.</li>
 *   <li>Writes content, parse, classifier and updated crawl-DB outputs to SequenceFile taps
 *       under the output directory ({@code path}).</li>
 * </ul>
 *
 * <p>URL selection is done by {@link SplitFetchedUnfetchedCrawlDatums}, which keeps its per-host
 * and per-IP counters and its selection total in static fields of this class. Call
 * {@link #resetCounters()} between loops.
 *
 * <p>NOTE(review): because that state is static, the limits only hold across all records when
 * the splitter runs in a single JVM. Confirm the job is run that way.
 */
public class SimpleCrawlHFSWorkflow {
    /** Optional regex that a URL's authority+file must match to be selected; {@code null} disables it. */
    private static String _subfilter;
    /** Domain from the options; used by the {@code "p"} crawl-type host filter. */
    private static String _inithost;
    /** Main domain from the options; used by the {@code "p"} host filter and (in 071) the outlink filter. */
    private static String _mainhost;
    /** {@code ';'}-separated list of target language strings from the options. */
    private static String _targlang;
    /** Crawl type from the options; {@code "p"} enables the host/language-prefix filter. */
    private static String _type;
    private static final Logger LOGGER = Logger.getLogger(SimpleCrawlHFSWorkflow.class);
    /** Maximum number of URLs selected for fetching in one run. */
    private static final int BUFFER_SIZE = SimpleCrawlHFS.config.getInt("fetcher.fetch_buffer_size.value");
    /** Number of URLs selected for fetching so far in this run. */
    private static long _numSelected = 0;
    /** Per-host counters, keyed by the hash of the host name. */
    private static HashMap<Integer, Integer> hostsMap = new HashMap<>();
    /** Per-IP counters, keyed by the hash of the resolved IP string (host hash if resolution fails). */
    private static HashMap<Integer, Integer> hostsIpMap = new HashMap<>();
    /** Status names whose records are eligible to be selected for fetching. */
    private static final HashSet<String> statusSet = new HashSet<>(Arrays.asList("SKIPPED_DEFERRED", "SKIPPED_BY_SCORER", "SKIPPED_BY_SCORE", "SKIPPED_TIME_LIMIT", "SKIPPED_INTERRUPTED", "SKIPPED_INEFFICIENT", "SKIPPED_PER_SERVER_LIMIT", "UNFETCHED"));

    /**
     * Orders scores in descending order, so higher-scored records come first within a status group.
     */
    public static class ScoreComparator implements Comparator<Double>, Serializable {
        private static final long serialVersionUID = -1057396789369463851L;

        private ScoreComparator() {
        }

        /** Retained for compatibility with existing callers; equivalent to the no-arg constructor. */
        ScoreComparator(ScoreComparator ignored) {
            this();
        }

        @Override // java.util.Comparator
        public int compare(Double d, Double d2) {
            // Arguments deliberately swapped: descending order.
            return d2.compareTo(d);
        }
    }

    /**
     * Splits crawl-DB records into those selected for fetching in this run (LHS) and the rest.
     *
     * <p>A record is selected only when all of the following hold:
     * <ul>
     *   <li>Fewer than {@code BUFFER_SIZE} records have been selected so far.</li>
     *   <li>Its URL parses.</li>
     *   <li>It passes the optional {@code "p"} host filter and the optional {@code _subfilter} regex.</li>
     *   <li>Its last status is in {@code statusSet}.</li>
     *   <li>Its host count stays within {@code fetcher.max_fetched_per_host.value}.</li>
     *   <li>Its IP count is below {@code fetcher.max_requests_per_host_per_run.value}.</li>
     * </ul>
     *
     * <p>Records whose last status is {@code FETCHED} are never selected, but they increment their
     * host's counter. This presumably relies on the grouping comparator delivering FETCHED records
     * first.
     */
    public static class SplitFetchedUnfetchedCrawlDatums extends BaseSplitter {
        private static final long serialVersionUID = -5255131937144107833L;
        private static final int urlsPerServer = SimpleCrawlHFS.config.getInt("fetcher.max_fetched_per_host.value");
        private static final int urlsPerServerPerRun = SimpleCrawlHFS.config.getInt("fetcher.max_requests_per_host_per_run.value");

        private SplitFetchedUnfetchedCrawlDatums() {
        }

        /** Retained for compatibility with existing callers; equivalent to the no-arg constructor. */
        SplitFetchedUnfetchedCrawlDatums(SplitFetchedUnfetchedCrawlDatums ignored) {
            this();
        }

        @Override // com.bixolabs.cascading.BaseSplitter
        public String getLHSName() {
            return "fetched unfetched UrlDatums";
        }

        @Override // com.bixolabs.cascading.BaseSplitter
        public boolean isLHS(TupleEntry tupleEntry) {
            if (_numSelected >= BUFFER_SIZE) {
                return false;
            }
            CrawlDbDatum crawlDbDatum = new CrawlDbDatum(tupleEntry);
            UrlStatus lastStatus = crawlDbDatum.getLastStatus();
            URL url;
            try {
                url = new URL(crawlDbDatum.getUrl());
            } catch (MalformedURLException e) {
                // Without a parsable URL none of the host checks below can run; reject the record.
                LOGGER.error("Skipping malformed URL " + crawlDbDatum.getUrl(), e);
                return false;
            }
            if ("p".equals(_type) && _inithost != null && _mainhost != null && !passesHostPrefixFilter(url)) {
                return false;
            }
            if (_subfilter != null && !(String.valueOf(url.getAuthority()) + url.getFile()).matches(_subfilter)) {
                return false;
            }
            String host = url.getHost();
            int hostHash = host.hashCode();
            Integer hostKey = Integer.valueOf(hostHash);
            Integer hostCount = hostsMap.get(hostKey);
            if (!statusSet.contains(lastStatus.name())) {
                // Not eligible; an already-fetched page still uses up part of its host's quota.
                if (lastStatus == UrlStatus.FETCHED) {
                    hostsMap.put(hostKey, hostCount == null ? 1 : hostCount.intValue() + 1);
                }
                return false;
            }
            // The host counter is bumped before the IP check, so a record rejected by the IP
            // limit below still counts against its host. The first record seen for a host is
            // not checked against urlsPerServer.
            if (hostCount == null) {
                hostsMap.put(hostKey, 1);
            } else {
                int updated = hostCount.intValue() + 1;
                hostsMap.put(hostKey, updated);
                if (updated > urlsPerServer) {
                    return false;
                }
            }
            Integer ipKey = Integer.valueOf(ipKeyFor(host, hostHash));
            Integer ipCount = hostsIpMap.get(ipKey);
            if (ipCount == null) {
                hostsIpMap.put(ipKey, 1);
            } else {
                if (ipCount.intValue() >= urlsPerServerPerRun) {
                    return false;
                }
                hostsIpMap.put(ipKey, ipCount.intValue() + 1);
            }
            _numSelected++;
            return true;
        }

        /**
         * Host filter used for crawl type {@code "p"}.
         *
         * <p>The filter works as follows:
         * <ol>
         *   <li>Strip a leading {@code www2.}, {@code www5.} or {@code www.} from the authority.</li>
         *   <li>Require {@code _mainhost} to occur within its first 4 characters.</li>
         *   <li>Reject if {@code _inithost} occurs after position 3.</li>
         *   <li>If {@code _inithost} has a non-empty prefix, that prefix must contain one of the
         *       {@code ';'}-separated {@code _targlang} entries.</li>
         * </ol>
         */
        private static boolean passesHostPrefixFilter(URL url) {
            // The authority is what the old "authority+file up to the first '/'" expression
            // extracted, minus the crash when the file part was empty.
            String authority = String.valueOf(url.getAuthority());
            if (authority.startsWith("www2.") || authority.startsWith("www5.")) {
                authority = authority.substring(5);
            } else if (authority.startsWith("www.")) {
                authority = authority.substring(4);
            }
            int initHostPos = authority.indexOf(_inithost);
            int mainHostPos = authority.indexOf(_mainhost);
            if (initHostPos > 3 || mainHostPos < 0 || mainHostPos > 3) {
                return false;
            }
            if (initHostPos <= 0) {
                return true;
            }
            String prefix = authority.substring(0, initHostPos);
            for (String lang : _targlang.split(";")) {
                if (prefix.contains(lang)) {
                    return true;
                }
            }
            return false;
        }

        /** Returns the hash of {@code host}'s resolved IP string, or {@code fallback} if it cannot be resolved. */
        private static int ipKeyFor(String host, int fallback) {
            try {
                return InetAddress.getByName(host).getHostAddress().hashCode();
            } catch (UnknownHostException e) {
                return fallback;
            }
        }
    }

    /**
     * Orders last-status names into three ranks:
     * <ol>
     *   <li>{@code FETCHED}.</li>
     *   <li>Every status not in {@code statusSet}, in name order.</li>
     *   <li>All {@code statusSet} statuses, which compare as equal so they form one group ordered
     *       only by score.</li>
     * </ol>
     */
    public static class StatusComparator implements Comparator<String>, Serializable {
        private static final String FETCHED_STR = "FETCHED";
        private static final long serialVersionUID = -5250304580989068398L;
        private static final int FETCHED_RANK = 0;
        private static final int OTHER_RANK = 1;
        private static final int ELIGIBLE_RANK = 2;

        private StatusComparator() {
        }

        /** Retained for compatibility with existing callers; equivalent to the no-arg constructor. */
        StatusComparator(StatusComparator ignored) {
            this();
        }

        private static int rank(String status) {
            if (FETCHED_STR.equals(status)) {
                return FETCHED_RANK;
            }
            return statusSet.contains(status) ? ELIGIBLE_RANK : OTHER_RANK;
        }

        @Override // java.util.Comparator
        public int compare(String str, String str2) {
            // Ranking keeps the order transitive. Mixing "statusSet members are equal" with plain
            // name order is not: a non-set name can sort between two set names that compare equal.
            int rank = rank(str);
            int rank2 = rank(str2);
            if (rank != rank2) {
                return rank < rank2 ? -1 : 1;
            }
            return rank == OTHER_RANK ? str.compareTo(str2) : 0;
        }
    }

    /** Clears the per-host and per-IP counters and the selection total used by the URL splitter. */
    public static void resetCounters() {
        hostsMap = new HashMap<>();
        hostsIpMap = new HashMap<>();
        _numSelected = 0L;
    }

    /**
     * Builds the crawl-loop flow; delegates to {@link #createFlow071}.
     *
     * @param path output directory for this loop; sinks are written to its subdirectories
     * @param path2 input crawl-DB directory (must exist)
     * @param strArr forwarded unchanged to the {@link Classifier} constructor
     * @param arrayList forwarded unchanged to the {@link Classifier} constructor
     * @param d forwarded unchanged to the {@link Classifier} constructor
     * @param d2 forwarded unchanged to the {@link Classifier} constructor
     * @param i forwarded unchanged to the {@link Classifier} constructor
     * @param i2 forwarded unchanged to the {@link Classifier} constructor
     * @throws IllegalStateException if {@code path2} does not exist
     */
    public static Flow createFlow(Path path, Path path2, UserAgent userAgent, FetcherPolicy fetcherPolicy, BaseUrlFilter baseUrlFilter, String[] strArr, ArrayList<String[]> arrayList, double d, double d2, int i, int i2, SimpleCrawlHFSOptions simpleCrawlHFSOptions) throws Throwable {
        return createFlow071(path, path2, userAgent, fetcherPolicy, baseUrlFilter, strArr, arrayList, d, d2, i, i2, simpleCrawlHFSOptions);
    }

    /**
     * Older variant of {@link #createFlow071}. It differs in three ways:
     * <ul>
     *   <li>It does not set {@code _mainhost}.</li>
     *   <li>It joins every stream on {@link ExtendedUrlDatum#URL_FN}.</li>
     *   <li>It filters outlinks with {@code new ExtendedUrlFilter(baseUrlFilter)}.</li>
     * </ul>
     *
     * @throws IllegalStateException if {@code path2} does not exist
     */
    public static Flow createFlow060(Path path, Path path2, UserAgent userAgent, FetcherPolicy fetcherPolicy, BaseUrlFilter baseUrlFilter, String[] strArr, ArrayList<String[]> arrayList, double d, double d2, int i, int i2, SimpleCrawlHFSOptions simpleCrawlHFSOptions) throws Throwable {
        int threads = simpleCrawlHFSOptions.getThreads();
        boolean isDebug = simpleCrawlHFSOptions.isDebug();
        boolean keepBoiler = simpleCrawlHFSOptions.keepBoiler();
        int tokensNumber = simpleCrawlHFSOptions.getTokensNumber();
        String language = simpleCrawlHFSOptions.getLanguage();
        String[] langKeys = simpleCrawlHFSOptions.getLangKeys();
        // Publish URL-selection settings read by SplitFetchedUnfetchedCrawlDatums. _mainhost is
        // left as-is, so the "p" host filter only applies if an earlier createFlow071 call set it.
        _subfilter = simpleCrawlHFSOptions.getFilter();
        _inithost = simpleCrawlHFSOptions.getDomain();
        _targlang = language;
        _type = simpleCrawlHFSOptions.getType();

        JobConf jobConf = SimpleCrawlHFS.conf;
        int numReduceTasks = jobConf.getNumReduceTasks() * HadoopUtils.getTaskTrackers(jobConf);
        Properties defaultProperties = HadoopUtils.getDefaultProperties(SimpleCrawlHFSWorkflow.class, isDebug, jobConf);
        if (!path.getFileSystem(jobConf).exists(path2)) {
            throw new IllegalStateException(String.format("Input directory %s doesn't exist", path2));
        }
        Hfs crawlDbInput = new Hfs(new SequenceFile(CrawlDbDatum.FIELDS), path2.toString());
        Pipe importPipe = new Pipe("url importer");
        GroupBy byStatusAndScore = new GroupBy(importPipe, newStatusScoreGroupingFields());
        SplitterAssembly splitter = new SplitterAssembly(byStatusAndScore, new SplitFetchedUnfetchedCrawlDatums());
        Pipe finishedUrls = new Pipe("finished urls", byStatusAndScore);
        Each urlsToFetch = new Each(new Pipe("urls to Fetch", splitter.getLHSPipe()), new CreateUrlDatumFromCrawlDbFunction());

        Hfs crawlDbSink = newSequenceFileTap(CrawlDbDatum.FIELDS, path, CrawlConfig.CRAWLDB_SUBDIR_NAME);
        Hfs contentSink = newSequenceFileTap(FetchedDatum.FIELDS, path, CrawlConfig.CONTENT_SUBDIR_NAME);
        Hfs parseSink = newSequenceFileTap(ExtendedParsedDatum.FIELDS, path, CrawlConfig.PARSE_SUBDIR_NAME);
        Hfs classifierSink = newSequenceFileTap(ClassifierDatum.FIELDS, path, CrawlConfig.CLASSIFIER_SUBDIR_NAME);

        SimpleHttpFetcher fetcher = newHttpFetcher(threads, fetcherPolicy, userAgent);
        FetchPipe fetchPipe = new FetchPipe(urlsToFetch, new FixedScoreGenerator(), fetcher, RobotUtils.createFetcher(fetcher), new RobotRulesParser(), newFetchJobPolicy(fetcherPolicy), numReduceTasks);
        Pipe contentPipe = new Pipe("content pipe", fetchPipe.getContentTailPipe());
        ExtendedParsePipe parsePipe = new ExtendedParsePipe(contentPipe, new SimpleNoLinksParser(keepBoiler, path.getParent().toString()));
        ClassifierPipe classifierPipe = new ClassifierPipe(parsePipe.getTailPipe(), new Classifier(langKeys, language, strArr, arrayList, d, d2, keepBoiler, i, i2, tokensNumber));
        Each classifiedUrls = new Each(new Pipe("urls from classifier", classifierPipe.getClassifierTailPipe()), new SelectUrlOnlyFunction());

        Fields urlKey = new Fields(ExtendedUrlDatum.URL_FN);
        // Keep only content/parse records whose URL the classifier emitted.
        CoGroup classifiedContent = new CoGroup(contentPipe, urlKey, classifiedUrls, urlKey, FetchedDatum.FIELDS.append(new Fields(UrlLengthener.URL_FN)), new RightJoin());
        CoGroup classifiedParse = new CoGroup(parsePipe, urlKey, classifiedUrls, urlKey, ExtendedParsedDatum.FIELDS.append(new Fields(UrlLengthener.URL_FN)), new RightJoin());
        Pipe normalizedOutlinks = new Each(new Each(new Pipe("url from outlinks", classifierPipe.getScoredLinksTailPipe()), new ExtendedUrlFilter(baseUrlFilter)), new ExtendedNormalizeUrlFunction(new SimpleUrlNormalizer()));
        Pipe statusUpdates = new Each(new Each(new CoGroup(new Pipe("fetched pipe", fetchPipe.getStatusTailPipe()), urlKey, classifierPipe.getClassifierTailPipe(), urlKey, statusWithClassifierFields(), new LeftJoin()), new CreateUrlDatumFromStatusFunction()), new CreateCrawlDbDatumFromUrlFunction());
        Pipe outlinkRecords = new Each(normalizedOutlinks, new CreateCrawlDbDatumFromUrlFunction());
        // Merge existing records, fetch statuses and new outlinks into one distinct record per URL.
        Every crawlDbUpdate = new Every(new GroupBy(Pipe.pipes(finishedUrls, statusUpdates, outlinkRecords), new Fields(CrawlDbDatum.URL_FIELD)), new MakeDistinctCrawlDbFunction(), Fields.RESULTS);

        Map<String, Tap> sinks = new HashMap<>();
        sinks.put(classifiedContent.getName(), contentSink);
        sinks.put(classifiedParse.getName(), parseSink);
        sinks.put(ClassifierPipe.CLASSIFIER_PIPE_NAME, classifierSink);
        sinks.put(crawlDbUpdate.getName(), crawlDbSink);
        return new FlowConnector(defaultProperties).connect(crawlDbInput, sinks, classifiedContent, classifiedParse, classifierPipe.getClassifierTailPipe(), crawlDbUpdate);
    }

    /**
     * Builds the crawl-loop flow.
     *
     * <p>The flow does the following:
     * <ol>
     *   <li>Groups the input crawl DB by (last status, score) and selects URLs to fetch.</li>
     *   <li>Fetches, parses and classifies them.</li>
     *   <li>Writes content and parse records for classified URLs.</li>
     *   <li>Writes classifier output.</li>
     *   <li>Writes a merged crawl DB (existing records + fetch statuses + filtered, normalized
     *       outlinks) under {@code path}.</li>
     * </ol>
     *
     * @param path output directory for this loop; sinks are written to its subdirectories
     * @param path2 input crawl-DB directory (must exist)
     * @param strArr forwarded unchanged to the {@link Classifier} constructor
     * @param arrayList forwarded unchanged to the {@link Classifier} constructor
     * @param d forwarded unchanged to the {@link Classifier} constructor
     * @param d2 forwarded unchanged to the {@link Classifier} constructor
     * @param i forwarded unchanged to the {@link Classifier} constructor
     * @param i2 forwarded unchanged to the {@link Classifier} constructor
     * @throws IllegalStateException if {@code path2} does not exist
     */
    public static Flow createFlow071(Path path, Path path2, UserAgent userAgent, FetcherPolicy fetcherPolicy, BaseUrlFilter baseUrlFilter, String[] strArr, ArrayList<String[]> arrayList, double d, double d2, int i, int i2, SimpleCrawlHFSOptions simpleCrawlHFSOptions) throws Throwable {
        int threads = simpleCrawlHFSOptions.getThreads();
        boolean isDebug = simpleCrawlHFSOptions.isDebug();
        boolean keepBoiler = simpleCrawlHFSOptions.keepBoiler();
        int tokensNumber = simpleCrawlHFSOptions.getTokensNumber();
        String language = simpleCrawlHFSOptions.getLanguage();
        String[] langKeys = simpleCrawlHFSOptions.getLangKeys();
        // Publish URL-selection settings read by SplitFetchedUnfetchedCrawlDatums.
        _subfilter = simpleCrawlHFSOptions.getFilter();
        _inithost = simpleCrawlHFSOptions.getDomain();
        _mainhost = simpleCrawlHFSOptions.getMainDomain();
        _targlang = language;
        _type = simpleCrawlHFSOptions.getType();

        JobConf jobConf = SimpleCrawlHFS.conf;
        int numReduceTasks = jobConf.getNumReduceTasks() * HadoopUtils.getTaskTrackers(jobConf);
        Properties defaultProperties = HadoopUtils.getDefaultProperties(SimpleCrawlHFSWorkflow.class, isDebug, jobConf);
        if (!path.getFileSystem(jobConf).exists(path2)) {
            throw new IllegalStateException(String.format("Input directory %s doesn't exist", path2));
        }
        Hfs crawlDbInput = new Hfs(new SequenceFile(CrawlDbDatum.FIELDS), path2.toString());
        Pipe importPipe = new Pipe("url importer");
        GroupBy byStatusAndScore = new GroupBy(importPipe, newStatusScoreGroupingFields());
        SplitterAssembly splitter = new SplitterAssembly(byStatusAndScore, new SplitFetchedUnfetchedCrawlDatums());
        Pipe finishedUrls = new Pipe("finished urls", byStatusAndScore);
        Each urlsToFetch = new Each(new Pipe("urls to Fetch", splitter.getLHSPipe()), new CreateUrlDatumFromCrawlDbFunction());

        Hfs crawlDbSink = newSequenceFileTap(CrawlDbDatum.FIELDS, path, CrawlConfig.CRAWLDB_SUBDIR_NAME);
        Hfs contentSink = newSequenceFileTap(FetchedDatum.FIELDS, path, CrawlConfig.CONTENT_SUBDIR_NAME);
        Hfs parseSink = newSequenceFileTap(ExtendedParsedDatum.FIELDS, path, CrawlConfig.PARSE_SUBDIR_NAME);
        Hfs classifierSink = newSequenceFileTap(ClassifierDatum.FIELDS, path, CrawlConfig.CLASSIFIER_SUBDIR_NAME);

        SimpleHttpFetcher fetcher = newHttpFetcher(threads, fetcherPolicy, userAgent);
        FetchPipe fetchPipe = new FetchPipe(urlsToFetch, new FixedScoreGenerator(), fetcher, RobotUtils.createFetcher(fetcher), new RobotRulesParser(), newFetchJobPolicy(fetcherPolicy), numReduceTasks);
        Pipe contentPipe = new Pipe("content pipe", fetchPipe.getContentTailPipe());
        ExtendedParsePipe parsePipe = new ExtendedParsePipe(contentPipe, new SimpleNoLinksParser(keepBoiler, path.getParent().toString()));
        ClassifierPipe classifierPipe = new ClassifierPipe(parsePipe.getTailPipe(), new Classifier(langKeys, language, strArr, arrayList, d, d2, keepBoiler, i, i2, tokensNumber));
        Each classifiedUrls = new Each(new Pipe("urls from classifier", classifierPipe.getClassifierTailPipe()), new SelectUrlOnlyFunction());

        Fields fetchedUrlKey = new Fields(FetchedDatum.URL_FN);
        Fields statusUrlKey = new Fields(StatusDatum.URL_FN);
        Fields urlKey = new Fields(ExtendedUrlDatum.URL_FN);
        // Keep only content/parse records whose URL the classifier emitted.
        CoGroup classifiedContent = new CoGroup(contentPipe, fetchedUrlKey, classifiedUrls, urlKey, FetchedDatum.FIELDS.append(new Fields(UrlLengthener.URL_FN)), new RightJoin());
        CoGroup classifiedParse = new CoGroup(parsePipe, urlKey, classifiedUrls, urlKey, ExtendedParsedDatum.FIELDS.append(new Fields(UrlLengthener.URL_FN)), new RightJoin());
        Pipe statusUpdates = new Each(new Each(new CoGroup(new Pipe("fetched pipe", fetchPipe.getStatusTailPipe()), statusUrlKey, classifierPipe.getClassifierTailPipe(), urlKey, statusWithClassifierFields(), new LeftJoin()), new CreateUrlDatumFromStatusFunction()), new CreateCrawlDbDatumFromUrlFunction());
        Pipe outlinkRecords = new Each(new Each(new Each(new Pipe("url from outlinks", classifierPipe.getScoredLinksTailPipe()), new ExtendedUrlFilter(baseUrlFilter, _mainhost)), new ExtendedNormalizeUrlFunction(new SimpleUrlNormalizer())), new CreateCrawlDbDatumFromUrlFunction());
        // Merge existing records, fetch statuses and new outlinks into one distinct record per URL.
        Every crawlDbUpdate = new Every(new GroupBy(Pipe.pipes(finishedUrls, statusUpdates, outlinkRecords), new Fields(CrawlDbDatum.URL_FIELD)), new MakeDistinctCrawlDbFunction(), Fields.RESULTS);

        Map<String, Tap> sinks = new HashMap<>();
        sinks.put(classifiedContent.getName(), contentSink);
        sinks.put(classifiedParse.getName(), parseSink);
        sinks.put(ClassifierPipe.CLASSIFIER_PIPE_NAME, classifierSink);
        sinks.put(crawlDbUpdate.getName(), crawlDbSink);
        return new FlowConnector(defaultProperties).connect(crawlDbInput, sinks, classifiedContent, classifiedParse, classifierPipe.getClassifierTailPipe(), crawlDbUpdate);
    }

    /** Creates the (last status, score) grouping key with the status and descending-score comparators. */
    private static Fields newStatusScoreGroupingFields() {
        Fields fields = new Fields(CrawlDbDatum.LAST_STATUS_FIELD).append(new Fields(CrawlDbDatum.SCORE));
        fields.setComparator(CrawlDbDatum.LAST_STATUS_FIELD, new StatusComparator());
        fields.setComparator(CrawlDbDatum.SCORE, new ScoreComparator());
        return fields;
    }

    /** Creates the HTTP fetcher, taking connection/socket timeouts and the retry count from the crawl config. */
    private static SimpleHttpFetcher newHttpFetcher(int threads, FetcherPolicy fetcherPolicy, UserAgent userAgent) {
        SimpleHttpFetcher fetcher = new SimpleHttpFetcher(threads, fetcherPolicy, userAgent);
        fetcher.setConnectionTimeout(SimpleCrawlHFS.config.getInt("fetcher.connection_timeout.value"));
        fetcher.setSocketTimeout(SimpleCrawlHFS.config.getInt("fetcher.socket_timeout.value"));
        fetcher.setMaxRetryCount(SimpleCrawlHFS.config.getInt("fetcher.max_retry_count.value"));
        return fetcher;
    }

    /** Creates the fetch-job policy from the fetcher policy plus the per-host-per-run limit in the crawl config. */
    private static DefaultFetchJobPolicy newFetchJobPolicy(FetcherPolicy fetcherPolicy) {
        return new DefaultFetchJobPolicy(fetcherPolicy.getMaxRequestsPerConnection(), SimpleCrawlHFS.config.getInt("fetcher.max_requests_per_host_per_run.value"), fetcherPolicy.getCrawlDelay());
    }

    /** Creates a SequenceFile tap with {@code fields} at {@code parent/subdir}. */
    private static Hfs newSequenceFileTap(Fields fields, Path parent, String subdir) {
        return new Hfs(new SequenceFile(fields), new Path(parent, subdir).toString());
    }

    /** Status fields followed by classifier fields, with the classifier's url/payload renamed to avoid name clashes. */
    private static Fields statusWithClassifierFields() {
        return StatusDatum.FIELDS.append(ClassifierDatum.FIELDS.rename(new Fields(ClassifierDatum.URL_FN), new Fields("classifier_url")).rename(new Fields(ClassifierDatum.PAYLOAD_FN), new Fields("classifier_payload")));
    }
}
