diff --git a/api/jobs/handlers.py b/api/jobs/handlers.py index f35a9e5c5b899ef102ec7608b3c737bd56ce9f0c..f06caf606dae9faa70993c9e37694875b75e2e8e 100644 --- a/api/jobs/handlers.py +++ b/api/jobs/handlers.py @@ -32,9 +32,7 @@ class JobsHandler(base.RequestHandler): if not self.superuser_request: self.abort(403, 'Request requires superuser') - results = list(config.db.jobs.find()) - - return results + return list(config.db.jobs.find()) def add(self): """ @@ -88,23 +86,8 @@ class JobsHandler(base.RequestHandler): if not self.superuser_request: self.abort(403, 'Request requires superuser') - while True: - doc = config.db.jobs.find_one_and_update( - { - 'state': 'running', - 'modified': {'$lt': datetime.datetime.utcnow() - datetime.timedelta(seconds=100)}, - }, - { - '$set': { - 'state': 'failed', - }, - }, - ) - if doc is None: - break - else: - j = Job.load(doc) - Queue.retry(j) + count = Queue.scan_for_orphans() + return { 'orphaned': count } class JobHandler(base.RequestHandler): diff --git a/api/jobs/queue.py b/api/jobs/queue.py index f480aadfced8be30bfe1fc07b78b23baaf1b69d5..328bf22595d6e705258906a27d4967562a9e0702 100644 --- a/api/jobs/queue.py +++ b/api/jobs/queue.py @@ -164,3 +164,33 @@ class Queue(object): 'by-tag': by_tag, 'permafailed': permafailed } + + @staticmethod + def scan_for_orphans(): + """ + Scan the queue for orphaned jobs, mark them as failed, and possibly retry them. + Should be called periodically. + """ + + orphaned = 0 + + while True: + doc = config.db.jobs.find_one_and_update( + { + 'state': 'running', + 'modified': {'$lt': datetime.datetime.utcnow() - datetime.timedelta(seconds=100)}, + }, + { + '$set': { + 'state': 'failed', }, + }, + ) + + if doc is None: + break + else: + orphaned += 1 + j = Job.load(doc) + Queue.retry(j) + + return orphaned