I have a problem with a multi-threaded server, which I create as an academic exercise, or rather, so that the connection is closed gracefully.
Each connection is controlled by the Session class. This class supports 2 threads for connection, DownstreamThread and UpstreamThread.
The UpstreamThread blocks on the client socket and encode all incoming lines into messages that are sent to another level for processing. DownstreamThread blocks on the BlockingQueue into which messages for the client are inserted. When a message appears in the queue, the downstream thread issues the message from the queue, turns it into a string, and sends it to the client. In the final system, the application layer will act on incoming messages and output outgoing messages to the server for sending to the corresponding client, but for now I just have a simple application that sleeps a second in the incoming message before repeating it back as an outgoing message with a label attached time.
The problem I am facing is to completely close the grace when the client disconnects. The first problem I am facing is a normal shutdown when the client lets the server know that it is ending the connection using the QUIT command. The main pseudocode:
while (!quitting) {
inputString = socket.readLine ()
if (inputString != "QUIT") {
server.acceptMessage (inputString);
} else {
quitting = true;
socket.close ();
}
}
The top stream of the main loop looks at the input line. If it is QUIT, the thread sets a flag to say that the client has completed the connection and exited the loop. This causes the upstream flow to stop.
The main downstream loop waits for messages in the BlockingQueue until the close connection flag is set. When this is the case, the downstream should also end. However, this is not so; he just sits there, waiting. Its psuedocode is as follows:
while (!quitting) {
outputMessage = messageQueue.take (); // blocks
sendMessageToClient (outputMessage);
}
, , , , , .
, BlockingQueue, , . QUIT .
? , , - - take(). , , , , . , , , , , , , . QUIT , , , , , , . .
Thread.stop(), , -, , , , . , , , , , .
, , , , , , . Java, . - - , .
Session , , . - 250 .
import java.io.*;
import java.net.*;
import java.util.concurrent.*;
import java.util.logging.*;
public class Session {
private Socket clientSocket = null;
private Server server = null;
private Integer sessionId = 0;
private DownstreamThread downstream = null;
private UpstreamThread upstream = null;
private boolean sessionEnding = false;
private class DownstreamThread implements Runnable {
private BlockingQueue<DownstreamMessage> incomingMessages = null;
private OutputStreamWriter streamWriter = null;
private Session outer = null;
@Override
public void run () {
DownstreamMessage message;
Thread.currentThread ().setName ("DownstreamThread_" + outer.getId ());
try {
this.sendMessageToClient ("Hello, you are client " + outer.getId ());
while (!outer.sessionEnding) {
message = this.incomingMessages.take ();
this.sendMessageToClient (message.getPayload ());
}
this.sendMessageToClient ("Goodbye, client " + getId ());
} catch (InterruptedException | IOException ex) {
Logger.getLogger (DownstreamThread.class.getName ()).log (Level.SEVERE, ex.getMessage (), ex);
} finally {
this.terminate ();
}
}
public DownstreamThread acceptMessage (DownstreamMessage message) throws InterruptedException {
if (!outer.sessionEnding) {
this.incomingMessages.put (message);
}
return this;
}
private DownstreamThread sendMessageToClient (CharSequence message) throws IOException {
OutputStreamWriter osw;
if (null != (osw = this.getStreamWriter ())) {
osw.write ((String) message);
osw.write ("\r\n");
osw.flush ();
}
return this;
}
private DownstreamThread terminate () {
try {
this.streamWriter.close ();
} catch (IOException ex) {
Logger.getLogger (DownstreamThread.class.getName ()).log (Level.SEVERE, ex.getMessage (), ex);
}
this.streamWriter = null;
return this;
}
private OutputStreamWriter getStreamWriter () throws IOException {
if ((null == this.streamWriter)
&& (!outer.sessionEnding)) {
BufferedOutputStream os = new BufferedOutputStream (outer.clientSocket.getOutputStream ());
this.streamWriter = new OutputStreamWriter (os, "UTF8");
}
return this.streamWriter;
}
public DownstreamThread (Session outer) {
this.outer = outer;
this.incomingMessages = new LinkedBlockingQueue ();
System.out.println ("Class " + this.getClass () + " created");
}
}
private class UpstreamThread implements Runnable {
private Session outer = null;
@Override
public void run () {
StringBuffer inputBuffer = new StringBuffer ();
BufferedReader inReader;
Thread.currentThread ().setName ("UpstreamThread_" + outer.getId ());
try {
inReader = new BufferedReader (new InputStreamReader (outer.clientSocket.getInputStream (), "UTF8"));
while (!outer.sessionEnding) {
inputBuffer.delete (0, inputBuffer.length ());
inputBuffer.append (inReader.readLine ());
System.out.println ("Input message was: " + inputBuffer);
if (!inputBuffer.toString ().equals ("QUIT")) {
outer.server.acceptMessage (new UpstreamMessage (sessionId, inputBuffer.toString ()));
} else {
outer.sessionEnding = true;
}
}
} catch (IOException | InterruptedException e) {
Logger.getLogger (Session.class.getName ()).log (Level.SEVERE, e.getMessage (), e);
} finally {
outer.terminate ();
outer.server.deleteSession (outer.getId ());
}
}
public UpstreamThread (Session outer) {
this.outer = outer;
System.out.println ("Class " + this.getClass () + " created");
}
}
public void run ()
{
Thread upThread = new Thread (this.upstream);
Thread downThread = new Thread (this.downstream);
upThread.start ();
downThread.start ();
}
public Session acceptMessage (DownstreamMessage message) throws InterruptedException {
this.downstream.acceptMessage (message);
return this;
}
public Session acceptMessage (String message) throws InterruptedException {
return this.acceptMessage (new DownstreamMessage (this.getId (), message));
}
private void terminate () {
try {
this.clientSocket.close ();
} catch (IOException e) {
Logger.getLogger (Session.class.getName ()).log (Level.SEVERE, e.getMessage (), e);
}
}
public Integer getId () {
return this.sessionId;
}
public Session (Server owner, Socket clientSocket, Integer sessionId) throws IOException {
this.server = owner;
this.clientSocket = clientSocket;
this.sessionId = sessionId;
this.upstream = new UpstreamThread (this);
this.downstream = new DownstreamThread (this);
System.out.println ("Class " + this.getClass () + " created");
System.out.println ("Session ID is " + this.sessionId);
}
}