diff options
Diffstat (limited to 'plugins/OStatus/lib')
-rw-r--r-- | plugins/OStatus/lib/discovery.php | 2 | ||||
-rw-r--r-- | plugins/OStatus/lib/discoveryhints.php | 3 | ||||
-rw-r--r-- | plugins/OStatus/lib/feeddiscovery.php | 12 | ||||
-rw-r--r-- | plugins/OStatus/lib/hubprepqueuehandler.php | 87 | ||||
-rw-r--r-- | plugins/OStatus/lib/linkheader.php | 2 | ||||
-rw-r--r-- | plugins/OStatus/lib/magicenvelope.php | 57 | ||||
-rw-r--r-- | plugins/OStatus/lib/ostatusqueuehandler.php | 31 | ||||
-rw-r--r-- | plugins/OStatus/lib/salmon.php | 4 | ||||
-rw-r--r-- | plugins/OStatus/lib/xrd.php | 103 | ||||
-rw-r--r-- | plugins/OStatus/lib/xrdaction.php | 5 |
10 files changed, 210 insertions, 96 deletions
diff --git a/plugins/OStatus/lib/discovery.php b/plugins/OStatus/lib/discovery.php index 44fad62fb..7187c1f3e 100644 --- a/plugins/OStatus/lib/discovery.php +++ b/plugins/OStatus/lib/discovery.php @@ -195,7 +195,7 @@ class Discovery_LRDD_Link_Header implements Discovery_LRDD // return false; } - return Discovery_LRDD_Link_Header::parseHeader($link_header); + return array(Discovery_LRDD_Link_Header::parseHeader($link_header)); } protected static function parseHeader($header) diff --git a/plugins/OStatus/lib/discoveryhints.php b/plugins/OStatus/lib/discoveryhints.php index 80cfbbf15..34c9be277 100644 --- a/plugins/OStatus/lib/discoveryhints.php +++ b/plugins/OStatus/lib/discoveryhints.php @@ -30,6 +30,7 @@ class DiscoveryHints { case Discovery::PROFILEPAGE: $hints['profileurl'] = $link['href']; break; + case Salmon::NS_MENTIONS: case Salmon::NS_REPLIES: $hints['salmon'] = $link['href']; break; @@ -83,7 +84,7 @@ class DiscoveryHints { $hints['fullname'] = implode(' ', $hcard['n']); } - if (array_key_exists('photo', $hcard)) { + if (array_key_exists('photo', $hcard) && count($hcard['photo'])) { $hints['avatar'] = $hcard['photo'][0]; } diff --git a/plugins/OStatus/lib/feeddiscovery.php b/plugins/OStatus/lib/feeddiscovery.php index 4809f9d35..a55399d7c 100644 --- a/plugins/OStatus/lib/feeddiscovery.php +++ b/plugins/OStatus/lib/feeddiscovery.php @@ -88,6 +88,16 @@ class FeedDiscovery } /** + * Get the referenced PuSH hub link from an Atom feed. + * + * @return mixed string or false + */ + public function getHubLink() + { + return $this->getAtomLink('hub'); + } + + /** * @param string $url * @param bool $htmlOk pass false here if you don't want to follow web pages. * @return string with validated URL @@ -104,7 +114,7 @@ class FeedDiscovery $response = $client->get($url); } catch (HTTP_Request2_Exception $e) { common_log(LOG_ERR, __METHOD__ . " Failure for $url - " . $e->getMessage()); - throw new FeedSubBadURLException($e); + throw new FeedSubBadURLException($e->getMessage()); } if ($htmlOk) { diff --git a/plugins/OStatus/lib/hubprepqueuehandler.php b/plugins/OStatus/lib/hubprepqueuehandler.php new file mode 100644 index 000000000..0d585938f --- /dev/null +++ b/plugins/OStatus/lib/hubprepqueuehandler.php @@ -0,0 +1,87 @@ +<?php +/* + * StatusNet - the distributed open-source microblogging tool + * Copyright (C) 2010, StatusNet, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +/** + * When we have a large batch of PuSH consumers, we break the data set + * into smaller chunks. Enqueue final destinations... + * + * @package Hub + * @author Brion Vibber <brion@status.net> + */ +class HubPrepQueueHandler extends QueueHandler +{ + // Enqueue this many low-level distributions before re-queueing the rest + // of the batch to be processed later. Helps to keep latency down for other + // things happening during a particularly long OStatus delivery session. + // + // [Could probably ditch this if we had working message delivery priorities + // for queueing, but this isn't supported in ActiveMQ 5.3.] + const ROLLING_BATCH = 20; + + function transport() + { + return 'hubprep'; + } + + function handle($data) + { + $topic = $data['topic']; + $atom = $data['atom']; + $pushCallbacks = $data['pushCallbacks']; + + assert(is_string($atom)); + assert(is_string($topic)); + assert(is_array($pushCallbacks)); + + // Set up distribution for the first n subscribing sites... + // If we encounter an uncatchable error, queue handling should + // automatically re-run the batch, which could lead to some dupe + // distributions. + // + // Worst case is if one of these hubprep entries dies too many + // times and gets dropped; the rest of the batch won't get processed. + try { + $n = 0; + while (count($pushCallbacks) && $n < self::ROLLING_BATCH) { + $n++; + $callback = array_shift($pushCallbacks); + $sub = HubSub::staticGet($topic, $callback); + if (!$sub) { + common_log(LOG_ERR, "Skipping PuSH delivery for deleted(?) consumer $callback on $topic"); + continue; + } + + $sub->distribute($atom); + } + } catch (Exception $e) { + common_log(LOG_ERR, "Exception during PuSH batch out: " . + $e->getMessage() . + " prepping $topic to $callback"); + } + + // And re-queue the rest of the batch! + if (count($pushCallbacks) > 0) { + $sub = new HubSub(); + $sub->topic = $topic; + $sub->bulkDistribute($atom, $pushCallbacks); + } + + return true; + } +} diff --git a/plugins/OStatus/lib/linkheader.php b/plugins/OStatus/lib/linkheader.php index afcd66d26..cd78d31ce 100644 --- a/plugins/OStatus/lib/linkheader.php +++ b/plugins/OStatus/lib/linkheader.php @@ -11,7 +11,7 @@ class LinkHeader preg_match('/^<[^>]+>/', $str, $uri_reference); //if (empty($uri_reference)) return; - $this->uri = trim($uri_reference[0], '<>'); + $this->href = trim($uri_reference[0], '<>'); $this->rel = array(); $this->type = null; diff --git a/plugins/OStatus/lib/magicenvelope.php b/plugins/OStatus/lib/magicenvelope.php index 9266cab5c..967e5f6d1 100644 --- a/plugins/OStatus/lib/magicenvelope.php +++ b/plugins/OStatus/lib/magicenvelope.php @@ -59,12 +59,21 @@ class MagicEnvelope } if ($xrd->links) { if ($link = Discovery::getService($xrd->links, Magicsig::PUBLICKEYREL)) { - list($type, $keypair) = explode(',', $link['href']); - if (empty($keypair)) { + $keypair = false; + $parts = explode(',', $link['href']); + if (count($parts) == 2) { + $keypair = $parts[1]; + } else { // Backwards compatibility check for separator bug in 0.9.0 - list($type, $keypair) = explode(';', $link['href']); + $parts = explode(';', $link['href']); + if (count($parts) == 2) { + $keypair = $parts[1]; + } + } + + if ($keypair) { + return $keypair; } - return $keypair; } } throw new Exception('Unable to locate signer public key'); @@ -74,7 +83,7 @@ class MagicEnvelope public function signMessage($text, $mimetype, $keypair) { $signature_alg = Magicsig::fromString($keypair); - $armored_text = base64_url_encode($text); + $armored_text = Magicsig::base64_url_encode($text); return array( 'data' => $armored_text, @@ -88,31 +97,25 @@ class MagicEnvelope } public function toXML($env) { - $dom = new DOMDocument(); - - $envelope = $dom->createElementNS(MagicEnvelope::NS, 'me:env'); - $envelope->setAttribute('xmlns:me', MagicEnvelope::NS); - $data = $dom->createElementNS(MagicEnvelope::NS, 'me:data', $env['data']); - $data->setAttribute('type', $env['data_type']); - $envelope->appendChild($data); - $enc = $dom->createElementNS(MagicEnvelope::NS, 'me:encoding', $env['encoding']); - $envelope->appendChild($enc); - $alg = $dom->createElementNS(MagicEnvelope::NS, 'me:alg', $env['alg']); - $envelope->appendChild($alg); - $sig = $dom->createElementNS(MagicEnvelope::NS, 'me:sig', $env['sig']); - $envelope->appendChild($sig); - - $dom->appendChild($envelope); + $xs = new XMLStringer(); + $xs->startXML(); + $xs->elementStart('me:env', array('xmlns:me' => MagicEnvelope::NS)); + $xs->element('me:data', array('type' => $env['data_type']), $env['data']); + $xs->element('me:encoding', null, $env['encoding']); + $xs->element('me:alg', null, $env['alg']); + $xs->element('me:sig', null, $env['sig']); + $xs->elementEnd('me:env'); - - return $dom->saveXML(); + $string = $xs->getString(); + common_debug($string); + return $string; } public function unfold($env) { $dom = new DOMDocument(); - $dom->loadXML(base64_url_decode($env['data'])); + $dom->loadXML(Magicsig::base64_url_decode($env['data'])); if ($dom->documentElement->tagName != 'entry') { return false; @@ -169,7 +172,7 @@ class MagicEnvelope return false; } - $text = base64_url_decode($env['data']); + $text = Magicsig::base64_url_decode($env['data']); $signer_uri = $this->getAuthor($text); try { @@ -207,13 +210,13 @@ class MagicEnvelope } $data_element = $env_element->getElementsByTagNameNS(MagicEnvelope::NS, 'data')->item(0); - + $sig_element = $env_element->getElementsByTagNameNS(MagicEnvelope::NS, 'sig')->item(0); return array( - 'data' => trim($data_element->nodeValue), + 'data' => preg_replace('/\s/', '', $data_element->nodeValue), 'data_type' => $data_element->getAttribute('type'), 'encoding' => $env_element->getElementsByTagNameNS(MagicEnvelope::NS, 'encoding')->item(0)->nodeValue, 'alg' => $env_element->getElementsByTagNameNS(MagicEnvelope::NS, 'alg')->item(0)->nodeValue, - 'sig' => $env_element->getElementsByTagNameNS(MagicEnvelope::NS, 'sig')->item(0)->nodeValue, + 'sig' => preg_replace('/\s/', '', $sig_element->nodeValue), ); } diff --git a/plugins/OStatus/lib/ostatusqueuehandler.php b/plugins/OStatus/lib/ostatusqueuehandler.php index d1e58f1d6..8905d2e21 100644 --- a/plugins/OStatus/lib/ostatusqueuehandler.php +++ b/plugins/OStatus/lib/ostatusqueuehandler.php @@ -25,6 +25,18 @@ */ class OStatusQueueHandler extends QueueHandler { + // If we have more than this many subscribing sites on a single feed, + // break up the PuSH distribution into smaller batches which will be + // rolled into the queue progressively. This reduces disruption to + // other, shorter activities being enqueued while we work. + const MAX_UNBATCHED = 50; + + // Each batch (a 'hubprep' entry) will have this many items. + // Selected to provide a balance between queue packet size + // and number of batches that will end up getting processed. + // For 20,000 target sites, 1000 should work acceptably. + const BATCH_SIZE = 1000; + function transport() { return 'ostatus'; @@ -147,14 +159,31 @@ class OStatusQueueHandler extends QueueHandler /** * Queue up direct feed update pushes to subscribers on our internal hub. + * If there are a large number of subscriber sites, intermediate bulk + * distribution triggers may be queued. + * * @param string $atom update feed, containing only new/changed items * @param HubSub $sub open query of subscribers */ function pushFeedInternal($atom, $sub) { common_log(LOG_INFO, "Preparing $sub->N PuSH distribution(s) for $sub->topic"); + $n = 0; + $batch = array(); while ($sub->fetch()) { - $sub->distribute($atom); + $n++; + if ($n < self::MAX_UNBATCHED) { + $sub->distribute($atom); + } else { + $batch[] = $sub->callback; + if (count($batch) >= self::BATCH_SIZE) { + $sub->bulkDistribute($atom, $batch); + $batch = array(); + } + } + } + if (count($batch) >= 0) { + $sub->bulkDistribute($atom, $batch); } } diff --git a/plugins/OStatus/lib/salmon.php b/plugins/OStatus/lib/salmon.php index 3d3341bc6..ef7719a40 100644 --- a/plugins/OStatus/lib/salmon.php +++ b/plugins/OStatus/lib/salmon.php @@ -28,9 +28,11 @@ */ class Salmon { + const REL_SALMON = 'salmon'; + const REL_MENTIONED = 'mentioned'; + // XXX: these are deprecated const NS_REPLIES = "http://salmon-protocol.org/ns/salmon-replies"; - const NS_MENTIONS = "http://salmon-protocol.org/ns/salmon-mention"; /** diff --git a/plugins/OStatus/lib/xrd.php b/plugins/OStatus/lib/xrd.php index aa13ef024..a10b9f427 100644 --- a/plugins/OStatus/lib/xrd.php +++ b/plugins/OStatus/lib/xrd.php @@ -53,7 +53,14 @@ class XRD $xrd = new XRD(); $dom = new DOMDocument(); - if (!$dom->loadXML($xml)) { + + // Don't spew XML warnings to output + $old = error_reporting(); + error_reporting($old & ~E_WARNING); + $ok = $dom->loadXML($xml); + error_reporting($old); + + if (!$ok) { throw new Exception("Invalid XML"); } $xrd_element = $dom->getElementsByTagName('XRD')->item(0); @@ -99,44 +106,43 @@ class XRD public function toXML() { - $dom = new DOMDocument('1.0', 'UTF-8'); - $dom->formatOutput = true; - - $xrd_dom = $dom->createElementNS(XRD::XRD_NS, 'XRD'); - $dom->appendChild($xrd_dom); + $xs = new XMLStringer(); + + $xs->startXML(); + $xs->elementStart('XRD', array('xmlns' => XRD::XRD_NS)); if ($this->host) { - $host_dom = $dom->createElement('hm:Host', $this->host); - $xrd_dom->setAttributeNS(XRD::XML_NS, 'xmlns:hm', XRD::HOST_META_NS); - $xrd_dom->appendChild($host_dom); + $xs->element('hm:Host', array('xmlns:hm' => XRD::HOST_META_NS), $this->host); + } + + if ($this->expires) { + $xs->element('Expires', null, $this->expires); + } + + if ($this->subject) { + $xs->element('Subject', null, $this->subject); + } + + foreach ($this->alias as $alias) { + $xs->element('Alias', null, $alias); + } + + foreach ($this->links as $link) { + $titles = array(); + if (isset($link['title'])) { + $titles = $link['title']; + unset($link['title']); + } + $xs->elementStart('Link', $link); + foreach ($titles as $title) { + $xs->element('Title', null, $title); + } + $xs->elementEnd('Link'); } - if ($this->expires) { - $expires_dom = $dom->createElement('Expires', $this->expires); - $xrd_dom->appendChild($expires_dom); - } - - if ($this->subject) { - $subject_dom = $dom->createElement('Subject', $this->subject); - $xrd_dom->appendChild($subject_dom); - } - - foreach ($this->alias as $alias) { - $alias_dom = $dom->createElement('Alias', $alias); - $xrd_dom->appendChild($alias_dom); - } - - foreach ($this->types as $type) { - $type_dom = $dom->createElement('Type', $type); - $xrd_dom->appendChild($type_dom); - } - - foreach ($this->links as $link) { - $link_dom = $this->saveLink($dom, $link); - $xrd_dom->appendChild($link_dom); - } - - return $dom->saveXML(); + $xs->elementEnd('XRD'); + + return $xs->getString(); } function parseType($element) @@ -162,32 +168,5 @@ class XRD return $link; } - - function saveLink($doc, $link) - { - $link_element = $doc->createElement('Link'); - if (!empty($link['rel'])) { - $link_element->setAttribute('rel', $link['rel']); - } - if (!empty($link['type'])) { - $link_element->setAttribute('type', $link['type']); - } - if (!empty($link['href'])) { - $link_element->setAttribute('href', $link['href']); - } - if (!empty($link['template'])) { - $link_element->setAttribute('template', $link['template']); - } - - if (!empty($link['title']) && is_array($link['title'])) { - foreach($link['title'] as $title) { - $title = $doc->createElement('Title', $title); - $link_element->appendChild($title); - } - } - - - return $link_element; - } } diff --git a/plugins/OStatus/lib/xrdaction.php b/plugins/OStatus/lib/xrdaction.php index f1a56e0a8..d8cf648d6 100644 --- a/plugins/OStatus/lib/xrdaction.php +++ b/plugins/OStatus/lib/xrdaction.php @@ -76,6 +76,9 @@ class XrdAction extends Action $salmon_url = common_local_url('usersalmon', array('id' => $this->user->id)); + $xrd->links[] = array('rel' => Salmon::REL_SALMON, + 'href' => $salmon_url); + // XXX : Deprecated - to be removed. $xrd->links[] = array('rel' => Salmon::NS_REPLIES, 'href' => $salmon_url); @@ -98,7 +101,7 @@ class XrdAction extends Action $xrd->links[] = array('rel' => 'http://ostatus.org/schema/1.0/subscribe', 'template' => $url ); - header('Content-type: text/xml'); + header('Content-type: application/xrd+xml'); print $xrd->toXML(); } |