summaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
Diffstat (limited to 'lib')
-rw-r--r--lib/dbqueuemanager.php95
-rw-r--r--lib/jabberqueuehandler.php4
-rw-r--r--lib/ombqueuehandler.php2
-rw-r--r--lib/pingqueuehandler.php2
-rw-r--r--lib/pluginqueuehandler.php2
-rw-r--r--lib/publicqueuehandler.php6
-rw-r--r--lib/queuehandler.php95
-rw-r--r--lib/smsqueuehandler.php2
-rw-r--r--lib/stompqueuemanager.php38
-rw-r--r--lib/xmppmanager.php24
10 files changed, 86 insertions, 184 deletions
diff --git a/lib/dbqueuemanager.php b/lib/dbqueuemanager.php
index 889365b64..139f50234 100644
--- a/lib/dbqueuemanager.php
+++ b/lib/dbqueuemanager.php
@@ -31,19 +31,17 @@
class DBQueueManager extends QueueManager
{
/**
- * Saves a notice object reference into the queue item table.
+ * Saves an object into the queue item table.
* @return boolean true on success
* @throws ServerException on failure
*/
public function enqueue($object, $queue)
{
- $notice = $object;
-
$qi = new Queue_item();
- $qi->notice_id = $notice->id;
+ $qi->frame = serialize($object);
$qi->transport = $queue;
- $qi->created = $notice->created;
+ $qi->created = common_sql_now();
$result = $qi->insert();
if (!$result) {
@@ -73,34 +71,35 @@ class DBQueueManager extends QueueManager
*/
public function poll()
{
- $this->_log(LOG_DEBUG, 'Checking for notices...');
- $item = $this->_nextItem();
- if ($item === false) {
- $this->_log(LOG_DEBUG, 'No notices waiting; idling.');
+ $this->_log(LOG_DEBUG, 'Checking for queued objects...');
+ $qi = $this->_nextItem();
+ if ($qi === false) {
+ $this->_log(LOG_DEBUG, 'No queue items waiting; idling.');
return false;
}
- if ($item === true) {
- // We dequeued an entry for a deleted or invalid notice.
+ if ($qi === true) {
+ // We dequeued an entry for a deleted or invalid object.
// Consider it a hit for poll rate purposes.
return true;
}
- list($queue, $notice) = $item;
- $this->_log(LOG_INFO, 'Got notice '. $notice->id . ' for transport ' . $queue);
+ $queue = $qi->transport;
+ $object = unserialize($qi->frame);
+ $this->_log(LOG_INFO, 'Got item id=' . $qi->id . ' for transport ' . $queue);
// Yay! Got one!
$handler = $this->getHandler($queue);
if ($handler) {
- if ($handler->handle_notice($notice)) {
- $this->_log(LOG_INFO, "[$queue:notice $notice->id] Successfully handled notice");
- $this->_done($notice, $queue);
+ if ($handler->handle($object)) {
+ $this->_log(LOG_INFO, "[$queue] Successfully handled object");
+ $this->_done($qi);
} else {
- $this->_log(LOG_INFO, "[$queue:notice $notice->id] Failed to handle notice");
- $this->_fail($notice, $queue);
+ $this->_log(LOG_INFO, "[$queue] Failed to handle object");
+ $this->_fail($qi);
}
} else {
- $this->_log(LOG_INFO, "[$queue:notice $notice->id] No handler for queue $queue; discarding.");
- $this->_done($notice, $queue);
+ $this->_log(LOG_INFO, "[$queue] No handler for queue $queue; discarding.");
+ $this->_done($qi);
}
return true;
}
@@ -108,8 +107,7 @@ class DBQueueManager extends QueueManager
/**
* Pop the oldest unclaimed item off the queue set and claim it.
*
- * @return mixed false if no items; true if bogus hit; otherwise array(string, Notice)
- * giving the queue transport name.
+ * @return mixed false if no items; true if bogus hit; otherwise Queue_item
*/
protected function _nextItem()
{
@@ -121,70 +119,42 @@ class DBQueueManager extends QueueManager
return false;
}
- $queue = $qi->transport;
- $notice = Notice::staticGet('id', $qi->notice_id);
- if (empty($notice)) {
- $this->_log(LOG_INFO, "[$queue:notice $notice->id] dequeued non-existent notice");
- $qi->delete();
- return true;
- }
-
- $result = $notice;
- return array($queue, $notice);
+ return $qi;
}
/**
* Delete our claimed item from the queue after successful processing.
*
- * @param Notice $object
- * @param string $queue
+ * @param QueueItem $qi
*/
- protected function _done($object, $queue)
+ protected function _done($qi)
{
- // XXX: right now, we only handle notices
-
- $notice = $object;
-
- $qi = Queue_item::pkeyGet(array('notice_id' => $notice->id,
- 'transport' => $queue));
-
if (empty($qi)) {
- $this->_log(LOG_INFO, "[$queue:notice $notice->id] Cannot find queue item");
+ $this->_log(LOG_INFO, "_done passed an empty queue item");
} else {
if (empty($qi->claimed)) {
- $this->_log(LOG_WARNING, "[$queue:notice $notice->id] Reluctantly releasing unclaimed queue item");
+ $this->_log(LOG_WARNING, "Reluctantly releasing unclaimed queue item");
}
$qi->delete();
$qi->free();
}
- $this->_log(LOG_INFO, "[$queue:notice $notice->id] done with item");
- $this->stats('handled', $queue);
-
- $notice->free();
+ $this->_log(LOG_INFO, "done with item");
}
/**
* Free our claimed queue item for later reprocessing in case of
* temporary failure.
*
- * @param Notice $object
- * @param string $queue
+ * @param QueueItem $qi
*/
- protected function _fail($object, $queue)
+ protected function _fail($qi)
{
- // XXX: right now, we only handle notices
-
- $notice = $object;
-
- $qi = Queue_item::pkeyGet(array('notice_id' => $notice->id,
- 'transport' => $queue));
-
if (empty($qi)) {
- $this->_log(LOG_INFO, "[$queue:notice $notice->id] Cannot find queue item");
+ $this->_log(LOG_INFO, "_fail passed an empty queue item");
} else {
if (empty($qi->claimed)) {
- $this->_log(LOG_WARNING, "[$queue:notice $notice->id] Ignoring failure for unclaimed queue item");
+ $this->_log(LOG_WARNING, "Ignoring failure for unclaimed queue item");
} else {
$orig = clone($qi);
$qi->claimed = null;
@@ -193,10 +163,7 @@ class DBQueueManager extends QueueManager
}
}
- $this->_log(LOG_INFO, "[$queue:notice $notice->id] done with queue item");
- $this->stats('error', $queue);
-
- $notice->free();
+ $this->_log(LOG_INFO, "done with queue item");
}
protected function _log($level, $msg)
diff --git a/lib/jabberqueuehandler.php b/lib/jabberqueuehandler.php
index b1518866d..83471f2df 100644
--- a/lib/jabberqueuehandler.php
+++ b/lib/jabberqueuehandler.php
@@ -34,14 +34,14 @@ class JabberQueueHandler extends QueueHandler
return 'jabber';
}
- function handle_notice($notice)
+ function handle($notice)
{
require_once(INSTALLDIR.'/lib/jabber.php');
try {
return jabber_broadcast_notice($notice);
} catch (XMPPHP_Exception $e) {
$this->log(LOG_ERR, "Got an XMPPHP_Exception: " . $e->getMessage());
- exit(1);
+ return false;
}
}
}
diff --git a/lib/ombqueuehandler.php b/lib/ombqueuehandler.php
index 3ffc1313b..24896c784 100644
--- a/lib/ombqueuehandler.php
+++ b/lib/ombqueuehandler.php
@@ -36,7 +36,7 @@ class OmbQueueHandler extends QueueHandler
* @fixme doesn't currently report failure back to the queue manager
* because omb_broadcast_notice() doesn't report it to us
*/
- function handle_notice($notice)
+ function handle($notice)
{
if ($this->is_remote($notice)) {
$this->log(LOG_DEBUG, 'Ignoring remote notice ' . $notice->id);
diff --git a/lib/pingqueuehandler.php b/lib/pingqueuehandler.php
index 8bb218078..4e4d74cb1 100644
--- a/lib/pingqueuehandler.php
+++ b/lib/pingqueuehandler.php
@@ -30,7 +30,7 @@ class PingQueueHandler extends QueueHandler {
return 'ping';
}
- function handle_notice($notice) {
+ function handle($notice) {
require_once INSTALLDIR . '/lib/ping.php';
return ping_broadcast_notice($notice);
}
diff --git a/lib/pluginqueuehandler.php b/lib/pluginqueuehandler.php
index 24d504699..9653ccad4 100644
--- a/lib/pluginqueuehandler.php
+++ b/lib/pluginqueuehandler.php
@@ -42,7 +42,7 @@ class PluginQueueHandler extends QueueHandler
return 'plugin';
}
- function handle_notice($notice)
+ function handle($notice)
{
Event::handle('HandleQueuedNotice', array(&$notice));
return true;
diff --git a/lib/publicqueuehandler.php b/lib/publicqueuehandler.php
index 9ea9ee73a..c9edb8d5d 100644
--- a/lib/publicqueuehandler.php
+++ b/lib/publicqueuehandler.php
@@ -23,7 +23,6 @@ if (!defined('STATUSNET') && !defined('LACONICA')) {
/**
* Queue handler for pushing new notices to public XMPP subscribers.
- * @fixme correct this exception handling
*/
class PublicQueueHandler extends QueueHandler
{
@@ -33,15 +32,14 @@ class PublicQueueHandler extends QueueHandler
return 'public';
}
- function handle_notice($notice)
+ function handle($notice)
{
require_once(INSTALLDIR.'/lib/jabber.php');
try {
return jabber_public_notice($notice);
} catch (XMPPHP_Exception $e) {
$this->log(LOG_ERR, "Got an XMPPHP_Exception: " . $e->getMessage());
- die($e->getMessage());
+ return false;
}
- return true;
}
}
diff --git a/lib/queuehandler.php b/lib/queuehandler.php
index 613be6e33..2909cd83b 100644
--- a/lib/queuehandler.php
+++ b/lib/queuehandler.php
@@ -22,51 +22,20 @@ if (!defined('STATUSNET') && !defined('LACONICA')) { exit(1); }
/**
* Base class for queue handlers.
*
- * As extensions of the Daemon class, each queue handler has the ability
- * to launch itself in the background, at which point it'll pass control
- * to the configured QueueManager class to poll for updates.
+ * As of 0.9, queue handlers are short-lived for items as they are
+ * dequeued by a QueueManager running in an IoMaster in a daemon
+ * such as queuedaemon.php.
+ *
+ * Extensions requiring long-running maintenance or polling should
+ * register an IoManager.
*
* Subclasses must override at least the following methods:
* - transport
- * - handle_notice
+ * - handle
*/
-#class QueueHandler extends Daemon
class QueueHandler
{
-# function __construct($id=null, $daemonize=true)
-# {
-# parent::__construct($daemonize);
-#
-# if ($id) {
-# $this->set_id($id);
-# }
-# }
-
- /**
- * How many seconds a polling-based queue manager should wait between
- * checks for new items to handle.
- *
- * Defaults to 60 seconds; override to speed up or slow down.
- *
- * @fixme not really compatible with global queue manager
- * @return int timeout in seconds
- */
-# function timeout()
-# {
-# return 60;
-# }
-
-# function class_name()
-# {
-# return ucfirst($this->transport()) . 'Handler';
-# }
-
-# function name()
-# {
-# return strtolower($this->class_name().'.'.$this->get_id());
-# }
-
/**
* Return transport keyword which identifies items this queue handler
* services; must be defined for all subclasses.
@@ -83,61 +52,17 @@ class QueueHandler
/**
* Here's the meat of your queue handler -- you're handed a Notice
- * object, which you may do as you will with.
+ * or other object, which you may do as you will with.
*
* If this function indicates failure, a warning will be logged
* and the item is placed back in the queue to be re-run.
*
- * @param Notice $notice
- * @return boolean true on success, false on failure
- */
- function handle_notice($notice)
- {
- return true;
- }
-
- /**
- * Setup and start of run loop for this queue handler as a daemon.
- * Most of the heavy lifting is passed on to the QueueManager's service()
- * method, which passes control back to our handle_notice() method for
- * each notice that comes in on the queue.
- *
- * Most of the time this won't need to be overridden in a subclass.
- *
+ * @param mixed $object
* @return boolean true on success, false on failure
*/
- function run()
+ function handle($object)
{
- if (!$this->start()) {
- $this->log(LOG_WARNING, 'failed to start');
- return false;
- }
-
- $this->log(LOG_INFO, 'checking for queued notices');
-
- $queue = $this->transport();
- $timeout = $this->timeout();
-
- $qm = QueueManager::get();
-
- $qm->service($queue, $this);
-
- $this->log(LOG_INFO, 'finished servicing the queue');
-
- if (!$this->finish()) {
- $this->log(LOG_WARNING, 'failed to clean up');
- return false;
- }
-
- $this->log(LOG_INFO, 'terminating normally');
-
return true;
}
-
-
- function log($level, $msg)
- {
- common_log($level, $this->class_name() . ' ('. $this->get_id() .'): '.$msg);
- }
}
diff --git a/lib/smsqueuehandler.php b/lib/smsqueuehandler.php
index 48a96409d..6085d2b4a 100644
--- a/lib/smsqueuehandler.php
+++ b/lib/smsqueuehandler.php
@@ -31,7 +31,7 @@ class SmsQueueHandler extends QueueHandler
return 'sms';
}
- function handle_notice($notice)
+ function handle($notice)
{
require_once(INSTALLDIR.'/lib/mail.php');
return mail_broadcast_notice_sms($notice);
diff --git a/lib/stompqueuemanager.php b/lib/stompqueuemanager.php
index 00590fdb6..6496b5cf1 100644
--- a/lib/stompqueuemanager.php
+++ b/lib/stompqueuemanager.php
@@ -125,28 +125,25 @@ class StompQueueManager extends QueueManager
}
/**
- * Saves a notice object reference into the queue item table.
+ * Saves an object into the queue item table.
* @return boolean true on success
*/
public function enqueue($object, $queue)
{
- $notice = $object;
+ $msg = serialize($object);
$this->_connect();
- // XXX: serialize and send entire notice
-
$result = $this->con->send($this->queueName($queue),
- $notice->id, // BODY of the message
- array ('created' => $notice->created));
+ $msg, // BODY of the message
+ array ('created' => $timestamp));
if (!$result) {
common_log(LOG_ERR, 'Error sending to '.$queue.' queue');
return false;
}
- common_log(LOG_DEBUG, 'complete remote queueing notice ID = '
- . $notice->id . ' for ' . $queue);
+ common_log(LOG_DEBUG, "complete remote queueing $log for $queue");
$this->stats('enqueued', $queue);
}
@@ -174,7 +171,7 @@ class StompQueueManager extends QueueManager
$ok = true;
$frames = $this->con->readFrames();
foreach ($frames as $frame) {
- $ok = $ok && $this->_handleNotice($frame);
+ $ok = $ok && $this->_handleItem($frame);
}
return $ok;
}
@@ -265,10 +262,10 @@ class StompQueueManager extends QueueManager
}
/**
- * Handle and acknowledge a notice event that's come in through a queue.
+ * Handle and acknowledge an event that's come in through a queue.
*
* If the queue handler reports failure, the message is requeued for later.
- * Missing notices or handler classes will drop the message.
+ * Missing objects or handler classes will drop the message.
*
* Side effects: in multi-site mode, may reset site configuration to
* match the site that queued the event.
@@ -276,24 +273,15 @@ class StompQueueManager extends QueueManager
* @param StompFrame $frame
* @return bool
*/
- protected function _handleNotice($frame)
+ protected function _handleItem($frame)
{
list($site, $queue) = $this->parseDestination($frame->headers['destination']);
if ($site != common_config('site', 'server')) {
$this->stats('switch');
StatusNet::init($site);
}
-
- $id = intval($frame->body);
- $info = "notice $id posted at {$frame->headers['created']} in queue $queue";
-
- $notice = Notice::staticGet('id', $id);
- if (empty($notice)) {
- $this->_log(LOG_WARNING, "Skipping missing $info");
- $this->con->ack($frame);
- $this->stats('badnotice', $queue);
- return false;
- }
+ $info = "object posted at {$frame->headers['created']} in queue $queue";
+ $item = unserialize($frame->body);
$handler = $this->getHandler($queue);
if (!$handler) {
@@ -303,7 +291,7 @@ class StompQueueManager extends QueueManager
return false;
}
- $ok = $handler->handle_notice($notice);
+ $ok = $handler->handle($item);
if (!$ok) {
$this->_log(LOG_WARNING, "Failed handling $info");
@@ -311,7 +299,7 @@ class StompQueueManager extends QueueManager
// this kind of queue management ourselves;
// if we don't ack, it should resend...
$this->con->ack($frame);
- $this->enqueue($notice, $queue);
+ $this->enqueue($item, $queue);
$this->stats('requeued', $queue);
return false;
}
diff --git a/lib/xmppmanager.php b/lib/xmppmanager.php
index dfff63a30..c49986854 100644
--- a/lib/xmppmanager.php
+++ b/lib/xmppmanager.php
@@ -176,6 +176,30 @@ class XmppManager extends IoManager
}
/**
+ * For queue handlers to pass us a message to push out,
+ * if we're active.
+ *
+ * @fixme should this be blocking etc?
+ *
+ * @param string $msg XML stanza to send
+ * @return boolean success
+ */
+ public function send($msg)
+ {
+ if ($this->conn && !$this->conn->isDisconnected()) {
+ $bytes = $this->conn->send($msg);
+ if ($bytes > 0) {
+ return true;
+ } else {
+ return false;
+ }
+ } else {
+ // Can't send right now...
+ return false;
+ }
+ }
+
+ /**
* Send a keepalive ping to the XMPP server.
*/
protected function sendPing()