diff options
Diffstat (limited to 'lib/iomaster.php')
-rw-r--r-- | lib/iomaster.php | 361 |
1 files changed, 361 insertions, 0 deletions
diff --git a/lib/iomaster.php b/lib/iomaster.php new file mode 100644 index 000000000..aff5b145c --- /dev/null +++ b/lib/iomaster.php @@ -0,0 +1,361 @@ +<?php +/** + * StatusNet, the distributed open-source microblogging tool + * + * I/O manager to wrap around socket-reading and polling queue & connection managers. + * + * PHP version 5 + * + * LICENCE: This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * @category QueueManager + * @package StatusNet + * @author Brion Vibber <brion@status.net> + * @copyright 2009 StatusNet, Inc. + * @license http://www.fsf.org/licensing/licenses/agpl-3.0.html GNU Affero General Public License version 3.0 + * @link http://status.net/ + */ + +class IoMaster +{ + public $id; + + protected $multiSite = false; + protected $managers = array(); + protected $singletons = array(); + + protected $pollTimeouts = array(); + protected $lastPoll = array(); + + /** + * @param string $id process ID to use in logging/monitoring + */ + public function __construct($id) + { + $this->id = $id; + $this->monitor = new QueueMonitor(); + } + + public function init($multiSite=null) + { + if ($multiSite !== null) { + $this->multiSite = $multiSite; + } + if ($this->multiSite) { + $this->sites = $this->findAllSites(); + } else { + $this->sites = array(common_config('site', 'server')); + } + + if (empty($this->sites)) { + throw new Exception("Empty status_network table, cannot init"); + } + + foreach ($this->sites as $site) { + if ($site != common_config('site', 'server')) { + StatusNet::init($site); + } + + $classes = array(); + if (Event::handle('StartIoManagerClasses', array(&$classes))) { + $classes[] = 'QueueManager'; + if (common_config('xmpp', 'enabled')) { + $classes[] = 'XmppManager'; // handles pings/reconnects + $classes[] = 'XmppConfirmManager'; // polls for outgoing confirmations + } + } + Event::handle('EndIoManagerClasses', array(&$classes)); + + foreach ($classes as $class) { + $this->instantiate($class); + } + } + } + + /** + * Pull all local sites from status_network table. + * @return array of hostnames + */ + protected function findAllSites() + { + $hosts = array(); + $sn = new Status_network(); + $sn->find(); + while ($sn->fetch()) { + $hosts[] = $sn->hostname; + } + return $hosts; + } + + /** + * Instantiate an i/o manager class for the current site. + * If a multi-site capable handler is already present, + * we don't need to build a new one. + * + * @param string $class + */ + protected function instantiate($class) + { + if (isset($this->singletons[$class])) { + // Already instantiated a multi-site-capable handler. + // Just let it know it should listen to this site too! + $this->singletons[$class]->addSite(common_config('site', 'server')); + return; + } + + $manager = $this->getManager($class); + + if ($this->multiSite) { + $caps = $manager->multiSite(); + if ($caps == IoManager::SINGLE_ONLY) { + throw new Exception("$class can't run with --all; aborting."); + } + if ($caps == IoManager::INSTANCE_PER_PROCESS) { + // Save this guy for later! + // We'll only need the one to cover multiple sites. + $this->singletons[$class] = $manager; + $manager->addSite(common_config('site', 'server')); + } + } + + $this->managers[] = $manager; + } + + protected function getManager($class) + { + return call_user_func(array($class, 'get')); + } + + /** + * Basic run loop... + * + * Initialize all io managers, then sit around waiting for input. + * Between events or timeouts, pass control back to idle() method + * to allow for any additional background processing. + */ + function service() + { + $this->logState('init'); + $this->start(); + + while (true) { + $timeouts = array_values($this->pollTimeouts); + $timeouts[] = 60; // default max timeout + + // Wait for something on one of our sockets + $sockets = array(); + $managers = array(); + foreach ($this->managers as $manager) { + foreach ($manager->getSockets() as $socket) { + $sockets[] = $socket; + $managers[] = $manager; + } + $timeouts[] = intval($manager->timeout()); + } + + $timeout = min($timeouts); + if ($sockets) { + $read = $sockets; + $write = array(); + $except = array(); + $this->logState('listening'); + common_log(LOG_INFO, "Waiting up to $timeout seconds for socket data..."); + $ready = stream_select($read, $write, $except, $timeout, 0); + + if ($ready === false) { + common_log(LOG_ERR, "Error selecting on sockets"); + } else if ($ready > 0) { + foreach ($read as $socket) { + $index = array_search($socket, $sockets, true); + if ($index !== false) { + $this->logState('queue'); + $managers[$index]->handleInput($socket); + } else { + common_log(LOG_ERR, "Saw input on a socket we didn't listen to"); + } + } + } + } + + if ($timeout > 0 && empty($sockets)) { + // If we had no listeners, sleep until the pollers' next requested wakeup. + common_log(LOG_INFO, "Sleeping $timeout seconds until next poll cycle..."); + $this->logState('sleep'); + sleep($timeout); + } + + $this->logState('poll'); + $this->poll(); + + $this->logState('idle'); + $this->idle(); + + $memoryLimit = $this->softMemoryLimit(); + if ($memoryLimit > 0) { + $usage = memory_get_usage(); + if ($usage > $memoryLimit) { + common_log(LOG_INFO, "Queue thread hit soft memory limit ($usage > $memoryLimit); gracefully restarting."); + break; + } + } + } + + $this->logState('shutdown'); + $this->finish(); + } + + /** + * Return fully-parsed soft memory limit in bytes. + * @return intval 0 or -1 if not set + */ + function softMemoryLimit() + { + $softLimit = trim(common_config('queue', 'softlimit')); + if (substr($softLimit, -1) == '%') { + $limit = trim(ini_get('memory_limit')); + $limit = $this->parseMemoryLimit($limit); + if ($limit > 0) { + return intval(substr($softLimit, 0, -1) * $limit / 100); + } else { + return -1; + } + } else { + return $this->parseMemoryLimit($limit); + } + return $softLimit; + } + + /** + * Interpret PHP shorthand for memory_limit and friends. + * Why don't they just expose the actual numeric value? :P + * @param string $mem + * @return int + */ + protected function parseMemoryLimit($mem) + { + // http://www.php.net/manual/en/faq.using.php#faq.using.shorthandbytes + $size = array('k' => 1024, + 'm' => 1024*1024, + 'g' => 1024*1024*1024); + if (empty($mem)) { + return 0; + } else if (is_numeric($mem)) { + return intval($mem); + } else { + $mult = strtolower(substr($mem, -1)); + if (isset($size[$mult])) { + return substr($mem, 0, -1) * $size[$mult]; + } else { + return intval($mem); + } + } + } + + function start() + { + foreach ($this->managers as $index => $manager) { + $manager->start($this); + // @fixme error check + if ($manager->pollInterval()) { + // We'll want to check for input on the first pass + $this->pollTimeouts[$index] = 0; + $this->lastPoll[$index] = 0; + } + } + } + + function finish() + { + foreach ($this->managers as $manager) { + $manager->finish(); + // @fixme error check + } + } + + /** + * Called during the idle portion of the runloop to see which handlers + */ + function poll() + { + foreach ($this->managers as $index => $manager) { + $interval = $manager->pollInterval(); + if ($interval <= 0) { + // Not a polling manager. + continue; + } + + if (isset($this->pollTimeouts[$index])) { + $timeout = $this->pollTimeouts[$index]; + if (time() - $this->lastPoll[$index] < $timeout) { + // Not time to poll yet. + continue; + } + } else { + $timeout = 0; + } + $hit = $manager->poll(); + + $this->lastPoll[$index] = time(); + if ($hit) { + // Do the next poll quickly, there may be more input! + $this->pollTimeouts[$index] = 0; + } else { + // Empty queue. Exponential backoff up to the maximum poll interval. + if ($timeout > 0) { + $timeout = min($timeout * 2, $interval); + } else { + $timeout = 1; + } + $this->pollTimeouts[$index] = $timeout; + } + } + } + + /** + * Called after each handled item or empty polling cycle. + * This is a good time to e.g. service your XMPP connection. + */ + function idle() + { + foreach ($this->managers as $manager) { + $manager->idle(); + } + } + + /** + * Send thread state update to the monitoring server, if configured. + * + * @param string $state ('init', 'queue', 'shutdown' etc) + * @param string $substate (optional, eg queue name 'omb' 'sms' etc) + */ + protected function logState($state, $substate='') + { + $this->monitor->logState($this->id, $state, $substate); + } + + /** + * Send thread stats. + * Thread ID will be implicit; other owners can be listed as well + * for per-queue and per-site records. + * + * @param string $key counter name + * @param array $owners list of owner keys like 'queue:jabber' or 'site:stat01' + */ + public function stats($key, $owners=array()) + { + $owners[] = "thread:" . $this->id; + $this->monitor->stats($key, $owners); + } +} + |