Commit acbd780f authored by Nathaniel Kofalt

Merge pull request #84 from scitran/job-rules

Job rules
parents 453a9c49 f0a5a952
@@ -2,14 +2,20 @@
API request handlers for process-job-handling.
"""
# We shadow the standard library; this is a workaround.
from __future__ import absolute_import
import logging
log = logging.getLogger('scitran.api.jobs')
import bson
import pymongo
import datetime
from collections import namedtuple
from . import base
from . import config
log = config.log
JOB_STATES = [
'pending', # Job is queued
@@ -37,38 +43,39 @@ ALGORITHMS = [
"dcm2nii"
]
# A FileInput tuple holds all the details of a scitran file that are needed to use it as an input to a formula.
FileInput = namedtuple('input', ['container_type', 'container_id', 'filename', 'filehash'])
# TODO: json schema
# Convert a dictionary to a FileInput
# REVIEW: less irritating conversion?
def convert_to_fileinput(d):
return FileInput(
container_type= d['container_type'],
container_id = d['container_id'],
filename = d['filename'],
filehash = d['filehash']
)
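# For illustration only (hypothetical values): the input dict stored on a job
# document round-trips cleanly through convert_to_fileinput.
#
#   d = {'container_type': 'acquisition', 'container_id': '55bf861e6941f040cf8d6939',
#        'filename': 'scan.dcm', 'filehash': 'v0-sha384-...'}
#   fi = convert_to_fileinput(d)
#   assert dict(fi._asdict()) == d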
def spawn_jobs(db, containers, file):
def create_fileinput_from_reference(container, container_type, file_):
"""
Spawn some number of queued jobs to process a file.
Spawn a job to process a file.
Parameters
----------
db: pymongo.database.Database
Reference to the database instance
containers: [ tuple(string, scitran.Container) ]
An array of tuples, each containing a container type name, and a container object.
Contract is:
1) The first container in the array will be the container which owns the file passed in the file param.
2) Following array indexes, if any, will be higher in the ownership hierarchy than the first container.
3) Array is not guaranteed to be strictly hierarchically ordered.
container: scitran.Container
A container object that the file is held by
container_type: string
The type of container (eg, 'session')
file: scitran.File
File object that is used to spawn 0 or more jobs.
"""
if file.get('type', '') != 'dicom':
return
# File information
filename = file['name']
filehash = file['hash']
# File container information
last = len(containers) - 1
container_type, container = containers[last]
container_id = str(container['_id'])
log.info('File ' + filename + ' is in a ' + container_type + ' with id ' + container_id + ' and hash ' + filehash)
@@ -76,10 +83,10 @@ def spawn_jobs(db, containers, file):
# Spawn rules currently do not look at container hierarchy, and only care about a single file.
# Further, one algorithm is unconditionally triggered for each dirty file.
queue_job(db, 'dcm2nii', container_type, container_id, filename, filehash)
return FileInput(container_type=container_type, container_id=container_id, filename=filename, filehash=filehash)
def queue_job(db, algorithm_id, container_type, container_id, filename, filehash, attempt_n=1, previous_job_id=None):
def queue_job(db, algorithm_id, input, attempt_n=1, previous_job_id=None):
"""
Enqueues a job for execution.
@@ -89,14 +96,8 @@ def queue_job(db, algorithm_id, container_type, container_id, filename, filehash
Reference to the database instance
algorithm_id: string
Human-friendly unique name of the algorithm
container_type: string
Type of container ('acquisitions', 'sessions', etc) that matches the URL route.
container_id: string
ID of the container ('2', etc)
filename: string
Name of the file to download
filehash: string
Hash of the file to download
input: FileInput
The input to be used by this job
attempt_n: integer (optional)
If an equivalent job has tried & failed before, pass which attempt number we're at. Defaults to 1 (no previous attempts).
"""
@@ -114,14 +115,8 @@ def queue_job(db, algorithm_id, container_type, container_id, filename, filehash
'created': now,
'modified': now,
# Everything required to generate a job formula.
'intent': {
'algorithm_id': algorithm_id,
'container_type': container_type,
'container_id': container_id,
'filename': filename,
'filehash': filehash,
},
'algorithm_id': algorithm_id,
'input': input._asdict(),
'attempt': attempt_n,
}
@@ -132,19 +127,17 @@ def queue_job(db, algorithm_id, container_type, container_id, filename, filehash
result = db.jobs.insert_one(job)
_id = result.inserted_id
log.info('Running %s as job %s to process %s %s' % (algorithm_id, str(_id), container_type, container_id))
log.info('Running %s as job %s to process %s %s' % (algorithm_id, str(_id), input.container_type, input.container_id))
return _id
def retry_job(db, j):
def retry_job(db, j, force=False):
"""
Given a failed job, either retry the job or fail it permanently, based on the attempt number.
TODO: make max attempts configurable
Can override the attempt limit by passing force=True.
"""
i = j['intent']
if j['attempt'] < 3:
job_id = queue_job(db, i['algorithm_id'], i['container_type'], i['container_id'], i['filename'], i['filehash'], j['attempt']+1, j['_id'])
if j['attempt'] < 3 or force:
job_id = queue_job(db, j['algorithm_id'], convert_to_fileinput(j['input']), j['attempt']+1, j['_id'])
log.info('respawned job %s as %s (attempt %d)' % (j['_id'], job_id, j['attempt']+1))
else:
log.info('permanently failed job %s (after %d attempts)' % (j['_id'], j['attempt']))
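# Usage sketch (hypothetical job document): a third failure is only requeued
# when force=True.
#
#   j = {'_id': '...', 'attempt': 3, 'algorithm_id': 'dcm2nii',
#        'input': {'container_type': 'acquisition', 'container_id': '...',
#                  'filename': 'scan.dcm', 'filehash': 'v0-sha384-...'}}
#   retry_job(db, j)              # logs a permanent failure
#   retry_job(db, j, force=True)  # queues attempt 4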
@@ -216,13 +209,6 @@ class Jobs(base.RequestHandler):
return self.app.db.jobs.count()
def addTestJob(self):
"""Adds a harmless job for testing purposes. Intentionally equivalent return to queue_job."""
if not self.superuser_request:
self.abort(401, 'Request requires superuser')
return queue_job(self.app.db, 'dcm2nii', 'acquisition', '55bf861e6941f040cf8d6939')
def next(self):
"""
Atomically change a 'pending' job to 'running' and returns it. Updates timestamp.
......
import logging
log = logging.getLogger('scitran.api.jobs')
import bson
import pymongo
import datetime
import fnmatch
from . import base
from . import util
from . import jobs
#
# {
# At least one match from this array must succeed, or array must be empty
# "any": [
# ["file.type", "dicom" ] # Match the file's type
# ["file.name", "*.dcm" ] # Match a shell glob for the file name
# ["file.measurements", "diffusion" ] # Match any of the file's measurements
# ["container.measurement", "diffusion" ] # Match the container's primary measurment
# ["container.has-type", "bvec" ] # Match the container having any file (including this one) with this type
# ]
#
# All matches from array must succeed, or array must be empty
# "all": [
# ]
#
# Algorithm to run if both sets of rules match
# "alg": "dcm2nii"
# }
#
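# A hypothetical project-level rule in that format (values invented for
# illustration): run dcm2nii when the file is dicom-typed and either the file
# or its container is marked as diffusion.
#
#   {
#       'any': [
#           ['file.measurements', 'diffusion'],
#           ['container.measurement', 'diffusion']
#       ],
#       'all': [
#           ['file.type', 'dicom']
#       ],
#       'alg': 'dcm2nii'
#   }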
MATCH_TYPES = [
'file.type',
'file.name',
'file.measurements',
'container.measurement',
'container.has-type'
]
# TODO: replace with default rules, which get persisted, maintained, upgraded, and reasoned intelligently
HARDCODED_RULES = [
# dcm2nii
{
'all': [
['file.type', 'dicom']
],
'alg': 'dcm2nii'
}
]
def eval_match(match_type, match_param, file_, container):
"""
Given a match entry, return if the match succeeded.
"""
if match_type not in MATCH_TYPES:
raise Exception('Unsupported match type ' + match_type)
# Match the file's type
if match_type == 'file.type':
return file_['filetype'] == match_param
# Match a shell glob for the file name
elif match_type == 'file.name':
return fnmatch.fnmatch(file_['filename'], match_param)
# Match any of the file's measurements
elif match_type == 'file.measurements':
return match_param in file_['measurements']
# Match the container's primary measurement
elif match_type == 'container.measurement':
return container['measurement'] == match_param
# Match the container having any file (including this one) with this type
elif match_type == 'container.has-type':
for c_file in container['files']:
if c_file['filetype'] == match_param:
return True
return False
raise Exception('Unimplemented match type ' + match_type)
def eval_rule(rule, file_, container):
"""
Decide if a rule should spawn a job.
"""
# Are there matches in the 'any' set?
must_match = len(rule.get('any', [])) > 0
has_match = False
for match in rule.get('any', []):
if eval_match(match[0], match[1], file_, container):
has_match = True
break
# If there were matches in the 'any' array and none of them succeeded
if must_match and not has_match:
return False
# Are there matches in the 'all' set?
for match in rule.get('all', []):
if not eval_match(match[0], match[1], file_, container):
return False
return True
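# Usage sketch (hypothetical file/container documents): the hardcoded dcm2nii
# rule matches any dicom-typed file regardless of its container.
#
#   f = {'filetype': 'dicom', 'filename': 'scan.dcm', 'measurements': []}
#   c = {'measurement': 'anatomical', 'files': [f]}
#   eval_rule(HARDCODED_RULES[0], f, c)   # True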
def create_jobs(db, container, container_type, file_):
"""
Check all rules that apply to this file, and enqueue the jobs that should be run.
Returns the algorithm names that were queued.
"""
job_list = []
# Get configured rules for this project
project = get_project_for_container(db, container)
rules = project.get('rules', [])
# Add hardcoded rules that cannot be removed or changed
for hardcoded_rule in HARDCODED_RULES:
rules.append(hardcoded_rule)
for rule in rules:
if eval_rule(rule, file_, container):
alg_name = rule['alg']
input = jobs.create_fileinput_from_reference(container, container_type, file_)
jobs.queue_job(db, alg_name, input)
job_list.append(alg_name)
return job_list
# TODO: consider moving to a module that has a variety of hierarchy-management helper functions
def get_project_for_container(db, container):
"""
Recursively walk the hierarchy until the project object is found.
"""
if 'session' in container:
session = db.sessions.find_one({'_id': container['session']})
return get_project_for_container(db, session)
elif 'project' in container:
project = db.projects.find_one({'_id': container['project']})
return project
raise Exception('Hierarchy walking not implemented for container ' + str(container['_id']))
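# Hierarchy sketch (document shapes are assumptions, for illustration only):
# an acquisition points at its session, the session points at its project, so
# at most two lookups reach the project that holds the rules.
#
#   acquisition = {'_id': a_id, 'session': s_id, 'files': [...]}
#   session     = {'_id': s_id, 'project': p_id}
#   project     = {'_id': p_id, 'rules': [...]}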
@@ -74,6 +74,7 @@ from api import api
from api import jobs
from api import config
from api import centralclient
from api import rules
log = config.log
config.set_log_level(log, args.log_level)
@@ -133,18 +134,21 @@ else:
def job_creation(signum):
for c_type in ['projects', 'collections', 'sessions', 'acquisitions']:
for c in application.db[c_type].find({'files.unprocessed': True}, ['files']):
containers = [(c_type, c)] # TODO: this should be the full container hierarchy
# This projection needs every field required to know what type of container it is & navigate to its project
containers = application.db[c_type].find({'files.unprocessed': True}, ['files', 'session', 'project'])
for c in containers:
for f in c['files']:
if f.get('unprocessed'):
jobs.spawn_jobs(application.db, containers, f)
rules.create_jobs(application.db, c, c_type, f)
r = application.db[c_type].update_one(
{
'_id': c['_id'],
'files': {
'$elemMatch': {
'name': f['name'],
'hash': f['hash'],
'filename': f['filename'],
'filehash': f['filehash'],
},
},
},
@@ -155,7 +159,7 @@ else:
},
)
if not r.matched_count:
log.info('file modified or removed, not marked as clean: %s %s, %s' % (c_type, c, f['name']))
log.info('file modified or removed, not marked as clean: %s %s, %s' % (c_type, c, f['filename']))
while True:
j = application.db.jobs.find_one_and_update(
{
......