From dc09453a77f33c4dfdff306321ce93cf5fbd2d57 Mon Sep 17 00:00:00 2001 From: Brion Vibber Date: Mon, 8 Feb 2010 11:06:03 -0800 Subject: First steps on converting FeedSub into the pub/sub basis for OStatus communications: * renamed FeedSub plugin to OStatus * now setting avatar on subscriptions * general fixes for subscription * integrated PuSH hub to handle only user timelines on canonical ID url; sends updates directly * set $config['feedsub']['nohub'] = true to test w/ foreign feeds that don't have hubs (won't actually receive updates though) * a few bits of code documentation * HMAC support for verified distributions (safest if sub setup is on HTTPS) And a couple core changes: * minimizing HTML output for exceptions in API requests to aid in debugging * fix for rel=self link in apitimelineuser when id given This does not yet include any of the individual subscription management (Salmon notifications for sub/unsub, etc) nor a nice UI for user subscriptions. Needs some further cleanup to treat posts as status updates instead of link references. --- plugins/OStatus/classes/Feedinfo.php | 345 +++++++++++++++++++++++++++++++++++ plugins/OStatus/classes/HubSub.php | 272 +++++++++++++++++++++++++++ 2 files changed, 617 insertions(+) create mode 100644 plugins/OStatus/classes/Feedinfo.php create mode 100644 plugins/OStatus/classes/HubSub.php (limited to 'plugins/OStatus/classes') diff --git a/plugins/OStatus/classes/Feedinfo.php b/plugins/OStatus/classes/Feedinfo.php new file mode 100644 index 000000000..f29d08cb0 --- /dev/null +++ b/plugins/OStatus/classes/Feedinfo.php @@ -0,0 +1,345 @@ +. + */ + +/** + * @package FeedSubPlugin + * @maintainer Brion Vibber + */ + +/* +PuSH subscription flow: + + $feedinfo->subscribe() + generate random verification token + save to verify_token + sends a sub request to the hub... + + feedsub/callback + hub sends confirmation back to us via GET + We verify the request, then echo back the challenge. 
+        On our end, we save the time we subscribed and the lease expiration
+
+    feedsub/callback
+        hub sends us updates via POST
+
+*/
+
+/**
+ * Thrown when inserting a data object fails; carries the object that
+ * could not be saved so the catcher can log the database error for it.
+ */
+class FeedDBException extends FeedSubException
+{
+    /** @var object the data object whose insert() failed */
+    public $obj;
+
+    /**
+     * @param object $obj the data object whose insert() failed
+     */
+    function __construct($obj)
+    {
+        parent::__construct('Database insert failure');
+        $this->obj = $obj;
+    }
+}
+
+/**
+ * Subscriber-side record of a remote feed: which local profile it maps to,
+ * where the feed and its hub live, and the PuSH subscription state.
+ */
+class Feedinfo extends Memcached_DataObject
+{
+    public $__table = 'feedinfo';
+
+    public $id;
+    public $profile_id;
+
+    public $feeduri;
+    public $homeuri;
+    public $huburi;
+
+    // PuSH subscription data
+    public $secret;         // shared secret sent to the hub as hub.secret
+    public $verify_token;   // token sent to the hub as hub.verify_token
+    public $sub_start;
+    public $sub_end;
+
+    public $created;
+    public $lastupdate;
+
+
+    /**
+     * Look up a single Feedinfo row by column value.
+     *
+     * @param string $k column name
+     * @param mixed $v column value
+     * @return Feedinfo|false (whatever Memcached_DataObject::staticGet yields)
+     */
+    public /*static*/ function staticGet($k, $v=null)
+    {
+        return parent::staticGet(__CLASS__, $k, $v);
+    }
+
+    /**
+     * return table definition for DB_DataObject
+     *
+     * DB_DataObject needs to know something about the table to manipulate
+     * instances. This method provides all the DB_DataObject needs to know.
+     *
+     * @return array array of column definitions
+     */
+
+    function table()
+    {
+        return array('id' => DB_DATAOBJECT_INT + DB_DATAOBJECT_NOTNULL,
+                     'profile_id' => DB_DATAOBJECT_INT + DB_DATAOBJECT_NOTNULL,
+                     'feeduri' => DB_DATAOBJECT_STR + DB_DATAOBJECT_NOTNULL,
+                     'homeuri' => DB_DATAOBJECT_STR + DB_DATAOBJECT_NOTNULL,
+                     'huburi' => DB_DATAOBJECT_STR + DB_DATAOBJECT_NOTNULL,
+                     'secret' => DB_DATAOBJECT_STR,
+                     'verify_token' => DB_DATAOBJECT_STR,
+                     'sub_start' => DB_DATAOBJECT_STR + DB_DATAOBJECT_DATE + DB_DATAOBJECT_TIME,
+                     'sub_end' => DB_DATAOBJECT_STR + DB_DATAOBJECT_DATE + DB_DATAOBJECT_TIME,
+                     'created' => DB_DATAOBJECT_STR + DB_DATAOBJECT_DATE + DB_DATAOBJECT_TIME + DB_DATAOBJECT_NOTNULL,
+                     'lastupdate' => DB_DATAOBJECT_STR + DB_DATAOBJECT_DATE + DB_DATAOBJECT_TIME + DB_DATAOBJECT_NOTNULL);
+    }
+
+    /**
+     * Column definitions used to create the feedinfo table.
+     *
+     * @return array of ColumnDef
+     */
+    static function schemaDef()
+    {
+        return array(new ColumnDef('id', 'integer',
+                                   /*size*/ null,
+                                   /*nullable*/ false,
+                                   /*key*/ 'PRI',
+                                   /*default*/ '0',
+                                   /*extra*/ null,
+                                   /*auto_increment*/ true),
+                     new ColumnDef('profile_id', 'integer',
+                                   null, false),
+                     new ColumnDef('feeduri', 'varchar',
+                                   255, false, 'UNI'),
+                     new ColumnDef('homeuri', 'varchar',
+                                   255, false),
+                     new ColumnDef('huburi', 'varchar',
+                                   255, false),
+                     new ColumnDef('verify_token', 'varchar',
+                                   32, true),
+                     new ColumnDef('secret', 'varchar',
+                                   64, true),
+                     new ColumnDef('sub_start', 'datetime',
+                                   null, true),
+                     new ColumnDef('sub_end', 'datetime',
+                                   null, true),
+                     new ColumnDef('created', 'datetime',
+                                   null, false),
+                     new ColumnDef('lastupdate', 'datetime',
+                                   null, false));
+    }
+
+    /**
+     * return key definitions for DB_DataObject
+     *
+     * DB_DataObject needs to know about keys that the table has; this function
+     * defines them.
+     *
+     * @return array key definitions
+     */
+
+    function keys()
+    {
+        return array_keys($this->keyTypes());
+    }
+
+    /**
+     * return key definitions for Memcached_DataObject
+     *
+     * Our caching system uses the same key definitions, but uses a different
+     * method to get them.
+     *
+     * @return array key definitions
+     */
+
+    function keyTypes()
+    {
+        return array('id' => 'K'); // @fixme we'll need a profile_id key at least
+    }
+
+    /**
+     * Sequence (auto-increment) key description for DB_DataObject.
+     *
+     * @return array (key column, use native sequence, sequence name)
+     */
+    function sequenceKey()
+    {
+        return array('id', true, false);
+    }
+
+    /**
+     * Fetch the StatusNet-side profile for this feed
+     * @return Profile
+     */
+    public function getProfile()
+    {
+        return Profile::staticGet('id', $this->profile_id);
+    }
+
+    /**
+     * Return the Feedinfo record for the munger's feed, creating the
+     * record and a matching local profile (with avatar, if the feed has
+     * one) when the feed URI is not known yet.
+     *
+     * Profile and feedinfo rows are inserted inside one transaction.
+     *
+     * @param FeedMunger $munger
+     * @return Feedinfo|false existing or newly saved record; false if a
+     *                        database insert failed
+     * @throws Exception anything thrown while building the profile or
+     *                   avatar is re-thrown after the transaction is
+     *                   rolled back
+     */
+    public static function ensureProfile($munger)
+    {
+        $feedinfo = $munger->feedinfo();
+
+        $current = self::staticGet('feeduri', $feedinfo->feeduri);
+        if ($current) {
+            // @fixme we should probably update info as necessary
+            return $current;
+        }
+
+        $feedinfo->query('BEGIN');
+
+        // Awful hack! Awful hack!
+        // The token must land in the verify_token column: that is the
+        // property subscribe() sends to the hub as hub.verify_token.
+        $feedinfo->verify_token = common_good_rand(16);
+        $feedinfo->secret = common_good_rand(32);
+
+        try {
+            $profile = $munger->profile();
+            $result = $profile->insert();
+            if (empty($result)) {
+                throw new FeedDBException($profile);
+            }
+
+            $avatar = $munger->getAvatar();
+            if ($avatar) {
+                // @fixme this should be better encapsulated
+                // ripped from oauthstore.php (for old OMB client)
+                $temp_filename = tempnam(sys_get_temp_dir(), 'listener_avatar');
+                copy($avatar, $temp_filename);
+                $imagefile = new ImageFile($profile->id, $temp_filename);
+                $filename = Avatar::filename($profile->id,
+                                             image_type_to_extension($imagefile->type),
+                                             null,
+                                             common_timestamp());
+                rename($temp_filename, Avatar::path($filename));
+                $profile->setOriginal($filename);
+            }
+
+            $feedinfo->profile_id = $profile->id;
+            $result = $feedinfo->insert();
+            if (empty($result)) {
+                throw new FeedDBException($feedinfo);
+            }
+
+            $feedinfo->query('COMMIT');
+        } catch (FeedDBException $e) {
+            common_log_db_error($e->obj, 'INSERT', __FILE__);
+            $feedinfo->query('ROLLBACK');
+            return false;
+        } catch (Exception $e) {
+            // Any other failure (e.g. while handling the avatar) must not
+            // leave the transaction open; undo and let the caller see it.
+            $feedinfo->query('ROLLBACK');
+            throw $e;
+        }
+        return $feedinfo;
+    }
+
+    /**
+     * Send a subscription request to the hub for this feed.
+     * With hub.verify=async the hub confirms later by sending a GET
+     * to /feedsub/callback, which must echo back the challenge.
+     *
+     * When $config['feedsub']['nohub'] is set, no request is sent and
+     * success is reported immediately.
+     *
+     * @return bool true on success, false on failure
+     */
+    public function subscribe()
+    {
+        if (common_config('feedsub', 'nohub')) {
+            // Fake it! We're just testing remote feeds w/o hubs.
+            return true;
+        }
+        // @fixme use the verification token
+        #$token = md5(mt_rand() . ':' . $this->feeduri);
+        #$this->verify_token = $token;
+        #$this->update(); // @fixme
+        try {
+            $callback = common_local_url('feedsubcallback', array('feed' => $this->id));
+            $headers = array('Content-Type: application/x-www-form-urlencoded');
+            $post = array('hub.mode' => 'subscribe',
+                          'hub.callback' => $callback,
+                          'hub.verify' => 'async',
+                          'hub.verify_token' => $this->verify_token,
+                          'hub.secret' => $this->secret,
+                          //'hub.lease_seconds' => 0,
+                          'hub.topic' => $this->feeduri);
+            $client = new HTTPClient();
+            $response = $client->post($this->huburi, $headers, $post);
+            $status = $response->getStatus();
+            if ($status == 202) {
+                common_log(LOG_INFO, __METHOD__ . ': sub req ok, awaiting verification callback');
+                return true;
+            } else if ($status == 204) {
+                common_log(LOG_INFO, __METHOD__ . ': sub req ok and verified');
+                return true;
+            } else if ($status >= 200 && $status < 300) {
+                common_log(LOG_ERR, __METHOD__ . ": sub req returned unexpected HTTP $status: " . $response->getBody());
+                return false;
+            } else {
+                common_log(LOG_ERR, __METHOD__ . ": sub req failed with HTTP $status: " . $response->getBody());
+                return false;
+            }
+        } catch (Exception $e) {
+            // wtf!
+            common_log(LOG_ERR, __METHOD__ . ": error \"{$e->getMessage()}\" hitting hub $this->huburi subscribing to $this->feeduri");
+            return false;
+        }
+    }
+
+    /**
+     * Read and post notices for updates from the feed.
+     * Currently assumes that all items in the feed are new,
+     * coming from a PuSH hub.
+     *
+     * If this subscription has a secret, the packet is dropped unless
+     * $hmac is "sha1=" followed by the HMAC-SHA1 of $xml keyed with that
+     * secret. If there is no secret, a packet that carries a signature
+     * anyway is dropped as well.
+     *
+     * @param string $xml source of Atom or RSS feed
+     * @param string $hmac X-Hub-Signature header, if present
+     */
+    public function postUpdates($xml, $hmac)
+    {
+        common_log(LOG_INFO, __METHOD__ . ": packet for \"$this->feeduri\"! $hmac $xml");
+
+        if ($this->secret) {
+            if (preg_match('/^sha1=([0-9a-fA-F]{40})$/', $hmac, $matches)) {
+                $their_hmac = strtolower($matches[1]);
+                // Real keyed HMAC, not sha1(body . secret): the plain
+                // concatenation is not what X-Hub-Signature specifies and
+                // will not match signatures from conforming hubs.
+                $our_hmac = hash_hmac('sha1', $xml, $this->secret);
+                if ($their_hmac !== $our_hmac) {
+                    common_log(LOG_ERR, __METHOD__ . ": ignoring PuSH with bad SHA-1 HMAC: got $their_hmac, expected $our_hmac");
+                    return;
+                }
+            } else {
+                common_log(LOG_ERR, __METHOD__ . ": ignoring PuSH with bogus HMAC '$hmac'");
+                return;
+            }
+        } else if ($hmac) {
+            common_log(LOG_ERR, __METHOD__ . ": ignoring PuSH with unexpected HMAC '$hmac'");
+            return;
+        }
+
+        require_once "XML/Feed/Parser.php";
+        $feed = new XML_Feed_Parser($xml, false, false, true);
+        $munger = new FeedMunger($feed);
+
+        $hits = 0;
+        foreach ($feed as $index => $entry) {
+            // @fixme this might sort in wrong order if we get multiple updates
+
+            $notice = $munger->notice($index);
+            $notice->profile_id = $this->profile_id;
+
+            // Double-check for oldies
+            // @fixme this could explode horribly for multiple feeds on a blog. sigh
+            $dupe = new Notice();
+            $dupe->uri = $notice->uri;
+            if ($dupe->find(true)) {
+                common_log(LOG_WARNING, __METHOD__ . ": tried to save dupe notice for entry {$notice->uri} of feed {$this->feeduri}");
+                continue;
+            }
+
+            if (Event::handle('StartNoticeSave', array(&$notice))) {
+                $id = $notice->insert();
+                if (empty($id)) {
+                    // Don't distribute or count a notice that never
+                    // made it into the database.
+                    common_log_db_error($notice, 'INSERT', __FILE__);
+                    continue;
+                }
+                Event::handle('EndNoticeSave', array($notice));
+            }
+            $notice->addToInboxes();
+
+            common_log(LOG_INFO, __METHOD__ . ": saved notice {$notice->id} for entry $index of update to \"{$this->feeduri}\"");
+            $hits++;
+        }
+        if ($hits == 0) {
+            common_log(LOG_INFO, __METHOD__ . ": no updates in packet for \"$this->feeduri\"! $xml");
+        }
+    }
+}
diff --git a/plugins/OStatus/classes/HubSub.php b/plugins/OStatus/classes/HubSub.php
new file mode 100644
index 000000000..1769f6c94
--- /dev/null
+++ b/plugins/OStatus/classes/HubSub.php
@@ -0,0 +1,272 @@
+.
+ */
+
+/**
+ * PuSH feed subscription record
+ *
+ * Hub-side record of one (topic, callback) subscription.
+ *
+ * @package Hub
+ * @author Brion Vibber
+ */
+class HubSub extends Memcached_DataObject
+{
+    public $__table = 'hubsub';
+
+    public $hashkey; // sha1(topic . '|' . $callback); (topic, callback) key is too long for myisam in utf8
+    public $topic;
+    public $callback;
+    public $secret;
+    public $verify_token;
+    public $challenge;
+    public $lease;
+    public $sub_start;
+    public $sub_end;
+    public $created;
+
+    /**
+     * Look up the subscription for a (topic, callback) pair via its hash key.
+     *
+     * @param string $topic feed URL subscribed to
+     * @param string $callback subscriber's callback URL
+     * @return HubSub|false (whatever Memcached_DataObject::staticGet yields)
+     */
+    public /*static*/ function staticGet($topic, $callback)
+    {
+        return parent::staticGet(__CLASS__, 'hashkey', self::hashkey($topic, $callback));
+    }
+
+    /**
+     * Primary key value for a (topic, callback) pair.
+     *
+     * @param string $topic
+     * @param string $callback
+     * @return string 40-character hex SHA-1 of "topic|callback"
+     */
+    protected static function hashkey($topic, $callback)
+    {
+        return sha1($topic . '|' . $callback);
+    }
+
+    /**
+     * return table definition for DB_DataObject
+     *
+     * DB_DataObject needs to know something about the table to manipulate
+     * instances. This method provides all the DB_DataObject needs to know.
+     *
+     * @return array array of column definitions
+     */
+
+    function table()
+    {
+        return array('hashkey' => DB_DATAOBJECT_STR + DB_DATAOBJECT_NOTNULL,
+                     'topic' => DB_DATAOBJECT_STR + DB_DATAOBJECT_NOTNULL,
+                     'callback' => DB_DATAOBJECT_STR + DB_DATAOBJECT_NOTNULL,
+                     'secret' => DB_DATAOBJECT_STR,
+                     'verify_token' => DB_DATAOBJECT_STR,
+                     'challenge' => DB_DATAOBJECT_STR,
+                     'lease' => DB_DATAOBJECT_INT,
+                     'sub_start' => DB_DATAOBJECT_STR + DB_DATAOBJECT_DATE + DB_DATAOBJECT_TIME,
+                     'sub_end' => DB_DATAOBJECT_STR + DB_DATAOBJECT_DATE + DB_DATAOBJECT_TIME,
+                     'created' => DB_DATAOBJECT_STR + DB_DATAOBJECT_DATE + DB_DATAOBJECT_TIME + DB_DATAOBJECT_NOTNULL);
+    }
+
+    /**
+     * Column definitions used to create the hubsub table.
+     *
+     * @return array of ColumnDef
+     */
+    static function schemaDef()
+    {
+        return array(new ColumnDef('hashkey', 'char',
+                                   /*size*/40,
+                                   /*nullable*/false,
+                                   /*key*/'PRI'),
+                     new ColumnDef('topic', 'varchar',
+                                   /*size*/255,
+                                   /*nullable*/false,
+                                   /*key*/'KEY'),
+                     new ColumnDef('callback', 'varchar',
+                                   255, false),
+                     new ColumnDef('secret', 'text',
+                                   null, true),
+                     new ColumnDef('verify_token', 'text',
+                                   null, true),
+                     new ColumnDef('challenge', 'varchar',
+                                   32, true),
+                     new ColumnDef('lease', 'int',
+                                   null, true),
+                     new ColumnDef('sub_start', 'datetime',
+                                   null, true),
+                     new ColumnDef('sub_end', 'datetime',
+                                   null, true),
+                     new ColumnDef('created', 'datetime',
+                                   null, false));
+    }
+
+    /**
+     * return key names for DB_DataObject
+     *
+     * @return array key column names
+     */
+    function keys()
+    {
+        return array_keys($this->keyTypes());
+    }
+
+    /**
+     * Tell DB_DataObject this table has no sequence / auto-increment key;
+     * the primary key is the computed hashkey.
+     *
+     * The hook is spelled sequenceKey() (singular), as in Feedinfo; under
+     * the old plural name this override was never consulted.
+     *
+     * @return array (false, false, false)
+     */
+    function sequenceKey()
+    {
+        return array(false, false, false);
+    }
+
+    /**
+     * Former (misspelled) name of sequenceKey(), kept so any existing
+     * caller keeps working.
+     *
+     * @return array same as sequenceKey()
+     */
+    function sequenceKeys()
+    {
+        return $this->sequenceKey();
+    }
+
+    /**
+     * return key definitions for DB_DataObject
+     *
+     * DB_DataObject needs to know about keys that the table has; this function
+     * defines them.
+     *
+     * @return array key definitions
+     */
+
+    function keyTypes()
+    {
+        return array('hashkey' => 'K');
+    }
+
+    /**
+     * Validates a requested lease length, sets length plus
+     * subscription start & end dates.
+     *
+     * The length is clamped to between one day and thirty days; zero
+     * (no preference) is treated as the maximum so that dead
+     * subscriptions eventually expire.
+     *
+     * Does not save to database -- use before insert() or update().
+     *
+     * @param int $length in seconds; numeric strings are accepted
+     */
+    function setLease($length)
+    {
+        // verify() feeds $this->lease back in here, and a data object
+        // property is not guaranteed to be a PHP int, so normalize
+        // instead of asserting on the type.
+        $length = intval($length);
+
+        $min = 86400;
+        $max = 86400 * 30;
+
+        if ($length == 0) {
+            // We want to garbage collect dead subscriptions!
+            $length = $max;
+        } elseif ($length < $min) {
+            $length = $min;
+        } else if ($length > $max) {
+            $length = $max;
+        }
+
+        $this->lease = $length;
+        // Column names are sub_start / sub_end (see table()); the previous
+        // start_sub / end_sub properties were never written to the database.
+        $this->sub_start = common_sql_now();
+        $this->sub_end = common_sql_date(time() + $length);
+    }
+
+    /**
+     * Send a verification ping to subscriber
+     *
+     * Stores a fresh challenge, then GETs the callback with hub.mode,
+     * hub.topic and hub.challenge (plus hub.lease_seconds when subscribing
+     * and hub.verify_token when one was supplied). On a 2xx answer a
+     * subscription is established/renewed, or an unsubscription deletes
+     * this record.
+     *
+     * @fixme only the HTTP status is checked; the response body is not
+     *        compared against the challenge that was sent.
+     *
+     * @param string $mode 'subscribe' or 'unsubscribe'
+     * @return bool true if the subscriber confirmed, false otherwise
+     */
+    function verify($mode)
+    {
+        assert($mode == 'subscribe' || $mode == 'unsubscribe');
+
+        // Is this needed? data object fun...
+        $clone = clone($this);
+        $clone->challenge = common_good_rand(16);
+        $clone->update($this);
+        $this->challenge = $clone->challenge;
+        unset($clone);
+
+        $params = array('hub.mode' => $mode,
+                        'hub.topic' => $this->topic,
+                        'hub.challenge' => $this->challenge);
+        if ($mode == 'subscribe') {
+            $params['hub.lease_seconds'] = $this->lease;
+        }
+        if ($this->verify_token) {
+            $params['hub.verify_token'] = $this->verify_token;
+        }
+        $url = $this->callback . '?' . http_build_query($params, '', '&'); // @fixme ugly urls
+
+        try {
+            $request = new HTTPClient();
+            $response = $request->get($url);
+            $status = $response->getStatus();
+
+            if ($status >= 200 && $status < 300) {
+                $fail = false;
+            } else {
+                // @fixme how can we schedule a second attempt?
+                // Or should we?
+                $fail = "Returned HTTP $status";
+            }
+        } catch (Exception $e) {
+            $fail = $e->getMessage();
+        }
+        if ($fail) {
+            // @fixme how can we schedule a second attempt?
+            // or save a fail count?
+            // Or should we?
+            common_log(LOG_ERR, "Failed to verify $mode for $this->topic at $this->callback: $fail");
+            return false;
+        } else {
+            if ($mode == 'subscribe') {
+                // Establish or renew the subscription!
+                // This seems unnecessary... dataobject fun!
+                $clone = clone($this);
+                $clone->challenge = null;
+                $clone->setLease($this->lease);
+                $clone->update($this);
+                unset($clone);
+
+                $this->challenge = null;
+                $this->setLease($this->lease);
+                // Success is informational, not an error.
+                common_log(LOG_INFO, "Verified $mode of $this->callback:$this->topic for $this->lease seconds");
+            } else if ($mode == 'unsubscribe') {
+                common_log(LOG_INFO, "Verified $mode of $this->callback:$this->topic");
+                $this->delete();
+            }
+            return true;
+        }
+    }
+
+    /**
+     * Insert wrapper; transparently set the hash key from topic and callback columns.
+     * @return boolean success
+     */
+    function insert()
+    {
+        $this->hashkey = self::hashkey($this->topic, $this->callback);
+        return parent::insert();
+    }
+
+    /**
+     * Send a 'fat ping' to the subscriber's callback endpoint
+     * containing the given Atom feed chunk.
+     *
+     * Determination of which items to send should be done at
+     * a higher level; don't just shove in a complete feed!
+     *
+     * When the subscription has a secret, the request carries an
+     * X-Hub-Signature header with the HMAC-SHA1 of the body keyed
+     * with that secret.
+     *
+     * @param string $atom well-formed Atom feed
+     * @return bool true if the subscriber accepted the post
+     */
+    function push($atom)
+    {
+        $headers = array('Content-Type: application/atom+xml');
+        if ($this->secret) {
+            // Keyed HMAC rather than sha1(body . secret), matching what
+            // Feedinfo::postUpdates() verifies on the receiving side.
+            $hmac = hash_hmac('sha1', $atom, $this->secret);
+            $headers[] = "X-Hub-Signature: sha1=$hmac";
+        } else {
+            $hmac = '(none)';
+        }
+        common_log(LOG_INFO, "About to push feed to $this->callback for $this->topic, HMAC $hmac");
+        try {
+            $request = new HTTPClient();
+            $request->setBody($atom);
+            $response = $request->post($this->callback, $headers);
+
+            if ($response->isOk()) {
+                return true;
+            }
+            common_log(LOG_ERR, "Error sending PuSH content " .
+                                "to $this->callback for $this->topic: " .
+                                $response->getStatus());
+            return false;
+
+        } catch (Exception $e) {
+            common_log(LOG_ERR, "Error sending PuSH content " .
+                                "to $this->callback for $this->topic: " .
+                                $e->getMessage());
+            return false;
+        }
+    }
+}
+
-- 
cgit v1.2.3-54-g00ecf