From 4ae760cb62657e68b6b2313e64d2bb59fe264df4 Mon Sep 17 00:00:00 2001 From: Brion Vibber Date: Wed, 10 Feb 2010 22:58:39 +0000 Subject: OStatus PuSH fixes: * HMAC now calculated correctly - confirmed interop with Google's public hub * Can optionally use an external PuSH hub, set URL in $config['ostatus']['hub'] (may have issues in replication environment, and will ping the hub for every update rather than just those with subscribers) Internal hub will still function when this is set, but won't be advertised. Warning: setting this, then turning it off later will break subscriptions as that hub will no longer receive pings. --- plugins/OStatus/lib/hubdistribqueuehandler.php | 70 ++++++++++++++++++++------ 1 file changed, 56 insertions(+), 14 deletions(-) (limited to 'plugins/OStatus/lib/hubdistribqueuehandler.php') diff --git a/plugins/OStatus/lib/hubdistribqueuehandler.php b/plugins/OStatus/lib/hubdistribqueuehandler.php index a35b8874c..245a57f72 100644 --- a/plugins/OStatus/lib/hubdistribqueuehandler.php +++ b/plugins/OStatus/lib/hubdistribqueuehandler.php @@ -49,15 +49,7 @@ class HubDistribQueueHandler extends QueueHandler $feed = common_local_url('ApiTimelineUser', array('id' => $notice->profile_id, 'format' => 'atom')); - $sub = new HubSub(); - $sub->topic = $feed; - if ($sub->find()) { - $atom = $this->userFeedForNotice($notice); - $this->pushFeeds($atom, $sub); - } else { - common_log(LOG_INFO, "No PuSH subscribers for $feed"); - } - return true; + $this->pushFeed($feed, array($this, 'userFeedForNotice'), $notice); } function pushGroup($notice, $group_id) @@ -65,19 +57,69 @@ class HubDistribQueueHandler extends QueueHandler $feed = common_local_url('ApiTimelineGroup', array('id' => $group_id, 'format' => 'atom')); + $this->pushFeed($feed, array($this, 'groupFeedForNotice'), $group_id, $notice); + } + + /** + * @param string $feed URI to the feed + * @param callable $callback function to generate Atom feed update if needed + * any additional params are passed to the callback. + */ + function pushFeed($feed, $callback) + { + $hub = common_config('ostatus', 'hub'); + if ($hub) { + $this->pushFeedExternal($feed, $hub); + } + $sub = new HubSub(); $sub->topic = $feed; if ($sub->find()) { - common_log(LOG_INFO, "Building PuSH feed for $feed"); - $atom = $this->groupFeedForNotice($group_id, $notice); - $this->pushFeeds($atom, $sub); + $args = array_slice(func_get_args(), 2); + $atom = call_user_func_array($callback, $args); + $this->pushFeedInternal($atom, $sub); } else { common_log(LOG_INFO, "No PuSH subscribers for $feed"); } + return true; } - - function pushFeeds($atom, $sub) + /** + * Ping external hub about this update. + * The hub will pull the feed and check for new items later. + * Not guaranteed safe in an environment with database replication. + * + * @param string $feed feed topic URI + * @param string $hub PuSH hub URI + * @fixme can consolidate pings for user & group posts + */ + function pushFeedExternal($feed, $hub) + { + $client = new HTTPClient(); + try { + $data = array('hub.mode' => 'publish', + 'hub.url' => $feed); + $response = $client->post($hub, array(), $data); + if ($response->getStatus() == 204) { + common_log(LOG_INFO, "PuSH ping to hub $hub for $feed ok"); + return true; + } else { + common_log(LOG_ERR, "PuSH ping to hub $hub for $feed failed with HTTP " . + $response->getStatus() . ': ' . + $response->getBody()); + } + } catch (Exception $e) { + common_log(LOG_ERR, "PuSH ping to hub $hub for $feed failed: " . $e->getMessage()); + return false; + } + } + + /** + * Queue up direct feed update pushes to subscribers on our internal hub. + * @param string $atom update feed, containing only new/changed items + * @param HubSub $sub open query of subscribers + */ + function pushFeedInternal($atom, $sub) { common_log(LOG_INFO, "Preparing $sub->N PuSH distribution(s) for $sub->topic"); $qm = QueueManager::get(); -- cgit v1.2.3-54-g00ecf