From ec145b73fc91dd54695dd374c8a71a11e233b8c0 Mon Sep 17 00:00:00 2001 From: Brion Vibber Date: Tue, 12 Jan 2010 19:57:15 -0800 Subject: Major refactoring of queue handlers to support running multiple sites in one daemon. Key changes: * Initialization code moved from common.php to StatusNet class; can now switch configurations during runtime. * As a consequence, configuration files must now be idempotent... Be careful with constant, function or class definitions. * Control structure for daemons/QueueManager/QueueHandler has been refactored; the run loop is now managed by IoMaster run via scripts/queuedaemon.php IoManager subclasses are woken to handle socket input or polling, and may cover multiple sites. * Plugins can implement notice queue handlers more easily by registering a QueueHandler class; no more need to add a daemon. The new QueueDaemon runs from scripts/queuedaemon.php: * This replaces most of the old *handler.php scripts; they've been refactored to the bare handler classes. * Spawns multiple child processes to spread load; defaults to CPU count on Linux and Mac OS X systems, or override with --threads=N * When multithreaded, child processes are automatically respawned on failure. * Threads gracefully shut down and restart when passing a soft memory limit (defaults to 90% of memory_limit), limiting damage from memory leaks. * Support for UDP-based monitoring: http://www.gitorious.org/snqmon Rough control flow diagram: QueueDaemon -> IoMaster -> IoManager QueueManager [listen or poll] -> QueueHandler XmppManager [ping & keepalive] XmppConfirmManager [poll updates] Todo: * Respawning features not currently available running single-threaded. * When running single-site, configuration changes aren't picked up. * New sites or config changes affecting queue subscriptions are not yet handled without a daemon restart. * SNMP monitoring output to integrate with general tools (nagios, ganglia) * Convert XMPP confirmation message sends to use stomp queue instead of polling * Convert xmppdaemon.php to IoManager? * Convert Twitter status, friends import polling daemons to IoManager * Clean up some error reporting and failure modes * May need to adjust queue priorities for best perf in backlog/flood cases Detailed code history available in my daemon-work branch: http://www.gitorious.org/~brion/statusnet/brion-fixes/commits/daemon-work --- lib/jabber.php | 25 ++++++++++++++++++------- 1 file changed, 18 insertions(+), 7 deletions(-) (limited to 'lib/jabber.php') diff --git a/lib/jabber.php b/lib/jabber.php index a821856a8..1d0bb9423 100644 --- a/lib/jabber.php +++ b/lib/jabber.php @@ -86,7 +86,11 @@ class Sharing_XMPP extends XMPPHP_XMPP } /** - * connect the configured Jabber account to the configured server + * Lazy-connect the configured Jabber account to the configured server; + * if already opened, the same connection will be returned. + * + * In a multi-site background process, each site configuration + * will get its own connection. * * @param string $resource Resource to connect (defaults to configured resource) * @@ -95,16 +99,19 @@ class Sharing_XMPP extends XMPPHP_XMPP function jabber_connect($resource=null) { - static $conn = null; - if (!$conn) { + static $connections = array(); + $site = common_config('site', 'server'); + if (empty($connections[$site])) { + if (empty($resource)) { + $resource = common_config('xmpp', 'resource'); + } $conn = new Sharing_XMPP(common_config('xmpp', 'host') ? common_config('xmpp', 'host') : common_config('xmpp', 'server'), common_config('xmpp', 'port'), common_config('xmpp', 'user'), common_config('xmpp', 'password'), - ($resource) ? $resource : - common_config('xmpp', 'resource'), + $resource, common_config('xmpp', 'server'), common_config('xmpp', 'debug') ? true : false, @@ -115,12 +122,16 @@ function jabber_connect($resource=null) if (!$conn) { return false; } + $connections[$site] = $conn; $conn->autoSubscribe(); $conn->useEncryption(common_config('xmpp', 'encryption')); try { - $conn->connect(true); // true = persistent connection + common_log(LOG_INFO, __METHOD__ . ": connecting " . + common_config('xmpp', 'user') . '/' . $resource); + //$conn->connect(true); // true = persistent connection + $conn->connect(); // persistent connections break multisite } catch (XMPPHP_Exception $e) { common_log(LOG_ERR, $e->getMessage()); return false; @@ -128,7 +139,7 @@ function jabber_connect($resource=null) $conn->processUntil('session_start'); } - return $conn; + return $connections[$site]; } /** -- cgit v1.2.3-54-g00ecf From 23599da91e9d4abbd6e73ef3f44c58f302a5231a Mon Sep 17 00:00:00 2001 From: Evan Prodromou Date: Wed, 13 Jan 2010 01:13:49 -0800 Subject: refactor jabber broadcast for notice_inbox removal --- lib/jabber.php | 95 +++++++++++++++++++--------------------------------------- 1 file changed, 30 insertions(+), 65 deletions(-) (limited to 'lib/jabber.php') diff --git a/lib/jabber.php b/lib/jabber.php index 1d0bb9423..69f0d3570 100644 --- a/lib/jabber.php +++ b/lib/jabber.php @@ -356,77 +356,42 @@ function jabber_broadcast_notice($notice) $conn = jabber_connect(); - // First, get users to whom this is a direct reply - $user = new User(); - $UT = common_config('db','type')=='pgsql'?'"user"':'user'; - $user->query("SELECT $UT.id, $UT.jabber " . - "FROM $UT JOIN reply ON $UT.id = reply.profile_id " . - 'WHERE reply.notice_id = ' . $notice->id . ' ' . - "AND $UT.jabber is not null " . - "AND $UT.jabbernotify = 1 " . - "AND $UT.jabberreplies = 1 "); - - while ($user->fetch()) { + $ni = $notice->whoGets(); + + foreach ($ni as $user_id => $reason) { + $user = User::staticGet('user_id', $user_id); + if (empty($user) || + empty($user->jabber) || + !$user->jabbernotify) { + // either not a local user, or just not found + continue; + } + switch ($reason) { + case NOTICE_INBOX_SOURCE_REPLY: + if (!$user->jabberreplies) { + continue; + } + break; + case NOTICE_INBOX_SOURCE_SUB: + $sub = Subscription::pkeyGet(array('subscriber' => $user->id, + 'subscribed' => $notice->profile_id)); + if (empty($sub) || !$sub->jabber) { + continue; + } + break; + case NOTICE_INBOX_SOURCE_GROUP: + break; + default: + throw new Exception(_("Unknown inbox source.")); + } + common_log(LOG_INFO, - 'Sending reply notice ' . $notice->id . ' to ' . $user->jabber, + 'Sending notice ' . $notice->id . ' to ' . $user->jabber, __FILE__); $conn->message($user->jabber, $msg, 'chat', null, $entry); $conn->processTime(0); - $sent_to[$user->id] = 1; } - $user->free(); - - // Now, get users subscribed to this profile - - $user = new User(); - $user->query("SELECT $UT.id, $UT.jabber " . - "FROM $UT JOIN subscription " . - "ON $UT.id = subscription.subscriber " . - 'WHERE subscription.subscribed = ' . $notice->profile_id . ' ' . - "AND $UT.jabber is not null " . - "AND $UT.jabbernotify = 1 " . - 'AND subscription.jabber = 1 '); - - while ($user->fetch()) { - if (!array_key_exists($user->id, $sent_to)) { - common_log(LOG_INFO, - 'Sending notice ' . $notice->id . ' to ' . $user->jabber, - __FILE__); - $conn->message($user->jabber, $msg, 'chat', null, $entry); - // To keep the incoming queue from filling up, - // we service it after each send. - $conn->processTime(0); - $sent_to[$user->id] = 1; - } - } - - // Now, get users who have it in their inbox because of groups - - $user = new User(); - $user->query("SELECT $UT.id, $UT.jabber " . - "FROM $UT JOIN notice_inbox " . - "ON $UT.id = notice_inbox.user_id " . - 'WHERE notice_inbox.notice_id = ' . $notice->id . ' ' . - 'AND notice_inbox.source = 2 ' . - "AND $UT.jabber is not null " . - "AND $UT.jabbernotify = 1 "); - - while ($user->fetch()) { - if (!array_key_exists($user->id, $sent_to)) { - common_log(LOG_INFO, - 'Sending notice ' . $notice->id . ' to ' . $user->jabber, - __FILE__); - $conn->message($user->jabber, $msg, 'chat', null, $entry); - // To keep the incoming queue from filling up, - // we service it after each send. - $conn->processTime(0); - $sent_to[$user->id] = 1; - } - } - - $user->free(); - return true; } -- cgit v1.2.3-54-g00ecf From 0e1f2d4b47e5e340679c4245b62e1d64c6b9c9b9 Mon Sep 17 00:00:00 2001 From: Brion Vibber Date: Wed, 13 Jan 2010 18:17:36 +0000 Subject: Fix regressions in XMPP output during inbox refactoring - NOTICE_INBOX_SOURCE_* constants moved to common.php since Notice_inbox.php not always loaded - fixed typo in User::staticGet() call which caused user #1 to receive messages once for each subscriber instead of for him/herself - 'continue' -> 'continue 2' inside switch() statement to fix loop escape (PHP considers switch() a looping construct for break & continue) --- classes/Notice_inbox.php | 6 ------ lib/common.php | 6 ++++++ lib/jabber.php | 8 ++++---- 3 files changed, 10 insertions(+), 10 deletions(-) (limited to 'lib/jabber.php') diff --git a/classes/Notice_inbox.php b/classes/Notice_inbox.php index 6c328e685..c27dcdfd6 100644 --- a/classes/Notice_inbox.php +++ b/classes/Notice_inbox.php @@ -31,12 +31,6 @@ define('NOTICE_INBOX_GC_MAX', 12800); define('NOTICE_INBOX_LIMIT', 1000); define('NOTICE_INBOX_SOFT_LIMIT', 1000); -define('NOTICE_INBOX_SOURCE_SUB', 1); -define('NOTICE_INBOX_SOURCE_GROUP', 2); -define('NOTICE_INBOX_SOURCE_REPLY', 3); -define('NOTICE_INBOX_SOURCE_FORWARD', 4); -define('NOTICE_INBOX_SOURCE_GATEWAY', -1); - class Notice_inbox extends Memcached_DataObject { ###START_AUTOCODE diff --git a/lib/common.php b/lib/common.php index 61decebb7..00e80373e 100644 --- a/lib/common.php +++ b/lib/common.php @@ -41,6 +41,12 @@ define('FOREIGN_NOTICE_SEND_REPLY', 4); define('FOREIGN_FRIEND_SEND', 1); define('FOREIGN_FRIEND_RECV', 2); +define('NOTICE_INBOX_SOURCE_SUB', 1); +define('NOTICE_INBOX_SOURCE_GROUP', 2); +define('NOTICE_INBOX_SOURCE_REPLY', 3); +define('NOTICE_INBOX_SOURCE_FORWARD', 4); +define('NOTICE_INBOX_SOURCE_GATEWAY', -1); + # append our extlib dir as the last-resort place to find libs set_include_path(get_include_path() . PATH_SEPARATOR . INSTALLDIR . '/extlib/'); diff --git a/lib/jabber.php b/lib/jabber.php index 69f0d3570..4cdfa6746 100644 --- a/lib/jabber.php +++ b/lib/jabber.php @@ -359,7 +359,7 @@ function jabber_broadcast_notice($notice) $ni = $notice->whoGets(); foreach ($ni as $user_id => $reason) { - $user = User::staticGet('user_id', $user_id); + $user = User::staticGet($user_id); if (empty($user) || empty($user->jabber) || !$user->jabbernotify) { @@ -369,20 +369,20 @@ function jabber_broadcast_notice($notice) switch ($reason) { case NOTICE_INBOX_SOURCE_REPLY: if (!$user->jabberreplies) { - continue; + continue 2; } break; case NOTICE_INBOX_SOURCE_SUB: $sub = Subscription::pkeyGet(array('subscriber' => $user->id, 'subscribed' => $notice->profile_id)); if (empty($sub) || !$sub->jabber) { - continue; + continue 2; } break; case NOTICE_INBOX_SOURCE_GROUP: break; default: - throw new Exception(_("Unknown inbox source.")); + throw new Exception(sprintf(_("Unknown inbox source %d."), $reason)); } common_log(LOG_INFO, -- cgit v1.2.3-54-g00ecf