diff options
Diffstat (limited to 'lib')
-rw-r--r-- | lib/jabber.php | 10 | ||||
-rw-r--r-- | lib/queuehandler.php | 5 | ||||
-rw-r--r-- | lib/stompqueuemanager.php | 81 | ||||
-rw-r--r-- | lib/xmppqueuehandler.php | 16 |
4 files changed, 87 insertions, 25 deletions
diff --git a/lib/jabber.php b/lib/jabber.php index 7d584ad01..e15076160 100644 --- a/lib/jabber.php +++ b/lib/jabber.php @@ -77,6 +77,14 @@ function jabber_daemon_address() return common_config('xmpp', 'user') . '@' . common_config('xmpp', 'server'); } +class Sharing_XMPP extends XMPPHP_XMPP +{ + function getSocket() + { + return $this->socket; + } +} + /** * connect the configured Jabber account to the configured server * @@ -89,7 +97,7 @@ function jabber_connect($resource=null) { static $conn = null; if (!$conn) { - $conn = new XMPPHP_XMPP(common_config('xmpp', 'host') ? + $conn = new Sharing_XMPP(common_config('xmpp', 'host') ? common_config('xmpp', 'host') : common_config('xmpp', 'server'), common_config('xmpp', 'port'), diff --git a/lib/queuehandler.php b/lib/queuehandler.php index c0f38f4e3..c2ff10f32 100644 --- a/lib/queuehandler.php +++ b/lib/queuehandler.php @@ -115,5 +115,10 @@ class QueueHandler extends Daemon { common_log($level, $this->class_name() . ' ('. $this->get_id() .'): '.$msg); } + + function getSockets() + { + return array(); + } } diff --git a/lib/stompqueuemanager.php b/lib/stompqueuemanager.php index 4cefba113..d13af3fa5 100644 --- a/lib/stompqueuemanager.php +++ b/lib/stompqueuemanager.php @@ -30,6 +30,14 @@ require_once 'Stomp.php'; +class LiberalStomp extends Stomp +{ + function getSocket() + { + return $this->_socket; + } +} + class StompQueueManager { var $server = null; @@ -50,7 +58,7 @@ class StompQueueManager { if (empty($this->con)) { $this->_log(LOG_INFO, "Connecting to '$this->server' as '$this->username'..."); - $this->con = new Stomp($this->server); + $this->con = new LiberalStomp($this->server); if ($this->con->connect($this->username, $this->password)) { $this->_log(LOG_INFO, "Connected."); @@ -94,37 +102,66 @@ class StompQueueManager while (true) { - $frame = $this->con->readFrame(); + // Wait for something on one of our sockets - if (!empty($frame)) { - $notice = Notice::staticGet('id', $frame->body); + $stompsock = $this->con->getSocket(); - if (empty($notice)) { - $this->_log(LOG_WARNING, 'Got ID '. $frame->body .' for non-existent notice in queue '. $queue); - $this->con->ack($frame); - } else { - if ($handler->handle_notice($notice)) { - $this->_log(LOG_INFO, 'Successfully handled notice '. $notice->id .' posted at ' . $frame->headers['created'] . ' in queue '. $queue); - $this->con->ack($frame); - } else { - $this->_log(LOG_WARNING, 'Failed handling notice '. $notice->id .' posted at ' . $frame->headers['created'] . ' in queue '. $queue); - // FIXME we probably shouldn't have to do - // this kind of queue management ourselves - $this->con->ack($frame); - $this->enqueue($notice, $queue); + $handsocks = $handler->getSockets(); + + $socks = array_merge(array($stompsock), $handsocks); + + $read = $socks; + $write = array(); + $except = array(); + + $ready = stream_select($read, $write, $except, $handler->timeout(), 0); + + if ($ready === false) { + $this->_log(LOG_ERR, "Error selecting on sockets"); + } else if ($ready > 0) { + if (in_array($stompsock, $read)) { + $this->_handleNotice($queue, $handler); + } + foreach ($handsocks as $sock) { + if (in_array($sock, $read)) { + $handler->idle(QUEUE_HANDLER_HIT_IDLE); + break; } - unset($notice); } - - unset($frame); } - - $handler->idle(0); } $this->con->unsubscribe($this->_queueName($queue)); } + function _handleNotice($queue, $handler) + { + $frame = $this->con->readFrame(); + + if (!empty($frame)) { + $notice = Notice::staticGet('id', $frame->body); + + if (empty($notice)) { + $this->_log(LOG_WARNING, 'Got ID '. $frame->body .' for non-existent notice in queue '. $queue); + $this->con->ack($frame); + } else { + if ($handler->handle_notice($notice)) { + $this->_log(LOG_INFO, 'Successfully handled notice '. $notice->id .' posted at ' . $frame->headers['created'] . ' in queue '. $queue); + $this->con->ack($frame); + } else { + $this->_log(LOG_WARNING, 'Failed handling notice '. $notice->id .' posted at ' . $frame->headers['created'] . ' in queue '. $queue); + // FIXME we probably shouldn't have to do + // this kind of queue management ourselves + $this->con->ack($frame); + $this->enqueue($notice, $queue); + } + unset($notice); + } + + unset($frame); + } + } + function _queueName($queue) { return common_config('queue', 'queue_basename') . $queue; diff --git a/lib/xmppqueuehandler.php b/lib/xmppqueuehandler.php index 7d14422c6..77d476c30 100644 --- a/lib/xmppqueuehandler.php +++ b/lib/xmppqueuehandler.php @@ -21,6 +21,8 @@ if (!defined('LACONICA')) { exit(1); } require_once(INSTALLDIR.'/lib/queuehandler.php'); +define('PING_INTERVAL', 120); + /** * Common superclass for all XMPP-using queue handlers. They all need to * service their message queues on idle, and forward any incoming messages @@ -31,6 +33,7 @@ require_once(INSTALLDIR.'/lib/queuehandler.php'); class XmppQueueHandler extends QueueHandler { var $pingid = 0; + var $lastping = null; function start() { @@ -63,8 +66,12 @@ class XmppQueueHandler extends QueueHandler try { if ($this->conn) { $this->log(LOG_DEBUG, "Servicing the XMPP queue."); - $this->conn->processTime(max($timeout, 1)); - $this->sendPing(); + $this->conn->processTime($timeout); + $now = time(); + if (empty($this->lastping) || $now - $this->lastping > PING_INTERVAL) { + $this->sendPing(); + $this->lastping = $now; + } } } catch (XMPPHP_Exception $e) { $this->log(LOG_ERR, "Got an XMPPHP_Exception: " . $e->getMessage()); @@ -119,4 +126,9 @@ class XmppQueueHandler extends QueueHandler return jabber_daemon_address() . '/' . common_config('xmpp','resource') . 'daemon'; } } + + function getSockets() + { + return array($this->conn->getSocket()); + } } |