dodo  0.0.1
A C++ library to create containerized Linux services
tcplistener.cpp
Go to the documentation of this file.
1 /*
2  * This file is part of the dodo library (https://github.com/jmspit/dodo).
3  * Copyright (c) 2019 Jan-Marten Spit.
4  *
5  * This program is free software: you can redistribute it and/or modify
6  * it under the terms of the GNU General Public License as published by
7  * the Free Software Foundation, version 3.
8  *
9  * This program is distributed in the hope that it will be useful, but
10  * WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12  * General Public License for more details.
13  *
14  * You should have received a copy of the GNU General Public License
15  * along with this program. If not, see <http://www.gnu.org/licenses/>.
16  */
17 
18 /**
19  * @file tcplistener.cpp
20  * Implements the dodo::network::TCPListener class.
21  */
22 
23 #include <network/tcplistener.hpp>
24 #include <network/tcpserver.hpp>
25 #include <common/logger.hpp>
26 #include <common/util.hpp>
27 
28 #include <algorithm>
29 #include <map>
30 
31 
32 namespace dodo {
33 
34  namespace network {
35 
36  using namespace std::literals;
37 
39  common::Bytes tmp;
40  tmp.reserve( 4096 );
41  ssize_t recv = 0;
42  received = 0;
43  common::SystemError error;
44  do {
45  error = socket->receive( tmp.getArray(), tmp.getSize(), recv );
46  if ( ( error == common::SystemError::ecOK || error == common::SystemError::ecEAGAIN ) && recv > 0 ) {
47  log_Debug( "TCPConnectionData::readBuffer socket " << socket->getFD() <<
48  " received " << recv << " bytes" );
49  log_Trace( "TCPConnectionData::readBuffer socket " << socket->getFD() <<
50  " received '" << tmp.hexDump( recv ) << "'" );
51  read_buffer.append( tmp, recv );
52  received += recv;
53  } else if ( ! ( error == common::SystemError::ecOK || error == common::SystemError::ecEAGAIN ) ) {
54  log_Error( "TCPConnectionData::readBuffer receive error socket " <<
55  socket->getFD() << " error : '" << error.asString() << " bytes'" );
56  return false;
57  }
58  } while ( recv == 4096 );
59  return error;
60  }
61 
63  read_buffer.free();
64  }
65 
66  /**
67  * Updates the attribute now_ in the TCPListener at a regular interval to avoid an excessive number of calls
68  * to gettimeofday in the TCPListener event loop, where high time precision is of lesser importance.
69  */
71  public:
72  /**
73  * Constructor, specify the TCPListener to operate on.
74  * @param listener the TCPListener to operate on.
75  */
76  explicit TCPListenerTimer( TCPListener &listener ) : listener_(listener), stopped_(false) {};
77 
78  virtual ~TCPListenerTimer() {
79  stop();
80  wait();
81  }
82 
83  virtual void run() {
84  while ( !stopped_ ) {
85  {
86  threads::Mutexer lock( listener_.now_mutex_ );
87  gettimeofday( &(listener_.now_), NULL );
88  }
89  std::this_thread::sleep_for( std::chrono::milliseconds( 100 ) );
90  }
91  }
92 
93  /**
94  * Stop the timer.
95  */
96  void stop() {
97  stopped_ = true;
98  }
99  protected:
100  /** The associated TCPListener. */
102  /** True if stop() was called. */
103  bool stopped_;
104  };
105 
106  /** YAML key name for the minservers parameter. */
107  const std::string yaml_minservers = "min-servers";
108  /** YAML key name read into Params::maxservers. */
109  const std::string yaml_maxservers = "max-servers";
110  /** YAML key name read into Params::maxconnections. */
111  const std::string yaml_maxconnections = "max-connections";
112  /** YAML key name read into Params::maxqdepth. */
113  const std::string yaml_maxqdepth = "max-queue-depth";
114  /** YAML key name read into Params::sendbufsz. */
115  const std::string yaml_sendbufsz = "send-buffer";
116  /** YAML key name read into Params::recvbufsz. */
117  const std::string yaml_recvbufsz = "receive-buffer";
118  /** YAML key name read into Params::server_idle_ttl_s. */
119  const std::string yaml_server_idle_ttl_s = "server-idle-ttl-s";
120  /** YAML key name read into Params::pollbatch. */
121  const std::string yaml_pollbatch = "poll-batch";
122  /** YAML key name read into Params::listener_sleep_ms. */
123  const std::string yaml_listener_sleep_ms = "listener-sleep-ms";
124  /** YAML key name read into Params::throttle_sleep_us. */
125  const std::string yaml_throttle_sleep_us = "throttle-sleep-us";
126  /** YAML key name read into Params::cycle_max_throttles. */
127  const std::string yaml_cycle_max_throttles = "cycle-max-throttles";
128  /** YAML key name read into Params::stat_trc_interval_s. */
129  const std::string yaml_stat_trc_interval_s = "stat-trc-interval-s";
130  /** YAML key name read into Params::send_timeout_seconds. */
131  const std::string yaml_send_timeout_seconds = "send-timeout-seconds";
132  /** YAML key name read into Params::receive_timeout_seconds. */
133  const std::string yaml_receive_timeout_seconds = "receive-timeout-seconds";
134  /** YAML key name read into Params::tcp_keep_alive. */
135  const std::string yaml_tcp_keep_alive = "tcp-keep-alive";
136 
137 
138 
139  TCPListener::Params::Params( const YAML::Node &node ) {
141  maxservers = common::YAML_read_key_default<size_t>( node, yaml_maxservers, 16 );
142  maxconnections = common::YAML_read_key_default<size_t>( node, yaml_maxconnections, 500 );
143  maxqdepth = common::YAML_read_key_default<size_t>( node, yaml_maxqdepth, 256 );
144  sendbufsz = common::YAML_read_key_default<socklen_t>( node, yaml_sendbufsz, 16384 );
145  recvbufsz = common::YAML_read_key_default<socklen_t>( node, yaml_recvbufsz, 32768 );
146  server_idle_ttl_s = common::YAML_read_key_default<double>( node, yaml_server_idle_ttl_s, 300 );
147  pollbatch = common::YAML_read_key_default<int>( node, yaml_pollbatch, 128 );
148  listener_sleep_ms = common::YAML_read_key_default<int>( node, yaml_listener_sleep_ms, 200 );
149  throttle_sleep_us = common::YAML_read_key_default<size_t>( node, yaml_throttle_sleep_us, 4000 );
150  cycle_max_throttles = common::YAML_read_key_default<size_t>( node, yaml_cycle_max_throttles, 40 );
151  stat_trc_interval_s = common::YAML_read_key_default<time_t>( node, yaml_stat_trc_interval_s, 300 );
152  send_timeout_seconds = common::YAML_read_key_default<int>( node, yaml_send_timeout_seconds, 0 );
153  receive_timeout_seconds = common::YAML_read_key_default<int>( node, yaml_receive_timeout_seconds, 0 );
154  tcp_keep_alive = common::YAML_read_key_default<bool>( node, yaml_tcp_keep_alive, true );
155  }
156 
157  TCPListener::TCPListener( const Address& address, const Params &params ) :
158  Thread() {
159  construct( address, params );
160  }
161 
162  TCPListener::~TCPListener() {
163  if ( listen_socket_ ) delete listen_socket_;
164  }
165 
166  void TCPListener::construct( const Address& address, const Params &params ) {
167  listen_address_ = address;
168  params_ = params;
169  init_server_ = nullptr;
170  stop_server_ = false;
171  work_q_sz_ = 0;
172 
173  read_event_mask_ = EPOLLIN | EPOLLPRI | EPOLLONESHOT | EPOLLRDHUP | EPOLLERR | EPOLLHUP | EPOLLWAKEUP;
174  //hangup_event_mask_ = EPOLLRDHUP | EPOLLERR | EPOLLHUP | EPOLLWAKEUP;
175  hangup_event_mask_ = 0;
176  if ( !listen_address_.isValid() ) throw_Exception( "invalid address" );
177  /** @todo This could also be a TLSSocket */
178  listen_socket_ = new Socket( false,
179  SocketParams(
181  SocketParams::stSTREAM,
182  SocketParams::pnHOPOPT )
183  );
186  listen_socket_->setBlocking( false );
191  std::string proc_somaxconn = "/proc/sys/net/core/somaxconn";
192  if ( !common::fileReadInt( proc_somaxconn, backlog_ ) )
193  throw_Exception( "failed to read int from " << proc_somaxconn );
194 
208 
210  if ( error != common::SystemError::ecOK ) throw_SystemException( "listen failed", error );
211 
212  log_Statistics( "address = " << listen_address_.asString(true) );
213  }
214 
215  TCPListener::TCPListener( const YAML::Node &yaml ) {
216  Params params( yaml );
217  std::string saddr = common::YAML_read_key<std::string>( yaml, "listen-address" );
218  uint16_t port = common::YAML_read_key<std::uint16_t>( yaml, "listen-port" );
219  Address addr( saddr, port );
220  if ( !addr.isValid() ) throw_Exception( "invalid listen address " << saddr << ":" << port );
221  construct( addr, params );
222  }
223 
225  std::unique_lock<std::mutex> lk( mt_signal_ );
226  return cv_signal_.wait_for( lk,
227  std::chrono::milliseconds(params_.listener_sleep_ms),
228  [this,server]{ return work_q_sz_ || server->getRequestStop(); } );
229  }
230 
231  void TCPListener::start( TCPServer *server ) {
232  init_server_ = server;
233  Thread::start();
234  }
235 
237  struct epoll_event poll_sockets[params_.pollbatch];
238  struct epoll_event set_event;
240  try {
241  int rc = 0;
242  servers_.push_back(init_server_);
243  memset( poll_sockets, 0, sizeof(poll_sockets) );
244  epoll_fd_ = epoll_create1( 0 );
245  if ( epoll_fd_ < 0 )
246  throw_SystemException( "TCPListener::run: epoll_create1 failed", errno );
247  set_event.events = EPOLLIN | EPOLLPRI;
248  set_event.data.fd = listen_socket_->getFD();
249  rc = epoll_ctl( epoll_fd_, EPOLL_CTL_ADD, listen_socket_->getFD(), &set_event );
250  if ( rc < 0 ) throw_SystemException( "TCPListener::run epoll_ctl failed", errno );
251 
252  log_Debug( "TCPListener::run starting " << params_.minservers << " servers" );
253  init_server_->start();
254  while ( servers_.size() < params_.minservers ) {
255  TCPServer* add_server = init_server_->addServer();
256  servers_.push_back(add_server);
257  add_server->start();
258  }
259 
260  gettimeofday( &prev_stat_time_, NULL );
261  prev_stat_time_.tv_sec -= 1;
262  gettimeofday( &stat_time_, NULL );
263  gettimeofday( &warn_queue_time_, NULL );
264  gettimeofday( &server_stopped_check_time_, NULL );
265  gettimeofday( &now_, NULL );
266 
267  // start TCPListenerTimer
268  TCPListenerTimer timer( *this );
269  timer.start();
270  do {
271  logStats();
272  addServers();
273  throttle();
274  // check for maxservers
275  {
277  if ( servers_.size() == params_.maxservers &&
278  work_q_sz_ > 2 * servers_.size() &&
279  now_.tv_sec > 30 + warn_queue_time_.tv_sec ) {
280  log_Warning( "TCPListener::run queuing on maxservers #clients=" <<
281  clients_.size() << " #servers=" << servers_.size() << " queue=" << work_q_sz_ );
282  gettimeofday( &warn_queue_time_, NULL );
283  }
284  }
285  // wait for activity
286  rc = epoll_wait( epoll_fd_, poll_sockets, params_.pollbatch, params_.listener_sleep_ms );
287  if ( rc < 0 && errno != EINTR )
288  throw_SystemException( "TCPListener::run: epoll_wait failed", errno );
289  else if ( rc > 0 ) {
290  for ( int i = 0; i < rc; i++ ) {
291  if ( poll_sockets[i].events != 0 ) {
292  if ( poll_sockets[i].data.fd == listen_socket_->getFD() ) {
293  network::BaseSocket* client_sock;
294  do {
295  client_sock = listen_socket_->accept();
296  if ( !client_sock->isValid() ) break;
297  if ( clients_.size() >= params_.maxconnections ) {
298  client_sock->close();
299  delete client_sock;
301  } else {
302  {
304  clients_[client_sock->getFD()].socket = client_sock;
305  clients_[client_sock->getFD()].state = SockState::New;
306  clients_[client_sock->getFD()].data = init_server_->newConnectionData();
307  }
308  pushWork( { client_sock, SockState::New } );
309  log_Debug( "TCPListener::run new client socket " <<
310  client_sock->debugString() << " from " <<
311  client_sock->getPeerAddress().asString() );
312  {
315  }
316  client_sock->setBlocking( false );
317  client_sock->setTCPNoDelay( true );
320  client_sock->setTCPKeepAlive( params_.tcp_keep_alive );
321  cv_signal_.notify_one();
322  }
323  } while ( client_sock->isValid() );
324  } else if ( poll_sockets[i].data.fd >= 0 ) {
325  SockState combined_state = SockState::None;
326  if ( (poll_sockets[i].events & EPOLLIN) || (poll_sockets[i].events & EPOLLPRI) ) {
327  log_Debug( "TCPListener::run EPOLLIN || EPOLLPRI on socket " <<
328  poll_sockets[i].data.fd <<
329  " events=(" << poll_sockets[i].events << ")" );
330  combined_state |= SockState::Read;
331  {
334  }
335  }
336  if ( (poll_sockets[i].events & EPOLLRDHUP ) ||
337  (poll_sockets[i].events & EPOLLERR ) ||
338  (poll_sockets[i].events & EPOLLHUP ) ) {
339  combined_state |= SockState::Shut;
340  log_Debug( "TCPListener::run hangup or error on socket " <<
341  poll_sockets[i].data.fd <<
342  " events=(" << poll_sockets[i].events << ")" );
343  }
344  if ( combined_state != SockState::None ) {
345  poll_sockets[i].events = 0;
346  pushWork( poll_sockets[i].data.fd, combined_state );
347  cv_signal_.notify_one();
348  }
349  } else {
350  log_Warning( "TCPListener::run epoll event on invalid descriptor " <<
351  poll_sockets[i].data.fd <<
352  " events=(" << poll_sockets[i].events << ")" );
353  }
354  }
355  }
356  }
357 
359  } while ( !stop_server_ );
360  }
361  catch( const dodo::common::Exception &e ) {
362  std::cout << "dodo::common::Exception" << std::endl;
363  log_Error( "TCPListener::run dodo::common::Exception " << e.what() );
364  }
365  catch( const std::exception &e ) {
366  std::cout << "std::exception" << std::endl;
367  log_Error( "TCPListener::run std::exception " << e.what() );
368  }
369  catch( ... ) {
370  std::cout << "..." << std::endl;
371  log_Error( "TCPListener::run unhandled exception" );
372  }
373  try {
374  log_Debug( "TCPListener::run stop listener finish pending work" );
375  while( work_q_sz_ > 0 ) std::this_thread::sleep_for(20ms);
376  log_Debug( "TCPListener::run stop TCPServers " );
377  for ( auto srv : servers_ ) {
378  srv->requestStop();
379  }
380  for ( auto srv : servers_ ) {
381  srv->wait();
382  delete srv;
383  }
384  log_Debug( "TCPListener::run stopped TCPServers " );
385  servers_.clear();
386  auto clients_copy = clients_; // closeSocket modifies clients_
387  for( auto c: clients_copy ) {
388  log_Debug( "TCPListener::run close socket " << c.second.socket );
389  closeSocket( c.second.socket );
390  }
391  }
392  catch ( ... ) {
393  }
394  }
395 
396  void TCPListener::pushWork( int fd, SockState state ) {
397  {
399  clients_[fd].state |= state;
400  workload_.push_back( &(clients_[fd]) );
401  work_q_sz_++;
402  if ( ! (state & SockState::New) ) pollDel( clients_[fd].socket );
403  }
404  log_Debug( "TCPListener::pushWork BaseSocket* socket " <<
405  clients_[fd].socket->debugString() << " state " << state );
406  }
407 
408  void TCPListener::pushWork( const SocketWork &work ) {
409  {
411  clients_[work.socket->getFD()].state |= work.state;
412  workload_.push_back( &(clients_[work.socket->getFD()]) );
413  work_q_sz_++;
414  if ( ! (work.state & SockState::New) ) pollDel( clients_[work.socket->getFD()].socket );
415  }
416  log_Debug( "TCPListener::pushWork BaseSocket* socket " << work.socket->debugString() <<
417  " state " << work.state );
418  }
419 
421  SocketWork* result = NULL;
422  {
424  if ( workload_.size() > 0 ) {
425  result = workload_.front();
426  workload_.pop_front();
427  }
428  }
429  if ( result )
430  log_Debug( "TCPListener::popWork socket " << result->socket->debugString() <<
431  " state " << result->state << " sockmapstate=" << clients_[result->socket->getFD()].state );
432  return result;
433  }
434 
435  void TCPListener::releaseWork( const SocketWork &work ) {
436  log_Debug( "TCPListener::releaseWork socket " << work.socket->debugString() <<
437  " state " << work.state << " sockmapstate=" << clients_[work.socket->getFD()].state );
438  work_q_sz_--;
439  if ( work.state & TCPListener::SockState::Shut ) {
440  closeSocket( work.socket );
441  } else {
444  clients_[work.socket->getFD()].state ^= work.state;
445  }
446  }
447 
449  int fd = -1;
450  {
452  fd = s->getFD();
453  if ( clients_[s->getFD()].data ) delete clients_[s->getFD()].data;
454  clients_.erase( fd );
455  s->close();
456  delete s;
457  }
458  log_Debug( "TCPListener::closeSocket socket " << s << " fd=" << fd );
459  }
460 
461  void TCPListener::pollAdd( const BaseSocket* s, uint32_t events ) {
462  struct epoll_event set_event;
463  set_event.events = events;
464  set_event.data.fd = s->getFD();
465  int rc = epoll_ctl( epoll_fd_, EPOLL_CTL_ADD, s->getFD(), &set_event );
466  if ( rc < 0 ) throw_SystemException( "TCPListener::pollAdd epoll_ctl failed socket " << s->debugString(), errno );
467  log_Debug( "TCPListener::pollAdd socket " << set_event.data.fd << " events=" << events );
468  }
469 
470  void TCPListener::pollMod( const BaseSocket* s, uint32_t events ) {
471  struct epoll_event set_event;
472  set_event.events = events;
473  set_event.data.fd = s->getFD();
474  int rc = epoll_ctl( epoll_fd_, EPOLL_CTL_MOD, s->getFD(), &set_event );
475  if ( rc < 0 ) throw_SystemException( "TCPListener::pollMod epoll_ctl failed socket " << s->debugString(), errno );
476  log_Debug( "TCPListener::pollMod socket " << set_event.data.fd << " events=" << events );
477  }
478 
479  void TCPListener::pollDel( const BaseSocket* s ) {
480  struct epoll_event set_event;
481  set_event.events = 0;
482  set_event.data.fd = s->getFD();
483  int rc = epoll_ctl( epoll_fd_, EPOLL_CTL_DEL, s->getFD(), &set_event );
484  if ( rc < 0 ) throw_SystemException( "TCPListener::pollDel epoll_ctl failed socket " << s->debugString(), errno );
485  log_Debug( "TCPListener::pollDel socket " << set_event.data.fd );
486  }
487 
489  while ( work_q_sz_ > servers_.size() && servers_.size() < params_.maxservers ) {
490  TCPServer* add_server = init_server_->addServer();
491  servers_.push_back(add_server);
492  add_server->start();
493  log_Debug( "TCPListener::addServers +1" );
494  }
495  }
496 
498  if ( work_q_sz_ > params_.maxqdepth ) {
499  unsigned long last_pending = work_q_sz_;
500  unsigned int c = 0;
501  while ( work_q_sz_ >= last_pending && c++ < params_.cycle_max_throttles ) {
502  std::this_thread::sleep_for( std::chrono::microseconds( params_.throttle_sleep_us ) );
503  {
506  }
507  }
508  }
509  }
510 
512  struct timeval l_now;
513  {
515  l_now = now_;
516  }
517  if ( l_now.tv_sec > params_.stat_trc_interval_s + stat_time_.tv_sec ) {
518  snapRUsage();
520  log_Warning( "TCPListener maxconnections=" << params_.maxconnections <<
521  " limited " << event_maxconnections_reached_ << " times since last stats" );
523  }
524  log_Statistics( "TCPListener #clients=" <<
525  clients_.size() << " #servers=" << servers_.size() << " #queued=" << work_q_sz_ <<
526  " recv=" << (double)( last_stats_.received - prev_stats_.received ) /
528  "KiB/s sent=" << (double)( last_stats_.sent - prev_stats_.sent ) /
530  "KIB/s conn=" << (double)( last_stats_.connections - prev_stats_.connections ) /
532  "/s req=" << (double)( last_stats_.requests - prev_stats_.requests ) /
534  "/s throttle=" << (double)( last_stats_.throttles - prev_stats_.throttles ) /
537  "TCPListener ucpu=" << getLastUserCPU() <<
538  " scpu=" << getLastSysCPU() <<
539  " minflt=" << getLastMinFltRate() << "/s" <<
540  " majflt=" << getLastMajFltRate() << "/s" <<
541  " bi=" << getLastBlkInRate() << "/s" <<
542  " bo=" << getLastBlkOutRate() << "/s" <<
543  " yield=" << getLastVCtx() << "/s" <<
544  " ctxsw=" << getLastICtx() << "/s" <<
545  " maxrss=" << getMaxRSS() );
546  for ( auto srv : servers_ ) {
548  "TCPServer id " << srv->getTID() <<
549  (srv->isBusy()?" busy-":" idle-") << srv->getState() <<
550  " ucpu=" << srv->getLastUserCPU() <<
551  " scpu=" << srv->getLastSysCPU() <<
552  " minflt=" << srv->getLastMinFltRate() << "/s" <<
553  " majflt=" << srv->getLastMajFltRate() << "/s" <<
554  " bi=" << srv->getLastBlkInRate() << "/s" <<
555  " bo=" << srv->getLastBlkOutRate() << "/s" <<
556  " yield=" << srv->getLastVCtx() << "/s" <<
557  " ctxsw=" << srv->getLastICtx() << "/s" );
558  }
560  {
563  }
564  gettimeofday( &stat_time_, NULL );
565  }
566  }
567 
569  struct timeval l_now;
570  {
572  l_now = now_;
573  }
574  if ( server_stopped_check_time_.tv_sec < l_now.tv_sec - 4 ) {
575  gettimeofday( &server_stopped_check_time_, NULL );
576  size_t num_stopped = 0;
577  std::list<TCPServer*>::iterator i_server = servers_.begin();
578  while ( i_server != servers_.end() ) {
579  if ( *i_server != init_server_ ) {
580  if ( (*i_server)->hasStopped() ) {
581  (*i_server)->wait();
582  TCPServer* thisserver = (*i_server);
583  i_server = servers_.erase(i_server);
584  delete thisserver;
585  log_Debug( "TCPListener::run deleted stopped TCPServer " << common::Puts::hex() <<
586  (*i_server)->getId() << common::Puts::dec() );
587  } else {
588  if ( num_stopped == 0 &&
589  servers_.size() > params_.minservers &&
590  (*i_server)->getIdleSeconds() > params_.server_idle_ttl_s &&
591  !(*i_server)->isBusy() ) {
592  (*i_server)->requestStop();
593  cv_signal_.notify_all();
594  num_stopped--;
595  log_Debug( "TCPListener::run stop idle TCPServer " << common::Puts::hex() <<
596  (*i_server)->getId() << common::Puts::dec() );
597  }
598  ++i_server;
599  }
600  } else {
601  if ( init_server_->hasStopped() ) {
602  init_server_->wait();
603  i_server = servers_.erase(i_server);
604  TCPServer* new_server = init_server_->addServer();
605  new_server->start();
606  delete init_server_;
607  init_server_ = new_server;
608  servers_.push_back( init_server_ );
609  log_Debug( "TCPListener::run replaced stopped initial TCPServer " << common::Puts::hex() <<
611  }
612  ++i_server;
613  }
614  }
615  }
616  }
617 
618  }
619 
620 }
dodo::network::BaseSocket::setReceiveTimeout
void setReceiveTimeout(double sec)
Set the receive timeout - a receive will fail if there is no data received before the timeout expires...
Definition: basesocket.cpp:209
dodo::network::TCPListener::listen_socket_
BaseSocket * listen_socket_
The listening BaseSocket.
Definition: tcplistener.hpp:433
dodo::network::TCPListener::last_stats_
Stats last_stats_
The last Stats.
Definition: tcplistener.hpp:509
dodo::network::yaml_sendbufsz
const std::string yaml_sendbufsz
sendbufsz YAML configuration name
Definition: tcplistener.cpp:115
dodo::network::TCPListener::prev_stats_
Stats prev_stats_
The previous Stats.
Definition: tcplistener.hpp:504
dodo::network::BaseSocket::setTCPNoDelay
void setTCPNoDelay(bool set)
Set TCP_NODELAY.
Definition: basesocket.cpp:115
dodo::threads::Mutexer
Waits for and locks the Mutex on construction, unlocks the Mutex when this Mutexer is destructed.
Definition: mutex.hpp:100
dodo::network::TCPListener::now_mutex_
threads::Mutex now_mutex_
Synchronize access to now_.
Definition: tcplistener.hpp:539
dodo::network::TCPListener::addServers
void addServers()
Add a server if work_q_sz_ exceeds servers.size() and servers_.size() < params_.maxservers.
Definition: tcplistener.cpp:488
dodo::network::BaseSocket::getPeerAddress
Address getPeerAddress() const
Get the peer (remote) address for this socket.
Definition: basesocket.cpp:273
dodo::network::TCPListener::event_maxconnections_reached_
size_t event_maxconnections_reached_
The number of times a connection was closed after accept because params_.maxconnections was reached...
Definition: tcplistener.hpp:549
dodo::network::BaseSocket::listen
common::SystemError listen(const Address &address, int backlog)
Sets up a listening socket on Address.
Definition: basesocket.cpp:89
dodo::network::TCPListener::cleanStoppedServers
void cleanStoppedServers()
Cleanup stopped TCPServers.
Definition: tcplistener.cpp:568
dodo::common::fileReadInt
bool fileReadInt(const std::string &file, int &i)
Read from a file, expecting it to contain a (signed) int.
Definition: util.cpp:39
dodo::common::Bytes::getArray
Octet * getArray() const
Return the array.
Definition: bytes.hpp:186
dodo::network::TCPListener::SockState
SockState
BaseSocket lifecycle states.
Definition: tcplistener.hpp:260
dodo::network::Address::isValid
bool isValid() const
True if this Address is valid.
Definition: address.hpp:134
dodo::network::TCPListener::Params::sendbufsz
socklen_t sendbufsz
The send buffer size, set on the listening socket, inherited by accepted sockets.
Definition: tcplistener.hpp:184
dodo::threads::Thread::wait
void wait()
Wait for the thread to join the current thread / wait for the thread to finish.
Definition: thread.cpp:64
dodo::network::TCPListener::SockState::None
@ None
Undefined / initial.
dodo::threads::Thread::getTID
pid_t getTID() const
Return the tid.
Definition: thread.hpp:190
dodo::common::Puts::setprecision
Set the precision for floating point fixed format.
Definition: puts.hpp:110
dodo::threads::Thread::getLastMinFltRate
double getLastMinFltRate()
Get the minor fault rate since last sample.
Definition: thread.cpp:126
dodo::threads::Thread::getLastMajFltRate
double getLastMajFltRate()
Get the major fault rate since last sample.
Definition: thread.cpp:131
dodo::network::TCPConnectionData::clearBuffer
void clearBuffer()
Clears the read_buffer.
Definition: tcplistener.cpp:62
dodo::threads::Thread::getLastBlkInRate
double getLastBlkInRate()
Get last block in rate since last sample.
Definition: thread.cpp:136
dodo::network::yaml_minservers
const std::string yaml_minservers
minservers YAML configuration name
Definition: tcplistener.cpp:107
dodo::network::TCPListener::Params::cycle_max_throttles
size_t cycle_max_throttles
Maximum number of throttles per epoll_wait cycle.
Definition: tcplistener.hpp:216
dodo::network::yaml_tcp_keep_alive
const std::string yaml_tcp_keep_alive
yaml_tcp_keep_alive YAML configuration name
Definition: tcplistener.cpp:135
dodo::network::BaseSocket::setBlocking
void setBlocking(bool blocking)
Set the Socket blocking mode.
Definition: basesocket.cpp:131
dodo::common::Puts::hex
Put the stream in hexadecimal mode.
Definition: puts.hpp:80
dodo::network::TCPListenerTimer::stop
void stop()
Stop the timer.
Definition: tcplistener.cpp:96
dodo::network::Address::asString
std::string asString(bool withport=false) const
Return a string representation of this Address.
Definition: address.cpp:107
dodo::network::TCPListener::Stats::received
ssize_t received
The number of bytes received.
Definition: tcplistener.hpp:252
dodo::network::TCPListener::SocketWork::state
SockState state
State of the socket.
Definition: tcplistener.hpp:275
dodo::network::TCPListener::SockState::New
@ New
New connection, TCPServer::handShake() will be called.
dodo::network::yaml_server_idle_ttl_s
const std::string yaml_server_idle_ttl_s
server_idle_ttl_s YAML configuration name
Definition: tcplistener.cpp:119
dodo::common::Bytes::reserve
void reserve(size_t size)
Reserve memory in the Bytes.
Definition: bytes.cpp:33
dodo::network::Address::getAddressFamily
SocketParams::AddressFamily getAddressFamily() const
Get this Address family.
Definition: address.hpp:159
dodo::network::TCPListener::clientmutex_
threads::Mutex clientmutex_
Protects both clients_ and workload_.
Definition: tcplistener.hpp:449
dodo::network::BaseSocket::close
virtual void close()
Closes the socket, causing the connection, if it exists, to be terminated.
Definition: basesocket.cpp:71
dodo::network::TCPListener::Stats::sent
ssize_t sent
The number of bytes sent.
Definition: tcplistener.hpp:254
dodo::network::BaseSocket::setReUseAddress
void setReUseAddress()
Enable the socket to re-use an address when listen/bind is called.
Definition: basesocket.cpp:148
dodo::common::Bytes
An array of Octets with size elements.
Definition: bytes.hpp:44
dodo::network::TCPListener::SockState::Shut
@ Shut
BaseSocket is hung up or in error, TCPServer::shutDown() will be called.
dodo::network::Address
Generic network Address, supporting ipv4 and ipv6 transparently.
Definition: address.hpp:90
dodo::network::TCPListener::backlog_
int backlog_
The backlog (incoming connection queue before accept calls clearing it) used by listen.
Definition: tcplistener.hpp:439
dodo::network::TCPListenerTimer
Updates the attribute now_ in the TCPListener at a regular interval to avoid excessive number of call...
Definition: tcplistener.cpp:70
dodo::threads::Thread::getLastUserCPU
double getLastUserCPU()
Get the user mode cpu (cpu seconds/second) since last sample.
Definition: thread.cpp:116
dodo::network::yaml_maxservers
const std::string yaml_maxservers
maxservers YAML configuration name
Definition: tcplistener.cpp:109
dodo::network::TCPListener::init_server_
TCPServer * init_server_
The initial TCPServer, which will live as long as the TCPListener runs, and is used to create new TCP...
Definition: tcplistener.hpp:428
dodo::common::YAML_read_key_default< size_t >
template size_t YAML_read_key_default< size_t >(const YAML::Node &, const std::string &, const size_t &)
Instantiate template YAML_read_key for size_t.
dodo::network::yaml_receive_timeout_seconds
const std::string yaml_receive_timeout_seconds
yaml_receive_timeout_seconds YAML configuration name
Definition: tcplistener.cpp:133
dodo::common::SystemError::ecEAGAIN
@ ecEAGAIN
11 Try again
Definition: systemerror.hpp:71
dodo::network::TCPListener::Params::minservers
size_t minservers
The minimum number of TCPServers.
Definition: tcplistener.hpp:164
dodo::network::yaml_maxqdepth
const std::string yaml_maxqdepth
maxqdepth YAML configuration name
Definition: tcplistener.cpp:113
dodo::network::TCPListener::Params::stat_trc_interval_s
time_t stat_trc_interval_s
The epol__wait timeout in ms.
Definition: tcplistener.hpp:221
dodo::network::TCPListener::Params::maxqdepth
size_t maxqdepth
Maximum size of connection and request work queues.
Definition: tcplistener.hpp:179
dodo::network::yaml_pollbatch
const std::string yaml_pollbatch
pollbatch YAML configuration name
Definition: tcplistener.cpp:121
dodo::network::TCPListener::closeSocket
void closeSocket(BaseSocket *socket)
Tell the listener to close the socket.
Definition: tcplistener.cpp:448
dodo::common::Exception::what
virtual const char * what() const noexcept
Return the exception message.
Definition: exception.cpp:73
dodo::common::SystemError::asString
std::string asString() const
Get the system error string.
Definition: systemerror.hpp:322
dodo::network::TCPListener::read_event_mask_
uint32_t read_event_mask_
The event mask for read events.
Definition: tcplistener.hpp:492
dodo::threads::Thread::getLastBlkOutRate
double getLastBlkOutRate()
Get last block out rate since last sample.
Definition: thread.cpp:141
dodo::common::YAML_read_key_default< double >
template double YAML_read_key_default< double >(const YAML::Node &, const std::string &, const double &)
Instantiate template YAML_read_key for double.
dodo::threads::Thread::start
void start()
Start the thread.
Definition: thread.cpp:58
dodo::network::TCPListenerTimer::TCPListenerTimer
TCPListenerTimer(TCPListener &listener)
Constructor, specify the TCPListener to operate on.
Definition: tcplistener.cpp:76
dodo::network::TCPListener::stat_time_
struct timeval stat_time_
Time of last statistics.
Definition: tcplistener.hpp:529
dodo::network::TCPListener::Params::maxconnections
size_t maxconnections
The maximum number of connections (in effect, sockets) allowed.
Definition: tcplistener.hpp:174
dodo::network::TCPListener::SocketWork::socket
BaseSocket * socket
Pointer to the socket.
Definition: tcplistener.hpp:273
dodo::network::TCPListener::warn_queue_time_
struct timeval warn_queue_time_
Time of last queueing warning.
Definition: tcplistener.hpp:519
dodo::network::TCPListener::run
virtual void run()
Entrypoint, override of Thread::run()
Definition: tcplistener.cpp:236
dodo::network::yaml_maxconnections
const std::string yaml_maxconnections
maxconnections YAML configuration name
Definition: tcplistener.cpp:111
dodo::network::TCPListener::Params::recvbufsz
socklen_t recvbufsz
The receive buffer size, set on the listening socket, inherited by accepted sockets.
Definition: tcplistener.hpp:189
dodo::network::TCPListener::cv_signal_
std::condition_variable cv_signal_
Condition variable used to wakeup TCPServer threads with mutex mt_signal_.
Definition: tcplistener.hpp:461
dodo::network::BaseSocket::setTCPKeepAlive
void setTCPKeepAlive(bool enable)
Enable or disable TCP keep-alive on the socket.
Definition: basesocket.cpp:120
dodo::network::TCPListener::pollAdd
void pollAdd(const BaseSocket *s, uint32_t events)
Add an epoll event for the BaseSocket.
Definition: tcplistener.cpp:461
dodo::network::BaseSocket::accept
virtual BaseSocket * accept()=0
Accepts a connection request and return a pointer to a new Socket for the new connection,...
dodo::threads::Thread::getLastICtx
double getLastICtx()
Get involuntary context switch rate since last sample.
Definition: thread.cpp:151
dodo::network::TCPListener::releaseWork
void releaseWork(const SocketWork &work)
Called by a TCPServer to signal that the work has been handled and event detection on it can resume.
Definition: tcplistener.cpp:435
dodo::network::TCPListener::listen_address_
Address listen_address_
The listening address.
Definition: tcplistener.hpp:417
dodo
A C++ platform interface to lean Linux services tailored for containerized deployment.
Definition: application.hpp:29
dodo::network::TCPListener::workload_
std::deque< SocketWork * > workload_
Queue of sockets with work for TCPServer instances.
Definition: tcplistener.hpp:477
log_Statistics
#define log_Statistics(what)
Macro to log Statistics.
Definition: logger.hpp:295
log_Error
#define log_Error(what)
Macro to log Error.
Definition: logger.hpp:277
dodo::network::SocketParams
Socket parameters - the family (domain), socket type and protocol triplet.
Definition: socketparams.hpp:35
dodo::network::TCPListener::Params::receive_timeout_seconds
int receive_timeout_seconds
Receive timeout in seconds.
Definition: tcplistener.hpp:231
dodo::network::TCPListener::servers_
std::list< TCPServer * > servers_
List of TCPServers.
Definition: tcplistener.hpp:472
dodo::network::TCPListener::Params::Params
Params()
Construct with default parameters.
Definition: tcplistener.hpp:137
dodo::network::yaml_cycle_max_throttles
const std::string yaml_cycle_max_throttles
cycle_max_throttles YAML configuration name
Definition: tcplistener.cpp:127
dodo::network::TCPListener::mt_signal_
std::mutex mt_signal_
Mutex used to wake up TCPServer threads with condition variable cv_signal_.
Definition: tcplistener.hpp:455
dodo::network::TCPListener::waitForActivity
bool waitForActivity(TCPServer *server)
Called by the TCPServer, enters wait state until woken up by either a timeout or a notify by TCPListe...
Definition: tcplistener.cpp:224
dodo::network::yaml_throttle_sleep_us
const std::string yaml_throttle_sleep_us
throttle_sleep_us YAML configuration name
Definition: tcplistener.cpp:125
dodo::network::TCPListener::pollDel
void pollDel(const BaseSocket *s)
Delete the epoll event for the BaseSocket.
Definition: tcplistener.cpp:479
dodo::network::TCPListener::Params::throttle_sleep_us
size_t throttle_sleep_us
The time to sleep when the queue gets too big (letting workers clear from the queue without accepting...
Definition: tcplistener.hpp:210
dodo::network::yaml_listener_sleep_ms
const std::string yaml_listener_sleep_ms
listener_sleep_ms YAML configuration name
Definition: tcplistener.cpp:123
throw_SystemException
#define throw_SystemException(what, errno)
Throws an Exception with errno, passes FILE and LINE to constructor.
Definition: exception.hpp:188
dodo::network::TCPListener::popWork
SocketWork * popWork()
Pop work.
Definition: tcplistener.cpp:420
dodo::network::BaseSocket::receive
virtual common::SystemError receive(void *buf, ssize_t request, ssize_t &received)=0
Receive bytes from the socket.
dodo::threads::Thread::snapRUsage
void snapRUsage()
Take a snapshot of the thread's resource usage.
Definition: thread.cpp:68
dodo::network::TCPListenerTimer::stopped_
bool stopped_
True if stop() was called.
Definition: tcplistener.cpp:103
dodo::network::TCPListener::SockState::Read
@ Read
Data is ready to be read, TCPServer::readSocket() will be called.
dodo::network::TCPListener::Stats::throttles
ssize_t throttles
The number of throttles.
Definition: tcplistener.hpp:250
throw_Exception
#define throw_Exception(what)
Throws an Exception, passes FILE and LINE to constructor.
Definition: exception.hpp:174
dodo::network::TCPListener::throttle
void throttle()
Throttle the listener when work_q_sz_ exceeds params_.maxqdepth.
Definition: tcplistener.cpp:497
dodo::network::BaseSocket::getFD
int getFD() const
Return the socket file descriptor.
Definition: basesocket.hpp:121
dodo::network::TCPServer::addServer
virtual TCPServer * addServer()=0
Spawn a new TCPServer.
dodo::common::SystemError::ecOK
@ ecOK
0 Not an error, success
Definition: systemerror.hpp:60
dodo::network::TCPServer::newConnectionData
virtual TCPConnectionData * newConnectionData() const
Return a new TCPConnectionData pointer or override to return a descendant of TCPConnectionData.
Definition: tcpserver.hpp:127
dodo::network::Socket
A Linux socket.
Definition: socket.hpp:50
dodo::network::TCPListener::Params
Parameters affecting TCPListener behavior.
Definition: tcplistener.hpp:132
dodo::network::TCPServer
Used in conjunction with TCPListener to implement high speed, multithreaded TCP services.
Definition: tcpserver.hpp:56
dodo::network::TCPListenerTimer::run
virtual void run()
Descendants must override the run function.
Definition: tcplistener.cpp:83
dodo::network::yaml_send_timeout_seconds
const std::string yaml_send_timeout_seconds
send_timeout_seconds YAML configuration name
Definition: tcplistener.cpp:131
dodo::common::getSecondDiff
double getSecondDiff(struct timeval &t1, struct timeval &t2)
Return difference in seconds as a double.
Definition: util.hpp:126
dodo::network::TCPListener::prev_stat_time_
struct timeval prev_stat_time_
Time of previous statistics.
Definition: tcplistener.hpp:524
dodo::common::DebugObject::debugString
std::string debugString() const
Return the object dump to string.
Definition: exception.cpp:32
dodo::network::TCPListener::Params::listener_sleep_ms
int listener_sleep_ms
The TCPListener epoll_wait timeout in ms.
Definition: tcplistener.hpp:204
dodo::network::TCPConnectionData::readBuffer
common::SystemError readBuffer(BaseSocket *socket, ssize_t &received)
Reads and appends data to read_buffer.
Definition: tcplistener.cpp:38
dodo::network::BaseSocket::setSendBufSize
void setSendBufSize(socklen_t size)
Set the size of the send buffer.
Definition: basesocket.cpp:199
dodo::network::TCPListener::TCPListener
TCPListener(const Address &address, const Params &params)
Constructor.
Definition: tcplistener.cpp:157
dodo::network::TCPListener::pollMod
void pollMod(const BaseSocket *s, uint32_t events)
Modify an epoll event for the BaseSocket.
Definition: tcplistener.cpp:470
dodo::network::TCPListener::SocketWork
BaseSocket socket and state pair.
Definition: tcplistener.hpp:271
dodo::threads::Thread::getMaxRSS
long getMaxRSS()
Get the maximum resident set size seen on the thread.
Definition: thread.cpp:112
dodo::network::TCPListener::clients_
std::map< int, SocketWork > clients_
Map of file descriptors to SocketWork for all connected clients.
Definition: tcplistener.hpp:467
dodo::network::BaseSocket::isValid
bool isValid() const
Return true when the socket descriptor is a valid, hence 'possible' descriptor.
Definition: basesocket.hpp:128
dodo::network::TCPListener::logStats
void logStats()
Log TCPListener statistics to Logger::getLogger().
Definition: tcplistener.cpp:511
dodo::network::TCPListener::work_q_sz_
std::atomic< unsigned long long > work_q_sz_
The number of queued work items.
Definition: tcplistener.hpp:482
dodo::common::Bytes::hexDump
std::string hexDump(size_t n) const
hex dump the first n octets of the data.
Definition: bytes.cpp:184
log_Trace
#define log_Trace(what)
Macro to log Trace.
Definition: logger.hpp:312
dodo::network::TCPListener::Stats::requests
ssize_t requests
The number of requests.
Definition: tcplistener.hpp:248
dodo::common::SystemError
Linux system error primitive to provide a consistent interface to Linux error codes.
Definition: systemerror.hpp:53
dodo::network::TCPListenerTimer::listener_
TCPListener & listener_
The associated TCPListener.
Definition: tcplistener.cpp:101
logger.hpp
dodo::threads::Thread
Abstract Thread class.
Definition: thread.hpp:38
dodo::network::TCPListener::construct
void construct(const Address &address, const Params &params)
Common constructor code.
Definition: tcplistener.cpp:166
dodo::network::BaseSocket::setReceiveBufSize
void setReceiveBufSize(socklen_t size)
Set the size of the receive buffer.
Definition: basesocket.cpp:204
dodo::threads::Thread::getLastSysCPU
double getLastSysCPU()
Get the system mode cpu (cpu seconds/second) since last sample.
Definition: thread.cpp:121
dodo::common::Puts::dec
Put the stream in decimal mode.
Definition: puts.hpp:73
dodo::network::TCPListener::Params::send_timeout_seconds
int send_timeout_seconds
Send timeout in seconds.
Definition: tcplistener.hpp:226
dodo::network::TCPListener::hangup_event_mask_
uint32_t hangup_event_mask_
The event mask for hangup events.
Definition: tcplistener.hpp:499
dodo::network::TCPListener::Params::tcp_keep_alive
bool tcp_keep_alive
Toggle TCP keep-alive.
Definition: tcplistener.hpp:236
dodo::network::TCPListener::Stats::connections
ssize_t connections
The number of connections.
Definition: tcplistener.hpp:244
dodo::network::TCPListener::now_
struct timeval now_
Frequently updated by a TCPListenerTimer with enough precision for its use.
Definition: tcplistener.hpp:544
dodo::network::yaml_recvbufsz
const std::string yaml_recvbufsz
recvbufsz YAML configuration name
Definition: tcplistener.cpp:117
dodo::network::TCPListener
The TCPListener listens, accepts connections and generates socket events to produce TCP work to a poo...
Definition: tcplistener.hpp:126
dodo::network::TCPListener::Params::maxservers
size_t maxservers
The maximum number of TCPServers.
Definition: tcplistener.hpp:169
dodo::common::YAML_read_key_default< bool >
template bool YAML_read_key_default< bool >(const YAML::Node &, const std::string &, const bool &)
Instantiate template YAML_read_key for bool.
dodo::network::TCPListener::pushWork
void pushWork(const SocketWork &work)
Push work.
Definition: tcplistener.cpp:408
dodo::common::Bytes::getSize
size_t getSize() const
Return the array size.
Definition: bytes.hpp:192
dodo::network::BaseSocket::setReUsePort
void setReUsePort()
Make the socket re-use a port when listen is called.
Definition: basesocket.cpp:159
dodo::network::BaseSocket::setSendTimeout
void setSendTimeout(double sec)
Set the send timeout - a send will fail if no data is sent before the timeout expires.
Definition: basesocket.cpp:219
dodo::network::TCPListener::params_
Params params_
TCPListener parameters.
Definition: tcplistener.hpp:422
dodo::threads::Thread::getLastVCtx
double getLastVCtx()
Get voluntary context switch rate since last sample.
Definition: thread.cpp:146
dodo::network::TCPListener::epoll_fd_
int epoll_fd_
The epoll interface file descriptor.
Definition: tcplistener.hpp:487
log_Debug
#define log_Debug(what)
Macro to log Debug.
Definition: logger.hpp:302
log_Warning
#define log_Warning(what)
Macro to log Warning.
Definition: logger.hpp:283
dodo::network::TCPListener::Params::server_idle_ttl_s
double server_idle_ttl_s
The maximum TCPServer idle time in seconds before stopping the server, honoring minservers.
Definition: tcplistener.hpp:194
tcpserver.hpp
dodo::network::TCPListener::stats_mutex_
threads::Mutex stats_mutex_
Protects prev_stats_, last_stats_.
Definition: tcplistener.hpp:514
tcplistener.hpp
dodo::network::TCPListener::stop_server_
bool stop_server_
If true, the TCPListener will gracefully stop and finish.
Definition: tcplistener.hpp:444
dodo::network::yaml_stat_trc_interval_s
const std::string yaml_stat_trc_interval_s
stat_trc_interval_s YAML configuration name
Definition: tcplistener.cpp:129
util.hpp
dodo::common::Exception
An Exception is thrown in exceptional circumstances, and its occurrence should generally imply that t...
Definition: exception.hpp:83
dodo::network::BaseSocket
Interface to and common implementation of concrete sockets (Socket, TLSSocket).
Definition: basesocket.hpp:36
dodo::network::TCPServer::hasStopped
bool hasStopped() const
Return true if the TCPServer has stopped working.
Definition: tcpserver.hpp:133
dodo::network::TCPListener::server_stopped_check_time_
struct timeval server_stopped_check_time_
Time of last check for stopped servers.
Definition: tcplistener.hpp:534
dodo::common::YAML_read_key_default< int >
template int YAML_read_key_default< int >(const YAML::Node &, const std::string &, const int &)
Instantiate template YAML_read_key for int.
dodo::network::TCPListener::Params::pollbatch
int pollbatch
The number of epoll_wait events read in one epoll_wait wake-up.
Definition: tcplistener.hpp:199