summaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
Diffstat (limited to 'lib')
-rw-r--r--lib/jabber.php52
-rw-r--r--lib/queuehandler.php105
-rw-r--r--lib/util.php29
3 files changed, 155 insertions, 31 deletions
diff --git a/lib/jabber.php b/lib/jabber.php
index 1202aa322..e2f7ca3c8 100644
--- a/lib/jabber.php
+++ b/lib/jabber.php
@@ -26,23 +26,45 @@ require_once('XMPPHP/XMPP.php');
class Laconica_XMPP extends XMPPHP_XMPP {
function messageplus($to, $body, $type = 'chat', $subject = null, $payload = null) {
- $to = htmlspecialchars($to);
- $body = htmlspecialchars($body);
- $subject = htmlspecialchars($subject);
-
- $jid = jabber_daemon_address();
-
- $out = "<message from='$jid' to='$to' type='$type'>";
- if($subject) $out .= "<subject>$subject</subject>";
- $out .= "<body>$body</body>";
- if($payload) $out .= $payload;
- $out .= "</message>";
-
+ $to = htmlspecialchars($to);
+ $body = htmlspecialchars($body);
+ $subject = htmlspecialchars($subject);
+
+ $jid = jabber_daemon_address();
+
+ $out = "<message from='$jid' to='$to' type='$type'>";
+ if($subject) $out .= "<subject>$subject</subject>";
+ $out .= "<body>$body</body>";
+ if($payload) $out .= $payload;
+ $out .= "</message>";
+
$cnt = strlen($out);
common_log(LOG_DEBUG, "Sending $cnt chars to $to");
- $this->send($out);
+ $this->send($out);
common_log(LOG_DEBUG, 'Done.');
}
+
+ public function presence($status = null, $show = 'available', $to = null, $type='available', $priority=NULL) {
+ if($type == 'available') $type = '';
+ $to = htmlspecialchars($to);
+ $status = htmlspecialchars($status);
+ if($show == 'unavailable') $type = 'unavailable';
+
+ $out = "<presence";
+ if($to) $out .= " to='$to'";
+ if($type) $out .= " type='$type'";
+ if($show == 'available' and !$status and is_null($priority)) {
+ $out .= "/>";
+ } else {
+ $out .= ">";
+ if($show != 'available') $out .= "<show>$show</show>";
+ if($status) $out .= "<status>$status</status>";
+ if(!is_null($priority)) $out .= "<priority>$priority</priority>";
+ $out .= "</presence>";
+ }
+
+ $this->send($out);
+ }
}
function jabber_valid_base_jid($jid) {
@@ -64,7 +86,7 @@ function jabber_daemon_address() {
return common_config('xmpp', 'user') . '@' . common_config('xmpp', 'server');
}
-function jabber_connect($resource=NULL) {
+function jabber_connect($resource=NULL, $status=NULL, $priority=NULL) {
static $conn = NULL;
if (!$conn) {
$conn = new Laconica_XMPP(common_config('xmpp', 'host') ?
@@ -92,6 +114,8 @@ function jabber_connect($resource=NULL) {
return false;
}
$conn->processUntil('session_start');
+ $conn->getRoster();
+ $conn->presence($presence, $priority);
}
return $conn;
}
diff --git a/lib/queuehandler.php b/lib/queuehandler.php
new file mode 100644
index 000000000..f12b880b1
--- /dev/null
+++ b/lib/queuehandler.php
@@ -0,0 +1,105 @@
+<?php
+/*
+ * Laconica - a distributed open-source microblogging tool
+ * Copyright (C) 2008, Controlez-Vous, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+define('CLAIM_TIMEOUT', 1200);
+
+class QueueHandler {
+
+ var $_id = 'generic';
+
+ function QueueHandler($id=NULL) {
+ if ($id) {
+ $this->set_id($id);
+ }
+ }
+
+ function class_name() {
+ return ucfirst($this->transport()) . 'Handler';
+ }
+
+ function get_id() {
+ return $this->_id;
+ }
+
+ function set_id($id) {
+ $this->_id = $id;
+ }
+
+ function transport() {
+ return NULL;
+ }
+
+ function start() {
+ }
+
+ function finish() {
+ }
+
+ function handle_notice($notice) {
+ return true;
+ }
+
+ function handle_queue() {
+ $this->clear_old_claims();
+ $this->log(LOG_INFO, 'checking for queued notices');
+ $cnt = 0;
+ $transport = $this->transport();
+ do {
+ $qi = Queue_item::top($transport);
+ if ($qi) {
+ $this->log(LOG_INFO, 'Got item enqueued '.common_exact_date($qi->created));
+ $notice = Notice::staticGet($qi->notice_id);
+ if ($notice) {
+ $this->log(LOG_INFO, 'broadcasting notice ID = ' . $notice->id);
+ # XXX: what to do if broadcast fails?
+ $result = $this->handle_notice($notice);
+ if (!$result) {
+ $this->log(LOG_WARNING, 'Failed broadcast for notice ID = ' . $notice->id);
+ $orig = $qi;
+ $qi->claimed = NULL;
+ $qi->update($orig);
+ $this->log(LOG_WARNING, 'Abandoned claim for notice ID = ' . $notice->id);
+ continue;
+ }
+ $this->log(LOG_INFO, 'finished broadcasting notice ID = ' . $notice->id);
+ $notice = NULL;
+ } else {
+ $this->log(LOG_WARNING, 'queue item for notice that does not exist');
+ }
+ $qi->delete();
+ $cnt++;
+ } else {
+ $this->clear_old_claims();
+ sleep(10);
+ }
+ } while (true);
+ }
+
+ function clear_old_claims() {
+ $qi = new Queue_item();
+ $qi->transport = $this->transport();
+ $qi->whereAdd('now() - claimed > '.CLAIM_TIMEOUT);
+ $qi->update(DB_DATAOBJECT_WHEREADD_ONLY);
+ }
+
+ function log($level, $msg) {
+ common_log($level, $this->class_name() . ' ('. $this->get_id() .'): '.$msg);
+ }
+}
+ \ No newline at end of file
diff --git a/lib/util.php b/lib/util.php
index f06f49d71..cea0c3992 100644
--- a/lib/util.php
+++ b/lib/util.php
@@ -1070,17 +1070,20 @@ function common_broadcast_notice($notice, $remote=false) {
# Stick the notice on the queue
function common_enqueue_notice($notice) {
- $qi = new Queue_item();
- $qi->notice_id = $notice->id;
- $qi->created = $notice->created;
+ foreach (array('jabber', 'oms', 'sms') as $transport) {
+ $qi = new Queue_item();
+ $qi->notice_id = $notice->id;
+ $qi->transport = $transport;
+ $qi->created = $notice->created;
+ if (!$result) {
$result = $qi->insert();
- if (!$result) {
- $last_error = &PEAR::getStaticProperty('DB_DataObject','lastError');
- common_log(LOG_ERR, 'DB error inserting queue item: ' . $last_error->message);
- return false;
+ $last_error = &PEAR::getStaticProperty('DB_DataObject','lastError');
+ common_log(LOG_ERR, 'DB error inserting queue item: ' . $last_error->message);
+ return false;
+ }
+ common_log(LOG_DEBUG, 'complete queueing notice ID = ' . $notice->id . ' for ' . $transport);
+ return $result;
}
- common_log(LOG_DEBUG, 'complete queueing notice ID = ' . $notice->id);
- return $result;
}
function common_dequeue_notice($notice) {
@@ -1477,11 +1480,3 @@ function common_canonical_sms($sms) {
preg_replace('/\D/', '', $sms);
return $sms;
}
-
-function common_session_token() {
- common_ensure_session();
- if (!array_key_exists('token', $_SESSION)) {
- $_SESSION['token'] = common_good_rand(64);
- }
- return $_SESSION['token'];
-}