Source code for fixie.tools

"""Various helper tools for fixie services."""
import os
import time
import errno
import subprocess
import multiprocessing
from contextlib import contextmanager

import tornado.gen
import tornado.ioloop
from tornado.httpclient import AsyncHTTPClient
from lazyasd import lazyobject

from fixie.environ import ENV
from fixie.logger import LOGGER
import fixie.jsonutils as json


@tornado.gen.coroutine
def fetch(url, obj):
    """Asynchronously fetches a fixie URL, using the standard fixie
    interface (POST method, fixie JSON utilities). This fetch function
    accepts a Python object, rather than a string, for its body.
    """
    body = json.encode(obj)
    http_client = AsyncHTTPClient()
    response = yield http_client.fetch(url, method='POST', body=body)
    assert response.code == 200
    rtn = json.decode(response.body)
    return rtn


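# Illustrative sketch (not part of the original module): driving fetch()
# from synchronous code via the current tornado IOLoop. The URL and payload
# below are assumptions for demonstration only.
def _example_fetch():
    url = 'http://localhost:8642/verify'  # hypothetical fixie endpoint
    body = {'user': 'inigo', 'token': '42'}
    return tornado.ioloop.IOLoop.current().run_sync(lambda: fetch(url, body))

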
@lazyobject
def CREDS_CACHE():
    from fixie_creds.cache import CACHE
    return CACHE


def verify_user_local(user, token):
    """Verifies a user via the local (in-process) credentialling service."""
    return CREDS_CACHE.verify(user, token)


def verify_user_remote(user, token, base_url):
    """Verifies a user via a remote credentialling service. This runs
    synchronously.
    """
    url = base_url + '/verify'
    body = {'user': user, 'token': token}
    rtn = tornado.ioloop.IOLoop.current().run_sync(lambda: fetch(url, body))
    return rtn['verified'], rtn['message'], rtn['status']


def verify_user(user, token, url=None):
    """Verifies a user/token pair. This happens either locally (if the creds
    cache is available in-process) or remotely (if $FIXIE_CREDS_URL or an
    explicit url was provided).
    """
    url = ENV.get('FIXIE_CREDS_URL', '') if url is None else url
    if url:
        return verify_user_remote(user, token, url)
    else:
        return verify_user_local(user, token)


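# Illustrative sketch (not part of the original module): the local/remote
# dispatch of verify_user(). The user, token, and URL are assumptions for
# demonstration only.
def _example_verify_user():
    # with an explicit url (or $FIXIE_CREDS_URL set), verification is remote
    remote = verify_user('inigo', '42', url='http://localhost:8642')
    # with no url and no $FIXIE_CREDS_URL, it falls back to the in-process
    # CREDS_CACHE, which requires fixie-creds to be installed
    local = verify_user('inigo', '42')
    return remote, local

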
@contextmanager
def flock(filename, timeout=None, sleepfor=0.1, raise_errors=True):
    """A context manager for locking a file via the filesystem. This yields
    the file descriptor of the lockfile. If raise_errors is False and an
    exception would have been raised, a file descriptor of zero is yielded
    instead.
    """
    fd = 0
    lockfile = filename + '.lock'
    t0 = time.time()
    while True:
        try:
            # O_EXCL makes creation of the lockfile atomic
            fd = os.open(lockfile, os.O_CREAT | os.O_EXCL | os.O_RDWR)
            break
        except OSError as e:
            if e.errno != errno.EEXIST:
                if raise_errors:
                    raise
                else:
                    break
            elif timeout is not None and (time.time() - t0) >= timeout:
                if raise_errors:
                    raise TimeoutError(lockfile + " could not be obtained "
                                       "in time.")
                else:
                    break
            time.sleep(sleepfor)
    yield fd
    if fd == 0:
        return
    os.close(fd)
    os.unlink(lockfile)


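# Illustrative sketch (not part of the original module): guarding a
# read-modify-write of a shared file with flock(). The filename is an
# assumption for demonstration only.
def _example_flock(filename='/tmp/fixie-example.txt'):
    with flock(filename, timeout=10.0, raise_errors=False) as lockfd:
        if lockfd == 0:
            return False  # the lock could not be obtained in time
        with open(filename, 'a') as fh:
            fh.write('exclusive access\n')
    return True

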
def next_jobid(timeout=None, sleepfor=0.1, raise_errors=True):
    """Obtains the next jobid from the $FIXIE_JOBID_FILE and increments
    the value in $FIXIE_JOBID_FILE. A None value means that the jobid
    could not be obtained in time.
    """
    f = ENV['FIXIE_JOBID_FILE']
    with flock(f, timeout=timeout, sleepfor=sleepfor,
               raise_errors=raise_errors) as lockfd:
        if lockfd == 0:
            return
        if os.path.isfile(f):
            with open(f) as fh:
                curr = fh.read()
            curr = int(curr.strip() or 0)
        else:
            curr = 0
        inc = str(curr + 1)
        with open(f, 'w') as fh:
            fh.write(inc)
    return curr


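# Illustrative sketch (not part of the original module): next_jobid()
# returns None when the lock on $FIXIE_JOBID_FILE cannot be obtained
# before the timeout, so callers should check for that.
def _example_next_jobid():
    jobid = next_jobid(timeout=1.0, raise_errors=False)
    if jobid is None:
        raise RuntimeError('could not obtain a jobid in time')
    return jobid

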
def register_job_alias(jobid, user, name='', project='', timeout=None,
                       sleepfor=0.1, raise_errors=True):
    """Registers a job id, user, name, and project in the global jobs
    alias cache. Returns whether the registration was successful or not.
    """
    f = ENV['FIXIE_JOB_ALIASES_FILE']
    with flock(f, timeout=timeout, sleepfor=sleepfor,
               raise_errors=raise_errors) as lockfd:
        if lockfd == 0:
            return False
        # obtain the current contents
        if os.path.isfile(f):
            with open(f) as fh:
                s = fh.read()
            if s.strip():
                cache = json.loads(s)
            else:
                cache = {}
        else:
            cache = {}
        # add the entry as appropriate
        if user not in cache:
            cache[user] = {}
        u = cache[user]
        if project not in u:
            u[project] = {}
        p = u[project]
        if name not in p:
            p[name] = set()
        p[name].add(jobid)
        # write the file back out
        with open(f, 'w') as fh:
            json.dump(cache, fh)
    return True


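# Illustrative sketch (not part of the original module): registering an
# alias and reading it back. The jobid, user, name, and project values are
# assumptions for demonstration only.
def _example_aliases():
    ok = register_job_alias(1, 'inigo', name='montoya', project='quest')
    if not ok:
        return set()
    return jobids_from_alias('inigo', name='montoya', project='quest')

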
def jobids_from_alias(user, name='', project='', timeout=None, sleepfor=0.1,
                      raise_errors=True):
    """Obtains a set of job ids from user, name, and project information.
    This looks up information in the global jobs alias cache. Returns a
    set of jobids.
    """
    f = ENV['FIXIE_JOB_ALIASES_FILE']
    with flock(f, timeout=timeout, sleepfor=sleepfor,
               raise_errors=raise_errors) as lockfd:
        if lockfd == 0:
            return set()
        # obtain the current contents
        if os.path.isfile(f):
            with open(f) as fh:
                cache = json.load(fh)
        else:
            return set()
        # look up the entry as appropriate
        if user not in cache:
            return set()
        u = cache[user]
        if project not in u:
            return set()
        p = u[project]
        if name not in p:
            return set()
        return p[name]


def jobids_with_name(name, project='', timeout=None, sleepfor=0.1,
                     raise_errors=True):
    """Obtains a set of job ids across all users and projects that have a
    given name. This looks up information in the global jobs alias cache.
    Returns a set of jobids.
    """
    f = ENV['FIXIE_JOB_ALIASES_FILE']
    with flock(f, timeout=timeout, sleepfor=sleepfor,
               raise_errors=raise_errors) as lockfd:
        if lockfd == 0:
            return set()
        # obtain the current contents
        if os.path.isfile(f):
            with open(f) as fh:
                cache = json.load(fh)
        else:
            return set()
        # collect matching jobids across every user and project
        jobids = set()
        for user_cache in cache.values():
            for project_cache in user_cache.values():
                j = project_cache.get(name, None)
                if j is not None:
                    jobids |= j
    return jobids


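# Illustrative sketch (not part of the original module): finding every job
# registered under the name 'montoya', regardless of user or project.
def _example_jobids_with_name():
    return jobids_with_name('montoya', timeout=1.0, raise_errors=False)

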
def detached_call(args, stdout=None, stderr=None, stdin=None, env=None,
                  **kwargs):
    """Runs a process and detaches it from its parent (i.e. the current
    process). In the parent process, this will return the PID of the child.
    By default, this will redirect all streams to os.devnull. Additionally,
    if an environment is not provided, the current fixie environment is
    passed in. If close_fds is provided, it must be True. All other kwargs
    are passed through to Popen.

    Inspired by detach.call(), Copyright (c) 2014 Ryan Bourgeois.
    """
    env = ENV.detype() if env is None else env
    stdin = os.open(os.devnull, os.O_RDONLY) if stdin is None else stdin
    stdout = os.open(os.devnull, os.O_WRONLY) if stdout is None else stdout
    stderr = os.open(os.devnull, os.O_WRONLY) if stderr is None else stderr
    if not kwargs.get('close_fds', True):
        raise RuntimeError('close_fds must be True.')
    shared_pid = multiprocessing.Value('i', 0)
    pid = os.fork()
    if pid == 0:
        # in the child: start a new session, spawn the grandchild, and
        # report its PID back through shared memory
        os.setsid()
        proc = subprocess.Popen(args, stdout=stdout, stderr=stderr,
                                stdin=stdin, close_fds=True, env=env)
        shared_pid.value = proc.pid
        os._exit(0)
    else:
        # in the parent: reap the intermediate child and recover the
        # grandchild's PID
        os.waitpid(pid, 0)
        child_pid = shared_pid.value
        del shared_pid
        return child_pid


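# Illustrative sketch (not part of the original module): launching a
# command that outlives the current process. The command is an assumption
# for demonstration only.
def _example_detached_call():
    pid = detached_call(['sleep', '10'])
    # the detached process is now a session leader; it is no longer a
    # child of this process, so os.waitpid() cannot be used on it
    return pid

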
def waitpid(pid, timeout=None, sleepfor=0.001, raise_errors=True):
    """Waits for a PID, even if it isn't a child of the current process.
    Returns a boolean flag for whether the waiting was successful or not.
    """
    rtn = False
    t0 = time.time()
    while True:
        try:
            # signal 0 probes for existence without affecting the process
            os.kill(pid, 0)
        except OSError as e:
            # EPERM means the process still exists but belongs to another
            # user; any other error means it has exited
            if e.errno != errno.EPERM:
                rtn = True
                break
        if timeout is not None and (time.time() - t0) >= timeout:
            if raise_errors:
                raise TimeoutError('wait time for PID exceeded')
            else:
                rtn = False
                break
        time.sleep(sleepfor)
    return rtn


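# Illustrative sketch (not part of the original module): pairing
# detached_call() with waitpid(), which polls by PID precisely because the
# detached process is not a child of the current one.
def _example_waitpid():
    pid = detached_call(['sleep', '2'])
    return waitpid(pid, timeout=30.0, raise_errors=False)

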
def default_path(path, name='', project='', jobid=-1, ext='.h5'):
    """Returns a default path name, depending on the specified parameters."""
    if path:
        # deal with non-empty paths
        if not path.startswith('/'):
            path = '/' + path
        return path
    # deal with empty path names
    path = '/'
    if project:
        path += project + '/'
    if name:
        path += name
    elif isinstance(jobid, int):
        if jobid < 0:
            raise ValueError('jobid must be non-negative, got ' + str(jobid))
        path += str(jobid)
    elif isinstance(jobid, str):
        path += jobid
    else:
        msg = 'name ({0!r}) and/or jobid ({1!r}) not recognized'
        raise ValueError(msg.format(name, jobid))
    path += ext
    return path


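# Illustrative sketch (not part of the original module): how default_path()
# composes names; the values are made up and the expected results are shown
# as comments.
def _example_default_path():
    p1 = default_path('', project='quest', jobid=7)  # -> '/quest/7.h5'
    p2 = default_path('', name='montoya')            # -> '/montoya.h5'
    p3 = default_path('results/out.h5')              # -> '/results/out.h5'
    return p1, p2, p3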