# @author:  Gunnar Schaefer

import logging
log = logging.getLogger('scitran.api')

import os
import copy
import shutil
import difflib
import json
import datetime
import mimetypes
import tarfile
import tempdir as tempfile

import bson

import scitran.data
import scitran.data.medimg.montage

mimetypes.types_map.update({'.bvec': 'text/plain'})
mimetypes.types_map.update({'.bval': 'text/plain'})

get_info = scitran.data.medimg.montage.get_info
get_tile = scitran.data.medimg.montage.get_tile

PROJECTION_FIELDS = ['timestamp', 'permissions', 'public']


def guess_mime(fn):
    """Guess mimetype based on filename."""
    # TODO: we could move mime types into scitran.data, but that would only work well if ALL
    # files went through scitran.data. We can only guarantee that all files go through the API
    # during upload or download, so the API seems the right place to determine mime information.
    mime, enc = mimetypes.guess_type(fn)
    if not mime:
        mime = 'application/octet-stream'
    return mime
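
# Illustrative examples (not part of the original module):
#   guess_mime('diffusion.bvec')  # => 'text/plain', via the table update above
#   guess_mime('mystery.xyz123')  # => 'application/octet-stream', the fallback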


def insert_file(dbc, _id, file_info, filepath, digest, data_path, quarantine_path, flavor='file'):
    """Insert a file as an attachment or as a file."""
    filename = os.path.basename(filepath)
    flavor += 's'
    dataset = None
    if _id is None:
        try:
            log.info('Parsing     %s' % filename)
            dataset = scitran.data.parse(filepath)
        except scitran.data.DataError:
            q_path = tempfile.mkdtemp(prefix=datetime.datetime.now().strftime('%Y%m%d_%H%M%S_'), dir=quarantine_path)
            shutil.move(filepath, q_path)
            return 202, 'Quarantining %s (unparsable)' % filename

        log.info('Sorting     %s' % filename)
        _id = _update_db(dbc.database, dataset)
        file_spec = dict(
                _id=_id,
                files={'$elemMatch': {
                    'type': dataset.nims_file_type,
                    'kinds': dataset.nims_file_kinds,
                    'state': dataset.nims_file_state,
                    }},
                )
        file_info = dict(
                name=dataset.nims_file_name,
                ext=dataset.nims_file_ext,
                size=os.path.getsize(filepath),
                sha1=digest,
                #hash=dataset.nims_hash, TODO: datasets should be able to hash themselves (but not here)
                type=dataset.nims_file_type,
                kinds=dataset.nims_file_kinds,
                state=dataset.nims_file_state,
                )
        filename = dataset.nims_file_name + dataset.nims_file_ext
    else:
        file_spec = {
                '_id': _id,
                flavor: {'$elemMatch': {
                    'type': file_info.get('type'),
                    'kinds': file_info.get('kinds'),
                    'state': file_info.get('state'),
                    }},
                }
        if flavor == 'attachments':
            file_spec[flavor]['$elemMatch'].update({'name': file_info.get('name'), 'ext': file_info.get('ext')})
    container_path = os.path.join(data_path, str(_id)[-3:] + '/' + str(_id))
    if not os.path.exists(container_path):
        os.makedirs(container_path)
    success = dbc.update(file_spec, {'$set': {flavor + '.$': file_info}})
    if not success['updatedExisting']:
        dbc.update({'_id': _id}, {'$push': {flavor: file_info}})
    shutil.move(filepath, container_path + '/' + filename)
    if dataset:  # only create jobs if dataset is parseable
        create_job(dbc, dataset)
    log.debug('Done        %s' % os.path.basename(filepath)) # must use filepath, since filename is updated for sorted files
    return 200, 'Success'
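
# Hypothetical usage sketch; the collection, paths, and digest below are placeholders:
#   status, detail = insert_file(
#           db.acquisitions, None, None, '/tmp/uploads/scan.tgz', sha1_digest,
#           '/scitran/data', '/scitran/quarantine')
#   # => (200, 'Success') once the file is parsed, sorted, and moved into place,
#   # or (202, 'Quarantining ...') if scitran.data cannot parse it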


def _update_db(db, dataset):
    #TODO: possibly try to keep a list of session IDs on the project, instead of having the session point to the project
    #      same for the session and acquisition
    #      queries might be more efficient that way
    session_spec = {'uid': dataset.nims_session_id}
    session = db.sessions.find_one(session_spec, ['project'])
    if session:  # skip project creation if the session already exists
        project = db.projects.find_one({'_id': session['project']}, fields=PROJECTION_FIELDS)
    else:
        existing_group_ids = [g['_id'] for g in db.groups.find(None, ['_id'])]
        group_id_matches = difflib.get_close_matches(dataset.nims_group_id, existing_group_ids, cutoff=0.8)
        if len(group_id_matches) == 1:
            group_id = group_id_matches[0]
            project_name = dataset.nims_project or 'untitled'
        else:
            group_id = 'unknown'
            project_name = dataset.nims_group_id + ('/' + dataset.nims_project if dataset.nims_project else '')
        group = db.groups.find_one({'_id': group_id})
        project_spec = {'group_id': group['_id'], 'name': project_name}
        project = db.projects.find_and_modify(
                project_spec,
                {'$setOnInsert': {'permissions': group['roles'], 'public': False, 'files': []}},
                upsert=True,
                new=True,
                fields=PROJECTION_FIELDS,
                )
    session = db.sessions.find_and_modify(
            session_spec,
            {
                '$setOnInsert': dict(project=project['_id'], permissions=project['permissions'], public=project['public'], files=[]),
                '$set': _entity_metadata(dataset, dataset.session_properties, session_spec), # session_spec ensures non-empty $set
                '$addToSet': {'domains': dataset.nims_file_domain},
                },
            upsert=True,
            new=True,
            fields=PROJECTION_FIELDS,
            )
    acquisition_spec = {'uid': dataset.nims_acquisition_id}
    acquisition = db.acquisitions.find_and_modify(
            acquisition_spec,
            {
                '$setOnInsert': dict(session=session['_id'], permissions=session['permissions'], public=session['public'], files=[]),
                '$set': _entity_metadata(dataset, dataset.acquisition_properties, acquisition_spec), # acquisition_spec ensures non-empty $set
                '$addToSet': {'types': {'$each': [{'domain': dataset.nims_file_domain, 'kind': kind} for kind in dataset.nims_file_kinds]}},
                },
            upsert=True,
            new=True,
            fields=[],
            )
    if dataset.nims_timestamp:
        db.projects.update({'_id': project['_id']}, {'$max': dict(timestamp=dataset.nims_timestamp), '$set': dict(timezone=dataset.nims_timezone)})
        db.sessions.update({'_id': session['_id']}, {'$min': dict(timestamp=dataset.nims_timestamp), '$set': dict(timezone=dataset.nims_timezone)})
    # job creation is handled by the caller (see insert_file / create_job)
    return acquisition['_id']
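
# Illustrative note (not in the original module): the fuzzy group match above tolerates
# minor UID variations, e.g.
#   difflib.get_close_matches('schaefer_lab', ['schaeferlab', 'smith'], cutoff=0.8)
#   # => ['schaeferlab'] -- a single match, so the file is sorted into that group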

# TODO: create_job should be usable from bootstrap.py with only database information
def create_job(dbc, dataset):
    db = dbc.database
    type_ = dataset.nims_file_type
    kinds_ = dataset.nims_file_kinds
    state_ = dataset.nims_file_state
    app = None
    # TODO: check if there are 'default apps' set for this project/session/acquisition
    acquisition = db.acquisitions.find_one({'uid': dataset.nims_acquisition_id})
    session = db.sessions.find_one({'_id': bson.ObjectId(acquisition.get('session'))})
    project = db.projects.find_one({'_id': bson.ObjectId(session.get('project'))})
    aid = acquisition.get('_id')

    # XXX: if an input's kinds is None, the app is meant to accept files of any kinds
    app = db.apps.find_one({
        '$or': [
            {'inputs': {'$elemMatch': {'type': type_, 'state': state_, 'kinds': kinds_}}, 'default': True},
            {'inputs': {'$elemMatch': {'type': type_, 'state': state_, 'kinds': None}}, 'default': True},
        ],
    })
    # TODO: this has to move...
    # force acquisition dicom file to be marked as 'optional = True'
    db.acquisitions.find_and_modify(
        {'uid': dataset.nims_acquisition_id, 'files.type': 'dicom'},
        {'$set': {'files.$.optional': True}},
        )

    if not app:
        log.info('no app for type=%s, state=%s, kinds=%s, default=True. no job created.' % (type_, state_, kinds_))
    else:
        # XXX: outputs can specify __INHERIT__ to copy a value from the parent input file, e.g. kinds
        for output in app['outputs']:
            if output['kinds'] == '__INHERIT__':
                output['kinds'] = kinds_
        # TODO: job description needs more metadata to be searchable in a useful way
        output_url = '%s/%s/%s' % ('acquisitions', aid, 'file')
        job_id = db.jobs.count() + 1  # compute the candidate ID once so spec and document agree
        job = db.jobs.find_and_modify(
            {
                '_id': job_id,
            },
            {
                '_id': job_id,
                'group': project.get('group_id'),
                'project': {
                    '_id': project.get('_id'),
                    'name': project.get('name'),
                },
                'exam': session.get('exam'),
                'app': {
                    '_id': app['_id'],
                    'type': 'docker',
                },
                'inputs': [
                    {
                        'filename': dataset.nims_file_name + dataset.nims_file_ext,
                        'url': '%s/%s/%s' % ('acquisitions', aid, 'file'),
                        'payload': {
                            'type': dataset.nims_file_type,
                            'state': dataset.nims_file_state,
                            'kinds': dataset.nims_file_kinds,
                        },
                    }
                ],
                'outputs': [{'url': output_url, 'payload': i} for i in app['outputs']],
                'status': 'pending',
                'activity': None,
                'added': datetime.datetime.now(),
                'timestamp': datetime.datetime.now(),
            },
            upsert=True,
            new=True,
        )
        log.info('created job %d, group: %s, project %s' % (job['_id'], job['group'], job['project']))


def insert_app(db, fp, apps_path, app_meta=None):
    """Validate and insert an application tar into the filesystem and database."""
    # download, MD5 check, and JSON validation are handled elsewhere
    if not app_meta:
        with tarfile.open(fp) as tf:
            for ti in tf:
                if ti.name.endswith('description.json'):
                    app_meta = json.load(tf.extractfile(ti))
                    break

    name, version = app_meta.get('_id').split(':')
    app_dir = os.path.join(apps_path, name)
    if not os.path.exists(app_dir):
        os.makedirs(app_dir)
    app_tar = os.path.join(app_dir, '%s-%s.tar' % (name, version))

    app_meta.update({'asset_url': 'apps/%s' % app_meta.get('_id')})
    db.apps.update({'_id': app_meta.get('_id')}, app_meta, upsert=True)
    shutil.move(fp, app_tar)
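
# Hypothetical usage; the paths are placeholders. After an upload handler has verified
# the tarball's checksum, register it with:
#   insert_app(db, '/tmp/uploads/dcm2nii.tar', '/scitran/apps')
# The description.json inside the tar must carry an '_id' of the form 'name:version'.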


def _entity_metadata(dataset, properties, metadata={}, parent_key=''):
    metadata = copy.deepcopy(metadata)
    if dataset.nims_metadata_status is not None:
        parent_key = parent_key and parent_key + '.'
        for key, attributes in properties.iteritems():
            if attributes['type'] == 'object':
                metadata.update(_entity_metadata(dataset, attributes['properties'], parent_key=key))
            else:
                value = getattr(dataset, attributes['field']) if 'field' in attributes else None
                if value or value == 0: # drop Nones and empty iterables
                    metadata[parent_key + key] = value
    return metadata
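
# Illustrative: given properties like (the field names here are hypothetical)
#   {'subject': {'type': 'object', 'properties': {'age': {'field': 'subj_age'}}}}
# and a dataset whose subj_age is 42, this yields {'subject.age': 42} -- nested
# objects flatten into dotted keys, assuming dataset.nims_metadata_status is set.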


def hrsize(size):
    if size < 1000:
        return '%d%s' % (size, 'B')
    for suffix in 'KMGTPEZY':
        size /= 1024.
        if size < 10.:
            return '%.1f%s' % (size, suffix)
        if size < 1000.:
            return '%.0f%s' % (size, suffix)
    return '%.0f%s' % (size, 'Y')
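
# Illustrative examples (not part of the original module):
#   hrsize(512)      # => '512B'
#   hrsize(12480)    # => '12K'
#   hrsize(1048576)  # => '1.0M'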


def mongo_dict(d):
    def _mongo_list(d, pk=''):
        pk = pk and pk + '.'
        return sum([_mongo_list(v, pk+k) if isinstance(v, dict) else [(pk+k, v)] for k, v in d.iteritems()], [])
    return dict(_mongo_list(d))
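
# Illustrative: mongo_dict({'subject': {'age': 42, 'sex': 'F'}, 'exam': 100})
#   # => {'subject.age': 42, 'subject.sex': 'F', 'exam': 100}
# i.e. nested dicts flatten into MongoDB dotted-key form, ready for a $set update.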


def user_perm(permissions, _id, site=None):
    for perm in permissions:
        if perm['_id'] == _id and perm.get('site') == site:
            return perm
    return {}
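
# Illustrative (the 'access' key is hypothetical):
#   user_perm([{'_id': 'alice', 'access': 'admin'}], 'alice')  # => that perm dict
#   user_perm([{'_id': 'alice', 'access': 'admin'}], 'bob')    # => {}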


def download_ticket(type_, target, filename, size):
    return {
            '_id': str(bson.ObjectId()), # FIXME: use better ticket ID
            'timestamp': datetime.datetime.utcnow(),
            'type': type_,
            'target': target,
            'filename': filename,
            'size': size,
            }
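
# Illustrative: download_ticket('single', '/some/target', 'scan.nii.gz', 1048576) returns a
# ticket dict whose '_id' is a fresh ObjectId string; persisting and expiring tickets is left
# to the caller. (The 'single' type value here is a placeholder, not defined by this module.)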