1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 """shell/term utilities, useful to write some python scripts instead of shell
19 scripts.
20 """
21 __docformat__ = "restructuredtext en"
22
23 import os
24 import glob
25 import shutil
26 import stat
27 import sys
28 import tempfile
29 import time
30 import fnmatch
31 import errno
32 import string
33 import random
34 from os.path import exists, isdir, islink, basename, join
35
36 from logilab.common import STD_BLACKLIST, _handle_blacklist
37 from logilab.common.compat import raw_input
38 from logilab.common.compat import str_to_bytes
39
40 try:
41 from logilab.common.proc import ProcInfo, NoSuchProcess
42 except ImportError:
43
45
48
49
51
53 self.path = tempfile.mkdtemp()
54 return self.path
55
56 - def __exit__(self, exctype, value, traceback):
57
58 shutil.rmtree(self.path)
59 return traceback is None
60
61
64 self.directory = directory
65
67 self.cwd = os.getcwd()
68 os.chdir(self.directory)
69 return self.directory
70
71 - def __exit__(self, exctype, value, traceback):
73
74
75 -def chown(path, login=None, group=None):
76 """Same as `os.chown` function but accepting user login or group name as
77 argument. If login or group is omitted, it's left unchanged.
78
79 Note: you must own the file to chown it (or be root). Otherwise OSError is raised.
80 """
81 if login is None:
82 uid = -1
83 else:
84 try:
85 uid = int(login)
86 except ValueError:
87 import pwd
88 uid = pwd.getpwnam(login).pw_uid
89 if group is None:
90 gid = -1
91 else:
92 try:
93 gid = int(group)
94 except ValueError:
95 import grp
96 gid = grp.getgrnam(group).gr_gid
97 os.chown(path, uid, gid)
98
99 -def mv(source, destination, _action=shutil.move):
100 """A shell-like mv, supporting wildcards.
101 """
102 sources = glob.glob(source)
103 if len(sources) > 1:
104 assert isdir(destination)
105 for filename in sources:
106 _action(filename, join(destination, basename(filename)))
107 else:
108 try:
109 source = sources[0]
110 except IndexError:
111 raise OSError('No file matching %s' % source)
112 if isdir(destination) and exists(destination):
113 destination = join(destination, basename(source))
114 try:
115 _action(source, destination)
116 except OSError, ex:
117 raise OSError('Unable to move %r to %r (%s)' % (
118 source, destination, ex))
119
121 """A shell-like rm, supporting wildcards.
122 """
123 for wfile in files:
124 for filename in glob.glob(wfile):
125 if islink(filename):
126 os.remove(filename)
127 elif isdir(filename):
128 shutil.rmtree(filename)
129 else:
130 os.remove(filename)
131
132 -def cp(source, destination):
133 """A shell-like cp, supporting wildcards.
134 """
135 mv(source, destination, _action=shutil.copy)
136
138 """Recursively find files ending with the given extensions from the directory.
139
140 :type directory: str
141 :param directory:
142 directory where the search should start
143
144 :type exts: basestring or list or tuple
145 :param exts:
146 extensions or lists or extensions to search
147
148 :type exclude: boolean
149 :param exts:
150 if this argument is True, returning files NOT ending with the given
151 extensions
152
153 :type blacklist: list or tuple
154 :param blacklist:
155 optional list of files or directory to ignore, default to the value of
156 `logilab.common.STD_BLACKLIST`
157
158 :rtype: list
159 :return:
160 the list of all matching files
161 """
162 if isinstance(exts, basestring):
163 exts = (exts,)
164 if exclude:
165 def match(filename, exts):
166 for ext in exts:
167 if filename.endswith(ext):
168 return False
169 return True
170 else:
171 def match(filename, exts):
172 for ext in exts:
173 if filename.endswith(ext):
174 return True
175 return False
176 files = []
177 for dirpath, dirnames, filenames in os.walk(directory):
178 _handle_blacklist(blacklist, dirnames, filenames)
179
180 dirname = basename(dirpath)
181 if dirname in blacklist:
182 continue
183 files.extend([join(dirpath, f) for f in filenames if match(f, exts)])
184 return files
185
186
188 """Recursively finds files matching glob `pattern` under `directory`.
189
190 This is an alternative to `logilab.common.shellutils.find`.
191
192 :type directory: str
193 :param directory:
194 directory where the search should start
195
196 :type pattern: basestring
197 :param pattern:
198 the glob pattern (e.g *.py, foo*.py, etc.)
199
200 :type blacklist: list or tuple
201 :param blacklist:
202 optional list of files or directory to ignore, default to the value of
203 `logilab.common.STD_BLACKLIST`
204
205 :rtype: iterator
206 :return:
207 iterator over the list of all matching files
208 """
209 for curdir, dirnames, filenames in os.walk(directory):
210 _handle_blacklist(blacklist, dirnames, filenames)
211 for fname in fnmatch.filter(filenames, pattern):
212 yield join(curdir, fname)
213
214 -def unzip(archive, destdir):
215 import zipfile
216 if not exists(destdir):
217 os.mkdir(destdir)
218 zfobj = zipfile.ZipFile(archive)
219 for name in zfobj.namelist():
220 if name.endswith('/'):
221 os.mkdir(join(destdir, name))
222 else:
223 outfile = open(join(destdir, name), 'wb')
224 outfile.write(zfobj.read(name))
225 outfile.close()
226
228 """This is a deadlock safe version of popen2 (no stdin), that returns
229 an object with errorlevel, out and err.
230 """
231
233 outfile = tempfile.mktemp()
234 errfile = tempfile.mktemp()
235 self.status = os.system("( %s ) >%s 2>%s" %
236 (command, outfile, errfile)) >> 8
237 self.out = open(outfile, "r").read()
238 self.err = open(errfile, "r").read()
239 os.remove(outfile)
240 os.remove(errfile)
241
242 -def acquire_lock(lock_file, max_try=10, delay=10, max_delay=3600):
243 """Acquire a lock represented by a file on the file system
244
245 If the process written in lock file doesn't exist anymore, we remove the
246 lock file immediately
247 If age of the lock_file is greater than max_delay, then we raise a UserWarning
248 """
249 count = abs(max_try)
250 while count:
251 try:
252 fd = os.open(lock_file, os.O_EXCL | os.O_RDWR | os.O_CREAT)
253 os.write(fd, str_to_bytes(str(os.getpid())) )
254 os.close(fd)
255 return True
256 except OSError, e:
257 if e.errno == errno.EEXIST:
258 try:
259 fd = open(lock_file, "r")
260 pid = int(fd.readline())
261 pi = ProcInfo(pid)
262 age = (time.time() - os.stat(lock_file)[stat.ST_MTIME])
263 if age / max_delay > 1 :
264 raise UserWarning("Command '%s' (pid %s) has locked the "
265 "file '%s' for %s minutes"
266 % (pi.name(), pid, lock_file, age/60))
267 except UserWarning:
268 raise
269 except NoSuchProcess:
270 os.remove(lock_file)
271 except Exception:
272
273
274
275
276 pass
277 else:
278 raise
279 count -= 1
280 time.sleep(delay)
281 else:
282 raise Exception('Unable to acquire %s' % lock_file)
283
285 """Release a lock represented by a file on the file system."""
286 os.remove(lock_file)
287
288
290 """A simple text progression bar."""
291
292 - def __init__(self, nbops, size=20, stream=sys.stdout, title=''):
293 if title:
294 self._fstr = '\r%s [%%-%ss]' % (title, int(size))
295 else:
296 self._fstr = '\r[%%-%ss]' % int(size)
297 self._stream = stream
298 self._total = nbops
299 self._size = size
300 self._current = 0
301 self._progress = 0
302 self._current_text = None
303 self._last_text_write_size = 0
304
305 - def _get_text(self):
306 return self._current_text
307
308 - def _set_text(self, text=None):
309 if text != self._current_text:
310 self._current_text = text
311 self.refresh()
312
313 - def _del_text(self):
315
316 text = property(_get_text, _set_text, _del_text)
317
318 - def update(self, offset=1, exact=False):
319 """Move FORWARD to new cursor position (cursor will never go backward).
320
321 :offset: fraction of ``size``
322
323 :exact:
324
325 - False: offset relative to current cursor position if True
326 - True: offset as an asbsolute position
327
328 """
329 if exact:
330 self._current = offset
331 else:
332 self._current += offset
333
334 progress = int((float(self._current)/float(self._total))*self._size)
335 if progress > self._progress:
336 self._progress = progress
337 self.refresh()
338
340 """Refresh the progression bar display."""
341 self._stream.write(self._fstr % ('.' * min(self._progress, self._size)) )
342 if self._last_text_write_size or self._current_text:
343 template = ' %%-%is' % (self._last_text_write_size)
344 text = self._current_text
345 if text is None:
346 text = ''
347 self._stream.write(template % text)
348 self._last_text_write_size = len(text.rstrip())
349 self._stream.flush()
350
352 self._stream.write('\n')
353 self._stream.flush()
354
355
365
366
367 _MARKER = object()
369
371 self.nbops = nbops
372 self.size = size
373 self.stream = stream
374 self.title = title
375 self.enabled = enabled
376
378 if self.enabled:
379 kwargs = {}
380 for attr in ('nbops', 'size', 'stream', 'title'):
381 value = getattr(self, attr)
382 if value is not _MARKER:
383 kwargs[attr] = value
384 self.pb = ProgressBar(**kwargs)
385 else:
386 self.pb = DummyProgressBar()
387 return self.pb
388
389 - def __exit__(self, exc_type, exc_val, exc_tb):
391
436
437 ASK = RawInput()
438
439
441 """avoid using os.getlogin() because of strange tty / stdin problems
442 (man 3 getlogin)
443 Another solution would be to use $LOGNAME, $USER or $USERNAME
444 """
445 if sys.platform != 'win32':
446 import pwd
447 return pwd.getpwuid(os.getuid())[0]
448 else:
449 return os.environ['USERNAME']
450
452 """dumb password generation function"""
453 pwd = ''
454 for i in xrange(length):
455 pwd += random.choice(vocab)
456 return pwd
457