summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEvan Prodromou <evan@status.net>2010-01-25 18:08:21 -0500
committerEvan Prodromou <evan@status.net>2010-01-25 18:08:21 -0500
commite26a843caf9f6bb0d11a7128884db235ededcce0 (patch)
tree487082fa439a9b952756a2196066579e14705a33
parentf3beed68898a59c40220f4c47cb81a7cd91fbd09 (diff)
Offload inbox updates to a queue handler to speed up posting online
Moved much of the writing that happens when posting a notice to a new queuehandler, distribqueuehandler. This updates tags, groups, replies and inboxes at queue time (or at Web time, if queues are disabled). To make this work well, I had to break up the monolithic Notice::blowCaches() and make cache blowing happen closer to where data is updated. Squashed commit of the following: commit 5257626c62750ac4ac1db0ce2b71410c5711cfa3 Author: Evan Prodromou <evan@status.net> Date: Mon Jan 25 14:56:41 2010 -0500 slightly better handling of blowing tag memory cache commit 8a22a3cdf6ec28685da129a0313e7b2a0837c9ef Author: Evan Prodromou <evan@status.net> Date: Mon Jan 25 01:42:56 2010 -0500 change 'distribute' to 'distrib' so not too long for dbqueue commit 7a063315b0f7fad27cb6fbd2bdd74e253af83e4f Author: Evan Prodromou <evan@status.net> Date: Mon Jan 25 01:39:15 2010 -0500 change handle_notice() to handle() in distributqueuehandler commit 1a39ccd28b9994137d7bfd21bb4f230546938e77 Author: Evan Prodromou <evan@status.net> Date: Mon Jan 25 16:05:25 2010 -0500 error with queuemanager commit e6b3bb93f305cfd2de71a6340b8aa6fb890049b7 Author: Evan Prodromou <evan@status.net> Date: Mon Jan 25 01:11:34 2010 -0500 Blow memcache at different point rather than one big function for Notice class commit 94d557cdc016187d1d0647ae1794cd94d6fb8ac8 Author: Evan Prodromou <evan@status.net> Date: Mon Jan 25 00:48:44 2010 -0500 Blow memcache at different point rather than one big function for Notice class commit 1c781dd08c88a35dafc5c01230b4872fd6b95182 Author: Evan Prodromou <evan@status.net> Date: Wed Jan 20 08:54:18 2010 -0500 move broadcasting and distributing to new queuehandler commit da3e46d26b84e4f028f34a13fd2ee373e4c1b954 Author: Evan Prodromou <evan@status.net> Date: Wed Jan 20 08:53:12 2010 -0500 Move distribution of notices to new distribute queue handler
-rw-r--r--actions/apistatusesretweet.php2
-rw-r--r--actions/apistatusesupdate.php2
-rw-r--r--actions/newnotice.php2
-rw-r--r--actions/repeat.php2
-rw-r--r--classes/Inbox.php6
-rw-r--r--classes/Memcached_DataObject.php19
-rw-r--r--classes/Notice.php406
-rw-r--r--classes/Notice_tag.php10
-rw-r--r--classes/User.php2
-rw-r--r--lib/command.php4
-rw-r--r--lib/distribqueuehandler.php84
-rw-r--r--lib/mailhandler.php2
-rw-r--r--lib/oauthstore.php2
-rw-r--r--lib/queuemanager.php3
-rw-r--r--lib/util.php2
-rw-r--r--plugins/Facebook/facebookaction.php2
16 files changed, 269 insertions, 281 deletions
diff --git a/actions/apistatusesretweet.php b/actions/apistatusesretweet.php
index d9d4820c0..128c881e2 100644
--- a/actions/apistatusesretweet.php
+++ b/actions/apistatusesretweet.php
@@ -112,7 +112,7 @@ class ApiStatusesRetweetAction extends ApiAuthAction
$repeat = $this->original->repeat($this->user->id, $this->source);
- common_broadcast_notice($repeat);
+
$this->showNotice($repeat);
}
diff --git a/actions/apistatusesupdate.php b/actions/apistatusesupdate.php
index f594bbf39..9d831b9db 100644
--- a/actions/apistatusesupdate.php
+++ b/actions/apistatusesupdate.php
@@ -250,7 +250,7 @@ class ApiStatusesUpdateAction extends ApiAuthAction
$upload->attachToNotice($this->notice);
}
- common_broadcast_notice($this->notice);
+
}
$this->showNotice();
diff --git a/actions/newnotice.php b/actions/newnotice.php
index a4ed87bb6..78480abab 100644
--- a/actions/newnotice.php
+++ b/actions/newnotice.php
@@ -201,7 +201,7 @@ class NewnoticeAction extends Action
$upload->attachToNotice($notice);
}
- common_broadcast_notice($notice);
+
if ($this->boolean('ajax')) {
header('Content-Type: text/xml;charset=utf-8');
diff --git a/actions/repeat.php b/actions/repeat.php
index b75523498..e112496bc 100644
--- a/actions/repeat.php
+++ b/actions/repeat.php
@@ -106,7 +106,7 @@ class RepeatAction extends Action
{
$repeat = $this->notice->repeat($this->user->id, 'web');
- common_broadcast_notice($repeat);
+
if ($this->boolean('ajax')) {
$this->startHTML('text/xml;charset=utf-8');
diff --git a/classes/Inbox.php b/classes/Inbox.php
index 086dba1c9..26b27d2b5 100644
--- a/classes/Inbox.php
+++ b/classes/Inbox.php
@@ -120,11 +120,7 @@ class Inbox extends Memcached_DataObject
$notice_id, $user_id));
if ($result) {
- $c = self::memcache();
-
- if (!empty($c)) {
- $c->delete(self::cacheKey('inbox', 'user_id', $user_id));
- }
+ self::blow('inbox:user_id:%d', $user_id);
}
return $result;
diff --git a/classes/Memcached_DataObject.php b/classes/Memcached_DataObject.php
index 6ddef4816..2c9dcf595 100644
--- a/classes/Memcached_DataObject.php
+++ b/classes/Memcached_DataObject.php
@@ -425,4 +425,23 @@ class Memcached_DataObject extends DB_DataObject
return $dsn;
}
+
+ static function blow()
+ {
+ $c = self::memcache();
+
+ if (empty($c)) {
+ return false;
+ }
+
+ $args = func_get_args();
+
+ $format = array_shift($args);
+
+ $keyPart = vsprintf($format, $args);
+
+ $cacheKey = common_cache_key($keyPart);
+
+ return $c->delete($cacheKey);
+ }
}
diff --git a/classes/Notice.php b/classes/Notice.php
index 38b10db04..0966697e2 100644
--- a/classes/Notice.php
+++ b/classes/Notice.php
@@ -94,10 +94,6 @@ class Notice extends Memcached_DataObject
function delete()
{
- $this->blowCaches(true);
- $this->blowFavesCache(true);
- $this->blowSubsCache(true);
-
// For auditing purposes, save a record that the notice
// was deleted.
@@ -109,31 +105,20 @@ class Notice extends Memcached_DataObject
$deleted->created = $this->created;
$deleted->deleted = common_sql_now();
- $this->query('BEGIN');
-
$deleted->insert();
- //Null any notices that are replies to this notice
- $this->query(sprintf("UPDATE notice set reply_to = null WHERE reply_to = %d", $this->id));
-
- //Null any notices that are repeats of this notice
- //XXX: probably need to uncache these, too
+ // Clear related records
- $this->query(sprintf("UPDATE notice set repeat_of = null WHERE repeat_of = %d", $this->id));
+ $this->clearReplies();
+ $this->clearRepeats();
+ $this->clearFaves();
+ $this->clearTags();
+ $this->clearGroupInboxes();
- $related = array('Reply',
- 'Fave',
- 'Notice_tag',
- 'Group_inbox',
- 'Queue_item');
+ // NOTE: we don't clear inboxes
+ // NOTE: we don't clear queue items
- foreach ($related as $cls) {
- $inst = new $cls();
- $inst->notice_id = $this->id;
- $inst->delete();
- }
$result = parent::delete();
- $this->query('COMMIT');
}
function saveTags()
@@ -155,6 +140,7 @@ class Notice extends Memcached_DataObject
foreach(array_unique($hashtags) as $hashtag) {
/* elide characters we don't want in the tag */
$this->saveTag($hashtag);
+ self::blow('profile:notice_ids_tagged:%d:%s', $this->profile_id, $tag->tag);
}
return true;
}
@@ -172,6 +158,9 @@ class Notice extends Memcached_DataObject
$last_error->message));
return;
}
+
+ // if it's saved, blow its cache
+ $tag->blowCache(false);
}
/**
@@ -331,27 +320,43 @@ class Notice extends Memcached_DataObject
}
}
- // XXX: do we need to change this for remote users?
+ }
+
+ # Clear the cache for subscribed users, so they'll update at next request
+ # XXX: someone clever could prepend instead of clearing the cache
+ $notice->blowOnInsert();
- $notice->saveTags();
+ $qm = QueueManager::get();
- $groups = $notice->saveGroups();
+ $qm->enqueue($notice, 'distrib');
- $recipients = $notice->saveReplies();
+ return $notice;
+ }
- $notice->addToInboxes($groups, $recipients);
+ function blowOnInsert()
+ {
+ self::blow('profile:notice_ids:%d', $this->profile_id);
+ self::blow('public');
- $notice->saveUrls();
+ if ($this->conversation != $this->id) {
+ self::blow('notice:conversation_ids:%d', $this->conversation);
+ }
- Event::handle('EndNoticeSave', array($notice));
+ if (!empty($this->repeat_of)) {
+ self::blow('notice:repeats:%d', $this->repeat_of);
}
- # Clear the cache for subscribed users, so they'll update at next request
- # XXX: someone clever could prepend instead of clearing the cache
+ $original = Notice::staticGet('id', $this->repeat_of);
- $notice->blowCaches();
+ if (!empty($original)) {
+ $originalUser = User::staticGet('id', $original->profile_id);
+ if (!empty($originalUser)) {
+ self::blow('user:repeats_of_me:%d', $originalUser->id);
+ }
+ }
- return $notice;
+ $profile = Profile::staticGet($this->profile_id);
+ $profile->blowNoticeCount();
}
/** save all urls in the notice to the db
@@ -456,227 +461,6 @@ class Notice extends Memcached_DataObject
return $att;
}
- function blowCaches($blowLast=false)
- {
- $this->blowSubsCache($blowLast);
- $this->blowNoticeCache($blowLast);
- $this->blowRepliesCache($blowLast);
- $this->blowPublicCache($blowLast);
- $this->blowTagCache($blowLast);
- $this->blowGroupCache($blowLast);
- $this->blowConversationCache($blowLast);
- $this->blowRepeatCache();
- $profile = Profile::staticGet($this->profile_id);
- $profile->blowNoticeCount();
- }
-
- function blowRepeatCache()
- {
- if (!empty($this->repeat_of)) {
- $cache = common_memcache();
- if (!empty($cache)) {
- // XXX: only blow if <100 in cache
- $ck = common_cache_key('notice:repeats:'.$this->repeat_of);
- $result = $cache->delete($ck);
-
- $user = User::staticGet('id', $this->profile_id);
-
- if (!empty($user)) {
- $uk = common_cache_key('user:repeated_by_me:'.$user->id);
- $cache->delete($uk);
- $user->free();
- unset($user);
- }
-
- $original = Notice::staticGet('id', $this->repeat_of);
-
- if (!empty($original)) {
- $originalUser = User::staticGet('id', $original->profile_id);
- if (!empty($originalUser)) {
- $ouk = common_cache_key('user:repeats_of_me:'.$originalUser->id);
- $cache->delete($ouk);
- $originalUser->free();
- unset($originalUser);
- }
- $original->free();
- unset($original);
- }
- }
- }
- }
-
- function blowConversationCache($blowLast=false)
- {
- $cache = common_memcache();
- if ($cache) {
- $ck = common_cache_key('notice:conversation_ids:'.$this->conversation);
- $cache->delete($ck);
- if ($blowLast) {
- $cache->delete($ck.';last');
- }
- }
- }
-
- function blowGroupCache($blowLast=false)
- {
- $cache = common_memcache();
- if ($cache) {
- $group_inbox = new Group_inbox();
- $group_inbox->notice_id = $this->id;
- if ($group_inbox->find()) {
- while ($group_inbox->fetch()) {
- $cache->delete(common_cache_key('user_group:notice_ids:' . $group_inbox->group_id));
- if ($blowLast) {
- $cache->delete(common_cache_key('user_group:notice_ids:' . $group_inbox->group_id.';last'));
- }
- $member = new Group_member();
- $member->group_id = $group_inbox->group_id;
- if ($member->find()) {
- while ($member->fetch()) {
- $cache->delete(common_cache_key('notice_inbox:by_user:' . $member->profile_id));
- $cache->delete(common_cache_key('notice_inbox:by_user_own:' . $member->profile_id));
- if (empty($this->repeat_of)) {
- $cache->delete(common_cache_key('user:friends_timeline:' . $member->profile_id));
- $cache->delete(common_cache_key('user:friends_timeline_own:' . $member->profile_id));
- }
- if ($blowLast) {
- $cache->delete(common_cache_key('notice_inbox:by_user:' . $member->profile_id . ';last'));
- $cache->delete(common_cache_key('notice_inbox:by_user_own:' . $member->profile_id . ';last'));
- if (empty($this->repeat_of)) {
- $cache->delete(common_cache_key('user:friends_timeline:' . $member->profile_id . ';last'));
- $cache->delete(common_cache_key('user:friends_timeline_own:' . $member->profile_id . ';last'));
- }
- }
- }
- }
- }
- }
- $group_inbox->free();
- unset($group_inbox);
- }
- }
-
- function blowTagCache($blowLast=false)
- {
- $cache = common_memcache();
- if ($cache) {
- $tag = new Notice_tag();
- $tag->notice_id = $this->id;
- if ($tag->find()) {
- while ($tag->fetch()) {
- $tag->blowCache($blowLast);
- $ck = 'profile:notice_ids_tagged:' . $this->profile_id . ':' . $tag->tag;
-
- $cache->delete($ck);
- if ($blowLast) {
- $cache->delete($ck . ';last');
- }
- }
- }
- $tag->free();
- unset($tag);
- }
- }
-
- function blowSubsCache($blowLast=false)
- {
- $cache = common_memcache();
- if ($cache) {
- $user = new User();
-
- $UT = common_config('db','type')=='pgsql'?'"user"':'user';
- $user->query('SELECT id ' .
-
- "FROM $UT JOIN subscription ON $UT.id = subscription.subscriber " .
- 'WHERE subscription.subscribed = ' . $this->profile_id);
-
- while ($user->fetch()) {
- $cache->delete(common_cache_key('notice_inbox:by_user:'.$user->id));
- $cache->delete(common_cache_key('notice_inbox:by_user_own:'.$user->id));
- if (empty($this->repeat_of)) {
- $cache->delete(common_cache_key('user:friends_timeline:'.$user->id));
- $cache->delete(common_cache_key('user:friends_timeline_own:'.$user->id));
- }
- if ($blowLast) {
- $cache->delete(common_cache_key('notice_inbox:by_user:'.$user->id.';last'));
- $cache->delete(common_cache_key('notice_inbox:by_user_own:'.$user->id.';last'));
- if (empty($this->repeat_of)) {
- $cache->delete(common_cache_key('user:friends_timeline:'.$user->id.';last'));
- $cache->delete(common_cache_key('user:friends_timeline_own:'.$user->id.';last'));
- }
- }
- }
- $user->free();
- unset($user);
- }
- }
-
- function blowNoticeCache($blowLast=false)
- {
- if ($this->is_local) {
- $cache = common_memcache();
- if (!empty($cache)) {
- $cache->delete(common_cache_key('profile:notice_ids:'.$this->profile_id));
- if ($blowLast) {
- $cache->delete(common_cache_key('profile:notice_ids:'.$this->profile_id.';last'));
- }
- }
- }
- }
-
- function blowRepliesCache($blowLast=false)
- {
- $cache = common_memcache();
- if ($cache) {
- $reply = new Reply();
- $reply->notice_id = $this->id;
- if ($reply->find()) {
- while ($reply->fetch()) {
- $cache->delete(common_cache_key('reply:stream:'.$reply->profile_id));
- if ($blowLast) {
- $cache->delete(common_cache_key('reply:stream:'.$reply->profile_id.';last'));
- }
- }
- }
- $reply->free();
- unset($reply);
- }
- }
-
- function blowPublicCache($blowLast=false)
- {
- if ($this->is_local == Notice::LOCAL_PUBLIC) {
- $cache = common_memcache();
- if ($cache) {
- $cache->delete(common_cache_key('public'));
- if ($blowLast) {
- $cache->delete(common_cache_key('public').';last');
- }
- }
- }
- }
-
- function blowFavesCache($blowLast=false)
- {
- $cache = common_memcache();
- if ($cache) {
- $fave = new Fave();
- $fave->notice_id = $this->id;
- if ($fave->find()) {
- while ($fave->fetch()) {
- $cache->delete(common_cache_key('fave:ids_by_user:'.$fave->user_id));
- $cache->delete(common_cache_key('fave:by_user_own:'.$fave->user_id));
- if ($blowLast) {
- $cache->delete(common_cache_key('fave:ids_by_user:'.$fave->user_id.';last'));
- $cache->delete(common_cache_key('fave:by_user_own:'.$fave->user_id.';last'));
- }
- }
- }
- $fave->free();
- unset($fave);
- }
- }
-
function getStreamByIds($ids)
{
$cache = common_memcache();
@@ -999,7 +783,14 @@ class Notice extends Memcached_DataObject
$gi->notice_id = $this->id;
$gi->created = $this->created;
- return $gi->insert();
+ $result = $gi->insert();
+
+ if (!result) {
+ common_log_db_error($gi, 'INSERT', __FILE__);
+ throw new ServerException(_('Problem saving group inbox.'));
+ }
+
+ self::blow('user_group:notice_ids:%d', $gi->group_id);
}
return true;
@@ -1094,7 +885,8 @@ class Notice extends Memcached_DataObject
foreach ($recipientIds as $recipientId) {
$user = User::staticGet('id', $recipientId);
- if ($user) {
+ if (!empty($user)) {
+ self::blow('reply:stream:%d', $reply->profile_id);
mail_notify_attn($user, $this);
}
}
@@ -1553,4 +1345,104 @@ class Notice extends Memcached_DataObject
return $options;
}
+
+ function clearReplies()
+ {
+ $replyNotice = new Notice();
+ $replyNotice->reply_to = $this->id;
+
+ //Null any notices that are replies to this notice
+
+ if ($replyNotice->find()) {
+ while ($replyNotice->fetch()) {
+ $orig = clone($replyNotice);
+ $replyNotice->reply_to = null;
+ $replyNotice->update($orig);
+ }
+ }
+
+ // Reply records
+
+ $reply = new Reply();
+ $reply->notice_id = $this->id;
+
+ if ($reply->find()) {
+ while($reply->fetch()) {
+ self::blow('reply:stream:%d', $reply->profile_id);
+ $reply->delete();
+ }
+ }
+
+ $reply->free();
+
+ return $ids;
+ }
+
+ function clearRepeats()
+ {
+ $repeatNotice = new Notice();
+ $repeatNotice->repeat_of = $this->id;
+
+ //Null any notices that are repeats of this notice
+
+ if ($repeatNotice->find()) {
+ while ($repeatNotice->fetch()) {
+ $orig = clone($repeatNotice);
+ $repeatNotice->repeat_of = null;
+ $repeatNotice->update($orig);
+ }
+ }
+ }
+
+ function clearFaves()
+ {
+ $fave = new Fave();
+ $fave->notice_id = $this->id;
+
+ if ($fave->find()) {
+ while ($fave->fetch()) {
+ self::blow('fave:ids_by_user_own:%d', $fave->user_id);
+ self::blow('fave:ids_by_user_own:%d;last', $fave->user_id);
+ self::blow('fave:ids_by_user:%d', $fave->user_id);
+ self::blow('fave:ids_by_user:%d;last', $fave->user_id);
+ $fave->delete();
+ }
+ }
+
+ $fave->free();
+ }
+
+ function clearTags()
+ {
+ $tag = new Notice_tag();
+ $tag->notice_id = $this->id;
+
+ if ($tag->find()) {
+ while ($tag->fetch()) {
+ self::blow('profile:notice_ids_tagged:%d:%s', $this->profile_id, common_keyize($tag->tag));
+ self::blow('profile:notice_ids_tagged:%d:%s;last', $this->profile_id, common_keyize($tag->tag));
+ self::blow('notice_tag:notice_ids:%s', common_keyize($tag->tag));
+ self::blow('notice_tag:notice_ids:%s;last', common_keyize($tag->tag));
+ $tag->delete();
+ }
+ }
+
+ $tag->free();
+ }
+
+ function clearGroupInboxes()
+ {
+ $gi = new Group_inbox();
+
+ $gi->notice_id = $this->id;
+
+ if ($gi->find()) {
+ while ($gi->fetch()) {
+ self::blow('user_group:notice_ids:%d', $gi->group_id);
+ $gi->delete();
+ }
+ }
+
+ $gi->free();
+ }
}
diff --git a/classes/Notice_tag.php b/classes/Notice_tag.php
index 79231f0b0..4fd76e8ea 100644
--- a/classes/Notice_tag.php
+++ b/classes/Notice_tag.php
@@ -86,13 +86,9 @@ class Notice_tag extends Memcached_DataObject
function blowCache($blowLast=false)
{
- $cache = common_memcache();
- if ($cache) {
- $idkey = common_cache_key('notice_tag:notice_ids:' . common_keyize($this->tag));
- $cache->delete($idkey);
- if ($blowLast) {
- $cache->delete($idkey.';last');
- }
+ self::blow('notice_tag:notice_ids:%s', common_keyize($this->tag));
+ if ($blowLast) {
+ self::blow('notice_tag:notice_ids:%s;last', common_keyize($this->tag));
}
}
diff --git a/classes/User.php b/classes/User.php
index d6b52be01..6ea975202 100644
--- a/classes/User.php
+++ b/classes/User.php
@@ -383,7 +383,7 @@ class User extends Memcached_DataObject
common_config('site', 'name'),
$user->nickname),
'system');
- common_broadcast_notice($notice);
+
}
}
diff --git a/lib/command.php b/lib/command.php
index c0a32e1b1..2a51fd687 100644
--- a/lib/command.php
+++ b/lib/command.php
@@ -422,7 +422,7 @@ class RepeatCommand extends Command
$repeat = $notice->repeat($this->user->id, $channel->source);
if ($repeat) {
- common_broadcast_notice($repeat);
+
$channel->output($this->user, sprintf(_('Notice from %s repeated'), $recipient->nickname));
} else {
$channel->error($this->user, _('Error repeating notice.'));
@@ -492,7 +492,7 @@ class ReplyCommand extends Command
} else {
$channel->error($this->user, _('Error saving notice.'));
}
- common_broadcast_notice($notice);
+
}
}
diff --git a/lib/distribqueuehandler.php b/lib/distribqueuehandler.php
new file mode 100644
index 000000000..f458d238d
--- /dev/null
+++ b/lib/distribqueuehandler.php
@@ -0,0 +1,84 @@
+<?php
+/*
+ * StatusNet - the distributed open-source microblogging tool
+ * Copyright (C) 2008, 2009, StatusNet, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+if (!defined('STATUSNET') && !defined('LACONICA')) { exit(1); }
+
+/**
+ * Base class for queue handlers.
+ *
+ * As extensions of the Daemon class, each queue handler has the ability
+ * to launch itself in the background, at which point it'll pass control
+ * to the configured QueueManager class to poll for updates.
+ *
+ * Subclasses must override at least the following methods:
+ * - transport
+ * - handle_notice
+ */
+
+class DistribQueueHandler
+{
+ /**
+ * Return transport keyword which identifies items this queue handler
+ * services; must be defined for all subclasses.
+ *
+ * Must be 8 characters or less to fit in the queue_item database.
+ * ex "email", "jabber", "sms", "irc", ...
+ *
+ * @return string
+ */
+
+ function transport()
+ {
+ return 'distrib';
+ }
+
+ /**
+ * Here's the meat of your queue handler -- you're handed a Notice
+ * object, which you may do as you will with.
+ *
+ * If this function indicates failure, a warning will be logged
+ * and the item is placed back in the queue to be re-run.
+ *
+ * @param Notice $notice
+ * @return boolean true on success, false on failure
+ */
+ function handle($notice)
+ {
+ // XXX: do we need to change this for remote users?
+
+ $notice->saveTags();
+
+ $groups = $notice->saveGroups();
+
+ $recipients = $notice->saveReplies();
+
+ $notice->addToInboxes($groups, $recipients);
+
+ $notice->saveUrls();
+
+ Event::handle('EndNoticeSave', array($notice));
+
+ // Enqueue for other handlers
+
+ common_enqueue_notice($notice);
+
+ return true;
+ }
+}
+
diff --git a/lib/mailhandler.php b/lib/mailhandler.php
index 85be89f18..890f6d5b4 100644
--- a/lib/mailhandler.php
+++ b/lib/mailhandler.php
@@ -160,7 +160,7 @@ class MailHandler
foreach($mediafiles as $mf){
$mf->attachToNotice($notice);
}
- common_broadcast_notice($notice);
+
$this->log(LOG_INFO,
'Added notice ' . $notice->id . ' from user ' . $user->nickname);
return true;
diff --git a/lib/oauthstore.php b/lib/oauthstore.php
index df63cc151..b30fb49d5 100644
--- a/lib/oauthstore.php
+++ b/lib/oauthstore.php
@@ -362,7 +362,7 @@ class StatusNetOAuthDataStore extends OAuthDataStore
array('is_local' => Notice::REMOTE_OMB,
'uri' => $omb_notice->getIdentifierURI()));
- common_broadcast_notice($notice, true);
+
}
/**
diff --git a/lib/queuemanager.php b/lib/queuemanager.php
index 4eb39bfa8..e5cf8239e 100644
--- a/lib/queuemanager.php
+++ b/lib/queuemanager.php
@@ -217,6 +217,7 @@ abstract class QueueManager extends IoManager
$this->connect('plugin', 'PluginQueueHandler');
$this->connect('omb', 'OmbQueueHandler');
$this->connect('ping', 'PingQueueHandler');
+ $this->connect('distrib', 'DistribQueueHandler');
if (common_config('sms', 'enabled')) {
$this->connect('sms', 'SmsQueueHandler');
}
@@ -224,7 +225,7 @@ abstract class QueueManager extends IoManager
// XMPP output handlers...
$this->connect('jabber', 'JabberQueueHandler');
$this->connect('public', 'PublicQueueHandler');
-
+
// @fixme this should get an actual queue
//$this->connect('confirm', 'XmppConfirmHandler');
diff --git a/lib/util.php b/lib/util.php
index fb3b8be87..4312f9876 100644
--- a/lib/util.php
+++ b/lib/util.php
@@ -987,7 +987,7 @@ function common_redirect($url, $code=307)
function common_broadcast_notice($notice, $remote=false)
{
- return common_enqueue_notice($notice);
+ // DO NOTHING!
}
// Stick the notice on the queue
diff --git a/plugins/Facebook/facebookaction.php b/plugins/Facebook/facebookaction.php
index bf9c037a5..815fee094 100644
--- a/plugins/Facebook/facebookaction.php
+++ b/plugins/Facebook/facebookaction.php
@@ -397,7 +397,7 @@ class FacebookAction extends Action
return;
}
- common_broadcast_notice($notice);
+
}