Skip to content
Snippets Groups Projects
Commit 48c87d8f authored by Nathaniel Kofalt's avatar Nathaniel Kofalt
Browse files

Merge pull request #117 from scitran/cron

Convert cron jobs to endpoints and remove unprocessed flag
parents bf7c8998 850c0bb8
No related branches found
No related tags found
No related merge requests found
......@@ -57,6 +57,7 @@ routes = [
webapp2.Route(r'/download', core.Core, handler_method='download', methods=['GET', 'POST'], name='download'),
webapp2.Route(r'/reaper', core.Core, handler_method='reaper', methods=['POST']),
webapp2.Route(r'/sites', core.Core, handler_method='sites', methods=['GET']),
webapp2.Route(r'/register', core.Core, handler_method='register', methods=['POST']),
webapp2.Route(r'/config', core.Config, methods=['GET']),
webapp2.Route(r'/config.js', core.Config, handler_method='get_js', methods=['GET'])
]),
......@@ -72,6 +73,7 @@ routes = [
webapp2.Route(r'/next', jobs.Jobs, handler_method='next', methods=['GET']),
webapp2.Route(r'/count', jobs.Jobs, handler_method='count', methods=['GET']),
webapp2.Route(r'/addTestJob', jobs.Jobs, handler_method='addTestJob', methods=['GET']),
webapp2.Route(r'/reap', jobs.Jobs, handler_method='reap_stale', methods=['POST']),
webapp2.Route(r'/<:[^/]+>', jobs.Job, name='job'),
]),
webapp2.Route(r'/api/groups', grouphandler.GroupHandler, handler_method='get_all', methods=['GET']),
......
#!/usr/bin/env python
"""
Client registers this instance with a central instance registry.
......@@ -18,6 +17,8 @@ logging.basicConfig()
log = logging.getLogger('scitran.api.centralclient')
logging.getLogger('urllib3').setLevel(logging.WARNING) # silence Requests library logging
fail_count = 0
def update(db, api_uri, site_name, site_id, ssl_cert, central_url):
"""Send is-alive signal to central peer registry."""
......
......@@ -11,9 +11,11 @@ import cStringIO
import validators
from . import base
from . import files
from . import util
from . import files
from . import rules
from . import config
from . import centralclient
from .dao import reaperutil
from . import tempdir as tempfile
......@@ -123,28 +125,28 @@ class Core(base.RequestHandler):
file_store = files.FileStore(self.request, tempdir_path)
except files.FileStoreException as e:
self.abort(400, str(e))
created = modified = datetime.datetime.now()
now = datetime.datetime.now()
fileinfo = dict(
name=file_store.filename,
created=created,
modified=modified,
created=now,
modified=now,
size=file_store.size,
hash=file_store.hash,
type=file_store.filetype,
unprocessed=True,
tags=file_store.tags,
metadata=file_store.metadata
)
container = reaperutil.create_container_hierarchy(file_store.metadata)
f = container.find(file_store.filename)
created = modified = datetime.datetime.utcnow()
target_path = os.path.join(config.get_item('persistent', 'data_path'), util.path_from_hash(fileinfo['hash']))
if not f:
file_store.move_file(target_path)
container.add_file(fileinfo)
rules.create_jobs(config.db, container.acquisition, 'acquisition', fileinfo)
elif not file_store.identical(util.path_from_hash(fileinfo['hash']), f['hash']):
file_store.move_file(target_path)
container.update_file(fileinfo)
rules.create_jobs(config.db, container.acquisition, 'acquisition', fileinfo)
throughput = file_store.size / file_store.duration.total_seconds()
log.info('Received %s [%s, %s/s] from %s' % (file_store.filename, util.hrsize(file_store.size), util.hrsize(throughput), self.request.client_addr))
......@@ -270,3 +272,18 @@ class Core(base.RequestHandler):
s['onload'] = True
break
return sites
def register(self):
    """Send an is-alive update for this instance to the central peer registry.

    Replaces the old uwsgi 60-second timer: an external scheduler POSTs to
    /register instead. Aborts with 400 when the site is not configured for
    central registration. Tracks consecutive failures on the centralclient
    module; after exactly three in a row, cached remote-site info is purged.
    """
    if not config.get_item('site', 'registered'):
        self.abort(400, 'Site not registered with central')
    if not config.get_item('site', 'ssl_cert'):
        self.abort(400, 'SSL cert not configured')
    if not config.get_item('site', 'central_url'):
        self.abort(400, 'Central URL not configured')
    if not centralclient.update(config.db, config.get_item('site', 'ssl_cert'), config.get_item('site', 'central_url')):
        centralclient.fail_count += 1
    else:
        centralclient.fail_count = 0
    if centralclient.fail_count == 3:
        log.warning('scitran central unreachable, purging all remotes info')
        # Fix: the previous version referenced `mongo.db`, copied from the
        # removed uwsgi timer code — `mongo` is not defined in this module.
        # Use the shared config.db handle like the rest of this method.
        centralclient.clean_remotes(config.db)
......@@ -25,7 +25,7 @@ class ReapedAcquisition(object):
return None
def update_file(self, fileinfo):
update_set = {'files.$.unprocessed': True, 'files.$.modified': datetime.datetime.utcnow()}
update_set = {'files.$.modified': datetime.datetime.utcnow()}
# in this method, we are overriding an existing file.
# update_set allows to update all the fileinfo like size, hash, etc.
fileinfo.update(self.fileinfo)
......
......@@ -6,6 +6,7 @@ import datetime
from .. import base
from .. import util
from .. import files
from .. import rules
from .. import config
from .. import validators
from .. import tempdir as tempfile
......@@ -366,7 +367,6 @@ class FileListHandler(ListHandler):
'type': file_store.filetype,
'created': file_datetime,
'modified': file_datetime,
'unprocessed': True
}
if file_store.metadata:
file_properties['metadata'] = file_store.metadata
......@@ -398,4 +398,5 @@ class FileListHandler(ListHandler):
result = keycheck(mongo_validator(permchecker(storage.exec_op)))(method, _id=_id, query_params=query_params, payload=payload)
if not result or result.modified_count != 1:
self.abort(404, 'Element not added in list {} of container {} {}'.format(storage.list_name, storage.cont_name, _id))
rules.create_jobs(config.db, container, cont_name, file_properties)
return {'modified': result.modified_count}
......@@ -195,6 +195,7 @@ def generate_formula(algorithm_id, i):
return f
class Jobs(base.RequestHandler):
"""Provide /jobs API routes."""
......@@ -258,6 +259,25 @@ class Jobs(base.RequestHandler):
return result
def reap_stale(self):
    """Fail every 'running' job with no modification in the last 100 seconds,
    then queue each one for retry.

    Replaces the old uwsgi timer loop: an external scheduler POSTs to
    /jobs/reap instead. Each job is claimed atomically via
    find_one_and_update so concurrent reapers cannot retry the same job.
    """
    while True:
        # Recompute the cutoff on every pass so a long reap run stays accurate.
        cutoff = datetime.datetime.utcnow() - datetime.timedelta(seconds=100)
        stale_job = config.db.jobs.find_one_and_update(
            {'state': 'running', 'modified': {'$lt': cutoff}},
            {'$set': {'state': 'failed'}},
        )
        if stale_job is None:
            break  # no stale jobs remain
        retry_job(config.db, stale_job)
class Job(base.RequestHandler):
"""Provides /Jobs/<jid> routes."""
......
......@@ -9,7 +9,6 @@
"size": { "type": "integer" },
"hash": { "type": "string" },
"instrument": { "type": "string" },
"unprocessed": { "type": "boolean"},
"measurements": {
"items": { "type": "string"},
"type": "array",
......
......@@ -9,7 +9,6 @@
"size": { "type": "integer" },
"hash": { "type": "string" },
"instrument": { "type": "string" },
"unprocessed": { "type": "boolean"},
"measurements": {
"items": { "type": "string"},
"type": "array",
......@@ -24,7 +23,7 @@
"type": "object"
}
},
"required": ["name", "created", "modified", "size", "hash", "unprocessed"],
"required": ["name", "created", "modified", "size", "hash"],
"key_fields": ["name"],
"additionalProperties": false
}
......@@ -3,86 +3,3 @@
from api import api
application = api.app_factory()
# FIXME: all code below should removed and ported into an app server independent framework
import datetime
import uwsgidecorators
from api import rules
from api import config
from api import jobs
log = config.log
if config.get_item('site', 'registered'):
fail_count = 0
@uwsgidecorators.timer(60)
def centralclient_timer(signum):
    """Legacy uwsgi 60s timer: send an is-alive update to scitran central.

    NOTE(review): this code references `centralclient`, `mongo`, and `args`,
    none of which are imported or defined in this module as shown — it
    appears to have been dead code; this commit removes it in favor of the
    /register endpoint.
    """
    global fail_count
    if not centralclient.update(mongo.db, args.ssl_cert, args.central_uri):
        fail_count += 1
    else:
        fail_count = 0
    # Purge cached remote-site info after exactly three consecutive failures.
    if fail_count == 3:
        log.warning('scitran central unreachable, purging all remotes info')
        centralclient.clean_remotes(mongo.db)
def job_creation(signum):
    """Legacy uwsgi-timer worker: scan every container type for files still
    flagged 'unprocessed', create jobs for them via the rules engine, then
    clear the flag.

    NOTE(review): removed by this commit along with the 'unprocessed' flag;
    job creation now happens inline at upload time. Original indentation was
    lost in this capture and has been reconstructed from the control flow.
    """
    for c_type in ['projects', 'collections', 'sessions', 'acquisitions']:
        # This projection needs every field required to know what type of container it is & navigate to its project
        containers = config.db[c_type].find({'files.unprocessed': True}, ['files', 'session', 'project'])
        for c in containers:
            for f in c['files']:
                if f.get('unprocessed'):
                    rules.create_jobs(config.db, c, c_type, f)
                    # Clear the flag only if the file still matches on both
                    # name and hash — a file replaced mid-scan must not be
                    # incorrectly marked clean.
                    r = config.db[c_type].update_one(
                        {
                            '_id': c['_id'],
                            'files': {
                                '$elemMatch': {
                                    'name': f['name'],
                                    'hash': f['hash'],
                                },
                            },
                        },
                        {
                            '$set': {
                                'files.$.unprocessed': False,
                            },
                        },
                    )
                    if not r.matched_count:
                        log.info('file modified or removed, not marked as clean: %s %s, %s' % (c_type, c, f['name']))
# NOTE(review): indentation was lost in this capture, so it is unclear whether
# this loop ran inside job_creation's timer pass or at module level — confirm
# against repository history. Either way, this commit replaces it with the
# Jobs.reap_stale endpoint.
while True:
    # Atomically claim one 'running' job with no heartbeat for >100s and
    # mark it failed; find_one_and_update prevents double-claiming.
    j = config.db.jobs.find_one_and_update(
        {
            'state': 'running',
            'modified': {'$lt': datetime.datetime.utcnow() - datetime.timedelta(seconds=100)},
        },
        {
            '$set': {
                'state': 'failed',
            },
        },
    )
    if j is None:
        # No stale jobs remain.
        break
    else:
        jobs.retry_job(config.db, j)
# Run job creation immediately on start, then every 30 seconds thereafter.
# This saves sysadmin irritation waiting for the engine, or an initial job load conflicting with timed runs.
log.info('Loading jobs queue for initial processing. This may take some time.')
job_creation(None)
log.info('Loading jobs queue complete.')
# Legacy uwsgi 30s timer wrapper around job_creation; removed by this commit.
@uwsgidecorators.timer(30)
def job_creation_timer(signum):
    job_creation(signum)
......@@ -6,7 +6,6 @@ import os
import json
import shutil
import hashlib
import pymongo
import zipfile
import argparse
import datetime
......@@ -15,6 +14,7 @@ import requests
from api.dao import reaperutil
from api import util
from api import rules
from api import config
log = config.log
......@@ -111,11 +111,11 @@ def data(args):
'name': filename,
'size': size,
'hash': computed_hash,
'unprocessed': True,
'created': created,
'modified': modified
}
container.add_file(fileinfo)
rules.create_jobs(config.db, container.acquisition, 'acquisition', fileinfo)
data_desc = """
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment