package bixo.pipes;

import bixo.config.BaseFetchJobPolicy;
import bixo.config.DefaultFetchJobPolicy;
import bixo.datum.FetchSetDatum;
import bixo.datum.FetchedDatum;
import bixo.datum.GroupedUrlDatum;
import bixo.datum.ScoredUrlDatum;
import bixo.datum.StatusDatum;
import bixo.datum.UrlDatum;
import bixo.datum.UrlStatus;
import bixo.exceptions.BaseFetchException;
import bixo.fetcher.BaseFetcher;
import bixo.operations.BaseGroupGenerator;
import bixo.operations.BaseScoreGenerator;
import bixo.operations.FetchBuffer;
import bixo.operations.FilterAndScoreByUrlAndRobots;
import bixo.operations.GroupFunction;
import bixo.operations.MakeFetchSetsBuffer;
import bixo.robots.BaseRobotsParser;
import bixo.robots.RobotUtils;
import bixo.robots.SimpleRobotRulesParser;
import bixo.utils.GroupingKey;
import bixo.utils.UrlUtils;
import cascading.flow.FlowProcess;
import cascading.operation.BaseOperation;
import cascading.operation.Function;
import cascading.operation.FunctionCall;
import cascading.pipe.Each;
import cascading.pipe.Every;
import cascading.pipe.GroupBy;
import cascading.pipe.Pipe;
import cascading.pipe.SubAssembly;
import cascading.tap.Tap;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;
import com.bixolabs.cascading.BaseSplitter;
import com.bixolabs.cascading.NullContext;
import com.bixolabs.cascading.NullSinkTap;
import com.bixolabs.cascading.SplitterAssembly;
import java.net.MalformedURLException;
import java.security.InvalidParameterException;
import java.util.HashMap;
import java.util.Map;

/* loaded from: input_file:bixo/pipes/FetchPipe.class */
/**
 * Cascading {@link SubAssembly} that turns a stream of URLs into fetched content plus per-URL status records.
 *
 * <p>As wired in the 7-argument constructor:
 * <ol>
 *   <li>each URL is tagged with a grouping key built by {@link UrlUtils#makeProtocolAndDomain} and grouped on it;</li>
 *   <li>each group is filtered and scored by {@link FilterAndScoreByUrlAndRobots} (robots fetcher, robots parser,
 *       score generator);</li>
 *   <li>tuples whose grouping key is "special" ({@link GroupingKey#isSpecialKey}) are turned into skipped-status
 *       records; all other tuples continue to the fetch stage;</li>
 *   <li>fetch sets are built by {@link MakeFetchSetsBuffer}, regrouped, and fetched by {@link FetchBuffer};</li>
 *   <li>two tails are exposed: {@link #CONTENT_PIPE_NAME} (only tuples whose status is {@code FETCHED}) and
 *       {@link #STATUS_PIPE_NAME} (skipped and fetched status records, grouped by {@code StatusDatum.URL_FN}).</li>
 * </ol>
 */
public class FetchPipe extends SubAssembly {
    /** Name of the tail pipe carrying successfully fetched content. */
    public static final String CONTENT_PIPE_NAME = "FetchPipe-content";
    /** Name of the tail pipe carrying status records for every URL. */
    public static final String STATUS_PIPE_NAME = "FetchPipe-status";

    /**
     * Keeps only successfully fetched tuples and strips the trailing status field.
     *
     * <p>Expects {@code FetchedDatum.FIELDS.size() + 1} arguments: the FetchedDatum fields followed by one status
     * value. A tuple is emitted (as just the FetchedDatum fields) only when that status value is a String naming
     * {@link UrlStatus#FETCHED}.
     */
    private static class FilterErrorsFunction extends BaseOperation<NullContext> implements Function<NullContext> {
        /** Position of the status value, i.e. right after the FetchedDatum fields. */
        private final int _fieldPos;
        /** Positions 0..FetchedDatum.FIELDS.size()-1, i.e. the FetchedDatum fields to emit. */
        private final int[] _fieldsToCopy;

        public FilterErrorsFunction() {
            super(FetchedDatum.FIELDS.size() + 1, FetchedDatum.FIELDS);
            int numFetchedFields = FetchedDatum.FIELDS.size();
            _fieldPos = numFetchedFields;
            _fieldsToCopy = new int[numFetchedFields];
            for (int i = 0; i < numFetchedFields; i++) {
                _fieldsToCopy[i] = i;
            }
        }

        @Override
        public void operate(FlowProcess flowProcess, FunctionCall<NullContext> functionCall) {
            Tuple tuple = functionCall.getArguments().getTuple();
            Object statusValue = tuple.get(_fieldPos);
            // Non-String status values (e.g. fetch exceptions) and non-FETCHED statuses are dropped.
            if ((statusValue instanceof String) && UrlStatus.valueOf((String) statusValue) == UrlStatus.FETCHED) {
                functionCall.getOutputCollector().add(tuple.get(_fieldsToCopy));
            }
        }
    }

    /** Groups URLs by protocol + domain as computed by {@link UrlUtils#makeProtocolAndDomain}. */
    private static class GroupByDomain extends BaseGroupGenerator {

        /**
         * @param urlDatum the URL to group
         * @return the protocol-and-domain key for the URL
         * @throws RuntimeException if the URL is malformed (the {@link MalformedURLException} is kept as the cause)
         */
        @Override
        public String getGroupingKey(UrlDatum urlDatum) {
            String url = urlDatum.getUrl();
            try {
                return UrlUtils.makeProtocolAndDomain(url);
            } catch (MalformedURLException e) {
                // Preserve the original exception so the parse failure details are not lost.
                throw new RuntimeException("Invalid URL: " + url, e);
            }
        }
    }

    /**
     * Converts a ScoredUrlDatum carrying a special grouping key into a StatusDatum whose status is derived from
     * that key via {@link GroupingKey#makeUrlStatusFromKey}.
     */
    private static class MakeSkippedStatus extends BaseOperation<NullContext> implements Function<NullContext> {
        public MakeSkippedStatus() {
            super(StatusDatum.FIELDS);
        }

        /**
         * @throws RuntimeException if the tuple's grouping key is not a special key
         */
        @Override
        public void operate(FlowProcess flowProcess, FunctionCall<NullContext> functionCall) {
            ScoredUrlDatum scoredUrlDatum = new ScoredUrlDatum(functionCall.getArguments());
            String groupKey = scoredUrlDatum.getGroupKey();
            if (!GroupingKey.isSpecialKey(groupKey)) {
                throw new RuntimeException("Can't make skipped status for regular grouping key: " + groupKey);
            }
            StatusDatum statusDatum = new StatusDatum(scoredUrlDatum.getUrl(), GroupingKey.makeUrlStatusFromKey(groupKey), scoredUrlDatum.getPayload());
            // NOTE(review): the payload was already passed to the constructor above; this second assignment from
            // the datum looks redundant — confirm against StatusDatum.setPayload before removing.
            statusDatum.setPayload(scoredUrlDatum);
            functionCall.getOutputCollector().add(statusDatum.getTuple());
        }
    }

    /**
     * Builds a StatusDatum for every fetch result.
     *
     * <p>The status value is read at position {@code FetchedDatum.FIELDS.size()} and may be either a String naming
     * a {@link UrlStatus} or a {@link BaseFetchException}.
     */
    private static class MakeStatusFunction extends BaseOperation<NullContext> implements Function<NullContext> {
        /** Position of the status value, i.e. right after the FetchedDatum fields. */
        private final int _fieldPos;

        public MakeStatusFunction() {
            super(StatusDatum.FIELDS);
            _fieldPos = FetchedDatum.FIELDS.size();
        }

        /**
         * @throws RuntimeException if the status value is neither a String nor a BaseFetchException (including null)
         */
        @Override
        public void operate(FlowProcess flowProcess, FunctionCall<NullContext> functionCall) {
            TupleEntry arguments = functionCall.getArguments();
            FetchedDatum fetchedDatum = new FetchedDatum(arguments);
            Object statusValue = arguments.get(_fieldPos);

            StatusDatum statusDatum;
            if (statusValue instanceof String) {
                UrlStatus urlStatus = UrlStatus.valueOf((String) statusValue);
                if (urlStatus == UrlStatus.FETCHED) {
                    // Successful fetches keep the response headers and host address.
                    statusDatum = new StatusDatum(fetchedDatum.getUrl(), fetchedDatum.getHeaders(), fetchedDatum.getHostAddress(), fetchedDatum.getPayload());
                } else {
                    statusDatum = new StatusDatum(fetchedDatum.getUrl(), urlStatus, fetchedDatum.getPayload());
                }
            } else if (statusValue instanceof BaseFetchException) {
                statusDatum = new StatusDatum(fetchedDatum.getUrl(), (BaseFetchException) statusValue, fetchedDatum.getPayload());
            } else {
                // A null status previously caused an NPE here; report it with the intended message instead.
                String typeName = (statusValue == null) ? "null" : String.valueOf(statusValue.getClass());
                throw new RuntimeException("Unknown type for fetch status field: " + typeName);
            }
            functionCall.getOutputCollector().add(statusDatum.getTuple());
        }
    }

    /** Routes tuples with a special grouping key to the left-hand side of the split; all others go right. */
    private static class SplitIntoSpecialAndRegularKeys extends BaseSplitter {

        @Override
        public String getLHSName() {
            return "special grouping key";
        }

        @Override
        public boolean isLHS(TupleEntry tupleEntry) {
            return GroupingKey.isSpecialKey(new ScoredUrlDatum(tupleEntry).getGroupKey());
        }
    }

    /**
     * Convenience constructor using defaults derived from {@code fetcher}.
     *
     * <p>Uses a robots fetcher from {@link RobotUtils#createFetcher}, a {@link SimpleRobotRulesParser}, and a
     * {@link DefaultFetchJobPolicy} built from the fetcher's policy.
     *
     * @param urlProvider pipe supplying the URLs to fetch
     * @param scorer score generator used while filtering URLs
     * @param fetcher fetcher used for page content
     * @param numReducers passed through to {@link MakeFetchSetsBuffer} (presumably the reducer count — TODO confirm)
     */
    public FetchPipe(Pipe urlProvider, BaseScoreGenerator scorer, BaseFetcher fetcher, int numReducers) {
        this(urlProvider, scorer, fetcher, RobotUtils.createFetcher(fetcher), new SimpleRobotRulesParser(),
                        new DefaultFetchJobPolicy(fetcher.getFetcherPolicy()), numReducers);
    }

    /**
     * Builds the full fetch assembly.
     *
     * @param urlProvider pipe supplying the URLs to fetch
     * @param scorer score generator used while filtering URLs
     * @param fetcher fetcher used by {@link FetchBuffer} for page content
     * @param robotsFetcher fetcher used by {@link FilterAndScoreByUrlAndRobots} (robots handling)
     * @param robotsParser robots rules parser
     * @param fetchJobPolicy policy handed to {@link MakeFetchSetsBuffer}
     * @param numReducers passed through to {@link MakeFetchSetsBuffer} (presumably the reducer count — TODO confirm)
     */
    public FetchPipe(Pipe urlProvider, BaseScoreGenerator scorer, BaseFetcher fetcher, BaseFetcher robotsFetcher,
                    BaseRobotsParser robotsParser, BaseFetchJobPolicy fetchJobPolicy, int numReducers) {
        // Stage 1: key each URL by protocol + domain and group on that key.
        // NOTE(review): the group name mentions "IP/delay" but the key comes from GroupByDomain; the name is kept
        // unchanged because pipe names may be relied on elsewhere.
        Each keyedUrls = new Each(urlProvider, new GroupFunction(new GroupByDomain()));
        GroupBy groupedUrls = new GroupBy("Grouping URLs by IP/delay", keyedUrls, GroupedUrlDatum.getGroupingField());

        // Stage 2: filter/score each group, then split special-key tuples (LHS) from fetchable ones (RHS).
        Every scoredUrls = new Every(groupedUrls,
                        new FilterAndScoreByUrlAndRobots(robotsFetcher, robotsParser, scorer), Fields.RESULTS);
        SplitterAssembly splitter = new SplitterAssembly(scoredUrls, new SplitIntoSpecialAndRegularKeys());

        // Stage 3: build fetch sets from the fetchable URLs, regroup them, and fetch.
        GroupBy distributedUrls = new GroupBy("Distributing URL sets", splitter.getRHSPipe(),
                        GroupedUrlDatum.getGroupingField(), ScoredUrlDatum.getSortingField(), true);
        Every fetchSets = new Every(distributedUrls, new MakeFetchSetsBuffer(fetchJobPolicy, numReducers), Fields.RESULTS);
        GroupBy groupedFetchSets = new GroupBy("Fetching URL sets", fetchSets,
                        FetchSetDatum.getGroupingField(), FetchSetDatum.getSortingField());
        Every fetched = new Every(groupedFetchSets, new FetchBuffer(fetcher), Fields.RESULTS);

        // Stage 4: content tail keeps only successful fetches; status tail merges skipped and fetched statuses.
        Pipe contentTail = new Pipe(CONTENT_PIPE_NAME, new Each(fetched, new FilterErrorsFunction()));
        Pipe skippedStatus = new Pipe("skipped status", new Each(splitter.getLHSPipe(), new MakeSkippedStatus()));
        Pipe fetchedStatus = new Pipe("fetched status", new Each(fetched, new MakeStatusFunction()));
        GroupBy statusTail = new GroupBy(STATUS_PIPE_NAME, Pipe.pipes(skippedStatus, fetchedStatus),
                        new Fields(StatusDatum.URL_FN));

        setTails(contentTail, statusTail);
    }

    /** @return the tail pipe named {@link #CONTENT_PIPE_NAME} */
    public Pipe getContentTailPipe() {
        return getTailPipe(CONTENT_PIPE_NAME);
    }

    /** @return the tail pipe named {@link #STATUS_PIPE_NAME} */
    public Pipe getStatusTailPipe() {
        return getTailPipe(STATUS_PIPE_NAME);
    }

    /**
     * Looks up one of this assembly's tails by name.
     *
     * @throws InvalidParameterException if no tail has the given name
     */
    private Pipe getTailPipe(String pipeName) {
        String[] tailNames = getTailNames();
        Pipe[] tails = getTails();
        for (int i = 0; i < tailNames.length; i++) {
            if (pipeName.equals(tailNames[i])) {
                return tails[i];
            }
        }
        throw new InvalidParameterException("Invalid pipe name: " + pipeName);
    }

    /**
     * Builds the sink map for this assembly's two tails.
     *
     * @param statusSink sink for {@link #STATUS_PIPE_NAME}; if null, a {@link NullSinkTap} over StatusDatum.FIELDS
     * @param contentSink sink for {@link #CONTENT_PIPE_NAME}; if null, a {@link NullSinkTap} over FetchedDatum.FIELDS
     * @return a new mutable map from tail name to sink tap
     */
    public static Map<String, Tap> makeSinkMap(Tap statusSink, Tap contentSink) {
        Map<String, Tap> sinks = new HashMap<String, Tap>(4);
        sinks.put(STATUS_PIPE_NAME, statusSink != null ? statusSink : new NullSinkTap(StatusDatum.FIELDS));
        sinks.put(CONTENT_PIPE_NAME, contentSink != null ? contentSink : new NullSinkTap(FetchedDatum.FIELDS));
        return sinks;
    }
}
