Skip to content
Snippets Groups Projects
Commit 8e0806d8 authored by Nathaniel Kofalt's avatar Nathaniel Kofalt
Browse files

Merge pull request #52 from scitran/job-generate-late

Generate job information on-the-fly
parents 15a97b05 aee0b387
No related branches found
No related tags found
No related merge requests found
......@@ -117,41 +117,16 @@ def queue_job(db, algorithm_id, container_type, container_id, filename, filehash
'created': now,
'modified': now,
# We need all these keys to re-run this job if it fails.
'algorithm_id': algorithm_id,
'container_type': container_type,
'container_id': container_id,
'filename': filename,
'filehash': filehash,
'attempt': attempt_n,
'formula': {
'inputs': [
{
'type': 'file',
'uri': '/tmp/deja/flak/0/dcm_convert-0.1.1.tar',
'location': '/',
},
{
'type': 'scitran',
'uri': '/' + container_type + '/' + container_id + '/file/' + filename,
'location': '/input/' + filename,
}
],
'transform': {
'command': ['bash', '-c', 'mkdir /output; /scripts/run /input/' + filename + ' /output/'],
'env': { },
'dir': "/",
},
# Everything required to generate a job formula.
'intent': {
'algorithm_id': algorithm_id,
'container_type': container_type,
'container_id': container_id,
'filename': filename,
'filehash': filehash,
},
'outputs': [
{
'type': 'scitran',
'uri': '/' + container_type + '/' + container_id + '/file/',
'location': '/output',
},
],
}
'attempt': attempt_n,
}
if previous_job_id is not None:
......@@ -169,13 +144,59 @@ def retry_job(db, j):
TODO: make max attempts configurable
"""
i = j['intent']
if j['attempt'] < 3:
job_id = queue_job(db, j['algorithm_id'], j['container_type'], j['container_id'], j['filename'], j['filehash'], j['attempt']+1, j['_id'])
job_id = queue_job(db, i['algorithm_id'], i['container_type'], i['container_id'], i['filename'], i['filehash'], j['attempt']+1, j['_id'])
log.info('respawned job %s as %s (attempt %d)' % (j['_id'], job_id, j['attempt']+1))
else:
log.info('permanently failed job %s (after %d attempts)' % (j['_id'], j['attempt']))
def generate_formula(i):
"""
Given an intent, generates a formula to execute a job.
Parameters
----------
i: map
A job's intent that holds everything needed to generate a formula.
"""
if i['algorithm_id'] not in ALGORITHMS:
raise Exception('Usupported algorithm ' + algorithm_id)
# Currently hard-coded for a single algorithm: dcm2nii
f = {
'inputs': [
{
'type': 'file',
'uri': '/tmp/deja/flak/0/dcm_convert-0.1.1.tar',
'location': '/',
},
{
'type': 'scitran',
'uri': '/' + i['container_type'] + '/' + i['container_id'] + '/file/' + i['filename'],
'location': '/input/' + i['filename'],
}
],
'transform': {
'command': ['bash', '-c', 'mkdir /output; /scripts/run /input/' + i['filename'] + ' /output/'],
'env': { },
'dir': "/",
},
'outputs': [
{
'type': 'scitran',
'uri': '/' + i['container_type'] + '/' + i['container_id'] + '/file/',
'location': '/output',
},
],
}
return f
class Jobs(base.RequestHandler):
"""Provide /jobs API routes."""
......@@ -214,8 +235,7 @@ class Jobs(base.RequestHandler):
if not self.superuser_request:
self.abort(401, 'Request requires superuser')
# REVIEW: is this atomic?
# REVIEW: semantics are not documented as to this mutation's behaviour when filter matches no docs.
# First, atomically mark document as running.
result = self.app.db.jobs.find_one_and_update(
{
'state': 'pending'
......@@ -228,9 +248,23 @@ class Jobs(base.RequestHandler):
return_document=pymongo.collection.ReturnDocument.AFTER
)
if result == None:
if result is None:
self.abort(400, 'No jobs to process')
# Second, update document to store formula request.
result = self.app.db.jobs.find_one_and_update(
{
'_id': result['_id']
},
{ '$set': {
'request': generate_formula(result['intent'])}
},
return_document=pymongo.collection.ReturnDocument.AFTER
)
if result is None:
self.abort(500, 'Marked job as running but could not generate and save formula')
return result
class Job(base.RequestHandler):
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment