summaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
authorBrion Vibber <brion@pobox.com>2010-02-12 11:18:35 -0800
committerBrion Vibber <brion@pobox.com>2010-02-12 11:18:35 -0800
commit122c8677b7004bae4cfe7e2bd49fc1bc3187c72c (patch)
tree552bf77b587fbb674329a95745067f358c0a25d3 /lib
parent3c79448cd817d01b4421262fefc29eb558cede20 (diff)
parentfd3c9334bfcfe627446feb86ac3054b24ed05449 (diff)
Merge branch 'testing' of gitorious.org:statusnet/mainline into 0.9.x
Diffstat (limited to 'lib')
-rw-r--r--lib/api.php18
-rw-r--r--lib/atom10entry.php106
-rw-r--r--lib/atom10feed.php227
-rw-r--r--lib/atomnoticefeed.php103
-rw-r--r--lib/default.php1
-rw-r--r--lib/queuemanager.php36
-rw-r--r--lib/spawningdaemon.php2
-rw-r--r--lib/stompqueuemanager.php44
-rw-r--r--lib/util.php2
9 files changed, 496 insertions, 43 deletions
diff --git a/lib/api.php b/lib/api.php
index b987badc0..5758cc874 100644
--- a/lib/api.php
+++ b/lib/api.php
@@ -1322,4 +1322,22 @@ class ApiAction extends Action
}
}
+ function getSelfUri($action, $aargs)
+ {
+ parse_str($_SERVER['QUERY_STRING'], $params);
+ $pstring = '';
+ if (!empty($params)) {
+ unset($params['p']);
+ $pstring = http_build_query($params);
+ }
+
+ $uri = common_local_url($action, $aargs);
+
+ if (!empty($pstring)) {
+ $uri .= '?' . $pstring;
+ }
+
+ return $uri;
+ }
+
}
diff --git a/lib/atom10entry.php b/lib/atom10entry.php
new file mode 100644
index 000000000..5710c80fc
--- /dev/null
+++ b/lib/atom10entry.php
@@ -0,0 +1,106 @@
+<?php
+/**
+ * StatusNet, the distributed open-source microblogging tool
+ *
+ * Class for building / manipulating an Atom entry in memory
+ *
+ * PHP version 5
+ *
+ * LICENCE: This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * @category Feed
+ * @package StatusNet
+ * @author Zach Copley <zach@status.net>
+ * @copyright 2010 StatusNet, Inc.
+ * @license http://www.fsf.org/licensing/licenses/agpl-3.0.html GNU Affero General Public License version 3.0
+ * @link http://status.net/
+ */
+
+if (!defined('STATUSNET')
+{
+ exit(1);
+}
+
+class Atom10EntryException extends Exception
+{
+}
+
+/**
+ * Class for manipulating an Atom entry in memory. Get the entry as an XML
+ * string with Atom10Entry::getString().
+ *
+ * @category Feed
+ * @package StatusNet
+ * @author Zach Copley <zach@status.net>
+ * @license http://www.fsf.org/licensing/licenses/agpl-3.0.html GNU Affero General Public License version 3.0
+ * @link http://status.net/
+ */
+class Atom10Entry extends XMLStringer
+{
+ private $namespaces;
+ private $categories;
+ private $content;
+ private $contributors;
+ private $id;
+ private $links;
+ private $published;
+ private $rights;
+ private $source;
+ private $summary;
+ private $title;
+
+ function __construct($indent = true) {
+ parent::__construct($indent);
+ $this->namespaces = array();
+ }
+
+ function addNamespace($namespace, $uri)
+ {
+ $ns = array($namespace => $uri);
+ $this->namespaces = array_merge($this->namespaces, $ns);
+ }
+
+ function initEntry()
+ {
+
+ }
+
+ function endEntry()
+ {
+
+ }
+
+ /**
+ * Check that all required elements have been set, etc.
+ * Throws an Atom10EntryException if something's missing.
+ *
+ * @return void
+ */
+ function validate
+ {
+
+ }
+
+ function getString()
+ {
+ $this->validate();
+
+ $this->initEntry();
+ $this->renderEntries();
+ $this->endEntry();
+
+ return $this->xw->outputMemory();
+ }
+
+} \ No newline at end of file
diff --git a/lib/atom10feed.php b/lib/atom10feed.php
new file mode 100644
index 000000000..ccca76a09
--- /dev/null
+++ b/lib/atom10feed.php
@@ -0,0 +1,227 @@
+<?php
+/**
+ * StatusNet, the distributed open-source microblogging tool
+ *
+ * Class for building an Atom feed in memory
+ *
+ * PHP version 5
+ *
+ * LICENCE: This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * @category Feed
+ * @package StatusNet
+ * @author Zach Copley <zach@status.net>
+ * @copyright 2010 StatusNet, Inc.
+ * @license http://www.fsf.org/licensing/licenses/agpl-3.0.html GNU Affero General Public License version 3.0
+ * @link http://status.net/
+ */
+
+if (!defined('STATUSNET'))
+{
+ exit(1);
+}
+
+class Atom10FeedException extends Exception
+{
+}
+
+/**
+ * Class for building an Atom feed in memory. Get the finished doc
+ * as a string with Atom10Feed::getString().
+ *
+ * @category Feed
+ * @package StatusNet
+ * @author Zach Copley <zach@status.net>
+ * @license http://www.fsf.org/licensing/licenses/agpl-3.0.html GNU Affero General Public License version 3.0
+ * @link http://status.net/
+ */
+class Atom10Feed extends XMLStringer
+{
+ public $xw;
+ private $namespaces;
+ private $authors;
+ private $categories;
+ private $contributors;
+ private $generator;
+ private $icon;
+ private $links;
+ private $logo;
+ private $rights;
+ private $subtitle;
+ private $title;
+ private $published;
+ private $updated;
+ private $entries;
+
+ /**
+ * Constructor
+ *
+ * @param boolean $indent flag to turn indenting on or off
+ *
+ * @return void
+ */
+ function __construct($indent = true) {
+ parent::__construct($indent);
+ $this->namespaces = array();
+ $this->links = array();
+ $this->entries = array();
+ $this->addNamespace('xmlns', 'http://www.w3.org/2005/Atom');
+ }
+
+ /**
+ * Add another namespace to the feed
+ *
+ * @param string $namespace the namespace
+ * @param string $uri namspace uri
+ *
+ * @return void
+ */
+ function addNamespace($namespace, $uri)
+ {
+ $ns = array($namespace => $uri);
+ $this->namespaces = array_merge($this->namespaces, $ns);
+ }
+
+ function getNamespaces()
+ {
+ return $this->namespaces;
+ }
+
+ function initFeed()
+ {
+ $this->xw->startDocument('1.0', 'UTF-8');
+ $commonAttrs = array('xml:lang' => 'en-US');
+ $commonAttrs = array_merge($commonAttrs, $this->namespaces);
+ $this->elementStart('feed', $commonAttrs);
+
+ $this->element('id', null, $this->id);
+ $this->element('title', null, $this->title);
+ $this->element('subtitle', null, $this->subtitle);
+
+ if (!empty($this->logo)) {
+ $this->element('logo', null, $this->logo);
+ }
+
+ $this->element('updated', null, $this->updated);
+
+ $this->renderLinks();
+ }
+
+ /**
+ * Check that all required elements have been set, etc.
+ * Throws an Atom10FeedException if something's missing.
+ *
+ * @return void
+ */
+ function validate()
+ {
+ }
+
+ function renderLinks()
+ {
+ foreach ($this->links as $attrs)
+ {
+ $this->element('link', $attrs, null);
+ }
+ }
+
+ function addEntryRaw($entry)
+ {
+ array_push($this->entries, $entry);
+ }
+
+ function addEntry($entry)
+ {
+ array_push($this->entries, $entry->getString());
+ }
+
+ function renderEntries()
+ {
+ foreach ($this->entries as $entry) {
+ $this->raw($entry);
+ }
+ }
+
+ function endFeed()
+ {
+ $this->elementEnd('feed');
+ $this->xw->endDocument();
+ }
+
+ function getString()
+ {
+ $this->validate();
+
+ $this->initFeed();
+ $this->renderEntries();
+ $this->endFeed();
+
+ return $this->xw->outputMemory();
+ }
+
+ function setId($id)
+ {
+ $this->id = $id;
+ }
+
+ function setTitle($title)
+ {
+ $this->title = $title;
+ }
+
+ function setSubtitle($subtitle)
+ {
+ $this->subtitle = $subtitle;
+ }
+
+ function setLogo($logo)
+ {
+ $this->logo = $logo;
+ }
+
+ function setUpdated($dt)
+ {
+ $this->updated = common_date_iso8601($dt);
+ }
+
+ function setPublished($dt)
+ {
+ $this->published = common_date_iso8601($dt);
+ }
+
+ /**
+ * Adds a link element into the Atom document
+ *
+ * Assumes you want rel="alternate" and type="text/html" unless
+ * you send in $otherAttrs.
+ *
+ * @param string $uri the uri the href needs to point to
+ * @param array $otherAttrs other attributes to stick in
+ *
+ * @return void
+ */
+ function addLink($uri, $otherAttrs = null) {
+ $attrs = array('href' => $uri);
+
+ if (is_null($otherAttrs)) {
+ $attrs['rel'] = 'alternate';
+ $attrs['type'] = 'text/html';
+ } else {
+ $attrs = array_merge($attrs, $otherAttrs);
+ }
+
+ array_push($this->links, $attrs);
+ }
+
+}
diff --git a/lib/atomnoticefeed.php b/lib/atomnoticefeed.php
new file mode 100644
index 000000000..34ed44b2e
--- /dev/null
+++ b/lib/atomnoticefeed.php
@@ -0,0 +1,103 @@
+<?php
+/**
+ * StatusNet, the distributed open-source microblogging tool
+ *
+ * Class for building and Atom feed from a collection of notices
+ *
+ * PHP version 5
+ *
+ * LICENCE: This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * @category Feed
+ * @package StatusNet
+ * @author Zach Copley <zach@status.net>
+ * @copyright 2010 StatusNet, Inc.
+ * @license http://www.fsf.org/licensing/licenses/agpl-3.0.html GNU Affero General Public License version 3.0
+ * @link http://status.net/
+ */
+
+if (!defined('STATUSNET'))
+{
+ exit(1);
+}
+
+/**
+ * Class for creating a feed that represents a collection of notices. Builds the
+ * feed in memory. Get the feed as a string with AtomNoticeFeed::getString().
+ *
+ * @category Feed
+ * @package StatusNet
+ * @author Zach Copley <zach@status.net>
+ * @license http://www.fsf.org/licensing/licenses/agpl-3.0.html GNU Affero General Public License version 3.0
+ * @link http://status.net/
+ */
+class AtomNoticeFeed extends Atom10Feed
+{
+ function __construct($indent = true) {
+ parent::__construct($indent);
+
+ // Feeds containing notice info use these namespaces
+
+ $this->addNamespace(
+ 'xmlns:thr',
+ 'http://purl.org/syndication/thread/1.0'
+ );
+
+ $this->addNamespace(
+ 'xmlns:georss',
+ 'http://www.georss.org/georss'
+ );
+
+ $this->addNamespace(
+ 'xmlns:activity',
+ 'http://activitystrea.ms/spec/1.0/'
+ );
+
+ // XXX: What should the uri be?
+ $this->addNamespace(
+ 'xmlns:ostatus',
+ 'http://ostatus.org/schema/1.0'
+ );
+ }
+
+ /**
+ * Add more than one Notice to the feed
+ *
+ * @param mixed $notices an array of Notice objects or handle
+ *
+ */
+ function addEntryFromNotices($notices)
+ {
+ if (is_array($notices)) {
+ foreach ($notices as $notice) {
+ $this->addEntryFromNotice($notice);
+ }
+ } else {
+ while ($notices->fetch()) {
+ $this->addEntryFromNotice($notices);
+ }
+ }
+ }
+
+ /**
+ * Add a single Notice to the feed
+ *
+ * @param Notice $notice a Notice to add
+ */
+ function addEntryFromNotice($notice)
+ {
+ $this->addEntryRaw($notice->asAtomEntry());
+ }
+
+}
diff --git a/lib/default.php b/lib/default.php
index 16d1330f0..cc6863488 100644
--- a/lib/default.php
+++ b/lib/default.php
@@ -88,6 +88,7 @@ $default =
'stomp_manual_failover' => true, // if multiple servers are listed, treat them as separate (enqueue on one randomly, listen on all)
'monitor' => null, // URL to monitor ping endpoint (work in progress)
'softlimit' => '90%', // total size or % of memory_limit at which to restart queue threads gracefully
+ 'spawndelay' => 1, // Wait at least N seconds between (re)spawns of child processes to avoid slamming the queue server with subscription startup
'debug_memory' => false, // true to spit memory usage to log
'inboxes' => true, // true to do inbox distribution & output queueing from in background via 'distrib' queue
),
diff --git a/lib/queuemanager.php b/lib/queuemanager.php
index 274e1c2f6..64bb52e10 100644
--- a/lib/queuemanager.php
+++ b/lib/queuemanager.php
@@ -155,26 +155,26 @@ abstract class QueueManager extends IoManager
}
/**
- * Encode an object for queued storage.
- * Next gen may use serialization.
+ * Encode an object or variable for queued storage.
+ * Notice objects are currently stored as an id reference;
+ * other items are serialized.
*
- * @param mixed $object
+ * @param mixed $item
* @return string
*/
- protected function encode($object)
+ protected function encode($item)
{
- if ($object instanceof Notice) {
- return $object->id;
- } else if (is_string($object)) {
- return $object;
+ if ($item instanceof Notice) {
+ // Backwards compat
+ return $item->id;
} else {
- throw new ServerException("Can't queue this type", 500);
+ return serialize($item);
}
}
/**
* Decode an object from queued storage.
- * Accepts back-compat notice reference entries and strings for now.
+ * Accepts notice reference entries and serialized items.
*
* @param string
* @return mixed
@@ -182,9 +182,23 @@ abstract class QueueManager extends IoManager
protected function decode($frame)
{
if (is_numeric($frame)) {
+ // Back-compat for notices...
return Notice::staticGet(intval($frame));
- } else {
+ } elseif (substr($frame, 0, 1) == '<') {
+ // Back-compat for XML source
return $frame;
+ } else {
+ // Deserialize!
+ #$old = error_reporting();
+ #error_reporting($old & ~E_NOTICE);
+ $out = unserialize($frame);
+ #error_reporting($old);
+
+ if ($out === false && $frame !== 'b:0;') {
+ common_log(LOG_ERR, "Couldn't unserialize queued frame: $frame");
+ return false;
+ }
+ return $out;
}
}
diff --git a/lib/spawningdaemon.php b/lib/spawningdaemon.php
index b1961d688..862cbb4fa 100644
--- a/lib/spawningdaemon.php
+++ b/lib/spawningdaemon.php
@@ -83,6 +83,7 @@ abstract class SpawningDaemon extends Daemon
$this->log(LOG_INFO, "Spawned thread $i as pid $pid");
$children[$i] = $pid;
}
+ sleep(common_config('queue', 'spawndelay'));
}
$this->log(LOG_INFO, "Waiting for children to complete.");
@@ -111,6 +112,7 @@ abstract class SpawningDaemon extends Daemon
$this->log(LOG_INFO, "Respawned thread $i as pid $pid");
$children[$i] = $pid;
}
+ sleep(common_config('queue', 'spawndelay'));
} else {
$this->log(LOG_INFO, "Thread $i pid $pid exited with status $exitCode; closing out thread.");
}
diff --git a/lib/stompqueuemanager.php b/lib/stompqueuemanager.php
index 6730cd213..cd62c25bd 100644
--- a/lib/stompqueuemanager.php
+++ b/lib/stompqueuemanager.php
@@ -107,9 +107,10 @@ class StompQueueManager extends QueueManager
$message .= ':' . $param;
}
$this->_connect();
- $result = $this->_send($this->control,
- $message,
- array ('created' => common_sql_now()));
+ $con = $this->cons[$this->defaultIdx];
+ $result = $con->send($this->control,
+ $message,
+ array ('created' => common_sql_now()));
if ($result) {
$this->_log(LOG_INFO, "Sent control ping to queue daemons: $message");
return true;
@@ -368,17 +369,10 @@ class StompQueueManager extends QueueManager
foreach ($this->cons as $i => $con) {
if ($con) {
$this->rollback($i);
- $con->unsubscribe($this->control);
+ $con->disconnect();
+ $this->cons[$i] = null;
}
}
- if ($this->sites) {
- foreach ($this->sites as $server) {
- StatusNet::init($server);
- $this->doUnsubscribe();
- }
- } else {
- $this->doUnsubscribe();
- }
return true;
}
@@ -555,26 +549,14 @@ class StompQueueManager extends QueueManager
}
$host = $this->cons[$idx]->getServer();
- if (is_numeric($frame->body)) {
- $id = intval($frame->body);
- $info = "notice $id posted at {$frame->headers['created']} in queue $queue from $host";
-
- $notice = Notice::staticGet('id', $id);
- if (empty($notice)) {
- $this->_log(LOG_WARNING, "Skipping missing $info");
- $this->ack($idx, $frame);
- $this->commit($idx);
- $this->begin($idx);
- $this->stats('badnotice', $queue);
- return false;
- }
-
- $item = $notice;
- } else {
- // @fixme should we serialize, or json, or what here?
- $info = "string posted at {$frame->headers['created']} in queue $queue from $host";
- $item = $frame->body;
+ $item = $this->decode($frame->body);
+ if (empty($item)) {
+ $this->_log(LOG_ERR, "Skipping empty or deleted item in queue $queue from $host");
+ return true;
}
+ $info = $this->logrep($item) . " posted at " .
+ $frame->headers['created'] . " in queue $queue from $host";
+ $this->_log(LOG_DEBUG, "Dequeued $info");
$handler = $this->getHandler($queue);
if (!$handler) {
diff --git a/lib/util.php b/lib/util.php
index 879834a3d..e255c5fe0 100644
--- a/lib/util.php
+++ b/lib/util.php
@@ -690,7 +690,7 @@ function common_group_link($sender_id, $nickname)
{
$sender = Profile::staticGet($sender_id);
$group = User_group::getForNickname($nickname);
- if ($group && $sender->isMember($group)) {
+ if ($sender && $group && $sender->isMember($group)) {
$attrs = array('href' => $group->permalink(),
'class' => 'url');
if (!empty($group->fullname)) {