diff options
author | Brion Vibber <brion@pobox.com> | 2010-11-03 16:09:49 -0700 |
---|---|---|
committer | Brion Vibber <brion@pobox.com> | 2010-11-03 16:09:49 -0700 |
commit | b716d01a41c54c06676a1e54a48c36e05755a247 (patch) | |
tree | 822280a837809452c94905b2beb1d91a760752c7 /plugins/TwitterBridge/jsonstreamreader.php | |
parent | 0229c22d2358933758fea98284349a871106b3bd (diff) | |
parent | 28e009898f6ce6dd72471a5ae4abbc91e6b00193 (diff) |
Merge branch '0.9.x' into 1.0.x
Diffstat (limited to 'plugins/TwitterBridge/jsonstreamreader.php')
-rw-r--r-- | plugins/TwitterBridge/jsonstreamreader.php | 265 |
1 files changed, 265 insertions, 0 deletions
diff --git a/plugins/TwitterBridge/jsonstreamreader.php b/plugins/TwitterBridge/jsonstreamreader.php new file mode 100644 index 000000000..f6572c9ee --- /dev/null +++ b/plugins/TwitterBridge/jsonstreamreader.php @@ -0,0 +1,265 @@ +<?php +/** + * StatusNet, the distributed open-source microblogging tool + * + * PHP version 5 + * + * LICENCE: This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * @category Plugin + * @package StatusNet + * @author Brion Vibber <brion@status.net> + * @copyright 2010 StatusNet, Inc. + * @license http://www.fsf.org/licensing/licenses/agpl-3.0.html GNU Affero General Public License version 3.0 + * @link http://status.net/ + */ + +class OAuthData +{ + public $consumer_key, $consumer_secret, $token, $token_secret; +} + +/** + * + */ +abstract class JsonStreamReader +{ + const CRLF = "\r\n"; + + public $id; + protected $socket = null; + protected $state = 'init'; // 'init', 'connecting', 'waiting', 'headers', 'active' + + public function __construct() + { + $this->id = get_class($this) . '.' . substr(md5(mt_rand()), 0, 8); + } + + /** + * Starts asynchronous connect operation... + * + * @fixme Can we do the open-socket fully async to? (need write select infrastructure) + * + * @param string $url + */ + public function connect($url) + { + common_log(LOG_DEBUG, "$this->id opening connection to $url"); + + $scheme = parse_url($url, PHP_URL_SCHEME); + if ($scheme == 'http') { + $rawScheme = 'tcp'; + } else if ($scheme == 'https') { + $rawScheme = 'ssl'; + } else { + throw new ServerException('Invalid URL scheme for HTTP stream reader'); + } + + $host = parse_url($url, PHP_URL_HOST); + $port = parse_url($url, PHP_URL_PORT); + if (!$port) { + if ($scheme == 'https') { + $port = 443; + } else { + $port = 80; + } + } + + $path = parse_url($url, PHP_URL_PATH); + $query = parse_url($url, PHP_URL_QUERY); + if ($query) { + $path .= '?' . $query; + } + + $errno = $errstr = null; + $timeout = 5; + //$flags = STREAM_CLIENT_CONNECT | STREAM_CLIENT_ASYNC_CONNECT; + $flags = STREAM_CLIENT_CONNECT; + // @fixme add SSL params + $this->socket = stream_socket_client("$rawScheme://$host:$port", $errno, $errstr, $timeout, $flags); + + $this->send($this->httpOpen($host, $path)); + + stream_set_blocking($this->socket, false); + $this->state = 'waiting'; + } + + /** + * Send some fun data off to the server. + * + * @param string $buffer + */ + function send($buffer) + { + fwrite($this->socket, $buffer); + } + + /** + * Read next packet of data from the socket. + * + * @return string + */ + function read() + { + $buffer = fread($this->socket, 65536); + return $buffer; + } + + /** + * Build HTTP request headers. + * + * @param string $host + * @param string $path + * @return string + */ + protected function httpOpen($host, $path) + { + $lines = array( + "GET $path HTTP/1.1", + "Host: $host", + "User-Agent: StatusNet/" . STATUSNET_VERSION . " (TwitterBridgePlugin)", + "Connection: close", + "", + "" + ); + return implode(self::CRLF, $lines); + } + + /** + * Close the current connection, if open. + */ + public function close() + { + if ($this->isConnected()) { + common_log(LOG_DEBUG, "$this->id closing connection."); + fclose($this->socket); + $this->socket = null; + } + } + + /** + * Are we currently connected? + * + * @return boolean + */ + public function isConnected() + { + return $this->socket !== null; + } + + /** + * Send any sockets we're listening on to the IO manager + * to wait for input. + * + * @return array of resources + */ + public function getSockets() + { + if ($this->isConnected()) { + return array($this->socket); + } + return array(); + } + + /** + * Take a chunk of input over the horn and go go go! :D + * + * @param string $buffer + */ + public function handleInput($socket) + { + if ($this->socket !== $socket) { + throw new Exception('Got input from unexpected socket!'); + } + + try { + $buffer = $this->read(); + $lines = explode(self::CRLF, $buffer); + foreach ($lines as $line) { + $this->handleLine($line); + } + } catch (Exception $e) { + common_log(LOG_ERR, "$this->id aborting connection due to error: " . $e->getMessage()); + fclose($this->socket); + throw $e; + } + } + + protected function handleLine($line) + { + switch ($this->state) + { + case 'waiting': + $this->handleLineWaiting($line); + break; + case 'headers': + $this->handleLineHeaders($line); + break; + case 'active': + $this->handleLineActive($line); + break; + default: + throw new Exception('Invalid state in handleLine: ' . $this->state); + } + } + + /** + * + * @param <type> $line + */ + protected function handleLineWaiting($line) + { + $bits = explode(' ', $line, 3); + if (count($bits) != 3) { + throw new Exception("Invalid HTTP response line: $line"); + } + + list($http, $status, $text) = $bits; + if (substr($http, 0, 5) != 'HTTP/') { + throw new Exception("Invalid HTTP response line chunk '$http': $line"); + } + if ($status != '200') { + throw new Exception("Bad HTTP response code $status: $line"); + } + common_log(LOG_DEBUG, "$this->id $line"); + $this->state = 'headers'; + } + + protected function handleLineHeaders($line) + { + if ($line == '') { + $this->state = 'active'; + common_log(LOG_DEBUG, "$this->id connection is active!"); + } else { + common_log(LOG_DEBUG, "$this->id read HTTP header: $line"); + $this->responseHeaders[] = $line; + } + } + + protected function handleLineActive($line) + { + if ($line == "") { + // Server sends empty lines as keepalive. + return; + } + $data = json_decode($line); + if ($data) { + $this->handleJson($data); + } else { + common_log(LOG_ERR, "$this->id received bogus JSON data: " . var_export($line, true)); + } + } + + abstract protected function handleJson(stdClass $data); +} |