diff options
Diffstat (limited to 'lib')
-rw-r--r-- | lib/action.php | 7 | ||||
-rw-r--r-- | lib/common.php | 3 | ||||
-rw-r--r-- | lib/currentuserdesignaction.php | 12 | ||||
-rw-r--r-- | lib/dbqueuemanager.php | 166 | ||||
-rw-r--r-- | lib/facebookutil.php | 81 | ||||
-rw-r--r-- | lib/groupdesignaction.php | 16 | ||||
-rw-r--r-- | lib/jabber.php | 10 | ||||
-rw-r--r-- | lib/mail.php | 72 | ||||
-rw-r--r-- | lib/ownerdesignaction.php | 12 | ||||
-rw-r--r-- | lib/peoplesearchresults.php | 80 | ||||
-rw-r--r-- | lib/ping.php | 2 | ||||
-rw-r--r-- | lib/popularnoticesection.php | 2 | ||||
-rw-r--r-- | lib/profilelist.php | 9 | ||||
-rw-r--r-- | lib/queuehandler.php | 133 | ||||
-rw-r--r-- | lib/queuemanager.php | 74 | ||||
-rw-r--r-- | lib/router.php | 4 | ||||
-rw-r--r-- | lib/stompqueuemanager.php | 174 | ||||
-rw-r--r-- | lib/twitter.php | 99 | ||||
-rw-r--r-- | lib/twitterapi.php | 61 | ||||
-rw-r--r-- | lib/unqueuemanager.php | 85 | ||||
-rw-r--r-- | lib/util.php | 164 | ||||
-rw-r--r-- | lib/xmppqueuehandler.php | 39 |
22 files changed, 922 insertions, 383 deletions
diff --git a/lib/action.php b/lib/action.php index c89fe180a..da5b48858 100644 --- a/lib/action.php +++ b/lib/action.php @@ -439,8 +439,6 @@ class Action extends HTMLOutputter // lawsuit $this->menuItem(common_local_url('register'), _('Register'), _('Create an account'), false, 'nav_register'); } - $this->menuItem(common_local_url('openidlogin'), - _('OpenID'), _('Login with OpenID'), false, 'nav_openid'); $this->menuItem(common_local_url('login'), _('Login'), _('Login to the site'), false, 'nav_login'); } @@ -708,6 +706,11 @@ class Action extends HTMLOutputter // lawsuit _('About')); $this->menuItem(common_local_url('doc', array('title' => 'faq')), _('FAQ')); + $bb = common_config('site', 'broughtby'); + if (!empty($bb)) { + $this->menuItem(common_local_url('doc', array('title' => 'tos')), + _('TOS')); + } $this->menuItem(common_local_url('doc', array('title' => 'privacy')), _('Privacy')); $this->menuItem(common_local_url('doc', array('title' => 'source')), diff --git a/lib/common.php b/lib/common.php index 5d451463b..14be747bc 100644 --- a/lib/common.php +++ b/lib/common.php @@ -124,7 +124,8 @@ $config = 'dupelimit' => 60), # default for same person saying the same thing 'syslog' => array('appname' => 'laconica', # for syslog - 'priority' => 'debug'), # XXX: currently ignored + 'priority' => 'debug', # XXX: currently ignored + 'facility' => LOG_USER), 'queue' => array('enabled' => false, 'subsystem' => 'db', # default to database, or 'stomp' diff --git a/lib/currentuserdesignaction.php b/lib/currentuserdesignaction.php index 7c2520cf6..4c7e15a8b 100644 --- a/lib/currentuserdesignaction.php +++ b/lib/currentuserdesignaction.php @@ -53,14 +53,19 @@ class CurrentUserDesignAction extends Action * * @return nothing */ + function showStylesheets() { parent::showStylesheets(); - $design = $this->getDesign(); + $user = common_current_user(); + + if (empty($user) || $user->viewdesigns) { + $design = $this->getDesign(); - if (!empty($design)) { - $design->showCSS($this); + if (!empty($design)) { + $design->showCSS($this); + } } } @@ -84,5 +89,4 @@ class CurrentUserDesignAction extends Action return $cur->getDesign(); } - } diff --git a/lib/dbqueuemanager.php b/lib/dbqueuemanager.php new file mode 100644 index 000000000..6e7172de0 --- /dev/null +++ b/lib/dbqueuemanager.php @@ -0,0 +1,166 @@ +<?php +/** + * Laconica, the distributed open-source microblogging tool + * + * Simple-minded queue manager for storing items in the database + * + * PHP version 5 + * + * LICENCE: This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * @category QueueManager + * @package Laconica + * @author Evan Prodromou <evan@controlyourself.ca> + * @author Sarven Capadisli <csarven@controlyourself.ca> + * @copyright 2009 Control Yourself, Inc. + * @license http://www.fsf.org/licensing/licenses/agpl-3.0.html GNU Affero General Public License version 3.0 + * @link http://laconi.ca/ + */ + +class DBQueueManager extends QueueManager +{ + var $qis = array(); + + function enqueue($object, $queue) + { + $notice = $object; + + $qi = new Queue_item(); + + $qi->notice_id = $notice->id; + $qi->transport = $queue; + $qi->created = $notice->created; + $result = $qi->insert(); + + if (!$result) { + common_log_db_error($qi, 'INSERT', __FILE__); + throw new ServerException('DB error inserting queue item'); + } + + return true; + } + + function service($queue, $handler) + { + while (true) { + $this->_log(LOG_DEBUG, 'Checking for notices...'); + $notice = $this->_nextItem($queue, null); + if (empty($notice)) { + $this->_log(LOG_DEBUG, 'No notices waiting; idling.'); + // Nothing in the queue. Do you + // have other tasks, like servicing your + // XMPP connection, to do? + $handler->idle(QUEUE_HANDLER_MISS_IDLE); + } else { + $this->_log(LOG_INFO, 'Got notice '. $notice->id); + // Yay! Got one! + if ($handler->handle_notice($notice)) { + $this->_log(LOG_INFO, 'Successfully handled notice '. $notice->id); + $this->_done($notice, $queue); + } else { + $this->_log(LOG_INFO, 'Failed to handle notice '. $notice->id); + $this->_fail($notice, $queue); + } + // Chance to e.g. service your XMPP connection + $this->_log(LOG_DEBUG, 'Idling after success.'); + $handler->idle(QUEUE_HANDLER_HIT_IDLE); + } + // XXX: when do we give up? + } + } + + function _nextItem($queue, $timeout=null) + { + $start = time(); + $result = null; + + do { + $qi = Queue_item::top($queue); + if (!empty($qi)) { + $notice = Notice::staticGet('id', $qi->notice_id); + if (!empty($notice)) { + $result = $notice; + } else { + $this->_log(LOG_INFO, 'dequeued non-existent notice ' . $notice->id); + $qi->delete(); + $qi->free(); + $qi = null; + } + } + } while (empty($result) && (is_null($timeout) || (time() - $start) < $timeout)); + + return $result; + } + + function _done($object, $queue) + { + // XXX: right now, we only handle notices + + $notice = $object; + + $qi = Queue_item::pkeyGet(array('notice_id' => $notice->id, + 'transport' => $queue)); + + if (empty($qi)) { + $this->_log(LOG_INFO, 'Cannot find queue item for notice '.$notice->id.', queue '.$queue); + } else { + if (empty($qi->claimed)) { + $this->_log(LOG_WARNING, 'Reluctantly releasing unclaimed queue item '. + 'for '.$notice->id.', queue '.$queue); + } + $qi->delete(); + $qi->free(); + $qi = null; + } + + $this->_log(LOG_INFO, 'done with notice ID = ' . $notice->id); + + $notice->free(); + $notice = null; + } + + function _fail($object, $queue) + { + // XXX: right now, we only handle notices + + $notice = $object; + + $qi = Queue_item::pkeyGet(array('notice_id' => $notice->id, + 'transport' => $queue)); + + if (empty($qi)) { + $this->_log(LOG_INFO, 'Cannot find queue item for notice '.$notice->id.', queue '.$queue); + } else { + if (empty($qi->claimed)) { + $this->_log(LOG_WARNING, 'Ignoring failure for unclaimed queue item '. + 'for '.$notice->id.', queue '.$queue); + } else { + $orig = clone($qi); + $qi->claimed = null; + $qi->update($orig); + $qi = null; + } + } + + $this->_log(LOG_INFO, 'done with notice ID = ' . $notice->id); + + $notice->free(); + $notice = null; + } + + function _log($level, $msg) + { + common_log($level, 'DBQueueManager: '.$msg); + } +} diff --git a/lib/facebookutil.php b/lib/facebookutil.php index 4d0df797b..632ec4bad 100644 --- a/lib/facebookutil.php +++ b/lib/facebookutil.php @@ -51,6 +51,10 @@ function updateProfileBox($facebook, $flink, $notice) { function isFacebookBound($notice, $flink) { + if (empty($flink)) { + return false; + } + // If the user does not want to broadcast to Facebook, move along if (!($flink->noticesync & FOREIGN_NOTICE_SEND == FOREIGN_NOTICE_SEND)) { common_log(LOG_INFO, "Skipping notice $notice->id " . @@ -86,10 +90,10 @@ function isFacebookBound($notice, $flink) { if ($result != 1) { $user = $flink->getUser(); - $msg = "Can't send notice $notice->id to Facebook " . + $msg = "Not sending notice $notice->id to Facebook " . "because user $user->nickname hasn't given the " . 'Facebook app \'status_update\' permission.'; - common_log(LOG_INFO, $msg); + common_debug($msg); $success = false; } @@ -108,13 +112,16 @@ function facebookBroadcastNotice($notice) { $facebook = getFacebook(); $flink = Foreign_link::getByUserID($notice->profile_id, FACEBOOK_SERVICE); - $fbuid = $flink->foreign_id; if (isFacebookBound($notice, $flink)) { $status = null; + $fbuid = $flink->foreign_id; + + $user = $flink->getUser(); // Get the status 'verb' (prefix) the user has set + try { $prefix = $facebook->api_client-> data_getUserPreference(FACEBOOK_NOTICE_PREFIX, $fbuid); @@ -122,23 +129,79 @@ function facebookBroadcastNotice($notice) $status = "$prefix $notice->content"; } catch(FacebookRestClientException $e) { - common_log(LOG_ERR, $e->getMessage()); - return false; + common_log(LOG_WARNING, $e->getMessage()); + common_log(LOG_WARNING, + 'Unable to get the status verb setting from Facebook ' . + "for $user->nickname (user id: $user->id)."); } - // Okay, we're good to go! + // Okay, we're good to go, update the FB status try { $facebook->api_client->users_setStatus($status, $fbuid, false, true); - updateProfileBox($facebook, $flink, $notice); } catch(FacebookRestClientException $e) { common_log(LOG_ERR, $e->getMessage()); - return false; + common_log(LOG_ERR, + 'Unable to update Facebook status for ' . + "$user->nickname (user id: $user->id)!"); + + $code = $e->getCode(); - // Should we remove flink if this fails? + if ($code >= 200) { + + // 200 The application does not have permission to operate on the passed in uid parameter. + // 250 Updating status requires the extended permission status_update. + // see: http://wiki.developers.facebook.com/index.php/Users.setStatus#Example_Return_XML + + remove_facebook_app($flink); + } + + } + + // Now try to update the profile box + + try { + updateProfileBox($facebook, $flink, $notice); + } catch(FacebookRestClientException $e) { + common_log(LOG_WARNING, $e->getMessage()); + common_log(LOG_WARNING, + 'Unable to update Facebook profile box for ' . + "$user->nickname (user id: $user->id)."); } } return true; } + +function remove_facebook_app($flink) +{ + + $user = $flink->getUser(); + + common_log(LOG_INFO, 'Removing Facebook App Foreign link for ' . + "user $user->nickname (user id: $user->id)."); + + $result = $flink->delete(); + + if (empty($result)) { + common_log(LOG_ERR, 'Could not remove Facebook App ' . + "Foreign_link for $user->nickname (user id: $user->id)!"); + common_log_db_error($flink, 'DELETE', __FILE__); + } + + // Notify the user that we are removing their FB app access + + $result = mail_facebook_app_removed($user); + + if (!$result) { + + $msg = 'Unable to send email to notify ' . + "$user->nickname (user id: $user->id) " . + 'that their Facebook app link was ' . + 'removed!'; + + common_log(LOG_WARNING, $msg); + } + +} diff --git a/lib/groupdesignaction.php b/lib/groupdesignaction.php index bc95921f1..58777c283 100644 --- a/lib/groupdesignaction.php +++ b/lib/groupdesignaction.php @@ -34,7 +34,7 @@ if (!defined('LACONICA')) { /** * Base class for actions that use a group's design * - * Pages related to groups can be themed with a design. + * Pages related to groups can be themed with a design. * This superclass returns that design. * * @category Action @@ -48,7 +48,7 @@ class GroupDesignAction extends Action { /** The group in question */ var $group = null; - + /** * Show the groups's design stylesheet * @@ -58,10 +58,14 @@ class GroupDesignAction extends Action { { parent::showStylesheets(); - $design = $this->getDesign(); + $user = common_current_user(); + + if (empty($user) || $user->viewdesigns) { + $design = $this->getDesign(); - if (!empty($design)) { - $design->showCSS($this); + if (!empty($design)) { + $design->showCSS($this); + } } } @@ -76,12 +80,10 @@ class GroupDesignAction extends Action { function getDesign() { - if (empty($this->group)) { return null; } return $this->group->getDesign(); } - } diff --git a/lib/jabber.php b/lib/jabber.php index 7d584ad01..e15076160 100644 --- a/lib/jabber.php +++ b/lib/jabber.php @@ -77,6 +77,14 @@ function jabber_daemon_address() return common_config('xmpp', 'user') . '@' . common_config('xmpp', 'server'); } +class Sharing_XMPP extends XMPPHP_XMPP +{ + function getSocket() + { + return $this->socket; + } +} + /** * connect the configured Jabber account to the configured server * @@ -89,7 +97,7 @@ function jabber_connect($resource=null) { static $conn = null; if (!$conn) { - $conn = new XMPPHP_XMPP(common_config('xmpp', 'host') ? + $conn = new Sharing_XMPP(common_config('xmpp', 'host') ? common_config('xmpp', 'host') : common_config('xmpp', 'server'), common_config('xmpp', 'port'), diff --git a/lib/mail.php b/lib/mail.php index 4e1f1dbb1..90ee3c992 100644 --- a/lib/mail.php +++ b/lib/mail.php @@ -625,3 +625,75 @@ function mail_notify_attn($user, $notice) common_init_locale(); mail_to_user($user, $subject, $body); } + +/** + * Send a mail message to notify a user that her Twitter bridge link + * has stopped working, and therefore has been removed. This can + * happen when the user changes her Twitter password, or otherwise + * revokes access. + * + * @param User $user user whose Twitter bridge link has been removed + * + * @return boolean success flag + */ + +function mail_twitter_bridge_removed($user) +{ + common_init_locale($user->language); + + $profile = $user->getProfile(); + + $subject = sprintf(_('Your Twitter bridge has been disabled.')); + + $body = sprintf(_("Hi, %1\$s. We're sorry to inform you that your " . + 'link to Twitter has been disabled. Your Twitter credentials ' . + 'have either changed (did you recently change your Twitter ' . + 'password?) or you have otherwise revoked our access to your ' . + "Twitter account.\n\n" . + 'You can re-enable your Twitter bridge by visiting your ' . + "Twitter settings page:\n\n\t%2\$s\n\n" . + "Regards,\n%3\$s\n"), + $profile->getBestName(), + common_local_url('twittersettings'), + common_config('site', 'name')); + + common_init_locale(); + return mail_to_user($user, $subject, $body); +} + +/** + * Send a mail message to notify a user that her Facebook Application + * access has been removed. + * + * @param User $user user whose Facebook app link has been removed + * + * @return boolean success flag + */ + +function mail_facebook_app_removed($user) +{ + common_init_locale($user->language); + + $profile = $user->getProfile(); + + $site_name = common_config('site', 'name'); + + $subject = sprintf( + _('Your %s Facebook application access has been disabled.', + $site_name)); + + $body = sprintf(_("Hi, %1\$s. We're sorry to inform you that we are " . + 'unable to update your Facebook status from %s, and have disabled ' . + 'the Facebook application for your account. This may be because ' . + 'you have removed the Facebook application\'s authorization, or ' . + 'have deleted your Facebook account. You can re-enable the ' . + 'Facebook application and automatic status updating by ' . + "re-installing the %1\$s Facebook application.\n\nRegards,\n\n%1\$s"), + $site_name); + + common_init_locale(); + return mail_to_user($user, $subject, $body); + +} + + diff --git a/lib/ownerdesignaction.php b/lib/ownerdesignaction.php index 424474f42..785b8a93d 100644 --- a/lib/ownerdesignaction.php +++ b/lib/ownerdesignaction.php @@ -61,11 +61,15 @@ class OwnerDesignAction extends Action { { parent::showStylesheets(); - $design = $this->getDesign(); + $user = common_current_user(); - if (!empty($design)) { - $design->showCSS($this); - } + if (empty($user) || $user->viewdesigns) { + $design = $this->getDesign(); + + if (!empty($design)) { + $design->showCSS($this); + } + } } /** diff --git a/lib/peoplesearchresults.php b/lib/peoplesearchresults.php deleted file mode 100644 index 9f6696b5f..000000000 --- a/lib/peoplesearchresults.php +++ /dev/null @@ -1,80 +0,0 @@ -<?php -/** - * People search results class - * - * PHP version 5 - * - * @category Widget - * @package Laconica - * @author Evan Prodromou <evan@controlyourself.ca> - * @author Robin Millette <millette@controlyourself.ca> - * @license http://www.fsf.org/licensing/licenses/agpl.html AGPLv3 - * @link http://laconi.ca/ - * - * Laconica - a distributed open-source microblogging tool - * Copyright (C) 2008, 2009, Control Yourself, Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - */ - -if (!defined('LACONICA')) { - exit(1); -} - -require_once INSTALLDIR.'/lib/profilelist.php'; - -/** - * People search results class - * - * Derivative of ProfileList with specialization for highlighting search terms. - * - * @category Widget - * @package Laconica - * @author Evan Prodromou <evan@controlyourself.ca> - * @author Robin Millette <millette@controlyourself.ca> - * @license http://www.fsf.org/licensing/licenses/agpl.html AGPLv3 - * @link http://laconi.ca/ - * - * @see PeoplesearchAction - */ - -class PeopleSearchResults extends ProfileList -{ - var $terms = null; - var $pattern = null; - - function __construct($profile, $terms, $action) - { - parent::__construct($profile, $action); - - $this->terms = array_map('preg_quote', - array_map('htmlspecialchars', $terms)); - - $this->pattern = '/('.implode('|',$terms).')/i'; - } - - function newProfileItem($profile) - { - return new PeopleSearchResultItem($profile, $this->action); - } -} - -class PeopleSearchResultItem extends ProfileListItem -{ - function highlight($text) - { - return preg_replace($this->pattern, '<strong>\\1</strong>', htmlspecialchars($text)); - } -} - diff --git a/lib/ping.php b/lib/ping.php index 3de541e9a..d26c73417 100644 --- a/lib/ping.php +++ b/lib/ping.php @@ -59,7 +59,7 @@ function ping_broadcast_notice($notice) { $response = xmlrpc_decode($file); - if (xmlrpc_is_fault($response)) { + if (is_array($response) && xmlrpc_is_fault($response)) { common_log(LOG_WARNING, "XML-RPC error for ping ($notify_url, $notice->id) ". "$response[faultString] ($response[faultCode])"); diff --git a/lib/popularnoticesection.php b/lib/popularnoticesection.php index 375d5538b..e47c9b385 100644 --- a/lib/popularnoticesection.php +++ b/lib/popularnoticesection.php @@ -68,7 +68,7 @@ class PopularNoticeSection extends NoticeSection } $qry .= ' GROUP BY notice.id,notice.profile_id,notice.content,notice.uri,' . 'notice.rendered,notice.url,notice.created,notice.modified,' . - 'notice.reply_to,notice.is_local,notice.source ' . + 'notice.reply_to,notice.is_local,notice.source,notice.conversation ' . 'ORDER BY weight DESC'; $offset = 0; diff --git a/lib/profilelist.php b/lib/profilelist.php index a604230f8..774538a4b 100644 --- a/lib/profilelist.php +++ b/lib/profilelist.php @@ -248,8 +248,13 @@ class ProfileListItem extends Widget $usf = new UnsubscribeForm($this->out, $this->profile); $usf->show(); } else { - $sf = new SubscribeForm($this->out, $this->profile); - $sf->show(); + // Is it a local user? can't remote sub from a list + // XXX: make that possible! + $other = User::staticGet('id', $this->profile->id); + if (!empty($other)) { + $sf = new SubscribeForm($this->out, $this->profile); + $sf->show(); + } } $this->out->elementEnd('li'); } diff --git a/lib/queuehandler.php b/lib/queuehandler.php index c1c4f3309..c2ff10f32 100644 --- a/lib/queuehandler.php +++ b/lib/queuehandler.php @@ -17,14 +17,16 @@ * along with this program. If not, see <http://www.gnu.org/licenses/>. */ -define('CLAIM_TIMEOUT', 1200); - if (!defined('LACONICA')) { exit(1); } require_once(INSTALLDIR.'/lib/daemon.php'); require_once(INSTALLDIR.'/classes/Queue_item.php'); require_once(INSTALLDIR.'/classes/Notice.php'); +define('CLAIM_TIMEOUT', 1200); +define('QUEUE_HANDLER_MISS_IDLE', 10); +define('QUEUE_HANDLER_HIT_IDLE', 0); + class QueueHandler extends Daemon { var $_id = 'generic'; @@ -38,6 +40,11 @@ class QueueHandler extends Daemon } } + function timeout() + { + return 60; + } + function class_name() { return ucfirst($this->transport()) . 'Handler'; @@ -76,110 +83,21 @@ class QueueHandler extends Daemon return true; } - function db_dispatch() { - do { - $qi = Queue_item::top($this->transport()); - if ($qi) { - $this->log(LOG_INFO, 'Got item enqueued '.common_exact_date($qi->created)); - $notice = Notice::staticGet($qi->notice_id); - if ($notice) { - $this->log(LOG_INFO, 'broadcasting notice ID = ' . $notice->id); - # XXX: what to do if broadcast fails? - $result = $this->handle_notice($notice); - if (!$result) { - $this->log(LOG_WARNING, 'Failed broadcast for notice ID = ' . $notice->id); - $orig = $qi; - $qi->claimed = null; - $qi->update($orig); - $this->log(LOG_WARNING, 'Abandoned claim for notice ID = ' . $notice->id); - continue; - } - $this->log(LOG_INFO, 'finished broadcasting notice ID = ' . $notice->id); - $notice->free(); - unset($notice); - $notice = null; - } else { - $this->log(LOG_WARNING, 'queue item for notice that does not exist'); - } - $qi->delete(); - $qi->free(); - unset($qi); - $this->idle(0); - } else { - $this->clear_old_claims(); - $this->idle(5); - } - } while (true); - } - - function stomp_dispatch() { - - // use an external message queue system via STOMP - require_once("Stomp.php"); - - $server = common_config('queue','stomp_server'); - $username = common_config('queue', 'stomp_username'); - $password = common_config('queue', 'stomp_password'); - - $con = new Stomp($server); - - if (!$con->connect($username, $password)) { - $this->log(LOG_ERR, 'Failed to connect to queue server'); - return false; - } - - $queue_basename = common_config('queue','queue_basename'); - // subscribe to the relevant queue (format: basename-transport) - $con->subscribe('/queue/'.$queue_basename.'-'.$this->transport()); - - do { - $frame = $con->readFrame(); - if ($frame) { - $this->log(LOG_INFO, 'Got item enqueued '.common_exact_date($frame->headers['created'])); - - // XXX: Now the queue handler receives only the ID of the - // notice, and it has to get it from the DB - // A massive improvement would be avoid DB query by transmitting - // all the notice details via queue server... - $notice = Notice::staticGet($frame->body); - - if ($notice) { - $this->log(LOG_INFO, 'broadcasting notice ID = ' . $notice->id); - $result = $this->handle_notice($notice); - if ($result) { - // if the msg has been handled positively, ack it - // and the queue server will remove it from the queue - $con->ack($frame); - $this->log(LOG_INFO, 'finished broadcasting notice ID = ' . $notice->id); - } - else { - // no ack - $this->log(LOG_WARNING, 'Failed broadcast for notice ID = ' . $notice->id); - } - $notice->free(); - unset($notice); - $notice = null; - } else { - $this->log(LOG_WARNING, 'queue item for notice that does not exist'); - } - } - } while (true); - - $con->disconnect(); - } - function run() { if (!$this->start()) { return false; } + $this->log(LOG_INFO, 'checking for queued notices'); - if (common_config('queue','subsystem') == 'stomp') { - $this->stomp_dispatch(); - } - else { - $this->db_dispatch(); - } + + $queue = $this->transport(); + $timeout = $this->timeout(); + + $qm = QueueManager::get(); + + $qm->service($queue, $this); + if (!$this->finish()) { return false; } @@ -188,24 +106,19 @@ class QueueHandler extends Daemon function idle($timeout=0) { - if ($timeout>0) { + if ($timeout > 0) { sleep($timeout); } } - function clear_old_claims() + function log($level, $msg) { - $qi = new Queue_item(); - $qi->transport = $this->transport(); - $qi->whereAdd('now() - claimed > '.CLAIM_TIMEOUT); - $qi->update(DB_DATAOBJECT_WHEREADD_ONLY); - $qi->free(); - unset($qi); + common_log($level, $this->class_name() . ' ('. $this->get_id() .'): '.$msg); } - function log($level, $msg) + function getSockets() { - common_log($level, $this->class_name() . ' ('. $this->get_id() .'): '.$msg); + return array(); } } diff --git a/lib/queuemanager.php b/lib/queuemanager.php new file mode 100644 index 000000000..582c24790 --- /dev/null +++ b/lib/queuemanager.php @@ -0,0 +1,74 @@ +<?php +/** + * Laconica, the distributed open-source microblogging tool + * + * Abstract class for queue managers + * + * PHP version 5 + * + * LICENCE: This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * @category QueueManager + * @package Laconica + * @author Evan Prodromou <evan@controlyourself.ca> + * @author Sarven Capadisli <csarven@controlyourself.ca> + * @copyright 2009 Control Yourself, Inc. + * @license http://www.fsf.org/licensing/licenses/agpl-3.0.html GNU Affero General Public License version 3.0 + * @link http://laconi.ca/ + */ + +class QueueManager +{ + static $qm = null; + + static function get() + { + if (empty(self::$qm)) { + + if (Event::handle('StartNewQueueManager', array(&self::$qm))) { + + $enabled = common_config('queue', 'enabled'); + $type = common_config('queue', 'subsystem'); + + if (!$enabled) { + // does everything immediately + self::$qm = new UnQueueManager(); + } else { + switch ($type) { + case 'db': + self::$qm = new DBQueueManager(); + break; + case 'stomp': + self::$qm = new StompQueueManager(); + break; + default: + throw new ServerException("No queue manager class for type '$type'"); + } + } + } + } + + return self::$qm; + } + + function enqueue($object, $queue) + { + throw ServerException("Unimplemented function 'enqueue' called"); + } + + function service($queue, $handler) + { + throw ServerException("Unimplemented function 'service' called"); + } +} diff --git a/lib/router.php b/lib/router.php index 784ea9882..75e72f932 100644 --- a/lib/router.php +++ b/lib/router.php @@ -261,7 +261,7 @@ class Router $m->connect('api/statuses/:method', array('action' => 'api', 'apiaction' => 'statuses'), - array('method' => '(public_timeline|friends_timeline|user_timeline|update|replies|mentions|friends|followers|featured)(\.(atom|rss|xml|json))?')); + array('method' => '(public_timeline|friends_timeline|user_timeline|update|replies|mentions|show|friends|followers|featured)(\.(atom|rss|xml|json))?')); $m->connect('api/statuses/:method/:argument', array('action' => 'api', @@ -317,7 +317,7 @@ class Router $m->connect('api/friendships/:method', array('action' => 'api', 'apiaction' => 'friendships'), - array('method' => 'exists(\.(xml|json))')); + array('method' => '(show|exists)(\.(xml|json))')); // Social graph diff --git a/lib/stompqueuemanager.php b/lib/stompqueuemanager.php new file mode 100644 index 000000000..d13af3fa5 --- /dev/null +++ b/lib/stompqueuemanager.php @@ -0,0 +1,174 @@ +<?php +/** + * Laconica, the distributed open-source microblogging tool + * + * Abstract class for queue managers + * + * PHP version 5 + * + * LICENCE: This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * @category QueueManager + * @package Laconica + * @author Evan Prodromou <evan@controlyourself.ca> + * @author Sarven Capadisli <csarven@controlyourself.ca> + * @copyright 2009 Control Yourself, Inc. + * @license http://www.fsf.org/licensing/licenses/agpl-3.0.html GNU Affero General Public License version 3.0 + * @link http://laconi.ca/ + */ + +require_once 'Stomp.php'; + +class LiberalStomp extends Stomp +{ + function getSocket() + { + return $this->_socket; + } +} + +class StompQueueManager +{ + var $server = null; + var $username = null; + var $password = null; + var $base = null; + var $con = null; + + function __construct() + { + $this->server = common_config('queue', 'stomp_server'); + $this->username = common_config('queue', 'stomp_username'); + $this->password = common_config('queue', 'stomp_password'); + $this->base = common_config('queue', 'queue_basename'); + } + + function _connect() + { + if (empty($this->con)) { + $this->_log(LOG_INFO, "Connecting to '$this->server' as '$this->username'..."); + $this->con = new LiberalStomp($this->server); + + if ($this->con->connect($this->username, $this->password)) { + $this->_log(LOG_INFO, "Connected."); + } else { + $this->_log(LOG_ERR, 'Failed to connect to queue server'); + throw new ServerException('Failed to connect to queue server'); + } + } + } + + function enqueue($object, $queue) + { + $notice = $object; + + $this->_connect(); + + // XXX: serialize and send entire notice + + $result = $this->con->send($this->_queueName($queue), + $notice->id, // BODY of the message + array ('created' => $notice->created)); + + if (!$result) { + common_log(LOG_ERR, 'Error sending to '.$queue.' queue'); + return false; + } + + common_log(LOG_DEBUG, 'complete remote queueing notice ID = ' + . $notice->id . ' for ' . $queue); + } + + function service($queue, $handler) + { + $result = null; + + $this->_connect(); + + $this->con->setReadTimeout($handler->timeout()); + + $this->con->subscribe($this->_queueName($queue)); + + while (true) { + + // Wait for something on one of our sockets + + $stompsock = $this->con->getSocket(); + + $handsocks = $handler->getSockets(); + + $socks = array_merge(array($stompsock), $handsocks); + + $read = $socks; + $write = array(); + $except = array(); + + $ready = stream_select($read, $write, $except, $handler->timeout(), 0); + + if ($ready === false) { + $this->_log(LOG_ERR, "Error selecting on sockets"); + } else if ($ready > 0) { + if (in_array($stompsock, $read)) { + $this->_handleNotice($queue, $handler); + } + foreach ($handsocks as $sock) { + if (in_array($sock, $read)) { + $handler->idle(QUEUE_HANDLER_HIT_IDLE); + break; + } + } + } + } + + $this->con->unsubscribe($this->_queueName($queue)); + } + + function _handleNotice($queue, $handler) + { + $frame = $this->con->readFrame(); + + if (!empty($frame)) { + $notice = Notice::staticGet('id', $frame->body); + + if (empty($notice)) { + $this->_log(LOG_WARNING, 'Got ID '. $frame->body .' for non-existent notice in queue '. $queue); + $this->con->ack($frame); + } else { + if ($handler->handle_notice($notice)) { + $this->_log(LOG_INFO, 'Successfully handled notice '. $notice->id .' posted at ' . $frame->headers['created'] . ' in queue '. $queue); + $this->con->ack($frame); + } else { + $this->_log(LOG_WARNING, 'Failed handling notice '. $notice->id .' posted at ' . $frame->headers['created'] . ' in queue '. $queue); + // FIXME we probably shouldn't have to do + // this kind of queue management ourselves + $this->con->ack($frame); + $this->enqueue($notice, $queue); + } + unset($notice); + } + + unset($frame); + } + } + + function _queueName($queue) + { + return common_config('queue', 'queue_basename') . $queue; + } + + function _log($level, $msg) + { + common_log($level, 'StompQueueManager: '.$msg); + } +} diff --git a/lib/twitter.php b/lib/twitter.php index 3ec082686..47af32e61 100644 --- a/lib/twitter.php +++ b/lib/twitter.php @@ -360,14 +360,11 @@ function is_twitter_bound($notice, $flink) { function broadcast_twitter($notice) { - $success = true; $flink = Foreign_link::getByUserID($notice->profile_id, TWITTER_SERVICE); - // XXX: Not sure WHERE to check whether a notice should go to - // Twitter. Should we even put in the queue if it shouldn't? --Zach - if (!is_null($flink) && is_twitter_bound($notice, $flink)) { + if (is_twitter_bound($notice, $flink)) { $fuser = $flink->getForeignUser(); $twitter_user = $fuser->nickname; @@ -401,33 +398,99 @@ function broadcast_twitter($notice) curl_setopt_array($ch, $options); $data = curl_exec($ch); $errmsg = curl_error($ch); + $errno = curl_errno($ch); - if ($errmsg) { - common_debug("cURL error: $errmsg - " . + if (!empty($errmsg)) { + common_debug("cURL error ($errno): $errmsg - " . "trying to send notice for $twitter_user.", __FILE__); - $success = false; + + $user = $flink->getUser(); + + if ($errmsg == 'The requested URL returned error: 401') { + common_debug(sprintf('User %s (user id: %s) ' . + 'has bad Twitter credentials!', + $user->nickname, $user->id)); + + // Bad credentials we need to delete the foreign_link + // to Twitter and inform the user. + + remove_twitter_link($flink); + + return true; + + } else { + + // Some other error happened, so we should try to + // send again later + + return false; + } + } curl_close($ch); - if (!$data) { + if (empty($data)) { common_debug("No data returned by Twitter's " . "API trying to send update for $twitter_user", __FILE__); - $success = false; - } - // Twitter should return a status - $status = json_decode($data); + // XXX: Not sure this represents a failure to send, but it + // probably does - if (!$status->id) { - common_debug("Unexpected data returned by Twitter " . - " API trying to send update for $twitter_user", - __FILE__); - $success = false; + return false; + + } else { + + // Twitter should return a status + $status = json_decode($data); + + if (empty($status)) { + common_debug("Unexpected data returned by Twitter " . + " API trying to send update for $twitter_user", + __FILE__); + + // XXX: Again, this could represent a failure posting + // or the Twitter API might just be behaving flakey. + // We're treating it as a failure to post. + + return false; + } } } - return $success; + return true; +} + +function remove_twitter_link($flink) +{ + $user = $flink->getUser(); + + common_log(LOG_INFO, 'Removing Twitter bridge Foreign link for ' . + "user $user->nickname (user id: $user->id)."); + + $result = $flink->delete(); + + if (empty($result)) { + common_log(LOG_ERR, 'Could not remove Twitter bridge ' . + "Foreign_link for $user->nickname (user id: $user->id)!"); + common_log_db_error($flink, 'DELETE', __FILE__); + } + + // Notify the user that her Twitter bridge is down + + $result = mail_twitter_bridge_removed($user); + + if (!$result) { + + $msg = 'Unable to send email to notify ' . + "$user->nickname (user id: $user->id) " . + 'that their Twitter bridge link was ' . + 'removed!'; + + common_log(LOG_WARNING, $msg); + } + } + diff --git a/lib/twitterapi.php b/lib/twitterapi.php index f538a0298..40e5b5067 100644 --- a/lib/twitterapi.php +++ b/lib/twitterapi.php @@ -278,6 +278,67 @@ class TwitterapiAction extends Action return $twitter_dm; } + function twitter_relationship_array($source, $target) + { + $relationship = array(); + + $relationship['source'] = + $this->relationship_details_array($source, $target); + $relationship['target'] = + $this->relationship_details_array($target, $source); + + return array('relationship' => $relationship); + } + + function relationship_details_array($source, $target) + { + $details = array(); + + $details['screen_name'] = $source->nickname; + $details['followed_by'] = $target->isSubscribed($source); + $details['following'] = $source->isSubscribed($target); + + $notifications = false; + + if ($source->isSubscribed($target)) { + + $sub = Subscription::pkeyGet(array('subscriber' => + $source->id, 'subscribed' => $target->id)); + + if (!empty($sub)) { + $notifications = ($sub->jabber || $sub->sms); + } + } + + $details['notifications_enabled'] = $notifications; + $details['blocking'] = $source->hasBlocked($target); + $details['id'] = $source->id; + + return $details; + } + + function show_twitter_xml_relationship($relationship) + { + $this->elementStart('relationship'); + + foreach($relationship as $element => $value) { + if ($element == 'source' || $element == 'target') { + $this->elementStart($element); + $this->show_xml_relationship_details($value); + $this->elementEnd($element); + } + } + + $this->elementEnd('relationship'); + } + + function show_xml_relationship_details($details) + { + foreach($details as $element => $value) { + $this->element($element, null, $value); + } + } + function show_twitter_xml_status($twitter_status) { $this->elementStart('status'); diff --git a/lib/unqueuemanager.php b/lib/unqueuemanager.php new file mode 100644 index 000000000..515461072 --- /dev/null +++ b/lib/unqueuemanager.php @@ -0,0 +1,85 @@ +<?php +/** + * Laconica, the distributed open-source microblogging tool + * + * A queue manager interface for just doing things immediately + * + * PHP version 5 + * + * LICENCE: This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * @category QueueManager + * @package Laconica + * @author Evan Prodromou <evan@controlyourself.ca> + * @author Sarven Capadisli <csarven@controlyourself.ca> + * @copyright 2009 Control Yourself, Inc. + * @license http://www.fsf.org/licensing/licenses/agpl-3.0.html GNU Affero General Public License version 3.0 + * @link http://laconi.ca/ + */ + +class UnQueueManager +{ + function enqueue($object, $queue) + { + $notice = $object; + + switch ($queue) + { + case 'omb': + if ($this->_isLocal($notice)) { + require_once(INSTALLDIR.'/lib/omb.php'); + omb_broadcast_remote_subscribers($notice); + } + break; + case 'public': + if ($this->_isLocal($notice)) { + require_once(INSTALLDIR.'/lib/jabber.php'); + jabber_public_notice($notice); + } + break; + case 'twitter': + if ($this->_isLocal($notice)) { + broadcast_twitter($notice); + } + break; + case 'facebook': + if ($this->_isLocal($notice)) { + require_once INSTALLDIR . '/lib/facebookutil.php'; + return facebookBroadcastNotice($notice); + } + break; + case 'ping': + if ($this->_isLocal($notice)) { + require_once INSTALLDIR . '/lib/ping.php'; + return ping_broadcast_notice($notice); + } + case 'sms': + require_once(INSTALLDIR.'/lib/mail.php'); + mail_broadcast_notice_sms($notice); + break; + case 'jabber': + require_once(INSTALLDIR.'/lib/jabber.php'); + jabber_broadcast_notice($notice); + break; + default: + throw ServerException("UnQueueManager: Unknown queue: $type"); + } + } + + function _isLocal($notice) + { + return ($notice->is_local == NOTICE_LOCAL_PUBLIC || + $notice->is_local == NOTICE_LOCAL_NONPUBLIC); + } +}
\ No newline at end of file diff --git a/lib/util.php b/lib/util.php index 461ca15c1..9e8ec41d2 100644 --- a/lib/util.php +++ b/lib/util.php @@ -862,165 +862,45 @@ function common_redirect($url, $code=307) function common_broadcast_notice($notice, $remote=false) { - if (common_config('queue', 'enabled')) { - // Do it later! - return common_enqueue_notice($notice); - } else { - return common_real_broadcast($notice, $remote); - } + return common_enqueue_notice($notice); } // Stick the notice on the queue function common_enqueue_notice($notice) { - $transports = array('omb', 'sms', 'public', 'twitter', 'facebook', 'ping'); - - if (common_config('xmpp', 'enabled')) - { - $transports[] = 'jabber'; - } - - if (common_config('queue','subsystem') == 'stomp') { - common_enqueue_notice_stomp($notice, $transports); - } - else { - common_enqueue_notice_db($notice, $transports); - } - return $result; -} - -function common_enqueue_notice_stomp($notice, $transports) -{ - // use an external message queue system via STOMP - require_once("Stomp.php"); + static $localTransports = array('omb', + 'twitter', + 'facebook', + 'ping'); + static $allTransports = array('sms'); - $server = common_config('queue','stomp_server'); - $username = common_config('queue', 'stomp_username'); - $password = common_config('queue', 'stomp_password'); + $transports = $allTransports; - $con = new Stomp($server); + $xmpp = common_config('xmpp', 'enabled'); - if (!$con->connect($username, $password)) { - common_log(LOG_ERR, 'Failed to connect to queue server'); - return false; + if ($xmpp) { + $transports[] = 'jabber'; } - $queue_basename = common_config('queue','queue_basename'); - - foreach ($transports as $transport) { - $result = $con->send('/queue/'.$queue_basename.'-'.$transport, // QUEUE - $notice->id, // BODY of the message - array ('created' => $notice->created)); - if (!$result) { - common_log(LOG_ERR, 'Error sending to '.$transport.' queue'); - return false; - } - common_log(LOG_DEBUG, 'complete remote queueing notice ID = ' . $notice->id . ' for ' . $transport); - } - - //send tags as headers, so they can be used as JMS selectors - common_log(LOG_DEBUG, 'searching for tags ' . $notice->id); - $tags = array(); - $tag = new Notice_tag(); - $tag->notice_id = $notice->id; - if ($tag->find()) { - while ($tag->fetch()) { - common_log(LOG_DEBUG, 'tag found = ' . $tag->tag); - array_push($tags,$tag->tag); + if ($notice->is_local == NOTICE_LOCAL_PUBLIC || + $notice->is_local == NOTICE_LOCAL_NONPUBLIC) { + $transports = array_merge($transports, $localTransports); + if ($xmpp) { + $transports[] = 'public'; } } - $tag->free(); - $con->send('/topic/laconica.'.$notice->profile_id, - $notice->content, - array( - 'profile_id' => $notice->profile_id, - 'created' => $notice->created, - 'tags' => implode($tags,' - ') - ) - ); - common_log(LOG_DEBUG, 'sent to personal topic ' . $notice->id); - $con->send('/topic/laconica.allusers', - $notice->content, - array( - 'profile_id' => $notice->profile_id, - 'created' => $notice->created, - 'tags' => implode($tags,' - ') - ) - ); - common_log(LOG_DEBUG, 'sent to catch-all topic ' . $notice->id); - $result = true; -} + $qm = QueueManager::get(); -function common_enqueue_notice_db($notice, $transports) -{ - // in any other case, 'internal' - foreach ($transports as $transport) { - common_enqueue_notice_transport($notice, $transport); + foreach ($transports as $transport) + { + $qm->enqueue($notice, $transport); } -} -function common_enqueue_notice_transport($notice, $transport) -{ - $qi = new Queue_item(); - $qi->notice_id = $notice->id; - $qi->transport = $transport; - $qi->created = $notice->created; - $result = $qi->insert(); - if (!$result) { - $last_error = &PEAR::getStaticProperty('DB_DataObject','lastError'); - common_log(LOG_ERR, 'DB error inserting queue item: ' . $last_error->message); - throw new ServerException('DB error inserting queue item: ' . $last_error->message); - } - common_log(LOG_DEBUG, 'complete queueing notice ID = ' . $notice->id . ' for ' . $transport); return true; } -function common_real_broadcast($notice, $remote=false) -{ - $success = true; - if (!$remote) { - // Make sure we have the OMB stuff - require_once(INSTALLDIR.'/lib/omb.php'); - $success = omb_broadcast_remote_subscribers($notice); - if (!$success) { - common_log(LOG_ERR, 'Error in OMB broadcast for notice ' . $notice->id); - } - } - if ($success) { - require_once(INSTALLDIR.'/lib/jabber.php'); - $success = jabber_broadcast_notice($notice); - if (!$success) { - common_log(LOG_ERR, 'Error in jabber broadcast for notice ' . $notice->id); - } - } - if ($success) { - require_once(INSTALLDIR.'/lib/mail.php'); - $success = mail_broadcast_notice_sms($notice); - if (!$success) { - common_log(LOG_ERR, 'Error in sms broadcast for notice ' . $notice->id); - } - } - if ($success) { - $success = jabber_public_notice($notice); - if (!$success) { - common_log(LOG_ERR, 'Error in public broadcast for notice ' . $notice->id); - } - } - if ($success) { - $success = broadcast_twitter($notice); - if (!$success) { - common_log(LOG_ERR, 'Error in Twitter broadcast for notice ' . $notice->id); - } - } - - // XXX: Do a real-time FB broadcast here? - - // XXX: broadcast notices to other IM - return $success; -} - function common_broadcast_profile($profile) { // XXX: optionally use a queue system like http://code.google.com/p/microapps/wiki/NQDQ @@ -1098,7 +978,8 @@ function common_ensure_syslog() { static $initialized = false; if (!$initialized) { - openlog(common_config('syslog', 'appname'), 0, LOG_USER); + openlog(common_config('syslog', 'appname'), 0, + common_config('syslog', 'facility')); $initialized = true; } } @@ -1147,6 +1028,9 @@ function common_log_objstring(&$object) if (is_null($object)) { return "null"; } + if (!($object instanceof DB_DataObject)) { + return "(unknown)"; + } $arr = $object->toArray(); $fields = array(); foreach ($arr as $k => $v) { diff --git a/lib/xmppqueuehandler.php b/lib/xmppqueuehandler.php index 986e09c25..77d476c30 100644 --- a/lib/xmppqueuehandler.php +++ b/lib/xmppqueuehandler.php @@ -21,6 +21,8 @@ if (!defined('LACONICA')) { exit(1); } require_once(INSTALLDIR.'/lib/queuehandler.php'); +define('PING_INTERVAL', 120); + /** * Common superclass for all XMPP-using queue handlers. They all need to * service their message queues on idle, and forward any incoming messages @@ -30,6 +32,9 @@ require_once(INSTALLDIR.'/lib/queuehandler.php'); class XmppQueueHandler extends QueueHandler { + var $pingid = 0; + var $lastping = null; + function start() { # Low priority; we don't want to receive messages @@ -44,6 +49,11 @@ class XmppQueueHandler extends QueueHandler return !is_null($this->conn); } + function timeout() + { + return 10; + } + function handle_reconnect(&$pl) { $this->conn->processUntil('session_start'); @@ -55,7 +65,13 @@ class XmppQueueHandler extends QueueHandler # Process the queue for as long as needed try { if ($this->conn) { + $this->log(LOG_DEBUG, "Servicing the XMPP queue."); $this->conn->processTime($timeout); + $now = time(); + if (empty($this->lastping) || $now - $this->lastping > PING_INTERVAL) { + $this->sendPing(); + $this->lastping = $now; + } } } catch (XMPPHP_Exception $e) { $this->log(LOG_ERR, "Got an XMPPHP_Exception: " . $e->getMessage()); @@ -63,6 +79,22 @@ class XmppQueueHandler extends QueueHandler } } + function sendPing() + { + $jid = jabber_daemon_address().'/'.$this->_id.$this->transport(); + $server = common_config('xmpp', 'server'); + + if (!isset($this->pingid)) { + $this->pingid = 0; + } else { + $this->pingid++; + } + + $this->log(LOG_DEBUG, "Sending ping #{$this->pingid}"); + + $this->conn->send("<iq from='{$jid}' to='{$server}' id='ping_{$this->pingid}' type='get'><ping xmlns='urn:xmpp:ping'/></iq>"); + } + function forward_message(&$pl) { if ($pl['type'] != 'chat') { @@ -91,7 +123,12 @@ class XmppQueueHandler extends QueueHandler if (common_config('xmpp', 'listener')) { return common_config('xmpp', 'listener'); } else { - return jabber_daemon_address() . '/' . common_config('xmpp','resource') . '-listener'; + return jabber_daemon_address() . '/' . common_config('xmpp','resource') . 'daemon'; } } + + function getSockets() + { + return array($this->conn->getSocket()); + } } |