SocketMuxer.h++
Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023 #ifndef __ccxx_SocketMuxer_hxx
00024 #define __ccxx_SocketMuxer_hxx
00025
00026 #include <commonc++/Common.h++>
00027 #include <commonc++/CircularBuffer.h++>
00028 #include <commonc++/CriticalSection.h++>
00029 #include <commonc++/Iterator.h++>
00030 #include <commonc++/StaticObjectPool.h++>
00031 #include <commonc++/ServerSocket.h++>
00032 #include <commonc++/StreamSocket.h++>
00033 #include <commonc++/Thread.h++>
00034 #include <commonc++/Mutex.h++>
00035
00036 #ifdef CCXX_OS_POSIX
00037 #include <sys/select.h>
00038 #endif
00039
00040 namespace ccxx {
00041
00049 class COMMONCPP_API Connection
00050 {
00051 friend class SocketMuxer;
00052
00053 public:
00054
00056 static const size_t DEFAULT_BUFFER_SIZE;
00057
00059 virtual ~Connection() throw();
00060
00069 bool writeData(ByteBuffer& buffer);
00070
00080 bool writeData(const byte_t *buf, size_t count);
00081
00090 bool writeLine(const String &text);
00091
00100 size_t readData(ByteBuffer& buffer, bool fully = true);
00101
00111 size_t readData(byte_t *buf, size_t count, bool fully = true);
00112
00121 size_t readLine(String &text, size_t maxLen);
00122
00124 inline StreamSocket* getSocket() throw()
00125 { return(_socket); }
00126
00134 void setReadLowWaterMark(size_t count) throw();
00135
00144 void setReadHighWaterMark(size_t count) throw();
00145
00150 inline size_t getReadLowWaterMark() const throw()
00151 { return(_readLoMark); }
00152
00156 bool isReadLow() const throw();
00157
00161 bool isReadHigh() const throw();
00162
00172 void setWriteLowWaterMark(size_t count) throw();
00173
00180 void setWriteHighWaterMark(size_t count) throw();
00181
00186 inline size_t getWriteLowWaterMark() const throw()
00187 { return(_writeLoMark); }
00188
00193 inline size_t getWriteHighWaterMark() const throw()
00194 { return(_writeHiMark); }
00195
00199 bool isWriteLow() const throw();
00200
00205 bool isWriteHigh() const throw();
00206
00213 void close(bool immediate = false) throw();
00214
00219 inline byte_t getOOBData() throw()
00220 { _oobFlag = false; return(_oobData); }
00221
00227 inline bool getOOBFlag() const throw()
00228 { return(_oobFlag); }
00229
00232 inline time_ms_t getTimestamp() const throw()
00233 { return(_lastRecv); }
00234
00237 bool isClosePending() const throw();
00238
00239 protected:
00240
00245 Connection(size_t bufferSize = DEFAULT_BUFFER_SIZE);
00246
00247 private:
00248
00249 void read() throw(IOException);
00250 void readOOB() throw(IOException);
00251 void write() throw(IOException);
00252
00253 void attach(StreamSocket *socket) throw();
00254
00255 inline void setOOBFlag(bool flag) throw()
00256 { _oobFlag = flag; }
00257
00258 inline void setTimestamp(time_ms_t stamp) throw()
00259 { _lastRecv = stamp; }
00260
00261 StreamSocket *_socket;
00262
00263 protected:
00264
00265 CircularByteBuffer readBuffer;
00266 CircularByteBuffer writeBuffer;
00267
00268 private:
00269
00270 size_t _readLoMark;
00271 size_t _readHiMark;
00272 size_t _writeLoMark;
00273 size_t _writeHiMark;
00274 bool _oobFlag;
00275 bool _closePending;
00276 byte_t _oobData;
00277 time_ms_t _lastRecv;
00278 mutable CriticalSection _readLock;
00279 mutable CriticalSection _writeLock;
00280
00281 CCXX_COPY_DECLS(Connection);
00282 };
00283
00292 class COMMONCPP_API SocketMuxer : public Thread
00293 {
00294 public:
00295
00308 SocketMuxer(uint_t maxConnections = 64, uint_t defaultIdleLimit = 0,
00309 uint_t sleepInterval = 100);
00310
00313 virtual ~SocketMuxer() throw();
00314
00315 void run();
00316 void cleanup();
00317
00320 size_t getConnectionCount() const;
00321
00331 virtual bool init(ServerSocket* socket);
00332
00341 uint_t writeAll(const byte_t *buf, size_t count);
00342
00343 protected:
00344
00354 virtual Connection *connectionReady(const SocketAddress &address) = 0;
00355
00364 virtual void dataReceived(Connection *connection) = 0;
00365
00373 virtual void dataSent(Connection *connection);
00374
00381 virtual void dataReceivedOOB(Connection *connection);
00382
00390 virtual void connectionClosed(Connection *connection) = 0;
00391
00399 virtual void connectionTimedOut(Connection *connection) = 0;
00400
00408 virtual void exceptionOccurred(Connection *connection,
00409 const IOException& ex);
00410
00411 class ConnectionList;
00412
00415 ConnectionList *_connections;
00416
00417 private:
00418
00419 void _connectionTimedOut(Connection *connection);
00420 void _connectionClosed(Connection *connection);
00421
00422 Mutex _mutex;
00423 StaticObjectPool<StreamSocket> _pool;
00424 uint_t _idleLimit;
00425 uint_t _sleepIntervalSec;
00426 uint_t _sleepIntervalMicroSec;
00427 ServerSocket* _ssock;
00428
00429 CCXX_COPY_DECLS(SocketMuxer);
00430 };
00431
00432 };
00433
00434 #endif // __ccxx_SocketMuxer_hxx
00435
00436