From e76028b629ccc819160ff5a95393c4cb86b34c1f Mon Sep 17 00:00:00 2001 From: Brion Vibber Date: Thu, 28 Oct 2010 18:26:48 -0700 Subject: Work in progress: starting on new TwitterDaemon using the Site Streams API -- code is incomplete, pulling bits from streamtest.php pending a chance to test the actual site-streams mode --- plugins/TwitterBridge/daemons/twitterdaemon.php | 327 ++++++++++++++++++++++++ 1 file changed, 327 insertions(+) create mode 100644 plugins/TwitterBridge/daemons/twitterdaemon.php (limited to 'plugins') diff --git a/plugins/TwitterBridge/daemons/twitterdaemon.php b/plugins/TwitterBridge/daemons/twitterdaemon.php new file mode 100644 index 000000000..f97f3179b --- /dev/null +++ b/plugins/TwitterBridge/daemons/twitterdaemon.php @@ -0,0 +1,327 @@ +#!/usr/bin/env php +. + */ + +define('INSTALLDIR', realpath(dirname(__FILE__) . '/..')); + +$shortoptions = 'fi::a'; +$longoptions = array('id::', 'foreground', 'all'); + +$helptext = <<allsites = $allsites; + } + + function runThread() + { + common_log(LOG_INFO, 'Waiting to listen to Twitter and queues'); + + $master = new TwitterMaster($this->get_id(), $this->processManager()); + $master->init($this->allsites); + $master->service(); + + common_log(LOG_INFO, 'terminating normally'); + + return $master->respawn ? self::EXIT_RESTART : self::EXIT_SHUTDOWN; + } + +} + +class TwitterMaster extends IoMaster +{ + protected $processManager; + + function __construct($id, $processManager) + { + parent::__construct($id); + $this->processManager = $processManager; + } + + /** + * Initialize IoManagers for the currently configured site + * which are appropriate to this instance. + */ + function initManagers() + { + if (common_config('twitter', 'enabled')) { + $qm = QueueManager::get(); + $qm->setActiveGroup('twitter'); + $this->instantiate($qm); + $this->instantiate(TwitterManager::get()); + $this->instantiate($this->processManager); + } + } +} + + +class TwitterManager extends IoManager +{ + // Recommended resource limits from http://dev.twitter.com/pages/site_streams + const MAX_STREAMS = 1000; + const USERS_PER_STREAM = 100; + const STREAMS_PER_SECOND = 20; + + protected $twitterStreams; + protected $twitterUsers; + + function __construct() + { + } + + /** + * Pull the site's active Twitter-importing users and start spawning + * some data streams for them! + * + * @fixme check their last-id and check whether we'll need to do a manual pull. + * @fixme abstract out the fetching so we can work over multiple sites. + */ + function initStreams() + { + // Pull Twitter user IDs for all users we want to pull data for + $flink = new Foreign_link(); + $flink->service = TWITTER_SERVICE; + // @fixme probably should do the bitfield check in a whereAdd but it's ugly :D + $flink->find(); + + $userIds = array(); + while ($flink->fetch()) { + if (($flink->noticesync & FOREIGN_NOTICE_RECV) == + FOREIGN_NOTICE_RECV) { + $userIds[] = $flink->foreign_id; + + if (count($userIds) >= self::USERS_PER_STREAM) { + $this->spawnStream($userIds); + $userIds = array(); + } + } + } + + if (count($userIds)) { + $this->spawnStream($userIds); + } + } + + /** + * Prepare a Site Stream connection for the given chunk of users. + * The actual connection will be opened later. + * + * @param $users array of Twitter-side user IDs + */ + function spawnStream($users) + { + $stream = $this->initSiteStream(); + $stream->followUsers($userIds); + + // Slip the stream reader into our list of active streams. + // We'll manage its actual connection on the next go-around. + $this->streams[] = $stream; + + // Record the user->stream mappings; this makes it easier for us to know + // later if we need to kill something. + foreach ($userIds as $id) { + $this->users[$id] = $stream; + } + } + + /** + * Initialize a generic site streams connection object. + * All our connections will look like this, then we'll add users to them. + * + * @return TwitterStreamReader + */ + function initSiteStream() + { + $auth = $this->siteStreamAuth(); + $stream = new TwitterSiteStream($auth); + + // Add our event handler callbacks. Whee! + $this->setupEvents($stream); + return $stream; + } + + /** + * Fetch the Twitter OAuth credentials to use to connect to the Site Streams API. + * + * This will use the locally-stored credentials for the applictation's owner account + * from the site configuration. These should be configured through the administration + * panels or manually in the config file. + * + * Will throw an exception if no credentials can be found -- but beware that invalid + * credentials won't cause breakage until later. + * + * @return TwitterOAuthClient + */ + function siteStreamAuth() + { + $token = common_config('twitter', 'stream_token'); + $secret = common_config('twitter', 'stream_secret'); + if (empty($token) || empty($secret)) { + throw new ServerException('Twitter site streams have not been correctly configured. Configure the app owner account via the admin panel.'); + } + return new TwitterOAuthClient($token, $secret); + } + + /** + * Collect the sockets for all active connections for i/o monitoring. + * + * @return array of resources + */ + function getSockets() + { + $sockets = array(); + foreach ($this->streams as $stream) { + foreach ($stream->getSockets() as $socket) { + $sockets[] = $socket; + } + } + return $streams; + } + + /** + * We're ready to process input from one of our data sources! Woooooo! + * @fixme is there an easier way to map from socket back to owning module? :( + * + * @param resource $socket + * @return boolean success + */ + function handleInput($socket) + { + foreach ($this->streams as $stream) { + foreach ($stream->getSockets() as $aSocket) { + if ($socket === $aSocket) { + $stream->handleInput($socket); + } + } + } + return true; + } + + /** + * Start the system up! + * @fixme do some rate-limiting on the stream setup + * @fixme do some sensible backoff on failure etc + */ + function start() + { + $this->initStreams(); + foreach ($this->streams as $stream) { + $stream->connect(); + } + return true; + } + + function finish() + { + foreach ($this->streams as $index => $stream) { + $stream->close(); + unset($this->streams[$index]); + } + return true; + } + + public static function get() + { + throw new Exception('not a singleton'); + } + + /** + * Set up event handlers on the streaming interface. + * + * @fixme add more event types as we add handling for them + */ + protected function setupEvents(TwitterStream $stream) + { + $handlers = array( + 'status', + ); + foreach ($handlers as $event) { + $stream->hookEvent($event, array($this, 'onTwitter' . ucfirst($event))); + } + } + + /** + * Event callback notifying that a user has a new message in their home timeline. + * + * @param object $data JSON data: Twitter status update + */ + protected function onTwitterStatus($data, $context) + { + $importer = new TwitterImport(); + $notice = $importer->importStatus($data); + if ($notice) { + $user = $this->getTwitterUser($context); + Inbox::insertNotice($user->id, $notice->id); + } + } + + /** + * @fixme what about handling multiple sites? + */ + function getTwitterUser($context) + { + if ($context->source != 'sitestream') { + throw new ServerException("Unexpected stream source"); + } + $flink = Foreign_link::getByForeignID(TWITTER_SERVICE, $context->for_user); + if ($flink) { + return $flink->getUser(); + } else { + throw new ServerException("No local user for this Twitter ID"); + } + } +} + + +if (have_option('i', 'id')) { + $id = get_option_value('i', 'id'); +} else if (count($args) > 0) { + $id = $args[0]; +} else { + $id = null; +} + +$foreground = have_option('f', 'foreground'); +$all = have_option('a') || have_option('--all'); + +$daemon = new TwitterDaemon($id, !$foreground, 1, $all); + +$daemon->runOnce(); -- cgit v1.2.3-54-g00ecf