diff --git a/api/api.py b/api/api.py index 65309c6d4e67e091af2df232478b7c546c8dcef9..d9126f1d797a57fb736030e3a1f002625f7e8518 100644 --- a/api/api.py +++ b/api/api.py @@ -57,6 +57,7 @@ routes = [ webapp2.Route(r'/download', core.Core, handler_method='download', methods=['GET', 'POST'], name='download'), webapp2.Route(r'/reaper', core.Core, handler_method='reaper', methods=['POST']), webapp2.Route(r'/sites', core.Core, handler_method='sites', methods=['GET']), + webapp2.Route(r'/register', core.Core, handler_method='register', methods=['POST']), webapp2.Route(r'/config', core.Config, methods=['GET']), webapp2.Route(r'/config.js', core.Config, handler_method='get_js', methods=['GET']) ]), @@ -72,6 +73,7 @@ routes = [ webapp2.Route(r'/next', jobs.Jobs, handler_method='next', methods=['GET']), webapp2.Route(r'/count', jobs.Jobs, handler_method='count', methods=['GET']), webapp2.Route(r'/addTestJob', jobs.Jobs, handler_method='addTestJob', methods=['GET']), + webapp2.Route(r'/create', jobs.Jobs, handler_method='create', methods=['POST']), webapp2.Route(r'/<:[^/]+>', jobs.Job, name='job'), ]), webapp2.Route(r'/api/groups', grouphandler.GroupHandler, handler_method='get_all', methods=['GET']), diff --git a/api/centralclient.py b/api/centralclient.py index 808d8f02cec6923354e55c64324c101079571823..435799beef8236ab1a4c30c2ed0a0eadedf287b4 100644 --- a/api/centralclient.py +++ b/api/centralclient.py @@ -1,4 +1,3 @@ -#!/usr/bin/env python """ Client registers this instance with a central instance registery. 
@@ -18,6 +17,8 @@ logging.basicConfig() log = logging.getLogger('scitran.api.centralclient') logging.getLogger('urllib3').setLevel(logging.WARNING) # silence Requests library logging +fail_count = 0 + def update(db, api_uri, site_name, site_id, ssl_cert, central_url): """Send is-alive signal to central peer registry.""" diff --git a/api/core.py b/api/core.py index 3be18434d0b0903ebd9cbdd75090450ab9785a82..341774fae8ddc2768c3bf02bee75842130690093 100644 --- a/api/core.py +++ b/api/core.py @@ -14,6 +14,7 @@ from . import base from . import files from . import util from . import config +from . import centralclient from .dao import reaperutil from . import tempdir as tempfile @@ -270,3 +271,18 @@ class Core(base.RequestHandler): s['onload'] = True break return sites + + def register(self): + if not config.get_item('site', 'registered'): + self.abort(400, 'Site not registered with central') + if not config.get_item('site', 'ssl_cert'): + self.abort(400, 'SSL cert not configured') + if not config.get_item('site', 'central_url'): + self.abort(400, 'Central URL not configured') + if not centralclient.update(config.db, config.get_item('site', 'api_url'), config.get_item('site', 'name'), config.get_item('site', 'id'), config.get_item('site', 'ssl_cert'), config.get_item('site', 'central_url')): + centralclient.fail_count += 1 + else: + centralclient.fail_count = 0 + if centralclient.fail_count == 3: + log.warning('scitran central unreachable, purging all remotes info') + centralclient.clean_remotes(config.db) diff --git a/api/jobs.py b/api/jobs.py index 759b199752499efce5dae81f4661ec99571cddab..c759de0a2c5c7204f0aba2760e45ebc1834aa8af 100644 --- a/api/jobs.py +++ b/api/jobs.py @@ -195,6 +195,7 @@ def generate_formula(algorithm_id, i): return f + class Jobs(base.RequestHandler): """Provide /jobs API routes.""" @@ -258,6 +259,53 @@ class Jobs(base.RequestHandler): return result + def create(self): + from . 
import rules # FIXME circular dependency hack + for c_type in ['projects', 'collections', 'sessions', 'acquisitions']: + + # This projection needs every field required to know what type of container it is & navigate to its project + containers = config.db[c_type].find({'files.unprocessed': True}, ['files', 'session', 'project']) + + for c in containers: + for f in c['files']: + if f.get('unprocessed'): + rules.create_jobs(config.db, c, c_type, f) + r = config.db[c_type].update_one( + { + '_id': c['_id'], + 'files': { + '$elemMatch': { + 'name': f['name'], + 'hash': f['hash'], + }, + }, + }, + { + '$set': { + 'files.$.unprocessed': False, + }, + }, + ) + if not r.matched_count: + log.info('file modified or removed, not marked as clean: %s %s, %s' % (c_type, c, f['name'])) + while True: + j = config.db.jobs.find_one_and_update( + { + 'state': 'running', + 'modified': {'$lt': datetime.datetime.utcnow() - datetime.timedelta(seconds=100)}, + }, + { + '$set': { + 'state': 'failed', + }, + }, + ) + if j is None: + break + else: + retry_job(config.db, j) + + class Job(base.RequestHandler): """Provides /Jobs/<jid> routes.""" diff --git a/bin/api.wsgi b/bin/api.wsgi index a90574f74e13b78c1d4e3558a6c01a1b0161b6e9..c23c90cff04c8a55f41509639b1abc2fd159908c 100644 --- a/bin/api.wsgi +++ b/bin/api.wsgi @@ -3,86 +3,3 @@ from api import api application = api.app_factory() - - -# FIXME: all code below should removed and ported into an app server independent framework - -import datetime -import uwsgidecorators - -from api import rules -from api import config -from api import jobs - -log = config.log - - -if config.get_item('site', 'registered'): - fail_count = 0 - @uwsgidecorators.timer(60) - def centralclient_timer(signum): - global fail_count - if not centralclient.update(mongo.db, args.ssl_cert, args.central_uri): - fail_count += 1 - else: - fail_count = 0 - if fail_count == 3: - log.warning('scitran central unreachable, purging all remotes info') - 
centralclient.clean_remotes(mongo.db) - - -def job_creation(signum): - for c_type in ['projects', 'collections', 'sessions', 'acquisitions']: - - # This projection needs every field required to know what type of container it is & navigate to its project - containers = config.db[c_type].find({'files.unprocessed': True}, ['files', 'session', 'project']) - - for c in containers: - for f in c['files']: - if f.get('unprocessed'): - rules.create_jobs(config.db, c, c_type, f) - r = config.db[c_type].update_one( - { - '_id': c['_id'], - 'files': { - '$elemMatch': { - 'name': f['name'], - 'hash': f['hash'], - }, - }, - }, - { - '$set': { - 'files.$.unprocessed': False, - }, - }, - ) - if not r.matched_count: - log.info('file modified or removed, not marked as clean: %s %s, %s' % (c_type, c, f['name'])) - while True: - j = config.db.jobs.find_one_and_update( - { - 'state': 'running', - 'modified': {'$lt': datetime.datetime.utcnow() - datetime.timedelta(seconds=100)}, - }, - { - '$set': { - 'state': 'failed', - }, - }, - ) - if j is None: - break - else: - jobs.retry_job(config.db, j) - - -# Run job creation immediately on start, then every 30 seconds thereafter. -# This saves sysadmin irritation waiting for the engine, or an initial job load conflicting with timed runs. -log.info('Loading jobs queue for initial processing. This may take some time.') -job_creation(None) -log.info('Loading jobs queue complete.') - -@uwsgidecorators.timer(30) -def job_creation_timer(signum): - job_creation(signum)