From e5b758dbbef6774943abf453a43114a2c3371b4a Mon Sep 17 00:00:00 2001 From: Evan Prodromou Date: Sun, 28 Jun 2009 14:38:31 -0400 Subject: start of queuemanager code --- lib/dbqueuemanager.php | 106 ++++++++++++++++++++++++++++++++++++++++ lib/queuemanager.php | 78 +++++++++++++++++++++++++++++ lib/stompqueuemanager.php | 122 ++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 306 insertions(+) create mode 100644 lib/dbqueuemanager.php create mode 100644 lib/queuemanager.php create mode 100644 lib/stompqueuemanager.php diff --git a/lib/dbqueuemanager.php b/lib/dbqueuemanager.php new file mode 100644 index 000000000..c0d4dcd29 --- /dev/null +++ b/lib/dbqueuemanager.php @@ -0,0 +1,106 @@ +. + * + * @category QueueManager + * @package Laconica + * @author Evan Prodromou + * @author Sarven Capadisli + * @copyright 2009 Control Yourself, Inc. + * @license http://www.fsf.org/licensing/licenses/agpl-3.0.html GNU Affero General Public License version 3.0 + * @link http://laconi.ca/ + */ + +class DBQueueManager extends QueueManager +{ + var $qis = array(); + + function enqueue($object, $queue) + { + $notice = (Notice)$object; + + $qi = new Queue_item(); + + $qi->notice_id = $notice->id; + $qi->transport = $queue; + $qi->created = $notice->created; + $result = $qi->insert(); + + if (!$result) { + common_log_db_error($qi, 'INSERT', __FILE__); + throw new ServerException('DB error inserting queue item'); + } + + return true; + } + + function nextItem($queue, $timeout=null) + { + $start = time(); + $result = null; + + do { + $qi = Queue_item::top($queue); + if (!empty($qi)) { + $notice = Notice::staticGet('id', $qi->notice_id); + if (!empty($notice)) { + $result = $notice; + } else { + $this->_log(LOG_INFO, 'dequeued non-existent notice ' . $notice->id); + $qi->delete(); + $qi->free(); + $qi = null; + } + } + } while (empty($result) && (is_null($timeout) || (time() - $start) < $timeout)); + + return $result; + } + + function done($object, $queue) + { + $notice = (Notice)$object; + + $qi = Queue_item::pkeyGet(array('notice_id' => $notice->id, + 'transport' => $queue)); + + if (empty($qi)) { + $this->log(LOG_INFO, 'Cannot find queue item for notice '.$notice->id.', queue '.$queue); + } else { + if (empty($qi->claimed)) { + $this->log(LOG_WARNING, 'Reluctantly releasing unclaimed queue item '. + 'for '.$notice->id.', queue '.$queue); + } + $qi->delete(); + $qi->free(); + $qi = null; + } + + $this->log(LOG_INFO, 'done with notice ID = ' . $notice->id); + + $notice->free(); + $notice = null; + } + + function _log($level, $msg) + { + common_log($level, 'DBQueueManager: '.$msg); + } +} diff --git a/lib/queuemanager.php b/lib/queuemanager.php new file mode 100644 index 000000000..64aca1bc1 --- /dev/null +++ b/lib/queuemanager.php @@ -0,0 +1,78 @@ +. + * + * @category QueueManager + * @package Laconica + * @author Evan Prodromou + * @author Sarven Capadisli + * @copyright 2009 Control Yourself, Inc. + * @license http://www.fsf.org/licensing/licenses/agpl-3.0.html GNU Affero General Public License version 3.0 + * @link http://laconi.ca/ + */ + +class QueueManager +{ + static $qm = null; + + static function get() + { + if (empty(self::$qm)) { + + if (Event::handle('StartNewQueueManager', self::$qm)) { + + $type = common_config('queue', 'sub'); + + switch ($type) { + case 'db': + self::$qm = new DBQueueManager(); + break; + case 'stomp': + self::$qm = new StompQueueManager(); + break; + default: + throw new ServerException("No queue manager class for type '$type'"); + } + } + + return self::$qm; + } + } + + function enqueue($object, $queue) + { + throw ServerException("Unimplemented function 'enqueue' called"); + } + + function peek($queue) + { + throw ServerException("Unimplemented function 'peek' called"); + } + + function nextItem($queue, $timeout=null) + { + throw ServerException("Unimplemented function 'nextItem' called"); + } + + function done($object, $queue) + { + throw ServerException("Unimplemented function 'done' called"); + } +} diff --git a/lib/stompqueuemanager.php b/lib/stompqueuemanager.php new file mode 100644 index 000000000..20c6e7a34 --- /dev/null +++ b/lib/stompqueuemanager.php @@ -0,0 +1,122 @@ +. + * + * @category QueueManager + * @package Laconica + * @author Evan Prodromou + * @author Sarven Capadisli + * @copyright 2009 Control Yourself, Inc. + * @license http://www.fsf.org/licensing/licenses/agpl-3.0.html GNU Affero General Public License version 3.0 + * @link http://laconi.ca/ + */ + +require_once 'Stomp.php'; + +class QueueManager +{ + var $server = null; + var $username = null; + var $password = null; + var $base = null; + var $con = null; + var $frames = array(); + + function __construct() + { + $this->server = common_config('queue', 'stomp_server'); + $this->username = common_config('queue', 'stomp_username'); + $this->password = common_config('queue', 'stomp_password'); + $this->base = common_config('queue', 'queue_basename'); + } + + function _connect() + { + if (empty($this->con)) { + $this->con = new Stomp($this->server); + + if (!$this->con->connect($this->username, $this->password)) { + $this->_log(LOG_ERR, 'Failed to connect to queue server'); + throw new ServerException('Failed to connect to queue server'); + } + } + } + + function enqueue($object, $queue) + { + $notice = (Notice)$object; + + $this->_connect(); + + $result = $this->con->send($this->_queueName($queue), + $notice->id, // BODY of the message + array ('created' => $notice->created)); + + if (!$result) { + common_log(LOG_ERR, 'Error sending to '.$transport.' queue'); + return false; + } + + common_log(LOG_DEBUG, 'complete remote queueing notice ID = ' + . $notice->id . ' for ' . $transport); + } + + function nextItem($queue, $timeout=null) + { + $result = null; + + $this->_connect(); + + $frame = $this->con->readFrame(); + + if ($frame) { + $this->log(LOG_INFO, 'Got item enqueued '.common_exact_date($frame->headers['created'])); + + // XXX: Now the queue handler receives only the ID of the + // notice, and it has to get it from the DB + // A massive improvement would be avoid DB query by transmitting + // all the notice details via queue server... + $notice = Notice::staticGet($frame->body); + + if ($notice) { + } else { + $this->log(LOG_WARNING, 'queue item for notice that does not exist'); + } + } + } + + function done($object, $queue) + { + $notice = (Notice)$object; + + $this->_connect(); + + $frame = $this->_getFrame($notice, $queue); + + if (empty($frame)) { + $this->log(LOG_ERR, 'Cannot find frame for notice '.$notice->id.' in queue '.$queue); + } else { + // if the msg has been handled positively, ack it + // and the queue server will remove it from the queue + $this->con->ack($frame); + $this->log(LOG_INFO, 'finished broadcasting notice ID = ' . $notice->id); + } + } +} -- cgit v1.2.3-54-g00ecf From 854c82cfd53cb071afa39259fb467b4730bd6494 Mon Sep 17 00:00:00 2001 From: Evan Prodromou Date: Sun, 28 Jun 2009 14:38:34 -0400 Subject: start of queuemanager code --- classes/Queue_item.php | 11 ++++++-- lib/util.php | 75 -------------------------------------------------- 2 files changed, 8 insertions(+), 78 deletions(-) diff --git a/classes/Queue_item.php b/classes/Queue_item.php index 9b909ec22..295c321b5 100644 --- a/classes/Queue_item.php +++ b/classes/Queue_item.php @@ -4,7 +4,7 @@ */ require_once INSTALLDIR.'/classes/Memcached_DataObject.php'; -class Queue_item extends Memcached_DataObject +class Queue_item extends Memcached_DataObject { ###START_AUTOCODE /* the code below is auto generated do not remove the above tag */ @@ -13,7 +13,7 @@ class Queue_item extends Memcached_DataObject public $notice_id; // int(4) primary_key not_null public $transport; // varchar(8) primary_key not_null public $created; // datetime() not_null - public $claimed; // datetime() + public $claimed; // datetime() /* Static get */ function staticGet($k,$v=null) @@ -24,7 +24,7 @@ class Queue_item extends Memcached_DataObject function sequenceKey() { return array(false, false); } - + static function top($transport) { $qi = new Queue_item(); @@ -54,4 +54,9 @@ class Queue_item extends Memcached_DataObject $qi = null; return null; } + + function &pkeyGet($kv) + { + return Memcached_DataObject::pkeyGet('Queue_item', $kv); + } } diff --git a/lib/util.php b/lib/util.php index 9c1af7a0d..3f924c8de 100644 --- a/lib/util.php +++ b/lib/util.php @@ -889,69 +889,6 @@ function common_enqueue_notice($notice) return $result; } -function common_enqueue_notice_stomp($notice, $transports) -{ - // use an external message queue system via STOMP - require_once("Stomp.php"); - - $server = common_config('queue','stomp_server'); - $username = common_config('queue', 'stomp_username'); - $password = common_config('queue', 'stomp_password'); - - $con = new Stomp($server); - - if (!$con->connect($username, $password)) { - common_log(LOG_ERR, 'Failed to connect to queue server'); - return false; - } - - $queue_basename = common_config('queue','queue_basename'); - - foreach ($transports as $transport) { - $result = $con->send('/queue/'.$queue_basename.'-'.$transport, // QUEUE - $notice->id, // BODY of the message - array ('created' => $notice->created)); - if (!$result) { - common_log(LOG_ERR, 'Error sending to '.$transport.' queue'); - return false; - } - common_log(LOG_DEBUG, 'complete remote queueing notice ID = ' . $notice->id . ' for ' . $transport); - } - - //send tags as headers, so they can be used as JMS selectors - common_log(LOG_DEBUG, 'searching for tags ' . $notice->id); - $tags = array(); - $tag = new Notice_tag(); - $tag->notice_id = $notice->id; - if ($tag->find()) { - while ($tag->fetch()) { - common_log(LOG_DEBUG, 'tag found = ' . $tag->tag); - array_push($tags,$tag->tag); - } - } - $tag->free(); - - $con->send('/topic/laconica.'.$notice->profile_id, - $notice->content, - array( - 'profile_id' => $notice->profile_id, - 'created' => $notice->created, - 'tags' => implode($tags,' - ') - ) - ); - common_log(LOG_DEBUG, 'sent to personal topic ' . $notice->id); - $con->send('/topic/laconica.allusers', - $notice->content, - array( - 'profile_id' => $notice->profile_id, - 'created' => $notice->created, - 'tags' => implode($tags,' - ') - ) - ); - common_log(LOG_DEBUG, 'sent to catch-all topic ' . $notice->id); - $result = true; -} - function common_enqueue_notice_db($notice, $transports) { // in any other case, 'internal' @@ -962,18 +899,6 @@ function common_enqueue_notice_db($notice, $transports) function common_enqueue_notice_transport($notice, $transport) { - $qi = new Queue_item(); - $qi->notice_id = $notice->id; - $qi->transport = $transport; - $qi->created = $notice->created; - $result = $qi->insert(); - if (!$result) { - $last_error = &PEAR::getStaticProperty('DB_DataObject','lastError'); - common_log(LOG_ERR, 'DB error inserting queue item: ' . $last_error->message); - throw new ServerException('DB error inserting queue item: ' . $last_error->message); - } - common_log(LOG_DEBUG, 'complete queueing notice ID = ' . $notice->id . ' for ' . $transport); - return true; } function common_real_broadcast($notice, $remote=false) -- cgit v1.2.3-54-g00ecf From 58b427869a001a91d66cff497f1563b8277f1a67 Mon Sep 17 00:00:00 2001 From: Evan Prodromou Date: Wed, 1 Jul 2009 11:09:42 -0400 Subject: compile errors in DBQueueManager --- lib/dbqueuemanager.php | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/lib/dbqueuemanager.php b/lib/dbqueuemanager.php index c0d4dcd29..46be54b30 100644 --- a/lib/dbqueuemanager.php +++ b/lib/dbqueuemanager.php @@ -34,7 +34,7 @@ class DBQueueManager extends QueueManager function enqueue($object, $queue) { - $notice = (Notice)$object; + $notice = $object; $qi = new Queue_item(); @@ -76,7 +76,9 @@ class DBQueueManager extends QueueManager function done($object, $queue) { - $notice = (Notice)$object; + // XXX: right now, we only handle notices + + $notice = $object; $qi = Queue_item::pkeyGet(array('notice_id' => $notice->id, 'transport' => $queue)); -- cgit v1.2.3-54-g00ecf From 4c256a6d7ee287def5c26f401c8caa6bfe0b8dff Mon Sep 17 00:00:00 2001 From: Evan Prodromou Date: Wed, 1 Jul 2009 11:09:58 -0400 Subject: better hook variables for StartQueueManager --- lib/queuemanager.php | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/queuemanager.php b/lib/queuemanager.php index 64aca1bc1..92f0e10de 100644 --- a/lib/queuemanager.php +++ b/lib/queuemanager.php @@ -36,9 +36,9 @@ class QueueManager { if (empty(self::$qm)) { - if (Event::handle('StartNewQueueManager', self::$qm)) { + $type = common_config('queue', 'sub'); - $type = common_config('queue', 'sub'); + if (Event::handle('StartNewQueueManager', array($type, &self::$qm))) { switch ($type) { case 'db': -- cgit v1.2.3-54-g00ecf From 7b66a129139d8c2f03677f6a5b71412a111f655d Mon Sep 17 00:00:00 2001 From: Evan Prodromou Date: Wed, 1 Jul 2009 11:10:23 -0400 Subject: save frames for StompQueueManager --- lib/stompqueuemanager.php | 36 +++++++++++++++++++++++++++++++++--- 1 file changed, 33 insertions(+), 3 deletions(-) diff --git a/lib/stompqueuemanager.php b/lib/stompqueuemanager.php index 20c6e7a34..1b4a26f2e 100644 --- a/lib/stompqueuemanager.php +++ b/lib/stompqueuemanager.php @@ -30,7 +30,7 @@ require_once 'Stomp.php'; -class QueueManager +class StompQueueManager { var $server = null; var $username = null; @@ -61,10 +61,12 @@ class QueueManager function enqueue($object, $queue) { - $notice = (Notice)$object; + $notice = $object; $this->_connect(); + // XXX: serialize and send entire notice + $result = $this->con->send($this->_queueName($queue), $notice->id, // BODY of the message array ('created' => $notice->created)); @@ -93,9 +95,11 @@ class QueueManager // notice, and it has to get it from the DB // A massive improvement would be avoid DB query by transmitting // all the notice details via queue server... + $notice = Notice::staticGet($frame->body); if ($notice) { + $this->_saveFrame($notice, $queue, $frame); } else { $this->log(LOG_WARNING, 'queue item for notice that does not exist'); } @@ -104,7 +108,7 @@ class QueueManager function done($object, $queue) { - $notice = (Notice)$object; + $notice = $object; $this->_connect(); @@ -116,7 +120,33 @@ class QueueManager // if the msg has been handled positively, ack it // and the queue server will remove it from the queue $this->con->ack($frame); + $this->_clearFrame($notice, $queue); + $this->log(LOG_INFO, 'finished broadcasting notice ID = ' . $notice->id); } } + + function _frameKey($notice, $queue) + { + return ((string)$notice->id) . '-' . $queue; + } + + function _saveFrame($notice, $queue, $frame) + { + $k = $this->_frameKey($notice, $queue); + $this->_frames[$k] = $frame; + return true; + } + + function _getFrame($notice, $queue) + { + $k = $this->_frameKey($notice, $queue); + return $this->_frames[$k]; + } + + function _clearFrame($notice, $queue) + { + $k = $this->_frameKey($notice, $queue); + unset($this->_frames[$k]); + } } -- cgit v1.2.3-54-g00ecf From e0bf8ad95b2d2ddc7b988c25e9cffa20075a5d8c Mon Sep 17 00:00:00 2001 From: Evan Prodromou Date: Wed, 1 Jul 2009 11:34:12 -0400 Subject: Add UnQueueManager for immediate handling Perhaps it's a little precious, but I took out the switches in util.php to determine what's supposed to be sent when, and made a queuemanager class that will just do things when they're supposed to be done. --- lib/queuemanager.php | 10 ++++-- lib/unqueuemanager.php | 85 ++++++++++++++++++++++++++++++++++++++++++++++++++ lib/util.php | 85 ++++++++++---------------------------------------- 3 files changed, 109 insertions(+), 71 deletions(-) create mode 100644 lib/unqueuemanager.php diff --git a/lib/queuemanager.php b/lib/queuemanager.php index 92f0e10de..6bb21de9b 100644 --- a/lib/queuemanager.php +++ b/lib/queuemanager.php @@ -36,9 +36,15 @@ class QueueManager { if (empty(self::$qm)) { - $type = common_config('queue', 'sub'); + if (Event::handle('StartNewQueueManager', array(&self::$qm))) { - if (Event::handle('StartNewQueueManager', array($type, &self::$qm))) { + $enabled = common_config('queue', 'enabled'); + $type = common_config('queue', 'sub'); + + if (!$enabled) { + // does everything immediately + return new UnQueueManager(); + } switch ($type) { case 'db': diff --git a/lib/unqueuemanager.php b/lib/unqueuemanager.php new file mode 100644 index 000000000..515461072 --- /dev/null +++ b/lib/unqueuemanager.php @@ -0,0 +1,85 @@ +. + * + * @category QueueManager + * @package Laconica + * @author Evan Prodromou + * @author Sarven Capadisli + * @copyright 2009 Control Yourself, Inc. + * @license http://www.fsf.org/licensing/licenses/agpl-3.0.html GNU Affero General Public License version 3.0 + * @link http://laconi.ca/ + */ + +class UnQueueManager +{ + function enqueue($object, $queue) + { + $notice = $object; + + switch ($queue) + { + case 'omb': + if ($this->_isLocal($notice)) { + require_once(INSTALLDIR.'/lib/omb.php'); + omb_broadcast_remote_subscribers($notice); + } + break; + case 'public': + if ($this->_isLocal($notice)) { + require_once(INSTALLDIR.'/lib/jabber.php'); + jabber_public_notice($notice); + } + break; + case 'twitter': + if ($this->_isLocal($notice)) { + broadcast_twitter($notice); + } + break; + case 'facebook': + if ($this->_isLocal($notice)) { + require_once INSTALLDIR . '/lib/facebookutil.php'; + return facebookBroadcastNotice($notice); + } + break; + case 'ping': + if ($this->_isLocal($notice)) { + require_once INSTALLDIR . '/lib/ping.php'; + return ping_broadcast_notice($notice); + } + case 'sms': + require_once(INSTALLDIR.'/lib/mail.php'); + mail_broadcast_notice_sms($notice); + break; + case 'jabber': + require_once(INSTALLDIR.'/lib/jabber.php'); + jabber_broadcast_notice($notice); + break; + default: + throw ServerException("UnQueueManager: Unknown queue: $type"); + } + } + + function _isLocal($notice) + { + return ($notice->is_local == NOTICE_LOCAL_PUBLIC || + $notice->is_local == NOTICE_LOCAL_NONPUBLIC); + } +} \ No newline at end of file diff --git a/lib/util.php b/lib/util.php index 3f924c8de..b1b4faa7e 100644 --- a/lib/util.php +++ b/lib/util.php @@ -861,88 +861,35 @@ function common_redirect($url, $code=307) function common_broadcast_notice($notice, $remote=false) { - if (common_config('queue', 'enabled')) { - // Do it later! - return common_enqueue_notice($notice); - } else { - return common_real_broadcast($notice, $remote); - } + return common_enqueue_notice($notice); } // Stick the notice on the queue function common_enqueue_notice($notice) { - $transports = array('omb', 'sms', 'public', 'twitter', 'facebook', 'ping'); + static $localTransports = array('omb', + 'public', + 'twitter', + 'facebook', + 'ping'); + static $allTransports = array('sms', 'jabber'); - if (common_config('xmpp', 'enabled')) - { - $transports[] = 'jabber'; - } + $transports = $allTransports; - if (common_config('queue','subsystem') == 'stomp') { - common_enqueue_notice_stomp($notice, $transports); + if ($notice->is_local == NOTICE_LOCAL_PUBLIC || + $notice->is_local == NOTICE_LOCAL_NONPUBLIC) { + $transports = array_merge($transports, $localTransports); } - else { - common_enqueue_notice_db($notice, $transports); - } - return $result; -} -function common_enqueue_notice_db($notice, $transports) -{ - // in any other case, 'internal' - foreach ($transports as $transport) { - common_enqueue_notice_transport($notice, $transport); - } -} - -function common_enqueue_notice_transport($notice, $transport) -{ -} + $qm = QueueManager::get(); -function common_real_broadcast($notice, $remote=false) -{ - $success = true; - if (!$remote) { - // Make sure we have the OMB stuff - require_once(INSTALLDIR.'/lib/omb.php'); - $success = omb_broadcast_remote_subscribers($notice); - if (!$success) { - common_log(LOG_ERR, 'Error in OMB broadcast for notice ' . $notice->id); - } - } - if ($success) { - require_once(INSTALLDIR.'/lib/jabber.php'); - $success = jabber_broadcast_notice($notice); - if (!$success) { - common_log(LOG_ERR, 'Error in jabber broadcast for notice ' . $notice->id); - } - } - if ($success) { - require_once(INSTALLDIR.'/lib/mail.php'); - $success = mail_broadcast_notice_sms($notice); - if (!$success) { - common_log(LOG_ERR, 'Error in sms broadcast for notice ' . $notice->id); - } - } - if ($success) { - $success = jabber_public_notice($notice); - if (!$success) { - common_log(LOG_ERR, 'Error in public broadcast for notice ' . $notice->id); - } - } - if ($success) { - $success = broadcast_twitter($notice); - if (!$success) { - common_log(LOG_ERR, 'Error in Twitter broadcast for notice ' . $notice->id); - } + foreach ($transports as $transport) + { + $qm->enqueue($notice, $transport); } - // XXX: Do a real-time FB broadcast here? - - // XXX: broadcast notices to other IM - return $success; + return true; } function common_broadcast_profile($profile) -- cgit v1.2.3-54-g00ecf From 887d35cfc8c1d42e5af67d0161b244545cda464a Mon Sep 17 00:00:00 2001 From: Evan Prodromou Date: Wed, 1 Jul 2009 12:09:18 -0400 Subject: better queue manager detection, new method fail() --- lib/queuemanager.php | 31 ++++++++++++++++++------------- 1 file changed, 18 insertions(+), 13 deletions(-) diff --git a/lib/queuemanager.php b/lib/queuemanager.php index 6bb21de9b..1bf4d4dec 100644 --- a/lib/queuemanager.php +++ b/lib/queuemanager.php @@ -39,22 +39,22 @@ class QueueManager if (Event::handle('StartNewQueueManager', array(&self::$qm))) { $enabled = common_config('queue', 'enabled'); - $type = common_config('queue', 'sub'); + $type = common_config('queue', 'subsystem'); if (!$enabled) { // does everything immediately - return new UnQueueManager(); - } - - switch ($type) { - case 'db': - self::$qm = new DBQueueManager(); - break; - case 'stomp': - self::$qm = new StompQueueManager(); - break; - default: - throw new ServerException("No queue manager class for type '$type'"); + self::$qm = new UnQueueManager(); + } else { + switch ($type) { + case 'db': + self::$qm = new DBQueueManager(); + break; + case 'stomp': + self::$qm = new StompQueueManager(); + break; + default: + throw new ServerException("No queue manager class for type '$type'"); + } } } @@ -81,4 +81,9 @@ class QueueManager { throw ServerException("Unimplemented function 'done' called"); } + + function fail($object, $queue) + { + throw ServerException("Unimplemented function 'fail' called"); + } } -- cgit v1.2.3-54-g00ecf From 557418bc1e4e9d8a06025910ad7be5f60557f71e Mon Sep 17 00:00:00 2001 From: Evan Prodromou Date: Wed, 1 Jul 2009 12:09:41 -0400 Subject: better transport choices when xmpp is disabled --- lib/util.php | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/lib/util.php b/lib/util.php index b1b4faa7e..656374516 100644 --- a/lib/util.php +++ b/lib/util.php @@ -869,17 +869,25 @@ function common_broadcast_notice($notice, $remote=false) function common_enqueue_notice($notice) { static $localTransports = array('omb', - 'public', 'twitter', 'facebook', 'ping'); - static $allTransports = array('sms', 'jabber'); + static $allTransports = array('sms'); $transports = $allTransports; + $xmpp = common_config('xmpp', 'enabled'); + + if ($xmpp) { + $transports[] = 'jabber'; + } + if ($notice->is_local == NOTICE_LOCAL_PUBLIC || $notice->is_local == NOTICE_LOCAL_NONPUBLIC) { $transports = array_merge($transports, $localTransports); + if ($xmpp) { + $transports[] = 'public'; + } } $qm = QueueManager::get(); -- cgit v1.2.3-54-g00ecf From 2325d934a8abfc611f455d4f0b816e2dd62c5ec4 Mon Sep 17 00:00:00 2001 From: Evan Prodromou Date: Wed, 1 Jul 2009 12:10:11 -0400 Subject: add fail() method to stompqueuemanager --- lib/stompqueuemanager.php | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/lib/stompqueuemanager.php b/lib/stompqueuemanager.php index 1b4a26f2e..badcd4abb 100644 --- a/lib/stompqueuemanager.php +++ b/lib/stompqueuemanager.php @@ -126,6 +126,17 @@ class StompQueueManager } } + function fail($object, $queue) + { + $notice = $object; + + // STOMP server will requeue it after a while anyways, + // so no need to notify. Just get it out of our little + // array + + $this->_clearFrame($notice, $queue); + } + function _frameKey($notice, $queue) { return ((string)$notice->id) . '-' . $queue; -- cgit v1.2.3-54-g00ecf From a35138b2684ec5275a1ffd7badfe7826cf2173b1 Mon Sep 17 00:00:00 2001 From: Evan Prodromou Date: Wed, 1 Jul 2009 12:10:25 -0400 Subject: add fail() method to dbqueuemanager and fix logging --- lib/dbqueuemanager.php | 35 ++++++++++++++++++++++++++++++++--- 1 file changed, 32 insertions(+), 3 deletions(-) diff --git a/lib/dbqueuemanager.php b/lib/dbqueuemanager.php index 46be54b30..c9e5ef243 100644 --- a/lib/dbqueuemanager.php +++ b/lib/dbqueuemanager.php @@ -84,10 +84,10 @@ class DBQueueManager extends QueueManager 'transport' => $queue)); if (empty($qi)) { - $this->log(LOG_INFO, 'Cannot find queue item for notice '.$notice->id.', queue '.$queue); + $this->_log(LOG_INFO, 'Cannot find queue item for notice '.$notice->id.', queue '.$queue); } else { if (empty($qi->claimed)) { - $this->log(LOG_WARNING, 'Reluctantly releasing unclaimed queue item '. + $this->_log(LOG_WARNING, 'Reluctantly releasing unclaimed queue item '. 'for '.$notice->id.', queue '.$queue); } $qi->delete(); @@ -95,7 +95,36 @@ class DBQueueManager extends QueueManager $qi = null; } - $this->log(LOG_INFO, 'done with notice ID = ' . $notice->id); + $this->_log(LOG_INFO, 'done with notice ID = ' . $notice->id); + + $notice->free(); + $notice = null; + } + + function fail($object, $queue) + { + // XXX: right now, we only handle notices + + $notice = $object; + + $qi = Queue_item::pkeyGet(array('notice_id' => $notice->id, + 'transport' => $queue)); + + if (empty($qi)) { + $this->_log(LOG_INFO, 'Cannot find queue item for notice '.$notice->id.', queue '.$queue); + } else { + if (empty($qi->claimed)) { + $this->_log(LOG_WARNING, 'Ignoring failure for unclaimed queue item '. + 'for '.$notice->id.', queue '.$queue); + } else { + $orig = clone($qi); + $qi->claimed = null; + $qi->update($orig); + $qi = null; + } + } + + $this->_log(LOG_INFO, 'done with notice ID = ' . $notice->id); $notice->free(); $notice = null; -- cgit v1.2.3-54-g00ecf From e52997e52fe02960908eb6a9637a3349a2c74dad Mon Sep 17 00:00:00 2001 From: Evan Prodromou Date: Wed, 1 Jul 2009 12:11:02 -0400 Subject: change queuehandler class to use queuemanager interface --- lib/queuehandler.php | 151 ++++++++++++++------------------------------------- 1 file changed, 41 insertions(+), 110 deletions(-) diff --git a/lib/queuehandler.php b/lib/queuehandler.php index ae403c65e..045432ae5 100644 --- a/lib/queuehandler.php +++ b/lib/queuehandler.php @@ -17,17 +17,18 @@ * along with this program. If not, see . */ -define('CLAIM_TIMEOUT', 1200); - if (!defined('LACONICA')) { exit(1); } require_once(INSTALLDIR.'/lib/daemon.php'); require_once(INSTALLDIR.'/classes/Queue_item.php'); require_once(INSTALLDIR.'/classes/Notice.php'); +define('CLAIM_TIMEOUT', 1200); +define('QUEUE_HANDLER_MISS_IDLE', 10); +define('QUEUE_HANDLER_HIT_IDLE', 10); + class QueueHandler extends Daemon { - var $_id = 'generic'; function QueueHandler($id=null) @@ -37,6 +38,11 @@ class QueueHandler extends Daemon } } + function timeout() + { + return null; + } + function class_name() { return ucfirst($this->transport()) . 'Handler'; @@ -75,110 +81,45 @@ class QueueHandler extends Daemon return true; } - function db_dispatch() { - do { - $qi = Queue_item::top($this->transport()); - if ($qi) { - $this->log(LOG_INFO, 'Got item enqueued '.common_exact_date($qi->created)); - $notice = Notice::staticGet($qi->notice_id); - if ($notice) { - $this->log(LOG_INFO, 'broadcasting notice ID = ' . $notice->id); - # XXX: what to do if broadcast fails? - $result = $this->handle_notice($notice); - if (!$result) { - $this->log(LOG_WARNING, 'Failed broadcast for notice ID = ' . $notice->id); - $orig = $qi; - $qi->claimed = null; - $qi->update($orig); - $this->log(LOG_WARNING, 'Abandoned claim for notice ID = ' . $notice->id); - continue; - } - $this->log(LOG_INFO, 'finished broadcasting notice ID = ' . $notice->id); - $notice->free(); - unset($notice); - $notice = null; - } else { - $this->log(LOG_WARNING, 'queue item for notice that does not exist'); - } - $qi->delete(); - $qi->free(); - unset($qi); - $this->idle(0); - } else { - $this->clear_old_claims(); - $this->idle(5); - } - } while (true); - } - - function stomp_dispatch() { - - // use an external message queue system via STOMP - require_once("Stomp.php"); + function run() + { + if (!$this->start()) { + return false; + } - $server = common_config('queue','stomp_server'); - $username = common_config('queue', 'stomp_username'); - $password = common_config('queue', 'stomp_password'); + $this->log(LOG_INFO, 'checking for queued notices'); - $con = new Stomp($server); + $queue = $this->transport(); + $timeout = $this->timeout(); - if (!$con->connect($username, $password)) { - $this->log(LOG_ERR, 'Failed to connect to queue server'); - return false; - } + $qm = QueueManager::get(); - $queue_basename = common_config('queue','queue_basename'); - // subscribe to the relevant queue (format: basename-transport) - $con->subscribe('/queue/'.$queue_basename.'-'.$this->transport()); - - do { - $frame = $con->readFrame(); - if ($frame) { - $this->log(LOG_INFO, 'Got item enqueued '.common_exact_date($frame->headers['created'])); - - // XXX: Now the queue handler receives only the ID of the - // notice, and it has to get it from the DB - // A massive improvement would be avoid DB query by transmitting - // all the notice details via queue server... - $notice = Notice::staticGet($frame->body); - - if ($notice) { - $this->log(LOG_INFO, 'broadcasting notice ID = ' . $notice->id); - $result = $this->handle_notice($notice); - if ($result) { - // if the msg has been handled positively, ack it - // and the queue server will remove it from the queue - $con->ack($frame); - $this->log(LOG_INFO, 'finished broadcasting notice ID = ' . $notice->id); - } - else { - // no ack - $this->log(LOG_WARNING, 'Failed broadcast for notice ID = ' . $notice->id); - } - $notice->free(); - unset($notice); - $notice = null; + while (true) { + $this->log(LOG_DEBUG, 'Checking for notices...'); + $notice = $qm->nextItem($queue, $timeout); + if (empty($notice)) { + $this->log(LOG_DEBUG, 'No notices waiting; idling.'); + // Nothing in the queue. Do you + // have other tasks, like servicing your + // XMPP connection, to do? + $this->idle(QUEUE_HANDLER_MISS_IDLE); + } else { + $this->log(LOG_INFO, 'Got notice '. $notice->id); + // Yay! Got one! + if ($this->handle_notice($notice)) { + $this->log(LOG_INFO, 'Successfully handled notice '. $notice->id); + $qm->done($notice, $queue); } else { - $this->log(LOG_WARNING, 'queue item for notice that does not exist'); + $this->log(LOG_INFO, 'Failed to handle notice '. $notice->id); + $qm->fail($notice, $queue); } + // Chance to e.g. service your XMPP connection + $this->log(LOG_DEBUG, 'Idling after success.'); + $this->idle(QUEUE_HANDLER_HIT_IDLE); } - } while (true); - - $con->disconnect(); - } - - function run() - { - if (!$this->start()) { - return false; - } - $this->log(LOG_INFO, 'checking for queued notices'); - if (common_config('queue','subsystem') == 'stomp') { - $this->stomp_dispatch(); - } - else { - $this->db_dispatch(); + // XXX: when do we give up? } + if (!$this->finish()) { return false; } @@ -187,21 +128,11 @@ class QueueHandler extends Daemon function idle($timeout=0) { - if ($timeout>0) { + if ($timeout > 0) { sleep($timeout); } } - function clear_old_claims() - { - $qi = new Queue_item(); - $qi->transport = $this->transport(); - $qi->whereAdd('now() - claimed > '.CLAIM_TIMEOUT); - $qi->update(DB_DATAOBJECT_WHEREADD_ONLY); - $qi->free(); - unset($qi); - } - function log($level, $msg) { common_log($level, $this->class_name() . ' ('. $this->get_id() .'): '.$msg); -- cgit v1.2.3-54-g00ecf From e8f27025ba7869057d86fe37a5264e1c742969f5 Mon Sep 17 00:00:00 2001 From: Evan Prodromou Date: Thu, 2 Jul 2009 12:43:09 -0400 Subject: more logging in stompqueuemanager --- lib/stompqueuemanager.php | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/lib/stompqueuemanager.php b/lib/stompqueuemanager.php index badcd4abb..08a5790d4 100644 --- a/lib/stompqueuemanager.php +++ b/lib/stompqueuemanager.php @@ -49,10 +49,14 @@ class StompQueueManager function _connect() { + $this->_log(LOG_DEBUG, "Connecting to $this->server..."); if (empty($this->con)) { + $this->_log(LOG_INFO, "Connecting to '$this->server' as '$this->username'..."); $this->con = new Stomp($this->server); - if (!$this->con->connect($this->username, $this->password)) { + if ($this->con->connect($this->username, $this->password)) { + $this->_log(LOG_INFO, "Connected."); + } else { $this->_log(LOG_ERR, 'Failed to connect to queue server'); throw new ServerException('Failed to connect to queue server'); } @@ -160,4 +164,9 @@ class StompQueueManager $k = $this->_frameKey($notice, $queue); unset($this->_frames[$k]); } + + function _log($level, $msg) + { + common_log($level, 'StompQueueManager: '.$msg); + } } -- cgit v1.2.3-54-g00ecf From 3e4be98ff6de7a1044f0d7b0deef4f6054e64464 Mon Sep 17 00:00:00 2001 From: Evan Prodromou Date: Fri, 3 Jul 2009 10:05:07 -0400 Subject: add _queueName function --- lib/stompqueuemanager.php | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/lib/stompqueuemanager.php b/lib/stompqueuemanager.php index 08a5790d4..1ad687036 100644 --- a/lib/stompqueuemanager.php +++ b/lib/stompqueuemanager.php @@ -165,6 +165,11 @@ class StompQueueManager unset($this->_frames[$k]); } + function _queueName($queue) + { + return common_config('queue', 'queue_basename') . $queue; + } + function _log($level, $msg) { common_log($level, 'StompQueueManager: '.$msg); -- cgit v1.2.3-54-g00ecf From 49c5c6f92bc1d06e6464eade81eead891d86f10d Mon Sep 17 00:00:00 2001 From: Evan Prodromou Date: Sat, 4 Jul 2009 00:31:28 -0400 Subject: move handling code into queuemanager --- lib/dbqueuemanager.php | 35 +++++++++++++++++++-- lib/queuehandler.php | 30 ++---------------- lib/queuemanager.php | 19 ++--------- lib/stompqueuemanager.php | 80 +++++++++-------------------------------------- 4 files changed, 51 insertions(+), 113 deletions(-) diff --git a/lib/dbqueuemanager.php b/lib/dbqueuemanager.php index c9e5ef243..6e7172de0 100644 --- a/lib/dbqueuemanager.php +++ b/lib/dbqueuemanager.php @@ -51,7 +51,36 @@ class DBQueueManager extends QueueManager return true; } - function nextItem($queue, $timeout=null) + function service($queue, $handler) + { + while (true) { + $this->_log(LOG_DEBUG, 'Checking for notices...'); + $notice = $this->_nextItem($queue, null); + if (empty($notice)) { + $this->_log(LOG_DEBUG, 'No notices waiting; idling.'); + // Nothing in the queue. Do you + // have other tasks, like servicing your + // XMPP connection, to do? + $handler->idle(QUEUE_HANDLER_MISS_IDLE); + } else { + $this->_log(LOG_INFO, 'Got notice '. $notice->id); + // Yay! Got one! + if ($handler->handle_notice($notice)) { + $this->_log(LOG_INFO, 'Successfully handled notice '. $notice->id); + $this->_done($notice, $queue); + } else { + $this->_log(LOG_INFO, 'Failed to handle notice '. $notice->id); + $this->_fail($notice, $queue); + } + // Chance to e.g. service your XMPP connection + $this->_log(LOG_DEBUG, 'Idling after success.'); + $handler->idle(QUEUE_HANDLER_HIT_IDLE); + } + // XXX: when do we give up? + } + } + + function _nextItem($queue, $timeout=null) { $start = time(); $result = null; @@ -74,7 +103,7 @@ class DBQueueManager extends QueueManager return $result; } - function done($object, $queue) + function _done($object, $queue) { // XXX: right now, we only handle notices @@ -101,7 +130,7 @@ class DBQueueManager extends QueueManager $notice = null; } - function fail($object, $queue) + function _fail($object, $queue) { // XXX: right now, we only handle notices diff --git a/lib/queuehandler.php b/lib/queuehandler.php index ddb47a28e..c0f38f4e3 100644 --- a/lib/queuehandler.php +++ b/lib/queuehandler.php @@ -25,7 +25,7 @@ require_once(INSTALLDIR.'/classes/Notice.php'); define('CLAIM_TIMEOUT', 1200); define('QUEUE_HANDLER_MISS_IDLE', 10); -define('QUEUE_HANDLER_HIT_IDLE', 10); +define('QUEUE_HANDLER_HIT_IDLE', 0); class QueueHandler extends Daemon { @@ -42,7 +42,7 @@ class QueueHandler extends Daemon function timeout() { - return null; + return 60; } function class_name() @@ -96,31 +96,7 @@ class QueueHandler extends Daemon $qm = QueueManager::get(); - while (true) { - $this->log(LOG_DEBUG, 'Checking for notices...'); - $notice = $qm->nextItem($queue, $timeout); - if (empty($notice)) { - $this->log(LOG_DEBUG, 'No notices waiting; idling.'); - // Nothing in the queue. Do you - // have other tasks, like servicing your - // XMPP connection, to do? - $this->idle(QUEUE_HANDLER_MISS_IDLE); - } else { - $this->log(LOG_INFO, 'Got notice '. $notice->id); - // Yay! Got one! - if ($this->handle_notice($notice)) { - $this->log(LOG_INFO, 'Successfully handled notice '. $notice->id); - $qm->done($notice, $queue); - } else { - $this->log(LOG_INFO, 'Failed to handle notice '. $notice->id); - $qm->fail($notice, $queue); - } - // Chance to e.g. service your XMPP connection - $this->log(LOG_DEBUG, 'Idling after success.'); - $this->idle(QUEUE_HANDLER_HIT_IDLE); - } - // XXX: when do we give up? - } + $qm->service($queue, $this); if (!$this->finish()) { return false; diff --git a/lib/queuemanager.php b/lib/queuemanager.php index 1bf4d4dec..f36e99d16 100644 --- a/lib/queuemanager.php +++ b/lib/queuemanager.php @@ -67,23 +67,8 @@ class QueueManager throw ServerException("Unimplemented function 'enqueue' called"); } - function peek($queue) + function service($queue, $handler) { - throw ServerException("Unimplemented function 'peek' called"); - } - - function nextItem($queue, $timeout=null) - { - throw ServerException("Unimplemented function 'nextItem' called"); - } - - function done($object, $queue) - { - throw ServerException("Unimplemented function 'done' called"); - } - - function fail($object, $queue) - { - throw ServerException("Unimplemented function 'fail' called"); + throw ServerException("Unimplemented function 'service' called"); } } diff --git a/lib/stompqueuemanager.php b/lib/stompqueuemanager.php index 1ad687036..b8731d543 100644 --- a/lib/stompqueuemanager.php +++ b/lib/stompqueuemanager.php @@ -84,85 +84,33 @@ class StompQueueManager . $notice->id . ' for ' . $transport); } - function nextItem($queue, $timeout=null) + function service($queue, $handler) { $result = null; $this->_connect(); - $frame = $this->con->readFrame(); + $this->con->setReadTimeout($handler->timeout()); - if ($frame) { - $this->log(LOG_INFO, 'Got item enqueued '.common_exact_date($frame->headers['created'])); + $this->con->subscribe($this->_queueName($queue)); - // XXX: Now the queue handler receives only the ID of the - // notice, and it has to get it from the DB - // A massive improvement would be avoid DB query by transmitting - // all the notice details via queue server... + while (true) { - $notice = Notice::staticGet($frame->body); + $frame = $this->con->readFrame(); - if ($notice) { - $this->_saveFrame($notice, $queue, $frame); - } else { - $this->log(LOG_WARNING, 'queue item for notice that does not exist'); - } - } - } - - function done($object, $queue) - { - $notice = $object; + if ($frame) { + $notice = Notice::staticGet($frame->body); - $this->_connect(); - - $frame = $this->_getFrame($notice, $queue); - - if (empty($frame)) { - $this->log(LOG_ERR, 'Cannot find frame for notice '.$notice->id.' in queue '.$queue); - } else { - // if the msg has been handled positively, ack it - // and the queue server will remove it from the queue - $this->con->ack($frame); - $this->_clearFrame($notice, $queue); + if ($handler->handle_notice($notice)) { + $this->_log(LOG_INFO, 'Successfully handled notice '. $notice->id); + $this->con->ack($frame); + } + } - $this->log(LOG_INFO, 'finished broadcasting notice ID = ' . $notice->id); + $handler->idle(0); } - } - - function fail($object, $queue) - { - $notice = $object; - - // STOMP server will requeue it after a while anyways, - // so no need to notify. Just get it out of our little - // array - - $this->_clearFrame($notice, $queue); - } - - function _frameKey($notice, $queue) - { - return ((string)$notice->id) . '-' . $queue; - } - function _saveFrame($notice, $queue, $frame) - { - $k = $this->_frameKey($notice, $queue); - $this->_frames[$k] = $frame; - return true; - } - - function _getFrame($notice, $queue) - { - $k = $this->_frameKey($notice, $queue); - return $this->_frames[$k]; - } - - function _clearFrame($notice, $queue) - { - $k = $this->_frameKey($notice, $queue); - unset($this->_frames[$k]); + $this->con->unsubscribe($this->_queueName($queue)); } function _queueName($queue) -- cgit v1.2.3-54-g00ecf From f63702579a672d35c5db262873a4a22835301074 Mon Sep 17 00:00:00 2001 From: Evan Prodromou Date: Sat, 4 Jul 2009 01:16:58 -0400 Subject: don't say we're connecting if we're not --- lib/stompqueuemanager.php | 1 - 1 file changed, 1 deletion(-) diff --git a/lib/stompqueuemanager.php b/lib/stompqueuemanager.php index b8731d543..a6bac861b 100644 --- a/lib/stompqueuemanager.php +++ b/lib/stompqueuemanager.php @@ -49,7 +49,6 @@ class StompQueueManager function _connect() { - $this->_log(LOG_DEBUG, "Connecting to $this->server..."); if (empty($this->con)) { $this->_log(LOG_INFO, "Connecting to '$this->server' as '$this->username'..."); $this->con = new Stomp($this->server); -- cgit v1.2.3-54-g00ecf From 6d72864618b73271a83aa566f35838bb1a5c57c7 Mon Sep 17 00:00:00 2001 From: Evan Prodromou Date: Sat, 4 Jul 2009 01:17:37 -0400 Subject: don't try to show non-object --- lib/util.php | 3 +++ 1 file changed, 3 insertions(+) diff --git a/lib/util.php b/lib/util.php index a40cd3d54..9e8ec41d2 100644 --- a/lib/util.php +++ b/lib/util.php @@ -1028,6 +1028,9 @@ function common_log_objstring(&$object) if (is_null($object)) { return "null"; } + if (!($object instanceof DB_DataObject)) { + return "(unknown)"; + } $arr = $object->toArray(); $fields = array(); foreach ($arr as $k => $v) { -- cgit v1.2.3-54-g00ecf From 9dee9e1612ebe6d6f28c21bce8c426658d60f171 Mon Sep 17 00:00:00 2001 From: Evan Prodromou Date: Sat, 4 Jul 2009 01:20:39 -0400 Subject: new default daemon jid --- lib/xmppqueuehandler.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/xmppqueuehandler.php b/lib/xmppqueuehandler.php index 986e09c25..c8b5ad1fb 100644 --- a/lib/xmppqueuehandler.php +++ b/lib/xmppqueuehandler.php @@ -91,7 +91,7 @@ class XmppQueueHandler extends QueueHandler if (common_config('xmpp', 'listener')) { return common_config('xmpp', 'listener'); } else { - return jabber_daemon_address() . '/' . common_config('xmpp','resource') . '-listener'; + return jabber_daemon_address() . '/' . common_config('xmpp','resource') . 'daemon'; } } } -- cgit v1.2.3-54-g00ecf From 49eaa04b508f6e27533f494dedd4997416670bef Mon Sep 17 00:00:00 2001 From: Evan Prodromou Date: Sat, 4 Jul 2009 01:42:42 -0400 Subject: return singleton if initialized --- lib/queuemanager.php | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/queuemanager.php b/lib/queuemanager.php index f36e99d16..582c24790 100644 --- a/lib/queuemanager.php +++ b/lib/queuemanager.php @@ -57,9 +57,9 @@ class QueueManager } } } - - return self::$qm; } + + return self::$qm; } function enqueue($object, $queue) -- cgit v1.2.3-54-g00ecf From 66a4a60e0bb67ba9094cd94be5992c70e5352e54 Mon Sep 17 00:00:00 2001 From: Evan Prodromou Date: Sat, 4 Jul 2009 01:43:18 -0400 Subject: better debug logging in stomp queue manager --- lib/stompqueuemanager.php | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/lib/stompqueuemanager.php b/lib/stompqueuemanager.php index a6bac861b..5f0b88d8a 100644 --- a/lib/stompqueuemanager.php +++ b/lib/stompqueuemanager.php @@ -75,12 +75,12 @@ class StompQueueManager array ('created' => $notice->created)); if (!$result) { - common_log(LOG_ERR, 'Error sending to '.$transport.' queue'); + common_log(LOG_ERR, 'Error sending to '.$queue.' queue'); return false; } common_log(LOG_DEBUG, 'complete remote queueing notice ID = ' - . $notice->id . ' for ' . $transport); + . $notice->id . ' for ' . $queue); } function service($queue, $handler) @@ -101,7 +101,7 @@ class StompQueueManager $notice = Notice::staticGet($frame->body); if ($handler->handle_notice($notice)) { - $this->_log(LOG_INFO, 'Successfully handled notice '. $notice->id); + $this->_log(LOG_INFO, 'Successfully handled notice '. $notice->id .' posted at ' . $frame->headers['created']); $this->con->ack($frame); } } -- cgit v1.2.3-54-g00ecf From cb019f7aad9c4a618316fb3c2e4a36bc013c8da3 Mon Sep 17 00:00:00 2001 From: Evan Prodromou Date: Sat, 4 Jul 2009 01:43:35 -0400 Subject: don't send unused variable for streams --- classes/Notice.php | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/classes/Notice.php b/classes/Notice.php index 8a018068a..5ec0692d9 100644 --- a/classes/Notice.php +++ b/classes/Notice.php @@ -1210,7 +1210,7 @@ class Notice extends Memcached_DataObject $window = explode(',', $laststr); $last_id = $window[0]; $new_ids = call_user_func_array($fn, array_merge($args, array(0, NOTICE_CACHE_WINDOW, - $last_id, 0, null, $tag))); + $last_id, 0, null))); $new_window = array_merge($new_ids, $window); @@ -1225,7 +1225,7 @@ class Notice extends Memcached_DataObject } $window = call_user_func_array($fn, array_merge($args, array(0, NOTICE_CACHE_WINDOW, - 0, 0, null, $tag))); + 0, 0, null))); $windowstr = implode(',', $window); -- cgit v1.2.3-54-g00ecf From 23e6dafff6d82492aa7ab2addc2fae99bd609b57 Mon Sep 17 00:00:00 2001 From: Evan Prodromou Date: Sun, 5 Jul 2009 11:01:07 -0400 Subject: better handling of frames and notices --- lib/stompqueuemanager.php | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/lib/stompqueuemanager.php b/lib/stompqueuemanager.php index 5f0b88d8a..e7e1e00dd 100644 --- a/lib/stompqueuemanager.php +++ b/lib/stompqueuemanager.php @@ -37,7 +37,6 @@ class StompQueueManager var $password = null; var $base = null; var $con = null; - var $frames = array(); function __construct() { @@ -97,13 +96,19 @@ class StompQueueManager $frame = $this->con->readFrame(); - if ($frame) { - $notice = Notice::staticGet($frame->body); + if (!empty($frame)) { + $notice = Notice::staticGet('id', $frame->body); - if ($handler->handle_notice($notice)) { + if (empty($notice)) { + $this->_log(LOG_WARNING, 'Got ID '. $frame->body .' for non-existent notice'); + $this->con->ack($frame); + } else if ($handler->handle_notice($notice)) { $this->_log(LOG_INFO, 'Successfully handled notice '. $notice->id .' posted at ' . $frame->headers['created']); $this->con->ack($frame); + unset($notice); } + + unset($frame); } $handler->idle(0); -- cgit v1.2.3-54-g00ecf