Skip to content
Snippets Groups Projects
Unverified Commit debbf082 authored by Nathaniel Kofalt's avatar Nathaniel Kofalt Committed by GitHub
Browse files

Merge pull request #1092 from scitran/peek

Add queue peek support
parents 7780bafd 75324f19
No related branches found
No related tags found
No related merge requests found
......@@ -269,11 +269,12 @@ class JobsHandler(base.RequestHandler):
@require_admin
def next(self):
peek = self.is_true('peek')
tags = self.request.GET.getall('tags')
if len(tags) <= 0:
tags = None
job = Queue.start_job(tags=tags)
job = Queue.start_job(tags=tags, peek=peek)
if job is None:
raise InputValidationException('No jobs to process')
......
......@@ -19,7 +19,7 @@ class Job(object):
def __init__(self, gear_id, inputs, destination=None, tags=None,
attempt=1, previous_job_id=None, created=None,
modified=None, state='pending', request=None,
id_=None, config_=None, now=False, origin=None,
id_=None, config_=None, origin=None,
saved_files=None, produced_metadata=None, batch=None,
failed_output_accepted=False, profile=None):
"""
......@@ -102,7 +102,6 @@ class Job(object):
self.request = request
self.id_ = id_
self.config = config_
self.now = now
self.origin = origin
self.saved_files = saved_files
self.produced_metadata = produced_metadata
......@@ -163,7 +162,6 @@ class Job(object):
request=d.get('request'),
id_=d['_id'],
config_=d.get('config'),
now=d.get('now', False),
origin=d.get('origin'),
saved_files=d.get('saved_files'),
produced_metadata=d.get('produced_metadata'),
......@@ -207,8 +205,6 @@ class Job(object):
d.pop('previous_job_id')
if d['request'] is None:
d.pop('request')
if d['now'] is False:
d.pop('now')
if d['failed_output_accepted'] is False:
d.pop('failed_output_accepted')
......
......@@ -128,7 +128,6 @@ class Queue(object):
return new_id
@staticmethod
def enqueue_job(job_map, origin, perm_check_uid=None):
"""
......@@ -174,7 +173,6 @@ class Queue(object):
tags = job_map.get('tags', [])
attempt_n = job_map.get('attempt_n', 1)
previous_job_id = job_map.get('previous_job_id', None)
now_flag = job_map.get('now', False) # A flag to increase job priority
batch = job_map.get('batch', None) # A batch id if this job is part of a batch run
# Add destination container, or select one
......@@ -193,7 +191,6 @@ class Queue(object):
for x in inputs:
inputs[x].check_access(perm_check_uid, 'ro')
destination.check_access(perm_check_uid, 'rw')
now_flag = False # Only superuser requests are allowed to set "now" flag
# Config options are stored on the job object under the "config" key
config_ = {
......@@ -252,79 +249,79 @@ class Queue(object):
if gear_name not in tags:
tags.append(gear_name)
job = Job(str(gear['_id']), inputs, destination=destination, tags=tags, config_=config_, now=now_flag, attempt=attempt_n, previous_job_id=previous_job_id, origin=origin, batch=batch)
job = Job(str(gear['_id']), inputs, destination=destination, tags=tags, config_=config_, attempt=attempt_n, previous_job_id=previous_job_id, origin=origin, batch=batch)
job.insert()
return job
@staticmethod
def start_job(tags=None):
def start_job(tags=None, peek=False):
"""
Atomically change a 'pending' job to 'running' and returns it. Updates timestamp.
Will return None if there are no jobs to offer. Searches for jobs marked "now"
most recently first, followed by unmarked jobs in FIFO order if none are found.
Will return None if there are no jobs to offer. Searches for jobs in FIFO order.
Potential jobs must match at least one tag, if provided.
"""
query = { 'state': 'pending', 'now': True }
query = { 'state': 'pending' }
if tags is not None:
query['tags'] = {'$in': tags }
# First look for jobs marked "now" sorted by modified most recently
# Mark as running if found
modification = { '$set': {
'state': 'running',
'modified': datetime.datetime.utcnow()
}}
if peek:
# placeholder noop
modification = {'$setOnInsert': {'1': 1}}
# Search ordering by FIFO
result = config.db.jobs.find_one_and_update(
query,
{ '$set': {
'state': 'running',
'modified': datetime.datetime.utcnow()}
},
sort=[('modified', -1)],
modification,
sort=[('modified', 1)],
return_document=pymongo.collection.ReturnDocument.AFTER
)
# If no jobs marked "now" are found, search again ordering by FIFO
if result is None:
query['now'] = {'$ne': True}
result = config.db.jobs.find_one_and_update(
query,
{ '$set': {
'state': 'running',
'modified': datetime.datetime.utcnow()}
},
sort=[('modified', 1)],
return_document=pymongo.collection.ReturnDocument.AFTER
)
if result is None:
return None
job = Job.load(result)
# Return if there is a job request already, else create one
if peek:
gear = get_gear(job.gear_id)
for key in gear['gear']['inputs']:
if gear['gear']['inputs'][key] == 'api-key':
# API-key gears cannot be peeked
return None
# Return if there is a job request already (probably prefetch)
if job.request is not None:
log.info('Job ' + job.id_ + ' already has a request, so not generating')
return job
else:
# Generate, save, and return a job request.
request = job.generate_request(get_gear(job.gear_id))
result = config.db.jobs.find_one_and_update(
{
'_id': bson.ObjectId(job.id_)
},
{ '$set': {
'request': request }
},
return_document=pymongo.collection.ReturnDocument.AFTER
)
# Create a new request formula
request = job.generate_request(get_gear(job.gear_id))
if peek:
job.request = request
return job
if result is None:
raise Exception('Marked job as running but could not generate and save formula')
# Save and return
result = config.db.jobs.find_one_and_update(
{
'_id': bson.ObjectId(job.id_)
},
{ '$set': {
'request': request }
},
return_document=pymongo.collection.ReturnDocument.AFTER
)
if result is None:
raise Exception('Marked job as running but could not generate and save formula')
return Job.load(result)
return Job.load(result)
@staticmethod
def search(containers, states=None, tags=None):
......
......@@ -153,6 +153,11 @@ def test_jobs(data_builder, default_payload, as_public, as_user, as_admin, as_ro
r = as_root.get('/jobs/next', params={'tags': 'fake-tag'})
assert r.status_code == 400
# get next job with peek
r = as_root.get('/jobs/next', params={'tags': 'test-tag', 'peek': True})
assert r.ok
next_job_id = r.json()['id']
# get next job
r = as_root.get('/jobs/next', params={'tags': 'test-tag'})
assert r.ok
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment