summaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
Diffstat (limited to 'lib')
-rw-r--r--lib/action.php7
-rw-r--r--lib/common.php3
-rw-r--r--lib/currentuserdesignaction.php12
-rw-r--r--lib/dbqueuemanager.php166
-rw-r--r--lib/facebookutil.php81
-rw-r--r--lib/groupdesignaction.php16
-rw-r--r--lib/jabber.php10
-rw-r--r--lib/mail.php72
-rw-r--r--lib/ownerdesignaction.php12
-rw-r--r--lib/peoplesearchresults.php80
-rw-r--r--lib/ping.php2
-rw-r--r--lib/popularnoticesection.php2
-rw-r--r--lib/profilelist.php9
-rw-r--r--lib/queuehandler.php133
-rw-r--r--lib/queuemanager.php74
-rw-r--r--lib/router.php4
-rw-r--r--lib/stompqueuemanager.php174
-rw-r--r--lib/twitter.php99
-rw-r--r--lib/twitterapi.php61
-rw-r--r--lib/unqueuemanager.php85
-rw-r--r--lib/util.php164
-rw-r--r--lib/xmppqueuehandler.php39
22 files changed, 922 insertions, 383 deletions
diff --git a/lib/action.php b/lib/action.php
index c89fe180a..da5b48858 100644
--- a/lib/action.php
+++ b/lib/action.php
@@ -439,8 +439,6 @@ class Action extends HTMLOutputter // lawsuit
$this->menuItem(common_local_url('register'),
_('Register'), _('Create an account'), false, 'nav_register');
}
- $this->menuItem(common_local_url('openidlogin'),
- _('OpenID'), _('Login with OpenID'), false, 'nav_openid');
$this->menuItem(common_local_url('login'),
_('Login'), _('Login to the site'), false, 'nav_login');
}
@@ -708,6 +706,11 @@ class Action extends HTMLOutputter // lawsuit
_('About'));
$this->menuItem(common_local_url('doc', array('title' => 'faq')),
_('FAQ'));
+ $bb = common_config('site', 'broughtby');
+ if (!empty($bb)) {
+ $this->menuItem(common_local_url('doc', array('title' => 'tos')),
+ _('TOS'));
+ }
$this->menuItem(common_local_url('doc', array('title' => 'privacy')),
_('Privacy'));
$this->menuItem(common_local_url('doc', array('title' => 'source')),
diff --git a/lib/common.php b/lib/common.php
index 5d451463b..14be747bc 100644
--- a/lib/common.php
+++ b/lib/common.php
@@ -124,7 +124,8 @@ $config =
'dupelimit' => 60), # default for same person saying the same thing
'syslog' =>
array('appname' => 'laconica', # for syslog
- 'priority' => 'debug'), # XXX: currently ignored
+ 'priority' => 'debug', # XXX: currently ignored
+ 'facility' => LOG_USER),
'queue' =>
array('enabled' => false,
'subsystem' => 'db', # default to database, or 'stomp'
diff --git a/lib/currentuserdesignaction.php b/lib/currentuserdesignaction.php
index 7c2520cf6..4c7e15a8b 100644
--- a/lib/currentuserdesignaction.php
+++ b/lib/currentuserdesignaction.php
@@ -53,14 +53,19 @@ class CurrentUserDesignAction extends Action
*
* @return nothing
*/
+
function showStylesheets()
{
parent::showStylesheets();
- $design = $this->getDesign();
+ $user = common_current_user();
+
+ if (empty($user) || $user->viewdesigns) {
+ $design = $this->getDesign();
- if (!empty($design)) {
- $design->showCSS($this);
+ if (!empty($design)) {
+ $design->showCSS($this);
+ }
}
}
@@ -84,5 +89,4 @@ class CurrentUserDesignAction extends Action
return $cur->getDesign();
}
-
}
diff --git a/lib/dbqueuemanager.php b/lib/dbqueuemanager.php
new file mode 100644
index 000000000..6e7172de0
--- /dev/null
+++ b/lib/dbqueuemanager.php
@@ -0,0 +1,166 @@
+<?php
+/**
+ * Laconica, the distributed open-source microblogging tool
+ *
+ * Simple-minded queue manager for storing items in the database
+ *
+ * PHP version 5
+ *
+ * LICENCE: This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * @category QueueManager
+ * @package Laconica
+ * @author Evan Prodromou <evan@controlyourself.ca>
+ * @author Sarven Capadisli <csarven@controlyourself.ca>
+ * @copyright 2009 Control Yourself, Inc.
+ * @license http://www.fsf.org/licensing/licenses/agpl-3.0.html GNU Affero General Public License version 3.0
+ * @link http://laconi.ca/
+ */
+
+class DBQueueManager extends QueueManager
+{
+ var $qis = array();
+
+ function enqueue($object, $queue)
+ {
+ $notice = $object;
+
+ $qi = new Queue_item();
+
+ $qi->notice_id = $notice->id;
+ $qi->transport = $queue;
+ $qi->created = $notice->created;
+ $result = $qi->insert();
+
+ if (!$result) {
+ common_log_db_error($qi, 'INSERT', __FILE__);
+ throw new ServerException('DB error inserting queue item');
+ }
+
+ return true;
+ }
+
+ function service($queue, $handler)
+ {
+ while (true) {
+ $this->_log(LOG_DEBUG, 'Checking for notices...');
+ $notice = $this->_nextItem($queue, null);
+ if (empty($notice)) {
+ $this->_log(LOG_DEBUG, 'No notices waiting; idling.');
+ // Nothing in the queue. Do you
+ // have other tasks, like servicing your
+ // XMPP connection, to do?
+ $handler->idle(QUEUE_HANDLER_MISS_IDLE);
+ } else {
+ $this->_log(LOG_INFO, 'Got notice '. $notice->id);
+ // Yay! Got one!
+ if ($handler->handle_notice($notice)) {
+ $this->_log(LOG_INFO, 'Successfully handled notice '. $notice->id);
+ $this->_done($notice, $queue);
+ } else {
+ $this->_log(LOG_INFO, 'Failed to handle notice '. $notice->id);
+ $this->_fail($notice, $queue);
+ }
+ // Chance to e.g. service your XMPP connection
+ $this->_log(LOG_DEBUG, 'Idling after success.');
+ $handler->idle(QUEUE_HANDLER_HIT_IDLE);
+ }
+ // XXX: when do we give up?
+ }
+ }
+
+ function _nextItem($queue, $timeout=null)
+ {
+ $start = time();
+ $result = null;
+
+ do {
+ $qi = Queue_item::top($queue);
+ if (!empty($qi)) {
+ $notice = Notice::staticGet('id', $qi->notice_id);
+ if (!empty($notice)) {
+ $result = $notice;
+ } else {
+ $this->_log(LOG_INFO, 'dequeued non-existent notice ' . $notice->id);
+ $qi->delete();
+ $qi->free();
+ $qi = null;
+ }
+ }
+ } while (empty($result) && (is_null($timeout) || (time() - $start) < $timeout));
+
+ return $result;
+ }
+
+ function _done($object, $queue)
+ {
+ // XXX: right now, we only handle notices
+
+ $notice = $object;
+
+ $qi = Queue_item::pkeyGet(array('notice_id' => $notice->id,
+ 'transport' => $queue));
+
+ if (empty($qi)) {
+ $this->_log(LOG_INFO, 'Cannot find queue item for notice '.$notice->id.', queue '.$queue);
+ } else {
+ if (empty($qi->claimed)) {
+ $this->_log(LOG_WARNING, 'Reluctantly releasing unclaimed queue item '.
+ 'for '.$notice->id.', queue '.$queue);
+ }
+ $qi->delete();
+ $qi->free();
+ $qi = null;
+ }
+
+ $this->_log(LOG_INFO, 'done with notice ID = ' . $notice->id);
+
+ $notice->free();
+ $notice = null;
+ }
+
+ function _fail($object, $queue)
+ {
+ // XXX: right now, we only handle notices
+
+ $notice = $object;
+
+ $qi = Queue_item::pkeyGet(array('notice_id' => $notice->id,
+ 'transport' => $queue));
+
+ if (empty($qi)) {
+ $this->_log(LOG_INFO, 'Cannot find queue item for notice '.$notice->id.', queue '.$queue);
+ } else {
+ if (empty($qi->claimed)) {
+ $this->_log(LOG_WARNING, 'Ignoring failure for unclaimed queue item '.
+ 'for '.$notice->id.', queue '.$queue);
+ } else {
+ $orig = clone($qi);
+ $qi->claimed = null;
+ $qi->update($orig);
+ $qi = null;
+ }
+ }
+
+ $this->_log(LOG_INFO, 'done with notice ID = ' . $notice->id);
+
+ $notice->free();
+ $notice = null;
+ }
+
+ function _log($level, $msg)
+ {
+ common_log($level, 'DBQueueManager: '.$msg);
+ }
+}
diff --git a/lib/facebookutil.php b/lib/facebookutil.php
index 4d0df797b..632ec4bad 100644
--- a/lib/facebookutil.php
+++ b/lib/facebookutil.php
@@ -51,6 +51,10 @@ function updateProfileBox($facebook, $flink, $notice) {
function isFacebookBound($notice, $flink) {
+ if (empty($flink)) {
+ return false;
+ }
+
// If the user does not want to broadcast to Facebook, move along
if (!($flink->noticesync & FOREIGN_NOTICE_SEND == FOREIGN_NOTICE_SEND)) {
common_log(LOG_INFO, "Skipping notice $notice->id " .
@@ -86,10 +90,10 @@ function isFacebookBound($notice, $flink) {
if ($result != 1) {
$user = $flink->getUser();
- $msg = "Can't send notice $notice->id to Facebook " .
+ $msg = "Not sending notice $notice->id to Facebook " .
"because user $user->nickname hasn't given the " .
'Facebook app \'status_update\' permission.';
- common_log(LOG_INFO, $msg);
+ common_debug($msg);
$success = false;
}
@@ -108,13 +112,16 @@ function facebookBroadcastNotice($notice)
{
$facebook = getFacebook();
$flink = Foreign_link::getByUserID($notice->profile_id, FACEBOOK_SERVICE);
- $fbuid = $flink->foreign_id;
if (isFacebookBound($notice, $flink)) {
$status = null;
+ $fbuid = $flink->foreign_id;
+
+ $user = $flink->getUser();
// Get the status 'verb' (prefix) the user has set
+
try {
$prefix = $facebook->api_client->
data_getUserPreference(FACEBOOK_NOTICE_PREFIX, $fbuid);
@@ -122,23 +129,79 @@ function facebookBroadcastNotice($notice)
$status = "$prefix $notice->content";
} catch(FacebookRestClientException $e) {
- common_log(LOG_ERR, $e->getMessage());
- return false;
+ common_log(LOG_WARNING, $e->getMessage());
+ common_log(LOG_WARNING,
+ 'Unable to get the status verb setting from Facebook ' .
+ "for $user->nickname (user id: $user->id).");
}
- // Okay, we're good to go!
+ // Okay, we're good to go, update the FB status
try {
$facebook->api_client->users_setStatus($status, $fbuid, false, true);
- updateProfileBox($facebook, $flink, $notice);
} catch(FacebookRestClientException $e) {
common_log(LOG_ERR, $e->getMessage());
- return false;
+ common_log(LOG_ERR,
+ 'Unable to update Facebook status for ' .
+ "$user->nickname (user id: $user->id)!");
+
+ $code = $e->getCode();
- // Should we remove flink if this fails?
+ if ($code >= 200) {
+
+ // 200 The application does not have permission to operate on the passed in uid parameter.
+ // 250 Updating status requires the extended permission status_update.
+ // see: http://wiki.developers.facebook.com/index.php/Users.setStatus#Example_Return_XML
+
+ remove_facebook_app($flink);
+ }
+
+ }
+
+ // Now try to update the profile box
+
+ try {
+ updateProfileBox($facebook, $flink, $notice);
+ } catch(FacebookRestClientException $e) {
+ common_log(LOG_WARNING, $e->getMessage());
+ common_log(LOG_WARNING,
+ 'Unable to update Facebook profile box for ' .
+ "$user->nickname (user id: $user->id).");
}
}
return true;
}
+
+function remove_facebook_app($flink)
+{
+
+ $user = $flink->getUser();
+
+ common_log(LOG_INFO, 'Removing Facebook App Foreign link for ' .
+ "user $user->nickname (user id: $user->id).");
+
+ $result = $flink->delete();
+
+ if (empty($result)) {
+ common_log(LOG_ERR, 'Could not remove Facebook App ' .
+ "Foreign_link for $user->nickname (user id: $user->id)!");
+ common_log_db_error($flink, 'DELETE', __FILE__);
+ }
+
+ // Notify the user that we are removing their FB app access
+
+ $result = mail_facebook_app_removed($user);
+
+ if (!$result) {
+
+ $msg = 'Unable to send email to notify ' .
+ "$user->nickname (user id: $user->id) " .
+ 'that their Facebook app link was ' .
+ 'removed!';
+
+ common_log(LOG_WARNING, $msg);
+ }
+
+}
diff --git a/lib/groupdesignaction.php b/lib/groupdesignaction.php
index bc95921f1..58777c283 100644
--- a/lib/groupdesignaction.php
+++ b/lib/groupdesignaction.php
@@ -34,7 +34,7 @@ if (!defined('LACONICA')) {
/**
* Base class for actions that use a group's design
*
- * Pages related to groups can be themed with a design.
+ * Pages related to groups can be themed with a design.
* This superclass returns that design.
*
* @category Action
@@ -48,7 +48,7 @@ class GroupDesignAction extends Action {
/** The group in question */
var $group = null;
-
+
/**
* Show the groups's design stylesheet
*
@@ -58,10 +58,14 @@ class GroupDesignAction extends Action {
{
parent::showStylesheets();
- $design = $this->getDesign();
+ $user = common_current_user();
+
+ if (empty($user) || $user->viewdesigns) {
+ $design = $this->getDesign();
- if (!empty($design)) {
- $design->showCSS($this);
+ if (!empty($design)) {
+ $design->showCSS($this);
+ }
}
}
@@ -76,12 +80,10 @@ class GroupDesignAction extends Action {
function getDesign()
{
-
if (empty($this->group)) {
return null;
}
return $this->group->getDesign();
}
-
}
diff --git a/lib/jabber.php b/lib/jabber.php
index 7d584ad01..e15076160 100644
--- a/lib/jabber.php
+++ b/lib/jabber.php
@@ -77,6 +77,14 @@ function jabber_daemon_address()
return common_config('xmpp', 'user') . '@' . common_config('xmpp', 'server');
}
+class Sharing_XMPP extends XMPPHP_XMPP
+{
+ function getSocket()
+ {
+ return $this->socket;
+ }
+}
+
/**
* connect the configured Jabber account to the configured server
*
@@ -89,7 +97,7 @@ function jabber_connect($resource=null)
{
static $conn = null;
if (!$conn) {
- $conn = new XMPPHP_XMPP(common_config('xmpp', 'host') ?
+ $conn = new Sharing_XMPP(common_config('xmpp', 'host') ?
common_config('xmpp', 'host') :
common_config('xmpp', 'server'),
common_config('xmpp', 'port'),
diff --git a/lib/mail.php b/lib/mail.php
index 4e1f1dbb1..90ee3c992 100644
--- a/lib/mail.php
+++ b/lib/mail.php
@@ -625,3 +625,75 @@ function mail_notify_attn($user, $notice)
common_init_locale();
mail_to_user($user, $subject, $body);
}
+
+/**
+ * Send a mail message to notify a user that her Twitter bridge link
+ * has stopped working, and therefore has been removed. This can
+ * happen when the user changes her Twitter password, or otherwise
+ * revokes access.
+ *
+ * @param User $user user whose Twitter bridge link has been removed
+ *
+ * @return boolean success flag
+ */
+
+function mail_twitter_bridge_removed($user)
+{
+ common_init_locale($user->language);
+
+ $profile = $user->getProfile();
+
+ $subject = sprintf(_('Your Twitter bridge has been disabled.'));
+
+ $body = sprintf(_("Hi, %1\$s. We're sorry to inform you that your " .
+ 'link to Twitter has been disabled. Your Twitter credentials ' .
+ 'have either changed (did you recently change your Twitter ' .
+ 'password?) or you have otherwise revoked our access to your ' .
+ "Twitter account.\n\n" .
+ 'You can re-enable your Twitter bridge by visiting your ' .
+ "Twitter settings page:\n\n\t%2\$s\n\n" .
+ "Regards,\n%3\$s\n"),
+ $profile->getBestName(),
+ common_local_url('twittersettings'),
+ common_config('site', 'name'));
+
+ common_init_locale();
+ return mail_to_user($user, $subject, $body);
+}
+
+/**
+ * Send a mail message to notify a user that her Facebook Application
+ * access has been removed.
+ *
+ * @param User $user user whose Facebook app link has been removed
+ *
+ * @return boolean success flag
+ */
+
+function mail_facebook_app_removed($user)
+{
+ common_init_locale($user->language);
+
+ $profile = $user->getProfile();
+
+ $site_name = common_config('site', 'name');
+
+ $subject = sprintf(
+ _('Your %s Facebook application access has been disabled.',
+ $site_name));
+
+ $body = sprintf(_("Hi, %1\$s. We're sorry to inform you that we are " .
+ 'unable to update your Facebook status from %s, and have disabled ' .
+ 'the Facebook application for your account. This may be because ' .
+ 'you have removed the Facebook application\'s authorization, or ' .
+ 'have deleted your Facebook account. You can re-enable the ' .
+ 'Facebook application and automatic status updating by ' .
+ "re-installing the %1\$s Facebook application.\n\nRegards,\n\n%1\$s"),
+ $site_name);
+
+ common_init_locale();
+ return mail_to_user($user, $subject, $body);
+
+}
+
+
diff --git a/lib/ownerdesignaction.php b/lib/ownerdesignaction.php
index 424474f42..785b8a93d 100644
--- a/lib/ownerdesignaction.php
+++ b/lib/ownerdesignaction.php
@@ -61,11 +61,15 @@ class OwnerDesignAction extends Action {
{
parent::showStylesheets();
- $design = $this->getDesign();
+ $user = common_current_user();
- if (!empty($design)) {
- $design->showCSS($this);
- }
+ if (empty($user) || $user->viewdesigns) {
+ $design = $this->getDesign();
+
+ if (!empty($design)) {
+ $design->showCSS($this);
+ }
+ }
}
/**
diff --git a/lib/peoplesearchresults.php b/lib/peoplesearchresults.php
deleted file mode 100644
index 9f6696b5f..000000000
--- a/lib/peoplesearchresults.php
+++ /dev/null
@@ -1,80 +0,0 @@
-<?php
-/**
- * People search results class
- *
- * PHP version 5
- *
- * @category Widget
- * @package Laconica
- * @author Evan Prodromou <evan@controlyourself.ca>
- * @author Robin Millette <millette@controlyourself.ca>
- * @license http://www.fsf.org/licensing/licenses/agpl.html AGPLv3
- * @link http://laconi.ca/
- *
- * Laconica - a distributed open-source microblogging tool
- * Copyright (C) 2008, 2009, Control Yourself, Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License as published by
- * the Free Software Foundation, either version 3 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- */
-
-if (!defined('LACONICA')) {
- exit(1);
-}
-
-require_once INSTALLDIR.'/lib/profilelist.php';
-
-/**
- * People search results class
- *
- * Derivative of ProfileList with specialization for highlighting search terms.
- *
- * @category Widget
- * @package Laconica
- * @author Evan Prodromou <evan@controlyourself.ca>
- * @author Robin Millette <millette@controlyourself.ca>
- * @license http://www.fsf.org/licensing/licenses/agpl.html AGPLv3
- * @link http://laconi.ca/
- *
- * @see PeoplesearchAction
- */
-
-class PeopleSearchResults extends ProfileList
-{
- var $terms = null;
- var $pattern = null;
-
- function __construct($profile, $terms, $action)
- {
- parent::__construct($profile, $action);
-
- $this->terms = array_map('preg_quote',
- array_map('htmlspecialchars', $terms));
-
- $this->pattern = '/('.implode('|',$terms).')/i';
- }
-
- function newProfileItem($profile)
- {
- return new PeopleSearchResultItem($profile, $this->action);
- }
-}
-
-class PeopleSearchResultItem extends ProfileListItem
-{
- function highlight($text)
- {
- return preg_replace($this->pattern, '<strong>\\1</strong>', htmlspecialchars($text));
- }
-}
-
diff --git a/lib/ping.php b/lib/ping.php
index 3de541e9a..d26c73417 100644
--- a/lib/ping.php
+++ b/lib/ping.php
@@ -59,7 +59,7 @@ function ping_broadcast_notice($notice) {
$response = xmlrpc_decode($file);
- if (xmlrpc_is_fault($response)) {
+ if (is_array($response) && xmlrpc_is_fault($response)) {
common_log(LOG_WARNING,
"XML-RPC error for ping ($notify_url, $notice->id) ".
"$response[faultString] ($response[faultCode])");
diff --git a/lib/popularnoticesection.php b/lib/popularnoticesection.php
index 375d5538b..e47c9b385 100644
--- a/lib/popularnoticesection.php
+++ b/lib/popularnoticesection.php
@@ -68,7 +68,7 @@ class PopularNoticeSection extends NoticeSection
}
$qry .= ' GROUP BY notice.id,notice.profile_id,notice.content,notice.uri,' .
'notice.rendered,notice.url,notice.created,notice.modified,' .
- 'notice.reply_to,notice.is_local,notice.source ' .
+ 'notice.reply_to,notice.is_local,notice.source,notice.conversation ' .
'ORDER BY weight DESC';
$offset = 0;
diff --git a/lib/profilelist.php b/lib/profilelist.php
index a604230f8..774538a4b 100644
--- a/lib/profilelist.php
+++ b/lib/profilelist.php
@@ -248,8 +248,13 @@ class ProfileListItem extends Widget
$usf = new UnsubscribeForm($this->out, $this->profile);
$usf->show();
} else {
- $sf = new SubscribeForm($this->out, $this->profile);
- $sf->show();
+ // Is it a local user? can't remote sub from a list
+ // XXX: make that possible!
+ $other = User::staticGet('id', $this->profile->id);
+ if (!empty($other)) {
+ $sf = new SubscribeForm($this->out, $this->profile);
+ $sf->show();
+ }
}
$this->out->elementEnd('li');
}
diff --git a/lib/queuehandler.php b/lib/queuehandler.php
index c1c4f3309..c2ff10f32 100644
--- a/lib/queuehandler.php
+++ b/lib/queuehandler.php
@@ -17,14 +17,16 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
-define('CLAIM_TIMEOUT', 1200);
-
if (!defined('LACONICA')) { exit(1); }
require_once(INSTALLDIR.'/lib/daemon.php');
require_once(INSTALLDIR.'/classes/Queue_item.php');
require_once(INSTALLDIR.'/classes/Notice.php');
+define('CLAIM_TIMEOUT', 1200);
+define('QUEUE_HANDLER_MISS_IDLE', 10);
+define('QUEUE_HANDLER_HIT_IDLE', 0);
+
class QueueHandler extends Daemon
{
var $_id = 'generic';
@@ -38,6 +40,11 @@ class QueueHandler extends Daemon
}
}
+ function timeout()
+ {
+ return 60;
+ }
+
function class_name()
{
return ucfirst($this->transport()) . 'Handler';
@@ -76,110 +83,21 @@ class QueueHandler extends Daemon
return true;
}
- function db_dispatch() {
- do {
- $qi = Queue_item::top($this->transport());
- if ($qi) {
- $this->log(LOG_INFO, 'Got item enqueued '.common_exact_date($qi->created));
- $notice = Notice::staticGet($qi->notice_id);
- if ($notice) {
- $this->log(LOG_INFO, 'broadcasting notice ID = ' . $notice->id);
- # XXX: what to do if broadcast fails?
- $result = $this->handle_notice($notice);
- if (!$result) {
- $this->log(LOG_WARNING, 'Failed broadcast for notice ID = ' . $notice->id);
- $orig = $qi;
- $qi->claimed = null;
- $qi->update($orig);
- $this->log(LOG_WARNING, 'Abandoned claim for notice ID = ' . $notice->id);
- continue;
- }
- $this->log(LOG_INFO, 'finished broadcasting notice ID = ' . $notice->id);
- $notice->free();
- unset($notice);
- $notice = null;
- } else {
- $this->log(LOG_WARNING, 'queue item for notice that does not exist');
- }
- $qi->delete();
- $qi->free();
- unset($qi);
- $this->idle(0);
- } else {
- $this->clear_old_claims();
- $this->idle(5);
- }
- } while (true);
- }
-
- function stomp_dispatch() {
-
- // use an external message queue system via STOMP
- require_once("Stomp.php");
-
- $server = common_config('queue','stomp_server');
- $username = common_config('queue', 'stomp_username');
- $password = common_config('queue', 'stomp_password');
-
- $con = new Stomp($server);
-
- if (!$con->connect($username, $password)) {
- $this->log(LOG_ERR, 'Failed to connect to queue server');
- return false;
- }
-
- $queue_basename = common_config('queue','queue_basename');
- // subscribe to the relevant queue (format: basename-transport)
- $con->subscribe('/queue/'.$queue_basename.'-'.$this->transport());
-
- do {
- $frame = $con->readFrame();
- if ($frame) {
- $this->log(LOG_INFO, 'Got item enqueued '.common_exact_date($frame->headers['created']));
-
- // XXX: Now the queue handler receives only the ID of the
- // notice, and it has to get it from the DB
- // A massive improvement would be avoid DB query by transmitting
- // all the notice details via queue server...
- $notice = Notice::staticGet($frame->body);
-
- if ($notice) {
- $this->log(LOG_INFO, 'broadcasting notice ID = ' . $notice->id);
- $result = $this->handle_notice($notice);
- if ($result) {
- // if the msg has been handled positively, ack it
- // and the queue server will remove it from the queue
- $con->ack($frame);
- $this->log(LOG_INFO, 'finished broadcasting notice ID = ' . $notice->id);
- }
- else {
- // no ack
- $this->log(LOG_WARNING, 'Failed broadcast for notice ID = ' . $notice->id);
- }
- $notice->free();
- unset($notice);
- $notice = null;
- } else {
- $this->log(LOG_WARNING, 'queue item for notice that does not exist');
- }
- }
- } while (true);
-
- $con->disconnect();
- }
-
function run()
{
if (!$this->start()) {
return false;
}
+
$this->log(LOG_INFO, 'checking for queued notices');
- if (common_config('queue','subsystem') == 'stomp') {
- $this->stomp_dispatch();
- }
- else {
- $this->db_dispatch();
- }
+
+ $queue = $this->transport();
+ $timeout = $this->timeout();
+
+ $qm = QueueManager::get();
+
+ $qm->service($queue, $this);
+
if (!$this->finish()) {
return false;
}
@@ -188,24 +106,19 @@ class QueueHandler extends Daemon
function idle($timeout=0)
{
- if ($timeout>0) {
+ if ($timeout > 0) {
sleep($timeout);
}
}
- function clear_old_claims()
+ function log($level, $msg)
{
- $qi = new Queue_item();
- $qi->transport = $this->transport();
- $qi->whereAdd('now() - claimed > '.CLAIM_TIMEOUT);
- $qi->update(DB_DATAOBJECT_WHEREADD_ONLY);
- $qi->free();
- unset($qi);
+ common_log($level, $this->class_name() . ' ('. $this->get_id() .'): '.$msg);
}
- function log($level, $msg)
+ function getSockets()
{
- common_log($level, $this->class_name() . ' ('. $this->get_id() .'): '.$msg);
+ return array();
}
}
diff --git a/lib/queuemanager.php b/lib/queuemanager.php
new file mode 100644
index 000000000..582c24790
--- /dev/null
+++ b/lib/queuemanager.php
@@ -0,0 +1,74 @@
+<?php
+/**
+ * Laconica, the distributed open-source microblogging tool
+ *
+ * Abstract class for queue managers
+ *
+ * PHP version 5
+ *
+ * LICENCE: This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * @category QueueManager
+ * @package Laconica
+ * @author Evan Prodromou <evan@controlyourself.ca>
+ * @author Sarven Capadisli <csarven@controlyourself.ca>
+ * @copyright 2009 Control Yourself, Inc.
+ * @license http://www.fsf.org/licensing/licenses/agpl-3.0.html GNU Affero General Public License version 3.0
+ * @link http://laconi.ca/
+ */
+
+class QueueManager
+{
+ static $qm = null;
+
+ static function get()
+ {
+ if (empty(self::$qm)) {
+
+ if (Event::handle('StartNewQueueManager', array(&self::$qm))) {
+
+ $enabled = common_config('queue', 'enabled');
+ $type = common_config('queue', 'subsystem');
+
+ if (!$enabled) {
+ // does everything immediately
+ self::$qm = new UnQueueManager();
+ } else {
+ switch ($type) {
+ case 'db':
+ self::$qm = new DBQueueManager();
+ break;
+ case 'stomp':
+ self::$qm = new StompQueueManager();
+ break;
+ default:
+ throw new ServerException("No queue manager class for type '$type'");
+ }
+ }
+ }
+ }
+
+ return self::$qm;
+ }
+
+ function enqueue($object, $queue)
+ {
+ throw ServerException("Unimplemented function 'enqueue' called");
+ }
+
+ function service($queue, $handler)
+ {
+ throw ServerException("Unimplemented function 'service' called");
+ }
+}
diff --git a/lib/router.php b/lib/router.php
index 784ea9882..75e72f932 100644
--- a/lib/router.php
+++ b/lib/router.php
@@ -261,7 +261,7 @@ class Router
$m->connect('api/statuses/:method',
array('action' => 'api',
'apiaction' => 'statuses'),
- array('method' => '(public_timeline|friends_timeline|user_timeline|update|replies|mentions|friends|followers|featured)(\.(atom|rss|xml|json))?'));
+ array('method' => '(public_timeline|friends_timeline|user_timeline|update|replies|mentions|show|friends|followers|featured)(\.(atom|rss|xml|json))?'));
$m->connect('api/statuses/:method/:argument',
array('action' => 'api',
@@ -317,7 +317,7 @@ class Router
$m->connect('api/friendships/:method',
array('action' => 'api',
'apiaction' => 'friendships'),
- array('method' => 'exists(\.(xml|json))'));
+ array('method' => '(show|exists)(\.(xml|json))'));
// Social graph
diff --git a/lib/stompqueuemanager.php b/lib/stompqueuemanager.php
new file mode 100644
index 000000000..d13af3fa5
--- /dev/null
+++ b/lib/stompqueuemanager.php
@@ -0,0 +1,174 @@
+<?php
+/**
+ * Laconica, the distributed open-source microblogging tool
+ *
+ * Abstract class for queue managers
+ *
+ * PHP version 5
+ *
+ * LICENCE: This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * @category QueueManager
+ * @package Laconica
+ * @author Evan Prodromou <evan@controlyourself.ca>
+ * @author Sarven Capadisli <csarven@controlyourself.ca>
+ * @copyright 2009 Control Yourself, Inc.
+ * @license http://www.fsf.org/licensing/licenses/agpl-3.0.html GNU Affero General Public License version 3.0
+ * @link http://laconi.ca/
+ */
+
+require_once 'Stomp.php';
+
+class LiberalStomp extends Stomp
+{
+ function getSocket()
+ {
+ return $this->_socket;
+ }
+}
+
+class StompQueueManager
+{
+ var $server = null;
+ var $username = null;
+ var $password = null;
+ var $base = null;
+ var $con = null;
+
+ function __construct()
+ {
+ $this->server = common_config('queue', 'stomp_server');
+ $this->username = common_config('queue', 'stomp_username');
+ $this->password = common_config('queue', 'stomp_password');
+ $this->base = common_config('queue', 'queue_basename');
+ }
+
+ function _connect()
+ {
+ if (empty($this->con)) {
+ $this->_log(LOG_INFO, "Connecting to '$this->server' as '$this->username'...");
+ $this->con = new LiberalStomp($this->server);
+
+ if ($this->con->connect($this->username, $this->password)) {
+ $this->_log(LOG_INFO, "Connected.");
+ } else {
+ $this->_log(LOG_ERR, 'Failed to connect to queue server');
+ throw new ServerException('Failed to connect to queue server');
+ }
+ }
+ }
+
+ function enqueue($object, $queue)
+ {
+ $notice = $object;
+
+ $this->_connect();
+
+ // XXX: serialize and send entire notice
+
+ $result = $this->con->send($this->_queueName($queue),
+ $notice->id, // BODY of the message
+ array ('created' => $notice->created));
+
+ if (!$result) {
+ common_log(LOG_ERR, 'Error sending to '.$queue.' queue');
+ return false;
+ }
+
+ common_log(LOG_DEBUG, 'complete remote queueing notice ID = '
+ . $notice->id . ' for ' . $queue);
+ }
+
+ function service($queue, $handler)
+ {
+ $result = null;
+
+ $this->_connect();
+
+ $this->con->setReadTimeout($handler->timeout());
+
+ $this->con->subscribe($this->_queueName($queue));
+
+ while (true) {
+
+ // Wait for something on one of our sockets
+
+ $stompsock = $this->con->getSocket();
+
+ $handsocks = $handler->getSockets();
+
+ $socks = array_merge(array($stompsock), $handsocks);
+
+ $read = $socks;
+ $write = array();
+ $except = array();
+
+ $ready = stream_select($read, $write, $except, $handler->timeout(), 0);
+
+ if ($ready === false) {
+ $this->_log(LOG_ERR, "Error selecting on sockets");
+ } else if ($ready > 0) {
+ if (in_array($stompsock, $read)) {
+ $this->_handleNotice($queue, $handler);
+ }
+ foreach ($handsocks as $sock) {
+ if (in_array($sock, $read)) {
+ $handler->idle(QUEUE_HANDLER_HIT_IDLE);
+ break;
+ }
+ }
+ }
+ }
+
+ $this->con->unsubscribe($this->_queueName($queue));
+ }
+
+ function _handleNotice($queue, $handler)
+ {
+ $frame = $this->con->readFrame();
+
+ if (!empty($frame)) {
+ $notice = Notice::staticGet('id', $frame->body);
+
+ if (empty($notice)) {
+ $this->_log(LOG_WARNING, 'Got ID '. $frame->body .' for non-existent notice in queue '. $queue);
+ $this->con->ack($frame);
+ } else {
+ if ($handler->handle_notice($notice)) {
+ $this->_log(LOG_INFO, 'Successfully handled notice '. $notice->id .' posted at ' . $frame->headers['created'] . ' in queue '. $queue);
+ $this->con->ack($frame);
+ } else {
+ $this->_log(LOG_WARNING, 'Failed handling notice '. $notice->id .' posted at ' . $frame->headers['created'] . ' in queue '. $queue);
+ // FIXME we probably shouldn't have to do
+ // this kind of queue management ourselves
+ $this->con->ack($frame);
+ $this->enqueue($notice, $queue);
+ }
+ unset($notice);
+ }
+
+ unset($frame);
+ }
+ }
+
+ function _queueName($queue)
+ {
+ return common_config('queue', 'queue_basename') . $queue;
+ }
+
+ function _log($level, $msg)
+ {
+ common_log($level, 'StompQueueManager: '.$msg);
+ }
+}
diff --git a/lib/twitter.php b/lib/twitter.php
index 3ec082686..47af32e61 100644
--- a/lib/twitter.php
+++ b/lib/twitter.php
@@ -360,14 +360,11 @@ function is_twitter_bound($notice, $flink) {
function broadcast_twitter($notice)
{
- $success = true;
$flink = Foreign_link::getByUserID($notice->profile_id,
TWITTER_SERVICE);
- // XXX: Not sure WHERE to check whether a notice should go to
- // Twitter. Should we even put in the queue if it shouldn't? --Zach
- if (!is_null($flink) && is_twitter_bound($notice, $flink)) {
+ if (is_twitter_bound($notice, $flink)) {
$fuser = $flink->getForeignUser();
$twitter_user = $fuser->nickname;
@@ -401,33 +398,99 @@ function broadcast_twitter($notice)
curl_setopt_array($ch, $options);
$data = curl_exec($ch);
$errmsg = curl_error($ch);
+ $errno = curl_errno($ch);
- if ($errmsg) {
- common_debug("cURL error: $errmsg - " .
+ if (!empty($errmsg)) {
+ common_debug("cURL error ($errno): $errmsg - " .
"trying to send notice for $twitter_user.",
__FILE__);
- $success = false;
+
+ $user = $flink->getUser();
+
+ if ($errmsg == 'The requested URL returned error: 401') {
+ common_debug(sprintf('User %s (user id: %s) ' .
+ 'has bad Twitter credentials!',
+ $user->nickname, $user->id));
+
+ // Bad credentials we need to delete the foreign_link
+ // to Twitter and inform the user.
+
+ remove_twitter_link($flink);
+
+ return true;
+
+ } else {
+
+ // Some other error happened, so we should try to
+ // send again later
+
+ return false;
+ }
+
}
curl_close($ch);
- if (!$data) {
+ if (empty($data)) {
common_debug("No data returned by Twitter's " .
"API trying to send update for $twitter_user",
__FILE__);
- $success = false;
- }
- // Twitter should return a status
- $status = json_decode($data);
+ // XXX: Not sure this represents a failure to send, but it
+ // probably does
- if (!$status->id) {
- common_debug("Unexpected data returned by Twitter " .
- " API trying to send update for $twitter_user",
- __FILE__);
- $success = false;
+ return false;
+
+ } else {
+
+ // Twitter should return a status
+ $status = json_decode($data);
+
+ if (empty($status)) {
+ common_debug("Unexpected data returned by Twitter " .
+ " API trying to send update for $twitter_user",
+ __FILE__);
+
+ // XXX: Again, this could represent a failure posting
+ // or the Twitter API might just be behaving flakey.
+ // We're treating it as a failure to post.
+
+ return false;
+ }
}
}
- return $success;
+ return true;
+}
+
+function remove_twitter_link($flink)
+{
+ $user = $flink->getUser();
+
+ common_log(LOG_INFO, 'Removing Twitter bridge Foreign link for ' .
+ "user $user->nickname (user id: $user->id).");
+
+ $result = $flink->delete();
+
+ if (empty($result)) {
+ common_log(LOG_ERR, 'Could not remove Twitter bridge ' .
+ "Foreign_link for $user->nickname (user id: $user->id)!");
+ common_log_db_error($flink, 'DELETE', __FILE__);
+ }
+
+ // Notify the user that her Twitter bridge is down
+
+ $result = mail_twitter_bridge_removed($user);
+
+ if (!$result) {
+
+ $msg = 'Unable to send email to notify ' .
+ "$user->nickname (user id: $user->id) " .
+ 'that their Twitter bridge link was ' .
+ 'removed!';
+
+ common_log(LOG_WARNING, $msg);
+ }
+
}
+
diff --git a/lib/twitterapi.php b/lib/twitterapi.php
index f538a0298..40e5b5067 100644
--- a/lib/twitterapi.php
+++ b/lib/twitterapi.php
@@ -278,6 +278,67 @@ class TwitterapiAction extends Action
return $twitter_dm;
}
+ function twitter_relationship_array($source, $target)
+ {
+ $relationship = array();
+
+ $relationship['source'] =
+ $this->relationship_details_array($source, $target);
+ $relationship['target'] =
+ $this->relationship_details_array($target, $source);
+
+ return array('relationship' => $relationship);
+ }
+
+ function relationship_details_array($source, $target)
+ {
+ $details = array();
+
+ $details['screen_name'] = $source->nickname;
+ $details['followed_by'] = $target->isSubscribed($source);
+ $details['following'] = $source->isSubscribed($target);
+
+ $notifications = false;
+
+ if ($source->isSubscribed($target)) {
+
+ $sub = Subscription::pkeyGet(array('subscriber' =>
+ $source->id, 'subscribed' => $target->id));
+
+ if (!empty($sub)) {
+ $notifications = ($sub->jabber || $sub->sms);
+ }
+ }
+
+ $details['notifications_enabled'] = $notifications;
+ $details['blocking'] = $source->hasBlocked($target);
+ $details['id'] = $source->id;
+
+ return $details;
+ }
+
+ function show_twitter_xml_relationship($relationship)
+ {
+ $this->elementStart('relationship');
+
+ foreach($relationship as $element => $value) {
+ if ($element == 'source' || $element == 'target') {
+ $this->elementStart($element);
+ $this->show_xml_relationship_details($value);
+ $this->elementEnd($element);
+ }
+ }
+
+ $this->elementEnd('relationship');
+ }
+
+ function show_xml_relationship_details($details)
+ {
+ foreach($details as $element => $value) {
+ $this->element($element, null, $value);
+ }
+ }
+
function show_twitter_xml_status($twitter_status)
{
$this->elementStart('status');
diff --git a/lib/unqueuemanager.php b/lib/unqueuemanager.php
new file mode 100644
index 000000000..515461072
--- /dev/null
+++ b/lib/unqueuemanager.php
@@ -0,0 +1,85 @@
+<?php
+/**
+ * Laconica, the distributed open-source microblogging tool
+ *
+ * A queue manager interface for just doing things immediately
+ *
+ * PHP version 5
+ *
+ * LICENCE: This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * @category QueueManager
+ * @package Laconica
+ * @author Evan Prodromou <evan@controlyourself.ca>
+ * @author Sarven Capadisli <csarven@controlyourself.ca>
+ * @copyright 2009 Control Yourself, Inc.
+ * @license http://www.fsf.org/licensing/licenses/agpl-3.0.html GNU Affero General Public License version 3.0
+ * @link http://laconi.ca/
+ */
+
+class UnQueueManager
+{
+ function enqueue($object, $queue)
+ {
+ $notice = $object;
+
+ switch ($queue)
+ {
+ case 'omb':
+ if ($this->_isLocal($notice)) {
+ require_once(INSTALLDIR.'/lib/omb.php');
+ omb_broadcast_remote_subscribers($notice);
+ }
+ break;
+ case 'public':
+ if ($this->_isLocal($notice)) {
+ require_once(INSTALLDIR.'/lib/jabber.php');
+ jabber_public_notice($notice);
+ }
+ break;
+ case 'twitter':
+ if ($this->_isLocal($notice)) {
+ broadcast_twitter($notice);
+ }
+ break;
+ case 'facebook':
+ if ($this->_isLocal($notice)) {
+ require_once INSTALLDIR . '/lib/facebookutil.php';
+ return facebookBroadcastNotice($notice);
+ }
+ break;
+ case 'ping':
+ if ($this->_isLocal($notice)) {
+ require_once INSTALLDIR . '/lib/ping.php';
+ return ping_broadcast_notice($notice);
+ }
+ case 'sms':
+ require_once(INSTALLDIR.'/lib/mail.php');
+ mail_broadcast_notice_sms($notice);
+ break;
+ case 'jabber':
+ require_once(INSTALLDIR.'/lib/jabber.php');
+ jabber_broadcast_notice($notice);
+ break;
+ default:
+ throw ServerException("UnQueueManager: Unknown queue: $type");
+ }
+ }
+
+ function _isLocal($notice)
+ {
+ return ($notice->is_local == NOTICE_LOCAL_PUBLIC ||
+ $notice->is_local == NOTICE_LOCAL_NONPUBLIC);
+ }
+} \ No newline at end of file
diff --git a/lib/util.php b/lib/util.php
index 461ca15c1..9e8ec41d2 100644
--- a/lib/util.php
+++ b/lib/util.php
@@ -862,165 +862,45 @@ function common_redirect($url, $code=307)
function common_broadcast_notice($notice, $remote=false)
{
- if (common_config('queue', 'enabled')) {
- // Do it later!
- return common_enqueue_notice($notice);
- } else {
- return common_real_broadcast($notice, $remote);
- }
+ return common_enqueue_notice($notice);
}
// Stick the notice on the queue
function common_enqueue_notice($notice)
{
- $transports = array('omb', 'sms', 'public', 'twitter', 'facebook', 'ping');
-
- if (common_config('xmpp', 'enabled'))
- {
- $transports[] = 'jabber';
- }
-
- if (common_config('queue','subsystem') == 'stomp') {
- common_enqueue_notice_stomp($notice, $transports);
- }
- else {
- common_enqueue_notice_db($notice, $transports);
- }
- return $result;
-}
-
-function common_enqueue_notice_stomp($notice, $transports)
-{
- // use an external message queue system via STOMP
- require_once("Stomp.php");
+ static $localTransports = array('omb',
+ 'twitter',
+ 'facebook',
+ 'ping');
+ static $allTransports = array('sms');
- $server = common_config('queue','stomp_server');
- $username = common_config('queue', 'stomp_username');
- $password = common_config('queue', 'stomp_password');
+ $transports = $allTransports;
- $con = new Stomp($server);
+ $xmpp = common_config('xmpp', 'enabled');
- if (!$con->connect($username, $password)) {
- common_log(LOG_ERR, 'Failed to connect to queue server');
- return false;
+ if ($xmpp) {
+ $transports[] = 'jabber';
}
- $queue_basename = common_config('queue','queue_basename');
-
- foreach ($transports as $transport) {
- $result = $con->send('/queue/'.$queue_basename.'-'.$transport, // QUEUE
- $notice->id, // BODY of the message
- array ('created' => $notice->created));
- if (!$result) {
- common_log(LOG_ERR, 'Error sending to '.$transport.' queue');
- return false;
- }
- common_log(LOG_DEBUG, 'complete remote queueing notice ID = ' . $notice->id . ' for ' . $transport);
- }
-
- //send tags as headers, so they can be used as JMS selectors
- common_log(LOG_DEBUG, 'searching for tags ' . $notice->id);
- $tags = array();
- $tag = new Notice_tag();
- $tag->notice_id = $notice->id;
- if ($tag->find()) {
- while ($tag->fetch()) {
- common_log(LOG_DEBUG, 'tag found = ' . $tag->tag);
- array_push($tags,$tag->tag);
+ if ($notice->is_local == NOTICE_LOCAL_PUBLIC ||
+ $notice->is_local == NOTICE_LOCAL_NONPUBLIC) {
+ $transports = array_merge($transports, $localTransports);
+ if ($xmpp) {
+ $transports[] = 'public';
}
}
- $tag->free();
- $con->send('/topic/laconica.'.$notice->profile_id,
- $notice->content,
- array(
- 'profile_id' => $notice->profile_id,
- 'created' => $notice->created,
- 'tags' => implode($tags,' - ')
- )
- );
- common_log(LOG_DEBUG, 'sent to personal topic ' . $notice->id);
- $con->send('/topic/laconica.allusers',
- $notice->content,
- array(
- 'profile_id' => $notice->profile_id,
- 'created' => $notice->created,
- 'tags' => implode($tags,' - ')
- )
- );
- common_log(LOG_DEBUG, 'sent to catch-all topic ' . $notice->id);
- $result = true;
-}
+ $qm = QueueManager::get();
-function common_enqueue_notice_db($notice, $transports)
-{
- // in any other case, 'internal'
- foreach ($transports as $transport) {
- common_enqueue_notice_transport($notice, $transport);
+ foreach ($transports as $transport)
+ {
+ $qm->enqueue($notice, $transport);
}
-}
-function common_enqueue_notice_transport($notice, $transport)
-{
- $qi = new Queue_item();
- $qi->notice_id = $notice->id;
- $qi->transport = $transport;
- $qi->created = $notice->created;
- $result = $qi->insert();
- if (!$result) {
- $last_error = &PEAR::getStaticProperty('DB_DataObject','lastError');
- common_log(LOG_ERR, 'DB error inserting queue item: ' . $last_error->message);
- throw new ServerException('DB error inserting queue item: ' . $last_error->message);
- }
- common_log(LOG_DEBUG, 'complete queueing notice ID = ' . $notice->id . ' for ' . $transport);
return true;
}
-function common_real_broadcast($notice, $remote=false)
-{
- $success = true;
- if (!$remote) {
- // Make sure we have the OMB stuff
- require_once(INSTALLDIR.'/lib/omb.php');
- $success = omb_broadcast_remote_subscribers($notice);
- if (!$success) {
- common_log(LOG_ERR, 'Error in OMB broadcast for notice ' . $notice->id);
- }
- }
- if ($success) {
- require_once(INSTALLDIR.'/lib/jabber.php');
- $success = jabber_broadcast_notice($notice);
- if (!$success) {
- common_log(LOG_ERR, 'Error in jabber broadcast for notice ' . $notice->id);
- }
- }
- if ($success) {
- require_once(INSTALLDIR.'/lib/mail.php');
- $success = mail_broadcast_notice_sms($notice);
- if (!$success) {
- common_log(LOG_ERR, 'Error in sms broadcast for notice ' . $notice->id);
- }
- }
- if ($success) {
- $success = jabber_public_notice($notice);
- if (!$success) {
- common_log(LOG_ERR, 'Error in public broadcast for notice ' . $notice->id);
- }
- }
- if ($success) {
- $success = broadcast_twitter($notice);
- if (!$success) {
- common_log(LOG_ERR, 'Error in Twitter broadcast for notice ' . $notice->id);
- }
- }
-
- // XXX: Do a real-time FB broadcast here?
-
- // XXX: broadcast notices to other IM
- return $success;
-}
-
function common_broadcast_profile($profile)
{
// XXX: optionally use a queue system like http://code.google.com/p/microapps/wiki/NQDQ
@@ -1098,7 +978,8 @@ function common_ensure_syslog()
{
static $initialized = false;
if (!$initialized) {
- openlog(common_config('syslog', 'appname'), 0, LOG_USER);
+ openlog(common_config('syslog', 'appname'), 0,
+ common_config('syslog', 'facility'));
$initialized = true;
}
}
@@ -1147,6 +1028,9 @@ function common_log_objstring(&$object)
if (is_null($object)) {
return "null";
}
+ if (!($object instanceof DB_DataObject)) {
+ return "(unknown)";
+ }
$arr = $object->toArray();
$fields = array();
foreach ($arr as $k => $v) {
diff --git a/lib/xmppqueuehandler.php b/lib/xmppqueuehandler.php
index 986e09c25..77d476c30 100644
--- a/lib/xmppqueuehandler.php
+++ b/lib/xmppqueuehandler.php
@@ -21,6 +21,8 @@ if (!defined('LACONICA')) { exit(1); }
require_once(INSTALLDIR.'/lib/queuehandler.php');
+define('PING_INTERVAL', 120);
+
/**
* Common superclass for all XMPP-using queue handlers. They all need to
* service their message queues on idle, and forward any incoming messages
@@ -30,6 +32,9 @@ require_once(INSTALLDIR.'/lib/queuehandler.php');
class XmppQueueHandler extends QueueHandler
{
+ var $pingid = 0;
+ var $lastping = null;
+
function start()
{
# Low priority; we don't want to receive messages
@@ -44,6 +49,11 @@ class XmppQueueHandler extends QueueHandler
return !is_null($this->conn);
}
+ function timeout()
+ {
+ return 10;
+ }
+
function handle_reconnect(&$pl)
{
$this->conn->processUntil('session_start');
@@ -55,7 +65,13 @@ class XmppQueueHandler extends QueueHandler
# Process the queue for as long as needed
try {
if ($this->conn) {
+ $this->log(LOG_DEBUG, "Servicing the XMPP queue.");
$this->conn->processTime($timeout);
+ $now = time();
+ if (empty($this->lastping) || $now - $this->lastping > PING_INTERVAL) {
+ $this->sendPing();
+ $this->lastping = $now;
+ }
}
} catch (XMPPHP_Exception $e) {
$this->log(LOG_ERR, "Got an XMPPHP_Exception: " . $e->getMessage());
@@ -63,6 +79,22 @@ class XmppQueueHandler extends QueueHandler
}
}
+ function sendPing()
+ {
+ $jid = jabber_daemon_address().'/'.$this->_id.$this->transport();
+ $server = common_config('xmpp', 'server');
+
+ if (!isset($this->pingid)) {
+ $this->pingid = 0;
+ } else {
+ $this->pingid++;
+ }
+
+ $this->log(LOG_DEBUG, "Sending ping #{$this->pingid}");
+
+ $this->conn->send("<iq from='{$jid}' to='{$server}' id='ping_{$this->pingid}' type='get'><ping xmlns='urn:xmpp:ping'/></iq>");
+ }
+
function forward_message(&$pl)
{
if ($pl['type'] != 'chat') {
@@ -91,7 +123,12 @@ class XmppQueueHandler extends QueueHandler
if (common_config('xmpp', 'listener')) {
return common_config('xmpp', 'listener');
} else {
- return jabber_daemon_address() . '/' . common_config('xmpp','resource') . '-listener';
+ return jabber_daemon_address() . '/' . common_config('xmpp','resource') . 'daemon';
}
}
+
+ function getSockets()
+ {
+ return array($this->conn->getSocket());
+ }
}