diff options
author | Craig Andrews <candrews@integralblue.com> | 2010-02-24 20:52:45 -0500 |
---|---|---|
committer | Craig Andrews <candrews@integralblue.com> | 2010-02-24 20:52:45 -0500 |
commit | c187bf55974347f7ddb4f28714af57861dce8f08 (patch) | |
tree | 4398b456d88ce79977959b25ba1a0f6fe0c1d77f /plugins/OStatus/classes/Ostatus_profile.php | |
parent | 20d6a7caed6636c28cc7b95c584549691dff4388 (diff) | |
parent | 8914b69d5055c1bc7d0604ee338ffdaf6b0a8606 (diff) |
Merge branch '0.9.x' into 1.0.x
Conflicts:
EVENTS.txt
db/statusnet.sql
lib/queuemanager.php
Diffstat (limited to 'plugins/OStatus/classes/Ostatus_profile.php')
-rw-r--r-- | plugins/OStatus/classes/Ostatus_profile.php | 1328 |
1 files changed, 1004 insertions, 324 deletions
diff --git a/plugins/OStatus/classes/Ostatus_profile.php b/plugins/OStatus/classes/Ostatus_profile.php index b750e1883..a366c1c2c 100644 --- a/plugins/OStatus/classes/Ostatus_profile.php +++ b/plugins/OStatus/classes/Ostatus_profile.php @@ -18,62 +18,25 @@ */ /** - * @package FeedSubPlugin + * @package OStatusPlugin * @maintainer Brion Vibber <brion@status.net> */ -/* -PuSH subscription flow: - - $profile->subscribe() - generate random verification token - save to verify_token - sends a sub request to the hub... - - main/push/callback - hub sends confirmation back to us via GET - We verify the request, then echo back the challenge. - On our end, we save the time we subscribed and the lease expiration - - main/push/callback - hub sends us updates via POST - -*/ - -class FeedDBException extends FeedSubException -{ - public $obj; - - function __construct($obj) - { - parent::__construct('Database insert failure'); - $this->obj = $obj; - } -} - class Ostatus_profile extends Memcached_DataObject { public $__table = 'ostatus_profile'; - public $id; + public $uri; + public $profile_id; public $group_id; public $feeduri; - public $homeuri; - - // PuSH subscription data - public $huburi; - public $secret; - public $verify_token; - public $sub_state; // subscribe, active, unsubscribe - public $sub_start; - public $sub_end; - public $salmonuri; + public $avatar; // remote URL of the last avatar we saved public $created; - public $lastupdate; + public $modified; public /*static*/ function staticGet($k, $v=null) { @@ -91,56 +54,33 @@ class Ostatus_profile extends Memcached_DataObject function table() { - return array('id' => DB_DATAOBJECT_INT + DB_DATAOBJECT_NOTNULL, + return array('uri' => DB_DATAOBJECT_STR + DB_DATAOBJECT_NOTNULL, 'profile_id' => DB_DATAOBJECT_INT, 'group_id' => DB_DATAOBJECT_INT, - 'feeduri' => DB_DATAOBJECT_STR + DB_DATAOBJECT_NOTNULL, - 'homeuri' => DB_DATAOBJECT_STR + DB_DATAOBJECT_NOTNULL, - 'huburi' => DB_DATAOBJECT_STR, - 'secret' => DB_DATAOBJECT_STR, - 'verify_token' => DB_DATAOBJECT_STR, - 'sub_state' => DB_DATAOBJECT_STR, - 'sub_start' => DB_DATAOBJECT_STR + DB_DATAOBJECT_DATE + DB_DATAOBJECT_TIME, - 'sub_end' => DB_DATAOBJECT_STR + DB_DATAOBJECT_DATE + DB_DATAOBJECT_TIME, + 'feeduri' => DB_DATAOBJECT_STR, 'salmonuri' => DB_DATAOBJECT_STR, + 'avatar' => DB_DATAOBJECT_STR, 'created' => DB_DATAOBJECT_STR + DB_DATAOBJECT_DATE + DB_DATAOBJECT_TIME + DB_DATAOBJECT_NOTNULL, - 'lastupdate' => DB_DATAOBJECT_STR + DB_DATAOBJECT_DATE + DB_DATAOBJECT_TIME + DB_DATAOBJECT_NOTNULL); + 'modified' => DB_DATAOBJECT_STR + DB_DATAOBJECT_DATE + DB_DATAOBJECT_TIME + DB_DATAOBJECT_NOTNULL); } static function schemaDef() { - return array(new ColumnDef('id', 'integer', - /*size*/ null, - /*nullable*/ false, - /*key*/ 'PRI', - /*default*/ '0', - /*extra*/ null, - /*auto_increment*/ true), + return array(new ColumnDef('uri', 'varchar', + 255, false, 'PRI'), new ColumnDef('profile_id', 'integer', null, true, 'UNI'), new ColumnDef('group_id', 'integer', null, true, 'UNI'), new ColumnDef('feeduri', 'varchar', - 255, false, 'UNI'), - new ColumnDef('homeuri', 'varchar', - 255, false), - new ColumnDef('huburi', 'text', - null, true), - new ColumnDef('verify_token', 'varchar', - 32, true), - new ColumnDef('secret', 'varchar', - 64, true), - new ColumnDef('sub_state', "enum('subscribe','active','unsubscribe')", - null, true), - new ColumnDef('sub_start', 'datetime', - null, true), - new ColumnDef('sub_end', 'datetime', - null, true), + 255, true, 'UNI'), new ColumnDef('salmonuri', 'text', null, true), + new ColumnDef('avatar', 'text', + null, true), new ColumnDef('created', 'datetime', null, false), - new ColumnDef('lastupdate', 'datetime', + new ColumnDef('modified', 'datetime', null, false)); } @@ -169,12 +109,12 @@ class Ostatus_profile extends Memcached_DataObject function keyTypes() { - return array('id' => 'K', 'profile_id' => 'U', 'group_id' => 'U', 'feeduri' => 'U'); + return array('uri' => 'K', 'profile_id' => 'U', 'group_id' => 'U', 'feeduri' => 'U'); } function sequenceKey() { - return array('id', true, false); + return array(false, false, false); } /** @@ -202,99 +142,37 @@ class Ostatus_profile extends Memcached_DataObject } /** - * @param FeedMunger $munger - * @param boolean $isGroup is this a group record? - * @return Ostatus_profile + * Returns an ActivityObject describing this remote user or group profile. + * Can then be used to generate Atom chunks. + * + * @return ActivityObject */ - public static function ensureProfile($munger) + function asActivityObject() { - $profile = $munger->ostatusProfile(); - - $current = self::staticGet('feeduri', $profile->feeduri); - if ($current) { - // @fixme we should probably update info as necessary - return $current; - } - - $profile->query('BEGIN'); - - // Awful hack! Awful hack! - $profile->verify = common_good_rand(16); - $profile->secret = common_good_rand(32); + if ($this->isGroup()) { + $object = new ActivityObject(); + $object->type = 'http://activitystrea.ms/schema/1.0/group'; + $object->id = $this->uri; + $self = $this->localGroup(); - try { - $local = $munger->profile(); - - if ($entity->isGroup()) { - $group = new User_group(); - $group->nickname = $local->nickname . '@remote'; // @fixme - $group->fullname = $local->fullname; - $group->homepage = $local->homepage; - $group->location = $local->location; - $group->created = $local->created; - $group->insert(); - if (empty($result)) { - throw new FeedDBException($group); - } - $profile->group_id = $group->id; - } else { - $result = $local->insert(); - if (empty($result)) { - throw new FeedDBException($local); + // @fixme put a standard getAvatar() interface on groups too + if ($self->homepage_logo) { + $object->avatar = $self->homepage_logo; + $map = array('png' => 'image/png', + 'jpg' => 'image/jpeg', + 'jpeg' => 'image/jpeg', + 'gif' => 'image/gif'); + $extension = pathinfo(parse_url($avatarHref, PHP_URL_PATH), PATHINFO_EXTENSION); + if (isset($map[$extension])) { + // @fixme this ain't used/saved yet + $object->avatarType = $map[$extension]; } - $profile->profile_id = $local->id; - } - - $profile->created = sql_common_date(); - $profile->lastupdate = sql_common_date(); - $result = $profile->insert(); - if (empty($result)) { - throw new FeedDBException($profile); } - $entity->query('COMMIT'); - } catch (FeedDBException $e) { - common_log_db_error($e->obj, 'INSERT', __FILE__); - $entity->query('ROLLBACK'); - return false; - } - - $avatar = $munger->getAvatar(); - if ($avatar) { - try { - $this->updateAvatar($avatar); - } catch (Exception $e) { - common_log(LOG_ERR, "Exception setting OStatus avatar: " . - $e->getMessage()); - } - } - - return $entity; - } - - /** - * Download and update given avatar image - * @param string $url - * @throws Exception in various failure cases - */ - public function updateAvatar($url) - { - // @fixme this should be better encapsulated - // ripped from oauthstore.php (for old OMB client) - $temp_filename = tempnam(sys_get_temp_dir(), 'listener_avatar'); - copy($url, $temp_filename); - $imagefile = new ImageFile($profile->id, $temp_filename); - $filename = Avatar::filename($profile->id, - image_type_to_extension($imagefile->type), - null, - common_timestamp()); - rename($temp_filename, Avatar::path($filename)); - if ($this->isGroup()) { - $group = $this->localGroup(); - $group->setOriginal($filename); + $object->link = $this->uri; // @fixme accurate? + return $object; } else { - $profile = $this->localProfile(); - $profile->setOriginal($filename); + return ActivityObject::fromProfile($this->localProfile()); } } @@ -304,13 +182,14 @@ class Ostatus_profile extends Memcached_DataObject * * Assumes that 'activity' namespace has been previously defined. * + * @fixme replace with wrappers on asActivityObject when it's got everything. + * * @param string $element one of 'actor', 'subject', 'object', 'target' * @return string */ function asActivityNoun($element) { $xs = new XMLStringer(true); - $avatarHref = Avatar::defaultImage(AVATAR_PROFILE_SIZE); $avatarType = 'image/png'; if ($this->isGroup()) { @@ -334,8 +213,8 @@ class Ostatus_profile extends Memcached_DataObject $self = $this->localProfile(); $avatar = $self->getAvatar(AVATAR_PROFILE_SIZE); if ($avatar) { - $avatarHref = $avatar-> - $avatarType = $avatar->mediatype; + $avatarHref = $avatar->url; + $avatarType = $avatar->mediatype; } } $xs->elementStart('activity:' . $element); @@ -347,7 +226,7 @@ class Ostatus_profile extends Memcached_DataObject $xs->element( 'id', null, - $this->homeuri); // ? + $this->uri); // ? $xs->element('title', null, $self->getBestName()); $xs->element( @@ -364,100 +243,84 @@ class Ostatus_profile extends Memcached_DataObject } /** - * Damn dirty hack! + * @return boolean true if this is a remote group */ function isGroup() { - return (strpos($this->feeduri, '/groups/') !== false); + if ($this->profile_id && !$this->group_id) { + return false; + } else if ($this->group_id && !$this->profile_id) { + return true; + } else if ($this->group_id && $this->profile_id) { + throw new ServerException("Invalid ostatus_profile state: both group and profile IDs set for $this->uri"); + } else { + throw new ServerException("Invalid ostatus_profile state: both group and profile IDs empty for $this->uri"); + } } /** - * Send a subscription request to the hub for this feed. - * The hub will later send us a confirmation POST to /main/push/callback. + * Subscribe a local user to this remote user. + * PuSH subscription will be started if necessary, and we'll + * send a Salmon notification to the remote server if available + * notifying them of the sub. * - * @return bool true on success, false on failure + * @param User $user + * @return boolean success + * @throws FeedException */ - public function subscribe($mode='subscribe') + public function subscribeLocalToRemote(User $user) { - if (common_config('feedsub', 'nohub')) { - // Fake it! We're just testing remote feeds w/o hubs. - return true; + if ($this->isGroup()) { + throw new ServerException("Can't subscribe to a remote group"); } - // @fixme use the verification token - #$token = md5(mt_rand() . ':' . $this->feeduri); - #$this->verify_token = $token; - #$this->update(); // @fixme - try { - $callback = common_local_url('pushcallback', array('feed' => $this->id)); - $headers = array('Content-Type: application/x-www-form-urlencoded'); - $post = array('hub.mode' => $mode, - 'hub.callback' => $callback, - 'hub.verify' => 'async', - 'hub.verify_token' => $this->verify_token, - 'hub.secret' => $this->secret, - //'hub.lease_seconds' => 0, - 'hub.topic' => $this->feeduri); - $client = new HTTPClient(); - $response = $client->post($this->huburi, $headers, $post); - $status = $response->getStatus(); - if ($status == 202) { - common_log(LOG_INFO, __METHOD__ . ': sub req ok, awaiting verification callback'); - return true; - } else if ($status == 204) { - common_log(LOG_INFO, __METHOD__ . ': sub req ok and verified'); + + if ($this->subscribe()) { + if ($user->subscribeTo($this->localProfile())) { + $this->notify($user->getProfile(), ActivityVerb::FOLLOW, $this); return true; - } else if ($status >= 200 && $status < 300) { - common_log(LOG_ERR, __METHOD__ . ": sub req returned unexpected HTTP $status: " . $response->getBody()); - return false; - } else { - common_log(LOG_ERR, __METHOD__ . ": sub req failed with HTTP $status: " . $response->getBody()); - return false; } - } catch (Exception $e) { - // wtf! - common_log(LOG_ERR, __METHOD__ . ": error \"{$e->getMessage()}\" hitting hub $this->huburi subscribing to $this->feeduri"); - return false; } + return false; } /** - * Save PuSH subscription confirmation. - * Sets approximate lease start and end times and finalizes state. + * Mark this remote profile as subscribing to the given local user, + * and send appropriate notifications to the user. * - * @param int $lease_seconds provided hub.lease_seconds parameter, if given + * This will generally be in response to a subscription notification + * from a foreign site to our local Salmon response channel. + * + * @param User $user + * @return boolean success */ - public function confirmSubscribe($lease_seconds=0) + public function subscribeRemoteToLocal(User $user) { - $original = clone($this); - - $this->sub_state = 'active'; - $this->sub_start = common_sql_date(time()); - if ($lease_seconds > 0) { - $this->sub_end = common_sql_date(time() + $lease_seconds); - } else { - $this->sub_end = null; + if ($this->isGroup()) { + throw new ServerException("Remote groups can't subscribe to local users"); } - $this->lastupdate = common_sql_date(); - return $this->update($original); + Subscription::start($this->localProfile(), $user->getProfile()); + + return true; } /** - * Save PuSH unsubscription confirmation. - * Wipes active PuSH sub info and resets state. + * Send a subscription request to the hub for this feed. + * The hub will later send us a confirmation POST to /main/push/callback. + * + * @return bool true on success, false on failure + * @throws ServerException if feed state is not valid */ - public function confirmUnsubscribe() + public function subscribe() { - $original = clone($this); - - $this->verify_token = null; - $this->secret = null; - $this->sub_state = null; - $this->sub_start = null; - $this->sub_end = null; - $this->lastupdate = common_sql_date(); - - return $this->update($original); + $feedsub = FeedSub::ensureFeed($this->feeduri); + if ($feedsub->sub_state == 'active' || $feedsub->sub_state == 'subscribe') { + return true; + } else if ($feedsub->sub_state == '' || $feedsub->sub_state == 'inactive') { + return $feedsub->subscribe(); + } else if ('unsubscribe') { + throw new FeedSubException("Unsub is pending, can't subscribe..."); + } } /** @@ -465,55 +328,148 @@ class Ostatus_profile extends Memcached_DataObject * The hub will later send us a confirmation POST to /main/push/callback. * * @return bool true on success, false on failure + * @throws ServerException if feed state is not valid */ public function unsubscribe() { - return $this->subscribe('unsubscribe'); + $feedsub = FeedSub::staticGet('uri', $this->feeduri); + if ($feedsub->sub_state == 'active') { + return $feedsub->unsubscribe(); + } else if ($feedsub->sub_state == '' || $feedsub->sub_state == 'inactive' || $feedsub->sub_state == 'unsubscribe') { + return true; + } else if ($feedsub->sub_state == 'subscribe') { + throw new FeedSubException("Feed is awaiting subscription, can't unsub..."); + } + } + + /** + * Check if this remote profile has any active local subscriptions, and + * if not drop the PuSH subscription feed. + * + * @return boolean + */ + public function garbageCollect() + { + if ($this->isGroup()) { + $members = $this->localGroup()->getMembers(0, 1); + $count = $members->N; + } else { + $count = $this->localProfile()->subscriberCount(); + } + if ($count == 0) { + common_log(LOG_INFO, "Unsubscribing from now-unused remote feed $oprofile->feeduri"); + $this->unsubscribe(); + return true; + } else { + return false; + } } /** * Send an Activity Streams notification to the remote Salmon endpoint, * if so configured. * - * @param Profile $actor - * @param $verb eg Activity::SUBSCRIBE or Activity::JOIN - * @param $object object of the action; if null, the remote entity itself is assumed + * @param Profile $actor Actor who did the activity + * @param string $verb Activity::SUBSCRIBE or Activity::JOIN + * @param Object $object object of the action; must define asActivityNoun($tag) */ - public function notify(Profile $actor, $verb, $object=null) + public function notify($actor, $verb, $object=null) { + if (!($actor instanceof Profile)) { + $type = gettype($actor); + if ($type == 'object') { + $type = get_class($actor); + } + throw new ServerException("Invalid actor passed to " . __METHOD__ . ": " . $type); + } if ($object == null) { $object = $this; } if ($this->salmonuri) { - $text = 'update'; // @fixme - $id = 'tag:' . common_config('site', 'server') . - ':' . $verb . - ':' . $actor->id . - ':' . time(); // @fixme - - $entry = new Atom10Entry(); - $entry->elementStart('entry'); + + $text = 'update'; + $id = TagURI::mint('%s:%s:%s', + $verb, + $actor->getURI(), + common_date_iso8601(time())); + + // @fixme consolidate all these NS settings somewhere + $attributes = array('xmlns' => Activity::ATOM, + 'xmlns:activity' => 'http://activitystrea.ms/spec/1.0/', + 'xmlns:thr' => 'http://purl.org/syndication/thread/1.0', + 'xmlns:georss' => 'http://www.georss.org/georss', + 'xmlns:ostatus' => 'http://ostatus.org/schema/1.0', + 'xmlns:poco' => 'http://portablecontacts.net/spec/1.0'); + + $entry = new XMLStringer(); + $entry->elementStart('entry', $attributes); $entry->element('id', null, $id); $entry->element('title', null, $text); $entry->element('summary', null, $text); - $entry->element('published', null, common_date_w3dtf()); + $entry->element('published', null, common_date_w3dtf(common_sql_now())); $entry->element('activity:verb', null, $verb); - $entry->raw($profile->asAtomAuthor()); - $entry->raw($profile->asActivityActor()); + $entry->raw($actor->asAtomAuthor()); + $entry->raw($actor->asActivityActor()); $entry->raw($object->asActivityNoun('object')); - $entry->elmentEnd('entry'); + $entry->elementEnd('entry'); - $feed = $this->atomFeed($actor); - $feed->initFeed(); - $feed->addEntry($entry); - $feed->renderEntries(); - $feed->endFeed(); - - $xml = $feed->getString(); - common_log(LOG_INFO, "Posting to Salmon endpoint $salmon: $xml"); + $xml = $entry->getString(); + common_log(LOG_INFO, "Posting to Salmon endpoint $this->salmonuri: $xml"); $salmon = new Salmon(); // ? - $salmon->post($this->salmonuri, $xml); + return $salmon->post($this->salmonuri, $xml); + } + return false; + } + + /** + * Send a Salmon notification ping immediately, and confirm that we got + * an acceptable response from the remote site. + * + * @param mixed $entry XML string, Notice, or Activity + * @return boolean success + */ + public function notifyActivity($entry) + { + if ($this->salmonuri) { + $salmon = new Salmon(); + return $salmon->post($this->salmonuri, $this->notifyPrepXml($entry)); + } + + return false; + } + + /** + * Queue a Salmon notification for later. If queues are disabled we'll + * send immediately but won't get the return value. + * + * @param mixed $entry XML string, Notice, or Activity + * @return boolean success + */ + public function notifyDeferred($entry) + { + if ($this->salmonuri) { + $data = array('salmonuri' => $this->salmonuri, + 'entry' => $this->notifyPrepXml($entry)); + + $qm = QueueManager::get(); + return $qm->enqueue($data, 'salmon'); + } + + return false; + } + + protected function notifyPrepXml($entry) + { + $preamble = '<?xml version="1.0" encoding="UTF-8" ?' . '>'; + if (is_string($entry)) { + return $entry; + } else if ($entry instanceof Activity) { + return $preamble . $entry->asString(true); + } else if ($entry instanceof Notice) { + return $preamble . $entry->asAtomEntry(true, true); + } else { + throw new ServerException("Invalid type passed to Ostatus_profile::notify; must be XML string or Activity entry"); } } @@ -531,7 +487,7 @@ class Ostatus_profile extends Memcached_DataObject $feed = new Atom10Feed(); // @fixme should these be set up somewhere else? $feed->addNamespace('activity', 'http://activitystrea.ms/spec/1.0/'); - $feed->addNamesapce('thr', 'http://purl.org/syndication/thread/1.0'); + $feed->addNamespace('thr', 'http://purl.org/syndication/thread/1.0'); $feed->addNamespace('georss', 'http://www.georss.org/georss'); $feed->addNamespace('ostatus', 'http://ostatus.org/schema/1.0'); @@ -542,14 +498,14 @@ class Ostatus_profile extends Memcached_DataObject $feed->setUpdated(time()); $feed->setPublished(time()); - $feed->addLink(common_url('ApiTimelineUser', - array('id' => $actor->id, - 'type' => 'atom')), + $feed->addLink(common_local_url('ApiTimelineUser', + array('id' => $actor->id, + 'type' => 'atom')), array('rel' => 'self', 'type' => 'application/atom+xml')); - $feed->addLink(common_url('userbyid', - array('id' => $actor->id)), + $feed->addLink(common_local_url('userbyid', + array('id' => $actor->id)), array('rel' => 'alternate', 'type' => 'text/html')); @@ -561,84 +517,808 @@ class Ostatus_profile extends Memcached_DataObject * Currently assumes that all items in the feed are new, * coming from a PuSH hub. * - * @param string $xml source of Atom or RSS feed - * @param string $hmac X-Hub-Signature header, if present + * @param DOMDocument $feed + */ + public function processFeed($feed, $source) + { + $entries = $feed->getElementsByTagNameNS(Activity::ATOM, 'entry'); + if ($entries->length == 0) { + common_log(LOG_ERR, __METHOD__ . ": no entries in feed update, ignoring"); + return; + } + + for ($i = 0; $i < $entries->length; $i++) { + $entry = $entries->item($i); + $this->processEntry($entry, $feed, $source); + } + } + + /** + * Process a posted entry from this feed source. + * + * @param DOMElement $entry + * @param DOMElement $feed for context */ - public function postUpdates($xml, $hmac) + public function processEntry($entry, $feed, $source) { - common_log(LOG_INFO, __METHOD__ . ": packet for \"$this->feeduri\"! $hmac $xml"); + $activity = new Activity($entry, $feed); - if ($this->secret) { - if (preg_match('/^sha1=([0-9a-fA-F]{40})$/', $hmac, $matches)) { - $their_hmac = strtolower($matches[1]); - $our_hmac = hash_hmac('sha1', $xml, $this->secret); - if ($their_hmac !== $our_hmac) { - common_log(LOG_ERR, __METHOD__ . ": ignoring PuSH with bad SHA-1 HMAC: got $their_hmac, expected $our_hmac"); - return; - } + if ($activity->verb == ActivityVerb::POST) { + $this->processPost($activity, $source); + } else { + common_log(LOG_INFO, "Ignoring activity with unrecognized verb $activity->verb"); + } + } + + /** + * Process an incoming post activity from this remote feed. + * @param Activity $activity + * @param string $method 'push' or 'salmon' + * @return mixed saved Notice or false + * @fixme break up this function, it's getting nasty long + */ + public function processPost($activity, $method) + { + if ($this->isGroup()) { + // A group feed will contain posts from multiple authors. + // @fixme validate these profiles in some way! + $oprofile = self::ensureActorProfile($activity); + if ($oprofile->isGroup()) { + // Groups can't post notices in StatusNet. + common_log(LOG_WARNING, "OStatus: skipping post with group listed as author: $oprofile->uri in feed from $this->uri"); + return false; + } + } else { + // Individual user feeds may contain only posts from themselves. + // Authorship is validated against the profile URI on upper layers, + // through PuSH setup or Salmon signature checks. + $actorUri = self::getActorProfileURI($activity); + if ($actorUri == $this->uri) { + // Check if profile info has changed and update it + $this->updateFromActivityObject($activity->actor); } else { - common_log(LOG_ERR, __METHOD__ . ": ignoring PuSH with bogus HMAC '$hmac'"); - return; + common_log(LOG_WARNING, "OStatus: skipping post with bad author: got $actorUri expected $this->uri"); + return false; } - } else if ($hmac) { - common_log(LOG_ERR, __METHOD__ . ": ignoring PuSH with unexpected HMAC '$hmac'"); - return; + $oprofile = $this; } - require_once "XML/Feed/Parser.php"; - $feed = new XML_Feed_Parser($xml, false, false, true); - $munger = new FeedMunger($feed); + // The id URI will be used as a unique identifier for for the notice, + // protecting against duplicate saves. It isn't required to be a URL; + // tag: URIs for instance are found in Google Buzz feeds. + $sourceUri = $activity->object->id; + $dupe = Notice::staticGet('uri', $sourceUri); + if ($dupe) { + common_log(LOG_INFO, "OStatus: ignoring duplicate post: $sourceUri"); + return false; + } - $hits = 0; - foreach ($feed as $index => $entry) { - // @fixme this might sort in wrong order if we get multiple updates + // We'll also want to save a web link to the original notice, if provided. + $sourceUrl = null; + if ($activity->object->link) { + $sourceUrl = $activity->object->link; + } else if ($activity->link) { + $sourceUrl = $activity->link; + } else if (preg_match('!^https?://!', $activity->object->id)) { + $sourceUrl = $activity->object->id; + } - $notice = $munger->notice($index); + // Get (safe!) HTML and text versions of the content + $rendered = $this->purify($activity->object->content); + $content = html_entity_decode(strip_tags($rendered)); - // Double-check for oldies - // @fixme this could explode horribly for multiple feeds on a blog. sigh + $options = array('is_local' => Notice::REMOTE_OMB, + 'url' => $sourceUrl, + 'uri' => $sourceUri, + 'rendered' => $rendered, + 'replies' => array(), + 'groups' => array()); - $dupe = Notice::staticGet('uri', $notice->uri); + // Check for optional attributes... - if (!empty($dupe)) { - common_log(LOG_WARNING, __METHOD__ . ": tried to save dupe notice for entry {$notice->uri} of feed {$this->feeduri}"); - continue; + if (!empty($activity->time)) { + $options['created'] = common_sql_date($activity->time); + } + + if ($activity->context) { + // Any individual or group attn: targets? + $replies = $activity->context->attention; + $options['groups'] = $this->filterReplies($oprofile, $replies); + $options['replies'] = $replies; + + // Maintain direct reply associations + // @fixme what about conversation ID? + if (!empty($activity->context->replyToID)) { + $orig = Notice::staticGet('uri', + $activity->context->replyToID); + if (!empty($orig)) { + $options['reply_to'] = $orig->id; + } } - // @fixme need to ensure that groups get handled correctly - $saved = Notice::saveNew($notice->profile_id, - $notice->content, + $location = $activity->context->location; + if ($location) { + $options['lat'] = $location->lat; + $options['lon'] = $location->lon; + if ($location->location_id) { + $options['location_ns'] = $location->location_ns; + $options['location_id'] = $location->location_id; + } + } + } + + try { + $saved = Notice::saveNew($oprofile->profile_id, + $content, 'ostatus', - array('is_local' => Notice::REMOTE_OMB, - 'uri' => $notice->uri, - 'lat' => $notice->lat, - 'lon' => $notice->lon, - 'location_ns' => $notice->location_ns, - 'location_id' => $notice->location_id)); - - /* - common_log(LOG_DEBUG, "going to check group delivery..."); - if ($this->group_id) { - $group = User_group::staticGet($this->group_id); + $options); + if ($saved) { + Ostatus_source::saveNew($saved, $this, $method); + } + } catch (Exception $e) { + common_log(LOG_ERR, "OStatus save of remote message $sourceUri failed: " . $e->getMessage()); + throw $e; + } + common_log(LOG_INFO, "OStatus saved remote message $sourceUri as notice id $saved->id"); + return $saved; + } + + /** + * Clean up HTML + */ + protected function purify($html) + { + require_once INSTALLDIR.'/extlib/htmLawed/htmLawed.php'; + $config = array('safe' => 1); + return htmLawed($html, $config); + } + + /** + * Filters a list of recipient ID URIs to just those for local delivery. + * @param Ostatus_profile local profile of sender + * @param array in/out &$attention_uris set of URIs, will be pruned on output + * @return array of group IDs + */ + protected function filterReplies($sender, &$attention_uris) + { + common_log(LOG_DEBUG, "Original reply recipients: " . implode(', ', $attention_uris)); + $groups = array(); + $replies = array(); + foreach ($attention_uris as $recipient) { + // Is the recipient a local user? + $user = User::staticGet('uri', $recipient); + if ($user) { + // @fixme sender verification, spam etc? + $replies[] = $recipient; + continue; + } + + // Is the recipient a remote group? + $oprofile = Ostatus_profile::staticGet('uri', $recipient); + if ($oprofile) { + if ($oprofile->isGroup()) { + // Deliver to local members of this remote group. + // @fixme sender verification? + $groups[] = $oprofile->group_id; + } else { + common_log(LOG_DEBUG, "Skipping reply to remote profile $recipient"); + } + continue; + } + + // Is the recipient a local group? + // @fixme we need a uri on user_group + // $group = User_group::staticGet('uri', $recipient); + $template = common_local_url('groupbyid', array('id' => '31337')); + $template = preg_quote($template, '/'); + $template = str_replace('31337', '(\d+)', $template); + if (preg_match("/$template/", $recipient, $matches)) { + $id = $matches[1]; + $group = User_group::staticGet('id', $id); if ($group) { - common_log(LOG_INFO, __METHOD__ . ": saving to local shadow group $group->id $group->nickname"); - $groups = array($group); + // Deliver to all members of this local group if allowed. + $profile = $sender->localProfile(); + if ($profile->isMember($group)) { + $groups[] = $group->id; + } else { + common_log(LOG_DEBUG, "Skipping reply to local group $group->nickname as sender $profile->id is not a member"); + } + continue; } else { - common_log(LOG_INFO, __METHOD__ . ": lost the local shadow group?"); + common_log(LOG_DEBUG, "Skipping reply to bogus group $recipient"); } - } else { - common_log(LOG_INFO, __METHOD__ . ": no local shadow groups"); - $groups = array(); } - common_log(LOG_DEBUG, "going to add to inboxes..."); - $notice->addToInboxes($groups, array()); - common_log(LOG_DEBUG, "added to inboxes."); - */ - $hits++; + common_log(LOG_DEBUG, "Skipping reply to unrecognized profile $recipient"); + + } + $attention_uris = $replies; + common_log(LOG_DEBUG, "Local reply recipients: " . implode(', ', $replies)); + common_log(LOG_DEBUG, "Local group recipients: " . implode(', ', $groups)); + return $groups; + } + + /** + * @param string $profile_url + * @return Ostatus_profile + * @throws FeedSubException + */ + public static function ensureProfile($profile_uri, $hints=array()) + { + // Get the canonical feed URI and check it + $discover = new FeedDiscovery(); + $feeduri = $discover->discoverFromURL($profile_uri); + + //$feedsub = FeedSub::ensureFeed($feeduri, $discover->feed); + $huburi = $discover->getAtomLink('hub'); + $salmonuri = $discover->getAtomLink('salmon'); + + if (!$huburi) { + // We can only deal with folks with a PuSH hub + throw new FeedSubNoHubException(); } - if ($hits == 0) { - common_log(LOG_INFO, __METHOD__ . ": no updates in packet for \"$this->feeduri\"! $xml"); + + // Try to get a profile from the feed activity:subject + + $feedEl = $discover->feed->documentElement; + + $subject = ActivityUtils::child($feedEl, Activity::SUBJECT, Activity::SPEC); + + if (!empty($subject)) { + $subjObject = new ActivityObject($subject); + return self::ensureActivityObjectProfile($subjObject, $feeduri, $salmonuri, $hints); } + + // Otherwise, try the feed author + + $author = ActivityUtils::child($feedEl, Activity::AUTHOR, Activity::ATOM); + + if (!empty($author)) { + $authorObject = new ActivityObject($author); + return self::ensureActivityObjectProfile($authorObject, $feeduri, $salmonuri, $hints); + } + + // Sheesh. Not a very nice feed! Let's try fingerpoken in the + // entries. + + $entries = $discover->feed->getElementsByTagNameNS(Activity::ATOM, 'entry'); + + if (!empty($entries) && $entries->length > 0) { + + $entry = $entries->item(0); + + $actor = ActivityUtils::child($entry, Activity::ACTOR, Activity::SPEC); + + if (!empty($actor)) { + $actorObject = new ActivityObject($actor); + return self::ensureActivityObjectProfile($actorObject, $feeduri, $salmonuri, $hints); + + } + + $author = ActivityUtils::child($entry, Activity::AUTHOR, Activity::ATOM); + + if (!empty($author)) { + $authorObject = new ActivityObject($author); + return self::ensureActivityObjectProfile($authorObject, $feeduri, $salmonuri, $hints); + } + } + + // XXX: make some educated guesses here + + throw new FeedSubException("Can't find enough profile information to make a feed."); + } + + /** + * + * Download and update given avatar image + * @param string $url + * @throws Exception in various failure cases + */ + protected function updateAvatar($url) + { + if ($url == $this->avatar) { + // We've already got this one. + return; + } + + if ($this->isGroup()) { + $self = $this->localGroup(); + } else { + $self = $this->localProfile(); + } + if (!$self) { + throw new ServerException(sprintf( + _m("Tried to update avatar for unsaved remote profile %s"), + $this->uri)); + } + + // @fixme this should be better encapsulated + // ripped from oauthstore.php (for old OMB client) + $temp_filename = tempnam(sys_get_temp_dir(), 'listener_avatar'); + if (!copy($url, $temp_filename)) { + throw new ServerException(sprintf(_m("Unable to fetch avatar from %s"), $url)); + } + + if ($this->isGroup()) { + $id = $this->group_id; + } else { + $id = $this->profile_id; + } + // @fixme should we be using different ids? + $imagefile = new ImageFile($id, $temp_filename); + $filename = Avatar::filename($id, + image_type_to_extension($imagefile->type), + null, + common_timestamp()); + rename($temp_filename, Avatar::path($filename)); + $self->setOriginal($filename); + + $orig = clone($this); + $this->avatar = $url; + $this->update($orig); + } + + /** + * Pull avatar URL from ActivityObject or profile hints + * + * @param ActivityObject $object + * @param array $hints + * @return mixed URL string or false + */ + + protected static function getActivityObjectAvatar($object, $hints=array()) + { + if ($object->avatar) { + return $object->avatar; + } else if (array_key_exists('avatar', $hints)) { + return $hints['avatar']; + } + return false; + } + + /** + * Get an appropriate avatar image source URL, if available. + * + * @param ActivityObject $actor + * @param DOMElement $feed + * @return string + */ + + protected static function getAvatar($actor, $feed) + { + $url = ''; + $icon = ''; + if ($actor->avatar) { + $url = trim($actor->avatar); + } + if (!$url) { + // Check <atom:logo> and <atom:icon> on the feed + $els = $feed->childNodes(); + if ($els && $els->length) { + for ($i = 0; $i < $els->length; $i++) { + $el = $els->item($i); + if ($el->namespaceURI == Activity::ATOM) { + if (empty($url) && $el->localName == 'logo') { + $url = trim($el->textContent); + break; + } + if (empty($icon) && $el->localName == 'icon') { + // Use as a fallback + $icon = trim($el->textContent); + } + } + } + } + if ($icon && !$url) { + $url = $icon; + } + } + if ($url) { + $opts = array('allowed_schemes' => array('http', 'https')); + if (Validate::uri($url, $opts)) { + return $url; + } + } + return common_path('plugins/OStatus/images/96px-Feed-icon.svg.png'); + } + + /** + * Fetch, or build if necessary, an Ostatus_profile for the actor + * in a given Activity Streams activity. + * + * @param Activity $activity + * @param string $feeduri if we already know the canonical feed URI! + * @param string $salmonuri if we already know the salmon return channel URI + * @return Ostatus_profile + */ + + public static function ensureActorProfile($activity, $feeduri=null, $salmonuri=null) + { + return self::ensureActivityObjectProfile($activity->actor, $feeduri, $salmonuri); + } + + public static function ensureActivityObjectProfile($object, $feeduri=null, $salmonuri=null, $hints=array()) + { + $profile = self::getActivityObjectProfile($object); + if ($profile) { + $profile->updateFromActivityObject($object, $hints); + } else { + $profile = self::createActivityObjectProfile($object, $feeduri, $salmonuri, $hints); + } + return $profile; + } + + /** + * @param Activity $activity + * @return mixed matching Ostatus_profile or false if none known + */ + public static function getActorProfile($activity) + { + return self::getActivityObjectProfile($activity->actor); + } + + protected static function getActivityObjectProfile($object) + { + $uri = self::getActivityObjectProfileURI($object); + return Ostatus_profile::staticGet('uri', $uri); + } + + protected static function getActorProfileURI($activity) + { + return self::getActivityObjectProfileURI($activity->actor); + } + + /** + * @param Activity $activity + * @return string + * @throws ServerException + */ + protected static function getActivityObjectProfileURI($object) + { + $opts = array('allowed_schemes' => array('http', 'https')); + if ($object->id && Validate::uri($object->id, $opts)) { + return $object->id; + } + if ($object->link && Validate::uri($object->link, $opts)) { + return $object->link; + } + throw new ServerException("No author ID URI found"); + } + + /** + * @fixme validate stuff somewhere + */ + + protected static function createActorProfile($activity, $feeduri=null, $salmonuri=null) + { + $actor = $activity->actor; + + self::createActivityObjectProfile($actor, $feeduri, $salmonuri); + } + + /** + * Create local ostatus_profile and profile/user_group entries for + * the provided remote user or group. + * + * @param ActivityObject $object + * @param string $feeduri + * @param string $salmonuri + * @param array $hints + * + * @fixme fold $feeduri/$salmonuri into $hints + * @return Ostatus_profile + */ + protected static function createActivityObjectProfile($object, $feeduri=null, $salmonuri=null, $hints=array()) + { + $homeuri = $object->id; + + if (!$homeuri) { + common_log(LOG_DEBUG, __METHOD__ . " empty actor profile URI: " . var_export($activity, true)); + throw new ServerException("No profile URI"); + } + + if (empty($feeduri)) { + if (array_key_exists('feedurl', $hints)) { + $feeduri = $hints['feedurl']; + } + } + + if (empty($salmonuri)) { + if (array_key_exists('salmon', $hints)) { + $salmonuri = $hints['salmon']; + } + } + + if (!$feeduri || !$salmonuri) { + // Get the canonical feed URI and check it + $discover = new FeedDiscovery(); + $feeduri = $discover->discoverFromURL($homeuri); + + $huburi = $discover->getAtomLink('hub'); + $salmonuri = $discover->getAtomLink('salmon'); + + if (!$huburi) { + // We can only deal with folks with a PuSH hub + throw new FeedSubNoHubException(); + } + } + + $oprofile = new Ostatus_profile(); + + $oprofile->uri = $homeuri; + $oprofile->feeduri = $feeduri; + $oprofile->salmonuri = $salmonuri; + + $oprofile->created = common_sql_now(); + $oprofile->modified = common_sql_now(); + + if ($object->type == ActivityObject::PERSON) { + $profile = new Profile(); + self::updateProfile($profile, $object, $hints); + $profile->created = common_sql_now(); + + $oprofile->profile_id = $profile->insert(); + if (!$oprofile->profile_id) { + throw new ServerException("Can't save local profile"); + } + } else { + $group = new User_group(); + $group->created = common_sql_now(); + self::updateGroup($group, $object, $hints); + + $oprofile->group_id = $group->insert(); + if (!$oprofile->group_id) { + throw new ServerException("Can't save local profile"); + } + } + + $ok = $oprofile->insert(); + + if ($ok) { + $avatar = self::getActivityObjectAvatar($object, $hints); + if ($avatar) { + $oprofile->updateAvatar($avatar); + } + return $oprofile; + } else { + throw new ServerException("Can't save OStatus profile"); + } + } + + /** + * Save any updated profile information to our local copy. + * @param ActivityObject $object + * @param array $hints + */ + public function updateFromActivityObject($object, $hints=array()) + { + if ($this->isGroup()) { + $group = $this->localGroup(); + self::updateGroup($group, $object, $hints); + } else { + $profile = $this->localProfile(); + self::updateProfile($profile, $object, $hints); + } + $avatar = self::getActivityObjectAvatar($object, $hints); + if ($avatar) { + $this->updateAvatar($avatar); + } + } + + protected static function updateProfile($profile, $object, $hints=array()) + { + $orig = clone($profile); + + $profile->nickname = self::getActivityObjectNickname($object, $hints); + $profile->fullname = $object->title; + if (!empty($object->link)) { + $profile->profileurl = $object->link; + } else if (array_key_exists('profileurl', $hints)) { + $profile->profileurl = $hints['profileurl']; + } + + // @fixme bio + // @fixme tags/categories + // @fixme location? + // @todo tags from categories + // @todo lat/lon/location? + + if ($profile->id) { + common_log(LOG_DEBUG, "Updating OStatus profile $profile->id from remote info $object->id: " . var_export($object, true) . var_export($hints, true)); + $profile->update($orig); + } + } + + protected static function updateGroup($group, $object, $hints=array()) + { + $orig = clone($group); + + // @fixme need to make nick unique etc *hack hack* + $group->nickname = self::getActivityObjectNickname($object, $hints); + $group->fullname = $object->title; + + // @fixme no canonical profileurl; using homepage instead for now + $group->homepage = $object->id; + + // @fixme homepage + // @fixme bio + // @fixme tags/categories + // @fixme location? + // @todo tags from categories + // @todo lat/lon/location? + + if ($group->id) { + common_log(LOG_DEBUG, "Updating OStatus group $group->id from remote info $object->id: " . var_export($object, true) . var_export($hints, true)); + $group->update($orig); + } + } + + protected static function getActivityObjectNickname($object, $hints=array()) + { + if ($object->poco) { + if (!empty($object->poco->preferredUsername)) { + return common_nicknamize($object->poco->preferredUsername); + } + } + if (!empty($object->nickname)) { + return common_nicknamize($object->nickname); + } + + // Try the definitive ID + + $nickname = self::nicknameFromURI($object->id); + + // Try a Webfinger if one was passed (way) down + + if (empty($nickname)) { + if (array_key_exists('webfinger', $hints)) { + $nickname = self::nicknameFromURI($hints['webfinger']); + } + } + + // Try the name + + if (empty($nickname)) { + $nickname = common_nicknamize($object->title); + } + + return $nickname; + } + + protected static function nicknameFromURI($uri) + { + preg_match('/(\w+):/', $uri, $matches); + + $protocol = $matches[1]; + + switch ($protocol) { + case 'acct': + case 'mailto': + if (preg_match("/^$protocol:(.*)?@.*\$/", $uri, $matches)) { + return common_canonical_nickname($matches[1]); + } + return null; + case 'http': + return common_url_to_nickname($uri); + break; + default: + return null; + } + } + + public static function ensureWebfinger($addr) + { + // First, look it up + + $oprofile = Ostatus_profile::staticGet('uri', 'acct:'.$addr); + + if (!empty($oprofile)) { + return $oprofile; + } + + // Now, try some discovery + + $wf = new Webfinger(); + + $result = $wf->lookup($addr); + + if (!$result) { + return null; + } + + foreach ($result->links as $link) { + switch ($link['rel']) { + case Webfinger::PROFILEPAGE: + $profileUrl = $link['href']; + break; + case 'salmon': + $salmonEndpoint = $link['href']; + break; + case Webfinger::UPDATESFROM: + $feedUrl = $link['href']; + break; + default: + common_log(LOG_NOTICE, "Don't know what to do with rel = '{$link['rel']}'"); + break; + } + } + + $hints = array('webfinger' => $addr, + 'profileurl' => $profileUrl, + 'feedurl' => $feedUrl, + 'salmon' => $salmonEndpoint); + + // If we got a feed URL, try that + + if (isset($feedUrl)) { + try { + $oprofile = self::ensureProfile($feedUrl, $hints); + return $oprofile; + } catch (Exception $e) { + common_log(LOG_WARNING, "Failed creating profile from feed URL '$feedUrl': " . $e->getMessage()); + // keep looking + } + } + + // If we got a profile page, try that! + + if (isset($profileUrl)) { + try { + $oprofile = self::ensureProfile($profileUrl, $hints); + return $oprofile; + } catch (Exception $e) { + common_log(LOG_WARNING, "Failed creating profile from profile URL '$profileUrl': " . $e->getMessage()); + // keep looking + } + } + + // XXX: try hcard + // XXX: try FOAF + + if (isset($salmonEndpoint)) { + + // An account URL, a salmon endpoint, and a dream? Not much to go + // on, but let's give it a try + + $uri = 'acct:'.$addr; + + $profile = new Profile(); + + $profile->nickname = self::nicknameFromUri($uri); + $profile->created = common_sql_now(); + + if (isset($profileUrl)) { + $profile->profileurl = $profileUrl; + } + + $profile_id = $profile->insert(); + + if (!$profile_id) { + common_log_db_error($profile, 'INSERT', __FILE__); + throw new Exception("Couldn't save profile for '$addr'"); + } + + $oprofile = new Ostatus_profile(); + + $oprofile->uri = $uri; + $oprofile->salmonuri = $salmonEndpoint; + $oprofile->profile_id = $profile_id; + $oprofile->created = common_sql_now(); + + if (isset($feedUrl)) { + $profile->feeduri = $feedUrl; + } + + $result = $oprofile->insert(); + + if (!$result) { + common_log_db_error($oprofile, 'INSERT', __FILE__); + throw new Exception("Couldn't save ostatus_profile for '$addr'"); + } + + return $oprofile; + } + + return null; } } |