diff options
Diffstat (limited to 'lib')
-rw-r--r-- | lib/default.php | 11 | ||||
-rw-r--r-- | lib/htmloutputter.php | 2 | ||||
-rw-r--r-- | lib/iomaster.php | 23 | ||||
-rw-r--r-- | lib/oauthclient.php | 88 | ||||
-rw-r--r-- | lib/omb.php | 8 | ||||
-rw-r--r-- | lib/statusnet.php | 2 | ||||
-rw-r--r-- | lib/stompqueuemanager.php | 119 |
7 files changed, 146 insertions, 107 deletions
diff --git a/lib/default.php b/lib/default.php index a74cccae1..c969c3b33 100644 --- a/lib/default.php +++ b/lib/default.php @@ -91,10 +91,13 @@ $default = 'spawndelay' => 1, // Wait at least N seconds between (re)spawns of child processes to avoid slamming the queue server with subscription startup 'debug_memory' => false, // true to spit memory usage to log 'inboxes' => true, // true to do inbox distribution & output queueing from in background via 'distrib' queue - 'breakout' => array('*' => 'shared'), // set global or per-handler queue breakout - // 'shared': use a shared queue for all sites - // 'handler': share each/this handler over multiple sites - // 'site': break out for each/this handler on this site + 'breakout' => array(), // List queue specifiers to break out when using Stomp queue. + // Default will share all queues for all sites within each group. + // Specify as <group>/<queue> or <group>/<queue>/<site>, + // using nickname identifier as site. + // + // 'main/distrib' separate "distrib" queue covering all sites + // 'xmpp/xmppout/mysite' separate "xmppout" queue covering just 'mysite' 'max_retries' => 10, // drop messages after N failed attempts to process (Stomp) 'dead_letter_dir' => false, // set to directory to save dropped messages into (Stomp) ), diff --git a/lib/htmloutputter.php b/lib/htmloutputter.php index 317f5ea61..4a88337bc 100644 --- a/lib/htmloutputter.php +++ b/lib/htmloutputter.php @@ -428,7 +428,7 @@ class HTMLOutputter extends XMLOutputter { if(Event::handle('StartCssLinkElement', array($this,&$src,&$theme,&$media))) { $url = parse_url($src); - if( empty($url->scheme) && empty($url->host) && empty($url->query) && empty($url->fragment)) + if( empty($url['scheme']) && empty($url['host']) && empty($url['query']) && empty($url['fragment'])) { if(file_exists(Theme::file($src,$theme))){ $src = Theme::path($src, $theme); diff --git a/lib/iomaster.php b/lib/iomaster.php index 54e2dfe84..d20837ba5 100644 --- a/lib/iomaster.php +++ b/lib/iomaster.php @@ -55,27 +55,18 @@ abstract class IoMaster if ($multiSite !== null) { $this->multiSite = $multiSite; } - if ($this->multiSite) { - $this->sites = StatusNet::findAllSites(); - } else { - $this->sites = array(StatusNet::currentSite()); - } - - if (empty($this->sites)) { - throw new Exception("Empty status_network table, cannot init"); - } - foreach ($this->sites as $site) { - StatusNet::switchSite($site); - $this->initManagers(); - } + $this->initManagers(); } /** - * Initialize IoManagers for the currently configured site - * which are appropriate to this instance. + * Initialize IoManagers which are appropriate to this instance; + * pass class names or instances into $this->instantiate(). + * + * If setup and configuration may vary between sites in multi-site + * mode, it's the subclass's responsibility to set them up here. * - * Pass class names into $this->instantiate() + * Switching site configurations is an acceptable side effect. */ abstract function initManagers(); diff --git a/lib/oauthclient.php b/lib/oauthclient.php index b22fd7897..bc7587183 100644 --- a/lib/oauthclient.php +++ b/lib/oauthclient.php @@ -90,20 +90,47 @@ class OAuthClient /** * Gets a request token from the given url * - * @param string $url OAuth endpoint for grabbing request tokens + * @param string $url OAuth endpoint for grabbing request tokens + * @param string $callback authorized request token callback * * @return OAuthToken $token the request token */ - function getRequestToken($url) + function getRequestToken($url, $callback = null) { - $response = $this->oAuthGet($url); + $params = null; + + if (!is_null($callback)) { + $params['oauth_callback'] = $callback; + } + + $response = $this->oAuthGet($url, $params); + $arr = array(); parse_str($response, $arr); - if (isset($arr['oauth_token']) && isset($arr['oauth_token_secret'])) { - $token = new OAuthToken($arr['oauth_token'], @$arr['oauth_token_secret']); + + $token = $arr['oauth_token']; + $secret = $arr['oauth_token_secret']; + $confirm = $arr['oauth_callback_confirmed']; + + if (isset($token) && isset($secret)) { + + $token = new OAuthToken($token, $secret); + + if (isset($confirm)) { + if ($confirm == 'true') { + common_debug('Twitter bridge - callback confirmed.'); + return $token; + } else { + throw new OAuthClientException( + 'Callback was not confirmed by Twitter.' + ); + } + } return $token; } else { - throw new OAuthClientException(); + throw new OAuthClientException( + 'Could not get a request token from Twitter.' + ); } } @@ -113,49 +140,64 @@ class OAuthClient * * @param string $url endpoint for authorizing request tokens * @param OAuthToken $request_token the request token to be authorized - * @param string $oauth_callback optional callback url * * @return string $authorize_url the url to redirect to */ - function getAuthorizeLink($url, $request_token, $oauth_callback = null) + function getAuthorizeLink($url, $request_token) { $authorize_url = $url . '?oauth_token=' . $request_token->key; - if (isset($oauth_callback)) { - $authorize_url .= '&oauth_callback=' . urlencode($oauth_callback); - } - return $authorize_url; } /** * Fetches an access token * - * @param string $url OAuth endpoint for exchanging authorized request tokens - * for access tokens + * @param string $url OAuth endpoint for exchanging authorized request tokens + * for access tokens + * @param string $verifier 1.0a verifier * * @return OAuthToken $token the access token */ - function getAccessToken($url) + function getAccessToken($url, $verifier = null) { - $response = $this->oAuthPost($url); - parse_str($response); - $token = new OAuthToken($oauth_token, $oauth_token_secret); - return $token; + $params = array(); + + if (!is_null($verifier)) { + $params['oauth_verifier'] = $verifier; + } + + $response = $this->oAuthPost($url, $params); + + $arr = array(); + parse_str($response, $arr); + + $token = $arr['oauth_token']; + $secret = $arr['oauth_token_secret']; + + if (isset($token) && isset($secret)) { + $token = new OAuthToken($token, $secret); + return $token; + } else { + throw new OAuthClientException( + 'Could not get a access token from Twitter.' + ); + } } /** - * Use HTTP GET to make a signed OAuth request + * Use HTTP GET to make a signed OAuth requesta * - * @param string $url OAuth endpoint + * @param string $url OAuth request token endpoint + * @param array $params additional parameters * * @return mixed the request */ - function oAuthGet($url) + function oAuthGet($url, $params = null) { $request = OAuthRequest::from_consumer_and_token($this->consumer, - $this->token, 'GET', $url, null); + $this->token, 'GET', $url, $params); $request->sign_request($this->sha1_method, $this->consumer, $this->token); diff --git a/lib/omb.php b/lib/omb.php index 0f38a4936..17132a594 100644 --- a/lib/omb.php +++ b/lib/omb.php @@ -29,11 +29,9 @@ require_once 'Auth/Yadis/Yadis.php'; function omb_oauth_consumer() { - static $con = null; - if (is_null($con)) { - $con = new OAuthConsumer(common_root_url(), ''); - } - return $con; + // Don't try to make this static. Leads to issues in + // multi-site setups - Z + return new OAuthConsumer(common_root_url(), ''); } function omb_oauth_server() diff --git a/lib/statusnet.php b/lib/statusnet.php index 9c7ede5a5..257bd861d 100644 --- a/lib/statusnet.php +++ b/lib/statusnet.php @@ -63,7 +63,7 @@ class StatusNet } } if (!class_exists($pluginclass)) { - throw new ServerException(500, "Plugin $name not found."); + throw new ServerException("Plugin $name not found.", 500); } } diff --git a/lib/stompqueuemanager.php b/lib/stompqueuemanager.php index bfeeb23b7..9af8b2f48 100644 --- a/lib/stompqueuemanager.php +++ b/lib/stompqueuemanager.php @@ -63,7 +63,7 @@ class StompQueueManager extends QueueManager $this->password = common_config('queue', 'stomp_password'); $this->base = common_config('queue', 'queue_basename'); $this->control = common_config('queue', 'control_channel'); - $this->subscriptions = array($this->control => $this->control); + $this->breakout = common_config('queue', 'breakout'); } /** @@ -76,28 +76,6 @@ class StompQueueManager extends QueueManager } /** - * Record queue subscriptions we'll need to handle the current site. - */ - public function addSite() - { - $this->sites[] = StatusNet::currentSite(); - - // Set up handlers active for this site... - $this->initialize(); - - foreach ($this->activeGroups as $group) { - if (isset($this->groups[$group])) { - // Actual queues may be broken out or consolidated... - // Subscribe to all the target queues we'll need. - foreach ($this->groups[$group] as $transport => $class) { - $target = $this->queueName($transport); - $this->subscriptions[$target] = $target; - } - } - } - } - - /** * Optional; ping any running queue handler daemons with a notification * such as announcing a new site to handle or requesting clean shutdown. * This avoids having to restart all the daemons manually to update configs @@ -166,14 +144,15 @@ class StompQueueManager extends QueueManager $con = $this->cons[$idx]; $host = $con->getServer(); - $result = $con->send($this->queueName($queue), $msg, $props); + $target = $this->queueName($queue); + $result = $con->send($target, $msg, $props); if (!$result) { - $this->_log(LOG_ERR, "Error sending $rep to $queue queue on $host"); + $this->_log(LOG_ERR, "Error sending $rep to $queue queue on $host $target"); return false; } - $this->_log(LOG_DEBUG, "complete remote queueing $rep for $queue on $host"); + $this->_log(LOG_DEBUG, "complete remote queueing $rep for $queue on $host $target"); $this->stats('enqueued', $queue); return true; } @@ -432,11 +411,42 @@ class StompQueueManager extends QueueManager protected function doSubscribe(LiberalStomp $con) { $host = $con->getServer(); - foreach ($this->subscriptions as $queue) { - $this->_log(LOG_INFO, "Subscribing to $queue on $host"); - $con->subscribe($queue); + foreach ($this->subscriptions() as $sub) { + $this->_log(LOG_INFO, "Subscribing to $sub on $host"); + $con->subscribe($sub); } } + + /** + * Grab a full list of stomp-side queue subscriptions. + * Will include: + * - control broadcast channel + * - shared group queues for active groups + * - per-handler and per-site breakouts from $config['queue']['breakout'] + * that are rooted in the active groups. + * + * @return array of strings + */ + protected function subscriptions() + { + $subs = array(); + $subs[] = $this->control; + + foreach ($this->activeGroups as $group) { + $subs[] = $this->base . $group; + } + + foreach ($this->breakout as $spec) { + $parts = explode('/', $spec); + if (count($parts) < 2 || count($parts) > 3) { + common_log(LOG_ERR, "Bad queue breakout specifier $spec"); + } + if (in_array($parts[0], $this->activeGroups)) { + $subs[] = $this->base . $spec; + } + } + return array_unique($subs); + } /** * Handle and acknowledge an event that's come in through a queue. @@ -612,32 +622,26 @@ class StompQueueManager extends QueueManager } /** - * Set us up with queue subscriptions for a new site added at runtime, + * (Re)load runtime configuration for a given site by nickname, * triggered by a broadcast to the 'statusnet-control' topic. * + * Configuration changes in database should update, but config + * files might not. + * * @param array $frame Stomp frame * @return bool true to continue; false to stop further processing. */ protected function updateSiteConfig($nickname) { - if (empty($this->sites)) { - if ($nickname == common_config('site', 'nickname')) { - StatusNet::init(common_config('site', 'server')); - } else { - $this->_log(LOG_INFO, "Ignoring update ping for other site $nickname"); + $sn = Status_network::staticGet($nickname); + if ($sn) { + $this->switchSite($nickname); + if (!in_array($nickname, $this->sites)) { + $this->addSite(); } + $this->stats('siteupdate'); } else { - $sn = Status_network::staticGet($nickname); - if ($sn) { - $this->switchSite($nickname); - if (!in_array($nickname, $this->sites)) { - $this->addSite(); - } - // @fixme update subscriptions, if applicable - $this->stats('siteupdate'); - } else { - $this->_log(LOG_ERR, "Ignoring ping for unrecognized new site $nickname"); - } + $this->_log(LOG_ERR, "Ignoring ping for unrecognized new site $nickname"); } } @@ -646,24 +650,25 @@ class StompQueueManager extends QueueManager * group name for this queue to give eg: * * /queue/statusnet/main + * /queue/statusnet/main/distrib + * /queue/statusnet/xmpp/xmppout/site01 * * @param string $queue * @return string */ protected function queueName($queue) { - $base = common_config('queue', 'queue_basename'); $group = $this->queueGroup($queue); - $breakout = $this->breakoutMode($queue); - if ($breakout == 'shared') { - return $base . "$group"; - } else if ($breakout == 'handler') { - return $base . "$group/$queue"; - } else if ($breakout == 'site') { - $site = StatusNet::currentSite(); - return $base . "$group/$queue/$site"; - } - throw Exception("Unrecognized queue breakout mode '$breakout' for '$queue'"); + $site = StatusNet::currentSite(); + + $specs = array("$group/$queue/$site", + "$group/$queue"); + foreach ($specs as $spec) { + if (in_array($spec, $this->breakout)) { + return $this->base . $spec; + } + } + return $this->base . $group; } /** |