summaryrefslogtreecommitdiff
path: root/classes/Inbox.php
blob: a1ab6215fd3a018cfcd2724522443e1fbb8d888f (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
<?php
/**
 * StatusNet, the distributed open-source microblogging tool
 *
 * Data class for user location preferences
 *
 * PHP version 5
 *
 * LICENCE: This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 *
 * @category  Data
 * @package   StatusNet
 * @author    Evan Prodromou <evan@status.net>
 * @copyright 2009 StatusNet Inc.
 * @license   http://www.fsf.org/licensing/licenses/agpl-3.0.html GNU Affero General Public License version 3.0
 * @link      http://status.net/
 */

require_once INSTALLDIR.'/classes/Memcached_DataObject.php';

class Inbox extends Memcached_DataObject
{
    const BOXCAR = 128;
    const MAX_NOTICES = 1024;

    ###START_AUTOCODE
    /* the code below is auto generated do not remove the above tag */

    public $__table = 'inbox';                           // table name
    public $user_id;                         // int(4)  primary_key not_null
    public $notice_ids;                      // blob

    /* Static get */
    function staticGet($k,$v=NULL) { return Memcached_DataObject::staticGet('Inbox',$k,$v); }

    /* the code above is auto generated do not remove the tag below */
    ###END_AUTOCODE

    function sequenceKey()
    {
        return array(false, false, false);
    }

    /**
     * Create a new inbox from existing Notice_inbox stuff
     */
    static function initialize($user_id)
    {
        $inbox = Inbox::fromNoticeInbox($user_id);

        unset($inbox->fake);

        $result = $inbox->insert();

        if (!$result) {
            common_log_db_error($inbox, 'INSERT', __FILE__);
            return null;
        }

        return $inbox;
    }

    static function fromNoticeInbox($user_id)
    {
        $ids = array();

        $ni = new Notice_inbox();

        $ni->user_id = $user_id;
        $ni->selectAdd();
        $ni->selectAdd('notice_id');
        $ni->orderBy('notice_id DESC');
        $ni->limit(0, self::MAX_NOTICES);

        if ($ni->find()) {
            while($ni->fetch()) {
                $ids[] = $ni->notice_id;
            }
        }

        $ni->free();
        unset($ni);

        $inbox = new Inbox();

        $inbox->user_id = $user_id;
        $inbox->pack($ids);
        $inbox->fake = true;

        return $inbox;
    }

    /**
     * Append the given notice to the given user's inbox.
     * Caching updates are managed for the inbox itself.
     *
     * If the notice is already in this inbox, the second
     * add will be silently dropped.
     *
     * @param int @user_id
     * @param int $notice_id
     * @return boolean success
     */
    static function insertNotice($user_id, $notice_id)
    {
        // Going straight to the DB rather than trusting our caching
        // during an update. Note: not using DB_DataObject::staticGet,
        // which is unsafe to use directly (in-process caching causes
        // memory leaks, which accumulate in queue processes).
        $inbox = new Inbox();
        if (!$inbox->get('user_id', $user_id)) {
            $inbox = Inbox::initialize($user_id);
        }

        if (empty($inbox)) {
            return false;
        }

        $ids = $inbox->unpack();
        if (in_array(intval($notice_id), $ids)) {
            // Already in there, we probably re-ran some inbox adds
            // due to an error. Skip the dupe silently.
            return true;
        }

        $result = $inbox->query(sprintf('UPDATE inbox '.
                                        'set notice_ids = concat(cast(0x%08x as binary(4)), '.
                                        'substr(notice_ids, 1, %d)) '.
                                        'WHERE user_id = %d',
                                        $notice_id,
                                        4 * (self::MAX_NOTICES - 1),
                                        $user_id));

        if ($result) {
            self::blow('inbox:user_id:%d', $user_id);
        }

        return $result;
    }

    static function bulkInsert($notice_id, $user_ids)
    {
        foreach ($user_ids as $user_id)
        {
            Inbox::insertNotice($user_id, $notice_id);
        }
    }

    function stream($user_id, $offset, $limit, $since_id, $max_id, $own=false)
    {
        $inbox = Inbox::staticGet('user_id', $user_id);

        if (empty($inbox)) {
            $inbox = Inbox::fromNoticeInbox($user_id);
            if (empty($inbox)) {
                return array();
            } else {
                $inbox->encache();
            }
        }

        $ids = $inbox->unpack();

        if (!empty($since_id)) {
            $newids = array();
            foreach ($ids as $id) {
                if ($id > $since_id) {
                    $newids[] = $id;
                }
            }
            $ids = $newids;
        }

        if (!empty($max_id)) {
            $newids = array();
            foreach ($ids as $id) {
                if ($id <= $max_id) {
                    $newids[] = $id;
                }
            }
            $ids = $newids;
        }

        $ids = array_slice($ids, $offset, $limit);

        return $ids;
    }

    /**
     * Wrapper for Inbox::stream() and Notice::getStreamByIds() returning
     * additional items up to the limit if we were short due to deleted
     * notices still being listed in the inbox.
     *
     * The fast path (when no items are deleted) should be just as fast; the
     * offset parameter is applied *before* lookups for maximum efficiency.
     *
     * This means offset-based paging may show duplicates, but similar behavior
     * already exists when new notices are posted between page views, so we
     * think people will be ok with this until id-based paging is introduced
     * to the user interface.
     *
     * @param int $user_id
     * @param int $offset skip past the most recent N notices (after since_id checks)
     * @param int $limit
     * @param mixed $since_id return only notices after but not including this id
     * @param mixed $max_id return only notices up to and including this id
     * @param mixed $own ignored?
     * @return array of Notice objects
     *
     * @todo consider repacking the inbox when this happens?
     * @fixme reimplement $own if we need it?
     */
    function streamNotices($user_id, $offset, $limit, $since_id, $max_id, $own=false)
    {
        $ids = self::stream($user_id, $offset, self::MAX_NOTICES, $since_id, $max_id, $own);

        // Do a bulk lookup for the first $limit items
        // Fast path when nothing's deleted.
        $firstChunk = array_slice($ids, 0, $limit);
        $notices = Notice::getStreamByIds($firstChunk);

        $wanted = count($firstChunk); // raw entry count in the inbox up to our $limit
        if ($notices->N >= $wanted) {
            return $notices;
        }

        // There were deleted notices, we'll need to look for more.
        assert($notices instanceof ArrayWrapper);
        $items = $notices->_items;
        $remainder = array_slice($ids, $limit);

        while (count($items) < $wanted && count($remainder) > 0) {
            $notice = Notice::staticGet(array_shift($remainder));
            if ($notice) {
                $items[] = $notice;
            } else {
            }
        }
        return new ArrayWrapper($items);
    }

    /**
     * Saves a list of integer notice_ids into a packed blob in this object.
     * @param array $ids list of integer notice_ids
     */
    protected function pack(array $ids)
    {
        $this->notice_ids = call_user_func_array('pack', array_merge(array('N*'), $ids));
    }

    /**
     * @return array of integer notice_ids
     */
    protected function unpack()
    {
        return unpack('N*', $this->notice_ids);
    }
}