diff --git a/api/jobs/handlers.py b/api/jobs/handlers.py index 3c058c97083e2a4b7f34827b55f787856fcd9d0a..c45883d8d947eaf0d568c628a1edffac1c84b1a7 100644 --- a/api/jobs/handlers.py +++ b/api/jobs/handlers.py @@ -269,11 +269,12 @@ class JobsHandler(base.RequestHandler): @require_admin def next(self): + peek = self.is_true('peek') tags = self.request.GET.getall('tags') if len(tags) <= 0: tags = None - job = Queue.start_job(tags=tags) + job = Queue.start_job(tags=tags, peek=peek) if job is None: raise InputValidationException('No jobs to process') diff --git a/api/jobs/jobs.py b/api/jobs/jobs.py index 7c408d0c411c602c84276bb90f6d12741fd7fa9a..2769abcd96f73244aaeca6360881ac746fcfb9b6 100644 --- a/api/jobs/jobs.py +++ b/api/jobs/jobs.py @@ -19,7 +19,7 @@ class Job(object): def __init__(self, gear_id, inputs, destination=None, tags=None, attempt=1, previous_job_id=None, created=None, modified=None, state='pending', request=None, - id_=None, config_=None, now=False, origin=None, + id_=None, config_=None, origin=None, saved_files=None, produced_metadata=None, batch=None, failed_output_accepted=False, profile=None): """ @@ -102,7 +102,6 @@ class Job(object): self.request = request self.id_ = id_ self.config = config_ - self.now = now self.origin = origin self.saved_files = saved_files self.produced_metadata = produced_metadata @@ -163,7 +162,6 @@ class Job(object): request=d.get('request'), id_=d['_id'], config_=d.get('config'), - now=d.get('now', False), origin=d.get('origin'), saved_files=d.get('saved_files'), produced_metadata=d.get('produced_metadata'), @@ -207,8 +205,6 @@ class Job(object): d.pop('previous_job_id') if d['request'] is None: d.pop('request') - if d['now'] is False: - d.pop('now') if d['failed_output_accepted'] is False: d.pop('failed_output_accepted') diff --git a/api/jobs/queue.py b/api/jobs/queue.py index 89ea8cc10bbb473473df474f75e4eff9e4fafb64..3eed0b25eba5d782bf6f5ce20ad1eaee68f3144a 100644 --- a/api/jobs/queue.py +++ b/api/jobs/queue.py @@ -128,7 +128,6 @@ class Queue(object): return new_id - @staticmethod def enqueue_job(job_map, origin, perm_check_uid=None): """ @@ -174,7 +173,6 @@ class Queue(object): tags = job_map.get('tags', []) attempt_n = job_map.get('attempt_n', 1) previous_job_id = job_map.get('previous_job_id', None) - now_flag = job_map.get('now', False) # A flag to increase job priority batch = job_map.get('batch', None) # A batch id if this job is part of a batch run # Add destination container, or select one @@ -193,7 +191,6 @@ class Queue(object): for x in inputs: inputs[x].check_access(perm_check_uid, 'ro') destination.check_access(perm_check_uid, 'rw') - now_flag = False # Only superuser requests are allowed to set "now" flag # Config options are stored on the job object under the "config" key config_ = { @@ -252,79 +249,79 @@ class Queue(object): if gear_name not in tags: tags.append(gear_name) - job = Job(str(gear['_id']), inputs, destination=destination, tags=tags, config_=config_, now=now_flag, attempt=attempt_n, previous_job_id=previous_job_id, origin=origin, batch=batch) + job = Job(str(gear['_id']), inputs, destination=destination, tags=tags, config_=config_, attempt=attempt_n, previous_job_id=previous_job_id, origin=origin, batch=batch) job.insert() return job @staticmethod - def start_job(tags=None): + def start_job(tags=None, peek=False): """ Atomically change a 'pending' job to 'running' and returns it. Updates timestamp. - Will return None if there are no jobs to offer. Searches for jobs marked "now" - most recently first, followed by unmarked jobs in FIFO order if none are found. + Will return None if there are no jobs to offer. Searches for jobs in FIFO order. Potential jobs must match at least one tag, if provided. """ - query = { 'state': 'pending', 'now': True } - + query = { 'state': 'pending' } if tags is not None: query['tags'] = {'$in': tags } - # First look for jobs marked "now" sorted by modified most recently - # Mark as running if found + modification = { '$set': { + 'state': 'running', + 'modified': datetime.datetime.utcnow() + }} + + if peek: + # placeholder noop + modification = {'$setOnInsert': {'1': 1}} + + # Search ordering by FIFO result = config.db.jobs.find_one_and_update( query, - - { '$set': { - 'state': 'running', - 'modified': datetime.datetime.utcnow()} - }, - sort=[('modified', -1)], + modification, + sort=[('modified', 1)], return_document=pymongo.collection.ReturnDocument.AFTER ) - # If no jobs marked "now" are found, search again ordering by FIFO - if result is None: - query['now'] = {'$ne': True} - result = config.db.jobs.find_one_and_update( - query, - - { '$set': { - 'state': 'running', - 'modified': datetime.datetime.utcnow()} - }, - sort=[('modified', 1)], - return_document=pymongo.collection.ReturnDocument.AFTER - ) - if result is None: return None job = Job.load(result) - # Return if there is a job request already, else create one + if peek: + gear = get_gear(job.gear_id) + for key in gear['gear']['inputs']: + if gear['gear']['inputs'][key] == 'api-key': + # API-key gears cannot be peeked + return None + + # Return if there is a job request already (probably prefetch) if job.request is not None: log.info('Job ' + job.id_ + ' already has a request, so not generating') return job - else: - # Generate, save, and return a job request. - request = job.generate_request(get_gear(job.gear_id)) - result = config.db.jobs.find_one_and_update( - { - '_id': bson.ObjectId(job.id_) - }, - { '$set': { - 'request': request } - }, - return_document=pymongo.collection.ReturnDocument.AFTER - ) + # Create a new request formula + request = job.generate_request(get_gear(job.gear_id)) + + if peek: + job.request = request + return job - if result is None: - raise Exception('Marked job as running but could not generate and save formula') + # Save and return + result = config.db.jobs.find_one_and_update( + { + '_id': bson.ObjectId(job.id_) + }, + { '$set': { + 'request': request } + }, + return_document=pymongo.collection.ReturnDocument.AFTER + ) + + if result is None: + raise Exception('Marked job as running but could not generate and save formula') - return Job.load(result) + return Job.load(result) @staticmethod def search(containers, states=None, tags=None):