//----------------------------------------------------------------
// Example TA1: separate method just to write log message
//----------------------------------------------------------------

    try {
            rpcConn = connectionPool.getConnection(dest);
        } catch (IOException e) {
            /* Signal to caller that destination cannot be reached; 
             * this is unlikely to change on subsequent open attempts, 
             * so we defer to the caller of the method. */
            NetworkErrorLogger.logRpcOpenError(req, dest, e);
            return null;
        }
...

private static class NetworkErrorLogger {
    /**
     * Output information relevant to an error that occurs when trying to 
     * open a connection to send an RPC.
     * 
     * @param req - the RPC request that would have been sent through the connection
     * @param dest - the destination of the RPC
     * @param e - the caught error
     */
     public static void logRpcOpenError(RpcRequest req, AddrPortTuple dest, Exception e) {
        logger.log(Level.WARNING, "Cannot send message: " + req + ". \n"
                + "Unable to find or open connection to " + dest + " :" + e);
    }
...

//----------------------------------------------------------------
// Example TA2: one task interrupted by another
//----------------------------------------------------------------

    ...
    TimerTask sendTask = new TimerTask() {
        public void run() {
            sendHandler.accept(request, recipient);
        }
    };
    
    synchronized(outgoingRequests) {
        outgoingRequests.add(req);
    }
    
    req.resendingTimer.schedule(sendTask, new Date(), 
            resendIntervalMS);
    ....
        
//----------------------------------------------------------------
// Example TA3: two message types for return to client
//----------------------------------------------------------------

    public class ClientResponse extends Message {
        //Output of running command in associated request
        public final String result;
        ...
        
    public class ClientIncorrectLeaderResponse extends Message {
        public final int leaderId;
        ...
    
//----------------------------------------------------------------
// Example TA4: PersistentLog class decides when to truncate log
//----------------------------------------------------------------
    
    /**
     * Replaces all entries at or after index with new entries, unless
     * new entry is already in log at that index (checks if equals)
     * @param index at which to add new entries. 
     * @param newEntries to add at index
     * @throws StorageException if encounters error writing entry to file
     */
    public void setEntriesAtIndex(int index, List<Entry> newEntries) 
            throws StorageException {
        
//----------------------------------------------------------------
// Example O1: Code doesn't seem like it should work (but it does)
//----------------------------------------------------------------
        
    this.persistentState.setTerm(newTerm);
    this.persistentState.setVotedFor(null);
    
//----------------------------------------------------------------
// Example O2: Lack of indentation makes things less obvious
//----------------------------------------------------------------
    
   /**
    * ...
    * @param numThreads The number of threads that this manager should spin 
    * up in order to manage ongoing connections. The MessageManager spins
    * up at least one thread for every open connection, so this should be 
    * at least equal to the number of connections you expect to be open at
    * once. This should be a multiple of that number if you expect to send
    * a lot of messages in a short amount of time. 
    * @param handler Used as a callback in order to handle incoming messages
    * on this MessageManager's open connections. See {@code MessageHandler}
    * and {@code handleMessage} 
    * for details.
    */
    
    /**
     * @param numThreads The number of threads that this manager should spin 
     *      up in order to manage ongoing connections. The MessageManager spins
     *      up at least one thread for every open connection, so this should be 
     *      at least equal to the number of connections you expect to be open at
     *      once. This should be a multiple of that number if you expect to send
     *      a lot of messages in a short amount of time. 
     * @param handler Used as a callback in order to handle incoming messages
     *      on this MessageManager's open connections. See {@code MessageHandler}
     *      and {@code handleMessage} 
     *      for details.
     */
    
    /**
     * @param numThreads
     *      The number of threads that this manager should spin up in order to
     *      manage ongoing connections. The MessageManager spins up at least
     *      one thread for every open connection, so this should be at least
     *      equal to the number of connections you expect to be open at once.
     *      This should be a multiple of that number if you expect to send
     *      a lot of messages in a short amount of time. 
     * @param handler
     *      Used as a callback in order to handle incoming messages on this
     *      MessageManager's open connections. See {@code MessageHandler}
     *      and {@code handleMessage} for details.
     */
    
//----------------------------------------------------------------
// Example O3: Bad code indentation obscures statement boundary
//----------------------------------------------------------------
    
    AppendEntriesRequest aeMsg 
    = new AppendEntriesRequest(serverAddress);
    aeMsg.term = leaderElectionState.getObject().currentTerm;
    aeMsg.commitIndex = commitIndex;
    int nextIndex = peerNextIndices.get(peerAddr);
    
//----------------------------------------------------------------
// Example O4: Program appears to end
//----------------------------------------------------------------

    public static void main(String[] args) {        
        InetSocketAddress myAddress = null;
        ArrayList<InetSocketAddress> serverAddresses = null;

        if (args.length == 2) {
            myAddress = AddressUtils.parseAddress(args[0]);
            serverAddresses = AddressUtils.parseAddresses(args[1]);
        }

        if (args.length != 2 || myAddress == null || 
                serverAddresses == null) {
            System.out.println("Please supply exactly two valid arguments");
            System.out.println(
                    "Usage: <myHostname:myPort> <hostname0:port0>,"
                    + "<hostname1:port1>,...,<hostname$n-1$,port$n-1$>");
            System.exit(1);
        }

        // Java doesn't like calling constructors without an
        // assignment to a variable, even if that variable is not used.
        @SuppressWarnings("unused")
        RaftClient client = new RaftClient(myAddress, serverAddresses);
    }

  //----------------------------------------------------------------
  // Example O5: Command loop distributed among lambdas
  //----------------------------------------------------------------

    public RaftClient(InetSocketAddress myAddress, 
            ArrayList<InetSocketAddress> serverAddresses) {
        networkManager = new NetworkManager(this.myAddress,
                this::handleSerializable, ...);
        ...
        waitForAndProcessInput();
    }
    
    private synchronized void waitForAndProcessInput() {
        ... read command from console ...
        sendRetryingRequest();
    }
    
    private synchronized void sendRetryingRequest() {
        ...
        networkManager.sendSerializable(leaderAddress, outstandingRequest);
        ...
    }
    
    public synchronized void handleSerializable(Serializable object) {
        if (!(object instanceof ClientReply)) {
            System.out.println("Don't know how to handle the serializable "
                    + "object: " + object);
            return;
        }
        ClientReply reply = (ClientReply) object;
        ...
        waitForAndProcessInput();
    }

//----------------------------------------------------------------
// Example O6: Declare as one type, allocate as another
//----------------------------------------------------------------
    
    private List<Message> incomingMessageList;
    ...
    incomingMessageList = new ArrayList<Message>();

//----------------------------------------------------------------
// Example O7: Long (and nested!) lambdas
//----------------------------------------------------------------
    
    public NetworkManager(InetSocketAddress myAddress, 
            Consumer<Serializable> handleSerializableCb, 
            UncaughtExceptionHandler ueh) {
        addrToWriteSocketInfo = 
                new HashMap<InetSocketAddress, WriteSocketInfo>();
        removeWriteSocketTimer = new Timer();

        @SuppressWarnings("resource")
        // We suppress warnings for not closing the listener socket because 
        // any I/O Exception involving the listener socket is fatal and 
        // causes the listener thread to terminate.
        Thread listenerThread = new Thread(() -> {
            ServerSocket listenerSocket = null;
            try {
                listenerSocket = new ServerSocket();
                listenerSocket.bind(myAddress);
            } catch (BindException e) {
                throw new NetworkManagerException("Failed to bind to address"
                        + ": " + myAddress + ". Received exception: " + e);
            } catch (IOException e) {
                throw new NetworkManagerException("Failed to create listener"
                        + " socket. Received exception: " + e);
            }

            while(true) {
                try {
                    Socket socket = listenerSocket.accept();
                    myLogger.debug("Accepted connection from " + 
                            socket.getRemoteSocketAddress());
                    networkIOService.execute(() -> {
                        // Uses one object input stream for the lifetime of
                        // the socket, which is generally the convention.
                        try (Socket readSocket = socket;
                                InputStream is = readSocket.getInputStream();
                                ObjectInputStream ois = 
                                        new ObjectInputStream(is)) {
                            readSocket.setSoTimeout(READ_WRITE_TIMEOUT_MS);
                            // We only exit the while loop below when a read
                            // timeout or some I/O exception occurs.
                            while (true) {
                                handleSerializableCb.accept(
                                        (Serializable) ois.readObject());
                            }
                        } catch ... {
                            ...
                        }
                    });
                } catch (IOException e) {
                    throw new NetworkManagerException(myAddress + 
                            " experienced an I/O exception while trying to "
                            + "accept connection(s). Received exception: " + 
                            e);
                }
            }
        });

        ...

        listenerThread.start();
    }

//----------------------------------------------------------------
// Example O8: using tricky knowledge of internal state to reuse
//            transitionRole(assertion helps)
//----------------------------------------------------------------

    private synchronized void handleRequestVoteRequest(
            RequestVoteRequest request) {        
        boolean grantVote = checkGrantVote(request.serverId, 
                request.lastLogIndex, request.lastLogTerm);

        if (grantVote) {
            assert(this.role == Role.FOLLOWER);
            // Re-transition to follower to reset election timer.
            transitionRole(Role.FOLLOWER);
            persistentState.setVotedFor(request.serverId);
            logMessage("granting vote to " + request.serverId);            
        }

        RequestVoteReply reply = new RequestVoteReply(myId, 
                this.persistentState.currentTerm, grantVote);
        networkManager.sendSerializable(
                peerMetadataMap.get(request.serverId).address, reply);
    }

//----------------------------------------------------------------
// Example N1: heartbeats not just heartbeats anymore
//----------------------------------------------------------------

    case CLIENT_REQ:
            ClientRequest request = (ClientRequest) clientMsg;
            logger.log(Level.INFO, formatLogMsg("Client sent request to "
                            + "leader. Request: " + request));
            int entryIndex = log.appendEntry(persistentState.getTerm(), 
                            request.command);
    
            /* Store client request to reply to after the entry is applied. */
            leaderState.pendingClientRequestsMap.put(entryIndex, request);
    
            /* Replicate the entry across the cluster. */
            leaderState.sendHeartbeats();

 //----------------------------------------------------------------
 // Example N2: name "receive" is confusing
 //----------------------------------------------------------------

    /**
     * A NetworkListener is an entity that wants to hear about all incoming
     * objects from remote hosts to a network manager.
     */
    public interface NetworkListener {
        
        /**
         * Processes a full object sent from a remote host to the local host.
         * The NetworkManager calls this method in a new thread for each object
         * it receives for the local host.
         * 
         * @param obj The received deserialized object.
         * @param senderAddress The IP address and port of the remote host that
         *      sent the object.
         */
        public void receiveObject(Serializable obj, InetSocketAddress senderAddress);
    }

//----------------------------------------------------------------
// Example N3: method name not obvious
//----------------------------------------------------------------
    
    /**
     * Removes all elements stored at or after (inclusive) the specified index
     * in the list. Persists changes to disk.
     * 
     * @param index The index of the first element we'd like to remove.
     * @throws PersistenceException If the change cannot be persisted to disk.
     */
    public void clearFromIndex(int index) {}

 //----------------------------------------------------------------
 // Example N4: name just repeats type
 //----------------------------------------------------------------

    /*
     * The server socket channel is kept open through the lifetime of this class
     * and accepts all incoming connections.
     */
    private ServerSocketChannel serverChannel;

//----------------------------------------------------------------
// Example N5: name too generic
//----------------------------------------------------------------

    /**
     * Saves to file the results of commiting the log entry at the given index.
     */
    public void setResult(int index, String result);

//----------------------------------------------------------------
// Example N6: name cute, but not obvious
//----------------------------------------------------------------

    public class Citizen {

//----------------------------------------------------------------
// Example N7: name obscures real meaning (isRequest)
//----------------------------------------------------------------

    /**
     * Each RPC invocation expects a response. (RPC responses do not expect
     * a response.)
     */
    public final boolean expectsResponse;

//----------------------------------------------------------------
// Example N8: name misleading (handler hasn't been invoked yet)
//----------------------------------------------------------------

    if (msg.expectsResponse) {
        RpcHandler invokedHandler =  methodHandlerMap.get(msg.method);
        ...

//----------------------------------------------------------------
// Example N9: symbol too vague
//----------------------------------------------------------------

    /* used whenever an integer value isn't set - e.g., when a server does not
     * vote in an election for a given term */
    private static final int SENTINEL_VAL = -1;
        
    /*
     * Value representing that the server has not voted (yet) for anyone for
     * the current election term.
     */
    private static final String VOTED_FOR_SENTINEL_VALUE = "null";
         
//----------------------------------------------------------------
// Example N10: name doesn't reflect purpose (used as unique id)
//----------------------------------------------------------------

    /**
     * Number of commands read from the command line.
     */
    private int numCommandsRead;
    
//----------------------------------------------------------------
// Example D1: doesn't say what variable is (noun vs. verb)
//----------------------------------------------------------------

    /**
     * Map that we can query to see whether we need to reply to a particular
     * client after applying the command of a log entry.
     */
    private HashMap<LogEntry, ClientRequest> outstandingClientRequestsMap;
    
//----------------------------------------------------------------
// Example D2: comment duplicates code
//----------------------------------------------------------------

    /**
     * List that maintains the running log size in bytes
     */
    private ArrayList<Integer> runningLogSizeInBytes;

//----------------------------------------------------------------
// Example D3: comment describes implementation, not interface
//----------------------------------------------------------------

    /**
     * Takes a newly accept()ed socket and assigns it to the correct
     * PeerCommunicator instance. It does this by consuming the initial
     * greeting message, reading the source port, and finding (or
     * populating) the peer in the peerMap.
     */
    public static void handleNewReaderSocket(Socket socket) {

--------------

    /**
     * Gets the communicator for the given peer, or instantiating one (and
     * updating peerMap) if one does not yet exist.
     */
    private static PeerCommunicator getPeerCommunicator(InetAddress peerAddr,
            int peerPort) {

--------------

    /**
     * Writes all key-value pairs to file, overwriting any previous file
     * contents. This can be called as many times as you like over the lifetime
     * of your PersistentMap instance. PersistentMaps can be recovered from
     * file by constructing a new PersistentMap.
     *
     * <p>PersistentMap saves and recovers using two files, a primary save file
     * and a shadow save file. The name of the primary save file is passed
     * into the class constructor; the name of the shadow save file is the same
     * name with a tilde at the end. Using two files enables saving without
     * risk of data loss in the middle of writing to disk.
     *
     * <p>In particular, saving to disk is done in shadow-primary order, and
     * recovering is done in primary-shadow order. This ensures that if saving
     * is interrupted, data recovery can be done with the other file. (If saving
     * to the shadow file is interrupted, then the primary file still contains
     * the current data; and if saving to the primary file is interrupted, then
     * the shadow file contains the newest data.)
     *
     * <p>Saving writes all key-value pairs to the shadow save file, then writes
     * all key-value pairs to the primary save file, then deletes the shadow
     * save file if all goes well.
     *
     * @throws PersistenceException if writing to file was not successful,
     *     e.g. you don't have write access to the file.
     */
    public void save() throws PersistenceException {
        saveTo(shadowSaveFile);
        saveTo(primarySaveFile);
        new File(shadowSaveFile).delete();
    }

--------------

    /**
     * Handle an message sent by the client containing the command to execute.
     * If we are a follower, sent back a response directing the client
     * to send it to the leader. If we are the leader, log the command to file
     * and start replicating the command to other raft servers.
     * 
     * @param clientReq         The request object received from the client
     * @param clientChannel     The socket channel used to communicate
     *                          with the client
     */
    private void handleClientCommandRequest(ClientCommandRequest clientReq,
                                            SocketChannel clientChannel) {

//----------------------------------------------------------------
// Example D4: not enough information (what does success == false mean?)
//----------------------------------------------------------------
    
    public class ExecuteCommandResponse implements Serializable {
        private static final long serialVersionUID = -2816578852598064836L;
    
        /** Indicates whether command was successfully executed by raft 
         * leader */
        public boolean success;
        
        /** Result of command that was executed */
        public String result;
        
        /** ID of the current raft leader */
        public int leaderId;

//----------------------------------------------------------------
// Example D5: index documentation not precise enough
//----------------------------------------------------------------
    /**
     * Inserts the string into the list at the specified index. Persists
     * changes to disk.
     * 
     * @param index The index at which we'd like to add the item.
     * @param newStr The string to add to the list.
     * @throws PersistenceException If the change cannot be persisted to disk.
     * @throws IndexOutOfBoundsException If index < 0 or index > list.size()
     */
     public void add(int index, String newStr) {

//----------------------------------------------------------------
// Example D6: not enough information (asynchronous but sequential)
//----------------------------------------------------------------

/** Schedules the command within request to be run and will call callback
 * once the command has finished. See CommandExecutorCallback for more
 * details.
 *
 * @param request The request whose command will be executed
 * @param callback Contains callback function to call with result of
 *        executing command
 */
public void executeRequest(ClientRequest request,
                           CommandExecutorCallback callback) {

//----------------------------------------------------------------
// Example D7: not enough information (must document pointer state)
//----------------------------------------------------------------

    /**
     * Concatenate all the parts received so far
     * @return the entire message as a ByteBuffer
     */
    public ByteBuffer returnEntireBuffer() {

//----------------------------------------------------------------
// Example D8: too vague
//----------------------------------------------------------------

    /** Represents the most conservative estimate for the highest index that
     * a remote RaftServer's log matches with this RaftServer's log. */
    public HashMap<InetSocketAddress, Integer> matchIndexMap;

    /** Represents the most liberal estimate for the highest index that could
     * be appended to a remote RaftServer to append. */
    public HashMap<InetSocketAddress, Integer> nextIndexMap;

------------------

    /** Allows single threaded access to readChannels 
     * for each incoming connection. */
    private Selector selector;

//----------------------------------------------------------------
// Example D9: documentation exposes unnecessary details
//----------------------------------------------------------------

    /* Returns a tab-separated, string representation of this class 
     * to be persisted to a file. */
    public String toString() {
        return Integer.toString(term) + '\t' + command;
    }
    
    /** 
     * Class method: converts the tab-separated, string representation
     * of this class into the original class object.
     *
     * @param objStr    string representation of the log entry.
     * @return the log entry constructed from the string. If the string
     *         is null then the returned entry will be a dummy. Additionally,
     *         dummy entries are equivalent to each other.
     */
    public static LogEntry fromString(String objStr) {
    
    
    
    
    
    