summaryrefslogtreecommitdiff
path: root/extlib/Stomp.php
diff options
context:
space:
mode:
authorZach Copley <zach@status.net>2009-11-19 20:12:46 -0800
committerZach Copley <zach@status.net>2009-11-19 20:12:46 -0800
commit4b98edf75f4e255f8c61087bd1525d89653a521f (patch)
treeb2a7eb6d77429eadb1beabe2d5e6ae1c1a2831d6 /extlib/Stomp.php
parentf92574dbcb1f2d7cd0aaf3c9362db46fa066e888 (diff)
parentc213477081afefb1720c8ae729d1965e7a1dac63 (diff)
Merge branch '0.9-release'
* 0.9-release: (874 commits) Removed call to NewDirectMessage() until IE return is fixed i.e., Don't show flag user button your own profile Fixed HXR response for flag user Using the right form class name Using common_redirect Left a form_data class of a <ul> in the user admin panel Added validation to fields in user admin panel Added a user admin panel Added mobile logos for default and identica themes Changed gif to png Changed this to action. THANKS zach! Doing content negotiation only once Add execute bit to pingqueuehandler Localisation updates for !StatusNet from !translatewiki.net Use the browser's geolocation API to set the location on the notice form Add geometa library, and include it. Add location form elements to the noticeform, and save their values on submission Use the $user object nickname, as login name doesnt have to == nickname anymore with plugins such as ldap/etc Revert "Re added NICKNAME_FMT constant to router.php." Moved most path and server settings to a new paths admin panel ... Conflicts: js/util.js locale/it_IT/LC_MESSAGES/statusnet.mo locale/mk_MK/LC_MESSAGES/statusnet.mo locale/mk_MK/LC_MESSAGES/statusnet.po locale/pt_BR/LC_MESSAGES/statusnet.mo locale/vi_VN/LC_MESSAGES/statusnet.mo plugins/InfiniteScroll/infinitescroll.js plugins/Realtime/realtimeupdate.js
Diffstat (limited to 'extlib/Stomp.php')
-rw-r--r--extlib/Stomp.php135
1 files changed, 77 insertions, 58 deletions
diff --git a/extlib/Stomp.php b/extlib/Stomp.php
index abd9cba62..b55a4aa6d 100644
--- a/extlib/Stomp.php
+++ b/extlib/Stomp.php
@@ -66,12 +66,13 @@ class Stomp
protected $_sessionId;
protected $_read_timeout_seconds = 60;
protected $_read_timeout_milliseconds = 0;
+ protected $_connect_timeout_seconds = 60;
/**
* Constructor
*
* @param string $brokerUri Broker URL
- * @throws Stomp_Exception
+ * @throws StompException
*/
public function __construct ($brokerUri)
{
@@ -81,7 +82,7 @@ class Stomp
/**
* Initialize connection
*
- * @throws Stomp_Exception
+ * @throws StompException
*/
protected function _init ()
{
@@ -103,14 +104,14 @@ class Stomp
}
} else {
require_once 'Stomp/Exception.php';
- throw new Stomp_Exception("Bad Broker URL {$this->_brokerUri}");
+ throw new StompException("Bad Broker URL {$this->_brokerUri}");
}
}
/**
* Process broker URL
*
* @param string $url Broker URL
- * @throws Stomp_Exception
+ * @throws StompException
* @return boolean
*/
protected function _processUrl ($url)
@@ -120,19 +121,19 @@ class Stomp
array_push($this->_hosts, array($parsed['host'] , $parsed['port'] , $parsed['scheme']));
} else {
require_once 'Stomp/Exception.php';
- throw new Stomp_Exception("Bad Broker URL $url");
+ throw new StompException("Bad Broker URL $url");
}
}
/**
* Make socket connection to the server
*
- * @throws Stomp_Exception
+ * @throws StompException
*/
protected function _makeConnection ()
{
if (count($this->_hosts) == 0) {
require_once 'Stomp/Exception.php';
- throw new Stomp_Exception("No broker defined");
+ throw new StompException("No broker defined");
}
// force disconnect, if previous established connection exists
@@ -141,6 +142,9 @@ class Stomp
$i = $this->_currentHost;
$att = 0;
$connected = false;
+ $connect_errno = null;
+ $connect_errstr = null;
+
while (! $connected && $att ++ < $this->_attempts) {
if (isset($this->_params['randomize']) && $this->_params['randomize'] == 'true') {
$i = rand(0, count($this->_hosts) - 1);
@@ -158,10 +162,10 @@ class Stomp
fclose($this->_socket);
$this->_socket = null;
}
- $this->_socket = @fsockopen($scheme . '://' . $host, $port);
+ $this->_socket = @fsockopen($scheme . '://' . $host, $port, $connect_errno, $connect_errstr, $this->_connect_timeout_seconds);
if (!is_resource($this->_socket) && $att >= $this->_attempts && !array_key_exists($i + 1, $this->_hosts)) {
require_once 'Stomp/Exception.php';
- throw new Stomp_Exception("Could not connect to $host:$port ($att/{$this->_attempts})");
+ throw new StompException("Could not connect to $host:$port ($att/{$this->_attempts})");
} else if (is_resource($this->_socket)) {
$connected = true;
$this->_currentHost = $i;
@@ -170,7 +174,7 @@ class Stomp
}
if (! $connected) {
require_once 'Stomp/Exception.php';
- throw new Stomp_Exception("Could not connect to a broker");
+ throw new StompException("Could not connect to a broker");
}
}
/**
@@ -179,7 +183,7 @@ class Stomp
* @param string $username
* @param string $password
* @return boolean
- * @throws Stomp_Exception
+ * @throws StompException
*/
public function connect ($username = '', $password = '')
{
@@ -194,18 +198,18 @@ class Stomp
if ($this->clientId != null) {
$headers["client-id"] = $this->clientId;
}
- $frame = new Stomp_Frame("CONNECT", $headers);
+ $frame = new StompFrame("CONNECT", $headers);
$this->_writeFrame($frame);
$frame = $this->readFrame();
- if ($frame instanceof Stomp_Frame && $frame->command == 'CONNECTED') {
+ if ($frame instanceof StompFrame && $frame->command == 'CONNECTED') {
$this->_sessionId = $frame->headers["session"];
return true;
} else {
require_once 'Stomp/Exception.php';
- if ($frame instanceof Stomp_Frame) {
- throw new Stomp_Exception("Unexpected command: {$frame->command}", 0, $frame->body);
+ if ($frame instanceof StompFrame) {
+ throw new StompException("Unexpected command: {$frame->command}", 0, $frame->body);
} else {
- throw new Stomp_Exception("Connection not acknowledged");
+ throw new StompException("Connection not acknowledged");
}
}
}
@@ -232,21 +236,21 @@ class Stomp
* Send a message to a destination in the messaging system
*
* @param string $destination Destination queue
- * @param string|Stomp_Frame $msg Message
+ * @param string|StompFrame $msg Message
* @param array $properties
* @param boolean $sync Perform request synchronously
* @return boolean
*/
- public function send ($destination, $msg, $properties = null, $sync = null)
+ public function send ($destination, $msg, $properties = array(), $sync = null)
{
- if ($msg instanceof Stomp_Frame) {
+ if ($msg instanceof StompFrame) {
$msg->headers['destination'] = $destination;
- $msg->headers = array_merge($msg->headers, $properties);
+ if (is_array($properties)) $msg->headers = array_merge($msg->headers, $properties);
$frame = $msg;
} else {
$headers = $properties;
$headers['destination'] = $destination;
- $frame = new Stomp_Frame('SEND', $headers, $msg);
+ $frame = new StompFrame('SEND', $headers, $msg);
}
$this->_prepareReceipt($frame, $sync);
$this->_writeFrame($frame);
@@ -255,10 +259,10 @@ class Stomp
/**
* Prepair frame receipt
*
- * @param Stomp_Frame $frame
+ * @param StompFrame $frame
* @param boolean $sync
*/
- protected function _prepareReceipt (Stomp_Frame $frame, $sync)
+ protected function _prepareReceipt (StompFrame $frame, $sync)
{
$receive = $this->sync;
if ($sync !== null) {
@@ -271,12 +275,12 @@ class Stomp
/**
* Wait for receipt
*
- * @param Stomp_Frame $frame
+ * @param StompFrame $frame
* @param boolean $sync
* @return boolean
- * @throws Stomp_Exception
+ * @throws StompException
*/
- protected function _waitForReceipt (Stomp_Frame $frame, $sync)
+ protected function _waitForReceipt (StompFrame $frame, $sync)
{
$receive = $this->sync;
@@ -289,19 +293,19 @@ class Stomp
return true;
}
$frame = $this->readFrame();
- if ($frame instanceof Stomp_Frame && $frame->command == 'RECEIPT') {
+ if ($frame instanceof StompFrame && $frame->command == 'RECEIPT') {
if ($frame->headers['receipt-id'] == $id) {
return true;
} else {
require_once 'Stomp/Exception.php';
- throw new Stomp_Exception("Unexpected receipt id {$frame->headers['receipt-id']}", 0, $frame->body);
+ throw new StompException("Unexpected receipt id {$frame->headers['receipt-id']}", 0, $frame->body);
}
} else {
require_once 'Stomp/Exception.php';
- if ($frame instanceof Stomp_Frame) {
- throw new Stomp_Exception("Unexpected command {$frame->command}", 0, $frame->body);
+ if ($frame instanceof StompFrame) {
+ throw new StompException("Unexpected command {$frame->command}", 0, $frame->body);
} else {
- throw new Stomp_Exception("Receipt not received");
+ throw new StompException("Receipt not received");
}
}
}
@@ -314,7 +318,7 @@ class Stomp
* @param array $properties
* @param boolean $sync Perform request synchronously
* @return boolean
- * @throws Stomp_Exception
+ * @throws StompException
*/
public function subscribe ($destination, $properties = null, $sync = null)
{
@@ -329,7 +333,7 @@ class Stomp
}
}
$headers['destination'] = $destination;
- $frame = new Stomp_Frame('SUBSCRIBE', $headers);
+ $frame = new StompFrame('SUBSCRIBE', $headers);
$this->_prepareReceipt($frame, $sync);
$this->_writeFrame($frame);
if ($this->_waitForReceipt($frame, $sync) == true) {
@@ -346,7 +350,7 @@ class Stomp
* @param array $properties
* @param boolean $sync Perform request synchronously
* @return boolean
- * @throws Stomp_Exception
+ * @throws StompException
*/
public function unsubscribe ($destination, $properties = null, $sync = null)
{
@@ -357,7 +361,7 @@ class Stomp
}
}
$headers['destination'] = $destination;
- $frame = new Stomp_Frame('UNSUBSCRIBE', $headers);
+ $frame = new StompFrame('UNSUBSCRIBE', $headers);
$this->_prepareReceipt($frame, $sync);
$this->_writeFrame($frame);
if ($this->_waitForReceipt($frame, $sync) == true) {
@@ -373,7 +377,7 @@ class Stomp
* @param string $transactionId
* @param boolean $sync Perform request synchronously
* @return boolean
- * @throws Stomp_Exception
+ * @throws StompException
*/
public function begin ($transactionId = null, $sync = null)
{
@@ -381,7 +385,7 @@ class Stomp
if (isset($transactionId)) {
$headers['transaction'] = $transactionId;
}
- $frame = new Stomp_Frame('BEGIN', $headers);
+ $frame = new StompFrame('BEGIN', $headers);
$this->_prepareReceipt($frame, $sync);
$this->_writeFrame($frame);
return $this->_waitForReceipt($frame, $sync);
@@ -392,7 +396,7 @@ class Stomp
* @param string $transactionId
* @param boolean $sync Perform request synchronously
* @return boolean
- * @throws Stomp_Exception
+ * @throws StompException
*/
public function commit ($transactionId = null, $sync = null)
{
@@ -400,7 +404,7 @@ class Stomp
if (isset($transactionId)) {
$headers['transaction'] = $transactionId;
}
- $frame = new Stomp_Frame('COMMIT', $headers);
+ $frame = new StompFrame('COMMIT', $headers);
$this->_prepareReceipt($frame, $sync);
$this->_writeFrame($frame);
return $this->_waitForReceipt($frame, $sync);
@@ -417,7 +421,7 @@ class Stomp
if (isset($transactionId)) {
$headers['transaction'] = $transactionId;
}
- $frame = new Stomp_Frame('ABORT', $headers);
+ $frame = new StompFrame('ABORT', $headers);
$this->_prepareReceipt($frame, $sync);
$this->_writeFrame($frame);
return $this->_waitForReceipt($frame, $sync);
@@ -426,15 +430,19 @@ class Stomp
* Acknowledge consumption of a message from a subscription
* Note: This operation is always asynchronous
*
- * @param string|Stomp_Frame $messageMessage ID
+ * @param string|StompFrame $messageMessage ID
* @param string $transactionId
* @return boolean
- * @throws Stomp_Exception
+ * @throws StompException
*/
public function ack ($message, $transactionId = null)
{
- if ($message instanceof Stomp_Frame) {
- $frame = new Stomp_Frame('ACK', $message->headers);
+ if ($message instanceof StompFrame) {
+ $headers = $message->headers;
+ if (isset($transactionId)) {
+ $headers['transaction'] = $transactionId;
+ }
+ $frame = new StompFrame('ACK', $headers);
$this->_writeFrame($frame);
return true;
} else {
@@ -443,7 +451,7 @@ class Stomp
$headers['transaction'] = $transactionId;
}
$headers['message-id'] = $message;
- $frame = new Stomp_Frame('ACK', $headers);
+ $frame = new StompFrame('ACK', $headers);
$this->_writeFrame($frame);
return true;
}
@@ -461,7 +469,7 @@ class Stomp
}
if (is_resource($this->_socket)) {
- $this->_writeFrame(new Stomp_Frame('DISCONNECT', $headers));
+ $this->_writeFrame(new StompFrame('DISCONNECT', $headers));
fclose($this->_socket);
}
$this->_socket = null;
@@ -474,13 +482,13 @@ class Stomp
/**
* Write frame to server
*
- * @param Stomp_Frame $stompFrame
+ * @param StompFrame $stompFrame
*/
- protected function _writeFrame (Stomp_Frame $stompFrame)
+ protected function _writeFrame (StompFrame $stompFrame)
{
if (!is_resource($this->_socket)) {
require_once 'Stomp/Exception.php';
- throw new Stomp_Exception('Socket connection hasn\'t been established');
+ throw new StompException('Socket connection hasn\'t been established');
}
$data = $stompFrame->__toString();
@@ -504,9 +512,9 @@ class Stomp
}
/**
- * Read responce frame from server
+ * Read response frame from server
*
- * @return Stomp_Frame|Stomp_Message_Map|boolean False when no frame to read
+ * @return StompFrame False when no frame to read
*/
public function readFrame ()
{
@@ -516,15 +524,21 @@ class Stomp
$rb = 1024;
$data = '';
+ $end = false;
+
do {
- $read = fgets($this->_socket, $rb);
+ $read = fread($this->_socket, $rb);
if ($read === false) {
$this->_reconnect();
return $this->readFrame();
}
$data .= $read;
+ if (strpos($data, "\x00") !== false) {
+ $end = true;
+ $data = rtrim($data, "\n");
+ }
$len = strlen($data);
- } while (($len < 2 || ! ($data[$len - 2] == "\x00" && $data[$len - 1] == "\n")));
+ } while ($len < 2 || $end == false);
list ($header, $body) = explode("\n\n", $data, 2);
$header = explode("\n", $header);
@@ -538,13 +552,14 @@ class Stomp
$command = $v;
}
}
- $frame = new Stomp_Frame($command, $headers, trim($body));
- if (isset($frame->headers['amq-msg-type']) && $frame->headers['amq-msg-type'] == 'MapMessage') {
+ $frame = new StompFrame($command, $headers, trim($body));
+ if (isset($frame->headers['transformation']) && $frame->headers['transformation'] == 'jms-map-json') {
require_once 'Stomp/Message/Map.php';
- return new Stomp_Message_Map($frame);
+ return new StompMessageMap($frame);
} else {
return $frame;
}
+ return $frame;
}
/**
@@ -558,10 +573,14 @@ class Stomp
$write = null;
$except = null;
- $has_frame_to_read = stream_select($read, $write, $except, $this->_read_timeout_seconds, $this->_read_timeout_milliseconds);
+ $has_frame_to_read = @stream_select($read, $write, $except, $this->_read_timeout_seconds, $this->_read_timeout_milliseconds);
+
+ if ($has_frame_to_read !== false)
+ $has_frame_to_read = count($read);
+
if ($has_frame_to_read === false) {
- throw new Stomp_Exception('Check failed to determin if the socket is readable');
+ throw new StompException('Check failed to determine if the socket is readable');
} else if ($has_frame_to_read > 0) {
return true;
} else {