diff options
Diffstat (limited to 'lib')
-rw-r--r-- | lib/dbqueuemanager.php | 95 | ||||
-rw-r--r-- | lib/jabberqueuehandler.php | 4 | ||||
-rw-r--r-- | lib/ombqueuehandler.php | 2 | ||||
-rw-r--r-- | lib/pingqueuehandler.php | 2 | ||||
-rw-r--r-- | lib/pluginqueuehandler.php | 2 | ||||
-rw-r--r-- | lib/publicqueuehandler.php | 6 | ||||
-rw-r--r-- | lib/queuehandler.php | 95 | ||||
-rw-r--r-- | lib/smsqueuehandler.php | 2 | ||||
-rw-r--r-- | lib/stompqueuemanager.php | 38 | ||||
-rw-r--r-- | lib/xmppmanager.php | 24 |
10 files changed, 86 insertions, 184 deletions
diff --git a/lib/dbqueuemanager.php b/lib/dbqueuemanager.php index 889365b64..139f50234 100644 --- a/lib/dbqueuemanager.php +++ b/lib/dbqueuemanager.php @@ -31,19 +31,17 @@ class DBQueueManager extends QueueManager { /** - * Saves a notice object reference into the queue item table. + * Saves an object into the queue item table. * @return boolean true on success * @throws ServerException on failure */ public function enqueue($object, $queue) { - $notice = $object; - $qi = new Queue_item(); - $qi->notice_id = $notice->id; + $qi->frame = serialize($object); $qi->transport = $queue; - $qi->created = $notice->created; + $qi->created = common_sql_now(); $result = $qi->insert(); if (!$result) { @@ -73,34 +71,35 @@ class DBQueueManager extends QueueManager */ public function poll() { - $this->_log(LOG_DEBUG, 'Checking for notices...'); - $item = $this->_nextItem(); - if ($item === false) { - $this->_log(LOG_DEBUG, 'No notices waiting; idling.'); + $this->_log(LOG_DEBUG, 'Checking for queued objects...'); + $qi = $this->_nextItem(); + if ($qi === false) { + $this->_log(LOG_DEBUG, 'No queue items waiting; idling.'); return false; } - if ($item === true) { - // We dequeued an entry for a deleted or invalid notice. + if ($qi === true) { + // We dequeued an entry for a deleted or invalid object. // Consider it a hit for poll rate purposes. return true; } - list($queue, $notice) = $item; - $this->_log(LOG_INFO, 'Got notice '. $notice->id . ' for transport ' . $queue); + $queue = $qi->transport; + $object = unserialize($qi->frame); + $this->_log(LOG_INFO, 'Got item id=' . $qi->id . ' for transport ' . $queue); // Yay! Got one! $handler = $this->getHandler($queue); if ($handler) { - if ($handler->handle_notice($notice)) { - $this->_log(LOG_INFO, "[$queue:notice $notice->id] Successfully handled notice"); - $this->_done($notice, $queue); + if ($handler->handle($object)) { + $this->_log(LOG_INFO, "[$queue] Successfully handled object"); + $this->_done($qi); } else { - $this->_log(LOG_INFO, "[$queue:notice $notice->id] Failed to handle notice"); - $this->_fail($notice, $queue); + $this->_log(LOG_INFO, "[$queue] Failed to handle object"); + $this->_fail($qi); } } else { - $this->_log(LOG_INFO, "[$queue:notice $notice->id] No handler for queue $queue; discarding."); - $this->_done($notice, $queue); + $this->_log(LOG_INFO, "[$queue] No handler for queue $queue; discarding."); + $this->_done($qi); } return true; } @@ -108,8 +107,7 @@ class DBQueueManager extends QueueManager /** * Pop the oldest unclaimed item off the queue set and claim it. * - * @return mixed false if no items; true if bogus hit; otherwise array(string, Notice) - * giving the queue transport name. + * @return mixed false if no items; true if bogus hit; otherwise Queue_item */ protected function _nextItem() { @@ -121,70 +119,42 @@ class DBQueueManager extends QueueManager return false; } - $queue = $qi->transport; - $notice = Notice::staticGet('id', $qi->notice_id); - if (empty($notice)) { - $this->_log(LOG_INFO, "[$queue:notice $notice->id] dequeued non-existent notice"); - $qi->delete(); - return true; - } - - $result = $notice; - return array($queue, $notice); + return $qi; } /** * Delete our claimed item from the queue after successful processing. * - * @param Notice $object - * @param string $queue + * @param QueueItem $qi */ - protected function _done($object, $queue) + protected function _done($qi) { - // XXX: right now, we only handle notices - - $notice = $object; - - $qi = Queue_item::pkeyGet(array('notice_id' => $notice->id, - 'transport' => $queue)); - if (empty($qi)) { - $this->_log(LOG_INFO, "[$queue:notice $notice->id] Cannot find queue item"); + $this->_log(LOG_INFO, "_done passed an empty queue item"); } else { if (empty($qi->claimed)) { - $this->_log(LOG_WARNING, "[$queue:notice $notice->id] Reluctantly releasing unclaimed queue item"); + $this->_log(LOG_WARNING, "Reluctantly releasing unclaimed queue item"); } $qi->delete(); $qi->free(); } - $this->_log(LOG_INFO, "[$queue:notice $notice->id] done with item"); - $this->stats('handled', $queue); - - $notice->free(); + $this->_log(LOG_INFO, "done with item"); } /** * Free our claimed queue item for later reprocessing in case of * temporary failure. * - * @param Notice $object - * @param string $queue + * @param QueueItem $qi */ - protected function _fail($object, $queue) + protected function _fail($qi) { - // XXX: right now, we only handle notices - - $notice = $object; - - $qi = Queue_item::pkeyGet(array('notice_id' => $notice->id, - 'transport' => $queue)); - if (empty($qi)) { - $this->_log(LOG_INFO, "[$queue:notice $notice->id] Cannot find queue item"); + $this->_log(LOG_INFO, "_fail passed an empty queue item"); } else { if (empty($qi->claimed)) { - $this->_log(LOG_WARNING, "[$queue:notice $notice->id] Ignoring failure for unclaimed queue item"); + $this->_log(LOG_WARNING, "Ignoring failure for unclaimed queue item"); } else { $orig = clone($qi); $qi->claimed = null; @@ -193,10 +163,7 @@ class DBQueueManager extends QueueManager } } - $this->_log(LOG_INFO, "[$queue:notice $notice->id] done with queue item"); - $this->stats('error', $queue); - - $notice->free(); + $this->_log(LOG_INFO, "done with queue item"); } protected function _log($level, $msg) diff --git a/lib/jabberqueuehandler.php b/lib/jabberqueuehandler.php index b1518866d..83471f2df 100644 --- a/lib/jabberqueuehandler.php +++ b/lib/jabberqueuehandler.php @@ -34,14 +34,14 @@ class JabberQueueHandler extends QueueHandler return 'jabber'; } - function handle_notice($notice) + function handle($notice) { require_once(INSTALLDIR.'/lib/jabber.php'); try { return jabber_broadcast_notice($notice); } catch (XMPPHP_Exception $e) { $this->log(LOG_ERR, "Got an XMPPHP_Exception: " . $e->getMessage()); - exit(1); + return false; } } } diff --git a/lib/ombqueuehandler.php b/lib/ombqueuehandler.php index 3ffc1313b..24896c784 100644 --- a/lib/ombqueuehandler.php +++ b/lib/ombqueuehandler.php @@ -36,7 +36,7 @@ class OmbQueueHandler extends QueueHandler * @fixme doesn't currently report failure back to the queue manager * because omb_broadcast_notice() doesn't report it to us */ - function handle_notice($notice) + function handle($notice) { if ($this->is_remote($notice)) { $this->log(LOG_DEBUG, 'Ignoring remote notice ' . $notice->id); diff --git a/lib/pingqueuehandler.php b/lib/pingqueuehandler.php index 8bb218078..4e4d74cb1 100644 --- a/lib/pingqueuehandler.php +++ b/lib/pingqueuehandler.php @@ -30,7 +30,7 @@ class PingQueueHandler extends QueueHandler { return 'ping'; } - function handle_notice($notice) { + function handle($notice) { require_once INSTALLDIR . '/lib/ping.php'; return ping_broadcast_notice($notice); } diff --git a/lib/pluginqueuehandler.php b/lib/pluginqueuehandler.php index 24d504699..9653ccad4 100644 --- a/lib/pluginqueuehandler.php +++ b/lib/pluginqueuehandler.php @@ -42,7 +42,7 @@ class PluginQueueHandler extends QueueHandler return 'plugin'; } - function handle_notice($notice) + function handle($notice) { Event::handle('HandleQueuedNotice', array(&$notice)); return true; diff --git a/lib/publicqueuehandler.php b/lib/publicqueuehandler.php index 9ea9ee73a..c9edb8d5d 100644 --- a/lib/publicqueuehandler.php +++ b/lib/publicqueuehandler.php @@ -23,7 +23,6 @@ if (!defined('STATUSNET') && !defined('LACONICA')) { /** * Queue handler for pushing new notices to public XMPP subscribers. - * @fixme correct this exception handling */ class PublicQueueHandler extends QueueHandler { @@ -33,15 +32,14 @@ class PublicQueueHandler extends QueueHandler return 'public'; } - function handle_notice($notice) + function handle($notice) { require_once(INSTALLDIR.'/lib/jabber.php'); try { return jabber_public_notice($notice); } catch (XMPPHP_Exception $e) { $this->log(LOG_ERR, "Got an XMPPHP_Exception: " . $e->getMessage()); - die($e->getMessage()); + return false; } - return true; } } diff --git a/lib/queuehandler.php b/lib/queuehandler.php index 613be6e33..2909cd83b 100644 --- a/lib/queuehandler.php +++ b/lib/queuehandler.php @@ -22,51 +22,20 @@ if (!defined('STATUSNET') && !defined('LACONICA')) { exit(1); } /** * Base class for queue handlers. * - * As extensions of the Daemon class, each queue handler has the ability - * to launch itself in the background, at which point it'll pass control - * to the configured QueueManager class to poll for updates. + * As of 0.9, queue handlers are short-lived for items as they are + * dequeued by a QueueManager running in an IoMaster in a daemon + * such as queuedaemon.php. + * + * Extensions requiring long-running maintenance or polling should + * register an IoManager. * * Subclasses must override at least the following methods: * - transport - * - handle_notice + * - handle */ -#class QueueHandler extends Daemon class QueueHandler { -# function __construct($id=null, $daemonize=true) -# { -# parent::__construct($daemonize); -# -# if ($id) { -# $this->set_id($id); -# } -# } - - /** - * How many seconds a polling-based queue manager should wait between - * checks for new items to handle. - * - * Defaults to 60 seconds; override to speed up or slow down. - * - * @fixme not really compatible with global queue manager - * @return int timeout in seconds - */ -# function timeout() -# { -# return 60; -# } - -# function class_name() -# { -# return ucfirst($this->transport()) . 'Handler'; -# } - -# function name() -# { -# return strtolower($this->class_name().'.'.$this->get_id()); -# } - /** * Return transport keyword which identifies items this queue handler * services; must be defined for all subclasses. @@ -83,61 +52,17 @@ class QueueHandler /** * Here's the meat of your queue handler -- you're handed a Notice - * object, which you may do as you will with. + * or other object, which you may do as you will with. * * If this function indicates failure, a warning will be logged * and the item is placed back in the queue to be re-run. * - * @param Notice $notice - * @return boolean true on success, false on failure - */ - function handle_notice($notice) - { - return true; - } - - /** - * Setup and start of run loop for this queue handler as a daemon. - * Most of the heavy lifting is passed on to the QueueManager's service() - * method, which passes control back to our handle_notice() method for - * each notice that comes in on the queue. - * - * Most of the time this won't need to be overridden in a subclass. - * + * @param mixed $object * @return boolean true on success, false on failure */ - function run() + function handle($object) { - if (!$this->start()) { - $this->log(LOG_WARNING, 'failed to start'); - return false; - } - - $this->log(LOG_INFO, 'checking for queued notices'); - - $queue = $this->transport(); - $timeout = $this->timeout(); - - $qm = QueueManager::get(); - - $qm->service($queue, $this); - - $this->log(LOG_INFO, 'finished servicing the queue'); - - if (!$this->finish()) { - $this->log(LOG_WARNING, 'failed to clean up'); - return false; - } - - $this->log(LOG_INFO, 'terminating normally'); - return true; } - - - function log($level, $msg) - { - common_log($level, $this->class_name() . ' ('. $this->get_id() .'): '.$msg); - } } diff --git a/lib/smsqueuehandler.php b/lib/smsqueuehandler.php index 48a96409d..6085d2b4a 100644 --- a/lib/smsqueuehandler.php +++ b/lib/smsqueuehandler.php @@ -31,7 +31,7 @@ class SmsQueueHandler extends QueueHandler return 'sms'; } - function handle_notice($notice) + function handle($notice) { require_once(INSTALLDIR.'/lib/mail.php'); return mail_broadcast_notice_sms($notice); diff --git a/lib/stompqueuemanager.php b/lib/stompqueuemanager.php index 00590fdb6..6496b5cf1 100644 --- a/lib/stompqueuemanager.php +++ b/lib/stompqueuemanager.php @@ -125,28 +125,25 @@ class StompQueueManager extends QueueManager } /** - * Saves a notice object reference into the queue item table. + * Saves an object into the queue item table. * @return boolean true on success */ public function enqueue($object, $queue) { - $notice = $object; + $msg = serialize($object); $this->_connect(); - // XXX: serialize and send entire notice - $result = $this->con->send($this->queueName($queue), - $notice->id, // BODY of the message - array ('created' => $notice->created)); + $msg, // BODY of the message + array ('created' => $timestamp)); if (!$result) { common_log(LOG_ERR, 'Error sending to '.$queue.' queue'); return false; } - common_log(LOG_DEBUG, 'complete remote queueing notice ID = ' - . $notice->id . ' for ' . $queue); + common_log(LOG_DEBUG, "complete remote queueing $log for $queue"); $this->stats('enqueued', $queue); } @@ -174,7 +171,7 @@ class StompQueueManager extends QueueManager $ok = true; $frames = $this->con->readFrames(); foreach ($frames as $frame) { - $ok = $ok && $this->_handleNotice($frame); + $ok = $ok && $this->_handleItem($frame); } return $ok; } @@ -265,10 +262,10 @@ class StompQueueManager extends QueueManager } /** - * Handle and acknowledge a notice event that's come in through a queue. + * Handle and acknowledge an event that's come in through a queue. * * If the queue handler reports failure, the message is requeued for later. - * Missing notices or handler classes will drop the message. + * Missing objects or handler classes will drop the message. * * Side effects: in multi-site mode, may reset site configuration to * match the site that queued the event. @@ -276,24 +273,15 @@ class StompQueueManager extends QueueManager * @param StompFrame $frame * @return bool */ - protected function _handleNotice($frame) + protected function _handleItem($frame) { list($site, $queue) = $this->parseDestination($frame->headers['destination']); if ($site != common_config('site', 'server')) { $this->stats('switch'); StatusNet::init($site); } - - $id = intval($frame->body); - $info = "notice $id posted at {$frame->headers['created']} in queue $queue"; - - $notice = Notice::staticGet('id', $id); - if (empty($notice)) { - $this->_log(LOG_WARNING, "Skipping missing $info"); - $this->con->ack($frame); - $this->stats('badnotice', $queue); - return false; - } + $info = "object posted at {$frame->headers['created']} in queue $queue"; + $item = unserialize($frame->body); $handler = $this->getHandler($queue); if (!$handler) { @@ -303,7 +291,7 @@ class StompQueueManager extends QueueManager return false; } - $ok = $handler->handle_notice($notice); + $ok = $handler->handle($item); if (!$ok) { $this->_log(LOG_WARNING, "Failed handling $info"); @@ -311,7 +299,7 @@ class StompQueueManager extends QueueManager // this kind of queue management ourselves; // if we don't ack, it should resend... $this->con->ack($frame); - $this->enqueue($notice, $queue); + $this->enqueue($item, $queue); $this->stats('requeued', $queue); return false; } diff --git a/lib/xmppmanager.php b/lib/xmppmanager.php index dfff63a30..c49986854 100644 --- a/lib/xmppmanager.php +++ b/lib/xmppmanager.php @@ -176,6 +176,30 @@ class XmppManager extends IoManager } /** + * For queue handlers to pass us a message to push out, + * if we're active. + * + * @fixme should this be blocking etc? + * + * @param string $msg XML stanza to send + * @return boolean success + */ + public function send($msg) + { + if ($this->conn && !$this->conn->isDisconnected()) { + $bytes = $this->conn->send($msg); + if ($bytes > 0) { + return true; + } else { + return false; + } + } else { + // Can't send right now... + return false; + } + } + + /** * Send a keepalive ping to the XMPP server. */ protected function sendPing() |