diff options
author | Sarven Capadisli <csarven@status.net> | 2010-02-04 16:56:34 +0000 |
---|---|---|
committer | Sarven Capadisli <csarven@status.net> | 2010-02-04 16:56:34 +0000 |
commit | 7ebd13fa69d2a5dac8bc59799281d3d6e017eeae (patch) | |
tree | 605267bbe7c902d7a71766cdeb523bfbb266a0f9 /lib | |
parent | 339eb1adadc7f3495ad31ef0a5cf20cdca47ce1f (diff) | |
parent | 9e940445f1ab1ec53f3bad14a1a94dc2064d0ee6 (diff) |
Merge branch '0.9.x' of git@gitorious.org:statusnet/mainline into 0.9.x
Diffstat (limited to 'lib')
-rw-r--r-- | lib/api.php | 21 | ||||
-rw-r--r-- | lib/apiauth.php | 22 | ||||
-rw-r--r-- | lib/apioauthstore.php | 27 | ||||
-rw-r--r-- | lib/common.php | 9 | ||||
-rw-r--r-- | lib/default.php | 7 | ||||
-rw-r--r-- | lib/distribqueuehandler.php | 55 | ||||
-rw-r--r-- | lib/jabber.php | 4 | ||||
-rw-r--r-- | lib/jabberqueuehandler.php | 2 | ||||
-rw-r--r-- | lib/liberalstomp.php | 24 | ||||
-rw-r--r-- | lib/mysqlschema.php | 537 | ||||
-rw-r--r-- | lib/noticelist.php | 20 | ||||
-rw-r--r-- | lib/ombqueuehandler.php | 2 | ||||
-rw-r--r-- | lib/pgsqlschema.php | 503 | ||||
-rw-r--r-- | lib/publicqueuehandler.php | 2 | ||||
-rw-r--r-- | lib/router.php | 26 | ||||
-rw-r--r-- | lib/schema.php | 58 | ||||
-rw-r--r-- | lib/stompqueuemanager.php | 375 | ||||
-rw-r--r-- | lib/util.php | 13 |
18 files changed, 1526 insertions, 181 deletions
diff --git a/lib/api.php b/lib/api.php index 825262b4c..7d94eaee4 100644 --- a/lib/api.php +++ b/lib/api.php @@ -299,7 +299,7 @@ class ApiAction extends Action } } - if ($include_user) { + if ($include_user && $profile) { # Don't get notice (recursive!) $twitter_user = $this->twitterUserArray($profile, false); $twitter_status['user'] = $twitter_user; @@ -1250,10 +1250,27 @@ class ApiAction extends Action case 'api': break; default: + + $name = null; + $url = null; + $ns = Notice_source::staticGet($source); + if ($ns) { - $source_name = '<a href="' . $ns->url . '">' . $ns->name . '</a>'; + $name = $ns->name; + $url = $ns->url; + } else { + $app = Oauth_application::staticGet('name', $source); + if ($app) { + $name = $app->name; + $url = $app->source_url; + } + } + + if (!empty($name) && !empty($url)) { + $source_name = '<a href="' . $url . '">' . $name . '</a>'; } + break; } return $source_name; diff --git a/lib/apiauth.php b/lib/apiauth.php index 262f4b966..25e2196cf 100644 --- a/lib/apiauth.php +++ b/lib/apiauth.php @@ -55,6 +55,7 @@ class ApiAuthAction extends ApiAction { var $auth_user_nickname = null; var $auth_user_password = null; + var $oauth_source = null; /** * Take arguments for running, looks for an OAuth request, @@ -73,28 +74,23 @@ class ApiAuthAction extends ApiAction // NOTE: $this->auth_user has to get set in prepare(), not handle(), // because subclasses do stuff with it in their prepares. - if ($this->requiresAuth()) { + $oauthReq = $this->getOAuthRequest(); - $oauthReq = $this->getOAuthRequest(); - - if (!$oauthReq) { + if (!$oauthReq) { + if ($this->requiresAuth()) { $this->checkBasicAuthUser(true); } else { - $this->checkOAuthRequest($oauthReq); + // Check to see if a basic auth user is there even + // if one's not required + $this->checkBasicAuthUser(false); } } else { - - // Check to see if a basic auth user is there even - // if one's not required - $this->checkBasicAuthUser(false); + $this->checkOAuthRequest($oauthReq); } // Reject API calls with the wrong access level if ($this->isReadOnly($args) == false) { - - common_debug(get_class($this) . ' is not read-only!'); - if ($this->access != self::READ_WRITE) { $msg = _('API resource requires read-write access, ' . 'but you only have read access.'); @@ -111,7 +107,6 @@ class ApiAuthAction extends ApiAction * This is to avoid doign any unnecessary DB lookups. * * @return mixed the OAuthRequest or false - * */ function getOAuthRequest() @@ -140,7 +135,6 @@ class ApiAuthAction extends ApiAction * @param OAuthRequest $request the OAuth Request * * @return nothing - * */ function checkOAuthRequest($request) diff --git a/lib/apioauthstore.php b/lib/apioauthstore.php index 32110d057..1bb11cbca 100644 --- a/lib/apioauthstore.php +++ b/lib/apioauthstore.php @@ -159,5 +159,32 @@ class ApiStatusNetOAuthDataStore extends StatusNetOAuthDataStore } } + /** + * Revoke specified access token + * + * Revokes the token specified by $token_key. + * Throws exceptions in case of error. + * + * @param string $token_key the token to be revoked + * @param int $type type of token (0 = req, 1 = access) + * + * @access public + * + * @return void + */ + + public function revoke_token($token_key, $type = 0) { + $rt = new Token(); + $rt->tok = $token_key; + $rt->type = $type; + $rt->state = 0; + if (!$rt->find(true)) { + throw new Exception('Tried to revoke unknown token'); + } + if (!$rt->delete()) { + throw new Exception('Failed to delete revoked token'); + } + } + } diff --git a/lib/common.php b/lib/common.php index ada48b339..b95cd1175 100644 --- a/lib/common.php +++ b/lib/common.php @@ -22,7 +22,7 @@ if (!defined('STATUSNET') && !defined('LACONICA')) { exit(1); } //exit with 200 response, if this is checking fancy from the installer if (isset($_REQUEST['p']) && $_REQUEST['p'] == 'check-fancy') { exit; } -define('STATUSNET_VERSION', '0.9.0beta3'); +define('STATUSNET_VERSION', '0.9.0beta5'); define('LACONICA_VERSION', STATUSNET_VERSION); // compatibility define('STATUSNET_CODENAME', 'Stand'); @@ -115,6 +115,10 @@ function __autoload($cls) require_once 'Validate.php'; require_once 'markdown.php'; +// XXX: other formats here + +define('NICKNAME_FMT', VALIDATE_NUM.VALIDATE_ALPHA_LOWER); + require_once INSTALLDIR.'/lib/util.php'; require_once INSTALLDIR.'/lib/action.php'; require_once INSTALLDIR.'/lib/mail.php'; @@ -136,6 +140,3 @@ try { exit; } -// XXX: other formats here - -define('NICKNAME_FMT', VALIDATE_NUM.VALIDATE_ALPHA_LOWER); diff --git a/lib/default.php b/lib/default.php index 64fb7a786..437f350dd 100644 --- a/lib/default.php +++ b/lib/default.php @@ -84,9 +84,12 @@ $default = 'control_channel' => '/topic/statusnet-control', // broadcasts to all queue daemons 'stomp_username' => null, 'stomp_password' => null, + 'stomp_persistent' => true, // keep items across queue server restart, if persistence is enabled + 'stomp_manual_failover' => true, // if multiple servers are listed, treat them as separate (enqueue on one randomly, listen on all) 'monitor' => null, // URL to monitor ping endpoint (work in progress) 'softlimit' => '90%', // total size or % of memory_limit at which to restart queue threads gracefully 'debug_memory' => false, // true to spit memory usage to log + 'inboxes' => true, // true to do inbox distribution & output queueing from in background via 'distrib' queue ), 'license' => array('type' => 'cc', # can be 'cc', 'allrightsreserved', 'private' @@ -269,4 +272,8 @@ $default = 'singleuser' => array('enabled' => false, 'nickname' => null), + 'robotstxt' => + array('crawldelay' => 0, + 'disallow' => array('main', 'settings', 'admin', 'search', 'message') + ), ); diff --git a/lib/distribqueuehandler.php b/lib/distribqueuehandler.php index f458d238d..4477468d0 100644 --- a/lib/distribqueuehandler.php +++ b/lib/distribqueuehandler.php @@ -62,23 +62,60 @@ class DistribQueueHandler { // XXX: do we need to change this for remote users? - $notice->saveTags(); + try { + $notice->saveTags(); + } catch (Exception $e) { + $this->logit($notice, $e); + } - $groups = $notice->saveGroups(); + try { + $groups = $notice->saveGroups(); + } catch (Exception $e) { + $this->logit($notice, $e); + } - $recipients = $notice->saveReplies(); + try { + $recipients = $notice->saveReplies(); + } catch (Exception $e) { + $this->logit($notice, $e); + } - $notice->addToInboxes($groups, $recipients); + try { + $notice->addToInboxes($groups, $recipients); + } catch (Exception $e) { + $this->logit($notice, $e); + } - $notice->saveUrls(); + try { + $notice->saveUrls(); + } catch (Exception $e) { + $this->logit($notice, $e); + } - Event::handle('EndNoticeSave', array($notice)); + try { + Event::handle('EndNoticeSave', array($notice)); + // Enqueue for other handlers + } catch (Exception $e) { + $this->logit($notice, $e); + } - // Enqueue for other handlers - - common_enqueue_notice($notice); + try { + common_enqueue_notice($notice); + } catch (Exception $e) { + $this->logit($notice, $e); + } return true; } + + protected function logit($notice, $e) + { + common_log(LOG_ERR, "Distrib queue exception saving notice $notice->id: " . + $e->getMessage() . ' ' . + str_replace("\n", " ", $e->getTraceAsString())); + + // We'll still return true so we don't get stuck in a loop + // trying to run a bad insert over and over... + } } diff --git a/lib/jabber.php b/lib/jabber.php index b6b23521b..e1bf06ba6 100644 --- a/lib/jabber.php +++ b/lib/jabber.php @@ -358,7 +358,7 @@ function jabber_broadcast_notice($notice) common_log(LOG_WARNING, 'Refusing to broadcast notice with ' . 'unknown profile ' . common_log_objstring($notice), __FILE__); - return false; + return true; // not recoverable; discard. } $msg = jabber_format_notice($profile, $notice); @@ -437,7 +437,7 @@ function jabber_public_notice($notice) common_log(LOG_WARNING, 'Refusing to broadcast notice with ' . 'unknown profile ' . common_log_objstring($notice), __FILE__); - return false; + return true; // not recoverable; discard. } $msg = jabber_format_notice($profile, $notice); diff --git a/lib/jabberqueuehandler.php b/lib/jabberqueuehandler.php index 83471f2df..d6b4b7416 100644 --- a/lib/jabberqueuehandler.php +++ b/lib/jabberqueuehandler.php @@ -40,7 +40,7 @@ class JabberQueueHandler extends QueueHandler try { return jabber_broadcast_notice($notice); } catch (XMPPHP_Exception $e) { - $this->log(LOG_ERR, "Got an XMPPHP_Exception: " . $e->getMessage()); + common_log(LOG_ERR, "Got an XMPPHP_Exception: " . $e->getMessage()); return false; } } diff --git a/lib/liberalstomp.php b/lib/liberalstomp.php index c9233843a..3d38953fd 100644 --- a/lib/liberalstomp.php +++ b/lib/liberalstomp.php @@ -34,6 +34,22 @@ class LiberalStomp extends Stomp } /** + * Return the host we're currently connected to. + * + * @return string + */ + function getServer() + { + $idx = $this->_currentHost; + if ($idx >= 0) { + $host = $this->_hosts[$idx]; + return "$host[0]:$host[1]"; + } else { + return '[unconnected]'; + } + } + + /** * Make socket connection to the server * We also set the stream to non-blocking mode, since we'll be * select'ing to wait for updates. In blocking mode it seems @@ -71,10 +87,12 @@ class LiberalStomp extends Stomp // @fixme this sometimes hangs in blocking mode... // shouldn't we have been idle until we found there's more data? $read = fread($this->_socket, $rb); - if ($read === false) { - $this->_reconnect(); + if ($read === false || ($read === '' && feof($this->_socket))) { + // @fixme possibly attempt an auto reconnect as old code? + throw new StompException("Error reading"); + //$this->_reconnect(); // @fixme this will lose prior items - return $this->readFrames(); + //return $this->readFrames(); } $data .= $read; if (strpos($data, "\x00") !== false) { diff --git a/lib/mysqlschema.php b/lib/mysqlschema.php new file mode 100644 index 000000000..1f7c3d092 --- /dev/null +++ b/lib/mysqlschema.php @@ -0,0 +1,537 @@ +<?php +/** + * StatusNet, the distributed open-source microblogging tool + * + * Database schema utilities + * + * PHP version 5 + * + * LICENCE: This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * @category Database + * @package StatusNet + * @author Evan Prodromou <evan@status.net> + * @copyright 2009 StatusNet, Inc. + * @license http://www.fsf.org/licensing/licenses/agpl-3.0.html GNU Affero General Public License version 3.0 + * @link http://status.net/ + */ + +if (!defined('STATUSNET')) { + exit(1); +} + +/** + * Class representing the database schema + * + * A class representing the database schema. Can be used to + * manipulate the schema -- especially for plugins and upgrade + * utilities. + * + * @category Database + * @package StatusNet + * @author Evan Prodromou <evan@status.net> + * @license http://www.fsf.org/licensing/licenses/agpl-3.0.html GNU Affero General Public License version 3.0 + * @link http://status.net/ + */ + +class MysqlSchema extends Schema +{ + static $_single = null; + protected $conn = null; + + /** + * Constructor. Only run once for singleton object. + */ + + protected function __construct() + { + // XXX: there should be an easier way to do this. + $user = new User(); + + $this->conn = $user->getDatabaseConnection(); + + $user->free(); + + unset($user); + } + + /** + * Main public entry point. Use this to get + * the singleton object. + * + * @return Schema the (single) Schema object + */ + + static function get() + { + if (empty(self::$_single)) { + self::$_single = new Schema(); + } + return self::$_single; + } + + /** + * Returns a TableDef object for the table + * in the schema with the given name. + * + * Throws an exception if the table is not found. + * + * @param string $name Name of the table to get + * + * @return TableDef tabledef for that table. + */ + + public function getTableDef($name) + { + $res = $this->conn->query('DESCRIBE ' . $name); + + if (PEAR::isError($res)) { + throw new Exception($res->getMessage()); + } + + $td = new TableDef(); + + $td->name = $name; + $td->columns = array(); + + $row = array(); + + while ($res->fetchInto($row, DB_FETCHMODE_ASSOC)) { + + $cd = new ColumnDef(); + + $cd->name = $row['Field']; + + $packed = $row['Type']; + + if (preg_match('/^(\w+)\((\d+)\)$/', $packed, $match)) { + $cd->type = $match[1]; + $cd->size = $match[2]; + } else { + $cd->type = $packed; + } + + $cd->nullable = ($row['Null'] == 'YES') ? true : false; + $cd->key = $row['Key']; + $cd->default = $row['Default']; + $cd->extra = $row['Extra']; + + $td->columns[] = $cd; + } + + return $td; + } + + /** + * Gets a ColumnDef object for a single column. + * + * Throws an exception if the table is not found. + * + * @param string $table name of the table + * @param string $column name of the column + * + * @return ColumnDef definition of the column or null + * if not found. + */ + + public function getColumnDef($table, $column) + { + $td = $this->getTableDef($table); + + foreach ($td->columns as $cd) { + if ($cd->name == $column) { + return $cd; + } + } + + return null; + } + + /** + * Creates a table with the given names and columns. + * + * @param string $name Name of the table + * @param array $columns Array of ColumnDef objects + * for new table. + * + * @return boolean success flag + */ + + public function createTable($name, $columns) + { + $uniques = array(); + $primary = array(); + $indices = array(); + + $sql = "CREATE TABLE $name (\n"; + + for ($i = 0; $i < count($columns); $i++) { + + $cd =& $columns[$i]; + + if ($i > 0) { + $sql .= ",\n"; + } + + $sql .= $this->_columnSql($cd); + + switch ($cd->key) { + case 'UNI': + $uniques[] = $cd->name; + break; + case 'PRI': + $primary[] = $cd->name; + break; + case 'MUL': + $indices[] = $cd->name; + break; + } + } + + if (count($primary) > 0) { // it really should be... + $sql .= ",\nconstraint primary key (" . implode(',', $primary) . ")"; + } + + foreach ($uniques as $u) { + $sql .= ",\nunique index {$name}_{$u}_idx ($u)"; + } + + foreach ($indices as $i) { + $sql .= ",\nindex {$name}_{$i}_idx ($i)"; + } + + $sql .= "); "; + + $res = $this->conn->query($sql); + + if (PEAR::isError($res)) { + throw new Exception($res->getMessage()); + } + + return true; + } + + /** + * Drops a table from the schema + * + * Throws an exception if the table is not found. + * + * @param string $name Name of the table to drop + * + * @return boolean success flag + */ + + public function dropTable($name) + { + $res = $this->conn->query("DROP TABLE $name"); + + if (PEAR::isError($res)) { + throw new Exception($res->getMessage()); + } + + return true; + } + + /** + * Adds an index to a table. + * + * If no name is provided, a name will be made up based + * on the table name and column names. + * + * Throws an exception on database error, esp. if the table + * does not exist. + * + * @param string $table Name of the table + * @param array $columnNames Name of columns to index + * @param string $name (Optional) name of the index + * + * @return boolean success flag + */ + + public function createIndex($table, $columnNames, $name=null) + { + if (!is_array($columnNames)) { + $columnNames = array($columnNames); + } + + if (empty($name)) { + $name = "$table_".implode("_", $columnNames)."_idx"; + } + + $res = $this->conn->query("ALTER TABLE $table ". + "ADD INDEX $name (". + implode(",", $columnNames).")"); + + if (PEAR::isError($res)) { + throw new Exception($res->getMessage()); + } + + return true; + } + + /** + * Drops a named index from a table. + * + * @param string $table name of the table the index is on. + * @param string $name name of the index + * + * @return boolean success flag + */ + + public function dropIndex($table, $name) + { + $res = $this->conn->query("ALTER TABLE $table DROP INDEX $name"); + + if (PEAR::isError($res)) { + throw new Exception($res->getMessage()); + } + + return true; + } + + /** + * Adds a column to a table + * + * @param string $table name of the table + * @param ColumnDef $columndef Definition of the new + * column. + * + * @return boolean success flag + */ + + public function addColumn($table, $columndef) + { + $sql = "ALTER TABLE $table ADD COLUMN " . $this->_columnSql($columndef); + + $res = $this->conn->query($sql); + + if (PEAR::isError($res)) { + throw new Exception($res->getMessage()); + } + + return true; + } + + /** + * Modifies a column in the schema. + * + * The name must match an existing column and table. + * + * @param string $table name of the table + * @param ColumnDef $columndef new definition of the column. + * + * @return boolean success flag + */ + + public function modifyColumn($table, $columndef) + { + $sql = "ALTER TABLE $table MODIFY COLUMN " . + $this->_columnSql($columndef); + + $res = $this->conn->query($sql); + + if (PEAR::isError($res)) { + throw new Exception($res->getMessage()); + } + + return true; + } + + /** + * Drops a column from a table + * + * The name must match an existing column. + * + * @param string $table name of the table + * @param string $columnName name of the column to drop + * + * @return boolean success flag + */ + + public function dropColumn($table, $columnName) + { + $sql = "ALTER TABLE $table DROP COLUMN $columnName"; + + $res = $this->conn->query($sql); + + if (PEAR::isError($res)) { + throw new Exception($res->getMessage()); + } + + return true; + } + + /** + * Ensures that a table exists with the given + * name and the given column definitions. + * + * If the table does not yet exist, it will + * create the table. If it does exist, it will + * alter the table to match the column definitions. + * + * @param string $tableName name of the table + * @param array $columns array of ColumnDef + * objects for the table + * + * @return boolean success flag + */ + + public function ensureTable($tableName, $columns) + { + // XXX: DB engine portability -> toilet + + try { + $td = $this->getTableDef($tableName); + } catch (Exception $e) { + if (preg_match('/no such table/', $e->getMessage())) { + return $this->createTable($tableName, $columns); + } else { + throw $e; + } + } + + $cur = $this->_names($td->columns); + $new = $this->_names($columns); + + $toadd = array_diff($new, $cur); + $todrop = array_diff($cur, $new); + $same = array_intersect($new, $cur); + $tomod = array(); + + foreach ($same as $m) { + $curCol = $this->_byName($td->columns, $m); + $newCol = $this->_byName($columns, $m); + + if (!$newCol->equals($curCol)) { + $tomod[] = $newCol->name; + } + } + + if (count($toadd) + count($todrop) + count($tomod) == 0) { + // nothing to do + return true; + } + + // For efficiency, we want this all in one + // query, instead of using our methods. + + $phrase = array(); + + foreach ($toadd as $columnName) { + $cd = $this->_byName($columns, $columnName); + + $phrase[] = 'ADD COLUMN ' . $this->_columnSql($cd); + } + + foreach ($todrop as $columnName) { + $phrase[] = 'DROP COLUMN ' . $columnName; + } + + foreach ($tomod as $columnName) { + $cd = $this->_byName($columns, $columnName); + + $phrase[] = 'MODIFY COLUMN ' . $this->_columnSql($cd); + } + + $sql = 'ALTER TABLE ' . $tableName . ' ' . implode(', ', $phrase); + + $res = $this->conn->query($sql); + + if (PEAR::isError($res)) { + throw new Exception($res->getMessage()); + } + + return true; + } + + /** + * Returns the array of names from an array of + * ColumnDef objects. + * + * @param array $cds array of ColumnDef objects + * + * @return array strings for name values + */ + + private function _names($cds) + { + $names = array(); + + foreach ($cds as $cd) { + $names[] = $cd->name; + } + + return $names; + } + + /** + * Get a ColumnDef from an array matching + * name. + * + * @param array $cds Array of ColumnDef objects + * @param string $name Name of the column + * + * @return ColumnDef matching item or null if no match. + */ + + private function _byName($cds, $name) + { + foreach ($cds as $cd) { + if ($cd->name == $name) { + return $cd; + } + } + + return null; + } + + /** + * Return the proper SQL for creating or + * altering a column. + * + * Appropriate for use in CREATE TABLE or + * ALTER TABLE statements. + * + * @param ColumnDef $cd column to create + * + * @return string correct SQL for that column + */ + + private function _columnSql($cd) + { + $sql = "{$cd->name} "; + + if (!empty($cd->size)) { + $sql .= "{$cd->type}({$cd->size}) "; + } else { + $sql .= "{$cd->type} "; + } + + if (!empty($cd->default)) { + $sql .= "default {$cd->default} "; + } else { + $sql .= ($cd->nullable) ? "null " : "not null "; + } + + if (!empty($cd->auto_increment)) { + $sql .= " auto_increment "; + } + + if (!empty($cd->extra)) { + $sql .= "{$cd->extra} "; + } + + return $sql; + } +} diff --git a/lib/noticelist.php b/lib/noticelist.php index 85c169716..a4a0f2651 100644 --- a/lib/noticelist.php +++ b/lib/noticelist.php @@ -486,12 +486,28 @@ class NoticeListItem extends Widget $this->out->element('span', 'device', $source_name); break; default: + + $name = null; + $url = null; + $ns = Notice_source::staticGet($this->notice->source); + if ($ns) { + $name = $ns->name; + $url = $ns->url; + } else { + $app = Oauth_application::staticGet('name', $this->notice->source); + if ($app) { + $name = $app->name; + $url = $app->source_url; + } + } + + if (!empty($name) && !empty($url)) { $this->out->elementStart('span', 'device'); - $this->out->element('a', array('href' => $ns->url, + $this->out->element('a', array('href' => $url, 'rel' => 'external'), - $ns->name); + $name); $this->out->elementEnd('span'); } else { $this->out->element('span', 'device', $source_name); diff --git a/lib/ombqueuehandler.php b/lib/ombqueuehandler.php index 24896c784..1921c2bac 100644 --- a/lib/ombqueuehandler.php +++ b/lib/ombqueuehandler.php @@ -39,7 +39,7 @@ class OmbQueueHandler extends QueueHandler function handle($notice) { if ($this->is_remote($notice)) { - $this->log(LOG_DEBUG, 'Ignoring remote notice ' . $notice->id); + common_log(LOG_DEBUG, 'Ignoring remote notice ' . $notice->id); return true; } else { require_once(INSTALLDIR.'/lib/omb.php'); diff --git a/lib/pgsqlschema.php b/lib/pgsqlschema.php new file mode 100644 index 000000000..91bc09667 --- /dev/null +++ b/lib/pgsqlschema.php @@ -0,0 +1,503 @@ +<?php +/** + * StatusNet, the distributed open-source microblogging tool + * + * Database schema utilities + * + * PHP version 5 + * + * LICENCE: This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * @category Database + * @package StatusNet + * @author Evan Prodromou <evan@status.net> + * @copyright 2009 StatusNet, Inc. + * @license http://www.fsf.org/licensing/licenses/agpl-3.0.html GNU Affero General Public License version 3.0 + * @link http://status.net/ + */ + +if (!defined('STATUSNET')) { + exit(1); +} + +/** + * Class representing the database schema + * + * A class representing the database schema. Can be used to + * manipulate the schema -- especially for plugins and upgrade + * utilities. + * + * @category Database + * @package StatusNet + * @author Evan Prodromou <evan@status.net> + * @license http://www.fsf.org/licensing/licenses/agpl-3.0.html GNU Affero General Public License version 3.0 + * @link http://status.net/ + */ + +class PgsqlSchema extends Schema +{ + + /** + * Returns a TableDef object for the table + * in the schema with the given name. + * + * Throws an exception if the table is not found. + * + * @param string $name Name of the table to get + * + * @return TableDef tabledef for that table. + */ + + public function getTableDef($name) + { + $res = $this->conn->query("select *, column_default as default, is_nullable as Null, udt_name as Type, column_name AS Field from INFORMATION_SCHEMA.COLUMNS where table_name = '$name'"); + + if (PEAR::isError($res)) { + throw new Exception($res->getMessage()); + } + + $td = new TableDef(); + + $td->name = $name; + $td->columns = array(); + + $row = array(); + + while ($res->fetchInto($row, DB_FETCHMODE_ASSOC)) { +// var_dump($row); + $cd = new ColumnDef(); + + $cd->name = $row['field']; + + $packed = $row['type']; + + if (preg_match('/^(\w+)\((\d+)\)$/', $packed, $match)) { + $cd->type = $match[1]; + $cd->size = $match[2]; + } else { + $cd->type = $packed; + } + + $cd->nullable = ($row['null'] == 'YES') ? true : false; + $cd->key = $row['Key']; + $cd->default = $row['default']; + $cd->extra = $row['Extra']; + + $td->columns[] = $cd; + } + return $td; + } + + /** + * Gets a ColumnDef object for a single column. + * + * Throws an exception if the table is not found. + * + * @param string $table name of the table + * @param string $column name of the column + * + * @return ColumnDef definition of the column or null + * if not found. + */ + + public function getColumnDef($table, $column) + { + $td = $this->getTableDef($table); + + foreach ($td->columns as $cd) { + if ($cd->name == $column) { + return $cd; + } + } + + return null; + } + + /** + * Creates a table with the given names and columns. + * + * @param string $name Name of the table + * @param array $columns Array of ColumnDef objects + * for new table. + * + * @return boolean success flag + */ + + public function createTable($name, $columns) + { + $uniques = array(); + $primary = array(); + $indices = array(); + + $sql = "CREATE TABLE $name (\n"; + + for ($i = 0; $i < count($columns); $i++) { + + $cd =& $columns[$i]; + + if ($i > 0) { + $sql .= ",\n"; + } + + $sql .= $this->_columnSql($cd); + + switch ($cd->key) { + case 'UNI': + $uniques[] = $cd->name; + break; + case 'PRI': + $primary[] = $cd->name; + break; + case 'MUL': + $indices[] = $cd->name; + break; + } + } + + if (count($primary) > 0) { // it really should be... + $sql .= ",\nconstraint primary key (" . implode(',', $primary) . ")"; + } + + foreach ($uniques as $u) { + $sql .= ",\nunique index {$name}_{$u}_idx ($u)"; + } + + foreach ($indices as $i) { + $sql .= ",\nindex {$name}_{$i}_idx ($i)"; + } + + $sql .= "); "; + + $res = $this->conn->query($sql); + + if (PEAR::isError($res)) { + throw new Exception($res->getMessage()); + } + + return true; + } + + /** + * Drops a table from the schema + * + * Throws an exception if the table is not found. + * + * @param string $name Name of the table to drop + * + * @return boolean success flag + */ + + public function dropTable($name) + { + $res = $this->conn->query("DROP TABLE $name"); + + if (PEAR::isError($res)) { + throw new Exception($res->getMessage()); + } + + return true; + } + + /** + * Adds an index to a table. + * + * If no name is provided, a name will be made up based + * on the table name and column names. + * + * Throws an exception on database error, esp. if the table + * does not exist. + * + * @param string $table Name of the table + * @param array $columnNames Name of columns to index + * @param string $name (Optional) name of the index + * + * @return boolean success flag + */ + + public function createIndex($table, $columnNames, $name=null) + { + if (!is_array($columnNames)) { + $columnNames = array($columnNames); + } + + if (empty($name)) { + $name = "$table_".implode("_", $columnNames)."_idx"; + } + + $res = $this->conn->query("ALTER TABLE $table ". + "ADD INDEX $name (". + implode(",", $columnNames).")"); + + if (PEAR::isError($res)) { + throw new Exception($res->getMessage()); + } + + return true; + } + + /** + * Drops a named index from a table. + * + * @param string $table name of the table the index is on. + * @param string $name name of the index + * + * @return boolean success flag + */ + + public function dropIndex($table, $name) + { + $res = $this->conn->query("ALTER TABLE $table DROP INDEX $name"); + + if (PEAR::isError($res)) { + throw new Exception($res->getMessage()); + } + + return true; + } + + /** + * Adds a column to a table + * + * @param string $table name of the table + * @param ColumnDef $columndef Definition of the new + * column. + * + * @return boolean success flag + */ + + public function addColumn($table, $columndef) + { + $sql = "ALTER TABLE $table ADD COLUMN " . $this->_columnSql($columndef); + + $res = $this->conn->query($sql); + + if (PEAR::isError($res)) { + throw new Exception($res->getMessage()); + } + + return true; + } + + /** + * Modifies a column in the schema. + * + * The name must match an existing column and table. + * + * @param string $table name of the table + * @param ColumnDef $columndef new definition of the column. + * + * @return boolean success flag + */ + + public function modifyColumn($table, $columndef) + { + $sql = "ALTER TABLE $table MODIFY COLUMN " . + $this->_columnSql($columndef); + + $res = $this->conn->query($sql); + + if (PEAR::isError($res)) { + throw new Exception($res->getMessage()); + } + + return true; + } + + /** + * Drops a column from a table + * + * The name must match an existing column. + * + * @param string $table name of the table + * @param string $columnName name of the column to drop + * + * @return boolean success flag + */ + + public function dropColumn($table, $columnName) + { + $sql = "ALTER TABLE $table DROP COLUMN $columnName"; + + $res = $this->conn->query($sql); + + if (PEAR::isError($res)) { + throw new Exception($res->getMessage()); + } + + return true; + } + + /** + * Ensures that a table exists with the given + * name and the given column definitions. + * + * If the table does not yet exist, it will + * create the table. If it does exist, it will + * alter the table to match the column definitions. + * + * @param string $tableName name of the table + * @param array $columns array of ColumnDef + * objects for the table + * + * @return boolean success flag + */ + + public function ensureTable($tableName, $columns) + { + // XXX: DB engine portability -> toilet + + try { + $td = $this->getTableDef($tableName); + } catch (Exception $e) { + if (preg_match('/no such table/', $e->getMessage())) { + return $this->createTable($tableName, $columns); + } else { + throw $e; + } + } + + $cur = $this->_names($td->columns); + $new = $this->_names($columns); + + $toadd = array_diff($new, $cur); + $todrop = array_diff($cur, $new); + $same = array_intersect($new, $cur); + $tomod = array(); + + foreach ($same as $m) { + $curCol = $this->_byName($td->columns, $m); + $newCol = $this->_byName($columns, $m); + + if (!$newCol->equals($curCol)) { + $tomod[] = $newCol->name; + } + } + + if (count($toadd) + count($todrop) + count($tomod) == 0) { + // nothing to do + return true; + } + + // For efficiency, we want this all in one + // query, instead of using our methods. + + $phrase = array(); + + foreach ($toadd as $columnName) { + $cd = $this->_byName($columns, $columnName); + + $phrase[] = 'ADD COLUMN ' . $this->_columnSql($cd); + } + + foreach ($todrop as $columnName) { + $phrase[] = 'DROP COLUMN ' . $columnName; + } + + foreach ($tomod as $columnName) { + $cd = $this->_byName($columns, $columnName); + + $phrase[] = 'MODIFY COLUMN ' . $this->_columnSql($cd); + } + + $sql = 'ALTER TABLE ' . $tableName . ' ' . implode(', ', $phrase); + + $res = $this->conn->query($sql); + + if (PEAR::isError($res)) { + throw new Exception($res->getMessage()); + } + + return true; + } + + /** + * Returns the array of names from an array of + * ColumnDef objects. + * + * @param array $cds array of ColumnDef objects + * + * @return array strings for name values + */ + + private function _names($cds) + { + $names = array(); + + foreach ($cds as $cd) { + $names[] = $cd->name; + } + + return $names; + } + + /** + * Get a ColumnDef from an array matching + * name. + * + * @param array $cds Array of ColumnDef objects + * @param string $name Name of the column + * + * @return ColumnDef matching item or null if no match. + */ + + private function _byName($cds, $name) + { + foreach ($cds as $cd) { + if ($cd->name == $name) { + return $cd; + } + } + + return null; + } + + /** + * Return the proper SQL for creating or + * altering a column. + * + * Appropriate for use in CREATE TABLE or + * ALTER TABLE statements. + * + * @param ColumnDef $cd column to create + * + * @return string correct SQL for that column + */ + + private function _columnSql($cd) + { + $sql = "{$cd->name} "; + + if (!empty($cd->size)) { + $sql .= "{$cd->type}({$cd->size}) "; + } else { + $sql .= "{$cd->type} "; + } + + if (!empty($cd->default)) { + $sql .= "default {$cd->default} "; + } else { + $sql .= ($cd->nullable) ? "null " : "not null "; + } + + if (!empty($cd->auto_increment)) { + $sql .= " auto_increment "; + } + + if (!empty($cd->extra)) { + $sql .= "{$cd->extra} "; + } + + return $sql; + } +} diff --git a/lib/publicqueuehandler.php b/lib/publicqueuehandler.php index c9edb8d5d..a497d1385 100644 --- a/lib/publicqueuehandler.php +++ b/lib/publicqueuehandler.php @@ -38,7 +38,7 @@ class PublicQueueHandler extends QueueHandler try { return jabber_public_notice($notice); } catch (XMPPHP_Exception $e) { - $this->log(LOG_ERR, "Got an XMPPHP_Exception: " . $e->getMessage()); + common_log(LOG_ERR, "Got an XMPPHP_Exception: " . $e->getMessage()); return false; } } diff --git a/lib/router.php b/lib/router.php index be9cfac0c..987d0152e 100644 --- a/lib/router.php +++ b/lib/router.php @@ -73,6 +73,8 @@ class Router if (Event::handle('StartInitializeRouter', array(&$m))) { + $m->connect('robots.txt', array('action' => 'robotstxt')); + $m->connect('opensearch/people', array('action' => 'opensearch', 'type' => 'people')); $m->connect('opensearch/notice', array('action' => 'opensearch', @@ -150,6 +152,10 @@ class Router array('action' => 'editapplication'), array('id' => '[0-9]+') ); + $m->connect('settings/oauthapps/delete/:id', + array('action' => 'deleteapplication'), + array('id' => '[0-9]+') + ); // search @@ -649,7 +655,16 @@ class Router if (common_config('singleuser', 'enabled')) { - $nickname = common_config('singleuser', 'nickname'); + $user = User::siteOwner(); + + if (!empty($user)) { + $nickname = $user->nickname; + } else { + $nickname = common_config('singleuser', 'nickname'); + if (empty($nickname)) { + throw new ServerException(_("No single user defined for single-user mode.")); + } + } foreach (array('subscriptions', 'subscribers', 'all', 'foaf', 'xrds', @@ -697,6 +712,10 @@ class Router 'nickname' => $nickname), array('tag' => '[a-zA-Z0-9]+')); + $m->connect('rsd.xml', + array('action' => 'rsd', + 'nickname' => $nickname)); + $m->connect('', array('action' => 'showstream', 'nickname' => $nickname)); @@ -711,6 +730,7 @@ class Router $m->connect('featured', array('action' => 'featured')); $m->connect('favorited/', array('action' => 'favorited')); $m->connect('favorited', array('action' => 'favorited')); + $m->connect('rsd.xml', array('action' => 'rsd')); foreach (array('subscriptions', 'subscribers', 'nudge', 'all', 'foaf', 'xrds', @@ -758,6 +778,10 @@ class Router array('nickname' => '[a-zA-Z0-9]{1,64}'), array('tag' => '[a-zA-Z0-9]+')); + $m->connect(':nickname/rsd.xml', + array('action' => 'rsd'), + array('nickname' => '[a-zA-Z0-9]{1,64}')); + $m->connect(':nickname', array('action' => 'showstream'), array('nickname' => '[a-zA-Z0-9]{1,64}')); diff --git a/lib/schema.php b/lib/schema.php index a7f64ebed..137b814e0 100644 --- a/lib/schema.php +++ b/lib/schema.php @@ -75,65 +75,15 @@ class Schema static function get() { + $type = common_config('db', 'type'); if (empty(self::$_single)) { - self::$_single = new Schema(); + $schemaClass = ucfirst($type).'Schema'; + self::$_single = new $schemaClass(); } return self::$_single; } /** - * Returns a TableDef object for the table - * in the schema with the given name. - * - * Throws an exception if the table is not found. - * - * @param string $name Name of the table to get - * - * @return TableDef tabledef for that table. - */ - - public function getTableDef($name) - { - $res = $this->conn->query('DESCRIBE ' . $name); - - if (PEAR::isError($res)) { - throw new Exception($res->getMessage()); - } - - $td = new TableDef(); - - $td->name = $name; - $td->columns = array(); - - $row = array(); - - while ($res->fetchInto($row, DB_FETCHMODE_ASSOC)) { - - $cd = new ColumnDef(); - - $cd->name = $row['Field']; - - $packed = $row['Type']; - - if (preg_match('/^(\w+)\((\d+)\)$/', $packed, $match)) { - $cd->type = $match[1]; - $cd->size = $match[2]; - } else { - $cd->type = $packed; - } - - $cd->nullable = ($row['Null'] == 'YES') ? true : false; - $cd->key = $row['Key']; - $cd->default = $row['Default']; - $cd->extra = $row['Extra']; - - $td->columns[] = $cd; - } - - return $td; - } - - /** * Gets a ColumnDef object for a single column. * * Throws an exception if the table is not found. @@ -523,7 +473,7 @@ class Schema } else { $sql .= ($cd->nullable) ? "null " : "not null "; } - + if (!empty($cd->auto_increment)) { $sql .= " auto_increment "; } diff --git a/lib/stompqueuemanager.php b/lib/stompqueuemanager.php index 19e8c49b5..6730cd213 100644 --- a/lib/stompqueuemanager.php +++ b/lib/stompqueuemanager.php @@ -29,28 +29,36 @@ */ require_once 'Stomp.php'; - +require_once 'Stomp/Exception.php'; class StompQueueManager extends QueueManager { - var $server = null; - var $username = null; - var $password = null; - var $base = null; - var $con = null; + protected $servers; + protected $username; + protected $password; + protected $base; protected $control; - + + protected $useTransactions = true; + protected $sites = array(); protected $subscriptions = array(); - protected $useTransactions = true; - protected $transaction = null; - protected $transactionCount = 0; + protected $cons = array(); // all open connections + protected $disconnect = array(); + protected $transaction = array(); + protected $transactionCount = array(); + protected $defaultIdx = 0; function __construct() { parent::__construct(); - $this->server = common_config('queue', 'stomp_server'); + $server = common_config('queue', 'stomp_server'); + if (is_array($server)) { + $this->servers = $server; + } else { + $this->servers = array($server); + } $this->username = common_config('queue', 'stomp_username'); $this->password = common_config('queue', 'stomp_password'); $this->base = common_config('queue', 'queue_basename'); @@ -99,9 +107,9 @@ class StompQueueManager extends QueueManager $message .= ':' . $param; } $this->_connect(); - $result = $this->con->send($this->control, - $message, - array ('created' => common_sql_now())); + $result = $this->_send($this->control, + $message, + array ('created' => common_sql_now())); if ($result) { $this->_log(LOG_INFO, "Sent control ping to queue daemons: $message"); return true; @@ -166,28 +174,59 @@ class StompQueueManager extends QueueManager /** * Saves a notice object reference into the queue item table. * @return boolean true on success + * @throws StompException on connection or send error */ public function enqueue($object, $queue) { + $this->_connect(); + return $this->_doEnqueue($object, $queue, $this->defaultIdx); + } + + /** + * Saves a notice object reference into the queue item table + * on the given connection. + * + * @return boolean true on success + * @throws StompException on connection or send error + */ + protected function _doEnqueue($object, $queue, $idx) + { $msg = $this->encode($object); $rep = $this->logrep($object); - $this->_connect(); - - // XXX: serialize and send entire notice + $props = array('created' => common_sql_now()); + if ($this->isPersistent($queue)) { + $props['persistent'] = 'true'; + } - $result = $this->con->send($this->queueName($queue), - $msg, // BODY of the message - array ('created' => common_sql_now(), - 'persistent' => 'true')); + $con = $this->cons[$idx]; + $host = $con->getServer(); + $result = $con->send($this->queueName($queue), $msg, $props); if (!$result) { - common_log(LOG_ERR, "Error sending $rep to $queue queue"); + common_log(LOG_ERR, "Error sending $rep to $queue queue on $host"); return false; } - common_log(LOG_DEBUG, "complete remote queueing $rep for $queue"); + common_log(LOG_DEBUG, "complete remote queueing $rep for $queue on $host"); $this->stats('enqueued', $queue); + return true; + } + + /** + * Determine whether messages to this queue should be marked as persistent. + * Actual persistent storage depends on the queue server's configuration. + * @param string $queue + * @return bool + */ + protected function isPersistent($queue) + { + $mode = common_config('queue', 'stomp_persistent'); + if (is_array($mode)) { + return in_array($queue, $mode); + } else { + return (bool)$mode; + } } /** @@ -198,7 +237,29 @@ class StompQueueManager extends QueueManager */ public function getSockets() { - return array($this->con->getSocket()); + $sockets = array(); + foreach ($this->cons as $con) { + if ($con) { + $sockets[] = $con->getSocket(); + } + } + return $sockets; + } + + /** + * Get the Stomp connection object associated with the given socket. + * @param resource $socket + * @return int index into connections list + * @throws Exception + */ + protected function connectionFromSocket($socket) + { + foreach ($this->cons as $i => $con) { + if ($con && $con->getSocket() === $socket) { + return $i; + } + } + throw new Exception(__CLASS__ . " asked to read from unrecognized socket"); } /** @@ -210,27 +271,56 @@ class StompQueueManager extends QueueManager */ public function handleInput($socket) { - assert($socket === $this->con->getSocket()); + $idx = $this->connectionFromSocket($socket); + $con = $this->cons[$idx]; + $host = $con->getServer(); + $ok = true; - $frames = $this->con->readFrames(); + try { + $frames = $con->readFrames(); + } catch (StompException $e) { + common_log(LOG_ERR, "Lost connection to $host: " . $e->getMessage()); + $this->cons[$idx] = null; + $this->transaction[$idx] = null; + $this->disconnect[$idx] = time(); + return false; + } foreach ($frames as $frame) { $dest = $frame->headers['destination']; if ($dest == $this->control) { - if (!$this->handleControlSignal($frame)) { + if (!$this->handleControlSignal($idx, $frame)) { // We got a control event that requests a shutdown; // close out and stop handling anything else! break; } } else { - $ok = $ok && $this->handleItem($frame); + $ok = $ok && $this->handleItem($idx, $frame); } } return $ok; } /** + * Attempt to reconnect in background if we lost a connection. + */ + function idle() + { + $now = time(); + foreach ($this->cons as $idx => $con) { + if (empty($con)) { + $age = $now - $this->disconnect[$idx]; + if ($age >= 60) { + $this->_reconnect($idx); + } + } + } + return true; + } + + /** * Initialize our connection and subscribe to all the queues - * we're going to need to handle... + * we're going to need to handle... If multiple queue servers + * are configured for failover, we'll listen to all of them. * * Side effects: in multi-site mode, may reset site configuration. * @@ -240,9 +330,14 @@ class StompQueueManager extends QueueManager public function start($master) { parent::start($master); - $this->_connect(); + $this->_connectAll(); - $this->con->subscribe($this->control); + common_log(LOG_INFO, "Subscribing to $this->control"); + foreach ($this->cons as $con) { + if ($con) { + $con->subscribe($this->control); + } + } if ($this->sites) { foreach ($this->sites as $server) { StatusNet::init($server); @@ -251,10 +346,14 @@ class StompQueueManager extends QueueManager } else { $this->doSubscribe(); } - $this->begin(); + foreach ($this->cons as $i => $con) { + if ($con) { + $this->begin($i); + } + } return true; } - + /** * Subscribe to all the queues we're going to need to handle... * @@ -266,8 +365,12 @@ class StompQueueManager extends QueueManager { // If there are any outstanding delivered messages we haven't processed, // free them for another thread to take. - $this->rollback(); - $this->con->unsubscribe($this->control); + foreach ($this->cons as $i => $con) { + if ($con) { + $this->rollback($i); + $con->unsubscribe($this->control); + } + } if ($this->sites) { foreach ($this->sites as $server) { StatusNet::init($server); @@ -289,23 +392,106 @@ class StompQueueManager extends QueueManager } /** - * Lazy open connection to Stomp queue server. + * Lazy open a single connection to Stomp queue server. + * If multiple servers are configured, we let the Stomp client library + * worry about finding a working connection among them. */ protected function _connect() { - if (empty($this->con)) { - $this->_log(LOG_INFO, "Connecting to '$this->server' as '$this->username'..."); - $this->con = new LiberalStomp($this->server); - - if ($this->con->connect($this->username, $this->password)) { - $this->_log(LOG_INFO, "Connected."); + if (empty($this->cons)) { + $list = $this->servers; + if (count($list) > 1) { + shuffle($list); // Randomize to spread load + $url = 'failover://(' . implode(',', $list) . ')'; } else { - $this->_log(LOG_ERR, 'Failed to connect to queue server'); - throw new ServerException('Failed to connect to queue server'); + $url = $list[0]; + } + $con = $this->_doConnect($url); + $this->cons = array($con); + $this->transactionCount = array(0); + $this->transaction = array(null); + $this->disconnect = array(null); + } + } + + /** + * Lazy open connections to all Stomp servers, if in manual failover + * mode. This means the queue servers don't speak to each other, so + * we have to listen to all of them to make sure we get all events. + */ + protected function _connectAll() + { + if (!common_config('queue', 'stomp_manual_failover')) { + return $this->_connect(); + } + if (empty($this->cons)) { + $this->cons = array(); + $this->transactionCount = array(); + $this->transaction = array(); + foreach ($this->servers as $idx => $server) { + try { + $this->cons[] = $this->_doConnect($server); + $this->disconnect[] = null; + } catch (Exception $e) { + // s'okay, we'll live + $this->cons[] = null; + $this->disconnect[] = time(); + } + $this->transactionCount[] = 0; + $this->transaction[] = null; + } + if (empty($this->cons)) { + throw new ServerException("No queue servers reachable..."); + return false; } } } + protected function _reconnect($idx) + { + try { + $con = $this->_doConnect($this->servers[$idx]); + } catch (Exception $e) { + $this->_log(LOG_ERR, $e->getMessage()); + $con = null; + } + if ($con) { + $this->cons[$idx] = $con; + $this->disconnect[$idx] = null; + + // now we have to listen to everything... + // @fixme refactor this nicer. :P + $host = $con->getServer(); + $this->_log(LOG_INFO, "Resubscribing to $this->control on $host"); + $con->subscribe($this->control); + foreach ($this->subscriptions as $site => $queues) { + foreach ($queues as $queue) { + $this->_log(LOG_INFO, "Resubscribing to $queue on $host"); + $con->subscribe($queue); + } + } + $this->begin($idx); + } else { + // Try again later... + $this->disconnect[$idx] = time(); + } + } + + protected function _doConnect($server) + { + $this->_log(LOG_INFO, "Connecting to '$server' as '$this->username'..."); + $con = new LiberalStomp($server); + + if ($con->connect($this->username, $this->password)) { + $this->_log(LOG_INFO, "Connected."); + } else { + $this->_log(LOG_ERR, 'Failed to connect to queue server'); + throw new ServerException('Failed to connect to queue server'); + } + + return $con; + } + /** * Subscribe to all enabled notice queues for the current site. */ @@ -317,7 +503,11 @@ class StompQueueManager extends QueueManager $rawqueue = $this->queueName($queue); $this->subscriptions[$site][$queue] = $rawqueue; $this->_log(LOG_INFO, "Subscribing to $rawqueue"); - $this->con->subscribe($rawqueue); + foreach ($this->cons as $con) { + if ($con) { + $con->subscribe($rawqueue); + } + } } } @@ -331,7 +521,11 @@ class StompQueueManager extends QueueManager if (!empty($this->subscriptions[$site])) { foreach ($this->subscriptions[$site] as $queue => $rawqueue) { $this->_log(LOG_INFO, "Unsubscribing from $rawqueue"); - $this->con->unsubscribe($rawqueue); + foreach ($this->cons as $con) { + if ($con) { + $con->unsubscribe($rawqueue); + } + } unset($this->subscriptions[$site][$queue]); } } @@ -346,27 +540,31 @@ class StompQueueManager extends QueueManager * Side effects: in multi-site mode, may reset site configuration to * match the site that queued the event. * + * @param int $idx connection index * @param StompFrame $frame * @return bool */ - protected function handleItem($frame) + protected function handleItem($idx, $frame) { + $this->defaultIdx = $idx; + list($site, $queue) = $this->parseDestination($frame->headers['destination']); if ($site != $this->currentSite()) { $this->stats('switch'); StatusNet::init($site); } + $host = $this->cons[$idx]->getServer(); if (is_numeric($frame->body)) { $id = intval($frame->body); - $info = "notice $id posted at {$frame->headers['created']} in queue $queue"; + $info = "notice $id posted at {$frame->headers['created']} in queue $queue from $host"; $notice = Notice::staticGet('id', $id); if (empty($notice)) { $this->_log(LOG_WARNING, "Skipping missing $info"); - $this->ack($frame); - $this->commit(); - $this->begin(); + $this->ack($idx, $frame); + $this->commit($idx); + $this->begin($idx); $this->stats('badnotice', $queue); return false; } @@ -374,39 +572,47 @@ class StompQueueManager extends QueueManager $item = $notice; } else { // @fixme should we serialize, or json, or what here? - $info = "string posted at {$frame->headers['created']} in queue $queue"; + $info = "string posted at {$frame->headers['created']} in queue $queue from $host"; $item = $frame->body; } $handler = $this->getHandler($queue); if (!$handler) { $this->_log(LOG_ERR, "Missing handler class; skipping $info"); - $this->ack($frame); - $this->commit(); - $this->begin(); + $this->ack($idx, $frame); + $this->commit($idx); + $this->begin($idx); $this->stats('badhandler', $queue); return false; } - $ok = $handler->handle($item); + // If there's an exception when handling, + // log the error and let it get requeued. + + try { + $ok = $handler->handle($item); + } catch (Exception $e) { + $this->_log(LOG_ERR, "Exception on queue $queue: " . $e->getMessage()); + $ok = false; + } if (!$ok) { $this->_log(LOG_WARNING, "Failed handling $info"); // FIXME we probably shouldn't have to do // this kind of queue management ourselves; // if we don't ack, it should resend... - $this->ack($frame); + $this->ack($idx, $frame); $this->enqueue($item, $queue); - $this->commit(); - $this->begin(); + $this->commit($idx); + $this->begin($idx); $this->stats('requeued', $queue); return false; } $this->_log(LOG_INFO, "Successfully handled $info"); - $this->ack($frame); - $this->commit(); - $this->begin(); + $this->ack($idx, $frame); + $this->commit($idx); + $this->begin($idx); $this->stats('handled', $queue); return true; } @@ -414,10 +620,11 @@ class StompQueueManager extends QueueManager /** * Process a control signal broadcast. * + * @param int $idx connection index * @param array $frame Stomp frame * @return bool true to continue; false to stop further processing. */ - protected function handleControlSignal($frame) + protected function handleControlSignal($idx, $frame) { $message = trim($frame->body); if (strpos($message, ':') !== false) { @@ -441,12 +648,12 @@ class StompQueueManager extends QueueManager $this->_log(LOG_ERR, "Ignoring unrecognized control message: $message"); } - $this->ack($frame); - $this->commit(); - $this->begin(); + $this->ack($idx, $frame); + $this->commit($idx); + $this->begin($idx); return $shutdown; } - + /** * Set us up with queue subscriptions for a new site added at runtime, * triggered by a broadcast to the 'statusnet-control' topic. @@ -520,47 +727,49 @@ class StompQueueManager extends QueueManager common_log($level, 'StompQueueManager: '.$msg); } - protected function begin() + protected function begin($idx) { if ($this->useTransactions) { - if ($this->transaction) { + if (!empty($this->transaction[$idx])) { throw new Exception("Tried to start transaction in the middle of a transaction"); } - $this->transactionCount++; - $this->transaction = $this->master->id . '-' . $this->transactionCount . '-' . time(); - $this->con->begin($this->transaction); + $this->transactionCount[$idx]++; + $this->transaction[$idx] = $this->master->id . '-' . $this->transactionCount[$idx] . '-' . time(); + $this->cons[$idx]->begin($this->transaction[$idx]); } } - protected function ack($frame) + protected function ack($idx, $frame) { if ($this->useTransactions) { - if (!$this->transaction) { + if (empty($this->transaction[$idx])) { throw new Exception("Tried to ack but not in a transaction"); } + $this->cons[$idx]->ack($frame, $this->transaction[$idx]); + } else { + $this->cons[$idx]->ack($frame); } - $this->con->ack($frame, $this->transaction); } - protected function commit() + protected function commit($idx) { if ($this->useTransactions) { - if (!$this->transaction) { + if (empty($this->transaction[$idx])) { throw new Exception("Tried to commit but not in a transaction"); } - $this->con->commit($this->transaction); - $this->transaction = null; + $this->cons[$idx]->commit($this->transaction[$idx]); + $this->transaction[$idx] = null; } } - protected function rollback() + protected function rollback($idx) { if ($this->useTransactions) { - if (!$this->transaction) { + if (empty($this->transaction[$idx])) { throw new Exception("Tried to rollback but not in a transaction"); } - $this->con->commit($this->transaction); - $this->transaction = null; + $this->cons[$idx]->commit($this->transaction[$idx]); + $this->transaction[$idx] = null; } } } diff --git a/lib/util.php b/lib/util.php index 6c9f6316a..9e8ac26ad 100644 --- a/lib/util.php +++ b/lib/util.php @@ -178,7 +178,6 @@ function common_ensure_session() } if (isset($id)) { session_id($id); - setcookie(session_name(), $id); } @session_start(); if (!isset($_SESSION['started'])) { @@ -990,9 +989,14 @@ function common_enqueue_notice($notice) static $localTransports = array('omb', 'ping'); - static $allTransports = array('sms', 'plugin'); - - $transports = $allTransports; + $transports = array(); + if (common_config('sms', 'enabled')) { + $transports[] = 'sms'; + } + if (Event::hasHandler('HandleQueuedNotice')) { + $transports[] = 'plugin'; + } + $xmpp = common_config('xmpp', 'enabled'); @@ -1000,6 +1004,7 @@ function common_enqueue_notice($notice) $transports[] = 'jabber'; } + // @fixme move these checks into QueueManager and/or individual handlers if ($notice->is_local == Notice::LOCAL_PUBLIC || $notice->is_local == Notice::LOCAL_NONPUBLIC) { $transports = array_merge($transports, $localTransports); |