summaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
Diffstat (limited to 'lib')
-rw-r--r--lib/dbqueuemanager.php95
-rw-r--r--lib/jabberqueuehandler.php4
-rw-r--r--lib/ombqueuehandler.php2
-rw-r--r--lib/pingqueuehandler.php2
-rw-r--r--lib/pluginqueuehandler.php2
-rw-r--r--lib/publicqueuehandler.php6
-rw-r--r--lib/queuehandler.php95
-rw-r--r--lib/smsqueuehandler.php2
-rw-r--r--lib/stompqueuemanager.php38
-rw-r--r--lib/xmppmanager.php24
10 files changed, 184 insertions, 86 deletions
diff --git a/lib/dbqueuemanager.php b/lib/dbqueuemanager.php
index 139f50234..889365b64 100644
--- a/lib/dbqueuemanager.php
+++ b/lib/dbqueuemanager.php
@@ -31,17 +31,19 @@
class DBQueueManager extends QueueManager
{
/**
- * Saves an object into the queue item table.
+ * Saves a notice object reference into the queue item table.
* @return boolean true on success
* @throws ServerException on failure
*/
public function enqueue($object, $queue)
{
+ $notice = $object;
+
$qi = new Queue_item();
- $qi->frame = serialize($object);
+ $qi->notice_id = $notice->id;
$qi->transport = $queue;
- $qi->created = common_sql_now();
+ $qi->created = $notice->created;
$result = $qi->insert();
if (!$result) {
@@ -71,35 +73,34 @@ class DBQueueManager extends QueueManager
*/
public function poll()
{
- $this->_log(LOG_DEBUG, 'Checking for queued objects...');
- $qi = $this->_nextItem();
- if ($qi === false) {
- $this->_log(LOG_DEBUG, 'No queue items waiting; idling.');
+ $this->_log(LOG_DEBUG, 'Checking for notices...');
+ $item = $this->_nextItem();
+ if ($item === false) {
+ $this->_log(LOG_DEBUG, 'No notices waiting; idling.');
return false;
}
- if ($qi === true) {
- // We dequeued an entry for a deleted or invalid object.
+ if ($item === true) {
+ // We dequeued an entry for a deleted or invalid notice.
// Consider it a hit for poll rate purposes.
return true;
}
- $queue = $qi->transport;
- $object = unserialize($qi->frame);
- $this->_log(LOG_INFO, 'Got item id=' . $qi->id . ' for transport ' . $queue);
+ list($queue, $notice) = $item;
+ $this->_log(LOG_INFO, 'Got notice '. $notice->id . ' for transport ' . $queue);
// Yay! Got one!
$handler = $this->getHandler($queue);
if ($handler) {
- if ($handler->handle($object)) {
- $this->_log(LOG_INFO, "[$queue] Successfully handled object");
- $this->_done($qi);
+ if ($handler->handle_notice($notice)) {
+ $this->_log(LOG_INFO, "[$queue:notice $notice->id] Successfully handled notice");
+ $this->_done($notice, $queue);
} else {
- $this->_log(LOG_INFO, "[$queue] Failed to handle object");
- $this->_fail($qi);
+ $this->_log(LOG_INFO, "[$queue:notice $notice->id] Failed to handle notice");
+ $this->_fail($notice, $queue);
}
} else {
- $this->_log(LOG_INFO, "[$queue] No handler for queue $queue; discarding.");
- $this->_done($qi);
+ $this->_log(LOG_INFO, "[$queue:notice $notice->id] No handler for queue $queue; discarding.");
+ $this->_done($notice, $queue);
}
return true;
}
@@ -107,7 +108,8 @@ class DBQueueManager extends QueueManager
/**
* Pop the oldest unclaimed item off the queue set and claim it.
*
- * @return mixed false if no items; true if bogus hit; otherwise Queue_item
+ * @return mixed false if no items; true if bogus hit; otherwise array(string, Notice)
+ * giving the queue transport name.
*/
protected function _nextItem()
{
@@ -119,42 +121,70 @@ class DBQueueManager extends QueueManager
return false;
}
- return $qi;
+ $queue = $qi->transport;
+ $notice = Notice::staticGet('id', $qi->notice_id);
+ if (empty($notice)) {
+ $this->_log(LOG_INFO, "[$queue:notice $notice->id] dequeued non-existent notice");
+ $qi->delete();
+ return true;
+ }
+
+ $result = $notice;
+ return array($queue, $notice);
}
/**
* Delete our claimed item from the queue after successful processing.
*
- * @param QueueItem $qi
+ * @param Notice $object
+ * @param string $queue
*/
- protected function _done($qi)
+ protected function _done($object, $queue)
{
+ // XXX: right now, we only handle notices
+
+ $notice = $object;
+
+ $qi = Queue_item::pkeyGet(array('notice_id' => $notice->id,
+ 'transport' => $queue));
+
if (empty($qi)) {
- $this->_log(LOG_INFO, "_done passed an empty queue item");
+ $this->_log(LOG_INFO, "[$queue:notice $notice->id] Cannot find queue item");
} else {
if (empty($qi->claimed)) {
- $this->_log(LOG_WARNING, "Reluctantly releasing unclaimed queue item");
+ $this->_log(LOG_WARNING, "[$queue:notice $notice->id] Reluctantly releasing unclaimed queue item");
}
$qi->delete();
$qi->free();
}
- $this->_log(LOG_INFO, "done with item");
+ $this->_log(LOG_INFO, "[$queue:notice $notice->id] done with item");
+ $this->stats('handled', $queue);
+
+ $notice->free();
}
/**
* Free our claimed queue item for later reprocessing in case of
* temporary failure.
*
- * @param QueueItem $qi
+ * @param Notice $object
+ * @param string $queue
*/
- protected function _fail($qi)
+ protected function _fail($object, $queue)
{
+ // XXX: right now, we only handle notices
+
+ $notice = $object;
+
+ $qi = Queue_item::pkeyGet(array('notice_id' => $notice->id,
+ 'transport' => $queue));
+
if (empty($qi)) {
- $this->_log(LOG_INFO, "_fail passed an empty queue item");
+ $this->_log(LOG_INFO, "[$queue:notice $notice->id] Cannot find queue item");
} else {
if (empty($qi->claimed)) {
- $this->_log(LOG_WARNING, "Ignoring failure for unclaimed queue item");
+ $this->_log(LOG_WARNING, "[$queue:notice $notice->id] Ignoring failure for unclaimed queue item");
} else {
$orig = clone($qi);
$qi->claimed = null;
@@ -163,7 +193,10 @@ class DBQueueManager extends QueueManager
}
}
- $this->_log(LOG_INFO, "done with queue item");
+ $this->_log(LOG_INFO, "[$queue:notice $notice->id] done with queue item");
+ $this->stats('error', $queue);
+
+ $notice->free();
}
protected function _log($level, $msg)
diff --git a/lib/jabberqueuehandler.php b/lib/jabberqueuehandler.php
index 83471f2df..b1518866d 100644
--- a/lib/jabberqueuehandler.php
+++ b/lib/jabberqueuehandler.php
@@ -34,14 +34,14 @@ class JabberQueueHandler extends QueueHandler
return 'jabber';
}
- function handle($notice)
+ function handle_notice($notice)
{
require_once(INSTALLDIR.'/lib/jabber.php');
try {
return jabber_broadcast_notice($notice);
} catch (XMPPHP_Exception $e) {
$this->log(LOG_ERR, "Got an XMPPHP_Exception: " . $e->getMessage());
- return false;
+ exit(1);
}
}
}
diff --git a/lib/ombqueuehandler.php b/lib/ombqueuehandler.php
index 24896c784..3ffc1313b 100644
--- a/lib/ombqueuehandler.php
+++ b/lib/ombqueuehandler.php
@@ -36,7 +36,7 @@ class OmbQueueHandler extends QueueHandler
* @fixme doesn't currently report failure back to the queue manager
* because omb_broadcast_notice() doesn't report it to us
*/
- function handle($notice)
+ function handle_notice($notice)
{
if ($this->is_remote($notice)) {
$this->log(LOG_DEBUG, 'Ignoring remote notice ' . $notice->id);
diff --git a/lib/pingqueuehandler.php b/lib/pingqueuehandler.php
index 4e4d74cb1..8bb218078 100644
--- a/lib/pingqueuehandler.php
+++ b/lib/pingqueuehandler.php
@@ -30,7 +30,7 @@ class PingQueueHandler extends QueueHandler {
return 'ping';
}
- function handle($notice) {
+ function handle_notice($notice) {
require_once INSTALLDIR . '/lib/ping.php';
return ping_broadcast_notice($notice);
}
diff --git a/lib/pluginqueuehandler.php b/lib/pluginqueuehandler.php
index 9653ccad4..24d504699 100644
--- a/lib/pluginqueuehandler.php
+++ b/lib/pluginqueuehandler.php
@@ -42,7 +42,7 @@ class PluginQueueHandler extends QueueHandler
return 'plugin';
}
- function handle($notice)
+ function handle_notice($notice)
{
Event::handle('HandleQueuedNotice', array(&$notice));
return true;
diff --git a/lib/publicqueuehandler.php b/lib/publicqueuehandler.php
index c9edb8d5d..9ea9ee73a 100644
--- a/lib/publicqueuehandler.php
+++ b/lib/publicqueuehandler.php
@@ -23,6 +23,7 @@ if (!defined('STATUSNET') && !defined('LACONICA')) {
/**
* Queue handler for pushing new notices to public XMPP subscribers.
+ * @fixme correct this exception handling
*/
class PublicQueueHandler extends QueueHandler
{
@@ -32,14 +33,15 @@ class PublicQueueHandler extends QueueHandler
return 'public';
}
- function handle($notice)
+ function handle_notice($notice)
{
require_once(INSTALLDIR.'/lib/jabber.php');
try {
return jabber_public_notice($notice);
} catch (XMPPHP_Exception $e) {
$this->log(LOG_ERR, "Got an XMPPHP_Exception: " . $e->getMessage());
- return false;
+ die($e->getMessage());
}
+ return true;
}
}
diff --git a/lib/queuehandler.php b/lib/queuehandler.php
index 2909cd83b..613be6e33 100644
--- a/lib/queuehandler.php
+++ b/lib/queuehandler.php
@@ -22,20 +22,51 @@ if (!defined('STATUSNET') && !defined('LACONICA')) { exit(1); }
/**
* Base class for queue handlers.
*
- * As of 0.9, queue handlers are short-lived for items as they are
- * dequeued by a QueueManager running in an IoMaster in a daemon
- * such as queuedaemon.php.
- *
- * Extensions requiring long-running maintenance or polling should
- * register an IoManager.
+ * As extensions of the Daemon class, each queue handler has the ability
+ * to launch itself in the background, at which point it'll pass control
+ * to the configured QueueManager class to poll for updates.
*
* Subclasses must override at least the following methods:
* - transport
- * - handle
+ * - handle_notice
*/
+#class QueueHandler extends Daemon
class QueueHandler
{
+# function __construct($id=null, $daemonize=true)
+# {
+# parent::__construct($daemonize);
+#
+# if ($id) {
+# $this->set_id($id);
+# }
+# }
+
+ /**
+ * How many seconds a polling-based queue manager should wait between
+ * checks for new items to handle.
+ *
+ * Defaults to 60 seconds; override to speed up or slow down.
+ *
+ * @fixme not really compatible with global queue manager
+ * @return int timeout in seconds
+ */
+# function timeout()
+# {
+# return 60;
+# }
+
+# function class_name()
+# {
+# return ucfirst($this->transport()) . 'Handler';
+# }
+
+# function name()
+# {
+# return strtolower($this->class_name().'.'.$this->get_id());
+# }
+
/**
* Return transport keyword which identifies items this queue handler
* services; must be defined for all subclasses.
@@ -52,17 +83,61 @@ class QueueHandler
/**
* Here's the meat of your queue handler -- you're handed a Notice
- * or other object, which you may do as you will with.
+ * object, which you may do as you will with.
*
* If this function indicates failure, a warning will be logged
* and the item is placed back in the queue to be re-run.
*
- * @param mixed $object
+ * @param Notice $notice
+ * @return boolean true on success, false on failure
+ */
+ function handle_notice($notice)
+ {
+ return true;
+ }
+
+ /**
+ * Setup and start of run loop for this queue handler as a daemon.
+ * Most of the heavy lifting is passed on to the QueueManager's service()
+ * method, which passes control back to our handle_notice() method for
+ * each notice that comes in on the queue.
+ *
+ * Most of the time this won't need to be overridden in a subclass.
+ *
* @return boolean true on success, false on failure
*/
- function handle($object)
+ function run()
{
+ if (!$this->start()) {
+ $this->log(LOG_WARNING, 'failed to start');
+ return false;
+ }
+
+ $this->log(LOG_INFO, 'checking for queued notices');
+
+ $queue = $this->transport();
+ $timeout = $this->timeout();
+
+ $qm = QueueManager::get();
+
+ $qm->service($queue, $this);
+
+ $this->log(LOG_INFO, 'finished servicing the queue');
+
+ if (!$this->finish()) {
+ $this->log(LOG_WARNING, 'failed to clean up');
+ return false;
+ }
+
+ $this->log(LOG_INFO, 'terminating normally');
+
return true;
}
+
+
+ function log($level, $msg)
+ {
+ common_log($level, $this->class_name() . ' ('. $this->get_id() .'): '.$msg);
+ }
}
diff --git a/lib/smsqueuehandler.php b/lib/smsqueuehandler.php
index 6085d2b4a..48a96409d 100644
--- a/lib/smsqueuehandler.php
+++ b/lib/smsqueuehandler.php
@@ -31,7 +31,7 @@ class SmsQueueHandler extends QueueHandler
return 'sms';
}
- function handle($notice)
+ function handle_notice($notice)
{
require_once(INSTALLDIR.'/lib/mail.php');
return mail_broadcast_notice_sms($notice);
diff --git a/lib/stompqueuemanager.php b/lib/stompqueuemanager.php
index 6496b5cf1..00590fdb6 100644
--- a/lib/stompqueuemanager.php
+++ b/lib/stompqueuemanager.php
@@ -125,25 +125,28 @@ class StompQueueManager extends QueueManager
}
/**
- * Saves an object into the queue item table.
+ * Saves a notice object reference into the queue item table.
* @return boolean true on success
*/
public function enqueue($object, $queue)
{
- $msg = serialize($object);
+ $notice = $object;
$this->_connect();
+ // XXX: serialize and send entire notice
+
$result = $this->con->send($this->queueName($queue),
- $msg, // BODY of the message
- array ('created' => $timestamp));
+ $notice->id, // BODY of the message
+ array ('created' => $notice->created));
if (!$result) {
common_log(LOG_ERR, 'Error sending to '.$queue.' queue');
return false;
}
- common_log(LOG_DEBUG, "complete remote queueing $log for $queue");
+ common_log(LOG_DEBUG, 'complete remote queueing notice ID = '
+ . $notice->id . ' for ' . $queue);
$this->stats('enqueued', $queue);
}
@@ -171,7 +174,7 @@ class StompQueueManager extends QueueManager
$ok = true;
$frames = $this->con->readFrames();
foreach ($frames as $frame) {
- $ok = $ok && $this->_handleItem($frame);
+ $ok = $ok && $this->_handleNotice($frame);
}
return $ok;
}
@@ -262,10 +265,10 @@ class StompQueueManager extends QueueManager
}
/**
- * Handle and acknowledge an event that's come in through a queue.
+ * Handle and acknowledge a notice event that's come in through a queue.
*
* If the queue handler reports failure, the message is requeued for later.
- * Missing objects or handler classes will drop the message.
+ * Missing notices or handler classes will drop the message.
*
* Side effects: in multi-site mode, may reset site configuration to
* match the site that queued the event.
@@ -273,15 +276,24 @@ class StompQueueManager extends QueueManager
* @param StompFrame $frame
* @return bool
*/
- protected function _handleItem($frame)
+ protected function _handleNotice($frame)
{
list($site, $queue) = $this->parseDestination($frame->headers['destination']);
if ($site != common_config('site', 'server')) {
$this->stats('switch');
StatusNet::init($site);
}
- $info = "object posted at {$frame->headers['created']} in queue $queue";
- $item = unserialize($frame->body);
+
+ $id = intval($frame->body);
+ $info = "notice $id posted at {$frame->headers['created']} in queue $queue";
+
+ $notice = Notice::staticGet('id', $id);
+ if (empty($notice)) {
+ $this->_log(LOG_WARNING, "Skipping missing $info");
+ $this->con->ack($frame);
+ $this->stats('badnotice', $queue);
+ return false;
+ }
$handler = $this->getHandler($queue);
if (!$handler) {
@@ -291,7 +303,7 @@ class StompQueueManager extends QueueManager
return false;
}
- $ok = $handler->handle($item);
+ $ok = $handler->handle_notice($notice);
if (!$ok) {
$this->_log(LOG_WARNING, "Failed handling $info");
@@ -299,7 +311,7 @@ class StompQueueManager extends QueueManager
// this kind of queue management ourselves;
// if we don't ack, it should resend...
$this->con->ack($frame);
- $this->enqueue($item, $queue);
+ $this->enqueue($notice, $queue);
$this->stats('requeued', $queue);
return false;
}
diff --git a/lib/xmppmanager.php b/lib/xmppmanager.php
index c49986854..dfff63a30 100644
--- a/lib/xmppmanager.php
+++ b/lib/xmppmanager.php
@@ -176,30 +176,6 @@ class XmppManager extends IoManager
}
/**
- * For queue handlers to pass us a message to push out,
- * if we're active.
- *
- * @fixme should this be blocking etc?
- *
- * @param string $msg XML stanza to send
- * @return boolean success
- */
- public function send($msg)
- {
- if ($this->conn && !$this->conn->isDisconnected()) {
- $bytes = $this->conn->send($msg);
- if ($bytes > 0) {
- return true;
- } else {
- return false;
- }
- } else {
- // Can't send right now...
- return false;
- }
- }
-
- /**
* Send a keepalive ping to the XMPP server.
*/
protected function sendPing()