From c7507e7e9dafa6d6e054978e720e4fce3abc9929 Mon Sep 17 00:00:00 2001 From: Brion Vibber Date: Fri, 22 Jan 2010 12:52:36 -0800 Subject: XMPP queued output & initial retooling of DB queue manager to support non-Notice objects. Queue handlers for XMPP individual & firehose output now send their XML stanzas to another output queue instead of connecting directly to the chat server. This lets us have as many general processing threads as we need, while all actual XMPP input and output go through a single daemon with a single connection open. This avoids problems with multiple connected resources: * multiple windows shown in some chat clients (psi, gajim, kopete) * extra load on server * incoming message delivery forwarding issues Database changes: * queue_item drops 'notice_id' in favor of a 'frame' blob. This is based on Craig Andrews' work branch to generalize queues to take any object, but conservatively leaving out the serialization for now. Table updater (preserves any existing queued items) in db/rc3to09.sql Code changes to watch out for: * Queue handlers should now define a handle() method instead of handle_notice() * QueueDaemon and XmppDaemon now share common i/o (IoMaster) and respawning thread management (RespawningDaemon) infrastructure. * The polling XmppConfirmManager has been dropped, as the message is queued directly when saving IM settings. * Enable $config['queue']['debug_memory'] to output current memory usage at each run through the event loop to watch for memory leaks To do: * Adapt XMPP i/o to component connection mode for multi-site support. * XMPP input can also be broken out to a queue, which would allow the actual notice save etc to be handled by general queue threads. * Make sure there are no problems with simply pushing serialized Notice objects to queues. * Find a way to improve interactive performance of the database-backed queue handler; polling is pretty painful to XMPP. * Possibly redo the way QueueHandlers are injected into a QueueManager. The grouping used to split out the XMPP output queue is a bit awkward. Conflicts: scripts/xmppdaemon.php --- lib/queuemanager.php | 125 +++++++++++++++++++++++++++++++++++++++++---------- 1 file changed, 101 insertions(+), 24 deletions(-) (limited to 'lib/queuemanager.php') diff --git a/lib/queuemanager.php b/lib/queuemanager.php index 291174d3c..4eb39bfa8 100644 --- a/lib/queuemanager.php +++ b/lib/queuemanager.php @@ -39,6 +39,10 @@ abstract class QueueManager extends IoManager { static $qm = null; + public $master = null; + public $handlers = array(); + public $groups = array(); + /** * Factory function to pull the appropriate QueueManager object * for this site's configuration. It can then be used to queue @@ -109,6 +113,64 @@ abstract class QueueManager extends IoManager */ abstract function enqueue($object, $queue); + /** + * Build a representation for an object for logging + * @param mixed + * @return string + */ + function logrep($object) { + if (is_object($object)) { + $class = get_class($object); + if (isset($object->id)) { + return "$class $object->id"; + } + return $class; + } + if (is_string($object)) { + $len = strlen($object); + $fragment = mb_substr($object, 0, 32); + if (mb_strlen($object) > 32) { + $fragment .= '...'; + } + return "string '$fragment' ($len bytes)"; + } + return strval($object); + } + + /** + * Encode an object for queued storage. + * Next gen may use serialization. + * + * @param mixed $object + * @return string + */ + protected function encode($object) + { + if ($object instanceof Notice) { + return $object->id; + } else if (is_string($object)) { + return $object; + } else { + throw new ServerException("Can't queue this type", 500); + } + } + + /** + * Decode an object from queued storage. + * Accepts back-compat notice reference entries and strings for now. + * + * @param string + * @return mixed + */ + protected function decode($frame) + { + if (is_numeric($frame)) { + return Notice::staticGet(intval($frame)); + } else { + return $frame; + } + } + /** * Instantiate the appropriate QueueHandler class for the given queue. * @@ -131,13 +193,15 @@ abstract class QueueManager extends IoManager } /** - * Get a list of all registered queue transport names. + * Get a list of registered queue transport names to be used + * for this daemon. * * @return array of strings */ function getQueues() { - return array_keys($this->handlers); + $group = $this->activeGroup(); + return array_keys($this->groups[$group]); } /** @@ -148,33 +212,29 @@ abstract class QueueManager extends IoManager */ function initialize() { + // @fixme we'll want to be able to listen to particular queues... if (Event::handle('StartInitializeQueueManager', array($this))) { - if (!defined('XMPP_ONLY_FLAG')) { // hack! - $this->connect('plugin', 'PluginQueueHandler'); - $this->connect('omb', 'OmbQueueHandler'); - $this->connect('ping', 'PingQueueHandler'); - if (common_config('sms', 'enabled')) { - $this->connect('sms', 'SmsQueueHandler'); - } + $this->connect('plugin', 'PluginQueueHandler'); + $this->connect('omb', 'OmbQueueHandler'); + $this->connect('ping', 'PingQueueHandler'); + if (common_config('sms', 'enabled')) { + $this->connect('sms', 'SmsQueueHandler'); } // XMPP output handlers... - if (common_config('xmpp', 'enabled') && !defined('XMPP_EMERGENCY_FLAG')) { - $this->connect('jabber', 'JabberQueueHandler'); - $this->connect('public', 'PublicQueueHandler'); - - // @fixme this should move up a level or should get an actual queue - $this->connect('confirm', 'XmppConfirmHandler'); - } + $this->connect('jabber', 'JabberQueueHandler'); + $this->connect('public', 'PublicQueueHandler'); + + // @fixme this should get an actual queue + //$this->connect('confirm', 'XmppConfirmHandler'); + + // For compat with old plugins not registering their own handlers. + $this->connect('plugin', 'PluginQueueHandler'); + + $this->connect('xmppout', 'XmppOutQueueHandler', 'xmppdaemon'); - if (!defined('XMPP_ONLY_FLAG')) { // hack! - // For compat with old plugins not registering their own handlers. - $this->connect('plugin', 'PluginQueueHandler'); - } - } - if (!defined('XMPP_ONLY_FLAG')) { // hack! - Event::handle('EndInitializeQueueManager', array($this)); } + Event::handle('EndInitializeQueueManager', array($this)); } /** @@ -183,10 +243,27 @@ abstract class QueueManager extends IoManager * * @param string $transport * @param string $class + * @param string $group */ - public function connect($transport, $class) + public function connect($transport, $class, $group='queuedaemon') { $this->handlers[$transport] = $class; + $this->groups[$group][$transport] = $class; + } + + /** + * @return string queue group to use for this request + */ + function activeGroup() + { + $group = 'queuedaemon'; + if ($this->master) { + // hack hack + if ($this->master instanceof XmppMaster) { + return 'xmppdaemon'; + } + } + return $group; } /** -- cgit v1.2.3-54-g00ecf From e26a843caf9f6bb0d11a7128884db235ededcce0 Mon Sep 17 00:00:00 2001 From: Evan Prodromou Date: Mon, 25 Jan 2010 18:08:21 -0500 Subject: Offload inbox updates to a queue handler to speed up posting online Moved much of the writing that happens when posting a notice to a new queuehandler, distribqueuehandler. This updates tags, groups, replies and inboxes at queue time (or at Web time, if queues are disabled). To make this work well, I had to break up the monolithic Notice::blowCaches() and make cache blowing happen closer to where data is updated. Squashed commit of the following: commit 5257626c62750ac4ac1db0ce2b71410c5711cfa3 Author: Evan Prodromou Date: Mon Jan 25 14:56:41 2010 -0500 slightly better handling of blowing tag memory cache commit 8a22a3cdf6ec28685da129a0313e7b2a0837c9ef Author: Evan Prodromou Date: Mon Jan 25 01:42:56 2010 -0500 change 'distribute' to 'distrib' so not too long for dbqueue commit 7a063315b0f7fad27cb6fbd2bdd74e253af83e4f Author: Evan Prodromou Date: Mon Jan 25 01:39:15 2010 -0500 change handle_notice() to handle() in distributqueuehandler commit 1a39ccd28b9994137d7bfd21bb4f230546938e77 Author: Evan Prodromou Date: Mon Jan 25 16:05:25 2010 -0500 error with queuemanager commit e6b3bb93f305cfd2de71a6340b8aa6fb890049b7 Author: Evan Prodromou Date: Mon Jan 25 01:11:34 2010 -0500 Blow memcache at different point rather than one big function for Notice class commit 94d557cdc016187d1d0647ae1794cd94d6fb8ac8 Author: Evan Prodromou Date: Mon Jan 25 00:48:44 2010 -0500 Blow memcache at different point rather than one big function for Notice class commit 1c781dd08c88a35dafc5c01230b4872fd6b95182 Author: Evan Prodromou Date: Wed Jan 20 08:54:18 2010 -0500 move broadcasting and distributing to new queuehandler commit da3e46d26b84e4f028f34a13fd2ee373e4c1b954 Author: Evan Prodromou Date: Wed Jan 20 08:53:12 2010 -0500 Move distribution of notices to new distribute queue handler --- actions/apistatusesretweet.php | 2 +- actions/apistatusesupdate.php | 2 +- actions/newnotice.php | 2 +- actions/repeat.php | 2 +- classes/Inbox.php | 6 +- classes/Memcached_DataObject.php | 19 ++ classes/Notice.php | 406 +++++++++++++----------------------- classes/Notice_tag.php | 10 +- classes/User.php | 2 +- lib/command.php | 4 +- lib/distribqueuehandler.php | 84 ++++++++ lib/mailhandler.php | 2 +- lib/oauthstore.php | 2 +- lib/queuemanager.php | 3 +- lib/util.php | 2 +- plugins/Facebook/facebookaction.php | 2 +- 16 files changed, 269 insertions(+), 281 deletions(-) create mode 100644 lib/distribqueuehandler.php (limited to 'lib/queuemanager.php') diff --git a/actions/apistatusesretweet.php b/actions/apistatusesretweet.php index d9d4820c0..128c881e2 100644 --- a/actions/apistatusesretweet.php +++ b/actions/apistatusesretweet.php @@ -112,7 +112,7 @@ class ApiStatusesRetweetAction extends ApiAuthAction $repeat = $this->original->repeat($this->user->id, $this->source); - common_broadcast_notice($repeat); + $this->showNotice($repeat); } diff --git a/actions/apistatusesupdate.php b/actions/apistatusesupdate.php index f594bbf39..9d831b9db 100644 --- a/actions/apistatusesupdate.php +++ b/actions/apistatusesupdate.php @@ -250,7 +250,7 @@ class ApiStatusesUpdateAction extends ApiAuthAction $upload->attachToNotice($this->notice); } - common_broadcast_notice($this->notice); + } $this->showNotice(); diff --git a/actions/newnotice.php b/actions/newnotice.php index a4ed87bb6..78480abab 100644 --- a/actions/newnotice.php +++ b/actions/newnotice.php @@ -201,7 +201,7 @@ class NewnoticeAction extends Action $upload->attachToNotice($notice); } - common_broadcast_notice($notice); + if ($this->boolean('ajax')) { header('Content-Type: text/xml;charset=utf-8'); diff --git a/actions/repeat.php b/actions/repeat.php index b75523498..e112496bc 100644 --- a/actions/repeat.php +++ b/actions/repeat.php @@ -106,7 +106,7 @@ class RepeatAction extends Action { $repeat = $this->notice->repeat($this->user->id, 'web'); - common_broadcast_notice($repeat); + if ($this->boolean('ajax')) { $this->startHTML('text/xml;charset=utf-8'); diff --git a/classes/Inbox.php b/classes/Inbox.php index 086dba1c9..26b27d2b5 100644 --- a/classes/Inbox.php +++ b/classes/Inbox.php @@ -120,11 +120,7 @@ class Inbox extends Memcached_DataObject $notice_id, $user_id)); if ($result) { - $c = self::memcache(); - - if (!empty($c)) { - $c->delete(self::cacheKey('inbox', 'user_id', $user_id)); - } + self::blow('inbox:user_id:%d', $user_id); } return $result; diff --git a/classes/Memcached_DataObject.php b/classes/Memcached_DataObject.php index 6ddef4816..2c9dcf595 100644 --- a/classes/Memcached_DataObject.php +++ b/classes/Memcached_DataObject.php @@ -425,4 +425,23 @@ class Memcached_DataObject extends DB_DataObject return $dsn; } + + static function blow() + { + $c = self::memcache(); + + if (empty($c)) { + return false; + } + + $args = func_get_args(); + + $format = array_shift($args); + + $keyPart = vsprintf($format, $args); + + $cacheKey = common_cache_key($keyPart); + + return $c->delete($cacheKey); + } } diff --git a/classes/Notice.php b/classes/Notice.php index 38b10db04..0966697e2 100644 --- a/classes/Notice.php +++ b/classes/Notice.php @@ -94,10 +94,6 @@ class Notice extends Memcached_DataObject function delete() { - $this->blowCaches(true); - $this->blowFavesCache(true); - $this->blowSubsCache(true); - // For auditing purposes, save a record that the notice // was deleted. @@ -109,31 +105,20 @@ class Notice extends Memcached_DataObject $deleted->created = $this->created; $deleted->deleted = common_sql_now(); - $this->query('BEGIN'); - $deleted->insert(); - //Null any notices that are replies to this notice - $this->query(sprintf("UPDATE notice set reply_to = null WHERE reply_to = %d", $this->id)); - - //Null any notices that are repeats of this notice - //XXX: probably need to uncache these, too + // Clear related records - $this->query(sprintf("UPDATE notice set repeat_of = null WHERE repeat_of = %d", $this->id)); + $this->clearReplies(); + $this->clearRepeats(); + $this->clearFaves(); + $this->clearTags(); + $this->clearGroupInboxes(); - $related = array('Reply', - 'Fave', - 'Notice_tag', - 'Group_inbox', - 'Queue_item'); + // NOTE: we don't clear inboxes + // NOTE: we don't clear queue items - foreach ($related as $cls) { - $inst = new $cls(); - $inst->notice_id = $this->id; - $inst->delete(); - } $result = parent::delete(); - $this->query('COMMIT'); } function saveTags() @@ -155,6 +140,7 @@ class Notice extends Memcached_DataObject foreach(array_unique($hashtags) as $hashtag) { /* elide characters we don't want in the tag */ $this->saveTag($hashtag); + self::blow('profile:notice_ids_tagged:%d:%s', $this->profile_id, $tag->tag); } return true; } @@ -172,6 +158,9 @@ class Notice extends Memcached_DataObject $last_error->message)); return; } + + // if it's saved, blow its cache + $tag->blowCache(false); } /** @@ -331,27 +320,43 @@ class Notice extends Memcached_DataObject } } - // XXX: do we need to change this for remote users? + } + + # Clear the cache for subscribed users, so they'll update at next request + # XXX: someone clever could prepend instead of clearing the cache + $notice->blowOnInsert(); - $notice->saveTags(); + $qm = QueueManager::get(); - $groups = $notice->saveGroups(); + $qm->enqueue($notice, 'distrib'); - $recipients = $notice->saveReplies(); + return $notice; + } - $notice->addToInboxes($groups, $recipients); + function blowOnInsert() + { + self::blow('profile:notice_ids:%d', $this->profile_id); + self::blow('public'); - $notice->saveUrls(); + if ($this->conversation != $this->id) { + self::blow('notice:conversation_ids:%d', $this->conversation); + } - Event::handle('EndNoticeSave', array($notice)); + if (!empty($this->repeat_of)) { + self::blow('notice:repeats:%d', $this->repeat_of); } - # Clear the cache for subscribed users, so they'll update at next request - # XXX: someone clever could prepend instead of clearing the cache + $original = Notice::staticGet('id', $this->repeat_of); - $notice->blowCaches(); + if (!empty($original)) { + $originalUser = User::staticGet('id', $original->profile_id); + if (!empty($originalUser)) { + self::blow('user:repeats_of_me:%d', $originalUser->id); + } + } - return $notice; + $profile = Profile::staticGet($this->profile_id); + $profile->blowNoticeCount(); } /** save all urls in the notice to the db @@ -456,227 +461,6 @@ class Notice extends Memcached_DataObject return $att; } - function blowCaches($blowLast=false) - { - $this->blowSubsCache($blowLast); - $this->blowNoticeCache($blowLast); - $this->blowRepliesCache($blowLast); - $this->blowPublicCache($blowLast); - $this->blowTagCache($blowLast); - $this->blowGroupCache($blowLast); - $this->blowConversationCache($blowLast); - $this->blowRepeatCache(); - $profile = Profile::staticGet($this->profile_id); - $profile->blowNoticeCount(); - } - - function blowRepeatCache() - { - if (!empty($this->repeat_of)) { - $cache = common_memcache(); - if (!empty($cache)) { - // XXX: only blow if <100 in cache - $ck = common_cache_key('notice:repeats:'.$this->repeat_of); - $result = $cache->delete($ck); - - $user = User::staticGet('id', $this->profile_id); - - if (!empty($user)) { - $uk = common_cache_key('user:repeated_by_me:'.$user->id); - $cache->delete($uk); - $user->free(); - unset($user); - } - - $original = Notice::staticGet('id', $this->repeat_of); - - if (!empty($original)) { - $originalUser = User::staticGet('id', $original->profile_id); - if (!empty($originalUser)) { - $ouk = common_cache_key('user:repeats_of_me:'.$originalUser->id); - $cache->delete($ouk); - $originalUser->free(); - unset($originalUser); - } - $original->free(); - unset($original); - } - } - } - } - - function blowConversationCache($blowLast=false) - { - $cache = common_memcache(); - if ($cache) { - $ck = common_cache_key('notice:conversation_ids:'.$this->conversation); - $cache->delete($ck); - if ($blowLast) { - $cache->delete($ck.';last'); - } - } - } - - function blowGroupCache($blowLast=false) - { - $cache = common_memcache(); - if ($cache) { - $group_inbox = new Group_inbox(); - $group_inbox->notice_id = $this->id; - if ($group_inbox->find()) { - while ($group_inbox->fetch()) { - $cache->delete(common_cache_key('user_group:notice_ids:' . $group_inbox->group_id)); - if ($blowLast) { - $cache->delete(common_cache_key('user_group:notice_ids:' . $group_inbox->group_id.';last')); - } - $member = new Group_member(); - $member->group_id = $group_inbox->group_id; - if ($member->find()) { - while ($member->fetch()) { - $cache->delete(common_cache_key('notice_inbox:by_user:' . $member->profile_id)); - $cache->delete(common_cache_key('notice_inbox:by_user_own:' . $member->profile_id)); - if (empty($this->repeat_of)) { - $cache->delete(common_cache_key('user:friends_timeline:' . $member->profile_id)); - $cache->delete(common_cache_key('user:friends_timeline_own:' . $member->profile_id)); - } - if ($blowLast) { - $cache->delete(common_cache_key('notice_inbox:by_user:' . $member->profile_id . ';last')); - $cache->delete(common_cache_key('notice_inbox:by_user_own:' . $member->profile_id . ';last')); - if (empty($this->repeat_of)) { - $cache->delete(common_cache_key('user:friends_timeline:' . $member->profile_id . ';last')); - $cache->delete(common_cache_key('user:friends_timeline_own:' . $member->profile_id . ';last')); - } - } - } - } - } - } - $group_inbox->free(); - unset($group_inbox); - } - } - - function blowTagCache($blowLast=false) - { - $cache = common_memcache(); - if ($cache) { - $tag = new Notice_tag(); - $tag->notice_id = $this->id; - if ($tag->find()) { - while ($tag->fetch()) { - $tag->blowCache($blowLast); - $ck = 'profile:notice_ids_tagged:' . $this->profile_id . ':' . $tag->tag; - - $cache->delete($ck); - if ($blowLast) { - $cache->delete($ck . ';last'); - } - } - } - $tag->free(); - unset($tag); - } - } - - function blowSubsCache($blowLast=false) - { - $cache = common_memcache(); - if ($cache) { - $user = new User(); - - $UT = common_config('db','type')=='pgsql'?'"user"':'user'; - $user->query('SELECT id ' . - - "FROM $UT JOIN subscription ON $UT.id = subscription.subscriber " . - 'WHERE subscription.subscribed = ' . $this->profile_id); - - while ($user->fetch()) { - $cache->delete(common_cache_key('notice_inbox:by_user:'.$user->id)); - $cache->delete(common_cache_key('notice_inbox:by_user_own:'.$user->id)); - if (empty($this->repeat_of)) { - $cache->delete(common_cache_key('user:friends_timeline:'.$user->id)); - $cache->delete(common_cache_key('user:friends_timeline_own:'.$user->id)); - } - if ($blowLast) { - $cache->delete(common_cache_key('notice_inbox:by_user:'.$user->id.';last')); - $cache->delete(common_cache_key('notice_inbox:by_user_own:'.$user->id.';last')); - if (empty($this->repeat_of)) { - $cache->delete(common_cache_key('user:friends_timeline:'.$user->id.';last')); - $cache->delete(common_cache_key('user:friends_timeline_own:'.$user->id.';last')); - } - } - } - $user->free(); - unset($user); - } - } - - function blowNoticeCache($blowLast=false) - { - if ($this->is_local) { - $cache = common_memcache(); - if (!empty($cache)) { - $cache->delete(common_cache_key('profile:notice_ids:'.$this->profile_id)); - if ($blowLast) { - $cache->delete(common_cache_key('profile:notice_ids:'.$this->profile_id.';last')); - } - } - } - } - - function blowRepliesCache($blowLast=false) - { - $cache = common_memcache(); - if ($cache) { - $reply = new Reply(); - $reply->notice_id = $this->id; - if ($reply->find()) { - while ($reply->fetch()) { - $cache->delete(common_cache_key('reply:stream:'.$reply->profile_id)); - if ($blowLast) { - $cache->delete(common_cache_key('reply:stream:'.$reply->profile_id.';last')); - } - } - } - $reply->free(); - unset($reply); - } - } - - function blowPublicCache($blowLast=false) - { - if ($this->is_local == Notice::LOCAL_PUBLIC) { - $cache = common_memcache(); - if ($cache) { - $cache->delete(common_cache_key('public')); - if ($blowLast) { - $cache->delete(common_cache_key('public').';last'); - } - } - } - } - - function blowFavesCache($blowLast=false) - { - $cache = common_memcache(); - if ($cache) { - $fave = new Fave(); - $fave->notice_id = $this->id; - if ($fave->find()) { - while ($fave->fetch()) { - $cache->delete(common_cache_key('fave:ids_by_user:'.$fave->user_id)); - $cache->delete(common_cache_key('fave:by_user_own:'.$fave->user_id)); - if ($blowLast) { - $cache->delete(common_cache_key('fave:ids_by_user:'.$fave->user_id.';last')); - $cache->delete(common_cache_key('fave:by_user_own:'.$fave->user_id.';last')); - } - } - } - $fave->free(); - unset($fave); - } - } - function getStreamByIds($ids) { $cache = common_memcache(); @@ -999,7 +783,14 @@ class Notice extends Memcached_DataObject $gi->notice_id = $this->id; $gi->created = $this->created; - return $gi->insert(); + $result = $gi->insert(); + + if (!result) { + common_log_db_error($gi, 'INSERT', __FILE__); + throw new ServerException(_('Problem saving group inbox.')); + } + + self::blow('user_group:notice_ids:%d', $gi->group_id); } return true; @@ -1094,7 +885,8 @@ class Notice extends Memcached_DataObject foreach ($recipientIds as $recipientId) { $user = User::staticGet('id', $recipientId); - if ($user) { + if (!empty($user)) { + self::blow('reply:stream:%d', $reply->profile_id); mail_notify_attn($user, $this); } } @@ -1553,4 +1345,104 @@ class Notice extends Memcached_DataObject return $options; } + + function clearReplies() + { + $replyNotice = new Notice(); + $replyNotice->reply_to = $this->id; + + //Null any notices that are replies to this notice + + if ($replyNotice->find()) { + while ($replyNotice->fetch()) { + $orig = clone($replyNotice); + $replyNotice->reply_to = null; + $replyNotice->update($orig); + } + } + + // Reply records + + $reply = new Reply(); + $reply->notice_id = $this->id; + + if ($reply->find()) { + while($reply->fetch()) { + self::blow('reply:stream:%d', $reply->profile_id); + $reply->delete(); + } + } + + $reply->free(); + + return $ids; + } + + function clearRepeats() + { + $repeatNotice = new Notice(); + $repeatNotice->repeat_of = $this->id; + + //Null any notices that are repeats of this notice + + if ($repeatNotice->find()) { + while ($repeatNotice->fetch()) { + $orig = clone($repeatNotice); + $repeatNotice->repeat_of = null; + $repeatNotice->update($orig); + } + } + } + + function clearFaves() + { + $fave = new Fave(); + $fave->notice_id = $this->id; + + if ($fave->find()) { + while ($fave->fetch()) { + self::blow('fave:ids_by_user_own:%d', $fave->user_id); + self::blow('fave:ids_by_user_own:%d;last', $fave->user_id); + self::blow('fave:ids_by_user:%d', $fave->user_id); + self::blow('fave:ids_by_user:%d;last', $fave->user_id); + $fave->delete(); + } + } + + $fave->free(); + } + + function clearTags() + { + $tag = new Notice_tag(); + $tag->notice_id = $this->id; + + if ($tag->find()) { + while ($tag->fetch()) { + self::blow('profile:notice_ids_tagged:%d:%s', $this->profile_id, common_keyize($tag->tag)); + self::blow('profile:notice_ids_tagged:%d:%s;last', $this->profile_id, common_keyize($tag->tag)); + self::blow('notice_tag:notice_ids:%s', common_keyize($tag->tag)); + self::blow('notice_tag:notice_ids:%s;last', common_keyize($tag->tag)); + $tag->delete(); + } + } + + $tag->free(); + } + + function clearGroupInboxes() + { + $gi = new Group_inbox(); + + $gi->notice_id = $this->id; + + if ($gi->find()) { + while ($gi->fetch()) { + self::blow('user_group:notice_ids:%d', $gi->group_id); + $gi->delete(); + } + } + + $gi->free(); + } } diff --git a/classes/Notice_tag.php b/classes/Notice_tag.php index 79231f0b0..4fd76e8ea 100644 --- a/classes/Notice_tag.php +++ b/classes/Notice_tag.php @@ -86,13 +86,9 @@ class Notice_tag extends Memcached_DataObject function blowCache($blowLast=false) { - $cache = common_memcache(); - if ($cache) { - $idkey = common_cache_key('notice_tag:notice_ids:' . common_keyize($this->tag)); - $cache->delete($idkey); - if ($blowLast) { - $cache->delete($idkey.';last'); - } + self::blow('notice_tag:notice_ids:%s', common_keyize($this->tag)); + if ($blowLast) { + self::blow('notice_tag:notice_ids:%s;last', common_keyize($this->tag)); } } diff --git a/classes/User.php b/classes/User.php index d6b52be01..6ea975202 100644 --- a/classes/User.php +++ b/classes/User.php @@ -383,7 +383,7 @@ class User extends Memcached_DataObject common_config('site', 'name'), $user->nickname), 'system'); - common_broadcast_notice($notice); + } } diff --git a/lib/command.php b/lib/command.php index c0a32e1b1..2a51fd687 100644 --- a/lib/command.php +++ b/lib/command.php @@ -422,7 +422,7 @@ class RepeatCommand extends Command $repeat = $notice->repeat($this->user->id, $channel->source); if ($repeat) { - common_broadcast_notice($repeat); + $channel->output($this->user, sprintf(_('Notice from %s repeated'), $recipient->nickname)); } else { $channel->error($this->user, _('Error repeating notice.')); @@ -492,7 +492,7 @@ class ReplyCommand extends Command } else { $channel->error($this->user, _('Error saving notice.')); } - common_broadcast_notice($notice); + } } diff --git a/lib/distribqueuehandler.php b/lib/distribqueuehandler.php new file mode 100644 index 000000000..f458d238d --- /dev/null +++ b/lib/distribqueuehandler.php @@ -0,0 +1,84 @@ +. + */ + +if (!defined('STATUSNET') && !defined('LACONICA')) { exit(1); } + +/** + * Base class for queue handlers. + * + * As extensions of the Daemon class, each queue handler has the ability + * to launch itself in the background, at which point it'll pass control + * to the configured QueueManager class to poll for updates. + * + * Subclasses must override at least the following methods: + * - transport + * - handle_notice + */ + +class DistribQueueHandler +{ + /** + * Return transport keyword which identifies items this queue handler + * services; must be defined for all subclasses. + * + * Must be 8 characters or less to fit in the queue_item database. + * ex "email", "jabber", "sms", "irc", ... + * + * @return string + */ + + function transport() + { + return 'distrib'; + } + + /** + * Here's the meat of your queue handler -- you're handed a Notice + * object, which you may do as you will with. + * + * If this function indicates failure, a warning will be logged + * and the item is placed back in the queue to be re-run. + * + * @param Notice $notice + * @return boolean true on success, false on failure + */ + function handle($notice) + { + // XXX: do we need to change this for remote users? + + $notice->saveTags(); + + $groups = $notice->saveGroups(); + + $recipients = $notice->saveReplies(); + + $notice->addToInboxes($groups, $recipients); + + $notice->saveUrls(); + + Event::handle('EndNoticeSave', array($notice)); + + // Enqueue for other handlers + + common_enqueue_notice($notice); + + return true; + } +} + diff --git a/lib/mailhandler.php b/lib/mailhandler.php index 85be89f18..890f6d5b4 100644 --- a/lib/mailhandler.php +++ b/lib/mailhandler.php @@ -160,7 +160,7 @@ class MailHandler foreach($mediafiles as $mf){ $mf->attachToNotice($notice); } - common_broadcast_notice($notice); + $this->log(LOG_INFO, 'Added notice ' . $notice->id . ' from user ' . $user->nickname); return true; diff --git a/lib/oauthstore.php b/lib/oauthstore.php index df63cc151..b30fb49d5 100644 --- a/lib/oauthstore.php +++ b/lib/oauthstore.php @@ -362,7 +362,7 @@ class StatusNetOAuthDataStore extends OAuthDataStore array('is_local' => Notice::REMOTE_OMB, 'uri' => $omb_notice->getIdentifierURI())); - common_broadcast_notice($notice, true); + } /** diff --git a/lib/queuemanager.php b/lib/queuemanager.php index 4eb39bfa8..e5cf8239e 100644 --- a/lib/queuemanager.php +++ b/lib/queuemanager.php @@ -217,6 +217,7 @@ abstract class QueueManager extends IoManager $this->connect('plugin', 'PluginQueueHandler'); $this->connect('omb', 'OmbQueueHandler'); $this->connect('ping', 'PingQueueHandler'); + $this->connect('distrib', 'DistribQueueHandler'); if (common_config('sms', 'enabled')) { $this->connect('sms', 'SmsQueueHandler'); } @@ -224,7 +225,7 @@ abstract class QueueManager extends IoManager // XMPP output handlers... $this->connect('jabber', 'JabberQueueHandler'); $this->connect('public', 'PublicQueueHandler'); - + // @fixme this should get an actual queue //$this->connect('confirm', 'XmppConfirmHandler'); diff --git a/lib/util.php b/lib/util.php index fb3b8be87..4312f9876 100644 --- a/lib/util.php +++ b/lib/util.php @@ -987,7 +987,7 @@ function common_redirect($url, $code=307) function common_broadcast_notice($notice, $remote=false) { - return common_enqueue_notice($notice); + // DO NOTHING! } // Stick the notice on the queue diff --git a/plugins/Facebook/facebookaction.php b/plugins/Facebook/facebookaction.php index bf9c037a5..815fee094 100644 --- a/plugins/Facebook/facebookaction.php +++ b/plugins/Facebook/facebookaction.php @@ -397,7 +397,7 @@ class FacebookAction extends Action return; } - common_broadcast_notice($notice); + } -- cgit v1.2.3-54-g00ecf