From e5b758dbbef6774943abf453a43114a2c3371b4a Mon Sep 17 00:00:00 2001 From: Evan Prodromou Date: Sun, 28 Jun 2009 14:38:31 -0400 Subject: start of queuemanager code --- lib/queuemanager.php | 78 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 78 insertions(+) create mode 100644 lib/queuemanager.php (limited to 'lib/queuemanager.php') diff --git a/lib/queuemanager.php b/lib/queuemanager.php new file mode 100644 index 000000000..64aca1bc1 --- /dev/null +++ b/lib/queuemanager.php @@ -0,0 +1,78 @@ +. + * + * @category QueueManager + * @package Laconica + * @author Evan Prodromou + * @author Sarven Capadisli + * @copyright 2009 Control Yourself, Inc. + * @license http://www.fsf.org/licensing/licenses/agpl-3.0.html GNU Affero General Public License version 3.0 + * @link http://laconi.ca/ + */ + +class QueueManager +{ + static $qm = null; + + static function get() + { + if (empty(self::$qm)) { + + if (Event::handle('StartNewQueueManager', self::$qm)) { + + $type = common_config('queue', 'sub'); + + switch ($type) { + case 'db': + self::$qm = new DBQueueManager(); + break; + case 'stomp': + self::$qm = new StompQueueManager(); + break; + default: + throw new ServerException("No queue manager class for type '$type'"); + } + } + + return self::$qm; + } + } + + function enqueue($object, $queue) + { + throw ServerException("Unimplemented function 'enqueue' called"); + } + + function peek($queue) + { + throw ServerException("Unimplemented function 'peek' called"); + } + + function nextItem($queue, $timeout=null) + { + throw ServerException("Unimplemented function 'nextItem' called"); + } + + function done($object, $queue) + { + throw ServerException("Unimplemented function 'done' called"); + } +} -- cgit v1.2.3-54-g00ecf From 4c256a6d7ee287def5c26f401c8caa6bfe0b8dff Mon Sep 17 00:00:00 2001 From: Evan Prodromou Date: Wed, 1 Jul 2009 11:09:58 -0400 Subject: better hook variables for StartQueueManager --- lib/queuemanager.php | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'lib/queuemanager.php') diff --git a/lib/queuemanager.php b/lib/queuemanager.php index 64aca1bc1..92f0e10de 100644 --- a/lib/queuemanager.php +++ b/lib/queuemanager.php @@ -36,9 +36,9 @@ class QueueManager { if (empty(self::$qm)) { - if (Event::handle('StartNewQueueManager', self::$qm)) { + $type = common_config('queue', 'sub'); - $type = common_config('queue', 'sub'); + if (Event::handle('StartNewQueueManager', array($type, &self::$qm))) { switch ($type) { case 'db': -- cgit v1.2.3-54-g00ecf From e0bf8ad95b2d2ddc7b988c25e9cffa20075a5d8c Mon Sep 17 00:00:00 2001 From: Evan Prodromou Date: Wed, 1 Jul 2009 11:34:12 -0400 Subject: Add UnQueueManager for immediate handling Perhaps it's a little precious, but I took out the switches in util.php to determine what's supposed to be sent when, and made a queuemanager class that will just do things when they're supposed to be done. --- lib/queuemanager.php | 10 ++++-- lib/unqueuemanager.php | 85 ++++++++++++++++++++++++++++++++++++++++++++++++++ lib/util.php | 85 ++++++++++---------------------------------------- 3 files changed, 109 insertions(+), 71 deletions(-) create mode 100644 lib/unqueuemanager.php (limited to 'lib/queuemanager.php') diff --git a/lib/queuemanager.php b/lib/queuemanager.php index 92f0e10de..6bb21de9b 100644 --- a/lib/queuemanager.php +++ b/lib/queuemanager.php @@ -36,9 +36,15 @@ class QueueManager { if (empty(self::$qm)) { - $type = common_config('queue', 'sub'); + if (Event::handle('StartNewQueueManager', array(&self::$qm))) { - if (Event::handle('StartNewQueueManager', array($type, &self::$qm))) { + $enabled = common_config('queue', 'enabled'); + $type = common_config('queue', 'sub'); + + if (!$enabled) { + // does everything immediately + return new UnQueueManager(); + } switch ($type) { case 'db': diff --git a/lib/unqueuemanager.php b/lib/unqueuemanager.php new file mode 100644 index 000000000..515461072 --- /dev/null +++ b/lib/unqueuemanager.php @@ -0,0 +1,85 @@ +. + * + * @category QueueManager + * @package Laconica + * @author Evan Prodromou + * @author Sarven Capadisli + * @copyright 2009 Control Yourself, Inc. + * @license http://www.fsf.org/licensing/licenses/agpl-3.0.html GNU Affero General Public License version 3.0 + * @link http://laconi.ca/ + */ + +class UnQueueManager +{ + function enqueue($object, $queue) + { + $notice = $object; + + switch ($queue) + { + case 'omb': + if ($this->_isLocal($notice)) { + require_once(INSTALLDIR.'/lib/omb.php'); + omb_broadcast_remote_subscribers($notice); + } + break; + case 'public': + if ($this->_isLocal($notice)) { + require_once(INSTALLDIR.'/lib/jabber.php'); + jabber_public_notice($notice); + } + break; + case 'twitter': + if ($this->_isLocal($notice)) { + broadcast_twitter($notice); + } + break; + case 'facebook': + if ($this->_isLocal($notice)) { + require_once INSTALLDIR . '/lib/facebookutil.php'; + return facebookBroadcastNotice($notice); + } + break; + case 'ping': + if ($this->_isLocal($notice)) { + require_once INSTALLDIR . '/lib/ping.php'; + return ping_broadcast_notice($notice); + } + case 'sms': + require_once(INSTALLDIR.'/lib/mail.php'); + mail_broadcast_notice_sms($notice); + break; + case 'jabber': + require_once(INSTALLDIR.'/lib/jabber.php'); + jabber_broadcast_notice($notice); + break; + default: + throw ServerException("UnQueueManager: Unknown queue: $type"); + } + } + + function _isLocal($notice) + { + return ($notice->is_local == NOTICE_LOCAL_PUBLIC || + $notice->is_local == NOTICE_LOCAL_NONPUBLIC); + } +} \ No newline at end of file diff --git a/lib/util.php b/lib/util.php index 3f924c8de..b1b4faa7e 100644 --- a/lib/util.php +++ b/lib/util.php @@ -861,88 +861,35 @@ function common_redirect($url, $code=307) function common_broadcast_notice($notice, $remote=false) { - if (common_config('queue', 'enabled')) { - // Do it later! - return common_enqueue_notice($notice); - } else { - return common_real_broadcast($notice, $remote); - } + return common_enqueue_notice($notice); } // Stick the notice on the queue function common_enqueue_notice($notice) { - $transports = array('omb', 'sms', 'public', 'twitter', 'facebook', 'ping'); + static $localTransports = array('omb', + 'public', + 'twitter', + 'facebook', + 'ping'); + static $allTransports = array('sms', 'jabber'); - if (common_config('xmpp', 'enabled')) - { - $transports[] = 'jabber'; - } + $transports = $allTransports; - if (common_config('queue','subsystem') == 'stomp') { - common_enqueue_notice_stomp($notice, $transports); + if ($notice->is_local == NOTICE_LOCAL_PUBLIC || + $notice->is_local == NOTICE_LOCAL_NONPUBLIC) { + $transports = array_merge($transports, $localTransports); } - else { - common_enqueue_notice_db($notice, $transports); - } - return $result; -} -function common_enqueue_notice_db($notice, $transports) -{ - // in any other case, 'internal' - foreach ($transports as $transport) { - common_enqueue_notice_transport($notice, $transport); - } -} - -function common_enqueue_notice_transport($notice, $transport) -{ -} + $qm = QueueManager::get(); -function common_real_broadcast($notice, $remote=false) -{ - $success = true; - if (!$remote) { - // Make sure we have the OMB stuff - require_once(INSTALLDIR.'/lib/omb.php'); - $success = omb_broadcast_remote_subscribers($notice); - if (!$success) { - common_log(LOG_ERR, 'Error in OMB broadcast for notice ' . $notice->id); - } - } - if ($success) { - require_once(INSTALLDIR.'/lib/jabber.php'); - $success = jabber_broadcast_notice($notice); - if (!$success) { - common_log(LOG_ERR, 'Error in jabber broadcast for notice ' . $notice->id); - } - } - if ($success) { - require_once(INSTALLDIR.'/lib/mail.php'); - $success = mail_broadcast_notice_sms($notice); - if (!$success) { - common_log(LOG_ERR, 'Error in sms broadcast for notice ' . $notice->id); - } - } - if ($success) { - $success = jabber_public_notice($notice); - if (!$success) { - common_log(LOG_ERR, 'Error in public broadcast for notice ' . $notice->id); - } - } - if ($success) { - $success = broadcast_twitter($notice); - if (!$success) { - common_log(LOG_ERR, 'Error in Twitter broadcast for notice ' . $notice->id); - } + foreach ($transports as $transport) + { + $qm->enqueue($notice, $transport); } - // XXX: Do a real-time FB broadcast here? - - // XXX: broadcast notices to other IM - return $success; + return true; } function common_broadcast_profile($profile) -- cgit v1.2.3-54-g00ecf From 887d35cfc8c1d42e5af67d0161b244545cda464a Mon Sep 17 00:00:00 2001 From: Evan Prodromou Date: Wed, 1 Jul 2009 12:09:18 -0400 Subject: better queue manager detection, new method fail() --- lib/queuemanager.php | 31 ++++++++++++++++++------------- 1 file changed, 18 insertions(+), 13 deletions(-) (limited to 'lib/queuemanager.php') diff --git a/lib/queuemanager.php b/lib/queuemanager.php index 6bb21de9b..1bf4d4dec 100644 --- a/lib/queuemanager.php +++ b/lib/queuemanager.php @@ -39,22 +39,22 @@ class QueueManager if (Event::handle('StartNewQueueManager', array(&self::$qm))) { $enabled = common_config('queue', 'enabled'); - $type = common_config('queue', 'sub'); + $type = common_config('queue', 'subsystem'); if (!$enabled) { // does everything immediately - return new UnQueueManager(); - } - - switch ($type) { - case 'db': - self::$qm = new DBQueueManager(); - break; - case 'stomp': - self::$qm = new StompQueueManager(); - break; - default: - throw new ServerException("No queue manager class for type '$type'"); + self::$qm = new UnQueueManager(); + } else { + switch ($type) { + case 'db': + self::$qm = new DBQueueManager(); + break; + case 'stomp': + self::$qm = new StompQueueManager(); + break; + default: + throw new ServerException("No queue manager class for type '$type'"); + } } } @@ -81,4 +81,9 @@ class QueueManager { throw ServerException("Unimplemented function 'done' called"); } + + function fail($object, $queue) + { + throw ServerException("Unimplemented function 'fail' called"); + } } -- cgit v1.2.3-54-g00ecf From 49c5c6f92bc1d06e6464eade81eead891d86f10d Mon Sep 17 00:00:00 2001 From: Evan Prodromou Date: Sat, 4 Jul 2009 00:31:28 -0400 Subject: move handling code into queuemanager --- lib/dbqueuemanager.php | 35 +++++++++++++++++++-- lib/queuehandler.php | 30 ++---------------- lib/queuemanager.php | 19 ++--------- lib/stompqueuemanager.php | 80 +++++++++-------------------------------------- 4 files changed, 51 insertions(+), 113 deletions(-) (limited to 'lib/queuemanager.php') diff --git a/lib/dbqueuemanager.php b/lib/dbqueuemanager.php index c9e5ef243..6e7172de0 100644 --- a/lib/dbqueuemanager.php +++ b/lib/dbqueuemanager.php @@ -51,7 +51,36 @@ class DBQueueManager extends QueueManager return true; } - function nextItem($queue, $timeout=null) + function service($queue, $handler) + { + while (true) { + $this->_log(LOG_DEBUG, 'Checking for notices...'); + $notice = $this->_nextItem($queue, null); + if (empty($notice)) { + $this->_log(LOG_DEBUG, 'No notices waiting; idling.'); + // Nothing in the queue. Do you + // have other tasks, like servicing your + // XMPP connection, to do? + $handler->idle(QUEUE_HANDLER_MISS_IDLE); + } else { + $this->_log(LOG_INFO, 'Got notice '. $notice->id); + // Yay! Got one! + if ($handler->handle_notice($notice)) { + $this->_log(LOG_INFO, 'Successfully handled notice '. $notice->id); + $this->_done($notice, $queue); + } else { + $this->_log(LOG_INFO, 'Failed to handle notice '. $notice->id); + $this->_fail($notice, $queue); + } + // Chance to e.g. service your XMPP connection + $this->_log(LOG_DEBUG, 'Idling after success.'); + $handler->idle(QUEUE_HANDLER_HIT_IDLE); + } + // XXX: when do we give up? + } + } + + function _nextItem($queue, $timeout=null) { $start = time(); $result = null; @@ -74,7 +103,7 @@ class DBQueueManager extends QueueManager return $result; } - function done($object, $queue) + function _done($object, $queue) { // XXX: right now, we only handle notices @@ -101,7 +130,7 @@ class DBQueueManager extends QueueManager $notice = null; } - function fail($object, $queue) + function _fail($object, $queue) { // XXX: right now, we only handle notices diff --git a/lib/queuehandler.php b/lib/queuehandler.php index ddb47a28e..c0f38f4e3 100644 --- a/lib/queuehandler.php +++ b/lib/queuehandler.php @@ -25,7 +25,7 @@ require_once(INSTALLDIR.'/classes/Notice.php'); define('CLAIM_TIMEOUT', 1200); define('QUEUE_HANDLER_MISS_IDLE', 10); -define('QUEUE_HANDLER_HIT_IDLE', 10); +define('QUEUE_HANDLER_HIT_IDLE', 0); class QueueHandler extends Daemon { @@ -42,7 +42,7 @@ class QueueHandler extends Daemon function timeout() { - return null; + return 60; } function class_name() @@ -96,31 +96,7 @@ class QueueHandler extends Daemon $qm = QueueManager::get(); - while (true) { - $this->log(LOG_DEBUG, 'Checking for notices...'); - $notice = $qm->nextItem($queue, $timeout); - if (empty($notice)) { - $this->log(LOG_DEBUG, 'No notices waiting; idling.'); - // Nothing in the queue. Do you - // have other tasks, like servicing your - // XMPP connection, to do? - $this->idle(QUEUE_HANDLER_MISS_IDLE); - } else { - $this->log(LOG_INFO, 'Got notice '. $notice->id); - // Yay! Got one! - if ($this->handle_notice($notice)) { - $this->log(LOG_INFO, 'Successfully handled notice '. $notice->id); - $qm->done($notice, $queue); - } else { - $this->log(LOG_INFO, 'Failed to handle notice '. $notice->id); - $qm->fail($notice, $queue); - } - // Chance to e.g. service your XMPP connection - $this->log(LOG_DEBUG, 'Idling after success.'); - $this->idle(QUEUE_HANDLER_HIT_IDLE); - } - // XXX: when do we give up? - } + $qm->service($queue, $this); if (!$this->finish()) { return false; diff --git a/lib/queuemanager.php b/lib/queuemanager.php index 1bf4d4dec..f36e99d16 100644 --- a/lib/queuemanager.php +++ b/lib/queuemanager.php @@ -67,23 +67,8 @@ class QueueManager throw ServerException("Unimplemented function 'enqueue' called"); } - function peek($queue) + function service($queue, $handler) { - throw ServerException("Unimplemented function 'peek' called"); - } - - function nextItem($queue, $timeout=null) - { - throw ServerException("Unimplemented function 'nextItem' called"); - } - - function done($object, $queue) - { - throw ServerException("Unimplemented function 'done' called"); - } - - function fail($object, $queue) - { - throw ServerException("Unimplemented function 'fail' called"); + throw ServerException("Unimplemented function 'service' called"); } } diff --git a/lib/stompqueuemanager.php b/lib/stompqueuemanager.php index 1ad687036..b8731d543 100644 --- a/lib/stompqueuemanager.php +++ b/lib/stompqueuemanager.php @@ -84,85 +84,33 @@ class StompQueueManager . $notice->id . ' for ' . $transport); } - function nextItem($queue, $timeout=null) + function service($queue, $handler) { $result = null; $this->_connect(); - $frame = $this->con->readFrame(); + $this->con->setReadTimeout($handler->timeout()); - if ($frame) { - $this->log(LOG_INFO, 'Got item enqueued '.common_exact_date($frame->headers['created'])); + $this->con->subscribe($this->_queueName($queue)); - // XXX: Now the queue handler receives only the ID of the - // notice, and it has to get it from the DB - // A massive improvement would be avoid DB query by transmitting - // all the notice details via queue server... + while (true) { - $notice = Notice::staticGet($frame->body); + $frame = $this->con->readFrame(); - if ($notice) { - $this->_saveFrame($notice, $queue, $frame); - } else { - $this->log(LOG_WARNING, 'queue item for notice that does not exist'); - } - } - } - - function done($object, $queue) - { - $notice = $object; + if ($frame) { + $notice = Notice::staticGet($frame->body); - $this->_connect(); - - $frame = $this->_getFrame($notice, $queue); - - if (empty($frame)) { - $this->log(LOG_ERR, 'Cannot find frame for notice '.$notice->id.' in queue '.$queue); - } else { - // if the msg has been handled positively, ack it - // and the queue server will remove it from the queue - $this->con->ack($frame); - $this->_clearFrame($notice, $queue); + if ($handler->handle_notice($notice)) { + $this->_log(LOG_INFO, 'Successfully handled notice '. $notice->id); + $this->con->ack($frame); + } + } - $this->log(LOG_INFO, 'finished broadcasting notice ID = ' . $notice->id); + $handler->idle(0); } - } - - function fail($object, $queue) - { - $notice = $object; - - // STOMP server will requeue it after a while anyways, - // so no need to notify. Just get it out of our little - // array - - $this->_clearFrame($notice, $queue); - } - - function _frameKey($notice, $queue) - { - return ((string)$notice->id) . '-' . $queue; - } - function _saveFrame($notice, $queue, $frame) - { - $k = $this->_frameKey($notice, $queue); - $this->_frames[$k] = $frame; - return true; - } - - function _getFrame($notice, $queue) - { - $k = $this->_frameKey($notice, $queue); - return $this->_frames[$k]; - } - - function _clearFrame($notice, $queue) - { - $k = $this->_frameKey($notice, $queue); - unset($this->_frames[$k]); + $this->con->unsubscribe($this->_queueName($queue)); } function _queueName($queue) -- cgit v1.2.3-54-g00ecf From 49eaa04b508f6e27533f494dedd4997416670bef Mon Sep 17 00:00:00 2001 From: Evan Prodromou Date: Sat, 4 Jul 2009 01:42:42 -0400 Subject: return singleton if initialized --- lib/queuemanager.php | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'lib/queuemanager.php') diff --git a/lib/queuemanager.php b/lib/queuemanager.php index f36e99d16..582c24790 100644 --- a/lib/queuemanager.php +++ b/lib/queuemanager.php @@ -57,9 +57,9 @@ class QueueManager } } } - - return self::$qm; } + + return self::$qm; } function enqueue($object, $queue) -- cgit v1.2.3-54-g00ecf