dodo
0.0.1
A C++ library to create containerized Linux services
|
Go to the documentation of this file.
36 using namespace std::literals;
47 log_Debug(
"TCPConnectionData::readBuffer socket " << socket->
getFD() <<
48 " received " << recv <<
" bytes" );
49 log_Trace(
"TCPConnectionData::readBuffer socket " << socket->
getFD() <<
50 " received '" << tmp.
hexDump( recv ) <<
"'" );
51 read_buffer.append( tmp, recv );
54 log_Error(
"TCPConnectionData::readBuffer receive error socket " <<
55 socket->
getFD() <<
" error : '" << error.
asString() <<
" bytes'" );
58 }
while ( recv == 4096 );
87 gettimeofday( &(listener_.now_), NULL );
89 std::this_thread::sleep_for( std::chrono::milliseconds( 100 ) );
144 sendbufsz = common::YAML_read_key_default<socklen_t>( node,
yaml_sendbufsz, 16384 );
145 recvbufsz = common::YAML_read_key_default<socklen_t>( node,
yaml_recvbufsz, 32768 );
162 TCPListener::~TCPListener() {
173 read_event_mask_ = EPOLLIN | EPOLLPRI | EPOLLONESHOT | EPOLLRDHUP | EPOLLERR | EPOLLHUP | EPOLLWAKEUP;
181 SocketParams::stSTREAM,
182 SocketParams::pnHOPOPT )
191 std::string proc_somaxconn =
"/proc/sys/net/core/somaxconn";
217 std::string saddr = common::YAML_read_key<std::string>( yaml,
"listen-address" );
218 uint16_t port = common::YAML_read_key<std::uint16_t>( yaml,
"listen-port" );
225 std::unique_lock<std::mutex> lk(
mt_signal_ );
228 [
this,server]{ return work_q_sz_ || server->getRequestStop(); } );
237 struct epoll_event poll_sockets[
params_.pollbatch];
238 struct epoll_event set_event;
243 memset( poll_sockets, 0,
sizeof(poll_sockets) );
247 set_event.events = EPOLLIN | EPOLLPRI;
265 gettimeofday( &
now_, NULL );
280 log_Warning(
"TCPListener::run queuing on maxservers #clients=" <<
287 if ( rc < 0 && errno != EINTR )
290 for (
int i = 0; i < rc; i++ ) {
291 if ( poll_sockets[i].events != 0 ) {
296 if ( !client_sock->
isValid() )
break;
298 client_sock->
close();
309 log_Debug(
"TCPListener::run new client socket " <<
323 }
while ( client_sock->
isValid() );
324 }
else if ( poll_sockets[i].data.fd >= 0 ) {
326 if ( (poll_sockets[i].events & EPOLLIN) || (poll_sockets[i].events & EPOLLPRI) ) {
327 log_Debug(
"TCPListener::run EPOLLIN || EPOLLPRI on socket " <<
328 poll_sockets[i].data.fd <<
329 " events=(" << poll_sockets[i].events <<
")" );
336 if ( (poll_sockets[i].events & EPOLLRDHUP ) ||
337 (poll_sockets[i].events & EPOLLERR ) ||
338 (poll_sockets[i].events & EPOLLHUP ) ) {
340 log_Debug(
"TCPListener::run hangup or error on socket " <<
341 poll_sockets[i].data.fd <<
342 " events=(" << poll_sockets[i].events <<
")" );
345 poll_sockets[i].events = 0;
346 pushWork( poll_sockets[i].data.fd, combined_state );
350 log_Warning(
"TCPListener::run epoll event on invalid descriptor " <<
351 poll_sockets[i].data.fd <<
352 " events=(" << poll_sockets[i].events <<
")" );
362 std::cout <<
"dodo::common::Exception" << std::endl;
363 log_Error(
"TCPListener::run dodo::common::Exception " << e.
what() );
365 catch(
const std::exception &e ) {
366 std::cout <<
"std::exception" << std::endl;
367 log_Error(
"TCPListener::run std::exception " << e.what() );
370 std::cout <<
"..." << std::endl;
371 log_Error(
"TCPListener::run unhandled exception" );
374 log_Debug(
"TCPListener::run stop listener finish pending work" );
375 while(
work_q_sz_ > 0 ) std::this_thread::sleep_for(20ms);
376 log_Debug(
"TCPListener::run stop TCPServers " );
384 log_Debug(
"TCPListener::run stopped TCPServers " );
387 for(
auto c: clients_copy ) {
388 log_Debug(
"TCPListener::run close socket " << c.second.socket );
404 log_Debug(
"TCPListener::pushWork BaseSocket* socket " <<
405 clients_[fd].socket->debugString() <<
" state " << state );
417 " state " << work.
state );
458 log_Debug(
"TCPListener::closeSocket socket " << s <<
" fd=" << fd );
462 struct epoll_event set_event;
463 set_event.events = events;
464 set_event.data.fd = s->
getFD();
465 int rc = epoll_ctl(
epoll_fd_, EPOLL_CTL_ADD, s->
getFD(), &set_event );
467 log_Debug(
"TCPListener::pollAdd socket " << set_event.data.fd <<
" events=" << events );
471 struct epoll_event set_event;
472 set_event.events = events;
473 set_event.data.fd = s->
getFD();
474 int rc = epoll_ctl(
epoll_fd_, EPOLL_CTL_MOD, s->
getFD(), &set_event );
476 log_Debug(
"TCPListener::pollMod socket " << set_event.data.fd <<
" events=" << events );
480 struct epoll_event set_event;
481 set_event.events = 0;
482 set_event.data.fd = s->
getFD();
483 int rc = epoll_ctl(
epoll_fd_, EPOLL_CTL_DEL, s->
getFD(), &set_event );
485 log_Debug(
"TCPListener::pollDel socket " << set_event.data.fd );
493 log_Debug(
"TCPListener::addServers +1" );
512 struct timeval l_now;
548 "TCPServer id " << srv->getTID() <<
549 (srv->isBusy()?
" busy-":
" idle-") << srv->getState() <<
550 " ucpu=" << srv->getLastUserCPU() <<
551 " scpu=" << srv->getLastSysCPU() <<
552 " minflt=" << srv->getLastMinFltRate() <<
"/s" <<
553 " majflt=" << srv->getLastMajFltRate() <<
"/s" <<
554 " bi=" << srv->getLastBlkInRate() <<
"/s" <<
555 " bo=" << srv->getLastBlkOutRate() <<
"/s" <<
556 " yield=" << srv->getLastVCtx() <<
"/s" <<
557 " ctxsw=" << srv->getLastICtx() <<
"/s" );
569 struct timeval l_now;
576 size_t num_stopped = 0;
577 std::list<TCPServer*>::iterator i_server =
servers_.begin();
578 while ( i_server !=
servers_.end() ) {
580 if ( (*i_server)->hasStopped() ) {
583 i_server =
servers_.erase(i_server);
588 if ( num_stopped == 0 &&
591 !(*i_server)->isBusy() ) {
592 (*i_server)->requestStop();
603 i_server =
servers_.erase(i_server);
void setReceiveTimeout(double sec)
Set the receive timeout - a receive will fail if there is no data received before the timeout expires...
BaseSocket * listen_socket_
The listening BaseSocket.
Stats last_stats_
The last Stats.
const std::string yaml_sendbufsz
sendbufsz YAML configuration name
Stats prev_stats_
The previous Stats.
void setTCPNoDelay(bool set)
Set TCP_NODELAY.
Waits for and locks the Mutex on construction, unlocks the Mutex when this Mutexer is destructed.
threads::Mutex now_mutex_
Synchronize access to now_.
void addServers()
Add a server if work_q_sz_ exceeds servers_.size() and servers_.size() < params_.maxservers.
Address getPeerAddress() const
Get the peer (remote) address for this socket.
size_t event_maxconnections_reached_
The number of times a connection was closed after accept because params_.maxconnections was reached...
common::SystemError listen(const Address &address, int backlog)
Sets up a listening socket on Address.
void cleanStoppedServers()
Cleanup stopped TCPServers.
bool fileReadInt(const std::string &file, int &i)
Read from a file, expecting it to contain a (signed) int.
Octet * getArray() const
Return the array.
SockState
BaseSocket lifecycle states.
bool isValid() const
True if this Address is valid.
socklen_t sendbufsz
The send buffer size, set on the listening socket, inherited by accepted sockets.
void wait()
Wait for the thread to join the current thread / wait for the thread to finish.
@ None
Undefined / initial.
pid_t getTID() const
Return the tid.
Set the precision for floating point fixed format.
double getLastMinFltRate()
Get the minor fault rate since last sample.
double getLastMajFltRate()
Get the major fault rate since last sample.
void clearBuffer()
Clears the read_buffer.
double getLastBlkInRate()
Get last block in rate since last sample.
const std::string yaml_minservers
minservers YAML configuration name
size_t cycle_max_throttles
Maximum number of throttles per epoll_wait cycle.
const std::string yaml_tcp_keep_alive
yaml_tcp_keep_alive YAML configuration name
void setBlocking(bool blocking)
Set the Socket blocking mode.
Put the stream in hexadecimal mode.
void stop()
Stop the timer.
std::string asString(bool withport=false) const
Return a string representation of this Address.
ssize_t received
The number of bytes received.
SockState state
State of the socket.
@ New
New connection, TCPServer::handShake() will be called.
const std::string yaml_server_idle_ttl_s
server_idle_ttl_s YAML configuration name
void reserve(size_t size)
Reserve memory in the Bytes.
SocketParams::AddressFamily getAddressFamily() const
Get this Address family.
threads::Mutex clientmutex_
Protects both client_ and workload_.
virtual void close()
Closes the socket, causing the connection, if it exists, to be terminated.
ssize_t sent
The number of bytes sent.
void setReUseAddress()
Enable the socket to re-use an address when listen/bind is called.
An array of Octets with size elements.
@ Shut
BaseSocket is hung up or in error, TCPServer::shutDown() will be called.
Generic network Address, supporting ipv4 and ipv6 transparently.
int backlog_
The backlog (the queue of incoming connections waiting to be cleared by accept calls) used by listen.
Updates the attribute now_ in the TCPListener at a regular interval to avoid excessive number of call...
double getLastUserCPU()
Get the user mode cpu (cpu seconds/second) since last sample.
const std::string yaml_maxservers
maxservers YAML configuration name
TCPServer * init_server_
The initial TCPServer, which will live as long as the TCPListener runs, and is used to create new TCP...
template size_t YAML_read_key_default< size_t >(const YAML::Node &, const std::string &, const size_t &)
Instantiate template YAML_read_key for size_t.
const std::string yaml_receive_timeout_seconds
yaml_receive_timeout_seconds YAML configuration name
size_t minservers
The minimum number of TCPServers.
const std::string yaml_maxqdepth
maxqdepth YAML configuration name
time_t stat_trc_interval_s
The epoll_wait timeout in ms.
size_t maxqdepth
Maximum size of connection and request work queues.
const std::string yaml_pollbatch
pollbatch YAML configuration name
void closeSocket(BaseSocket *socket)
Tell the listener to close the socket.
virtual const char * what() const noexcept
Return the exception message.
std::string asString() const
Get the system error string.
uint32_t read_event_mask_
The event mask for read events.
double getLastBlkOutRate()
Get last block out rate since last sample.
template double YAML_read_key_default< double >(const YAML::Node &, const std::string &, const double &)
Instantiate template YAML_read_key for double.
void start()
Start the thread.
TCPListenerTimer(TCPListener &listener)
Constructor, specify the TCPListener to operate on.
struct timeval stat_time_
Time of last statistics.
size_t maxconnections
The maximum number of connections (in effect, sockets) allowed.
BaseSocket * socket
Pointer to the socket.
struct timeval warn_queue_time_
Time of last queueing warning.
virtual void run()
Entrypoint, override of Thread::run()
const std::string yaml_maxconnections
maxconnections YAML configuration name
socklen_t recvbufsz
The receive buffer size, set on the listening socket, inherited by accepted sockets.
std::condition_variable cv_signal_
Condition variable used to wakeup TCPServer threads with mutex mt_signal_.
void setTCPKeepAlive(bool enable)
Enable or disable TCP keep-alive on the socket.
void pollAdd(const BaseSocket *s, uint32_t events)
Add an epoll event for the BaseSocket.
virtual BaseSocket * accept()=0
Accepts a connection request and return a pointer to a new Socket for the new connection,...
double getLastICtx()
Get involuntary context switch rate since last sample.
void releaseWork(const SocketWork &work)
Called by a TCPServer to signal that the work has been handled and event detection on it can resume.
Address listen_address_
The listening address.
A C++ platform interface to lean Linux services tailored for containerized deployment.
std::deque< SocketWork * > workload_
Queue of sockets with work for TCPServer instances.
#define log_Statistics(what)
Macro to log Statistics.
#define log_Error(what)
Macro to log Error.
Socket parameters - the family (domain), socket type and protocol triplet.
int receive_timeout_seconds
Receive timeout in seconds.
std::list< TCPServer * > servers_
List of TCPServers.
Params()
Construct with default parameters.
const std::string yaml_cycle_max_throttles
cycle_max_throttles YAML configuration name
std::mutex mt_signal_
Mutex used to wakeup TCPServer threads with condition variable cv_signal_.
bool waitForActivity(TCPServer *server)
Called by the TCPServer, enters wait state until woken up by either a timeout or a notify by TCPListe...
const std::string yaml_throttle_sleep_us
throttle_sleep_us YAML configuration name
void pollDel(const BaseSocket *s)
Delete the epoll event for the BaseSocket.
size_t throttle_sleep_us
The time to sleep when the queue gets too big (letting workers clear from the queue without accepting...
const std::string yaml_listener_sleep_ms
listener_sleep_ms YAML configuration name
#define throw_SystemException(what, errno)
Throws an Exception with errno, passes FILE and LINE to constructor.
SocketWork * popWork()
Pop work.
virtual common::SystemError receive(void *buf, ssize_t request, ssize_t &received)=0
Receive bytes from the socket.
void snapRUsage()
Take a snapshot of the thread's resource usage.
bool stopped_
True if stop() was called.
@ Read
Data is ready to be read, TCPServer::readSocket() will be called.
ssize_t throttles
The number of throttles.
#define throw_Exception(what)
Throws an Exception, passes FILE and LINE to constructor.
void throttle()
Throttle the listener when work_q_sz_ exceeds params_.maxqdepth.
int getFD() const
Return the socket file descriptor.
virtual TCPServer * addServer()=0
Spawn a new TCPServer.
@ ecOK
0 Not an error, success
virtual TCPConnectionData * newConnectionData() const
Return a new TCPConnectionData pointer or override to return a descendant of TCPConnectionData.
Parameters affecting TCPListener behavior.
Used in conjunction with TCPListener to implement high speed, multithreaded TCP services.
virtual void run()
Descendants must override the run function.
const std::string yaml_send_timeout_seconds
yaml_send_timeout_seconds YAML configuration name
double getSecondDiff(struct timeval &t1, struct timeval &t2)
Return difference in seconds as a double.
struct timeval prev_stat_time_
Time of previous statistics.
std::string debugString() const
Return the object dump to string.
int listener_sleep_ms
The TCPListener epoll_wait timeout in ms.
common::SystemError readBuffer(BaseSocket *socket, ssize_t &received)
Reads and appends data to read_buffer.
void setSendBufSize(socklen_t size)
Set the size of the send buffer.
TCPListener(const Address &address, const Params &params)
Constructor.
void pollMod(const BaseSocket *s, uint32_t events)
Modify an epoll event for the BaseSocket.
BaseSocket socket and state pair.
long getMaxRSS()
Get the maximum resident set size seen on the thread.
std::map< int, SocketWork > clients_
Map of file descriptors to SocketWork for all connected clients.
bool isValid() const
Return true when the socket descriptor is a valid, hence 'possible' descriptor.
void logStats()
Log TCPListener statistics to Logger::getLogger().
std::atomic< unsigned long long > work_q_sz_
The number of queued work items.
std::string hexDump(size_t n) const
hex dump the first n octets of the data.
#define log_Trace(what)
Macro to log Trace.
ssize_t requests
The number of requests.
Linux system error primitive to provide a consistent interface to Linux error codes.
TCPListener & listener_
The associated TCPListener.
void construct(const Address &address, const Params &params)
Common constructor code.
void setReceiveBufSize(socklen_t size)
Set the size of the receive buffer.
double getLastSysCPU()
Get the system mode cpu (cpu seconds/second) since last sample.
Put the stream in decimal mode.
int send_timeout_seconds
Send timeout in seconds.
uint32_t hangup_event_mask_
The event mask for hangup events.
bool tcp_keep_alive
Toggle TCP keep-alive.
ssize_t connections
The number of connections.
struct timeval now_
Frequently updated by a TCPListenerTimer with enough precision for its use.
const std::string yaml_recvbufsz
recvbufsz YAML configuration name
The TCPListener listens, accepts connections and generates socket events to produce TCP work to a poo...
size_t maxservers
The maximum number of TCPServers.
template bool YAML_read_key_default< bool >(const YAML::Node &, const std::string &, const bool &)
Instantiate template YAML_read_key for bool.
void pushWork(const SocketWork &work)
Push work.
size_t getSize() const
Return the array size.
void setReUsePort()
Make the socket re-use a port when listen is called.
void setSendTimeout(double sec)
Set the send timeout - a send will fail if there is no data sent before the timeout expires.
Params params_
TCPListener parameters.
double getLastVCtx()
Get voluntary context switch rate since last sample.
int epoll_fd_
The epoll interface file descriptor.
#define log_Debug(what)
Macro to log Debug.
#define log_Warning(what)
Macro to log Warning.
double server_idle_ttl_s
The maximum TCPServer idle time in seconds before stopping the server, honoring minservers.
threads::Mutex stats_mutex_
Protects prev_stats_, last_stats_.
bool stop_server_
If true, the TCPListener will gracefully stop and finish.
const std::string yaml_stat_trc_interval_s
stat_trc_interval_s YAML configuration name
An Exception is thrown in exceptional circumstances, and its occurrence should generally imply that t...
Interface to and common implementation of concrete sockets (Socket, TLSSocket).
bool hasStopped() const
Return true if the TCPServer has stopped working.
struct timeval server_stopped_check_time_
Time of last check for stopped servers.
template int YAML_read_key_default< int >(const YAML::Node &, const std::string &, const int &)
Instantiate template YAML_read_key for int.
int pollbatch
The number of epoll_wait events read in one epoll_wait wake-up.