Performance fix: Check half-closed descriptors at most once per second. A few revisions back, comm checked half-closed descriptors once per second, but the code was buggy. I replaced it with simpler code that checked each half-closed descriptor whenever the OS marked it as ready for reading. That was a bad idea: the checks wasted a lot of CPU cycles because half-closed descriptors are usually ready for reading all the time. This revision resurrects the 1 check/sec limit, but hopefully with fewer bugs. In my limited tests, CPU usage seems to be back to normal. Added a DescriptorSet class to manage an unordered collection of unique descriptors. The class might be useful for deferred reads as well, but that remains to be seen. The DescriptorSet class has O(1) complexity for search, insertion, and deletion. It uses about 2*sizeof(int)*MaxFD bytes. The splay tree that was previously used to store half-closed descriptors uses less RAM for a small number of descriptors but has O(log n) complexity. === modified file 'src/comm.cc' --- src/comm.cc 2008-09-22 21:56:44 +0000 +++ src/comm.cc 2008-09-24 23:09:33 +0000 @@ -224,6 +224,144 @@ CBDATA_CLASS(ConnectStateData); }; +/// an unordered collection of unique descriptors with O(1) complexity +class DescriptorSet { +// \todo: Should we use std::set with its flexibility? Our implementation +// has constant overhead, which is smaller than log(n) of std::set. 
+public: + // for STL compatibility, should we decide to switch to std::set or similar + typedef const int *const_iterator; + + DescriptorSet(); + ~DescriptorSet(); + + /// checks whether fd is in the set + bool has(const int fd) const { return 0 <= fd && fd < capacity_ && + index_[fd] >= 0; } + + bool add(int fd); ///< adds if unique; returns true if added + bool del(int fd); ///< deletes if there; returns true if deleted + int pop(); ///< deletes and returns one descriptor, in unspecified order; the set must not be empty + + bool empty() const { return !size_; } ///< whether the set contains no descriptors + + /// begin iterator a la STL; may become invalid if the object is modified + const_iterator begin() const { return descriptors_; } + /// end iterator a la STL; may become invalid if the object is modified + const_iterator end() const { return begin() + size_; } + + void print(std::ostream &os) const; + +private: + // these would be easy to support when needed; prohibit for now + DescriptorSet(const DescriptorSet &s); // declared but undefined + DescriptorSet &operator =(const DescriptorSet &s); // declared, undefined + + int *descriptors_; ///< descriptor values in unspecified order; only the first size_ slots are used + int *index_; ///< descriptor:position index into descriptors_; -1 if absent + int capacity_; ///< number of available descriptor slots + int size_; ///< number of descriptors in the set +}; + +inline std::ostream & +operator <<(std::ostream &os, const DescriptorSet &ds) +{ + ds.print(os); + return os; +} + +static DescriptorSet *TheHalfClosed = NULL; ///< the set of half-closed FDs +static bool WillCheckHalfClosed = false; ///< true if check is scheduled +static EVH commHalfClosedCheck; // scheduled check; starts comm_read on monitored FDs that lack a reader +static void commPlanHalfClosedCheck(); // schedules commHalfClosedCheck() unless one is already scheduled or no FDs are monitored + +DescriptorSet::DescriptorSet(): descriptors_(NULL), index_(NULL), + capacity_(0), size_(0) +{ + // we allocate once and never realloc, at least for now + capacity_ = Squid_MaxFD; + descriptors_ = new int[capacity_]; + index_ = new int[capacity_]; + + // fill index with -1s to be able to say whether a descriptor is 
present + // it is not essential to fill the descriptors, but it enables more checks + for (int i = 0; i < capacity_; ++i) + index_[i] = descriptors_[i] = -1; +} + +DescriptorSet::~DescriptorSet() +{ + delete[] descriptors_; + delete[] index_; +} + +/// adds if unique; returns true if added +bool +DescriptorSet::add(int fd) +{ + assert(0 <= fd && fd < capacity_); // \todo: replace with Must() + + if (has(fd)) + return false; // already have it + + assert(size_ < capacity_); // \todo: replace with Must() + const int pos = size_++; + index_[fd] = pos; + descriptors_[pos] = fd; + return true; // really added +} + +/// deletes if there; returns true if deleted +bool +DescriptorSet::del(int fd) +{ + assert(0 <= fd && fd < capacity_); // \todo: here and below, use Must() + + if (!has(fd)) + return false; // we do not have it + + assert(!empty()); + const int delPos = index_[fd]; + assert(0 <= delPos && delPos < capacity_); + + // move the last descriptor to the deleted fd position + // to keep descriptors_ dense for pop() and begin()/end() iteration + const int lastPos = size_-1; + const int lastFd = descriptors_[lastPos]; + assert(delPos <= lastPos); // may be the same if fd is stored last + descriptors_[delPos] = lastFd; + index_[lastFd] = delPos; + + descriptors_[lastPos] = -1; + index_[fd] = -1; + --size_; + + return true; // really deleted +} + +/// removes and returns one descriptor, in unspecified order; the set must not be empty +int +DescriptorSet::pop() +{ + assert(!empty()); + const int lastPos =--size_; + const int lastFd = descriptors_[lastPos]; + assert(0 <= lastFd && lastFd < capacity_); + + // cleanup + descriptors_[lastPos] = -1; + index_[lastFd] = -1; + + return lastFd; +} + +void +DescriptorSet::print(std::ostream &os) const +{ + // TODO: add "name" if the set is used for more than just half-closed FDs + os << size_ << " FDs"; +} + /* STATIC */ static comm_err_t commBind(int s, struct addrinfo &); @@ -347,7 +485,7 @@ // Active/passive conflicts are OK and simply cancel passive monitoring. 
if (ccb->active()) { // if the assertion below fails, we have an active comm_read conflict - assert(commHasHalfClosedMonitor(fd)); + assert(fd_table[fd].halfClosedReader != NULL); commStopHalfClosedMonitor(fd); assert(!ccb->active()); } @@ -1591,6 +1729,9 @@ startParams.fd = fd; ScheduleCallHere(startCall); + // a half-closed fd may lack a reader, so we stop monitoring explicitly + if (commHasHalfClosedMonitor(fd)) + commStopHalfClosedMonitor(fd); commSetTimeout(fd, -1, NULL, NULL); // notify read/write handlers @@ -1928,10 +2069,15 @@ RESERVED_FD = XMIN(100, Squid_MaxFD / 4); conn_close_pool = memPoolCreate("close_handler", sizeof(close_handler)); + + TheHalfClosed = new DescriptorSet; } void comm_exit(void) { + delete TheHalfClosed; + TheHalfClosed = NULL; + safe_free(fd_table); safe_free(fdd_table); if (fdc_table) { @@ -2395,37 +2541,65 @@ // will close the connection on read errors. void commStartHalfClosedMonitor(int fd) { + debugs(5, 5, HERE << "adding FD " << fd << " to " << *TheHalfClosed); assert(isOpen(fd)); assert(!commHasHalfClosedMonitor(fd)); - - AsyncCall::Pointer call = commCbCall(5,4, "commHalfClosedReader", - CommIoCbPtrFun(&commHalfClosedReader, NULL)); - comm_read(fd, NULL, 0, call); + (void)TheHalfClosed->add(fd); // the assert above rules out duplicates; could also assert the result + commPlanHalfClosedCheck(); // may schedule check if we added the first FD +} + +static +void +commPlanHalfClosedCheck() +{ + if (!WillCheckHalfClosed && !TheHalfClosed->empty()) { + eventAdd("commHalfClosedCheck", &commHalfClosedCheck, NULL, 1.0, 1); + WillCheckHalfClosed = true; + } +} + +/// iterates over all descriptors that may need half-closed tests and +/// calls comm_read for those that do; re-schedules the check if needed +static +void +commHalfClosedCheck(void *) { + debugs(5, 5, HERE << "checking " << *TheHalfClosed); + + typedef DescriptorSet::const_iterator DSCI; + const DSCI end = TheHalfClosed->end(); + for (DSCI i = TheHalfClosed->begin(); i != end; ++i) { // NOTE(review): assumes comm_read() below never modifies TheHalfClosed synchronously; that would invalidate i and end + const int fd = *i; + if 
(!fd_table[fd].halfClosedReader) { // not reading already + AsyncCall::Pointer call = commCbCall(5,4, "commHalfClosedReader", + CommIoCbPtrFun(&commHalfClosedReader, NULL)); + comm_read(fd, NULL, 0, call); + fd_table[fd].halfClosedReader = call; + } + } + + WillCheckHalfClosed = false; // this scheduled check is now running, as far as we know + commPlanHalfClosedCheck(); // may need to check again } /// checks whether we are waiting for possibly half-closed connection to close // We are monitoring if the read handler for the fd is the monitoring handler. bool commHasHalfClosedMonitor(int fd) { - assert(isOpen(fd)); - - if (const comm_io_callback_t *cb = COMMIO_FD_READCB(fd)) { - AsyncCall::Pointer call = cb->callback; - if (call != NULL) { - // check whether the callback has the right type (it should) - // and uses commHalfClosedReader as the address to call back - typedef CommIoCbPtrFun IoDialer; - if (IoDialer *d = dynamic_cast(call->getDialer())) - return d->handler == &commHalfClosedReader; - } - } - return false; + return TheHalfClosed->has(fd); // membership in TheHalfClosed now defines monitoring; the read-handler comment above predates this change } /// stop waiting for possibly half-closed connection to close static void commStopHalfClosedMonitor(int const fd) { - comm_read_cancel(fd, &commHalfClosedReader, NULL); + debugs(5, 5, HERE << "removing FD " << fd << " from " << *TheHalfClosed); + + // cancel the read if one was scheduled + AsyncCall::Pointer reader = fd_table[fd].halfClosedReader; + if (reader != NULL) + comm_read_cancel(fd, reader); + fd_table[fd].halfClosedReader = NULL; + + TheHalfClosed->del(fd); // returns false (no-op) if fd was not monitored } /// I/O handler for the possibly half-closed connection monitoring code @@ -2433,6 +2607,9 @@ commHalfClosedReader(int fd, char *, size_t size, comm_err_t flag, int, void *) { // there cannot be more data coming in on half-closed connections assert(size == 0); + assert(commHasHalfClosedMonitor(fd)); // or we would have canceled the read + + fd_table[fd].halfClosedReader = NULL; // done reading, for now // nothing to do if fd is being closed if (flag == COMM_ERR_CLOSING) @@ -2446,7 +2623,7 
@@ } // continue waiting for close or error - commStartHalfClosedMonitor(fd); + commPlanHalfClosedCheck(); // make sure this fd will be checked again } === modified file 'src/fde.h' --- src/fde.h 2008-09-11 05:58:32 +0000 +++ src/fde.h 2008-09-24 21:14:03 +0000 @@ -101,6 +101,7 @@ time_t timeout; void *lifetime_data; AsyncCall::Pointer closeHandler; + AsyncCall::Pointer halfClosedReader; ///< pending comm_read callback for a monitored half-closed fd, or NULL CommWriteStateData *wstate; /* State data for comm_write */ READ_HANDLER *read_method; WRITE_HANDLER *write_method; @@ -121,6 +122,7 @@ inline void clear() { timeoutHandler = NULL; closeHandler = NULL; + halfClosedReader = NULL; // reset explicitly, like the handlers above, before memset() wipes it // XXX: the following memset may corrupt or leak new or changed members memset(this, 0, sizeof(fde)); local_addr.SetEmpty(); // IPAddress likes to be setup nicely.