Source code for fixie_batch.simulations

"""Tools for spawning, canceling, and querying cyclus simulations. Simulations are
spawned detached from the parent process. All information about running processes
is stored on the file system. These tools should be robust to the server or other
managing process going down.
"""
import os
import json
import time
import signal
from collections.abc import Mapping, Set

from pprintpp import pformat
from lazyasd import lazyobject
from fixie import (ENV, verify_user, next_jobid, detached_call,
    register_job_alias, jobids_from_alias, jobids_with_name, default_path)

from fixie_batch.environ import QUEUE_STATUSES


SPAWN_XSH = """#!/usr/bin/env xonsh
import os
import json
import time


def queued_ids():
    # sorted jobids that are in the queue
    qids = [int(j.name[:-5]) for j in os.scandir('{{FIXIE_QUEUED_JOBS_DIR}}')]
    qids.sort()
    return qids


# variables from calling process
simulation = {{simulation}}

# derived variables
out = '{{FIXIE_SIMS_DIR}}/{{jobid}}.h5'
inp = json.dumps(simulation, sort_keys=True)

# make a jobs file and add it to the queue
job = {
    'interactive': {{interactive}},
    'jobid': {{jobid}},
    'notify': {{notify}},
    'outfile': out,
    'path': '{{path}}',
    'pid': os.getpid(),
    'permissions': {{permissions}},
    'post': {{post}},
    'project': '{{project}}',
    'queue_starttime': time.time(),
    'simulation': simulation,
    'user': '{{user}}',
    }
with open('{{FIXIE_QUEUED_JOBS_DIR}}/{{jobid}}.json', 'w') as f:
    json.dump(job, f, sort_keys=True, indent=1)

# wait for the queue to be free, and then move the jobs file
qids = queued_ids()
while {{jobid}} not in qids[:{{FIXIE_NJOBS}}]:
    if {{jobid}} not in qids:
        # job cancels itself if it isn't in the queue at all!
        err = 'Job canceled itself after jobfile was removed from queue'
        job.update({'returncode': 1,
                    'out': None,
                    'err': err,
                    'queue_endtime': time.time()})
        with open('{{FIXIE_CANCELED_JOBS_DIR}}/{{jobid}}.json', 'w') as f:
            json.dump(job, f, sort_keys=True, indent=1)
        import sys
        sys.exit(err)
    time.sleep(0.1)
    qids = queued_ids()
job['queue_endtime'] = time.time()
os.remove('{{FIXIE_QUEUED_JOBS_DIR}}/{{jobid}}.json')
with open('{{FIXIE_RUNNING_JOBS_DIR}}/{{jobid}}.json', 'w') as f:
    json.dump(job, f, sort_keys=True, indent=1)

# make a pending path file, to signal that a path is available.
pending_path = {
    'file': out,
    'holding': {{FIXIE_HOLDING_TIME}},
    'jobid': {{jobid}},
    'path': '{{path}}',
    'project': '{{project}}',
    'user': '{{user}}',
    }
with open('{{FIXIE_PATHS_DIR}}/{{user}}-{{jobid}}-pending-path.json', 'w') as f:
    json.dump(pending_path, f, sort_keys=True, indent=1)

# run cyclus itself
with ${...}.swap(RAISE_SUBPROC_ERROR=False):
    proc = !(cyclus -f json -o @(out) @(inp))

# update and swap job file
job.update({
    'returncode': proc.returncode,
    'starttime': proc.starttime,
    'endtime': proc.endtime,
    'out': proc.out,
    'err': proc.err,
    })
jobdir = '{{FIXIE_COMPLETED_JOBS_DIR}}' if proc else '{{FIXIE_FAILED_JOBS_DIR}}'
os.remove('{{FIXIE_RUNNING_JOBS_DIR}}/{{jobid}}.json')
with open(jobdir + '/{{jobid}}.json', 'w') as f:
    json.dump(job, f, sort_keys=True, indent=1)
"""


@lazyobject
def SPAWN_TEMPLATE():
    """A jinja template for spawning simulations."""
    from jinja2 import Template
    return Template(SPAWN_XSH)


[docs]def spawn(simulation, user, token, name='', project='', path='', permissions='public', post=(), notify=(), interactive=False, return_pid=False): """Spawning simulations let’s the batch execution service know to run a simulation as soon as possible. Parameters ---------- simulation : dict or string Cyclus simulation, currently only dict methods are supported user : str Name of the user token : str Credential token for the user name : str, optional Alias for simulation, default '' project : str, optional Name of the project, default '' path : str, optional Path to suggest for output file. An empty string will apply a default path name, which depends on the simulation name, the project name, and the jobid. permissions : str or list of str, optional "public" (default), "private", or a list of users. Currently only public permissions are supported. post : list, optional Any post processing activities, not currently supported notify : list, optional Any notifications to register, not currently supported interactive : bool, optional True or False (default), not currently supported. return_pid : bool, optional Whether or not to return the PID of the detached child process. Default False, this is mostly for testing. Returns ------- jobid : int Unique job id of this run, this is negative if job could not be spawned. status : bool Whether run was spawned successfully, message : str Message about status pid : int, if return_pid is True Child process id. """ # validate all inputs if not isinstance(simulation, Mapping): return -1, False, 'Simulation must be dict (i.e. mapping object) currently.' if permissions != 'public': return -1, False, 'Non-public permissions are not supported yet.' if post: return -1, False, 'Post-processing activities are not supported yet.' if notify: return -1, False, 'Notifications are not supported yet.' if interactive: return -1, False, 'Interactive simulation spawning is not supported yet.' valid, msg, status = verify_user(user, token) if not status or not valid: return -1, False, msg # now we can actually spawn the simulation jobid = next_jobid() path = default_path(path, name=name, project=project, jobid=jobid) holding = "'inf'" if ENV['FIXIE_HOLDING_TIME'] == float('inf') \ else ENV['FIXIE_HOLDING_TIME'] ctx = dict( FIXIE_CANCELED_JOBS_DIR=ENV['FIXIE_CANCELED_JOBS_DIR'], FIXIE_COMPLETED_JOBS_DIR=ENV['FIXIE_COMPLETED_JOBS_DIR'], FIXIE_FAILED_JOBS_DIR=ENV['FIXIE_FAILED_JOBS_DIR'], FIXIE_HOLDING_TIME=holding, FIXIE_NJOBS=ENV['FIXIE_NJOBS'], FIXIE_PATHS_DIR=ENV['FIXIE_PATHS_DIR'], FIXIE_QUEUED_JOBS_DIR=ENV['FIXIE_QUEUED_JOBS_DIR'], FIXIE_RUNNING_JOBS_DIR=ENV['FIXIE_RUNNING_JOBS_DIR'], FIXIE_SIMS_DIR=ENV['FIXIE_SIMS_DIR'], interactive=interactive, jobid=jobid, name=name, notify=repr(notify), path=path, permissions=repr(permissions), post=repr(post), project=project, simulation=pformat(simulation), user=user, ) script = SPAWN_TEMPLATE.render(ctx) cmd = ['xonsh', '-c', script] pid = detached_call(cmd) if name or project: register_job_alias(jobid, user, name=name, project=project) rtn = (jobid, True, 'Simulation spawned') if return_pid: rtn += (pid,) return rtn
STATUS_IDS = {} t = """ def {status}_ids(): "Set of {status} jobids." ids = {{int(j.name[:-5]) for j in os.scandir(ENV['FIXIE_{STATUS}_JOBS_DIR'])}} return ids STATUS_IDS['{status}'] = {status}_ids """ g = globals() for status in QUEUE_STATUSES: exec(t.format(status=status, STATUS=status.upper()), g) del g, t, status
[docs]def cancel(job, user, token, project=''): """Cancels a job that is queued or running. Parameters ---------- job : int or str If an integer, this is the jobid. If it is a string, it is interpreted as a job name, and a jobid is looked up via the aliases cache. user : str Name of the user token : str Credential token for the user project : str, optional Name of the project, default '', only used with alias lookup. Returns ------- jobid : int Unique job id of the canceled run, this is negative if a unique jobid could not be found. status : bool Whether run was successfully canceled. message : str Message about status """ # verify users valid, msg, status = verify_user(user, token) if not status or not valid: return -1, False, msg # get jobids qids = queued_ids() rids = running_ids() qrids = qids | rids if isinstance(job, str): jobids = jobids_from_alias(user, job, project=project) else: jobids = {job} # check uniqueness and get jobid current = jobids & qrids if len(current) > 1: msg = ('Too many jobids found! {user} in project {project!r} is ' 'has the folllowing jobids queued or running for {name!r}: ' '{jobs}') msg = msg.format(user=user, project=project, name=job, jobs=', '.join(map(str, current))) return -1, False, msg elif len(current) == 0: return -1, False, 'No running or queued job found' else: jobid = jobids.pop() # get the job data, if we can find it base = str(jobid) + '.json' status_dirs = [ENV['FIXIE_QUEUED_JOBS_DIR'], ENV['FIXIE_RUNNING_JOBS_DIR']] for d in status_dirs: jobfile = os.path.join(d, base) if os.path.exists(jobfile): # can't just do a normal read here, since we may be trying to cancel # a job that hasn't been fully written yet. s = '' with open(jobfile) as f: while not s: s = f.read() data = json.loads(s) break else: return -1, False, 'Job file could not be cound in queue or running.' # kill the job and transfer job to canceled dir if user != data['user']: return jobid, False, 'User did not start job, cannot cancel it!' os.kill(data['pid'], signal.SIGTERM) os.remove(jobfile) if 'queued_endtime' not in data: data['queued_endtime'] = time.time() if 'starttime' not in data: data['starttime'] = time.time() data.update({ 'returncode': 1, 'endtime': time.time(), 'out': None, 'err': 'Job was canceled externally', }) jobfile = os.path.join(ENV['FIXIE_CANCELED_JOBS_DIR'], base) with open(jobfile, 'w') as f: json.dump(data, f, sort_keys=True, indent=1) return jobid, True, 'Job canceled'
def _convert_to_statuses_set(statuses): """Returns a set of valid statuses AND an error message. On failure, the set of statues will be None. """ if isinstance(statuses, str): statuses = {statuses} s = set() for status in statuses: if status == 'all': s |= QUEUE_STATUSES elif not isinstance(status, str): return None, 'status must be a string, got ' + repr(status) elif status not in QUEUE_STATUSES: return None, status + ' is not a valid status' else: s.add(status) return s, '' def _ensure_set_of_str_or_none(x): """Returns an object that is a set of str or None AND an error message. On failure, the error message will be non-empty. """ if x is None: return x, '' elif isinstance(x, str): return set([x]), '' elif isinstance(x, Set): for e in x: if not isinstance(e, str): return None, '{0!r} is not a string'.format(e) return x, '' else: y = set() for e in x: if not isinstance(e, str): return None, '{0!r} is not a string'.format(e) y.add(e) return y, '' def _load_job(jobid, hint): """Loads a job from a jobid and a hint about which status queue it might be in. Returns a job dict or None (if the job could not be found), and the status the job was found in. """ t = 'FIXIE_{0}_JOBS_DIR' base = str(jobid) + '.json' # first try the hint jobfile = os.path.join(ENV[t.format(hint.upper())], base) if os.path.exists(jobfile): with open(jobfile) as f: job = json.load(f) return job, hint # couldn't find in the hint, search other statuses. for status in QUEUE_STATUSES: jobfile = os.path.join(ENV[t.format(status.upper())], base) if os.path.exists(jobfile): with open(jobfile) as f: job = json.load(f) return job, status else: return None, None
[docs]def query(statuses='all', users=None, jobs=None, projects=None): """Returns the state of the jobs, filtered as approriate. Parameters ---------- statuses : str or set of str, optional Which queue statuses we should pull jobs from. If 'all', every status queue is searched. If a set of str is provided, the statuses are ORed together. users : str, set of str, or None, optional User name(s) to filter on. If None, all user names are used. User names are ORed together, if multiple are provided. jobs : int, str, set of ints & strs, or None, optional The jobids and job names to filter on, if provided. If None, filtering on job id/name is not done. These are ORed together. projects : str, set of str, or None, optional Project names to filer on, if not None. These are ORed together. Returns ------- data : list of dicts or None The list of all jobs found. None if status is False. status : bool Whether or not the query was successful. message : str Message related to the status of the query """ users, msg = _ensure_set_of_str_or_none(users) if msg: return None, False, msg projects, msg = _ensure_set_of_str_or_none(projects) if msg: return None, False, msg # get job ids from statuses statuses, msg = _convert_to_statuses_set(statuses) if statuses is None: return None, False, msg sids = set() # all ids joined together ids_to_status = {} # ids mapped to the status found for status in statuses: s = STATUS_IDS[status]() sids |= s ids_to_status.update({i: status for i in s}) # get job ids from jobs if jobs is None: # since jobs was not provided, we want the maximal set # this happens to be at most the job ids we have already found jids = sids elif isinstance(jobs, int): jids = jobs = set([jobs]) else: if isinstance(jobs, str): jobs = set([jobs]) jids = set() for job in jobs: if isinstance(job, int): jids.add(job) elif isinstance(job, str): jids |= jobids_with_name(job) else: msg = 'type of job not reconized: {0} {1}' return None, False, msg.format(job, type(job)) # Now load the jobfiles and filter based on user and project data = [] jobids = sids & jids for jobid in sorted(jobids): job, status = _load_job(jobid, ids_to_status[jobid]) if job is None: continue if users is not None and job['user'] not in users: continue if projects is not None and job['project'] not in projects: continue job['status'] = status data.append(job) return data, True, 'Jobs queried'