Skip to content
Snippets Groups Projects
Commit 3a8650b5 authored by Nathaniel Kofalt's avatar Nathaniel Kofalt
Browse files

Merge pull request #28 from scitran/job-fixes

Tune up jobs API
parents 033e7e41 987aea9c
No related branches found
No related tags found
No related merge requests found
......@@ -83,6 +83,7 @@ routes = [
webapp2_extras.routes.PathPrefixRoute(r'/api/jobs', [
webapp2.Route(r'/next', jobs.Jobs, handler_method='next', methods=['GET']),
webapp2.Route(r'/count', jobs.Jobs, handler_method='count', methods=['GET']),
webapp2.Route(r'/addTestJob', jobs.Jobs, handler_method='addTestJob', methods=['GET']),
webapp2.Route(r'/<:[^/]+>', jobs.Job, name='job'),
]),
webapp2.Route(r'/api/apps', apps.Apps),
......
# vim: filetype=python
import logging
logging.basicConfig(
format='%(asctime)s %(name)16.16s:%(levelname)4.4s %(message)s',
......@@ -14,6 +16,7 @@ import argparse
import api
import centralclient
import jobs
os.environ['PYTHON_EGG_CACHE'] = '/tmp/python_egg_cache'
......@@ -100,7 +103,7 @@ else:
import uwsgidecorators
@uwsgidecorators.cron(0, -1, -1, -1, -1) # top of every hour
def upload_storage_cleaning(num):
def upload_storage_cleaning(signum):
upload_path = application.config['upload_path']
for f in os.listdir(upload_path):
fp = os.path.join(upload_path, f)
......@@ -119,5 +122,46 @@ else:
else:
fail_count = 0
if fail_count == 3:
log.debug('scitran central unreachable, purging all remotes info')
log.warning('scitran central unreachable, purging all remotes info')
centralclient.clean_remotes(application.db, args.site_id)
@uwsgidecorators.timer(30)
def job_creation(signum):
    """
    Periodic task: spawn jobs for dirty files and retire stalled jobs.

    Runs in two phases:

    1) For every container holding a file flagged ``dirty``, spawn jobs for
       that file and then clear the flag. The flag is only cleared when the
       file still has the same filename and filehash, so a file that was
       modified in the meantime stays dirty and is picked up again later.
    2) Mark every ``running`` job whose heartbeat is older than 100 seconds
       as ``failed``, and re-queue it unless it has reached its third attempt.

    Parameters
    ----------
    signum: int
        Signal number supplied by the uWSGI timer decorator. Unused.
    """
    for c_type in ['projects', 'collections', 'sessions', 'acquisitions']:
        for c in application.db[c_type].find({'files.dirty': True}, ['files']):
            containers = [(c_type, c)] # TODO: this should be the full container hierarchy
            for f in c['files']:
                if not f.get('dirty'):
                    continue
                jobs.spawn_jobs(application.db, containers, f)
                r = application.db[c_type].update_one(
                    {
                        '_id': c['_id'],
                        'files': {
                            '$elemMatch': {
                                'filename': f['filename'],
                                'filehash': f['filehash'],
                            },
                        },
                    },
                    {
                        '$set': {
                            'files.$.dirty': False,
                        },
                    },
                )
                if not r.matched_count:
                    log.info('file modified, not marked as clean: %s %s, %s' % (c_type, c, f['filename']))

    # Compute the cutoff once so every job is judged against the same instant.
    # The operand of $lt must be the datetime itself; wrapping it in braces
    # builds a Python set, which cannot be encoded as BSON.
    stale_before = datetime.datetime.utcnow() - datetime.timedelta(seconds=100)
    while True:
        # pymongo has no "find many and update"; claim stalled jobs one at a
        # time. Each claim is atomic and flips the job out of 'running', so the
        # filter eventually matches nothing and the loop terminates.
        j = application.db.jobs.find_one_and_update(
            {
                'state': 'running',
                'heartbeat': {'$lt': stale_before},
            },
            {
                # Without an update operator the call is rejected by pymongo.
                '$set': {
                    'state': 'failed',
                },
            },
        )
        if j is None:
            break
        if j['attempt'] < 3:
            job_id = jobs.queue_job(application.db, j['algorithm_id'], j['container_type'], j['container_id'], j['filename'], j['filehash'], j['attempt']+1, j['_id'])
            log.info('respawned job %s as %s (attempt %d)' % (j['_id'], job_id, j['attempt']+1))
        else:
            # Report the attempt that actually failed, not a hypothetical next one.
            log.info('permanently failed job %s (attempt %d)' % (j['_id'], j['attempt']))
......@@ -239,6 +239,9 @@ class Container(base.RequestHandler):
fileinfo = util.container_fileinfo(container, filename)
if not fileinfo:
self.abort(404, 'no such file')
hash_ = self.request.GET.get('hash')
if hash_ and hash_ != fileinfo['hash']:
self.abort(409, 'file exists, hash mismatch')
filepath = os.path.join(self.app.config['data_path'], str(_id)[-3:] + '/' + str(_id), filename)
if self.request.GET.get('ticket') == '': # request for download ticket
ticket = util.download_ticket(self.request.client_addr, 'file', _id, filename, fileinfo['filesize'])
......
......@@ -33,75 +33,138 @@ JOB_TRANSITIONS = [
"running --> complete",
]
def valid_transition(from_state, to_state):
    """
    Tell whether a job may move from one state to another.

    A move is allowed when it is listed in JOB_TRANSITIONS, or when the state
    does not change at all.
    """
    transition = from_state + " --> " + to_state
    if transition in JOB_TRANSITIONS:
        return True
    return from_state == to_state
ALGORITHMS = [
"dcm2nii"
]
# TODO: json schema
def validTransition(fromState, toState):
    """Return True when "fromState --> toState" is listed in JOB_TRANSITIONS."""
    # The original referenced the undefined name `tosState`, raising NameError
    # on every call.
    return (fromState + " --> " + toState) in JOB_TRANSITIONS
def createJob(db, jobType, containerType, containerID):
def spawn_jobs(db, containers, file):
    """
    Spawn some number of queued jobs to process a file.

    Parameters
    ----------
    db: pymongo.database.Database
        Reference to the database instance
    containers: [ tuple(string, scitran.Container) ]
        An array of tuples, each containing a container type name, and a container object.
        Contract is:
            1) The first container in the array will be the container which owns file passed in the file param.
            2) Following array indexes, if any, will be higher in the ownership hierarchy than the first container.
            3) Array is not guaranteed to be strictly hierarchically ordered.
    file: scitran.File
        File object that is used to spawn 0 or more jobs.
    """

    # File information
    filename = file['filename']
    filehash = file['filehash']

    # File container information.
    # Per the contract above, the owning container is the FIRST entry; reading
    # the last entry would pick an ancestor once a full hierarchy is passed in.
    container_type, container = containers[0]
    container_id = container['_id']

    log.info('File ' + filename + ' is in a ' + container_type + ' with id ' + str(container_id) + ' and hash ' + filehash)

    # Spawn rules currently do not look at container hierarchy, and only care about a single file.
    # Further, one algorithm is unconditionally triggered for each dirty file.
    queue_job(db, 'dcm2nii', container_type, container_id, filename, filehash)
def queue_job(db, algorithm_id, container_type, container_id, filename, filehash, attempt_n=1, previous_job_id=None):
    """
    Enqueues a job for execution.

    Parameters
    ----------
    db: pymongo.database.Database
        Reference to the database instance
    algorithm_id: string
        Human-friendly unique name of the algorithm
    container_type: string
        Type of container ('acquisition', 'session', etc)
    container_id: string
        ID of the container ('2', etc)
    filename: string
        Name of the file to download
    filehash: string
        Hash of the file to download
    attempt_n: integer (optional)
        If an equivalent job has tried & failed before, pass which attempt number we're at. Defaults to 1 (no previous attempts).
    previous_job_id: (optional)
        ID of the failed job this one retries. Stored on the new job only when given.

    Returns
    -------
    The ``_id`` of the inserted job document.

    Raises
    ------
    Exception
        If algorithm_id is not listed in ALGORITHMS.
    """

    if algorithm_id not in ALGORITHMS:
        raise Exception('Unsupported algorithm ' + algorithm_id)

    # TODO validate container exists

    # One timestamp for both fields so a fresh job has created == modified.
    now = datetime.datetime.utcnow()

    job = {
        'state': 'pending',

        'created': now,
        'modified': now,

        # We need all these keys to re-run this job if it fails.
        # NOTE: 'container_type' must appear exactly once; a second entry
        # set to algorithm_id would silently overwrite the real value.
        'algorithm_id': algorithm_id,
        'container_id': container_id,
        'container_type': container_type,
        'filename': filename,
        'filehash': filehash,
        'attempt': attempt_n,

        'formula': {
            'inputs': [
                {
                    'type': 'scitran',
                    'location': '/',
                    'URI': 'TBD',
                },
                {
                    'type': 'scitran',
                    'location': '/script',
                    'URI': 'TBD',
                },
            ],

            'accents': {
                'cwd': "/script",
                'command': [ 'TBD' ],
                'environment': { },
            },

            'outputs': [
                {
                    'type': 'scitran',
                    'location': '/output',
                    'URI': 'TBD',
                },
            ],
        },
    }

    if previous_job_id is not None:
        job['previous_job_id'] = previous_job_id

    result = db.jobs.insert_one(job)
    _id = result.inserted_id

    log.info('Running %s as job %s to process %s %s' % (algorithm_id, str(_id), container_type, container_id))
    return _id
def serializeJob(job):
def serialize_job(job):
if job:
job['_id'] = str(job['_id'])
job['created'] = util.format_timestamp(job['created'])
......@@ -122,7 +185,7 @@ class Jobs(base.RequestHandler):
results = list(self.app.db.jobs.find())
for result in results:
result = serializeJob(result)
result = serialize_job(result)
return results
......@@ -133,6 +196,13 @@ class Jobs(base.RequestHandler):
return self.app.db.jobs.count()
def addTestJob(self):
    """Adds a harmless job for testing purposes. Intentionally equivalent return to queue_job."""
    if not self.superuser_request:
        self.abort(401, 'Request requires superuser')

    # queue_job requires a filename and a filehash; calling it without them
    # raises TypeError. The two values below are placeholders for the test job.
    # NOTE(review): replace with a real file's name/hash if a worker is
    # expected to actually download the input.
    return queue_job(self.app.db, 'dcm2nii', 'acquisition', '55bf861e6941f040cf8d6939', 'test-file', 'test-hash')
def next(self):
"""
Atomically change a 'pending' job to 'running' and returns it. Updates timestamp.
......@@ -142,8 +212,6 @@ class Jobs(base.RequestHandler):
if not self.superuser_request:
self.abort(401, 'Request requires superuser')
# createJob(self.app.db, 'dcm2nii', 'session', '55a58db95f22580812902b9e')
# REVIEW: is this atomic?
# REVIEW: semantics are not documented as to this mutation's behaviour when filter matches no docs.
result = self.app.db.jobs.find_one_and_update(
......@@ -152,7 +220,7 @@ class Jobs(base.RequestHandler):
},
{ '$set': {
'state': 'running',
'modified': datetime.datetime.now()}
'modified': datetime.datetime.utcnow()}
},
sort=[('modified', -1)],
return_document=pymongo.collection.ReturnDocument.AFTER
......@@ -161,7 +229,7 @@ class Jobs(base.RequestHandler):
if result == None:
self.abort(400, 'No jobs to process')
return serializeJob(result)
return serialize_job(result)
class Job(base.RequestHandler):
......@@ -172,7 +240,7 @@ class Job(base.RequestHandler):
self.abort(401, 'Request requires superuser')
result = self.app.db.jobs.find_one({'_id': bson.ObjectId(_id)})
return serializeJob(result)
return serialize_job(result)
def put(self, _id):
"""
......@@ -186,17 +254,18 @@ class Job(base.RequestHandler):
mutation = self.request.json
job = self.app.db.jobs.find_one({'_id': bson.ObjectId(_id)})
print 'MUTATION HAS ' + len(mutation) + ' FIELDS'
if job is None:
self.abort(404, 'Job not found')
if job['state'] not in JOB_STATES_ALLOWED_MUTATE:
self.abort(404, 'Cannot mutate a job that is ' + job['state'] + '.')
if 'state' in mutation and not validTransition(job['state'], mutation['state']):
if 'state' in mutation and not valid_transition(job['state'], mutation['state']):
self.abort(404, 'Mutating job from ' + job['state'] + ' to ' + mutation['state'] + ' not allowed.')
# Any modification must be a timestamp update
mutation['timestamp'] = datetime.datetime.now()
mutation['modified'] = datetime.datetime.utcnow()
# REVIEW: is this atomic?
# As far as I can tell, update_one vs find_one_and_update differ only in what they return.
self.app.db.jobs.update_one(job, mutation)
self.app.db.jobs.update_one(job, {'$set': mutation})
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment