Package logilab :: Package common :: Module shellutils
[frames] | no frames]

Source Code for Module logilab.common.shellutils

  1  # copyright 2003-2011 LOGILAB S.A. (Paris, FRANCE), all rights reserved. 
  2  # contact http://www.logilab.fr/ -- mailto:contact@logilab.fr 
  3  # 
  4  # This file is part of logilab-common. 
  5  # 
  6  # logilab-common is free software: you can redistribute it and/or modify it under 
  7  # the terms of the GNU Lesser General Public License as published by the Free 
  8  # Software Foundation, either version 2.1 of the License, or (at your option) any 
  9  # later version. 
 10  # 
 11  # logilab-common is distributed in the hope that it will be useful, but WITHOUT 
 12  # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS 
 13  # FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more 
 14  # details. 
 15  # 
 16  # You should have received a copy of the GNU Lesser General Public License along 
 17  # with logilab-common.  If not, see <http://www.gnu.org/licenses/>. 
 18  """shell/term utilities, useful to write some python scripts instead of shell 
 19  scripts. 
 20  """ 
 21  __docformat__ = "restructuredtext en" 
 22   
 23  import os 
 24  import glob 
 25  import shutil 
 26  import stat 
 27  import sys 
 28  import tempfile 
 29  import time 
 30  import fnmatch 
 31  import errno 
 32  import string 
 33  import random 
 34  from os.path import exists, isdir, islink, basename, join 
 35   
 36  from logilab.common import STD_BLACKLIST, _handle_blacklist 
 37  from logilab.common.compat import raw_input 
 38  from logilab.common.compat import str_to_bytes 
 39   
 40  try: 
 41      from logilab.common.proc import ProcInfo, NoSuchProcess 
 42  except ImportError: 
 43      # windows platform 
44 - class NoSuchProcess(Exception): pass
45
46 - def ProcInfo(pid):
47 raise NoSuchProcess()
48 49
class tempdir(object):
    """Context manager creating a temporary directory, deleted on exit.

    The value bound by ``with tempdir() as path`` is the path of the new
    directory; it is also stored on the instance as ``path``.
    """

    def __enter__(self):
        """Create the temporary directory and return its path."""
        created = tempfile.mkdtemp()
        self.path = created
        return created

    def __exit__(self, exctype, value, traceback):
        """Remove the whole directory tree, whether or not the body raised.

        :rtype: bool
        :return:
          True when no traceback was given, False otherwise (so a pending
          exception is never suppressed)
        """
        shutil.rmtree(self.path)
        if traceback is None:
            return True
        return False
60 61
class pushd(object):
    """Context manager running its body with `directory` as working directory.

    The previous working directory is restored when the block is left.
    """

    def __init__(self, directory):
        """Remember the directory to switch to.

        :type directory: str
        :param directory: directory to make current while inside the block
        """
        self.directory = directory

    def __enter__(self):
        """Save the current directory in ``cwd``, switch, return the target."""
        self.cwd = os.getcwd()
        target = self.directory
        os.chdir(target)
        return target

    def __exit__(self, exctype, value, traceback):
        """Go back to the directory saved by `__enter__`."""
        os.chdir(self.cwd)
73 74
def chown(path, login=None, group=None):
    """Same as `os.chown`, but accepting a user login or a group name.

    Each of `login` and `group` may be a numeric id (anything ``int()``
    accepts) or a name; a name is resolved through the `pwd` / `grp` database.
    An omitted (None) value leaves the corresponding owner unchanged.

    Note: you must own the file to chown it (or be root). Otherwise OSError
    is raised.

    :type path: str
    :param path: file whose ownership is changed

    :param login: user id or login name, or None to keep the current user

    :param group: group id or group name, or None to keep the current group
    """
    # -1 is the value passed to os.chown for an id that is left unchanged
    uid = -1
    if login is not None:
        try:
            uid = int(login)
        except ValueError:
            # not a number: imported lazily, only available on Unix
            import pwd
            uid = pwd.getpwnam(login).pw_uid
    gid = -1
    if group is not None:
        try:
            gid = int(group)
        except ValueError:
            import grp
            gid = grp.getgrnam(group).gr_gid
    os.chown(path, uid, gid)
98
def mv(source, destination, _action=shutil.move):
    """A shell-like mv, supporting wildcards.

    :type source: str
    :param source: path or glob pattern selecting what to move

    :type destination: str
    :param destination:
      target path; it must be an existing directory when `source` matches
      several files. When it is a directory, each file keeps its base name
      inside it.

    :param _action:
      callable ``(src, dst)`` applied to each matched file, `shutil.move` by
      default (`cp` passes `shutil.copy`)

    :raise OSError:
      if nothing matches `source`, or if the single-file action fails with an
      OSError (which is re-raised with both paths in the message)
    :raise AssertionError:
      if several files match and `destination` is not a directory
    """
    sources = glob.glob(source)
    if len(sources) > 1:
        assert isdir(destination), \
            '%r must be a directory to receive several files' % destination
        for filename in sources:
            _action(filename, join(destination, basename(filename)))
        return
    if not sources:
        raise OSError('No file matching %s' % source)
    source = sources[0]
    # isdir() is only true for an existing path, no separate exists() needed
    if isdir(destination):
        destination = join(destination, basename(source))
    try:
        _action(source, destination)
    except OSError as ex:
        raise OSError('Unable to move %r to %r (%s)' % (
            source, destination, ex))
119
def rm(*files):
    """A shell-like rm, supporting wildcards.

    Every argument is expanded with `glob.glob`; patterns matching nothing
    are silently skipped.

    :param files: paths or glob patterns to delete
    """
    for pattern in files:
        for path in glob.glob(pattern):
            # a symbolic link is unlinked itself, even when it points to a
            # directory; only real directories are removed recursively
            if isdir(path) and not islink(path):
                shutil.rmtree(path)
            else:
                os.remove(path)
131
def cp(source, destination):
    """A shell-like cp, supporting wildcards.

    Delegates to `mv` with `shutil.copy` as the per-file action, so wildcard
    expansion and destination handling are those of `mv`.

    :type source: str
    :param source: path or glob pattern selecting what to copy

    :type destination: str
    :param destination: target file or directory
    """
    mv(source, destination, shutil.copy)
136
def find(directory, exts, exclude=False, blacklist=STD_BLACKLIST):
    """Recursively find files ending with the given extensions from the directory.

    :type directory: str
    :param directory:
      directory where the search should start

    :type exts: basestring or list or tuple
    :param exts:
      extension or list of extensions to search

    :type exclude: boolean
    :param exclude:
      if this argument is True, return the files NOT ending with any of the
      given extensions

    :type blacklist: list or tuple
    :param blacklist:
      optional list of files or directory to ignore, default to the value of
      `logilab.common.STD_BLACKLIST`

    :rtype: list
    :return:
      the list of all matching files
    """
    try:
        string_types = basestring  # Python 2: str and unicode
    except NameError:
        string_types = str  # Python 3 has no basestring
    if isinstance(exts, string_types):
        exts = (exts,)
    # str.endswith accepts a tuple of suffixes: one call tests them all
    suffixes = tuple(exts)
    wanted = not exclude
    files = []
    for dirpath, dirnames, filenames in os.walk(directory):
        # NOTE(review): presumably filters blacklisted names out of both
        # lists in place -- confirm in logilab.common
        _handle_blacklist(blacklist, dirnames, filenames)
        # don't append files if the directory itself is blacklisted
        if basename(dirpath) in blacklist:
            continue
        files.extend(join(dirpath, name) for name in filenames
                     if name.endswith(suffixes) == wanted)
    return files
def globfind(directory, pattern, blacklist=STD_BLACKLIST):
    """Recursively yield the files under `directory` matching glob `pattern`.

    This is an alternative to `logilab.common.shellutils.find`.

    :type directory: str
    :param directory:
      directory where the search should start

    :type pattern: basestring
    :param pattern:
      the glob pattern (e.g *.py, foo*.py, etc.), matched against file names
      only

    :type blacklist: list or tuple
    :param blacklist:
      optional list of files or directory to ignore, default to the value of
      `logilab.common.STD_BLACKLIST`

    :rtype: iterator
    :return:
      iterator over the paths of all matching files
    """
    for root, subdirs, names in os.walk(directory):
        _handle_blacklist(blacklist, subdirs, names)
        matching = fnmatch.filter(names, pattern)
        for name in matching:
            yield join(root, name)
213
def unzip(archive, destdir):
    """Extract every member of the zip file `archive` below `destdir`.

    `destdir` is created when missing. Intermediate directories are created
    as needed, so archives without explicit directory entries and extraction
    over an existing tree both work. The archive is closed even when
    extraction fails.

    :type archive: str
    :param archive: path of the zip file to read

    :type destdir: str
    :param destdir: directory receiving the extracted files
    """
    import zipfile
    if not exists(destdir):
        os.makedirs(destdir)
    zfobj = zipfile.ZipFile(archive)
    try:
        zfobj.extractall(destdir)
    finally:
        zfobj.close()
226
class Execute:
    """This is a deadlock safe version of popen2 (no stdin): the command
    output is captured through temporary files rather than pipes.

    The instance exposes three attributes:

    :status: value returned by `os.system`, shifted right by 8 bits
    :out: text the command wrote on its standard output
    :err: text the command wrote on its standard error
    """

    def __init__(self, command):
        """Run `command` through the system shell and collect its results.

        :type command: str
        :param command:
          shell command line; it is inserted verbatim into the command given
          to `os.system`, so it must not come from an untrusted source
        """
        captures = []
        try:
            # mkstemp creates the files itself, unlike the race-prone mktemp
            for _ in range(2):
                fd, path = tempfile.mkstemp()
                captures.append(path)
                os.close(fd)
            outfile, errfile = captures
            self.status = os.system("( %s ) >%s 2>%s" %
                                    (command, outfile, errfile)) >> 8
            with open(outfile, "r") as stream:
                self.out = stream.read()
            with open(errfile, "r") as stream:
                self.err = stream.read()
        finally:
            # never leave the capture files behind, even on failure
            for path in captures:
                os.remove(path)
241
def acquire_lock(lock_file, max_try=10, delay=10, max_delay=3600):
    """Acquire a lock represented by a file on the file system

    The lock file is created exclusively and receives the pid of the current
    process. When the file already exists:

    * if the process written in the lock file doesn't exist anymore, the lock
      file is removed immediately (and acquired on a later attempt)
    * if the age of the lock file is greater than `max_delay`, a UserWarning
      is raised

    :type lock_file: str
    :param lock_file: path of the file materializing the lock

    :type max_try: int
    :param max_try: number of attempts (its absolute value is used)

    :param delay: seconds slept between two attempts

    :param max_delay: age in seconds above which an existing lock is reported

    :rtype: bool
    :return: True once the lock has been acquired

    :raise UserWarning: if the lock is held for more than `max_delay` seconds
    :raise OSError: if creating the lock file fails for another reason than
      the file already existing
    :raise Exception: if the lock could not be acquired after all attempts
    """
    count = abs(max_try)
    while count:
        try:
            fd = os.open(lock_file, os.O_EXCL | os.O_RDWR | os.O_CREAT)
            try:
                os.write(fd, str_to_bytes(str(os.getpid())))
            finally:
                # don't leak the descriptor when the write fails
                os.close(fd)
            return True
        except OSError as e:
            if e.errno != errno.EEXIST:
                raise
            try:
                with open(lock_file, "r") as stream:
                    pid = int(stream.readline())
                pi = ProcInfo(pid)
                age = (time.time() - os.stat(lock_file)[stat.ST_MTIME])
                if age / max_delay > 1:
                    raise UserWarning("Command '%s' (pid %s) has locked the "
                                      "file '%s' for %s minutes"
                                      % (pi.name(), pid, lock_file, age/60))
            except UserWarning:
                raise
            except NoSuchProcess:
                # stale lock: its owner is gone
                try:
                    os.remove(lock_file)
                except OSError as remove_error:
                    # another process may have removed it first
                    if remove_error.errno != errno.ENOENT:
                        raise
            except Exception:
                # The try block is not essential. can be skipped.
                # Note: ProcInfo object is only available for linux
                # process information are not accessible...
                # or lock_file is no more present...
                pass
        count -= 1
        if count:
            # no point in sleeping when no attempt is left
            time.sleep(delay)
    raise Exception('Unable to acquire %s' % lock_file)
283
def release_lock(lock_file):
    """Release a lock represented by a file on the file system.

    :type lock_file: str
    :param lock_file: path of the lock file, which is deleted

    :raise OSError: if the file cannot be removed (e.g. it does not exist)
    """
    os.unlink(lock_file)
287 288
class ProgressBar(object):
    """A simple text progression bar.

    The bar is redrawn in place (each write starts with a carriage return)
    on the given stream. An optional free text, set through the `text`
    property, is displayed after the bar.
    """

    def __init__(self, nbops, size=20, stream=sys.stdout, title=''):
        """Initialize the bar.

        :param nbops: total number of operations, i.e. the value of the
          cursor for which the bar is full

        :param size: width of the bar, in characters

        :param stream: file-like object the bar is written to

        :type title: str
        :param title: optional label displayed before the bar
        """
        if title:
            # the title becomes part of a format string: a literal '%' in it
            # must be doubled or rendering the bar would fail
            self._fstr = '\r%s [%%-%ss]' % (title.replace('%', '%%'),
                                            int(size))
        else:
            self._fstr = '\r[%%-%ss]' % int(size)
        self._stream = stream
        self._total = nbops
        self._size = size
        self._current = 0
        self._progress = 0
        self._current_text = None
        self._last_text_write_size = 0

    def _get_text(self):
        return self._current_text

    def _set_text(self, text=None):
        # only redraw when the text actually changes
        if text != self._current_text:
            self._current_text = text
            self.refresh()

    def _del_text(self):
        self.text = None

    text = property(_get_text, _set_text, _del_text)

    def update(self, offset=1, exact=False):
        """Move FORWARD to new cursor position (cursor will never go backward).

        :offset: number of operations; its meaning depends on `exact`

        :exact:

          - False: `offset` is relative to the current cursor position
          - True: `offset` is an absolute position

        The display is only refreshed when the number of drawn marks grows.
        """
        if exact:
            self._current = offset
        else:
            self._current += offset

        if self._total:
            progress = int((float(self._current) / float(self._total))
                           * self._size)
        else:
            # nothing to do at all: show a full bar instead of dividing by 0
            progress = int(self._size)
        if progress > self._progress:
            self._progress = progress
            self.refresh()

    def refresh(self):
        """Refresh the progression bar display."""
        self._stream.write(self._fstr % ('.' * min(self._progress, self._size)))
        if self._last_text_write_size or self._current_text:
            # pad to the width of the previously written text so that a
            # shorter text fully overwrites the longer one
            template = ' %%-%is' % (self._last_text_write_size)
            text = self._current_text
            if text is None:
                text = ''
            self._stream.write(template % text)
            self._last_text_write_size = len(text.rstrip())
        self._stream.flush()

    def finish(self):
        """Terminate the display by moving to a new line."""
        self._stream.write('\n')
        self._stream.flush()
354 355
class DummyProgressBar(object):
    """Do-nothing stand-in exposing the public interface of `ProgressBar`.

    Every method is a no-op; ``text`` is a plain attribute that can be set
    and read back.
    """
    __slots__ = ('text',)

    def refresh(self):
        """Do nothing."""
        pass

    def update(self, offset=1, exact=False):
        """Do nothing; accepts the same arguments as `ProgressBar.update`."""
        pass

    def finish(self):
        """Do nothing."""
        pass
# sentinel telling "argument not given" apart from any real value
_MARKER = object()


class progress(object):
    """Context manager providing a progress bar for the duration of a block.

    ``with progress(...) as pb`` binds a `ProgressBar` when `enabled` is true
    and a `DummyProgressBar` otherwise; ``finish()`` is called on it when the
    block is left.
    """

    def __init__(self, nbops=_MARKER, size=_MARKER, stream=_MARKER, title=_MARKER, enabled=True):
        """Store the settings; the bar itself is only built on entry.

        `nbops`, `size`, `stream` and `title` are forwarded to `ProgressBar`
        when given; omitted ones are not passed at all.
        """
        self.enabled = enabled
        self.nbops = nbops
        self.size = size
        self.stream = stream
        self.title = title

    def __enter__(self):
        """Build the bar, keep it as ``pb`` and return it."""
        if not self.enabled:
            self.pb = DummyProgressBar()
            return self.pb
        given = [(attr, getattr(self, attr))
                 for attr in ('nbops', 'size', 'stream', 'title')]
        kwargs = dict((attr, value) for attr, value in given
                      if value is not _MARKER)
        self.pb = ProgressBar(**kwargs)
        return self.pb

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Finish the bar; exceptions raised in the block are not suppressed."""
        self.pb.finish()
391
class RawInput(object):
    """Ask questions interactively and map the answers to known options."""

    def __init__(self, input=None, printer=None):
        """Configure how the user is prompted and informed.

        :param input:
          callable taking the prompt and returning the answer; defaults to
          `raw_input`

        :param printer:
          callable receiving error messages; when omitted they are printed
        """
        self._input = input or raw_input
        self._print = printer

    def ask(self, question, options, default):
        """Ask `question` until one of `options` is designated (3 tries).

        An answer is matched case-insensitively: an answer equal to exactly
        one option selects it, otherwise it must be the prefix of exactly one
        option. An empty answer selects `default`.

        :type question: str
        :param question: text displayed before the list of choices

        :param options: sequence of non-empty option strings

        :param default: option returned on empty answer, must be in `options`

        :return: the selected option, as spelled in `options`

        :raise Exception: if no usable answer was given after 3 tries
        """
        assert default in options
        choices = []
        for option in options:
            # the default option is the one with a capitalized first letter
            if option == default:
                label = option[0].upper()
            else:
                label = option[0].lower()
            if len(option) > 1:
                label += '(%s)' % option[1:].lower()
            choices.append((option, label))
        prompt = "%s [%s]: " % (question,
                                '/'.join([label for _, label in choices]))
        tries = 3
        while tries > 0:
            answer = self._input(prompt).strip().lower()
            if not answer:
                return default
            # a full match wins even when it also prefixes another option
            # (e.g. 'no' among 'no' and 'none')
            exact = [option for option, _ in choices
                     if option.lower() == answer]
            if len(exact) == 1:
                return exact[0]
            possible = [option for option, _ in choices
                        if option.lower().startswith(answer)]
            if len(possible) == 1:
                return possible[0]
            elif len(possible) == 0:
                msg = '%s is not an option.' % answer
            else:
                msg = ('%s is an ambiguous answer, do you mean %s ?' % (
                    answer, ' or '.join(possible)))
            if self._print:
                self._print(msg)
            else:
                print(msg)
            tries -= 1
        raise Exception('unable to get a sensible answer')

    def confirm(self, question, default_is_yes=True):
        """Ask a yes/no `question`.

        :param default_is_yes: whether an empty answer means yes

        :rtype: bool
        :return: True when the answer is yes
        """
        default = 'y' if default_is_yes else 'n'
        answer = self.ask(question, ('y', 'n'), default)
        return answer == 'y'
# Ready-to-use module-level prompter built with the defaults: answers are
# read with `raw_input` and error messages are printed.
ASK = RawInput()
def getlogin():
    """Return the login name of the user running the current process.

    os.getlogin() is deliberately avoided because of strange tty / stdin
    problems (man 3 getlogin). Another solution would be to use $LOGNAME,
    $USER or $USERNAME.

    :rtype: str
    :return:
      on win32 the value of the USERNAME environment variable, elsewhere the
      name registered in the password database for the current user id
    """
    if sys.platform == 'win32':
        return os.environ['USERNAME']
    import pwd  # Platforms: Unix
    return pwd.getpwuid(os.getuid()).pw_name
450
def generate_password(length=8, vocab=string.ascii_letters + string.digits):
    """dumb password generation function

    :type length: int
    :param length: number of characters of the password

    :type vocab: str
    :param vocab: characters the password is drawn from (with replacement)

    :rtype: str
    :return: a string of `length` characters picked at random in `vocab`
    """
    # draw from the operating system's randomness source rather than the
    # predictable default generator, since the result is meant to be a secret
    chooser = random.SystemRandom()
    return ''.join([chooser.choice(vocab) for _ in range(length)])
457