diff options
Diffstat (limited to 'lib/stompqueuemanager.php')
-rw-r--r-- | lib/stompqueuemanager.php | 369 |
1 files changed, 172 insertions, 197 deletions
diff --git a/lib/stompqueuemanager.php b/lib/stompqueuemanager.php index cd62c25bd..bfeeb23b7 100644 --- a/lib/stompqueuemanager.php +++ b/lib/stompqueuemanager.php @@ -63,6 +63,7 @@ class StompQueueManager extends QueueManager $this->password = common_config('queue', 'stomp_password'); $this->base = common_config('queue', 'queue_basename'); $this->control = common_config('queue', 'control_channel'); + $this->subscriptions = array($this->control => $this->control); } /** @@ -75,17 +76,25 @@ class StompQueueManager extends QueueManager } /** - * Record each site we'll be handling input for in this process, - * so we can listen to the necessary queues for it. - * - * @fixme possibly actually do subscription here to save another - * loop over all sites later? - * @fixme possibly don't assume it's the current site + * Record queue subscriptions we'll need to handle the current site. */ - public function addSite($server) + public function addSite() { - $this->sites[] = $server; + $this->sites[] = StatusNet::currentSite(); + + // Set up handlers active for this site... $this->initialize(); + + foreach ($this->activeGroups as $group) { + if (isset($this->groups[$group])) { + // Actual queues may be broken out or consolidated... + // Subscribe to all the target queues we'll need. + foreach ($this->groups[$group] as $transport => $class) { + $target = $this->queueName($transport); + $this->subscriptions[$target] = $target; + } + } + } } /** @@ -121,59 +130,11 @@ class StompQueueManager extends QueueManager } /** - * Instantiate the appropriate QueueHandler class for the given queue. + * Saves an object into the queue item table. * + * @param mixed $object * @param string $queue - * @return mixed QueueHandler or null - */ - function getHandler($queue) - { - $handlers = $this->handlers[$this->currentSite()]; - if (isset($handlers[$queue])) { - $class = $handlers[$queue]; - if (class_exists($class)) { - return new $class(); - } else { - common_log(LOG_ERR, "Nonexistent handler class '$class' for queue '$queue'"); - } - } else { - common_log(LOG_ERR, "Requested handler for unkown queue '$queue'"); - } - return null; - } - - /** - * Get a list of all registered queue transport names. - * - * @return array of strings - */ - function getQueues() - { - $group = $this->activeGroup(); - $site = $this->currentSite(); - if (empty($this->groups[$site][$group])) { - return array(); - } else { - return array_keys($this->groups[$site][$group]); - } - } - - /** - * Register a queue transport name and handler class for your plugin. - * Only registered transports will be reliably picked up! * - * @param string $transport - * @param string $class - * @param string $group - */ - public function connect($transport, $class, $group='queuedaemon') - { - $this->handlers[$this->currentSite()][$transport] = $class; - $this->groups[$this->currentSite()][$group][$transport] = $class; - } - - /** - * Saves a notice object reference into the queue item table. * @return boolean true on success * @throws StompException on connection or send error */ @@ -192,8 +153,11 @@ class StompQueueManager extends QueueManager */ protected function _doEnqueue($object, $queue, $idx) { - $msg = $this->encode($object); $rep = $this->logrep($object); + $envelope = array('site' => common_config('site', 'nickname'), + 'handler' => $queue, + 'payload' => $this->encode($object)); + $msg = serialize($envelope); $props = array('created' => common_sql_now()); if ($this->isPersistent($queue)) { @@ -205,11 +169,11 @@ class StompQueueManager extends QueueManager $result = $con->send($this->queueName($queue), $msg, $props); if (!$result) { - common_log(LOG_ERR, "Error sending $rep to $queue queue on $host"); + $this->_log(LOG_ERR, "Error sending $rep to $queue queue on $host"); return false; } - common_log(LOG_DEBUG, "complete remote queueing $rep for $queue on $host"); + $this->_log(LOG_DEBUG, "complete remote queueing $rep for $queue on $host"); $this->stats('enqueued', $queue); return true; } @@ -275,12 +239,14 @@ class StompQueueManager extends QueueManager $idx = $this->connectionFromSocket($socket); $con = $this->cons[$idx]; $host = $con->getServer(); + $this->defaultIdx = $idx; $ok = true; try { $frames = $con->readFrames(); } catch (StompException $e) { - common_log(LOG_ERR, "Lost connection to $host: " . $e->getMessage()); + $this->_log(LOG_ERR, "Lost connection to $host: " . $e->getMessage()); + fclose($socket); // ??? $this->cons[$idx] = null; $this->transaction[$idx] = null; $this->disconnect[$idx] = time(); @@ -289,14 +255,17 @@ class StompQueueManager extends QueueManager foreach ($frames as $frame) { $dest = $frame->headers['destination']; if ($dest == $this->control) { - if (!$this->handleControlSignal($idx, $frame)) { + if (!$this->handleControlSignal($frame)) { // We got a control event that requests a shutdown; // close out and stop handling anything else! break; } } else { - $ok = $ok && $this->handleItem($idx, $frame); + $ok = $this->handleItem($frame) && $ok; } + $this->ack($idx, $frame); + $this->commit($idx); + $this->begin($idx); } return $ok; } @@ -333,22 +302,9 @@ class StompQueueManager extends QueueManager parent::start($master); $this->_connectAll(); - common_log(LOG_INFO, "Subscribing to $this->control"); - foreach ($this->cons as $con) { - if ($con) { - $con->subscribe($this->control); - } - } - if ($this->sites) { - foreach ($this->sites as $server) { - StatusNet::init($server); - $this->doSubscribe(); - } - } else { - $this->doSubscribe(); - } foreach ($this->cons as $i => $con) { if ($con) { + $this->doSubscribe($con); $this->begin($i); } } @@ -356,9 +312,7 @@ class StompQueueManager extends QueueManager } /** - * Subscribe to all the queues we're going to need to handle... - * - * Side effects: in multi-site mode, may reset site configuration. + * Close out any active connections. * * @return bool return false on failure */ @@ -377,15 +331,6 @@ class StompQueueManager extends QueueManager } /** - * Get identifier of the currently active site configuration - * @return string - */ - protected function currentSite() - { - return common_config('site', 'server'); // @fixme switch to nickname - } - - /** * Lazy open a single connection to Stomp queue server. * If multiple servers are configured, we let the Stomp client library * worry about finding a working connection among them. @@ -441,6 +386,10 @@ class StompQueueManager extends QueueManager } } + /** + * Attempt to manually reconnect to the Stomp server for the given + * slot. If successful, set up our subscriptions on it. + */ protected function _reconnect($idx) { try { @@ -453,17 +402,7 @@ class StompQueueManager extends QueueManager $this->cons[$idx] = $con; $this->disconnect[$idx] = null; - // now we have to listen to everything... - // @fixme refactor this nicer. :P - $host = $con->getServer(); - $this->_log(LOG_INFO, "Resubscribing to $this->control on $host"); - $con->subscribe($this->control); - foreach ($this->subscriptions as $site => $queues) { - foreach ($queues as $queue) { - $this->_log(LOG_INFO, "Resubscribing to $queue on $host"); - $con->subscribe($queue); - } - } + $this->doSubscribe($con); $this->begin($idx); } else { // Try again later... @@ -487,41 +426,15 @@ class StompQueueManager extends QueueManager } /** - * Subscribe to all enabled notice queues for the current site. - */ - protected function doSubscribe() - { - $site = $this->currentSite(); - $this->_connect(); - foreach ($this->getQueues() as $queue) { - $rawqueue = $this->queueName($queue); - $this->subscriptions[$site][$queue] = $rawqueue; - $this->_log(LOG_INFO, "Subscribing to $rawqueue"); - foreach ($this->cons as $con) { - if ($con) { - $con->subscribe($rawqueue); - } - } - } - } - - /** - * Subscribe from all enabled notice queues for the current site. + * Set up all our raw queue subscriptions on the given connection + * @param LiberalStomp $con */ - protected function doUnsubscribe() + protected function doSubscribe(LiberalStomp $con) { - $site = $this->currentSite(); - $this->_connect(); - if (!empty($this->subscriptions[$site])) { - foreach ($this->subscriptions[$site] as $queue => $rawqueue) { - $this->_log(LOG_INFO, "Unsubscribing from $rawqueue"); - foreach ($this->cons as $con) { - if ($con) { - $con->unsubscribe($rawqueue); - } - } - unset($this->subscriptions[$site][$queue]); - } + $host = $con->getServer(); + foreach ($this->subscriptions as $queue) { + $this->_log(LOG_INFO, "Subscribing to $queue on $host"); + $con->subscribe($queue); } } @@ -534,25 +447,29 @@ class StompQueueManager extends QueueManager * Side effects: in multi-site mode, may reset site configuration to * match the site that queued the event. * - * @param int $idx connection index * @param StompFrame $frame - * @return bool + * @return bool success */ - protected function handleItem($idx, $frame) + protected function handleItem($frame) { - $this->defaultIdx = $idx; + $host = $this->cons[$this->defaultIdx]->getServer(); + $message = unserialize($frame->body); + $site = $message['site']; + $queue = $message['handler']; - list($site, $queue) = $this->parseDestination($frame->headers['destination']); - if ($site != $this->currentSite()) { - $this->stats('switch'); - StatusNet::init($site); + if ($this->isDeadletter($frame, $message)) { + $this->stats('deadletter', $queue); + return false; } - $host = $this->cons[$idx]->getServer(); - $item = $this->decode($frame->body); + // @fixme detect failing site switches + $this->switchSite($site); + + $item = $this->decode($message['payload']); if (empty($item)) { $this->_log(LOG_ERR, "Skipping empty or deleted item in queue $queue from $host"); - return true; + $this->stats('baditem', $queue); + return false; } $info = $this->logrep($item) . " posted at " . $frame->headers['created'] . " in queue $queue from $host"; @@ -561,16 +478,10 @@ class StompQueueManager extends QueueManager $handler = $this->getHandler($queue); if (!$handler) { $this->_log(LOG_ERR, "Missing handler class; skipping $info"); - $this->ack($idx, $frame); - $this->commit($idx); - $this->begin($idx); $this->stats('badhandler', $queue); return false; } - // If there's an exception when handling, - // log the error and let it get requeued. - try { $ok = $handler->handle($item); } catch (Exception $e) { @@ -578,25 +489,80 @@ class StompQueueManager extends QueueManager $ok = false; } - if (!$ok) { + if ($ok) { + $this->_log(LOG_INFO, "Successfully handled $info"); + $this->stats('handled', $queue); + } else { $this->_log(LOG_WARNING, "Failed handling $info"); - // FIXME we probably shouldn't have to do - // this kind of queue management ourselves; - // if we don't ack, it should resend... - $this->ack($idx, $frame); + // Requeing moves the item to the end of the line for its next try. + // @fixme add a manual retry count $this->enqueue($item, $queue); - $this->commit($idx); - $this->begin($idx); $this->stats('requeued', $queue); - return false; } - $this->_log(LOG_INFO, "Successfully handled $info"); - $this->ack($idx, $frame); - $this->commit($idx); - $this->begin($idx); - $this->stats('handled', $queue); - return true; + return $ok; + } + + /** + * Check if a redelivered message has been run through enough + * that we're going to give up on it. + * + * @param StompFrame $frame + * @param array $message unserialized message body + * @return boolean true if we should discard + */ + protected function isDeadLetter($frame, $message) + { + if (isset($frame->headers['redelivered']) && $frame->headers['redelivered'] == 'true') { + // Message was redelivered, possibly indicating a previous failure. + $msgId = $frame->headers['message-id']; + $site = $message['site']; + $queue = $message['handler']; + $msgInfo = "message $msgId for $site in queue $queue"; + + $deliveries = $this->incDeliveryCount($msgId); + if ($deliveries > common_config('queue', 'max_retries')) { + $info = "DEAD-LETTER FILE: Gave up after retry $deliveries on $msgInfo"; + + $outdir = common_config('queue', 'dead_letter_dir'); + if ($outdir) { + $filename = $outdir . "/$site-$queue-" . rawurlencode($msgId); + $info .= ": dumping to $filename"; + file_put_contents($filename, $message['payload']); + } + + common_log(LOG_ERR, $info); + return true; + } else { + common_log(LOG_INFO, "retry $deliveries on $msgInfo"); + } + } + return false; + } + + /** + * Update count of times we've re-encountered this message recently, + * triggered when we get a message marked as 'redelivered'. + * + * Requires a CLI-friendly cache configuration. + * + * @param string $msgId message-id header from message + * @return int number of retries recorded + */ + function incDeliveryCount($msgId) + { + $count = 0; + $cache = common_memcache(); + if ($cache) { + $key = 'statusnet:stomp:message-retries:' . $msgId; + $count = $cache->increment($key); + if (!$count) { + $count = 1; + $cache->set($key, $count, null, 3600); + $got = $cache->get($key); + } + } + return $count; } /** @@ -629,14 +595,23 @@ class StompQueueManager extends QueueManager } else { $this->_log(LOG_ERR, "Ignoring unrecognized control message: $message"); } - - $this->ack($idx, $frame); - $this->commit($idx); - $this->begin($idx); return $shutdown; } /** + * Switch site, if necessary, and reset current handler assignments + * @param string $site + */ + function switchSite($site) + { + if ($site != StatusNet::currentSite()) { + $this->stats('switch'); + StatusNet::switchSite($site); + $this->initialize(); + } + } + + /** * Set us up with queue subscriptions for a new site added at runtime, * triggered by a broadcast to the 'statusnet-control' topic. * @@ -648,22 +623,17 @@ class StompQueueManager extends QueueManager if (empty($this->sites)) { if ($nickname == common_config('site', 'nickname')) { StatusNet::init(common_config('site', 'server')); - $this->doUnsubscribe(); - $this->doSubscribe(); } else { $this->_log(LOG_INFO, "Ignoring update ping for other site $nickname"); } } else { $sn = Status_network::staticGet($nickname); if ($sn) { - $server = $sn->getServerName(); // @fixme do config-by-nick - StatusNet::init($server); - if (empty($this->sites[$server])) { - $this->addSite($server); + $this->switchSite($nickname); + if (!in_array($nickname, $this->sites)) { + $this->addSite(); } - $this->_log(LOG_INFO, "(Re)subscribing to queues for site $nickname / $server"); - $this->doUnsubscribe(); - $this->doSubscribe(); + // @fixme update subscriptions, if applicable $this->stats('siteupdate'); } else { $this->_log(LOG_ERR, "Ignoring ping for unrecognized new site $nickname"); @@ -673,42 +643,47 @@ class StompQueueManager extends QueueManager /** * Combines the queue_basename from configuration with the - * site server name and queue name to give eg: + * group name for this queue to give eg: * - * /queue/statusnet/identi.ca/sms + * /queue/statusnet/main * * @param string $queue * @return string */ protected function queueName($queue) { - return common_config('queue', 'queue_basename') . - $this->currentSite() . '/' . $queue; + $base = common_config('queue', 'queue_basename'); + $group = $this->queueGroup($queue); + $breakout = $this->breakoutMode($queue); + if ($breakout == 'shared') { + return $base . "$group"; + } else if ($breakout == 'handler') { + return $base . "$group/$queue"; + } else if ($breakout == 'site') { + $site = StatusNet::currentSite(); + return $base . "$group/$queue/$site"; + } + throw Exception("Unrecognized queue breakout mode '$breakout' for '$queue'"); } /** - * Returns the site and queue name from the server-side queue. + * Get the breakout mode for the given queue on the current site. * - * @param string queue destination (eg '/queue/statusnet/identi.ca/sms') - * @return array of site and queue: ('identi.ca','sms') or false if unrecognized + * @param string $queue + * @return string one of 'shared', 'handler', 'site' */ - protected function parseDestination($dest) + protected function breakoutMode($queue) { - $prefix = common_config('queue', 'queue_basename'); - if (substr($dest, 0, strlen($prefix)) == $prefix) { - $rest = substr($dest, strlen($prefix)); - return explode("/", $rest, 2); + $breakout = common_config('queue', 'breakout'); + if (isset($breakout[$queue])) { + return $breakout[$queue]; + } else if (isset($breakout['*'])) { + return $breakout['*']; } else { - common_log(LOG_ERR, "Got a message from unrecognized stomp queue: $dest"); - return array(false, false); + return 'shared'; } } - function _log($level, $msg) - { - common_log($level, 'StompQueueManager: '.$msg); - } - protected function begin($idx) { if ($this->useTransactions) { |