diff options
author | Evan Prodromou <evan@prodromou.name> | 2008-07-05 13:28:37 -0400 |
---|---|---|
committer | Evan Prodromou <evan@prodromou.name> | 2008-07-05 13:28:37 -0400 |
commit | 9942b4416b9bc3ea2e0d78f4d1ee46111aef61ac (patch) | |
tree | 0257bcbd9bd7752e5c4ddb4df3ee725e5a6f0be0 /xmppdaemon.php | |
parent | c414746e743abbce04b9c42075a6efadbaf41641 (diff) |
move queuedaemon into xmppdaemon
darcs-hash:20080705172837-84dde-89fa8f7e844417f8157a2ecc9b24efad267258ff.gz
Diffstat (limited to 'xmppdaemon.php')
-rwxr-xr-x | xmppdaemon.php | 163 |
1 files changed, 121 insertions, 42 deletions
diff --git a/xmppdaemon.php b/xmppdaemon.php index a6e046579..26c395303 100755 --- a/xmppdaemon.php +++ b/xmppdaemon.php @@ -67,24 +67,32 @@ class XMPPDaemon { } function handle() { + + static $parts = array('message', 'presence', + 'end_stream', 'session_start'); + while(!$this->conn->disconnected) { - $payloads = $this->conn->processUntil(array('message', 'presence', - 'end_stream', 'session_start')); - foreach($payloads as $event) { - $pl = $event[1]; - $this->log(LOG_DEBUG, "Received '$event[0]': " . print_r($pl, TRUE)); - switch($event[0]) { - case 'message': - $this->handle_message($pl); - break; - case 'presence': - $this->handle_presence($pl); - break; - case 'session_start': - $this->handle_session($pl); - break; + + $payloads = $this->conn->processUntil($parts, 10); + + if ($payloads) { + foreach($payloads as $event) { + $pl = $event[1]; + switch($event[0]) { + case 'message': + $this->handle_message($pl); + break; + case 'presence': + $this->handle_presence($pl); + break; + case 'session_start': + $this->handle_session($pl); + break; + } } } + + $this->broadcast_queue(); } } @@ -116,7 +124,7 @@ class XMPPDaemon { $user = $this->get_user($from); if (!$user) { - $this->from_site($from, 'Unknown user; go to ' . + $this->from_site($from, 'Unknown user; go to ' . common_local_url('imsettings') . ' to add your address to your account'); $this->log(LOG_WARNING, 'Message from unknown user ' . $from); @@ -133,7 +141,7 @@ class XMPPDaemon { $text = '['.common_config('site', 'name') . '] ' . $msg; jabber_send_message($address, $text); } - + function handle_command($user, $body) { # XXX: localise switch(trim($body)) { @@ -201,32 +209,32 @@ class XMPPDaemon { function handle_presence(&$pl) { $from = jabber_normalize_jid($pl['from']); switch ($pl['type']) { - case 'subscribe': - # We let anyone subscribe - $this->subscribed($from); - $this->log(LOG_INFO, - 'Accepted subscription from ' . $from); - break; - case 'subscribed': - case 'unsubscribed': - case 'unsubscribe': - $this->log(LOG_INFO, - 'Ignoring "' . $pl['type'] . '" from ' . $from); - break; - default: - if (!$pl['type']) { - $user = User::staticGet('jabber', $from); - if (!$user) { - $this->log(LOG_WARNING, 'Message from unknown user ' . $from); - return; - } - if ($user->updatefrompresence) { - $this->log(LOG_INFO, 'Updating ' . $user->nickname . - ' status from presence.'); - $this->add_notice($user, $pl); - } + case 'subscribe': + # We let anyone subscribe + $this->subscribed($from); + $this->log(LOG_INFO, + 'Accepted subscription from ' . $from); + break; + case 'subscribed': + case 'unsubscribed': + case 'unsubscribe': + $this->log(LOG_INFO, + 'Ignoring "' . $pl['type'] . '" from ' . $from); + break; + default: + if (!$pl['type']) { + $user = User::staticGet('jabber', $from); + if (!$user) { + $this->log(LOG_WARNING, 'Message from unknown user ' . $from); + return; } - break; + if ($user->updatefrompresence) { + $this->log(LOG_INFO, 'Updating ' . $user->nickname . + ' status from presence.'); + $this->add_notice($user, $pl); + } + } + break; } } @@ -242,6 +250,77 @@ class XMPPDaemon { $this->log(LOG_INFO, 'Setting status to "' . $status . '"'); jabber_send_presence($status); } + + function top_queue_item() { + + $qi = new Queue_item(); + $qi->orderBy('created'); + $qi->whereAdd('claimed is NULL'); + + $qi->limit(1); + + $cnt = $qi->find(TRUE); + + if ($cnt) { + # XXX: potential race condition + # can we force it to only update if claimed is still NULL + # (or old)? + $this->log(LOG_INFO, 'claiming queue item = ' . $qi->notice_id); + $orig = clone($qi); + $qi->claimed = DB_DataObject_Cast::dateTime(); + $result = $qi->update($orig); + if ($result) { + $this->log(LOG_INFO, 'claim succeeded.'); + return $qi; + } else { + $this->log(LOG_INFO, 'claim failed.'); + } + } + $qi = NULL; + return NULL; + } + + function broadcast_queue() { + $this->clear_old_claims(); + $this->log(LOG_INFO, 'checking for queued notices'); + do { + $qi = $this->top_queue_item(); + if ($qi) { + $this->log(LOG_INFO, 'Got queue item #'.$in_a_row.' enqueued '.common_exact_date($qi->created)); + $notice = Notice::staticGet($qi->notice_id); + if ($notice) { + $this->log(LOG_INFO, 'broadcasting notice ID = ' . $notice->id); + # XXX: what to do if broadcast fails? + $result = common_real_broadcast($notice, $this->is_remote($notice)); + if (!$result) { + $this->log(LOG_WARNING, 'Failed broadcast for notice ID = ' . $notice->id); + $orig = $qi; + $qi->claimed = NULL; + $qi->update($orig); + $this->log(LOG_WARNING, 'Abandoned claim for notice ID = ' . $notice->id); + continue; + } + $this->log(LOG_INFO, 'finished broadcasting notice ID = ' . $notice->id); + $notice = NULL; + } else { + $this->log(LOG_WARNING, 'queue item for notice that does not exist'); + } + $qi->delete(); + } + } while ($qi); + } + + function clear_old_claims() { + $qi = new Queue_item(); + $qi->claimed = NULL; + $qi->whereAdd('now() - claimed > '.CLAIM_TIMEOUT); + $qi->update(DB_DATAOBJECT_WHEREADD_ONLY); + } + + function is_remote($notice) { + $user = User::staticGet($notice->profile_id); + return !$user; + } } $resource = ($argc > 1) ? $argv[1] : NULL; |