summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--plugins/OStatus/actions/pushcallback.php1
-rw-r--r--plugins/OStatus/classes/HubSub.php47
-rw-r--r--plugins/OStatus/lib/hubdistribqueuehandler.php5
-rw-r--r--plugins/OStatus/lib/huboutqueuehandler.php18
4 files changed, 46 insertions, 25 deletions
diff --git a/plugins/OStatus/actions/pushcallback.php b/plugins/OStatus/actions/pushcallback.php
index 9e976a80d..35c92c732 100644
--- a/plugins/OStatus/actions/pushcallback.php
+++ b/plugins/OStatus/actions/pushcallback.php
@@ -29,6 +29,7 @@ class PushCallbackAction extends Action
{
function handle()
{
+ StatusNet::setApi(true); // Minimize error messages to aid in debugging
parent::handle();
if ($_SERVER['REQUEST_METHOD'] == 'POST') {
$this->handlePost();
diff --git a/plugins/OStatus/classes/HubSub.php b/plugins/OStatus/classes/HubSub.php
index 0cd4281f8..a81de68e6 100644
--- a/plugins/OStatus/classes/HubSub.php
+++ b/plugins/OStatus/classes/HubSub.php
@@ -227,6 +227,26 @@ class HubSub extends Memcached_DataObject
}
/**
+ * Schedule delivery of a 'fat ping' to the subscriber's callback
+ * endpoint. If queues are disabled, this will run immediately.
+ *
+ * @param string $atom well-formed Atom feed
+ * @param int $retries optional count of retries if POST fails; defaults to hub_retries from config or 0 if unset
+ */
+ function distribute($atom, $retries=null)
+ {
+ if ($retries === null) {
+ $retries = intval(common_config('ostatus', 'hub_retries'));
+ }
+
+ $data = array('sub' => clone($this),
+ 'atom' => $atom,
+ 'retries' => $retries);
+ $qm = QueueManager::get();
+ $qm->enqueue($data, 'hubout');
+ }
+
+ /**
* Send a 'fat ping' to the subscriber's callback endpoint
* containing the given Atom feed chunk.
*
@@ -234,6 +254,7 @@ class HubSub extends Memcached_DataObject
* a higher level; don't just shove in a complete feed!
*
* @param string $atom well-formed Atom feed
+ * @throws Exception (HTTP or general)
*/
function push($atom)
{
@@ -245,24 +266,18 @@ class HubSub extends Memcached_DataObject
$hmac = '(none)';
}
common_log(LOG_INFO, "About to push feed to $this->callback for $this->topic, HMAC $hmac");
- try {
- $request = new HTTPClient();
- $request->setBody($atom);
- $response = $request->post($this->callback, $headers);
- if ($response->isOk()) {
- return true;
- }
- common_log(LOG_ERR, "Error sending PuSH content " .
- "to $this->callback for $this->topic: " .
- $response->getStatus());
- return false;
+ $request = new HTTPClient();
+ $request->setBody($atom);
+ $response = $request->post($this->callback, $headers);
- } catch (Exception $e) {
- common_log(LOG_ERR, "Error sending PuSH content " .
- "to $this->callback for $this->topic: " .
- $e->getMessage());
- return false;
+ if ($response->isOk()) {
+ return true;
+ } else {
+ throw new Exception("Callback returned status: " .
+ $response->getStatus() .
+ "; body: " .
+ trim($response->getBody()));
}
}
}
diff --git a/plugins/OStatus/lib/hubdistribqueuehandler.php b/plugins/OStatus/lib/hubdistribqueuehandler.php
index 245a57f72..30a427e3f 100644
--- a/plugins/OStatus/lib/hubdistribqueuehandler.php
+++ b/plugins/OStatus/lib/hubdistribqueuehandler.php
@@ -124,10 +124,7 @@ class HubDistribQueueHandler extends QueueHandler
common_log(LOG_INFO, "Preparing $sub->N PuSH distribution(s) for $sub->topic");
$qm = QueueManager::get();
while ($sub->fetch()) {
- common_log(LOG_INFO, "Prepping PuSH distribution to $sub->callback for $sub->topic");
- $data = array('sub' => clone($sub),
- 'atom' => $atom);
- $qm->enqueue($data, 'hubout');
+ $sub->distribute($atom);
}
}
diff --git a/plugins/OStatus/lib/huboutqueuehandler.php b/plugins/OStatus/lib/huboutqueuehandler.php
index 0791c7e5d..3ad94646e 100644
--- a/plugins/OStatus/lib/huboutqueuehandler.php
+++ b/plugins/OStatus/lib/huboutqueuehandler.php
@@ -33,6 +33,7 @@ class HubOutQueueHandler extends QueueHandler
{
$sub = $data['sub'];
$atom = $data['atom'];
+ $retries = $data['retries'];
assert($sub instanceof HubSub);
assert(is_string($atom));
@@ -40,13 +41,20 @@ class HubOutQueueHandler extends QueueHandler
try {
$sub->push($atom);
} catch (Exception $e) {
- common_log(LOG_ERR, "Failed PuSH to $sub->callback for $sub->topic: " .
- $e->getMessage());
- // @fixme Reschedule a later delivery?
- return true;
+ $retries--;
+ $msg = "Failed PuSH to $sub->callback for $sub->topic: " .
+ $e->getMessage();
+ if ($retries > 0) {
+ common_log(LOG_ERR, "$msg; scheduling for $retries more tries");
+
+ // @fixme when we have infrastructure to schedule a retry
+ // after a delay, use it.
+ $sub->distribute($atom, $retries);
+ } else {
+ common_log(LOG_ERR, "$msg; discarding");
+ }
}
return true;
}
}
-