diff options
Diffstat (limited to 'plugins/OStatus')
-rw-r--r-- | plugins/OStatus/OStatusPlugin.php | 8 | ||||
-rw-r--r-- | plugins/OStatus/classes/HubSub.php | 51 | ||||
-rw-r--r-- | plugins/OStatus/lib/ostatusqueuehandler.php | 31 |
3 files changed, 87 insertions, 3 deletions
diff --git a/plugins/OStatus/OStatusPlugin.php b/plugins/OStatus/OStatusPlugin.php index 5b153216e..c61e2cc5f 100644 --- a/plugins/OStatus/OStatusPlugin.php +++ b/plugins/OStatus/OStatusPlugin.php @@ -87,6 +87,8 @@ class OStatusPlugin extends Plugin // Outgoing from our internal PuSH hub $qm->connect('hubconf', 'HubConfQueueHandler'); + $qm->connect('hubprep', 'HubPrepQueueHandler'); + $qm->connect('hubout', 'HubOutQueueHandler'); // Outgoing Salmon replies (when we don't need a return value) @@ -102,8 +104,10 @@ class OStatusPlugin extends Plugin */ function onStartEnqueueNotice($notice, &$transports) { - // put our transport first, in case there's any conflict (like OMB) - array_unshift($transports, 'ostatus'); + if ($notice->isLocal()) { + // put our transport first, in case there's any conflict (like OMB) + array_unshift($transports, 'ostatus'); + } return true; } diff --git a/plugins/OStatus/classes/HubSub.php b/plugins/OStatus/classes/HubSub.php index cdace3c1f..7db528a4e 100644 --- a/plugins/OStatus/classes/HubSub.php +++ b/plugins/OStatus/classes/HubSub.php @@ -260,6 +260,37 @@ class HubSub extends Memcached_DataObject $retries = intval(common_config('ostatus', 'hub_retries')); } + if (common_config('ostatus', 'local_push_bypass')) { + // If target is a local site, bypass the web server and drop the + // item directly into the target's input queue. + $url = parse_url($this->callback); + $wildcard = common_config('ostatus', 'local_wildcard'); + $site = Status_network::getFromHostname($url['host'], $wildcard); + + if ($site) { + if ($this->secret) { + $hmac = 'sha1=' . hash_hmac('sha1', $atom, $this->secret); + } else { + $hmac = ''; + } + + // Hack: at the moment we stick the subscription ID in the callback + // URL so we don't have to look inside the Atom to route the subscription. + // For now this means we need to extract that from the target URL + // so we can include it in the data. + $parts = explode('/', $url['path']); + $subId = intval(array_pop($parts)); + + $data = array('feedsub_id' => $subId, + 'post' => $atom, + 'hmac' => $hmac); + common_log(LOG_DEBUG, "Cross-site PuSH bypass enqueueing straight to $site->nickname feed $subId"); + $qm = QueueManager::get(); + $qm->enqueue($data, 'pushin', $site->nickname); + return; + } + } + // We dare not clone() as when the clone is discarded it'll // destroy the result data for the parent query. // @fixme use clone() again when it's safe to copy an @@ -274,6 +305,26 @@ class HubSub extends Memcached_DataObject } /** + * Queue up a large batch of pushes to multiple subscribers + * for this same topic update. + * + * If queues are disabled, this will run immediately. + * + * @param string $atom well-formed Atom feed + * @param array $pushCallbacks list of callback URLs + */ + function bulkDistribute($atom, $pushCallbacks) + { + $data = array('atom' => $atom, + 'topic' => $this->topic, + 'pushCallbacks' => $pushCallbacks); + common_log(LOG_INFO, "Queuing PuSH batch: $this->topic to " . + count($pushCallbacks) . " sites"); + $qm = QueueManager::get(); + $qm->enqueue($data, 'hubprep'); + } + + /** * Send a 'fat ping' to the subscriber's callback endpoint * containing the given Atom feed chunk. * diff --git a/plugins/OStatus/lib/ostatusqueuehandler.php b/plugins/OStatus/lib/ostatusqueuehandler.php index d1e58f1d6..8905d2e21 100644 --- a/plugins/OStatus/lib/ostatusqueuehandler.php +++ b/plugins/OStatus/lib/ostatusqueuehandler.php @@ -25,6 +25,18 @@ */ class OStatusQueueHandler extends QueueHandler { + // If we have more than this many subscribing sites on a single feed, + // break up the PuSH distribution into smaller batches which will be + // rolled into the queue progressively. This reduces disruption to + // other, shorter activities being enqueued while we work. + const MAX_UNBATCHED = 50; + + // Each batch (a 'hubprep' entry) will have this many items. + // Selected to provide a balance between queue packet size + // and number of batches that will end up getting processed. + // For 20,000 target sites, 1000 should work acceptably. + const BATCH_SIZE = 1000; + function transport() { return 'ostatus'; @@ -147,14 +159,31 @@ class OStatusQueueHandler extends QueueHandler /** * Queue up direct feed update pushes to subscribers on our internal hub. + * If there are a large number of subscriber sites, intermediate bulk + * distribution triggers may be queued. + * * @param string $atom update feed, containing only new/changed items * @param HubSub $sub open query of subscribers */ function pushFeedInternal($atom, $sub) { common_log(LOG_INFO, "Preparing $sub->N PuSH distribution(s) for $sub->topic"); + $n = 0; + $batch = array(); while ($sub->fetch()) { - $sub->distribute($atom); + $n++; + if ($n < self::MAX_UNBATCHED) { + $sub->distribute($atom); + } else { + $batch[] = $sub->callback; + if (count($batch) >= self::BATCH_SIZE) { + $sub->bulkDistribute($atom, $batch); + $batch = array(); + } + } + } + if (count($batch) >= 0) { + $sub->bulkDistribute($atom, $batch); } } |