package gr.ilsp.fmc.workflows;

import bixo.config.FetcherPolicy;
import bixo.config.UserAgent;
import bixo.datum.FetchedDatum;
import bixo.datum.StatusDatum;
import bixo.datum.UrlStatus;
import bixo.fetcher.SimpleHttpFetcher;
import bixo.operations.FixedScoreGenerator;
import bixo.operations.UrlFilter;
import bixo.operations.UrlLengthener;
import bixo.pipes.FetchPipe;
import bixo.urls.BaseUrlFilter;
import bixo.urls.SimpleUrlNormalizer;
import cascading.flow.Flow;
import cascading.flow.FlowConnector;
import cascading.flow.FlowProcess;
import cascading.operation.BaseOperation;
import cascading.operation.Buffer;
import cascading.operation.BufferCall;
import cascading.operation.Function;
import cascading.operation.FunctionCall;
import cascading.operation.OperationCall;
import cascading.pipe.CoGroup;
import cascading.pipe.Each;
import cascading.pipe.Every;
import cascading.pipe.GroupBy;
import cascading.pipe.Pipe;
import cascading.pipe.cogroup.LeftJoin;
import cascading.pipe.cogroup.RightJoin;
import cascading.tap.Tap;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;
import com.bixolabs.cascading.BaseSplitter;
import com.bixolabs.cascading.HadoopUtils;
import com.bixolabs.cascading.NullContext;
import com.bixolabs.cascading.SplitterAssembly;
import gr.ilsp.fmc.classifier.Classifier;
import gr.ilsp.fmc.datums.ClassifierDatum;
import gr.ilsp.fmc.datums.CrawlDbDatum;
import gr.ilsp.fmc.datums.ExtendedParsedDatum;
import gr.ilsp.fmc.datums.ExtendedUrlDatum;
import gr.ilsp.fmc.main.SimpleCrawl;
import gr.ilsp.fmc.mysql.MYSQLTapFactory;
import gr.ilsp.fmc.operations.CreateCrawlDbDatumFromUrlFunction;
import gr.ilsp.fmc.operations.CreateUrlDatumFromStatusFunction;
import gr.ilsp.fmc.operations.ExtendedNormalizeUrlFunction;
import gr.ilsp.fmc.operations.SelectFetchedOnlyFunction;
import gr.ilsp.fmc.parser.SimpleNoLinksParser;
import gr.ilsp.fmc.pipes.ClassifierPipe;
import gr.ilsp.fmc.pipes.ExtendedParsePipe;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.log4j.Logger;

/* loaded from: input_file:gr/ilsp/fmc/workflows/SimpleCrawlWorkflow.class */
public class SimpleCrawlWorkflow {
    private static final Logger LOGGER = Logger.getLogger(SimpleCrawlWorkflow.class);

    /* loaded from: input_file:gr/ilsp/fmc/workflows/SimpleCrawlWorkflow$CreateUrlDatumFromCrawlDbFunction.class */
    /**
     * Cascading {@link Function} that converts each incoming crawl-db tuple
     * into an {@link ExtendedUrlDatum} tuple.
     *
     * <p>The URL and score are carried over directly; the last-fetched time,
     * last-updated time, last status (as its enum name) and crawl depth are
     * stored in the datum's payload under the {@link CrawlDbDatum} field keys.
     */
    private static class CreateUrlDatumFromCrawlDbFunction extends BaseOperation<NullContext> implements Function<NullContext> {
        /** Declares {@link ExtendedUrlDatum#FIELDS} as the output fields of this operation. */
        public CreateUrlDatumFromCrawlDbFunction() {
            super(ExtendedUrlDatum.FIELDS);
        }

        /** Logs the start of the conversion; no other setup is performed. */
        @Override
        public void prepare(FlowProcess flowProcess, OperationCall<NullContext> operationCall) {
            SimpleCrawlWorkflow.LOGGER.info("Starting creation of URLs from crawldb");
        }

        /** Logs the end of the conversion; no other teardown is performed. */
        @Override
        public void cleanup(FlowProcess flowProcess, OperationCall<NullContext> operationCall) {
            SimpleCrawlWorkflow.LOGGER.info("Ending creation of URLs from crawldb");
        }

        /**
         * Wraps the argument tuple as a {@link CrawlDbDatum}, converts it and
         * emits the resulting tuple.
         */
        @Override
        public void operate(FlowProcess flowProcess, FunctionCall<NullContext> functionCall) {
            TupleEntry arguments = functionCall.getArguments();
            ExtendedUrlDatum converted = toUrlDatum(new CrawlDbDatum(arguments));
            functionCall.getOutputCollector().add(converted.getTuple());
        }

        /** Builds the URL datum for one crawl-db record, copying its state into the payload. */
        private static ExtendedUrlDatum toUrlDatum(CrawlDbDatum source) {
            ExtendedUrlDatum target = new ExtendedUrlDatum(source.getUrl());
            target.setPayloadValue(CrawlDbDatum.LAST_FETCHED_FIELD, Long.valueOf(source.getLastFetched()));
            target.setPayloadValue(CrawlDbDatum.LAST_UPDATED_FIELD, Long.valueOf(source.getLastUpdated()));
            // The status is stored by enum name rather than as the enum value itself.
            target.setPayloadValue(CrawlDbDatum.LAST_STATUS_FIELD, source.getLastStatus().name());
            target.setPayloadValue(CrawlDbDatum.CRAWL_DEPTH, Integer.valueOf(source.getCrawlDepth()));
            target.setScore(Double.valueOf(source.getScore()));
            return target;
        }
    }

    /* loaded from: input_file:gr/ilsp/fmc/workflows/SimpleCrawlWorkflow$DistinctMergedPipeFunction.class */
    /**
     * Cascading {@link Function} that passes through only those joined tuples
     * whose value at position {@link #JOIN_MARKER_POSITION} is {@code null},
     * re-emitting them as fresh {@link ExtendedUrlDatum} tuples.
     *
     * <p>NOTE(review): in {@code createFlow} this runs on the output of a
     * {@code LeftJoin} whose declared fields are {@code ExtendedUrlDatum.FIELDS}
     * plus one extra URL column; position 3 is presumably that extra column
     * (null when the outlink matched no already-known URL) - confirm against
     * the layout of {@code ExtendedUrlDatum.FIELDS}.
     */
    private static class DistinctMergedPipeFunction extends BaseOperation<NullContext> implements Function<NullContext> {
        /** Tuple position inspected to decide whether a row is kept. */
        private static final int JOIN_MARKER_POSITION = 3;

        /** Declares {@link ExtendedUrlDatum#FIELDS} as the output fields of this operation. */
        public DistinctMergedPipeFunction() {
            super(ExtendedUrlDatum.FIELDS);
        }

        /** Logs the start of processing; no other setup is performed. */
        @Override
        public void prepare(FlowProcess flowProcess, OperationCall<NullContext> operationCall) {
            SimpleCrawlWorkflow.LOGGER.info("Starting processing outlinks to find only new ones");
        }

        /** Logs the end of processing; no other teardown is performed. */
        @Override
        public void cleanup(FlowProcess flowProcess, OperationCall<NullContext> operationCall) {
            SimpleCrawlWorkflow.LOGGER.info("Ending processing outlinks to find new ones.");
        }

        /**
         * Emits a copy (URL, payload, score) of the incoming datum when the
         * marker position holds {@code null}; otherwise emits nothing.
         */
        @Override
        public void operate(FlowProcess flowProcess, FunctionCall<NullContext> functionCall) {
            ExtendedUrlDatum joined = new ExtendedUrlDatum(functionCall.getArguments());
            ExtendedUrlDatum copy = new ExtendedUrlDatum(joined.getUrl());
            if (joined.getTupleEntry().get(JOIN_MARKER_POSITION) != null) {
                // Marker column is populated: drop this row.
                return;
            }
            copy.setPayload(joined.getPayload());
            copy.setScore(joined.getScore());
            functionCall.getOutputCollector().add(copy.getTuple());
        }
    }

    /* loaded from: input_file:gr/ilsp/fmc/workflows/SimpleCrawlWorkflow$MakeDistinctFunction.class */
    /**
     * Cascading {@link Buffer} that collapses every group of
     * {@link ExtendedUrlDatum} tuples into a single datum.
     *
     * <p>The emitted datum takes its URL and the four crawl-db payload values
     * (last fetched, last updated, last status, crawl depth) from the first
     * tuple of the group; its score is the arithmetic mean of the scores of
     * all tuples in the group. Every tuple after the first one in a group is
     * counted in {@code _numSelected}, which is reported as the number of
     * dropped URLs in {@link #cleanup}.
     */
    private static class MakeDistinctFunction extends BaseOperation<NullContext> implements Buffer<NullContext> {
        /** Number of duplicate tuples folded away by this instance so far. */
        private long _numSelected;

        /** Declares {@link ExtendedUrlDatum#FIELDS} as the output fields of this operation. */
        public MakeDistinctFunction() {
            super(ExtendedUrlDatum.FIELDS);
            this._numSelected = 0L;
        }

        /** Logs the start of processing; no other setup is performed. */
        @Override
        public void prepare(FlowProcess flowProcess, OperationCall<NullContext> operationCall) {
            SimpleCrawlWorkflow.LOGGER.info("Starting processing links to make unique");
        }

        /** Logs the end of processing together with the duplicate count. */
        @Override
        public void cleanup(FlowProcess flowProcess, OperationCall<NullContext> operationCall) {
            SimpleCrawlWorkflow.LOGGER.info("Ending making unique links - dropped " + this._numSelected + " urls");
        }

        /**
         * Merges all tuples of the current group into one datum and emits it.
         * Emits nothing when the group iterator yields no tuples.
         */
        @Override
        public void operate(FlowProcess flowProcess, BufferCall<NullContext> bufferCall) {
            Iterator<TupleEntry> argumentsIterator = bufferCall.getArgumentsIterator();
            ExtendedUrlDatum merged = null;
            double scoreSum = 0.0d;
            int tupleCount = 0;
            while (argumentsIterator.hasNext()) {
                ExtendedUrlDatum current = new ExtendedUrlDatum(argumentsIterator.next());
                tupleCount++;
                if (merged == null) {
                    // The first tuple of the group supplies the URL and the crawl-db payload values.
                    merged = new ExtendedUrlDatum(current.getUrl());
                    merged.setPayloadValue(CrawlDbDatum.LAST_FETCHED_FIELD, current.getPayloadValue(CrawlDbDatum.LAST_FETCHED_FIELD));
                    merged.setPayloadValue(CrawlDbDatum.LAST_UPDATED_FIELD, current.getPayloadValue(CrawlDbDatum.LAST_UPDATED_FIELD));
                    merged.setPayloadValue(CrawlDbDatum.LAST_STATUS_FIELD, current.getPayloadValue(CrawlDbDatum.LAST_STATUS_FIELD));
                    merged.setPayloadValue(CrawlDbDatum.CRAWL_DEPTH, current.getPayloadValue(CrawlDbDatum.CRAWL_DEPTH));
                } else {
                    this._numSelected++;
                }
                scoreSum += current.getScore().doubleValue();
            }
            if (merged == null) {
                // Empty group: nothing to emit. Checking before touching the datum
                // avoids the NullPointerException (and 0/0 division) the score
                // assignment would otherwise hit.
                return;
            }
            merged.setScore(Double.valueOf(scoreSum / tupleCount));
            bufferCall.getOutputCollector().add(merged.getTuple());
        }
    }

    /* loaded from: input_file:gr/ilsp/fmc/workflows/SimpleCrawlWorkflow$SelectUrlOnlyFunction.class */
    /**
     * Cascading {@link Function} that projects each incoming tuple down to a
     * single-field tuple holding the string form of its
     * {@link ExtendedUrlDatum#URL_FN} value.
     */
    private static class SelectUrlOnlyFunction extends BaseOperation<NullContext> implements Function<NullContext> {
        /** Declares a single output field named {@link ExtendedUrlDatum#URL_FN}. */
        public SelectUrlOnlyFunction() {
            super(new Fields(ExtendedUrlDatum.URL_FN));
        }

        /** Reads the URL field from the arguments and emits it as the only tuple element. */
        @Override
        public void operate(FlowProcess flowProcess, FunctionCall<NullContext> functionCall) {
            TupleEntry arguments = functionCall.getArguments();
            String url = arguments.get(ExtendedUrlDatum.URL_FN).toString();
            functionCall.getOutputCollector().add(new Tuple(url));
        }
    }

    /* loaded from: input_file:gr/ilsp/fmc/workflows/SimpleCrawlWorkflow$SplitFetchedUnfetchedCrawlDatums.class */
    /**
     * Splitter that routes crawl-db tuples to the left-hand side when their
     * last status is one of a fixed set of "not successfully fetched"
     * statuses, up to {@link #MAX_SELECTED} tuples per instance; everything
     * else goes to the right-hand side.
     */
    private static class SplitFetchedUnfetchedCrawlDatums extends BaseSplitter {
        /** Upper bound on the number of tuples this instance sends to the left-hand side. */
        private static final long MAX_SELECTED = 256L;

        /** Number of tuples this instance has sent to the left-hand side so far. */
        private long _numSelected;

        private SplitFetchedUnfetchedCrawlDatums() {
            this._numSelected = 0L;
        }

        /** Compiler-generated accessor constructor; the argument is ignored. */
        /* synthetic */ SplitFetchedUnfetchedCrawlDatums(SplitFetchedUnfetchedCrawlDatums splitFetchedUnfetchedCrawlDatums) {
            this();
        }

        /** Returns the name used for the left-hand side of the split. */
        @Override
        public String getLHSName() {
            return "fetched unfetched UrlDatums";
        }

        /**
         * Decides whether the tuple belongs on the left-hand side.
         *
         * @return {@code true} (and counts the tuple) when the limit has not
         *         been reached and the tuple's last status is in the selected
         *         set; {@code false} otherwise
         */
        @Override
        public boolean isLHS(TupleEntry tupleEntry) {
            CrawlDbDatum datum = new CrawlDbDatum(tupleEntry);
            UrlStatus status = datum.getLastStatus();
            boolean belowLimit = this._numSelected < MAX_SELECTED;
            if (belowLimit && isSelectedStatus(status)) {
                this._numSelected++;
                return true;
            }
            return false;
        }

        /** Returns whether the status is one of those routed to the left-hand side. */
        private static boolean isSelectedStatus(UrlStatus status) {
            return status == UrlStatus.UNFETCHED
                    || status == UrlStatus.SKIPPED_DEFERRED
                    || status == UrlStatus.SKIPPED_BY_SCORER
                    || status == UrlStatus.SKIPPED_BY_SCORE
                    || status == UrlStatus.SKIPPED_TIME_LIMIT
                    || status == UrlStatus.SKIPPED_INTERRUPTED
                    || status == UrlStatus.SKIPPED_INEFFICIENT
                    || status == UrlStatus.ABORTED_SLOW_RESPONSE
                    || status == UrlStatus.ERROR_IOEXCEPTION;
        }
    }

    /**
     * Assembles the Cascading flow for one crawl loop: URLs are read from a
     * JDBC source tap, split into URLs to fetch and already-finished URLs,
     * fetched, parsed and classified, and the results are bound to five JDBC
     * sink taps (content, parse, classifier, URL status updates, new URLs).
     *
     * <p>Parameter names come from decompilation; the descriptions below state
     * only how each value is used in this method.
     *
     * @param path          path that must exist; otherwise an {@link IllegalStateException} is thrown
     * @param path2         path whose file system is used for the existence check of {@code path}
     * @param userAgent     user agent handed to the {@link SimpleHttpFetcher}
     * @param fetcherPolicy fetcher policy handed to the {@link SimpleHttpFetcher}
     * @param baseUrlFilter filter applied to the scored outlinks before normalization
     * @param i             first argument of the {@link SimpleHttpFetcher} constructor
     *                      (presumably the maximum number of fetch threads - confirm)
     * @param z             flag forwarded to {@code HadoopUtils.getDefaultProperties}
     *                      (presumably enables debug logging - confirm)
     * @param str           first argument of every {@link MYSQLTapFactory} call
     * @param str2          second argument of every {@link MYSQLTapFactory} call
     * @param str3          forwarded to the {@link Classifier} constructor
     * @param strArr        forwarded to the {@link Classifier} constructor
     * @param strArr2       forwarded to the {@link Classifier} constructor
     * @param arrayList     forwarded to the {@link Classifier} constructor
     * @param d             forwarded to the {@link Classifier} constructor
     * @param d2            forwarded to the {@link Classifier} constructor
     * @param i2            forwarded to the {@link Classifier} constructor
     * @param i3            forwarded to the {@link Classifier} constructor
     * @param i4            forwarded to the {@link Classifier} constructor
     * @return the connected, not yet started flow
     * @throws IllegalStateException if {@code path} does not exist
     * @throws Throwable             whatever the underlying Hadoop or Cascading calls throw
     */
    public static Flow createFlow(Path path, Path path2, UserAgent userAgent, FetcherPolicy fetcherPolicy, BaseUrlFilter baseUrlFilter, int i, boolean z, String str, String str2, String str3, String[] strArr, String[] strArr2, ArrayList<String[]> arrayList, double d, double d2, int i2, int i3, int i4) throws Throwable {
        JobConf jobConf = new JobConf();
        jobConf.setJarByClass(SimpleCrawl.class);
        jobConf.setQuietMode(true);
        int numReduceTasks = jobConf.getNumReduceTasks() * HadoopUtils.getTaskTrackers(jobConf);
        if (!path2.getFileSystem(jobConf).exists(path)) {
            throw new IllegalStateException(String.format("Input directory %s doesn't exist", path));
        }

        // Source tap and the split into "to fetch" (LHS) and "finished" (RHS) URLs.
        Tap urlsSource = MYSQLTapFactory.createUrlsSourceJDBCTap(str, str2);
        SplitterAssembly splitter = new SplitterAssembly(new Pipe("url importer"), new SplitFetchedUnfetchedCrawlDatums());
        Pipe finishedUrlsPipe = new Pipe("finished urls", splitter.getRHSPipe());
        Each urlsToFetchPipe = new Each(new Pipe("urls to Fetch", splitter.getLHSPipe()), new CreateUrlDatumFromCrawlDbFunction());

        Tap contentSink = MYSQLTapFactory.createContentSinkJDBCTap(str, str2);
        Tap parseSink = MYSQLTapFactory.createParseSinkJDBCTap(str, str2);
        Tap classifierSink = MYSQLTapFactory.createClassifierSinkJDBCTap(str, str2);
        Tap urlsSink = MYSQLTapFactory.createUrlsSinkJDBCTap(str, str2);

        // Fetch -> parse -> classify.
        FetchPipe fetchPipe = new FetchPipe(urlsToFetchPipe, new FixedScoreGenerator(), new SimpleHttpFetcher(i, fetcherPolicy, userAgent), numReduceTasks);
        Pipe contentPipe = new Pipe("content pipe", fetchPipe.getContentTailPipe());
        ExtendedParsePipe parsePipe = new ExtendedParsePipe(contentPipe, new SimpleNoLinksParser());
        ClassifierPipe classifierPipe = new ClassifierPipe(parsePipe.getTailPipe(), new Classifier(strArr, str3, strArr2, arrayList, d, d2, false, i2, i3, i4));

        // Right-join content and parse output against the URLs that came out of the classifier.
        Each classifiedUrls = new Each(new Pipe("urls from classifier", classifierPipe.getClassifierTailPipe()), new SelectUrlOnlyFunction());
        CoGroup classifiedContent = new CoGroup(contentPipe, urlField(), classifiedUrls, urlField(), FetchedDatum.FIELDS.append(new Fields(UrlLengthener.URL_FN)), new RightJoin());
        CoGroup classifiedParse = new CoGroup(parsePipe, urlField(), classifiedUrls, urlField(), ExtendedParsedDatum.FIELDS.append(new Fields(UrlLengthener.URL_FN)), new RightJoin());

        // Scored outlinks: filter, normalize, then collapse to one datum per URL.
        Pipe outlinksPipe = new Pipe("url from outlinks", classifierPipe.getScoredLinksTailPipe());
        Each filteredOutlinks = new Each(outlinksPipe, new UrlFilter(baseUrlFilter));
        Each normalizedOutlinks = new Each(filteredOutlinks, new ExtendedNormalizeUrlFunction(new SimpleUrlNormalizer()));
        GroupBy outlinksByUrl = new GroupBy(normalizedOutlinks, urlField());
        Every distinctOutlinks = new Every(outlinksByUrl, new MakeDistinctFunction(), Fields.RESULTS);

        // Fetch status left-joined with the classifier output; the classifier's URL and
        // payload columns are renamed so they do not clash with the status fields.
        Pipe fetchedPipe = new Pipe("fetched pipe", fetchPipe.getStatusTailPipe());
        Fields renamedClassifierFields = ClassifierDatum.FIELDS.rename(new Fields(ClassifierDatum.URL_FN), new Fields("classifier_url")).rename(new Fields(ClassifierDatum.PAYLOAD_FN), new Fields("classifier_payload"));
        CoGroup statusWithClassifier = new CoGroup(fetchedPipe, urlField(), classifierPipe.getClassifierTailPipe(), urlField(), StatusDatum.FIELDS.append(renamedClassifierFields), new LeftJoin());
        Each fetchedUrlDatums = new Each(statusWithClassifier, new CreateUrlDatumFromStatusFunction());
        Each urlUpdates = new Each(new Each(fetchedUrlDatums, new CreateCrawlDbDatumFromUrlFunction()), new SelectFetchedOnlyFunction());
        Tap urlUpdateSink = MYSQLTapFactory.createUrlUpdateSink(str, str2);

        // Left-join the distinct outlinks against the finished and just-fetched URLs;
        // DistinctMergedPipeFunction then keeps only the rows it considers new.
        Each finishedUrlsOnly = new Each(new Each(finishedUrlsPipe, new CreateUrlDatumFromCrawlDbFunction()), new SelectUrlOnlyFunction());
        Each fetchedUrlsOnly = new Each(fetchedUrlDatums, new SelectUrlOnlyFunction());
        GroupBy knownUrls = new GroupBy(Pipe.pipes(finishedUrlsOnly, fetchedUrlsOnly), urlField());
        CoGroup outlinksWithKnown = new CoGroup("url pipe", distinctOutlinks, urlField(), knownUrls, urlField(), ExtendedUrlDatum.FIELDS.append(new Fields(UrlLengthener.URL_FN)), new LeftJoin());
        Each newUrlDatums = new Each(outlinksWithKnown, new DistinctMergedPipeFunction());
        Each newCrawlDbEntries = new Each(newUrlDatums, new CreateCrawlDbDatumFromUrlFunction());

        // Bind each tail pipe, by name, to its sink tap.
        HashMap<String, Tap> sinks = new HashMap<String, Tap>();
        sinks.put(classifiedContent.getName(), contentSink);
        sinks.put(urlUpdates.getName(), urlUpdateSink);
        sinks.put(classifiedParse.getName(), parseSink);
        sinks.put(ClassifierPipe.CLASSIFIER_PIPE_NAME, classifierSink);
        sinks.put(newCrawlDbEntries.getName(), urlsSink);

        FlowConnector connector = new FlowConnector(HadoopUtils.getDefaultProperties(SimpleCrawl.class, z, jobConf));
        return connector.connect(urlsSource, sinks, classifiedContent, classifiedParse, classifierPipe.getClassifierTailPipe(), urlUpdates, newCrawlDbEntries);
    }

    /** Returns a fresh single-field selector for the URL field used as the key of every join and grouping in {@code createFlow}. */
    private static Fields urlField() {
        return new Fields(ExtendedUrlDatum.URL_FN);
    }
}
