diff options
author | Brion Vibber <brion@status.net> | 2010-01-12 19:57:15 -0800 |
---|---|---|
committer | Brion Vibber <brion@status.net> | 2010-01-12 20:45:09 -0800 |
commit | ec145b73fc91dd54695dd374c8a71a11e233b8c0 (patch) | |
tree | d4e718b0f5bdc917ea7eb6b951b2364e88078dec /classes | |
parent | 2b10e359fea9d6aabc5ab35557954a503bea730b (diff) |
Major refactoring of queue handlers to support running multiple sites in one daemon.
Key changes:
* Initialization code moved from common.php to StatusNet class;
can now switch configurations during runtime.
* As a consequence, configuration files must now be idempotent...
Be careful with constant, function or class definitions.
* Control structure for daemons/QueueManager/QueueHandler has been refactored;
the run loop is now managed by IoMaster run via scripts/queuedaemon.php
IoManager subclasses are woken to handle socket input or polling, and may
cover multiple sites.
* Plugins can implement notice queue handlers more easily by registering a
QueueHandler class; no more need to add a daemon.
The new QueueDaemon runs from scripts/queuedaemon.php:
* This replaces most of the old *handler.php scripts; they've been refactored
to the bare handler classes.
* Spawns multiple child processes to spread load; defaults to CPU count on
Linux and Mac OS X systems, or override with --threads=N
* When multithreaded, child processes are automatically respawned on failure.
* Threads gracefully shut down and restart when passing a soft memory limit
(defaults to 90% of memory_limit), limiting damage from memory leaks.
* Support for UDP-based monitoring: http://www.gitorious.org/snqmon
Rough control flow diagram:
QueueDaemon -> IoMaster -> IoManager
QueueManager [listen or poll] -> QueueHandler
XmppManager [ping & keepalive]
XmppConfirmManager [poll updates]
Todo:
* Respawning features not currently available running single-threaded.
* When running single-site, configuration changes aren't picked up.
* New sites or config changes affecting queue subscriptions are not yet
handled without a daemon restart.
* SNMP monitoring output to integrate with general tools (nagios, ganglia)
* Convert XMPP confirmation message sends to use stomp queue instead of polling
* Convert xmppdaemon.php to IoManager?
* Convert Twitter status, friends import polling daemons to IoManager
* Clean up some error reporting and failure modes
* May need to adjust queue priorities for best perf in backlog/flood cases
Detailed code history available in my daemon-work branch:
http://www.gitorious.org/~brion/statusnet/brion-fixes/commits/daemon-work
Diffstat (limited to 'classes')
-rw-r--r-- | classes/Memcached_DataObject.php | 23 | ||||
-rw-r--r-- | classes/Queue_item.php | 9 | ||||
-rw-r--r-- | classes/Status_network.php | 23 |
3 files changed, 49 insertions, 6 deletions
diff --git a/classes/Memcached_DataObject.php b/classes/Memcached_DataObject.php index 9c9994cea..4ecab9db6 100644 --- a/classes/Memcached_DataObject.php +++ b/classes/Memcached_DataObject.php @@ -331,6 +331,29 @@ class Memcached_DataObject extends DB_DataObject $exists = false; } + // @fixme horrible evil hack! + // + // In multisite configuration we don't want to keep around a separate + // connection for every database; we could end up with thousands of + // connections open per thread. In an ideal world we might keep + // a connection per server and select different databases, but that'd + // be reliant on having the same db username/pass as well. + // + // MySQL connections are cheap enough we're going to try just + // closing out the old connection and reopening when we encounter + // a new DSN. + // + // WARNING WARNING if we end up actually using multiple DBs at a time + // we'll need some fancier logic here. + if (!$exists && !empty($_DB_DATAOBJECT['CONNECTIONS'])) { + foreach ($_DB_DATAOBJECT['CONNECTIONS'] as $index => $conn) { + if (!empty($conn)) { + $conn->disconnect(); + } + unset($_DB_DATAOBJECT['CONNECTIONS'][$index]); + } + } + $result = parent::_connect(); if ($result && !$exists) { diff --git a/classes/Queue_item.php b/classes/Queue_item.php index 9c673540d..cf805a606 100644 --- a/classes/Queue_item.php +++ b/classes/Queue_item.php @@ -25,10 +25,12 @@ class Queue_item extends Memcached_DataObject function sequenceKey() { return array(false, false); } - static function top($transport) { + static function top($transport=null) { $qi = new Queue_item(); - $qi->transport = $transport; + if ($transport) { + $qi->transport = $transport; + } $qi->orderBy('created'); $qi->whereAdd('claimed is null'); @@ -40,7 +42,8 @@ class Queue_item extends Memcached_DataObject # XXX: potential race condition # can we force it to only update if claimed is still null # (or old)? - common_log(LOG_INFO, 'claiming queue item = ' . $qi->notice_id . ' for transport ' . $transport); + common_log(LOG_INFO, 'claiming queue item = ' . $qi->notice_id . + ' for transport ' . $qi->transport); $orig = clone($qi); $qi->claimed = common_sql_now(); $result = $qi->update($orig); diff --git a/classes/Status_network.php b/classes/Status_network.php index 776f6abb0..ef8e1ed43 100644 --- a/classes/Status_network.php +++ b/classes/Status_network.php @@ -49,6 +49,13 @@ class Status_network extends DB_DataObject static $cache = null; static $base = null; + /** + * @param string $dbhost + * @param string $dbuser + * @param string $dbpass + * @param string $dbname + * @param array $servers memcached servers to use for caching config info + */ static function setupDB($dbhost, $dbuser, $dbpass, $dbname, $servers) { global $config; @@ -60,12 +67,17 @@ class Status_network extends DB_DataObject if (class_exists('Memcache')) { self::$cache = new Memcache(); + // Can't close persistent connections, making forking painful. + // + // @fixme only do this in *parent* CLI processes. + // single-process and child-processes *should* use persistent. + $persist = php_sapi_name() != 'cli'; if (is_array($servers)) { foreach($servers as $server) { - self::$cache->addServer($server); + self::$cache->addServer($server, 11211, $persist); } } else { - self::$cache->addServer($servers); + self::$cache->addServer($servers, 11211, $persist); } } @@ -89,7 +101,7 @@ class Status_network extends DB_DataObject if (empty($sn)) { $sn = self::staticGet($k, $v); if (!empty($sn)) { - self::$cache->set($ck, $sn); + self::$cache->set($ck, clone($sn)); } } @@ -121,6 +133,11 @@ class Status_network extends DB_DataObject return parent::delete(); } + /** + * @param string $servername hostname + * @param string $pathname URL base path + * @param string $wildcard hostname suffix to match wildcard config + */ static function setupSite($servername, $pathname, $wildcard) { global $config; |