diff --git a/api/jobs.py b/api/jobs.py
index 628dea5b107b03bce66bc76b3eafbf1bdaba72ac..900a504040deb92eff9b98188771d6db4e6d24fc 100644
--- a/api/jobs.py
+++ b/api/jobs.py
@@ -2,14 +2,20 @@
 API request handlers for process-job-handling.
 """
 
+# We shadow the standard library; this is a workaround.
+from __future__ import absolute_import
+
+import logging
+log = logging.getLogger('scitran.api.jobs')
+
 import bson
 import pymongo
 import datetime
+from collections import namedtuple
 
 from . import base
 from . import config
 
-log = config.log
 
 JOB_STATES = [
     'pending',  # Job is queued
@@ -37,38 +43,39 @@ ALGORITHMS = [
     "dcm2nii"
 ]
 
+# A FileInput tuple holds all the details of a scitran file that are needed to use it as an input to a formula.
+FileInput = namedtuple('FileInput', ['container_type', 'container_id', 'filename', 'filehash'])
 
-# TODO: json schema
+# Convert a dictionary to a FileInput
+# REVIEW: less irritating conversion?
+def convert_to_fileinput(d):
+    return FileInput(
+        container_type = d['container_type'],
+        container_id   = d['container_id'],
+        filename       = d['filename'],
+        filehash       = d['filehash']
+    )
 
-
-def spawn_jobs(db, containers, file):
+def create_fileinput_from_reference(container, container_type, file):
     """
-    Spawn some number of queued jobs to process a file.
+    Create a FileInput for a file held by a container.
 
     Parameters
     ----------
-    db: pymongo.database.Database
-        Reference to the database instance
-    containers: [ tuple(string, scitran.Container) ]
-        An array of tuples, each containing a container type name, and a container object.
-        Contract is:
-            1) The first container in the array will be the container which owns file passed in the file param.
-            2) Following array indexes, if any, will be higher in the ownership heirarchy than the first container.
-            3) Array is not guaranteed to be strictly hierarchically ordered.
+    container: scitran.Container
+        The container object that holds the file
+    container_type: string
+        The type of container (e.g. 'session')
     file: scitran.File
         File object that is used to spawn 0 or more jobs.
     """
 
     if file.get('type', '') != 'dicom':
         return
 
-    # File information
     filename = file['name']
     filehash = file['hash']
 
-    # File container information
-    last = len(containers) - 1
-    container_type, container = containers[last]
     container_id = str(container['_id'])
 
     log.info('File ' + filename + 'is in a ' + container_type + ' with id ' + container_id + ' and hash ' + filehash)
@@ -76,10 +83,10 @@ def spawn_jobs(db, containers, file):
     # Spawn rules currently do not look at container hierarchy, and only care about a single file.
     # Further, one algorithm is unconditionally triggered for each dirty file.
 
-    queue_job(db, 'dcm2nii', container_type, container_id, filename, filehash)
+    return FileInput(container_type=container_type, container_id=container_id, filename=filename, filehash=filehash)
 
 
-def queue_job(db, algorithm_id, container_type, container_id, filename, filehash, attempt_n=1, previous_job_id=None):
+def queue_job(db, algorithm_id, input, attempt_n=1, previous_job_id=None):
     """
     Enqueues a job for execution.
 
@@ -89,14 +96,8 @@ def queue_job(db, algorithm_id, container_type, container_id, filename, filehash
         Reference to the database instance
     algorithm_id: string
         Human-friendly unique name of the algorithm
-    container_type: string
-        Type of container ('acquisitions', 'sessions', etc) that matches the URL route.
-    container_id: string
-        ID of the container ('2', etc)
-    filename: string
-        Name of the file to download
-    filehash: string
-        Hash of the file to download
+    input: FileInput
+        The input to be used by this job
     attempt_n: integer (optional)
         If an equivalent job has tried & failed before, pass which attempt number we're at. Defaults to 1 (no previous attempts).
     """
@@ -114,14 +115,8 @@ def queue_job(db, algorithm_id, container_type, container_id, filename, filehash
         'created': now,
         'modified': now,
 
-        # Everything required to generate a job formula.
-        'intent': {
-            'algorithm_id': algorithm_id,
-            'container_type': container_type,
-            'container_id': container_id,
-            'filename': filename,
-            'filehash': filehash,
-        },
+        'algorithm_id': algorithm_id,
+        'input': input._asdict(),
 
         'attempt': attempt_n,
     }
@@ -132,19 +127,17 @@ def queue_job(db, algorithm_id, container_type, container_id, filename, filehash
     result = db.jobs.insert_one(job)
     _id = result.inserted_id
 
-    log.info('Running %s as job %s to process %s %s' % (algorithm_id, str(_id), container_type, container_id))
+    log.info('Running %s as job %s to process %s %s' % (algorithm_id, str(_id), input.container_type, input.container_id))
     return _id
 
-def retry_job(db, j):
+def retry_job(db, j, force=False):
     """
     Given a failed job, either retry the job or fail it permanently, based on the attempt number.
-    TODO: make max attempts configurable
+    Can override the attempt limit by passing force=True.
     """
 
-    i = j['intent']
-
-    if j['attempt'] < 3:
-        job_id = queue_job(db, i['algorithm_id'], i['container_type'], i['container_id'], i['filename'], i['filehash'], j['attempt']+1, j['_id'])
+    if j['attempt'] < 3 or force:
+        job_id = queue_job(db, j['algorithm_id'], convert_to_fileinput(j['input']), j['attempt']+1, j['_id'])
         log.info('respawned job %s as %s (attempt %d)' % (j['_id'], job_id, j['attempt']+1))
     else:
         log.info('permanently failed job %s (after %d attempts)' % (j['_id'], j['attempt']))
@@ -216,13 +209,6 @@ class Jobs(base.RequestHandler):
 
         return self.app.db.jobs.count()
 
-    def addTestJob(self):
-        """Adds a harmless job for testing purposes. Intentionally equivalent return to queue_job."""
-        if not self.superuser_request:
-            self.abort(401, 'Request requires superuser')
-
-        return queue_job(self.app.db, 'dcm2nii', 'acquisition', '55bf861e6941f040cf8d6939')
-
     def next(self):
         """
         Atomically change a 'pending' job to 'running' and returns it. Updates timestamp.
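Usage sketch (illustrative only, not part of the patch): FileInput, convert_to_fileinput, and queue_job are the helpers added above; the db handle and the filename/hash values are hypothetical placeholders.

# Sketch of queueing a job with the new signature. Only FileInput,
# convert_to_fileinput, and queue_job come from api/jobs.py above;
# `db` and the sample values are assumed for illustration.
from api import jobs

input = jobs.FileInput(
    container_type='acquisition',
    container_id='55bf861e6941f040cf8d6939',  # example id reused from the removed addTestJob()
    filename='example.dcm.zip',               # placeholder
    filehash='v0-deadbeef',                   # placeholder
)

job_id = jobs.queue_job(db, 'dcm2nii', input)

# A stored job document can be round-tripped back into a FileInput:
j = db.jobs.find_one({'_id': job_id})
restored = jobs.convert_to_fileinput(j['input'])
assert restored == input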
diff --git a/api/rules.py b/api/rules.py
new file mode 100644
index 0000000000000000000000000000000000000000..f8adef5390cfd9e93aa39b485b53429aa3790ad7
--- /dev/null
+++ b/api/rules.py
@@ -0,0 +1,151 @@
+
+import logging
+log = logging.getLogger('scitran.api.jobs')
+
+import bson
+import pymongo
+import datetime
+import fnmatch
+
+from . import base
+from . import util
+from . import jobs
+
+
+#
+# {
+#   At least one match from this array must succeed, or array must be empty
+#   "any": [
+#       ["file.type",             "dicom"     ]  # Match the file's type
+#       ["file.name",             "*.dcm"     ]  # Match a shell glob for the file name
+#       ["file.measurements",     "diffusion" ]  # Match any of the file's measurements
+#       ["container.measurement", "diffusion" ]  # Match the container's primary measurement
+#       ["container.has-type",    "bvec"      ]  # Match the container having any file (including this one) with this type
+#   ]
+#
+#   All matches from array must succeed, or array must be empty
+#   "all": [
+#   ]
+#
+#   Algorithm to run if both sets of rules match
+#   "alg": "dcm2nii"
+# }
+#
+
+MATCH_TYPES = [
+    'file.type',
+    'file.name',
+    'file.measurements',
+    'container.measurement',
+    'container.has-type'
+]
+
+# TODO: replace with default rules, which get persisted, maintained, upgraded, and reasoned intelligently
+HARDCODED_RULES = [
+    # dcm2nii
+    {
+        'all': [
+            ['file.type', 'dicom']
+        ],
+        'alg': 'dcm2nii'
+    }
+]
+
+def eval_match(match_type, match_param, file_, container):
+    """
+    Given a match entry, return if the match succeeded.
+    """
+
+    if not match_type in MATCH_TYPES:
+        raise Exception('Unsupported match type ' + match_type)
+
+    # Match the file's type
+    if match_type == 'file.type':
+        return file_['filetype'] == match_param
+
+    # Match a shell glob for the file name
+    elif match_type == 'file.name':
+        return fnmatch.fnmatch(file_['filename'], match_param)
+
+    # Match any of the file's measurements
+    elif match_type == 'file.measurements':
+        return match_param in file_['measurements']
+
+    # Match the container's primary measurement
+    elif match_type == 'container.measurement':
+        return container['measurement'] == match_param
+
+    # Match the container having any file (including this one) with this type
+    elif match_type == 'container.has-type':
+        for c_file in container['files']:
+            if c_file['filetype'] == match_param:
+                return True
+
+        return False
+
+    raise Exception('Unimplemented match type ' + match_type)
+
+
+def eval_rule(rule, file_, container):
+    """
+    Decide if a rule should spawn a job.
+    """
+
+    # Are there matches in the 'any' set?
+    must_match = len(rule.get('any', [])) > 0
+    has_match = False
+
+    for match in rule.get('any', []):
+        if eval_match(match[0], match[1], file_, container):
+            has_match = True
+            break
+
+    # If there were matches in the 'any' array and none of them succeeded
+    if must_match and not has_match:
+        return False
+
+    # Are there matches in the 'all' set?
+    for match in rule.get('all', []):
+        if not eval_match(match[0], match[1], file_, container):
+            return False
+
+    return True
+
+
+def create_jobs(db, container, container_type, file_):
+    """
+    Check all rules that apply to this file, and enqueue the jobs that should be run.
+    Returns the algorithm names that were queued.
+    """
+
+    job_list = []
+
+    # Get configured rules for this project
+    project = get_project_for_container(db, container)
+    rules = project.get('rules', [])
+
+    # Add hardcoded rules that cannot be removed or changed
+    for hardcoded_rule in HARDCODED_RULES:
+        rules.append(hardcoded_rule)
+
+    for rule in rules:
+        if eval_rule(rule, file_, container):
+            alg_name = rule['alg']
+            input = jobs.create_fileinput_from_reference(container, container_type, file_)
+            jobs.queue_job(db, alg_name, input)
+            job_list.append(alg_name)
+
+    return job_list
+
+# TODO: consider moving to a module that has a variety of hierarchy-management helper functions
+def get_project_for_container(db, container):
+    """
+    Recursively walk the hierarchy until the project object is found.
+    """
+    if 'session' in container:
+        session = db.sessions.find_one({'_id': container['session']})
+        return get_project_for_container(db, session)
+    elif 'project' in container:
+        project = db.projects.find_one({'_id': container['project']})
+        return project
+    raise Exception('Hierarchy walking not implemented for container ' + str(container['_id']))
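Evaluation sketch (illustrative only, not part of the patch): the rule follows the structure documented at the top of api/rules.py, and eval_rule is the function added above; the file_ and container dicts are hypothetical documents shaped the way eval_match reads them ('filetype', 'filename', 'measurements').

# Sketch of evaluating one rule against a file and its container.
# Only rules.eval_rule comes from the new module; the rule and the two
# documents are assumed example data, not real schema.
from api import rules

rule = {
    'any': [
        ['file.type', 'dicom'],
        ['file.name', '*.dcm'],
    ],
    'all': [
        ['container.measurement', 'diffusion'],
    ],
    'alg': 'dcm2nii',
}

file_ = {
    'filetype': 'dicom',            # satisfies the 'any' set via file.type
    'filename': 'example_0001.dcm',
    'measurements': ['diffusion'],
}
container = {
    'measurement': 'diffusion',     # satisfies the 'all' set
    'files': [file_],
}

# True: at least one 'any' match succeeds and every 'all' match succeeds.
print(rules.eval_rule(rule, file_, container))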
+ """ + + job_list = [] + + # Get configured rules for this project + project = get_project_for_container(db, container) + rules = project.get('rules', []) + + # Add hardcoded rules that cannot be removed or changed + for hardcoded_rule in HARDCODED_RULES: + rules.append(hardcoded_rule) + + for rule in rules: + if eval_rule(rule, file_, container): + alg_name = rule['alg'] + input = jobs.create_fileinput_from_reference(container, container_type, file_) + jobs.queue_job(db, alg_name, input) + job_list.append(alg_name) + + return job_list + +# TODO: consider moving to a module that has a variety of hierarchy-management helper functions +def get_project_for_container(db, container): + """ + Recursively walk the hierarchy until the project object is found. + """ + if 'session' in container: + session = db.sessions.find_one({'_id': container['session']}) + return get_project_for_container(db, session) + elif 'project' in container: + project = db.projects.find_one({'_id': container['project']}) + return project + raise Exception('Hierarchy walking not implemented for container ' + str(container['_id'])) diff --git a/bin/api.wsgi b/bin/api.wsgi index c9c6972742974c90c799981ef16f1802c2685de5..9093a60c3ae2f30e5134aa1fed3d329983e40580 100644 --- a/bin/api.wsgi +++ b/bin/api.wsgi @@ -74,6 +74,7 @@ from api import api from api import jobs from api import config from api import centralclient +from api import rules log = config.log config.set_log_level(log, args.log_level) @@ -133,18 +134,21 @@ else: def job_creation(signum): for c_type in ['projects', 'collections', 'sessions', 'acquisitions']: - for c in application.db[c_type].find({'files.unprocessed': True}, ['files']): - containers = [(c_type, c)] # TODO: this should be the full container hierarchy + + # This projection needs every field required to know what type of container it is & navigate to its project + containers = application.db[c_type].find({'files.unprocessed': True}, ['files', 'session', 'project']) + + for c in containers: for f in c['files']: if f.get('unprocessed'): - jobs.spawn_jobs(application.db, containers, f) + rules.create_jobs(application.db, c, c_type, f) r = application.db[c_type].update_one( { '_id': c['_id'], 'files': { '$elemMatch': { - 'name': f['name'], - 'hash': f['hash'], + 'filename': f['filename'], + 'filehash': f['filehash'], }, }, }, @@ -155,7 +159,7 @@ else: }, ) if not r.matched_count: - log.info('file modified or removed, not marked as clean: %s %s, %s' % (c_type, c, f['name'])) + log.info('file modified or removed, not marked as clean: %s %s, %s' % (c_type, c, f['filename'])) while True: j = application.db.jobs.find_one_and_update( {