From 22ff358ba8d1fd0396136e1de570d788dd0727b6 Mon Sep 17 00:00:00 2001 From: Brion Vibber Date: Thu, 18 Feb 2010 18:20:48 +0000 Subject: OStatus sub/unsub updates: - fix for PuSH unsub verification - send Salmon notification on unsub --- plugins/OStatus/actions/pushcallback.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'plugins/OStatus/actions/pushcallback.php') diff --git a/plugins/OStatus/actions/pushcallback.php b/plugins/OStatus/actions/pushcallback.php index 388c8f9c3..ed859a32f 100644 --- a/plugins/OStatus/actions/pushcallback.php +++ b/plugins/OStatus/actions/pushcallback.php @@ -89,7 +89,7 @@ class PushCallbackAction extends Action if ($profile->verify_token !== $verify_token) { common_log(LOG_WARNING, __METHOD__ . ": bogus hub callback with bad token \"$verify_token\" for feed $topic"); - throw new ServerError("Bogus hub callback: bad token", 404); + throw new ServerException("Bogus hub callback: bad token", 404); } if ($mode != $profile->sub_state) { -- cgit v1.2.3-54-g00ecf From 0dac13d197248bf24ea51cb7911d32286764c0c8 Mon Sep 17 00:00:00 2001 From: Brion Vibber Date: Thu, 18 Feb 2010 21:22:21 +0000 Subject: OStatus refactoring to clean up profile vs feed and fix up subscription issues. PuSH subscription maintenance broken back out to FeedSub, letting Ostatus_profile deal with the profile level (user or group, with unique id URI) --- plugins/OStatus/OStatusPlugin.php | 53 +-- plugins/OStatus/actions/feedsubsettings.php | 99 ++--- plugins/OStatus/actions/pushcallback.php | 22 +- plugins/OStatus/actions/salmon.php | 32 +- plugins/OStatus/classes/FeedSub.php | 443 ++++++++++++++++++++++ plugins/OStatus/classes/Ostatus_profile.php | 544 +++++++++------------------- plugins/OStatus/lib/feeddiscovery.php | 50 ++- plugins/OStatus/lib/feedmunger.php | 350 ------------------ 8 files changed, 741 insertions(+), 852 deletions(-) create mode 100644 plugins/OStatus/classes/FeedSub.php delete mode 100644 plugins/OStatus/lib/feedmunger.php (limited to 'plugins/OStatus/actions/pushcallback.php') diff --git a/plugins/OStatus/OStatusPlugin.php b/plugins/OStatus/OStatusPlugin.php index e548a151c..4ebe4551e 100644 --- a/plugins/OStatus/OStatusPlugin.php +++ b/plugins/OStatus/OStatusPlugin.php @@ -1,17 +1,7 @@ -Author URI: http://status.net/ -*/ - /* * StatusNet - the distributed open-source microblogging tool - * Copyright (C) 2009, StatusNet, Inc. + * Copyright (C) 2009-2010, StatusNet, Inc. * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License as published by @@ -28,17 +18,12 @@ Author URI: http://status.net/ */ /** - * @package FeedSubPlugin + * @package OStatusPlugin * @maintainer Brion Vibber */ if (!defined('STATUSNET') && !defined('LACONICA')) { exit(1); } -define('FEEDSUB_SERVICE', 100); // fixme -- avoid hardcoding these? - -// We bundle the XML_Parse_Feed library... -set_include_path(get_include_path() . PATH_SEPARATOR . dirname(__FILE__) . '/extlib'); - class FeedSubException extends Exception { } @@ -258,24 +243,6 @@ class OStatusPlugin extends Plugin } } - /** - * Notify remote server when one of our users subscribes. - * @fixme Check and restart the PuSH subscription if needed - * - * @param User $user - * @param Profile $other - * @return hook return value - */ - function onEndSubscribe($user, $other) - { - $oprofile = Ostatus_profile::staticGet('profile_id', $other->id); - if ($oprofile) { - // Notify the remote server of the unsub, if supported. - $oprofile->notify($user->getProfile(), ActivityVerb::FOLLOW, $oprofile); - } - return true; - } - /** * Notify remote server and garbage collect unused feeds on unsubscribe. * @fixme send these operations to background queues @@ -309,6 +276,7 @@ class OStatusPlugin extends Plugin function onCheckSchema() { $schema = Schema::get(); $schema->ensureTable('ostatus_profile', Ostatus_profile::schemaDef()); + $schema->ensureTable('feedsub', FeedSub::schemaDef()); $schema->ensureTable('hubsub', HubSub::schemaDef()); return true; } @@ -345,4 +313,19 @@ class OStatusPlugin extends Plugin return false; } } + + /** + * Send incoming PuSH feeds for OStatus endpoints in for processing. + * + * @param FeedSub $feedsub + * @param DOMDocument $feed + * @return mixed hook return code + */ + function onStartFeedSubReceive($feedsub, $feed) + { + $oprofile = Ostatus_profile::staticGet('feeduri', $feedsub->uri); + if ($oprofile) { + $oprofile->processFeed($feed); + } + } } diff --git a/plugins/OStatus/actions/feedsubsettings.php b/plugins/OStatus/actions/feedsubsettings.php index 6933c9bf2..3e1d0aa82 100644 --- a/plugins/OStatus/actions/feedsubsettings.php +++ b/plugins/OStatus/actions/feedsubsettings.php @@ -26,7 +26,7 @@ if (!defined('STATUSNET') && !defined('LACONICA')) { exit(1); } class FeedSubSettingsAction extends ConnectSettingsAction { - protected $feedurl; + protected $profile_uri; protected $preview; protected $munger; @@ -88,7 +88,10 @@ class FeedSubSettingsAction extends ConnectSettingsAction $this->elementStart('ul', 'form_data'); $this->elementStart('li', array('id' => 'settings_twitter_login_button')); - $this->input('feedurl', _('Feed URL'), $this->feedurl, _('Enter the URL of a PubSubHubbub-enabled feed')); + $this->input('profile_uri', + _m('Feed URL'), + $this->profile_uri, + _m('Enter the profile URL of a PubSubHubbub-enabled feed')); $this->elementEnd('li'); $this->elementEnd('ul'); @@ -145,79 +148,55 @@ class FeedSubSettingsAction extends ConnectSettingsAction */ function validateFeed() { - $feedurl = trim($this->arg('feedurl')); + $profile_uri = trim($this->arg('profile_uri')); - if ($feedurl == '') { - $this->showForm(_m('Empty feed URL!')); + if ($profile_uri == '') { + $this->showForm(_m('Empty remote profile URL!')); return; } - $this->feedurl = $feedurl; + $this->profile_uri = $profile_uri; - // Get the canonical feed URI and check it + // @fixme validate, normalize bla bla try { - $discover = new FeedDiscovery(); - $uri = $discover->discoverFromURL($feedurl); + $oprofile = Ostatus_profile::ensureProfile($this->profile_uri); + $this->oprofile = $oprofile; + return true; } catch (FeedSubBadURLException $e) { - $this->showForm(_m('Invalid URL or could not reach server.')); - return false; + $err = _m('Invalid URL or could not reach server.'); } catch (FeedSubBadResponseException $e) { - $this->showForm(_m('Cannot read feed; server returned error.')); - return false; + $err = _m('Cannot read feed; server returned error.'); } catch (FeedSubEmptyException $e) { - $this->showForm(_m('Cannot read feed; server returned an empty page.')); - return false; + $err = _m('Cannot read feed; server returned an empty page.'); } catch (FeedSubBadHTMLException $e) { - $this->showForm(_m('Bad HTML, could not find feed link.')); - return false; + $err = _m('Bad HTML, could not find feed link.'); } catch (FeedSubNoFeedException $e) { - $this->showForm(_m('Could not find a feed linked from this URL.')); - return false; + $err = _m('Could not find a feed linked from this URL.'); } catch (FeedSubUnrecognizedTypeException $e) { - $this->showForm(_m('Not a recognized feed type.')); - return false; + $err = _m('Not a recognized feed type.'); } catch (FeedSubException $e) { // Any new ones we forgot about - $this->showForm(_m('Bad feed URL.')); - return false; + $err = sprintf(_m('Bad feed URL: %s %s'), get_class($e), $e->getMessage()); } - - $this->munger = $discover->feedMunger(); - $this->profile = $this->munger->ostatusProfile(); - if ($this->profile->huburi == '' && !common_config('feedsub', 'nohub')) { - $this->showForm(_m('Feed is not PuSH-enabled; cannot subscribe.')); - return false; - } - - return true; + $this->showForm($err); + return false; } function saveFeed() { if ($this->validateFeed()) { $this->preview = true; - $this->profile = Ostatus_profile::ensureProfile($this->munger); - if (!$this->profile) { - throw new ServerException("Feed profile was not saved properly."); - } - - // If not already in use, subscribe to updates via the hub - if ($this->profile->sub_start) { - common_log(LOG_INFO, __METHOD__ . ": double the fun! new sub for {$this->profile->feeduri} last subbed {$this->profile->sub_start}"); - } else { - $ok = $this->profile->subscribe(); - common_log(LOG_INFO, __METHOD__ . ": sub was $ok"); - if (!$ok) { - $this->showForm(_m('Feed subscription failed! Bad response from hub.')); - return; - } - } // And subscribe the current user to the local profile $user = common_current_user(); - if ($this->profile->isGroup()) { - $group = $this->profile->localGroup(); + if (!$this->oprofile->subscribe()) { + $this->showForm(_m("Failed to set up server-to-server subscription.")); + return; + } + + if ($this->oprofile->isGroup()) { + $group = $this->oprofile->localGroup(); if ($user->isMember($group)) { $this->showForm(_m('Already a member!')); } elseif (Group_member::join($this->profile->group_id, $user->id)) { @@ -226,13 +205,13 @@ class FeedSubSettingsAction extends ConnectSettingsAction $this->showForm(_m('Remote group join failed!')); } } else { - $local = $this->profile->localProfile(); + $local = $this->oprofile->localProfile(); if ($user->isSubscribed($local)) { $this->showForm(_m('Already subscribed!')); - } elseif ($user->subscribeTo($local)) { - $this->showForm(_m('Feed subscribed!')); + } elseif ($this->oprofile->subscribeLocalToRemote($user)) { + $this->showForm(_m('Remote user subscribed!')); } else { - $this->showForm(_m('Feed subscription failed!')); + $this->showForm(_m('Remote subscription failed!')); } } } @@ -248,17 +227,7 @@ class FeedSubSettingsAction extends ConnectSettingsAction function previewFeed() { - $profile = $this->munger->ostatusProfile(); - $notice = $this->munger->notice(0, true); // preview - - if ($notice) { - $this->element('b', null, 'Preview of latest post from this feed:'); - - $item = new NoticeList($notice, $this); - $item->show(); - } else { - $this->element('b', null, 'No posts in this feed yet.'); - } + $this->text('Profile preview should go here'); } function showScripts() diff --git a/plugins/OStatus/actions/pushcallback.php b/plugins/OStatus/actions/pushcallback.php index ed859a32f..7e1227a66 100644 --- a/plugins/OStatus/actions/pushcallback.php +++ b/plugins/OStatus/actions/pushcallback.php @@ -48,9 +48,9 @@ class PushCallbackAction extends Action throw new ServerException('Empty or invalid feed id', 400); } - $profile = Ostatus_profile::staticGet('id', $feedid); - if (!$profile) { - throw new ServerException('Unknown OStatus/PuSH feed id ' . $feedid, 400); + $feedsub = FeedSub::staticGet('id', $feedid); + if (!$feedsub) { + throw new ServerException('Unknown PuSH feed id ' . $feedid, 400); } $hmac = ''; @@ -62,7 +62,7 @@ class PushCallbackAction extends Action // @fixme Queue this to a background process; we should return // as quickly as possible from a distribution POST. - $profile->postUpdates($post, $hmac); + $feedsub->receive($post, $hmac); } /** @@ -81,29 +81,29 @@ class PushCallbackAction extends Action throw new ServerException("Bogus hub callback: bad mode", 404); } - $profile = Ostatus_profile::staticGet('feeduri', $topic); - if (!$profile) { + $feedsub = FeedSub::staticGet('uri', $topic); + if (!$feedsub) { common_log(LOG_WARNING, __METHOD__ . ": bogus hub callback for unknown feed $topic"); throw new ServerException("Bogus hub callback: unknown feed", 404); } - if ($profile->verify_token !== $verify_token) { + if ($feedsub->verify_token !== $verify_token) { common_log(LOG_WARNING, __METHOD__ . ": bogus hub callback with bad token \"$verify_token\" for feed $topic"); throw new ServerException("Bogus hub callback: bad token", 404); } - if ($mode != $profile->sub_state) { - common_log(LOG_WARNING, __METHOD__ . ": bogus hub callback with bad mode \"$mode\" for feed $topic in state \"{$profile->sub_state}\""); + if ($mode != $feedsub->sub_state) { + common_log(LOG_WARNING, __METHOD__ . ": bogus hub callback with bad mode \"$mode\" for feed $topic in state \"{$feedsub->sub_state}\""); throw new ServerException("Bogus hub callback: mode doesn't match subscription state.", 404); } // OK! if ($mode == 'subscribe') { common_log(LOG_INFO, __METHOD__ . ': sub confirmed'); - $profile->confirmSubscribe($lease_seconds); + $feedsub->confirmSubscribe($lease_seconds); } else { common_log(LOG_INFO, __METHOD__ . ": unsub confirmed; deleting sub record for $topic"); - $profile->confirmUnsubscribe(); + $feedsub->confirmUnsubscribe(); } print $challenge; } diff --git a/plugins/OStatus/actions/salmon.php b/plugins/OStatus/actions/salmon.php index 224134cd7..ea5b8e4ea 100644 --- a/plugins/OStatus/actions/salmon.php +++ b/plugins/OStatus/actions/salmon.php @@ -68,6 +68,9 @@ class SalmonAction extends Action return true; } + /** + * @fixme probably call Ostatus_profile::processFeed + */ function handle($args) { common_log(LOG_INFO, 'Salmon: incoming post for user '. $this->user->id); @@ -95,6 +98,9 @@ class SalmonAction extends Action } } + /** + * @fixme probably call Ostatus_profile::processFeed + */ function handlePost() { switch ($this->act->object->type) { @@ -111,14 +117,23 @@ class SalmonAction extends Action $profile = $this->ensureProfile(); } + /** + * @fixme probably call Ostatus_profile::processFeed + */ function handleFollow() { } + /** + * @fixme probably call Ostatus_profile::processFeed + */ function handleFavorite() { } + /** + * @fixme probably call Ostatus_profile::processFeed + */ function handleShare() { } @@ -131,17 +146,13 @@ class SalmonAction extends Action throw new Exception("Received a salmon slap from unidentified actor."); } - $ostatusProfile = Ostatus_profile::staticGet('homeuri', $actor->id); - - if (empty($ostatusProfile)) { - return $this->createProfile(); - } else { - // XXX: can we receive a salmon slap from a group...? - assert(!empty($ostatusProfile->profile_id)); - return Profile::staticGet($ostatusProfile->profile_id); - } + $ostatusProfile = Ostatus_profile::ensureActorProfile($this->act); + return $oprofile->localProfile(); } + /** + * @fixme anything new in here probably should be merged into Ostatus_profile::ensureActorProfile and friends + */ function createProfile() { $actor = $this->act->actor; @@ -186,6 +197,9 @@ class SalmonAction extends Action return $profile; } + /** + * @fixme should be merged into Ostatus_profile + */ function nicknameFromURI($uri) { preg_match('/(\w+):/', $uri, $matches); diff --git a/plugins/OStatus/classes/FeedSub.php b/plugins/OStatus/classes/FeedSub.php new file mode 100644 index 000000000..dc2c0b710 --- /dev/null +++ b/plugins/OStatus/classes/FeedSub.php @@ -0,0 +1,443 @@ +. + */ + +/** + * @package OStatusPlugin + * @maintainer Brion Vibber + */ + +/* +PuSH subscription flow: + + $profile->subscribe() + generate random verification token + save to verify_token + sends a sub request to the hub... + + main/push/callback + hub sends confirmation back to us via GET + We verify the request, then echo back the challenge. + On our end, we save the time we subscribed and the lease expiration + + main/push/callback + hub sends us updates via POST + +*/ + +class FeedDBException extends FeedSubException +{ + public $obj; + + function __construct($obj) + { + parent::__construct('Database insert failure'); + $this->obj = $obj; + } +} + +/** + * FeedSub handles low-level PubHubSubbub (PuSH) subscriptions. + * Higher-level behavior building OStatus stuff on top is handled + * under Ostatus_profile. + */ +class FeedSub extends Memcached_DataObject +{ + public $__table = 'feedsub'; + + public $id; + public $feeduri; + + // PuSH subscription data + public $huburi; + public $secret; + public $verify_token; + public $sub_state; // subscribe, active, unsubscribe, inactive + public $sub_start; + public $sub_end; + public $last_update; + + public $created; + public $modified; + + public /*static*/ function staticGet($k, $v=null) + { + return parent::staticGet(__CLASS__, $k, $v); + } + + /** + * return table definition for DB_DataObject + * + * DB_DataObject needs to know something about the table to manipulate + * instances. This method provides all the DB_DataObject needs to know. + * + * @return array array of column definitions + */ + + function table() + { + return array('id' => DB_DATAOBJECT_INT + DB_DATAOBJECT_NOTNULL, + 'uri' => DB_DATAOBJECT_STR + DB_DATAOBJECT_NOTNULL, + 'huburi' => DB_DATAOBJECT_STR, + 'secret' => DB_DATAOBJECT_STR, + 'verify_token' => DB_DATAOBJECT_STR, + 'sub_state' => DB_DATAOBJECT_STR + DB_DATAOBJECT_NOTNULL, + 'sub_start' => DB_DATAOBJECT_STR + DB_DATAOBJECT_DATE + DB_DATAOBJECT_TIME, + 'sub_end' => DB_DATAOBJECT_STR + DB_DATAOBJECT_DATE + DB_DATAOBJECT_TIME, + 'last_update' => DB_DATAOBJECT_STR + DB_DATAOBJECT_DATE + DB_DATAOBJECT_TIME + DB_DATAOBJECT_NOTNULL, + 'created' => DB_DATAOBJECT_STR + DB_DATAOBJECT_DATE + DB_DATAOBJECT_TIME + DB_DATAOBJECT_NOTNULL, + 'modified' => DB_DATAOBJECT_STR + DB_DATAOBJECT_DATE + DB_DATAOBJECT_TIME + DB_DATAOBJECT_NOTNULL); + } + + static function schemaDef() + { + return array(new ColumnDef('id', 'integer', + /*size*/ null, + /*nullable*/ false, + /*key*/ 'PRI', + /*default*/ '0', + /*extra*/ null, + /*auto_increment*/ true), + new ColumnDef('uri', 'varchar', + 255, false, 'UNI'), + new ColumnDef('huburi', 'text', + null, true), + new ColumnDef('verify_token', 'text', + null, true), + new ColumnDef('secret', 'text', + null, true), + new ColumnDef('sub_state', "enum('subscribe','active','unsubscribe','inactive')", + null, false), + new ColumnDef('sub_start', 'datetime', + null, true), + new ColumnDef('sub_end', 'datetime', + null, true), + new ColumnDef('last_update', 'datetime', + null, false), + new ColumnDef('created', 'datetime', + null, false), + new ColumnDef('modified', 'datetime', + null, false)); + } + + /** + * return key definitions for DB_DataObject + * + * DB_DataObject needs to know about keys that the table has; this function + * defines them. + * + * @return array key definitions + */ + + function keys() + { + return array_keys($this->keyTypes()); + } + + /** + * return key definitions for Memcached_DataObject + * + * Our caching system uses the same key definitions, but uses a different + * method to get them. + * + * @return array key definitions + */ + + function keyTypes() + { + return array('id' => 'K', 'uri' => 'U'); + } + + function sequenceKey() + { + return array('id', true, false); + } + + /** + * Fetch the StatusNet-side profile for this feed + * @return Profile + */ + public function localProfile() + { + if ($this->profile_id) { + return Profile::staticGet('id', $this->profile_id); + } + return null; + } + + /** + * Fetch the StatusNet-side profile for this feed + * @return Profile + */ + public function localGroup() + { + if ($this->group_id) { + return User_group::staticGet('id', $this->group_id); + } + return null; + } + + /** + * @param string $feeduri + * @return FeedSub + * @throws FeedSubException if feed is invalid or lacks PuSH setup + */ + public static function ensureFeed($feeduri) + { + $current = self::staticGet('uri', $feeduri); + if ($current) { + return $current; + } + + $discover = new FeedDiscovery(); + $discover->discoverFromFeedURL($feeduri); + + $huburi = $discover->getAtomLink('hub'); + if (!$huburi) { + throw new FeedSubNoHubException(); + } + + $feedsub = new FeedSub(); + $feedsub->uri = $feeduri; + $feedsub->huburi = $huburi; + $feedsub->sub_state = 'inactive'; + + $feedsub->created = common_sql_now(); + $feedsub->modified = common_sql_now(); + + $result = $feedsub->insert(); + if (empty($result)) { + throw new FeedDBException($feedsub); + } + + return $feedsub; + } + + /** + * Send a subscription request to the hub for this feed. + * The hub will later send us a confirmation POST to /main/push/callback. + * + * @return bool true on success, false on failure + * @throws ServerException if feed state is not valid + */ + public function subscribe($mode='subscribe') + { + if ($this->sub_state && $this->sub_state != 'inactive') { + throw new ServerException("Attempting to start PuSH subscription to feed in state $this->sub_state"); + } + if (empty($this->huburi)) { + if (common_config('feedsub', 'nohub')) { + // Fake it! We're just testing remote feeds w/o hubs. + return true; + } else { + throw new ServerException("Attempting to start PuSH subscription for feed with no hub"); + } + } + + return $this->doSubscribe('subscribe'); + } + + /** + * Send a PuSH unsubscription request to the hub for this feed. + * The hub will later send us a confirmation POST to /main/push/callback. + * + * @return bool true on success, false on failure + * @throws ServerException if feed state is not valid + */ + public function unsubscribe() { + if ($this->sub_state != 'active') { + throw new ServerException("Attempting to end PuSH subscription to feed in state $this->sub_state"); + } + if (empty($this->huburi)) { + if (common_config('feedsub', 'nohub')) { + // Fake it! We're just testing remote feeds w/o hubs. + return true; + } else { + throw new ServerException("Attempting to end PuSH subscription for feed with no hub"); + } + } + + return $this->doSubscribe('unsubscribe'); + } + + protected function doSubscribe($mode) + { + $orig = clone($this); + $this->verify_token = common_good_rand(16); + if ($mode == 'subscribe') { + $this->secret = common_good_rand(32); + } + $this->sub_state = $mode; + $this->update($orig); + unset($orig); + + try { + $callback = common_local_url('pushcallback', array('feed' => $this->id)); + $headers = array('Content-Type: application/x-www-form-urlencoded'); + $post = array('hub.mode' => $mode, + 'hub.callback' => $callback, + 'hub.verify' => 'async', + 'hub.verify_token' => $this->verify_token, + 'hub.secret' => $this->secret, + //'hub.lease_seconds' => 0, + 'hub.topic' => $this->uri); + $client = new HTTPClient(); + $response = $client->post($this->huburi, $headers, $post); + $status = $response->getStatus(); + if ($status == 202) { + common_log(LOG_INFO, __METHOD__ . ': sub req ok, awaiting verification callback'); + return true; + } else if ($status == 204) { + common_log(LOG_INFO, __METHOD__ . ': sub req ok and verified'); + return true; + } else if ($status >= 200 && $status < 300) { + common_log(LOG_ERR, __METHOD__ . ": sub req returned unexpected HTTP $status: " . $response->getBody()); + return false; + } else { + common_log(LOG_ERR, __METHOD__ . ": sub req failed with HTTP $status: " . $response->getBody()); + return false; + } + } catch (Exception $e) { + // wtf! + common_log(LOG_ERR, __METHOD__ . ": error \"{$e->getMessage()}\" hitting hub $this->huburi subscribing to $this->uri"); + + $orig = clone($this); + $this->verify_token = null; + $this->sub_state = null; + $this->update($orig); + unset($orig); + + return false; + } + } + + /** + * Save PuSH subscription confirmation. + * Sets approximate lease start and end times and finalizes state. + * + * @param int $lease_seconds provided hub.lease_seconds parameter, if given + */ + public function confirmSubscribe($lease_seconds=0) + { + $original = clone($this); + + $this->sub_state = 'active'; + $this->sub_start = common_sql_date(time()); + if ($lease_seconds > 0) { + $this->sub_end = common_sql_date(time() + $lease_seconds); + } else { + $this->sub_end = null; + } + $this->lastupdate = common_sql_now(); + + return $this->update($original); + } + + /** + * Save PuSH unsubscription confirmation. + * Wipes active PuSH sub info and resets state. + */ + public function confirmUnsubscribe() + { + $original = clone($this); + + // @fixme these should all be null, but DB_DataObject doesn't save null values...????? + $this->verify_token = ''; + $this->secret = ''; + $this->sub_state = ''; + $this->sub_start = ''; + $this->sub_end = ''; + $this->lastupdate = common_sql_now(); + + return $this->update($original); + } + + /** + * Accept updates from a PuSH feed. If validated, this object and the + * feed (as a DOMDocument) will be passed to the StartFeedSubHandleFeed + * and EndFeedSubHandleFeed events for processing. + * + * @param string $post source of Atom or RSS feed + * @param string $hmac X-Hub-Signature header, if present + */ + public function receive($post, $hmac) + { + common_log(LOG_INFO, __METHOD__ . ": packet for \"$this->uri\"! $hmac $post"); + + if ($this->sub_state != 'active') { + common_log(LOG_ERR, __METHOD__ . ": ignoring PuSH for inactive feed $this->uri (in state '$this->sub_state')"); + return; + } + + if ($post === '') { + common_log(LOG_ERR, __METHOD__ . ": ignoring empty post"); + return; + } + + if (!$this->validatePushSig($post, $hmac)) { + // Per spec we silently drop input with a bad sig, + // while reporting receipt to the server. + return; + } + + $feed = new DOMDocument(); + if (!$feed->loadXML($post)) { + // @fixme might help to include the err message + common_log(LOG_ERR, __METHOD__ . ": ignoring invalid XML"); + return; + } + + Event::handle('StartFeedSubReceive', array($this, $feed)); + Event::handle('EndFeedSubReceive', array($this, $feed)); + } + + /** + * Validate the given Atom chunk and HMAC signature against our + * shared secret that was set up at subscription time. + * + * If we don't have a shared secret, there should be no signature. + * If we we do, our the calculated HMAC should match theirs. + * + * @param string $post raw XML source as POSTed to us + * @param string $hmac X-Hub-Signature HTTP header value, or empty + * @return boolean true for a match + */ + protected function validatePushSig($post, $hmac) + { + if ($this->secret) { + if (preg_match('/^sha1=([0-9a-fA-F]{40})$/', $hmac, $matches)) { + $their_hmac = strtolower($matches[1]); + $our_hmac = hash_hmac('sha1', $post, $this->secret); + if ($their_hmac === $our_hmac) { + return true; + } + common_log(LOG_ERR, __METHOD__ . ": ignoring PuSH with bad SHA-1 HMAC: got $their_hmac, expected $our_hmac"); + } else { + common_log(LOG_ERR, __METHOD__ . ": ignoring PuSH with bogus HMAC '$hmac'"); + } + } else { + if (empty($hmac)) { + return true; + } else { + common_log(LOG_ERR, __METHOD__ . ": ignoring PuSH with unexpected HMAC '$hmac'"); + } + } + return false; + } + +} diff --git a/plugins/OStatus/classes/Ostatus_profile.php b/plugins/OStatus/classes/Ostatus_profile.php index 486417617..1ce8ac491 100644 --- a/plugins/OStatus/classes/Ostatus_profile.php +++ b/plugins/OStatus/classes/Ostatus_profile.php @@ -18,62 +18,24 @@ */ /** - * @package FeedSubPlugin + * @package OStatusPlugin * @maintainer Brion Vibber */ -/* -PuSH subscription flow: - - $profile->subscribe() - generate random verification token - save to verify_token - sends a sub request to the hub... - - main/push/callback - hub sends confirmation back to us via GET - We verify the request, then echo back the challenge. - On our end, we save the time we subscribed and the lease expiration - - main/push/callback - hub sends us updates via POST - -*/ - -class FeedDBException extends FeedSubException -{ - public $obj; - - function __construct($obj) - { - parent::__construct('Database insert failure'); - $this->obj = $obj; - } -} - class Ostatus_profile extends Memcached_DataObject { public $__table = 'ostatus_profile'; - public $id; + public $uri; + public $profile_id; public $group_id; public $feeduri; - public $homeuri; - - // PuSH subscription data - public $huburi; - public $secret; - public $verify_token; - public $sub_state; // subscribe, active, unsubscribe - public $sub_start; - public $sub_end; - public $salmonuri; public $created; - public $lastupdate; + public $modified; public /*static*/ function staticGet($k, $v=null) { @@ -91,56 +53,30 @@ class Ostatus_profile extends Memcached_DataObject function table() { - return array('id' => DB_DATAOBJECT_INT + DB_DATAOBJECT_NOTNULL, + return array('uri' => DB_DATAOBJECT_STR + DB_DATAOBJECT_NOTNULL, 'profile_id' => DB_DATAOBJECT_INT, 'group_id' => DB_DATAOBJECT_INT, 'feeduri' => DB_DATAOBJECT_STR + DB_DATAOBJECT_NOTNULL, - 'homeuri' => DB_DATAOBJECT_STR + DB_DATAOBJECT_NOTNULL, - 'huburi' => DB_DATAOBJECT_STR, - 'secret' => DB_DATAOBJECT_STR, - 'verify_token' => DB_DATAOBJECT_STR, - 'sub_state' => DB_DATAOBJECT_STR, - 'sub_start' => DB_DATAOBJECT_STR + DB_DATAOBJECT_DATE + DB_DATAOBJECT_TIME, - 'sub_end' => DB_DATAOBJECT_STR + DB_DATAOBJECT_DATE + DB_DATAOBJECT_TIME, 'salmonuri' => DB_DATAOBJECT_STR, 'created' => DB_DATAOBJECT_STR + DB_DATAOBJECT_DATE + DB_DATAOBJECT_TIME + DB_DATAOBJECT_NOTNULL, - 'lastupdate' => DB_DATAOBJECT_STR + DB_DATAOBJECT_DATE + DB_DATAOBJECT_TIME + DB_DATAOBJECT_NOTNULL); + 'modified' => DB_DATAOBJECT_STR + DB_DATAOBJECT_DATE + DB_DATAOBJECT_TIME + DB_DATAOBJECT_NOTNULL); } static function schemaDef() { - return array(new ColumnDef('id', 'integer', - /*size*/ null, - /*nullable*/ false, - /*key*/ 'PRI', - /*default*/ '0', - /*extra*/ null, - /*auto_increment*/ true), + return array(new ColumnDef('uri', 'varchar', + 255, false, 'PRI'), new ColumnDef('profile_id', 'integer', null, true, 'UNI'), new ColumnDef('group_id', 'integer', null, true, 'UNI'), new ColumnDef('feeduri', 'varchar', 255, false, 'UNI'), - new ColumnDef('homeuri', 'varchar', - 255, false), - new ColumnDef('huburi', 'text', - null, true), - new ColumnDef('verify_token', 'varchar', - 32, true), - new ColumnDef('secret', 'varchar', - 64, true), - new ColumnDef('sub_state', "enum('subscribe','active','unsubscribe')", - null, true), - new ColumnDef('sub_start', 'datetime', - null, true), - new ColumnDef('sub_end', 'datetime', - null, true), new ColumnDef('salmonuri', 'text', null, true), new ColumnDef('created', 'datetime', null, false), - new ColumnDef('lastupdate', 'datetime', + new ColumnDef('modified', 'datetime', null, false)); } @@ -169,12 +105,12 @@ class Ostatus_profile extends Memcached_DataObject function keyTypes() { - return array('id' => 'K', 'profile_id' => 'U', 'group_id' => 'U', 'feeduri' => 'U'); + return array('uri' => 'K', 'profile_id' => 'U', 'group_id' => 'U', 'feeduri' => 'U'); } function sequenceKey() { - return array('id', true, false); + return array(false, false, false); } /** @@ -201,101 +137,6 @@ class Ostatus_profile extends Memcached_DataObject return null; } - /** - * @param FeedMunger $munger - * @param boolean $isGroup is this a group record? - * @return Ostatus_profile - */ - public static function ensureProfile($munger) - { - $profile = $munger->ostatusProfile(); - - $current = self::staticGet('feeduri', $profile->feeduri); - if ($current) { - // @fixme we should probably update info as necessary - return $current; - } - - $profile->query('BEGIN'); - - try { - $local = $munger->profile(); - - if ($profile->isGroup()) { - $group = new User_group(); - $group->nickname = $local->nickname . '@remote'; // @fixme - $group->fullname = $local->fullname; - $group->homepage = $local->homepage; - $group->location = $local->location; - $group->created = $local->created; - $group->insert(); - if (empty($result)) { - throw new FeedDBException($group); - } - $profile->group_id = $group->id; - } else { - $result = $local->insert(); - if (empty($result)) { - throw new FeedDBException($local); - } - $profile->profile_id = $local->id; - } - - $profile->created = common_sql_now(); - $profile->lastupdate = common_sql_now(); - $result = $profile->insert(); - if (empty($result)) { - throw new FeedDBException($profile); - } - - $profile->query('COMMIT'); - } catch (FeedDBException $e) { - common_log_db_error($e->obj, 'INSERT', __FILE__); - $profile->query('ROLLBACK'); - return false; - } - - $avatar = $munger->getAvatar(); - if ($avatar) { - try { - $profile->updateAvatar($avatar); - } catch (Exception $e) { - common_log(LOG_ERR, "Exception setting OStatus avatar: " . - $e->getMessage()); - } - } - - return $profile; - } - - /** - * Download and update given avatar image - * @param string $url - * @throws Exception in various failure cases - */ - public function updateAvatar($url) - { - // @fixme this should be better encapsulated - // ripped from oauthstore.php (for old OMB client) - $temp_filename = tempnam(sys_get_temp_dir(), 'listener_avatar'); - copy($url, $temp_filename); - - // @fixme should we be using different ids? - $imagefile = new ImageFile($this->id, $temp_filename); - $filename = Avatar::filename($this->id, - image_type_to_extension($imagefile->type), - null, - common_timestamp()); - rename($temp_filename, Avatar::path($filename)); - if ($this->isGroup()) { - $group = $this->localGroup(); - $group->setOriginal($filename); - } else { - $profile = $this->localProfile(); - $profile->setOriginal($filename); - } - } - /** * Returns an XML string fragment with profile information as an * Activity Streams noun object with the given element type. @@ -345,7 +186,7 @@ class Ostatus_profile extends Memcached_DataObject $xs->element( 'id', null, - $this->homeuri); // ? + $this->uri); // ? $xs->element('title', null, $self->getBestName()); $xs->element( @@ -370,142 +211,95 @@ class Ostatus_profile extends Memcached_DataObject } /** - * Send a subscription request to the hub for this feed. - * The hub will later send us a confirmation POST to /main/push/callback. + * Subscribe a local user to this remote user. + * PuSH subscription will be started if necessary, and we'll + * send a Salmon notification to the remote server if available + * notifying them of the sub. * - * @return bool true on success, false on failure - * @throws ServerException if feed state is not valid + * @param User $user + * @return boolean success + * @throws FeedException */ - public function subscribe($mode='subscribe') + public function subscribeLocalToRemote(User $user) { - if ($this->sub_state != '') { - throw new ServerException("Attempting to start PuSH subscription to feed in state $this->sub_state"); + if ($this->isGroup()) { + throw new ServerException("Can't subscribe to a remote group"); } - if (empty($this->huburi)) { - if (common_config('feedsub', 'nohub')) { - // Fake it! We're just testing remote feeds w/o hubs. + + if ($this->subscribe()) { + if ($user->subscribeTo($this->localProfile())) { + $this->notify($user->getProfile(), ActivityVerb::FOLLOW, $this); return true; - } else { - throw new ServerException("Attempting to start PuSH subscription for feed with no hub"); } } - - return $this->doSubscribe('subscribe'); + return false; } /** - * Send a PuSH unsubscription request to the hub for this feed. - * The hub will later send us a confirmation POST to /main/push/callback. + * Mark this remote profile as subscribing to the given local user, + * and send appropriate notifications to the user. * - * @return bool true on success, false on failure - * @throws ServerException if feed state is not valid + * This will generally be in response to a subscription notification + * from a foreign site to our local Salmon response channel. + * + * @param User $user + * @return boolean success */ - public function unsubscribe() { - if ($this->sub_state != 'active') { - throw new ServerException("Attempting to end PuSH subscription to feed in state $this->sub_state"); - } - if (empty($this->huburi)) { - if (common_config('feedsub', 'nohub')) { - // Fake it! We're just testing remote feeds w/o hubs. - return true; - } else { - throw new ServerException("Attempting to end PuSH subscription for feed with no hub"); - } - } - - return $this->doSubscribe('unsubscribe'); - } - - protected function doSubscribe($mode) + public function subscribeRemoteToLocal(User $user) { - $orig = clone($this); - $this->verify_token = common_good_rand(16); - if ($mode == 'subscribe') { - $this->secret = common_good_rand(32); - } - $this->sub_state = $mode; - $this->update($orig); - unset($orig); - - try { - $callback = common_local_url('pushcallback', array('feed' => $this->id)); - $headers = array('Content-Type: application/x-www-form-urlencoded'); - $post = array('hub.mode' => $mode, - 'hub.callback' => $callback, - 'hub.verify' => 'async', - 'hub.verify_token' => $this->verify_token, - 'hub.secret' => $this->secret, - //'hub.lease_seconds' => 0, - 'hub.topic' => $this->feeduri); - $client = new HTTPClient(); - $response = $client->post($this->huburi, $headers, $post); - $status = $response->getStatus(); - if ($status == 202) { - common_log(LOG_INFO, __METHOD__ . ': sub req ok, awaiting verification callback'); - return true; - } else if ($status == 204) { - common_log(LOG_INFO, __METHOD__ . ': sub req ok and verified'); - return true; - } else if ($status >= 200 && $status < 300) { - common_log(LOG_ERR, __METHOD__ . ": sub req returned unexpected HTTP $status: " . $response->getBody()); - return false; - } else { - common_log(LOG_ERR, __METHOD__ . ": sub req failed with HTTP $status: " . $response->getBody()); - return false; - } - } catch (Exception $e) { - // wtf! - common_log(LOG_ERR, __METHOD__ . ": error \"{$e->getMessage()}\" hitting hub $this->huburi subscribing to $this->feeduri"); + if ($this->isGroup()) { + throw new ServerException("Remote groups can't subscribe to local users"); + } - $orig = clone($this); - $this->verify_token = null; - $this->sub_state = null; - $this->update($orig); - unset($orig); + // @fixme use regular channels for subbing, once they accept remote profiles + $sub = new Subscription(); + $sub->subscriber = $this->profile_id; + $sub->subscribed = $user->id; + $sub->created = common_sql_now(); // current time - return false; + if ($sub->insert()) { + // @fixme use subs_notify() if refactored to take profiles? + mail_subscribe_notify_profile($user, $this->localProfile()); + return true; } + return false; } /** - * Save PuSH subscription confirmation. - * Sets approximate lease start and end times and finalizes state. + * Send a subscription request to the hub for this feed. + * The hub will later send us a confirmation POST to /main/push/callback. * - * @param int $lease_seconds provided hub.lease_seconds parameter, if given + * @return bool true on success, false on failure + * @throws ServerException if feed state is not valid */ - public function confirmSubscribe($lease_seconds=0) + public function subscribe($mode='subscribe') { - $original = clone($this); - - $this->sub_state = 'active'; - $this->sub_start = common_sql_date(time()); - if ($lease_seconds > 0) { - $this->sub_end = common_sql_date(time() + $lease_seconds); - } else { - $this->sub_end = null; + $feedsub = FeedSub::ensureFeed($this->feeduri); + if ($feedsub->sub_state == 'active' || $feedsub->sub_state == 'subscribe') { + return true; + } else if ($feedsub->sub_state == '' || $feedsub->sub_state == 'inactive') { + return $feedsub->subscribe(); + } else if ('unsubscribe') { + throw new FeedSubException("Unsub is pending, can't subscribe..."); } - $this->lastupdate = common_sql_now(); - - return $this->update($original); } /** - * Save PuSH unsubscription confirmation. - * Wipes active PuSH sub info and resets state. + * Send a PuSH unsubscription request to the hub for this feed. + * The hub will later send us a confirmation POST to /main/push/callback. + * + * @return bool true on success, false on failure + * @throws ServerException if feed state is not valid */ - public function confirmUnsubscribe() - { - $original = clone($this); - - // @fixme these should all be null, but DB_DataObject doesn't save null values...????? - $this->verify_token = ''; - $this->secret = ''; - $this->sub_state = ''; - $this->sub_start = ''; - $this->sub_end = ''; - $this->lastupdate = common_sql_now(); - - return $this->update($original); + public function unsubscribe() { + $feedsub = FeedSub::staticGet('uri', $this->feeduri); + if ($feedsub->sub_state == 'active') { + return $feedsub->unsubscribe(); + } else if ($feedsub->sub_state == '' || $feedsub->sub_state == 'inactive' || $feedsub->sub_state == 'unsubscribe') { + return true; + } else if ($feedsub->sub_state == 'subscribe') { + throw new FeedSubException("Feed is awaiting subscription, can't unsub..."); + } } /** @@ -543,10 +337,7 @@ class Ostatus_profile extends Memcached_DataObject $entry->elementEnd('entry'); $feed = $this->atomFeed($actor); - #$feed->initFeed(); $feed->addEntry($entry); - #$feed->renderEntries(); - #$feed->endFeed(); $xml = $feed->getString(); common_log(LOG_INFO, "Posting to Salmon endpoint $salmon: $xml"); @@ -600,36 +391,10 @@ class Ostatus_profile extends Memcached_DataObject * Currently assumes that all items in the feed are new, * coming from a PuSH hub. * - * @param string $post source of Atom or RSS feed - * @param string $hmac X-Hub-Signature header, if present + * @param DOMDocument $feed */ - public function postUpdates($post, $hmac) + public function processFeed($feed) { - common_log(LOG_INFO, __METHOD__ . ": packet for \"$this->feeduri\"! $hmac $post"); - - if ($this->sub_state != 'active') { - common_log(LOG_ERR, __METHOD__ . ": ignoring PuSH for inactive feed $this->feeduri (in state '$this->sub_state')"); - return; - } - - if ($post === '') { - common_log(LOG_ERR, __METHOD__ . ": ignoring empty post"); - return; - } - - if (!$this->validatePushSig($post, $hmac)) { - // Per spec we silently drop input with a bad sig, - // while reporting receipt to the server. - return; - } - - $feed = new DOMDocument(); - if (!$feed->loadXML($post)) { - // @fixme might help to include the err message - common_log(LOG_ERR, __METHOD__ . ": ignoring invalid XML"); - return; - } - $entries = $feed->getElementsByTagNameNS(Activity::ATOM, 'entry'); if ($entries->length == 0) { common_log(LOG_ERR, __METHOD__ . ": no entries in feed update, ignoring"); @@ -642,40 +407,6 @@ class Ostatus_profile extends Memcached_DataObject } } - /** - * Validate the given Atom chunk and HMAC signature against our - * shared secret that was set up at subscription time. - * - * If we don't have a shared secret, there should be no signature. - * If we we do, our the calculated HMAC should match theirs. - * - * @param string $post raw XML source as POSTed to us - * @param string $hmac X-Hub-Signature HTTP header value, or empty - * @return boolean true for a match - */ - protected function validatePushSig($post, $hmac) - { - if ($this->secret) { - if (preg_match('/^sha1=([0-9a-fA-F]{40})$/', $hmac, $matches)) { - $their_hmac = strtolower($matches[1]); - $our_hmac = hash_hmac('sha1', $post, $this->secret); - if ($their_hmac === $our_hmac) { - return true; - } - common_log(LOG_ERR, __METHOD__ . ": ignoring PuSH with bad SHA-1 HMAC: got $their_hmac, expected $our_hmac"); - } else { - common_log(LOG_ERR, __METHOD__ . ": ignoring PuSH with bogus HMAC '$hmac'"); - } - } else { - if (empty($hmac)) { - return true; - } else { - common_log(LOG_ERR, __METHOD__ . ": ignoring PuSH with unexpected HMAC '$hmac'"); - } - } - return false; - } - /** * Process a posted entry from this feed source. * @@ -704,14 +435,14 @@ class Ostatus_profile extends Memcached_DataObject { if ($this->isGroup()) { // @fixme validate these profiles in some way! - $oprofile = $this->ensureActorProfile($activity); + $oprofile = self::ensureActorProfile($activity); } else { - $actorUri = $this->getActorProfileURI($activity); - if ($actorUri == $this->homeuri) { + $actorUri = self::getActorProfileURI($activity); + if ($actorUri == $this->uri) { // @fixme check if profile info has changed and update it } else { // @fixme drop or reject the messages once we've got the canonical profile URI recorded sanely - common_log(LOG_INFO, "OStatus: Warning: non-group post with unexpected author: $actorUri expected $this->homeuri"); + common_log(LOG_INFO, "OStatus: Warning: non-group post with unexpected author: $actorUri expected $this->uri"); //return; } $oprofile = $this; @@ -787,6 +518,65 @@ class Ostatus_profile extends Memcached_DataObject return false; } + /** + * @param string $profile_url + * @return Ostatus_profile + * @throws FeedSubException + */ + public static function ensureProfile($profile_uri) + { + // Get the canonical feed URI and check it + $discover = new FeedDiscovery(); + $feeduri = $discover->discoverFromURL($profile_uri); + + $feedsub = FeedSub::ensureFeed($feeduri, $discover->feed); + $huburi = $discover->getAtomLink('hub'); + $salmonuri = $discover->getAtomLink('salmon'); + + if (!$huburi) { + // We can only deal with folks with a PuSH hub + throw new FeedSubNoHubException(); + } + + // Ok this is going to be a terrible hack! + // Won't be suitable for groups, empty feeds, or getting + // info that's only available on the profile page. + $entries = $discover->feed->getElementsByTagNameNS(Activity::ATOM, 'entry'); + if (!$entries || $entries->length == 0) { + throw new FeedSubException('empty feed'); + } + $first = new Activity($entries->item(0), $discover->feed); + return self::ensureActorProfile($first, $feeduri); + } + + /** + * Download and update given avatar image + * @param string $url + * @throws Exception in various failure cases + */ + protected function updateAvatar($url) + { + // @fixme this should be better encapsulated + // ripped from oauthstore.php (for old OMB client) + $temp_filename = tempnam(sys_get_temp_dir(), 'listener_avatar'); + copy($url, $temp_filename); + + // @fixme should we be using different ids? + $imagefile = new ImageFile($this->id, $temp_filename); + $filename = Avatar::filename($this->id, + image_type_to_extension($imagefile->type), + null, + common_timestamp()); + rename($temp_filename, Avatar::path($filename)); + if ($this->isGroup()) { + $group = $this->localGroup(); + $group->setOriginal($filename); + } else { + $profile = $this->localProfile(); + $profile->setOriginal($filename); + } + } + /** * Get an appropriate avatar image source URL, if available. * @@ -794,7 +584,7 @@ class Ostatus_profile extends Memcached_DataObject * @param DOMElement $feed * @return string */ - function getAvatar($actor, $feed) + protected static function getAvatar($actor, $feed) { $url = ''; $icon = ''; @@ -833,13 +623,18 @@ class Ostatus_profile extends Memcached_DataObject } /** - * @fixme move off of ostatus_profile or static? + * Fetch, or build if necessary, an Ostatus_profile for the actor + * in a given Activity Streams activity. + * + * @param Activity $activity + * @param string $feeduri if we already know the canonical feed URI! + * @return Ostatus_profile */ - function ensureActorProfile($activity) + public static function ensureActorProfile($activity, $feeduri=null) { - $profile = $this->getActorProfile($activity); + $profile = self::getActorProfile($activity); if (!$profile) { - $profile = $this->createActorProfile($activity); + $profile = self::createActorProfile($activity, $feeduri); } return $profile; } @@ -848,10 +643,10 @@ class Ostatus_profile extends Memcached_DataObject * @param Activity $activity * @return mixed matching Ostatus_profile or false if none known */ - function getActorProfile($activity) + protected static function getActorProfile($activity) { - $homeuri = $this->getActorProfileURI($activity); - return Ostatus_profile::staticGet('homeuri', $homeuri); + $homeuri = self::getActorProfileURI($activity); + return self::staticGet('uri', $homeuri); } /** @@ -859,7 +654,7 @@ class Ostatus_profile extends Memcached_DataObject * @return string * @throws ServerException */ - function getActorProfileURI($activity) + protected static function getActorProfileURI($activity) { $opts = array('allowed_schemes' => array('http', 'https')); $actor = $activity->actor; @@ -873,14 +668,19 @@ class Ostatus_profile extends Memcached_DataObject } /** - * + * @fixme validate stuff somewhere */ - function createActorProfile($activity) + protected static function createActorProfile($activity, $feeduri=null) { - $actor = $activity->actor(); - $homeuri = $this->getActivityProfileURI($activity); - $nickname = $this->getAuthorNick($activity); - $avatar = $this->getAvatar($actor, $feed); + $actor = $activity->actor; + $homeuri = self::getActorProfileURI($activity); + $nickname = self::getAuthorNick($activity); + $avatar = self::getAvatar($actor, $feed); + + if (!$homeuri) { + common_log(LOG_DEBUG, __METHOD__ . " empty actor profile URI: " . var_export($activity, true)); + throw new ServerException("No profile URI"); + } $profile = new Profile(); $profile->nickname = $nickname; @@ -894,9 +694,7 @@ class Ostatus_profile extends Memcached_DataObject // @todo lat/lon/location? $ok = $profile->insert(); - if ($ok) { - $this->updateAvatar($profile, $avatar); - } else { + if (!$ok) { throw new ServerException("Can't save local profile"); } @@ -904,11 +702,15 @@ class Ostatus_profile extends Memcached_DataObject // or need to split out some of the feed stuff // so we can leave it empty until later. $oprofile = new Ostatus_profile(); - $oprofile->homeuri = $homeuri; + $oprofile->uri = $homeuri; + if ($feeduri) { + $oprofile->feeduri = $feeduri; + } $oprofile->profile_id = $profile->id; $ok = $oprofile->insert(); if ($ok) { + $oprofile->updateAvatar($avatar); return $oprofile; } else { throw new ServerException("Can't save OStatus profile"); @@ -920,13 +722,13 @@ class Ostatus_profile extends Memcached_DataObject * @param Activity $activity * @return string */ - function getAuthorNick($activity) + protected static function getAuthorNick($activity) { // @fixme not technically part of the actor? foreach (array($activity->entry, $activity->feed) as $source) { - $author = ActivityUtil::child($source, 'author', Activity::ATOM); + $author = ActivityUtils::child($source, 'author', Activity::ATOM); if ($author) { - $name = ActivityUtil::child($author, 'name', Activity::ATOM); + $name = ActivityUtils::child($author, 'name', Activity::ATOM); if ($name) { return trim($name->textContent); } diff --git a/plugins/OStatus/lib/feeddiscovery.php b/plugins/OStatus/lib/feeddiscovery.php index 39985fc90..7afb71bdc 100644 --- a/plugins/OStatus/lib/feeddiscovery.php +++ b/plugins/OStatus/lib/feeddiscovery.php @@ -48,6 +48,14 @@ class FeedSubNoFeedException extends FeedSubException { } +class FeedSubBadXmlException extends FeedSubException +{ +} + +class FeedSubNoHubException extends FeedSubException +{ +} + /** * Given a web page or feed URL, discover the final location of the feed * and return its current contents. @@ -57,21 +65,25 @@ class FeedSubNoFeedException extends FeedSubException * if ($feed->discoverFromURL($url)) { * print $feed->uri; * print $feed->type; - * processFeed($feed->body); + * processFeed($feed->feed); // DOMDocument * } */ class FeedDiscovery { public $uri; public $type; - public $body; + public $feed; + /** Post-initialize query helper... */ + public function getLink($rel, $type=null) + { + // @fixme check for non-Atom links in RSS2 feeds as well + return self::getAtomLink($rel, $type); + } - public function feedMunger() + public function getAtomLink($rel, $type=null) { - require_once 'XML/Feed/Parser.php'; - $feed = new XML_Feed_Parser($this->body, false, false, true); // @fixme - return new FeedMunger($feed, $this->uri); + return ActivityUtils::getLink($this->feed->documentElement, $rel, $type); } /** @@ -90,6 +102,7 @@ class FeedDiscovery $client = new HTTPClient(); $response = $client->get($url); } catch (HTTP_Request2_Exception $e) { + common_log(LOG_ERR, __METHOD__ . " Failure for $url - " . $e->getMessage()); throw new FeedSubBadURLException($e); } @@ -107,7 +120,12 @@ class FeedDiscovery return $this->initFromResponse($response); } - + + function discoverFromFeedURL($url) + { + return $this->discoverFromURL($url, false); + } + function initFromResponse($response) { if (!$response->isOk()) { @@ -122,16 +140,26 @@ class FeedDiscovery $type = $response->getHeader('Content-Type'); if (preg_match('!^(text/xml|application/xml|application/(rss|atom)\+xml)!i', $type)) { - $this->uri = $sourceurl; - $this->type = $type; - $this->body = $body; - return true; + return $this->init($sourceurl, $type, $body); } else { common_log(LOG_WARNING, "Unrecognized feed type $type for $sourceurl"); throw new FeedSubUnrecognizedTypeException($type); } } + function init($sourceurl, $type, $body) + { + $feed = new DOMDocument(); + if ($feed->loadXML($body)) { + $this->uri = $sourceurl; + $this->type = $type; + $this->feed = $feed; + return $this->uri; + } else { + throw new FeedSubBadXmlException($url); + } + } + /** * @param string $url source URL, used to resolve relative links * @param string $body HTML body text diff --git a/plugins/OStatus/lib/feedmunger.php b/plugins/OStatus/lib/feedmunger.php deleted file mode 100644 index e8c46de90..000000000 --- a/plugins/OStatus/lib/feedmunger.php +++ /dev/null @@ -1,350 +0,0 @@ -. - */ - -/** - * @package FeedSubPlugin - * @maintainer Brion Vibber - */ - -if (!defined('STATUSNET') && !defined('LACONICA')) { exit(1); } - -class FeedSubPreviewNotice extends Notice -{ - protected $fetched = true; - - function __construct($profile) - { - $this->profile = $profile; - $this->profile_id = 0; - } - - function getProfile() - { - return $this->profile; - } - - function find() - { - return true; - } - - function fetch() - { - $got = $this->fetched; - $this->fetched = false; - return $got; - } -} - -class FeedSubPreviewProfile extends Profile -{ - function getAvatar($width, $height=null) - { - return new FeedSubPreviewAvatar($width, $height, $this->avatar); - } -} - -class FeedSubPreviewAvatar extends Avatar -{ - function __construct($width, $height, $remote) - { - $this->remoteImage = $remote; - } - - function displayUrl() { - return $this->remoteImage; - } -} - -class FeedMunger -{ - /** - * @param XML_Feed_Parser $feed - */ - function __construct($feed, $url=null) - { - $this->feed = $feed; - $this->url = $url; - } - - function ostatusProfile() - { - $profile = new Ostatus_profile(); - $profile->feeduri = $this->url; - $profile->homeuri = $this->feed->link; - $profile->huburi = $this->getHubLink(); - $salmon = $this->getSalmonLink(); - if ($salmon) { - $profile->salmonuri = $salmon; - } - return $profile; - } - - function getAtomLink($item, $attribs=array()) - { - // XML_Feed_Parser gets confused by multiple elements. - $dom = $item->model; - - // Note that RSS feeds would embed an so this should work for both. - /// http://code.google.com/p/pubsubhubbub/wiki/RssFeeds - // - $links = $dom->getElementsByTagNameNS('http://www.w3.org/2005/Atom', 'link'); - for ($i = 0; $i < $links->length; $i++) { - $node = $links->item($i); - if ($node->hasAttributes()) { - $href = $node->attributes->getNamedItem('href'); - if ($href) { - $matches = 0; - foreach ($attribs as $name => $val) { - $attrib = $node->attributes->getNamedItem($name); - if ($attrib && $attrib->value == $val) { - $matches++; - } - } - if ($matches == count($attribs)) { - return $href->value; - } - } - } - } - return false; - } - - function getRssLink($item) - { - // XML_Feed_Parser gets confused by multiple elements. - $dom = $item->model; - - // Note that RSS feeds would embed an so this should work for both. - /// http://code.google.com/p/pubsubhubbub/wiki/RssFeeds - // - $links = $dom->getElementsByTagName('link'); - for ($i = 0; $i < $links->length; $i++) { - $node = $links->item($i); - if (!$node->hasAttributes()) { - return $node->textContent; - } - } - return false; - } - - function getAltLink($item) - { - // Check for an atom link... - $link = $this->getAtomLink($item, array('rel' => 'alternate', 'type' => 'text/html')); - if (!$link) { - $link = $this->getRssLink($item); - } - return $link; - } - - function getHubLink() - { - return $this->getAtomLink($this->feed, array('rel' => 'hub')); - } - - function getSalmonLink() - { - return $this->getAtomLink($this->feed, array('rel' => 'salmon')); - } - - function getSelfLink() - { - return $this->getAtomLink($this->feed, array('rel' => 'self')); - } - - /** - * Get an appropriate avatar image source URL, if available. - * @return mixed string or false - */ - function getAvatar() - { - $logo = $this->feed->logo; - if ($logo) { - return $logo; - } - $icon = $this->feed->icon; - if ($icon) { - return $icon; - } - return common_path('plugins/OStatus/images/48px-Feed-icon.svg.png'); - } - - function profile($preview=false) - { - if ($preview) { - $profile = new FeedSubPreviewProfile(); - } else { - $profile = new Profile(); - } - - // @todo validate/normalize nick? - $profile->nickname = $this->feed->title; - $profile->fullname = $this->feed->title; - $profile->homepage = $this->getAltLink($this->feed); - $profile->bio = $this->feed->description; - $profile->profileurl = $this->getAltLink($this->feed); - - if ($preview) { - $profile->avatar = $this->getAvatar(); - } - - // @todo tags from categories - // @todo lat/lon/location? - - return $profile; - } - - function notice($index=1, $preview=false) - { - $entry = $this->feed->getEntryByOffset($index); - if (!$entry) { - return null; - } - - if ($preview) { - $notice = new FeedSubPreviewNotice($this->profile(true)); - $notice->id = -1; - } else { - $notice = new Notice(); - $notice->profile_id = $this->profileIdForEntry($index); - } - - $link = $this->getAltLink($entry); - if (empty($link)) { - if (preg_match('!^https?://!', $entry->id)) { - $link = $entry->id; - common_log(LOG_DEBUG, "No link on entry, using URL from id: $link"); - } - } - $notice->uri = $link; - $notice->url = $link; - $notice->content = $this->noticeFromEntry($entry); - $notice->rendered = common_render_content($notice->content, $notice); // @fixme this is failing on group posts - $notice->created = common_sql_date($entry->updated); // @fixme - $notice->is_local = Notice::GATEWAY; - $notice->source = 'feed'; - - $location = $this->getLocation($entry); - if ($location) { - if ($location->location_id) { - $notice->location_ns = $location->location_ns; - $notice->location_id = $location->location_id; - } - $notice->lat = $location->lat; - $notice->lon = $location->lon; - } - - return $notice; - } - - function profileIdForEntry($index=1) - { - // hack hack hack - // should get profile for this entry's author... - $feeduri = $this->getSelfLink(); - $remote = Ostatus_profile::staticGet('feeduri', $feeduri); - if ($remote) { - return $remote->profile_id; - } else { - throw new Exception("Can't find feed profile for $feeduri"); - } - } - - /** - * Parse location given as a GeoRSS-simple point, if provided. - * http://www.georss.org/simple - * - * @param feed item $entry - * @return mixed Location or false - */ - function getLocation($entry) - { - $dom = $entry->model; - $points = $dom->getElementsByTagNameNS('http://www.georss.org/georss', 'point'); - - for ($i = 0; $i < $points->length; $i++) { - $point = $points->item(0)->textContent; - $point = str_replace(',', ' ', $point); // per spec "treat commas as whitespace" - $point = preg_replace('/\s+/', ' ', $point); - $point = trim($point); - $coords = explode(' ', $point); - if (count($coords) == 2) { - list($lat, $lon) = $coords; - if (is_numeric($lat) && is_numeric($lon)) { - common_log(LOG_INFO, "Looking up location for $lat $lon from georss"); - return Location::fromLatLon($lat, $lon); - } - } - common_log(LOG_ERR, "Ignoring bogus georss:point value $point"); - } - - return false; - } - - /** - * @param XML_Feed_Type $entry - * @return string notice text, within post size limit - */ - function noticeFromEntry($entry) - { - $max = Notice::maxContent(); - $ellipsis = "\xe2\x80\xa6"; // U+2026 HORIZONTAL ELLIPSIS - $title = $entry->title; - $link = $entry->link; - - // @todo We can get entries like this: - // $cats = $entry->getCategory('category', array(0, true)); - // but it feels like an awful hack. If it's accessible cleanly, - // try adding #hashtags from the categories/tags on a post. - - $title = $entry->title; - $link = $this->getAltLink($entry); - if ($link) { - // Blog post or such... - // @todo Should we force a language here? - $format = _m('New post: "%1$s" %2$s'); - $out = sprintf($format, $title, $link); - - // Trim link if needed... - if (mb_strlen($out) > $max) { - $link = common_shorten_url($link); - $out = sprintf($format, $title, $link); - } - - // Trim title if needed... - if (mb_strlen($out) > $max) { - $used = mb_strlen($out) - mb_strlen($title); - $available = $max - $used - mb_strlen($ellipsis); - $title = mb_substr($title, 0, $available) . $ellipsis; - $out = sprintf($format, $title, $link); - } - } else { - // No link? Consider a bare status update. - if (mb_strlen($title) > $max) { - $available = $max - mb_strlen($ellipsis); - $out = mb_substr($title, 0, $available) . $ellipsis; - } else { - $out = $title; - } - } - - return $out; - } -} -- cgit v1.2.3-54-g00ecf From 5349aa420ed45c2f5bf773d10c7709826ae6babd Mon Sep 17 00:00:00 2001 From: Brion Vibber Date: Sun, 21 Feb 2010 13:40:59 -0800 Subject: OStatus feedsub fixlets: - actually udpate feedsub.last_update when we get a new PuSH update in - move incoming PuSH processing to a queue handler to minimize time spent before POST return, as recommended by PuSH spec. When queues are disabled this'll still be handled immediately. --- plugins/OStatus/OStatusPlugin.php | 4 +++ plugins/OStatus/actions/pushcallback.php | 9 +++-- plugins/OStatus/classes/FeedSub.php | 10 ++++++ plugins/OStatus/lib/pushinputqueuehandler.php | 49 +++++++++++++++++++++++++++ 4 files changed, 70 insertions(+), 2 deletions(-) create mode 100644 plugins/OStatus/lib/pushinputqueuehandler.php (limited to 'plugins/OStatus/actions/pushcallback.php') diff --git a/plugins/OStatus/OStatusPlugin.php b/plugins/OStatus/OStatusPlugin.php index b966661db..c5a2db3d8 100644 --- a/plugins/OStatus/OStatusPlugin.php +++ b/plugins/OStatus/OStatusPlugin.php @@ -78,9 +78,13 @@ class OStatusPlugin extends Plugin */ function onEndInitializeQueueManager(QueueManager $qm) { + // Outgoing from our internal PuSH hub $qm->connect('hubverify', 'HubVerifyQueueHandler'); $qm->connect('hubdistrib', 'HubDistribQueueHandler'); $qm->connect('hubout', 'HubOutQueueHandler'); + + // Incoming from a foreign PuSH hub + $qm->connect('pushinput', 'PushInputQueueHandler'); return true; } diff --git a/plugins/OStatus/actions/pushcallback.php b/plugins/OStatus/actions/pushcallback.php index 7e1227a66..9e976a80d 100644 --- a/plugins/OStatus/actions/pushcallback.php +++ b/plugins/OStatus/actions/pushcallback.php @@ -60,9 +60,14 @@ class PushCallbackAction extends Action $post = file_get_contents('php://input'); - // @fixme Queue this to a background process; we should return + // Queue this to a background process; we should return // as quickly as possible from a distribution POST. - $feedsub->receive($post, $hmac); + // If queues are disabled this'll process immediately. + $data = array('feedsub_id' => $feedsub->id, + 'post' => $post, + 'hmac' => $hmac); + $qm = QueueManager::get(); + $qm->enqueue($data, 'pushinput'); } /** diff --git a/plugins/OStatus/classes/FeedSub.php b/plugins/OStatus/classes/FeedSub.php index bf9d063fa..31241d3de 100644 --- a/plugins/OStatus/classes/FeedSub.php +++ b/plugins/OStatus/classes/FeedSub.php @@ -372,6 +372,12 @@ class FeedSub extends Memcached_DataObject * feed (as a DOMDocument) will be passed to the StartFeedSubHandleFeed * and EndFeedSubHandleFeed events for processing. * + * Not guaranteed to be running in an immediate POST context; may be run + * from a queue handler. + * + * Side effects: the feedsub record's lastupdate field will be updated + * to the current time (not published time) if we got a legit update. + * * @param string $post source of Atom or RSS feed * @param string $hmac X-Hub-Signature header, if present */ @@ -402,6 +408,10 @@ class FeedSub extends Memcached_DataObject return; } + $orig = clone($this); + $this->last_update = common_sql_now(); + $this->update($orig); + Event::handle('StartFeedSubReceive', array($this, $feed)); Event::handle('EndFeedSubReceive', array($this, $feed)); } diff --git a/plugins/OStatus/lib/pushinputqueuehandler.php b/plugins/OStatus/lib/pushinputqueuehandler.php new file mode 100644 index 000000000..cbd9139b5 --- /dev/null +++ b/plugins/OStatus/lib/pushinputqueuehandler.php @@ -0,0 +1,49 @@ +. + */ + +/** + * Process a feed distribution POST from a PuSH hub. + * @package FeedSub + * @author Brion Vibber + */ + +class PushInputQueueHandler extends QueueHandler +{ + function transport() + { + return 'pushinput'; + } + + function handle($data) + { + assert(is_array($data)); + + $feedsub_id = $data['feedsub_id']; + $post = $data['post']; + $hmac = $data['hmac']; + + $feedsub = FeedSub::staticGet('id', $feedsub_id); + if ($feedsub) { + $feedsub->receive($post, $hmac); + } else { + common_log(LOG_ERR, "Discarding POST to unknown feed subscription id $feedsub_id"); + } + return true; + } +} -- cgit v1.2.3-54-g00ecf From aa0b2ce81ad4a99fb55a36feda54e70bcd0808be Mon Sep 17 00:00:00 2001 From: Brion Vibber Date: Sun, 21 Feb 2010 14:28:06 -0800 Subject: OStatus PuSH fixlets: - set minimal error page output on PuSH callback - allow hub to retry ($config['ostatus']['hub_retries']), simplify internal iface a bit. Retries are pushed to end of queue but otherwise not delayed yet; makes delivery more robust to one-off transitory errors but not yet against downtime. --- plugins/OStatus/actions/pushcallback.php | 1 + plugins/OStatus/classes/HubSub.php | 47 +++++++++++++++++--------- plugins/OStatus/lib/hubdistribqueuehandler.php | 5 +-- plugins/OStatus/lib/huboutqueuehandler.php | 18 +++++++--- 4 files changed, 46 insertions(+), 25 deletions(-) (limited to 'plugins/OStatus/actions/pushcallback.php') diff --git a/plugins/OStatus/actions/pushcallback.php b/plugins/OStatus/actions/pushcallback.php index 9e976a80d..35c92c732 100644 --- a/plugins/OStatus/actions/pushcallback.php +++ b/plugins/OStatus/actions/pushcallback.php @@ -29,6 +29,7 @@ class PushCallbackAction extends Action { function handle() { + StatusNet::setApi(true); // Minimize error messages to aid in debugging parent::handle(); if ($_SERVER['REQUEST_METHOD'] == 'POST') { $this->handlePost(); diff --git a/plugins/OStatus/classes/HubSub.php b/plugins/OStatus/classes/HubSub.php index 0cd4281f8..a81de68e6 100644 --- a/plugins/OStatus/classes/HubSub.php +++ b/plugins/OStatus/classes/HubSub.php @@ -226,6 +226,26 @@ class HubSub extends Memcached_DataObject return parent::insert(); } + /** + * Schedule delivery of a 'fat ping' to the subscriber's callback + * endpoint. If queues are disabled, this will run immediately. + * + * @param string $atom well-formed Atom feed + * @param int $retries optional count of retries if POST fails; defaults to hub_retries from config or 0 if unset + */ + function distribute($atom, $retries=null) + { + if ($retries === null) { + $retries = intval(common_config('ostatus', 'hub_retries')); + } + + $data = array('sub' => clone($this), + 'atom' => $atom, + 'retries' => $retries); + $qm = QueueManager::get(); + $qm->enqueue($data, 'hubout'); + } + /** * Send a 'fat ping' to the subscriber's callback endpoint * containing the given Atom feed chunk. @@ -234,6 +254,7 @@ class HubSub extends Memcached_DataObject * a higher level; don't just shove in a complete feed! * * @param string $atom well-formed Atom feed + * @throws Exception (HTTP or general) */ function push($atom) { @@ -245,24 +266,18 @@ class HubSub extends Memcached_DataObject $hmac = '(none)'; } common_log(LOG_INFO, "About to push feed to $this->callback for $this->topic, HMAC $hmac"); - try { - $request = new HTTPClient(); - $request->setBody($atom); - $response = $request->post($this->callback, $headers); - if ($response->isOk()) { - return true; - } - common_log(LOG_ERR, "Error sending PuSH content " . - "to $this->callback for $this->topic: " . - $response->getStatus()); - return false; + $request = new HTTPClient(); + $request->setBody($atom); + $response = $request->post($this->callback, $headers); - } catch (Exception $e) { - common_log(LOG_ERR, "Error sending PuSH content " . - "to $this->callback for $this->topic: " . - $e->getMessage()); - return false; + if ($response->isOk()) { + return true; + } else { + throw new Exception("Callback returned status: " . + $response->getStatus() . + "; body: " . + trim($response->getBody())); } } } diff --git a/plugins/OStatus/lib/hubdistribqueuehandler.php b/plugins/OStatus/lib/hubdistribqueuehandler.php index 245a57f72..30a427e3f 100644 --- a/plugins/OStatus/lib/hubdistribqueuehandler.php +++ b/plugins/OStatus/lib/hubdistribqueuehandler.php @@ -124,10 +124,7 @@ class HubDistribQueueHandler extends QueueHandler common_log(LOG_INFO, "Preparing $sub->N PuSH distribution(s) for $sub->topic"); $qm = QueueManager::get(); while ($sub->fetch()) { - common_log(LOG_INFO, "Prepping PuSH distribution to $sub->callback for $sub->topic"); - $data = array('sub' => clone($sub), - 'atom' => $atom); - $qm->enqueue($data, 'hubout'); + $sub->distribute($atom); } } diff --git a/plugins/OStatus/lib/huboutqueuehandler.php b/plugins/OStatus/lib/huboutqueuehandler.php index 0791c7e5d..3ad94646e 100644 --- a/plugins/OStatus/lib/huboutqueuehandler.php +++ b/plugins/OStatus/lib/huboutqueuehandler.php @@ -33,6 +33,7 @@ class HubOutQueueHandler extends QueueHandler { $sub = $data['sub']; $atom = $data['atom']; + $retries = $data['retries']; assert($sub instanceof HubSub); assert(is_string($atom)); @@ -40,13 +41,20 @@ class HubOutQueueHandler extends QueueHandler try { $sub->push($atom); } catch (Exception $e) { - common_log(LOG_ERR, "Failed PuSH to $sub->callback for $sub->topic: " . - $e->getMessage()); - // @fixme Reschedule a later delivery? - return true; + $retries--; + $msg = "Failed PuSH to $sub->callback for $sub->topic: " . + $e->getMessage(); + if ($retries > 0) { + common_log(LOG_ERR, "$msg; scheduling for $retries more tries"); + + // @fixme when we have infrastructure to schedule a retry + // after a delay, use it. + $sub->distribute($atom, $retries); + } else { + common_log(LOG_ERR, "$msg; discarding"); + } } return true; } } - -- cgit v1.2.3-54-g00ecf From 78ca45c7a05dea911c58097a8c57be470dafee01 Mon Sep 17 00:00:00 2001 From: Brion Vibber Date: Sun, 21 Feb 2010 14:46:26 -0800 Subject: OStatus PuSH fixes: - hub now defers subscription state updates until after verification, per spec - hub now supports synchronous verification when requested (if async is not requested after) - client now requests synchronous verification (it's a bit safer) - cleanup on subscription logging/error responses --- plugins/OStatus/actions/pushcallback.php | 34 +++++--- plugins/OStatus/actions/pushhub.php | 141 ++++++++++++++++++------------- plugins/OStatus/classes/FeedSub.php | 11 ++- plugins/OStatus/classes/HubSub.php | 123 ++++++++++++++++----------- 4 files changed, 177 insertions(+), 132 deletions(-) (limited to 'plugins/OStatus/actions/pushcallback.php') diff --git a/plugins/OStatus/actions/pushcallback.php b/plugins/OStatus/actions/pushcallback.php index 35c92c732..4184f0e0c 100644 --- a/plugins/OStatus/actions/pushcallback.php +++ b/plugins/OStatus/actions/pushcallback.php @@ -72,7 +72,7 @@ class PushCallbackAction extends Action } /** - * Handler for GET verification requests from the hub + * Handler for GET verification requests from the hub. */ function handleGet() { @@ -81,31 +81,37 @@ class PushCallbackAction extends Action $challenge = $this->arg('hub_challenge'); $lease_seconds = $this->arg('hub_lease_seconds'); $verify_token = $this->arg('hub_verify_token'); - + if ($mode != 'subscribe' && $mode != 'unsubscribe') { - common_log(LOG_WARNING, __METHOD__ . ": bogus hub callback with mode \"$mode\""); - throw new ServerException("Bogus hub callback: bad mode", 404); + throw new ClientException("Bad hub.mode $mode", 404); } - + $feedsub = FeedSub::staticGet('uri', $topic); if (!$feedsub) { - common_log(LOG_WARNING, __METHOD__ . ": bogus hub callback for unknown feed $topic"); - throw new ServerException("Bogus hub callback: unknown feed", 404); + throw new ClientException("Bad hub.topic feed $topic", 404); } if ($feedsub->verify_token !== $verify_token) { - common_log(LOG_WARNING, __METHOD__ . ": bogus hub callback with bad token \"$verify_token\" for feed $topic"); - throw new ServerException("Bogus hub callback: bad token", 404); + throw new ClientException("Bad hub.verify_token $token for $topic", 404); } - if ($mode != $feedsub->sub_state) { - common_log(LOG_WARNING, __METHOD__ . ": bogus hub callback with bad mode \"$mode\" for feed $topic in state \"{$feedsub->sub_state}\""); - throw new ServerException("Bogus hub callback: mode doesn't match subscription state.", 404); + if ($mode == 'subscribe') { + // We may get re-sub requests legitimately. + if ($feedsub->sub_state != 'subscribe' && $feedsub->sub_state != 'active') { + throw new ClientException("Unexpected subscribe request for $topic.", 404); + } + } else { + if ($feedsub->sub_state != 'unsubscribe') { + throw new ClientException("Unexpected unsubscribe request for $topic.", 404); + } } - // OK! if ($mode == 'subscribe') { - common_log(LOG_INFO, __METHOD__ . ': sub confirmed'); + if ($feedsub->sub_state == 'active') { + common_log(LOG_INFO, __METHOD__ . ': sub update confirmed'); + } else { + common_log(LOG_INFO, __METHOD__ . ': sub confirmed'); + } $feedsub->confirmSubscribe($lease_seconds); } else { common_log(LOG_INFO, __METHOD__ . ": unsub confirmed; deleting sub record for $topic"); diff --git a/plugins/OStatus/actions/pushhub.php b/plugins/OStatus/actions/pushhub.php index 19599d815..f33690bc4 100644 --- a/plugins/OStatus/actions/pushhub.php +++ b/plugins/OStatus/actions/pushhub.php @@ -59,102 +59,121 @@ class PushHubAction extends Action $mode = $this->trimmed('hub.mode'); switch ($mode) { case "subscribe": - $this->subscribe(); - break; case "unsubscribe": - $this->unsubscribe(); + $this->subunsub($mode); break; case "publish": - throw new ServerException("Publishing outside feeds not supported.", 400); + throw new ClientException("Publishing outside feeds not supported.", 400); default: - throw new ServerException("Unrecognized mode '$mode'.", 400); + throw new ClientException("Unrecognized mode '$mode'.", 400); } } /** - * Process a PuSH feed subscription request. + * Process a request for a new or modified PuSH feed subscription. + * If asynchronous verification is requested, updates won't be saved immediately. * * HTTP return codes: * 202 Accepted - request saved and awaiting verification * 204 No Content - already subscribed - * 403 Forbidden - rejecting this (not specifically spec'd) + * 400 Bad Request - rejecting this (not specifically spec'd) */ - function subscribe() + function subunsub($mode) { - $feed = $this->argUrl('hub.topic'); $callback = $this->argUrl('hub.callback'); - $token = $this->arg('hub.verify_token', null); - common_log(LOG_DEBUG, __METHOD__ . ": checking sub'd to $feed $callback"); - if ($this->getSub($feed, $callback)) { - // Already subscribed; return 204 per spec. - header('HTTP/1.1 204 No Content'); - common_log(LOG_DEBUG, __METHOD__ . ': already subscribed'); - return; + $topic = $this->argUrl('hub.topic'); + if (!$this->recognizedFeed($topic)) { + throw new ClientException("Unsupported hub.topic $topic; this hub only serves local user and group Atom feeds."); } - common_log(LOG_DEBUG, __METHOD__ . ': setting up'); - $sub = new HubSub(); - $sub->topic = $feed; - $sub->callback = $callback; - $sub->secret = $this->arg('hub.secret', null); - if (strlen($sub->secret) > 200) { - throw new ClientException("hub.secret must be no longer than 200 chars", 400); + $verify = $this->arg('hub.verify'); // @fixme may be multiple + if ($verify != 'sync' && $verify != 'async') { + throw new ClientException("Invalid hub.verify $verify; must be sync or async."); } - $sub->setLease(intval($this->arg('hub.lease_seconds'))); - // @fixme check for feeds we don't manage - // @fixme check the verification mode, might want a return immediately? + $lease = $this->arg('hub.lease_seconds', null); + if ($mode == 'subscribe' && $lease != '' && !preg_match('/^\d+$/', $lease)) { + throw new ClientException("Invalid hub.lease $lease; must be empty or positive integer."); + } + + $token = $this->arg('hub.verify_token', null); - common_log(LOG_DEBUG, __METHOD__ . ': inserting'); - $ok = $sub->insert(); - - if (!$ok) { - throw new ServerException("Failed to save subscription record", 500); + $secret = $this->arg('hub.secret', null); + if ($secret != '' && strlen($secret) >= 200) { + throw new ClientException("Invalid hub.secret $secret; must be under 200 bytes."); } - // @fixme check errors ;) + $sub = HubSub::staticGet($sub->topic, $sub->callback); + if (!$sub) { + // Creating a new one! + $sub = new HubSub(); + $sub->topic = $topic; + $sub->callback = $callback; + } + if ($mode == 'subscribe') { + if ($secret) { + $sub->secret = $secret; + } + if ($lease) { + $sub->setLease(intval($lease)); + } + } - $data = array('sub' => $sub, 'mode' => 'subscribe', 'token' => $token); - $qm = QueueManager::get(); - $qm->enqueue($data, 'hubverify'); - - header('HTTP/1.1 202 Accepted'); - common_log(LOG_DEBUG, __METHOD__ . ': done'); + if (!common_config('queue', 'enabled')) { + // Won't be able to background it. + $verify = 'sync'; + } + if ($verify == 'async') { + $sub->scheduleVerify($mode, $token); + header('HTTP/1.1 202 Accepted'); + } else { + $sub->verify($mode, $token); + header('HTTP/1.1 204 No Content'); + } } /** - * Process a PuSH feed unsubscription request. - * - * HTTP return codes: - * 202 Accepted - request saved and awaiting verification - * 204 No Content - already subscribed - * 400 Bad Request - invalid params or rejected feed + * Check whether the given URL represents one of our canonical + * user or group Atom feeds. * - * @fixme background this + * @param string $feed URL + * @return boolean true if it matches */ - function unsubscribe() + function recognizedFeed($feed) { - $feed = $this->argUrl('hub.topic'); - $callback = $this->argUrl('hub.callback'); - $sub = $this->getSub($feed, $callback); - - if ($sub) { - $token = $this->arg('hub.verify_token', null); - if ($sub->verify('unsubscribe', $token)) { - $sub->delete(); - common_log(LOG_INFO, "PuSH unsubscribed $feed for $callback"); - } else { - throw new ServerException("Failed PuSH unsubscription: verification failed! $feed for $callback"); + $matches = array(); + if (preg_match('!/(\d+)\.atom$!', $feed, $matches)) { + $id = $matches[1]; + $params = array('id' => $id, 'format' => 'atom'); + $userFeed = common_local_url('ApiTimelineUser', $params); + $groupFeed = common_local_url('ApiTimelineGroup', $params); + + if ($feed == $userFeed) { + $user = User::staticGet('id', $id); + if (!$user) { + throw new ClientException("Invalid hub.topic $feed; user doesn't exist."); + } else { + return true; + } } - } else { - throw new ServerException("Failed PuSH unsubscription: not subscribed! $feed for $callback"); + if ($feed == $groupFeed) { + $user = User_group::staticGet('id', $id); + if (!$user) { + throw new ClientException("Invalid hub.topic $feed; group doesn't exist."); + } else { + return true; + } + } + common_log(LOG_DEBUG, "Not a user or group feed? $feed $userFeed $groupFeed"); } + common_log(LOG_DEBUG, "LOST $feed"); + return false; } /** * Grab and validate a URL from POST parameters. - * @throws ServerException for malformed or non-http/https URLs + * @throws ClientException for malformed or non-http/https URLs */ protected function argUrl($arg) { @@ -164,7 +183,7 @@ class PushHubAction extends Action if (Validate::uri($url, $params)) { return $url; } else { - throw new ServerException("Invalid URL passed for $arg: '$url'", 400); + throw new ClientException("Invalid URL passed for $arg: '$url'"); } } diff --git a/plugins/OStatus/classes/FeedSub.php b/plugins/OStatus/classes/FeedSub.php index 31241d3de..b848b6b1d 100644 --- a/plugins/OStatus/classes/FeedSub.php +++ b/plugins/OStatus/classes/FeedSub.php @@ -291,10 +291,9 @@ class FeedSub extends Memcached_DataObject $headers = array('Content-Type: application/x-www-form-urlencoded'); $post = array('hub.mode' => $mode, 'hub.callback' => $callback, - 'hub.verify' => 'async', + 'hub.verify' => 'sync', 'hub.verify_token' => $this->verify_token, 'hub.secret' => $this->secret, - //'hub.lease_seconds' => 0, 'hub.topic' => $this->uri); $client = new HTTPClient(); $response = $client->post($this->huburi, $headers, $post); @@ -317,8 +316,8 @@ class FeedSub extends Memcached_DataObject common_log(LOG_ERR, __METHOD__ . ": error \"{$e->getMessage()}\" hitting hub $this->huburi subscribing to $this->uri"); $orig = clone($this); - $this->verify_token = null; - $this->sub_state = null; + $this->verify_token = ''; + $this->sub_state = 'inactive'; $this->update($orig); unset($orig); @@ -343,7 +342,7 @@ class FeedSub extends Memcached_DataObject } else { $this->sub_end = null; } - $this->lastupdate = common_sql_now(); + $this->modified = common_sql_now(); return $this->update($original); } @@ -362,7 +361,7 @@ class FeedSub extends Memcached_DataObject $this->sub_state = ''; $this->sub_start = ''; $this->sub_end = ''; - $this->lastupdate = common_sql_now(); + $this->modified = common_sql_now(); return $this->update($original); } diff --git a/plugins/OStatus/classes/HubSub.php b/plugins/OStatus/classes/HubSub.php index a81de68e6..eae2928c3 100644 --- a/plugins/OStatus/classes/HubSub.php +++ b/plugins/OStatus/classes/HubSub.php @@ -30,11 +30,11 @@ class HubSub extends Memcached_DataObject public $topic; public $callback; public $secret; - public $challenge; public $lease; public $sub_start; public $sub_end; public $created; + public $modified; public /*static*/ function staticGet($topic, $callback) { @@ -61,11 +61,11 @@ class HubSub extends Memcached_DataObject 'topic' => DB_DATAOBJECT_STR + DB_DATAOBJECT_NOTNULL, 'callback' => DB_DATAOBJECT_STR + DB_DATAOBJECT_NOTNULL, 'secret' => DB_DATAOBJECT_STR, - 'challenge' => DB_DATAOBJECT_STR, 'lease' => DB_DATAOBJECT_INT, 'sub_start' => DB_DATAOBJECT_STR + DB_DATAOBJECT_DATE + DB_DATAOBJECT_TIME, 'sub_end' => DB_DATAOBJECT_STR + DB_DATAOBJECT_DATE + DB_DATAOBJECT_TIME, - 'created' => DB_DATAOBJECT_STR + DB_DATAOBJECT_DATE + DB_DATAOBJECT_TIME + DB_DATAOBJECT_NOTNULL); + 'created' => DB_DATAOBJECT_STR + DB_DATAOBJECT_DATE + DB_DATAOBJECT_TIME + DB_DATAOBJECT_NOTNULL, + 'modified' => DB_DATAOBJECT_STR + DB_DATAOBJECT_DATE + DB_DATAOBJECT_TIME + DB_DATAOBJECT_NOTNULL); } static function schemaDef() @@ -82,8 +82,6 @@ class HubSub extends Memcached_DataObject 255, false), new ColumnDef('secret', 'text', null, true), - new ColumnDef('challenge', 'varchar', - 32, true), new ColumnDef('lease', 'int', null, true), new ColumnDef('sub_start', 'datetime', @@ -91,6 +89,8 @@ class HubSub extends Memcached_DataObject new ColumnDef('sub_end', 'datetime', null, true), new ColumnDef('created', 'datetime', + null, false), + new ColumnDef('modified', 'datetime', null, false)); } @@ -148,84 +148,105 @@ class HubSub extends Memcached_DataObject } /** - * Send a verification ping to subscriber + * Schedule a future verification ping to the subscriber. + * If queues are disabled, will be immediate. + * + * @param string $mode 'subscribe' or 'unsubscribe' + * @param string $token hub.verify_token value, if provided by client + */ + function scheduleVerify($mode, $token=null, $retries=null) + { + if ($retries === null) { + $retries = intval(common_config('ostatus', 'hub_retries')); + } + $data = array('sub' => clone($this), + 'mode' => $mode, + 'token' => $token, + 'retries' => $retries); + $qm = QueueManager::get(); + $qm->enqueue($data, 'hubverify'); + } + + /** + * Send a verification ping to subscriber, and if confirmed apply the changes. + * This may create, update, or delete the database record. + * * @param string $mode 'subscribe' or 'unsubscribe' * @param string $token hub.verify_token value, if provided by client + * @throws ClientException on failure */ function verify($mode, $token=null) { assert($mode == 'subscribe' || $mode == 'unsubscribe'); - // Is this needed? data object fun... - $clone = clone($this); - $clone->challenge = common_good_rand(16); - $clone->update($this); - $this->challenge = $clone->challenge; - unset($clone); - + $challenge = common_good_rand(32); $params = array('hub.mode' => $mode, 'hub.topic' => $this->topic, - 'hub.challenge' => $this->challenge); + 'hub.challenge' => $challenge); if ($mode == 'subscribe') { $params['hub.lease_seconds'] = $this->lease; } if ($token !== null) { $params['hub.verify_token'] = $token; } - $url = $this->callback . '?' . http_build_query($params, '', '&'); // @fixme ugly urls - try { - $request = new HTTPClient(); - $response = $request->get($url); - $status = $response->getStatus(); - - if ($status >= 200 && $status < 300) { - $fail = false; - } else { - // @fixme how can we schedule a second attempt? - // Or should we? - $fail = "Returned HTTP $status"; - } - } catch (Exception $e) { - $fail = $e->getMessage(); + // Any existing query string parameters must be preserved + $url = $this->callback; + if (strpos('?', $url) !== false) { + $url .= '&'; + } else { + $url .= '?'; } - if ($fail) { - // @fixme how can we schedule a second attempt? - // or save a fail count? - // Or should we? - common_log(LOG_ERR, "Failed to verify $mode for $this->topic at $this->callback: $fail"); - return false; + $url .= http_build_query($params, '', '&'); + + $request = new HTTPClient(); + $response = $request->get($url); + $status = $response->getStatus(); + + if ($status >= 200 && $status < 300) { + common_log(LOG_INFO, "Verified $mode of $this->callback:$this->topic"); } else { - if ($mode == 'subscribe') { - // Establish or renew the subscription! - // This seems unnecessary... dataobject fun! - $clone = clone($this); - $clone->challenge = null; - $clone->setLease($this->lease); - $clone->update($this); - unset($clone); + throw new ClientException("Hub subscriber verification returned HTTP $status"); + } - $this->challenge = null; - $this->setLease($this->lease); - common_log(LOG_ERR, "Verified $mode of $this->callback:$this->topic for $this->lease seconds"); - } else if ($mode == 'unsubscribe') { - common_log(LOG_ERR, "Verified $mode of $this->callback:$this->topic"); - $this->delete(); + $old = HubSub::staticGet($this->topic, $this->callback); + if ($mode == 'subscribe') { + if ($old) { + $this->update($old); + } else { + $ok = $this->insert(); + } + } else if ($mode == 'unsubscribe') { + if ($old) { + $old->delete(); + } else { + // That's ok, we're already unsubscribed. } - return true; } } /** * Insert wrapper; transparently set the hash key from topic and callback columns. - * @return boolean success + * @return mixed success */ function insert() { $this->hashkey = self::hashkey($this->topic, $this->callback); + $this->created = common_sql_now(); + $this->modified = common_sql_now(); return parent::insert(); } + /** + * Update wrapper; transparently update modified column. + * @return boolean success + */ + function update($old=null) + { + $this->modified = common_sql_now(); + return parent::update($old); + } + /** * Schedule delivery of a 'fat ping' to the subscriber's callback * endpoint. If queues are disabled, this will run immediately. -- cgit v1.2.3-54-g00ecf From c36bdc1ba535dc3e2dc9098dbe40735b1955d96d Mon Sep 17 00:00:00 2001 From: Brion Vibber Date: Wed, 24 Feb 2010 20:36:36 +0000 Subject: - break OMB profile update pings to a background queue - add event hooks to profile update pings - send Salmon pings with custom update-profile event to OStatus subscribees and groups (subscribers will see it on your next post) - fix OStatus queues with overlong transport names, should work on DB queues now - Ostatus_profile::notifyActivity() and ::notifyDeferred() now can take XML, Notice, or Activity for convenience --- lib/activity.php | 3 ++ lib/profilequeuehandler.php | 48 ++++++++++++++++++++++++ lib/queuemanager.php | 3 ++ lib/util.php | 14 ++++--- plugins/OStatus/OStatusPlugin.php | 53 ++++++++++++++++++++++++-- plugins/OStatus/actions/pushcallback.php | 2 +- plugins/OStatus/classes/HubSub.php | 2 +- plugins/OStatus/classes/Ostatus_profile.php | 46 ++++++++++++++++++++--- plugins/OStatus/lib/hubconfqueuehandler.php | 54 +++++++++++++++++++++++++++ plugins/OStatus/lib/hubverifyqueuehandler.php | 54 --------------------------- plugins/OStatus/lib/ostatusqueuehandler.php | 22 +++-------- plugins/OStatus/lib/pushinputqueuehandler.php | 49 ------------------------ plugins/OStatus/lib/pushinqueuehandler.php | 49 ++++++++++++++++++++++++ plugins/OStatus/lib/salmonoutqueuehandler.php | 44 ---------------------- plugins/OStatus/lib/salmonqueuehandler.php | 44 ++++++++++++++++++++++ 15 files changed, 308 insertions(+), 179 deletions(-) create mode 100644 lib/profilequeuehandler.php create mode 100644 plugins/OStatus/lib/hubconfqueuehandler.php delete mode 100644 plugins/OStatus/lib/hubverifyqueuehandler.php delete mode 100644 plugins/OStatus/lib/pushinputqueuehandler.php create mode 100644 plugins/OStatus/lib/pushinqueuehandler.php delete mode 100644 plugins/OStatus/lib/salmonoutqueuehandler.php create mode 100644 plugins/OStatus/lib/salmonqueuehandler.php (limited to 'plugins/OStatus/actions/pushcallback.php') diff --git a/lib/activity.php b/lib/activity.php index fa4ae0274..33932ad0e 100644 --- a/lib/activity.php +++ b/lib/activity.php @@ -691,6 +691,9 @@ class ActivityVerb const UNFAVORITE = 'http://ostatus.org/schema/1.0/unfavorite'; const UNFOLLOW = 'http://ostatus.org/schema/1.0/unfollow'; const LEAVE = 'http://ostatus.org/schema/1.0/leave'; + + // For simple profile-update pings; no content to share. + const UPDATE_PROFILE = 'http://ostatus.org/schema/1.0/update-profile'; } class ActivityContext diff --git a/lib/profilequeuehandler.php b/lib/profilequeuehandler.php new file mode 100644 index 000000000..e8a00aef3 --- /dev/null +++ b/lib/profilequeuehandler.php @@ -0,0 +1,48 @@ +. + */ + +/** + * @package QueueHandler + * @maintainer Brion Vibber + */ + +class ProfileQueueHandler extends QueueHandler +{ + + function transport() + { + return 'profile'; + } + + function handle($profile) + { + if (!($profile instanceof Profile)) { + common_log(LOG_ERR, "Got a bogus profile, not broadcasting"); + return true; + } + + if (Event::handle('StartBroadcastProfile', array($profile))) { + require_once(INSTALLDIR.'/lib/omb.php'); + omb_broadcast_profile($profile); + } + Event::handle('EndBroadcastProfile', array($profile)); + return true; + } + +} diff --git a/lib/queuemanager.php b/lib/queuemanager.php index 8f8c8f133..9fdc80110 100644 --- a/lib/queuemanager.php +++ b/lib/queuemanager.php @@ -262,6 +262,9 @@ abstract class QueueManager extends IoManager $this->connect('sms', 'SmsQueueHandler'); } + // Broadcasting profile updates to OMB remote subscribers + $this->connect('profile', 'ProfileQueueHandler'); + // XMPP output handlers... if (common_config('xmpp', 'enabled')) { // Delivery prep, read by queuedaemon.php: diff --git a/lib/util.php b/lib/util.php index 7fb2c6c4b..9354431f2 100644 --- a/lib/util.php +++ b/lib/util.php @@ -1119,12 +1119,16 @@ function common_enqueue_notice($notice) return true; } -function common_broadcast_profile($profile) +/** + * Broadcast profile updates to OMB and other remote subscribers. + * + * Since this may be slow with a lot of subscribers or bad remote sites, + * this is run through the background queues if possible. + */ +function common_broadcast_profile(Profile $profile) { - // XXX: optionally use a queue system like http://code.google.com/p/microapps/wiki/NQDQ - require_once(INSTALLDIR.'/lib/omb.php'); - omb_broadcast_profile($profile); - // XXX: Other broadcasts...? + $qm = QueueManager::get(); + $qm->enqueue($profile, "profile"); return true; } diff --git a/plugins/OStatus/OStatusPlugin.php b/plugins/OStatus/OStatusPlugin.php index 9376c048d..90abe034d 100644 --- a/plugins/OStatus/OStatusPlugin.php +++ b/plugins/OStatus/OStatusPlugin.php @@ -82,14 +82,14 @@ class OStatusPlugin extends Plugin $qm->connect('ostatus', 'OStatusQueueHandler'); // Outgoing from our internal PuSH hub - $qm->connect('hubverify', 'HubVerifyQueueHandler'); + $qm->connect('hubconf', 'HubConfQueueHandler'); $qm->connect('hubout', 'HubOutQueueHandler'); // Outgoing Salmon replies (when we don't need a return value) - $qm->connect('salmonout', 'SalmonOutQueueHandler'); + $qm->connect('salmon', 'SalmonQueueHandler'); // Incoming from a foreign PuSH hub - $qm->connect('pushinput', 'PushInputQueueHandler'); + $qm->connect('pushin', 'PushInQueueHandler'); return true; } @@ -656,4 +656,51 @@ class OStatusPlugin extends Plugin return true; } + + /** + * Ping remote profiles with updates to this profile. + * Salmon pings are queued for background processing. + */ + function onEndBroadcastProfile(Profile $profile) + { + $user = User::staticGet('id', $profile->id); + + // Find foreign accounts I'm subscribed to that support Salmon pings. + // + // @fixme we could run updates through the PuSH feed too, + // in which case we can skip Salmon pings to folks who + // are also subscribed to me. + $sql = "SELECT * FROM ostatus_profile " . + "WHERE profile_id IN " . + "(SELECT subscribed FROM subscription WHERE subscriber=%d) " . + "OR group_id IN " . + "(SELECT group_id FROM group_member WHERE profile_id=%d)"; + $oprofile = new Ostatus_profile(); + $oprofile->query(sprintf($sql, $profile->id, $profile->id)); + + if ($oprofile->N == 0) { + common_log(LOG_DEBUG, "No OStatus remote subscribees for $profile->nickname"); + return true; + } + + $act = new Activity(); + + $act->verb = ActivityVerb::UPDATE_PROFILE; + $act->id = TagURI::mint('update-profile:%d:%s', + $profile->id, + common_date_iso8601(time())); + $act->time = time(); + $act->title = _m("Profile update"); + $act->content = sprintf(_m("%s has updated their profile page."), + $profile->getBestName()); + + $act->actor = ActivityObject::fromProfile($profile); + $act->object = $act->actor; + + while ($oprofile->fetch()) { + $oprofile->notifyDeferred($act); + } + + return true; + } } diff --git a/plugins/OStatus/actions/pushcallback.php b/plugins/OStatus/actions/pushcallback.php index 4184f0e0c..9a2067b8c 100644 --- a/plugins/OStatus/actions/pushcallback.php +++ b/plugins/OStatus/actions/pushcallback.php @@ -68,7 +68,7 @@ class PushCallbackAction extends Action 'post' => $post, 'hmac' => $hmac); $qm = QueueManager::get(); - $qm->enqueue($data, 'pushinput'); + $qm->enqueue($data, 'pushin'); } /** diff --git a/plugins/OStatus/classes/HubSub.php b/plugins/OStatus/classes/HubSub.php index eae2928c3..1ac181fee 100644 --- a/plugins/OStatus/classes/HubSub.php +++ b/plugins/OStatus/classes/HubSub.php @@ -164,7 +164,7 @@ class HubSub extends Memcached_DataObject 'token' => $token, 'retries' => $retries); $qm = QueueManager::get(); - $qm->enqueue($data, 'hubverify'); + $qm->enqueue($data, 'hubconf'); } /** diff --git a/plugins/OStatus/classes/Ostatus_profile.php b/plugins/OStatus/classes/Ostatus_profile.php index 9f9efb96e..61505206e 100644 --- a/plugins/OStatus/classes/Ostatus_profile.php +++ b/plugins/OStatus/classes/Ostatus_profile.php @@ -431,21 +431,57 @@ class Ostatus_profile extends Memcached_DataObject return false; } - public function notifyActivity($activity) + /** + * Send a Salmon notification ping immediately, and confirm that we got + * an acceptable response from the remote site. + * + * @param mixed $entry XML string, Notice, or Activity + * @return boolean success + */ + public function notifyActivity($entry) { if ($this->salmonuri) { + $salmon = new Salmon(); + return $salmon->post($this->salmonuri, $this->notifyPrepXml($entry)); + } - $xml = '' . - $activity->asString(true); + return false; + } - $salmon = new Salmon(); // ? + /** + * Queue a Salmon notification for later. If queues are disabled we'll + * send immediately but won't get the return value. + * + * @param mixed $entry XML string, Notice, or Activity + * @return boolean success + */ + public function notifyDeferred($entry) + { + if ($this->salmonuri) { + $data = array('salmonuri' => $this->salmonuri, + 'entry' => $this->notifyPrepXml($entry)); - return $salmon->post($this->salmonuri, $xml); + $qm = QueueManager::get(); + return $qm->enqueue($data, 'salmon'); } return false; } + protected function notifyPrepXml($entry) + { + $preamble = ''; + if (is_string($entry)) { + return $entry; + } else if ($entry instanceof Activity) { + return $preamble . $entry->asString(true); + } else if ($entry instanceof Notice) { + return $preamble . $entry->asAtomEntry(true, true); + } else { + throw new ServerException("Invalid type passed to Ostatus_profile::notify; must be XML string or Activity entry"); + } + } + function getBestName() { if ($this->isGroup()) { diff --git a/plugins/OStatus/lib/hubconfqueuehandler.php b/plugins/OStatus/lib/hubconfqueuehandler.php new file mode 100644 index 000000000..c8e0b72fe --- /dev/null +++ b/plugins/OStatus/lib/hubconfqueuehandler.php @@ -0,0 +1,54 @@ +. + */ + +/** + * Send a PuSH subscription verification from our internal hub. + * @package Hub + * @author Brion Vibber + */ +class HubConfQueueHandler extends QueueHandler +{ + function transport() + { + return 'hubconf'; + } + + function handle($data) + { + $sub = $data['sub']; + $mode = $data['mode']; + $token = $data['token']; + + assert($sub instanceof HubSub); + assert($mode === 'subscribe' || $mode === 'unsubscribe'); + + common_log(LOG_INFO, __METHOD__ . ": $mode $sub->callback $sub->topic"); + try { + $sub->verify($mode, $token); + } catch (Exception $e) { + common_log(LOG_ERR, "Failed PuSH $mode verify to $sub->callback for $sub->topic: " . + $e->getMessage()); + // @fixme schedule retry? + // @fixme just kill it? + } + + return true; + } +} + diff --git a/plugins/OStatus/lib/hubverifyqueuehandler.php b/plugins/OStatus/lib/hubverifyqueuehandler.php deleted file mode 100644 index 7ce9e1431..000000000 --- a/plugins/OStatus/lib/hubverifyqueuehandler.php +++ /dev/null @@ -1,54 +0,0 @@ -. - */ - -/** - * Send a PuSH subscription verification from our internal hub. - * @package Hub - * @author Brion Vibber - */ -class HubVerifyQueueHandler extends QueueHandler -{ - function transport() - { - return 'hubverify'; - } - - function handle($data) - { - $sub = $data['sub']; - $mode = $data['mode']; - $token = $data['token']; - - assert($sub instanceof HubSub); - assert($mode === 'subscribe' || $mode === 'unsubscribe'); - - common_log(LOG_INFO, __METHOD__ . ": $mode $sub->callback $sub->topic"); - try { - $sub->verify($mode, $token); - } catch (Exception $e) { - common_log(LOG_ERR, "Failed PuSH $mode verify to $sub->callback for $sub->topic: " . - $e->getMessage()); - // @fixme schedule retry? - // @fixme just kill it? - } - - return true; - } -} - diff --git a/plugins/OStatus/lib/ostatusqueuehandler.php b/plugins/OStatus/lib/ostatusqueuehandler.php index c1e50bffa..0da85600f 100644 --- a/plugins/OStatus/lib/ostatusqueuehandler.php +++ b/plugins/OStatus/lib/ostatusqueuehandler.php @@ -83,23 +83,11 @@ class OStatusQueueHandler extends QueueHandler function pingReply($oprofile) { if ($this->user) { - if (!empty($oprofile->salmonuri)) { - // For local posts, send a Salmon ping to the mentioned - // remote user or group. - // @fixme as an optimization we can skip this if the - // remote profile is subscribed to the author. - - common_log(LOG_INFO, "Prepping to send notice '{$this->notice->uri}' to remote profile '{$oprofile->uri}'."); - - $xml = ''; - $xml .= $this->notice->asAtomEntry(true, true); - - $data = array('salmonuri' => $oprofile->salmonuri, - 'entry' => $xml); - - $qm = QueueManager::get(); - $qm->enqueue($data, 'salmonout'); - } + // For local posts, send a Salmon ping to the mentioned + // remote user or group. + // @fixme as an optimization we can skip this if the + // remote profile is subscribed to the author. + $oprofile->notifyDeferred($this->notice); } } diff --git a/plugins/OStatus/lib/pushinputqueuehandler.php b/plugins/OStatus/lib/pushinputqueuehandler.php deleted file mode 100644 index cbd9139b5..000000000 --- a/plugins/OStatus/lib/pushinputqueuehandler.php +++ /dev/null @@ -1,49 +0,0 @@ -. - */ - -/** - * Process a feed distribution POST from a PuSH hub. - * @package FeedSub - * @author Brion Vibber - */ - -class PushInputQueueHandler extends QueueHandler -{ - function transport() - { - return 'pushinput'; - } - - function handle($data) - { - assert(is_array($data)); - - $feedsub_id = $data['feedsub_id']; - $post = $data['post']; - $hmac = $data['hmac']; - - $feedsub = FeedSub::staticGet('id', $feedsub_id); - if ($feedsub) { - $feedsub->receive($post, $hmac); - } else { - common_log(LOG_ERR, "Discarding POST to unknown feed subscription id $feedsub_id"); - } - return true; - } -} diff --git a/plugins/OStatus/lib/pushinqueuehandler.php b/plugins/OStatus/lib/pushinqueuehandler.php new file mode 100644 index 000000000..a90f52df2 --- /dev/null +++ b/plugins/OStatus/lib/pushinqueuehandler.php @@ -0,0 +1,49 @@ +. + */ + +/** + * Process a feed distribution POST from a PuSH hub. + * @package FeedSub + * @author Brion Vibber + */ + +class PushInQueueHandler extends QueueHandler +{ + function transport() + { + return 'pushin'; + } + + function handle($data) + { + assert(is_array($data)); + + $feedsub_id = $data['feedsub_id']; + $post = $data['post']; + $hmac = $data['hmac']; + + $feedsub = FeedSub::staticGet('id', $feedsub_id); + if ($feedsub) { + $feedsub->receive($post, $hmac); + } else { + common_log(LOG_ERR, "Discarding POST to unknown feed subscription id $feedsub_id"); + } + return true; + } +} diff --git a/plugins/OStatus/lib/salmonoutqueuehandler.php b/plugins/OStatus/lib/salmonoutqueuehandler.php deleted file mode 100644 index 536ff94af..000000000 --- a/plugins/OStatus/lib/salmonoutqueuehandler.php +++ /dev/null @@ -1,44 +0,0 @@ -. - */ - -/** - * Send a Salmon notification in the background. - * @package OStatusPlugin - * @author Brion Vibber - */ -class SalmonOutQueueHandler extends QueueHandler -{ - function transport() - { - return 'salmonout'; - } - - function handle($data) - { - assert(is_array($data)); - assert(is_string($data['salmonuri'])); - assert(is_string($data['entry'])); - - $salmon = new Salmon(); - $salmon->post($data['salmonuri'], $data['entry']); - - // @fixme detect failure and attempt to resend - return true; - } -} diff --git a/plugins/OStatus/lib/salmonqueuehandler.php b/plugins/OStatus/lib/salmonqueuehandler.php new file mode 100644 index 000000000..aa97018dc --- /dev/null +++ b/plugins/OStatus/lib/salmonqueuehandler.php @@ -0,0 +1,44 @@ +. + */ + +/** + * Send a Salmon notification in the background. + * @package OStatusPlugin + * @author Brion Vibber + */ +class SalmonQueueHandler extends QueueHandler +{ + function transport() + { + return 'salmon'; + } + + function handle($data) + { + assert(is_array($data)); + assert(is_string($data['salmonuri'])); + assert(is_string($data['entry'])); + + $salmon = new Salmon(); + $salmon->post($data['salmonuri'], $data['entry']); + + // @fixme detect failure and attempt to resend + return true; + } +} -- cgit v1.2.3-54-g00ecf