From 2ac684f53df3f76aacb4d1be79d7db64567f1964 Mon Sep 17 00:00:00 2001 From: Federico Marani Date: Fri, 20 Feb 2009 18:57:20 +0000 Subject: * implemented STOMP support for external message queueing systems * (ticket-724) * included PHP STOMP libraries (http://code.google.com/p/stompcli/) --- lib/queuehandler.php | 74 +++++++++++++++++++++++++++++++++++++++++++++------- lib/util.php | 48 ++++++++++++++++++++++++++-------- 2 files changed, 102 insertions(+), 20 deletions(-) (limited to 'lib') diff --git a/lib/queuehandler.php b/lib/queuehandler.php index 9ce9e32b3..752bbc6d2 100644 --- a/lib/queuehandler.php +++ b/lib/queuehandler.php @@ -75,15 +75,9 @@ class QueueHandler extends Daemon return true; } - function run() - { - if (!$this->start()) { - return false; - } - $this->log(LOG_INFO, 'checking for queued notices'); - $transport = $this->transport(); + function db_dispatch() { do { - $qi = Queue_item::top($transport); + $qi = Queue_item::top($this->transport()); if ($qi) { $this->log(LOG_INFO, 'Got item enqueued '.common_exact_date($qi->created)); $notice = Notice::staticGet($qi->notice_id); @@ -115,6 +109,68 @@ class QueueHandler extends Daemon $this->idle(5); } } while (true); + } + + function stomp_dispatch() { + require("Stomp.php"); + $con = new Stomp(common_config('queue','stomp_server')); + if (!$con->connect()) { + $this->log(LOG_ERR, 'Failed to connect to queue server'); + return false; + } + $queue_basename = common_config('queue','queue_basename'); + // subscribe to the relevant queue (format: basename-transport) + $con->subscribe('/queue/'.$queue_basename.'-'.$this->transport()); + + do { + $frame = $con->readFrame(); + if ($frame) { + $this->log(LOG_INFO, 'Got item enqueued '.common_exact_date($frame->headers['created'])); + + // XXX: Now the queue handler receives only the ID of the + // notice, and it has to get it from the DB + // A massive improvement would be avoid DB query by transmitting + // all the notice details via queue server... + $notice = Notice::staticGet($frame->body); + + if ($notice) { + $this->log(LOG_INFO, 'broadcasting notice ID = ' . $notice->id); + $result = $this->handle_notice($notice); + if ($result) { + // if the msg has been handled positively, ack it + // and the queue server will remove it from the queue + $con->ack($frame); + $this->log(LOG_INFO, 'finished broadcasting notice ID = ' . $notice->id); + } + else { + // no ack + $this->log(LOG_WARNING, 'Failed broadcast for notice ID = ' . $notice->id); + } + $notice->free(); + unset($notice); + $notice = null; + } else { + $this->log(LOG_WARNING, 'queue item for notice that does not exist'); + } + } + } while (true); + + $con->disconnect(); + } + + + function run() + { + if (!$this->start()) { + return false; + } + $this->log(LOG_INFO, 'checking for queued notices'); + if (common_config('queue','subsystem') == 'stomp') { + $this->stomp_dispatch(); + } + else { + $this->db_dispatch(); + } if (!$this->finish()) { return false; } @@ -143,4 +199,4 @@ class QueueHandler extends Daemon common_log($level, $this->class_name() . ' ('. $this->get_id() .'): '.$msg); } } - \ No newline at end of file + diff --git a/lib/util.php b/lib/util.php index c5a092f63..8ef19f579 100644 --- a/lib/util.php +++ b/lib/util.php @@ -1138,18 +1138,44 @@ function common_twitter_broadcast($notice, $flink) function common_enqueue_notice($notice) { - foreach (array('jabber', 'omb', 'sms', 'public') as $transport) { - $qi = new Queue_item(); - $qi->notice_id = $notice->id; - $qi->transport = $transport; - $qi->created = $notice->created; - $result = $qi->insert(); - if (!$result) { - $last_error = &PEAR::getStaticProperty('DB_DataObject','lastError'); - common_log(LOG_ERR, 'DB error inserting queue item: ' . $last_error->message); - return false; + if (common_config('queue','subsystem') == 'stomp') { + // use an external message queue system via STOMP + require_once("Stomp.php"); + $con = new Stomp(common_config('queue','stomp_server')); + if (!$con->connect()) { + common_log(LOG_ERR, 'Failed to connect to queue server'); + return false; + } + $queue_basename = common_config('queue','queue_basename'); + foreach (array('jabber', 'omb', 'sms', 'public') as $transport) { + if (!$con->send( + '/queue/'.$queue_basename.'-'.$transport, // QUEUE + $notice->id, // BODY of the message + array ( // HEADERS of the msg + 'created' => $notice->created + ))) { + common_log(LOG_ERR, 'Error sending to '.$transport.' queue'); + return false; + } + common_log(LOG_DEBUG, 'complete remote queueing notice ID = ' . $notice->id . ' for ' . $transport); + } + $result = true; + } + else { + // in any other case, 'internal' + foreach (array('jabber', 'omb', 'sms', 'public') as $transport) { + $qi = new Queue_item(); + $qi->notice_id = $notice->id; + $qi->transport = $transport; + $qi->created = $notice->created; + $result = $qi->insert(); + if (!$result) { + $last_error = &PEAR::getStaticProperty('DB_DataObject','lastError'); + common_log(LOG_ERR, 'DB error inserting queue item: ' . $last_error->message); + return false; + } + common_log(LOG_DEBUG, 'complete queueing notice ID = ' . $notice->id . ' for ' . $transport); } - common_log(LOG_DEBUG, 'complete queueing notice ID = ' . $notice->id . ' for ' . $transport); } return $result; } -- cgit v1.2.3-54-g00ecf From 80d0423026b0bc14c8da820fff7905ba9c0a5d0e Mon Sep 17 00:00:00 2001 From: Federico Marani Date: Sat, 7 Mar 2009 13:47:46 +0000 Subject: html escape of atom attributes (ticket 1266) --- lib/jabber.php | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) (limited to 'lib') diff --git a/lib/jabber.php b/lib/jabber.php index f41d984d6..4a96fb54e 100644 --- a/lib/jabber.php +++ b/lib/jabber.php @@ -176,14 +176,14 @@ function jabber_format_entry($profile, $notice) $entry .= "\n"; $entry .= "" . $profile->nickname . " - " . common_config('site', 'name') . "\n"; $entry .= "\n"; - $entry .= "\n"; + $entry .= "\n"; $entry .= "" . $profile->nickname . "\n"; $entry .= "" . $profile->avatarUrl(AVATAR_PROFILE_SIZE) . "\n"; $entry .= "\n"; $entry .= "" . htmlspecialchars($msg) . "\n"; $entry .= "" . htmlspecialchars($msg) . "\n"; - $entry .= "\n"; - $entry .= "". $notice->uri . "\n"; + $entry .= "\n"; + $entry .= "". htmlspecialchars($notice->uri) . "\n"; $entry .= "".common_date_w3dtf($notice->created)."\n"; $entry .= "".common_date_w3dtf($notice->modified)."\n"; $entry .= "\n"; -- cgit v1.2.3-54-g00ecf From a389f157baddafa91347c27194805580b1373e30 Mon Sep 17 00:00:00 2001 From: Federico Marani Date: Sat, 7 Mar 2009 13:48:39 +0000 Subject: tests with Apache ActiveMQ topics (pubsub) --- lib/util.php | 14 ++++++++++++++ 1 file changed, 14 insertions(+) (limited to 'lib') diff --git a/lib/util.php b/lib/util.php index 8ef19f579..8f1c419f2 100644 --- a/lib/util.php +++ b/lib/util.php @@ -1159,6 +1159,20 @@ function common_enqueue_notice($notice) } common_log(LOG_DEBUG, 'complete remote queueing notice ID = ' . $notice->id . ' for ' . $transport); } + $con->send('/topic/laconica.'.$notice->profile_id, + $notice->content, + array( + 'profile_id' => $notice->profile_id, + 'created' => $notice->created + ) + ); + $con->send('/topic/laconica.allusers', + $notice->content, + array( + 'profile_id' => $notice->profile_id, + 'created' => $notice->created + ) + ); $result = true; } else { -- cgit v1.2.3-54-g00ecf From 5e9ff6ae973423c06ac42c679f4ad67a46a77bd0 Mon Sep 17 00:00:00 2001 From: Federico Marani Date: Sat, 28 Mar 2009 17:55:20 +0000 Subject: Testing JMS message selectors for notice tags-based routing --- lib/util.php | 22 ++++++++++++++++++++-- 1 file changed, 20 insertions(+), 2 deletions(-) (limited to 'lib') diff --git a/lib/util.php b/lib/util.php index 4e09bffce..79d5cc943 100644 --- a/lib/util.php +++ b/lib/util.php @@ -868,20 +868,38 @@ function common_enqueue_notice($notice) } common_log(LOG_DEBUG, 'complete remote queueing notice ID = ' . $notice->id . ' for ' . $transport); } + + //send tags as headers, so they can be used as JMS selectors + common_log(LOG_DEBUG, 'searching for tags ' . $notice->id); + $tags = array(); + $tag = new Notice_tag(); + $tag->notice_id = $notice->id; + if ($tag->find()) { + while ($tag->fetch()) { + common_log(LOG_DEBUG, 'tag found = ' . $tag->tag); + array_push($tags,$tag->tag); + } + } + $tag->free(); + $con->send('/topic/laconica.'.$notice->profile_id, $notice->content, array( 'profile_id' => $notice->profile_id, - 'created' => $notice->created + 'created' => $notice->created, + 'tags' => implode($tags,' - ') ) ); + common_log(LOG_DEBUG, 'sent to personal topic ' . $notice->id); $con->send('/topic/laconica.allusers', $notice->content, array( 'profile_id' => $notice->profile_id, - 'created' => $notice->created + 'created' => $notice->created, + 'tags' => implode($tags,' - ') ) ); + common_log(LOG_DEBUG, 'sent to catch-all topic ' . $notice->id); $result = true; } else { -- cgit v1.2.3-54-g00ecf From bd120bc3153e3502790ec976769febd41f790568 Mon Sep 17 00:00:00 2001 From: Evan Prodromou Date: Fri, 3 Apr 2009 12:45:54 -0400 Subject: reformat queuehandler.php --- lib/queuehandler.php | 79 ++++++++++++++++++++++++++-------------------------- 1 file changed, 39 insertions(+), 40 deletions(-) (limited to 'lib') diff --git a/lib/queuehandler.php b/lib/queuehandler.php index 752bbc6d2..f76f16e07 100644 --- a/lib/queuehandler.php +++ b/lib/queuehandler.php @@ -36,7 +36,7 @@ class QueueHandler extends Daemon $this->set_id($id); } } - + function class_name() { return ucfirst($this->transport()) . 'Handler'; @@ -46,7 +46,7 @@ class QueueHandler extends Daemon { return strtolower($this->class_name().'.'.$this->get_id()); } - + function get_id() { return $this->_id; @@ -56,16 +56,16 @@ class QueueHandler extends Daemon { $this->_id = $id; } - + function transport() { return null; } - + function start() { } - + function finish() { } @@ -74,7 +74,7 @@ class QueueHandler extends Daemon { return true; } - + function db_dispatch() { do { $qi = Queue_item::top($this->transport()); @@ -107,43 +107,43 @@ class QueueHandler extends Daemon } else { $this->clear_old_claims(); $this->idle(5); - } + } } while (true); } function stomp_dispatch() { - require("Stomp.php"); - $con = new Stomp(common_config('queue','stomp_server')); - if (!$con->connect()) { - $this->log(LOG_ERR, 'Failed to connect to queue server'); - return false; - } - $queue_basename = common_config('queue','queue_basename'); - // subscribe to the relevant queue (format: basename-transport) - $con->subscribe('/queue/'.$queue_basename.'-'.$this->transport()); + require("Stomp.php"); + $con = new Stomp(common_config('queue','stomp_server')); + if (!$con->connect()) { + $this->log(LOG_ERR, 'Failed to connect to queue server'); + return false; + } + $queue_basename = common_config('queue','queue_basename'); + // subscribe to the relevant queue (format: basename-transport) + $con->subscribe('/queue/'.$queue_basename.'-'.$this->transport()); do { $frame = $con->readFrame(); if ($frame) { $this->log(LOG_INFO, 'Got item enqueued '.common_exact_date($frame->headers['created'])); - // XXX: Now the queue handler receives only the ID of the - // notice, and it has to get it from the DB - // A massive improvement would be avoid DB query by transmitting - // all the notice details via queue server... + // XXX: Now the queue handler receives only the ID of the + // notice, and it has to get it from the DB + // A massive improvement would be avoid DB query by transmitting + // all the notice details via queue server... $notice = Notice::staticGet($frame->body); if ($notice) { $this->log(LOG_INFO, 'broadcasting notice ID = ' . $notice->id); $result = $this->handle_notice($notice); if ($result) { - // if the msg has been handled positively, ack it - // and the queue server will remove it from the queue - $con->ack($frame); - $this->log(LOG_INFO, 'finished broadcasting notice ID = ' . $notice->id); - } - else { - // no ack + // if the msg has been handled positively, ack it + // and the queue server will remove it from the queue + $con->ack($frame); + $this->log(LOG_INFO, 'finished broadcasting notice ID = ' . $notice->id); + } + else { + // no ack $this->log(LOG_WARNING, 'Failed broadcast for notice ID = ' . $notice->id); } $notice->free(); @@ -152,12 +152,11 @@ class QueueHandler extends Daemon } else { $this->log(LOG_WARNING, 'queue item for notice that does not exist'); } - } + } } while (true); - - $con->disconnect(); + + $con->disconnect(); } - function run() { @@ -165,12 +164,12 @@ class QueueHandler extends Daemon return false; } $this->log(LOG_INFO, 'checking for queued notices'); - if (common_config('queue','subsystem') == 'stomp') { - $this->stomp_dispatch(); - } - else { - $this->db_dispatch(); - } + if (common_config('queue','subsystem') == 'stomp') { + $this->stomp_dispatch(); + } + else { + $this->db_dispatch(); + } if (!$this->finish()) { return false; } @@ -183,7 +182,7 @@ class QueueHandler extends Daemon sleep($timeout); } } - + function clear_old_claims() { $qi = new Queue_item(); @@ -193,10 +192,10 @@ class QueueHandler extends Daemon $qi->free(); unset($qi); } - + function log($level, $msg) { common_log($level, $this->class_name() . ' ('. $this->get_id() .'): '.$msg); } } - + -- cgit v1.2.3-54-g00ecf From 95bb1c6a99766d065ffcde9523de05d8a1162c3d Mon Sep 17 00:00:00 2001 From: Evan Prodromou Date: Fri, 3 Apr 2009 16:16:39 -0400 Subject: in context link for conversations --- lib/noticelist.php | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) (limited to 'lib') diff --git a/lib/noticelist.php b/lib/noticelist.php index 4182d8808..8fccba73e 100644 --- a/lib/noticelist.php +++ b/lib/noticelist.php @@ -197,7 +197,7 @@ class NoticeListItem extends Widget $this->out->elementStart('div', 'entry-content'); $this->showNoticeLink(); $this->showNoticeSource(); - $this->showReplyTo(); + $this->showContext(); $this->out->elementEnd('div'); } @@ -421,17 +421,18 @@ class NoticeListItem extends Widget * @return void */ - function showReplyTo() + function showContext() { - if ($this->notice->reply_to) { - $replyurl = common_local_url('shownotice', - array('notice' => $this->notice->reply_to)); + // XXX: also show context if there are replies to this notice + if (!empty($this->notice->conversation) + && $this->notice->conversation != $this->notice->id) { + $convurl = common_local_url('conversation', + array('id' => $this->notice->conversation)); $this->out->elementStart('dl', 'response'); $this->out->element('dt', null, _('To')); $this->out->elementStart('dd'); - $this->out->element('a', array('href' => $replyurl, - 'rel' => 'in-reply-to'), - _('in reply to')); + $this->out->element('a', array('href' => $convurl), + _('in context')); $this->out->elementEnd('dd'); $this->out->elementEnd('dl'); } -- cgit v1.2.3-54-g00ecf