Gracefully terminating a thread waiting for a lock queue

I have a problem with a multi-threaded server, which I create as an academic exercise, or rather, so that the connection is closed gracefully.

Each connection is controlled by the Session class. This class supports 2 threads for connection, DownstreamThread and UpstreamThread.

The UpstreamThread blocks on the client socket and encode all incoming lines into messages that are sent to another level for processing. DownstreamThread blocks on the BlockingQueue into which messages for the client are inserted. When a message appears in the queue, the downstream thread issues the message from the queue, turns it into a string, and sends it to the client. In the final system, the application layer will act on incoming messages and output outgoing messages to the server for sending to the corresponding client, but for now I just have a simple application that sleeps a second in the incoming message before repeating it back as an outgoing message with a label attached time.

The problem I am facing is to completely close the grace when the client disconnects. The first problem I am facing is a normal shutdown when the client lets the server know that it is ending the connection using the QUIT command. The main pseudocode:

while (!quitting) {
    inputString = socket.readLine () // blocks
    if (inputString != "QUIT") {
        // forward the message upstream
        server.acceptMessage (inputString);
    } else {
        // Do cleanup
        quitting = true;
        socket.close ();
    }
}

The top stream of the main loop looks at the input line. If it is QUIT, the thread sets a flag to say that the client has completed the connection and exited the loop. This causes the upstream flow to stop.

The main downstream loop waits for messages in the BlockingQueue until the close connection flag is set. When this is the case, the downstream should also end. However, this is not so; he just sits there, waiting. Its psuedocode is as follows:

while (!quitting) {
    outputMessage = messageQueue.take (); // blocks
    sendMessageToClient (outputMessage);
}

, , , , , .

, BlockingQueue, , . QUIT .

? , , - - take(). , , , , . , , , , , , , . QUIT , , , , , , . .

Thread.stop(), , -, , , , . , , , , , .

, , , , , , . Java, . - - , .

Session , , . - 250 .

import java.io.*;
import java.net.*;
import java.util.concurrent.*;
import java.util.logging.*;

/**
 * Session class
 * 
 * A session manages the individual connection between a client and the server. 
 * It accepts input from the client and sends output to the client over the 
 * provided socket.
 * 
 */
public class Session {
    private Socket              clientSocket    = null;
    private Server              server          = null;
    private Integer             sessionId       = 0;
    private DownstreamThread    downstream      = null;
    private UpstreamThread      upstream        = null;
    private boolean             sessionEnding   = false;

    /**
     * This thread handles waiting for messages from the server and sending
     * them to the client
     */
    private class DownstreamThread implements Runnable {
        private BlockingQueue<DownstreamMessage>    incomingMessages    = null;
        private OutputStreamWriter                  streamWriter        = null;
        private Session                             outer               = null;

        @Override
        public void run () {
            DownstreamMessage message;
            Thread.currentThread ().setName ("DownstreamThread_" + outer.getId ());

            try {
                // Send connect message
                this.sendMessageToClient ("Hello, you are client " + outer.getId ());

                while (!outer.sessionEnding) {
                    message = this.incomingMessages.take ();
                    this.sendMessageToClient (message.getPayload ());
                }

                // Send disconnect message
                this.sendMessageToClient ("Goodbye, client " + getId ());

            } catch (InterruptedException | IOException ex) {
                Logger.getLogger (DownstreamThread.class.getName ()).log (Level.SEVERE, ex.getMessage (), ex);
            } finally {
                this.terminate ();
            }
        }

        /**
         * Add a message to the downstream queue
         * 
         * @param message
         * @return
         * @throws InterruptedException 
         */
        public DownstreamThread acceptMessage (DownstreamMessage message) throws InterruptedException {
            if (!outer.sessionEnding) {
                this.incomingMessages.put (message);
            }

            return this;
        }

        /**
         * Send the given message to the client
         * 
         * @param message
         * @throws IOException 
         */
        private DownstreamThread sendMessageToClient (CharSequence message) throws IOException {
            OutputStreamWriter osw;
            // Output to client
            if (null != (osw = this.getStreamWriter ())) {
                osw.write ((String) message);
                osw.write ("\r\n");
                osw.flush ();
            }

            return this;
        }

        /**
         * Perform session cleanup
         * 
         * @return 
         */
        private DownstreamThread terminate () {
            try {
                this.streamWriter.close ();
            } catch (IOException ex) {
                Logger.getLogger (DownstreamThread.class.getName ()).log (Level.SEVERE, ex.getMessage (), ex);
            }
            this.streamWriter   = null;

            return this;
        }

        /**
         * Get an output stream writer, initialize it if it not active
         * 
         * @return A configured OutputStreamWriter object
         * @throws IOException 
         */
        private OutputStreamWriter getStreamWriter () throws IOException {
            if ((null == this.streamWriter) 
            && (!outer.sessionEnding)) {
                BufferedOutputStream os = new BufferedOutputStream (outer.clientSocket.getOutputStream ());
                this.streamWriter       = new OutputStreamWriter (os, "UTF8");
            }

            return this.streamWriter;
        }

        /**
         * 
         * @param outer 
         */
        public DownstreamThread (Session outer) {
            this.outer              = outer;
            this.incomingMessages   = new LinkedBlockingQueue ();
            System.out.println ("Class " + this.getClass () + " created");
        }
    }

    /**
     * This thread handles waiting for client input and sending it upstream
     */
    private class UpstreamThread implements Runnable {
        private Session outer   = null;

        @Override
        public void run () {
            StringBuffer    inputBuffer = new StringBuffer ();
            BufferedReader  inReader;

            Thread.currentThread ().setName ("UpstreamThread_" + outer.getId ());

            try {
                inReader    = new BufferedReader (new InputStreamReader (outer.clientSocket.getInputStream (), "UTF8"));

                while (!outer.sessionEnding) {
                    // Read whatever was in the input buffer
                    inputBuffer.delete (0, inputBuffer.length ());
                    inputBuffer.append (inReader.readLine ());
                    System.out.println ("Input message was: " + inputBuffer);

                    if (!inputBuffer.toString ().equals ("QUIT")) {
                        // Forward the message up the chain to the Server
                        outer.server.acceptMessage (new UpstreamMessage (sessionId, inputBuffer.toString ()));
                    } else {
                        // End the session
                        outer.sessionEnding = true;
                    }
                }

            } catch (IOException | InterruptedException e) {
                Logger.getLogger (Session.class.getName ()).log (Level.SEVERE, e.getMessage (), e);
            } finally {
                outer.terminate ();
                outer.server.deleteSession (outer.getId ());
            }
        }

        /**
         * Class constructor
         * 
         * The Core Java volume 1 book said that a constructor such as this 
         * should be implicitly created, but that doesn't seem to be the case!
         * 
         * @param outer 
         */
        public UpstreamThread (Session outer) {
            this.outer  = outer;
            System.out.println ("Class " + this.getClass () + " created");
        }
    }

    /**
     * Start the session threads
     */
    public void run () //throws InterruptedException 
    {
        Thread upThread     = new Thread (this.upstream);
        Thread downThread   = new Thread (this.downstream);

        upThread.start ();
        downThread.start ();
    }

    /**
     * Accept a message to send to the client
     * 
     * @param message
     * @return
     * @throws InterruptedException 
     */
    public Session acceptMessage (DownstreamMessage message) throws InterruptedException {
        this.downstream.acceptMessage (message);
        return this;
    }

    /**
     * Accept a message to send to the client
     * 
     * @param message
     * @return
     * @throws InterruptedException 
     */
    public Session acceptMessage (String message) throws InterruptedException {
        return this.acceptMessage (new DownstreamMessage (this.getId (), message));
    }

    /**
     * Terminate the client connection
     */
    private void terminate () {
        try {
            this.clientSocket.close ();
        } catch (IOException e) {
            Logger.getLogger (Session.class.getName ()).log (Level.SEVERE, e.getMessage (), e);
        }
    }

    /**
     * Get this Session ID
     * 
     * @return The ID of this session
     */
    public Integer getId () {
        return this.sessionId;
    }

    /**
     * Session constructor
     * 
     * @param owner The Server object that owns this session
     * @param sessionId The unique ID this session will be given
     * @throws IOException 
     */
    public Session (Server owner, Socket clientSocket, Integer sessionId) throws IOException {

        this.server         = owner;
        this.clientSocket   = clientSocket;
        this.sessionId      = sessionId;
        this.upstream       = new UpstreamThread (this);
        this.downstream     = new DownstreamThread (this);

        System.out.println ("Class " + this.getClass () + " created");
        System.out.println ("Session ID is " + this.sessionId);
    }
}
+5
2

Thread.stop Thread.interrupt. , take InterruptedException, , , .

+3

"" , outer.sessionEnding true, "QUIT". DownstreamThread, . sessionEnding.

:

while (true) {
    outputMessage = messageQueue.take (); // blocks
    if (QUIT == outputMessage)
        break
    sendMessageToClient (outputMessage);
}
0

All Articles