Skip to content
Snippets Groups Projects
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
util.py 11.82 KiB
# @author:  Gunnar Schaefer

import os
import copy
import pytz
import uuid
import shutil
import difflib
import hashlib
import pymongo
import zipfile
import datetime
import mimetypes
import dateutil.parser
import tempdir as tempfile
import logging

import scitran.data

logging.basicConfig(
    format='%(asctime)s %(name)16.16s:%(levelname)4.4s %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S',
    level=logging.DEBUG,
)
log = logging.getLogger('scitran.api')

MIMETYPES = [
    ('.bvec', 'text', 'bvec'),
    ('.bval', 'text', 'bval'),
    ('.m', 'text', 'matlab'),
    ('.sh', 'text', 'shell'),
    ('.r', 'text', 'r'),
]
for mt in MIMETYPES:
    mimetypes.types_map.update({mt[0]: mt[1] + '/' + mt[2]})

valid_timezones = pytz.all_timezones

PROJECTION_FIELDS = ['group', 'name', 'label', 'timestamp', 'permissions', 'public']


def parse_file(filepath, digest):
    filename = os.path.basename(filepath)
    try:
        log.info('Parsing     %s' % filename)
        dataset = scitran.data.parse(filepath)
    except scitran.data.DataError as exp:
        log.info('Unparsable  %s (%s)' % (filename, exp))
        return None
    filename = dataset.nims_file_name + dataset.nims_file_ext
    fileinfo = {
            'mimetype': guess_mimetype(filename),
            'filename': filename,
            'filesize': os.path.getsize(filepath),
            'filetype': dataset.nims_file_type,
            'filehash': digest,
            'modality': dataset.nims_file_domain,
            'datatypes': dataset.nims_file_kinds,
            'flavor': 'data',
            }
    datainfo = {
            'acquisition_id': dataset.nims_acquisition_id,
            'session_id': dataset.nims_session_id,
            'group_id': dataset.nims_group_id,
            'project_name': dataset.nims_project,
            'session_properties': _entity_metadata(dataset, dataset.session_properties),
            'acquisition_properties': _entity_metadata(dataset, dataset.acquisition_properties),
            'timestamp': dataset.nims_timestamp,
            'timezone': dataset.nims_timezone,
            }
    datainfo['fileinfo'] = fileinfo
    # HACK!!!
    datainfo['acquisition_properties'].pop('filetype', None)
    if fileinfo['filetype'] == 'dicom' and fileinfo['datatypes'][0] != 'screenshot':
        datainfo['acquisition_properties']['modality'] = fileinfo['modality']
        datainfo['acquisition_properties']['datatype'] = fileinfo['datatypes'][0]
    elif fileinfo['filetype'] == 'meeg':
        datainfo['acquisition_properties']['datatype'] = fileinfo['datatypes'][0]
    return datainfo


def quarantine_file(filepath, quarantine_path):
    q_path = tempfile.mkdtemp(prefix=datetime.datetime.now().strftime('%Y%m%d_%H%M%S_'), dir=quarantine_path)
    shutil.move(filepath, q_path)


def commit_file(dbc, _id, datainfo, filepath, data_path, force=False):
    """Insert a file as an attachment or as a file."""
    filename = os.path.basename(filepath)
    fileinfo = datainfo['fileinfo']
    if _id is None:
        _id = _update_db(dbc.database, datainfo)
    container_path = os.path.join(data_path, str(_id)[-3:] + '/' + str(_id))
    target_filepath = container_path + '/' + fileinfo['filename']
    if not os.path.exists(container_path):
        os.makedirs(container_path)
    container = dbc.find_one({'_id':_id, 'files.filename': fileinfo['filename']})
    if container: # file already exists
        for f in container['files']:
            if f['filename'] == fileinfo['filename']:
                if not force:
                    updated = False
                elif identical_content(target_filepath, f['filehash'], filepath, fileinfo['filehash']): # existing file has identical content
                    log.debug('Dropping    %s (identical)' % filename)
                    os.remove(filepath)
                    updated = None
                else: # existing file has different content
                    log.debug('Replacing   %s' % filename)
                    shutil.move(filepath, target_filepath)
                    dbc.update_one({'_id':_id, 'files.filename': fileinfo['filename']},
                            {'$set': {'files.$.dirty': True, 'files.$.modified': datetime.datetime.utcnow()}})
                    updated = True
                break
    else:         # file does not exist
        log.debug('Adding      %s' % filename)
        fileinfo['dirty'] = True
        fileinfo['created'] = fileinfo['modified'] = datetime.datetime.utcnow()
        shutil.move(filepath, target_filepath)
        dbc.update_one({'_id': _id}, {'$push': {'files': fileinfo}})
        updated = True
    log.debug('Done        %s' % filename)
    return updated


def _update_db(db, datainfo):
    #TODO: possibly try to keep a list of session IDs on the project, instead of having the session point to the project
    #      same for the session and acquisition
    #      queries might be more efficient that way
    session_spec = {'uid': datainfo['session_id']}
    session = db.sessions.find_one(session_spec, ['project'])
    if session: # skip project creation, if session exists
        project = db.projects.find_one({'_id': session['project']}, projection=PROJECTION_FIELDS + ['name'])
    else:
        existing_group_ids = [g['_id'] for g in db.groups.find(None, ['_id'])]
        group_id_matches = difflib.get_close_matches(datainfo['group_id'], existing_group_ids, cutoff=0.8)
        if len(group_id_matches) == 1:
            group_id = group_id_matches[0]
            project_name = datainfo['project_name'] or 'untitled'
        else:
            group_id = 'unknown'
            project_name = datainfo['group_id'] + ('/' + datainfo['project_name'] if datainfo['project_name'] else '')
        group = db.groups.find_one({'_id': group_id})
        project_spec = {'group': group['_id'], 'name': project_name}
        project = db.projects.find_one_and_update(
                project_spec,
                {'$setOnInsert': {'permissions': group['roles'], 'public': False, 'files': []}},
                PROJECTION_FIELDS,
                upsert=True,
                return_document=pymongo.collection.ReturnDocument.AFTER,
                )
    session = db.sessions.find_one_and_update(
            session_spec,
            {
                '$setOnInsert': dict(group=project['group'], project=project['_id'], permissions=project['permissions'], public=project['public'], files=[]),
                '$set': datainfo['session_properties'] or session_spec, # session_spec ensures non-empty $set
                #'$addToSet': {'modalities': datainfo['fileinfo']['modality']}, # FIXME
                },
            PROJECTION_FIELDS,
            upsert=True,
            return_document=pymongo.collection.ReturnDocument.AFTER,
            )
    log.info('Setting     group_id="%s", project_name="%s", and session_label="%s"' % (project['group'], project.get('name', 'unknown'), session.get('label', '(unknown)')))
    acquisition_spec = {'uid': datainfo['acquisition_id']}
    acquisition = db.acquisitions.find_one_and_update(
            acquisition_spec,
            {
                '$setOnInsert': dict(session=session['_id'], permissions=session['permissions'], public=session['public'], files=[]),
                '$set': datainfo['acquisition_properties'] or acquisition_spec, # acquisition_spec ensures non-empty $set
                #'$addToSet': {'types': {'$each': [{'domain': dataset.nims_file_domain, 'kind': kind} for kind in dataset.nims_file_kinds]}},
                },
            [],
            upsert=True,
            return_document=pymongo.collection.ReturnDocument.AFTER,
            )
    if datainfo['timestamp']:
        db.projects.update_one({'_id': project['_id']}, {'$max': dict(timestamp=datainfo['timestamp']), '$set': dict(timezone=datainfo['timezone'])})
        db.sessions.update_one({'_id': session['_id']}, {'$min': dict(timestamp=datainfo['timestamp']), '$set': dict(timezone=datainfo['timezone'])})
    return acquisition['_id']


def _entity_metadata(dataset, properties, metadata={}, parent_key=''):
    metadata = copy.deepcopy(metadata)
    if dataset.nims_metadata_status is not None:
        parent_key = parent_key and parent_key + '.'
        for key, attributes in properties.iteritems():
            if attributes['type'] == 'object':
                metadata.update(_entity_metadata(dataset, attributes['properties'], parent_key=key))
            else:
                value = getattr(dataset, attributes['field']) if 'field' in attributes else None
                if value or value == 0: # drop Nones and empty iterables
                    metadata[parent_key + key] = value
    return metadata


def identical_content(filepath1, digest1, filepath2, digest2):
    if zipfile.is_zipfile(filepath1) and zipfile.is_zipfile(filepath2):
        with zipfile.ZipFile(filepath1) as zf1, zipfile.ZipFile(filepath2) as zf2:
            zf1_infolist = sorted(zf1.infolist(), key=lambda zi: zi.filename)
            zf2_infolist = sorted(zf2.infolist(), key=lambda zi: zi.filename)
            if zf1.comment != zf2.comment:
                return False
            if len(zf1_infolist) != len(zf2_infolist):
                return False
            for zii, zij in zip(zf1_infolist, zf2_infolist):
                if zii.CRC != zij.CRC:
                    return False
            else:
                return True
    else:
        return digest1 == digest2


def hrsize(size):
    if size < 1000:
        return '%d%s' % (size, 'B')
    for suffix in 'KMGTPEZY':
        size /= 1024.
        if size < 10.:
            return '%.1f%sB' % (size, suffix)
        if size < 1000.:
            return '%.0f%sB' % (size, suffix)
    return '%.0f%sB' % (size, 'Y')


def mongo_dict(d):
    def _mongo_list(d, pk=''):
        pk = pk and pk + '.'
        return sum([_mongo_list(v, pk+k) if isinstance(v, dict) else [(pk+k, v)] for k, v in d.iteritems()], [])
    return dict(_mongo_list(d))


def user_perm(permissions, _id, site=None):
    for perm in permissions:
        if perm['_id'] == _id and perm.get('site') == site:
            return perm
    else:
        return {}


def container_fileinfo(container, filename):
    for fileinfo in container.get('files', []):
        if fileinfo['filename'] == filename:
            return fileinfo
    else:
        return None


def upload_ticket(ip, **kwargs):
    ticket = {
        '_id': str(uuid.uuid4()),
        'timestamp': datetime.datetime.utcnow(),
        'ip': ip,
    }
    ticket.update(kwargs)
    return ticket


def download_ticket(ip, type_, target, filename, size):
    return {
        '_id': str(uuid.uuid4()),
        'timestamp': datetime.datetime.utcnow(),
        'ip': ip,
        'type': type_,
        'target': target,
        'filename': filename,
        'size': size,
    }


def receive_stream_and_validate(stream, filepath, received_md5):
    md5 = hashlib.md5()
    sha1 = hashlib.sha1()
    filesize = 0
    start_time = datetime.datetime.utcnow()
    with open(filepath, 'wb') as fd:
        for chunk in iter(lambda: stream.read(2**20), ''):
            md5.update(chunk)
            sha1.update(chunk)
            filesize += len(chunk)
            fd.write(chunk)
    duration = datetime.datetime.utcnow() - start_time
    return (md5.hexdigest() == received_md5) if received_md5 is not None else True, sha1.hexdigest(), filesize, duration


def guess_mimetype(filepath):
    """Guess MIME type based on filename."""
    mime, _ = mimetypes.guess_type(filepath)
    return mime or 'application/octet-stream'


def guess_filetype(filepath, mimetype):
    """Guess file type based on filename and MIME type."""
    type_, subtype = mimetype.split('/')
    if filepath.endswith('.nii') or filepath.endswith('.nii.gz'):
        return 'nifti'
    elif filepath.endswith('_montage.zip'):
        return 'montage'
    elif type_ == 'text' and subtype in ['plain'] + [mt[2] for mt in MIMETYPES]:
        return 'text'
    else:
        return subtype


def parse_timestamp(iso_timestamp):
    return dateutil.parser.parse(iso_timestamp)