diff options
author | Evan Prodromou <evan@prodromou.name> | 2008-08-29 16:03:52 -0400 |
---|---|---|
committer | Evan Prodromou <evan@prodromou.name> | 2008-08-29 16:03:52 -0400 |
commit | f5fe013657ced6920e3fe794a2a8d4122a85e377 (patch) | |
tree | 3495a0e116cbcbe052921634cd0c940c1920a3f2 | |
parent | a0f726fb6cc42bd1390ab08164329f60e5333435 (diff) |
many jabber queue management changes
Added a method to QueueManager to let subclasses do stuff when idle.
Needed so that XMPP queue manager can service its message queue.
Cleaned up jabber_broadcast_message quite a bit. Use custom joins
instead of loop-and-query, should fix some problems with users who are
getting messages even after turning off notification. Only build $msg
and $entry once, and use the XMPPHP function for messages with a
payload, rather than rolling our own.
darcs-hash:20080829200352-84dde-427e4ca8c81d4222a36f78e7c580b611ff0bf765.gz
-rw-r--r-- | lib/jabber.php | 126 | ||||
-rw-r--r-- | lib/queuehandler.php | 14 | ||||
-rwxr-xr-x | scripts/xmppqueuehandler.php | 11 |
3 files changed, 77 insertions, 74 deletions
diff --git a/lib/jabber.php b/lib/jabber.php index 171dff4df..b943b6f78 100644 --- a/lib/jabber.php +++ b/lib/jabber.php @@ -25,25 +25,6 @@ require_once('XMPPHP/XMPP.php'); class Laconica_XMPP extends XMPPHP_XMPP { - function messageplus($to, $body, $type = 'chat', $subject = null, $payload = null) { - $to = htmlspecialchars($to); - $body = htmlspecialchars($body); - $subject = htmlspecialchars($subject); - - $jid = jabber_daemon_address(); - - $out = "<message from='$jid' to='$to' type='$type'>"; - if($subject) $out .= "<subject>$subject</subject>"; - $out .= "<body>$body</body>"; - if($payload) $out .= $payload; - $out .= "</message>"; - - $cnt = strlen($out); - common_log(LOG_DEBUG, "Sending $cnt chars to $to"); - $this->send($out); - common_log(LOG_DEBUG, 'Done.'); - } - public function presence($status = null, $show = 'available', $to = null, $type='available', $priority=NULL) { if($type == 'available') $type = ''; $to = htmlspecialchars($to); @@ -134,7 +115,7 @@ function jabber_send_notice($to, $notice) { } $msg = jabber_format_notice($profile, $notice); $entry = jabber_format_entry($profile, $notice); - $conn->messageplus($to, $msg, 'chat', NULL, $entry); + $conn->message($to, $msg, 'chat', NULL, $entry); return true; } @@ -142,6 +123,8 @@ function jabber_send_notice($to, $notice) { function jabber_format_entry($profile, $notice) { + # FIXME: notice url might be remote + $noticeurl = common_local_url('shownotice', array('notice' => $notice->id)); $msg = jabber_format_notice($profile, $notice); @@ -168,6 +151,10 @@ function jabber_format_entry($profile, $notice) { $html .= "\n</body>\n"; $html .= "\n</html>\n"; + $address = "<addresses xmlns='http://jabber.org/protocol/address'>\n"; + $address .= "<address type='replyto' jid='" . jabber_daemon_address() . "' />\n"; + $address .= "</addresses>\n"; + $event = "<event xmlns='http://jabber.org/protocol/pubsub#event'>\n"; $event .= "<items xmlns='http://jabber.org/protocol/pubsub' "; $event .= "node='" . common_local_url('public') . "'>\n"; @@ -175,7 +162,7 @@ function jabber_format_entry($profile, $notice) { $event .= "</items>\n"; $event .= "</event>\n"; # FIXME: include the pubsub event, too. - return $html . $entry; + return $html . $entry . $address; # return $entry . "\n" . $event; } @@ -234,63 +221,54 @@ function jabber_broadcast_notice($notice) { return true; } $profile = Profile::staticGet($notice->profile_id); + if (!$profile) { common_log(LOG_WARNING, 'Refusing to broadcast notice with ' . 'unknown profile ' . common_log_objstring($notice), __FILE__); return false; } + + $msg = jabber_format_notice($profile, $notice); + $entry = jabber_format_entry($profile, $notice); + $sent_to = array(); - # First, get users who this is a direct reply to - $reply = new Reply(); - $reply->notice_id = $notice->id; - if ($reply->find()) { - while ($reply->fetch()) { - $user = User::staticGet($reply->profile_id); - if ($user && $user->jabber && $user->jabbernotify && $user->jabberreplies) { - common_log(LOG_INFO, - 'Sending reply notice ' . $notice->id . ' to ' . $user->jabber, - __FILE__); - $success = jabber_send_notice($user->jabber, $notice); - if ($success) { - # Remember so we don't send twice - $sent_to[$user->id] = true; - } else { - # XXX: Not sure, but I think that's the right thing to do - common_log(LOG_WARNING, - 'Sending reply notice ' . $notice->id . ' to ' . $user->jabber . ' FAILED, cancelling.', - __FILE__); - return false; - } - } - } + $conn = jabber_connect(); + + # First, get users to whom this is a direct reply + $user = new User(); + $user->query('SELECT user.id, user.jabber ' . + 'FROM user JOIN reply ON user.id = reply.profile_id ' . + 'WHERE reply.notice_id = ' . $notice->id . ' ' . + 'AND user.jabber is not null ' . + 'AND user.jabbernotify = 1 ' . + 'AND user.jabberreplies = 1 '); + + while ($user->fetch()) { + common_log(LOG_INFO, + 'Sending reply notice ' . $notice->id . ' to ' . $user->jabber, + __FILE__); + $conn->message($user->jabber, $msg, 'chat', NULL, $entry); } + # Now, get users subscribed to this profile - # XXX: use a join here rather than looping through results - $sub = new Subscription(); - $sub->subscribed = $notice->profile_id; - - if ($sub->find()) { - while ($sub->fetch()) { - $user = User::staticGet($sub->subscriber); - if ($user && $user->jabber && $user->jabbernotify && !array_key_exists($user->id,$sent_to)) { - common_log(LOG_INFO, - 'Sending notice ' . $notice->id . ' to ' . $user->jabber, - __FILE__); - $success = jabber_send_notice($user->jabber, $notice); - if ($success) { - $sent_to[$user->id] = true; - } else { - # XXX: Not sure, but I think that's the right thing to do - common_log(LOG_WARNING, - 'Sending notice ' . $notice->id . ' to ' . $user->jabber . ' FAILED, cancelling.', - __FILE__); - return false; - } - } + + $user = new User(); + $user->query('SELECT user.id, user.jabber ' . + 'FROM user JOIN subscription ON user.id = subscription.subscriber ' . + 'WHERE subscription.subscribed = ' . $notice->profile_id . ' ' . + 'AND user.jabber is not null ' . + 'AND user.jabbernotify = 1 '); + + while ($user->fetch()) { + if (!array_key_exists($user->id, $sent_to)) { + common_log(LOG_INFO, + 'Sending notice ' . $notice->id . ' to ' . $user->jabber, + __FILE__); + $conn->message($user->jabber, $msg, 'chat', NULL, $entry); } } - + return true; } @@ -305,11 +283,17 @@ function jabber_public_notice($notice) { # = false? I think not if ($public && $notice->is_local) { + $msg = jabber_format_notice($profile, $notice); + $entry = jabber_format_entry($profile, $notice); + + $sent_to = array(); + $conn = jabber_connect(); + foreach ($public as $address) { - common_log(LOG_INFO, - 'Sending notice ' . $notice->id . ' to public listener ' . $address, - __FILE__); - jabber_send_notice($address, $notice); + common_log(LOG_INFO, + 'Sending notice ' . $notice->id . ' to public listener ' . $address, + __FILE__); + $conn->message($address, $msg, 'chat', NULL, $entry); } } diff --git a/lib/queuehandler.php b/lib/queuehandler.php index 747e7b493..470b595ba 100644 --- a/lib/queuehandler.php +++ b/lib/queuehandler.php @@ -57,7 +57,6 @@ class QueueHandler { function handle_queue() { $this->log(LOG_INFO, 'checking for queued notices'); - $cnt = 0; $transport = $this->transport(); do { $qi = Queue_item::top($transport); @@ -82,14 +81,23 @@ class QueueHandler { $this->log(LOG_WARNING, 'queue item for notice that does not exist'); } $qi->delete(); - $cnt++; + $this->idle(); } else { $this->clear_old_claims(); - sleep(10); + $start = microtime(); + $this->idle(); + $used = microtime() - $start; + if ($used < 5000000) { + usleep(5000000 - $used); + } } } while (true); } + function idle() { + return true; + } + function clear_old_claims() { $qi = new Queue_item(); $qi->transport = $this->transport(); diff --git a/scripts/xmppqueuehandler.php b/scripts/xmppqueuehandler.php index 3f632d5a8..83928982b 100755 --- a/scripts/xmppqueuehandler.php +++ b/scripts/xmppqueuehandler.php @@ -34,6 +34,8 @@ require_once(INSTALLDIR . '/lib/queuehandler.php'); set_error_handler('common_error_handler'); class XmppQueueHandler extends QueueHandler { + + var $conn = NULL; function transport() { return 'jabber'; @@ -42,15 +44,24 @@ class XmppQueueHandler extends QueueHandler { function start() { # Low priority; we don't want to receive messages $this->conn = jabber_connect($this->_id, NULL, -1); + $this->conn->addEventHandler('message', 'forward_message', $this); return !is_null($this->conn); } function handle_notice($notice) { return jabber_broadcast_notice($notice); } + + function idle() { + # Process the queue for a second + $this->conn->processTime(1); + } function finish() { } + + function forward_message(&$pl) { + } } mb_internal_encoding('UTF-8'); |