package cascading.flow.stack;

import cascading.CascadingException;
import cascading.flow.FlowElement;
import cascading.flow.FlowException;
import cascading.flow.FlowProcess;
import cascading.flow.Scope;
import cascading.flow.StepCounters;
import cascading.tap.Tap;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;
import cascading.tuple.TupleEntryCollector;
import cascading.tuple.Tuples;
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.mapred.OutputCollector;

/* loaded from: input_file:cascading/flow/stack/SinkReducerStackElement.class */
class SinkReducerStackElement extends ReducerStackElement {
    private final Tap sink;
    private OutputCollector outputCollector;

    public SinkReducerStackElement(StackElement stackElement, FlowProcess flowProcess, Scope scope, Tap tap, boolean z) throws IOException {
        this(stackElement, flowProcess, scope, null, null, tap, z);
    }

    public SinkReducerStackElement(StackElement stackElement, FlowProcess flowProcess, Scope scope, String str, Tap tap, Tap tap2, boolean z) throws IOException {
        super(stackElement, flowProcess, scope, str, tap);
        this.sink = tap2;
        if (z) {
            this.outputCollector = (OutputCollector) tap2.openForWrite(getJobConf());
        }
    }

    @Override // cascading.flow.stack.ReducerStackElement
    public FlowElement getFlowElement() {
        return this.sink;
    }

    @Override // cascading.flow.stack.ReducerStackElement, cascading.flow.FlowCollector
    public void collect(Tuple tuple, Iterator it) {
        operateSink(tuple, it);
    }

    @Override // cascading.flow.stack.ReducerStackElement, cascading.flow.FlowCollector
    public void collect(Tuple tuple) {
        TupleEntry tupleEntry = null;
        try {
            try {
                tupleEntry = getTupleEntry(tuple);
                operateSink(tupleEntry);
                Tuples.asModifiable(tuple);
            } catch (Exception e) {
                handleException(e, tupleEntry);
                Tuples.asModifiable(tuple);
            }
        } catch (Throwable th) {
            Tuples.asModifiable(tuple);
            throw th;
        }
    }

    private void operateSink(Tuple tuple, Iterator it) {
        while (it.hasNext()) {
            TupleEntry tupleEntry = (TupleEntry) it.next();
            try {
                try {
                    operateSink(tupleEntry);
                    Tuples.asModifiable(tupleEntry.getTuple());
                } catch (Exception e) {
                    handleException(e, tupleEntry);
                    Tuples.asModifiable(tupleEntry.getTuple());
                }
            } catch (Throwable th) {
                Tuples.asModifiable(tupleEntry.getTuple());
                throw th;
            }
        }
    }

    private void operateSink(TupleEntry tupleEntry) {
        try {
            if (this.outputCollector != null) {
                getFlowProcess().keepAlive();
                ((Tap) getFlowElement()).sink(tupleEntry, this.outputCollector);
            } else {
                ((Tap) getFlowElement()).sink(tupleEntry, this.lastOutput);
            }
            getFlowProcess().increment(StepCounters.Tuples_Written, 1);
        } catch (IOException e) {
            throw new StackException("io exception writing to tap: " + this.sink.toString(), e);
        } catch (OutOfMemoryError e2) {
            throw new StackException("out of memory, try increasing task memory allocation", e2);
        } catch (Throwable th) {
            if (!(th instanceof CascadingException)) {
                throw new FlowException("internal error: " + tupleEntry.getTuple().print(), th);
            }
            throw ((CascadingException) th);
        }
    }

    @Override // cascading.flow.stack.StackElement
    public void prepare() {
    }

    @Override // cascading.flow.stack.StackElement
    public void cleanup() {
    }

    @Override // cascading.flow.stack.StackElement
    public void close() throws IOException {
        try {
            super.close();
            if (this.outputCollector != null) {
                ((TupleEntryCollector) this.outputCollector).close();
            }
        } catch (Throwable th) {
            if (this.outputCollector != null) {
                ((TupleEntryCollector) this.outputCollector).close();
            }
            throw th;
        }
    }
}
