Skip to content
Snippets Groups Projects
Commit b0fa99e1 authored by Nathaniel Kofalt's avatar Nathaniel Kofalt
Browse files

Generate formula late

parent 15a97b05
No related branches found
No related tags found
No related merge requests found
......@@ -117,41 +117,16 @@ def queue_job(db, algorithm_id, container_type, container_id, filename, filehash
'created': now,
'modified': now,
# We need all these keys to re-run this job if it fails.
'algorithm_id': algorithm_id,
'container_type': container_type,
'container_id': container_id,
'filename': filename,
'filehash': filehash,
'attempt': attempt_n,
'formula': {
'inputs': [
{
'type': 'file',
'uri': '/tmp/deja/flak/0/dcm_convert-0.1.1.tar',
'location': '/',
},
{
'type': 'scitran',
'uri': '/' + container_type + '/' + container_id + '/file/' + filename,
'location': '/input/' + filename,
}
],
'transform': {
'command': ['bash', '-c', 'mkdir /output; /scripts/run /input/' + filename + ' /output/'],
'env': { },
'dir': "/",
},
# Everything required to generate a job formula.
'intent': {
'algorithm_id': algorithm_id,
'container_type': container_type,
'container_id': container_id,
'filename': filename,
'filehash': filehash,
},
'outputs': [
{
'type': 'scitran',
'uri': '/' + container_type + '/' + container_id + '/file/',
'location': '/output',
},
],
}
'attempt': attempt_n,
}
if previous_job_id is not None:
......@@ -169,13 +144,59 @@ def retry_job(db, j):
TODO: make max attempts configurable
"""
i = j['intent']
if j['attempt'] < 3:
job_id = queue_job(db, j['algorithm_id'], j['container_type'], j['container_id'], j['filename'], j['filehash'], j['attempt']+1, j['_id'])
job_id = queue_job(db, i['algorithm_id'], i['container_type'], i['container_id'], i['filename'], i['filehash'], j['attempt']+1, j['_id'])
log.info('respawned job %s as %s (attempt %d)' % (j['_id'], job_id, j['attempt']+1))
else:
log.info('permanently failed job %s (after %d attempts)' % (j['_id'], j['attempt']))
def generate_formula(i):
"""
Given an intent, generates a formula to execute a job.
Parameters
----------
i: map
A job's intent that holds everything needed to generate a formula.
"""
if i['algorithm_id'] not in ALGORITHMS:
raise Exception('Usupported algorithm ' + algorithm_id)
# Currently hard-coded for a single algorithm: dcm2nii
f = {
'inputs': [
{
'type': 'file',
'uri': '/tmp/deja/flak/0/dcm_convert-0.1.1.tar',
'location': '/',
},
{
'type': 'scitran',
'uri': '/' + i['container_type'] + '/' + i['container_id'] + '/file/' + i['filename'],
'location': '/input/' + i['filename'],
}
],
'transform': {
'command': ['bash', '-c', 'mkdir /output; /scripts/run /input/' + i['filename'] + ' /output/'],
'env': { },
'dir': "/",
},
'outputs': [
{
'type': 'scitran',
'uri': '/' + i['container_type'] + '/' + i['container_id'] + '/file/',
'location': '/output',
},
],
}
return f
class Jobs(base.RequestHandler):
"""Provide /jobs API routes."""
......@@ -214,8 +235,7 @@ class Jobs(base.RequestHandler):
if not self.superuser_request:
self.abort(401, 'Request requires superuser')
# REVIEW: is this atomic?
# REVIEW: semantics are not documented as to this mutation's behaviour when filter matches no docs.
# First, atomically mark document as running.
result = self.app.db.jobs.find_one_and_update(
{
'state': 'pending'
......@@ -231,6 +251,20 @@ class Jobs(base.RequestHandler):
if result == None:
self.abort(400, 'No jobs to process')
# Second, update document to store formula request.
result = self.app.db.jobs.find_one_and_update(
{
'_id': result['_id']
},
{ '$set': {
'request': generate_formula(result['intent'])}
},
return_document=pymongo.collection.ReturnDocument.AFTER
)
if result == None:
self.abort(500, 'Marked job as running but could not generate and save formula')
return result
class Job(base.RequestHandler):
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment