summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEvan Prodromou <evan@prodromou.name>2008-08-29 14:17:02 -0400
committerEvan Prodromou <evan@prodromou.name>2008-08-29 14:17:02 -0400
commitc4d67892751b17856b235182874c3304890dc2c3 (patch)
treeb31fa8766cfaa2e906cf19862af5bfa50390e6cd
parentf652418863182182bb17e0f378194a8648392410 (diff)
split public stream to its own queue handler
Add another queue handler for the public stream. Should further parallelize the work of sending out messages. darcs-hash:20080829181702-84dde-594505aa73d2380b13bd98917b70b02bac597d12.gz
-rw-r--r--lib/jabber.php39
-rw-r--r--lib/util.php8
-rwxr-xr-xscripts/publicqueuehandler.php66
-rwxr-xr-xscripts/startdaemons.sh1
-rwxr-xr-xscripts/stopdaemons.sh1
-rwxr-xr-xscripts/xmppdaemon.php2
-rwxr-xr-xscripts/xmppqueuehandler.php4
7 files changed, 101 insertions, 20 deletions
diff --git a/lib/jabber.php b/lib/jabber.php
index 215cd5537..171dff4df 100644
--- a/lib/jabber.php
+++ b/lib/jabber.php
@@ -24,32 +24,32 @@ require_once('XMPPHP/XMPP.php');
# XXX: something of a hack to work around problems with the XMPPHP lib
class Laconica_XMPP extends XMPPHP_XMPP {
-
+
function messageplus($to, $body, $type = 'chat', $subject = null, $payload = null) {
$to = htmlspecialchars($to);
$body = htmlspecialchars($body);
$subject = htmlspecialchars($subject);
-
+
$jid = jabber_daemon_address();
-
+
$out = "<message from='$jid' to='$to' type='$type'>";
if($subject) $out .= "<subject>$subject</subject>";
$out .= "<body>$body</body>";
if($payload) $out .= $payload;
$out .= "</message>";
-
+
$cnt = strlen($out);
common_log(LOG_DEBUG, "Sending $cnt chars to $to");
$this->send($out);
common_log(LOG_DEBUG, 'Done.');
}
-
+
public function presence($status = null, $show = 'available', $to = null, $type='available', $priority=NULL) {
if($type == 'available') $type = '';
$to = htmlspecialchars($to);
$status = htmlspecialchars($status);
if($show == 'unavailable') $type = 'unavailable';
-
+
$out = "<presence";
if($to) $out .= " to='$to'";
if($type) $out .= " type='$type'";
@@ -62,7 +62,7 @@ class Laconica_XMPP extends XMPPHP_XMPP {
if(!is_null($priority)) $out .= "<priority>$priority</priority>";
$out .= "</presence>";
}
-
+
$this->send($out);
}
}
@@ -105,7 +105,7 @@ function jabber_connect($resource=NULL, $status=NULL, $priority=NULL) {
);
$conn->autoSubscribe();
$conn->useEncryption(common_config('xmpp', 'encryption'));
-
+
if (!$conn) {
return false;
}
@@ -141,7 +141,7 @@ function jabber_send_notice($to, $notice) {
# Extra stuff defined by Twitter, needed by twitter clients
function jabber_format_entry($profile, $notice) {
-
+
$noticeurl = common_local_url('shownotice',
array('notice' => $notice->id));
$msg = jabber_format_notice($profile, $notice);
@@ -167,7 +167,7 @@ function jabber_format_entry($profile, $notice) {
$html .= ($notice->rendered) ? $notice->rendered : common_render_content($notice->content, $notice);
$html .= "\n</body>\n";
$html .= "\n</html>\n";
-
+
$event = "<event xmlns='http://jabber.org/protocol/pubsub#event'>\n";
$event .= "<items xmlns='http://jabber.org/protocol/pubsub' ";
$event .= "node='" . common_local_url('public') . "'>\n";
@@ -229,6 +229,7 @@ function jabber_special_presence($type, $to=NULL, $show=NULL, $status=NULL) {
}
function jabber_broadcast_notice($notice) {
+
if (!common_config('xmpp', 'enabled')) {
return true;
}
@@ -268,7 +269,7 @@ function jabber_broadcast_notice($notice) {
# XXX: use a join here rather than looping through results
$sub = new Subscription();
$sub->subscribed = $notice->profile_id;
-
+
if ($sub->find()) {
while ($sub->fetch()) {
$user = User::staticGet($sub->subscriber);
@@ -289,14 +290,20 @@ function jabber_broadcast_notice($notice) {
}
}
}
+
+ return true;
+}
+
+function jabber_public_notice($notice) {
# Now, users who want everything
-
+
$public = common_config('xmpp', 'public');
-
+
# FIXME PRIV don't send out private messages here
- # XXX: should we send out non-local messages if public,localonly = false? I think not
-
+ # XXX: should we send out non-local messages if public,localonly
+ # = false? I think not
+
if ($public && $notice->is_local) {
foreach ($public as $address) {
common_log(LOG_INFO,
@@ -305,7 +312,7 @@ function jabber_broadcast_notice($notice) {
jabber_send_notice($address, $notice);
}
}
-
+
return true;
}
diff --git a/lib/util.php b/lib/util.php
index 496c6f3d2..c6cdfbcb9 100644
--- a/lib/util.php
+++ b/lib/util.php
@@ -1070,7 +1070,7 @@ function common_broadcast_notice($notice, $remote=false) {
# Stick the notice on the queue
function common_enqueue_notice($notice) {
- foreach (array('jabber', 'omb', 'sms') as $transport) {
+ foreach (array('jabber', 'omb', 'sms', 'public') as $transport) {
$qi = new Queue_item();
$qi->notice_id = $notice->id;
$qi->transport = $transport;
@@ -1126,6 +1126,12 @@ function common_real_broadcast($notice, $remote=false) {
common_log(LOG_ERR, 'Error in sms broadcast for notice ' . $notice->id);
}
}
+ if ($success) {
+ $success = jabber_public_notice($notice);
+ if (!$success) {
+ common_log(LOG_ERR, 'Error in public broadcast for notice ' . $notice->id);
+ }
+ }
// XXX: broadcast notices to other IM
return $success;
}
diff --git a/scripts/publicqueuehandler.php b/scripts/publicqueuehandler.php
new file mode 100755
index 000000000..0d95a489f
--- /dev/null
+++ b/scripts/publicqueuehandler.php
@@ -0,0 +1,66 @@
+#!/usr/bin/env php
+<?php
+/*
+ * Laconica - a distributed open-source microblogging tool
+ * Copyright (C) 2008, Controlez-Vous, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+# Abort if called from a web server
+if (isset($_SERVER) && array_key_exists('REQUEST_METHOD', $_SERVER)) {
+ print "This script must be run from the command line\n";
+ exit();
+}
+
+define('INSTALLDIR', realpath(dirname(__FILE__) . '/..'));
+define('LACONICA', true);
+
+require_once(INSTALLDIR . '/lib/common.php');
+require_once(INSTALLDIR . '/lib/jabber.php');
+require_once(INSTALLDIR . '/lib/queuehandler.php');
+
+set_error_handler('common_error_handler');
+
+class PublicQueueHandler extends QueueHandler {
+
+ function transport() {
+ return 'public';
+ }
+
+ function start() {
+ # Low priority; we don't want to receive messages
+ $this->conn = jabber_connect($this->_id, NULL, -1);
+ return !is_null($this->conn);
+ }
+
+ function handle_notice($notice) {
+ return jabber_public_notice($notice);
+ }
+
+ function finish() {
+ }
+}
+
+mb_internal_encoding('UTF-8');
+
+$resource = ($argc > 1) ? $argv[1] : (common_config('xmpp','resource') . '-public');
+
+$handler = new XmppQueueHandler($resource);
+
+if ($handler->start()) {
+ $handler->handle_queue();
+}
+
+$handler->finish();
diff --git a/scripts/startdaemons.sh b/scripts/startdaemons.sh
index 37d30cdb4..8c054417d 100755
--- a/scripts/startdaemons.sh
+++ b/scripts/startdaemons.sh
@@ -22,6 +22,7 @@ export INSTALLDIR=$1
/sbin/start-stop-daemon -S --exec $INSTALLDIR/scripts/xmppdaemon.php -b -m --pidfile=/var/run/xmppdaemon.pid
/sbin/start-stop-daemon -S --exec $INSTALLDIR/scripts/xmppqueuehandler.php -b -m --pidfile=/var/run/xmppqueuehandler.pid
+/sbin/start-stop-daemon -S --exec $INSTALLDIR/scripts/publicqueuehandler.php -b -m --pidfile=/var/run/publicqueuehandler.pid
/sbin/start-stop-daemon -S --exec $INSTALLDIR/scripts/xmppconfirmhandler.php -b -m --pidfile=/var/run/xmppconfirmhandler.pid
/sbin/start-stop-daemon -S --exec $INSTALLDIR/scripts/smsqueuehandler.php -b -m --pidfile=/var/run/smsqueuehandler.pid
/sbin/start-stop-daemon -S --exec $INSTALLDIR/scripts/ombqueuehandler.php -b -m --pidfile=/var/run/ombqueuehandler.pid
diff --git a/scripts/stopdaemons.sh b/scripts/stopdaemons.sh
index d620472f0..6814101cb 100755
--- a/scripts/stopdaemons.sh
+++ b/scripts/stopdaemons.sh
@@ -22,6 +22,7 @@ export INSTALLDIR=$1
/sbin/start-stop-daemon -K -m --pidfile=/var/run/xmppdaemon.pid
/sbin/start-stop-daemon -K -m --pidfile=/var/run/xmppqueuehandler.pid
+/sbin/start-stop-daemon -K -m --pidfile=/var/run/publicqueuehandler.pid
/sbin/start-stop-daemon -K -m --pidfile=/var/run/xmppconfirmhandler.pid
/sbin/start-stop-daemon -K -m --pidfile=/var/run/smsqueuehandler.pid
/sbin/start-stop-daemon -K -m --pidfile=/var/run/ombqueuehandler.pid
diff --git a/scripts/xmppdaemon.php b/scripts/xmppdaemon.php
index b013a34c9..153ab5149 100755
--- a/scripts/xmppdaemon.php
+++ b/scripts/xmppdaemon.php
@@ -259,7 +259,7 @@ class XMPPDaemon {
mb_internal_encoding('UTF-8');
-$resource = ($argc > 1) ? $argv[1] : NULL;
+$resource = ($argc > 1) ? $argv[1] : (common_config('xmpp','resource') . '-listen');
$daemon = new XMPPDaemon($resource);
diff --git a/scripts/xmppqueuehandler.php b/scripts/xmppqueuehandler.php
index 8fe8b1360..3f632d5a8 100755
--- a/scripts/xmppqueuehandler.php
+++ b/scripts/xmppqueuehandler.php
@@ -41,7 +41,7 @@ class XmppQueueHandler extends QueueHandler {
function start() {
# Low priority; we don't want to receive messages
- $this->conn = jabber_connect($this->resource, NULL, -100);
+ $this->conn = jabber_connect($this->_id, NULL, -1);
return !is_null($this->conn);
}
@@ -55,7 +55,7 @@ class XmppQueueHandler extends QueueHandler {
mb_internal_encoding('UTF-8');
-$resource = ($argc > 1) ? $argv[1] : NULL;
+$resource = ($argc > 1) ? $argv[1] : (common_config('xmpp','resource') . '-queuehandler');
$handler = new XmppQueueHandler($resource);