/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.mapred.pipes;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.FileOutputStream;
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.util.ArrayList;
import java.util.Map;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.pipes.DownwardProtocol;
import org.apache.hadoop.mapred.pipes.Submitter;
import org.apache.hadoop.mapred.pipes.UpwardProtocol;
import org.apache.hadoop.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class BinaryProtocol<K1 extends WritableComparable, V1 extends Writable, K2 extends WritableComparable, V2 extends Writable>
implements DownwardProtocol<K1, V1> {
    public static final int CURRENT_PROTOCOL_VERSION = 0;
    private static final int BUFFER_SIZE = 131072;
    private DataOutputStream stream;
    private DataOutputBuffer buffer = new DataOutputBuffer();
    private static final Logger LOG = LoggerFactory.getLogger(BinaryProtocol.class.getName());
    private UplinkReaderThread uplink;

    public BinaryProtocol(Socket sock, UpwardProtocol<K2, V2> handler, K2 key, V2 value, JobConf config) throws IOException {
        OutputStream raw = sock.getOutputStream();
        if (Submitter.getKeepCommandFile(config)) {
            raw = new TeeOutputStream("downlink.data", raw);
        }
        this.stream = new DataOutputStream(new BufferedOutputStream(raw, 131072));
        this.uplink = new UplinkReaderThread<K2, V2>(sock.getInputStream(), handler, key, value);
        this.uplink.setName("pipe-uplink-handler");
        this.uplink.start();
    }

    @Override
    public void close() throws IOException, InterruptedException {
        LOG.debug("closing connection");
        this.stream.close();
        this.uplink.closeConnection();
        this.uplink.interrupt();
        this.uplink.join();
    }

    @Override
    public void authenticate(String digest, String challenge) throws IOException {
        LOG.debug("Sending AUTHENTICATION_REQ, digest=" + digest + ", challenge=" + challenge);
        WritableUtils.writeVInt(this.stream, MessageType.AUTHENTICATION_REQ.code);
        Text.writeString(this.stream, digest);
        Text.writeString(this.stream, challenge);
    }

    @Override
    public void start() throws IOException {
        LOG.debug("starting downlink");
        WritableUtils.writeVInt(this.stream, MessageType.START.code);
        WritableUtils.writeVInt(this.stream, 0);
    }

    @Override
    public void setJobConf(JobConf job) throws IOException {
        WritableUtils.writeVInt(this.stream, MessageType.SET_JOB_CONF.code);
        ArrayList<String> list = new ArrayList<String>();
        for (Map.Entry<String, String> itm : job) {
            list.add(itm.getKey());
            list.add(itm.getValue());
        }
        WritableUtils.writeVInt(this.stream, list.size());
        for (String entry : list) {
            Text.writeString(this.stream, entry);
        }
    }

    @Override
    public void setInputTypes(String keyType, String valueType) throws IOException {
        WritableUtils.writeVInt(this.stream, MessageType.SET_INPUT_TYPES.code);
        Text.writeString(this.stream, keyType);
        Text.writeString(this.stream, valueType);
    }

    @Override
    public void runMap(InputSplit split, int numReduces, boolean pipedInput) throws IOException {
        WritableUtils.writeVInt(this.stream, MessageType.RUN_MAP.code);
        this.writeObject(split);
        WritableUtils.writeVInt(this.stream, numReduces);
        WritableUtils.writeVInt(this.stream, pipedInput ? 1 : 0);
    }

    @Override
    public void mapItem(WritableComparable key, Writable value) throws IOException {
        WritableUtils.writeVInt(this.stream, MessageType.MAP_ITEM.code);
        this.writeObject(key);
        this.writeObject(value);
    }

    @Override
    public void runReduce(int reduce, boolean pipedOutput) throws IOException {
        WritableUtils.writeVInt(this.stream, MessageType.RUN_REDUCE.code);
        WritableUtils.writeVInt(this.stream, reduce);
        WritableUtils.writeVInt(this.stream, pipedOutput ? 1 : 0);
    }

    @Override
    public void reduceKey(WritableComparable key) throws IOException {
        WritableUtils.writeVInt(this.stream, MessageType.REDUCE_KEY.code);
        this.writeObject(key);
    }

    @Override
    public void reduceValue(Writable value) throws IOException {
        WritableUtils.writeVInt(this.stream, MessageType.REDUCE_VALUE.code);
        this.writeObject(value);
    }

    @Override
    public void endOfInput() throws IOException {
        WritableUtils.writeVInt(this.stream, MessageType.CLOSE.code);
        LOG.debug("Sent close command");
    }

    @Override
    public void abort() throws IOException {
        WritableUtils.writeVInt(this.stream, MessageType.ABORT.code);
        LOG.debug("Sent abort command");
    }

    @Override
    public void flush() throws IOException {
        this.stream.flush();
    }

    private void writeObject(Writable obj) throws IOException {
        if (obj instanceof Text) {
            Text t = (Text)obj;
            int len = t.getLength();
            WritableUtils.writeVInt(this.stream, len);
            this.stream.write(t.getBytes(), 0, len);
        } else if (obj instanceof BytesWritable) {
            BytesWritable b = (BytesWritable)obj;
            int len = b.getLength();
            WritableUtils.writeVInt(this.stream, len);
            this.stream.write(b.getBytes(), 0, len);
        } else {
            this.buffer.reset();
            obj.write(this.buffer);
            int length = this.buffer.getLength();
            WritableUtils.writeVInt(this.stream, length);
            this.stream.write(this.buffer.getData(), 0, length);
        }
    }

    private static class TeeOutputStream
    extends FilterOutputStream {
        private OutputStream file;

        TeeOutputStream(String filename, OutputStream base) throws IOException {
            super(base);
            this.file = new FileOutputStream(filename);
        }

        @Override
        public void write(byte[] b, int off, int len) throws IOException {
            this.file.write(b, off, len);
            this.out.write(b, off, len);
        }

        @Override
        public void write(int b) throws IOException {
            this.file.write(b);
            this.out.write(b);
        }

        @Override
        public void flush() throws IOException {
            this.file.flush();
            this.out.flush();
        }

        @Override
        public void close() throws IOException {
            try {
                this.flush();
            }
            finally {
                IOUtils.closeStream(this.file);
                IOUtils.closeStream(this.out);
            }
        }
    }

    private static class UplinkReaderThread<K2 extends WritableComparable, V2 extends Writable>
    extends Thread {
        private DataInputStream inStream;
        private UpwardProtocol<K2, V2> handler;
        private K2 key;
        private V2 value;
        private boolean authPending = true;

        public UplinkReaderThread(InputStream stream, UpwardProtocol<K2, V2> handler, K2 key, V2 value) throws IOException {
            this.inStream = new DataInputStream(new BufferedInputStream(stream, 131072));
            this.handler = handler;
            this.key = key;
            this.value = value;
        }

        public void closeConnection() throws IOException {
            this.inStream.close();
        }

        @Override
        public void run() {
            try {
                int cmd;
                while (true) {
                    if (Thread.currentThread().isInterrupted()) {
                        throw new InterruptedException();
                    }
                    cmd = WritableUtils.readVInt(this.inStream);
                    LOG.debug("Handling uplink command " + cmd);
                    if (cmd == MessageType.AUTHENTICATION_RESP.code) {
                        String digest = Text.readString(this.inStream);
                        this.authPending = !this.handler.authenticate(digest);
                        continue;
                    }
                    if (this.authPending) {
                        LOG.warn("Message " + cmd + " received before authentication is complete. Ignoring");
                        continue;
                    }
                    if (cmd == MessageType.OUTPUT.code) {
                        this.readObject((Writable)this.key);
                        this.readObject((Writable)this.value);
                        this.handler.output(this.key, this.value);
                        continue;
                    }
                    if (cmd == MessageType.PARTITIONED_OUTPUT.code) {
                        int part = WritableUtils.readVInt(this.inStream);
                        this.readObject((Writable)this.key);
                        this.readObject((Writable)this.value);
                        this.handler.partitionedOutput(part, this.key, this.value);
                        continue;
                    }
                    if (cmd == MessageType.STATUS.code) {
                        this.handler.status(Text.readString(this.inStream));
                        continue;
                    }
                    if (cmd == MessageType.PROGRESS.code) {
                        this.handler.progress(this.inStream.readFloat());
                        continue;
                    }
                    if (cmd == MessageType.REGISTER_COUNTER.code) {
                        int id = WritableUtils.readVInt(this.inStream);
                        String group = Text.readString(this.inStream);
                        String name = Text.readString(this.inStream);
                        this.handler.registerCounter(id, group, name);
                        continue;
                    }
                    if (cmd != MessageType.INCREMENT_COUNTER.code) break;
                    int id = WritableUtils.readVInt(this.inStream);
                    long amount = WritableUtils.readVLong(this.inStream);
                    this.handler.incrementCounter(id, amount);
                }
                if (cmd == MessageType.DONE.code) {
                    LOG.debug("Pipe child done");
                    this.handler.done();
                    return;
                }
                throw new IOException("Bad command code: " + cmd);
            }
            catch (InterruptedException e) {
                return;
            }
            catch (Throwable e) {
                LOG.error(StringUtils.stringifyException(e));
                this.handler.failed(e);
                return;
            }
        }

        private void readObject(Writable obj) throws IOException {
            int numBytes = WritableUtils.readVInt(this.inStream);
            if (obj instanceof BytesWritable) {
                byte[] buffer = new byte[numBytes];
                this.inStream.readFully(buffer);
                ((BytesWritable)obj).set(buffer, 0, numBytes);
            } else if (obj instanceof Text) {
                byte[] buffer = new byte[numBytes];
                this.inStream.readFully(buffer);
                ((Text)obj).set(buffer);
            } else {
                obj.readFields(this.inStream);
            }
        }
    }

    private static enum MessageType {
        START(0),
        SET_JOB_CONF(1),
        SET_INPUT_TYPES(2),
        RUN_MAP(3),
        MAP_ITEM(4),
        RUN_REDUCE(5),
        REDUCE_KEY(6),
        REDUCE_VALUE(7),
        CLOSE(8),
        ABORT(9),
        AUTHENTICATION_REQ(10),
        OUTPUT(50),
        PARTITIONED_OUTPUT(51),
        STATUS(52),
        PROGRESS(53),
        DONE(54),
        REGISTER_COUNTER(55),
        INCREMENT_COUNTER(56),
        AUTHENTICATION_RESP(57);

        final int code;

        private MessageType(int code) {
            this.code = code;
        }
    }
}

