diff options
-rw-r--r-- | plugins/OStatus/actions/pushcallback.php | 1 | ||||
-rw-r--r-- | plugins/OStatus/classes/HubSub.php | 47 | ||||
-rw-r--r-- | plugins/OStatus/lib/hubdistribqueuehandler.php | 5 | ||||
-rw-r--r-- | plugins/OStatus/lib/huboutqueuehandler.php | 18 |
4 files changed, 46 insertions, 25 deletions
diff --git a/plugins/OStatus/actions/pushcallback.php b/plugins/OStatus/actions/pushcallback.php index 9e976a80d..35c92c732 100644 --- a/plugins/OStatus/actions/pushcallback.php +++ b/plugins/OStatus/actions/pushcallback.php @@ -29,6 +29,7 @@ class PushCallbackAction extends Action { function handle() { + StatusNet::setApi(true); // Minimize error messages to aid in debugging parent::handle(); if ($_SERVER['REQUEST_METHOD'] == 'POST') { $this->handlePost(); diff --git a/plugins/OStatus/classes/HubSub.php b/plugins/OStatus/classes/HubSub.php index 0cd4281f8..a81de68e6 100644 --- a/plugins/OStatus/classes/HubSub.php +++ b/plugins/OStatus/classes/HubSub.php @@ -227,6 +227,26 @@ class HubSub extends Memcached_DataObject } /** + * Schedule delivery of a 'fat ping' to the subscriber's callback + * endpoint. If queues are disabled, this will run immediately. + * + * @param string $atom well-formed Atom feed + * @param int $retries optional count of retries if POST fails; defaults to hub_retries from config or 0 if unset + */ + function distribute($atom, $retries=null) + { + if ($retries === null) { + $retries = intval(common_config('ostatus', 'hub_retries')); + } + + $data = array('sub' => clone($this), + 'atom' => $atom, + 'retries' => $retries); + $qm = QueueManager::get(); + $qm->enqueue($data, 'hubout'); + } + + /** * Send a 'fat ping' to the subscriber's callback endpoint * containing the given Atom feed chunk. * @@ -234,6 +254,7 @@ class HubSub extends Memcached_DataObject * a higher level; don't just shove in a complete feed! * * @param string $atom well-formed Atom feed + * @throws Exception (HTTP or general) */ function push($atom) { @@ -245,24 +266,18 @@ class HubSub extends Memcached_DataObject $hmac = '(none)'; } common_log(LOG_INFO, "About to push feed to $this->callback for $this->topic, HMAC $hmac"); - try { - $request = new HTTPClient(); - $request->setBody($atom); - $response = $request->post($this->callback, $headers); - if ($response->isOk()) { - return true; - } - common_log(LOG_ERR, "Error sending PuSH content " . - "to $this->callback for $this->topic: " . - $response->getStatus()); - return false; + $request = new HTTPClient(); + $request->setBody($atom); + $response = $request->post($this->callback, $headers); - } catch (Exception $e) { - common_log(LOG_ERR, "Error sending PuSH content " . - "to $this->callback for $this->topic: " . - $e->getMessage()); - return false; + if ($response->isOk()) { + return true; + } else { + throw new Exception("Callback returned status: " . + $response->getStatus() . + "; body: " . + trim($response->getBody())); } } } diff --git a/plugins/OStatus/lib/hubdistribqueuehandler.php b/plugins/OStatus/lib/hubdistribqueuehandler.php index 245a57f72..30a427e3f 100644 --- a/plugins/OStatus/lib/hubdistribqueuehandler.php +++ b/plugins/OStatus/lib/hubdistribqueuehandler.php @@ -124,10 +124,7 @@ class HubDistribQueueHandler extends QueueHandler common_log(LOG_INFO, "Preparing $sub->N PuSH distribution(s) for $sub->topic"); $qm = QueueManager::get(); while ($sub->fetch()) { - common_log(LOG_INFO, "Prepping PuSH distribution to $sub->callback for $sub->topic"); - $data = array('sub' => clone($sub), - 'atom' => $atom); - $qm->enqueue($data, 'hubout'); + $sub->distribute($atom); } } diff --git a/plugins/OStatus/lib/huboutqueuehandler.php b/plugins/OStatus/lib/huboutqueuehandler.php index 0791c7e5d..3ad94646e 100644 --- a/plugins/OStatus/lib/huboutqueuehandler.php +++ b/plugins/OStatus/lib/huboutqueuehandler.php @@ -33,6 +33,7 @@ class HubOutQueueHandler extends QueueHandler { $sub = $data['sub']; $atom = $data['atom']; + $retries = $data['retries']; assert($sub instanceof HubSub); assert(is_string($atom)); @@ -40,13 +41,20 @@ class HubOutQueueHandler extends QueueHandler try { $sub->push($atom); } catch (Exception $e) { - common_log(LOG_ERR, "Failed PuSH to $sub->callback for $sub->topic: " . - $e->getMessage()); - // @fixme Reschedule a later delivery? - return true; + $retries--; + $msg = "Failed PuSH to $sub->callback for $sub->topic: " . + $e->getMessage(); + if ($retries > 0) { + common_log(LOG_ERR, "$msg; scheduling for $retries more tries"); + + // @fixme when we have infrastructure to schedule a retry + // after a delay, use it. + $sub->distribute($atom, $retries); + } else { + common_log(LOG_ERR, "$msg; discarding"); + } } return true; } } - |