Skip to content
Snippets Groups Projects
Unverified Commit 58866be2 authored by Nathaniel Kofalt's avatar Nathaniel Kofalt Committed by GitHub
Browse files

Merge pull request #1062 from scitran/jobcomplete-fixup

Menagerie of job handler improvements
parents 4666fd19 adf5b5ba
No related branches found
No related tags found
No related merge requests found
......@@ -246,7 +246,7 @@ def initialize_db():
create_or_recreate_ttl_index('authtokens', 'timestamp', 2592000)
create_or_recreate_ttl_index('uploads', 'timestamp', 60)
create_or_recreate_ttl_index('downloads', 'timestamp', 60)
create_or_recreate_ttl_index('job_tickets', 'timestamp', 300)
create_or_recreate_ttl_index('job_tickets', 'timestamp', 3600) # IMPORTANT: this controls job orphan logic. Ref queue.py
now = datetime.datetime.utcnow()
db.groups.update_one({'_id': 'unknown'}, {'$setOnInsert': { 'created': now, 'modified': now, 'label': 'Unknown', 'permissions': []}}, upsert=True)
......
......@@ -8,24 +8,24 @@ import StringIO
from jsonschema import ValidationError
from urlparse import urlparse
from . import batch
from .. import config
from .. import upload
from .. import util
from ..auth import require_drone, require_login, has_access
from ..auth import require_drone, require_login, require_admin, has_access
from ..auth.apikeys import JobApiKey
from ..dao import hierarchy
from ..dao.containerstorage import ProjectStorage, SessionStorage, AcquisitionStorage
from ..dao.containerutil import ContainerReference, pluralize
from ..util import humanize_validation_error, set_for_download
from ..validators import validate_data, verify_payload_exists
from ..web import base
from ..web.encoder import pseudo_consistent_json_encode
from ..web.errors import APIPermissionException, APINotFoundException, InputValidationException
from ..web.request import AccessType
from .. import config
from . import batch
from ..validators import validate_data, verify_payload_exists
from ..auth.apikeys import JobApiKey
from .gears import validate_gear_config, get_gears, get_gear, get_invocation_schema, remove_gear, upsert_gear, suggest_container, get_gear_by_name, check_for_gear_insertion
from .jobs import Job, Logs
from .jobs import Job, JobTicket, Logs
from .batch import check_state, update
from .queue import Queue
from .rules import create_jobs, validate_regexes
......@@ -35,12 +35,10 @@ class GearsHandler(base.RequestHandler):
"""Provide /gears API routes."""
@require_login
def get(self):
"""List all gears."""
if self.public_request:
self.abort(403, 'Request requires login')
gears = get_gears()
filters = self.request.GET.getall('filter')
......@@ -49,13 +47,9 @@ class GearsHandler(base.RequestHandler):
return gears
@require_login
def check(self):
"""
Check if a gear upload is likely to succeed.
"""
if self.public_request:
self.abort(403, 'Request requires login')
"""Check if a gear upload is likely to succeed."""
check_for_gear_insertion(self.request.json)
return None
......@@ -63,27 +57,16 @@ class GearsHandler(base.RequestHandler):
class GearHandler(base.RequestHandler):
"""Provide /gears/x API routes."""
@require_login
def get(self, _id):
"""Detail a gear."""
if self.public_request:
self.abort(403, 'Request requires login')
return get_gear(_id)
@require_login
def get_invocation(self, _id):
return get_invocation_schema(get_gear(_id))
if self.public_request:
self.abort(403, 'Request requires login')
gear = get_gear(_id)
return get_invocation_schema(gear)
@require_login
def suggest(self, _id, cont_name, cid):
if self.public_request:
self.abort(403, 'Request requires login')
cr = ContainerReference(cont_name, cid)
if not self.superuser_request:
cr.check_access(self.uid, 'ro')
......@@ -91,13 +74,11 @@ class GearHandler(base.RequestHandler):
gear = get_gear(_id)
return suggest_container(gear, cont_name+'s', cid)
@require_admin
def upload(self): # pragma: no cover
"""Upload new gear tarball file"""
if not self.user_is_admin:
self.abort(403, 'Request requires admin')
r = upload.process_upload(self.request, upload.Strategy.gear, container_type='gear', origin=self.origin, metadata=self.request.headers.get('metadata'))
gear_id = upsert_gear(r[1])
config.db.gears.update_one({'_id': gear_id}, {'$set': {
'exchange.rootfs-url': '/api/gears/temp/' + str(gear_id)}
})
......@@ -110,40 +91,26 @@ class GearHandler(base.RequestHandler):
gear = get_gear(dl_id)
hash_ = gear['exchange']['rootfs-hash']
filepath = os.path.join(config.get_item('persistent', 'data_path'), util.path_from_hash('v0-' + hash_.replace(':', '-')))
self.response.app_iter = open(filepath, 'rb')
# self.response.headers['Content-Length'] = str(gear['size']) # must be set after setting app_iter
self.response.headers['Content-Type'] = 'application/octet-stream'
self.response.headers['Content-Disposition'] = 'attachment; filename="gear.tar"'
stream = open(filepath, 'rb')
set_for_download(self.response, stream=stream, filename='gear.tar')
@require_admin
def post(self, _id):
"""Upsert an entire gear document."""
if not self.superuser_request and not self.user_is_admin:
self.abort(403, 'Request requires admin')
doc = self.request.json
payload = self.request.json
if _id != doc.get('gear', {}).get('name', ''):
if _id != payload.get('gear', {}).get('name', ''):
self.abort(400, 'Name key must be present and match URL')
try:
result = upsert_gear(self.request.json)
result = upsert_gear(payload)
return { '_id': str(result) }
except ValidationError as err:
key = "none"
if len(err.relative_path) > 0:
key = err.relative_path[0]
message = err.message.replace("u'", "'")
raise InputValidationException('Gear manifest does not match schema on key ' + key + ': ' + message)
raise InputValidationException(humanize_validation_error(err))
@require_admin
def delete(self, _id):
"""Delete a gear. Generally not recommended."""
if not self.superuser_request and not self.user_is_admin:
self.abort(403, 'Request requires admin')
return remove_gear(_id)
class RulesHandler(base.RequestHandler):
......@@ -164,7 +131,6 @@ class RulesHandler(base.RequestHandler):
return config.db.project_rules.find({'project_id' : cid}, projection=projection)
@verify_payload_exists
def post(self, cid):
"""Add a rule"""
......@@ -177,18 +143,18 @@ class RulesHandler(base.RequestHandler):
if not self.user_is_admin and not has_access(self.uid, project, 'admin'):
raise APIPermissionException('Adding rules to a project can only be done by a project admin.')
doc = self.request.json
payload = self.request.json
validate_data(doc, 'rule-new.json', 'input', 'POST', optional=True)
validate_regexes(doc)
validate_data(payload, 'rule-new.json', 'input', 'POST', optional=True)
validate_regexes(payload)
try:
get_gear_by_name(doc['alg'])
get_gear_by_name(payload['alg'])
except APINotFoundException:
self.abort(400, 'Cannot find gear for alg {}, alg not valid'.format(doc['alg']))
self.abort(400, 'Cannot find gear for alg {}, alg not valid'.format(payload['alg']))
doc['project_id'] = cid
payload['project_id'] = cid
result = config.db.project_rules.insert_one(doc)
result = config.db.project_rules.insert_one(payload)
return { '_id': result.inserted_id }
class RuleHandler(base.RequestHandler):
......@@ -243,9 +209,6 @@ class RuleHandler(base.RequestHandler):
doc.update(updates)
config.db.project_rules.replace_one({'_id': bson.ObjectId(rid)}, doc)
return
def delete(self, cid, rid):
"""Remove a rule"""
......@@ -261,14 +224,12 @@ class RuleHandler(base.RequestHandler):
result = config.db.project_rules.delete_one({'project_id' : cid, '_id': bson.ObjectId(rid)})
if result.deleted_count != 1:
raise APINotFoundException('Rule not found.')
return
class JobsHandler(base.RequestHandler):
"""Provide /jobs API routes."""
@require_admin
def get(self): # pragma: no cover (no route)
"""List all jobs."""
if not self.superuser_request and not self.user_is_admin:
self.abort(403, 'Request requires admin')
return list(config.db.jobs.find())
def add(self):
......@@ -279,14 +240,11 @@ class JobsHandler(base.RequestHandler):
if not self.superuser_request:
uid = self.uid
job = Queue.enqueue_job(payload,self.origin, perm_check_uid=uid)
job = Queue.enqueue_job(payload, self.origin, perm_check_uid=uid)
return { '_id': job.id_ }
@require_admin
def stats(self):
if not self.superuser_request and not self.user_is_admin:
self.abort(403, 'Request requires admin')
all_flag = self.is_true('all')
unique = self.is_true('unique')
tags = self.request.GET.getall('tags')
......@@ -301,22 +259,16 @@ class JobsHandler(base.RequestHandler):
return Queue.get_statistics(tags=tags, last=last, unique=unique, all_flag=all_flag)
@require_admin
def pending(self):
if not self.superuser_request and not self.user_is_admin:
self.abort(403, 'Request requires admin')
tags = self.request.GET.getall('tags')
if len(tags) == 1:
tags = tags[0].split(',')
return Queue.get_pending(tags=tags)
@require_admin
def next(self):
if not self.superuser_request and not self.user_is_admin:
self.abort(403, 'Request requires admin')
tags = self.request.GET.getall('tags')
if len(tags) <= 0:
tags = None
......@@ -324,44 +276,34 @@ class JobsHandler(base.RequestHandler):
job = Queue.start_job(tags=tags)
if job is None:
self.abort(400, 'No jobs to process')
raise InputValidationException('No jobs to process')
else:
return job
@require_admin
def reap_stale(self):
if not self.superuser_request and not self.user_is_admin:
self.abort(403, 'Request requires admin')
count = Queue.scan_for_orphans()
return { 'orphaned': count }
class JobHandler(base.RequestHandler):
"""Provides /Jobs/<jid> routes."""
@require_admin
def get(self, _id):
if not self.superuser_request and not self.user_is_admin:
self.abort(403, 'Request requires admin')
return Job.get(_id)
@require_admin
def get_config(self, _id):
"""Get a job's config"""
if not self.superuser_request and not self.user_is_admin:
self.abort(403, 'Request requires superuser')
j = Job.get(_id)
c = j.config
if c is None:
c = {}
# Serve config as formatted json file
self.response.headers['Content-Type'] = 'application/octet-stream'
self.response.headers['Content-Disposition'] = 'attachment; filename="config.json"'
# Detect if config is old- or new-style.
# TODO: remove this logic with a DB upgrade, ref database.py's reserved upgrade section.
encoded = None
if 'config' in c and c.get('inputs') is not None:
# New behavior
......@@ -394,13 +336,14 @@ class JobHandler(base.RequestHandler):
}
encoded = pseudo_consistent_json_encode(c)
self.response.app_iter = StringIO.StringIO(encoded)
self.response.headers['Content-Length'] = str(len(encoded.encode('utf-8'))) # must be set after app_iter
else:
# Legacy behavior
else: # Legacy behavior
encoded = pseudo_consistent_json_encode({"config": c})
self.response.app_iter = StringIO.StringIO(encoded)
self.response.headers['Content-Length'] = str(len(encoded.encode('utf-8'))) # must be set after app_iter
stream = StringIO.StringIO(encoded)
length = len(encoded.encode('utf-8'))
set_for_download(self.response, stream=stream, filename='config.json', length=length)
@require_login
def put(self, _id):
......@@ -455,15 +398,12 @@ class JobHandler(base.RequestHandler):
"""Get a job's logs in raw text"""
self._log_read_check(_id)
filename = 'job-' + _id + '-logs.txt'
self.response.headers['Content-Type'] = 'application/octet-stream'
self.response.headers['Content-Disposition'] = 'attachment; filename="job-' + _id + '-logs.txt"'
set_for_download(self.response, filename=filename)
for output in Logs.get_text_generator(_id):
self.response.write(output)
return
def get_logs_html(self, _id):
"""Get a job's logs in html"""
......@@ -474,18 +414,12 @@ class JobHandler(base.RequestHandler):
return
@require_admin
def add_logs(self, _id):
"""Add to a job's logs"""
if not self.superuser_request and not self.user_is_admin:
self.abort(403, 'Request requires admin')
doc = self.request.json
try:
Job.get(_id)
except Exception: # pylint: disable=broad-except
raise APINotFoundException('Job not found')
j = Job.get(_id)
Queue.mutate(j, {}) # Unconditionally heartbeat
return Logs.add(_id, doc)
......@@ -506,17 +440,13 @@ class JobHandler(base.RequestHandler):
new_id = Queue.retry(j, force=True)
return { "_id": new_id }
@require_drone
def prepare_complete(self, _id):
j = Job.get(_id)
payload = self.request.json
ticket = {
'job': j.id_,
'success': payload.get('success', False),
}
return {'ticket': config.db.job_tickets.insert_one(ticket).inserted_id}
success = payload['success']
ticket = JobTicket.create(_id, success)
return { 'ticket': ticket }
@require_login
def accept_failed_output(self, _id):
......
......@@ -370,6 +370,32 @@ class Job(object):
self.request = r
return self.request
class JobTicket(object):
    """
    Represents a single attempt to complete a job.

    Tickets live in the ``job_tickets`` collection; their presence signals
    that a job is actively finishing and should not be orphaned.
    """

    @staticmethod
    def get(_id):
        """Fetch one ticket document by its (string) ObjectId."""
        return config.db.job_tickets.find_one({'_id': bson.ObjectId(_id)})

    @staticmethod
    def create(job_id, success):
        """Record a completion attempt for *job_id*; returns the new ticket id."""
        job = Job.get(job_id)
        ticket = {
            'job': job.id_,
            'success': success,
        }
        inserted = config.db.job_tickets.insert_one(ticket)
        return inserted.inserted_id

    @staticmethod
    def find(job_id):
        """Find any tickets with job ID"""
        return list(config.db.job_tickets.find({'job': job_id}))
class Logs(object):
@staticmethod
......@@ -422,6 +448,11 @@ class Logs(object):
@staticmethod
def add(_id, doc):
# Silently ignore adding no logs
if len(doc) <= 0:
return
log = config.db.job_logs.find_one({'_id': _id})
if log is None: # Race
......
......@@ -8,7 +8,7 @@ import pymongo
import datetime
from .. import config
from .jobs import Job, Logs
from .jobs import Job, Logs, JobTicket
from .gears import get_gear, validate_gear_config, fill_gear_default_values
from ..dao.containerutil import create_filereference_from_dictionary, create_containerreference_from_dictionary, create_containerreference_from_filereference
from ..web.errors import InputValidationException
......@@ -417,13 +417,26 @@ class Queue(object):
"""
orphaned = 0
query = {
'state': 'running',
'modified': {'$lt': datetime.datetime.utcnow() - datetime.timedelta(seconds=100)},
}
while True:
orphan_candidate = config.db.jobs.find_one(query)
if orphan_candidate is None:
break
# If the job is currently attempting to complete, do not orphan.
ticket = JobTicket.find(orphan_candidate['_id'])
if ticket is not None and len(ticket) > 0:
continue
# CAS this job, since it does not have a ticket
select = { '_id': orphan_candidate['_id'] }
doc = config.db.jobs.find_one_and_update(
{
'state': 'running',
'modified': {'$lt': datetime.datetime.utcnow() - datetime.timedelta(seconds=100)},
},
dict(query, **select),
{
'$set': {
'state': 'failed', },
......@@ -432,11 +445,12 @@ class Queue(object):
)
if doc is None:
break
log.info('Job %s was heartbeat during a ticket lookup and thus not orhpaned', orphan_candidate['_id'])
else:
orphaned += 1
j = Job.load(doc)
Logs.add(j.id_, [{"msg":"The job did not report in for a long time and was canceled.", "fd":-1}])
Logs.add(j.id_, [{'msg':'The job did not report in for a long time and was canceled.', 'fd':-1}])
new_id = Queue.retry(j)
Logs.add(j.id_, [{"msg": "Retried job as " + str(new_id) if new_id else "Job retries exceeded maximum allowed", "fd":-1}])
Logs.add(j.id_, [{'msg': 'Retried job as ' + str(new_id) if new_id else 'Job retries exceeded maximum allowed', 'fd':-1}])
return orphaned
......@@ -12,10 +12,11 @@ from . import files
from . import tempdir as tempfile
from . import util
from . import validators
from .dao.containerstorage import SessionStorage, AcquisitionStorage
from .dao import containerutil, hierarchy
from .dao.containerstorage import SessionStorage, AcquisitionStorage
from .jobs import rules
from .jobs.jobs import Job
from .jobs.jobs import Job, JobTicket
from .jobs.queue import Queue
from .types import Origin
from .web import encoder
from .web.errors import FileFormException
......@@ -245,7 +246,7 @@ class EnginePlacer(Placer):
"""
A placer that can accept files and/or metadata sent to it from the engine
It uses update_container_hierarchy to update the container and it's parents' fields from the metadata
It uses update_container_hierarchy to update the container and its parents' fields from the metadata
"""
def check(self):
......@@ -277,7 +278,6 @@ class EnginePlacer(Placer):
self.metadata[self.container_type]['files'] = files_
###
def process_file_field(self, field, file_attrs):
if self.metadata is not None:
file_mds = self.metadata.get(self.container_type, {}).get('files', [])
......@@ -288,15 +288,24 @@ class EnginePlacer(Placer):
break
if self.context.get('job_ticket_id'):
job_ticket_id = bson.ObjectId(self.context.get('job_ticket_id'))
job_ticket = config.db.job_tickets.find_one({'_id': job_ticket_id})
if not job_ticket.get('success'):
job_ticket = JobTicket.get(self.context.get('job_ticket_id'))
if not job_ticket['success']:
file_attrs['from_failed_job'] = True
self.save_file(field, file_attrs)
self.saved.append(file_attrs)
def finalize(self):
job = None
job_ticket = None
success = True
if self.context.get('job_ticket_id'):
job_ticket = JobTicket.get(self.context.get('job_ticket_id'))
job = Job.get(job_ticket['job'])
success = job_ticket['success']
if self.metadata is not None:
bid = bson.ObjectId(self.id_)
......@@ -311,14 +320,15 @@ class EnginePlacer(Placer):
for k in self.metadata.keys():
self.metadata[k].pop('files', {})
if self.context.get('job_ticket_id'):
job_ticket_id = bson.ObjectId(self.context.get('job_ticket_id'))
job_ticket = config.db.job_tickets.find_one({'_id': job_ticket_id})
if job_ticket.get('success'):
hierarchy.update_container_hierarchy(self.metadata, bid, self.container_type)
else:
if success:
hierarchy.update_container_hierarchy(self.metadata, bid, self.container_type)
if job_ticket is not None:
if success:
Queue.mutate(job, {'state': 'complete'})
else:
Queue.mutate(job, {'state': 'failed'})
if self.context.get('job_id'):
job = Job.get(self.context.get('job_id'))
job.saved_files = [f['name'] for f in self.saved]
......
......@@ -61,7 +61,7 @@ class TemporaryDirectory(object):
def __del__(self):
# Issue a Warning if implicit cleanup needed
self.cleanup(_warn=True)
self.cleanup()
# The following code attempts to make
# this class tolerant of the module nulling out process
......
......@@ -177,6 +177,18 @@ def sanitize_string_to_filename(value):
keepcharacters = (' ', '.', '_', '-')
return "".join([c for c in value if c.isalnum() or c in keepcharacters]).rstrip()
def humanize_validation_error(val_err):
    """
    Takes a jsonschema.ValidationError, returns a human-friendly string.

    The first element of the error's relative_path (when present) names the
    offending key. Note: relative_path entries can be integers (array
    indices), so the key is coerced to str before concatenation — the
    original code raised TypeError for array-position errors.
    """
    key = 'none'
    if len(val_err.relative_path) > 0:
        key = str(val_err.relative_path[0])
    # jsonschema (py2) prefixes unicode reprs with u'; strip for readability
    message = val_err.message.replace("u'", "'")
    return 'Object does not match schema on key ' + key + ': ' + message
def obj_from_map(_map):
"""
Creates an anonymous object with properties determined by the passed (shallow) map.
......@@ -199,6 +211,20 @@ def path_from_hash(hash_):
path = (hash_version, hash_alg, first_stanza, second_stanza, hash_)
return os.path.join(*path)
def set_for_download(response, stream=None, filename=None, length=None):
    """Takes a self.response, and various download options.

    :param response: webapp2-style response with ``headers`` / ``app_iter``
    :param stream:   optional file-like body for the response
    :param filename: optional attachment filename (Content-Disposition)
    :param length:   optional byte count (Content-Length)
    """
    # The app_iter, when given, MUST be attached before any header is set.
    if stream is not None:
        response.app_iter = stream

    headers = response.headers
    headers['Content-Type'] = 'application/octet-stream'
    if filename is not None:
        headers['Content-Disposition'] = 'attachment; filename="%s"' % filename
    if length is not None:
        headers['Content-Length'] = str(length)
def format_hash(hash_alg, hash_):
"""
......@@ -206,7 +232,6 @@ def format_hash(hash_alg, hash_):
"""
return '-'.join(('v0', hash_alg, hash_))
def create_json_http_exception_response(message, code, request_id, custom=None):
content = {
'message': message,
......@@ -217,14 +242,12 @@ def create_json_http_exception_response(message, code, request_id, custom=None):
content.update(custom)
return content
def send_json_http_exception(response, message, code, request_id, custom=None):
    """Serialize an HTTP error as JSON and write it onto *response*."""
    response.set_status(code)
    payload = create_json_http_exception_response(message, code, request_id, custom)
    response.headers['Content-Type'] = 'application/json; charset=utf-8'
    response.write(json.dumps(payload))
class Enum(baseEnum.Enum):
# Enum strings are prefixed by their class: "Category.classifier".
# This overrides that behaviour and removes the prefix.
......
......@@ -335,7 +335,6 @@ class RequestHandler(webapp2.RequestHandler):
code = exception.code
elif isinstance(exception, errors.InputValidationException):
code = 400
self.request.logger.warning(str(exception))
elif isinstance(exception, errors.APIAuthProviderException):
code = 401
elif isinstance(exception, errors.APIRefreshTokenException):
......
......@@ -418,6 +418,8 @@ def test_failed_job_output(data_builder, default_payload, as_user, as_admin, as_
}
}
api_db.jobs.update_one({'_id': bson.ObjectId(job)}, {'$set':{'state': 'running'}})
r = as_drone.post('/engine',
params={'level': 'acquisition', 'id': acquisition, 'job': job, 'job_ticket': job_ticket['_id']},
files=file_form('result.txt', meta=metadata)
......@@ -441,16 +443,6 @@ def test_failed_job_output(data_builder, default_payload, as_user, as_admin, as_
r = as_user.post('/jobs/' + job + '/accept-failed-output')
assert r.status_code == 403
# try to accept failed output - job is not in failed state yet
r = as_admin.post('/jobs/' + job + '/accept-failed-output')
assert r.status_code == 400
# set job state to failed
r = as_drone.put('/jobs/' + job, json={'state': 'running'})
assert r.ok
r = as_drone.put('/jobs/' + job, json={'state': 'failed'})
assert r.ok
# accept failed output
r = as_admin.post('/jobs/' + job + '/accept-failed-output')
assert r.ok
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment