//********************************************************************* // comm.cpp //********************************************************************* // // This module performs all TCP/IP communications for farVIEW, and is // the repository for the CComm, CListen, and CSession classes, along // with a several supporting classes. All activities implemented by // these classes are instigated by callers through specific command // interfaces in the CComm object. // // CSession and CListen are derived from CThread, and, as such, have // input ports for receiving commands. CListen, however, does not // examine its inport since its purpose is to receive client request // commands via its listenSocket. // // A command is an object derived from CItem so that it can be // enqueued in the input port. // // Commands take three parameters: // // code a ulong containing up to four characters, as in // 'ABCD', identifying the command to perform. See // the specific commands (e.g., OpenComm) listed // below for the actual codes. // // data an object derived from CItem that provides // parameters for the implementation code. Several // commands use a specialized object for this purpose. // They are listed here. // // CCloseSession // COpenConnect // CCloseConnect // CSendMessage // CReceiveMessage // // These objects are used internally by the CComm // package. Normal use of the CComm package is // accomplished through the CComm API, which provides // a small set of methods to perform these functions. // // result a CCommResult object that provides return // information from a Comm request. One important // return value is the result of the operation with // which the result was associated. The result is set // to FVE_INPROGRESS until the operation is either // complete or is aborted. Then, the result is set to // the appropriate value. Values have the form // FVE_state. Valid constants are: // // const ulong FVE_INPROGRESS // const ulong FVE_OK // const ulong FVE_CMMDINVALID // const ulong FVE_CMMDDATAINVALID // const ulong FVE_CONNECTIONNOTFOUND // const ulong FVE_CONNECTIONNOTOPEN // const ulong FVE_SESSIONNOTFOUND // const ulong FVE_SESSIONNOTOPEN // const ulong FVE_SENDFAILED // const ulong FVE_RECEIVEFAILED // const ulong FVE_RECEIVEABORTED // const ulong FVE_HEADERSINVALID // const ulong FVE_SOCKETNOTOPEN // // An important feature of the return object is that // it can be used to wait for the completion of the // operation with which it is associated by // specifying _wait = true when you construct the // result object. This is recommended in most cases. // // All the information obtained by the ReceiveMessage // command is returned in the CCommResult object. // // CComm clients use the following six CComm methods to control the // communication process. // // OpenComm Creates a internet listener thread (if // specified). Should be called at the time of // InitInstance of the application. // // CloseComm Disconnects the Comm object from the // internet and discards all Comm objects. // Should be called at the time of // TermInstance of the application. // // OpenSession Creates a session to communicate via a // specified URL. The session is identified by // a unique identifier (supplied by the // caller.) // // CloseSession Closes the session specified by the // enclosed session identifier. // // SendMessage Sends a message to a correspondent via the // session identified by the enclosed session // identifier. // // ReceiveMessage Waits to receive a message from a // correspondent via the session identified by // the enclosed session identifier. // // The CComm interface posts CSession commands to a specified CSession // object via its input port. The CSession object performs the commands // in the order presented in its input port using the accompanying // CCommResult object to return results to CComm and the package user. // // Below is a list of the commands recognized by a CSession object. // // OpenConnect ('OC') Opens a (socket) connection to the // correspondent for a specified session. // // CloseSession ('CS') Sends a message to the session to perform // required shut-down procedures. // // SendMessage ('SM') Sends a message to the correspondent // associated with the CSession object. // // ReceiveMessage ('RM') Waits to receive a message sent by a // correspondent. // //********************************************************************* // // All farVIEW actions and their associated messages are described in // a document called farhttp.txt. // // This communication subsystem supports a farVIEW system's ability to // communicate with standard Internet HTTP servers as well as other // farVIEW systems. farVIEW uses the standard HTTP messages to request // pages from HTTP servers, and a FAR message to request actions from // other farVIEW systems. // // A farVIEW system that has received a message requesting an activity // uses the standard HTTP response (ACK) message to return data and to // report errors. All headers used are standard HTTP. // // * The HTTP GET request message is used to access standard HTTP // servers and has the standard format // // GET <URI> HTTP/1.0 // // * The FAR request message is used to request farVIEW actions and // has the following format: // // FAR /<cmmd>[/<argument>] HTTP/1.0 // Date: Thu, 10 Feb 2000 18:32:12 GMT // Port: <portNr> // From: <userName> // Connection: keep-open | close // // The <cmmd> field is required. See comments in RMTBOOK.CPP for a // list of the recognized commands. // // If a body is included in the message, the following is appended to // the FAR message given above: // // Content-Type: <mimeType> ;<bodyType> [;E] // Content-Length: <bodySize> // <crlf> // <signatureBlock> // <body> // // The MIME types that farVIEW recognizes are listed below. // // text/html // text/plain // text/xml // text/abst // image/bmp // image/gif // image/jpg // image/png // audio/midi // audio/mp3 // audio/wav // video/? // application/? // // This list is currently hardcoded but maybe should be moved out to // the configuration file. // // The signature block is included only when the E-option is included // in the Content-Type header; i.e., when the body is encrypted. The // signature block has a fixed size, which is counted along with the // body in the Content-Length header. Since the signature block is not // yet implemented, I can't tell you how big it is. // // * A farVIEW response message has the following format: // // HTTP/1.0 <code> <reason> // Server: farVIEW/3.x // Date: Thu, 10 Feb 2000 18:32:13 GMT // From: <userName> // Connection: close // // Note that if a body is included in a response message, the Content- // Type and Content-Length headers are also included as described // above. // //********************************************************************* #include "prefixb.h" #include "book.h" #include "comm.h" #include "factory.h" #include "inilist.h" #include "record.h" #include "socket.h" #include "syslog.h" const char fvNSSName[] = "127.0.0.1"; const short fvNSSPort = -1; short pause = 100; // Waits up to 10 seconds for data ulong szLimit = 2500000; pCComm CComm::TheComm = 0; //===================================================================== // Comm result object class // //--------------------------------------------------------------------- // Default constructor sets to wait // CCommResult::CCommResult(void): CAtom (), result (FVE_INPROGRESS), code (0), mssg (0), headers(0), type (0), body (0) { cls = iCommResult; wait = new CSemaphore(); } // CCommResult //--------------------------------------------------------------------- // CCommResult::CCommResult(bool _wait): CAtom (), result (FVE_INPROGRESS), code (0), mssg (0), headers(0), type (0), body (0) { cls = iCommResult; if (_wait) wait = new CSemaphore(); } // CCommResult //--------------------------------------------------------------------- // CCommResult::~CCommResult() { delete[] mssg; mssg = 0; delete[] type; type = 0; } // ~CCommResult //--------------------------------------------------------------------- // RTTI_CPP(CommResult, Atom) //--------------------------------------------------------------------- // void CCommResult::Wait(void) { if (wait) wait->Wait(); } // Wait //--------------------------------------------------------------------- // void CCommResult::Release(void) { if (wait) wait->Release(); } // Release //--------------------------------------------------------------------- // void CCommResult::SetResult(ulong _result) { result = _result; } // SetResult //--------------------------------------------------------------------- // ulong CCommResult::GetResult(void) { return result; } // GetResult //--------------------------------------------------------------------- // void CCommResult::SetCode(ulong _code) { code = _code; } // SetCode //--------------------------------------------------------------------- // ulong CCommResult::GetCode(void) { return code; } // GetCode //--------------------------------------------------------------------- // void CCommResult::SetMssg(Pchar _mssg) { STRINIT(mssg, _mssg) } // SetMssg //--------------------------------------------------------------------- // Pchar CCommResult::GetMssg(void) { return mssg; } // GetMssg //--------------------------------------------------------------------- // void CCommResult::SetHead(RPCStrList _headers) { headers = _headers; } // SetHead //--------------------------------------------------------------------- // PCStrList CCommResult::GetHead(void) { return headers; } // GetHead //--------------------------------------------------------------------- // void CCommResult::SetType(Pchar _type) { // Clean up the MIME type/subtype provided char wrk1[sizeName]; Strcpy(wrk1, _type, sizeName); CapStr(wrk1); ChrCat(wrk1, ';', sizeName); char wrk2[sizeName]; Before(";", wrk1, wrk2, sizeName); Trim(wrk2); STRINIT(type, wrk2) } // SetType //--------------------------------------------------------------------- // Pchar CCommResult::GetType(void) { return type; } // GetType //--------------------------------------------------------------------- // void CCommResult::SetBody(RPCAtom _body) { body = _body; } // SetBody //--------------------------------------------------------------------- // PCAtom CCommResult::GetBody(void) { return body; } // GetBody //===================================================================== // //--------------------------------------------------------------------- // CCloseSession::CCloseSession(Pchar _sessionID): CItem (), sessionID(0) { cls = iCloseSession; Assert(!StrEmpty(_sessionID)); STRINIT(sessionID, _sessionID) } // CCloseSession //--------------------------------------------------------------------- // CCloseSession::~CCloseSession() { delete[] sessionID; sessionID = 0; } // ~CCloseSession //--------------------------------------------------------------------- // RTTI_CPP(CloseSession, Item) //===================================================================== // //--------------------------------------------------------------------- // COpenConnect::COpenConnect(Pchar _sessionID): CItem (), sessionID(0) { cls = iOpenConnect; Assert(!StrEmpty(_sessionID)); STRINIT(sessionID, _sessionID) } // COpenConnect //--------------------------------------------------------------------- // COpenConnect::~COpenConnect() { delete[] sessionID; sessionID = 0; } // ~COpenConnect //--------------------------------------------------------------------- // RTTI_CPP(OpenConnect, Item) //===================================================================== // //--------------------------------------------------------------------- // CSendMessage::CSendMessage(Pchar _sessionID, Pchar _mssgType, Pchar _command, Pchar _argument, Pchar _bodyType, RPCBlock _signature, RPCAtom _body, disposition _life): CItem (), sessionID(0), mssgType (0), command (0), argument (0), bodyType (0), signature(_signature), body (_body), life (_life) { cls = iSendMessage; Assert(!StrEmpty(_sessionID)); STRINIT(sessionID, _sessionID) STRINIT(mssgType, _mssgType) CapStr(mssgType); STRINIT(command, _command) CapStr(command); STRINIT(argument, _argument) STRINIT(bodyType, _bodyType) } // CSendMessage //--------------------------------------------------------------------- // CSendMessage::~CSendMessage() { delete[] sessionID; sessionID = 0; delete[] mssgType; mssgType = 0; delete[] command; command = 0; delete[] argument; argument = 0; delete[] bodyType; bodyType = 0; } // ~CSendMessage //--------------------------------------------------------------------- // RTTI_CPP(SendMessage, Item) //===================================================================== // //--------------------------------------------------------------------- // CReceiveMessage::CReceiveMessage(Pchar _sessionID, bool _doRequest): CItem (), sessionID(0), doRequest(_doRequest) { cls = iReceiveMessage; Assert(!StrEmpty(_sessionID)); STRINIT(sessionID, _sessionID); } // CReceiveMessage //--------------------------------------------------------------------- // CReceiveMessage::~CReceiveMessage() { delete[] sessionID; sessionID = 0; } // ~CReceiveMessage //--------------------------------------------------------------------- // RTTI_CPP(ReceiveMessage, Item) //===================================================================== // //--------------------------------------------------------------------- // CCommCmmd::CCommCmmd(ulong _cmmd, RPCItem _data, RPCCommResult _result): CItem (), cmmd (_cmmd), data (_data), result (_result) { cls = iCommCmmd; } // CCommCmmd //--------------------------------------------------------------------- // CCommCmmd::~CCommCmmd() { } // ~CCommCmmd //--------------------------------------------------------------------- // RTTI_CPP(CommCmmd, Item) //--------------------------------------------------------------------- // void CCommCmmd::SetResult(ulong _result) { if (result) result->SetResult(_result); } // SetResult //===================================================================== // Session dictionary // //--------------------------------------------------------------------- // CSessionDict::CSessionDict(short _wide): CHshList(_wide) { cls = iSessionDict; cs = new CCriticalSection(); } // CSessionDict //--------------------------------------------------------------------- // RTTI_CPP(SessionDict, HshList) //--------------------------------------------------------------------- // void CSessionDict::InsertSession(Pchar _sessionID, RPCSession _session) { cs->Enter(); PCItem item = TYPECAST(_session, Item); CHshList::Insert(_sessionID, item); cs->Leave(); } // InsertSession //--------------------------------------------------------------------- // PCSession CSessionDict::GetFirstSession(Pchar _sessionID) { cs->Enter(); PCItem item = CHshList::GetFirstItem(_sessionID); cs->Leave(); if (item) return TYPECAST(item, Session); else return 0; } // GetFirstItem //--------------------------------------------------------------------- // PCSession CSessionDict::RemoveSession(Pchar _sessionID) { cs->Enter(); PCItem item = CHshList::GetFirstItem(_sessionID); if (item) { CHshList::Remove(); cs->Leave(); return TYPECAST(item, Session); } else { cs->Leave(); return 0; } } // RemoveSession //--------------------------------------------------------------------- // Hash the specified key into an unsigned integer. // ushort CSessionDict::hash(Pvoid _key) { ulong h = 0; Pchar key = Pchar(_key); while (*key) { char c = MapChar(*key++); if (c) h = h * 0x2B + c; } return ushort(h % wide); } // hash //===================================================================== // farVIEW TCP/IP session thread class. When acting as a server, this // class receives control from the CListen thread to process the // client's request message. A server class object is usually derived // from CSession to respond to the request message. The CSession // object handles all the communication part of the server activity. // // A CSession object also acts on behalf of a client to send and/or // receive one or more messages to a server. The CSession object // receives all its requests through its input port, usually placed // there by the CComm object on behalf of user code. //===================================================================== //--------------------------------------------------------------------- // CSession::CSession(pCComm _comm, RPCAtom _book, Pchar _userName, Pchar _domain, Pchar _sessionID, Pchar _hostIP, long _hostPort, long _listenPort, short _szPort): CThread (_szPort), error (false), comm (_comm), book (_book), userName (0), domain (0), sessionID (0), hostIP (0), hostPort (_hostPort), listenPort(_listenPort), typeName (0), result (0) { cls = iSession; //?Assert(_book); Assert(_userName); Assert(!StrEmpty(_sessionID)); Assert(!StrEmpty(_hostIP)); //?Assert(hostPort > 0); // host sessions always have this zero STRINIT(userName, _userName) STRINIT(domain, _domain) STRINIT(sessionID, _sessionID) STRINIT(hostIP, _hostIP); } // CSession //--------------------------------------------------------------------- // CSession::~CSession() { delete[] userName; userName = 0; delete[] domain; domain = 0; delete[] sessionID; sessionID = 0; delete[] hostIP; hostIP = 0; delete[] typeName; typeName = 0; } // ~CSession //--------------------------------------------------------------------- // RTTI_CPP(Session, Thread) //--------------------------------------------------------------------- // Send error acknowledge to the client // void CSession::ackError(Pchar _msg, ...) { Log('K', "CSession.ackError: * Entry"); Assert(socket); // Format the message va_list args; va_start(args, _msg); char work[sizeString]; vsprintf(work, _msg, args); va_end(args); // Send them the error message char msg[sizeString]; sprintf(msg, "HTTP/1.0 %s\r\n", work); Log('D', "CSession.ackError: * Sent %s", msg); if (!(socket->Send(msg, strlen(msg)))) Log('D', "CSession.ackError: * Socket error on sending %s (%s)", msg, socket->GetLastErrorMessage()); /*? // Tell them to shut down sprintf(msg, "Connection: close\r\n"); if (!(socket->Send(msg, strlen(msg)))) Log('D', "CSession.ackError: * Socket error on sending %s (%s)", msg, socket->GetLastErrorMessage()); ?*/ // Send a blank line to terminate the message msg[0] = '\r'; msg[1] = '\n'; msg[2] = 0; if (!(socket->Send(msg, 2))) Log('D', "CSession.ackError: * Socket error on sending %s (%s)", msg, socket->GetLastErrorMessage()); } // ackError //--------------------------------------------------------------------- // Already connected // void CSession::Socket(RPCSocket _socket) { socket = _socket; } // Socket //--------------------------------------------------------------------- Pchar CSession::CommID(bool _alc) { if (_alc) { Pchar p = new char[strlen(sessionID)+1]; strcpy(p, sessionID); return p; } return sessionID; } // CommID //--------------------------------------------------------------------- // Use only in farSlang // Pchar CSession::CommID(void) { return CommID(true); } // CommID //--------------------------------------------------------------------- Pchar CSession::CommIP(bool _alc) { if (_alc) { Pchar p = new char[strlen(hostIP)+1]; strcpy(p, hostIP); return p; } return hostIP; } // CommIP //--------------------------------------------------------------------- // Use only in farSlang // Pchar CSession::CommIP(void) { return CommIP(true); } // CommIP //--------------------------------------------------------------------- long CSession::CommPort(void) { return hostPort; } // CommPort //--------------------------------------------------------------------- // Generate a CloseSession command and enqueue it for later processing. // void CSession::Close(void) { Log('K', "CSession.Close: * Entry"); PCItem temp = new CItem(); PCCommResult rslt = new CCommResult(/*_wait*/false); PCItem command = new CCommCmmd('C', temp, rslt); GetPort()->Post(command); } // Close //--------------------------------------------------------------------- // This method must be overriden to actually encrypt the body. // bool CSession::doEncrypt(RPCStrList _headers, RPCBlock _signature, RPCBlock _body) { _headers; _signature; _body; Log('K', "CSession.Close: * Entry"); Assert(false); return false; } // doEncrypt //--------------------------------------------------------------------- // Send the composited message to the correspondent. // bool CSession::sendMessage(RPCStrList _head, RPCBlock _signature, RPCBlock _body) { Log('K', "CSession.sendMessage: * Entry"); Assert(socket); // Dump the headers to the log file if (_head->GotoHead()) do { Log('K', "CSession.sendHeaders: <%s>", _head->StrItem(false)); } while (_head->GotoNext()); // Send the headers PCList list = TYPECAST(_head, List); if (!(socket->Send(list))) { Log('D', "CSession.sendMessage: * Could not send headers (%s)", socket->GetLastErrorMessage()); return false; } // Send a blank line that forms the headers/body divider char crlf[3]; crlf[0] = '\r'; crlf[1] = '\n'; crlf[2] = 0; if (!(socket->Send(crlf, 2))) { Log('D', "CSession.sendMessage: * Could not send " "headers-body divider (%s)", socket->GetLastErrorMessage()); return false; } // Send the signature if (_signature && _signature->GetSize() > 0) { Pchar p = _signature->Lock(); if (!(socket->Send(p, _signature->GetSize()))) { Log('D', "CSession.sendMessage: * Could not send signature (%s)", socket->GetLastErrorMessage()); return false; } } // Send the body if (_body && _body->GetSize() > 0) { Pchar p = _body->Lock(); if (!(socket->Send(p, _body->GetSize()))) { Log('D', "CSession.sendMessage: * Could not send body (%s)", socket->GetLastErrorMessage()); return false; } } return true; } // sendMessage //--------------------------------------------------------------------- // Prepare the body for sending to the correspondent. Append the // Content-Type and Content-Length headers if the body is not empty. // PCBlock CSession::makeBody(RPCStrList _headers, RPCSendMessage _sendMssg) { Log('K', "CSession.makeBody: * Entry"); // Check for a body to send if (!(_sendMssg->body)) return 0; // Only allow to send bodies derived from CAtom if (!(_sendMssg->body->Isa(iAtom))) { Log('D', "CSession.makeBody: * Body is not derived from CAtom"); return 0; } // Convert the body to a CBlock PCBlock body; if (_sendMssg->body->Isa(iBlock)) body = TYPECAST(_sendMssg->body, Block); else if (_sendMssg->body->Isa(iStrList)) { PCStrList bdy = TYPECAST(_sendMssg->body, StrList); char line[sizeString]; Strcpy(line, bdy->StrItem(1, false), sizeString); Log('K', "CSession.makeBody: %s", line); body = new CBlock(); PCAtom pbdy = TYPECAST(body, Atom); if (!_sendMssg->body->ToBlock(pbdy)) { Log('D', "CSession.makeBody:" " * Failed to convert CStrList body to CBlock"); return 0; } } else { Log('D', "CSession.makeBody:" " * Message body must be CStrList or CBlock"); return 0; } //------------------------------------------------------------------ // Let the user encrypt the body and the signature doEncrypt(_headers, _sendMssg->signature, body); char line[sizeString]; // Append the type header if (!StrEmpty(_sendMssg->bodyType)) { Strcpy(line, "Content-Type: ", sizeString); Strcat(line, _sendMssg->bodyType, sizeString); _headers->StrInsert(line); } // Compute the size of the body and append the size header ulong size = 0; if (_sendMssg->signature) size += _sendMssg->signature->GetSize(); size += body->GetSize(); sprintf(line, "Content-Length: %ld", size); _headers->StrInsert(line); return body; } // makeBody //--------------------------------------------------------------------- // Date: Thu, 10 Feb 2000 18:32:13 GMT // From: <userName> // Connection: keep-open | close // void CSession::makeComm(RPCStrList _headers, RPCSendMessage _sendMssg) { Log('K', "CSession.makeComm: * Entry"); char line[sizeString]; // Append the message date (and time) Strcpy(line, "Date: ", sizeString); GetGMT(line+6, sizeString-6); _headers->StrInsert(line); // Append the port number if (listenPort) { sprintf(line, "Port: %d", listenPort); _headers->StrInsert(line); } // Append the user name if (!StrEmpty(userName)) { Strcpy(line, "From: ", sizeString); Strcat(line, userName, sizeString); _headers->StrInsert(line); } // Append the connection type Strcpy(line, "Connection: ", sizeString); if (_sendMssg->life == keepAlive) { Strcat(line, "keep-alive", sizeString); } else { Strcat(line, "close", sizeString); } _headers->StrInsert(line); } // makeComm //--------------------------------------------------------------------- // <command> <argument> HTTP/1.0 // // where // // <command> is CONNECT | DELETE | GET | HEAD | POST | PUT | TRACE // bool CSession::sendHTTP(RPCSendMessage _sendMssg) { Log('K', "CSession.sendHTTP: * Entry"); Assert(!StrEmpty(hostIP)); PCStrList headers = new CStrList(); char line[sizeString]; // Default HTTP message type is "GET" not coded correctly for POST if (StrEmpty(_sendMssg->command)) { Strcpy(line, "GET", sizeString); } else Strcpy(line, _sendMssg->command, sizeString); Strcat(line, " ", sizeString); // get resource from http://domain:port/resource PCRecord url = new CRecord(_sendMssg->argument, "/"); char host[sizeString]; int i = ChrPos(':', host); if (i > 0) host[i] = 0; url->GetField(3, host, sizeLine); url->DeleteField(1); url->DeleteField(1); url->DeleteField(1); Pchar resource = url->GetDelimStrD(false); Strcat(line, resource, sizeString); Strcat(line, " HTTP/1.0", sizeString); headers->StrInsert(line); // headers->StrInsert("Accept: image/gif, image/x-xbitmap," // " image/jpeg, image/pjpeg, application/vnd.ms-excel," // " application/msword, application/vnd.ms-powerpoint," // " *"); headers->StrInsert("Accept-Language: en-us"); headers->StrInsert("Accept-Encoding: gzip, deflate"); headers->StrInsert("User-Agent: Mozilla/4.0"); Strcpy(line, "Host: ", sizeString); Strcat(line, host, sizeString); headers->StrInsert(line); headers->StrInsert(""); // Process the body if any PCBlock body = makeBody(headers, _sendMssg); // Send the message to the correspondent if (sendMessage(headers, _sendMssg->signature, body)) { Log('L', "CSession.sendHTTP: Sent <%s> (sz=0) to %s", line, hostIP); result->SetResult(FVE_OK); return true; } else { result->SetResult(FVE_SENDFAILED); return false; } } // sendHTTP //--------------------------------------------------------------------- // HTTP/1.0 <code> <reason> // Server: farVIEW/3.x // bool CSession::sendACK(RPCSendMessage _sendMssg) { Log('K', "CSession.sendACK: * Entry"); Assert(!StrEmpty(hostIP)); PCStrList headers = new CStrList(); char line[sizeString]; // Append the ACK line Strcpy(line, "HTTP/1.0 ", sizeString); Strcat(line, _sendMssg->argument, sizeString); headers->StrInsert(line); // Append the server line headers->StrInsert("Server: farVIEW/3.x"); // Append the common header lines makeComm(headers, _sendMssg); // Initialize the body part, if any PCBlock body = makeBody(headers, _sendMssg); // Send the message to the farVIEW comm thread bool rslt = sendMessage(headers, _sendMssg->signature, body); // Check to disconnect the socket if (_sendMssg->life != keepAlive) socket->Disconnect(); if (rslt) { if (body) Log('L', "CSession.sendACK: Sent <%s> (sz=%ld) to %s", line, body->GetSize(), hostIP); else Log('L', "CSession.sendACK: Sent <%s> (sz=0) to %s", line, hostIP); result->SetResult(FVE_OK); return true; } else { result->SetResult(FVE_SENDFAILED); return false; } } // sendACK //--------------------------------------------------------------------- // FAR /<cmmd>[/<argument>] HTTP/1.0 // bool CSession::sendFAR(RPCSendMessage _sendMssg) { Log('K', "CSession.sendFAR: * Entry"); Assert(!StrEmpty(hostIP)); PCStrList headers = new CStrList(); char line[sizeString]; // Append the FAR line Strcpy(line, "FAR /", sizeString); Strcat(line, _sendMssg->command, sizeString); if (!StrEmpty(_sendMssg->argument)) { ChrCat(line, '/', sizeString); Strcat(line, _sendMssg->argument, sizeString); } Strcat(line, " HTTP/1.0", sizeString); headers->StrInsert(line); // Append the common header lines makeComm(headers, _sendMssg); // Initialize the body part, if any PCBlock body = makeBody(headers, _sendMssg); // Send the message to the farVIEW comm thread if (sendMessage(headers, _sendMssg->signature, body)) { if (body) Log('L', "CSession.sendFAR: Sent <%s> (sz=%ld) to %s", line, body->GetSize(), hostIP); else Log('L', "CSession.sendFAR: Sent <%s> (sz=0) to %s", line, hostIP); result->SetResult(FVE_OK); return true; } else { result->SetResult(FVE_SENDFAILED); return false; } } // sendFAR //--------------------------------------------------------------------- // Formulate a message from its parts, then send it. The type of the // message to be built is determined by the command. Two commands are // reserved: HTTP to communicate with an HTTP server, and ACK to // respond to a previously received request. Any other command is sent // using the FAR corruption of HTTP. // // Encrypt outgoing messages in the following way: // 1. Hash the body b giving H // 2. Create a throwaway key K // 3. Use K to encrypt the body b giving K(b) // 3a.How about using K to generate a base-64 transform table? And then // map K(b) to base-64 using the scrambled table. // 4. Use myPrivateKey to encrypt GMT+H+K+param giving p // 5. Use herPublicKey to encrypt myUserName+p giving P // 6. Send headers + line_break + P + K(b). // The param value is a random number in messages from the client, and // is ignored by the server, but it is the client's hash in acknowledge // messages so that the client can verify that the server received the // message that the client sent. // // The acknowledge message should be encrypted only if the request // message was. The encrypted acknowledge should include the original // hash for verification. // // Decrypt incoming messages in the following way: // 1. Use myPrivateKey to decrypt P, giving herUserName+p // 2. Validate herUserName. // 3. Use herPublicKey to decrypt p giving GMT+H+K+param // 4. Validate GMT and H. // 5. Log receipt storing herUserName+GMT // 5a.Use K to reproduce the base-64 transform table. And then map the // base-64 code back into K(b) // 6. Use K to decrypt the body K(b) giving b // 7. Hash b to validate with H. // (note: this would verify that she received something, but how to // tell what it was she received! she should return the original hash // as the param value.) // // Could this be sent as an XML message? Two parts: the protocol part // and the payload part. Ok, how about this? // // http-headers // // <far-message> // <far-head>...</far-head> // <far-body>...</far-body> // </far-message> // // redesign the messages that you already set up for file xfer to use // this model, or something similar. // bool CSession::Send(RPCSendMessage _sendMssg) { Log('K', "CSession.send: * Entry"); if (_sendMssg->body && _sendMssg->body->Isa(iStrList)) { PCStrList list = TYPECAST(_sendMssg->body, StrList); list->Save("sendbody.log"); } // Determine the message type if (strcmp(_sendMssg->mssgType, "HTTP") == 0) return sendHTTP(_sendMssg); else if (strcmp(_sendMssg->mssgType, "ACK") == 0) return sendACK (_sendMssg); else if (strcmp(_sendMssg->mssgType, "FAR") == 0) return sendFAR (_sendMssg); else Assert(false); return false; } // Send //--------------------------------------------------------------------- // This method must be overridden to actually decrypt the body received. // bool CSession::doDecrypt(RPCStrList _headers, RPCBlock _body) { _headers; _body; Log('K', "CSession.doDecrypt: Entry"); Assert(false); return false; } // doDecrypt //--------------------------------------------------------------------- // void CSession::getStrValue(Pchar _line) { strcpy(_line, _line+ChrPos(':', _line)+1); Trim(_line); } // getStrValue //--------------------------------------------------------------------- // Get an integer value from a header line. // long CSession::getIntValue(Pchar _line) { getStrValue(_line); return StrToLong(_line); } // getIntValue //--------------------------------------------------------------------- // void CSession::getHeaderStrValue(RPCStrList _head, Pchar _name, Pchar _value, ushort _size) { if (_head->FindFirstString(_name)) { char work[sizeString]; _head->StrItem(work, sizeLine); getStrValue(work); Strcpy(_value, work,_size); } else _value[0] = 0; } // getHeaderStrValue //--------------------------------------------------------------------- // long CSession::getHeaderIntValue(RPCStrList _head, Pchar _name) { char work[sizeString]; getHeaderStrValue(_head, _name, work, sizeLine); if (StrEmpty(work)) return 0; else return StrToLong(work); } // getHeaderIntValue //--------------------------------------------------------------------- // Translate standard MIME type (of those that I support, or hope to // support, anyway) to the farVIEW identifier for the CAtom container // to hold the content. // struct { char type[32]; ushort form; } MIMETypes[] = {{"TEXT/HTML", iBlock}, {"TEXT/PLAIN", iBlock}, {"TEXT/XML", iBlock}, {"TEXT/ABST", iManifest}, {"IMAGE/BMP", iBlock}, {"IMAGE/GIF", iBlock}, {"IMAGE/JPG", iBlock}, {"IMAGE/PNG", iBlock}, {"AUDIO/MIDI", iBlock}, {"AUDIO/MP3", iBlock}, {"AUDIO/WAV", iBlock}, {"VIDEO/?", 0}, // for generic file xfer {"APPLICATION/XML", iBlock}, {"APPLICATION/BINARY", iBlock}, {"", 0}}; ushort CSession::getFVType(Pchar _MIMEType) { // Clean up the MIMEType provided char work[sizeName]; Strcpy(work, _MIMEType, sizeName); CapStr(work); ChrCat(work, ';', sizeName); char MIMEType[sizeName]; Before(";", work, MIMEType, sizeName); Trim(MIMEType); // Search the table for the container code int i = 0; while (!StrEmpty(MIMETypes[i].type)) if (strcmp(MIMEType, MIMETypes[i].type) == 0) return MIMETypes[i].form; else i++; return 0; } // getFVType //--------------------------------------------------------------------- // Read the message header into an strlist. After obtaining the // header, validate that the minimum required headers are present. // PCStrList CSession::receiveHeaders(void) { Log('K', "CSession.receiveHeaders Entry"); Assert(socket); PCStrList headers = new CStrList(); char line[sizeString]; line[0] = 0; // Read the first header line if (!(socket->GetLine(line, sizeString, /*_block*/true))) { Log('K', "CSession.recvHeaders: * Failed to read header line"); return 0; } // Read the remaining header lines from the socket into a // strlist. Stop reading when an empty line is obtained while (strlen(line)) { headers->StrInsert(line); if (!(socket->GetLine(line, sizeString, /*_block*/false))) { Log('K', "CSession.recvHeaders: * Failed to read header line"); return 0; } } Log('K', "CSession.recvHeaders: Read headers"); // Dump the headers to the log file if (headers->GotoHead()) do { Strcpy(line, headers->StrItem(false), sizeString); Log('K', "CSession.recvHeaders: <%s>", line); } while (headers->GotoNext()); return headers; } // receiveHeaders //--------------------------------------------------------------------- // Receive the body as a memory block, then convert it to the specified // CAtom object. (note for later: should cache the body to disk if it // is above some size? or maybe should always cache it? or maybe always // require that it not be so large that it busts memory (current // assumption). So if you need to send a file, you send it as a series // of blocks? another note: I think that the file transfer protocol // should be handled at a higher level from this level. Will want to // worry about block numbers and complete block sets, etc. Much later: // Ok, see Server.sendFile and Server.ReceiveFile // PCAtom CSession::receiveBody(long _szBody, RPCStrList _headers) { Log('K', "CSession.receiveBody: Entry"); Assert(socket); // Collect the body of the request into a single memory block Log('K', "CSession.receiveBody: Reading %d bytes", _szBody); long szBody = _szBody; PCBlock block = new CBlock(); block->Alloc(szBody+1); Pchar p = block->Lock(); // Collect the pieces as they arrive PCBlock msg = new CBlock(); ulong iBuf = 0; short timeOut = pause; // Wait awhile for data while (szBody > 0) { // Receive a portion of the expected message long size = socket->Receive(msg, szBody); if (size == 0) { int err = socket->GetLastError(); if (err == WSAECONNABORTED) { result->SetResult(FVE_RECEIVEABORTED); return 0; } else { char mssg[sizeString]; Strcpy(mssg, socket->GetLastErrorMessage(), sizeString); if (strlen(mssg) && strcmp(mssg, "(null)")) { ackError("500 Internal Service Error - " "Failed to receive data"); Log('D', "CSession.receiveBody: * Error <%s> while reading body", mssg); result->SetResult(500); return 0; } } // May need more time for the full message to arrive if (timeOut-- == 0) { // Failed to receive all the data ackError("500 Internal Service Error - " "Waited too long for data"); Log('D', "CSession.receiveBody: * Timed out " "waiting for data from %s", hostIP); result->SetResult(500); return 0; } Pause(10); continue; } if (size > szBody) { msg->Lock(); ackError("500 Internal Service Error - " "Size of data exceeds Content-Length %ld", _szBody); Log('D', "CSession.receiveBody: * Size of " "data from %s larger than " "Size-header specified %ld > %ld", hostIP, size, szBody); result->SetResult(500); return 0; } timeOut = pause; Pchar m = msg->Lock(); memcpy(p+iBuf, m, size); msg->Free(); szBody -= size; iBuf += size; } //------------------------------------------------------------------ // Let the user decrypt the body and the signature doDecrypt(_headers, block); //------------------------------------------------------------------ // Convert the block to a CAtom object of the appropriate type char typeName[sizeString]; getHeaderStrValue(_headers, "Content-Type:", typeName, sizeString); if (StrEmpty(typeName)) { ackError("400 Bad Request - Content-Type Missing"); Log('D', "CSession.receiveBody: * Could not " "find the Content-Type header"); result->SetResult(400); return 0; } // Translate the contentType (type/subtype) into the farVIEW // container class into which to store the content ushort type = getFVType(typeName); result->SetType(typeName); // Validate the type code if (!type) { ackError("400 Bad Request - MIME Type Not Recognized"); Log('D', "CSession.receiveBody: * MIME Type %s not recognized", typeName); result->SetResult(400); return 0; } // Create a body object of the specified type PCAtom body = CFactory::TheFactory->MakeAtom(type); if (!body) { ackError("400 Bad Request - MIME Type Not Recognized"); Log('D', "CSession.receiveBody: * Could not make a body " "object of type %s", typeName); result->SetResult(400); return 0; } // Convert the block contents to the specified body type PCAtom blck = TYPECAST(block, Atom); if (!(body->BlockTo(blck))) { ackError("500 Internal Server Error - " "Could not convert type of data"); Log('D', "CSession.receiveBody: * Could not " "convert body to type %s", typeName); result->SetResult(500); return 0; } // Temporary //?ackError("200 OK"); body->Save("recvbody.log"); Log('K', "CSession.receiveBody: Body was received ok"); // Dump the first line if the body is a string list (which is the // usual case) and is not empty if (body->Isa(iStrList)) { PCStrList b = TYPECAST(body, StrList); if (b->GotoHead()) { p = b->StrItem(false); Log('K', "CSession.receiveBody: %s", p); } } return body; } // receiveBody //--------------------------------------------------------------------- // Must override this method to actually do anything with the data // received. // bool CSession::doRequest(RPCStrList _headers, RPCAtom _body) { _headers; _body; Log('K', "CSession.doRequest: Entry"); Assert(false); return false; } // doRequest //--------------------------------------------------------------------- // Receive an expected message. Messages are assumed to have a standard // HTTP form of one or more headers, followed by a line break, followed // possibly by content, the length of which is determined by the // Content-Length header. // bool CSession::Receive(RPCReceiveMessage _recvMssg) { Log('K', "CSession.Receive: Receiving from %s", hostIP); bool isResponse = false; ulong err = 200; // = OK char line[sizeString]; char head[sizeString]; head[0] = 0; long szBody; //------------------------------------------------------------------ // Process the headers // // Collect the header portion of the message PCStrList headers = receiveHeaders(); // Should have received headers if (!headers) { int err = socket->GetLastError(); if (err == WSAECONNABORTED) { result->SetResult(FVE_RECEIVEABORTED); return true; } else { Log('D', "CSession.Receive: * No headers received"); result->SetResult(FVE_RECEIVEFAILED); return false; } } // Validate the first header if (headers->GotoHead()) { headers->StrItem(line, sizeString); Squeeze(line, ' '); Strcpy(head, line, sizeString); if (StrPos("FAR", line) < 0) { // form: HTTP/1.x ccc mssg isResponse = true; // Validate response headers Squeeze(line, ' '); PCRecord rec = new CRecord(line, " "); err = rec->GetLongField(2); if (err != 0 && (err < 200 || err > 299)) Log('D', "CSession.Receive: * Response %d indicates error", err); result->code = err; rec->DeleteField(1); rec->DeleteField(1); rec->GetDelimStr(line, sizeString); Trim(line); Pchar mssg = new char[strlen(line)+1]; strcpy(mssg, line); result->SetMssg(mssg); } // Validate the FAR header // form: FAR url HTTP/1.0 else if (StrPos("FAR", line) < 0 || StrPos("HTTP/1.0", line) < 0) { ackError("400 Bad Request - Invalid Header"); Log('D', "CSession.Receive: * Header <%s> invalid", line); result->SetResult(FVE_HEADERSINVALID); return false; } } //------------------------------------------------------------------ // Process the body. If the Content-Length header is present and // specifies a non-zero size, the body is expected as well as the // Content-Type header. // // Get the size of the body in the request if (err == 200) { szBody = getHeaderIntValue(headers, "Content-Length:"); if (!szBody) { result->SetHead(headers); PCAtom body; result->SetBody(body); result->SetResult(FVE_OK); return true; } // Check for too big. Sender should only send "small" blocks if (szBody > szLimit) { Log('D', "CSession.Receive: * Body length exceeds " "program's size limit (%d)", szBody); if (!isResponse) ackError("413 Request Entity Too Large"); result->SetResult(FVE_HEADERSINVALID); return false; } // Get the body from the sender if (szBody) { PCAtom body = receiveBody(szBody, headers); if (!body) { if (result->GetResult() == FVE_RECEIVEABORTED) return true; else { Log('D', "CSession.Receive: * Failed to " "receive the body from %s", hostIP); if (!isResponse) ackError("400 Bad Request - Error In Body"); result->SetResult(FVE_RECEIVEFAILED); return false; } } // Process the message with a body else if (_recvMssg->doRequest) { // Process the message Log('L', "CSession.Receive.1:" " Received <%s> (sz=%ld) from %s", head, szBody, hostIP); doRequest(headers, body); } else { // Return the message Log('L', "CSession.Receive.2:" " Received <%s> (sz=%ld) from %s", head, szBody, hostIP); getHeaderStrValue(headers, "Content-Type:", line, sizeString); result->SetType(line); result->SetHead(headers); result->SetBody(body); } result->SetResult(FVE_OK); return true; } } // Process the message without a body else if (_recvMssg->doRequest) { // Process the message Log('L', "CSession.Receive.3: Received <%s> from %s", head, hostIP); PCAtom body; doRequest(headers, body); } else { // Return the message Log('L', "CSession.Receive.4: Received <%s> from %s", head, hostIP); result->SetHead(headers); PCAtom body; result->SetBody(body); } result->SetResult(FVE_OK); return true; } // Receive //--------------------------------------------------------------------- // Receive and handle commands enqueued in the input port // void CSession::Run(void) { Log('D', "CSession.Run: Starting session %s with %s:%d", sessionID, hostIP, hostPort); while (true) { // Get the next request Log('K', "CSession.Run: Waiting for command"); PCItem cmd = GetPort()->Receive(); PCCommCmmd cmmd = TYPECAST(cmd, CommCmmd); Assert(cmmd->Isa(iCommCmmd)); // Save the user's result object for notification later result = cmmd->GetCommResult(); if (!result) result = new CCommResult(/*_wait*/false); //--------------------------------------------------------------- // Handle OpenConnect command // if (cmmd->cmmd == 'OC') { Log('K', "CSession.Run:" " Received OpenConnect -connect command for %s", sessionID); Assert(!socket); socket = new CSocket(); if (socket->ClientConnect(hostIP, hostPort)) { Log('K', "CSession.Run: Connect to %s:%d succeeded for %s", hostIP, hostPort, sessionID); result->SetResult(FVE_OK); } else { Log('D', "CSession.Run: * Connect to %s:%d failed for %s", hostIP, hostPort, sessionID); socket = 0; result->SetResult(FVE_CONNECTIONNOTOPEN); } if (GetPort()->Empty()) result->Release(); } // OpenConnect command //--------------------------------------------------------------- // Handle CloseSession command // else if (cmmd->cmmd == 'CS') { Log('K', "CSession.Run:" " Received CloseSession command for %s", sessionID); break; // Quits the session } // CloseSession command //--------------------------------------------------------------- // Handle SendMessage command // else if (cmmd->cmmd == 'SM') { Log('K', "CSession.Run:" " Received SendMessage command for %s", sessionID); if (socket) { PCSendMessage sendMssg = TYPECAST(cmmd->data, SendMessage); // Send the specified message if (!Send(sendMssg)) Log('D', "CSession.Run:" " * SendMessage failed for %s", sessionID); } else result->SetResult(FVE_SOCKETNOTOPEN); Log('K', "CSession.Run:" " SendMessage processing complete for %s", sessionID); if (GetPort()->Empty()) result->Release(); } // SendMessage //--------------------------------------------------------------- // Handle ReceiveMessage command // else if (cmmd->cmmd == 'RM') { Log('K', "CSession.Run:" " Received ReceiveMessage command for %s", sessionID); if (socket) { PCReceiveMessage recvMssg = TYPECAST(cmmd->data, ReceiveMessage); if (!Receive(recvMssg)) Log('D', "CSession.Run:" " * ReceiveMessage failed for %s", sessionID); } else result->SetResult(FVE_SOCKETNOTOPEN); Log('K', "CSession.Run:" " ReceiveMessage processing complete for %s", sessionID); if (GetPort()->Empty()) result->Release(); } // ReceiveMessage //--------------------------------------------------------------- // Handle unrecognized command // else { Log('D', "CSession.Run:" " * Received unrecognized command (%ld) for %s", cmmd->cmmd, sessionID ); cmmd->SetResult(FVE_CMMDINVALID); Assert(false); } // Unrecognized command } //------------------------------------------------------------------ // Shut-down procedure follows Log('L', "CSession.Run: Stopping session %s", sessionID); if (socket) { socket->Disconnect(); socket = 0; } Log('K', "CSession.Run:" " Disconnect from %s:%d succeeded for %s", hostIP, hostPort, sessionID); result->SetResult(FVE_OK); result->Release(); } // Run //===================================================================== // farVIEW TCP/IP listener thread class. Listens for incoming messages. // When a message is detected, the listener thread spawns a CSession // thread object to handle the client's request. //===================================================================== //--------------------------------------------------------------------- // CListen::CListen(pCComm _comm, short _listenPort, RPCCommResult _result, short _size): CThread (_size), comm (_comm), listenPort(_listenPort), result (_result) { cls = iListen; Assert(comm); Assert(result); } // CListen //--------------------------------------------------------------------- // CListen::~CListen() { Log('D', "CListen destructing"); } // ~CListen //--------------------------------------------------------------------- // RTTI_CPP(Listen, Thread) //--------------------------------------------------------------------- // Create a new session thread and store it in the dictionary. // PCSession CListen::getSession(RPCSocket _socket) { // Make a new session to handle the incoming message. (note: a bad // person could flood this farVIEW with bogus sessionIDs, which // could cause it to create many dangling sessions. This has to be // handled!! Maybe an upper limit of the number of sessions active // at one time? Could also include some minor trickery to avoid // script-kiddy intrusions. Could do something like encrypting the // sessionID along with some fixed (secret) keyword. If the keyword // is missing, discard the request.) Log('K', "CListen.getSession: Entry"); // Create the new session char sessionID[sizeLine]; ::GetID(sessionID, sizeLine); PCSession session = comm->newSession(sessionID, _socket->GetName(), 0, ""); Assert(session); session->Socket(_socket); comm->sessions->InsertSession(sessionID, session); session->Start(); return session; } // getSession //--------------------------------------------------------------------- // Disconnect the socket from the correspondent. // void CListen::Disconnect(void) { Log('K', "CListen.Disconnect: Entry"); if (listenSocket) listenSocket->Disconnect(); } // Disconnect //--------------------------------------------------------------------- // Accept client request connections and hand them off to CSession // subthreads for resolution. Note: need to include some code to limit // the number of active clients. // void CListen::Run(void) { Log('C', "CListen.Run: Entry"); Assert(result); // Validate time between samplings if (pause < 0) pause = 0; // Get a socket to listen for incoming messages listenSocket = new CSocket(SOCKET_ERROR, /*_blocking=*/true); if (!(listenSocket->ServerConnect(listenPort))) { Log('D', "CListen.Run: * Failed - ServerConnect error %s", listenSocket->GetLastError()); return; } Log('D', "CListen.Run: Listening to port %ld", listenPort); char line[sizeString]; while (true) { // Get a socket for the CSession PCSocket sessionSocket = listenSocket->Accept(); if (!sessionSocket) { // No client right now if (!listenSocket || listenSocket->GetLastError()) break; continue; } // Check to accept the client // TBD sessionSocket->GetName(..); // Create and start up a CSession thread object PCSession session = getSession(sessionSocket); if (!session) continue; // Send a ReceiveMessage command to the CSession object PCReceiveMessage recvMssg = new CReceiveMessage(session->CommID(false), true); PCItem recv = TYPECAST(recvMssg, Item); PCCommResult rslt = new CCommResult(/*_wait*/false); PCItem cmmd = new CCommCmmd('RM', recv, rslt); session->GetPort()->Post(cmmd); Log('K', "CListen.Run:" " Sent RM command from %s to session %s", listenSocket->GetName(), session->CommID(false)); } // while (true) Log('D', "CListen::Run terminating"); Assert(result); result->SetResult(FVE_OK); result->Release(); } // Run //===================================================================== // //--------------------------------------------------------------------- // CWatcher::CWatcher(RPCSessionDict _sessions, //? RPCCriticalSection _section, RPCCommResult _result, short _size): CThread (_size), done (false), //?section (_section), sessions(_sessions), result (_result) { //?cls = iWatcher; Assert(result); } // CWatcher //--------------------------------------------------------------------- // CWatcher::~CWatcher() { if (!done) Assert(done); Log('D', "CWatcher destructing"); } // ~CWatcher //--------------------------------------------------------------------- // void CWatcher::Run(void) { Log('C', "CWatcher.Run: Entry"); Assert(result); short width = sessions->GetWidth(); while (!done) { /*? // Close any CSession thread not running sessions->Enter(); for (short i = 0; i < width; i++) { PCList list = sessions->GetList(i); if (list && list->GotoHead()) do { pCSession session = TYPECAST(list->Item(), Session); // Remove the session if stopped if (session->GetState() == THREAD_STOPPED) { SysLog::Log('K', "CWatch.Run: " "Removing session %d (%d:%s)", session->ThreadID(), session->NrRefs(), session->CommID(false)); list->Remove(atTop); // leaves the head item active } } while (list->GotoNext()); } sessions->Leave(); ?*/ Pause(10); } // while (true) Log('D', "CWatcher::Run terminating"); Assert(result); result->SetResult(FVE_OK); result->Release(); } // Run //===================================================================== // farVIEW communication command handler class. Provides a procedural // interface between users and the Comm threads. Provides APIs for // OpenComm and CloseComm, OpenSession and CloseSession, SendMessage, // and ReceiveMessage. When specified in the CCommResult parameter, // CComm methods wait for completion before returning control to the // caller. //===================================================================== //--------------------------------------------------------------------- // CComm::CComm(RPCAtom _book, Pchar _userName, Pchar _domain, short _listenPort, short _pause, short _size): CAtom (), book (_book), userName (0), domain (0), listenPort (_listenPort), size (_size) { cls = iComm; //?Assert(_book); if (!TheComm) TheComm = this; Assert(!StrEmpty(_userName)); STRINIT(userName, _userName) STRINIT(domain, _domain) pause = _pause; // Create a sessions dictionary sessions = new CSessionDict(); // Initialize a CommResult object for general use listenResult = new CCommResult(); } // CComm //--------------------------------------------------------------------- // CComm::~CComm() { delete[] userName; userName = 0; delete[] domain; domain = 0; } // ~CComm //--------------------------------------------------------------------- // RTTI_CPP(Comm, Atom) //--------------------------------------------------------------------- // void CComm::log(char _flag, Pchar _format, ...) { if (!StrEmpty(_format)) { // Print formatted message va_list args; va_start(args, _format); char line[sizeString]; vsprintf(line, _format, args); va_end(args); Assert(strlen(line) < sizeString); char date[sizeLine]; GetGMT(date, sizeLine); ulong now = GetTicks(); SysLog::Log(_flag, "(%d: %ld) %s", 0, now, line); //? SysLog::Log(_flag, "(%d: %ld) %s", 0, now - then, line); //? then = now; } } // log //--------------------------------------------------------------------- // This method creates a CSession object from the specified parameters. // Users of the communication package must override this method to // create a tailored CSession object. Note that this method is // protected and is used only by CListen and CComm methods. // PCSession CComm::newSession(Pchar _sessionID, Pchar _hostIP, long _hostPort, Pchar _userName) { return new CSession(this, book, _userName, domain, _sessionID, _hostIP, _hostPort, listenPort); } // newSession //--------------------------------------------------------------------- // Initialize the socket interface software. Create and start a TCP // socket listener object if a listenPort was specified in the // constructor. // bool CComm::OpenComm(RPCCommResult _result) { log('K', "CComm.OpenComm: * Entry"); Assert(_result); _result->result = FVE_INPROGRESS; // Initialize the WinSock subsystem CSocket::Initialize(pause); // Create a listener if requested if (listenPort) { listener = new CListen(this, listenPort, listenResult, size); listener->Start(16000); } // Create a dead-session watcher watcherResult = new CCommResult(); watcher = new CWatcher(sessions, watcherResult); watcher->Start(16000); _result->SetResult(FVE_OK); _result->Release(); return true; } // OpenComm //--------------------------------------------------------------------- // Terminate the listener thread if active. Close all active CSession // threads by force. Shut down the socket interface software. // bool CComm::CloseComm(RPCCommResult _result) { log('K', "CComm.CloseComm: * Entry"); Assert(_result); _result->result = FVE_INPROGRESS; // Close the listener port (it quits when it is disconnected) if (!!listener) { listener->Disconnect(); listenResult->Wait(); log('D', "Listener done"); } // Stop the dead-session watcher watcher->Done(); watcherResult->Wait(); log('D', "Watcher done"); // Close any CSession threads still open short width = sessions->GetWidth(); for (short i = 0; i < width; i++) { PCList list = sessions->GetList(i); if (list && list->GotoHead()) do { PCSession session = TYPECAST(list->Item(), Session); // Close the session if needed if (session->GetState() == THREAD_RUNNING) { log('K', "CComm.CloseComm: Closing session %d (%d:%s)", session->ThreadID(), session->NrRefs(), session->CommID(false)); PCCommResult r = new CCommResult(); PCSocket socket = session->Socket(); if (socket) socket->Disconnect(); // Send a CloseSession request to the session PCItem close = new CCloseSession(session->CommID(false)); PCItem command = new CCommCmmd('CS', close, r); session->GetPort()->Post(command); r->Wait(); // wait for completion } else log('K', "CComm.CloseComm: Session %d closed (%d:%s)", session->ThreadID(), session->NrRefs(), session->CommID(false)); } while (list->GotoNext()); } sessions->Flush(); listener = 0; watcher = 0; // Close the WinSock subsytem CSocket::UnInitialize(); _result->SetResult(FVE_OK); _result->Release(); return true; } // CloseComm //--------------------------------------------------------------------- // Create a CSession thread object, associating it with the specified // session ID. Add the CSession object to the session dictionary. // Request that the CSession object connect to the specified host // computer. // bool CComm::OpenSession(Pchar _sessionID, Pchar _hostIP, long _hostPort, Pchar _userName, RPCCommResult _result) { log('K', "CComm.OpenSession:" " sessionID=%s, hostIP=%s, hostPort=%ld", _sessionID, _hostIP, _hostPort); Assert(!StrEmpty(_sessionID)); Assert(_result); _result->result = FVE_INPROGRESS; // Create a CSession thread to handle communication via the // specified connection PCSession session = newSession(_sessionID, _hostIP, _hostPort, _userName); if (!session) { log('D', "CComm.OpenSession: * Could not open session"); _result->SetResult(FVE_SESSIONNOTOPEN); _result->Release(); return false; } // Add the CSession object to the dictionary sessions->InsertSession(_sessionID, session); // Start the thread session->Start(); PCCommResult r = new CCommResult(); // Request that the CSession thread open the connection PCItem opening = new COpenConnect(_sessionID); PCItem command = new CCommCmmd('OC', opening, r); session->GetPort()->Post(command); r->Wait(); // wait until the thread opens the connection if (r->GetResult() != FVE_OK) { log('D', "CComm.OpenSession:" " * Could not open session (%ld)", r->GetResult()); _result->SetResult(FVE_SESSIONNOTOPEN); _result->Release(); return false; } log('K', "CComm.OpenSession: Opened session"); _result->SetResult(FVE_OK); _result->Release(); return true; } // OpenSession //--------------------------------------------------------------------- // Request that the CSession thread object associated with the // specified session ID disconnect from the correspondent computer. // Close and remove the CSession object from the dictionary. // bool CComm::CloseSession(Pchar _sessionID, RPCCommResult _result) { log('K', "CComm.CloseSession: sessionID=%s", _sessionID); Assert(_result); _result->result = FVE_INPROGRESS; // Find the CSession thread to handle this command PCSession session = sessions->GetFirstSession(_sessionID); if (!session) { log('D', "CComm.CloseSession: * Could not find session"); _result->SetResult(FVE_SESSIONNOTFOUND); return false; } log('K', "CComm.CloseSession: Found session"); PCItem command; // Request the CSession thread to close itself PCItem close = new CCloseSession(_sessionID); command = new CCommCmmd('CS', close, _result); session->GetPort()->Post(command); // Remove the CSession object from the dictionary //?sessions->RemoveSession(_sessionID); log('K', "CComm.CloseSession: Removed session (%d)", session->NrRefs()); _result->SetResult(FVE_OK); return true; } // CloseSession //--------------------------------------------------------------------- // Forward the SendMessage request to the CSession thread object // associated with the specified session ID. Wait for the request to // be completed. // bool CComm::SendMessage(Pchar _sessionID, Pchar _messageType, Pchar _command, Pchar _argument, Pchar _contentType, RPCAtom _body, disposition _life, RPCCommResult _result) { log('K', "CComm.SendMessage: sessionID=%s", _sessionID); Assert(stricmp(_messageType, "HTTP") == 0 || stricmp(_messageType, "FAR") == 0 || stricmp(_messageType, "ACK") == 0); if (stricmp(_messageType, "ACK") != 0) Assert(!StrEmpty(_command)); Assert(_result); _result->result = FVE_INPROGRESS; // Find the CSession thread to handle this command PCSession session = sessions->GetFirstSession(_sessionID); if (!session) { log('D', "CComm.SendMessage: * Could not find session"); _result->SetResult(FVE_SESSIONNOTFOUND); _result->Release(); return false; } log('K', "CComm.SendMessage: Found session"); // Request the CSession thread to send a message PCBlock signature; PCItem message = new CSendMessage(_sessionID, _messageType, _command, _argument, _contentType, signature, _body, _life); _result->Wait(); // wait until the message is sent PCItem command = new CCommCmmd('SM', message, _result); session->GetPort()->Post(command); return true; } // SendMessage //--------------------------------------------------------------------- // Forward the ReceiveMessage request to the CSession thread object // associated with the specified session ID. Wait for the request to // be completed. // bool CComm::ReceiveMessage(Pchar _sessionID, RPCCommResult _result, bool _doRequest) { log('K', "CComm.ReceiveMessage: sessionID=%s", _sessionID); Assert(_result); _result->result = FVE_INPROGRESS; // Find the CSession thread to handle this command PCSession session = sessions->GetFirstSession(_sessionID); if (!session) { log('D', "CComm.ReceiveMessage: * Could not find session"); _result->SetResult(FVE_SESSIONNOTFOUND); _result->Release(); return false; } log('K', "CComm.ReceiveMessage: Found session"); // Request the CSession thread to set itself to receive a message PCItem recvMssg = new CReceiveMessage (_sessionID, _doRequest); _result->Wait(); // wait until the message is received PCItem command = new CCommCmmd('RM', recvMssg, _result); session->GetPort()->Post(command); return true; } // ReceiveMessage