summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--lib/stompqueuemanager.php68
-rw-r--r--plugins/UserFlag/User_flag_profile.php11
2 files changed, 74 insertions, 5 deletions
diff --git a/lib/stompqueuemanager.php b/lib/stompqueuemanager.php
index f057bd9e4..8f0091a13 100644
--- a/lib/stompqueuemanager.php
+++ b/lib/stompqueuemanager.php
@@ -41,6 +41,10 @@ class StompQueueManager extends QueueManager
protected $sites = array();
+ protected $useTransactions = true;
+ protected $transaction = null;
+ protected $transactionCount = 0;
+
function __construct()
{
parent::__construct();
@@ -201,6 +205,7 @@ class StompQueueManager extends QueueManager
} else {
$this->doSubscribe();
}
+ $this->begin();
return true;
}
@@ -213,6 +218,9 @@ class StompQueueManager extends QueueManager
*/
public function finish()
{
+ // If there are any outstanding delivered messages we haven't processed,
+ // free them for another thread to take.
+ $this->rollback();
if ($this->sites) {
foreach ($this->sites as $server) {
StatusNet::init($server);
@@ -293,7 +301,9 @@ class StompQueueManager extends QueueManager
$notice = Notice::staticGet('id', $id);
if (empty($notice)) {
$this->_log(LOG_WARNING, "Skipping missing $info");
- $this->con->ack($frame);
+ $this->ack($frame);
+ $this->commit();
+ $this->begin();
$this->stats('badnotice', $queue);
return false;
}
@@ -308,7 +318,9 @@ class StompQueueManager extends QueueManager
$handler = $this->getHandler($queue);
if (!$handler) {
$this->_log(LOG_ERROR, "Missing handler class; skipping $info");
- $this->con->ack($frame);
+ $this->ack($frame);
+ $this->commit();
+ $this->begin();
$this->stats('badhandler', $queue);
return false;
}
@@ -320,14 +332,18 @@ class StompQueueManager extends QueueManager
// FIXME we probably shouldn't have to do
// this kind of queue management ourselves;
// if we don't ack, it should resend...
- $this->con->ack($frame);
+ $this->ack($frame);
$this->enqueue($item, $queue);
+ $this->commit();
+ $this->begin();
$this->stats('requeued', $queue);
return false;
}
$this->_log(LOG_INFO, "Successfully handled $info");
- $this->con->ack($frame);
+ $this->ack($frame);
+ $this->commit();
+ $this->begin();
$this->stats('handled', $queue);
return true;
}
@@ -369,5 +385,49 @@ class StompQueueManager extends QueueManager
{
common_log($level, 'StompQueueManager: '.$msg);
}
+
+ protected function begin()
+ {
+ if ($this->useTransactions) {
+ if ($this->transaction) {
+ throw new Exception("Tried to start transaction in the middle of a transaction");
+ }
+ $this->transactionCount++;
+ $this->transaction = $this->master->id . '-' . $this->transactionCount . '-' . time();
+ $this->con->begin($this->transaction);
+ }
+ }
+
+ protected function ack($frame)
+ {
+ if ($this->useTransactions) {
+ if (!$this->transaction) {
+ throw new Exception("Tried to ack but not in a transaction");
+ }
+ }
+ $this->con->ack($frame, $this->transaction);
+ }
+
+ protected function commit()
+ {
+ if ($this->useTransactions) {
+ if (!$this->transaction) {
+ throw new Exception("Tried to commit but not in a transaction");
+ }
+ $this->con->commit($this->transaction);
+ $this->transaction = null;
+ }
+ }
+
+ protected function rollback()
+ {
+ if ($this->useTransactions) {
+ if (!$this->transaction) {
+ throw new Exception("Tried to rollback but not in a transaction");
+ }
+ $this->con->commit($this->transaction);
+ $this->transaction = null;
+ }
+ }
}
diff --git a/plugins/UserFlag/User_flag_profile.php b/plugins/UserFlag/User_flag_profile.php
index bc4251cf7..86b39160b 100644
--- a/plugins/UserFlag/User_flag_profile.php
+++ b/plugins/UserFlag/User_flag_profile.php
@@ -86,7 +86,7 @@ class User_flag_profile extends Memcached_DataObject
function keys()
{
- return array('profile_id' => 'N', 'user_id' => 'N');
+ return array('profile_id' => 'K', 'user_id' => 'K');
}
/**
@@ -130,6 +130,15 @@ class User_flag_profile extends Memcached_DataObject
return !empty($ufp);
}
+ /**
+ * Create a new flag
+ *
+ * @param integer $user_id ID of user who's flagging
+ * @param integer $profile_id ID of profile being flagged
+ *
+ * @return boolean success flag
+ */
+
static function create($user_id, $profile_id)
{
$ufp = new User_flag_profile();