package bixo.operations;

import bixo.config.FetcherPolicy;
import bixo.datum.FetchSetDatum;
import bixo.datum.FetchedDatum;
import bixo.datum.ScoredUrlDatum;
import bixo.datum.UrlStatus;
import bixo.fetcher.BaseFetcher;
import bixo.fetcher.FetchTask;
import bixo.fetcher.IFetchMgr;
import bixo.hadoop.FetchCounters;
import bixo.utils.DiskQueue;
import bixo.utils.ThreadedExecutor;
import cascading.flow.FlowProcess;
import cascading.flow.hadoop.HadoopFlowProcess;
import cascading.operation.BaseOperation;
import cascading.operation.Buffer;
import cascading.operation.BufferCall;
import cascading.operation.OperationCall;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;
import cascading.tuple.TupleEntryCollector;
import com.bixolabs.cascading.BaseDatum;
import com.bixolabs.cascading.LoggingFlowProcess;
import com.bixolabs.cascading.LoggingFlowReporter;
import com.bixolabs.cascading.NullContext;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.log4j.Logger;

/* loaded from: input_file:bixo/operations/FetchBuffer.class */
/**
 * Cascading {@link Buffer} that fetches batches of URLs grouped by a "grouping ref".
 *
 * <p>Incoming argument tuples are queued by {@link QueuedValues}, which hands back
 * {@link FetchSetDatum} batches. Each batch is wrapped in a {@link FetchTask} and run on a
 * {@link ThreadedExecutor}. Output tuples are {@link FetchedDatum#FIELDS} plus one extra field.
 * URLs that are not fetched (executor rejection, interruption, fetch termination) are emitted
 * with a {@link UrlStatus} skip code in that extra field.
 *
 * <p>This class is also the {@link IFetchMgr} handed to each {@link FetchTask}, so
 * {@link #collect} and {@link #finished} are expected to be called from executor threads.
 * Output collection is guarded by {@code _keepCollecting}; the active/pending grouping-ref
 * maps are updated under {@code _refLock}.
 */
public class FetchBuffer extends BaseOperation<NullContext> implements Buffer<NullContext>, IFetchMgr {
    private static final Logger LOGGER = Logger.getLogger(FetchBuffer.class);

    // Extra output field appended after FetchedDatum.FIELDS. skipUrls() stores the UrlStatus
    // name here; what FetchTask stores in it is not visible in this file.
    private static final Fields FETCH_RESULT_FIELD = new Fields(BaseDatum.fieldName(FetchBuffer.class, "fetch-exception"));

    // Milliseconds to sleep when no fetch set is ready (also the initial pause in cleanup()).
    private static final long NOTHING_TO_FETCH_SLEEP_TIME = 1000;

    // Milliseconds to wait after aborting the fetcher when the executor did not terminate cleanly.
    private static final long HARD_TERMINATION_CLEANUP_DURATION = 10000;

    private BaseFetcher _fetcher;
    private FetcherPolicy.FetcherMode _fetcherMode;
    private transient ThreadedExecutor _executor;
    private transient LoggingFlowProcess _flowProcess;
    private transient TupleEntryCollector _collector;

    // Guards compound updates that span _activeRefs and _pendingRefs.
    private transient Object _refLock;

    // Grouping ref -> next allowed fetch time (epoch ms), or 0 when the running batch is the last one.
    private transient ConcurrentHashMap<String, Long> _activeRefs;

    // Grouping ref -> epoch ms at which its next batch may be fetched.
    private transient ConcurrentHashMap<String, Long> _pendingRefs;

    // Cleared in cleanup(); once false, collect() drops tuples instead of writing them.
    private transient AtomicBoolean _keepCollecting;

    /**
     * Orders queued fetch sets so that the batch whose grouping ref can be fetched soonest comes
     * first. Refs that are neither active nor pending sort first (time 0), pending refs sort by
     * their next fetch time, and currently active refs sort last. Ties are broken by putting the
     * larger URL list first.
     */
    private class QueuedFetchSetsComparator implements Comparator<FetchSetDatum> {
        private QueuedFetchSetsComparator() {
        }

        /** Returns the earliest time (epoch ms) at which {@code groupingRef} may be fetched. */
        private long getFetchTime(String groupingRef) {
            if (_activeRefs.get(groupingRef) != null) {
                // A batch for this ref is already running, so it cannot be fetched now.
                return Long.MAX_VALUE;
            }
            Long pendingTime = _pendingRefs.get(groupingRef);
            return pendingTime == null ? 0L : pendingTime.longValue();
        }

        @Override // java.util.Comparator
        public int compare(FetchSetDatum left, FetchSetDatum right) {
            int byTime = Long.compare(getFetchTime(left.getGroupingRef()), getFetchTime(right.getGroupingRef()));
            if (byTime != 0) {
                return byTime;
            }
            // Arguments are swapped on purpose: the bigger batch sorts first.
            return Integer.compare(right.getUrls().size(), left.getUrls().size());
        }
    }

    /**
     * Wraps the buffer's argument iterator together with a {@link DiskQueue} of fetch sets
     * ordered by {@link QueuedFetchSetsComparator}.
     */
    private class QueuedValues {
        // Passed to DiskQueue as its first constructor argument; presumably the in-memory
        // element limit before spilling to disk (TODO confirm against DiskQueue).
        private static final int MAX_ELEMENTS_IN_MEMORY = 10000;
        private DiskQueue<FetchSetDatum> _queue;
        private Iterator<TupleEntry> _values;

        // Latches to true once _values reports exhaustion, so hasNext() is not polled again.
        private boolean _iteratorDone = false;

        public QueuedValues(Iterator<TupleEntry> it) {
            this._values = it;
            this._queue = new DiskQueue<>(MAX_ELEMENTS_IN_MEMORY, new QueuedFetchSetsComparator());
        }

        private boolean safeHasNext() {
            this._iteratorDone = this._iteratorDone || !this._values.hasNext();
            return !this._iteratorDone;
        }

        /** True when both the disk queue and the underlying argument iterator are exhausted. */
        public boolean isEmpty() {
            return this._queue.isEmpty() && !safeHasNext();
        }

        // NOTE(review): the decompiler could not recover this method. As written it always throws
        // UnsupportedOperationException, so operate() cannot run until the original body is
        // restored. Callers treat a null return as "nothing ready to fetch yet".
        /* JADX WARN: Can't fix incorrect switch cases order, some code will duplicate */
        /* JADX WARN: Code restructure failed: missing block: B:36:0x01eb, code lost:
        
            return r0;
         */
        /* JADX WARN: Failed to find 'out' block for switch in B:17:0x0090. Please report as an issue. */
        /*
            Code decompiled incorrectly, please refer to instructions dump.
            To view partially-correct add '--show-bad-code' argument
        */
        public bixo.datum.FetchSetDatum nextOrNull(bixo.config.FetcherPolicy.FetcherMode r9) {
            /*
                Method dump skipped, instructions count: 630
                To view this dump add '--comments-level debug' option
            */
            throw new UnsupportedOperationException("Method not decompiled: bixo.operations.FetchBuffer.QueuedValues.nextOrNull(bixo.config.FetcherPolicy$FetcherMode):bixo.datum.FetchSetDatum");
        }
    }

    /**
     * @param baseFetcher fetcher used by every {@link FetchTask}; its policy supplies the fetch
     *                    mode, request timeout and termination flag
     */
    public FetchBuffer(BaseFetcher baseFetcher) {
        super(FetchedDatum.FIELDS.append(FETCH_RESULT_FIELD));
        this._fetcher = baseFetcher;
        this._fetcherMode = this._fetcher.getFetcherPolicy().getFetcherMode();
    }

    @Override // cascading.operation.BaseOperation, cascading.operation.Operation
    public boolean isSafe() {
        return false;
    }

    /**
     * Creates the per-task transient state: logging flow process, executor and ref-tracking
     * maps. Casts {@code flowProcess} to {@link HadoopFlowProcess}, so this requires a Hadoop flow.
     */
    @Override // cascading.operation.BaseOperation, cascading.operation.Operation
    public void prepare(FlowProcess flowProcess, OperationCall<NullContext> operationCall) {
        super.prepare(flowProcess, operationCall);
        this._flowProcess = new LoggingFlowProcess((HadoopFlowProcess) flowProcess);
        this._flowProcess.addReporter(new LoggingFlowReporter());
        this._executor = new ThreadedExecutor(this._fetcher.getMaxThreads(), this._fetcher.getFetcherPolicy().getRequestTimeout());
        this._refLock = new Object();
        this._pendingRefs = new ConcurrentHashMap<>();
        this._activeRefs = new ConcurrentHashMap<>();
        this._keepCollecting = new AtomicBoolean(true);
    }

    /**
     * Dispatches fetch sets to the executor until the input is exhausted, the fetcher policy
     * asks to terminate, or the thread is interrupted. Whatever is left is emitted as skipped:
     * {@link UrlStatus#SKIPPED_INTERRUPTED} if the thread was interrupted, otherwise
     * {@link UrlStatus#SKIPPED_TIME_LIMIT}.
     */
    @Override // cascading.operation.Buffer
    public void operate(FlowProcess flowProcess, BufferCall<NullContext> bufferCall) {
        QueuedValues queuedValues = new QueuedValues(bufferCall.getArgumentsIterator());
        this._collector = bufferCall.getOutputCollector();
        FetcherPolicy fetcherPolicy = this._fetcher.getFetcherPolicy();

        // Thread.interrupted() clears the interrupt flag, so remember that it fired. Re-testing
        // it after the loop would always return false and mislabel the leftovers.
        boolean interrupted = false;
        while (true) {
            if (Thread.interrupted()) {
                interrupted = true;
                break;
            }
            if (fetcherPolicy.isTerminateFetch() || queuedValues.isEmpty()) {
                break;
            }
            FetchSetDatum fetchSet = queuedValues.nextOrNull(this._fetcherMode);
            if (fetchSet == null) {
                waitForReadyFetchSet(flowProcess);
            } else {
                dispatchFetchSet(fetchSet);
            }
        }

        if (queuedValues.isEmpty()) {
            return;
        }
        trace("Found unprocessed URLs");
        UrlStatus urlStatus = (interrupted || Thread.interrupted()) ? UrlStatus.SKIPPED_INTERRUPTED : UrlStatus.SKIPPED_TIME_LIMIT;
        skipRemainingFetchSets(queuedValues, urlStatus);
    }

    /** Keeps the task alive and sleeps briefly while no fetch set is ready. */
    private void waitForReadyFetchSet(FlowProcess flowProcess) {
        try {
            trace("Nothing ready to fetch, sleeping...");
            flowProcess.keepAlive();
            Thread.sleep(NOTHING_TO_FETCH_SLEEP_TIME);
        } catch (InterruptedException e) {
            LOGGER.warn("FetchBuffer interrupted!");
            // Restore the flag so the loop in operate() sees it and stops.
            Thread.currentThread().interrupt();
        }
    }

    /** Marks the batch's ref active and submits a FetchTask; on rejection its URLs are skipped. */
    private void dispatchFetchSet(FetchSetDatum fetchSet) {
        List<ScoredUrlDatum> urls = fetchSet.getUrls();
        String groupingRef = fetchSet.getGroupingRef();
        trace("Processing %d URLs for %s", urls.size(), groupingRef);
        FetchTask fetchTask = new FetchTask(this, this._fetcher, urls, groupingRef);
        if (fetchSet.isLastList()) {
            // 0 means "no next batch": finished() will not move the ref back to pending.
            makeActive(groupingRef, 0L);
            trace("Executing fetch of %d URLs from %s (last batch)", urls.size(), groupingRef);
        } else {
            Long nextFetchTime = Long.valueOf(System.currentTimeMillis() + fetchSet.getFetchDelay());
            makeActive(groupingRef, nextFetchTime);
            trace("Executing fetch of %d URLs from %s (next fetch time %d)", urls.size(), groupingRef, nextFetchTime);
        }

        long submitStart = System.currentTimeMillis();
        try {
            this._executor.execute(fetchTask);
        } catch (RejectedExecutionException e) {
            LOGGER.error("Fetch pool rejected our fetch list for " + groupingRef);
            finished(groupingRef);
            // skipUrls() formats this template with the URL count itself.
            skipUrls(urls, UrlStatus.SKIPPED_DEFERRED, "Execution rejection skipped %d URLs");
        }
        // Push the next fetch time back by however long submission took.
        adjustActive(groupingRef, System.currentTimeMillis() - submitStart);
    }

    /** Drains every remaining fetch set and emits its URLs with {@code urlStatus}. */
    private void skipRemainingFetchSets(QueuedValues queuedValues, UrlStatus urlStatus) {
        while (!queuedValues.isEmpty()) {
            FetchSetDatum fetchSet = queuedValues.nextOrNull(FetcherPolicy.FetcherMode.IMPOLITE);
            if (fetchSet == null) {
                continue;
            }
            List<ScoredUrlDatum> urls = fetchSet.getUrls();
            // Only build the example URL when tracing, and never index into an empty list.
            if (LOGGER.isTraceEnabled() && !urls.isEmpty()) {
                trace("Skipping %d urls from %s (e.g. %s) ", urls.size(), fetchSet.getGroupingRef(), urls.get(0).getUrl());
            }
            skipUrls(urls, urlStatus, null);
        }
    }

    /**
     * Gives running fetches a chance to finish, then terminates the executor. If termination
     * times out, the fetcher is aborted and given extra time to flush. Collection is always
     * disabled afterwards, even if interrupted, so late fetch threads cannot write to a
     * collector Cascading has finished with.
     */
    @Override // cascading.operation.BaseOperation, cascading.operation.Operation
    public void cleanup(FlowProcess flowProcess, OperationCall<NullContext> operationCall) {
        try {
            Thread.sleep(NOTHING_TO_FETCH_SLEEP_TIME);
            if (!this._executor.terminate(this._fetcher.getFetcherPolicy().getRequestTimeout())) {
                LOGGER.warn("Had to do a hard termination of general fetching");
                this._fetcher.abort();
                Thread.sleep(HARD_TERMINATION_CLEANUP_DURATION);
            }
        } catch (InterruptedException e) {
            LOGGER.warn("Interrupted while waiting for termination");
            Thread.currentThread().interrupt();
        } finally {
            synchronized (this._keepCollecting) {
                this._keepCollecting.set(false);
            }
        }
        this._flowProcess.dumpCounters();
    }

    /**
     * Called when the batch for {@code groupingRef} completes. If another batch follows, the ref
     * moves to the pending map with its next fetch time.
     *
     * @throws IllegalStateException if {@code groupingRef} is not currently active
     */
    @Override // bixo.fetcher.IFetchMgr
    public void finished(String groupingRef) {
        synchronized (this._refLock) {
            Long nextFetchTime = this._activeRefs.remove(groupingRef);
            if (nextFetchTime == null) {
                throw new IllegalStateException("finished called on non-active ref: " + groupingRef);
            }
            if (nextFetchTime.longValue() != 0) {
                trace("Finished batch fetch for %s, with next batch at %d", groupingRef, nextFetchTime);
                this._pendingRefs.put(groupingRef, nextFetchTime);
            } else {
                trace("Finished last batch fetch for %s", groupingRef);
            }
        }
    }

    /**
     * Writes {@code tuple} to the output collector, or logs and drops it once cleanup() has
     * disabled collection. Synchronized so concurrent callers never touch the collector at once.
     */
    @Override // bixo.fetcher.IFetchMgr
    public void collect(Tuple tuple) {
        synchronized (this._keepCollecting) {
            if (this._keepCollecting.get()) {
                this._collector.add(tuple);
            } else {
                LOGGER.warn("Losing an entry: " + tuple);
            }
        }
    }

    @Override // bixo.fetcher.IFetchMgr
    public LoggingFlowProcess getProcess() {
        return this._flowProcess;
    }

    /**
     * Emits every URL in {@code urls} as a {@link FetchedDatum} tagged with {@code urlStatus}
     * and updates the skip counters.
     *
     * @param urls      URLs to emit as skipped
     * @param urlStatus status written into the extra result field
     * @param traceMsg  optional format template with one {@code %d} for the URL count; may be null
     */
    public void skipUrls(List<ScoredUrlDatum> urls, UrlStatus urlStatus, String traceMsg) {
        for (ScoredUrlDatum url : urls) {
            Tuple tuple = new FetchedDatum(url).getTuple();
            tuple.add((Comparable) urlStatus.toString());
            // Go through collect() so these writes are serialized with those from fetch threads.
            collect(tuple);
        }
        this._flowProcess.increment(FetchCounters.URLS_SKIPPED, urls.size());
        if (urlStatus == UrlStatus.SKIPPED_PER_SERVER_LIMIT) {
            this._flowProcess.increment(FetchCounters.URLS_SKIPPED_PER_SERVER_LIMIT, urls.size());
        }
        if (traceMsg != null && LOGGER.isTraceEnabled()) {
            LOGGER.trace(String.format(traceMsg, Integer.valueOf(urls.size())));
        }
    }

    /** Moves {@code groupingRef} from pending to active, recording its next fetch time (0 = last batch). */
    private void makeActive(String groupingRef, Long nextFetchTime) {
        synchronized (this._refLock) {
            trace("Making %s active", groupingRef);
            this._pendingRefs.remove(groupingRef);
            this._activeRefs.put(groupingRef, nextFetchTime);
        }
    }

    /**
     * Adds {@code deltaMillis} to an active ref's next fetch time. Does nothing if the ref is not
     * active, is on its last batch (time 0), or the delta is 0.
     */
    private void adjustActive(String groupingRef, long deltaMillis) {
        synchronized (this._refLock) {
            Long nextFetchTime = this._activeRefs.get(groupingRef);
            if (nextFetchTime != null && nextFetchTime.longValue() != 0 && deltaMillis != 0) {
                this._activeRefs.put(groupingRef, Long.valueOf(nextFetchTime.longValue() + deltaMillis));
            }
        }
    }

    /** Logs {@code String.format(format, args)} at TRACE, formatting only when TRACE is enabled. */
    public void trace(String format, Object... args) {
        if (LOGGER.isTraceEnabled()) {
            LOGGER.trace(String.format(format, args));
        }
    }
}
