Source code for rickshaw.docker_scheduler
"""Scheduler for running via docker."""
import time
import docker
try:
from pprintpp import pprint
except ImportError:
from pprint import pprint
from rickshaw.scheduler import Scheduler
[docs]class DockerScheduler(Scheduler):
"""A base docker scheduler"""
def __init__(self, debug=False, **kwargs):
self.client = docker.from_env()
try:
try_test = self.client.nodes.list()
except:
print( '***************************************************'*2+'\n'+
'This container probably failed to connect to docker. '+
'Remember to give this container/service access to the '+
'docker socket on the host machine with the following argument:\n '+
'-v /var/run/docker.sock:/var/run/docker.sock\n' +
'***************************************************'*2)
self.cyclus_container = None
self.server_tag = "ergs/cyclus-server-dev"
if debug:
self.server_cmd = "--debug"
else:
self.server_cmd = ""
self.cyclus_server_name = "rickshaw_metadata_server"
self.cyclus_server_host = None
self.cyclus_server_ready = False
self.gathered_annotations = False
self._have_swarm = False
self._find_ncpu()
def _find_ncpu(self):
try:
# get NCPUs for swarm
ncpu = 0.0
for node in self.client.nodes.list():
ncpu += node.attrs['Description']['Resources']['NanoCPUs'] * 1e-9
self._have_swarm = True
except docker.errors.APIError:
print('except')
# get NCPUs for local host
ncpu = self.client.info()['NCPU']
self._have_swarm = False
self.ncpu = int(ncpu)
def __del__(self):
self.stop_cyclus_server()
[docs] def start_cyclus_server(self):
"""Starts up a cyclus server at a remote location."""
print("starting cyclus server")
cc = self.cyclus_container = self.client.containers.run(self.server_tag,
self.server_cmd,
ports={'4242/tcp': ('127.0.0.1', 4242)},
name=self.cyclus_server_name,
publish_all_ports=True,
detach=True)
host = self.client.networks.get('bridge').attrs['Containers'][cc.id]['IPv4Address']
if '/' in host:
self.cyclus_server_host, _, _ = host.rpartition('/')
else:
self.cyclus_server_host = host
print("cyclus server started at host " + host)
time.sleep(3)
self.cyclus_server_ready = True
for line in cc.logs(stream=True):
print('[cyclus] ' + line.decode(), end='')
[docs] def stop_cyclus_server(self):
"""Stops the cyclus server running in a remote location"""
if self.cyclus_container is not None:
self.cyclus_container.stop()
self.cyclus_container = None
self.cyclus_server_ready = False
[docs] def queue(self):
"""Obtains the current queue status and retuns the jobs that are scheduled
and status of each job.
"""
return [(c.id, c.status) for c in self.client.containers.list()]
[docs] def schedule(self, sim):
"""Schedules a simulation to be executed."""
print("would have scheduled sim: ", repr(sim))
[docs] def want_n_more_jobs(self):
"""Determine how many more new jobs to schedule."""
n = self.ncpu*2 - len(self.queue())
print("Will want to fill out, " + str(n)+ " jobs")
pprint(self.client.swarm.attrs)
return n