summaryrefslogtreecommitdiff
path: root/plugins/TwitterBridge
diff options
context:
space:
mode:
authorBrion Vibber <brion@pobox.com>2010-10-29 13:18:03 -0700
committerBrion Vibber <brion@pobox.com>2010-10-29 13:18:03 -0700
commit47eada3a95bbe92619f2fc070f24e429f28e6fa8 (patch)
tree4c793ce22f1152d26b1469e697ff0ca630c428d4 /plugins/TwitterBridge
parent86adc575ecc1dce990540d06fc86735215ae1ea1 (diff)
Work in progress on site streams-aware TwitterDaemon
Diffstat (limited to 'plugins/TwitterBridge')
-rw-r--r--plugins/TwitterBridge/daemons/twitterdaemon.php58
1 files changed, 25 insertions, 33 deletions
diff --git a/plugins/TwitterBridge/daemons/twitterdaemon.php b/plugins/TwitterBridge/daemons/twitterdaemon.php
index f97f3179b..851d191dd 100644
--- a/plugins/TwitterBridge/daemons/twitterdaemon.php
+++ b/plugins/TwitterBridge/daemons/twitterdaemon.php
@@ -114,7 +114,7 @@ class TwitterManager extends IoManager
* @fixme check their last-id and check whether we'll need to do a manual pull.
* @fixme abstract out the fetching so we can work over multiple sites.
*/
- function initStreams()
+ protected function initStreams()
{
// Pull Twitter user IDs for all users we want to pull data for
$flink = new Foreign_link();
@@ -146,7 +146,7 @@ class TwitterManager extends IoManager
*
* @param $users array of Twitter-side user IDs
*/
- function spawnStream($users)
+ protected function spawnStream($users)
{
$stream = $this->initSiteStream();
$stream->followUsers($userIds);
@@ -168,7 +168,7 @@ class TwitterManager extends IoManager
*
* @return TwitterStreamReader
*/
- function initSiteStream()
+ protected function initSiteStream()
{
$auth = $this->siteStreamAuth();
$stream = new TwitterSiteStream($auth);
@@ -190,7 +190,7 @@ class TwitterManager extends IoManager
*
* @return TwitterOAuthClient
*/
- function siteStreamAuth()
+ protected function siteStreamAuth()
{
$token = common_config('twitter', 'stream_token');
$secret = common_config('twitter', 'stream_secret');
@@ -205,7 +205,7 @@ class TwitterManager extends IoManager
*
* @return array of resources
*/
- function getSockets()
+ public function getSockets()
{
$sockets = array();
foreach ($this->streams as $stream) {
@@ -223,7 +223,7 @@ class TwitterManager extends IoManager
* @param resource $socket
* @return boolean success
*/
- function handleInput($socket)
+ public function handleInput($socket)
{
foreach ($this->streams as $stream) {
foreach ($stream->getSockets() as $aSocket) {
@@ -236,11 +236,12 @@ class TwitterManager extends IoManager
}
/**
- * Start the system up!
+ * Start the i/o system up! Prepare our connections and start opening them.
+ *
* @fixme do some rate-limiting on the stream setup
* @fixme do some sensible backoff on failure etc
*/
- function start()
+ public function start()
{
$this->initStreams();
foreach ($this->streams as $stream) {
@@ -249,7 +250,10 @@ class TwitterManager extends IoManager
return true;
}
- function finish()
+ /**
+ * Close down our connections when the daemon wraps up for business.
+ */
+ public function finish()
{
foreach ($this->streams as $index => $stream) {
$stream->close();
@@ -280,33 +284,21 @@ class TwitterManager extends IoManager
/**
* Event callback notifying that a user has a new message in their home timeline.
+ * We store the incoming message into the queues for processing, keeping our own
+ * daemon running as shiny-fast as possible.
*
- * @param object $data JSON data: Twitter status update
- */
- protected function onTwitterStatus($data, $context)
- {
- $importer = new TwitterImport();
- $notice = $importer->importStatus($data);
- if ($notice) {
- $user = $this->getTwitterUser($context);
- Inbox::insertNotice($user->id, $notice->id);
- }
- }
-
- /**
- * @fixme what about handling multiple sites?
+ * @param object $status JSON data: Twitter status update
+ * @fixme in all-sites mode we may need to route queue items into another site's
+ * destination queues, or multiple sites.
*/
- function getTwitterUser($context)
+ protected function onTwitterStatus($status, $context)
{
- if ($context->source != 'sitestream') {
- throw new ServerException("Unexpected stream source");
- }
- $flink = Foreign_link::getByForeignID(TWITTER_SERVICE, $context->for_user);
- if ($flink) {
- return $flink->getUser();
- } else {
- throw new ServerException("No local user for this Twitter ID");
- }
+ $data = array(
+ 'status' => $status,
+ 'for_user' => $context->for_user,
+ );
+ $qm = QueueManager::get();
+ $qm->enqueue($data, 'tweetin');
}
}