summaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
Diffstat (limited to 'lib')
-rw-r--r--lib/action.php8
-rw-r--r--lib/queuehandler.php91
-rw-r--r--lib/router.php6
-rw-r--r--lib/twitter.php14
-rw-r--r--lib/util.php80
5 files changed, 160 insertions, 39 deletions
diff --git a/lib/action.php b/lib/action.php
index 5d0d5b758..75531d34b 100644
--- a/lib/action.php
+++ b/lib/action.php
@@ -830,10 +830,12 @@ class Action extends HTMLOutputter // lawsuit
}
if ($lm) {
header('Last-Modified: ' . date(DATE_RFC1123, $lm));
- if (isset($_SERVER['HTTP_IF_MODIFIED_SINCE'])) {
- $ims = strtotime($_SERVER['HTTP_IF_MODIFIED_SINCE']);
+ if (array_key_exists('HTTP_IF_MODIFIED_SINCE', $_SERVER)) {
+ $if_modified_since = $_SERVER['HTTP_IF_MODIFIED_SINCE'];
+ $ims = strtotime($if_modified_since);
if ($lm <= $ims) {
- $if_none_match = $_SERVER['HTTP_IF_NONE_MATCH'];
+ $if_none_match = (array_key_exists('HTTP_IF_NONE_MATCH', $_SERVER)) ?
+ $_SERVER['HTTP_IF_NONE_MATCH'] : null;
if (!$if_none_match ||
!$etag ||
$this->_hasEtag($etag, $if_none_match)) {
diff --git a/lib/queuehandler.php b/lib/queuehandler.php
index 9ce9e32b3..f76f16e07 100644
--- a/lib/queuehandler.php
+++ b/lib/queuehandler.php
@@ -36,7 +36,7 @@ class QueueHandler extends Daemon
$this->set_id($id);
}
}
-
+
function class_name()
{
return ucfirst($this->transport()) . 'Handler';
@@ -46,7 +46,7 @@ class QueueHandler extends Daemon
{
return strtolower($this->class_name().'.'.$this->get_id());
}
-
+
function get_id()
{
return $this->_id;
@@ -56,16 +56,16 @@ class QueueHandler extends Daemon
{
$this->_id = $id;
}
-
+
function transport()
{
return null;
}
-
+
function start()
{
}
-
+
function finish()
{
}
@@ -74,16 +74,10 @@ class QueueHandler extends Daemon
{
return true;
}
-
- function run()
- {
- if (!$this->start()) {
- return false;
- }
- $this->log(LOG_INFO, 'checking for queued notices');
- $transport = $this->transport();
+
+ function db_dispatch() {
do {
- $qi = Queue_item::top($transport);
+ $qi = Queue_item::top($this->transport());
if ($qi) {
$this->log(LOG_INFO, 'Got item enqueued '.common_exact_date($qi->created));
$notice = Notice::staticGet($qi->notice_id);
@@ -113,8 +107,69 @@ class QueueHandler extends Daemon
} else {
$this->clear_old_claims();
$this->idle(5);
- }
+ }
} while (true);
+ }
+
+ function stomp_dispatch() {
+ require("Stomp.php");
+ $con = new Stomp(common_config('queue','stomp_server'));
+ if (!$con->connect()) {
+ $this->log(LOG_ERR, 'Failed to connect to queue server');
+ return false;
+ }
+ $queue_basename = common_config('queue','queue_basename');
+ // subscribe to the relevant queue (format: basename-transport)
+ $con->subscribe('/queue/'.$queue_basename.'-'.$this->transport());
+
+ do {
+ $frame = $con->readFrame();
+ if ($frame) {
+ $this->log(LOG_INFO, 'Got item enqueued '.common_exact_date($frame->headers['created']));
+
+ // XXX: Now the queue handler receives only the ID of the
+ // notice, and it has to get it from the DB
+ // A massive improvement would be avoid DB query by transmitting
+ // all the notice details via queue server...
+ $notice = Notice::staticGet($frame->body);
+
+ if ($notice) {
+ $this->log(LOG_INFO, 'broadcasting notice ID = ' . $notice->id);
+ $result = $this->handle_notice($notice);
+ if ($result) {
+ // if the msg has been handled positively, ack it
+ // and the queue server will remove it from the queue
+ $con->ack($frame);
+ $this->log(LOG_INFO, 'finished broadcasting notice ID = ' . $notice->id);
+ }
+ else {
+ // no ack
+ $this->log(LOG_WARNING, 'Failed broadcast for notice ID = ' . $notice->id);
+ }
+ $notice->free();
+ unset($notice);
+ $notice = null;
+ } else {
+ $this->log(LOG_WARNING, 'queue item for notice that does not exist');
+ }
+ }
+ } while (true);
+
+ $con->disconnect();
+ }
+
+ function run()
+ {
+ if (!$this->start()) {
+ return false;
+ }
+ $this->log(LOG_INFO, 'checking for queued notices');
+ if (common_config('queue','subsystem') == 'stomp') {
+ $this->stomp_dispatch();
+ }
+ else {
+ $this->db_dispatch();
+ }
if (!$this->finish()) {
return false;
}
@@ -127,7 +182,7 @@ class QueueHandler extends Daemon
sleep($timeout);
}
}
-
+
function clear_old_claims()
{
$qi = new Queue_item();
@@ -137,10 +192,10 @@ class QueueHandler extends Daemon
$qi->free();
unset($qi);
}
-
+
function log($level, $msg)
{
common_log($level, $this->class_name() . ' ('. $this->get_id() .'): '.$msg);
}
}
- \ No newline at end of file
+
diff --git a/lib/router.php b/lib/router.php
index 6fb2f9487..060253eb5 100644
--- a/lib/router.php
+++ b/lib/router.php
@@ -165,6 +165,12 @@ class Router
array('action' => 'deletenotice'),
array('notice' => '[0-9]+'));
+ // conversation
+
+ $m->connect('conversation/:id',
+ array('action' => 'conversation'),
+ array('id' => '[0-9]+'));
+
$m->connect('message/new', array('action' => 'newmessage'));
$m->connect('message/new?to=:to', array('action' => 'newmessage'), array('to' => '[A-Za-z0-9_-]+'));
$m->connect('message/:message',
diff --git a/lib/twitter.php b/lib/twitter.php
index 7abb40151..db2092210 100644
--- a/lib/twitter.php
+++ b/lib/twitter.php
@@ -210,7 +210,7 @@ function save_twitter_friends($user, $twitter_id, $screen_name, $password)
function is_twitter_bound($notice, $flink) {
// Check to see if notice should go to Twitter
- if ($flink->noticesync & FOREIGN_NOTICE_SEND) {
+ if (!empty($flink) && ($flink->noticesync & FOREIGN_NOTICE_SEND)) {
// If it's not a Twitter-style reply, or if the user WANTS to send replies.
if (!preg_match('/^@[a-zA-Z0-9_]{1,15}\b/u', $notice->content) ||
@@ -218,7 +218,7 @@ function is_twitter_bound($notice, $flink) {
return true;
}
}
-
+
return false;
}
@@ -226,10 +226,10 @@ function broadcast_twitter($notice)
{
$success = true;
- $flink = Foreign_link::getByUserID($notice->profile_id,
+ $flink = Foreign_link::getByUserID($notice->profile_id,
TWITTER_SERVICE);
-
- // XXX: Not sure WHERE to check whether a notice should go to
+
+ // XXX: Not sure WHERE to check whether a notice should go to
// Twitter. Should we even put in the queue if it shouldn't? --Zach
if (!is_null($flink) && is_twitter_bound($notice, $flink)) {
@@ -244,7 +244,7 @@ function broadcast_twitter($notice)
$options = array(
CURLOPT_USERPWD => "$twitter_user:$twitter_password",
CURLOPT_POST => true,
- CURLOPT_POSTFIELDS =>
+ CURLOPT_POSTFIELDS =>
array(
'status' => $statustxt,
'source' => common_config('integration', 'source')
@@ -292,7 +292,7 @@ function broadcast_twitter($notice)
$success = false;
}
}
-
+
return $success;
}
diff --git a/lib/util.php b/lib/util.php
index 73410e289..ad1044fa2 100644
--- a/lib/util.php
+++ b/lib/util.php
@@ -876,18 +876,76 @@ function common_broadcast_notice($notice, $remote=false)
function common_enqueue_notice($notice)
{
- foreach (array('jabber', 'omb', 'sms', 'public', 'twitter', 'facebook', 'ping') as $transport) {
- $qi = new Queue_item();
- $qi->notice_id = $notice->id;
- $qi->transport = $transport;
- $qi->created = $notice->created;
- $result = $qi->insert();
- if (!$result) {
- $last_error = &PEAR::getStaticProperty('DB_DataObject','lastError');
- common_log(LOG_ERR, 'DB error inserting queue item: ' . $last_error->message);
- return false;
+ if (common_config('queue','subsystem') == 'stomp') {
+ // use an external message queue system via STOMP
+ require_once("Stomp.php");
+ $con = new Stomp(common_config('queue','stomp_server'));
+ if (!$con->connect()) {
+ common_log(LOG_ERR, 'Failed to connect to queue server');
+ return false;
+ }
+ $queue_basename = common_config('queue','queue_basename');
+ foreach (array('jabber', 'omb', 'sms', 'public', 'twitter', 'facebook', 'ping') as $transport) {
+ if (!$con->send(
+ '/queue/'.$queue_basename.'-'.$transport, // QUEUE
+ $notice->id, // BODY of the message
+ array ( // HEADERS of the msg
+ 'created' => $notice->created
+ ))) {
+ common_log(LOG_ERR, 'Error sending to '.$transport.' queue');
+ return false;
+ }
+ common_log(LOG_DEBUG, 'complete remote queueing notice ID = ' . $notice->id . ' for ' . $transport);
+ }
+
+ //send tags as headers, so they can be used as JMS selectors
+ common_log(LOG_DEBUG, 'searching for tags ' . $notice->id);
+ $tags = array();
+ $tag = new Notice_tag();
+ $tag->notice_id = $notice->id;
+ if ($tag->find()) {
+ while ($tag->fetch()) {
+ common_log(LOG_DEBUG, 'tag found = ' . $tag->tag);
+ array_push($tags,$tag->tag);
+ }
+ }
+ $tag->free();
+
+ $con->send('/topic/laconica.'.$notice->profile_id,
+ $notice->content,
+ array(
+ 'profile_id' => $notice->profile_id,
+ 'created' => $notice->created,
+ 'tags' => implode($tags,' - ')
+ )
+ );
+ common_log(LOG_DEBUG, 'sent to personal topic ' . $notice->id);
+ $con->send('/topic/laconica.allusers',
+ $notice->content,
+ array(
+ 'profile_id' => $notice->profile_id,
+ 'created' => $notice->created,
+ 'tags' => implode($tags,' - ')
+ )
+ );
+ common_log(LOG_DEBUG, 'sent to catch-all topic ' . $notice->id);
+ $result = true;
+ }
+ else {
+ // in any other case, 'internal'
+ foreach (array('jabber', 'omb', 'sms', 'public', 'twitter', 'facebook', 'ping') as $transport) {
+ $qi = new Queue_item();
+ $qi->notice_id = $notice->id;
+ $qi->transport = $transport;
+ $qi->created = $notice->created;
+ $result = $qi->insert();
+ if (!$result) {
+ $last_error = &PEAR::getStaticProperty('DB_DataObject','lastError');
+ common_log(LOG_ERR, 'DB error inserting queue item: ' . $last_error->message);
+ return false;
+ }
+ common_log(LOG_DEBUG, 'complete queueing notice ID = ' . $notice->id . ' for ' . $transport);
}
- common_log(LOG_DEBUG, 'complete queueing notice ID = ' . $notice->id . ' for ' . $transport);
}
return $result;
}