//*********************************************************************
// comm.cpp
//*********************************************************************
//
// This module performs all TCP/IP communications for farVIEW, and is
// the repository for the CComm, CListen, and CSession classes, along
// with several supporting classes. All activities implemented by
// these classes are instigated by callers through specific command
// interfaces in the CComm object.
//
// CSession and CListen are derived from CThread, and, as such, have
// input ports for receiving commands. CListen, however, does not
// examine its inport since its purpose is to receive client request
// commands via its listenSocket.
//
// A command is an object derived from CItem so that it can be
// enqueued in the input port.
//
// Commands take three parameters:
//
// code a ulong containing up to four characters, as in
// 'ABCD', identifying the command to perform. See
// the specific commands (e.g., OpenComm) listed
// below for the actual codes.
//
// data an object derived from CItem that provides
// parameters for the implementation code. Several
// commands use a specialized object for this purpose.
// They are listed here.
//
// CCloseSession
// COpenConnect
// CCloseConnect
// CSendMessage
// CReceiveMessage
//
// These objects are used internally by the CComm
// package. Normal use of the CComm package is
// accomplished through the CComm API, which provides
// a small set of methods to perform these functions.
//
// result a CCommResult object that provides return
// information from a Comm request. One important
// return value is the result of the operation with
// which the result was associated. The result is set
// to FVE_INPROGRESS until the operation is either
// complete or is aborted. Then, the result is set to
// the appropriate value. Values have the form
// FVE_state. Valid constants are:
//
// const ulong FVE_INPROGRESS
// const ulong FVE_OK
// const ulong FVE_CMMDINVALID
// const ulong FVE_CMMDDATAINVALID
// const ulong FVE_CONNECTIONNOTFOUND
// const ulong FVE_CONNECTIONNOTOPEN
// const ulong FVE_SESSIONNOTFOUND
// const ulong FVE_SESSIONNOTOPEN
// const ulong FVE_SENDFAILED
// const ulong FVE_RECEIVEFAILED
// const ulong FVE_RECEIVEABORTED
// const ulong FVE_HEADERSINVALID
// const ulong FVE_SOCKETNOTOPEN
//
// An important feature of the return object is that
// it can be used to wait for the completion of the
// operation with which it is associated by
// specifying _wait = true when you construct the
// result object. This is recommended in most cases.
//
// All the information obtained by the ReceiveMessage
// command is returned in the CCommResult object.
//
// CComm clients use the following six CComm methods to control the
// communication process.
//
// OpenComm Creates an internet listener thread (if
// specified). Should be called at the time of
// InitInstance of the application.
//
// CloseComm Disconnects the Comm object from the
// internet and discards all Comm objects.
// Should be called at the time of
// TermInstance of the application.
//
// OpenSession Creates a session to communicate via a
// specified URL. The session is identified by
// a unique identifier (supplied by the
// caller.)
//
// CloseSession Closes the session specified by the
// enclosed session identifier.
//
// SendMessage Sends a message to a correspondent via the
// session identified by the enclosed session
// identifier.
//
// ReceiveMessage Waits to receive a message from a
// correspondent via the session identified by
// the enclosed session identifier.
//
// The CComm interface posts CSession commands to a specified CSession
// object via its input port. The CSession object performs the commands
// in the order presented in its input port using the accompanying
// CCommResult object to return results to CComm and the package user.
//
// Below is a list of the commands recognized by a CSession object.
//
// OpenConnect ('OC') Opens a (socket) connection to the
// correspondent for a specified session.
//
// CloseSession ('CS') Sends a message to the session to perform
// required shut-down procedures.
//
// SendMessage ('SM') Sends a message to the correspondent
// associated with the CSession object.
//
// ReceiveMessage ('RM') Waits to receive a message sent by a
// correspondent.
//
//*********************************************************************
//
// All farVIEW actions and their associated messages are described in
// a document called farhttp.txt.
//
// This communication subsystem supports a farVIEW system's ability to
// communicate with standard Internet HTTP servers as well as other
// farVIEW systems. farVIEW uses the standard HTTP messages to request
// pages from HTTP servers, and a FAR message to request actions from
// other farVIEW systems.
//
// A farVIEW system that has received a message requesting an activity
// uses the standard HTTP response (ACK) message to return data and to
// report errors. All headers used are standard HTTP.
//
// * The HTTP GET request message is used to access standard HTTP
// servers and has the standard format
//
// GET <URI> HTTP/1.0
//
// * The FAR request message is used to request farVIEW actions and
// has the following format:
//
// FAR /<cmmd>[/<argument>] HTTP/1.0
// Date: Thu, 10 Feb 2000 18:32:12 GMT
// Port: <portNr>
// From: <userName>
// Connection: keep-alive | close
//
// The <cmmd> field is required. See comments in RMTBOOK.CPP for a
// list of the recognized commands.
//
// If a body is included in the message, the following is appended to
// the FAR message given above:
//
// Content-Type: <mimeType> ;<bodyType> [;E]
// Content-Length: <bodySize>
// <crlf>
// <signatureBlock>
// <body>
//
// The MIME types that farVIEW recognizes are listed below.
//
// text/html
// text/plain
// text/xml
// text/abst
// image/bmp
// image/gif
// image/jpg
// image/png
// audio/midi
// audio/mp3
// audio/wav
// video/?
// application/?
//
// This list is currently hardcoded but maybe should be moved out to
// the configuration file.
//
// The signature block is included only when the E-option is included
// in the Content-Type header; i.e., when the body is encrypted. The
// signature block has a fixed size, which is counted along with the
// body in the Content-Length header. Since the signature block is not
// yet implemented, I can't tell you how big it is.
//
// * A farVIEW response message has the following format:
//
// HTTP/1.0 <code> <reason>
// Server: farVIEW/3.x
// Date: Thu, 10 Feb 2000 18:32:13 GMT
// From: <userName>
// Connection: close
//
// Note that if a body is included in a response message, the Content-
// Type and Content-Length headers are also included as described
// above.
//
//*********************************************************************
#include "prefixb.h"
#include "book.h"
#include "comm.h"
#include "factory.h"
#include "inilist.h"
#include "record.h"
#include "socket.h"
#include "syslog.h"
// Host name and port constants named for "fvNSS"; their consumers are not
// visible in this part of the file.
// NOTE(review): -1 looks like a "no port configured" sentinel -- confirm.
const char fvNSSName[] = "127.0.0.1";
const short fvNSSPort = -1;
// NOTE(review): the unit of this counter is not shown here; the remark on
// the line implies 100 ms per count -- confirm where it is consumed.
short pause = 100; // Waits up to 10 seconds for data
// NOTE(review): presumably an upper bound on a message or body size --
// confirm against the receive code.
ulong szLimit = 2500000;
// Storage for the static CComm::TheComm pointer; it starts out null.
pCComm CComm::TheComm = 0;
//=====================================================================
// Comm result object class
//
//---------------------------------------------------------------------
// Default constructor sets to wait
//
// Builds a result in the FVE_INPROGRESS state with every payload field
// cleared. This form always creates a CSemaphore, so Wait() and Release()
// forward to it.
CCommResult::CCommResult(void)
    : CAtom(),
      result(FVE_INPROGRESS),
      code(0),
      mssg(0),
      headers(0),
      type(0),
      body(0)
{
    this->cls = iCommResult;
    this->wait = new CSemaphore();
} // CCommResult
//---------------------------------------------------------------------
//
// Builds a result in the FVE_INPROGRESS state with every payload field
// cleared.
//
// _wait  when true a CSemaphore is created so that Wait() and Release()
//        forward to it; when false no semaphore is created and both
//        calls do nothing.
//
CCommResult::CCommResult(bool _wait):
    CAtom  (),
    result (FVE_INPROGRESS),
    code   (0),
    mssg   (0),
    headers(0),
    type   (0),
    body   (0) {
    cls = iCommResult;
    // Always give wait a definite value. It is not part of the initializer
    // list, so skipping the assignment when _wait is false would leave
    // Wait()/Release() testing a handle this constructor never set.
    wait = _wait ? new CSemaphore() : 0;
} // CCommResult
//---------------------------------------------------------------------
//
// Releases the two character buffers held by this object (message text
// and MIME type) and clears the pointers.
CCommResult::~CCommResult() {
    delete[] this->mssg;
    this->mssg = 0;

    delete[] this->type;
    this->type = 0;
} // ~CCommResult
//---------------------------------------------------------------------
//
RTTI_CPP(CommResult, Atom)
//---------------------------------------------------------------------
//
void CCommResult::Wait(void) {
if (wait)
wait->Wait();
} // Wait
//---------------------------------------------------------------------
//
void CCommResult::Release(void) {
if (wait)
wait->Release();
} // Release
//---------------------------------------------------------------------
//
// Stores the operation outcome (one of the FVE_ values).
void CCommResult::SetResult(ulong _result) {
    this->result = _result;
} // SetResult
//---------------------------------------------------------------------
//
// Returns the stored operation outcome; FVE_INPROGRESS until someone
// calls SetResult().
ulong CCommResult::GetResult(void) {
    return this->result;
} // GetResult
//---------------------------------------------------------------------
//
// Stores the numeric code associated with this result.
void CCommResult::SetCode(ulong _code) {
    this->code = _code;
} // SetCode
//---------------------------------------------------------------------
//
// Returns the stored numeric code (0 until SetCode() is called).
ulong CCommResult::GetCode(void) {
    return this->code;
} // GetCode
//---------------------------------------------------------------------
//
// Stores the message text through the STRINIT macro. The destructor
// releases mssg with delete[].
// NOTE(review): STRINIT is defined elsewhere; presumably it copies _mssg
// into a new buffer -- confirm whether a previous value is freed first.
void CCommResult::SetMssg(Pchar _mssg) {
STRINIT(mssg, _mssg)
} // SetMssg
//---------------------------------------------------------------------
//
// Returns the internal message buffer (not a copy); null until set.
Pchar CCommResult::GetMssg(void) {
    return this->mssg;
} // GetMssg
//---------------------------------------------------------------------
//
// Stores the header list handle for this result.
void CCommResult::SetHead(RPCStrList _headers) {
    this->headers = _headers;
} // SetHead
//---------------------------------------------------------------------
//
// Returns the stored header list handle (null until SetHead()).
PCStrList CCommResult::GetHead(void) {
    return this->headers;
} // GetHead
//---------------------------------------------------------------------
//
// Stores a cleaned-up MIME type/subtype: the input is copied, passed
// through CapStr, cut at the first ';' (one is appended so a cut point
// always exists), trimmed, and handed to STRINIT.
void CCommResult::SetType(Pchar _type) {
    // Working copy with a guaranteed ';' terminator
    char padded[sizeName];
    Strcpy(padded, _type, sizeName);
    CapStr(padded);
    ChrCat(padded, ';', sizeName);

    // Keep only what precedes the first ';'
    char cleaned[sizeName];
    Before(";", padded, cleaned, sizeName);
    Trim(cleaned);

    STRINIT(type, cleaned)
} // SetType
//---------------------------------------------------------------------
//
// Returns the internal MIME type buffer (not a copy); null until set.
Pchar CCommResult::GetType(void) {
    return this->type;
} // GetType
//---------------------------------------------------------------------
//
// Stores the body handle for this result.
void CCommResult::SetBody(RPCAtom _body) {
    this->body = _body;
} // SetBody
//---------------------------------------------------------------------
//
// Returns the stored body handle (null until SetBody()).
PCAtom CCommResult::GetBody(void) {
    return this->body;
} // GetBody
//=====================================================================
//
//---------------------------------------------------------------------
//
// Builds the data item that accompanies a CloseSession command.
//
// _sessionID  identifier of the session to close; asserted to be
//             non-empty and stored through STRINIT. The destructor
//             releases the stored value with delete[].
CCloseSession::CCloseSession(Pchar _sessionID):
CItem (),
sessionID(0) {
cls = iCloseSession;
Assert(!StrEmpty(_sessionID));
STRINIT(sessionID, _sessionID)
} // CCloseSession
//---------------------------------------------------------------------
//
// Frees the stored session identifier and clears the pointer.
CCloseSession::~CCloseSession() {
    delete[] this->sessionID;
    this->sessionID = 0;
} // ~CCloseSession
//---------------------------------------------------------------------
//
RTTI_CPP(CloseSession, Item)
//=====================================================================
//
//---------------------------------------------------------------------
//
// Builds the data item that accompanies an OpenConnect command.
//
// _sessionID  identifier of the session to connect; asserted to be
//             non-empty and stored through STRINIT. The destructor
//             releases the stored value with delete[].
COpenConnect::COpenConnect(Pchar _sessionID):
CItem (),
sessionID(0) {
cls = iOpenConnect;
Assert(!StrEmpty(_sessionID));
STRINIT(sessionID, _sessionID)
} // COpenConnect
//---------------------------------------------------------------------
//
// Frees the stored session identifier and clears the pointer.
COpenConnect::~COpenConnect() {
    delete[] this->sessionID;
    this->sessionID = 0;
} // ~COpenConnect
//---------------------------------------------------------------------
//
RTTI_CPP(OpenConnect, Item)
//=====================================================================
//
//---------------------------------------------------------------------
//
// Builds the data item that accompanies a SendMessage command.
//
// _sessionID  session to send through; asserted to be non-empty
// _mssgType   message family; CSession::Send compares it with "HTTP",
//             "ACK" and "FAR"
// _command    command text placed on the request line
// _argument   argument text placed on the request/status line
// _bodyType   value for the Content-Type header (may be empty)
// _signature  signature block sent ahead of the body (may be null)
// _body       body object (may be null)
// _life       connection disposition; compared with keepAlive by senders
//
// The five strings are stored through STRINIT and released with delete[]
// in the destructor. mssgType and command are passed through CapStr after
// being stored (presumably upper-casing them -- confirm in CapStr).
CSendMessage::CSendMessage(Pchar _sessionID,
Pchar _mssgType,
Pchar _command,
Pchar _argument,
Pchar _bodyType,
RPCBlock _signature,
RPCAtom _body,
disposition _life):
CItem (),
sessionID(0),
mssgType (0),
command (0),
argument (0),
bodyType (0),
signature(_signature),
body (_body),
life (_life) {
cls = iSendMessage;
Assert(!StrEmpty(_sessionID));
STRINIT(sessionID, _sessionID)
STRINIT(mssgType, _mssgType)
CapStr(mssgType);
STRINIT(command, _command)
CapStr(command);
STRINIT(argument, _argument)
STRINIT(bodyType, _bodyType)
} // CSendMessage
//---------------------------------------------------------------------
//
// Frees the five string buffers owned by this item and clears each
// pointer. The signature and body handles are left alone.
CSendMessage::~CSendMessage() {
    delete[] this->sessionID;
    this->sessionID = 0;

    delete[] this->mssgType;
    this->mssgType = 0;

    delete[] this->command;
    this->command = 0;

    delete[] this->argument;
    this->argument = 0;

    delete[] this->bodyType;
    this->bodyType = 0;
} // ~CSendMessage
//---------------------------------------------------------------------
//
RTTI_CPP(SendMessage, Item)
//=====================================================================
//
//---------------------------------------------------------------------
//
// Builds the data item that accompanies a ReceiveMessage command.
//
// _sessionID  session to receive on; asserted to be non-empty and stored
//             through STRINIT (released with delete[] in the destructor)
// _doRequest  flag stored as-is in doRequest; its consumer is not visible
//             in this part of the file
CReceiveMessage::CReceiveMessage(Pchar _sessionID,
bool _doRequest):
CItem (),
sessionID(0),
doRequest(_doRequest) {
cls = iReceiveMessage;
Assert(!StrEmpty(_sessionID));
STRINIT(sessionID, _sessionID);
} // CReceiveMessage
//---------------------------------------------------------------------
//
// Frees the stored session identifier and clears the pointer.
CReceiveMessage::~CReceiveMessage() {
    delete[] this->sessionID;
    this->sessionID = 0;
} // ~CReceiveMessage
//---------------------------------------------------------------------
//
RTTI_CPP(ReceiveMessage, Item)
//=====================================================================
//
//---------------------------------------------------------------------
//
// Bundles a command code with its data item and the result object that
// will report the outcome.
//
// _cmmd    command code
// _data    parameter object for the command
// _result  result object; SetResult() tolerates a null handle here
CCommCmmd::CCommCmmd(ulong _cmmd, RPCItem _data, RPCCommResult _result)
    : CItem(),
      cmmd(_cmmd),
      data(_data),
      result(_result)
{
    this->cls = iCommCmmd;
} // CCommCmmd
//---------------------------------------------------------------------
//
// Nothing is released explicitly here; the data and result members are
// left to their own types' cleanup.
CCommCmmd::~CCommCmmd() {
} // ~CCommCmmd
//---------------------------------------------------------------------
//
RTTI_CPP(CommCmmd, Item)
//---------------------------------------------------------------------
//
// Passes the outcome on to the attached result object, if there is one.
void CCommCmmd::SetResult(ulong _result) {
    if (result) {
        this->result->SetResult(_result);
    }
} // SetResult
//=====================================================================
// Session dictionary
//
//---------------------------------------------------------------------
//
// Creates a hash list of the requested width together with the critical
// section that the session accessors enter around every list operation.
CSessionDict::CSessionDict(short _wide)
    : CHshList(_wide)
{
    this->cls = iSessionDict;
    this->cs = new CCriticalSection();
} // CSessionDict
//---------------------------------------------------------------------
//
RTTI_CPP(SessionDict, HshList)
//---------------------------------------------------------------------
//
// Adds a session under the given identifier. The cast and the insert
// both happen inside the critical section.
void CSessionDict::InsertSession(Pchar _sessionID,
                                 RPCSession _session) {
    cs->Enter();
    PCItem entry = TYPECAST(_session, Item);
    CHshList::Insert(_sessionID, entry);
    cs->Leave();
} // InsertSession
//---------------------------------------------------------------------
//
// Looks up the first entry stored under the identifier. The lookup runs
// inside the critical section; the cast to a session happens after it.
// Returns 0 when nothing is found.
PCSession CSessionDict::GetFirstSession(Pchar _sessionID) {
    cs->Enter();
    PCItem found = CHshList::GetFirstItem(_sessionID);
    cs->Leave();

    if (found) {
        return TYPECAST(found, Session);
    }
    return 0;
} // GetFirstSession
//---------------------------------------------------------------------
//
// Finds the first entry stored under the identifier and, if present,
// removes it from the list while still inside the critical section.
// Returns the removed session, or 0 when nothing was found.
PCSession CSessionDict::RemoveSession(Pchar _sessionID) {
    cs->Enter();
    PCItem found = CHshList::GetFirstItem(_sessionID);
    if (found) {
        // Remove() takes no argument; it is called right after the lookup.
        CHshList::Remove();
    }
    cs->Leave();

    if (found) {
        return TYPECAST(found, Session);
    }
    return 0;
} // RemoveSession
//---------------------------------------------------------------------
// Hash the specified key into an unsigned integer.
//
// Folds the NUL-terminated key into a bucket index in [0, wide).
// Each character goes through MapChar; characters that map to 0 are
// skipped, the rest feed a multiply-by-0x2B accumulator.
ushort CSessionDict::hash(Pvoid _key) {
    Pchar cursor = Pchar(_key);
    ulong acc = 0;
    while (*cursor != 0) {
        char mapped = MapChar(*cursor++);
        if (mapped) {
            acc *= 0x2B;
            acc += mapped;
        }
    }
    return ushort(acc % wide);
} // hash
//=====================================================================
// farVIEW TCP/IP session thread class. When acting as a server, this
// class receives control from the CListen thread to process the
// client's request message. A server class object is usually derived
// from CSession to respond to the request message. The CSession
// object handles all the communication part of the server activity.
//
// A CSession object also acts on behalf of a client to send and/or
// receive one or more messages to a server. The CSession object
// receives all its requests through its input port, usually placed
// there by the CComm object on behalf of user code.
//=====================================================================
//---------------------------------------------------------------------
//
// Builds a session thread object.
//
// _comm        owning CComm object
// _book        book handle (the assertion on it is disabled below)
// _userName    user name; asserted non-null, later sent as the From header
// _domain      domain text, stored as given
// _sessionID   unique session identifier; asserted non-empty
// _hostIP      correspondent address; asserted non-empty
// _hostPort    correspondent port (no assertion; see the disabled check)
// _listenPort  local listen port; sent as the Port header when non-zero
// _szPort      passed to the CThread base constructor
//
// The four strings are stored through STRINIT and released with delete[]
// in the destructor. typeName and result start out null and the error
// flag starts out false.
CSession::CSession(pCComm _comm,
RPCAtom _book,
Pchar _userName,
Pchar _domain,
Pchar _sessionID,
Pchar _hostIP,
long _hostPort,
long _listenPort,
short _szPort):
CThread (_szPort),
error (false),
comm (_comm),
book (_book),
userName (0),
domain (0),
sessionID (0),
hostIP (0),
hostPort (_hostPort),
listenPort(_listenPort),
typeName (0),
result (0) {
cls = iSession;
//?Assert(_book);
Assert(_userName);
Assert(!StrEmpty(_sessionID));
Assert(!StrEmpty(_hostIP));
//?Assert(hostPort > 0); // host sessions always have this zero
STRINIT(userName, _userName)
STRINIT(domain, _domain)
STRINIT(sessionID, _sessionID)
STRINIT(hostIP, _hostIP);
} // CSession
//---------------------------------------------------------------------
//
// Frees the string buffers owned by the session and clears each pointer.
CSession::~CSession() {
    delete[] this->userName;
    this->userName = 0;

    delete[] this->domain;
    this->domain = 0;

    delete[] this->sessionID;
    this->sessionID = 0;

    delete[] this->hostIP;
    this->hostIP = 0;

    delete[] this->typeName;
    this->typeName = 0;
} // ~CSession
//---------------------------------------------------------------------
//
RTTI_CPP(Session, Thread)
//---------------------------------------------------------------------
// Send error acknowledge to the client
//
// Formats a printf-style status text and sends it to the correspondent
// as "HTTP/1.0 <text>\r\n" followed by a blank line. Send failures are
// logged and do not stop the remaining sends.
//
// _msg  printf-style format for the "<code> <reason>" part
// ...   arguments consumed by _msg
//
void CSession::ackError(Pchar _msg, ...) {
    Log('K', "CSession.ackError: * Entry");
    Assert(socket);
    // Format the message. The bounded call keeps an over-long expansion
    // from running past the end of the buffer.
    char work[sizeString];
    va_list args;
    va_start(args, _msg);
    vsnprintf(work, sizeof(work), _msg, args);
    va_end(args);
    work[sizeof(work) - 1] = 0;
    // Send them the error message. The buffer has room for the fixed
    // "HTTP/1.0 " prefix and the CRLF on top of a full-length work string,
    // so the line terminator is never truncated away.
    char msg[sizeString + 16];
    snprintf(msg, sizeof(msg), "HTTP/1.0 %s\r\n", work);
    msg[sizeof(msg) - 1] = 0;
    Log('D', "CSession.ackError: * Sent %s", msg);
    if (!(socket->Send(msg, strlen(msg))))
        Log('D',
            "CSession.ackError: * Socket error on sending %s (%s)",
            msg,
            socket->GetLastErrorMessage());
/*?
    // Tell them to shut down
    sprintf(msg, "Connection: close\r\n");
    if (!(socket->Send(msg, strlen(msg))))
        Log('D',
            "CSession.ackError: * Socket error on sending %s (%s)",
            msg,
            socket->GetLastErrorMessage());
?*/
    // Send a blank line to terminate the message
    msg[0] = '\r';
    msg[1] = '\n';
    msg[2] = 0;
    if (!(socket->Send(msg, 2)))
        Log('D',
            "CSession.ackError: * Socket error on sending %s (%s)",
            msg,
            socket->GetLastErrorMessage());
} // ackError
//---------------------------------------------------------------------
// Already connected
//
// Attaches an existing socket handle to this session.
void CSession::Socket(RPCSocket _socket) {
    this->socket = _socket;
} // Socket
//---------------------------------------------------------------------
// Returns the session identifier.
//
// _alc  true:  a new[]-allocated copy is returned
//       false: the internal buffer itself is returned
Pchar CSession::CommID(bool _alc) {
    if (!_alc)
        return sessionID;

    Pchar duplicate = new char[strlen(sessionID) + 1];
    strcpy(duplicate, sessionID);
    return duplicate;
} // CommID
//---------------------------------------------------------------------
// Use only in farSlang
//
// Convenience form that always returns a freshly allocated copy of the
// session identifier.
Pchar CSession::CommID(void) {
    const bool allocate = true;
    return CommID(allocate);
} // CommID
//---------------------------------------------------------------------
// Returns the correspondent's address string.
//
// _alc  true:  a new[]-allocated copy is returned
//       false: the internal buffer itself is returned
Pchar CSession::CommIP(bool _alc) {
    if (!_alc)
        return hostIP;

    Pchar duplicate = new char[strlen(hostIP) + 1];
    strcpy(duplicate, hostIP);
    return duplicate;
} // CommIP
//---------------------------------------------------------------------
// Use only in farSlang
//
// Convenience form that always returns a freshly allocated copy of the
// correspondent's address string.
Pchar CSession::CommIP(void) {
    const bool allocate = true;
    return CommIP(allocate);
} // CommIP
//---------------------------------------------------------------------
long CSession::CommPort(void) {
return hostPort;
} // CommPort
//---------------------------------------------------------------------
// Generate a CloseSession command and enqueue it for later processing.
//
void CSession::Close(void) {
Log('K', "CSession.Close: * Entry");
PCItem temp = new CItem();
PCCommResult rslt = new CCommResult(/*_wait*/false);
PCItem command = new CCommCmmd('C', temp, rslt);
GetPort()->Post(command);
} // Close
//---------------------------------------------------------------------
// This method must be overridden to actually encrypt the body. The base
// version only logs its entry, trips an assertion and reports failure.
//
// _headers    header list for the outgoing message (unused here)
// _signature  signature block (unused here)
// _body       body block (unused here)
//
// Returns false always.
//
bool CSession::doEncrypt(RPCStrList _headers,
                         RPCBlock _signature,
                         RPCBlock _body) {
    // Name the parameters so they do not count as unused
    _headers;
    _signature;
    _body;
    // The entry log names this method; the earlier text said
    // "CSession.Close", which pointed log readers at the wrong function.
    Log('K', "CSession.doEncrypt: * Entry");
    Assert(false);
    return false;
} // doEncrypt
//---------------------------------------------------------------------
// Send the composited message to the correspondent.
//
// Writes a complete message to the socket in four steps: the header
// list, a bare CRLF divider, the signature (if non-empty) and the body
// (if non-empty). Every header is logged first. The first failing send is
// logged and ends the call.
//
// Returns true when every part that had to be sent was sent.
bool CSession::sendMessage(RPCStrList _head,
                           RPCBlock _signature,
                           RPCBlock _body) {
    Log('K', "CSession.sendMessage: * Entry");
    Assert(socket);

    // Dump the headers to the log file
    if (_head->GotoHead()) {
        do {
            Log('K',
                "CSession.sendHeaders: <%s>",
                _head->StrItem(false));
        } while (_head->GotoNext());
    }

    // Send the headers
    PCList headerList = TYPECAST(_head, List);
    if (!(socket->Send(headerList))) {
        Log('D',
            "CSession.sendMessage: * Could not send headers (%s)",
            socket->GetLastErrorMessage());
        return false;
    }

    // Send a blank line that forms the headers/body divider
    char divider[3] = { '\r', '\n', 0 };
    if (!(socket->Send(divider, 2))) {
        Log('D',
            "CSession.sendMessage: * Could not send "
            "headers-body divider (%s)",
            socket->GetLastErrorMessage());
        return false;
    }

    // Send the signature
    if (_signature) {
        if (_signature->GetSize() > 0) {
            Pchar sigData = _signature->Lock();
            if (!(socket->Send(sigData, _signature->GetSize()))) {
                Log('D',
                    "CSession.sendMessage: * Could not send signature (%s)",
                    socket->GetLastErrorMessage());
                return false;
            }
        }
    }

    // Send the body
    if (_body) {
        if (_body->GetSize() > 0) {
            Pchar bodyData = _body->Lock();
            if (!(socket->Send(bodyData, _body->GetSize()))) {
                Log('D',
                    "CSession.sendMessage: * Could not send body (%s)",
                    socket->GetLastErrorMessage());
                return false;
            }
        }
    }

    return true;
} // sendMessage
//---------------------------------------------------------------------
// Prepare the body for sending to the correspondent. Append the
// Content-Type and Content-Length headers if the body is not empty.
//
// Turns the message's body object into a CBlock ready for sending and
// appends the matching headers.
//
// _headers   header list that receives Content-Type (only when bodyType
//            is non-empty) and Content-Length
// _sendMssg  message description supplying body, signature and bodyType
//
// Returns the body block, or 0 when there is no body, the body is not
// derived from CAtom, it is neither a CBlock nor a CStrList, or a
// CStrList body cannot be converted. Content-Length counts the signature
// size plus the body size.
//
PCBlock CSession::makeBody(RPCStrList _headers,
                           RPCSendMessage _sendMssg) {
    Log('K', "CSession.makeBody: * Entry");
    // Check for a body to send
    if (!(_sendMssg->body))
        return 0;
    // Only allow to send bodies derived from CAtom
    if (!(_sendMssg->body->Isa(iAtom))) {
        Log('D',
            "CSession.makeBody: * Body is not derived from CAtom");
        return 0;
    }
    // Convert the body to a CBlock
    PCBlock body;
    if (_sendMssg->body->Isa(iBlock))
        body = TYPECAST(_sendMssg->body, Block);
    else if (_sendMssg->body->Isa(iStrList)) {
        PCStrList bdy = TYPECAST(_sendMssg->body, StrList);
        // Log the first line of the list before converting it
        char line[sizeString];
        Strcpy(line, bdy->StrItem(1, false), sizeString);
        Log('K', "CSession.makeBody: %s", line);
        body = new CBlock();
        PCAtom pbdy = TYPECAST(body, Atom);
        if (!_sendMssg->body->ToBlock(pbdy)) {
            Log('D',
                "CSession.makeBody:"
                " * Failed to convert CStrList body to CBlock");
            return 0;
        }
    }
    else {
        Log('D',
            "CSession.makeBody:"
            " * Message body must be CStrList or CBlock");
        return 0;
    }
    //------------------------------------------------------------------
    // Let the user encrypt the body and the signature. The return value
    // is not examined here.
    doEncrypt(_headers, _sendMssg->signature, body);
    char line[sizeString];
    // Append the type header
    if (!StrEmpty(_sendMssg->bodyType)) {
        Strcpy(line, "Content-Type: ", sizeString);
        Strcat(line, _sendMssg->bodyType, sizeString);
        _headers->StrInsert(line);
    }
    // Compute the size of the body and append the size header
    ulong size = 0;
    if (_sendMssg->signature)
        size += _sendMssg->signature->GetSize();
    size += body->GetSize();
    // size is unsigned, so it is printed with the unsigned conversion;
    // the cast pins the argument type to what %lu expects.
    sprintf(line, "Content-Length: %lu", static_cast<unsigned long>(size));
    _headers->StrInsert(line);
    return body;
} // makeBody
//---------------------------------------------------------------------
// Appends the header lines shared by FAR and ACK messages:
//
// Date: Thu, 10 Feb 2000 18:32:13 GMT
// Port: <listenPort>            (only when listenPort is non-zero)
// From: <userName>              (only when userName is non-empty)
// Connection: keep-alive | close
//
// _headers   header list that receives the lines
// _sendMssg  message description; its life field selects the
//            Connection value
//
void CSession::makeComm(RPCStrList _headers,
                        RPCSendMessage _sendMssg) {
    Log('K', "CSession.makeComm: * Entry");
    char line[sizeString];
    // Append the message date (and time); the value is written directly
    // after the 6-character "Date: " prefix
    Strcpy(line, "Date: ", sizeString);
    GetGMT(line+6, sizeString-6);
    _headers->StrInsert(line);
    // Append the port number. The port is handed to the constructor as a
    // long, so it is printed with the long conversion; the cast pins the
    // argument type to what %ld expects.
    if (listenPort) {
        sprintf(line, "Port: %ld", long(listenPort));
        _headers->StrInsert(line);
    }
    // Append the user name
    if (!StrEmpty(userName)) {
        Strcpy(line, "From: ", sizeString);
        Strcat(line, userName, sizeString);
        _headers->StrInsert(line);
    }
    // Append the connection type
    Strcpy(line, "Connection: ", sizeString);
    if (_sendMssg->life == keepAlive) {
        Strcat(line, "keep-alive", sizeString);
    }
    else {
        Strcat(line, "close", sizeString);
    }
    _headers->StrInsert(line);
} // makeComm
//---------------------------------------------------------------------
// <command> <argument> HTTP/1.0
//
// where
//
// <command> is CONNECT | DELETE | GET | HEAD | POST | PUT | TRACE
//
// Builds and sends a plain HTTP request.
//
// The request line is "<command> <resource> HTTP/1.0", where <command>
// defaults to GET when the message has none. The argument is split on
// "/"; field 3 is taken as the host and the fields after it form the
// resource. A few fixed headers and a Host header follow.
//
// _sendMssg  message description (command, argument, body, signature)
//
// Returns true and sets the session result to FVE_OK when the message
// was sent; otherwise sets FVE_SENDFAILED and returns false.
//
bool CSession::sendHTTP(RPCSendMessage _sendMssg) {
    Log('K', "CSession.sendHTTP: * Entry");
    Assert(!StrEmpty(hostIP));
    PCStrList headers = new CStrList();
    char line[sizeString];
    // Default HTTP message type is "GET" not coded correctly for POST
    if (StrEmpty(_sendMssg->command)) {
        Strcpy(line, "GET", sizeString);
    }
    else
        Strcpy(line, _sendMssg->command, sizeString);
    Strcat(line, " ", sizeString);
    // get resource from http://domain:port/resource
    PCRecord url = new CRecord(_sendMssg->argument, "/");
    char host[sizeString];
    host[0] = 0;
    // Fetch the host field first, and only then cut it at the ':' that
    // introduces the port. Searching before the fetch would scan a buffer
    // that has not been filled yet, and the port would never be removed.
    url->GetField(3, host, sizeLine);
    int i = ChrPos(':', host);
    if (i > 0)
        host[i] = 0;
    // Drop the three leading fields so only the resource path remains
    url->DeleteField(1);
    url->DeleteField(1);
    url->DeleteField(1);
    Pchar resource = url->GetDelimStrD(false);
    Strcat(line, resource, sizeString);
    Strcat(line, " HTTP/1.0", sizeString);
    headers->StrInsert(line);
    // headers->StrInsert("Accept: image/gif, image/x-xbitmap,"
    // " image/jpeg, image/pjpeg, application/vnd.ms-excel,"
    // " application/msword, application/vnd.ms-powerpoint,"
    // " *");
    headers->StrInsert("Accept-Language: en-us");
    headers->StrInsert("Accept-Encoding: gzip, deflate");
    headers->StrInsert("User-Agent: Mozilla/4.0");
    Strcpy(line, "Host: ", sizeString);
    Strcat(line, host, sizeString);
    headers->StrInsert(line);
    headers->StrInsert("");
    // Process the body if any
    PCBlock body = makeBody(headers, _sendMssg);
    // Send the message to the correspondent
    if (sendMessage(headers, _sendMssg->signature, body)) {
        Log('L', "CSession.sendHTTP: Sent <%s> (sz=0) to %s",
            line, hostIP);
        result->SetResult(FVE_OK);
        return true;
    }
    else {
        result->SetResult(FVE_SENDFAILED);
        return false;
    }
} // sendHTTP
//---------------------------------------------------------------------
// HTTP/1.0 <code> <reason>
// Server: farVIEW/3.x
//
// Builds and sends a response message: the status line
// "HTTP/1.0 <argument>", the Server line, the common headers and the
// optional body. The socket is disconnected afterwards unless the
// message asks for keepAlive, whether or not the send succeeded.
//
// Returns true and records FVE_OK on success; records FVE_SENDFAILED and
// returns false otherwise.
bool CSession::sendACK(RPCSendMessage _sendMssg) {
    Log('K', "CSession.sendACK: * Entry");
    Assert(!StrEmpty(hostIP));

    PCStrList headers = new CStrList();

    // Status line
    char line[sizeString];
    Strcpy(line, "HTTP/1.0 ", sizeString);
    Strcat(line, _sendMssg->argument, sizeString);
    headers->StrInsert(line);

    // Server line, then the headers common to every farVIEW message
    headers->StrInsert("Server: farVIEW/3.x");
    makeComm(headers, _sendMssg);

    // Body part, if any
    PCBlock body = makeBody(headers, _sendMssg);

    // Hand everything to the socket
    bool sent = sendMessage(headers, _sendMssg->signature, body);

    // Drop the connection unless asked to keep it
    if (_sendMssg->life != keepAlive)
        socket->Disconnect();

    if (!sent) {
        result->SetResult(FVE_SENDFAILED);
        return false;
    }

    if (body)
        Log('L', "CSession.sendACK: Sent <%s> (sz=%ld) to %s",
            line, body->GetSize(), hostIP);
    else
        Log('L', "CSession.sendACK: Sent <%s> (sz=0) to %s",
            line, hostIP);
    result->SetResult(FVE_OK);
    return true;
} // sendACK
//---------------------------------------------------------------------
// FAR /<cmmd>[/<argument>] HTTP/1.0
//
// Builds and sends a FAR request: the request line
// "FAR /<command>[/<argument>] HTTP/1.0", the common headers and the
// optional body.
//
// Returns true and records FVE_OK on success; records FVE_SENDFAILED and
// returns false otherwise.
bool CSession::sendFAR(RPCSendMessage _sendMssg) {
    Log('K', "CSession.sendFAR: * Entry");
    Assert(!StrEmpty(hostIP));

    PCStrList headers = new CStrList();

    // Request line
    char line[sizeString];
    Strcpy(line, "FAR /", sizeString);
    Strcat(line, _sendMssg->command, sizeString);
    if (!StrEmpty(_sendMssg->argument)) {
        ChrCat(line, '/', sizeString);
        Strcat(line, _sendMssg->argument, sizeString);
    }
    Strcat(line, " HTTP/1.0", sizeString);
    headers->StrInsert(line);

    // Headers common to every farVIEW message
    makeComm(headers, _sendMssg);

    // Body part, if any
    PCBlock body = makeBody(headers, _sendMssg);

    // Hand everything to the socket
    if (!sendMessage(headers, _sendMssg->signature, body)) {
        result->SetResult(FVE_SENDFAILED);
        return false;
    }

    if (body)
        Log('L', "CSession.sendFAR: Sent <%s> (sz=%ld) to %s",
            line, body->GetSize(), hostIP);
    else
        Log('L', "CSession.sendFAR: Sent <%s> (sz=0) to %s",
            line, hostIP);
    result->SetResult(FVE_OK);
    return true;
} // sendFAR
//---------------------------------------------------------------------
// Formulate a message from its parts, then send it. The type of the
// message to be built is determined by the command. Two commands are
// reserved: HTTP to communicate with an HTTP server, and ACK to
// respond to a previously received request. Any other command is sent
// using the FAR corruption of HTTP.
//
// Encrypt outgoing messages in the following way:
// 1. Hash the body b giving H
// 2. Create a throwaway key K
// 3. Use K to encrypt the body b giving K(b)
// 3a.How about using K to generate a base-64 transform table? And then
// map K(b) to base-64 using the scrambled table.
// 4. Use myPrivateKey to encrypt GMT+H+K+param giving p
// 5. Use herPublicKey to encrypt myUserName+p giving P
// 6. Send headers + line_break + P + K(b).
// The param value is a random number in messages from the client, and
// is ignored by the server, but it is the client's hash in acknowledge
// messages so that the client can verify that the server received the
// message that the client sent.
//
// The acknowledge message should be encrypted only if the request
// message was. The encrypted acknowledge should include the original
// hash for verification.
//
// Decrypt incoming messages in the following way:
// 1. Use myPrivateKey to decrypt P, giving herUserName+p
// 2. Validate herUserName.
// 3. Use herPublicKey to decrypt p giving GMT+H+K+param
// 4. Validate GMT and H.
// 5. Log receipt storing herUserName+GMT
// 5a.Use K to reproduce the base-64 transform table. And then map the
// base-64 code back into K(b)
// 6. Use K to decrypt the body K(b) giving b
// 7. Hash b to validate with H.
// (note: this would verify that she received something, but how to
// tell what it was she received! she should return the original hash
// as the param value.)
//
// Could this be sent as an XML message? Two parts: the protocol part
// and the payload part. Ok, how about this?
//
// http-headers
//
// <far-message>
// <far-head>...</far-head>
// <far-body>...</far-body>
// </far-message>
//
// redesign the messages that you already set up for file xfer to use
// this model, or something similar.
//
// Dispatches an outgoing message to the sender that matches its message
// type ("HTTP", "ACK" or "FAR"). A CStrList body is first saved to
// "sendbody.log". Any other message type trips an assertion and the call
// returns false.
bool CSession::Send(RPCSendMessage _sendMssg) {
    Log('K', "CSession.send: * Entry");

    // Keep a copy of string-list bodies on disk
    if (_sendMssg->body && _sendMssg->body->Isa(iStrList)) {
        PCStrList lines = TYPECAST(_sendMssg->body, StrList);
        lines->Save("sendbody.log");
    }

    // Pick the sender by message type
    if (strcmp(_sendMssg->mssgType, "HTTP") == 0) {
        return sendHTTP(_sendMssg);
    }
    if (strcmp(_sendMssg->mssgType, "ACK") == 0) {
        return sendACK(_sendMssg);
    }
    if (strcmp(_sendMssg->mssgType, "FAR") == 0) {
        return sendFAR(_sendMssg);
    }

    Assert(false);
    return false;
} // Send
//---------------------------------------------------------------------
// This method must be overridden to actually decrypt the body received.
//
// Base version of the decryption hook: logs its entry, trips an
// assertion and reports failure. Neither argument is used.
bool CSession::doDecrypt(RPCStrList _headers, RPCBlock _body) {
    (void)_headers;
    (void)_body;
    Log('K', "CSession.doDecrypt: Entry");
    Assert(false);
    return false;
} // doDecrypt
//---------------------------------------------------------------------
//
// Reduces a header line to its value, in place: everything up to and
// including the position ChrPos reports for ':' is dropped, then the
// remainder is trimmed.
//
// _line  NUL-terminated header line; overwritten with the value
//
void CSession::getStrValue(Pchar _line) {
    Pchar value = _line + ChrPos(':', _line) + 1;
    // Source and destination share the same buffer, so the copy must be
    // one that is defined for overlapping ranges; strcpy is not.
    memmove(_line, value, strlen(value) + 1);
    Trim(_line);
} // getStrValue
//---------------------------------------------------------------------
// Get an integer value from a header line.
//
// Reduces the header line to its value in place (see getStrValue) and
// converts that value with StrToLong.
long CSession::getIntValue(Pchar _line) {
    getStrValue(_line);
    long number = StrToLong(_line);
    return number;
} // getIntValue
//---------------------------------------------------------------------
// Look up a header by name and return its value text.
//
// _head    header list to search
// _name    header name to find (callers pass it with the trailing
//          colon, e.g. "Content-Length:")
// _value   receives the value text; set to "" when the header is not
//          found
// _size    capacity of _value
//
// The scratch buffer is declared with sizeString, so sizeString is
// also the capacity handed to StrItem (the original passed sizeLine,
// which did not match the buffer).
//
void CSession::getHeaderStrValue(RPCStrList _head,
                                 Pchar      _name,
                                 Pchar      _value,
                                 ushort     _size) {
   if (!_head->FindFirstString(_name)) {
      _value[0] = 0;
      return;
   }
   char work[sizeString];
   _head->StrItem(work, sizeString);
   getStrValue(work);                // strip the "name:" part
   Strcpy(_value, work, _size);
}  // getHeaderStrValue
//---------------------------------------------------------------------
// Look up a header by name and return its value as an integer.
// Returns 0 when the header is missing or its value text is empty.
//
// The capacity passed to getHeaderStrValue is sizeString, matching the
// declared size of the scratch buffer (the original passed sizeLine).
//
long CSession::getHeaderIntValue(RPCStrList _head, Pchar _name) {
   char work[sizeString];
   getHeaderStrValue(_head, _name, work, sizeString);
   if (StrEmpty(work))
      return 0;
   return StrToLong(work);
}  // getHeaderIntValue
//---------------------------------------------------------------------
// Translation table from a MIME type (of those that are supported, or
// hoped to be supported) to the farVIEW identifier of the CAtom
// container that holds the content.
//
// type   MIME type/subtype, spelled in upper case
// form   farVIEW container code; 0 marks an entry with no container
//
// The table is terminated by an entry whose type is the empty string;
// CSession::getFVType scans it up to that entry.
//
struct {
char type[32];
ushort form;
} MIMETypes[] = {{"TEXT/HTML", iBlock},
{"TEXT/PLAIN", iBlock},
{"TEXT/XML", iBlock},
{"TEXT/ABST", iManifest},
{"IMAGE/BMP", iBlock},
{"IMAGE/GIF", iBlock},
{"IMAGE/JPG", iBlock},
{"IMAGE/PNG", iBlock},
{"AUDIO/MIDI", iBlock},
{"AUDIO/MP3", iBlock},
{"AUDIO/WAV", iBlock},
// Placeholder: form 0 is what getFVType also returns for "not found"
{"VIDEO/?", 0},
// for generic file xfer
{"APPLICATION/XML", iBlock},
{"APPLICATION/BINARY", iBlock},
// Terminator entry
{"", 0}};
// Map a MIME type string to the farVIEW container code listed in the
// MIMETypes table. Returns 0 when the type is not in the table.
ushort CSession::getFVType(Pchar _MIMEType) {
   // Work on a bounded copy. CapStr is applied so the text can match
   // the upper-case table keys (presumably it upper-cases - confirm),
   // and a ';' is appended so Before() always has a separator to find.
   char cleaned[sizeName];
   Strcpy(cleaned, _MIMEType, sizeName);
   CapStr(cleaned);
   ChrCat(cleaned, ';', sizeName);
   // Keep only the text that precedes the first ';'
   char key[sizeName];
   Before(";", cleaned, key, sizeName);
   Trim(key);
   // Linear search; the table ends at the entry with an empty type
   for (int i = 0; !StrEmpty(MIMETypes[i].type); i++)
      if (strcmp(key, MIMETypes[i].type) == 0)
         return MIMETypes[i].form;
   return 0;
}  // getFVType
//---------------------------------------------------------------------
// Read the header section of a message from the session socket into a
// new string list. Lines are collected until an empty line is read.
// Returns the list, or 0 when a line could not be read. The headers
// collected are also written to the log.
//
PCStrList CSession::receiveHeaders(void) {
   Log('K', "CSession.receiveHeaders Entry");
   Assert(socket);
   PCStrList headers = new CStrList();
   char text[sizeString];
   text[0] = 0;
   // The first line is requested with _block = true
   if (!(socket->GetLine(text, sizeString, /*_block*/true))) {
      Log('K',
          "CSession.recvHeaders: * Failed to read header line");
      return 0;
   }
   // Store each non-empty line, then fetch the next one with
   // _block = false; an empty line ends the header section
   while (text[0] != 0) {
      headers->StrInsert(text);
      if (!(socket->GetLine(text, sizeString, /*_block*/false))) {
         Log('K',
             "CSession.recvHeaders: * Failed to read header line");
         return 0;
      }
   }
   Log('K', "CSession.recvHeaders: Read headers");
   // Dump the headers to the log file
   for (bool more = headers->GotoHead(); more; more = headers->GotoNext()) {
      Strcpy(text, headers->StrItem(false), sizeString);
      Log('K', "CSession.recvHeaders: <%s>", text);
   }
   return headers;
}  // receiveHeaders
//---------------------------------------------------------------------
// Receive the body as a memory block of _szBody bytes, pass it through
// doDecrypt, then convert it to the CAtom container selected by the
// Content-Type header.
//
// _szBody    number of body bytes to read from the socket
// _headers   header list; consulted for "Content-Type:" and handed to
//            doDecrypt
//
// Returns the converted body, or 0 on failure. On failure the session
// result is set to FVE_RECEIVEABORTED (connection aborted), 500
// (receive error, time-out, oversized piece, or conversion failure) or
// 400 (Content-Type missing or not recognized). Except for the aborted
// case an error acknowledgement is sent with ackError.
//
// Design notes from the original author: should the body be cached to
// disk if it is above some size? or always be cached? or always be
// required not to be so large that it busts memory (the current
// assumption)? So if you need to send a file, you send it as a series
// of blocks. The file transfer protocol should be handled at a higher
// level than this one (block numbers, complete block sets, etc.); see
// Server.sendFile and Server.ReceiveFile.
//
PCAtom CSession::receiveBody(long _szBody, RPCStrList _headers) {
Log('K', "CSession.receiveBody: Entry");
Assert(socket);
// Collect the body of the request into a single memory block
Log('K', "CSession.receiveBody: Reading %d bytes", _szBody);
// szBody counts the bytes still outstanding
long szBody = _szBody;
// Destination block, allocated one byte larger than the body
PCBlock block = new CBlock();
block->Alloc(szBody+1);
Pchar p = block->Lock();
// Collect the pieces as they arrive
PCBlock msg = new CBlock();
// iBuf is the write offset into the destination block
ulong iBuf = 0;
short timeOut = pause; // Wait awhile for data
while (szBody > 0) {
// Receive a portion of the expected message
// NOTE(review): only size == 0 is treated as "nothing received"; a
// negative return would reach the memcpy below - confirm that
// Receive cannot return one
long size = socket->Receive(msg, szBody);
if (size == 0) {
int err = socket->GetLastError();
if (err == WSAECONNABORTED) {
result->SetResult(FVE_RECEIVEABORTED);
return 0;
}
else {
// Any other error is reported only if the socket supplies a
// message text that is neither empty nor "(null)"; otherwise
// control falls through to the time-out countdown below
char mssg[sizeString];
Strcpy(mssg, socket->GetLastErrorMessage(), sizeString);
if (strlen(mssg) && strcmp(mssg, "(null)")) {
ackError("500 Internal Service Error - "
"Failed to receive data");
Log('D',
"CSession.receiveBody: * Error <%s> while reading body",
mssg);
result->SetResult(500);
return 0;
}
}
// May need more time for the full message to arrive
if (timeOut-- == 0) {
// Failed to receive all the data
ackError("500 Internal Service Error - "
"Waited too long for data");
Log('D',
"CSession.receiveBody: * Timed out "
"waiting for data from %s",
hostIP);
result->SetResult(500);
return 0;
}
// Wait, then try again
Pause(10);
continue;
}
// A piece larger than what is still outstanding is an error
if (size > szBody) {
// NOTE(review): the value returned by Lock() is discarded here -
// purpose unclear
msg->Lock();
ackError("500 Internal Service Error - "
"Size of data exceeds Content-Length %ld", _szBody);
Log('D',
"CSession.receiveBody: * Size of "
"data from %s larger than "
"Size-header specified %ld > %ld",
hostIP, size, szBody);
result->SetResult(500);
return 0;
}
// Data arrived: restart the time-out countdown and append the piece
// to the destination block
timeOut = pause;
Pchar m = msg->Lock();
memcpy(p+iBuf, m, size);
msg->Free();
szBody -= size;
iBuf += size;
}
//------------------------------------------------------------------
// Let the user decrypt the body and the signature
// NOTE(review): the bool returned by doDecrypt is not checked
doDecrypt(_headers, block);
//------------------------------------------------------------------
// Convert the block to a CAtom object of the appropriate type
char typeName[sizeString];
getHeaderStrValue(_headers, "Content-Type:", typeName, sizeString);
if (StrEmpty(typeName)) {
ackError("400 Bad Request - Content-Type Missing");
Log('D',
"CSession.receiveBody: * Could not "
"find the Content-Type header");
result->SetResult(400);
return 0;
}
// Translate the contentType (type/subtype) into the farVIEW
// container class into which to store the content
ushort type = getFVType(typeName);
result->SetType(typeName);
// Validate the type code (getFVType returns 0 when not recognized)
if (!type) {
ackError("400 Bad Request - MIME Type Not Recognized");
Log('D',
"CSession.receiveBody: * MIME Type %s not recognized",
typeName);
result->SetResult(400);
return 0;
}
// Create a body object of the specified type
PCAtom body = CFactory::TheFactory->MakeAtom(type);
if (!body) {
ackError("400 Bad Request - MIME Type Not Recognized");
Log('D',
"CSession.receiveBody: * Could not make a body "
"object of type %s",
typeName);
result->SetResult(400);
return 0;
}
// Convert the block contents to the specified body type
PCAtom blck = TYPECAST(block, Atom);
if (!(body->BlockTo(blck))) {
ackError("500 Internal Server Error - "
"Could not convert type of data");
Log('D',
"CSession.receiveBody: * Could not "
"convert body to type %s",
typeName);
result->SetResult(500);
return 0;
}
// Temporary
//?ackError("200 OK");
// Debugging aid: the converted body is saved to "recvbody.log"
body->Save("recvbody.log");
Log('K', "CSession.receiveBody: Body was received ok");
// Dump the first line if the body is a string list (which is the
// usual case) and is not empty
if (body->Isa(iStrList)) {
PCStrList b = TYPECAST(body, StrList);
if (b->GotoHead()) {
p = b->StrItem(false);
Log('K', "CSession.receiveBody: %s", p);
}
}
return body;
} // receiveBody
//---------------------------------------------------------------------
// Default request hook. A derived session class must override this
// method to actually do anything with the data received. This base
// version only logs the call, trips Assert(false) and reports failure.
//
bool CSession::doRequest(RPCStrList _headers, RPCAtom _body) {
   // Both parameters are intentionally unused here
   (void)_headers;
   (void)_body;
   Log('K', "CSession.doRequest: Entry");
   Assert(false);
   return false;
}  // doRequest
//---------------------------------------------------------------------
// Receive an expected message. Messages are assumed to have a standard
// HTTP form of one or more headers, followed by a line break, followed
// possibly by content, the length of which is determined by the
// Content-Length header.
//
// _recvMssg   receive request; its doRequest flag selects whether a
//             received message is handed to doRequest() or returned to
//             the caller through the session result object
//
// Returns true when a message was received, and also when the
// connection was aborted (the result is then FVE_RECEIVEABORTED).
// Returns false when no headers arrived, the first header is invalid,
// the Content-Length is negative or above szLimit, or the body could
// not be received; the session result carries the reason.
//
// A first line that does not contain "FAR" is treated as a response of
// the form "HTTP/1.x ccc mssg"; its status code and message text are
// stored in the result. Otherwise the line must be a request of the
// form "FAR url HTTP/1.0".
//
// Changes from the previous version: a negative Content-Length is now
// rejected instead of being passed to receiveBody; the FAR test that
// was repeated in the else-branch (and could never be true there) was
// dropped; the socket error code no longer shadows the status code.
//
bool CSession::Receive(RPCReceiveMessage _recvMssg) {
   Log('K',
       "CSession.Receive: Receiving from %s",
       hostIP);
   bool  isResponse = false;
   ulong err        = 200;                 // = OK
   char  line[sizeString];
   char  head[sizeString];
   head[0] = 0;
   //------------------------------------------------------------------
   // Process the headers
   //
   // Collect the header portion of the message
   PCStrList headers = receiveHeaders();
   // Should have received headers
   if (!headers) {
      int sockErr = socket->GetLastError();
      if (sockErr == WSAECONNABORTED) {
         // Reported through the result, not as a failure of this call
         result->SetResult(FVE_RECEIVEABORTED);
         return true;
      }
      Log('D',
          "CSession.Receive: * No headers received");
      result->SetResult(FVE_RECEIVEFAILED);
      return false;
   }
   // Validate the first header
   if (headers->GotoHead()) {
      headers->StrItem(line, sizeString);
      Squeeze(line, ' ');
      Strcpy(head, line, sizeString);      // kept for the log messages
      if (StrPos("FAR", line) < 0) {
         // form: HTTP/1.x ccc mssg
         isResponse = true;
         // Split the line on blanks; field 2 is the status code
         Squeeze(line, ' ');
         PCRecord rec = new CRecord(line, " ");
         err = rec->GetLongField(2);
         if (err != 0 && (err < 200 || err > 299))
            Log('D',
                "CSession.Receive: * Response %d indicates error",
                err);
         result->code = err;
         // Drop the first two fields; what remains is the message text
         rec->DeleteField(1);
         rec->DeleteField(1);
         rec->GetDelimStr(line, sizeString);
         Trim(line);
         // NOTE(review): ownership of this copy after SetMssg is not
         // visible here - confirm who frees it
         Pchar mssg = new char[strlen(line)+1];
         strcpy(mssg, line);
         result->SetMssg(mssg);
      }
      // Validate the FAR header
      // form: FAR url HTTP/1.0
      else if (StrPos("HTTP/1.0", line) < 0) {
         ackError("400 Bad Request - Invalid Header");
         Log('D',
             "CSession.Receive: * Header <%s> invalid",
             line);
         result->SetResult(FVE_HEADERSINVALID);
         return false;
      }
   }
   //------------------------------------------------------------------
   // Process the body. If the Content-Length header is present and
   // specifies a non-zero size, the body is expected as well as the
   // Content-Type header.
   //
   if (err == 200) {
      // Get the size of the body in the request
      long szBody = getHeaderIntValue(headers, "Content-Length:");
      if (!szBody) {
         // No body announced: return the headers with an empty body
         result->SetHead(headers);
         PCAtom body;
         result->SetBody(body);
         result->SetResult(FVE_OK);
         return true;
      }
      // The length comes from the peer; a negative value must never
      // reach the allocation in receiveBody
      if (szBody < 0) {
         Log('D',
             "CSession.Receive: * Content-Length is negative (%ld)",
             szBody);
         if (!isResponse)
            ackError("400 Bad Request - Invalid Content-Length");
         result->SetResult(FVE_HEADERSINVALID);
         return false;
      }
      // Check for too big. Sender should only send "small" blocks
      if (szBody > szLimit) {
         Log('D',
             "CSession.Receive: * Body length exceeds "
             "program's size limit (%d)", szBody);
         if (!isResponse)
            ackError("413 Request Entity Too Large");
         result->SetResult(FVE_HEADERSINVALID);
         return false;
      }
      // Get the body from the sender
      PCAtom body = receiveBody(szBody, headers);
      if (!body) {
         if (result->GetResult() == FVE_RECEIVEABORTED)
            return true;
         Log('D',
             "CSession.Receive: * Failed to "
             "receive the body from %s",
             hostIP);
         if (!isResponse)
            ackError("400 Bad Request - Error In Body");
         result->SetResult(FVE_RECEIVEFAILED);
         return false;
      }
      // Process the message with a body
      if (_recvMssg->doRequest) {          // Process the message
         Log('L',
             "CSession.Receive.1:"
             " Received <%s> (sz=%ld) from %s",
             head, szBody, hostIP);
         doRequest(headers, body);
      }
      else {                               // Return the message
         Log('L',
             "CSession.Receive.2:"
             " Received <%s> (sz=%ld) from %s",
             head, szBody, hostIP);
         getHeaderStrValue(headers, "Content-Type:", line, sizeString);
         result->SetType(line);
         result->SetHead(headers);
         result->SetBody(body);
      }
      result->SetResult(FVE_OK);
      return true;
   }
   // Process the message without a body. This point is reached only
   // when the status code is not 200.
   if (_recvMssg->doRequest) {             // Process the message
      Log('L', "CSession.Receive.3: Received <%s> from %s",
          head, hostIP);
      PCAtom body;
      doRequest(headers, body);
   }
   else {                                  // Return the message
      Log('L', "CSession.Receive.4: Received <%s> from %s",
          head, hostIP);
      result->SetHead(headers);
      PCAtom body;
      result->SetBody(body);
   }
   result->SetResult(FVE_OK);
   return true;
}  // Receive
//---------------------------------------------------------------------
// Session thread body: receive and handle commands enqueued in the
// input port until a CloseSession command arrives, then disconnect
// the socket and report FVE_OK through the result of that command.
//
// Command codes handled:
//   'OC'  OpenConnect    - create a socket and connect to hostIP:hostPort
//   'CS'  CloseSession   - leave the loop and shut the session down
//   'SM'  SendMessage    - send cmmd->data through Send()
//   'RM'  ReceiveMessage - receive a message through Receive()
// Any other code is logged, marked FVE_CMMDINVALID and asserted.
//
void CSession::Run(void) {
Log('D',
"CSession.Run: Starting session %s with %s:%d",
sessionID,
hostIP,
hostPort);
while (true) {
// Get the next request
Log('K', "CSession.Run: Waiting for command");
PCItem cmd = GetPort()->Receive();
PCCommCmmd cmmd = TYPECAST(cmd, CommCmmd);
Assert(cmmd->Isa(iCommCmmd));
// Save the user's result object for notification later. A command
// posted without one gets a fresh result created with _wait = false.
result = cmmd->GetCommResult();
if (!result)
result = new CCommResult(/*_wait*/false);
//---------------------------------------------------------------
// Handle OpenConnect command
//
if (cmmd->cmmd == 'OC') {
Log('K',
"CSession.Run:"
" Received OpenConnect -connect command for %s",
sessionID);
// The session must not already have a socket
Assert(!socket);
socket = new CSocket();
if (socket->ClientConnect(hostIP, hostPort)) {
Log('K',
"CSession.Run: Connect to %s:%d succeeded for %s",
hostIP,
hostPort,
sessionID);
result->SetResult(FVE_OK);
}
else {
Log('D',
"CSession.Run: * Connect to %s:%d failed for %s",
hostIP,
hostPort,
sessionID);
// Drop the socket so later commands report FVE_SOCKETNOTOPEN
socket = 0;
result->SetResult(FVE_CONNECTIONNOTOPEN);
}
// NOTE(review): the result is released only when no further
// command is queued - confirm this is intended
if (GetPort()->Empty())
result->Release();
} // OpenConnect command
//---------------------------------------------------------------
// Handle CloseSession command
//
else if (cmmd->cmmd == 'CS') {
Log('K',
"CSession.Run:"
" Received CloseSession command for %s",
sessionID);
break; // Quits the session
} // CloseSession command
//---------------------------------------------------------------
// Handle SendMessage command
//
else if (cmmd->cmmd == 'SM') {
Log('K',
"CSession.Run:"
" Received SendMessage command for %s",
sessionID);
if (socket) {
PCSendMessage sendMssg = TYPECAST(cmmd->data, SendMessage);
// Send the specified message. No result value is set in this
// branch; presumably Send and its helpers set it - confirm.
if (!Send(sendMssg))
Log('D',
"CSession.Run:"
" * SendMessage failed for %s",
sessionID);
}
else
result->SetResult(FVE_SOCKETNOTOPEN);
Log('K',
"CSession.Run:"
" SendMessage processing complete for %s",
sessionID);
if (GetPort()->Empty())
result->Release();
} // SendMessage
//---------------------------------------------------------------
// Handle ReceiveMessage command
//
else if (cmmd->cmmd == 'RM') {
Log('K',
"CSession.Run:"
" Received ReceiveMessage command for %s",
sessionID);
if (socket) {
PCReceiveMessage recvMssg = TYPECAST(cmmd->data,
ReceiveMessage);
// Receive() sets the result value on every path it returns from
if (!Receive(recvMssg))
Log('D',
"CSession.Run:"
" * ReceiveMessage failed for %s",
sessionID);
}
else
result->SetResult(FVE_SOCKETNOTOPEN);
Log('K',
"CSession.Run:"
" ReceiveMessage processing complete for %s",
sessionID);
if (GetPort()->Empty())
result->Release();
} // ReceiveMessage
//---------------------------------------------------------------
// Handle unrecognized command
//
else {
Log('D',
"CSession.Run:"
" * Received unrecognized command (%ld) for %s",
cmmd->cmmd,
sessionID
);
// NOTE(review): unlike the other branches, the result is not
// released here - confirm a waiting caller cannot be left blocked
cmmd->SetResult(FVE_CMMDINVALID);
Assert(false);
} // Unrecognized command
}
//------------------------------------------------------------------
// Shut-down procedure follows (reached through the 'CS' break)
Log('L',
"CSession.Run: Stopping session %s",
sessionID);
if (socket) {
socket->Disconnect();
socket = 0;
}
Log('K',
"CSession.Run:"
" Disconnect from %s:%d succeeded for %s",
hostIP,
hostPort,
sessionID);
// Report completion of the CloseSession command
result->SetResult(FVE_OK);
result->Release();
} // Run
//=====================================================================
// farVIEW TCP/IP listener thread class. Listens for incoming messages.
// When a message is detected, the listener thread spawns a CSession
// thread object to handle the client's request.
//=====================================================================
//---------------------------------------------------------------------
// Construct a listener thread object.
//
// _comm         owning CComm object (must not be null)
// _listenPort   port on which Run() listens
// _result       result object signalled when Run() terminates (must
//               not be null)
// _size         passed through to the CThread constructor
//
CListen::CListen(pCComm        _comm,
                 short         _listenPort,
                 RPCCommResult _result,
                 short         _size):
   CThread   (_size),
   comm      (_comm),
   listenPort(_listenPort),
   result    (_result) {
   cls = iListen;                    // class identifier of this object
   Assert(comm);
   Assert(result);
}  // CListen
//---------------------------------------------------------------------
// Destructor: only records the destruction in the log.
//
CListen::~CListen() {
   Log('D', "CListen destructing");
}  // ~CListen
//---------------------------------------------------------------------
// Expansion of the project's RTTI_CPP macro for this class (presumably
// generates the run-time type support for CListen as derived from
// CThread - confirm against the macro definition).
//
RTTI_CPP(Listen, Thread)
//---------------------------------------------------------------------
// Create a new session thread for an accepted client socket, store it
// in the session dictionary under a freshly generated ID, start it and
// return it.
//
// Note from the original author: a bad person could flood this farVIEW
// with bogus sessionIDs, which could cause it to create many dangling
// sessions. This has to be handled!! Maybe an upper limit of the
// number of sessions active at one time? Could also include some minor
// trickery to avoid script-kiddy intrusions, such as encrypting the
// sessionID along with some fixed (secret) keyword; if the keyword is
// missing, discard the request.
//
PCSession CListen::getSession(RPCSocket _socket) {
   Log('K', "CListen.getSession: Entry");
   // Generate the ID under which the session is registered
   char id[sizeLine];
   ::GetID(id, sizeLine);
   // Build the session through the CComm factory method; the host is
   // taken from the socket name, the port is 0 and the user name empty
   PCSession created = comm->newSession(id,
                                        _socket->GetName(),
                                        0,
                                        "");
   Assert(created);
   // Hand over the accepted socket, register the session, start it
   created->Socket(_socket);
   comm->sessions->InsertSession(id, created);
   created->Start();
   return created;
}  // getSession
//---------------------------------------------------------------------
// Disconnect the listening socket, if there is one. CComm::CloseComm
// calls this to make the listener quit.
//
void CListen::Disconnect(void) {
   Log('K', "CListen.Disconnect: Entry");
   if (!listenSocket)
      return;                        // nothing to disconnect
   listenSocket->Disconnect();
}  // Disconnect
//---------------------------------------------------------------------
// Accept client request connections and hand them off to CSession
// subthreads for resolution. Note: need to include some code to limit
// the number of active clients.
//
void CListen::Run(void) {
Log('C', "CListen.Run: Entry");
Assert(result);
// Validate time between samplings
if (pause < 0)
pause = 0;
// Get a socket to listen for incoming messages
listenSocket = new CSocket(SOCKET_ERROR, /*_blocking=*/true);
if (!(listenSocket->ServerConnect(listenPort))) {
Log('D',
"CListen.Run: * Failed - ServerConnect error %s",
listenSocket->GetLastError());
return;
}
Log('D',
"CListen.Run: Listening to port %ld",
listenPort);
char line[sizeString];
while (true) {
// Get a socket for the CSession
PCSocket sessionSocket = listenSocket->Accept();
if (!sessionSocket) { // No client right now
if (!listenSocket || listenSocket->GetLastError())
break;
continue;
}
// Check to accept the client
// TBD sessionSocket->GetName(..);
// Create and start up a CSession thread object
PCSession session = getSession(sessionSocket);
if (!session)
continue;
// Send a ReceiveMessage command to the CSession object
PCReceiveMessage
recvMssg = new CReceiveMessage(session->CommID(false),
true);
PCItem recv = TYPECAST(recvMssg, Item);
PCCommResult rslt = new CCommResult(/*_wait*/false);
PCItem cmmd = new CCommCmmd('RM', recv, rslt);
session->GetPort()->Post(cmmd);
Log('K',
"CListen.Run:"
" Sent RM command from %s to session %s",
listenSocket->GetName(),
session->CommID(false));
} // while (true)
Log('D', "CListen::Run terminating");
Assert(result);
result->SetResult(FVE_OK);
result->Release();
} // Run
//=====================================================================
//
//---------------------------------------------------------------------
// Construct a session watcher thread object.
//
// _sessions   session dictionary to watch
// _result     result object signalled when Run() terminates (must not
//             be null)
// _size       passed through to the CThread constructor
//
// The critical-section parameter and the class identifier are
// disabled in the source and kept here as comments.
//
CWatcher::CWatcher(RPCSessionDict _sessions,
//?                RPCCriticalSection _section,
                   RPCCommResult  _result,
                   short          _size):
   CThread (_size),
   done    (false),
   //?section (_section),
   sessions(_sessions),
   result  (_result) {
   //?cls = iWatcher;
   Assert(result);
}  // CWatcher
//---------------------------------------------------------------------
// Destructor: asserts that the watcher had been told to finish (done
// is set) before it is destroyed, then records the destruction in the
// log.
//
CWatcher::~CWatcher() {
   Assert(done);
   Log('D', "CWatcher destructing");
}  // ~CWatcher
//---------------------------------------------------------------------
// Watcher thread body. The loop idles with Pause(10) until done is
// set, then sets and releases the watcher result. The code that was
// meant to remove stopped sessions from the dictionary is disabled and
// kept below as a comment.
//
void CWatcher::Run(void) {
   Log('C', "CWatcher.Run: Entry");
   Assert(result);
   // Only the disabled clean-up code below uses this value
   short width = sessions->GetWidth();
   for (;;) {
      if (done)
         break;
/*?
// Close any CSession thread not running
sessions->Enter();
for (short i = 0; i < width; i++) {
PCList list = sessions->GetList(i);
if (list && list->GotoHead()) do {
pCSession session = TYPECAST(list->Item(), Session);
// Remove the session if stopped
if (session->GetState() == THREAD_STOPPED) {
SysLog::Log('K',
"CWatch.Run: "
"Removing session %d (%d:%s)",
session->ThreadID(),
session->NrRefs(),
session->CommID(false));
list->Remove(atTop); // leaves the head item active
}
} while (list->GotoNext());
}
sessions->Leave();
?*/
      Pause(10);
   }  // until done
   Log('D', "CWatcher::Run terminating");
   Assert(result);
   result->SetResult(FVE_OK);
   result->Release();
}  // Run
//=====================================================================
// farVIEW communication command handler class. Provides a procedural
// interface between users and the Comm threads. Provides APIs for
// OpenComm and CloseComm, OpenSession and CloseSession, SendMessage,
// and ReceiveMessage. When specified in the CCommResult parameter,
// CComm methods wait for completion before returning control to the
// caller.
//=====================================================================
//---------------------------------------------------------------------
// Construct the communication command handler.
//
// _book         stored in book and handed to each session created by
//               newSession
// _userName     user name; must not be empty
// _domain       domain, handed to each session created by newSession
// _listenPort   port for the listener; OpenComm starts a listener
//               only when this is non-zero
// _pause        stored in pause (OpenComm passes it to
//               CSocket::Initialize)
// _size         size passed to the listener thread
//
// The first CComm object constructed registers itself in TheComm.
//
CComm::CComm(RPCAtom _book,
             Pchar   _userName,
             Pchar   _domain,
             short   _listenPort,
             short   _pause,
             short   _size):
   CAtom      (),
   book       (_book),
   userName   (0),
   domain     (0),
   listenPort (_listenPort),
   size       (_size) {
   cls = iComm;
   //?Assert(_book);
   if (!TheComm) {
      TheComm = this;
   }
   Assert(!StrEmpty(_userName));
   // Initialize the name members from the arguments via STRINIT
   STRINIT(userName, _userName)
   STRINIT(domain, _domain)
   pause = _pause;
   // Dictionary of the sessions managed by this object
   sessions = new CSessionDict();
   // Result object on which the listener reports its termination
   listenResult = new CCommResult();
}  // CComm
//---------------------------------------------------------------------
// Destructor: frees the name strings and clears the pointers.
//
CComm::~CComm() {
   delete[] domain;
   delete[] userName;
   domain   = 0;
   userName = 0;
}  // ~CComm
//---------------------------------------------------------------------
// Expansion of the project's RTTI_CPP macro for this class (presumably
// generates the run-time type support for CComm as derived from
// CAtom - confirm against the macro definition).
//
RTTI_CPP(Comm, Atom)
//---------------------------------------------------------------------
// Format a message printf-style and write it to the system log,
// prefixed with "(0: <ticks>)". Nothing is logged when _format is
// empty.
//
// _flag     log flag passed through to SysLog::Log
// _format   printf-style format string, followed by its arguments
//
// The text is formatted with vsnprintf so that a message longer than
// the buffer is truncated instead of overrunning it (the original used
// vsprintf and asserted on the length only after the fact). The
// assertion still flags an oversized message in debug builds. A local
// date string that was computed but never used has been removed.
//
void CComm::log(char _flag, Pchar _format, ...) {
   if (StrEmpty(_format))
      return;
   // Print formatted message
   char line[sizeString];
   va_list args;
   va_start(args, _format);
   int length = vsnprintf(line, sizeof(line), _format, args);
   va_end(args);
   if (length < 0)
      line[0] = 0;                   // formatting failed: log no text
   Assert(length >= 0 && length < (int)sizeof(line));
   ulong now = GetTicks();
   SysLog::Log(_flag, "(%d: %ld) %s", 0, now, line);
   //? SysLog::Log(_flag, "(%d: %ld) %s", 0, now - then, line);
   //? then = now;
}  // log
//---------------------------------------------------------------------
// Factory method: create a CSession object from the specified
// parameters together with this object's book, domain and listenPort.
// Users of the communication package must override this method to
// create a tailored CSession object. Note that this method is
// protected and is used only by CListen and CComm methods.
//
PCSession CComm::newSession(Pchar _sessionID,
                            Pchar _hostIP,
                            long  _hostPort,
                            Pchar _userName) {
   PCSession session = new CSession(this,
                                    book,
                                    _userName,
                                    domain,
                                    _sessionID,
                                    _hostIP,
                                    _hostPort,
                                    listenPort);
   return session;
}  // newSession
//---------------------------------------------------------------------
// Initialize the socket interface software. Create and start a TCP
// socket listener object if a listenPort was specified in the
// constructor, then create and start the dead-session watcher.
//
// _result   set to FVE_INPROGRESS on entry and to FVE_OK, then
//           released, before returning
//
// Always returns true.
//
bool CComm::OpenComm(RPCCommResult _result) {
   log('K', "CComm.OpenComm: * Entry");
   Assert(_result);
   _result->result = FVE_INPROGRESS;
   // Initialize the WinSock subsystem
   CSocket::Initialize(pause);
   // The listener is optional: it needs a non-zero port
   if (listenPort != 0) {
      listener = new CListen(this, listenPort, listenResult, size);
      listener->Start(16000);
   }
   // The dead-session watcher is always created, with its own result
   watcherResult = new CCommResult();
   watcher       = new CWatcher(sessions, watcherResult);
   watcher->Start(16000);
   _result->SetResult(FVE_OK);
   _result->Release();
   return true;
}  // OpenComm
//---------------------------------------------------------------------
// Terminate the listener thread if active. Stop the dead-session
// watcher. Close all active CSession threads by force. Shut down the
// socket interface software.
//
// _result   set to FVE_INPROGRESS on entry and to FVE_OK, then
//           released, before returning
//
// Always returns true.
//
// Change from the previous version: the watcher is guarded the same
// way as the listener, so calling CloseComm when OpenComm never
// created a watcher no longer dereferences a null pointer.
//
bool CComm::CloseComm(RPCCommResult _result) {
   log('K', "CComm.CloseComm: * Entry");
   Assert(_result);
   _result->result = FVE_INPROGRESS;
   // Close the listener port (it quits when it is disconnected)
   if (!!listener) {
      listener->Disconnect();
      listenResult->Wait();
      log('D', "Listener done");
   }
   // Stop the dead-session watcher
   if (!!watcher) {
      watcher->Done();
      watcherResult->Wait();
      log('D', "Watcher done");
   }
   // Close any CSession threads still open
   short width = sessions->GetWidth();
   for (short i = 0; i < width; i++) {
      PCList list = sessions->GetList(i);
      if (list && list->GotoHead()) do {
         PCSession session = TYPECAST(list->Item(), Session);
         // Close the session if needed
         if (session->GetState() == THREAD_RUNNING) {
            log('K',
                "CComm.CloseComm: Closing session %d (%d:%s)",
                session->ThreadID(),
                session->NrRefs(),
                session->CommID(false));
            PCCommResult r = new CCommResult();
            // Disconnect the session's socket first
            PCSocket socket = session->Socket();
            if (socket)
               socket->Disconnect();
            // Send a CloseSession request to the session
            PCItem close   = new CCloseSession(session->CommID(false));
            PCItem command = new CCommCmmd('CS', close, r);
            session->GetPort()->Post(command);
            r->Wait();               // wait for completion
         }
         else
            log('K',
                "CComm.CloseComm: Session %d closed (%d:%s)",
                session->ThreadID(),
                session->NrRefs(),
                session->CommID(false));
      } while (list->GotoNext());
   }
   sessions->Flush();
   listener = 0;
   watcher  = 0;
   // Close the WinSock subsystem
   CSocket::UnInitialize();
   _result->SetResult(FVE_OK);
   _result->Release();
   return true;
}  // CloseComm
//---------------------------------------------------------------------
// Create a CSession thread object for the specified session ID, add it
// to the session dictionary, start it, and ask it to connect to the
// specified host computer. The call waits for the connect request to
// be answered.
//
// _sessionID   ID under which the session is registered (not empty)
// _hostIP      host to connect to
// _hostPort    port to connect to
// _userName    user name handed to the new session
// _result      set to FVE_INPROGRESS on entry; on return it holds
//              FVE_OK or FVE_SESSIONNOTOPEN and has been released
//
// Returns true when the connection was opened, false otherwise.
// NOTE(review): when the connect fails, the session stays in the
// dictionary with its thread started - confirm that this is intended.
//
bool CComm::OpenSession(Pchar         _sessionID,
                        Pchar         _hostIP,
                        long          _hostPort,
                        Pchar         _userName,
                        RPCCommResult _result) {
   log('K',
       "CComm.OpenSession:"
       " sessionID=%s, hostIP=%s, hostPort=%ld",
       _sessionID,
       _hostIP,
       _hostPort);
   Assert(!StrEmpty(_sessionID));
   Assert(_result);
   _result->result = FVE_INPROGRESS;
   // Create the session thread object through the factory method
   PCSession opened = newSession(_sessionID,
                                 _hostIP,
                                 _hostPort,
                                 _userName);
   if (!opened) {
      log('D', "CComm.OpenSession: * Could not open session");
      _result->SetResult(FVE_SESSIONNOTOPEN);
      _result->Release();
      return false;
   }
   // Register the session, then start its thread
   sessions->InsertSession(_sessionID, opened);
   opened->Start();
   // Ask the session thread to open the connection, using a private
   // result object to wait for the answer
   PCCommResult connectResult = new CCommResult();
   PCItem request = new COpenConnect(_sessionID);
   PCItem command = new CCommCmmd('OC', request, connectResult);
   opened->GetPort()->Post(command);
   connectResult->Wait();            // until the thread has answered
   if (connectResult->GetResult() != FVE_OK) {
      log('D',
          "CComm.OpenSession:"
          " * Could not open session (%ld)",
          connectResult->GetResult());
      _result->SetResult(FVE_SESSIONNOTOPEN);
      _result->Release();
      return false;
   }
   log('K', "CComm.OpenSession: Opened session");
   _result->SetResult(FVE_OK);
   _result->Release();
   return true;
}  // OpenSession
//---------------------------------------------------------------------
// Request that the CSession thread object associated with the
// specified session ID close itself, by posting a CloseSession ('CS')
// command to it. The session disconnects from the correspondent
// computer in its shut-down procedure.
//
// _sessionID   ID of the session to close
// _result      set to FVE_INPROGRESS on entry; it travels with the
//              posted command, and is set to FVE_OK before returning
//
// Returns true when the request was posted, false when no session
// with that ID exists (the result is then FVE_SESSIONNOTFOUND).
//
// Removal of the session from the dictionary is currently disabled
// (see the commented-out call below), although the log message still
// says "Removed session".
//
// Change from the previous version: the session-not-found path now
// releases the result, as SendMessage, ReceiveMessage and OpenSession
// do on their failure paths.
//
bool CComm::CloseSession(Pchar _sessionID, RPCCommResult _result) {
   log('K', "CComm.CloseSession: sessionID=%s", _sessionID);
   Assert(_result);
   _result->result = FVE_INPROGRESS;
   // Find the CSession thread to handle this command
   PCSession session = sessions->GetFirstSession(_sessionID);
   if (!session) {
      log('D', "CComm.CloseSession: * Could not find session");
      _result->SetResult(FVE_SESSIONNOTFOUND);
      _result->Release();
      return false;
   }
   log('K', "CComm.CloseSession: Found session");
   // Request the CSession thread to close itself
   PCItem close   = new CCloseSession(_sessionID);
   PCItem command = new CCommCmmd('CS', close, _result);
   session->GetPort()->Post(command);
   // Remove the CSession object from the dictionary
   //?sessions->RemoveSession(_sessionID);
   log('K',
       "CComm.CloseSession: Removed session (%d)",
       session->NrRefs());
   _result->SetResult(FVE_OK);
   return true;
}  // CloseSession
//---------------------------------------------------------------------
// Forward a SendMessage request to the CSession thread object
// associated with the specified session ID, by posting a 'SM' command
// that carries a CSendMessage built from the arguments.
//
// _sessionID     ID of the session that is to send the message
// _messageType   "HTTP", "FAR" or "ACK" (asserted without regard to
//                case)
// _command       must not be empty unless the type is "ACK"
// _argument, _contentType, _body, _life
//                passed through unchanged to CSendMessage
// _result        set to FVE_INPROGRESS on entry; travels with the
//                posted command
//
// Returns true when the command was posted, false when no session
// with that ID exists (the result is then FVE_SESSIONNOTFOUND and has
// been released).
//
// NOTE(review): _result->Wait() runs before the command is posted,
// whereas OpenSession posts first and waits afterwards - confirm
// against the CCommResult semantics that this order is intended.
//
bool CComm::SendMessage(Pchar         _sessionID,
                        Pchar         _messageType,
                        Pchar         _command,
                        Pchar         _argument,
                        Pchar         _contentType,
                        RPCAtom       _body,
                        disposition   _life,
                        RPCCommResult _result) {
   log('K', "CComm.SendMessage: sessionID=%s", _sessionID);
   Assert(stricmp(_messageType, "HTTP") == 0 ||
          stricmp(_messageType, "FAR")  == 0 ||
          stricmp(_messageType, "ACK")  == 0);
   // Every type other than ACK needs a command
   Assert(stricmp(_messageType, "ACK") == 0 || !StrEmpty(_command));
   Assert(_result);
   _result->result = FVE_INPROGRESS;
   // Find the CSession thread to handle this command
   PCSession target = sessions->GetFirstSession(_sessionID);
   if (!target) {
      log('D', "CComm.SendMessage: * Could not find session");
      _result->SetResult(FVE_SESSIONNOTFOUND);
      _result->Release();
      return false;
   }
   log('K', "CComm.SendMessage: Found session");
   // Describe the message; the signature is passed as an empty block
   // pointer
   PCBlock signature;
   PCItem  message = new CSendMessage(_sessionID,
                                      _messageType,
                                      _command,
                                      _argument,
                                      _contentType,
                                      signature,
                                      _body,
                                      _life);
   _result->Wait();                  // see NOTE(review) above
   // Request the CSession thread to send the message
   PCItem request = new CCommCmmd('SM', message, _result);
   target->GetPort()->Post(request);
   return true;
}  // SendMessage
//---------------------------------------------------------------------
// Forward a ReceiveMessage request to the CSession thread object
// associated with the specified session ID, by posting an 'RM'
// command to it.
//
// _sessionID   ID of the session that is to receive a message
// _result      set to FVE_INPROGRESS on entry; travels with the
//              posted command
// _doRequest   passed through to CReceiveMessage
//
// Returns true when the command was posted, false when no session
// with that ID exists (the result is then FVE_SESSIONNOTFOUND and has
// been released).
//
// NOTE(review): _result->Wait() runs before the command is posted,
// whereas OpenSession posts first and waits afterwards - confirm
// against the CCommResult semantics that this order is intended.
//
bool CComm::ReceiveMessage(Pchar         _sessionID,
                           RPCCommResult _result,
                           bool          _doRequest) {
   log('K', "CComm.ReceiveMessage: sessionID=%s", _sessionID);
   Assert(_result);
   _result->result = FVE_INPROGRESS;
   // Find the CSession thread to handle this command
   PCSession target = sessions->GetFirstSession(_sessionID);
   if (!target) {
      log('D', "CComm.ReceiveMessage: * Could not find session");
      _result->SetResult(FVE_SESSIONNOTFOUND);
      _result->Release();
      return false;
   }
   log('K', "CComm.ReceiveMessage: Found session");
   // Describe the receive request
   PCItem recvMssg = new CReceiveMessage (_sessionID, _doRequest);
   _result->Wait();                  // see NOTE(review) above
   // Request the CSession thread to set itself to receive a message
   PCItem request = new CCommCmmd('RM', recvMssg, _result);
   target->GetPort()->Post(request);
   return true;
}  // ReceiveMessage