From 0dac13d197248bf24ea51cb7911d32286764c0c8 Mon Sep 17 00:00:00 2001 From: Brion Vibber Date: Thu, 18 Feb 2010 21:22:21 +0000 Subject: OStatus refactoring to clean up profile vs feed and fix up subscription issues. PuSH subscription maintenance broken back out to FeedSub, letting Ostatus_profile deal with the profile level (user or group, with unique id URI) --- plugins/OStatus/classes/FeedSub.php | 443 ++++++++++++++++++++++++++++++++++++ 1 file changed, 443 insertions(+) create mode 100644 plugins/OStatus/classes/FeedSub.php (limited to 'plugins/OStatus/classes/FeedSub.php') diff --git a/plugins/OStatus/classes/FeedSub.php b/plugins/OStatus/classes/FeedSub.php new file mode 100644 index 000000000..dc2c0b710 --- /dev/null +++ b/plugins/OStatus/classes/FeedSub.php @@ -0,0 +1,443 @@ +. + */ + +/** + * @package OStatusPlugin + * @maintainer Brion Vibber + */ + +/* +PuSH subscription flow: + + $profile->subscribe() + generate random verification token + save to verify_token + sends a sub request to the hub... + + main/push/callback + hub sends confirmation back to us via GET + We verify the request, then echo back the challenge. + On our end, we save the time we subscribed and the lease expiration + + main/push/callback + hub sends us updates via POST + +*/ + +class FeedDBException extends FeedSubException +{ + public $obj; + + function __construct($obj) + { + parent::__construct('Database insert failure'); + $this->obj = $obj; + } +} + +/** + * FeedSub handles low-level PubHubSubbub (PuSH) subscriptions. + * Higher-level behavior building OStatus stuff on top is handled + * under Ostatus_profile. + */ +class FeedSub extends Memcached_DataObject +{ + public $__table = 'feedsub'; + + public $id; + public $feeduri; + + // PuSH subscription data + public $huburi; + public $secret; + public $verify_token; + public $sub_state; // subscribe, active, unsubscribe, inactive + public $sub_start; + public $sub_end; + public $last_update; + + public $created; + public $modified; + + public /*static*/ function staticGet($k, $v=null) + { + return parent::staticGet(__CLASS__, $k, $v); + } + + /** + * return table definition for DB_DataObject + * + * DB_DataObject needs to know something about the table to manipulate + * instances. This method provides all the DB_DataObject needs to know. + * + * @return array array of column definitions + */ + + function table() + { + return array('id' => DB_DATAOBJECT_INT + DB_DATAOBJECT_NOTNULL, + 'uri' => DB_DATAOBJECT_STR + DB_DATAOBJECT_NOTNULL, + 'huburi' => DB_DATAOBJECT_STR, + 'secret' => DB_DATAOBJECT_STR, + 'verify_token' => DB_DATAOBJECT_STR, + 'sub_state' => DB_DATAOBJECT_STR + DB_DATAOBJECT_NOTNULL, + 'sub_start' => DB_DATAOBJECT_STR + DB_DATAOBJECT_DATE + DB_DATAOBJECT_TIME, + 'sub_end' => DB_DATAOBJECT_STR + DB_DATAOBJECT_DATE + DB_DATAOBJECT_TIME, + 'last_update' => DB_DATAOBJECT_STR + DB_DATAOBJECT_DATE + DB_DATAOBJECT_TIME + DB_DATAOBJECT_NOTNULL, + 'created' => DB_DATAOBJECT_STR + DB_DATAOBJECT_DATE + DB_DATAOBJECT_TIME + DB_DATAOBJECT_NOTNULL, + 'modified' => DB_DATAOBJECT_STR + DB_DATAOBJECT_DATE + DB_DATAOBJECT_TIME + DB_DATAOBJECT_NOTNULL); + } + + static function schemaDef() + { + return array(new ColumnDef('id', 'integer', + /*size*/ null, + /*nullable*/ false, + /*key*/ 'PRI', + /*default*/ '0', + /*extra*/ null, + /*auto_increment*/ true), + new ColumnDef('uri', 'varchar', + 255, false, 'UNI'), + new ColumnDef('huburi', 'text', + null, true), + new ColumnDef('verify_token', 'text', + null, true), + new ColumnDef('secret', 'text', + null, true), + new ColumnDef('sub_state', "enum('subscribe','active','unsubscribe','inactive')", + null, false), + new ColumnDef('sub_start', 'datetime', + null, true), + new ColumnDef('sub_end', 'datetime', + null, true), + new ColumnDef('last_update', 'datetime', + null, false), + new ColumnDef('created', 'datetime', + null, false), + new ColumnDef('modified', 'datetime', + null, false)); + } + + /** + * return key definitions for DB_DataObject + * + * DB_DataObject needs to know about keys that the table has; this function + * defines them. + * + * @return array key definitions + */ + + function keys() + { + return array_keys($this->keyTypes()); + } + + /** + * return key definitions for Memcached_DataObject + * + * Our caching system uses the same key definitions, but uses a different + * method to get them. + * + * @return array key definitions + */ + + function keyTypes() + { + return array('id' => 'K', 'uri' => 'U'); + } + + function sequenceKey() + { + return array('id', true, false); + } + + /** + * Fetch the StatusNet-side profile for this feed + * @return Profile + */ + public function localProfile() + { + if ($this->profile_id) { + return Profile::staticGet('id', $this->profile_id); + } + return null; + } + + /** + * Fetch the StatusNet-side profile for this feed + * @return Profile + */ + public function localGroup() + { + if ($this->group_id) { + return User_group::staticGet('id', $this->group_id); + } + return null; + } + + /** + * @param string $feeduri + * @return FeedSub + * @throws FeedSubException if feed is invalid or lacks PuSH setup + */ + public static function ensureFeed($feeduri) + { + $current = self::staticGet('uri', $feeduri); + if ($current) { + return $current; + } + + $discover = new FeedDiscovery(); + $discover->discoverFromFeedURL($feeduri); + + $huburi = $discover->getAtomLink('hub'); + if (!$huburi) { + throw new FeedSubNoHubException(); + } + + $feedsub = new FeedSub(); + $feedsub->uri = $feeduri; + $feedsub->huburi = $huburi; + $feedsub->sub_state = 'inactive'; + + $feedsub->created = common_sql_now(); + $feedsub->modified = common_sql_now(); + + $result = $feedsub->insert(); + if (empty($result)) { + throw new FeedDBException($feedsub); + } + + return $feedsub; + } + + /** + * Send a subscription request to the hub for this feed. + * The hub will later send us a confirmation POST to /main/push/callback. + * + * @return bool true on success, false on failure + * @throws ServerException if feed state is not valid + */ + public function subscribe($mode='subscribe') + { + if ($this->sub_state && $this->sub_state != 'inactive') { + throw new ServerException("Attempting to start PuSH subscription to feed in state $this->sub_state"); + } + if (empty($this->huburi)) { + if (common_config('feedsub', 'nohub')) { + // Fake it! We're just testing remote feeds w/o hubs. + return true; + } else { + throw new ServerException("Attempting to start PuSH subscription for feed with no hub"); + } + } + + return $this->doSubscribe('subscribe'); + } + + /** + * Send a PuSH unsubscription request to the hub for this feed. + * The hub will later send us a confirmation POST to /main/push/callback. + * + * @return bool true on success, false on failure + * @throws ServerException if feed state is not valid + */ + public function unsubscribe() { + if ($this->sub_state != 'active') { + throw new ServerException("Attempting to end PuSH subscription to feed in state $this->sub_state"); + } + if (empty($this->huburi)) { + if (common_config('feedsub', 'nohub')) { + // Fake it! We're just testing remote feeds w/o hubs. + return true; + } else { + throw new ServerException("Attempting to end PuSH subscription for feed with no hub"); + } + } + + return $this->doSubscribe('unsubscribe'); + } + + protected function doSubscribe($mode) + { + $orig = clone($this); + $this->verify_token = common_good_rand(16); + if ($mode == 'subscribe') { + $this->secret = common_good_rand(32); + } + $this->sub_state = $mode; + $this->update($orig); + unset($orig); + + try { + $callback = common_local_url('pushcallback', array('feed' => $this->id)); + $headers = array('Content-Type: application/x-www-form-urlencoded'); + $post = array('hub.mode' => $mode, + 'hub.callback' => $callback, + 'hub.verify' => 'async', + 'hub.verify_token' => $this->verify_token, + 'hub.secret' => $this->secret, + //'hub.lease_seconds' => 0, + 'hub.topic' => $this->uri); + $client = new HTTPClient(); + $response = $client->post($this->huburi, $headers, $post); + $status = $response->getStatus(); + if ($status == 202) { + common_log(LOG_INFO, __METHOD__ . ': sub req ok, awaiting verification callback'); + return true; + } else if ($status == 204) { + common_log(LOG_INFO, __METHOD__ . ': sub req ok and verified'); + return true; + } else if ($status >= 200 && $status < 300) { + common_log(LOG_ERR, __METHOD__ . ": sub req returned unexpected HTTP $status: " . $response->getBody()); + return false; + } else { + common_log(LOG_ERR, __METHOD__ . ": sub req failed with HTTP $status: " . $response->getBody()); + return false; + } + } catch (Exception $e) { + // wtf! + common_log(LOG_ERR, __METHOD__ . ": error \"{$e->getMessage()}\" hitting hub $this->huburi subscribing to $this->uri"); + + $orig = clone($this); + $this->verify_token = null; + $this->sub_state = null; + $this->update($orig); + unset($orig); + + return false; + } + } + + /** + * Save PuSH subscription confirmation. + * Sets approximate lease start and end times and finalizes state. + * + * @param int $lease_seconds provided hub.lease_seconds parameter, if given + */ + public function confirmSubscribe($lease_seconds=0) + { + $original = clone($this); + + $this->sub_state = 'active'; + $this->sub_start = common_sql_date(time()); + if ($lease_seconds > 0) { + $this->sub_end = common_sql_date(time() + $lease_seconds); + } else { + $this->sub_end = null; + } + $this->lastupdate = common_sql_now(); + + return $this->update($original); + } + + /** + * Save PuSH unsubscription confirmation. + * Wipes active PuSH sub info and resets state. + */ + public function confirmUnsubscribe() + { + $original = clone($this); + + // @fixme these should all be null, but DB_DataObject doesn't save null values...????? + $this->verify_token = ''; + $this->secret = ''; + $this->sub_state = ''; + $this->sub_start = ''; + $this->sub_end = ''; + $this->lastupdate = common_sql_now(); + + return $this->update($original); + } + + /** + * Accept updates from a PuSH feed. If validated, this object and the + * feed (as a DOMDocument) will be passed to the StartFeedSubHandleFeed + * and EndFeedSubHandleFeed events for processing. + * + * @param string $post source of Atom or RSS feed + * @param string $hmac X-Hub-Signature header, if present + */ + public function receive($post, $hmac) + { + common_log(LOG_INFO, __METHOD__ . ": packet for \"$this->uri\"! $hmac $post"); + + if ($this->sub_state != 'active') { + common_log(LOG_ERR, __METHOD__ . ": ignoring PuSH for inactive feed $this->uri (in state '$this->sub_state')"); + return; + } + + if ($post === '') { + common_log(LOG_ERR, __METHOD__ . ": ignoring empty post"); + return; + } + + if (!$this->validatePushSig($post, $hmac)) { + // Per spec we silently drop input with a bad sig, + // while reporting receipt to the server. + return; + } + + $feed = new DOMDocument(); + if (!$feed->loadXML($post)) { + // @fixme might help to include the err message + common_log(LOG_ERR, __METHOD__ . ": ignoring invalid XML"); + return; + } + + Event::handle('StartFeedSubReceive', array($this, $feed)); + Event::handle('EndFeedSubReceive', array($this, $feed)); + } + + /** + * Validate the given Atom chunk and HMAC signature against our + * shared secret that was set up at subscription time. + * + * If we don't have a shared secret, there should be no signature. + * If we we do, our the calculated HMAC should match theirs. + * + * @param string $post raw XML source as POSTed to us + * @param string $hmac X-Hub-Signature HTTP header value, or empty + * @return boolean true for a match + */ + protected function validatePushSig($post, $hmac) + { + if ($this->secret) { + if (preg_match('/^sha1=([0-9a-fA-F]{40})$/', $hmac, $matches)) { + $their_hmac = strtolower($matches[1]); + $our_hmac = hash_hmac('sha1', $post, $this->secret); + if ($their_hmac === $our_hmac) { + return true; + } + common_log(LOG_ERR, __METHOD__ . ": ignoring PuSH with bad SHA-1 HMAC: got $their_hmac, expected $our_hmac"); + } else { + common_log(LOG_ERR, __METHOD__ . ": ignoring PuSH with bogus HMAC '$hmac'"); + } + } else { + if (empty($hmac)) { + return true; + } else { + common_log(LOG_ERR, __METHOD__ . ": ignoring PuSH with unexpected HMAC '$hmac'"); + } + } + return false; + } + +} -- cgit v1.2.3-54-g00ecf From 114eb310ca4f53e09c98f2337850829a9036c133 Mon Sep 17 00:00:00 2001 From: Brion Vibber Date: Fri, 19 Feb 2010 10:29:06 -0800 Subject: OStatus: fix up Salmon endpoint detection/saving, timestamp fixes. --- plugins/OStatus/actions/salmon.php | 2 +- plugins/OStatus/classes/FeedSub.php | 2 +- plugins/OStatus/classes/Ostatus_profile.php | 18 +++++++++++++----- 3 files changed, 15 insertions(+), 7 deletions(-) (limited to 'plugins/OStatus/classes/FeedSub.php') diff --git a/plugins/OStatus/actions/salmon.php b/plugins/OStatus/actions/salmon.php index e9d6015f4..2da9db9eb 100644 --- a/plugins/OStatus/actions/salmon.php +++ b/plugins/OStatus/actions/salmon.php @@ -142,7 +142,7 @@ class SalmonAction extends Action function ensureProfile() { $actor = $this->act->actor; - + common_log(LOG_DEBUG, "Received salmon bit: " . var_export($this->act, true)); if (empty($actor->id)) { throw new Exception("Received a salmon slap from unidentified actor."); } diff --git a/plugins/OStatus/classes/FeedSub.php b/plugins/OStatus/classes/FeedSub.php index dc2c0b710..bf9d063fa 100644 --- a/plugins/OStatus/classes/FeedSub.php +++ b/plugins/OStatus/classes/FeedSub.php @@ -99,7 +99,7 @@ class FeedSub extends Memcached_DataObject 'sub_state' => DB_DATAOBJECT_STR + DB_DATAOBJECT_NOTNULL, 'sub_start' => DB_DATAOBJECT_STR + DB_DATAOBJECT_DATE + DB_DATAOBJECT_TIME, 'sub_end' => DB_DATAOBJECT_STR + DB_DATAOBJECT_DATE + DB_DATAOBJECT_TIME, - 'last_update' => DB_DATAOBJECT_STR + DB_DATAOBJECT_DATE + DB_DATAOBJECT_TIME + DB_DATAOBJECT_NOTNULL, + 'last_update' => DB_DATAOBJECT_STR + DB_DATAOBJECT_DATE + DB_DATAOBJECT_TIME, 'created' => DB_DATAOBJECT_STR + DB_DATAOBJECT_DATE + DB_DATAOBJECT_TIME + DB_DATAOBJECT_NOTNULL, 'modified' => DB_DATAOBJECT_STR + DB_DATAOBJECT_DATE + DB_DATAOBJECT_TIME + DB_DATAOBJECT_NOTNULL); } diff --git a/plugins/OStatus/classes/Ostatus_profile.php b/plugins/OStatus/classes/Ostatus_profile.php index e0cb467e5..319b76a66 100644 --- a/plugins/OStatus/classes/Ostatus_profile.php +++ b/plugins/OStatus/classes/Ostatus_profile.php @@ -328,7 +328,7 @@ class Ostatus_profile extends Memcached_DataObject $entry->element('id', null, $id); $entry->element('title', null, $text); $entry->element('summary', null, $text); - $entry->element('published', null, common_date_w3dtf(time())); + $entry->element('published', null, common_date_w3dtf(common_sql_now())); $entry->element('activity:verb', null, $verb); $entry->raw($actor->asAtomAuthor()); @@ -516,7 +516,7 @@ class Ostatus_profile extends Memcached_DataObject throw new FeedSubException('empty feed'); } $first = new Activity($entries->item(0), $discover->feed); - return self::ensureActorProfile($first, $feeduri); + return self::ensureActorProfile($first, $feeduri, $salmonuri); } /** @@ -598,13 +598,14 @@ class Ostatus_profile extends Memcached_DataObject * * @param Activity $activity * @param string $feeduri if we already know the canonical feed URI! + * @param string $salmonuri if we already know the salmon return channel URI * @return Ostatus_profile */ - public static function ensureActorProfile($activity, $feeduri=null) + public static function ensureActorProfile($activity, $feeduri=null, $salmonuri=null) { $profile = self::getActorProfile($activity); if (!$profile) { - $profile = self::createActorProfile($activity, $feeduri); + $profile = self::createActorProfile($activity, $feeduri, $salmonuri); } return $profile; } @@ -640,7 +641,7 @@ class Ostatus_profile extends Memcached_DataObject /** * @fixme validate stuff somewhere */ - protected static function createActorProfile($activity, $feeduri=null) + protected static function createActorProfile($activity, $feeduri=null, $salmonuri=null) { $actor = $activity->actor; $homeuri = self::getActorProfileURI($activity); @@ -674,10 +675,17 @@ class Ostatus_profile extends Memcached_DataObject $oprofile = new Ostatus_profile(); $oprofile->uri = $homeuri; if ($feeduri) { + // If we don't have these, we can look them up later. $oprofile->feeduri = $feeduri; + if ($salmonuri) { + $oprofile->salmonuri = $salmonuri; + } } $oprofile->profile_id = $profile->id; + $oprofile->created = common_sql_now(); + $oprofile->modified = common_sql_now(); + $ok = $oprofile->insert(); if ($ok) { $oprofile->updateAvatar($avatar); -- cgit v1.2.3-54-g00ecf From 5349aa420ed45c2f5bf773d10c7709826ae6babd Mon Sep 17 00:00:00 2001 From: Brion Vibber Date: Sun, 21 Feb 2010 13:40:59 -0800 Subject: OStatus feedsub fixlets: - actually udpate feedsub.last_update when we get a new PuSH update in - move incoming PuSH processing to a queue handler to minimize time spent before POST return, as recommended by PuSH spec. When queues are disabled this'll still be handled immediately. --- plugins/OStatus/OStatusPlugin.php | 4 +++ plugins/OStatus/actions/pushcallback.php | 9 +++-- plugins/OStatus/classes/FeedSub.php | 10 ++++++ plugins/OStatus/lib/pushinputqueuehandler.php | 49 +++++++++++++++++++++++++++ 4 files changed, 70 insertions(+), 2 deletions(-) create mode 100644 plugins/OStatus/lib/pushinputqueuehandler.php (limited to 'plugins/OStatus/classes/FeedSub.php') diff --git a/plugins/OStatus/OStatusPlugin.php b/plugins/OStatus/OStatusPlugin.php index b966661db..c5a2db3d8 100644 --- a/plugins/OStatus/OStatusPlugin.php +++ b/plugins/OStatus/OStatusPlugin.php @@ -78,9 +78,13 @@ class OStatusPlugin extends Plugin */ function onEndInitializeQueueManager(QueueManager $qm) { + // Outgoing from our internal PuSH hub $qm->connect('hubverify', 'HubVerifyQueueHandler'); $qm->connect('hubdistrib', 'HubDistribQueueHandler'); $qm->connect('hubout', 'HubOutQueueHandler'); + + // Incoming from a foreign PuSH hub + $qm->connect('pushinput', 'PushInputQueueHandler'); return true; } diff --git a/plugins/OStatus/actions/pushcallback.php b/plugins/OStatus/actions/pushcallback.php index 7e1227a66..9e976a80d 100644 --- a/plugins/OStatus/actions/pushcallback.php +++ b/plugins/OStatus/actions/pushcallback.php @@ -60,9 +60,14 @@ class PushCallbackAction extends Action $post = file_get_contents('php://input'); - // @fixme Queue this to a background process; we should return + // Queue this to a background process; we should return // as quickly as possible from a distribution POST. - $feedsub->receive($post, $hmac); + // If queues are disabled this'll process immediately. + $data = array('feedsub_id' => $feedsub->id, + 'post' => $post, + 'hmac' => $hmac); + $qm = QueueManager::get(); + $qm->enqueue($data, 'pushinput'); } /** diff --git a/plugins/OStatus/classes/FeedSub.php b/plugins/OStatus/classes/FeedSub.php index bf9d063fa..31241d3de 100644 --- a/plugins/OStatus/classes/FeedSub.php +++ b/plugins/OStatus/classes/FeedSub.php @@ -372,6 +372,12 @@ class FeedSub extends Memcached_DataObject * feed (as a DOMDocument) will be passed to the StartFeedSubHandleFeed * and EndFeedSubHandleFeed events for processing. * + * Not guaranteed to be running in an immediate POST context; may be run + * from a queue handler. + * + * Side effects: the feedsub record's lastupdate field will be updated + * to the current time (not published time) if we got a legit update. + * * @param string $post source of Atom or RSS feed * @param string $hmac X-Hub-Signature header, if present */ @@ -402,6 +408,10 @@ class FeedSub extends Memcached_DataObject return; } + $orig = clone($this); + $this->last_update = common_sql_now(); + $this->update($orig); + Event::handle('StartFeedSubReceive', array($this, $feed)); Event::handle('EndFeedSubReceive', array($this, $feed)); } diff --git a/plugins/OStatus/lib/pushinputqueuehandler.php b/plugins/OStatus/lib/pushinputqueuehandler.php new file mode 100644 index 000000000..cbd9139b5 --- /dev/null +++ b/plugins/OStatus/lib/pushinputqueuehandler.php @@ -0,0 +1,49 @@ +. + */ + +/** + * Process a feed distribution POST from a PuSH hub. + * @package FeedSub + * @author Brion Vibber + */ + +class PushInputQueueHandler extends QueueHandler +{ + function transport() + { + return 'pushinput'; + } + + function handle($data) + { + assert(is_array($data)); + + $feedsub_id = $data['feedsub_id']; + $post = $data['post']; + $hmac = $data['hmac']; + + $feedsub = FeedSub::staticGet('id', $feedsub_id); + if ($feedsub) { + $feedsub->receive($post, $hmac); + } else { + common_log(LOG_ERR, "Discarding POST to unknown feed subscription id $feedsub_id"); + } + return true; + } +} -- cgit v1.2.3-54-g00ecf From 78ca45c7a05dea911c58097a8c57be470dafee01 Mon Sep 17 00:00:00 2001 From: Brion Vibber Date: Sun, 21 Feb 2010 14:46:26 -0800 Subject: OStatus PuSH fixes: - hub now defers subscription state updates until after verification, per spec - hub now supports synchronous verification when requested (if async is not requested after) - client now requests synchronous verification (it's a bit safer) - cleanup on subscription logging/error responses --- plugins/OStatus/actions/pushcallback.php | 34 +++++--- plugins/OStatus/actions/pushhub.php | 141 ++++++++++++++++++------------- plugins/OStatus/classes/FeedSub.php | 11 ++- plugins/OStatus/classes/HubSub.php | 123 ++++++++++++++++----------- 4 files changed, 177 insertions(+), 132 deletions(-) (limited to 'plugins/OStatus/classes/FeedSub.php') diff --git a/plugins/OStatus/actions/pushcallback.php b/plugins/OStatus/actions/pushcallback.php index 35c92c732..4184f0e0c 100644 --- a/plugins/OStatus/actions/pushcallback.php +++ b/plugins/OStatus/actions/pushcallback.php @@ -72,7 +72,7 @@ class PushCallbackAction extends Action } /** - * Handler for GET verification requests from the hub + * Handler for GET verification requests from the hub. */ function handleGet() { @@ -81,31 +81,37 @@ class PushCallbackAction extends Action $challenge = $this->arg('hub_challenge'); $lease_seconds = $this->arg('hub_lease_seconds'); $verify_token = $this->arg('hub_verify_token'); - + if ($mode != 'subscribe' && $mode != 'unsubscribe') { - common_log(LOG_WARNING, __METHOD__ . ": bogus hub callback with mode \"$mode\""); - throw new ServerException("Bogus hub callback: bad mode", 404); + throw new ClientException("Bad hub.mode $mode", 404); } - + $feedsub = FeedSub::staticGet('uri', $topic); if (!$feedsub) { - common_log(LOG_WARNING, __METHOD__ . ": bogus hub callback for unknown feed $topic"); - throw new ServerException("Bogus hub callback: unknown feed", 404); + throw new ClientException("Bad hub.topic feed $topic", 404); } if ($feedsub->verify_token !== $verify_token) { - common_log(LOG_WARNING, __METHOD__ . ": bogus hub callback with bad token \"$verify_token\" for feed $topic"); - throw new ServerException("Bogus hub callback: bad token", 404); + throw new ClientException("Bad hub.verify_token $token for $topic", 404); } - if ($mode != $feedsub->sub_state) { - common_log(LOG_WARNING, __METHOD__ . ": bogus hub callback with bad mode \"$mode\" for feed $topic in state \"{$feedsub->sub_state}\""); - throw new ServerException("Bogus hub callback: mode doesn't match subscription state.", 404); + if ($mode == 'subscribe') { + // We may get re-sub requests legitimately. + if ($feedsub->sub_state != 'subscribe' && $feedsub->sub_state != 'active') { + throw new ClientException("Unexpected subscribe request for $topic.", 404); + } + } else { + if ($feedsub->sub_state != 'unsubscribe') { + throw new ClientException("Unexpected unsubscribe request for $topic.", 404); + } } - // OK! if ($mode == 'subscribe') { - common_log(LOG_INFO, __METHOD__ . ': sub confirmed'); + if ($feedsub->sub_state == 'active') { + common_log(LOG_INFO, __METHOD__ . ': sub update confirmed'); + } else { + common_log(LOG_INFO, __METHOD__ . ': sub confirmed'); + } $feedsub->confirmSubscribe($lease_seconds); } else { common_log(LOG_INFO, __METHOD__ . ": unsub confirmed; deleting sub record for $topic"); diff --git a/plugins/OStatus/actions/pushhub.php b/plugins/OStatus/actions/pushhub.php index 19599d815..f33690bc4 100644 --- a/plugins/OStatus/actions/pushhub.php +++ b/plugins/OStatus/actions/pushhub.php @@ -59,102 +59,121 @@ class PushHubAction extends Action $mode = $this->trimmed('hub.mode'); switch ($mode) { case "subscribe": - $this->subscribe(); - break; case "unsubscribe": - $this->unsubscribe(); + $this->subunsub($mode); break; case "publish": - throw new ServerException("Publishing outside feeds not supported.", 400); + throw new ClientException("Publishing outside feeds not supported.", 400); default: - throw new ServerException("Unrecognized mode '$mode'.", 400); + throw new ClientException("Unrecognized mode '$mode'.", 400); } } /** - * Process a PuSH feed subscription request. + * Process a request for a new or modified PuSH feed subscription. + * If asynchronous verification is requested, updates won't be saved immediately. * * HTTP return codes: * 202 Accepted - request saved and awaiting verification * 204 No Content - already subscribed - * 403 Forbidden - rejecting this (not specifically spec'd) + * 400 Bad Request - rejecting this (not specifically spec'd) */ - function subscribe() + function subunsub($mode) { - $feed = $this->argUrl('hub.topic'); $callback = $this->argUrl('hub.callback'); - $token = $this->arg('hub.verify_token', null); - common_log(LOG_DEBUG, __METHOD__ . ": checking sub'd to $feed $callback"); - if ($this->getSub($feed, $callback)) { - // Already subscribed; return 204 per spec. - header('HTTP/1.1 204 No Content'); - common_log(LOG_DEBUG, __METHOD__ . ': already subscribed'); - return; + $topic = $this->argUrl('hub.topic'); + if (!$this->recognizedFeed($topic)) { + throw new ClientException("Unsupported hub.topic $topic; this hub only serves local user and group Atom feeds."); } - common_log(LOG_DEBUG, __METHOD__ . ': setting up'); - $sub = new HubSub(); - $sub->topic = $feed; - $sub->callback = $callback; - $sub->secret = $this->arg('hub.secret', null); - if (strlen($sub->secret) > 200) { - throw new ClientException("hub.secret must be no longer than 200 chars", 400); + $verify = $this->arg('hub.verify'); // @fixme may be multiple + if ($verify != 'sync' && $verify != 'async') { + throw new ClientException("Invalid hub.verify $verify; must be sync or async."); } - $sub->setLease(intval($this->arg('hub.lease_seconds'))); - // @fixme check for feeds we don't manage - // @fixme check the verification mode, might want a return immediately? + $lease = $this->arg('hub.lease_seconds', null); + if ($mode == 'subscribe' && $lease != '' && !preg_match('/^\d+$/', $lease)) { + throw new ClientException("Invalid hub.lease $lease; must be empty or positive integer."); + } + + $token = $this->arg('hub.verify_token', null); - common_log(LOG_DEBUG, __METHOD__ . ': inserting'); - $ok = $sub->insert(); - - if (!$ok) { - throw new ServerException("Failed to save subscription record", 500); + $secret = $this->arg('hub.secret', null); + if ($secret != '' && strlen($secret) >= 200) { + throw new ClientException("Invalid hub.secret $secret; must be under 200 bytes."); } - // @fixme check errors ;) + $sub = HubSub::staticGet($sub->topic, $sub->callback); + if (!$sub) { + // Creating a new one! + $sub = new HubSub(); + $sub->topic = $topic; + $sub->callback = $callback; + } + if ($mode == 'subscribe') { + if ($secret) { + $sub->secret = $secret; + } + if ($lease) { + $sub->setLease(intval($lease)); + } + } - $data = array('sub' => $sub, 'mode' => 'subscribe', 'token' => $token); - $qm = QueueManager::get(); - $qm->enqueue($data, 'hubverify'); - - header('HTTP/1.1 202 Accepted'); - common_log(LOG_DEBUG, __METHOD__ . ': done'); + if (!common_config('queue', 'enabled')) { + // Won't be able to background it. + $verify = 'sync'; + } + if ($verify == 'async') { + $sub->scheduleVerify($mode, $token); + header('HTTP/1.1 202 Accepted'); + } else { + $sub->verify($mode, $token); + header('HTTP/1.1 204 No Content'); + } } /** - * Process a PuSH feed unsubscription request. - * - * HTTP return codes: - * 202 Accepted - request saved and awaiting verification - * 204 No Content - already subscribed - * 400 Bad Request - invalid params or rejected feed + * Check whether the given URL represents one of our canonical + * user or group Atom feeds. * - * @fixme background this + * @param string $feed URL + * @return boolean true if it matches */ - function unsubscribe() + function recognizedFeed($feed) { - $feed = $this->argUrl('hub.topic'); - $callback = $this->argUrl('hub.callback'); - $sub = $this->getSub($feed, $callback); - - if ($sub) { - $token = $this->arg('hub.verify_token', null); - if ($sub->verify('unsubscribe', $token)) { - $sub->delete(); - common_log(LOG_INFO, "PuSH unsubscribed $feed for $callback"); - } else { - throw new ServerException("Failed PuSH unsubscription: verification failed! $feed for $callback"); + $matches = array(); + if (preg_match('!/(\d+)\.atom$!', $feed, $matches)) { + $id = $matches[1]; + $params = array('id' => $id, 'format' => 'atom'); + $userFeed = common_local_url('ApiTimelineUser', $params); + $groupFeed = common_local_url('ApiTimelineGroup', $params); + + if ($feed == $userFeed) { + $user = User::staticGet('id', $id); + if (!$user) { + throw new ClientException("Invalid hub.topic $feed; user doesn't exist."); + } else { + return true; + } } - } else { - throw new ServerException("Failed PuSH unsubscription: not subscribed! $feed for $callback"); + if ($feed == $groupFeed) { + $user = User_group::staticGet('id', $id); + if (!$user) { + throw new ClientException("Invalid hub.topic $feed; group doesn't exist."); + } else { + return true; + } + } + common_log(LOG_DEBUG, "Not a user or group feed? $feed $userFeed $groupFeed"); } + common_log(LOG_DEBUG, "LOST $feed"); + return false; } /** * Grab and validate a URL from POST parameters. - * @throws ServerException for malformed or non-http/https URLs + * @throws ClientException for malformed or non-http/https URLs */ protected function argUrl($arg) { @@ -164,7 +183,7 @@ class PushHubAction extends Action if (Validate::uri($url, $params)) { return $url; } else { - throw new ServerException("Invalid URL passed for $arg: '$url'", 400); + throw new ClientException("Invalid URL passed for $arg: '$url'"); } } diff --git a/plugins/OStatus/classes/FeedSub.php b/plugins/OStatus/classes/FeedSub.php index 31241d3de..b848b6b1d 100644 --- a/plugins/OStatus/classes/FeedSub.php +++ b/plugins/OStatus/classes/FeedSub.php @@ -291,10 +291,9 @@ class FeedSub extends Memcached_DataObject $headers = array('Content-Type: application/x-www-form-urlencoded'); $post = array('hub.mode' => $mode, 'hub.callback' => $callback, - 'hub.verify' => 'async', + 'hub.verify' => 'sync', 'hub.verify_token' => $this->verify_token, 'hub.secret' => $this->secret, - //'hub.lease_seconds' => 0, 'hub.topic' => $this->uri); $client = new HTTPClient(); $response = $client->post($this->huburi, $headers, $post); @@ -317,8 +316,8 @@ class FeedSub extends Memcached_DataObject common_log(LOG_ERR, __METHOD__ . ": error \"{$e->getMessage()}\" hitting hub $this->huburi subscribing to $this->uri"); $orig = clone($this); - $this->verify_token = null; - $this->sub_state = null; + $this->verify_token = ''; + $this->sub_state = 'inactive'; $this->update($orig); unset($orig); @@ -343,7 +342,7 @@ class FeedSub extends Memcached_DataObject } else { $this->sub_end = null; } - $this->lastupdate = common_sql_now(); + $this->modified = common_sql_now(); return $this->update($original); } @@ -362,7 +361,7 @@ class FeedSub extends Memcached_DataObject $this->sub_state = ''; $this->sub_start = ''; $this->sub_end = ''; - $this->lastupdate = common_sql_now(); + $this->modified = common_sql_now(); return $this->update($original); } diff --git a/plugins/OStatus/classes/HubSub.php b/plugins/OStatus/classes/HubSub.php index a81de68e6..eae2928c3 100644 --- a/plugins/OStatus/classes/HubSub.php +++ b/plugins/OStatus/classes/HubSub.php @@ -30,11 +30,11 @@ class HubSub extends Memcached_DataObject public $topic; public $callback; public $secret; - public $challenge; public $lease; public $sub_start; public $sub_end; public $created; + public $modified; public /*static*/ function staticGet($topic, $callback) { @@ -61,11 +61,11 @@ class HubSub extends Memcached_DataObject 'topic' => DB_DATAOBJECT_STR + DB_DATAOBJECT_NOTNULL, 'callback' => DB_DATAOBJECT_STR + DB_DATAOBJECT_NOTNULL, 'secret' => DB_DATAOBJECT_STR, - 'challenge' => DB_DATAOBJECT_STR, 'lease' => DB_DATAOBJECT_INT, 'sub_start' => DB_DATAOBJECT_STR + DB_DATAOBJECT_DATE + DB_DATAOBJECT_TIME, 'sub_end' => DB_DATAOBJECT_STR + DB_DATAOBJECT_DATE + DB_DATAOBJECT_TIME, - 'created' => DB_DATAOBJECT_STR + DB_DATAOBJECT_DATE + DB_DATAOBJECT_TIME + DB_DATAOBJECT_NOTNULL); + 'created' => DB_DATAOBJECT_STR + DB_DATAOBJECT_DATE + DB_DATAOBJECT_TIME + DB_DATAOBJECT_NOTNULL, + 'modified' => DB_DATAOBJECT_STR + DB_DATAOBJECT_DATE + DB_DATAOBJECT_TIME + DB_DATAOBJECT_NOTNULL); } static function schemaDef() @@ -82,8 +82,6 @@ class HubSub extends Memcached_DataObject 255, false), new ColumnDef('secret', 'text', null, true), - new ColumnDef('challenge', 'varchar', - 32, true), new ColumnDef('lease', 'int', null, true), new ColumnDef('sub_start', 'datetime', @@ -91,6 +89,8 @@ class HubSub extends Memcached_DataObject new ColumnDef('sub_end', 'datetime', null, true), new ColumnDef('created', 'datetime', + null, false), + new ColumnDef('modified', 'datetime', null, false)); } @@ -148,84 +148,105 @@ class HubSub extends Memcached_DataObject } /** - * Send a verification ping to subscriber + * Schedule a future verification ping to the subscriber. + * If queues are disabled, will be immediate. + * + * @param string $mode 'subscribe' or 'unsubscribe' + * @param string $token hub.verify_token value, if provided by client + */ + function scheduleVerify($mode, $token=null, $retries=null) + { + if ($retries === null) { + $retries = intval(common_config('ostatus', 'hub_retries')); + } + $data = array('sub' => clone($this), + 'mode' => $mode, + 'token' => $token, + 'retries' => $retries); + $qm = QueueManager::get(); + $qm->enqueue($data, 'hubverify'); + } + + /** + * Send a verification ping to subscriber, and if confirmed apply the changes. + * This may create, update, or delete the database record. + * * @param string $mode 'subscribe' or 'unsubscribe' * @param string $token hub.verify_token value, if provided by client + * @throws ClientException on failure */ function verify($mode, $token=null) { assert($mode == 'subscribe' || $mode == 'unsubscribe'); - // Is this needed? data object fun... - $clone = clone($this); - $clone->challenge = common_good_rand(16); - $clone->update($this); - $this->challenge = $clone->challenge; - unset($clone); - + $challenge = common_good_rand(32); $params = array('hub.mode' => $mode, 'hub.topic' => $this->topic, - 'hub.challenge' => $this->challenge); + 'hub.challenge' => $challenge); if ($mode == 'subscribe') { $params['hub.lease_seconds'] = $this->lease; } if ($token !== null) { $params['hub.verify_token'] = $token; } - $url = $this->callback . '?' . http_build_query($params, '', '&'); // @fixme ugly urls - try { - $request = new HTTPClient(); - $response = $request->get($url); - $status = $response->getStatus(); - - if ($status >= 200 && $status < 300) { - $fail = false; - } else { - // @fixme how can we schedule a second attempt? - // Or should we? - $fail = "Returned HTTP $status"; - } - } catch (Exception $e) { - $fail = $e->getMessage(); + // Any existing query string parameters must be preserved + $url = $this->callback; + if (strpos('?', $url) !== false) { + $url .= '&'; + } else { + $url .= '?'; } - if ($fail) { - // @fixme how can we schedule a second attempt? - // or save a fail count? - // Or should we? - common_log(LOG_ERR, "Failed to verify $mode for $this->topic at $this->callback: $fail"); - return false; + $url .= http_build_query($params, '', '&'); + + $request = new HTTPClient(); + $response = $request->get($url); + $status = $response->getStatus(); + + if ($status >= 200 && $status < 300) { + common_log(LOG_INFO, "Verified $mode of $this->callback:$this->topic"); } else { - if ($mode == 'subscribe') { - // Establish or renew the subscription! - // This seems unnecessary... dataobject fun! - $clone = clone($this); - $clone->challenge = null; - $clone->setLease($this->lease); - $clone->update($this); - unset($clone); + throw new ClientException("Hub subscriber verification returned HTTP $status"); + } - $this->challenge = null; - $this->setLease($this->lease); - common_log(LOG_ERR, "Verified $mode of $this->callback:$this->topic for $this->lease seconds"); - } else if ($mode == 'unsubscribe') { - common_log(LOG_ERR, "Verified $mode of $this->callback:$this->topic"); - $this->delete(); + $old = HubSub::staticGet($this->topic, $this->callback); + if ($mode == 'subscribe') { + if ($old) { + $this->update($old); + } else { + $ok = $this->insert(); + } + } else if ($mode == 'unsubscribe') { + if ($old) { + $old->delete(); + } else { + // That's ok, we're already unsubscribed. } - return true; } } /** * Insert wrapper; transparently set the hash key from topic and callback columns. - * @return boolean success + * @return mixed success */ function insert() { $this->hashkey = self::hashkey($this->topic, $this->callback); + $this->created = common_sql_now(); + $this->modified = common_sql_now(); return parent::insert(); } + /** + * Update wrapper; transparently update modified column. + * @return boolean success + */ + function update($old=null) + { + $this->modified = common_sql_now(); + return parent::update($old); + } + /** * Schedule delivery of a 'fat ping' to the subscriber's callback * endpoint. If queues are disabled, this will run immediately. -- cgit v1.2.3-54-g00ecf