diff options
author | Zach Copley <zach@status.net> | 2009-11-19 20:12:46 -0800 |
---|---|---|
committer | Zach Copley <zach@status.net> | 2009-11-19 20:12:46 -0800 |
commit | 4b98edf75f4e255f8c61087bd1525d89653a521f (patch) | |
tree | b2a7eb6d77429eadb1beabe2d5e6ae1c1a2831d6 /extlib/Stomp.php | |
parent | f92574dbcb1f2d7cd0aaf3c9362db46fa066e888 (diff) | |
parent | c213477081afefb1720c8ae729d1965e7a1dac63 (diff) |
Merge branch '0.9-release'
* 0.9-release: (874 commits)
Removed call to NewDirectMessage() until IE return is fixed i.e.,
Don't show flag user button your own profile
Fixed HXR response for flag user
Using the right form class name
Using common_redirect
Left a form_data class of a <ul> in the user admin panel
Added validation to fields in user admin panel
Added a user admin panel
Added mobile logos for default and identica themes
Changed gif to png
Changed this to action. THANKS zach!
Doing content negotiation only once
Add execute bit to pingqueuehandler
Localisation updates for !StatusNet from !translatewiki.net
Use the browser's geolocation API to set the location on the notice form
Add geometa library, and include it.
Add location form elements to the noticeform, and save their values on submission
Use the $user object nickname, as login name doesnt have to == nickname anymore with plugins such as ldap/etc
Revert "Re added NICKNAME_FMT constant to router.php."
Moved most path and server settings to a new paths admin panel
...
Conflicts:
js/util.js
locale/it_IT/LC_MESSAGES/statusnet.mo
locale/mk_MK/LC_MESSAGES/statusnet.mo
locale/mk_MK/LC_MESSAGES/statusnet.po
locale/pt_BR/LC_MESSAGES/statusnet.mo
locale/vi_VN/LC_MESSAGES/statusnet.mo
plugins/InfiniteScroll/infinitescroll.js
plugins/Realtime/realtimeupdate.js
Diffstat (limited to 'extlib/Stomp.php')
-rw-r--r-- | extlib/Stomp.php | 135 |
1 files changed, 77 insertions, 58 deletions
diff --git a/extlib/Stomp.php b/extlib/Stomp.php index abd9cba62..b55a4aa6d 100644 --- a/extlib/Stomp.php +++ b/extlib/Stomp.php @@ -66,12 +66,13 @@ class Stomp protected $_sessionId; protected $_read_timeout_seconds = 60; protected $_read_timeout_milliseconds = 0; + protected $_connect_timeout_seconds = 60; /** * Constructor * * @param string $brokerUri Broker URL - * @throws Stomp_Exception + * @throws StompException */ public function __construct ($brokerUri) { @@ -81,7 +82,7 @@ class Stomp /** * Initialize connection * - * @throws Stomp_Exception + * @throws StompException */ protected function _init () { @@ -103,14 +104,14 @@ class Stomp } } else { require_once 'Stomp/Exception.php'; - throw new Stomp_Exception("Bad Broker URL {$this->_brokerUri}"); + throw new StompException("Bad Broker URL {$this->_brokerUri}"); } } /** * Process broker URL * * @param string $url Broker URL - * @throws Stomp_Exception + * @throws StompException * @return boolean */ protected function _processUrl ($url) @@ -120,19 +121,19 @@ class Stomp array_push($this->_hosts, array($parsed['host'] , $parsed['port'] , $parsed['scheme'])); } else { require_once 'Stomp/Exception.php'; - throw new Stomp_Exception("Bad Broker URL $url"); + throw new StompException("Bad Broker URL $url"); } } /** * Make socket connection to the server * - * @throws Stomp_Exception + * @throws StompException */ protected function _makeConnection () { if (count($this->_hosts) == 0) { require_once 'Stomp/Exception.php'; - throw new Stomp_Exception("No broker defined"); + throw new StompException("No broker defined"); } // force disconnect, if previous established connection exists @@ -141,6 +142,9 @@ class Stomp $i = $this->_currentHost; $att = 0; $connected = false; + $connect_errno = null; + $connect_errstr = null; + while (! $connected && $att ++ < $this->_attempts) { if (isset($this->_params['randomize']) && $this->_params['randomize'] == 'true') { $i = rand(0, count($this->_hosts) - 1); @@ -158,10 +162,10 @@ class Stomp fclose($this->_socket); $this->_socket = null; } - $this->_socket = @fsockopen($scheme . '://' . $host, $port); + $this->_socket = @fsockopen($scheme . '://' . $host, $port, $connect_errno, $connect_errstr, $this->_connect_timeout_seconds); if (!is_resource($this->_socket) && $att >= $this->_attempts && !array_key_exists($i + 1, $this->_hosts)) { require_once 'Stomp/Exception.php'; - throw new Stomp_Exception("Could not connect to $host:$port ($att/{$this->_attempts})"); + throw new StompException("Could not connect to $host:$port ($att/{$this->_attempts})"); } else if (is_resource($this->_socket)) { $connected = true; $this->_currentHost = $i; @@ -170,7 +174,7 @@ class Stomp } if (! $connected) { require_once 'Stomp/Exception.php'; - throw new Stomp_Exception("Could not connect to a broker"); + throw new StompException("Could not connect to a broker"); } } /** @@ -179,7 +183,7 @@ class Stomp * @param string $username * @param string $password * @return boolean - * @throws Stomp_Exception + * @throws StompException */ public function connect ($username = '', $password = '') { @@ -194,18 +198,18 @@ class Stomp if ($this->clientId != null) { $headers["client-id"] = $this->clientId; } - $frame = new Stomp_Frame("CONNECT", $headers); + $frame = new StompFrame("CONNECT", $headers); $this->_writeFrame($frame); $frame = $this->readFrame(); - if ($frame instanceof Stomp_Frame && $frame->command == 'CONNECTED') { + if ($frame instanceof StompFrame && $frame->command == 'CONNECTED') { $this->_sessionId = $frame->headers["session"]; return true; } else { require_once 'Stomp/Exception.php'; - if ($frame instanceof Stomp_Frame) { - throw new Stomp_Exception("Unexpected command: {$frame->command}", 0, $frame->body); + if ($frame instanceof StompFrame) { + throw new StompException("Unexpected command: {$frame->command}", 0, $frame->body); } else { - throw new Stomp_Exception("Connection not acknowledged"); + throw new StompException("Connection not acknowledged"); } } } @@ -232,21 +236,21 @@ class Stomp * Send a message to a destination in the messaging system * * @param string $destination Destination queue - * @param string|Stomp_Frame $msg Message + * @param string|StompFrame $msg Message * @param array $properties * @param boolean $sync Perform request synchronously * @return boolean */ - public function send ($destination, $msg, $properties = null, $sync = null) + public function send ($destination, $msg, $properties = array(), $sync = null) { - if ($msg instanceof Stomp_Frame) { + if ($msg instanceof StompFrame) { $msg->headers['destination'] = $destination; - $msg->headers = array_merge($msg->headers, $properties); + if (is_array($properties)) $msg->headers = array_merge($msg->headers, $properties); $frame = $msg; } else { $headers = $properties; $headers['destination'] = $destination; - $frame = new Stomp_Frame('SEND', $headers, $msg); + $frame = new StompFrame('SEND', $headers, $msg); } $this->_prepareReceipt($frame, $sync); $this->_writeFrame($frame); @@ -255,10 +259,10 @@ class Stomp /** * Prepair frame receipt * - * @param Stomp_Frame $frame + * @param StompFrame $frame * @param boolean $sync */ - protected function _prepareReceipt (Stomp_Frame $frame, $sync) + protected function _prepareReceipt (StompFrame $frame, $sync) { $receive = $this->sync; if ($sync !== null) { @@ -271,12 +275,12 @@ class Stomp /** * Wait for receipt * - * @param Stomp_Frame $frame + * @param StompFrame $frame * @param boolean $sync * @return boolean - * @throws Stomp_Exception + * @throws StompException */ - protected function _waitForReceipt (Stomp_Frame $frame, $sync) + protected function _waitForReceipt (StompFrame $frame, $sync) { $receive = $this->sync; @@ -289,19 +293,19 @@ class Stomp return true; } $frame = $this->readFrame(); - if ($frame instanceof Stomp_Frame && $frame->command == 'RECEIPT') { + if ($frame instanceof StompFrame && $frame->command == 'RECEIPT') { if ($frame->headers['receipt-id'] == $id) { return true; } else { require_once 'Stomp/Exception.php'; - throw new Stomp_Exception("Unexpected receipt id {$frame->headers['receipt-id']}", 0, $frame->body); + throw new StompException("Unexpected receipt id {$frame->headers['receipt-id']}", 0, $frame->body); } } else { require_once 'Stomp/Exception.php'; - if ($frame instanceof Stomp_Frame) { - throw new Stomp_Exception("Unexpected command {$frame->command}", 0, $frame->body); + if ($frame instanceof StompFrame) { + throw new StompException("Unexpected command {$frame->command}", 0, $frame->body); } else { - throw new Stomp_Exception("Receipt not received"); + throw new StompException("Receipt not received"); } } } @@ -314,7 +318,7 @@ class Stomp * @param array $properties * @param boolean $sync Perform request synchronously * @return boolean - * @throws Stomp_Exception + * @throws StompException */ public function subscribe ($destination, $properties = null, $sync = null) { @@ -329,7 +333,7 @@ class Stomp } } $headers['destination'] = $destination; - $frame = new Stomp_Frame('SUBSCRIBE', $headers); + $frame = new StompFrame('SUBSCRIBE', $headers); $this->_prepareReceipt($frame, $sync); $this->_writeFrame($frame); if ($this->_waitForReceipt($frame, $sync) == true) { @@ -346,7 +350,7 @@ class Stomp * @param array $properties * @param boolean $sync Perform request synchronously * @return boolean - * @throws Stomp_Exception + * @throws StompException */ public function unsubscribe ($destination, $properties = null, $sync = null) { @@ -357,7 +361,7 @@ class Stomp } } $headers['destination'] = $destination; - $frame = new Stomp_Frame('UNSUBSCRIBE', $headers); + $frame = new StompFrame('UNSUBSCRIBE', $headers); $this->_prepareReceipt($frame, $sync); $this->_writeFrame($frame); if ($this->_waitForReceipt($frame, $sync) == true) { @@ -373,7 +377,7 @@ class Stomp * @param string $transactionId * @param boolean $sync Perform request synchronously * @return boolean - * @throws Stomp_Exception + * @throws StompException */ public function begin ($transactionId = null, $sync = null) { @@ -381,7 +385,7 @@ class Stomp if (isset($transactionId)) { $headers['transaction'] = $transactionId; } - $frame = new Stomp_Frame('BEGIN', $headers); + $frame = new StompFrame('BEGIN', $headers); $this->_prepareReceipt($frame, $sync); $this->_writeFrame($frame); return $this->_waitForReceipt($frame, $sync); @@ -392,7 +396,7 @@ class Stomp * @param string $transactionId * @param boolean $sync Perform request synchronously * @return boolean - * @throws Stomp_Exception + * @throws StompException */ public function commit ($transactionId = null, $sync = null) { @@ -400,7 +404,7 @@ class Stomp if (isset($transactionId)) { $headers['transaction'] = $transactionId; } - $frame = new Stomp_Frame('COMMIT', $headers); + $frame = new StompFrame('COMMIT', $headers); $this->_prepareReceipt($frame, $sync); $this->_writeFrame($frame); return $this->_waitForReceipt($frame, $sync); @@ -417,7 +421,7 @@ class Stomp if (isset($transactionId)) { $headers['transaction'] = $transactionId; } - $frame = new Stomp_Frame('ABORT', $headers); + $frame = new StompFrame('ABORT', $headers); $this->_prepareReceipt($frame, $sync); $this->_writeFrame($frame); return $this->_waitForReceipt($frame, $sync); @@ -426,15 +430,19 @@ class Stomp * Acknowledge consumption of a message from a subscription * Note: This operation is always asynchronous * - * @param string|Stomp_Frame $messageMessage ID + * @param string|StompFrame $messageMessage ID * @param string $transactionId * @return boolean - * @throws Stomp_Exception + * @throws StompException */ public function ack ($message, $transactionId = null) { - if ($message instanceof Stomp_Frame) { - $frame = new Stomp_Frame('ACK', $message->headers); + if ($message instanceof StompFrame) { + $headers = $message->headers; + if (isset($transactionId)) { + $headers['transaction'] = $transactionId; + } + $frame = new StompFrame('ACK', $headers); $this->_writeFrame($frame); return true; } else { @@ -443,7 +451,7 @@ class Stomp $headers['transaction'] = $transactionId; } $headers['message-id'] = $message; - $frame = new Stomp_Frame('ACK', $headers); + $frame = new StompFrame('ACK', $headers); $this->_writeFrame($frame); return true; } @@ -461,7 +469,7 @@ class Stomp } if (is_resource($this->_socket)) { - $this->_writeFrame(new Stomp_Frame('DISCONNECT', $headers)); + $this->_writeFrame(new StompFrame('DISCONNECT', $headers)); fclose($this->_socket); } $this->_socket = null; @@ -474,13 +482,13 @@ class Stomp /** * Write frame to server * - * @param Stomp_Frame $stompFrame + * @param StompFrame $stompFrame */ - protected function _writeFrame (Stomp_Frame $stompFrame) + protected function _writeFrame (StompFrame $stompFrame) { if (!is_resource($this->_socket)) { require_once 'Stomp/Exception.php'; - throw new Stomp_Exception('Socket connection hasn\'t been established'); + throw new StompException('Socket connection hasn\'t been established'); } $data = $stompFrame->__toString(); @@ -504,9 +512,9 @@ class Stomp } /** - * Read responce frame from server + * Read response frame from server * - * @return Stomp_Frame|Stomp_Message_Map|boolean False when no frame to read + * @return StompFrame False when no frame to read */ public function readFrame () { @@ -516,15 +524,21 @@ class Stomp $rb = 1024; $data = ''; + $end = false; + do { - $read = fgets($this->_socket, $rb); + $read = fread($this->_socket, $rb); if ($read === false) { $this->_reconnect(); return $this->readFrame(); } $data .= $read; + if (strpos($data, "\x00") !== false) { + $end = true; + $data = rtrim($data, "\n"); + } $len = strlen($data); - } while (($len < 2 || ! ($data[$len - 2] == "\x00" && $data[$len - 1] == "\n"))); + } while ($len < 2 || $end == false); list ($header, $body) = explode("\n\n", $data, 2); $header = explode("\n", $header); @@ -538,13 +552,14 @@ class Stomp $command = $v; } } - $frame = new Stomp_Frame($command, $headers, trim($body)); - if (isset($frame->headers['amq-msg-type']) && $frame->headers['amq-msg-type'] == 'MapMessage') { + $frame = new StompFrame($command, $headers, trim($body)); + if (isset($frame->headers['transformation']) && $frame->headers['transformation'] == 'jms-map-json') { require_once 'Stomp/Message/Map.php'; - return new Stomp_Message_Map($frame); + return new StompMessageMap($frame); } else { return $frame; } + return $frame; } /** @@ -558,10 +573,14 @@ class Stomp $write = null; $except = null; - $has_frame_to_read = stream_select($read, $write, $except, $this->_read_timeout_seconds, $this->_read_timeout_milliseconds); + $has_frame_to_read = @stream_select($read, $write, $except, $this->_read_timeout_seconds, $this->_read_timeout_milliseconds); + + if ($has_frame_to_read !== false) + $has_frame_to_read = count($read); + if ($has_frame_to_read === false) { - throw new Stomp_Exception('Check failed to determin if the socket is readable'); + throw new StompException('Check failed to determine if the socket is readable'); } else if ($has_frame_to_read > 0) { return true; } else { |