diff options
author | Evan Prodromou <evan@controlyourself.ca> | 2009-07-07 11:35:00 -0400 |
---|---|---|
committer | Evan Prodromou <evan@controlyourself.ca> | 2009-07-07 11:35:00 -0400 |
commit | 84e676b68702084d70794fbaf8871105588c79c1 (patch) | |
tree | e83ecc3ebe7c6d15adcd80a0d4864290cb1351f9 | |
parent | dd705ddaf751ee8132cdd52e47aeef259477912c (diff) | |
parent | 23e6dafff6d82492aa7ab2addc2fae99bd609b57 (diff) |
Merge branch 'queuemanager' into 0.8.x
-rw-r--r-- | classes/Notice.php | 4 | ||||
-rw-r--r-- | classes/Queue_item.php | 11 | ||||
-rw-r--r-- | lib/dbqueuemanager.php | 166 | ||||
-rw-r--r-- | lib/queuehandler.php | 130 | ||||
-rw-r--r-- | lib/queuemanager.php | 74 | ||||
-rw-r--r-- | lib/stompqueuemanager.php | 129 | ||||
-rw-r--r-- | lib/unqueuemanager.php | 85 | ||||
-rw-r--r-- | lib/util.php | 161 | ||||
-rw-r--r-- | lib/xmppqueuehandler.php | 2 |
9 files changed, 506 insertions, 256 deletions
diff --git a/classes/Notice.php b/classes/Notice.php index 8a018068a..5ec0692d9 100644 --- a/classes/Notice.php +++ b/classes/Notice.php @@ -1210,7 +1210,7 @@ class Notice extends Memcached_DataObject $window = explode(',', $laststr); $last_id = $window[0]; $new_ids = call_user_func_array($fn, array_merge($args, array(0, NOTICE_CACHE_WINDOW, - $last_id, 0, null, $tag))); + $last_id, 0, null))); $new_window = array_merge($new_ids, $window); @@ -1225,7 +1225,7 @@ class Notice extends Memcached_DataObject } $window = call_user_func_array($fn, array_merge($args, array(0, NOTICE_CACHE_WINDOW, - 0, 0, null, $tag))); + 0, 0, null))); $windowstr = implode(',', $window); diff --git a/classes/Queue_item.php b/classes/Queue_item.php index 9b909ec22..295c321b5 100644 --- a/classes/Queue_item.php +++ b/classes/Queue_item.php @@ -4,7 +4,7 @@ */ require_once INSTALLDIR.'/classes/Memcached_DataObject.php'; -class Queue_item extends Memcached_DataObject +class Queue_item extends Memcached_DataObject { ###START_AUTOCODE /* the code below is auto generated do not remove the above tag */ @@ -13,7 +13,7 @@ class Queue_item extends Memcached_DataObject public $notice_id; // int(4) primary_key not_null public $transport; // varchar(8) primary_key not_null public $created; // datetime() not_null - public $claimed; // datetime() + public $claimed; // datetime() /* Static get */ function staticGet($k,$v=null) @@ -24,7 +24,7 @@ class Queue_item extends Memcached_DataObject function sequenceKey() { return array(false, false); } - + static function top($transport) { $qi = new Queue_item(); @@ -54,4 +54,9 @@ class Queue_item extends Memcached_DataObject $qi = null; return null; } + + function &pkeyGet($kv) + { + return Memcached_DataObject::pkeyGet('Queue_item', $kv); + } } diff --git a/lib/dbqueuemanager.php b/lib/dbqueuemanager.php new file mode 100644 index 000000000..6e7172de0 --- /dev/null +++ b/lib/dbqueuemanager.php @@ -0,0 +1,166 @@ +<?php +/** + * Laconica, the distributed open-source microblogging tool + * + * Simple-minded queue manager for storing items in the database + * + * PHP version 5 + * + * LICENCE: This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * @category QueueManager + * @package Laconica + * @author Evan Prodromou <evan@controlyourself.ca> + * @author Sarven Capadisli <csarven@controlyourself.ca> + * @copyright 2009 Control Yourself, Inc. + * @license http://www.fsf.org/licensing/licenses/agpl-3.0.html GNU Affero General Public License version 3.0 + * @link http://laconi.ca/ + */ + +class DBQueueManager extends QueueManager +{ + var $qis = array(); + + function enqueue($object, $queue) + { + $notice = $object; + + $qi = new Queue_item(); + + $qi->notice_id = $notice->id; + $qi->transport = $queue; + $qi->created = $notice->created; + $result = $qi->insert(); + + if (!$result) { + common_log_db_error($qi, 'INSERT', __FILE__); + throw new ServerException('DB error inserting queue item'); + } + + return true; + } + + function service($queue, $handler) + { + while (true) { + $this->_log(LOG_DEBUG, 'Checking for notices...'); + $notice = $this->_nextItem($queue, null); + if (empty($notice)) { + $this->_log(LOG_DEBUG, 'No notices waiting; idling.'); + // Nothing in the queue. Do you + // have other tasks, like servicing your + // XMPP connection, to do? + $handler->idle(QUEUE_HANDLER_MISS_IDLE); + } else { + $this->_log(LOG_INFO, 'Got notice '. $notice->id); + // Yay! Got one! + if ($handler->handle_notice($notice)) { + $this->_log(LOG_INFO, 'Successfully handled notice '. $notice->id); + $this->_done($notice, $queue); + } else { + $this->_log(LOG_INFO, 'Failed to handle notice '. $notice->id); + $this->_fail($notice, $queue); + } + // Chance to e.g. service your XMPP connection + $this->_log(LOG_DEBUG, 'Idling after success.'); + $handler->idle(QUEUE_HANDLER_HIT_IDLE); + } + // XXX: when do we give up? + } + } + + function _nextItem($queue, $timeout=null) + { + $start = time(); + $result = null; + + do { + $qi = Queue_item::top($queue); + if (!empty($qi)) { + $notice = Notice::staticGet('id', $qi->notice_id); + if (!empty($notice)) { + $result = $notice; + } else { + $this->_log(LOG_INFO, 'dequeued non-existent notice ' . $notice->id); + $qi->delete(); + $qi->free(); + $qi = null; + } + } + } while (empty($result) && (is_null($timeout) || (time() - $start) < $timeout)); + + return $result; + } + + function _done($object, $queue) + { + // XXX: right now, we only handle notices + + $notice = $object; + + $qi = Queue_item::pkeyGet(array('notice_id' => $notice->id, + 'transport' => $queue)); + + if (empty($qi)) { + $this->_log(LOG_INFO, 'Cannot find queue item for notice '.$notice->id.', queue '.$queue); + } else { + if (empty($qi->claimed)) { + $this->_log(LOG_WARNING, 'Reluctantly releasing unclaimed queue item '. + 'for '.$notice->id.', queue '.$queue); + } + $qi->delete(); + $qi->free(); + $qi = null; + } + + $this->_log(LOG_INFO, 'done with notice ID = ' . $notice->id); + + $notice->free(); + $notice = null; + } + + function _fail($object, $queue) + { + // XXX: right now, we only handle notices + + $notice = $object; + + $qi = Queue_item::pkeyGet(array('notice_id' => $notice->id, + 'transport' => $queue)); + + if (empty($qi)) { + $this->_log(LOG_INFO, 'Cannot find queue item for notice '.$notice->id.', queue '.$queue); + } else { + if (empty($qi->claimed)) { + $this->_log(LOG_WARNING, 'Ignoring failure for unclaimed queue item '. + 'for '.$notice->id.', queue '.$queue); + } else { + $orig = clone($qi); + $qi->claimed = null; + $qi->update($orig); + $qi = null; + } + } + + $this->_log(LOG_INFO, 'done with notice ID = ' . $notice->id); + + $notice->free(); + $notice = null; + } + + function _log($level, $msg) + { + common_log($level, 'DBQueueManager: '.$msg); + } +} diff --git a/lib/queuehandler.php b/lib/queuehandler.php index c1c4f3309..c0f38f4e3 100644 --- a/lib/queuehandler.php +++ b/lib/queuehandler.php @@ -17,14 +17,16 @@ * along with this program. If not, see <http://www.gnu.org/licenses/>. */ -define('CLAIM_TIMEOUT', 1200); - if (!defined('LACONICA')) { exit(1); } require_once(INSTALLDIR.'/lib/daemon.php'); require_once(INSTALLDIR.'/classes/Queue_item.php'); require_once(INSTALLDIR.'/classes/Notice.php'); +define('CLAIM_TIMEOUT', 1200); +define('QUEUE_HANDLER_MISS_IDLE', 10); +define('QUEUE_HANDLER_HIT_IDLE', 0); + class QueueHandler extends Daemon { var $_id = 'generic'; @@ -38,6 +40,11 @@ class QueueHandler extends Daemon } } + function timeout() + { + return 60; + } + function class_name() { return ucfirst($this->transport()) . 'Handler'; @@ -76,110 +83,21 @@ class QueueHandler extends Daemon return true; } - function db_dispatch() { - do { - $qi = Queue_item::top($this->transport()); - if ($qi) { - $this->log(LOG_INFO, 'Got item enqueued '.common_exact_date($qi->created)); - $notice = Notice::staticGet($qi->notice_id); - if ($notice) { - $this->log(LOG_INFO, 'broadcasting notice ID = ' . $notice->id); - # XXX: what to do if broadcast fails? - $result = $this->handle_notice($notice); - if (!$result) { - $this->log(LOG_WARNING, 'Failed broadcast for notice ID = ' . $notice->id); - $orig = $qi; - $qi->claimed = null; - $qi->update($orig); - $this->log(LOG_WARNING, 'Abandoned claim for notice ID = ' . $notice->id); - continue; - } - $this->log(LOG_INFO, 'finished broadcasting notice ID = ' . $notice->id); - $notice->free(); - unset($notice); - $notice = null; - } else { - $this->log(LOG_WARNING, 'queue item for notice that does not exist'); - } - $qi->delete(); - $qi->free(); - unset($qi); - $this->idle(0); - } else { - $this->clear_old_claims(); - $this->idle(5); - } - } while (true); - } - - function stomp_dispatch() { - - // use an external message queue system via STOMP - require_once("Stomp.php"); - - $server = common_config('queue','stomp_server'); - $username = common_config('queue', 'stomp_username'); - $password = common_config('queue', 'stomp_password'); - - $con = new Stomp($server); - - if (!$con->connect($username, $password)) { - $this->log(LOG_ERR, 'Failed to connect to queue server'); - return false; - } - - $queue_basename = common_config('queue','queue_basename'); - // subscribe to the relevant queue (format: basename-transport) - $con->subscribe('/queue/'.$queue_basename.'-'.$this->transport()); - - do { - $frame = $con->readFrame(); - if ($frame) { - $this->log(LOG_INFO, 'Got item enqueued '.common_exact_date($frame->headers['created'])); - - // XXX: Now the queue handler receives only the ID of the - // notice, and it has to get it from the DB - // A massive improvement would be avoid DB query by transmitting - // all the notice details via queue server... - $notice = Notice::staticGet($frame->body); - - if ($notice) { - $this->log(LOG_INFO, 'broadcasting notice ID = ' . $notice->id); - $result = $this->handle_notice($notice); - if ($result) { - // if the msg has been handled positively, ack it - // and the queue server will remove it from the queue - $con->ack($frame); - $this->log(LOG_INFO, 'finished broadcasting notice ID = ' . $notice->id); - } - else { - // no ack - $this->log(LOG_WARNING, 'Failed broadcast for notice ID = ' . $notice->id); - } - $notice->free(); - unset($notice); - $notice = null; - } else { - $this->log(LOG_WARNING, 'queue item for notice that does not exist'); - } - } - } while (true); - - $con->disconnect(); - } - function run() { if (!$this->start()) { return false; } + $this->log(LOG_INFO, 'checking for queued notices'); - if (common_config('queue','subsystem') == 'stomp') { - $this->stomp_dispatch(); - } - else { - $this->db_dispatch(); - } + + $queue = $this->transport(); + $timeout = $this->timeout(); + + $qm = QueueManager::get(); + + $qm->service($queue, $this); + if (!$this->finish()) { return false; } @@ -188,21 +106,11 @@ class QueueHandler extends Daemon function idle($timeout=0) { - if ($timeout>0) { + if ($timeout > 0) { sleep($timeout); } } - function clear_old_claims() - { - $qi = new Queue_item(); - $qi->transport = $this->transport(); - $qi->whereAdd('now() - claimed > '.CLAIM_TIMEOUT); - $qi->update(DB_DATAOBJECT_WHEREADD_ONLY); - $qi->free(); - unset($qi); - } - function log($level, $msg) { common_log($level, $this->class_name() . ' ('. $this->get_id() .'): '.$msg); diff --git a/lib/queuemanager.php b/lib/queuemanager.php new file mode 100644 index 000000000..582c24790 --- /dev/null +++ b/lib/queuemanager.php @@ -0,0 +1,74 @@ +<?php +/** + * Laconica, the distributed open-source microblogging tool + * + * Abstract class for queue managers + * + * PHP version 5 + * + * LICENCE: This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * @category QueueManager + * @package Laconica + * @author Evan Prodromou <evan@controlyourself.ca> + * @author Sarven Capadisli <csarven@controlyourself.ca> + * @copyright 2009 Control Yourself, Inc. + * @license http://www.fsf.org/licensing/licenses/agpl-3.0.html GNU Affero General Public License version 3.0 + * @link http://laconi.ca/ + */ + +class QueueManager +{ + static $qm = null; + + static function get() + { + if (empty(self::$qm)) { + + if (Event::handle('StartNewQueueManager', array(&self::$qm))) { + + $enabled = common_config('queue', 'enabled'); + $type = common_config('queue', 'subsystem'); + + if (!$enabled) { + // does everything immediately + self::$qm = new UnQueueManager(); + } else { + switch ($type) { + case 'db': + self::$qm = new DBQueueManager(); + break; + case 'stomp': + self::$qm = new StompQueueManager(); + break; + default: + throw new ServerException("No queue manager class for type '$type'"); + } + } + } + } + + return self::$qm; + } + + function enqueue($object, $queue) + { + throw ServerException("Unimplemented function 'enqueue' called"); + } + + function service($queue, $handler) + { + throw ServerException("Unimplemented function 'service' called"); + } +} diff --git a/lib/stompqueuemanager.php b/lib/stompqueuemanager.php new file mode 100644 index 000000000..e7e1e00dd --- /dev/null +++ b/lib/stompqueuemanager.php @@ -0,0 +1,129 @@ +<?php +/** + * Laconica, the distributed open-source microblogging tool + * + * Abstract class for queue managers + * + * PHP version 5 + * + * LICENCE: This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * @category QueueManager + * @package Laconica + * @author Evan Prodromou <evan@controlyourself.ca> + * @author Sarven Capadisli <csarven@controlyourself.ca> + * @copyright 2009 Control Yourself, Inc. + * @license http://www.fsf.org/licensing/licenses/agpl-3.0.html GNU Affero General Public License version 3.0 + * @link http://laconi.ca/ + */ + +require_once 'Stomp.php'; + +class StompQueueManager +{ + var $server = null; + var $username = null; + var $password = null; + var $base = null; + var $con = null; + + function __construct() + { + $this->server = common_config('queue', 'stomp_server'); + $this->username = common_config('queue', 'stomp_username'); + $this->password = common_config('queue', 'stomp_password'); + $this->base = common_config('queue', 'queue_basename'); + } + + function _connect() + { + if (empty($this->con)) { + $this->_log(LOG_INFO, "Connecting to '$this->server' as '$this->username'..."); + $this->con = new Stomp($this->server); + + if ($this->con->connect($this->username, $this->password)) { + $this->_log(LOG_INFO, "Connected."); + } else { + $this->_log(LOG_ERR, 'Failed to connect to queue server'); + throw new ServerException('Failed to connect to queue server'); + } + } + } + + function enqueue($object, $queue) + { + $notice = $object; + + $this->_connect(); + + // XXX: serialize and send entire notice + + $result = $this->con->send($this->_queueName($queue), + $notice->id, // BODY of the message + array ('created' => $notice->created)); + + if (!$result) { + common_log(LOG_ERR, 'Error sending to '.$queue.' queue'); + return false; + } + + common_log(LOG_DEBUG, 'complete remote queueing notice ID = ' + . $notice->id . ' for ' . $queue); + } + + function service($queue, $handler) + { + $result = null; + + $this->_connect(); + + $this->con->setReadTimeout($handler->timeout()); + + $this->con->subscribe($this->_queueName($queue)); + + while (true) { + + $frame = $this->con->readFrame(); + + if (!empty($frame)) { + $notice = Notice::staticGet('id', $frame->body); + + if (empty($notice)) { + $this->_log(LOG_WARNING, 'Got ID '. $frame->body .' for non-existent notice'); + $this->con->ack($frame); + } else if ($handler->handle_notice($notice)) { + $this->_log(LOG_INFO, 'Successfully handled notice '. $notice->id .' posted at ' . $frame->headers['created']); + $this->con->ack($frame); + unset($notice); + } + + unset($frame); + } + + $handler->idle(0); + } + + $this->con->unsubscribe($this->_queueName($queue)); + } + + function _queueName($queue) + { + return common_config('queue', 'queue_basename') . $queue; + } + + function _log($level, $msg) + { + common_log($level, 'StompQueueManager: '.$msg); + } +} diff --git a/lib/unqueuemanager.php b/lib/unqueuemanager.php new file mode 100644 index 000000000..515461072 --- /dev/null +++ b/lib/unqueuemanager.php @@ -0,0 +1,85 @@ +<?php +/** + * Laconica, the distributed open-source microblogging tool + * + * A queue manager interface for just doing things immediately + * + * PHP version 5 + * + * LICENCE: This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * @category QueueManager + * @package Laconica + * @author Evan Prodromou <evan@controlyourself.ca> + * @author Sarven Capadisli <csarven@controlyourself.ca> + * @copyright 2009 Control Yourself, Inc. + * @license http://www.fsf.org/licensing/licenses/agpl-3.0.html GNU Affero General Public License version 3.0 + * @link http://laconi.ca/ + */ + +class UnQueueManager +{ + function enqueue($object, $queue) + { + $notice = $object; + + switch ($queue) + { + case 'omb': + if ($this->_isLocal($notice)) { + require_once(INSTALLDIR.'/lib/omb.php'); + omb_broadcast_remote_subscribers($notice); + } + break; + case 'public': + if ($this->_isLocal($notice)) { + require_once(INSTALLDIR.'/lib/jabber.php'); + jabber_public_notice($notice); + } + break; + case 'twitter': + if ($this->_isLocal($notice)) { + broadcast_twitter($notice); + } + break; + case 'facebook': + if ($this->_isLocal($notice)) { + require_once INSTALLDIR . '/lib/facebookutil.php'; + return facebookBroadcastNotice($notice); + } + break; + case 'ping': + if ($this->_isLocal($notice)) { + require_once INSTALLDIR . '/lib/ping.php'; + return ping_broadcast_notice($notice); + } + case 'sms': + require_once(INSTALLDIR.'/lib/mail.php'); + mail_broadcast_notice_sms($notice); + break; + case 'jabber': + require_once(INSTALLDIR.'/lib/jabber.php'); + jabber_broadcast_notice($notice); + break; + default: + throw ServerException("UnQueueManager: Unknown queue: $type"); + } + } + + function _isLocal($notice) + { + return ($notice->is_local == NOTICE_LOCAL_PUBLIC || + $notice->is_local == NOTICE_LOCAL_NONPUBLIC); + } +}
\ No newline at end of file diff --git a/lib/util.php b/lib/util.php index d4d79afb3..9e8ec41d2 100644 --- a/lib/util.php +++ b/lib/util.php @@ -862,165 +862,45 @@ function common_redirect($url, $code=307) function common_broadcast_notice($notice, $remote=false) { - if (common_config('queue', 'enabled')) { - // Do it later! - return common_enqueue_notice($notice); - } else { - return common_real_broadcast($notice, $remote); - } + return common_enqueue_notice($notice); } // Stick the notice on the queue function common_enqueue_notice($notice) { - $transports = array('omb', 'sms', 'public', 'twitter', 'facebook', 'ping'); - - if (common_config('xmpp', 'enabled')) - { - $transports[] = 'jabber'; - } - - if (common_config('queue','subsystem') == 'stomp') { - common_enqueue_notice_stomp($notice, $transports); - } - else { - common_enqueue_notice_db($notice, $transports); - } - return $result; -} - -function common_enqueue_notice_stomp($notice, $transports) -{ - // use an external message queue system via STOMP - require_once("Stomp.php"); + static $localTransports = array('omb', + 'twitter', + 'facebook', + 'ping'); + static $allTransports = array('sms'); - $server = common_config('queue','stomp_server'); - $username = common_config('queue', 'stomp_username'); - $password = common_config('queue', 'stomp_password'); + $transports = $allTransports; - $con = new Stomp($server); + $xmpp = common_config('xmpp', 'enabled'); - if (!$con->connect($username, $password)) { - common_log(LOG_ERR, 'Failed to connect to queue server'); - return false; + if ($xmpp) { + $transports[] = 'jabber'; } - $queue_basename = common_config('queue','queue_basename'); - - foreach ($transports as $transport) { - $result = $con->send('/queue/'.$queue_basename.'-'.$transport, // QUEUE - $notice->id, // BODY of the message - array ('created' => $notice->created)); - if (!$result) { - common_log(LOG_ERR, 'Error sending to '.$transport.' queue'); - return false; - } - common_log(LOG_DEBUG, 'complete remote queueing notice ID = ' . $notice->id . ' for ' . $transport); - } - - //send tags as headers, so they can be used as JMS selectors - common_log(LOG_DEBUG, 'searching for tags ' . $notice->id); - $tags = array(); - $tag = new Notice_tag(); - $tag->notice_id = $notice->id; - if ($tag->find()) { - while ($tag->fetch()) { - common_log(LOG_DEBUG, 'tag found = ' . $tag->tag); - array_push($tags,$tag->tag); + if ($notice->is_local == NOTICE_LOCAL_PUBLIC || + $notice->is_local == NOTICE_LOCAL_NONPUBLIC) { + $transports = array_merge($transports, $localTransports); + if ($xmpp) { + $transports[] = 'public'; } } - $tag->free(); - $con->send('/topic/laconica.'.$notice->profile_id, - $notice->content, - array( - 'profile_id' => $notice->profile_id, - 'created' => $notice->created, - 'tags' => implode($tags,' - ') - ) - ); - common_log(LOG_DEBUG, 'sent to personal topic ' . $notice->id); - $con->send('/topic/laconica.allusers', - $notice->content, - array( - 'profile_id' => $notice->profile_id, - 'created' => $notice->created, - 'tags' => implode($tags,' - ') - ) - ); - common_log(LOG_DEBUG, 'sent to catch-all topic ' . $notice->id); - $result = true; -} + $qm = QueueManager::get(); -function common_enqueue_notice_db($notice, $transports) -{ - // in any other case, 'internal' - foreach ($transports as $transport) { - common_enqueue_notice_transport($notice, $transport); + foreach ($transports as $transport) + { + $qm->enqueue($notice, $transport); } -} -function common_enqueue_notice_transport($notice, $transport) -{ - $qi = new Queue_item(); - $qi->notice_id = $notice->id; - $qi->transport = $transport; - $qi->created = $notice->created; - $result = $qi->insert(); - if (!$result) { - $last_error = &PEAR::getStaticProperty('DB_DataObject','lastError'); - common_log(LOG_ERR, 'DB error inserting queue item: ' . $last_error->message); - throw new ServerException('DB error inserting queue item: ' . $last_error->message); - } - common_log(LOG_DEBUG, 'complete queueing notice ID = ' . $notice->id . ' for ' . $transport); return true; } -function common_real_broadcast($notice, $remote=false) -{ - $success = true; - if (!$remote) { - // Make sure we have the OMB stuff - require_once(INSTALLDIR.'/lib/omb.php'); - $success = omb_broadcast_remote_subscribers($notice); - if (!$success) { - common_log(LOG_ERR, 'Error in OMB broadcast for notice ' . $notice->id); - } - } - if ($success) { - require_once(INSTALLDIR.'/lib/jabber.php'); - $success = jabber_broadcast_notice($notice); - if (!$success) { - common_log(LOG_ERR, 'Error in jabber broadcast for notice ' . $notice->id); - } - } - if ($success) { - require_once(INSTALLDIR.'/lib/mail.php'); - $success = mail_broadcast_notice_sms($notice); - if (!$success) { - common_log(LOG_ERR, 'Error in sms broadcast for notice ' . $notice->id); - } - } - if ($success) { - $success = jabber_public_notice($notice); - if (!$success) { - common_log(LOG_ERR, 'Error in public broadcast for notice ' . $notice->id); - } - } - if ($success) { - $success = broadcast_twitter($notice); - if (!$success) { - common_log(LOG_ERR, 'Error in Twitter broadcast for notice ' . $notice->id); - } - } - - // XXX: Do a real-time FB broadcast here? - - // XXX: broadcast notices to other IM - return $success; -} - function common_broadcast_profile($profile) { // XXX: optionally use a queue system like http://code.google.com/p/microapps/wiki/NQDQ @@ -1148,6 +1028,9 @@ function common_log_objstring(&$object) if (is_null($object)) { return "null"; } + if (!($object instanceof DB_DataObject)) { + return "(unknown)"; + } $arr = $object->toArray(); $fields = array(); foreach ($arr as $k => $v) { diff --git a/lib/xmppqueuehandler.php b/lib/xmppqueuehandler.php index 986e09c25..c8b5ad1fb 100644 --- a/lib/xmppqueuehandler.php +++ b/lib/xmppqueuehandler.php @@ -91,7 +91,7 @@ class XmppQueueHandler extends QueueHandler if (common_config('xmpp', 'listener')) { return common_config('xmpp', 'listener'); } else { - return jabber_daemon_address() . '/' . common_config('xmpp','resource') . '-listener'; + return jabber_daemon_address() . '/' . common_config('xmpp','resource') . 'daemon'; } } } |