Source code for rickshaw.server

"""The asynchronous rickshaw server that communicates with scheduling queues and
provides randomly generated input files.
"""
import sys
import json
import socket
import asyncio
import concurrent.futures
from argparse import ArgumentParser

import docker
import websockets

from rickshaw.docker_scheduler import DockerScheduler
from rickshaw.server_scheduler import ServerScheduler
import rickshaw.generate as generate


# Module-wide queue of outbound messages. In this file gather_annotations()
# puts JSON-encoded strings on it and websocket_handler() drains it through
# get_send_data() and writes each item to the websocket.
SEND_QUEUE = asyncio.Queue()

def all_archetypes():
    """Return the union of every archetype collection in ``rickshaw.generate``.

    Combines ``generate.DEFAULT_SOURCES``, ``generate.DEFAULT_SINKS`` and each
    value of ``generate.NICHE_ARCHETYPES`` into one new set. The inputs are
    assumed to be sets (they are combined with set operations) and are not
    modified.
    """
    niche_sets = generate.NICHE_ARCHETYPES.values()
    return generate.DEFAULT_SOURCES.union(generate.DEFAULT_SINKS, *niche_sets)
async def gather_annotations(scheduler, frequency=0.001):
    """Request agent annotations until every known archetype has an entry.

    For each archetype returned by ``all_archetypes()`` that is not yet a key
    of ``generate.ANNOTATIONS``, a JSON ``agent_annotations`` request is put
    on ``SEND_QUEUE``. Once nothing is missing, a ``shutdown`` event is queued
    and ``scheduler.gathered_annotations`` is set to True.

    Parameters
    ----------
    scheduler : object
        Object whose ``gathered_annotations`` attribute is set to True when
        gathering is complete.
    frequency : float, optional
        Seconds to sleep after queueing a batch of requests. While earlier
        messages are still waiting in ``SEND_QUEUE`` the coroutine instead
        backs off for ``min(frequency*1e3, 1.0)`` seconds.
    """
    backoff = min(frequency*1e3, 1.0)
    all_arches = all_archetypes()
    # Loop on what is actually missing. A proper-subset test
    # (``current < all_arches``) turns False as soon as ANNOTATIONS holds any
    # key outside ``all_arches``, which would end gathering prematurely.
    missing = all_arches.difference(generate.ANNOTATIONS)
    while missing:
        if SEND_QUEUE.qsize() > 0:
            # Earlier messages have not been sent yet; do not queue duplicates.
            await asyncio.sleep(backoff)
        else:
            for arche in missing:
                msg = {'event': 'agent_annotations', 'params': {'spec': arche}}
                await SEND_QUEUE.put(json.dumps(msg))
            await asyncio.sleep(frequency)
        missing = all_arches.difference(generate.ANNOTATIONS)
    await SEND_QUEUE.put('{"event": "shutdown", "params": {"when": "now"}}')
    scheduler.gathered_annotations = True
async def get_send_data():
    """Wait for and return the next outbound item from ``SEND_QUEUE``."""
    return await SEND_QUEUE.get()
async def queue_message_action(message):
    """Decode one received JSON message and act on its ``event`` field.

    An ``agent_annotations`` event stores the message's ``data`` entry in
    ``generate.ANNOTATIONS`` under ``params['spec']``. Any other event is
    only reported on stderr.

    Parameters
    ----------
    message : str
        JSON text of an object with an ``event`` key and, optionally,
        ``params`` and ``data`` keys.
    """
    payload = json.loads(message)
    params = payload.get("params", {})
    event_name = payload["event"]
    if event_name != 'agent_annotations':
        print("ignoring received " + event_name + " event", file=sys.stderr)
        return
    spec = params['spec']
    print('received agent annotations for ' + spec, file=sys.stderr)
    generate.ANNOTATIONS[spec] = payload['data']
async def websocket_handler(websocket, scheduler):
    """Send and receive data over a websocket until annotations are gathered.

    On every pass a receive on the websocket and a ``get_send_data()`` call
    are started together; whichever finishes first is handled and the other
    is cancelled. Received messages go to ``queue_message_action``; items
    taken from the send queue are written to the websocket.

    Parameters
    ----------
    websocket : object
        Connection providing awaitable ``recv()`` and ``send(message)``.
    scheduler : object
        The loop runs while ``scheduler.gathered_annotations`` is falsy.
    """
    while not scheduler.gathered_annotations:
        incoming = asyncio.ensure_future(websocket.recv())
        outgoing = asyncio.ensure_future(get_send_data())
        finished, _ = await asyncio.wait(
            [incoming, outgoing], return_when=asyncio.FIRST_COMPLETED)

        # handle incoming data first
        if incoming not in finished:
            incoming.cancel()
        else:
            await queue_message_action(incoming.result())

        # then handle outgoing data
        if outgoing not in finished:
            outgoing.cancel()
        else:
            await websocket.send(outgoing.result())
async def websocket_client(port, scheduler, frequency=1.0):
    """Connect to the cyclus server websocket and run the handler on it.

    Waits until ``scheduler.cyclus_server_ready`` is truthy, then connects to
    ``ws://<scheduler.cyclus_server_host>:<port>`` and runs
    ``websocket_handler`` on the connection. When the handler is done,
    ``scheduler.stop_cyclus_server()`` is called.

    Parameters
    ----------
    port : int
        Port of the cyclus server websocket.
    scheduler : object
        Provides ``cyclus_server_ready``, ``cyclus_server_host`` and
        ``stop_cyclus_server()``.
    frequency : float, optional
        Seconds between polls of ``cyclus_server_ready`` and between
        connection retries.

    Raises
    ------
    Exception
        The eleventh consecutive failure is re-raised unchanged.
    """
    while not scheduler.cyclus_server_ready:
        await asyncio.sleep(frequency)
    url = 'ws://{}:{}'.format(scheduler.cyclus_server_host, port)
    n = 0
    connected = False
    while not connected:
        try:
            async with websockets.connect(url) as websocket:
                connected = True
                print("connected to cyclus server websocket")
                await websocket_handler(websocket, scheduler)
        except Exception:
            # NOTE(review): this also catches errors raised by
            # websocket_handler after a successful connection; `connected`
            # is then already True, so the loop ends instead of reconnecting.
            n += 1
            if n > 10:
                raise
            print("failed to connect to websocket, retrying {0}/10".format(n))
            if not connected:
                # Pause before the next attempt; retrying immediately would
                # exhaust every attempt within milliseconds while the server
                # may still be starting up.
                await asyncio.sleep(frequency)
    scheduler.stop_cyclus_server()
async def start_annotations_server(loop, executor, scheduler):
    """Run ``scheduler.start_annotations_server`` in ``executor`` and wait for it.

    Parameters
    ----------
    loop : asyncio.AbstractEventLoop
        Event loop used to dispatch the call to the executor.
    executor : concurrent.futures.Executor
        Executor in which the blocking start-up call runs.
    scheduler : object
        Provides the blocking ``start_annotations_server()`` callable.

    Raises
    ------
    Exception
        Whatever ``scheduler.start_annotations_server()`` raises.
    """
    # Await the future itself rather than asyncio.wait([...]): wait() only
    # reports completion and never re-raises, so a failed start-up would
    # otherwise go unnoticed by the caller.
    await loop.run_in_executor(executor, scheduler.start_annotations_server)
async def schedule_sims(scheduler, frequency=0.001):
    """Load jobs into the hopper, as needed.

    Sleeps until ``scheduler.gathered_annotations`` is truthy, then loops
    forever: asks the scheduler how many more jobs it wants, generates that
    many simulations and hands each one to ``scheduler.schedule``.

    Parameters
    ----------
    scheduler : object
        Provides ``gathered_annotations``, ``want_n_more_jobs()`` and
        ``schedule(sim)``.
    frequency : float, optional
        Base polling interval; the actual sleep between polls is
        ``min(frequency*1e3, 1.0)`` seconds.
    """
    freq = min(frequency*1e3, 1.0)
    while not scheduler.gathered_annotations:
        await asyncio.sleep(freq)
    while True:
        n = scheduler.want_n_more_jobs()
        if n == 0:
            await asyncio.sleep(freq)
            continue
        for _ in range(n):
            # `generate` is the rickshaw.generate module, so calling it
            # directly raises TypeError. NOTE(review): assumes the module
            # exposes a generate() function that builds one simulation --
            # confirm against rickshaw.generate.
            sim = generate.generate()
            scheduler.schedule(sim)


def _start_debug(loop):
    """Turn on debug logging and put the event loop ``loop`` in debug mode."""
    import logging
    logging.basicConfig(level=logging.DEBUG)
    # The 'websockets.server' logger is limited to errors and given its own
    # stream handler.
    logger = logging.getLogger('websockets.server')
    logger.setLevel(logging.ERROR)
    logger.addHandler(logging.StreamHandler())
    loop.set_debug(True)


def _find_open_port(host, port):
    """Return the first port at or above ``port`` that can be bound on ``host``.

    Each candidate is probed by binding a throw-away TCP socket. Only an
    "address already in use" error advances to the next port; any other
    socket error is re-raised.
    """
    # Local import keeps this block self-contained; errno.EADDRINUSE is the
    # platform's own value (98 on Linux, different elsewhere).
    import errno
    while True:
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as probe:
            try:
                probe.bind((host, port))
            except socket.error as err:
                if err.errno != errno.EADDRINUSE:
                    raise
                port += 1
            else:
                return port
def make_parser():
    """Build the command-line argument parser for the rickshaw server.

    Returns
    -------
    argparse.ArgumentParser
        Parser with the ``--debug``, ``--host``, ``-p/--port``,
        ``-n/--nthreads`` and ``-s/--swarm`` options.
    """
    parser = ArgumentParser("rickshaw-server",
                            description="Rickshaw Server CLI")
    # (flags, keyword arguments) for each option, in registration order.
    option_table = (
        (('--debug',),
         dict(action='store_true', default=False, dest='debug',
              help="runs the server in debug mode.")),
        (('--host',),
         dict(dest='host', default='localhost',
              help='hostname to run the server on')),
        (('-p', '--port'),
         dict(dest='port', type=int, default=4242,
              help='port to run the server on')),
        (('-n', '--nthreads'),
         dict(type=int, dest='nthreads', default=4,
              help='Maximum number of thread workers to run with.')),
        (('-s', '--swarm'),
         dict(action='store_true', dest='swarm', default=False,
              help='Run the server in swarm mode.')),
    )
    for flags, options in option_table:
        parser.add_argument(*flags, **options)
    return parser
def main(args=None):
    """Command-line entry point for the rickshaw server.

    Parses ``args``, creates the thread pool and the scheduler
    (``ServerScheduler`` with ``--swarm``, otherwise ``DockerScheduler``),
    then runs the websocket client, annotation gathering, annotations-server
    start-up and simulation scheduling coroutines together on the event loop.
    The loop is closed when they finish or fail.

    Parameters
    ----------
    args : list of str, optional
        Command-line arguments, passed straight to
        ``ArgumentParser.parse_args``.
    """
    opts = make_parser().parse_args(args=args)

    # start up tasks
    pool = concurrent.futures.ThreadPoolExecutor(max_workers=opts.nthreads)
    event_loop = asyncio.get_event_loop()
    if not opts.swarm:
        print('started in docker mode')
        scheduler = DockerScheduler(debug=opts.debug)
    else:
        print('started in swarm mode')
        scheduler = ServerScheduler(debug=opts.debug)
    if opts.debug:
        _start_debug(event_loop)
    print("serving rickshaw at http://{}:{}".format(opts.host, opts.port))

    # run the loop!
    try:
        coroutines = (
            websocket_client(opts.port, scheduler),
            gather_annotations(scheduler),
            start_annotations_server(event_loop, pool, scheduler),
            schedule_sims(scheduler),
        )
        tasks = [asyncio.ensure_future(coro) for coro in coroutines]
        event_loop.run_until_complete(asyncio.gather(*tasks))
    finally:
        if not event_loop.is_closed():
            event_loop.close()


if __name__ == '__main__':
    main()