diff --git a/api.py b/api.py index 97399b11e2df14747817b0b4d39c87d0585caabf..f1f5cb39dd07b72b2c1a1859321bd90a8951b6a2 100644 --- a/api.py +++ b/api.py @@ -1,4 +1,5 @@ import os +import copy import json import webapp2 import bson.json_util @@ -19,24 +20,28 @@ routes = [ webapp2_extras.routes.PathPrefixRoute(r'/api', [ webapp2.Route(r'/download', core.Core, handler_method='download', methods=['GET', 'POST'], name='download'), webapp2.Route(r'/upload', core.Core, handler_method='upload', methods=['POST']), + webapp2.Route(r'/reaper', core.Core, handler_method='reaper', methods=['POST']), webapp2.Route(r'/sites', core.Core, handler_method='sites', methods=['GET']), webapp2.Route(r'/search', core.Core, handler_method='search', methods=['GET', 'POST']), ]), + webapp2_extras.routes.PathPrefixRoute(r'/api/schema', [ + webapp2.Route(r'/group', users.Group, handler_method='schema', methods=['GET']), + webapp2.Route(r'/user', users.User, handler_method='schema', methods=['GET']), + ]), webapp2.Route(r'/api/users', users.Users), webapp2_extras.routes.PathPrefixRoute(r'/api/users', [ webapp2.Route(r'/count', users.Users, handler_method='count', methods=['GET']), webapp2.Route(r'/self', users.User, handler_method='self', methods=['GET']), webapp2.Route(r'/roles', users.User, handler_method='roles', methods=['GET']), - webapp2.Route(r'/schema', users.User, handler_method='schema', methods=['GET']), webapp2.Route(r'/<:[^/]+>', users.User, name='user'), webapp2.Route(r'/<:[^/]+>/groups', users.Groups, name='groups'), + webapp2.Route(r'/<uid:[^/]+>/projects', projects.Projects, name='u_projects'), ]), webapp2.Route(r'/api/groups', users.Groups), webapp2_extras.routes.PathPrefixRoute(r'/api/groups', [ webapp2.Route(r'/count', users.Groups, handler_method='count', methods=['GET']), - webapp2.Route(r'/schema', users.Group, handler_method='schema', methods=['GET']), webapp2.Route(r'/<:[^/]+>', users.Group, name='group'), - webapp2.Route(r'/<:[^/]+>/projects', projects.Projects, name='g_projects'), + webapp2.Route(r'/<gid:[^/]+>/projects', projects.Projects, name='g_projects'), webapp2.Route(r'/<gid:[^/]+>/sessions', sessions.Sessions, name='g_sessions', methods=['GET']), ]), webapp2.Route(r'/api/projects', projects.Projects, methods=['GET'], name='projects'), @@ -45,7 +50,7 @@ routes = [ webapp2.Route(r'/groups', projects.Projects, handler_method='groups', methods=['GET']), webapp2.Route(r'/schema', projects.Project, handler_method='schema', methods=['GET']), webapp2.Route(r'/<:[0-9a-f]{24}>', projects.Project, name='project'), - webapp2.Route(r'/<:[0-9a-f]{24}>/file', projects.Project, handler_method='file', methods=['PUT']), + webapp2.Route(r'/<:[0-9a-f]{24}>/file', projects.Project, handler_method='file', methods=['POST']), webapp2.Route(r'/<:[0-9a-f]{24}>/file/<:[^/]+>', projects.Project, handler_method='file'), webapp2.Route(r'/<pid:[0-9a-f]{24}>/sessions', sessions.Sessions, name='p_sessions'), ]), @@ -55,7 +60,7 @@ routes = [ webapp2.Route(r'/curators', collections_.Collections, handler_method='curators', methods=['GET']), webapp2.Route(r'/schema', collections_.Collection, handler_method='schema', methods=['GET']), webapp2.Route(r'/<:[0-9a-f]{24}>', collections_.Collection, name='collection'), - webapp2.Route(r'/<:[0-9a-f]{24}>/file', collections_.Collection, handler_method='file', methods=['PUT']), + webapp2.Route(r'/<:[0-9a-f]{24}>/file', collections_.Collection, handler_method='file', methods=['POST']), webapp2.Route(r'/<:[0-9a-f]{24}>/file/<:[^/]+>', collections_.Collection, handler_method='file'), 
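The schema routes move under /api/schema/<type> and targeted single-file uploads switch from PUT to POST throughout these routes. A rough client-side sketch of both, assuming a server at https://example.com with authentication omitted; the base URL, acquisition ID, and file name are placeholders, and only the query parameters that appear in the handlers below ('method', 'force') are real:

import hashlib
import requests

base = 'https://example.com/api'

# Schemas are now served from /api/schema/<type>; ?method=put selects the PUT variant.
user_schema = requests.get(base + '/schema/user', params={'method': 'put'}).json()

# Single-file uploads now use POST; the handler requires a Content-MD5 header.
with open('data.zip', 'rb') as fd:
    body = fd.read()
requests.post(base + '/acquisitions/0123456789abcdef01234567/file/data.zip',
              data=body,
              params={'force': 'true'},
              headers={'Content-MD5': hashlib.md5(body).hexdigest()})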
webapp2.Route(r'/<:[0-9a-f]{24}>/sessions', collections_.CollectionSessions, name='coll_sessions'), webapp2.Route(r'/<:[0-9a-f]{24}>/acquisitions', collections_.CollectionAcquisitions, name='coll_acquisitions'), @@ -65,7 +70,7 @@ routes = [ webapp2.Route(r'/count', sessions.Sessions, handler_method='count', methods=['GET']), webapp2.Route(r'/schema', sessions.Session, handler_method='schema', methods=['GET']), webapp2.Route(r'/<:[0-9a-f]{24}>', sessions.Session, name='session'), - webapp2.Route(r'/<:[0-9a-f]{24}>/file', sessions.Session, handler_method='file', methods=['PUT']), + webapp2.Route(r'/<:[0-9a-f]{24}>/file', sessions.Session, handler_method='file', methods=['POST']), webapp2.Route(r'/<:[0-9a-f]{24}>/file/<:[^/]+>', sessions.Session, handler_method='file'), webapp2.Route(r'/<:[0-9a-f]{24}>/acquisitions', acquisitions.Acquisitions, name='acquisitions'), ]), @@ -73,14 +78,14 @@ routes = [ webapp2.Route(r'/count', acquisitions.Acquisitions, handler_method='count', methods=['GET']), webapp2.Route(r'/schema', acquisitions.Acquisition, handler_method='schema', methods=['GET']), webapp2.Route(r'/<:[0-9a-f]{24}>', acquisitions.Acquisition, name='acquisition'), - webapp2.Route(r'/<:[0-9a-f]{24}>/file', acquisitions.Acquisition, handler_method='file', methods=['PUT']), + webapp2.Route(r'/<:[0-9a-f]{24}>/file', acquisitions.Acquisition, handler_method='file', methods=['POST']), webapp2.Route(r'/<:[0-9a-f]{24}>/file/<:[^/]+>', acquisitions.Acquisition, handler_method='file'), - webapp2.Route(r'/<:[0-9a-f]{24}>/tile', acquisitions.Acquisition, handler_method='get_tile', methods=['GET']), ]), webapp2.Route(r'/api/jobs', jobs.Jobs), webapp2_extras.routes.PathPrefixRoute(r'/api/jobs', [ webapp2.Route(r'/next', jobs.Jobs, handler_method='next', methods=['GET']), webapp2.Route(r'/count', jobs.Jobs, handler_method='count', methods=['GET']), + webapp2.Route(r'/addTestJob', jobs.Jobs, handler_method='addTestJob', methods=['GET']), webapp2.Route(r'/<:[^/]+>', jobs.Job, name='job'), ]), webapp2.Route(r'/api/apps', apps.Apps), @@ -91,11 +96,25 @@ routes = [ ]), ] + +with open(os.path.join(os.path.dirname(__file__), 'schema.json')) as fp: + schema_dict = json.load(fp) +for cls in [ + users.Group, + users.User, + ]: + cls.post_schema = copy.deepcopy(schema_dict[cls.__name__.lower()]) + cls.put_schema = copy.deepcopy(cls.post_schema) + cls.put_schema['properties'].pop('_id') + cls.put_schema.pop('required') + + def dispatcher(router, request, response): rv = router.default_dispatcher(request, response) if rv is not None: response.write(json.dumps(rv, default=bson.json_util.default)) response.headers['Content-Type'] = 'application/json; charset=utf-8' + app = webapp2.WSGIApplication(routes) app.router.set_dispatcher(dispatcher) diff --git a/api.wsgi b/api.wsgi index 99b55f3e07ce8584199ee2192ad5ca0ea0927263..65b8abe68235a7e63ff1d7c93803159c8cf5e4a0 100644 --- a/api.wsgi +++ b/api.wsgi @@ -1,3 +1,5 @@ +# vim: filetype=python + import logging logging.basicConfig( format='%(asctime)s %(name)16.16s:%(levelname)4.4s %(message)s', @@ -14,6 +16,7 @@ import argparse import api import centralclient +import jobs os.environ['PYTHON_EGG_CACHE'] = '/tmp/python_egg_cache' @@ -100,7 +103,7 @@ else: import uwsgidecorators @uwsgidecorators.cron(0, -1, -1, -1, -1) # top of every hour - def upload_storage_cleaning(num): + def upload_storage_cleaning(signum): upload_path = application.config['upload_path'] for f in os.listdir(upload_path): fp = os.path.join(upload_path, f) @@ -119,5 +122,51 @@ else: else: fail_count = 0 if 
fail_count == 3: - log.debug('scitran central unreachable, purging all remotes info') + log.warning('scitran central unreachable, purging all remotes info') centralclient.clean_remotes(application.db, args.site_id) + + @uwsgidecorators.timer(30) + def job_creation(signum): + for c_type in ['projects', 'collections', 'sessions', 'acquisitions']: + for c in application.db[c_type].find({'files.dirty': True}, ['files']): + containers = [(c_type, c)] # TODO: this should be the full container hierarchy + for f in c['files']: + if f.get('dirty'): + jobs.spawn_jobs(application.db, containers, f) + r = application.db[c_type].update_one( + { + '_id': c['_id'], + 'files': { + '$elemMatch': { + 'filename': f['filename'], + 'filehash': f['filehash'], + }, + }, + }, + { + '$set': { + 'files.$.dirty': False, + }, + }, + ) + if not r.matched_count: + log.info('file modified or removed, not marked as clean: %s %s, %s' % (c_type, c, f['filename'])) + while True: + j = application.db.jobs.find_one_and_update( + { + 'state': 'running', + 'heartbeat': {'$lt': datetime.datetime.utcnow() - datetime.timedelta(seconds=100)}, + }, + { + '$set': { + 'state': 'failed', + }, + }, + ) + if j is None: + break + if j['attempt'] < 3: + job_id = jobs.queue_job(application.db, j['algorithm_id'], j['container_type'], j['container_id'], j['filename'], j['filehash'], j['attempt']+1, j['_id']) + log.info('respawned job %s as %s (attempt %d)' % (j['_id'], job_id, j['attempt']+1)) + else: + log.info('permanently failed job %s (after %d attempts)' % (j['_id'], j['attempt'])) diff --git a/base.py b/base.py index 0eba306b78061f6aa58de40f00664c7736f62914..cbec08ee22990a24f02e77cd46ceaef71b39d945 100644 --- a/base.py +++ b/base.py @@ -10,6 +10,7 @@ import hashlib import webapp2 import datetime import requests +import jsonschema class RequestHandler(webapp2.RequestHandler): @@ -125,14 +126,21 @@ class RequestHandler(webapp2.RequestHandler): if header in r.headers: self.response.headers[header] = r.headers[header] - def abort(self, code, *args, **kwargs): - log.warning(str(code) + ' ' + '; '.join(args)) + def abort(self, code, detail, **kwargs): + if isinstance(detail, jsonschema.ValidationError): + detail = { + 'relative_path': list(detail.relative_path), + 'instance': detail.instance, + 'validator': detail.validator, + 'validator_value': detail.validator_value, + } + log.warning(str(code) + ' ' + str(detail)) json_body = { 'uid': self.uid, 'code': code, - 'detail': '; '.join(args), + 'detail': detail, } - webapp2.abort(code, *args, json_body=json_body, **kwargs) + webapp2.abort(code, json_body=json_body, **kwargs) def schema(self, updates={}): json_schema = copy.deepcopy(self.json_schema) diff --git a/bootstrap.py b/bootstrap.py index cd836543ed09d2ed0176535cbd115946c471ac93..589740d7a15b410bf24e9cc818f5aa23bb6c63c8 100755 --- a/bootstrap.py +++ b/bootstrap.py @@ -2,70 +2,31 @@ # # @author: Gunnar Schaefer +import logging +logging.basicConfig( + format='%(asctime)s %(message)s', + datefmt='%Y-%m-%d %H:%M:%S', + level=logging.INFO, +) +log = logging.getLogger('scitran.bootstrap') + import os import json import time -import pymongo import hashlib -import logging +import pymongo import argparse +import datetime import util -import scitran.data as scidata - -def connect_db(db_uri, **kwargs): - for x in range(0, 30): - try: - db_client = pymongo.MongoReplicaSetClient(db_uri, **kwargs) if 'replicaSet' in db_uri else pymongo.MongoClient(db_uri, **kwargs) - except: - time.sleep(1) - pass - else: - break - else: - raise Exception("Could not 
connect to MongoDB") - return db_client - - -def rsinit(args): - db_client = pymongo.MongoClient(args.db_uri) - repl_conf = eval(args.config) - db_client.admin.command('replSetInitiate', repl_conf) - -rsinit_desc = """ -example: -./scripts/bootstrap.py rsinit mongodb://cnifs.stanford.edu \\ -"dict(_id='cni', members=[ \\ - dict(_id=0, host='cnifs.stanford.edu'), \\ - dict(_id=1, host='cnibk.stanford.edu', priority=0.5), \\ - dict(_id=2, host='cni.stanford.edu', arbiterOnly=True), \\ -])" -""" - - -def authinit(args): - db_client = pymongo.MongoClient(args.db_uri) - db_client['admin'].add_user(name='admin', password=args.password, roles=['userAdminAnyDatabase']) - db_client['nims'].add_user(name=args.username, password=args.password, roles=['readWrite', 'dbAdmin']) - uri_parts = args.db_uri.partition('://') - print 'You must now restart mongod with the "--auth" parameter and modify your URI as follows:' - print ' %s%s:%s@%s' % (uri_parts[0] + uri_parts[1], args.username, args.password, uri_parts[2]) - - print 'openssl rand -base64 756 > cni.key' - -authinit_desc = """ -example: -./scripts/bootstrap.py authinit nims secret_pw mongodb://cnifs.stanford.edu/nims?replicaSet=cni -""" - def dbinit(args): - db_client = connect_db(args.db_uri) - db = db_client.get_default_database() + db = pymongo.MongoClient(args.db_uri).get_default_database() + now = datetime.datetime.utcnow() if args.force: - db_client.drop_database(db) + db.client.drop_database(db) db.projects.create_index([('gid', 1), ('name', 1)]) db.sessions.create_index('project') @@ -76,83 +37,49 @@ def dbinit(args): db.authtokens.create_index('timestamp', expireAfterSeconds=600) db.uploads.create_index('timestamp', expireAfterSeconds=60) db.downloads.create_index('timestamp', expireAfterSeconds=60) - # TODO: apps and jobs indexes (indicies?) 
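dbinit now stamps created/modified on every bootstrapped document and derives a default avatar from the email address, as shown in the insert loop below. The JSON file passed via -j is expected to look roughly like this sketch; the IDs, names, and address are placeholders, only the keys are meaningful:

import hashlib

bootstrap_example = {
    'users': [
        # 'email' is required here: it feeds the gravatar-based default avatar
        {'_id': 'jdoe', 'firstname': 'Jane', 'lastname': 'Doe', 'email': 'jdoe@example.com'},
    ],
    'groups': [
        {'_id': 'unknown', 'name': 'Unknown', 'roles': []},
    ],
    'drones': [],
}

# Default avatar derivation, as applied below when a user record omits 'avatar'
avatar = 'https://gravatar.com/avatar/' + hashlib.md5('jdoe@example.com').hexdigest() + '?s=512&d=mm'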
+ # TODO jobs indexes + # TODO review all indexes if args.json: with open(args.json) as json_dump: input_data = json.load(json_dump) - if 'users' in input_data: - db.users.insert(input_data['users']) - if 'groups' in input_data: - db.groups.insert(input_data['groups']) - if 'drones' in input_data: - db.drones.insert(input_data['drones']) - for u in db.users.find(): - db.users.update({'_id': u['_id']}, {'$set': {'email_hash': hashlib.md5(u['email']).hexdigest()}}) - - db.groups.update({'_id': 'unknown'}, {'$setOnInsert': {'name': 'Unknown', 'roles': []}}, upsert=True) + for u in input_data.get('users', []): + u['created'] = now + u['modified'] = now + u.setdefault('preferences', {}) + u.setdefault('avatar', 'https://gravatar.com/avatar/' + hashlib.md5(u['email']).hexdigest() + '?s=512&d=mm') + db.users.insert(u) + for g in input_data.get('groups', []): + g['created'] = now + g['modified'] = now + db.groups.insert(g) + for d in input_data.get('drones', []): + d['created'] = now + d['modified'] = now + db.drones.insert(d) + + db.groups.update({'_id': 'unknown'}, {'$setOnInsert': { + 'created': now, + 'modified': now, + 'name': 'Unknown', + 'roles': [], + }}, upsert=True) dbinit_desc = """ example: ./scripts/bootstrap.py dbinit mongodb://cnifs.stanford.edu/nims?replicaSet=cni -j nims_users_and_groups.json """ -def appsinit(args): - """Upload an app.""" - import tarfile - db_client = connect_db(args.db_uri) - db = db_client.get_default_database() - app_tgz = args.app_tgz + '.tgz' - if not os.path.exists(app_tgz): - with tarfile.open(app_tgz, 'w:gz', compresslevel=6) as tf: - tf.add(args.app_tgz, arcname=os.path.basename(args.app_tgz)) - util.insert_app(db, app_tgz, args.apps_path) - -appsinit_desc = """ -example: -./scripts/bootstrap.py appsinit mongodb://cnifs.stanford.edu/nims?repliaceSet=cni /path/to/app/dir /path/to/apps/storage -""" - -# TODO: this should use util.create_job to eliminate duplicate code -# TODO: update util.create_job to be useable from this bootstrap script. -def jobsinit(args): - """Create a job entry for every acquisition's orig dataset.""" - db_client = connect_db(args.db_uri) - db = db_client.get_default_database() - - if args.force: - db.drop_collection('jobs') - - # find all "orig" files, and create jobs for them - for a in db.acquisitions.find({'files.filetype': 'dicom'}, ['uid', 'files.$']): - aid = str(a['_id']) - fileinfo = a['files'][0] - print aid - fp = os.path.join(args.data_path, aid[-3:], aid, fileinfo['filename']) - if not os.path.exists(fp): - print ('%s does not exist. no job created.' 
% fp) - continue - datainfo = {'acquisition_id': a['uid'], 'fileinfo': fileinfo} - util.create_job(db.acquisitions, datainfo) - -jobinit_desc = """ -example: - ./scripts/bootstrap.py jobsinit mongodb://cnifs.stanford.edu/nims?replicaSet=cni -""" - def sort(args): - logging.basicConfig(level=logging.WARNING) quarantine_path = os.path.join(args.sort_path, 'quarantine') if not os.path.exists(args.sort_path): os.makedirs(args.sort_path) if not os.path.exists(quarantine_path): os.makedirs(quarantine_path) - print 'initializing DB' - kwargs = dict(tz_aware=True) - db_client = connect_db(args.db_uri, **kwargs) - db = db_client.get_default_database() - print 'inspecting %s' % args.path + log.info('initializing DB') + db = pymongo.MongoClient(args.db_uri).get_default_database() + log.info('inspecting %s' % args.path) files = [] for dirpath, dirnames, filenames in os.walk(args.path): for filepath in [os.path.join(dirpath, fn) for fn in filenames if not fn.startswith('.')]: @@ -160,9 +87,9 @@ def sort(args): files.append(filepath) dirnames[:] = [dn for dn in dirnames if not dn.startswith('.')] # need to use slice assignment to influence walk behavior file_cnt = len(files) - print 'found %d files to sort (ignoring symlinks and dotfiles)' % file_cnt + log.info('found %d files to sort (ignoring symlinks and dotfiles)' % file_cnt) for i, filepath in enumerate(files): - print 'sorting %s [%s] (%d/%d)' % (os.path.basename(filepath), util.hrsize(os.path.getsize(filepath)), i+1, file_cnt) + log.info('sorting %s [%s] (%d/%d)' % (os.path.basename(filepath), util.hrsize(os.path.getsize(filepath)), i+1, file_cnt)) hash_ = hashlib.sha1() if not args.quick: with open(filepath, 'rb') as fd: @@ -171,10 +98,9 @@ def sort(args): datainfo = util.parse_file(filepath, hash_.hexdigest()) if datainfo is None: util.quarantine_file(filepath, quarantine_path) - print 'Quarantining %s (unparsable)' % os.path.basename(filepath) + log.info('quarantining %s (unparsable)' % os.path.basename(filepath)) else: util.commit_file(db.acquisitions, None, datainfo, filepath, args.sort_path) - util.create_job(db.acquisitions, datainfo) # FIXME we should only mark files as new and let engine take it from there sort_desc = """ example: @@ -182,74 +108,9 @@ example: """ -def upload(args): - import util - import datetime - import requests - print 'inspecting %s' % args.path - files = [] - for dirpath, dirnames, filenames in os.walk(args.path): - for filepath in [os.path.join(dirpath, fn) for fn in filenames if not fn.startswith('.')]: - if not os.path.islink(filepath): - files.append(filepath) - dirnames[:] = [dn for dn in dirnames if not dn.startswith('.')] # need to use slice assignment to influence walk behavior - print 'found %d files to upload (ignoring symlinks and dotfiles)' % len(files) - for filepath in files: - filename = os.path.basename(filepath) - print 'hashing %s' % filename - hash_ = hashlib.md5() - with open(filepath, 'rb') as fd: - for chunk in iter(lambda: fd.read(2**20), ''): - hash_.update(chunk) - print 'uploading %s [%s]' % (filename, util.hrsize(os.path.getsize(filepath))) - with open(filepath, 'rb') as fd: - headers = { - 'User-Agent': 'bootstrapper', - 'Content-MD5': hash_.hexdigest(), - 'Content-Disposition': 'attachment; filename="%s"' % filename, - } - try: - start = datetime.datetime.now() - r = requests.put(args.url, data=fd, headers=headers, verify=not args.no_verify) - upload_duration = (datetime.datetime.now() - start).total_seconds() - except requests.exceptions.ConnectionError as e: - print 'error %s: %s' % 
(filename, e) - else: - if r.status_code == 200: - print 'success %s [%s/s]' % (filename, util.hrsize(os.path.getsize(filepath)/upload_duration)) - else: - print 'failure %s: %s %s, %s' % (filename, r.status_code, r.reason, r.text) - -upload_desc = """ -example: -./scripts/bootstrap.py upload /tmp/data https://example.com/upload -""" - - parser = argparse.ArgumentParser() subparsers = parser.add_subparsers(help='operation to perform') -rsinit_parser = subparsers.add_parser( - name='rsinit', - help='initialize replication set', - description=rsinit_desc, - formatter_class=argparse.RawDescriptionHelpFormatter, - ) -rsinit_parser.add_argument('db_uri', help='DB URI') -rsinit_parser.add_argument('config', help='replication set config') -rsinit_parser.set_defaults(func=rsinit) - -authinit_parser = subparsers.add_parser( - name='authinit', - help='initialize authentication', - description=authinit_desc, - formatter_class=argparse.RawDescriptionHelpFormatter, - ) -authinit_parser.add_argument('username', help='DB username') -authinit_parser.add_argument('password', help='DB password') -authinit_parser.add_argument('db_uri', help='DB URI') -authinit_parser.set_defaults(func=authinit) - dbinit_parser = subparsers.add_parser( name='dbinit', help='initialize database', @@ -261,28 +122,6 @@ dbinit_parser.add_argument('-j', '--json', help='JSON file containing users and dbinit_parser.add_argument('db_uri', help='DB URI') dbinit_parser.set_defaults(func=dbinit) -appsinit_parser = subparsers.add_parser( - name='appsinit', - help='load an app', - description=appsinit_desc, - formatter_class=argparse.RawDescriptionHelpFormatter, - ) -appsinit_parser.add_argument('db_uri', help='DB URI') -appsinit_parser.add_argument('app_tgz', help='filesystem path to tgz of app build context') -appsinit_parser.add_argument('apps_path', help='filesystem path to loaded apps') -appsinit_parser.set_defaults(func=appsinit) - -jobsinit_parser = subparsers.add_parser( - name='jobsinit', - help='initalize jobs collection from existing acquisitions', - description=dbinit_desc, - formatter_class=argparse.RawDescriptionHelpFormatter, - ) -jobsinit_parser.add_argument('-f', '--force', action='store_true', help='wipe out any existing jobs') -jobsinit_parser.add_argument('db_uri', help='DB URI') -jobsinit_parser.add_argument('data_path', help='filesystem path to sorted data') -jobsinit_parser.set_defaults(func=jobsinit) - sort_parser = subparsers.add_parser( name='sort', help='sort all files in a dicrectory tree', @@ -295,16 +134,5 @@ sort_parser.add_argument('path', help='filesystem path to data') sort_parser.add_argument('sort_path', help='filesystem path to sorted data') sort_parser.set_defaults(func=sort) -upload_parser = subparsers.add_parser( - name='upload', - help='upload all files in a directory tree', - description=upload_desc, - formatter_class=argparse.RawDescriptionHelpFormatter, - ) -upload_parser.add_argument('path', help='filesystem path to data') -upload_parser.add_argument('url', help='upload URL') -upload_parser.add_argument('-n', '--no_verify', help='disable SSL verification', action='store_true') -upload_parser.set_defaults(func=upload) - args = parser.parse_args() args.func(args) diff --git a/collections_.py b/collections_.py index b6b94adf3fdb15968874a612b22a8de4b6aeef76..cb4003b11b69a885a98db306a2ef380ee1fced3a 100644 --- a/collections_.py +++ b/collections_.py @@ -201,6 +201,7 @@ class Collections(containers.ContainerList): self.abort(400, str(e)) json_body['curator'] = self.uid json_body['timestamp'] = 
datetime.datetime.utcnow() + json_body['permissions'] = [{'_id': self.uid, 'access': 'admin'}] return {'_id': str(self.dbc.insert(json_body))} def get(self): @@ -279,8 +280,8 @@ class Collection(containers.Container): """Delete a Collection.""" _id = bson.ObjectId(cid) self._get(_id, 'admin', perm_only=True) - self.app.db.acquisitions.update({'collections': _id}, {'$pull': {'collections': _id}}, multi=True) - self.dbc.remove({'_id': _id}) + self.app.db.acquisitions.update_many({'collections': _id}, {'$pull': {'collections': _id}}) + self._delete(_id) class CollectionSessions(sessions.Sessions): @@ -335,12 +336,12 @@ class CollectionAcquisitions(acquisitions.Acquisitions): if not self.app.db.collections.find_one({'_id': _id}): self.abort(404, 'no such Collection') query = {'collections': _id} - sid = self.request.GET.get('session') + sid = self.request.GET.get('session', '') if bson.ObjectId.is_valid(sid): query['session'] = bson.ObjectId(sid) elif sid != '': self.abort(400, sid + ' is not a valid ObjectId') - projection = {'label': 1, 'description': 1, 'modality': 1, 'datatype': 1, 'notes': 1, 'timestamp': 1, 'timezone': 1} + projection = {p: 1 for p in ['label', 'description', 'modality', 'datatype', 'notes', 'timestamp', 'timezone', 'files']} projection['permissions'] = {'$elemMatch': {'_id': self.uid, 'site': self.source_site}} acquisitions = list(self.dbc.find(query, projection)) for acq in acquisitions: diff --git a/containers.py b/containers.py index 3fe94000822faec33bb44b9bc04929a859a51e44..6b51a93c317a19b3e1393398eb960f122028e1e3 100644 --- a/containers.py +++ b/containers.py @@ -8,6 +8,7 @@ import cgi import bson import json import shutil +import zipfile import datetime import jsonschema @@ -122,17 +123,20 @@ class ContainerList(base.RequestHandler): self.abort(400, str(e)) return json_body - def _get(self, query, projection, admin_only=False): + def _get(self, query, projection, admin_only=False, uid=None): projection = {p: 1 for p in projection + ['files']} if self.public_request: query['public'] = True else: - projection['permissions'] = {'$elemMatch': {'_id': self.uid, 'site': self.source_site}} - if not self.superuser_request: + if uid is not None: + if uid != self.uid and not self.superuser_request: + self.abort(403, 'User ' + self.uid + ' may not see the Projects of User ' + uid) + if not self.superuser_request or uid: if admin_only: - query['permissions'] = {'$elemMatch': {'_id': self.uid, 'site': self.source_site, 'access': 'admin'}} + query['permissions'] = {'$elemMatch': {'_id': uid or self.uid, 'site': self.source_site, 'access': 'admin'}} else: - query['permissions'] = {'$elemMatch': {'_id': self.uid, 'site': self.source_site}} + query['permissions'] = {'$elemMatch': {'_id': uid or self.uid, 'site': self.source_site}} + projection['permissions'] = {'$elemMatch': {'_id': uid or self.uid, 'site': self.source_site}} containers = list(self.dbc.find(query, projection)) for container in containers: container['_id'] = str(container['_id']) @@ -155,10 +159,10 @@ class Container(base.RequestHandler): ticket_id = self.request.GET.get('ticket') if ticket_id: ticket = self.app.db.downloads.find_one({'_id': ticket_id}) - if not ticket: # FIXME need better security + if not ticket: self.abort(404, 'no such ticket') - if ticket['target'] != _id or ticket['filename'] != filename: - self.abort(400, 'ticket not for this resource') + if ticket['target'] != _id or ticket['filename'] != filename or ticket['ip'] != self.request.client_addr: + self.abort(400, 'ticket not for this resource 
or source IP') elif not container.get('public', False): self.abort(403, 'this ' + dbc_name + ' is not public') del container['permissions'] @@ -219,53 +223,87 @@ class Container(base.RequestHandler): if self.request.method == 'GET': container, _ = self._get(_id, 'ro', filename) return self._get_file(_id, container, filename) - elif self.request.method in ['POST', 'DELETE']: + elif self.request.method == 'DELETE': container, _ = self._get(_id, 'rw', filename) - return self._get_file(_id, container, filename) + return self._delete_file(_id, container, filename) elif self.request.method == 'PUT': container, _ = self._get(_id, 'rw', filename) return self._put_file(_id, container, filename) + elif self.request.method == 'POST': + container, _ = self._get(_id, 'rw', filename) + return self._post_file(_id, container, filename) else: self.abort(405) def _get_file(self, _id, container, filename): - """Download or delete one file.""" - # FIXME: - # we need a genral way to ask for a single file from a zip file - # that works for tiles as well as for dicoms - - for fileinfo in container.get('files', []): - if fileinfo['filename'] == filename: - break - else: + """Download one file.""" + fileinfo = util.container_fileinfo(container, filename) + if not fileinfo: self.abort(404, 'no such file') + hash_ = self.request.GET.get('hash') + if hash_ and hash_ != fileinfo['hash']: + self.abort(409, 'file exists, hash mismatch') filepath = os.path.join(self.app.config['data_path'], str(_id)[-3:] + '/' + str(_id), filename) - if self.request.method == 'GET': - self.response.app_iter = open(filepath, 'rb') - self.response.headers['Content-Length'] = str(fileinfo['filesize']) # must be set after setting app_iter - if self.request.GET.get('view', '').lower() in ('1', 'true'): - self.response.headers['Content-Type'] = str(fileinfo.get('mimetype', 'application/octet-stream')) - else: - self.response.headers['Content-Type'] = 'application/octet-stream' - self.response.headers['Content-Disposition'] = 'attachment; filename="' + filename + '"' - elif self.request.method == 'POST': - ticket = util.download_ticket('file', _id, filename, fileinfo['filesize']) - tkt_id = self.app.db.downloads.insert(ticket) - return {'ticket': tkt_id} - elif self.request.method == 'DELETE': - r = self.dbc.update_one({'_id': _id}, {'$pull': {'files': {'filename': filename}}}) - if r.modified_count != 1: - self.abort(400) # FIXME need better error checking - if os.path.exists(filepath): - os.remove(filepath) - log.info('removed file ' + filepath) + if self.request.GET.get('ticket') == '': # request for download ticket + ticket = util.download_ticket(self.request.client_addr, 'file', _id, filename, fileinfo['filesize']) + return {'ticket': self.app.db.downloads.insert(ticket)} + else: # authenticated or ticketed (unauthenticated) download + zip_member = self.request.GET.get('member') + if self.request.GET.get('info', '').lower() in ('1', 'true'): + try: + with zipfile.ZipFile(filepath) as zf: + return [(zi.filename, zi.file_size, util.format_timestamp(datetime.datetime(*zi.date_time))[0]) for zi in zf.infolist()] + except zipfile.BadZipfile: + self.abort(400, 'not a zip file') + elif self.request.GET.get('comment', '').lower() in ('1', 'true'): + try: + with zipfile.ZipFile(filepath) as zf: + self.response.write(zf.comment) + except zipfile.BadZipfile: + self.abort(400, 'not a zip file') + elif zip_member: + try: + with zipfile.ZipFile(filepath) as zf: + self.response.headers['Content-Type'] = util.guess_mimetype(zip_member) + 
self.response.write(zf.open(zip_member).read()) + except zipfile.BadZipfile: + self.abort(400, 'not a zip file') + except KeyError: + self.abort(400, 'zip file contains no such member') else: - log.warning(filepath + ' does not exist') + self.response.app_iter = open(filepath, 'rb') + self.response.headers['Content-Length'] = str(fileinfo['filesize']) # must be set after setting app_iter + if self.request.GET.get('view', '').lower() in ('1', 'true'): + self.response.headers['Content-Type'] = str(fileinfo.get('mimetype', 'application/octet-stream')) + else: + self.response.headers['Content-Type'] = 'application/octet-stream' + self.response.headers['Content-Disposition'] = 'attachment; filename="' + filename + '"' + + def _delete_file(self, _id, container, filename): + """Delete one file.""" + fileinfo = util.container_fileinfo(container, filename) + if not fileinfo: + self.abort(404, 'no such file') + filepath = os.path.join(self.app.config['data_path'], str(_id)[-3:] + '/' + str(_id), filename) + r = self.dbc.update_one({'_id': _id}, {'$pull': {'files': {'filename': filename}}}) + if r.modified_count != 1: + self.abort(400) # FIXME need better error checking + if os.path.exists(filepath): + os.remove(filepath) + log.info('removed file ' + filepath) else: - self.abort(405) + log.warning(filepath + ' does not exist') def _put_file(self, _id, container, filename): - """Receive a targeted processor or user upload.""" + """Update file metadata.""" + fileinfo = util.container_fileinfo(container, filename) + if not fileinfo: + self.abort(404, 'no such file') + # TODO: implement file metadata updates + self.abort(400, 'PUT is not yet implemented') + + def _post_file(self, _id, container, filename): + """Upload one file.""" tags = [] metadata = {} if self.request.content_type == 'multipart/form-data': @@ -299,11 +337,11 @@ class Container(base.RequestHandler): if 'Content-MD5' not in self.request.headers: self.abort(400, 'Request must contain a valid "Content-MD5" header.') try: - tags = json.loads(self.request.get('tags', '[]')) + tags = json.loads(self.request.GET.get('tags', '[]')) except ValueError: self.abort(400, 'invalid "tags" parameter') try: - metadata = json.loads(self.request.get('metadata', '{}')) + metadata = json.loads(self.request.GET.get('metadata', '{}')) except ValueError: self.abort(400, 'invalid "metadata" parameter') filestream = self.request.body_file @@ -335,28 +373,9 @@ class Container(base.RequestHandler): } throughput = filesize / duration.total_seconds() log.info('Received %s [%s, %s/s] from %s' % (filename, util.hrsize(filesize), util.hrsize(throughput), self.request.client_addr)) - util.commit_file(self.dbc, _id, datainfo, filepath, self.app.config['data_path']) - - def get_tile(self, cid): - """fetch info about a tiled tiff, or retrieve a specific tile.""" - _id = bson.ObjectId(cid) - container, _ = self._get(_id, 'ro') # need at least read access to view tiles - montage_info = None - for f in container.get('files'): - if f['filetype'] == 'montage': - montage_info = f - break - if not montage_info: - self.abort(404, 'montage zip not found') - fn = montage_info['filename'] - fp = os.path.join(self.app.config['data_path'], cid[-3:], cid, fn) - z = self.request.GET.get('z') - x = self.request.GET.get('x') - y = self.request.GET.get('y') - if not (z and x and y): - return util.get_info(fp) - else: - self.response.content_type = 'image/jpeg' - tile = util.get_tile(fp, int(z), int(x), int(y)) - if tile: - self.response.write(tile) + force = self.request.GET.get('force', 
'').lower() in ('1', 'true') + success = util.commit_file(self.dbc, _id, datainfo, filepath, self.app.config['data_path'], force) + if success is None: + self.abort(202, 'identical file exists') + elif success == False: + self.abort(409, 'file exists; use force to overwrite') diff --git a/core.py b/core.py index 6d9bbb5735b228788a01d36b0d3fd10a2b0a9a9a..41ba2bd1658d2fba323f13876f783deee92e7309 100644 --- a/core.py +++ b/core.py @@ -6,11 +6,12 @@ logging.getLogger('MARKDOWN').setLevel(logging.WARNING) # silence Markdown libra import os import re +import cgi import bson -import gzip import json import hashlib import tarfile +import zipfile import datetime import lockfile import markdown @@ -108,24 +109,6 @@ class Core(base.RequestHandler): """Return 200 OK.""" pass - def post(self): - if not self.app.config['demo']: - self.abort(400, 'API must be in demo mode') - try: - payload = self.request.json_body - jsonschema.validate(payload, RESET_SCHEMA) - except (ValueError, jsonschema.ValidationError) as e: - self.abort(400, str(e)) - if payload.get('reset', False): - self.app.db.projects.delete_many({}) - self.app.db.sessions.delete_many({}) - self.app.db.acquisitions.delete_many({}) - self.app.db.collections.delete_many({}) - self.app.db.jobs.delete_many({}) - for p in (self.app.config['data_path'] + '/' + d for d in os.listdir(self.app.config['data_path'])): - if p not in [self.app.config['upload_path'], self.app.config['quarantine_path']]: - shutil.rmtree(p) - def get(self): """Return API documentation""" resources = """ @@ -139,11 +122,11 @@ class Core(base.RequestHandler): [(/users/count)] | count of users [(/users/self)] | user identity [(/users/roles)] | user roles - [(/users/schema)] | schema for single user - /users/*<uid>* | details for user *<uid>* + [(/users/*<uid>*)] | details for user *<uid>* + [(/users/*<uid>*/groups)] | groups for user *<uid>* + [(/users/*<uid>*/projects)] | projects for user *<uid>* [(/groups)] | list of groups [(/groups/count)] | count of groups - [(/groups/schema)] | schema for single group /groups/*<gid>* | details for group *<gid>* /groups/*<gid>*/projects | list of projects for group *<gid>* /groups/*<gid>*/sessions | list of sessions for group *<gid>* @@ -168,10 +151,13 @@ class Core(base.RequestHandler): /collections/*<cid>* | details for collection *<cid>* /collections/*<cid>*/sessions | list of sessions for collection *<cid>* /collections/*<cid>*/acquisitions | list of acquisitions for collection *<cid>* + [(/schema/group)] | group schema + [(/schema/user)] | user schema """ if self.debug and self.uid: resources = re.sub(r'\[\((.*)\)\]', r'[\1](/api\1?user=%s)' % self.uid, resources) + resources = re.sub(r'(\(.*)\*<uid>\*(.*\))', r'\1%s\2' % self.uid, resources) else: resources = re.sub(r'\[\((.*)\)\]', r'[\1](/api\1)', resources) resources = resources.replace('<', '<').replace('>', '>').strip() @@ -203,13 +189,13 @@ class Core(base.RequestHandler): self.response.write('</body>\n') self.response.write('</html>\n') - def put(self): - """Receive a sortable reaper or user upload.""" - #if not self.uid and not self.drone_request: - # self.abort(402, 'uploads must be from an authorized user or drone') + def reaper(self): + """Receive a sortable reaper upload.""" + if not self.superuser_request: + self.abort(402, 'uploads must be from an authorized drone') if 'Content-MD5' not in self.request.headers: self.abort(400, 'Request must contain a valid "Content-MD5" header.') - filename = self.request.headers.get('Content-Disposition', 
'').partition('filename=')[2].strip('"') + filename = cgi.parse_header(self.request.headers.get('Content-Disposition', ''))[1].get('filename') if not filename: self.abort(400, 'Request must contain a valid "Content-Disposition" header.') with tempfile.TemporaryDirectory(prefix='.tmp', dir=self.app.config['upload_path']) as tempdir_path: @@ -217,15 +203,16 @@ class Core(base.RequestHandler): success, digest, filesize, duration = util.receive_stream_and_validate(self.request.body_file, filepath, self.request.headers['Content-MD5']) if not success: self.abort(400, 'Content-MD5 mismatch.') - if not tarfile.is_tarfile(filepath): - self.abort(415, 'Only tar files are accepted.') + if not zipfile.is_zipfile(filepath): + self.abort(415, 'Only ZIP files are accepted.') log.info('Received %s [%s] from %s' % (filename, util.hrsize(self.request.content_length), self.request.user_agent)) datainfo = util.parse_file(filepath, digest) if datainfo is None: util.quarantine_file(filepath, self.app.config['quarantine_path']) self.abort(202, 'Quarantining %s (unparsable)' % filename) - util.commit_file(self.app.db.acquisitions, None, datainfo, filepath, self.app.config['data_path']) - util.create_job(self.app.db.acquisitions, datainfo) # FIXME we should only mark files as new and let engine take it from there + success = util.commit_file(self.app.db.acquisitions, None, datainfo, filepath, self.app.config['data_path'], True) + if not success: + self.abort(202, 'Identical file exists') throughput = filesize / duration.total_seconds() log.info('Received %s [%s, %s/s] from %s' % (filename, util.hrsize(filesize), util.hrsize(throughput), self.request.client_addr)) @@ -246,8 +233,8 @@ class Core(base.RequestHandler): if not success: self.abort(400, 'Content-MD5 mismatch.') with lockfile.LockFile(arcpath): - with tarfile.open(arcpath, 'a') as archive: - archive.add(filepath, os.path.join(arcname, filename)) + with zipfile.ZipFile(arcpath, 'a', zipfile.ZIP_DEFLATED, allowZip64=True) as archive: + archive.write(filepath, os.path.join(arcname, filename)) if self.public_request: self.abort(403, 'must be logged in to upload data') @@ -277,16 +264,19 @@ class Core(base.RequestHandler): acq_no = overwrites.get('acq_no') arcname = overwrites['series_uid'] + ('_' + str(acq_no) if acq_no is not None else '') + '_' + filetype - ticket = util.upload_ticket(arcname=arcname) # store arcname for later reference + ticket = util.upload_ticket(self.request.client_addr, arcname=arcname) # store arcname for later reference self.app.db.uploads.insert_one(ticket) - arcpath = os.path.join(self.app.config['upload_path'], ticket['_id'] + '.tar') - store_file(self.request.body_file, filename, self.request.headers['Content-MD5'], arcpath, arcname) + arcpath = os.path.join(self.app.config['upload_path'], ticket['_id'] + '.zip') + with zipfile.ZipFile(arcpath, 'w', zipfile.ZIP_DEFLATED, allowZip64=True) as archive: + archive.comment = json.dumps(json_body) return {'ticket': ticket['_id']} ticket = self.app.db.uploads.find_one({'_id': ticket_id}) if not ticket: self.abort(404, 'no such ticket') - arcpath = os.path.join(self.app.config['upload_path'], ticket_id + '.tar') + if ticket['ip'] != self.request.client_addr: + self.abort(400, 'ticket not for this source IP') + arcpath = os.path.join(self.app.config['upload_path'], ticket_id + '.zip') if self.request.GET.get('complete', '').lower() not in ('1', 'true'): if 'Content-MD5' not in self.request.headers: @@ -297,21 +287,16 @@ class Core(base.RequestHandler): self.abort(400, 'Request must contain 
a filename query parameter.') self.app.db.uploads.update_one({'_id': ticket_id}, {'$set': {'timestamp': datetime.datetime.utcnow()}}) # refresh ticket store_file(self.request.body_file, filename, self.request.headers['Content-MD5'], arcpath, ticket['arcname']) - else: # complete -> zip, hash, commit - filepath = arcpath[:-2] + 'gz' - with gzip.open(filepath, 'wb', compresslevel=6) as gzfile: - with open(arcpath) as rawfile: - gzfile.writelines(rawfile) - os.remove(arcpath) + else: # complete -> hash, commit sha1 = hashlib.sha1() - with open(filepath, 'rb') as fd: + with open(arcpath, 'rb') as fd: for chunk in iter(lambda: fd.read(2**20), ''): sha1.update(chunk) - datainfo = util.parse_file(filepath, sha1.hexdigest()) + datainfo = util.parse_file(arcpath, sha1.hexdigest()) if datainfo is None: - util.quarantine_file(filepath, self.app.config['quarantine_path']) + util.quarantine_file(arcpath, self.app.config['quarantine_path']) self.abort(202, 'Quarantining %s (unparsable)' % filename) - util.commit_file(self.app.db.acquisitions, None, datainfo, filepath, self.app.config['data_path']) + util.commit_file(self.app.db.acquisitions, None, datainfo, arcpath, self.app.config['data_path']) def _preflight_archivestream(self, req_spec): data_path = self.app.config['data_path'] @@ -363,7 +348,7 @@ class Core(base.RequestHandler): total_size, file_cnt = append_targets(targets, acq, prefix, total_size, file_cnt) log.debug(json.dumps(targets, sort_keys=True, indent=4, separators=(',', ': '))) filename = 'sdm_' + datetime.datetime.utcnow().strftime('%Y%m%d_%H%M%S') + '.tar' - ticket = util.download_ticket('batch', targets, filename, total_size) + ticket = util.download_ticket(self.request.client_addr, 'batch', targets, filename, total_size) self.app.db.downloads.insert(ticket) return {'ticket': ticket['_id'], 'file_cnt': file_cnt, 'size': total_size} @@ -388,6 +373,8 @@ class Core(base.RequestHandler): ticket = self.app.db.downloads.find_one({'_id': ticket_id}) if not ticket: self.abort(404, 'no such ticket') + if ticket['ip'] != self.request.client_addr: + self.abort(400, 'ticket not for this source IP') self.response.app_iter = self._archivestream(ticket) self.response.headers['Content-Type'] = 'application/octet-stream' self.response.headers['Content-Disposition'] = 'attachment; filename=' + str(ticket['filename']) diff --git a/jobs.py b/jobs.py index e515c8ad981ea86e7b58043abcd6d02dae948adb..6015abde5c1760bd3af17587e737644096662de8 100644 --- a/jobs.py +++ b/jobs.py @@ -5,38 +5,172 @@ API request handlers for process-job-handling. """ import logging -import datetime log = logging.getLogger('scitran.jobs') +import bson +import pymongo +import datetime + import base +import util + +JOB_STATES = [ + 'pending', # Job is queued + 'running', # Job has been handed to an engine and is being processed + 'failed', # Job has an expired heartbeat (orphaned) or has suffered an error + 'complete', # Job has successfully completed -# TODO: what should this whitelist contain? protocol + FQDN? -# ex. https://coronal.stanford.edu -PROCESSOR_WHITELIST = [ - 'dockerhost', ] -JOB_STATES = [ - 'pending', # created but not started - 'queued', # job claimed by a processor - 'running', # job running on a processor - 'done', # job completed successfully - 'failed', # some error occurred, - 'paused', # job paused. can't think when this would be useful... 
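The job lifecycle collapses to four states with an explicit transition whitelist. A quick illustration of how the valid_transition() helper defined below behaves (same-state updates are treated as no-ops and allowed):

JOB_TRANSITIONS = [
    'pending --> running',
    'running --> failed',
    'running --> complete',
]

def valid_transition(from_state, to_state):
    return (from_state + ' --> ' + to_state) in JOB_TRANSITIONS or from_state == to_state

assert valid_transition('pending', 'running')        # engine claims a queued job
assert valid_transition('running', 'running')        # no-op state writes are permitted
assert not valid_transition('running', 'pending')    # jobs are never re-queued in place
assert not valid_transition('pending', 'complete')   # jobs cannot skip the running state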
+JOB_STATES_ALLOWED_MUTATE = [ + 'pending', + 'running', +] + +JOB_TRANSITIONS = [ + "pending --> running", + "running --> failed", + "running --> complete", ] -# Jobs must now how they affect the various components of a file description -some "special" case things will reset state from 'orig' to 'pending' -but the usual case will be to append an item to the state list. +def valid_transition(from_state, to_state): + return (from_state + " --> " + to_state) in JOB_TRANSITIONS or from_state == to_state + +ALGORITHMS = [ + "dcm2nii" +] + + +# TODO: json schema + + +def spawn_jobs(db, containers, file): + """ + Spawn some number of queued jobs to process a file. + + Parameters + ---------- + db: pymongo.database.Database + Reference to the database instance + containers: [ tuple(string, scitran.Container) ] + An array of tuples, each containing a container type name, and a container object. + Contract is: + 1) The first container in the array will be the container which owns the file passed in the file param. + 2) Following array indexes, if any, will be higher in the ownership hierarchy than the first container. + 3) Array is not guaranteed to be strictly hierarchically ordered. + file: scitran.File + File object that is used to spawn 0 or more jobs. + """ + + # File information + filename = file['filename'] + filehash = file['filehash'] -# TODO: create job function should live here -# where it can be editted with the route that consume and modify the jobs + # File container information + last = len(containers) - 1 + container_type, container = containers[last] + container_id = container['_id'] -# GET /jobs full list of jobs, allow specifiers, status= -# POST /jobs creates a new job. this will be used by webapp to add new jobs -# GET /jobs/<_id> get information about one job -# PUT /jobs/<_id> update informabout about one job -# GET /jobs/next, special route to get the 'next job' + log.info('File ' + filename + ' is in a ' + container_type + ' with id ' + str(container_id) + ' and hash ' + filehash) + + # Spawn rules currently do not look at container hierarchy, and only care about a single file. + # Further, one algorithm is unconditionally triggered for each dirty file. + + queue_job(db, 'dcm2nii', container_type, container_id, filename, filehash) + + +def queue_job(db, algorithm_id, container_type, container_id, filename, filehash, attempt_n=1, previous_job_id=None): + """ + Enqueues a job for execution. + + Parameters + ---------- + db: pymongo.database.Database + Reference to the database instance + algorithm_id: string + Human-friendly unique name of the algorithm + container_type: string + Type of container ('acquisition', 'session', etc) + container_id: string + ID of the container ('2', etc) + filename: string + Name of the file to download + filehash: string + Hash of the file to download + attempt_n: integer (optional) + If an equivalent job has tried & failed before, pass which attempt number we're at. Defaults to 1 (no previous attempts). + """ + + if algorithm_id not in ALGORITHMS: + raise Exception('Unsupported algorithm ' + algorithm_id) + + # TODO validate container exists + + now = datetime.datetime.utcnow() + + job = { + 'state': 'pending', + + 'created': now, + 'modified': now, + + # We need all these keys to re-run this job if it fails.
+ 'algorithm_id': algorithm_id, + 'container_id': container_id, + 'container_type': container_type, + 'filename': filename, + 'filehash': filehash, + 'attempt': attempt_n, + + 'formula': { + 'inputs': [ + { + 'type': 'scitran', + 'location': '/', + 'URI': 'TBD', + }, + { + 'type': 'scitran', + 'location': '/script', + 'URI': 'TBD', + }, + + ], + + 'accents': { + 'cwd': "/script", + 'command': [ 'TBD' ], + 'environment': { }, + }, + + 'outputs': [ + { + 'type': 'scitran', + 'location': '/output', + 'URI': 'TBD', + }, + ], + } + + } + + if previous_job_id is not None: + job['previous_job_id'] = previous_job_id + + result = db.jobs.insert_one(job) + _id = result.inserted_id + + log.info('Running %s as job %s to process %s %s' % (algorithm_id, str(_id), container_type, container_id)) + return _id + +def serialize_job(job): + if job: + job['_id'] = str(job['_id']) + job['created'] = util.format_timestamp(job['created']) + job['modified'] = util.format_timestamp(job['modified']) + + return job class Jobs(base.RequestHandler): @@ -44,87 +178,94 @@ def get(self): """ - Return one Job that needs processing. + List all jobs. Not used by engine. + """ + if not self.superuser_request: + self.abort(401, 'Request requires superuser') - TODO: allow querying for group - TODO: allow querying for project - TODO: allow querying by other meta data. can this be generalized? + results = list(self.app.db.jobs.find()) + for result in results: + result = serialize_job(result) - """ - # TODO: auth - return list(self.app.db.jobs.find()) + return results def count(self): - """Return the total number of jobs.""" - # no auth? + """Return the total number of jobs. Not used by engine.""" + if not self.superuser_request: + self.abort(401, 'Request requires superuser') + return self.app.db.jobs.count() - def counts(self): - """Return more information about the jobs.""" - counts = { - 'total': self.app.db.jobs.count(), - 'failed': self.app.db.jobs.find({'status': 'failed'}).count(), - 'pending': self.app.db.jobs.find({'status': 'pending'}).count(), - 'done': self.app.db.jobs.find({'status': 'done'}).count(), - } - return counts + def addTestJob(self): + """Adds a harmless job for testing purposes. Intentionally equivalent return to queue_job.""" + if not self.superuser_request: + self.abort(401, 'Request requires superuser') + + return queue_job(self.app.db, 'dcm2nii', 'acquisition', '55bf861e6941f040cf8d6939') def next(self): - """Return the next job in the queue that matches the query parameters.""" - # TODO: add ability to query on things like psd type or psd name - try: - query_params = self.request.json - except ValueError as e: - self.abort(400, str(e)) - - query = {'status': 'pending'} - try: - query_params = self.request.json - except ValueError as e: - self.abort(400, str(e)) - - project_query = query_params.get('project') - group_query = query_params.get('group') - query = {'status': 'pending'} - if project_query: - query.update({'project': project_query}) - if group_query: - query.update({'group': group_query}) - - # TODO: how to guarantee the 'oldest' jobs pending jobs are given out first - job_spec = self.app.db.jobs.find_and_modify( - query, - {'$set': {'status': 'queued', 'modified': datetime.datetime.now()}}, + """ + Atomically change a 'pending' job to 'running' and return it. Updates timestamp. + Will return empty if there are no jobs to offer. + Engine will poll this endpoint whenever there are free processing slots.
+ """ + if not self.superuser_request: + self.abort(401, 'Request requires superuser') + + # REVIEW: is this atomic? + # REVIEW: semantics are not documented as to this mutation's behaviour when filter matches no docs. + result = self.app.db.jobs.find_one_and_update( + { + 'state': 'pending' + }, + { '$set': { + 'state': 'running', + 'modified': datetime.datetime.utcnow()} + }, sort=[('modified', -1)], - new=True + return_document=pymongo.collection.ReturnDocument.AFTER ) - return job_spec + if result == None: + self.abort(400, 'No jobs to process') + + return serialize_job(result) class Job(base.RequestHandler): """Provides /Jobs/<jid> routes.""" - # TODO flesh out the job schema - json_schema = { - '$schema': 'http://json-schema.org/draft-04/schema#', - 'title': 'User', - 'type': 'object', - 'properties': { - '_id': { - 'title': 'Job ID', - 'type': 'string', - }, - }, - 'required': ['_id'], - 'additionalProperties': True, - } - def get(self, _id): - return self.app.db.jobs.find_one({'_id': int(_id)}) + if not self.superuser_request: + self.abort(401, 'Request requires superuser') + + result = self.app.db.jobs.find_one({'_id': bson.ObjectId(_id)}) + return serialize_job(result) def put(self, _id): - """Update a single job.""" - payload = self.request.json - # TODO: validate the json before updating the db - self.app.db.jobs.update({'_id': int(_id)}, {'$set': {'status': payload.get('status'), 'activity': payload.get('activity')}}) + """ + Update a job. Updates timestamp. + Enforces a valid state machine transition, if any. + Rejects any change to a job that is not currently in 'pending' or 'running' state. + """ + if not self.superuser_request: + self.abort(401, 'Request requires superuser') + + mutation = self.request.json + job = self.app.db.jobs.find_one({'_id': bson.ObjectId(_id)}) + + if job is None: + self.abort(404, 'Job not found') + + if job['state'] not in JOB_STATES_ALLOWED_MUTATE: + self.abort(404, 'Cannot mutate a job that is ' + job['state'] + '.') + + if 'state' in mutation and not valid_transition(job['state'], mutation['state']): + self.abort(404, 'Mutating job from ' + job['state'] + ' to ' + mutation['state'] + ' not allowed.') + + # Any modification must be a timestamp update + mutation['modified'] = datetime.datetime.utcnow() + + # REVIEW: is this atomic? + # As far as I can tell, update_one vs find_one_and_update differ only in what they return. 
+ self.app.db.jobs.update_one(job, {'$set': mutation}) diff --git a/projects.py b/projects.py index 602d755e21854ec7bc81a37254c1fd6b1e8f119c..3915909c015148d87901740e73e1e97c798801b5 100644 --- a/projects.py +++ b/projects.py @@ -4,6 +4,7 @@ import logging log = logging.getLogger('scitran.api') import bson +import datetime import scitran.data.medimg @@ -105,12 +106,16 @@ class Projects(containers.ContainerList): if not self.superuser_request and util.user_perm(group['roles'], self.uid).get('access') != 'admin': self.abort(400, 'must be group admin to create project') json_body['group'] = gid - json_body['permissions'] = group['roles'] + if self.request.GET.get('inherit', '').lower() in ('1', 'true'): + json_body['permissions'] = group['roles'] + else: + json_body['permissions'] = [{'_id': self.uid, 'access': 'admin'}] json_body['public'] = json_body.get('public', False) json_body['files'] = [] + json_body['timestamp'] = datetime.datetime.utcnow() return {'_id': str(self.dbc.insert(json_body))} - def get(self, gid=None): + def get(self, uid=None, gid=None): """Return the User's list of Projects.""" if gid is not None: group = self.app.db.groups.find_one({'_id': gid}, []) @@ -118,7 +123,7 @@ class Projects(containers.ContainerList): self.abort(400, 'invalid group id') query = {'group': gid} if gid else {} projection = ['group', 'name', 'notes', 'timestamp', 'timezone'] - projects = self._get(query, projection, self.request.GET.get('admin', '').lower() in ('1', 'true')) + projects = self._get(query, projection, self.request.GET.get('admin', '').lower() in ('1', 'true'), uid) if self.debug: for proj in projects: pid = str(proj['_id']) diff --git a/schema.json b/schema.json new file mode 100644 index 0000000000000000000000000000000000000000..e99167804951e0fdd2bb62cc2f81d7b963b02bc1 --- /dev/null +++ b/schema.json @@ -0,0 +1,105 @@ +{ + "group": { + "$schema": "http://json-schema.org/draft-04/schema#", + "properties": { + "_id": { + "maxLength": 32, + "minLength": 2, + "pattern": "^[0-9a-z.@_-]*$", + "title": "ID", + "type": "string" + }, + "name": { + "maxLength": 32, + "minLength": 2, + "pattern": "^[0-9A-Za-z .@_-]*$", + "title": "Name", + "type": "string" + }, + "roles": { + "default": [], + "items": { + "additionalProperties": false, + "properties": { + "_id": { + "type": "string" + }, + "access": { + "enum": [ + "ro", + "rw", + "admin" + ], + "type": "string" + } + }, + "required": [ + "access", + "_id" + ], + "type": "object" + }, + "title": "Roles", + "type": "array", + "uniqueItems": true + } + }, + "required": [ + "_id" + ], + "title": "Group", + "type": "object" + }, + "user": { + "$schema": "http://json-schema.org/draft-04/schema#", + "additionalProperties": false, + "properties": { + "_id": { + "maxLength": 32, + "minLength": 2, + "pattern": "^[0-9a-z.@_-]*$", + "title": "ID", + "type": "string" + }, + "avatar": { + "format": "uri", + "title": "Avatar", + "type": "string" + }, + "email": { + "format": "email", + "title": "Email", + "type": "string" + }, + "firstname": { + "maxLength": 32, + "minLength": 2, + "title": "First Name", + "type": "string" + }, + "lastname": { + "maxLength": 32, + "minLength": 2, + "title": "Last Name", + "type": "string" + }, + "preferences": { + "title": "Preferences", + "type": "object" + }, + "root": { + "type": "boolean" + }, + "wheel": { + "type": "boolean" + } + }, + "required": [ + "_id", + "firstname", + "lastname" + ], + "title": "User", + "type": "object" + } +} diff --git a/users.py b/users.py index 
c391ca53c8e6962cc056ad291a6e539cc15c1457..b0bf691601e63bc2aa6c585ef6380c1025ea2157 100644 --- a/users.py +++ b/users.py @@ -6,6 +6,7 @@ log = logging.getLogger('scitran.api') import copy import hashlib import pymongo +import datetime import jsonschema import base @@ -47,12 +48,15 @@ class Users(base.RequestHandler): self.abort(403, 'must be logged in to create new user') try: json_body = self.request.json_body - jsonschema.validate(json_body, User.json_schema) + jsonschema.validate(json_body, User.post_schema) + json_body['created'] = datetime.datetime.utcnow() + json_body['modified'] = datetime.datetime.utcnow() json_body.setdefault('email', json_body['_id']) - json_body['email_hash'] = hashlib.md5(json_body['email']).hexdigest() + json_body.setdefault('preferences', {}) + json_body.setdefault('avatar', 'https://gravatar.com/avatar/' + hashlib.md5(json_body['email']).hexdigest() + '?s=512&d=mm') self.dbc.insert(json_body) except (ValueError, jsonschema.ValidationError) as e: - self.abort(400, str(e)) + self.abort(400, e) except pymongo.errors.DuplicateKeyError as e: self.abort(400, 'User ID %s already exists' % json_body['_id']) @@ -60,7 +64,10 @@ class Users(base.RequestHandler): """Return the list of Users.""" if self.public_request: self.abort(403, 'must be logged in to retrieve User list') - users = list(self.dbc.find({}, ['firstname', 'lastname', 'email_hash', 'wheel'])) + users = list(self.dbc.find({}, {'preferences': False})) + for user in users: + user['created'], _ = util.format_timestamp(user['created']) # TODO json serializer should do this + user['modified'], _ = util.format_timestamp(user['modified']) # TODO json serializer should do this if self.debug: for user in users: user['debug'] = {} @@ -72,61 +79,21 @@ class User(base.RequestHandler): """/users/<_id> """ - json_schema = { - '$schema': 'http://json-schema.org/draft-04/schema#', - 'title': 'User', - 'type': 'object', - 'properties': { - '_id': { - 'title': 'User ID', - 'type': 'string', - }, - 'firstname': { - 'title': 'First Name', - 'type': 'string', - }, - 'lastname': { - 'title': 'Last Name', - 'type': 'string', - }, - 'email': { - 'title': 'Email', - 'type': 'string', - 'format': 'email', - }, - 'email_hash': { - 'type': 'string', - }, - 'root': { - 'type': 'boolean', - }, - 'wheel': { - 'type': 'boolean', - }, - 'preferences': { - 'title': 'Preferences', - 'type': 'object', - 'properties': { - 'data_layout': { - 'type': 'string', - }, - }, - }, - }, - 'required': ['_id', 'firstname', 'lastname'], - 'additionalProperties': False, - } - def __init__(self, request=None, response=None): super(User, self).__init__(request, response) self.dbc = self.app.db.users + def schema(self): + method =self.request.GET.get('method', '').lower() + if method == 'put': + return self.put_schema + return self.post_schema + def self(self): """Return details for the current User.""" - user = self.dbc.find_one({'_id': self.uid}, ['firstname', 'lastname', 'root', 'wheel', 'preferences', 'email_hash']) + user = self.dbc.find_one({'_id': self.uid}) if not user: self.abort(400, 'no user is logged in') - user.setdefault('preferences', {}) return user def roles(self): @@ -145,6 +112,8 @@ class User(base.RequestHandler): user = self.dbc.find_one({'_id': _id}, projection or None) if not user: self.abort(404, 'no such User') + user['created'], _ = util.format_timestamp(user['created']) # TODO json serializer should do this + user['modified'], _ = util.format_timestamp(user['modified']) # TODO json serializer should do this if self.debug and 
@@ -145,6 +112,8 @@
         user = self.dbc.find_one({'_id': _id}, projection or None)
         if not user:
             self.abort(404, 'no such User')
+        user['created'], _ = util.format_timestamp(user['created']) # TODO json serializer should do this
+        user['modified'], _ = util.format_timestamp(user['modified']) # TODO json serializer should do this
         if self.debug and (self.superuser_request or _id == self.uid):
             user['debug'] = {}
             user['debug']['groups'] = self.uri_for('groups', _id, _full=True) + '?' + self.request.query_string
@@ -157,17 +126,14 @@ class User(base.RequestHandler):
             self.abort(404, 'no such User')
         if not self.superuser_request and _id != self.uid:
             self.abort(403, 'must be superuser to update another User')
-        schema = copy.deepcopy(self.json_schema)
-        del schema['required']
         try:
             json_body = self.request.json_body
-            jsonschema.validate(json_body, schema)
+            jsonschema.validate(json_body, self.put_schema)
         except (ValueError, jsonschema.ValidationError) as e:
-            self.abort(400, str(e))
+            self.abort(400, e)
         if _id == self.uid and 'wheel' in json_body and json_body['wheel'] != user['wheel']:
             self.abort(400, 'user cannot alter own superuser privilege')
-        if 'email' in json_body and json_body['email'] != user.get('email'):
-            json_body['email_hash'] = hashlib.md5(json_body['email']).hexdigest()
+        json_body['modified'] = datetime.datetime.utcnow()
         self.dbc.update({'_id': _id}, {'$set': util.mongo_dict(json_body)})
 
     def delete(self, _id):
@@ -195,31 +161,40 @@ class Groups(base.RequestHandler):
             self.abort(403, 'must be logged in and superuser to create new group')
         try:
             json_body = self.request.json_body
-            jsonschema.validate(json_body, Group.json_schema)
+            jsonschema.validate(json_body, Group.post_schema)
+            json_body['created'] = datetime.datetime.utcnow()
+            json_body['modified'] = datetime.datetime.utcnow()
+            json_body.setdefault('roles', [])
             self.dbc.insert(json_body)
         except (ValueError, jsonschema.ValidationError) as e:
-            self.abort(400, str(e))
+            self.abort(400, e)
         except pymongo.errors.DuplicateKeyError as e:
             self.abort(400, 'Groups ID %s already exists' % json_body['_id'])
 
     def get(self, _id=None):
         """Return the list of Groups."""
         query = None
+        projection = ['name', 'created', 'modified']
         if _id is not None:
             if _id != self.uid and not self.superuser_request:
                 self.abort(403, 'User ' + self.uid + ' may not see the Groups of User ' + _id)
             query = {'roles._id': _id}
+            projection += ['roles.$']
         else:
             if not self.superuser_request:
                 if self.request.GET.get('admin', '').lower() in ('1', 'true'):
                     query = {'roles': {'$elemMatch': {'_id': self.uid, 'access': 'admin'}}}
                 else:
                     query = {'roles._id': self.uid}
-        groups = list(self.app.db.groups.find(query, ['name']))
+                projection += ['roles.$']
+        groups = list(self.app.db.groups.find(query, projection))
+        for group in groups:
+            group['created'], _ = util.format_timestamp(group['created']) # TODO json serializer should do this
+            group['modified'], _ = util.format_timestamp(group['modified']) # TODO json serializer should do this
         if self.debug:
             for group in groups:
                 group['debug'] = {}
-                group['debug']['projects'] = self.uri_for('g_projects', group['_id'], _full=True) + '?' + self.request.query_string
+                group['debug']['projects'] = self.uri_for('g_projects', gid=group['_id'], _full=True) + '?' + self.request.query_string
                 group['debug']['sessions'] = self.uri_for('g_sessions', gid=group['_id'], _full=True) + '?' + self.request.query_string
                 group['debug']['details'] = self.uri_for('group', group['_id'], _full=True) + '?' + self.request.query_string
         return groups
@@ -229,48 +204,16 @@ class Group(base.RequestHandler):
 
     """/groups/<_id>"""
 
-    json_schema = {
-        '$schema': 'http://json-schema.org/draft-04/schema#',
-        'title': 'Group',
-        'type': 'object',
-        'properties': {
-            '_id': {
-                'title': 'Group ID',
-                'type': 'string',
-            },
-            'name': {
-                'title': 'Name',
-                'type': 'string',
-                'maxLength': 32,
-            },
-            'roles': {
-                'title': 'Roles',
-                'type': 'array',
-                'default': [],
-                'items': {
-                    'type': 'object',
-                    'properties': {
-                        'access': {
-                            'type': 'string',
-                            'enum': [role['rid'] for role in ROLES],
-                        },
-                        '_id': {
-                            'type': 'string',
-                        },
-                    },
-                    'required': ['access', '_id'],
-                    'additionalProperties': False,
-                },
-                'uniqueItems': True,
-            },
-        },
-        'required': ['_id'],
-    }
-
     def __init__(self, request=None, response=None):
         super(Group, self).__init__(request, response)
         self.dbc = self.app.db.groups
 
+    def schema(self):
+        method = self.request.GET.get('method', '').lower()
+        if method == 'put':
+            return self.put_schema
+        return self.post_schema
+
     def get(self, _id):
         """Return Group details."""
         group = self.app.db.groups.find_one({'_id': _id})
@@ -280,9 +223,11 @@
             group = self.app.db.groups.find_one({'_id': _id, 'roles': {'$elemMatch': {'_id': self.uid, 'access': 'admin'}}})
             if not group:
                 self.abort(403, 'User ' + self.uid + ' is not an admin of Group ' + _id)
+        group['created'], _ = util.format_timestamp(group['created']) # TODO json serializer should do this
+        group['modified'], _ = util.format_timestamp(group['modified']) # TODO json serializer should do this
         if self.debug:
             group['debug'] = {}
-            group['debug']['projects'] = self.uri_for('g_projects', group['_id'], _full=True) + '?' + self.request.query_string
+            group['debug']['projects'] = self.uri_for('g_projects', gid=group['_id'], _full=True) + '?' + self.request.query_string
             group['debug']['sessions'] = self.uri_for('g_sessions', gid=group['_id'], _full=True) + '?' + self.request.query_string
 
         return group
 
@@ -294,13 +239,12 @@ class Group(base.RequestHandler):
         user_perm = util.user_perm(group.get('roles', []), self.uid)
         if not self.superuser_request and not user_perm.get('access') == 'admin':
             self.abort(403, 'must be superuser or group admin to update group')
-        schema = copy.deepcopy(self.json_schema)
-        del schema['required']
         try:
             json_body = self.request.json_body
-            jsonschema.validate(json_body, schema)
+            jsonschema.validate(json_body, self.put_schema)
         except (ValueError, jsonschema.ValidationError) as e:
-            self.abort(400, str(e))
+            self.abort(400, e)
+        json_body['modified'] = datetime.datetime.utcnow()
         self.dbc.update({'_id': _id}, {'$set': util.mongo_dict(json_body)})
 
     def delete(self, _id):
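
Note: the two schema() handlers added above (User.schema and Group.schema) both select a schema from an optional ?method= query parameter: method=put returns put_schema, anything else returns post_schema. A rough client-side sketch, assuming the handlers are exposed at a schema endpoint (URL and HTTP client are illustrative, not part of this diff):

    import requests  # illustrative client

    api_base = 'https://example.org/api'  # illustrative
    create_schema = requests.get(api_base + '/schema/user').json()                         # schema used to validate POST bodies
    update_schema = requests.get(api_base + '/schema/user', params={'method': 'put'}).json()  # schema used to validate PUT bodies
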
diff --git a/util.py b/util.py
index 69b628ee123fcc502a9882bb3fc00d23c657ed65..6be85a04fd63fe97fa60fb9554d5bb0e0a803d4e 100644
--- a/util.py
+++ b/util.py
@@ -4,22 +4,19 @@ import logging
 log = logging.getLogger('scitran.api')
 
 import os
-import bson
 import copy
-import json
 import pytz
 import uuid
 import shutil
 import difflib
 import hashlib
-import tarfile
+import zipfile
 import datetime
 import mimetypes
 import dateutil.parser
 import tempdir as tempfile
 
 import scitran.data
-import scitran.data.medimg.montage
 
 MIMETYPES = [
     ('.bvec', 'text', 'bvec'),
@@ -31,9 +28,6 @@ MIMETYPES = [
 for mt in MIMETYPES:
     mimetypes.types_map.update({mt[0]: mt[1] + '/' + mt[2]})
 
-get_info = scitran.data.medimg.montage.get_info
-get_tile = scitran.data.medimg.montage.get_tile
-
 valid_timezones = pytz.all_timezones
 
 PROJECTION_FIELDS = ['group', 'timestamp', 'permissions', 'public']
@@ -44,8 +38,8 @@ def parse_file(filepath, digest):
     try:
         log.info('Parsing %s' % filename)
         dataset = scitran.data.parse(filepath)
-    except scitran.data.DataError:
-        log.info('Unparsable %s' % filename)
+    except scitran.data.DataError as exp:
+        log.info('Unparsable %s (%s)' % (filename, exp))
         return None
     filename = dataset.nims_file_name + dataset.nims_file_ext
     fileinfo = {
@@ -54,7 +48,6 @@
         'filesize': os.path.getsize(filepath),
         'filetype': dataset.nims_file_type,
         'filehash': digest,
-        'datahash': None, #dataset.nims_hash, TODO: datasets should be able to hash themselves (but not here)
         'modality': dataset.nims_file_domain,
         'datatypes': dataset.nims_file_kinds,
         'flavor': 'data',
@@ -83,7 +76,7 @@ def quarantine_file(filepath, quarantine_path):
     shutil.move(filepath, q_path)
 
 
-def commit_file(dbc, _id, datainfo, filepath, data_path):
+def commit_file(dbc, _id, datainfo, filepath, data_path, force=False):
     """Insert a file as an attachment or as a file."""
     filename = os.path.basename(filepath)
     fileinfo = datainfo['fileinfo']
@@ -91,14 +84,33 @@
     if _id is None:
         _id = _update_db(dbc.database, datainfo)
     container_path = os.path.join(data_path, str(_id)[-3:] + '/' + str(_id))
+    target_filepath = container_path + '/' + fileinfo['filename']
     if not os.path.exists(container_path):
         os.makedirs(container_path)
-    r = dbc.update_one({'_id':_id, 'files.filename': fileinfo['filename']}, {'$set': {'files.$': fileinfo}})
-    #TODO figure out if file was actually updated and return that fact
-    if r.matched_count != 1:
-        dbc.update({'_id': _id}, {'$push': {'files': fileinfo}})
-    shutil.move(filepath, container_path + '/' + fileinfo['filename'])
+    container = dbc.find_one_and_update({'_id':_id, 'files.filename': fileinfo['filename']}, {'$set': {'files.$': fileinfo}})
+    if container: # file already exists
+        for f in container['files']:
+            if f['filename'] == fileinfo['filename']:
+                if not force:
+                    updated = False
+                elif identical_content(target_filepath, f['filehash'], filepath, fileinfo['filehash']): # existing file has identical content
+                    log.debug('Dropping %s (identical)' % filename)
+                    os.remove(filepath)
+                    updated = None
+                else: # existing file has different content
+                    log.debug('Replacing %s' % filename)
+                    shutil.move(filepath, target_filepath)
+                    dbc.update_one({'_id':_id, 'files.filename': fileinfo['filename']}, {'$set': {'files.$.dirty': True}})
+                    updated = True
+                break
+    else: # file does not exist
+        log.debug('Adding %s' % filename)
+        fileinfo['dirty'] = True
+        shutil.move(filepath, target_filepath)
+        dbc.update_one({'_id': _id}, {'$push': {'files': fileinfo}})
+        updated = True
     log.debug('Done %s' % filename)
+    return updated
 
 
 def _update_db(db, datainfo):
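
Note: commit_file now reports what it did instead of always pushing a new entry. A hypothetical caller might branch on the three return values like this (dbc, _id, datainfo, filepath, data_path and force are placeholders):

    updated = commit_file(dbc, _id, datainfo, filepath, data_path, force=force)
    if updated is None:
        pass    # force was set and the incoming file was identical: it was discarded
    elif updated:
        pass    # file was added or replaced, and its files entry marked 'dirty'
    else:
        pass    # filename already present and force not set: the upload was not moved into storage
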
@@ -170,102 +182,22 @@ def _entity_metadata(dataset, properties, metadata={}, parent_key=''):
     return metadata
 
 
-# TODO: create job should be use-able from bootstrap.py with only database information
-def create_job(dbc, datainfo):
-    fileinfo = datainfo['fileinfo']
-    db = dbc.database
-    type_ = fileinfo['filetype']
-    kinds_ = fileinfo['datatypes']
-    state_ = ['orig'] # dataset.nims_file_state ### WHAT IS THIS AND WHY DO WE CARE?
-    app = None
-    # TODO: check if there are 'default apps' set for this project/session/acquisition
-    acquisition = db.acquisitions.find_one({'uid': datainfo['acquisition_id']})
-    session = db.sessions.find_one({'_id': acquisition.get('session')})
-    project = db.projects.find_one({'_id': session.get('project')})
-    aid = acquisition.get('_id')
-
-    # XXX: if an input kinds = None, then that job is meant to work on any file kinds
-    app = db.apps.find_one({
-        '$or': [
-            {'inputs': {'$elemMatch': {'type': type_, 'state': state_, 'kinds': kinds_}}, 'default': True},
-            {'inputs': {'$elemMatch': {'type': type_, 'state': state_, 'kinds': None}}, 'default': True},
-        ],
-    })
-    # TODO: this has to move...
-    # force acquisition dicom file to be marked as 'optional = True'
-    db.acquisitions.find_and_modify(
-        {'uid': datainfo['acquisition_id'], 'files.type': 'dicom'},
-        {'$set': {'files.$.optional': True}},
-    )
-
-    if not app:
-        log.info('no app for type=%s, state=%s, kinds=%s, default=True. no job created.' % (type_, state_, kinds_))
+def identical_content(filepath1, digest1, filepath2, digest2):
+    if zipfile.is_zipfile(filepath1) and zipfile.is_zipfile(filepath2):
+        with zipfile.ZipFile(filepath1) as zf1, zipfile.ZipFile(filepath2) as zf2:
+            zf1_infolist = sorted(zf1.infolist(), key=lambda zi: zi.filename)
+            zf2_infolist = sorted(zf2.infolist(), key=lambda zi: zi.filename)
+            if zf1.comment != zf2.comment:
+                return False
+            if len(zf1_infolist) != len(zf2_infolist):
+                return False
+            for zii, zij in zip(zf1_infolist, zf2_infolist):
+                if zii.CRC != zij.CRC:
+                    return False
+            else:
+                return True
     else:
-        # XXX: outputs can specify to __INHERIT__ a value from the parent input file, for ex: kinds
-        for output in app['outputs']:
-            if output['kinds'] == '__INHERIT__':
-                output['kinds'] = kinds_
-
-        # TODO: job description needs more metadata to be searchable in a useful way
-        output_url = '%s/%s/%s' % ('acquisitions', aid, 'file')
-        job = db.jobs.find_and_modify(
-            {
-                '_id': db.jobs.count() + 1,
-            },
-            {
-                '_id': db.jobs.count() + 1,
-                'group': project.get('group'),
-                'project': {
-                    '_id': project.get('_id'),
-                    'name': project.get('name'),
-                },
-                'exam': session.get('exam'),
-                'app': {
-                    '_id': app['_id'],
-                    'type': 'docker',
-                },
-                'inputs': [
-                    {
-                        'filename': fileinfo['filename'],
-                        'url': '%s/%s/%s' % ('acquisitions', aid, 'file'),
-                        'payload': {
-                            'type': type_,
-                            'state': state_,
-                            'kinds': kinds_,
-                        },
-                    }
-                ],
-                'outputs': [{'url': output_url, 'payload': i} for i in app['outputs']],
-                'status': 'pending',
-                'activity': None,
-                'added': datetime.datetime.now(),
-                'timestamp': datetime.datetime.now(),
-            },
-            upsert=True,
-            new=True,
-        )
-        log.info('created job %d, group: %s, project %s' % (job['_id'], job['group'], job['project']))
-
-
-def insert_app(db, fp, apps_path, app_meta=None):
-    """Validate and insert an application tar into the filesystem and database."""
-    # download, md-5 check, and json validation are handled elsewhere
-    if not app_meta:
-        with tarfile.open(fp) as tf:
-            for ti in tf:
-                if ti.name.endswith('description.json'):
-                    app_meta = json.load(tf.extractfile(ti))
-                    break
-
-    name, version = app_meta.get('_id').split(':')
-    app_dir = os.path.join(apps_path, name)
-    if not os.path.exists(app_dir):
-        os.makedirs(app_dir)
-    app_tar = os.path.join(app_dir, '%s-%s.tar' % (name, version))
-
-    app_meta.update({'asset_url': 'apps/%s' % app_meta.get('_id')})
-    db.apps.update({'_id': app_meta.get('_id')}, app_meta, upsert=True)
-    shutil.move(fp, app_tar)
+        return digest1 == digest2
 
 
 def hrsize(size):
@@ -295,24 +227,34 @@ def user_perm(permissions, _id, site=None):
     return {}
 
 
-def upload_ticket(**kwargs):
+def container_fileinfo(container, filename):
+    for fileinfo in container.get('files', []):
+        if fileinfo['filename'] == filename:
+            return fileinfo
+    else:
+        return None
+
+
+def upload_ticket(ip, **kwargs):
     ticket = {
         '_id': str(uuid.uuid4()),
         'timestamp': datetime.datetime.utcnow(),
+        'ip': ip,
     }
     ticket.update(kwargs)
     return ticket
 
 
-def download_ticket(type_, target, filename, size):
+def download_ticket(ip, type_, target, filename, size):
     return {
-            '_id': str(uuid.uuid4()),
-            'timestamp': datetime.datetime.utcnow(),
-            'type': type_,
-            'target': target,
-            'filename': filename,
-            'size': size,
-            }
+        '_id': str(uuid.uuid4()),
+        'timestamp': datetime.datetime.utcnow(),
+        'ip': ip,
+        'type': type_,
+        'target': target,
+        'filename': filename,
+        'size': size,
+    }
 
 
 def receive_stream_and_validate(stream, filepath, received_md5):
@@ -337,7 +279,7 @@ def guess_mimetype(filepath):
 def guess_filetype(filepath, mimetype):
-    """Guess MIME type based on filename."""
+    """Guess file type based on filename and MIME type."""
     type_, subtype = mimetype.split('/')
     if filepath.endswith('.nii') or filepath.endswith('.nii.gz'):
         return 'nifti'
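
Note: identical_content() (added above) compares two zip archives member-by-member via their stored CRCs and archive comments, and falls back to comparing the supplied digests for non-zip files. A small usage sketch; the paths and the digest helper are illustrative:

    import hashlib
    from util import identical_content

    def file_digest(path):
        # illustrative; any digest that matches what is stored in fileinfo['filehash'] works for the fallback branch
        with open(path, 'rb') as f:
            return hashlib.sha1(f.read()).hexdigest()

    existing, incoming = '/data/123/scan.zip', '/tmp/upload/scan.zip'
    if identical_content(existing, file_digest(existing), incoming, file_digest(incoming)):
        print('incoming upload duplicates the stored file')
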