summaryrefslogtreecommitdiff
path: root/lib/xmppmanager.php
diff options
context:
space:
mode:
authorBrion Vibber <brion@pobox.com>2010-01-21 16:42:50 -0800
committerBrion Vibber <brion@pobox.com>2010-01-21 22:40:35 -0800
commit0e852def6ae5aa529cca0aef1187152fb5a880be (patch)
tree5b4b49327c5d7224c0c05ce08c1ddf72f1e44f13 /lib/xmppmanager.php
parent0bb23e6fd724a12bba6766949cd3294b288d8a43 (diff)
XMPP queued output & initial retooling of DB queue manager to support non-Notice objects.
Queue handlers for XMPP individual & firehose output now send their XML stanzas to another output queue instead of connecting directly to the chat server. This lets us have as many general processing threads as we need, while all actual XMPP input and output go through a single daemon with a single connection open. This avoids problems with multiple connected resources: * multiple windows shown in some chat clients (psi, gajim, kopete) * extra load on server * incoming message delivery forwarding issues Database changes: * queue_item drops 'notice_id' in favor of a 'frame' blob. This is based on Craig Andrews' work branch to generalize queues to take any object, but conservatively leaving out the serialization for now. Table updater (preserves any existing queued items) in db/rc3to09.sql Code changes to watch out for: * Queue handlers should now define a handle() method instead of handle_notice() * QueueDaemon and XmppDaemon now share common i/o (IoMaster) and respawning thread management (RespawningDaemon) infrastructure. * The polling XmppConfirmManager has been dropped, as the message is queued directly when saving IM settings. * Enable $config['queue']['debug_memory'] to output current memory usage at each run through the event loop to watch for memory leaks To do: * Adapt XMPP i/o to component connection mode for multi-site support. * XMPP input can also be broken out to a queue, which would allow the actual notice save etc to be handled by general queue threads. * Make sure there are no problems with simply pushing serialized Notice objects to queues. * Find a way to improve interactive performance of the database-backed queue handler; polling is pretty painful to XMPP. * Possibly redo the way QueueHandlers are injected into a QueueManager. The grouping used to split out the XMPP output queue is a bit awkward.
Diffstat (limited to 'lib/xmppmanager.php')
-rw-r--r--lib/xmppmanager.php286
1 files changed, 247 insertions, 39 deletions
diff --git a/lib/xmppmanager.php b/lib/xmppmanager.php
index dfff63a30..299175dd7 100644
--- a/lib/xmppmanager.php
+++ b/lib/xmppmanager.php
@@ -70,6 +70,7 @@ class XmppManager extends IoManager
function __construct()
{
$this->site = common_config('site', 'server');
+ $this->resource = common_config('xmpp', 'resource') . 'daemon';
}
/**
@@ -86,15 +87,19 @@ class XmppManager extends IoManager
# Low priority; we don't want to receive messages
common_log(LOG_INFO, "INITIALIZE");
- $this->conn = jabber_connect($this->resource());
+ $this->conn = jabber_connect($this->resource);
if (empty($this->conn)) {
common_log(LOG_ERR, "Couldn't connect to server.");
return false;
}
- $this->conn->addEventHandler('message', 'forward_message', $this);
+ $this->log(LOG_DEBUG, "Initializing stanza handlers.");
+
+ $this->conn->addEventHandler('message', 'handle_message', $this);
+ $this->conn->addEventHandler('presence', 'handle_presence', $this);
$this->conn->addEventHandler('reconnect', 'handle_reconnect', $this);
+
$this->conn->setReconnectTimeout(600);
jabber_send_presence("Send me a message to post a notice", 'available', null, 'available', -1);
@@ -176,11 +181,36 @@ class XmppManager extends IoManager
}
/**
+ * For queue handlers to pass us a message to push out,
+ * if we're active.
+ *
+ * @fixme should this be blocking etc?
+ *
+ * @param string $msg XML stanza to send
+ * @return boolean success
+ */
+ public function send($msg)
+ {
+ if ($this->conn && !$this->conn->isDisconnected()) {
+ $bytes = $this->conn->send($msg);
+ if ($bytes > 0) {
+ $this->conn->processTime(0);
+ return true;
+ } else {
+ return false;
+ }
+ } else {
+ // Can't send right now...
+ return false;
+ }
+ }
+
+ /**
* Send a keepalive ping to the XMPP server.
*/
protected function sendPing()
{
- $jid = jabber_daemon_address().'/'.$this->resource();
+ $jid = jabber_daemon_address().'/'.$this->resource;
$server = common_config('xmpp', 'server');
if (!isset($this->pingid)) {
@@ -206,61 +236,239 @@ class XmppManager extends IoManager
$this->conn->presence(null, 'available', null, 'available', -1);
}
+
+ function get_user($from)
+ {
+ $user = User::staticGet('jabber', jabber_normalize_jid($from));
+ return $user;
+ }
+
/**
- * Callback for Jabber message event.
- *
- * This connection handles output; if we get a message straight to us,
- * forward it on to our XmppDaemon listener for processing.
- *
- * @param $pl
+ * XMPP callback for handling message input...
+ * @param array $pl XMPP payload
*/
- function forward_message(&$pl)
+ function handle_message(&$pl)
{
+ $from = jabber_normalize_jid($pl['from']);
+
if ($pl['type'] != 'chat') {
- common_log(LOG_DEBUG, 'Ignoring message of type ' . $pl['type'] . ' from ' . $pl['from']);
+ $this->log(LOG_WARNING, "Ignoring message of type ".$pl['type']." from $from.");
return;
}
- $listener = $this->listener();
- if (strtolower($listener) == strtolower($pl['from'])) {
- common_log(LOG_WARNING, 'Ignoring loop message.');
+
+ if (mb_strlen($pl['body']) == 0) {
+ $this->log(LOG_WARNING, "Ignoring message with empty body from $from.");
return;
}
- common_log(LOG_INFO, 'Forwarding message from ' . $pl['from'] . ' to ' . $listener);
- $this->conn->message($this->listener(), $pl['body'], 'chat', null, $this->ofrom($pl['from']));
+
+ // Forwarded from another daemon for us to handle; this shouldn't
+ // happen any more but we might get some legacy items.
+ if ($this->is_self($from)) {
+ $this->log(LOG_INFO, "Got forwarded notice from self ($from).");
+ $from = $this->get_ofrom($pl);
+ $this->log(LOG_INFO, "Originally sent by $from.");
+ if (is_null($from) || $this->is_self($from)) {
+ $this->log(LOG_INFO, "Ignoring notice originally sent by $from.");
+ return;
+ }
+ }
+
+ $user = $this->get_user($from);
+
+ // For common_current_user to work
+ global $_cur;
+ $_cur = $user;
+
+ if (!$user) {
+ $this->from_site($from, 'Unknown user; go to ' .
+ common_local_url('imsettings') .
+ ' to add your address to your account');
+ $this->log(LOG_WARNING, 'Message from unknown user ' . $from);
+ return;
+ }
+ if ($this->handle_command($user, $pl['body'])) {
+ $this->log(LOG_INFO, "Command message by $from handled.");
+ return;
+ } else if ($this->is_autoreply($pl['body'])) {
+ $this->log(LOG_INFO, 'Ignoring auto reply from ' . $from);
+ return;
+ } else if ($this->is_otr($pl['body'])) {
+ $this->log(LOG_INFO, 'Ignoring OTR from ' . $from);
+ return;
+ } else {
+
+ $this->log(LOG_INFO, 'Posting a notice from ' . $user->nickname);
+
+ $this->add_notice($user, $pl);
+ }
+
+ $user->free();
+ unset($user);
+ unset($_cur);
+
+ unset($pl['xml']);
+ $pl['xml'] = null;
+
+ $pl = null;
+ unset($pl);
}
- /**
- * Build an <addresses> block with an ofrom entry for forwarded messages
- *
- * @param string $from Jabber ID of original sender
- * @return string XML fragment
- */
- protected function ofrom($from)
+
+ function is_self($from)
{
- $address = "<addresses xmlns='http://jabber.org/protocol/address'>\n";
- $address .= "<address type='ofrom' jid='$from' />\n";
- $address .= "</addresses>\n";
- return $address;
+ return preg_match('/^'.strtolower(jabber_daemon_address()).'/', strtolower($from));
}
- /**
- * Build the complete JID of the XmppDaemon process which
- * handles primary XMPP input for this site.
- *
- * @return string Jabber ID
- */
- protected function listener()
+ function get_ofrom($pl)
+ {
+ $xml = $pl['xml'];
+ $addresses = $xml->sub('addresses');
+ if (!$addresses) {
+ $this->log(LOG_WARNING, 'Forwarded message without addresses');
+ return null;
+ }
+ $address = $addresses->sub('address');
+ if (!$address) {
+ $this->log(LOG_WARNING, 'Forwarded message without address');
+ return null;
+ }
+ if (!array_key_exists('type', $address->attrs)) {
+ $this->log(LOG_WARNING, 'No type for forwarded message');
+ return null;
+ }
+ $type = $address->attrs['type'];
+ if ($type != 'ofrom') {
+ $this->log(LOG_WARNING, 'Type of forwarded message is not ofrom');
+ return null;
+ }
+ if (!array_key_exists('jid', $address->attrs)) {
+ $this->log(LOG_WARNING, 'No jid for forwarded message');
+ return null;
+ }
+ $jid = $address->attrs['jid'];
+ if (!$jid) {
+ $this->log(LOG_WARNING, 'Could not get jid from address');
+ return null;
+ }
+ $this->log(LOG_DEBUG, 'Got message forwarded from jid ' . $jid);
+ return $jid;
+ }
+
+ function is_autoreply($txt)
+ {
+ if (preg_match('/[\[\(]?[Aa]uto[-\s]?[Rr]e(ply|sponse)[\]\)]/', $txt)) {
+ return true;
+ } else if (preg_match('/^System: Message wasn\'t delivered. Offline storage size was exceeded.$/', $txt)) {
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ function is_otr($txt)
+ {
+ if (preg_match('/^\?OTR/', $txt)) {
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ function from_site($address, $msg)
+ {
+ $text = '['.common_config('site', 'name') . '] ' . $msg;
+ jabber_send_message($address, $text);
+ }
+
+ function handle_command($user, $body)
{
- if (common_config('xmpp', 'listener')) {
- return common_config('xmpp', 'listener');
+ $inter = new CommandInterpreter();
+ $cmd = $inter->handle_command($user, $body);
+ if ($cmd) {
+ $chan = new XMPPChannel($this->conn);
+ $cmd->execute($chan);
+ return true;
} else {
- return jabber_daemon_address() . '/' . common_config('xmpp','resource') . 'daemon';
+ return false;
+ }
+ }
+
+ function add_notice(&$user, &$pl)
+ {
+ $body = trim($pl['body']);
+ $content_shortened = common_shorten_links($body);
+ if (Notice::contentTooLong($content_shortened)) {
+ $from = jabber_normalize_jid($pl['from']);
+ $this->from_site($from, sprintf(_('Message too long - maximum is %1$d characters, you sent %2$d.'),
+ Notice::maxContent(),
+ mb_strlen($content_shortened)));
+ return;
+ }
+
+ try {
+ $notice = Notice::saveNew($user->id, $content_shortened, 'xmpp');
+ } catch (Exception $e) {
+ $this->log(LOG_ERR, $e->getMessage());
+ $this->from_site($user->jabber, $e->getMessage());
+ return;
}
+
+ common_broadcast_notice($notice);
+ $this->log(LOG_INFO,
+ 'Added notice ' . $notice->id . ' from user ' . $user->nickname);
+ $notice->free();
+ unset($notice);
+ }
+
+ function handle_presence(&$pl)
+ {
+ $from = jabber_normalize_jid($pl['from']);
+ switch ($pl['type']) {
+ case 'subscribe':
+ # We let anyone subscribe
+ $this->subscribed($from);
+ $this->log(LOG_INFO,
+ 'Accepted subscription from ' . $from);
+ break;
+ case 'subscribed':
+ case 'unsubscribed':
+ case 'unsubscribe':
+ $this->log(LOG_INFO,
+ 'Ignoring "' . $pl['type'] . '" from ' . $from);
+ break;
+ default:
+ if (!$pl['type']) {
+ $user = User::staticGet('jabber', $from);
+ if (!$user) {
+ $this->log(LOG_WARNING, 'Presence from unknown user ' . $from);
+ return;
+ }
+ if ($user->updatefrompresence) {
+ $this->log(LOG_INFO, 'Updating ' . $user->nickname .
+ ' status from presence.');
+ $this->add_notice($user, $pl);
+ }
+ $user->free();
+ unset($user);
+ }
+ break;
+ }
+ unset($pl['xml']);
+ $pl['xml'] = null;
+
+ $pl = null;
+ unset($pl);
+ }
+
+ function log($level, $msg)
+ {
+ $text = 'XMPPDaemon('.$this->resource.'): '.$msg;
+ common_log($level, $text);
}
- protected function resource()
+ function subscribed($to)
{
- return 'queue' . posix_getpid(); // @fixme PIDs won't be host-unique
+ jabber_special_presence('subscribed', $to);
}
/**