summaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
authorEvan Prodromou <evan@prodromou.name>2008-08-27 16:54:07 -0400
committerEvan Prodromou <evan@prodromou.name>2008-08-27 16:54:07 -0400
commitb9f0ea6f0e16f2314a2bdd56365205e7a3c31aa1 (patch)
treea4739cd2c39c4736a5789a80b47ab0cf174d2be3 /lib
parentadeb19f1f7a82b17cee85ade1ac06fcd48e0d4cb (diff)
break up monolithic xmppdaemon into multiple queue handlers
Eventually, the poor xmppdaemon has become overloaded with extra tasks. So, I've broken it up. Now, we have 5 background scripts, and more coming: * xmppdaemon.php - handles incoming XMPP messages only. * xmppqueuehandler.php - sends notices from the queue out through XMPP. * smsqueuehandler.php - sends notices from the queue out over SMS * ombqueuehandler.php - sends notices from the queue out over OMB * xmppconfirmhandler.php - sends confirmation requests out over XMPP. This is in addition to maildaemon.php, which takes incoming messages. None of these are "true" daemons -- they don't daemonize themselves automatically. Use nohup or another tool to background them. monit can also be useful to keep them running. At some point, these might become fork()'ing daemons, able to handle more than one notice at a time. For now, I'm just running multiple instances, hoping they don't interfere. darcs-hash:20080827205407-84dde-97884a12f5f4e54c93bc785bd280683d1ee7e749.gz
Diffstat (limited to 'lib')
-rw-r--r--lib/jabber.php52
-rw-r--r--lib/queuehandler.php105
-rw-r--r--lib/util.php29
3 files changed, 155 insertions, 31 deletions
diff --git a/lib/jabber.php b/lib/jabber.php
index 1202aa322..e2f7ca3c8 100644
--- a/lib/jabber.php
+++ b/lib/jabber.php
@@ -26,23 +26,45 @@ require_once('XMPPHP/XMPP.php');
class Laconica_XMPP extends XMPPHP_XMPP {
function messageplus($to, $body, $type = 'chat', $subject = null, $payload = null) {
- $to = htmlspecialchars($to);
- $body = htmlspecialchars($body);
- $subject = htmlspecialchars($subject);
-
- $jid = jabber_daemon_address();
-
- $out = "<message from='$jid' to='$to' type='$type'>";
- if($subject) $out .= "<subject>$subject</subject>";
- $out .= "<body>$body</body>";
- if($payload) $out .= $payload;
- $out .= "</message>";
-
+ $to = htmlspecialchars($to);
+ $body = htmlspecialchars($body);
+ $subject = htmlspecialchars($subject);
+
+ $jid = jabber_daemon_address();
+
+ $out = "<message from='$jid' to='$to' type='$type'>";
+ if($subject) $out .= "<subject>$subject</subject>";
+ $out .= "<body>$body</body>";
+ if($payload) $out .= $payload;
+ $out .= "</message>";
+
$cnt = strlen($out);
common_log(LOG_DEBUG, "Sending $cnt chars to $to");
- $this->send($out);
+ $this->send($out);
common_log(LOG_DEBUG, 'Done.');
}
+
+ public function presence($status = null, $show = 'available', $to = null, $type='available', $priority=NULL) {
+ if($type == 'available') $type = '';
+ $to = htmlspecialchars($to);
+ $status = htmlspecialchars($status);
+ if($show == 'unavailable') $type = 'unavailable';
+
+ $out = "<presence";
+ if($to) $out .= " to='$to'";
+ if($type) $out .= " type='$type'";
+ if($show == 'available' and !$status and is_null($priority)) {
+ $out .= "/>";
+ } else {
+ $out .= ">";
+ if($show != 'available') $out .= "<show>$show</show>";
+ if($status) $out .= "<status>$status</status>";
+ if(!is_null($priority)) $out .= "<priority>$priority</priority>";
+ $out .= "</presence>";
+ }
+
+ $this->send($out);
+ }
}
function jabber_valid_base_jid($jid) {
@@ -64,7 +86,7 @@ function jabber_daemon_address() {
return common_config('xmpp', 'user') . '@' . common_config('xmpp', 'server');
}
-function jabber_connect($resource=NULL) {
+function jabber_connect($resource=NULL, $status=NULL, $priority=NULL) {
static $conn = NULL;
if (!$conn) {
$conn = new Laconica_XMPP(common_config('xmpp', 'host') ?
@@ -92,6 +114,8 @@ function jabber_connect($resource=NULL) {
return false;
}
$conn->processUntil('session_start');
+ $conn->getRoster();
+ $conn->presence($presence, $priority);
}
return $conn;
}
diff --git a/lib/queuehandler.php b/lib/queuehandler.php
new file mode 100644
index 000000000..f12b880b1
--- /dev/null
+++ b/lib/queuehandler.php
@@ -0,0 +1,105 @@
+<?php
+/*
+ * Laconica - a distributed open-source microblogging tool
+ * Copyright (C) 2008, Controlez-Vous, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+define('CLAIM_TIMEOUT', 1200);
+
+class QueueHandler {
+
+ var $_id = 'generic';
+
+ function QueueHandler($id=NULL) {
+ if ($id) {
+ $this->set_id($id);
+ }
+ }
+
+ function class_name() {
+ return ucfirst($this->transport()) . 'Handler';
+ }
+
+ function get_id() {
+ return $this->_id;
+ }
+
+ function set_id($id) {
+ $this->_id = $id;
+ }
+
+ function transport() {
+ return NULL;
+ }
+
+ function start() {
+ }
+
+ function finish() {
+ }
+
+ function handle_notice($notice) {
+ return true;
+ }
+
+ function handle_queue() {
+ $this->clear_old_claims();
+ $this->log(LOG_INFO, 'checking for queued notices');
+ $cnt = 0;
+ $transport = $this->transport();
+ do {
+ $qi = Queue_item::top($transport);
+ if ($qi) {
+ $this->log(LOG_INFO, 'Got item enqueued '.common_exact_date($qi->created));
+ $notice = Notice::staticGet($qi->notice_id);
+ if ($notice) {
+ $this->log(LOG_INFO, 'broadcasting notice ID = ' . $notice->id);
+ # XXX: what to do if broadcast fails?
+ $result = $this->handle_notice($notice);
+ if (!$result) {
+ $this->log(LOG_WARNING, 'Failed broadcast for notice ID = ' . $notice->id);
+ $orig = $qi;
+ $qi->claimed = NULL;
+ $qi->update($orig);
+ $this->log(LOG_WARNING, 'Abandoned claim for notice ID = ' . $notice->id);
+ continue;
+ }
+ $this->log(LOG_INFO, 'finished broadcasting notice ID = ' . $notice->id);
+ $notice = NULL;
+ } else {
+ $this->log(LOG_WARNING, 'queue item for notice that does not exist');
+ }
+ $qi->delete();
+ $cnt++;
+ } else {
+ $this->clear_old_claims();
+ sleep(10);
+ }
+ } while (true);
+ }
+
+ function clear_old_claims() {
+ $qi = new Queue_item();
+ $qi->transport = $this->transport();
+ $qi->whereAdd('now() - claimed > '.CLAIM_TIMEOUT);
+ $qi->update(DB_DATAOBJECT_WHEREADD_ONLY);
+ }
+
+ function log($level, $msg) {
+ common_log($level, $this->class_name() . ' ('. $this->get_id() .'): '.$msg);
+ }
+}
+ \ No newline at end of file
diff --git a/lib/util.php b/lib/util.php
index f06f49d71..cea0c3992 100644
--- a/lib/util.php
+++ b/lib/util.php
@@ -1070,17 +1070,20 @@ function common_broadcast_notice($notice, $remote=false) {
# Stick the notice on the queue
function common_enqueue_notice($notice) {
- $qi = new Queue_item();
- $qi->notice_id = $notice->id;
- $qi->created = $notice->created;
+ foreach (array('jabber', 'oms', 'sms') as $transport) {
+ $qi = new Queue_item();
+ $qi->notice_id = $notice->id;
+ $qi->transport = $transport;
+ $qi->created = $notice->created;
+ if (!$result) {
$result = $qi->insert();
- if (!$result) {
- $last_error = &PEAR::getStaticProperty('DB_DataObject','lastError');
- common_log(LOG_ERR, 'DB error inserting queue item: ' . $last_error->message);
- return false;
+ $last_error = &PEAR::getStaticProperty('DB_DataObject','lastError');
+ common_log(LOG_ERR, 'DB error inserting queue item: ' . $last_error->message);
+ return false;
+ }
+ common_log(LOG_DEBUG, 'complete queueing notice ID = ' . $notice->id . ' for ' . $transport);
+ return $result;
}
- common_log(LOG_DEBUG, 'complete queueing notice ID = ' . $notice->id);
- return $result;
}
function common_dequeue_notice($notice) {
@@ -1477,11 +1480,3 @@ function common_canonical_sms($sms) {
preg_replace('/\D/', '', $sms);
return $sms;
}
-
-function common_session_token() {
- common_ensure_session();
- if (!array_key_exists('token', $_SESSION)) {
- $_SESSION['token'] = common_good_rand(64);
- }
- return $_SESSION['token'];
-}