diff options
author | Brion Vibber <brion@pobox.com> | 2010-01-21 16:42:50 -0800 |
---|---|---|
committer | Brion Vibber <brion@pobox.com> | 2010-01-21 22:40:35 -0800 |
commit | 0e852def6ae5aa529cca0aef1187152fb5a880be (patch) | |
tree | 5b4b49327c5d7224c0c05ce08c1ddf72f1e44f13 /lib/xmppmanager.php | |
parent | 0bb23e6fd724a12bba6766949cd3294b288d8a43 (diff) |
XMPP queued output & initial retooling of DB queue manager to support non-Notice objects.
Queue handlers for XMPP individual & firehose output now send their XML stanzas
to another output queue instead of connecting directly to the chat server. This
lets us have as many general processing threads as we need, while all actual
XMPP input and output go through a single daemon with a single connection open.
This avoids problems with multiple connected resources:
* multiple windows shown in some chat clients (psi, gajim, kopete)
* extra load on server
* incoming message delivery forwarding issues
Database changes:
* queue_item drops 'notice_id' in favor of a 'frame' blob.
This is based on Craig Andrews' work branch to generalize queues to take any
object, but conservatively leaving out the serialization for now.
Table updater (preserves any existing queued items) in db/rc3to09.sql
Code changes to watch out for:
* Queue handlers should now define a handle() method instead of handle_notice()
* QueueDaemon and XmppDaemon now share common i/o (IoMaster) and respawning
thread management (RespawningDaemon) infrastructure.
* The polling XmppConfirmManager has been dropped, as the message is queued
directly when saving IM settings.
* Enable $config['queue']['debug_memory'] to output current memory usage at
each run through the event loop to watch for memory leaks
To do:
* Adapt XMPP i/o to component connection mode for multi-site support.
* XMPP input can also be broken out to a queue, which would allow the actual
notice save etc to be handled by general queue threads.
* Make sure there are no problems with simply pushing serialized Notice objects
to queues.
* Find a way to improve interactive performance of the database-backed queue
handler; polling is pretty painful to XMPP.
* Possibly redo the way QueueHandlers are injected into a QueueManager. The
grouping used to split out the XMPP output queue is a bit awkward.
Diffstat (limited to 'lib/xmppmanager.php')
-rw-r--r-- | lib/xmppmanager.php | 286 |
1 files changed, 247 insertions, 39 deletions
diff --git a/lib/xmppmanager.php b/lib/xmppmanager.php index dfff63a30..299175dd7 100644 --- a/lib/xmppmanager.php +++ b/lib/xmppmanager.php @@ -70,6 +70,7 @@ class XmppManager extends IoManager function __construct() { $this->site = common_config('site', 'server'); + $this->resource = common_config('xmpp', 'resource') . 'daemon'; } /** @@ -86,15 +87,19 @@ class XmppManager extends IoManager # Low priority; we don't want to receive messages common_log(LOG_INFO, "INITIALIZE"); - $this->conn = jabber_connect($this->resource()); + $this->conn = jabber_connect($this->resource); if (empty($this->conn)) { common_log(LOG_ERR, "Couldn't connect to server."); return false; } - $this->conn->addEventHandler('message', 'forward_message', $this); + $this->log(LOG_DEBUG, "Initializing stanza handlers."); + + $this->conn->addEventHandler('message', 'handle_message', $this); + $this->conn->addEventHandler('presence', 'handle_presence', $this); $this->conn->addEventHandler('reconnect', 'handle_reconnect', $this); + $this->conn->setReconnectTimeout(600); jabber_send_presence("Send me a message to post a notice", 'available', null, 'available', -1); @@ -176,11 +181,36 @@ class XmppManager extends IoManager } /** + * For queue handlers to pass us a message to push out, + * if we're active. + * + * @fixme should this be blocking etc? + * + * @param string $msg XML stanza to send + * @return boolean success + */ + public function send($msg) + { + if ($this->conn && !$this->conn->isDisconnected()) { + $bytes = $this->conn->send($msg); + if ($bytes > 0) { + $this->conn->processTime(0); + return true; + } else { + return false; + } + } else { + // Can't send right now... + return false; + } + } + + /** * Send a keepalive ping to the XMPP server. */ protected function sendPing() { - $jid = jabber_daemon_address().'/'.$this->resource(); + $jid = jabber_daemon_address().'/'.$this->resource; $server = common_config('xmpp', 'server'); if (!isset($this->pingid)) { @@ -206,61 +236,239 @@ class XmppManager extends IoManager $this->conn->presence(null, 'available', null, 'available', -1); } + + function get_user($from) + { + $user = User::staticGet('jabber', jabber_normalize_jid($from)); + return $user; + } + /** - * Callback for Jabber message event. - * - * This connection handles output; if we get a message straight to us, - * forward it on to our XmppDaemon listener for processing. - * - * @param $pl + * XMPP callback for handling message input... + * @param array $pl XMPP payload */ - function forward_message(&$pl) + function handle_message(&$pl) { + $from = jabber_normalize_jid($pl['from']); + if ($pl['type'] != 'chat') { - common_log(LOG_DEBUG, 'Ignoring message of type ' . $pl['type'] . ' from ' . $pl['from']); + $this->log(LOG_WARNING, "Ignoring message of type ".$pl['type']." from $from."); return; } - $listener = $this->listener(); - if (strtolower($listener) == strtolower($pl['from'])) { - common_log(LOG_WARNING, 'Ignoring loop message.'); + + if (mb_strlen($pl['body']) == 0) { + $this->log(LOG_WARNING, "Ignoring message with empty body from $from."); return; } - common_log(LOG_INFO, 'Forwarding message from ' . $pl['from'] . ' to ' . $listener); - $this->conn->message($this->listener(), $pl['body'], 'chat', null, $this->ofrom($pl['from'])); + + // Forwarded from another daemon for us to handle; this shouldn't + // happen any more but we might get some legacy items. + if ($this->is_self($from)) { + $this->log(LOG_INFO, "Got forwarded notice from self ($from)."); + $from = $this->get_ofrom($pl); + $this->log(LOG_INFO, "Originally sent by $from."); + if (is_null($from) || $this->is_self($from)) { + $this->log(LOG_INFO, "Ignoring notice originally sent by $from."); + return; + } + } + + $user = $this->get_user($from); + + // For common_current_user to work + global $_cur; + $_cur = $user; + + if (!$user) { + $this->from_site($from, 'Unknown user; go to ' . + common_local_url('imsettings') . + ' to add your address to your account'); + $this->log(LOG_WARNING, 'Message from unknown user ' . $from); + return; + } + if ($this->handle_command($user, $pl['body'])) { + $this->log(LOG_INFO, "Command message by $from handled."); + return; + } else if ($this->is_autoreply($pl['body'])) { + $this->log(LOG_INFO, 'Ignoring auto reply from ' . $from); + return; + } else if ($this->is_otr($pl['body'])) { + $this->log(LOG_INFO, 'Ignoring OTR from ' . $from); + return; + } else { + + $this->log(LOG_INFO, 'Posting a notice from ' . $user->nickname); + + $this->add_notice($user, $pl); + } + + $user->free(); + unset($user); + unset($_cur); + + unset($pl['xml']); + $pl['xml'] = null; + + $pl = null; + unset($pl); } - /** - * Build an <addresses> block with an ofrom entry for forwarded messages - * - * @param string $from Jabber ID of original sender - * @return string XML fragment - */ - protected function ofrom($from) + + function is_self($from) { - $address = "<addresses xmlns='http://jabber.org/protocol/address'>\n"; - $address .= "<address type='ofrom' jid='$from' />\n"; - $address .= "</addresses>\n"; - return $address; + return preg_match('/^'.strtolower(jabber_daemon_address()).'/', strtolower($from)); } - /** - * Build the complete JID of the XmppDaemon process which - * handles primary XMPP input for this site. - * - * @return string Jabber ID - */ - protected function listener() + function get_ofrom($pl) + { + $xml = $pl['xml']; + $addresses = $xml->sub('addresses'); + if (!$addresses) { + $this->log(LOG_WARNING, 'Forwarded message without addresses'); + return null; + } + $address = $addresses->sub('address'); + if (!$address) { + $this->log(LOG_WARNING, 'Forwarded message without address'); + return null; + } + if (!array_key_exists('type', $address->attrs)) { + $this->log(LOG_WARNING, 'No type for forwarded message'); + return null; + } + $type = $address->attrs['type']; + if ($type != 'ofrom') { + $this->log(LOG_WARNING, 'Type of forwarded message is not ofrom'); + return null; + } + if (!array_key_exists('jid', $address->attrs)) { + $this->log(LOG_WARNING, 'No jid for forwarded message'); + return null; + } + $jid = $address->attrs['jid']; + if (!$jid) { + $this->log(LOG_WARNING, 'Could not get jid from address'); + return null; + } + $this->log(LOG_DEBUG, 'Got message forwarded from jid ' . $jid); + return $jid; + } + + function is_autoreply($txt) + { + if (preg_match('/[\[\(]?[Aa]uto[-\s]?[Rr]e(ply|sponse)[\]\)]/', $txt)) { + return true; + } else if (preg_match('/^System: Message wasn\'t delivered. Offline storage size was exceeded.$/', $txt)) { + return true; + } else { + return false; + } + } + + function is_otr($txt) + { + if (preg_match('/^\?OTR/', $txt)) { + return true; + } else { + return false; + } + } + + function from_site($address, $msg) + { + $text = '['.common_config('site', 'name') . '] ' . $msg; + jabber_send_message($address, $text); + } + + function handle_command($user, $body) { - if (common_config('xmpp', 'listener')) { - return common_config('xmpp', 'listener'); + $inter = new CommandInterpreter(); + $cmd = $inter->handle_command($user, $body); + if ($cmd) { + $chan = new XMPPChannel($this->conn); + $cmd->execute($chan); + return true; } else { - return jabber_daemon_address() . '/' . common_config('xmpp','resource') . 'daemon'; + return false; + } + } + + function add_notice(&$user, &$pl) + { + $body = trim($pl['body']); + $content_shortened = common_shorten_links($body); + if (Notice::contentTooLong($content_shortened)) { + $from = jabber_normalize_jid($pl['from']); + $this->from_site($from, sprintf(_('Message too long - maximum is %1$d characters, you sent %2$d.'), + Notice::maxContent(), + mb_strlen($content_shortened))); + return; + } + + try { + $notice = Notice::saveNew($user->id, $content_shortened, 'xmpp'); + } catch (Exception $e) { + $this->log(LOG_ERR, $e->getMessage()); + $this->from_site($user->jabber, $e->getMessage()); + return; } + + common_broadcast_notice($notice); + $this->log(LOG_INFO, + 'Added notice ' . $notice->id . ' from user ' . $user->nickname); + $notice->free(); + unset($notice); + } + + function handle_presence(&$pl) + { + $from = jabber_normalize_jid($pl['from']); + switch ($pl['type']) { + case 'subscribe': + # We let anyone subscribe + $this->subscribed($from); + $this->log(LOG_INFO, + 'Accepted subscription from ' . $from); + break; + case 'subscribed': + case 'unsubscribed': + case 'unsubscribe': + $this->log(LOG_INFO, + 'Ignoring "' . $pl['type'] . '" from ' . $from); + break; + default: + if (!$pl['type']) { + $user = User::staticGet('jabber', $from); + if (!$user) { + $this->log(LOG_WARNING, 'Presence from unknown user ' . $from); + return; + } + if ($user->updatefrompresence) { + $this->log(LOG_INFO, 'Updating ' . $user->nickname . + ' status from presence.'); + $this->add_notice($user, $pl); + } + $user->free(); + unset($user); + } + break; + } + unset($pl['xml']); + $pl['xml'] = null; + + $pl = null; + unset($pl); + } + + function log($level, $msg) + { + $text = 'XMPPDaemon('.$this->resource.'): '.$msg; + common_log($level, $text); } - protected function resource() + function subscribed($to) { - return 'queue' . posix_getpid(); // @fixme PIDs won't be host-unique + jabber_special_presence('subscribed', $to); } /** |