-
Kevin S. Hahn authoredKevin S. Hahn authored
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
jobs.py 4.03 KiB
# @author: Kevin S Hahn
"""
API request handlers for process-job-handling.
represents the /nimsapi/jobs route
"""
import logging
import datetime
log = logging.getLogger('nimsapi.jobs')
import base
# Processors that are allowed to talk to this API.
# TODO: decide what an entry should look like -- protocol + FQDN?
#       e.g. https://coronal.stanford.edu
PROCESSOR_WHITELIST = [
    'dockerhost',
]
# Every state a job can be in, listed in rough lifecycle order.
JOB_STATES = [
    'pending',  # job exists but nothing has started it yet
    'queued',   # a processor has claimed the job
    'running',  # a processor is executing the job
    'done',     # the job finished successfully
    'failed',   # an error occurred
    'paused',   # the job is paused (no known use case so far)
]
# Jobs must know how they affect the various components of a file description.
# Some "special" cases will reset the state from 'orig' to 'pending',
# but the usual case will be to append an item to the state list.
# TODO: the create-job function should live here,
# where it can be edited alongside the routes that consume and modify the jobs.
# GET /jobs full list of jobs, allow specifiers, status=
# POST /jobs creates a new job. this will be used by webapp to add new jobs
# GET /jobs/<_id> get information about one job
# PUT /jobs/<_id> update information about one job
# GET /jobs/next, special route to get the 'next job'
class Jobs(base.RequestHandler):

    """Provide /jobs API routes."""

    def get(self):
        """
        Return the full list of jobs stored in the jobs collection.

        TODO: allow querying for group
        TODO: allow querying for project
        TODO: allow querying by other meta data. can this be generalized?
        """
        # TODO: auth
        return list(self.app.db.jobs.find())

    def count(self):
        """Return the total number of jobs."""
        # no auth?
        return self.app.db.jobs.count()

    def counts(self):
        """
        Return more information about the jobs.

        The result is a dict holding the 'total' number of jobs plus the
        number of jobs whose status is 'failed', 'pending' and 'done'.
        """
        jobs = self.app.db.jobs
        counts = {'total': jobs.count()}
        for status in ('failed', 'pending', 'done'):
            counts[status] = jobs.find({'status': status}).count()
        return counts

    def next(self):
        """
        Return the next job in the queue that matches the query parameters.

        The request body must be a JSON object. Its optional 'project' and
        'group' values narrow the search; other keys are ignored. The selected
        pending job is switched to 'queued' and its 'modified' timestamp is
        refreshed in the same find-and-modify call, and the updated document
        is returned. The result is None when no pending job matches.

        Aborts with HTTP 400 when the body is not valid JSON or is not a
        JSON object.
        """
        # TODO: add ability to query on things like psd type or psd name
        try:
            query_params = self.request.json
        except ValueError as e:
            self.abort(400, str(e))
        # A JSON array/scalar has no .get(); reject it instead of crashing.
        if not isinstance(query_params, dict):
            self.abort(400, 'request body must be a JSON object')

        query = {'status': 'pending'}
        for key in ('project', 'group'):
            value = query_params.get(key)
            if value:
                query[key] = value

        # Sort ascending on 'modified' so the pending job that has waited the
        # longest is handed out first; a descending sort would always favour
        # the most recently touched job and could starve older ones.
        # NOTE(review): 'modified' is written with naive datetime.now(); confirm
        # the rest of the project writes this field the same way.
        job_spec = self.app.db.jobs.find_and_modify(
            query,
            {'$set': {'status': 'queued', 'modified': datetime.datetime.now()}},
            sort=[('modified', 1)],
            new=True
        )
        return job_spec
class Job(base.RequestHandler):

    """Provides /Jobs/<jid> routes."""

    # TODO flesh out the job schema
    # NOTE(review): '_id' is declared as a string here, while the handlers
    # below convert the id to int before querying -- confirm which is intended.
    json_schema = {
        '$schema': 'http://json-schema.org/draft-04/schema#',
        'title': 'Job',
        'type': 'object',
        'properties': {
            '_id': {
                'title': 'Job ID',
                'type': 'string',
            },
        },
        'required': ['_id'],
        'additionalProperties': True,
    }

    def get(self, _id):
        """
        Return the job whose integer id is `_id`.

        The result of the lookup is returned as-is, so it is None when no job
        has that id. Aborts with HTTP 400 when `_id` is not an integer.
        """
        return self.app.db.jobs.find_one({'_id': self._parse_id(_id)})

    def put(self, _id):
        """
        Update a single job.

        The request body must be a JSON object. Only the 'status' and
        'activity' keys are applied, and only those actually present in the
        body, so a partial update does not overwrite the other field with
        None. Aborts with HTTP 400 when `_id` is not an integer, when the body
        is not a JSON object, or when it carries neither key.
        """
        job_id = self._parse_id(_id)
        try:
            payload = self.request.json
        except ValueError as e:
            self.abort(400, str(e))
        if not isinstance(payload, dict):
            self.abort(400, 'request body must be a JSON object')

        # TODO: validate the field values (e.g. status against the known job
        # states) before updating the db
        updates = dict(
            (key, payload[key]) for key in ('status', 'activity') if key in payload
        )
        if not updates:
            # An empty $set is rejected by the database; fail early and clearly.
            self.abort(400, "nothing to update: provide 'status' and/or 'activity'")
        self.app.db.jobs.update({'_id': job_id}, {'$set': updates})

    def _parse_id(self, _id):
        """Convert the route's job id to int, aborting with HTTP 400 if it is not one."""
        try:
            return int(_id)
        except (TypeError, ValueError):
            self.abort(400, 'invalid job id: %r' % (_id,))