summaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
authorEvan Prodromou <evan@status.net>2010-01-22 14:18:21 -0500
committerEvan Prodromou <evan@status.net>2010-01-22 14:18:21 -0500
commit29d83c8ca94201cb010b5aef564df78ab868ea0c (patch)
tree8bc66e2fc03865e287a4c1907e766d6b4311c832 /lib
parent104d300799c0817bdbe893f08fd9b46d37a75a80 (diff)
parent0e852def6ae5aa529cca0aef1187152fb5a880be (diff)
Merge branch 'testing' of git@gitorious.org:statusnet/mainline into testing
Diffstat (limited to 'lib')
-rw-r--r--lib/dbqueuemanager.php140
-rw-r--r--lib/default.php5
-rw-r--r--lib/iomaster.php39
-rw-r--r--lib/jabber.php43
-rw-r--r--lib/jabberqueuehandler.php4
-rw-r--r--lib/ombqueuehandler.php2
-rw-r--r--lib/pingqueuehandler.php2
-rw-r--r--lib/pluginqueuehandler.php2
-rw-r--r--lib/publicqueuehandler.php6
-rw-r--r--lib/queued_xmpp.php117
-rw-r--r--lib/queuehandler.php95
-rw-r--r--lib/queuemanager.php125
-rw-r--r--lib/smsqueuehandler.php2
-rw-r--r--lib/spawningdaemon.php159
-rw-r--r--lib/stompqueuemanager.php56
-rw-r--r--lib/util.php3
-rw-r--r--lib/xmppconfirmmanager.php168
-rw-r--r--lib/xmppmanager.php286
-rw-r--r--lib/xmppoutqueuehandler.php55
19 files changed, 824 insertions, 485 deletions
diff --git a/lib/dbqueuemanager.php b/lib/dbqueuemanager.php
index 889365b64..c6350fc66 100644
--- a/lib/dbqueuemanager.php
+++ b/lib/dbqueuemanager.php
@@ -31,19 +31,17 @@
class DBQueueManager extends QueueManager
{
/**
- * Saves a notice object reference into the queue item table.
+ * Saves an object reference into the queue item table.
* @return boolean true on success
* @throws ServerException on failure
*/
public function enqueue($object, $queue)
{
- $notice = $object;
-
$qi = new Queue_item();
- $qi->notice_id = $notice->id;
+ $qi->frame = $this->encode($object);
$qi->transport = $queue;
- $qi->created = $notice->created;
+ $qi->created = common_sql_now();
$result = $qi->insert();
if (!$result) {
@@ -57,146 +55,92 @@ class DBQueueManager extends QueueManager
}
/**
- * Poll every minute for new events during idle periods.
+ * Poll every 10 seconds for new events during idle periods.
* We'll look in more often when there's data available.
*
* @return int seconds
*/
public function pollInterval()
{
- return 60;
+ return 10;
}
/**
* Run a polling cycle during idle processing in the input loop.
- * @return boolean true if we had a hit
+ * @return boolean true if we should poll again for more data immediately
*/
public function poll()
{
$this->_log(LOG_DEBUG, 'Checking for notices...');
- $item = $this->_nextItem();
- if ($item === false) {
+ $qi = Queue_item::top($this->getQueues());
+ if (empty($qi)) {
$this->_log(LOG_DEBUG, 'No notices waiting; idling.');
return false;
}
- if ($item === true) {
- // We dequeued an entry for a deleted or invalid notice.
- // Consider it a hit for poll rate purposes.
- return true;
- }
- list($queue, $notice) = $item;
- $this->_log(LOG_INFO, 'Got notice '. $notice->id . ' for transport ' . $queue);
-
- // Yay! Got one!
- $handler = $this->getHandler($queue);
- if ($handler) {
- if ($handler->handle_notice($notice)) {
- $this->_log(LOG_INFO, "[$queue:notice $notice->id] Successfully handled notice");
- $this->_done($notice, $queue);
+ $queue = $qi->transport;
+ $item = $this->decode($qi->frame);
+
+ if ($item) {
+ $rep = $this->logrep($item);
+ $this->_log(LOG_INFO, "Got $rep for transport $queue");
+
+ $handler = $this->getHandler($queue);
+ if ($handler) {
+ if ($handler->handle($item)) {
+ $this->_log(LOG_INFO, "[$queue:$rep] Successfully handled item");
+ $this->_done($qi);
+ } else {
+ $this->_log(LOG_INFO, "[$queue:$rep] Failed to handle item");
+ $this->_fail($qi);
+ }
} else {
- $this->_log(LOG_INFO, "[$queue:notice $notice->id] Failed to handle notice");
- $this->_fail($notice, $queue);
+ $this->_log(LOG_INFO, "[$queue:$rep] No handler for queue $queue; discarding.");
+ $this->_done($qi);
}
} else {
- $this->_log(LOG_INFO, "[$queue:notice $notice->id] No handler for queue $queue; discarding.");
- $this->_done($notice, $queue);
+ $this->_log(LOG_INFO, "[$queue] Got empty/deleted item, discarding");
+ $this->_fail($qi);
}
return true;
}
/**
- * Pop the oldest unclaimed item off the queue set and claim it.
- *
- * @return mixed false if no items; true if bogus hit; otherwise array(string, Notice)
- * giving the queue transport name.
- */
- protected function _nextItem()
- {
- $start = time();
- $result = null;
-
- $qi = Queue_item::top();
- if (empty($qi)) {
- return false;
- }
-
- $queue = $qi->transport;
- $notice = Notice::staticGet('id', $qi->notice_id);
- if (empty($notice)) {
- $this->_log(LOG_INFO, "[$queue:notice $notice->id] dequeued non-existent notice");
- $qi->delete();
- return true;
- }
-
- $result = $notice;
- return array($queue, $notice);
- }
-
- /**
* Delete our claimed item from the queue after successful processing.
*
- * @param Notice $object
- * @param string $queue
+ * @param QueueItem $qi
*/
- protected function _done($object, $queue)
+ protected function _done($qi)
{
- // XXX: right now, we only handle notices
-
- $notice = $object;
-
- $qi = Queue_item::pkeyGet(array('notice_id' => $notice->id,
- 'transport' => $queue));
+ $queue = $qi->transport;
- if (empty($qi)) {
- $this->_log(LOG_INFO, "[$queue:notice $notice->id] Cannot find queue item");
- } else {
- if (empty($qi->claimed)) {
- $this->_log(LOG_WARNING, "[$queue:notice $notice->id] Reluctantly releasing unclaimed queue item");
- }
- $qi->delete();
- $qi->free();
+ if (empty($qi->claimed)) {
+ $this->_log(LOG_WARNING, "Reluctantly releasing unclaimed queue item $qi->id from $qi->queue");
}
+ $qi->delete();
- $this->_log(LOG_INFO, "[$queue:notice $notice->id] done with item");
$this->stats('handled', $queue);
-
- $notice->free();
}
/**
* Free our claimed queue item for later reprocessing in case of
* temporary failure.
*
- * @param Notice $object
- * @param string $queue
+ * @param QueueItem $qi
*/
- protected function _fail($object, $queue)
+ protected function _fail($qi)
{
- // XXX: right now, we only handle notices
-
- $notice = $object;
-
- $qi = Queue_item::pkeyGet(array('notice_id' => $notice->id,
- 'transport' => $queue));
+ $queue = $qi->transport;
- if (empty($qi)) {
- $this->_log(LOG_INFO, "[$queue:notice $notice->id] Cannot find queue item");
+ if (empty($qi->claimed)) {
+ $this->_log(LOG_WARNING, "[$queue:item $qi->id] Ignoring failure for unclaimed queue item");
} else {
- if (empty($qi->claimed)) {
- $this->_log(LOG_WARNING, "[$queue:notice $notice->id] Ignoring failure for unclaimed queue item");
- } else {
- $orig = clone($qi);
- $qi->claimed = null;
- $qi->update($orig);
- $qi = null;
- }
+ $orig = clone($qi);
+ $qi->claimed = null;
+ $qi->update($orig);
}
- $this->_log(LOG_INFO, "[$queue:notice $notice->id] done with queue item");
$this->stats('error', $queue);
-
- $notice->free();
}
protected function _log($level, $msg)
diff --git a/lib/default.php b/lib/default.php
index 5b2ae6c7c..0cb70de2d 100644
--- a/lib/default.php
+++ b/lib/default.php
@@ -67,7 +67,9 @@ $default =
'db_driver' => 'DB', # XXX: JanRain libs only work with DB
'quote_identifiers' => false,
'type' => 'mysql',
- 'schemacheck' => 'runtime'), // 'runtime' or 'script'
+ 'schemacheck' => 'runtime', // 'runtime' or 'script'
+ 'log_queries' => false, // true to log all DB queries
+ 'log_slow_queries' => 0), // if set, log queries taking over N seconds
'syslog' =>
array('appname' => 'statusnet', # for syslog
'priority' => 'debug', # XXX: currently ignored
@@ -81,6 +83,7 @@ $default =
'stomp_password' => null,
'monitor' => null, // URL to monitor ping endpoint (work in progress)
'softlimit' => '90%', // total size or % of memory_limit at which to restart queue threads gracefully
+ 'debug_memory' => false, // true to spit memory usage to log
),
'license' =>
array('type' => 'cc', # can be 'cc', 'allrightsreserved', 'private'
diff --git a/lib/iomaster.php b/lib/iomaster.php
index ce77b53b2..004e92b3e 100644
--- a/lib/iomaster.php
+++ b/lib/iomaster.php
@@ -27,7 +27,7 @@
* @link http://status.net/
*/
-class IoMaster
+abstract class IoMaster
{
public $id;
@@ -66,24 +66,19 @@ class IoMaster
if ($site != common_config('site', 'server')) {
StatusNet::init($site);
}
-
- $classes = array();
- if (Event::handle('StartIoManagerClasses', array(&$classes))) {
- $classes[] = 'QueueManager';
- if (common_config('xmpp', 'enabled') && !defined('XMPP_EMERGENCY_FLAG')) {
- $classes[] = 'XmppManager'; // handles pings/reconnects
- $classes[] = 'XmppConfirmManager'; // polls for outgoing confirmations
- }
- }
- Event::handle('EndIoManagerClasses', array(&$classes));
-
- foreach ($classes as $class) {
- $this->instantiate($class);
- }
+ $this->initManagers();
}
}
/**
+ * Initialize IoManagers for the currently configured site
+ * which are appropriate to this instance.
+ *
+ * Pass class names into $this->instantiate()
+ */
+ abstract function initManagers();
+
+ /**
* Pull all local sites from status_network table.
* @return array of hostnames
*/
@@ -170,7 +165,7 @@ class IoMaster
$write = array();
$except = array();
$this->logState('listening');
- common_log(LOG_INFO, "Waiting up to $timeout seconds for socket data...");
+ common_log(LOG_DEBUG, "Waiting up to $timeout seconds for socket data...");
$ready = stream_select($read, $write, $except, $timeout, 0);
if ($ready === false) {
@@ -190,7 +185,7 @@ class IoMaster
if ($timeout > 0 && empty($sockets)) {
// If we had no listeners, sleep until the pollers' next requested wakeup.
- common_log(LOG_INFO, "Sleeping $timeout seconds until next poll cycle...");
+ common_log(LOG_DEBUG, "Sleeping $timeout seconds until next poll cycle...");
$this->logState('sleep');
sleep($timeout);
}
@@ -207,6 +202,8 @@ class IoMaster
if ($usage > $memoryLimit) {
common_log(LOG_INFO, "Queue thread hit soft memory limit ($usage > $memoryLimit); gracefully restarting.");
break;
+ } else if (common_config('queue', 'debug_memory')) {
+ common_log(LOG_DEBUG, "Memory usage $usage");
}
}
}
@@ -223,8 +220,7 @@ class IoMaster
{
$softLimit = trim(common_config('queue', 'softlimit'));
if (substr($softLimit, -1) == '%') {
- $limit = trim(ini_get('memory_limit'));
- $limit = $this->parseMemoryLimit($limit);
+ $limit = $this->parseMemoryLimit(ini_get('memory_limit'));
if ($limit > 0) {
return intval(substr($softLimit, 0, -1) * $limit / 100);
} else {
@@ -242,9 +238,10 @@ class IoMaster
* @param string $mem
* @return int
*/
- protected function parseMemoryLimit($mem)
+ public function parseMemoryLimit($mem)
{
// http://www.php.net/manual/en/faq.using.php#faq.using.shorthandbytes
+ $mem = strtolower(trim($mem));
$size = array('k' => 1024,
'm' => 1024*1024,
'g' => 1024*1024*1024);
@@ -253,7 +250,7 @@ class IoMaster
} else if (is_numeric($mem)) {
return intval($mem);
} else {
- $mult = strtolower(substr($mem, -1));
+ $mult = substr($mem, -1);
if (isset($size[$mult])) {
return substr($mem, 0, -1) * $size[$mult];
} else {
diff --git a/lib/jabber.php b/lib/jabber.php
index 4cdfa6746..b6b23521b 100644
--- a/lib/jabber.php
+++ b/lib/jabber.php
@@ -86,6 +86,27 @@ class Sharing_XMPP extends XMPPHP_XMPP
}
/**
+ * Build an XMPP proxy connection that'll save outgoing messages
+ * to the 'xmppout' queue to be picked up by xmppdaemon later.
+ */
+function jabber_proxy()
+{
+ $proxy = new Queued_XMPP(common_config('xmpp', 'host') ?
+ common_config('xmpp', 'host') :
+ common_config('xmpp', 'server'),
+ common_config('xmpp', 'port'),
+ common_config('xmpp', 'user'),
+ common_config('xmpp', 'password'),
+ common_config('xmpp', 'resource') . 'daemon',
+ common_config('xmpp', 'server'),
+ common_config('xmpp', 'debug') ?
+ true : false,
+ common_config('xmpp', 'debug') ?
+ XMPPHP_Log::LEVEL_VERBOSE : null);
+ return $proxy;
+}
+
+/**
* Lazy-connect the configured Jabber account to the configured server;
* if already opened, the same connection will be returned.
*
@@ -143,7 +164,7 @@ function jabber_connect($resource=null)
}
/**
- * send a single notice to a given Jabber address
+ * Queue send for a single notice to a given Jabber address
*
* @param string $to JID to send the notice to
* @param Notice $notice notice to send
@@ -153,10 +174,7 @@ function jabber_connect($resource=null)
function jabber_send_notice($to, $notice)
{
- $conn = jabber_connect();
- if (!$conn) {
- return false;
- }
+ $conn = jabber_proxy();
$profile = Profile::staticGet($notice->profile_id);
if (!$profile) {
common_log(LOG_WARNING, 'Refusing to send notice with ' .
@@ -221,10 +239,7 @@ function jabber_format_entry($profile, $notice)
function jabber_send_message($to, $body, $type='chat', $subject=null)
{
- $conn = jabber_connect();
- if (!$conn) {
- return false;
- }
+ $conn = jabber_proxy();
$conn->message($to, $body, $type, $subject);
return true;
}
@@ -319,7 +334,7 @@ function jabber_special_presence($type, $to=null, $show=null, $status=null)
}
/**
- * broadcast a notice to all subscribers and reply recipients
+ * Queue broadcast of a notice to all subscribers and reply recipients
*
* This function will send a notice to all subscribers on the local server
* who have Jabber addresses, and have Jabber notification enabled, and
@@ -354,7 +369,7 @@ function jabber_broadcast_notice($notice)
$sent_to = array();
- $conn = jabber_connect();
+ $conn = jabber_proxy();
$ni = $notice->whoGets();
@@ -389,14 +404,13 @@ function jabber_broadcast_notice($notice)
'Sending notice ' . $notice->id . ' to ' . $user->jabber,
__FILE__);
$conn->message($user->jabber, $msg, 'chat', null, $entry);
- $conn->processTime(0);
}
return true;
}
/**
- * send a notice to all public listeners
+ * Queue send of a notice to all public listeners
*
* For notices that are generated on the local system (by users), we can optionally
* forward them to remote listeners by XMPP.
@@ -429,7 +443,7 @@ function jabber_public_notice($notice)
$msg = jabber_format_notice($profile, $notice);
$entry = jabber_format_entry($profile, $notice);
- $conn = jabber_connect();
+ $conn = jabber_proxy();
foreach ($public as $address) {
common_log(LOG_INFO,
@@ -437,7 +451,6 @@ function jabber_public_notice($notice)
' to public listener ' . $address,
__FILE__);
$conn->message($address, $msg, 'chat', null, $entry);
- $conn->processTime(0);
}
$profile->free();
}
diff --git a/lib/jabberqueuehandler.php b/lib/jabberqueuehandler.php
index b1518866d..83471f2df 100644
--- a/lib/jabberqueuehandler.php
+++ b/lib/jabberqueuehandler.php
@@ -34,14 +34,14 @@ class JabberQueueHandler extends QueueHandler
return 'jabber';
}
- function handle_notice($notice)
+ function handle($notice)
{
require_once(INSTALLDIR.'/lib/jabber.php');
try {
return jabber_broadcast_notice($notice);
} catch (XMPPHP_Exception $e) {
$this->log(LOG_ERR, "Got an XMPPHP_Exception: " . $e->getMessage());
- exit(1);
+ return false;
}
}
}
diff --git a/lib/ombqueuehandler.php b/lib/ombqueuehandler.php
index 3ffc1313b..24896c784 100644
--- a/lib/ombqueuehandler.php
+++ b/lib/ombqueuehandler.php
@@ -36,7 +36,7 @@ class OmbQueueHandler extends QueueHandler
* @fixme doesn't currently report failure back to the queue manager
* because omb_broadcast_notice() doesn't report it to us
*/
- function handle_notice($notice)
+ function handle($notice)
{
if ($this->is_remote($notice)) {
$this->log(LOG_DEBUG, 'Ignoring remote notice ' . $notice->id);
diff --git a/lib/pingqueuehandler.php b/lib/pingqueuehandler.php
index 8bb218078..4e4d74cb1 100644
--- a/lib/pingqueuehandler.php
+++ b/lib/pingqueuehandler.php
@@ -30,7 +30,7 @@ class PingQueueHandler extends QueueHandler {
return 'ping';
}
- function handle_notice($notice) {
+ function handle($notice) {
require_once INSTALLDIR . '/lib/ping.php';
return ping_broadcast_notice($notice);
}
diff --git a/lib/pluginqueuehandler.php b/lib/pluginqueuehandler.php
index 24d504699..9653ccad4 100644
--- a/lib/pluginqueuehandler.php
+++ b/lib/pluginqueuehandler.php
@@ -42,7 +42,7 @@ class PluginQueueHandler extends QueueHandler
return 'plugin';
}
- function handle_notice($notice)
+ function handle($notice)
{
Event::handle('HandleQueuedNotice', array(&$notice));
return true;
diff --git a/lib/publicqueuehandler.php b/lib/publicqueuehandler.php
index 9ea9ee73a..c9edb8d5d 100644
--- a/lib/publicqueuehandler.php
+++ b/lib/publicqueuehandler.php
@@ -23,7 +23,6 @@ if (!defined('STATUSNET') && !defined('LACONICA')) {
/**
* Queue handler for pushing new notices to public XMPP subscribers.
- * @fixme correct this exception handling
*/
class PublicQueueHandler extends QueueHandler
{
@@ -33,15 +32,14 @@ class PublicQueueHandler extends QueueHandler
return 'public';
}
- function handle_notice($notice)
+ function handle($notice)
{
require_once(INSTALLDIR.'/lib/jabber.php');
try {
return jabber_public_notice($notice);
} catch (XMPPHP_Exception $e) {
$this->log(LOG_ERR, "Got an XMPPHP_Exception: " . $e->getMessage());
- die($e->getMessage());
+ return false;
}
- return true;
}
}
diff --git a/lib/queued_xmpp.php b/lib/queued_xmpp.php
new file mode 100644
index 000000000..4b890c4ca
--- /dev/null
+++ b/lib/queued_xmpp.php
@@ -0,0 +1,117 @@
+<?php
+/**
+ * StatusNet, the distributed open-source microblogging tool
+ *
+ * Queue-mediated proxy class for outgoing XMPP messages.
+ *
+ * PHP version 5
+ *
+ * LICENCE: This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * @category Network
+ * @package StatusNet
+ * @author Brion Vibber <brion@status.net>
+ * @copyright 2010 StatusNet, Inc.
+ * @license http://www.fsf.org/licensing/licenses/agpl-3.0.html GNU Affero General Public License version 3.0
+ * @link http://status.net/
+ */
+
+if (!defined('STATUSNET') && !defined('LACONICA')) {
+ exit(1);
+}
+
+require_once INSTALLDIR . '/lib/jabber.php';
+
+class Queued_XMPP extends XMPPHP_XMPP
+{
+ /**
+ * Constructor
+ *
+ * @param string $host
+ * @param integer $port
+ * @param string $user
+ * @param string $password
+ * @param string $resource
+ * @param string $server
+ * @param boolean $printlog
+ * @param string $loglevel
+ */
+ public function __construct($host, $port, $user, $password, $resource, $server = null, $printlog = false, $loglevel = null)
+ {
+ parent::__construct($host, $port, $user, $password, $resource, $server, $printlog, $loglevel);
+ // Normally the fulljid isn't filled out until resource binding time;
+ // we need to save it here since we're not talking to a real server.
+ $this->fulljid = "{$this->basejid}/{$this->resource}";
+ }
+
+ /**
+ * Send a formatted message to the outgoing queue for later forwarding
+ * to a real XMPP connection.
+ *
+ * @param string $msg
+ */
+ public function send($msg, $timeout=NULL)
+ {
+ $qm = QueueManager::get();
+ $qm->enqueue(strval($msg), 'xmppout');
+ }
+
+ /**
+ * Since we'll be getting input through a queue system's run loop,
+ * we'll process one standalone message at a time rather than our
+ * own XMPP message pump.
+ *
+ * @param string $message
+ */
+ public function processMessage($message) {
+ $frame = array_shift($this->frames);
+ xml_parse($this->parser, $frame->body, false);
+ }
+
+ //@{
+ /**
+ * Stream i/o functions disabled; push input through processMessage()
+ */
+ public function connect($timeout = 30, $persistent = false, $sendinit = true)
+ {
+ throw new Exception("Can't connect to server from XMPP queue proxy.");
+ }
+
+ public function disconnect()
+ {
+ throw new Exception("Can't connect to server from XMPP queue proxy.");
+ }
+
+ public function process()
+ {
+ throw new Exception("Can't read stream from XMPP queue proxy.");
+ }
+
+ public function processUntil($event, $timeout=-1)
+ {
+ throw new Exception("Can't read stream from XMPP queue proxy.");
+ }
+
+ public function read()
+ {
+ throw new Exception("Can't read stream from XMPP queue proxy.");
+ }
+
+ public function readyToProcess()
+ {
+ throw new Exception("Can't read stream from XMPP queue proxy.");
+ }
+ //@}
+}
+
diff --git a/lib/queuehandler.php b/lib/queuehandler.php
index 613be6e33..2909cd83b 100644
--- a/lib/queuehandler.php
+++ b/lib/queuehandler.php
@@ -22,51 +22,20 @@ if (!defined('STATUSNET') && !defined('LACONICA')) { exit(1); }
/**
* Base class for queue handlers.
*
- * As extensions of the Daemon class, each queue handler has the ability
- * to launch itself in the background, at which point it'll pass control
- * to the configured QueueManager class to poll for updates.
+ * As of 0.9, queue handlers are short-lived for items as they are
+ * dequeued by a QueueManager running in an IoMaster in a daemon
+ * such as queuedaemon.php.
+ *
+ * Extensions requiring long-running maintenance or polling should
+ * register an IoManager.
*
* Subclasses must override at least the following methods:
* - transport
- * - handle_notice
+ * - handle
*/
-#class QueueHandler extends Daemon
class QueueHandler
{
-# function __construct($id=null, $daemonize=true)
-# {
-# parent::__construct($daemonize);
-#
-# if ($id) {
-# $this->set_id($id);
-# }
-# }
-
- /**
- * How many seconds a polling-based queue manager should wait between
- * checks for new items to handle.
- *
- * Defaults to 60 seconds; override to speed up or slow down.
- *
- * @fixme not really compatible with global queue manager
- * @return int timeout in seconds
- */
-# function timeout()
-# {
-# return 60;
-# }
-
-# function class_name()
-# {
-# return ucfirst($this->transport()) . 'Handler';
-# }
-
-# function name()
-# {
-# return strtolower($this->class_name().'.'.$this->get_id());
-# }
-
/**
* Return transport keyword which identifies items this queue handler
* services; must be defined for all subclasses.
@@ -83,61 +52,17 @@ class QueueHandler
/**
* Here's the meat of your queue handler -- you're handed a Notice
- * object, which you may do as you will with.
+ * or other object, which you may do as you will with.
*
* If this function indicates failure, a warning will be logged
* and the item is placed back in the queue to be re-run.
*
- * @param Notice $notice
- * @return boolean true on success, false on failure
- */
- function handle_notice($notice)
- {
- return true;
- }
-
- /**
- * Setup and start of run loop for this queue handler as a daemon.
- * Most of the heavy lifting is passed on to the QueueManager's service()
- * method, which passes control back to our handle_notice() method for
- * each notice that comes in on the queue.
- *
- * Most of the time this won't need to be overridden in a subclass.
- *
+ * @param mixed $object
* @return boolean true on success, false on failure
*/
- function run()
+ function handle($object)
{
- if (!$this->start()) {
- $this->log(LOG_WARNING, 'failed to start');
- return false;
- }
-
- $this->log(LOG_INFO, 'checking for queued notices');
-
- $queue = $this->transport();
- $timeout = $this->timeout();
-
- $qm = QueueManager::get();
-
- $qm->service($queue, $this);
-
- $this->log(LOG_INFO, 'finished servicing the queue');
-
- if (!$this->finish()) {
- $this->log(LOG_WARNING, 'failed to clean up');
- return false;
- }
-
- $this->log(LOG_INFO, 'terminating normally');
-
return true;
}
-
-
- function log($level, $msg)
- {
- common_log($level, $this->class_name() . ' ('. $this->get_id() .'): '.$msg);
- }
}
diff --git a/lib/queuemanager.php b/lib/queuemanager.php
index 291174d3c..4eb39bfa8 100644
--- a/lib/queuemanager.php
+++ b/lib/queuemanager.php
@@ -39,6 +39,10 @@ abstract class QueueManager extends IoManager
{
static $qm = null;
+ public $master = null;
+ public $handlers = array();
+ public $groups = array();
+
/**
* Factory function to pull the appropriate QueueManager object
* for this site's configuration. It can then be used to queue
@@ -110,6 +114,64 @@ abstract class QueueManager extends IoManager
abstract function enqueue($object, $queue);
/**
+ * Build a representation for an object for logging
+ * @param mixed
+ * @return string
+ */
+ function logrep($object) {
+ if (is_object($object)) {
+ $class = get_class($object);
+ if (isset($object->id)) {
+ return "$class $object->id";
+ }
+ return $class;
+ }
+ if (is_string($object)) {
+ $len = strlen($object);
+ $fragment = mb_substr($object, 0, 32);
+ if (mb_strlen($object) > 32) {
+ $fragment .= '...';
+ }
+ return "string '$fragment' ($len bytes)";
+ }
+ return strval($object);
+ }
+
+ /**
+ * Encode an object for queued storage.
+ * Next gen may use serialization.
+ *
+ * @param mixed $object
+ * @return string
+ */
+ protected function encode($object)
+ {
+ if ($object instanceof Notice) {
+ return $object->id;
+ } else if (is_string($object)) {
+ return $object;
+ } else {
+ throw new ServerException("Can't queue this type", 500);
+ }
+ }
+
+ /**
+ * Decode an object from queued storage.
+ * Accepts back-compat notice reference entries and strings for now.
+ *
+ * @param string
+ * @return mixed
+ */
+ protected function decode($frame)
+ {
+ if (is_numeric($frame)) {
+ return Notice::staticGet(intval($frame));
+ } else {
+ return $frame;
+ }
+ }
+
+ /**
* Instantiate the appropriate QueueHandler class for the given queue.
*
* @param string $queue
@@ -131,13 +193,15 @@ abstract class QueueManager extends IoManager
}
/**
- * Get a list of all registered queue transport names.
+ * Get a list of registered queue transport names to be used
+ * for this daemon.
*
* @return array of strings
*/
function getQueues()
{
- return array_keys($this->handlers);
+ $group = $this->activeGroup();
+ return array_keys($this->groups[$group]);
}
/**
@@ -148,33 +212,29 @@ abstract class QueueManager extends IoManager
*/
function initialize()
{
+ // @fixme we'll want to be able to listen to particular queues...
if (Event::handle('StartInitializeQueueManager', array($this))) {
- if (!defined('XMPP_ONLY_FLAG')) { // hack!
- $this->connect('plugin', 'PluginQueueHandler');
- $this->connect('omb', 'OmbQueueHandler');
- $this->connect('ping', 'PingQueueHandler');
- if (common_config('sms', 'enabled')) {
- $this->connect('sms', 'SmsQueueHandler');
- }
+ $this->connect('plugin', 'PluginQueueHandler');
+ $this->connect('omb', 'OmbQueueHandler');
+ $this->connect('ping', 'PingQueueHandler');
+ if (common_config('sms', 'enabled')) {
+ $this->connect('sms', 'SmsQueueHandler');
}
// XMPP output handlers...
- if (common_config('xmpp', 'enabled') && !defined('XMPP_EMERGENCY_FLAG')) {
- $this->connect('jabber', 'JabberQueueHandler');
- $this->connect('public', 'PublicQueueHandler');
-
- // @fixme this should move up a level or should get an actual queue
- $this->connect('confirm', 'XmppConfirmHandler');
- }
+ $this->connect('jabber', 'JabberQueueHandler');
+ $this->connect('public', 'PublicQueueHandler');
+
+ // @fixme this should get an actual queue
+ //$this->connect('confirm', 'XmppConfirmHandler');
+
+ // For compat with old plugins not registering their own handlers.
+ $this->connect('plugin', 'PluginQueueHandler');
+
+ $this->connect('xmppout', 'XmppOutQueueHandler', 'xmppdaemon');
- if (!defined('XMPP_ONLY_FLAG')) { // hack!
- // For compat with old plugins not registering their own handlers.
- $this->connect('plugin', 'PluginQueueHandler');
- }
- }
- if (!defined('XMPP_ONLY_FLAG')) { // hack!
- Event::handle('EndInitializeQueueManager', array($this));
}
+ Event::handle('EndInitializeQueueManager', array($this));
}
/**
@@ -183,10 +243,27 @@ abstract class QueueManager extends IoManager
*
* @param string $transport
* @param string $class
+ * @param string $group
*/
- public function connect($transport, $class)
+ public function connect($transport, $class, $group='queuedaemon')
{
$this->handlers[$transport] = $class;
+ $this->groups[$group][$transport] = $class;
+ }
+
+ /**
+ * @return string queue group to use for this request
+ */
+ function activeGroup()
+ {
+ $group = 'queuedaemon';
+ if ($this->master) {
+ // hack hack
+ if ($this->master instanceof XmppMaster) {
+ return 'xmppdaemon';
+ }
+ }
+ return $group;
}
/**
diff --git a/lib/smsqueuehandler.php b/lib/smsqueuehandler.php
index 48a96409d..6085d2b4a 100644
--- a/lib/smsqueuehandler.php
+++ b/lib/smsqueuehandler.php
@@ -31,7 +31,7 @@ class SmsQueueHandler extends QueueHandler
return 'sms';
}
- function handle_notice($notice)
+ function handle($notice)
{
require_once(INSTALLDIR.'/lib/mail.php');
return mail_broadcast_notice_sms($notice);
diff --git a/lib/spawningdaemon.php b/lib/spawningdaemon.php
new file mode 100644
index 000000000..8baefe88e
--- /dev/null
+++ b/lib/spawningdaemon.php
@@ -0,0 +1,159 @@
+<?php
+/*
+ * StatusNet - the distributed open-source microblogging tool
+ * Copyright (C) 2010, StatusNet, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+/**
+ * Base class for daemon that can launch one or more processing threads,
+ * respawning them if they exit.
+ *
+ * This is mainly intended for indefinite workloads such as monitoring
+ * a queue or maintaining an IM channel.
+ *
+ * Child classes should implement the
+ *
+ * We can then pass individual items through the QueueHandler subclasses
+ * they belong to. We additionally can handle queues for multiple sites.
+ *
+ * @package QueueHandler
+ * @author Brion Vibber <brion@status.net>
+ */
+abstract class SpawningDaemon extends Daemon
+{
+ protected $threads=1;
+
+ function __construct($id=null, $daemonize=true, $threads=1)
+ {
+ parent::__construct($daemonize);
+
+ if ($id) {
+ $this->set_id($id);
+ }
+ $this->threads = $threads;
+ }
+
+ /**
+ * Perform some actual work!
+ *
+ * @return boolean true on success, false on failure
+ */
+ public abstract function runThread();
+
+ /**
+ * Spawn one or more background processes and let them start running.
+ * Each individual process will execute whatever's in the runThread()
+ * method, which should be overridden.
+ *
+ * Child processes will be automatically respawned when they exit.
+ *
+ * @todo possibly allow for not respawning on "normal" exits...
+ * though ParallelizingDaemon is probably better for workloads
+ * that have forseeable endpoints.
+ */
+ function run()
+ {
+ $children = array();
+ for ($i = 1; $i <= $this->threads; $i++) {
+ $pid = pcntl_fork();
+ if ($pid < 0) {
+ $this->log(LOG_ERROR, "Couldn't fork for thread $i; aborting\n");
+ exit(1);
+ } else if ($pid == 0) {
+ $this->initAndRunChild($i);
+ } else {
+ $this->log(LOG_INFO, "Spawned thread $i as pid $pid");
+ $children[$i] = $pid;
+ }
+ }
+
+ $this->log(LOG_INFO, "Waiting for children to complete.");
+ while (count($children) > 0) {
+ $status = null;
+ $pid = pcntl_wait($status);
+ if ($pid > 0) {
+ $i = array_search($pid, $children);
+ if ($i === false) {
+ $this->log(LOG_ERR, "Unrecognized child pid $pid exited!");
+ continue;
+ }
+ unset($children[$i]);
+ $this->log(LOG_INFO, "Thread $i pid $pid exited.");
+
+ $pid = pcntl_fork();
+ if ($pid < 0) {
+ $this->log(LOG_ERROR, "Couldn't fork to respawn thread $i; aborting thread.\n");
+ } else if ($pid == 0) {
+ $this->initAndRunChild($i);
+ } else {
+ $this->log(LOG_INFO, "Respawned thread $i as pid $pid");
+ $children[$i] = $pid;
+ }
+ }
+ }
+ $this->log(LOG_INFO, "All child processes complete.");
+ return true;
+ }
+
+ /**
+ * Initialize things for a fresh thread, call runThread(), and
+ * exit at completion with appropriate return value.
+ */
+ protected function initAndRunChild($thread)
+ {
+ $this->set_id($this->get_id() . "." . $thread);
+ $this->resetDb();
+ $ok = $this->runThread();
+ exit($ok ? 0 : 1);
+ }
+
+ /**
+ * Reconnect to the database for each child process,
+ * or they'll get very confused trying to use the
+ * same socket.
+ */
+ protected function resetDb()
+ {
+ // @fixme do we need to explicitly open the db too
+ // or is this implied?
+ global $_DB_DATAOBJECT;
+ unset($_DB_DATAOBJECT['CONNECTIONS']);
+
+ // Reconnect main memcached, or threads will stomp on
+ // each other and corrupt their requests.
+ $cache = common_memcache();
+ if ($cache) {
+ $cache->reconnect();
+ }
+
+ // Also reconnect memcached for status_network table.
+ if (!empty(Status_network::$cache)) {
+ Status_network::$cache->close();
+ Status_network::$cache = null;
+ }
+ }
+
+ function log($level, $msg)
+ {
+ common_log($level, get_class($this) . ' ('. $this->get_id() .'): '.$msg);
+ }
+
+ function name()
+ {
+ return strtolower(get_class($this).'.'.$this->get_id());
+ }
+}
+
diff --git a/lib/stompqueuemanager.php b/lib/stompqueuemanager.php
index 00590fdb6..f057bd9e4 100644
--- a/lib/stompqueuemanager.php
+++ b/lib/stompqueuemanager.php
@@ -39,7 +39,6 @@ class StompQueueManager extends QueueManager
var $base = null;
var $con = null;
- protected $master = null;
protected $sites = array();
function __construct()
@@ -104,11 +103,12 @@ class StompQueueManager extends QueueManager
*/
function getQueues()
{
+ $group = $this->activeGroup();
$site = common_config('site', 'server');
- if (empty($this->handlers[$site])) {
+ if (empty($this->groups[$site][$group])) {
return array();
} else {
- return array_keys($this->handlers[$site]);
+ return array_keys($this->groups[$site][$group]);
}
}
@@ -118,10 +118,12 @@ class StompQueueManager extends QueueManager
*
* @param string $transport
* @param string $class
+ * @param string $group
*/
- public function connect($transport, $class)
+ public function connect($transport, $class, $group='queuedaemon')
{
$this->handlers[common_config('site', 'server')][$transport] = $class;
+ $this->groups[common_config('site', 'server')][$group][$transport] = $class;
}
/**
@@ -130,23 +132,23 @@ class StompQueueManager extends QueueManager
*/
public function enqueue($object, $queue)
{
- $notice = $object;
+ $msg = $this->encode($object);
+ $rep = $this->logrep($object);
$this->_connect();
// XXX: serialize and send entire notice
$result = $this->con->send($this->queueName($queue),
- $notice->id, // BODY of the message
- array ('created' => $notice->created));
+ $msg, // BODY of the message
+ array ('created' => common_sql_now()));
if (!$result) {
- common_log(LOG_ERR, 'Error sending to '.$queue.' queue');
+ common_log(LOG_ERR, "Error sending $rep to $queue queue");
return false;
}
- common_log(LOG_DEBUG, 'complete remote queueing notice ID = '
- . $notice->id . ' for ' . $queue);
+ common_log(LOG_DEBUG, "complete remote queueing $rep for $queue");
$this->stats('enqueued', $queue);
}
@@ -174,7 +176,7 @@ class StompQueueManager extends QueueManager
$ok = true;
$frames = $this->con->readFrames();
foreach ($frames as $frame) {
- $ok = $ok && $this->_handleNotice($frame);
+ $ok = $ok && $this->_handleItem($frame);
}
return $ok;
}
@@ -265,7 +267,7 @@ class StompQueueManager extends QueueManager
}
/**
- * Handle and acknowledge a notice event that's come in through a queue.
+ * Handle and acknowledge an event that's come in through a queue.
*
* If the queue handler reports failure, the message is requeued for later.
* Missing notices or handler classes will drop the message.
@@ -276,7 +278,7 @@ class StompQueueManager extends QueueManager
* @param StompFrame $frame
* @return bool
*/
- protected function _handleNotice($frame)
+ protected function _handleItem($frame)
{
list($site, $queue) = $this->parseDestination($frame->headers['destination']);
if ($site != common_config('site', 'server')) {
@@ -284,15 +286,23 @@ class StompQueueManager extends QueueManager
StatusNet::init($site);
}
- $id = intval($frame->body);
- $info = "notice $id posted at {$frame->headers['created']} in queue $queue";
+ if (is_numeric($frame->body)) {
+ $id = intval($frame->body);
+ $info = "notice $id posted at {$frame->headers['created']} in queue $queue";
- $notice = Notice::staticGet('id', $id);
- if (empty($notice)) {
- $this->_log(LOG_WARNING, "Skipping missing $info");
- $this->con->ack($frame);
- $this->stats('badnotice', $queue);
- return false;
+ $notice = Notice::staticGet('id', $id);
+ if (empty($notice)) {
+ $this->_log(LOG_WARNING, "Skipping missing $info");
+ $this->con->ack($frame);
+ $this->stats('badnotice', $queue);
+ return false;
+ }
+
+ $item = $notice;
+ } else {
+ // @fixme should we serialize, or json, or what here?
+ $info = "string posted at {$frame->headers['created']} in queue $queue";
+ $item = $frame->body;
}
$handler = $this->getHandler($queue);
@@ -303,7 +313,7 @@ class StompQueueManager extends QueueManager
return false;
}
- $ok = $handler->handle_notice($notice);
+ $ok = $handler->handle($item);
if (!$ok) {
$this->_log(LOG_WARNING, "Failed handling $info");
@@ -311,7 +321,7 @@ class StompQueueManager extends QueueManager
// this kind of queue management ourselves;
// if we don't ack, it should resend...
$this->con->ack($frame);
- $this->enqueue($notice, $queue);
+ $this->enqueue($item, $queue);
$this->stats('requeued', $queue);
return false;
}
diff --git a/lib/util.php b/lib/util.php
index ef8a5d1f0..fb3b8be87 100644
--- a/lib/util.php
+++ b/lib/util.php
@@ -1130,7 +1130,8 @@ function common_request_id()
$pid = getmypid();
$server = common_config('site', 'server');
if (php_sapi_name() == 'cli') {
- return "$server:$pid";
+ $script = basename($_SERVER['PHP_SELF']);
+ return "$server:$script:$pid";
} else {
static $req_id = null;
if (!isset($req_id)) {
diff --git a/lib/xmppconfirmmanager.php b/lib/xmppconfirmmanager.php
deleted file mode 100644
index ee4e294fd..000000000
--- a/lib/xmppconfirmmanager.php
+++ /dev/null
@@ -1,168 +0,0 @@
-<?php
-/*
- * StatusNet - the distributed open-source microblogging tool
- * Copyright (C) 2008-2010 StatusNet, Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License as published by
- * the Free Software Foundation, either version 3 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- */
-
-if (!defined('STATUSNET') && !defined('LACONICA')) {
- exit(1);
-}
-
-/**
- * Event handler for pushing new confirmations to Jabber users.
- * @fixme recommend redoing this on a queue-trigger model
- * @fixme expiration of old items got dropped in the past, put it back?
- */
-class XmppConfirmManager extends IoManager
-{
-
- /**
- * @return mixed XmppConfirmManager, or false if unneeded
- */
- public static function get()
- {
- if (common_config('xmpp', 'enabled')) {
- $site = common_config('site', 'server');
- return new XmppConfirmManager();
- } else {
- return false;
- }
- }
-
- /**
- * Tell the i/o master we need one instance for each supporting site
- * being handled in this process.
- */
- public static function multiSite()
- {
- return IoManager::INSTANCE_PER_SITE;
- }
-
- function __construct()
- {
- $this->site = common_config('site', 'server');
- }
-
- /**
- * 10 seconds? Really? That seems a bit frequent.
- */
- function pollInterval()
- {
- return 10;
- }
-
- /**
- * Ping!
- * @return boolean true if we found something
- */
- function poll()
- {
- $this->switchSite();
- $confirm = $this->next_confirm();
- if ($confirm) {
- $this->handle_confirm($confirm);
- return true;
- } else {
- return false;
- }
- }
-
- protected function handle_confirm($confirm)
- {
- require_once INSTALLDIR . '/lib/jabber.php';
-
- common_log(LOG_INFO, 'Sending confirmation for ' . $confirm->address);
- $user = User::staticGet($confirm->user_id);
- if (!$user) {
- common_log(LOG_WARNING, 'Confirmation for unknown user ' . $confirm->user_id);
- return;
- }
- $success = jabber_confirm_address($confirm->code,
- $user->nickname,
- $confirm->address);
- if (!$success) {
- common_log(LOG_ERR, 'Confirmation failed for ' . $confirm->address);
- # Just let the claim age out; hopefully things work then
- return;
- } else {
- common_log(LOG_INFO, 'Confirmation sent for ' . $confirm->address);
- # Mark confirmation sent; need a dupe so we don't have the WHERE clause
- $dupe = Confirm_address::staticGet('code', $confirm->code);
- if (!$dupe) {
- common_log(LOG_WARNING, 'Could not refetch confirm', __FILE__);
- return;
- }
- $orig = clone($dupe);
- $dupe->sent = $dupe->claimed;
- $result = $dupe->update($orig);
- if (!$result) {
- common_log_db_error($dupe, 'UPDATE', __FILE__);
- # Just let the claim age out; hopefully things work then
- return;
- }
- }
- return true;
- }
-
- protected function next_confirm()
- {
- $confirm = new Confirm_address();
- $confirm->whereAdd('claimed IS null');
- $confirm->whereAdd('sent IS null');
- # XXX: eventually we could do other confirmations in the queue, too
- $confirm->address_type = 'jabber';
- $confirm->orderBy('modified DESC');
- $confirm->limit(1);
- if ($confirm->find(true)) {
- common_log(LOG_INFO, 'Claiming confirmation for ' . $confirm->address);
- # working around some weird DB_DataObject behaviour
- $confirm->whereAdd(''); # clears where stuff
- $original = clone($confirm);
- $confirm->claimed = common_sql_now();
- $result = $confirm->update($original);
- if ($result) {
- common_log(LOG_INFO, 'Succeeded in claim! '. $result);
- return $confirm;
- } else {
- common_log(LOG_INFO, 'Failed in claim!');
- return false;
- }
- }
- return null;
- }
-
- protected function clear_old_confirm_claims()
- {
- $confirm = new Confirm();
- $confirm->claimed = null;
- $confirm->whereAdd('now() - claimed > '.CLAIM_TIMEOUT);
- $confirm->update(DB_DATAOBJECT_WHEREADD_ONLY);
- $confirm->free();
- unset($confirm);
- }
-
- /**
- * Make sure we're on the right site configuration
- */
- protected function switchSite()
- {
- if ($this->site != common_config('site', 'server')) {
- common_log(LOG_DEBUG, __METHOD__ . ": switching to site $this->site");
- $this->stats('switch');
- StatusNet::init($this->site);
- }
- }
-}
diff --git a/lib/xmppmanager.php b/lib/xmppmanager.php
index dfff63a30..299175dd7 100644
--- a/lib/xmppmanager.php
+++ b/lib/xmppmanager.php
@@ -70,6 +70,7 @@ class XmppManager extends IoManager
function __construct()
{
$this->site = common_config('site', 'server');
+ $this->resource = common_config('xmpp', 'resource') . 'daemon';
}
/**
@@ -86,15 +87,19 @@ class XmppManager extends IoManager
# Low priority; we don't want to receive messages
common_log(LOG_INFO, "INITIALIZE");
- $this->conn = jabber_connect($this->resource());
+ $this->conn = jabber_connect($this->resource);
if (empty($this->conn)) {
common_log(LOG_ERR, "Couldn't connect to server.");
return false;
}
- $this->conn->addEventHandler('message', 'forward_message', $this);
+ $this->log(LOG_DEBUG, "Initializing stanza handlers.");
+
+ $this->conn->addEventHandler('message', 'handle_message', $this);
+ $this->conn->addEventHandler('presence', 'handle_presence', $this);
$this->conn->addEventHandler('reconnect', 'handle_reconnect', $this);
+
$this->conn->setReconnectTimeout(600);
jabber_send_presence("Send me a message to post a notice", 'available', null, 'available', -1);
@@ -176,11 +181,36 @@ class XmppManager extends IoManager
}
/**
+ * For queue handlers to pass us a message to push out,
+ * if we're active.
+ *
+ * @fixme should this be blocking etc?
+ *
+ * @param string $msg XML stanza to send
+ * @return boolean success
+ */
+ public function send($msg)
+ {
+ if ($this->conn && !$this->conn->isDisconnected()) {
+ $bytes = $this->conn->send($msg);
+ if ($bytes > 0) {
+ $this->conn->processTime(0);
+ return true;
+ } else {
+ return false;
+ }
+ } else {
+ // Can't send right now...
+ return false;
+ }
+ }
+
+ /**
* Send a keepalive ping to the XMPP server.
*/
protected function sendPing()
{
- $jid = jabber_daemon_address().'/'.$this->resource();
+ $jid = jabber_daemon_address().'/'.$this->resource;
$server = common_config('xmpp', 'server');
if (!isset($this->pingid)) {
@@ -206,61 +236,239 @@ class XmppManager extends IoManager
$this->conn->presence(null, 'available', null, 'available', -1);
}
+
+ function get_user($from)
+ {
+ $user = User::staticGet('jabber', jabber_normalize_jid($from));
+ return $user;
+ }
+
/**
- * Callback for Jabber message event.
- *
- * This connection handles output; if we get a message straight to us,
- * forward it on to our XmppDaemon listener for processing.
- *
- * @param $pl
+ * XMPP callback for handling message input...
+ * @param array $pl XMPP payload
*/
- function forward_message(&$pl)
+ function handle_message(&$pl)
{
+ $from = jabber_normalize_jid($pl['from']);
+
if ($pl['type'] != 'chat') {
- common_log(LOG_DEBUG, 'Ignoring message of type ' . $pl['type'] . ' from ' . $pl['from']);
+ $this->log(LOG_WARNING, "Ignoring message of type ".$pl['type']." from $from.");
return;
}
- $listener = $this->listener();
- if (strtolower($listener) == strtolower($pl['from'])) {
- common_log(LOG_WARNING, 'Ignoring loop message.');
+
+ if (mb_strlen($pl['body']) == 0) {
+ $this->log(LOG_WARNING, "Ignoring message with empty body from $from.");
return;
}
- common_log(LOG_INFO, 'Forwarding message from ' . $pl['from'] . ' to ' . $listener);
- $this->conn->message($this->listener(), $pl['body'], 'chat', null, $this->ofrom($pl['from']));
+
+ // Forwarded from another daemon for us to handle; this shouldn't
+ // happen any more but we might get some legacy items.
+ if ($this->is_self($from)) {
+ $this->log(LOG_INFO, "Got forwarded notice from self ($from).");
+ $from = $this->get_ofrom($pl);
+ $this->log(LOG_INFO, "Originally sent by $from.");
+ if (is_null($from) || $this->is_self($from)) {
+ $this->log(LOG_INFO, "Ignoring notice originally sent by $from.");
+ return;
+ }
+ }
+
+ $user = $this->get_user($from);
+
+ // For common_current_user to work
+ global $_cur;
+ $_cur = $user;
+
+ if (!$user) {
+ $this->from_site($from, 'Unknown user; go to ' .
+ common_local_url('imsettings') .
+ ' to add your address to your account');
+ $this->log(LOG_WARNING, 'Message from unknown user ' . $from);
+ return;
+ }
+ if ($this->handle_command($user, $pl['body'])) {
+ $this->log(LOG_INFO, "Command message by $from handled.");
+ return;
+ } else if ($this->is_autoreply($pl['body'])) {
+ $this->log(LOG_INFO, 'Ignoring auto reply from ' . $from);
+ return;
+ } else if ($this->is_otr($pl['body'])) {
+ $this->log(LOG_INFO, 'Ignoring OTR from ' . $from);
+ return;
+ } else {
+
+ $this->log(LOG_INFO, 'Posting a notice from ' . $user->nickname);
+
+ $this->add_notice($user, $pl);
+ }
+
+ $user->free();
+ unset($user);
+ unset($_cur);
+
+ unset($pl['xml']);
+ $pl['xml'] = null;
+
+ $pl = null;
+ unset($pl);
}
- /**
- * Build an <addresses> block with an ofrom entry for forwarded messages
- *
- * @param string $from Jabber ID of original sender
- * @return string XML fragment
- */
- protected function ofrom($from)
+
+ function is_self($from)
{
- $address = "<addresses xmlns='http://jabber.org/protocol/address'>\n";
- $address .= "<address type='ofrom' jid='$from' />\n";
- $address .= "</addresses>\n";
- return $address;
+ return preg_match('/^'.strtolower(jabber_daemon_address()).'/', strtolower($from));
}
- /**
- * Build the complete JID of the XmppDaemon process which
- * handles primary XMPP input for this site.
- *
- * @return string Jabber ID
- */
- protected function listener()
+ function get_ofrom($pl)
+ {
+ $xml = $pl['xml'];
+ $addresses = $xml->sub('addresses');
+ if (!$addresses) {
+ $this->log(LOG_WARNING, 'Forwarded message without addresses');
+ return null;
+ }
+ $address = $addresses->sub('address');
+ if (!$address) {
+ $this->log(LOG_WARNING, 'Forwarded message without address');
+ return null;
+ }
+ if (!array_key_exists('type', $address->attrs)) {
+ $this->log(LOG_WARNING, 'No type for forwarded message');
+ return null;
+ }
+ $type = $address->attrs['type'];
+ if ($type != 'ofrom') {
+ $this->log(LOG_WARNING, 'Type of forwarded message is not ofrom');
+ return null;
+ }
+ if (!array_key_exists('jid', $address->attrs)) {
+ $this->log(LOG_WARNING, 'No jid for forwarded message');
+ return null;
+ }
+ $jid = $address->attrs['jid'];
+ if (!$jid) {
+ $this->log(LOG_WARNING, 'Could not get jid from address');
+ return null;
+ }
+ $this->log(LOG_DEBUG, 'Got message forwarded from jid ' . $jid);
+ return $jid;
+ }
+
+ function is_autoreply($txt)
+ {
+ if (preg_match('/[\[\(]?[Aa]uto[-\s]?[Rr]e(ply|sponse)[\]\)]/', $txt)) {
+ return true;
+ } else if (preg_match('/^System: Message wasn\'t delivered. Offline storage size was exceeded.$/', $txt)) {
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ function is_otr($txt)
+ {
+ if (preg_match('/^\?OTR/', $txt)) {
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ function from_site($address, $msg)
+ {
+ $text = '['.common_config('site', 'name') . '] ' . $msg;
+ jabber_send_message($address, $text);
+ }
+
+ function handle_command($user, $body)
{
- if (common_config('xmpp', 'listener')) {
- return common_config('xmpp', 'listener');
+ $inter = new CommandInterpreter();
+ $cmd = $inter->handle_command($user, $body);
+ if ($cmd) {
+ $chan = new XMPPChannel($this->conn);
+ $cmd->execute($chan);
+ return true;
} else {
- return jabber_daemon_address() . '/' . common_config('xmpp','resource') . 'daemon';
+ return false;
+ }
+ }
+
+ function add_notice(&$user, &$pl)
+ {
+ $body = trim($pl['body']);
+ $content_shortened = common_shorten_links($body);
+ if (Notice::contentTooLong($content_shortened)) {
+ $from = jabber_normalize_jid($pl['from']);
+ $this->from_site($from, sprintf(_('Message too long - maximum is %1$d characters, you sent %2$d.'),
+ Notice::maxContent(),
+ mb_strlen($content_shortened)));
+ return;
+ }
+
+ try {
+ $notice = Notice::saveNew($user->id, $content_shortened, 'xmpp');
+ } catch (Exception $e) {
+ $this->log(LOG_ERR, $e->getMessage());
+ $this->from_site($user->jabber, $e->getMessage());
+ return;
}
+
+ common_broadcast_notice($notice);
+ $this->log(LOG_INFO,
+ 'Added notice ' . $notice->id . ' from user ' . $user->nickname);
+ $notice->free();
+ unset($notice);
+ }
+
+ function handle_presence(&$pl)
+ {
+ $from = jabber_normalize_jid($pl['from']);
+ switch ($pl['type']) {
+ case 'subscribe':
+ # We let anyone subscribe
+ $this->subscribed($from);
+ $this->log(LOG_INFO,
+ 'Accepted subscription from ' . $from);
+ break;
+ case 'subscribed':
+ case 'unsubscribed':
+ case 'unsubscribe':
+ $this->log(LOG_INFO,
+ 'Ignoring "' . $pl['type'] . '" from ' . $from);
+ break;
+ default:
+ if (!$pl['type']) {
+ $user = User::staticGet('jabber', $from);
+ if (!$user) {
+ $this->log(LOG_WARNING, 'Presence from unknown user ' . $from);
+ return;
+ }
+ if ($user->updatefrompresence) {
+ $this->log(LOG_INFO, 'Updating ' . $user->nickname .
+ ' status from presence.');
+ $this->add_notice($user, $pl);
+ }
+ $user->free();
+ unset($user);
+ }
+ break;
+ }
+ unset($pl['xml']);
+ $pl['xml'] = null;
+
+ $pl = null;
+ unset($pl);
+ }
+
+ function log($level, $msg)
+ {
+ $text = 'XMPPDaemon('.$this->resource.'): '.$msg;
+ common_log($level, $text);
}
- protected function resource()
+ function subscribed($to)
{
- return 'queue' . posix_getpid(); // @fixme PIDs won't be host-unique
+ jabber_special_presence('subscribed', $to);
}
/**
diff --git a/lib/xmppoutqueuehandler.php b/lib/xmppoutqueuehandler.php
new file mode 100644
index 000000000..2afa260f1
--- /dev/null
+++ b/lib/xmppoutqueuehandler.php
@@ -0,0 +1,55 @@
+<?php
+/*
+ * StatusNet - the distributed open-source microblogging tool
+ * Copyright (C) 2010, StatusNet, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+/**
+ * Queue handler for pre-processed outgoing XMPP messages.
+ * Formatted XML stanzas will have been pushed into the queue
+ * via the Queued_XMPP connection proxy, probably from some
+ * other queue processor.
+ *
+ * Here, the XML stanzas are simply pulled out of the queue and
+ * pushed out over the wire; an XmppManager is needed to set up
+ * and maintain the actual server connection.
+ *
+ * This queue will be run via XmppDaemon rather than QueueDaemon.
+ *
+ * @author Brion Vibber <brion@status.net>
+ */
+class XmppOutQueueHandler extends QueueHandler
+{
+ function transport() {
+ return 'xmppout';
+ }
+
+ /**
+ * Take a previously-queued XMPP stanza and send it out ot the server.
+ * @param string $msg
+ * @return boolean true on success
+ */
+ function handle($msg)
+ {
+ assert(is_string($msg));
+
+ $xmpp = XmppManager::get();
+ $ok = $xmpp->send($msg);
+
+ return $ok;
+ }
+}
+