diff options
Diffstat (limited to 'lib')
-rw-r--r-- | lib/dbqueuemanager.php | 95 | ||||
-rw-r--r-- | lib/jabberqueuehandler.php | 4 | ||||
-rw-r--r-- | lib/ombqueuehandler.php | 2 | ||||
-rw-r--r-- | lib/pingqueuehandler.php | 2 | ||||
-rw-r--r-- | lib/pluginqueuehandler.php | 2 | ||||
-rw-r--r-- | lib/publicqueuehandler.php | 6 | ||||
-rw-r--r-- | lib/queuehandler.php | 95 | ||||
-rw-r--r-- | lib/smsqueuehandler.php | 2 | ||||
-rw-r--r-- | lib/stompqueuemanager.php | 38 | ||||
-rw-r--r-- | lib/xmppmanager.php | 24 |
10 files changed, 184 insertions, 86 deletions
diff --git a/lib/dbqueuemanager.php b/lib/dbqueuemanager.php index 139f50234..889365b64 100644 --- a/lib/dbqueuemanager.php +++ b/lib/dbqueuemanager.php @@ -31,17 +31,19 @@ class DBQueueManager extends QueueManager { /** - * Saves an object into the queue item table. + * Saves a notice object reference into the queue item table. * @return boolean true on success * @throws ServerException on failure */ public function enqueue($object, $queue) { + $notice = $object; + $qi = new Queue_item(); - $qi->frame = serialize($object); + $qi->notice_id = $notice->id; $qi->transport = $queue; - $qi->created = common_sql_now(); + $qi->created = $notice->created; $result = $qi->insert(); if (!$result) { @@ -71,35 +73,34 @@ class DBQueueManager extends QueueManager */ public function poll() { - $this->_log(LOG_DEBUG, 'Checking for queued objects...'); - $qi = $this->_nextItem(); - if ($qi === false) { - $this->_log(LOG_DEBUG, 'No queue items waiting; idling.'); + $this->_log(LOG_DEBUG, 'Checking for notices...'); + $item = $this->_nextItem(); + if ($item === false) { + $this->_log(LOG_DEBUG, 'No notices waiting; idling.'); return false; } - if ($qi === true) { - // We dequeued an entry for a deleted or invalid object. + if ($item === true) { + // We dequeued an entry for a deleted or invalid notice. // Consider it a hit for poll rate purposes. return true; } - $queue = $qi->transport; - $object = unserialize($qi->frame); - $this->_log(LOG_INFO, 'Got item id=' . $qi->id . ' for transport ' . $queue); + list($queue, $notice) = $item; + $this->_log(LOG_INFO, 'Got notice '. $notice->id . ' for transport ' . $queue); // Yay! Got one! $handler = $this->getHandler($queue); if ($handler) { - if ($handler->handle($object)) { - $this->_log(LOG_INFO, "[$queue] Successfully handled object"); - $this->_done($qi); + if ($handler->handle_notice($notice)) { + $this->_log(LOG_INFO, "[$queue:notice $notice->id] Successfully handled notice"); + $this->_done($notice, $queue); } else { - $this->_log(LOG_INFO, "[$queue] Failed to handle object"); - $this->_fail($qi); + $this->_log(LOG_INFO, "[$queue:notice $notice->id] Failed to handle notice"); + $this->_fail($notice, $queue); } } else { - $this->_log(LOG_INFO, "[$queue] No handler for queue $queue; discarding."); - $this->_done($qi); + $this->_log(LOG_INFO, "[$queue:notice $notice->id] No handler for queue $queue; discarding."); + $this->_done($notice, $queue); } return true; } @@ -107,7 +108,8 @@ class DBQueueManager extends QueueManager /** * Pop the oldest unclaimed item off the queue set and claim it. * - * @return mixed false if no items; true if bogus hit; otherwise Queue_item + * @return mixed false if no items; true if bogus hit; otherwise array(string, Notice) + * giving the queue transport name. */ protected function _nextItem() { @@ -119,42 +121,70 @@ class DBQueueManager extends QueueManager return false; } - return $qi; + $queue = $qi->transport; + $notice = Notice::staticGet('id', $qi->notice_id); + if (empty($notice)) { + $this->_log(LOG_INFO, "[$queue:notice $notice->id] dequeued non-existent notice"); + $qi->delete(); + return true; + } + + $result = $notice; + return array($queue, $notice); } /** * Delete our claimed item from the queue after successful processing. * - * @param QueueItem $qi + * @param Notice $object + * @param string $queue */ - protected function _done($qi) + protected function _done($object, $queue) { + // XXX: right now, we only handle notices + + $notice = $object; + + $qi = Queue_item::pkeyGet(array('notice_id' => $notice->id, + 'transport' => $queue)); + if (empty($qi)) { - $this->_log(LOG_INFO, "_done passed an empty queue item"); + $this->_log(LOG_INFO, "[$queue:notice $notice->id] Cannot find queue item"); } else { if (empty($qi->claimed)) { - $this->_log(LOG_WARNING, "Reluctantly releasing unclaimed queue item"); + $this->_log(LOG_WARNING, "[$queue:notice $notice->id] Reluctantly releasing unclaimed queue item"); } $qi->delete(); $qi->free(); } - $this->_log(LOG_INFO, "done with item"); + $this->_log(LOG_INFO, "[$queue:notice $notice->id] done with item"); + $this->stats('handled', $queue); + + $notice->free(); } /** * Free our claimed queue item for later reprocessing in case of * temporary failure. * - * @param QueueItem $qi + * @param Notice $object + * @param string $queue */ - protected function _fail($qi) + protected function _fail($object, $queue) { + // XXX: right now, we only handle notices + + $notice = $object; + + $qi = Queue_item::pkeyGet(array('notice_id' => $notice->id, + 'transport' => $queue)); + if (empty($qi)) { - $this->_log(LOG_INFO, "_fail passed an empty queue item"); + $this->_log(LOG_INFO, "[$queue:notice $notice->id] Cannot find queue item"); } else { if (empty($qi->claimed)) { - $this->_log(LOG_WARNING, "Ignoring failure for unclaimed queue item"); + $this->_log(LOG_WARNING, "[$queue:notice $notice->id] Ignoring failure for unclaimed queue item"); } else { $orig = clone($qi); $qi->claimed = null; @@ -163,7 +193,10 @@ class DBQueueManager extends QueueManager } } - $this->_log(LOG_INFO, "done with queue item"); + $this->_log(LOG_INFO, "[$queue:notice $notice->id] done with queue item"); + $this->stats('error', $queue); + + $notice->free(); } protected function _log($level, $msg) diff --git a/lib/jabberqueuehandler.php b/lib/jabberqueuehandler.php index 83471f2df..b1518866d 100644 --- a/lib/jabberqueuehandler.php +++ b/lib/jabberqueuehandler.php @@ -34,14 +34,14 @@ class JabberQueueHandler extends QueueHandler return 'jabber'; } - function handle($notice) + function handle_notice($notice) { require_once(INSTALLDIR.'/lib/jabber.php'); try { return jabber_broadcast_notice($notice); } catch (XMPPHP_Exception $e) { $this->log(LOG_ERR, "Got an XMPPHP_Exception: " . $e->getMessage()); - return false; + exit(1); } } } diff --git a/lib/ombqueuehandler.php b/lib/ombqueuehandler.php index 24896c784..3ffc1313b 100644 --- a/lib/ombqueuehandler.php +++ b/lib/ombqueuehandler.php @@ -36,7 +36,7 @@ class OmbQueueHandler extends QueueHandler * @fixme doesn't currently report failure back to the queue manager * because omb_broadcast_notice() doesn't report it to us */ - function handle($notice) + function handle_notice($notice) { if ($this->is_remote($notice)) { $this->log(LOG_DEBUG, 'Ignoring remote notice ' . $notice->id); diff --git a/lib/pingqueuehandler.php b/lib/pingqueuehandler.php index 4e4d74cb1..8bb218078 100644 --- a/lib/pingqueuehandler.php +++ b/lib/pingqueuehandler.php @@ -30,7 +30,7 @@ class PingQueueHandler extends QueueHandler { return 'ping'; } - function handle($notice) { + function handle_notice($notice) { require_once INSTALLDIR . '/lib/ping.php'; return ping_broadcast_notice($notice); } diff --git a/lib/pluginqueuehandler.php b/lib/pluginqueuehandler.php index 9653ccad4..24d504699 100644 --- a/lib/pluginqueuehandler.php +++ b/lib/pluginqueuehandler.php @@ -42,7 +42,7 @@ class PluginQueueHandler extends QueueHandler return 'plugin'; } - function handle($notice) + function handle_notice($notice) { Event::handle('HandleQueuedNotice', array(&$notice)); return true; diff --git a/lib/publicqueuehandler.php b/lib/publicqueuehandler.php index c9edb8d5d..9ea9ee73a 100644 --- a/lib/publicqueuehandler.php +++ b/lib/publicqueuehandler.php @@ -23,6 +23,7 @@ if (!defined('STATUSNET') && !defined('LACONICA')) { /** * Queue handler for pushing new notices to public XMPP subscribers. + * @fixme correct this exception handling */ class PublicQueueHandler extends QueueHandler { @@ -32,14 +33,15 @@ class PublicQueueHandler extends QueueHandler return 'public'; } - function handle($notice) + function handle_notice($notice) { require_once(INSTALLDIR.'/lib/jabber.php'); try { return jabber_public_notice($notice); } catch (XMPPHP_Exception $e) { $this->log(LOG_ERR, "Got an XMPPHP_Exception: " . $e->getMessage()); - return false; + die($e->getMessage()); } + return true; } } diff --git a/lib/queuehandler.php b/lib/queuehandler.php index 2909cd83b..613be6e33 100644 --- a/lib/queuehandler.php +++ b/lib/queuehandler.php @@ -22,20 +22,51 @@ if (!defined('STATUSNET') && !defined('LACONICA')) { exit(1); } /** * Base class for queue handlers. * - * As of 0.9, queue handlers are short-lived for items as they are - * dequeued by a QueueManager running in an IoMaster in a daemon - * such as queuedaemon.php. - * - * Extensions requiring long-running maintenance or polling should - * register an IoManager. + * As extensions of the Daemon class, each queue handler has the ability + * to launch itself in the background, at which point it'll pass control + * to the configured QueueManager class to poll for updates. * * Subclasses must override at least the following methods: * - transport - * - handle + * - handle_notice */ +#class QueueHandler extends Daemon class QueueHandler { +# function __construct($id=null, $daemonize=true) +# { +# parent::__construct($daemonize); +# +# if ($id) { +# $this->set_id($id); +# } +# } + + /** + * How many seconds a polling-based queue manager should wait between + * checks for new items to handle. + * + * Defaults to 60 seconds; override to speed up or slow down. + * + * @fixme not really compatible with global queue manager + * @return int timeout in seconds + */ +# function timeout() +# { +# return 60; +# } + +# function class_name() +# { +# return ucfirst($this->transport()) . 'Handler'; +# } + +# function name() +# { +# return strtolower($this->class_name().'.'.$this->get_id()); +# } + /** * Return transport keyword which identifies items this queue handler * services; must be defined for all subclasses. @@ -52,17 +83,61 @@ class QueueHandler /** * Here's the meat of your queue handler -- you're handed a Notice - * or other object, which you may do as you will with. + * object, which you may do as you will with. * * If this function indicates failure, a warning will be logged * and the item is placed back in the queue to be re-run. * - * @param mixed $object + * @param Notice $notice + * @return boolean true on success, false on failure + */ + function handle_notice($notice) + { + return true; + } + + /** + * Setup and start of run loop for this queue handler as a daemon. + * Most of the heavy lifting is passed on to the QueueManager's service() + * method, which passes control back to our handle_notice() method for + * each notice that comes in on the queue. + * + * Most of the time this won't need to be overridden in a subclass. + * * @return boolean true on success, false on failure */ - function handle($object) + function run() { + if (!$this->start()) { + $this->log(LOG_WARNING, 'failed to start'); + return false; + } + + $this->log(LOG_INFO, 'checking for queued notices'); + + $queue = $this->transport(); + $timeout = $this->timeout(); + + $qm = QueueManager::get(); + + $qm->service($queue, $this); + + $this->log(LOG_INFO, 'finished servicing the queue'); + + if (!$this->finish()) { + $this->log(LOG_WARNING, 'failed to clean up'); + return false; + } + + $this->log(LOG_INFO, 'terminating normally'); + return true; } + + + function log($level, $msg) + { + common_log($level, $this->class_name() . ' ('. $this->get_id() .'): '.$msg); + } } diff --git a/lib/smsqueuehandler.php b/lib/smsqueuehandler.php index 6085d2b4a..48a96409d 100644 --- a/lib/smsqueuehandler.php +++ b/lib/smsqueuehandler.php @@ -31,7 +31,7 @@ class SmsQueueHandler extends QueueHandler return 'sms'; } - function handle($notice) + function handle_notice($notice) { require_once(INSTALLDIR.'/lib/mail.php'); return mail_broadcast_notice_sms($notice); diff --git a/lib/stompqueuemanager.php b/lib/stompqueuemanager.php index 6496b5cf1..00590fdb6 100644 --- a/lib/stompqueuemanager.php +++ b/lib/stompqueuemanager.php @@ -125,25 +125,28 @@ class StompQueueManager extends QueueManager } /** - * Saves an object into the queue item table. + * Saves a notice object reference into the queue item table. * @return boolean true on success */ public function enqueue($object, $queue) { - $msg = serialize($object); + $notice = $object; $this->_connect(); + // XXX: serialize and send entire notice + $result = $this->con->send($this->queueName($queue), - $msg, // BODY of the message - array ('created' => $timestamp)); + $notice->id, // BODY of the message + array ('created' => $notice->created)); if (!$result) { common_log(LOG_ERR, 'Error sending to '.$queue.' queue'); return false; } - common_log(LOG_DEBUG, "complete remote queueing $log for $queue"); + common_log(LOG_DEBUG, 'complete remote queueing notice ID = ' + . $notice->id . ' for ' . $queue); $this->stats('enqueued', $queue); } @@ -171,7 +174,7 @@ class StompQueueManager extends QueueManager $ok = true; $frames = $this->con->readFrames(); foreach ($frames as $frame) { - $ok = $ok && $this->_handleItem($frame); + $ok = $ok && $this->_handleNotice($frame); } return $ok; } @@ -262,10 +265,10 @@ class StompQueueManager extends QueueManager } /** - * Handle and acknowledge an event that's come in through a queue. + * Handle and acknowledge a notice event that's come in through a queue. * * If the queue handler reports failure, the message is requeued for later. - * Missing objects or handler classes will drop the message. + * Missing notices or handler classes will drop the message. * * Side effects: in multi-site mode, may reset site configuration to * match the site that queued the event. @@ -273,15 +276,24 @@ class StompQueueManager extends QueueManager * @param StompFrame $frame * @return bool */ - protected function _handleItem($frame) + protected function _handleNotice($frame) { list($site, $queue) = $this->parseDestination($frame->headers['destination']); if ($site != common_config('site', 'server')) { $this->stats('switch'); StatusNet::init($site); } - $info = "object posted at {$frame->headers['created']} in queue $queue"; - $item = unserialize($frame->body); + + $id = intval($frame->body); + $info = "notice $id posted at {$frame->headers['created']} in queue $queue"; + + $notice = Notice::staticGet('id', $id); + if (empty($notice)) { + $this->_log(LOG_WARNING, "Skipping missing $info"); + $this->con->ack($frame); + $this->stats('badnotice', $queue); + return false; + } $handler = $this->getHandler($queue); if (!$handler) { @@ -291,7 +303,7 @@ class StompQueueManager extends QueueManager return false; } - $ok = $handler->handle($item); + $ok = $handler->handle_notice($notice); if (!$ok) { $this->_log(LOG_WARNING, "Failed handling $info"); @@ -299,7 +311,7 @@ class StompQueueManager extends QueueManager // this kind of queue management ourselves; // if we don't ack, it should resend... $this->con->ack($frame); - $this->enqueue($item, $queue); + $this->enqueue($notice, $queue); $this->stats('requeued', $queue); return false; } diff --git a/lib/xmppmanager.php b/lib/xmppmanager.php index c49986854..dfff63a30 100644 --- a/lib/xmppmanager.php +++ b/lib/xmppmanager.php @@ -176,30 +176,6 @@ class XmppManager extends IoManager } /** - * For queue handlers to pass us a message to push out, - * if we're active. - * - * @fixme should this be blocking etc? - * - * @param string $msg XML stanza to send - * @return boolean success - */ - public function send($msg) - { - if ($this->conn && !$this->conn->isDisconnected()) { - $bytes = $this->conn->send($msg); - if ($bytes > 0) { - return true; - } else { - return false; - } - } else { - // Can't send right now... - return false; - } - } - - /** * Send a keepalive ping to the XMPP server. */ protected function sendPing() |