diff --git a/api/api.py b/api/api.py
index 77043bf3777c4ee472284487619db93172f680ca..f01c2196f43506f0c4c19a7669e2834546180e0f 100644
--- a/api/api.py
+++ b/api/api.py
@@ -15,9 +15,8 @@ from .handlers.roothandler import RootHandler
 from .handlers.schemahandler import SchemaHandler
 from .handlers.searchhandler import SearchHandler
 from .handlers.userhandler import UserHandler
-from .jobs.handlers import JobsHandler, JobHandler, GearsHandler, GearHandler, RulesHandler
+from .jobs.handlers import BatchHandler, JobsHandler, JobHandler, GearsHandler, GearHandler, RulesHandler
 from .upload import Upload
-
 from . import config
 
 log = config.log
@@ -133,7 +132,6 @@ endpoints = [
         route('/<:[^/]+>/config.json', JobHandler, h='get_config'),
         route('/<:[^/]+>/retry', JobHandler, h='retry', m=['POST']),
     ]),
-
     route('/gears', GearsHandler),
     prefix('/gears', [
         route('/<:[^/]+>', GearHandler),
@@ -144,6 +142,17 @@ endpoints = [
 
     route('/rules', RulesHandler),
 
+    # Batch jobs
+
+    route('/batch', BatchHandler, h='get_all', m=['GET']),
+    route('/batch', BatchHandler, m=['POST']),
+    prefix('/batch', [
+        route('/<:[^/]+>', BatchHandler, h='get', m=['GET']),
+        route('/<:[^/]+>/run', BatchHandler, h='run', m=['POST']),
+        route('/<:[^/]+>/cancel', BatchHandler, h='cancel', m=['POST']),
+    ]),
+
+
     # Devices
 
     route( '/devices', DeviceHandler, h='get_all', m=['GET']),
diff --git a/api/dao/containerstorage.py b/api/dao/containerstorage.py
index 16ad589d4f147a8875424ca2debfecd496adb7a7..c256a05316e7e6f1e5156c9d1105bae785a911d2 100644
--- a/api/dao/containerstorage.py
+++ b/api/dao/containerstorage.py
@@ -52,13 +52,13 @@ class ContainerStorage(object):
         Factory method to aid in the creation of a ContainerStorage
         instance when cont_name is dynamic.
         """
-        if cont_name == 'groups':
+        if cont_name in ['group', 'groups']:
             return GroupStorage()
-        elif cont_name == 'projects':
+        elif cont_name in ['project', 'projects']:
             return ProjectStorage()
-        elif cont_name == 'sessions':
+        elif cont_name in ['session', 'sessions']:
             return SessionStorage()
-        elif cont_name == 'acquisitions':
+        elif cont_name in ['acquisition', 'acquisitions']:
             return AcquisitionStorage()
         else:
             return ContainerStorage(cont_name, use_object_id)
diff --git a/api/jobs/batch.py b/api/jobs/batch.py
new file mode 100644
index 0000000000000000000000000000000000000000..780afe4079c1d101fbe035a7bbc24b20c926b350
--- /dev/null
+++ b/api/jobs/batch.py
@@ -0,0 +1,186 @@
+"""
+Batch
+"""
+import bson
+
+from .. import config
+from ..dao import APINotFoundException
+from ..dao.containerutil import create_filereference_from_dictionary, create_containerreference_from_filereference
+from .jobs import Job
+from .queue import Queue
+from . import gears
+
+log = config.log
+
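+# Legal batch state transitions, keyed new-state -> required current state;
+# update() refuses any transition whose current state does not match.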
+BATCH_JOB_TRANSITIONS = {
+    # To <------- From
+    'launched': 'pending',
+    'cancelled': 'launched'
+}
+
+def get_all(query, projection=None):
+    """
+    Fetch batch objects from the database
+    """
+    return config.db.batch.find(query, projection)
+
+def get(batch_id, projection=None, get_jobs=False):
+    """
+    Fetch batch object by id; optionally replace its job id list with job objects
+    """
+
+    if isinstance(batch_id, str):
+        batch_id = bson.ObjectId(batch_id)
+    batch_job = config.db.batch.find_one({'_id': batch_id}, projection)
+
+    if batch_job is None:
+        raise APINotFoundException('Batch job {} not found.'.format(batch_id))
+
+    if get_jobs:
+        jobs = []
+        for jid in batch_job.get('jobs', []):
+            job = Job.get(jid)
+            jobs.append(job)
+        batch_job['jobs'] = jobs
+
+    return batch_job
+
+def find_matching_conts(gear, containers, container_type):
+    """
+    Given a gear and a list of containers, sort the containers by how their
+    files satisfy the gear's input schema:
+      - some input has no matching file (not matched)
+      - some input has multiple matching files (ambiguous)
+      - every input has exactly one matching file (matched)
+    The categories are checked in that order: a container with two candidate
+    files for one input and none for another is 'not matched', not ambiguous.
+    """
+
+    matched_conts = []
+    not_matched_conts = []
+    ambiguous_conts = []
+
+    for c in containers:
+        files = c.get('files')
+        if files:
+            suggestions = gears.suggest_for_files(gear, files)
+
+            # Determine if any of the inputs are ambiguous or not satisfied
+            ambiguous = False
+            not_matched = False
+            for matches in suggestions.itervalues():
+                if len(matches) > 1:
+                    ambiguous = True
+                elif len(matches) == 0:
+                    not_matched = True
+                    break
+
+            # Based on results, add to proper list
+            if not_matched:
+                not_matched_conts.append(c)
+            elif ambiguous:
+                ambiguous_conts.append(c)
+            else:
+                # Create input map of file refs
+                inputs = {}
+                for input_name, matches in suggestions.iteritems():
+                    inputs[input_name] = {'type': container_type, 'id': str(c['_id']), 'name': matches[0]}
+                c['inputs'] = inputs
+                matched_conts.append(c)
+        else:
+            not_matched_conts.append(c)
+    return {
+        'matched': matched_conts,
+        'not_matched': not_matched_conts,
+        'ambiguous': ambiguous_conts
+    }
+
+def insert(batch_proposal):
+    """
+    Simple database insert given a batch proposal.
+    """
+    return config.db.batch.insert(batch_proposal)
+
+def update(batch_id, payload):
+    """
+    Updates a batch job, being mindful of state flow.
+    """
+
+    bid = bson.ObjectId(batch_id)
+    query = {'_id': bid}
+    if payload.get('state'):
+        # Require that the batch job is currently in the expected prior state
+        query['state'] = BATCH_JOB_TRANSITIONS[payload.get('state')]
+    result = config.db.batch.update_one(query, {'$set': payload})
+    if result.modified_count != 1:
+        raise Exception('Batch job not updated')
+
+def run(batch_job):
+    """
+    Creates jobs from proposed inputs, returns jobs enqueued.
+    """
+
+    proposed_inputs = batch_job.get('proposed_inputs', [])
+    gear_name = batch_job.get('gear')
+    config_ = batch_job.get('config')
+    origin = batch_job.get('origin')
+
+    jobs = []
+    job_ids = []
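+    # Each proposed input map becomes one job; the job's destination is the
+    # container that owns the first input file.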
+ """ + + proposed_inputs = batch_job.get('proposed_inputs', []) + gear_name = batch_job.get('gear') + config_ = batch_job.get('config') + origin = batch_job.get('origin') + + jobs = [] + job_ids = [] + for inputs in proposed_inputs: + for input_name, fr in inputs.iteritems(): + inputs[input_name] = create_filereference_from_dictionary(fr) + # TODO support analysis gears (will have to create analyses here) + destination = create_containerreference_from_filereference(inputs[inputs.keys()[0]]) + job = Job(gear_name, inputs, destination=destination, tags=['batch'], config_=config_, origin=origin) + job_id = job.insert() + jobs.append(job) + job_ids.append(job_id) + + update(batch_job['_id'], {'state': 'launched', 'jobs': job_ids}) + return jobs + +def cancel(batch_job): + """ + Cancels all pending jobs, returns number of jobs cancelled. + """ + + pending_jobs = config.db.jobs.find({'state': 'pending', '_id': {'$in': batch_job.get('jobs')}}) + cancelled_jobs = 0 + for j in pending_jobs: + job = Job.load(j) + try: + Queue.mutate(job, {'state': 'cancelled'}) + cancelled_jobs += 1 + except Exception: # pylint: disable=broad-except + # if the cancellation fails, move on to next job + continue + + update(batch_job['_id'], {'state': 'cancelled'}) + return cancelled_jobs + + +def get_stats(): + """ + Return the number of jobs by state. + """ + raise NotImplementedError() + +def resume(): + """ + Move cancelled jobs back to pending. + """ + raise NotImplementedError() + +def delete(): + """ + Remove: + - the batch job + - it's spawned jobs + - all the files it's jobs produced. + """ + raise NotImplementedError() diff --git a/api/jobs/gears.py b/api/jobs/gears.py index d416575c4b27500741fa4b8e93154e3fb891e23d..80055775160ce26f0ba8051dc6abd7aa737eb84b 100644 --- a/api/jobs/gears.py +++ b/api/jobs/gears.py @@ -73,6 +73,25 @@ def suggest_container(gear, cont_name, cid): return root +def suggest_for_files(gear, files): + + invocation_schema = get_invocation_schema(gear) + schemas = {} + for x in gear['gear']['inputs']: + schema = gear_tools.isolate_file_invocation(invocation_schema, x) + schemas[x] = Draft4Validator(schema) + + suggested_files = {} + log.debug(schemas) + for input_name, schema in schemas.iteritems(): + suggested_files[input_name] = [] + for f in files: + if schema.is_valid(f): + suggested_files[input_name].append(f.get('name')) + + return suggested_files + + def insert_gear(doc): gear_tools.validate_manifest(doc['gear']) diff --git a/api/jobs/handlers.py b/api/jobs/handlers.py index 3822332340f0b0051fd0f1021e2744549eb7fa3a..da5c293e68d6b0962d626fc41b4db31e56937e56 100644 --- a/api/jobs/handlers.py +++ b/api/jobs/handlers.py @@ -1,15 +1,19 @@ """ API request handlers for the jobs module """ - +import bson import json import StringIO import gear_tools from jsonschema import Draft4Validator, ValidationError +from ..auth import require_login +from ..dao import APIPermissionException +from ..dao.containerstorage import ContainerStorage from ..dao.containerutil import create_filereference_from_dictionary, create_containerreference_from_dictionary, create_containerreference_from_filereference, ContainerReference from ..web import base from .. import config +from . 
+    suggested_files = {}
+    for input_name, schema in schemas.iteritems():
+        suggested_files[input_name] = []
+        for f in files:
+            if schema.is_valid(f):
+                suggested_files[input_name].append(f.get('name'))
+
+    return suggested_files
+
 
 def insert_gear(doc):
     gear_tools.validate_manifest(doc['gear'])
diff --git a/api/jobs/handlers.py b/api/jobs/handlers.py
index 3822332340f0b0051fd0f1021e2744549eb7fa3a..da5c293e68d6b0962d626fc41b4db31e56937e56 100644
--- a/api/jobs/handlers.py
+++ b/api/jobs/handlers.py
@@ -1,15 +1,19 @@
 """
 API request handlers for the jobs module
 """
-
+import bson
 import json
 import StringIO
 
 import gear_tools
 from jsonschema import Draft4Validator, ValidationError
 
+from ..auth import require_login
+from ..dao import APIPermissionException
+from ..dao.containerstorage import ContainerStorage
 from ..dao.containerutil import create_filereference_from_dictionary, create_containerreference_from_dictionary, create_containerreference_from_filereference, ContainerReference
 from ..web import base
 from .. import config
+from . import batch
 from .gears import get_gears, get_gear_by_name, get_invocation_schema, remove_gear, upsert_gear, suggest_container
 from .jobs import Job
@@ -271,3 +275,124 @@ class JobHandler(base.RequestHandler):
         new_id = Queue.retry(j, force=True)
         return { "_id": new_id }
 
+
+class BatchHandler(base.RequestHandler):
+
+    @require_login
+    def get_all(self):
+        """
+        Get a list of batch jobs the user has created.
+        Make a superuser request to see all batch jobs.
+        """
+
+        if self.superuser_request:
+            # Don't enforce permissions for superuser requests or drone requests
+            query = {}
+        else:
+            query = {'origin.id': self.uid}
+        return batch.get_all(query, {'proposed_inputs': 0})
+
+    @require_login
+    def get(self, _id):
+        """
+        Get a batch job by id.
+        Use param jobs=true to replace the job id list with job objects.
+        """
+
+        get_jobs = self.is_true('jobs')
+        batch_job = batch.get(_id, projection={'proposed_inputs': 0}, get_jobs=get_jobs)
+        self._check_permission(batch_job)
+        return batch_job
+
+    @require_login
+    def post(self):
+        """
+        Create a batch job proposal, insert as 'pending'.
+        """
+
+        if self.superuser_request:
+            # Don't enforce permissions for superuser requests
+            user = None
+        else:
+            user = {'_id': self.uid, 'site': self.user_site}
+
+        payload = self.request.json
+        gear_name = payload.get('gear')
+        targets = payload.get('targets')
+
+        if not gear_name or not targets:
+            self.abort(400, 'A gear name and a list of target containers are required.')
+        gear = get_gear_by_name(gear_name)
+
+        container_ids = []
+        container_type = None
+
+        for t in targets:
+            if not container_type:
+                container_type = t.get('type')
+            else:
+                # Ensure all targets are of the same type; may change in the future
+                if container_type != t.get('type'):
+                    self.abort(400, 'Targets must all be of the same type.')
+            container_ids.append(t.get('id'))
+
+        object_ids = [bson.ObjectId(x) for x in container_ids]
+        containers = ContainerStorage.factory(container_type).get_all_el({'_id': {'$in': object_ids}}, user, {'permissions': 0})
+
+        if len(containers) != len(container_ids):
+            # TODO: Break this out into individual errors, requires more work to determine difference
+            self.abort(404, 'Not all target containers exist or user does not have access to all containers.')
+
+        results = batch.find_matching_conts(gear, containers, container_type)
+
+        matched = results['matched']
+        if not matched:
+            self.abort(400, 'No target containers matched gear input spec.')
+
+        batch_proposal = {
+            '_id': bson.ObjectId(),
+            'gear': gear_name,
+            'config': payload.get('config', {}),
+            'state': 'pending',
+            'origin': self.origin,
+            'proposed_inputs': [c.pop('inputs') for c in matched]
+        }
+
+        batch.insert(batch_proposal)
+        batch_proposal.pop('proposed_inputs')
+        batch_proposal['not_matched'] = results['not_matched']
+        batch_proposal['ambiguous'] = results['ambiguous']
+        batch_proposal['matched'] = matched
+
+        return batch_proposal
+
+    @require_login
+    def run(self, _id):
+        """
+        Creates jobs from proposed inputs, returns jobs enqueued.
+        Moves a 'pending' batch job to 'launched'.
+        """
+
+        batch_job = batch.get(_id)
+        self._check_permission(batch_job)
+        if batch_job.get('state') != 'pending':
+            self.abort(400, 'Can only run pending batch jobs.')
+        return batch.run(batch_job)
+
+    @require_login
+    def cancel(self, _id):
+        """
+        Cancels jobs that are still pending, returns number of jobs cancelled.
+        Moves a 'launched' batch job to 'cancelled'.
+        """
+
+        batch_job = batch.get(_id)
+        self._check_permission(batch_job)
+        if batch_job.get('state') != 'launched':
+            self.abort(400, 'Can only cancel launched batch jobs.')
+        return {'number_cancelled': batch.cancel(batch_job)}
+
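+    # Batch jobs are private to their creator: the request must come from the
+    # user recorded in the batch's origin, unless it has superuser privileges.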
+ """ + + batch_job = batch.get(_id) + self._check_permission(batch_job) + if batch_job.get('state') != 'launched': + self.abort(400, 'Can only cancel started batch jobs.') + return {'number_cancelled': batch.cancel(batch_job)} + + def _check_permission(self, batch_job): + if not self.superuser_request: + if batch_job['origin'].get('id') != self.uid: + raise APIPermissionException('User does not have permission to access batch {}'.format(batch_job.get('_id'))) + diff --git a/api/jobs/jobs.py b/api/jobs/jobs.py index 4ab8798c66c119874f317b328d78d6e98c3c7222..44c21a83d65df7068bd2ed7e806c5e7d06b8435b 100644 --- a/api/jobs/jobs.py +++ b/api/jobs/jobs.py @@ -66,7 +66,8 @@ class Job(object): if destination is None and inputs is not None: # Grab an arbitrary input's container key = inputs.keys()[0] - destination = create_containerreference_from_filereference(inputs[key]) + fr = inputs[key] + destination = create_containerreference_from_filereference(fr) # A job is always tagged with the name of the gear tags.append(name) @@ -149,6 +150,14 @@ class Job(object): """ # Don't modify the job obj + # test = self.__dict__ + # for k, v in test.iteritems(): + # config.log.debug('the k is {} and the type of the v is {}'.format(k, type(v))) + + # raise Exception + + + d = copy.deepcopy(self.__dict__) d['id'] = d.pop('id_', None) diff --git a/api/jobs/queue.py b/api/jobs/queue.py index 06b0ede9d17bc1f417c9f7c3109cd11efaec51f6..a6b43d6702f2cf38705491b1defe1822082dc527 100644 --- a/api/jobs/queue.py +++ b/api/jobs/queue.py @@ -18,6 +18,7 @@ JOB_STATES = [ 'running', # Job has been handed to an engine and is being processed 'failed', # Job has an expired heartbeat (orphaned) or has suffered an error 'complete', # Job has successfully completed + 'cancelled' # Job has been cancelled (via a bulk job cancellation) ] JOB_STATES_ALLOWED_MUTATE = [ @@ -27,6 +28,7 @@ JOB_STATES_ALLOWED_MUTATE = [ JOB_TRANSITIONS = [ 'pending --> running', + 'pending --> cancelled', 'running --> failed', 'running --> complete', ]