summaryrefslogtreecommitdiff
path: root/plugins/OStatus/lib/ostatusqueuehandler.php
diff options
context:
space:
mode:
authorBrion Vibber <brion@pobox.com>2010-06-04 11:48:54 -0700
committerBrion Vibber <brion@pobox.com>2010-06-07 10:03:43 -0700
commit41e9dba7297d43b7de0cb7665901869910d1047a (patch)
tree9da94d76ced9915be3e94065b66f19f33d5187e3 /plugins/OStatus/lib/ostatusqueuehandler.php
parent8b9436e8ae1ebcc7ef10752bb9666939200e26aa (diff)
OStatus plugin: Rolling batch queueing for PuSH output to >50 subscribing sites. Keeps latency down for other things enqueued while we work...
Diffstat (limited to 'plugins/OStatus/lib/ostatusqueuehandler.php')
-rw-r--r--plugins/OStatus/lib/ostatusqueuehandler.php31
1 files changed, 30 insertions, 1 deletions
diff --git a/plugins/OStatus/lib/ostatusqueuehandler.php b/plugins/OStatus/lib/ostatusqueuehandler.php
index d1e58f1d6..8905d2e21 100644
--- a/plugins/OStatus/lib/ostatusqueuehandler.php
+++ b/plugins/OStatus/lib/ostatusqueuehandler.php
@@ -25,6 +25,18 @@
*/
class OStatusQueueHandler extends QueueHandler
{
+ // If we have more than this many subscribing sites on a single feed,
+ // break up the PuSH distribution into smaller batches which will be
+ // rolled into the queue progressively. This reduces disruption to
+ // other, shorter activities being enqueued while we work.
+ const MAX_UNBATCHED = 50;
+
+ // Each batch (a 'hubprep' entry) will have this many items.
+ // Selected to provide a balance between queue packet size
+ // and number of batches that will end up getting processed.
+ // For 20,000 target sites, 1000 should work acceptably.
+ const BATCH_SIZE = 1000;
+
function transport()
{
return 'ostatus';
@@ -147,14 +159,31 @@ class OStatusQueueHandler extends QueueHandler
/**
* Queue up direct feed update pushes to subscribers on our internal hub.
+ * If there are a large number of subscriber sites, intermediate bulk
+ * distribution triggers may be queued.
+ *
* @param string $atom update feed, containing only new/changed items
* @param HubSub $sub open query of subscribers
*/
function pushFeedInternal($atom, $sub)
{
common_log(LOG_INFO, "Preparing $sub->N PuSH distribution(s) for $sub->topic");
+ $n = 0;
+ $batch = array();
while ($sub->fetch()) {
- $sub->distribute($atom);
+ $n++;
+ if ($n < self::MAX_UNBATCHED) {
+ $sub->distribute($atom);
+ } else {
+ $batch[] = $sub->callback;
+ if (count($batch) >= self::BATCH_SIZE) {
+ $sub->bulkDistribute($atom, $batch);
+ $batch = array();
+ }
+ }
+ }
+ if (count($batch) >= 0) {
+ $sub->bulkDistribute($atom, $batch);
}
}