diff options
Diffstat (limited to 'plugins/OStatus/classes/HubSub.php')
-rw-r--r-- | plugins/OStatus/classes/HubSub.php | 189 |
1 files changed, 114 insertions, 75 deletions
diff --git a/plugins/OStatus/classes/HubSub.php b/plugins/OStatus/classes/HubSub.php index 7071ee5b4..3120a70f9 100644 --- a/plugins/OStatus/classes/HubSub.php +++ b/plugins/OStatus/classes/HubSub.php @@ -30,12 +30,11 @@ class HubSub extends Memcached_DataObject public $topic; public $callback; public $secret; - public $verify_token; - public $challenge; public $lease; public $sub_start; public $sub_end; public $created; + public $modified; public /*static*/ function staticGet($topic, $callback) { @@ -62,12 +61,11 @@ class HubSub extends Memcached_DataObject 'topic' => DB_DATAOBJECT_STR + DB_DATAOBJECT_NOTNULL, 'callback' => DB_DATAOBJECT_STR + DB_DATAOBJECT_NOTNULL, 'secret' => DB_DATAOBJECT_STR, - 'verify_token' => DB_DATAOBJECT_STR, - 'challenge' => DB_DATAOBJECT_STR, 'lease' => DB_DATAOBJECT_INT, 'sub_start' => DB_DATAOBJECT_STR + DB_DATAOBJECT_DATE + DB_DATAOBJECT_TIME, 'sub_end' => DB_DATAOBJECT_STR + DB_DATAOBJECT_DATE + DB_DATAOBJECT_TIME, - 'created' => DB_DATAOBJECT_STR + DB_DATAOBJECT_DATE + DB_DATAOBJECT_TIME + DB_DATAOBJECT_NOTNULL); + 'created' => DB_DATAOBJECT_STR + DB_DATAOBJECT_DATE + DB_DATAOBJECT_TIME + DB_DATAOBJECT_NOTNULL, + 'modified' => DB_DATAOBJECT_STR + DB_DATAOBJECT_DATE + DB_DATAOBJECT_TIME + DB_DATAOBJECT_NOTNULL); } static function schemaDef() @@ -84,10 +82,6 @@ class HubSub extends Memcached_DataObject 255, false), new ColumnDef('secret', 'text', null, true), - new ColumnDef('verify_token', 'text', - null, true), - new ColumnDef('challenge', 'varchar', - 32, true), new ColumnDef('lease', 'int', null, true), new ColumnDef('sub_start', 'datetime', @@ -95,6 +89,8 @@ class HubSub extends Memcached_DataObject new ColumnDef('sub_end', 'datetime', null, true), new ColumnDef('created', 'datetime', + null, false), + new ColumnDef('modified', 'datetime', null, false)); } @@ -103,7 +99,7 @@ class HubSub extends Memcached_DataObject return array_keys($this->keyTypes()); } - function sequenceKeys() + function sequenceKey() { return array(false, false, false); } @@ -152,84 +148,132 @@ class HubSub extends Memcached_DataObject } /** - * Send a verification ping to subscriber + * Schedule a future verification ping to the subscriber. + * If queues are disabled, will be immediate. + * * @param string $mode 'subscribe' or 'unsubscribe' + * @param string $token hub.verify_token value, if provided by client */ - function verify($mode) + function scheduleVerify($mode, $token=null, $retries=null) { - assert($mode == 'subscribe' || $mode == 'unsubscribe'); + if ($retries === null) { + $retries = intval(common_config('ostatus', 'hub_retries')); + } + $data = array('sub' => clone($this), + 'mode' => $mode, + 'token' => $token, + 'retries' => $retries); + $qm = QueueManager::get(); + $qm->enqueue($data, 'hubconf'); + } - // Is this needed? data object fun... - $clone = clone($this); - $clone->challenge = common_good_rand(16); - $clone->update($this); - $this->challenge = $clone->challenge; - unset($clone); + /** + * Send a verification ping to subscriber, and if confirmed apply the changes. + * This may create, update, or delete the database record. + * + * @param string $mode 'subscribe' or 'unsubscribe' + * @param string $token hub.verify_token value, if provided by client + * @throws ClientException on failure + */ + function verify($mode, $token=null) + { + assert($mode == 'subscribe' || $mode == 'unsubscribe'); + $challenge = common_good_rand(32); $params = array('hub.mode' => $mode, 'hub.topic' => $this->topic, - 'hub.challenge' => $this->challenge); + 'hub.challenge' => $challenge); if ($mode == 'subscribe') { $params['hub.lease_seconds'] = $this->lease; } - if ($this->verify_token) { - $params['hub.verify_token'] = $this->verify_token; + if ($token !== null) { + $params['hub.verify_token'] = $token; } - $url = $this->callback . '?' . http_build_query($params, '', '&'); // @fixme ugly urls - try { - $request = new HTTPClient(); - $response = $request->get($url); - $status = $response->getStatus(); - - if ($status >= 200 && $status < 300) { - $fail = false; - } else { - // @fixme how can we schedule a second attempt? - // Or should we? - $fail = "Returned HTTP $status"; - } - } catch (Exception $e) { - $fail = $e->getMessage(); + // Any existing query string parameters must be preserved + $url = $this->callback; + if (strpos('?', $url) !== false) { + $url .= '&'; + } else { + $url .= '?'; } - if ($fail) { - // @fixme how can we schedule a second attempt? - // or save a fail count? - // Or should we? - common_log(LOG_ERR, "Failed to verify $mode for $this->topic at $this->callback: $fail"); - return false; + $url .= http_build_query($params, '', '&'); + + $request = new HTTPClient(); + $response = $request->get($url); + $status = $response->getStatus(); + + if ($status >= 200 && $status < 300) { + common_log(LOG_INFO, "Verified $mode of $this->callback:$this->topic"); } else { - if ($mode == 'subscribe') { - // Establish or renew the subscription! - // This seems unnecessary... dataobject fun! - $clone = clone($this); - $clone->challenge = null; - $clone->setLease($this->lease); - $clone->update($this); - unset($clone); + throw new ClientException("Hub subscriber verification returned HTTP $status"); + } - $this->challenge = null; - $this->setLease($this->lease); - common_log(LOG_ERR, "Verified $mode of $this->callback:$this->topic for $this->lease seconds"); - } else if ($mode == 'unsubscribe') { - common_log(LOG_ERR, "Verified $mode of $this->callback:$this->topic"); - $this->delete(); + $old = HubSub::staticGet($this->topic, $this->callback); + if ($mode == 'subscribe') { + if ($old) { + $this->update($old); + } else { + $ok = $this->insert(); + } + } else if ($mode == 'unsubscribe') { + if ($old) { + $old->delete(); + } else { + // That's ok, we're already unsubscribed. } - return true; } } /** * Insert wrapper; transparently set the hash key from topic and callback columns. - * @return boolean success + * @return mixed success */ function insert() { $this->hashkey = self::hashkey($this->topic, $this->callback); + $this->created = common_sql_now(); + $this->modified = common_sql_now(); return parent::insert(); } /** + * Update wrapper; transparently update modified column. + * @return boolean success + */ + function update($old=null) + { + $this->modified = common_sql_now(); + return parent::update($old); + } + + /** + * Schedule delivery of a 'fat ping' to the subscriber's callback + * endpoint. If queues are disabled, this will run immediately. + * + * @param string $atom well-formed Atom feed + * @param int $retries optional count of retries if POST fails; defaults to hub_retries from config or 0 if unset + */ + function distribute($atom, $retries=null) + { + if ($retries === null) { + $retries = intval(common_config('ostatus', 'hub_retries')); + } + + // We dare not clone() as when the clone is discarded it'll + // destroy the result data for the parent query. + // @fixme use clone() again when it's safe to copy an + // individual item from a multi-item query again. + $sub = HubSub::staticGet($this->topic, $this->callback); + $data = array('sub' => $sub, + 'atom' => $atom, + 'retries' => $retries); + common_log(LOG_INFO, "Queuing PuSH: $this->topic to $this->callback"); + $qm = QueueManager::get(); + $qm->enqueue($data, 'hubout'); + } + + /** * Send a 'fat ping' to the subscriber's callback endpoint * containing the given Atom feed chunk. * @@ -237,6 +281,7 @@ class HubSub extends Memcached_DataObject * a higher level; don't just shove in a complete feed! * * @param string $atom well-formed Atom feed + * @throws Exception (HTTP or general) */ function push($atom) { @@ -248,24 +293,18 @@ class HubSub extends Memcached_DataObject $hmac = '(none)'; } common_log(LOG_INFO, "About to push feed to $this->callback for $this->topic, HMAC $hmac"); - try { - $request = new HTTPClient(); - $request->setBody($atom); - $response = $request->post($this->callback, $headers); - if ($response->isOk()) { - return true; - } - common_log(LOG_ERR, "Error sending PuSH content " . - "to $this->callback for $this->topic: " . - $response->getStatus()); - return false; + $request = new HTTPClient(); + $request->setBody($atom); + $response = $request->post($this->callback, $headers); - } catch (Exception $e) { - common_log(LOG_ERR, "Error sending PuSH content " . - "to $this->callback for $this->topic: " . - $e->getMessage()); - return false; + if ($response->isOk()) { + return true; + } else { + throw new Exception("Callback returned status: " . + $response->getStatus() . + "; body: " . + trim($response->getBody())); } } } |