1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
|
"""Logging
"""
import sys
import logging
class Logger(object):
"""
Logging object for use in command-line script. Allows ranges of
levels, to avoid some redundancy of displayed information.
"""
VERBOSE_DEBUG = logging.DEBUG-1
DEBUG = logging.DEBUG
INFO = logging.INFO
NOTIFY = (logging.INFO+logging.WARN)/2
WARN = WARNING = logging.WARN
ERROR = logging.ERROR
FATAL = logging.FATAL
LEVELS = [VERBOSE_DEBUG, DEBUG, INFO, NOTIFY, WARN, ERROR, FATAL]
def __init__(self):
self.consumers = []
self.indent = 0
self.explicit_levels = False
self.in_progress = None
self.in_progress_hanging = False
def debug(self, msg, *args, **kw):
self.log(self.DEBUG, msg, *args, **kw)
def info(self, msg, *args, **kw):
self.log(self.INFO, msg, *args, **kw)
def notify(self, msg, *args, **kw):
self.log(self.NOTIFY, msg, *args, **kw)
def warn(self, msg, *args, **kw):
self.log(self.WARN, msg, *args, **kw)
def error(self, msg, *args, **kw):
self.log(self.WARN, msg, *args, **kw)
def fatal(self, msg, *args, **kw):
self.log(self.FATAL, msg, *args, **kw)
def log(self, level, msg, *args, **kw):
if args:
if kw:
raise TypeError(
"You may give positional or keyword arguments, not both")
args = args or kw
rendered = None
for consumer_level, consumer in self.consumers:
if self.level_matches(level, consumer_level):
if (self.in_progress_hanging
and consumer in (sys.stdout, sys.stderr)):
self.in_progress_hanging = False
sys.stdout.write('\n')
sys.stdout.flush()
if rendered is None:
if args:
rendered = msg % args
else:
rendered = msg
rendered = ' '*self.indent + rendered
if self.explicit_levels:
## FIXME: should this be a name, not a level number?
rendered = '%02i %s' % (level, rendered)
if hasattr(consumer, 'write'):
consumer.write(rendered+'\n')
else:
consumer(rendered)
def start_progress(self, msg):
assert not self.in_progress, (
"Tried to start_progress(%r) while in_progress %r"
% (msg, self.in_progress))
if self.level_matches(self.NOTIFY, self._stdout_level()):
sys.stdout.write(' '*self.indent + msg)
sys.stdout.flush()
self.in_progress_hanging = True
else:
self.in_progress_hanging = False
self.in_progress = msg
self.last_message = None
def end_progress(self, msg='done.'):
assert self.in_progress, (
"Tried to end_progress without start_progress")
if self.stdout_level_matches(self.NOTIFY):
if not self.in_progress_hanging:
# Some message has been printed out since start_progress
sys.stdout.write('...' + self.in_progress + msg + '\n')
sys.stdout.flush()
else:
# These erase any messages shown with show_progress (besides .'s)
logger.show_progress('')
logger.show_progress('')
sys.stdout.write(msg + '\n')
sys.stdout.flush()
self.in_progress = None
self.in_progress_hanging = False
def show_progress(self, message=None):
"""If we are in a progress scope, and no log messages have been
shown, write out another '.'"""
if self.in_progress_hanging:
if message is None:
sys.stdout.write('.')
sys.stdout.flush()
else:
if self.last_message:
padding = ' ' * max(0, len(self.last_message)-len(message))
else:
padding = ''
sys.stdout.write('\r%s%s%s%s' % (' '*self.indent, self.in_progress, message, padding))
sys.stdout.flush()
self.last_message = message
def stdout_level_matches(self, level):
"""Returns true if a message at this level will go to stdout"""
return self.level_matches(level, self._stdout_level())
def _stdout_level(self):
"""Returns the level that stdout runs at"""
for level, consumer in self.consumers:
if consumer is sys.stdout:
return level
return self.FATAL
def level_matches(self, level, consumer_level):
"""
>>> l = Logger()
>>> l.level_matches(3, 4)
False
>>> l.level_matches(3, 2)
True
>>> l.level_matches(slice(None, 3), 3)
False
>>> l.level_matches(slice(None, 3), 2)
True
>>> l.level_matches(slice(1, 3), 1)
True
>>> l.level_matches(slice(2, 3), 1)
False
"""
if isinstance(level, slice):
start, stop = level.start, level.stop
if start is not None and start > consumer_level:
return False
if stop is not None or stop <= consumer_level:
return False
return True
else:
return level >= consumer_level
@classmethod
def level_for_integer(cls, level):
levels = cls.LEVELS
if level < 0:
return levels[0]
if level >= len(levels):
return levels[-1]
return levels[level]
def move_stdout_to_stderr(self):
to_remove = []
to_add = []
for consumer_level, consumer in self.consumers:
if consumer == sys.stdout:
to_remove.append((consumer_level, consumer))
to_add.append((consumer_level, sys.stderr))
for item in to_remove:
self.consumers.remove(item)
self.consumers.extend(to_add)
logger = Logger()
|