From 2ac684f53df3f76aacb4d1be79d7db64567f1964 Mon Sep 17 00:00:00 2001 From: Federico Marani Date: Fri, 20 Feb 2009 18:57:20 +0000 Subject: * implemented STOMP support for external message queueing systems * (ticket-724) * included PHP STOMP libraries (http://code.google.com/p/stompcli/) --- lib/util.php | 48 +++++++++++++++++++++++++++++++++++++----------- 1 file changed, 37 insertions(+), 11 deletions(-) (limited to 'lib/util.php') diff --git a/lib/util.php b/lib/util.php index c5a092f63..8ef19f579 100644 --- a/lib/util.php +++ b/lib/util.php @@ -1138,18 +1138,44 @@ function common_twitter_broadcast($notice, $flink) function common_enqueue_notice($notice) { - foreach (array('jabber', 'omb', 'sms', 'public') as $transport) { - $qi = new Queue_item(); - $qi->notice_id = $notice->id; - $qi->transport = $transport; - $qi->created = $notice->created; - $result = $qi->insert(); - if (!$result) { - $last_error = &PEAR::getStaticProperty('DB_DataObject','lastError'); - common_log(LOG_ERR, 'DB error inserting queue item: ' . $last_error->message); - return false; + if (common_config('queue','subsystem') == 'stomp') { + // use an external message queue system via STOMP + require_once("Stomp.php"); + $con = new Stomp(common_config('queue','stomp_server')); + if (!$con->connect()) { + common_log(LOG_ERR, 'Failed to connect to queue server'); + return false; + } + $queue_basename = common_config('queue','queue_basename'); + foreach (array('jabber', 'omb', 'sms', 'public') as $transport) { + if (!$con->send( + '/queue/'.$queue_basename.'-'.$transport, // QUEUE + $notice->id, // BODY of the message + array ( // HEADERS of the msg + 'created' => $notice->created + ))) { + common_log(LOG_ERR, 'Error sending to '.$transport.' queue'); + return false; + } + common_log(LOG_DEBUG, 'complete remote queueing notice ID = ' . $notice->id . ' for ' . $transport); + } + $result = true; + } + else { + // in any other case, 'internal' + foreach (array('jabber', 'omb', 'sms', 'public') as $transport) { + $qi = new Queue_item(); + $qi->notice_id = $notice->id; + $qi->transport = $transport; + $qi->created = $notice->created; + $result = $qi->insert(); + if (!$result) { + $last_error = &PEAR::getStaticProperty('DB_DataObject','lastError'); + common_log(LOG_ERR, 'DB error inserting queue item: ' . $last_error->message); + return false; + } + common_log(LOG_DEBUG, 'complete queueing notice ID = ' . $notice->id . ' for ' . $transport); } - common_log(LOG_DEBUG, 'complete queueing notice ID = ' . $notice->id . ' for ' . $transport); } return $result; } -- cgit v1.2.3-54-g00ecf From a389f157baddafa91347c27194805580b1373e30 Mon Sep 17 00:00:00 2001 From: Federico Marani Date: Sat, 7 Mar 2009 13:48:39 +0000 Subject: tests with Apache ActiveMQ topics (pubsub) --- lib/util.php | 14 ++++++++++++++ 1 file changed, 14 insertions(+) (limited to 'lib/util.php') diff --git a/lib/util.php b/lib/util.php index 8ef19f579..8f1c419f2 100644 --- a/lib/util.php +++ b/lib/util.php @@ -1159,6 +1159,20 @@ function common_enqueue_notice($notice) } common_log(LOG_DEBUG, 'complete remote queueing notice ID = ' . $notice->id . ' for ' . $transport); } + $con->send('/topic/laconica.'.$notice->profile_id, + $notice->content, + array( + 'profile_id' => $notice->profile_id, + 'created' => $notice->created + ) + ); + $con->send('/topic/laconica.allusers', + $notice->content, + array( + 'profile_id' => $notice->profile_id, + 'created' => $notice->created + ) + ); $result = true; } else { -- cgit v1.2.3-54-g00ecf From 5e9ff6ae973423c06ac42c679f4ad67a46a77bd0 Mon Sep 17 00:00:00 2001 From: Federico Marani Date: Sat, 28 Mar 2009 17:55:20 +0000 Subject: Testing JMS message selectors for notice tags-based routing --- lib/util.php | 22 ++++++++++++++++++++-- 1 file changed, 20 insertions(+), 2 deletions(-) (limited to 'lib/util.php') diff --git a/lib/util.php b/lib/util.php index 4e09bffce..79d5cc943 100644 --- a/lib/util.php +++ b/lib/util.php @@ -868,20 +868,38 @@ function common_enqueue_notice($notice) } common_log(LOG_DEBUG, 'complete remote queueing notice ID = ' . $notice->id . ' for ' . $transport); } + + //send tags as headers, so they can be used as JMS selectors + common_log(LOG_DEBUG, 'searching for tags ' . $notice->id); + $tags = array(); + $tag = new Notice_tag(); + $tag->notice_id = $notice->id; + if ($tag->find()) { + while ($tag->fetch()) { + common_log(LOG_DEBUG, 'tag found = ' . $tag->tag); + array_push($tags,$tag->tag); + } + } + $tag->free(); + $con->send('/topic/laconica.'.$notice->profile_id, $notice->content, array( 'profile_id' => $notice->profile_id, - 'created' => $notice->created + 'created' => $notice->created, + 'tags' => implode($tags,' - ') ) ); + common_log(LOG_DEBUG, 'sent to personal topic ' . $notice->id); $con->send('/topic/laconica.allusers', $notice->content, array( 'profile_id' => $notice->profile_id, - 'created' => $notice->created + 'created' => $notice->created, + 'tags' => implode($tags,' - ') ) ); + common_log(LOG_DEBUG, 'sent to catch-all topic ' . $notice->id); $result = true; } else { -- cgit v1.2.3-54-g00ecf