Skip to content
Snippets Groups Projects
Commit 33dcbd0e authored by Nathaniel Kofalt's avatar Nathaniel Kofalt
Browse files

Job api fixes

parent 9eda8999
No related branches found
No related tags found
No related merge requests found
......@@ -62,6 +62,9 @@ def spawn_jobs(db, containers, file):
File object that is used to spawn 0 or more jobs.
"""
if file['filetype'] != 'dicom':
return
# File information
filename = file['filename']
filehash = file['filehash']
......@@ -69,9 +72,9 @@ def spawn_jobs(db, containers, file):
# File container information
last = len(containers) - 1
container_type, container = containers[last]
container_id = container['_id']
container_id = str(container['_id'])
log.info('File ' + filename + 'is in a ' + container_type + ' with id ' + str(container_id) + ' and hash ' + filehash)
log.info('File ' + filename + 'is in a ' + container_type + ' with id ' + container_id + ' and hash ' + filehash)
# Spawn rules currently do not look at container hierarchy, and only care about a single file.
# Further, one algorithm is unconditionally triggered for each dirty file.
......@@ -90,7 +93,7 @@ def queue_job(db, algorithm_id, container_type, container_id, filename, filehash
algorithm_id: string
Human-friendly unique name of the algorithm
container_type: string
Type of container ('acquisition', 'session', etc)
Type of container ('acquisitions', 'sessions', etc) that matches the URL route.
container_id: string
ID of the container ('2', etc)
filename: string
......@@ -116,9 +119,8 @@ def queue_job(db, algorithm_id, container_type, container_id, filename, filehash
# We need all these keys to re-run this job if it fails.
'algorithm_id': algorithm_id,
'container_id': container_id,
'container_type': container_type,
'container_type': algorithm_id,
'container_id': container_id,
'filename': filename,
'filehash': filehash,
'attempt': attempt_n,
......@@ -256,6 +258,12 @@ class Job(base.RequestHandler):
# Any modification must be a timestamp update
mutation['modified'] = datetime.datetime.utcnow()
# REVIEW: is this atomic?
# As far as I can tell, update_one vs find_one_and_update differ only in what they return.
self.app.db.jobs.update_one(job, {'$set': mutation})
# Create an object with all the fields that must not have changed concurrently.
jobQuery = {
'_id': job['_id'],
'state': job['state'],
}
result = self.app.db.jobs.update_one(jobQuery, {'$set': mutation})
if result.modified_count != 1:
self.abort(500, 'Job modification not saved')
......@@ -141,7 +141,6 @@ else:
log.warning('scitran central unreachable, purging all remotes info')
centralclient.clean_remotes(application.db, args.site_id)
@uwsgidecorators.timer(30)
def job_creation(signum):
for c_type in ['projects', 'collections', 'sessions', 'acquisitions']:
for c in application.db[c_type].find({'files.dirty': True}, ['files']):
......@@ -186,3 +185,12 @@ else:
log.info('respawned job %s as %s (attempt %d)' % (j['_id'], job_id, j['attempt']+1))
else:
log.info('permanently failed job %s (after %d attempts)' % (j['_id'], j['attempt']))
# Run job creation immediately on start, then every 30 seconds thereafter.
# This saves sysadmin irritation waiting for the engine, or an initial job load conflicting with timed runs.
job_creation(None)
@uwsgidecorators.timer(30)
def job_creation_timer(signum):
job_creation(signum)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment