# util.py
# (originally authored by Renzo Frigato)
import os
import copy
import pytz
import uuid
import shutil
import difflib
import hashlib
import logging
import pymongo
import zipfile
import datetime
import mimetypes
import tempdir as tempfile
# Module-wide logging setup: timestamped records with fixed-width
# logger-name/filename/line fields for alignment.
logging.basicConfig(
    format='%(asctime)s %(name)16.16s %(filename)24.24s %(lineno)5d:%(levelname)4.4s %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S',
    level=logging.DEBUG,
)
log = logging.getLogger('scitran.api')

# Extra (extension, type, subtype) entries registered with the stdlib
# mimetypes module so guess_mimetype() recognizes these extensions.
MIMETYPES = [
    ('.bvec', 'text', 'bvec'),
    ('.bval', 'text', 'bval'),
    ('.m', 'text', 'matlab'),
    ('.sh', 'text', 'shell'),
    ('.r', 'text', 'r'),
]
for mt in MIMETYPES:
    mimetypes.types_map.update({mt[0]: mt[1] + '/' + mt[2]})

# Full list of tz database names accepted for the 'timezone' fields.
valid_timezones = pytz.all_timezones

# Default projection for the project/session upserts in _update_db().
PROJECTION_FIELDS = ['group', 'name', 'label', 'timestamp', 'permissions', 'public']
def parse_file(filepath, digest):
    """Parse a data file and assemble its file- and acquisition-level metadata.

    Parameters
    filepath -- path of the file to parse
    digest -- precomputed content hash, stored as fileinfo['filehash']

    Returns a datainfo dict (with nested 'fileinfo') on success, or None when
    scitran.data cannot parse the file.

    NOTE(review): `scitran.data` is referenced here but not imported in this
    module as shown — confirm it is imported elsewhere or add the import.
    """
    filename = os.path.basename(filepath)
    try:
        # Lazy %-args: the message is only formatted if the record is emitted.
        log.info('Parsing %s', filename)
        dataset = scitran.data.parse(filepath)
    except scitran.data.DataError as exp:
        log.info('Unparsable %s (%s)', filename, exp)
        return None
    # The parser supplies the canonical filename (name + extension).
    filename = dataset.nims_file_name + dataset.nims_file_ext
    fileinfo = {
        'mimetype': guess_mimetype(filename),
        'filename': filename,
        'filesize': os.path.getsize(filepath),
        'filetype': dataset.nims_file_type,
        'filehash': digest,
        'modality': dataset.nims_file_domain,
        'datatypes': dataset.nims_file_kinds,
        'tags': ['data'],
    }
    datainfo = {
        'acquisition_id': dataset.nims_acquisition_id,
        'session_id': dataset.nims_session_id,
        'group_id': dataset.nims_group_id,
        'project_name': dataset.nims_project,
        'session_properties': _entity_metadata(dataset, dataset.session_properties),
        'acquisition_properties': _entity_metadata(dataset, dataset.acquisition_properties),
        'timestamp': dataset.nims_timestamp,
        'timezone': dataset.nims_timezone,
    }
    datainfo['fileinfo'] = fileinfo
    # HACK!!! 'filetype' leaks into acquisition properties; strip it before DB writes.
    datainfo['acquisition_properties'].pop('filetype', None)
    # Non-screenshot DICOMs (and MEEG files) drive the acquisition's datatype;
    # DICOMs also set its modality.
    if fileinfo['filetype'] == 'dicom' and fileinfo['datatypes'][0] != 'screenshot':
        datainfo['acquisition_properties']['modality'] = fileinfo['modality']
        datainfo['acquisition_properties']['datatype'] = fileinfo['datatypes'][0]
    elif fileinfo['filetype'] == 'meeg':
        datainfo['acquisition_properties']['datatype'] = fileinfo['datatypes'][0]
    return datainfo
def quarantine_file(filepath, quarantine_path):
    """Move a rejected file into a fresh timestamped directory under quarantine_path."""
    stamp = datetime.datetime.now().strftime('%Y%m%d_%H%M%S_')
    destination = tempfile.mkdtemp(prefix=stamp, dir=quarantine_path)
    shutil.move(filepath, destination)
def commit_file(dbc, _id, datainfo, filepath, data_path, force=False):
    """Insert a file as an attachment or as a file.

    Moves filepath into the container's storage directory under data_path and
    records its fileinfo in the container document's 'files' array.

    Parameters
    dbc -- pymongo collection holding the target container
    _id -- container _id; when None, _update_db() creates/locates the
           hierarchy and the resulting acquisition _id is used
    datainfo -- dict from parse_file(); datainfo['fileinfo'] describes the file
    filepath -- on-disk path of the uploaded file
    data_path -- root directory of the data store
    force -- when False, never replace an existing file of the same name

    Returns True if the file was added or replaced, False if it existed and
    force was not set, None if the new copy was identical and was dropped.
    """
    filename = os.path.basename(filepath)
    fileinfo = datainfo['fileinfo']
    if _id is None:
        _id = _update_db(dbc.database, datainfo)
    # Files are sharded on disk by the last 3 characters of the container id.
    container_path = os.path.join(data_path, str(_id)[-3:] + '/' + str(_id))
    target_filepath = container_path + '/' + fileinfo['filename']
    if not os.path.exists(container_path):
        os.makedirs(container_path)
    # Copy datatype from the container to the file, if the file does not have one already.
    container = dbc.find_one({'_id':_id})
    if 'datatype' in container and not 'datatypes' in datainfo['fileinfo']:
        # Whitelist data types
        if datainfo['fileinfo'].get('filetype') in ('nifti', 'montage', 'bvec', 'bval'):
            datainfo['fileinfo']['datatypes'] = [ container['datatype'] ]
    # Re-query, this time matching on the filename inside the files array.
    container = dbc.find_one({'_id':_id, 'files.filename': fileinfo['filename']})
    if container: # file already exists
        for f in container['files']:
            if f['filename'] == fileinfo['filename']:
                if not force:
                    updated = False  # leave the existing file untouched
                elif identical_content(target_filepath, f['filehash'], filepath, fileinfo['filehash']): # existing file has identical content
                    log.debug('Dropping %s (identical)' % filename)
                    os.remove(filepath)
                    updated = None
                else: # existing file has different content
                    log.debug('Replacing %s' % filename)
                    shutil.move(filepath, target_filepath)
                    update_set = {'files.$.dirty': True, 'files.$.modified': datetime.datetime.utcnow()}
                    # in this branch of the code, we are overriding an existing file.
                    # update_set allows to update all the fileinfo like size, hash, etc.
                    for k,v in fileinfo.iteritems():
                        update_set['files.$.' + k] = v
                    # The positional operator ($) updates the matched array element.
                    dbc.update_one({'_id':_id, 'files.filename': fileinfo['filename']},
                            {'$set': update_set})
                    updated = True
                break
    else: # file does not exist
        log.debug('Adding %s' % filename)
        fileinfo['dirty'] = True
        fileinfo['created'] = fileinfo['modified'] = datetime.datetime.utcnow()
        shutil.move(filepath, target_filepath)
        dbc.update_one({'_id': _id}, {'$push': {'files': fileinfo}})
        updated = True
    log.debug('Done %s' % filename)
    return updated
def _update_db(db, datainfo):
    """Find or create the group/project/session/acquisition hierarchy for a parsed file.

    Upserts project, session, and acquisition documents from the ids in
    datainfo, propagating permissions/public flags down the hierarchy and
    rolling timestamps up ($min/earliest to the session, $max/latest to the
    project).

    Returns the _id of the matching acquisition document.
    """
    #TODO: possibly try to keep a list of session IDs on the project, instead of having the session point to the project
    # same for the session and acquisition
    # queries might be more efficient that way
    session_spec = {'uid': datainfo['session_id']}
    session = db.sessions.find_one(session_spec, ['project'])
    if session: # skip project creation, if session exists
        project = db.projects.find_one({'_id': session['project']}, projection=PROJECTION_FIELDS + ['name'])
    else:
        # Fuzzy-match the incoming group id against known groups; exactly one
        # close match (cutoff 0.8) is accepted, anything else goes to 'unknown'.
        existing_group_ids = [g['_id'] for g in db.groups.find(None, ['_id'])]
        group_id_matches = difflib.get_close_matches(datainfo['group_id'], existing_group_ids, cutoff=0.8)
        if len(group_id_matches) == 1:
            group_id = group_id_matches[0]
            project_name = datainfo['project_name'] or 'untitled'
        else:
            group_id = 'unknown'
            # Keep the unmatched group id in the project name for later triage.
            project_name = datainfo['group_id'] + ('/' + datainfo['project_name'] if datainfo['project_name'] else '')
        group = db.groups.find_one({'_id': group_id})
        project_spec = {'group': group['_id'], 'name': project_name}
        # Newly inserted projects inherit the group's roles as permissions.
        project = db.projects.find_one_and_update(
            project_spec,
            {'$setOnInsert': {'permissions': group['roles'], 'public': False, 'files': []}},
            PROJECTION_FIELDS,
            upsert=True,
            return_document=pymongo.collection.ReturnDocument.AFTER,
        )
    session = db.sessions.find_one_and_update(
        session_spec,
        {
            '$setOnInsert': dict(group=project['group'], project=project['_id'], permissions=project['permissions'], public=project['public'], files=[]),
            '$set': datainfo['session_properties'] or session_spec, # session_spec ensures non-empty $set
            #'$addToSet': {'modalities': datainfo['fileinfo']['modality']}, # FIXME
        },
        PROJECTION_FIELDS,
        upsert=True,
        return_document=pymongo.collection.ReturnDocument.AFTER,
    )
    log.info('Setting group_id="%s", project_name="%s", and session_label="%s"' % (project['group'], project.get('name', 'unknown'), session.get('label', '(unknown)')))
    acquisition_spec = {'uid': datainfo['acquisition_id']}
    acquisition = db.acquisitions.find_one_and_update(
        acquisition_spec,
        {
            '$setOnInsert': dict(session=session['_id'], permissions=session['permissions'], public=session['public'], files=[]),
            '$set': datainfo['acquisition_properties'] or acquisition_spec, # acquisition_spec ensures non-empty $set
            #'$addToSet': {'types': {'$each': [{'domain': dataset.nims_file_domain, 'kind': kind} for kind in dataset.nims_file_kinds]}},
        },
        [],
        upsert=True,
        return_document=pymongo.collection.ReturnDocument.AFTER,
    )
    if datainfo['timestamp']:
        # $max keeps the latest timestamp on the project, $min the earliest on the session.
        db.projects.update_one({'_id': project['_id']}, {'$max': dict(timestamp=datainfo['timestamp']), '$set': dict(timezone=datainfo['timezone'])})
        db.sessions.update_one({'_id': session['_id']}, {'$min': dict(timestamp=datainfo['timestamp']), '$set': dict(timezone=datainfo['timezone'])})
    return acquisition['_id']
def _entity_metadata(dataset, properties, metadata={}, parent_key=''):
metadata = copy.deepcopy(metadata)
if dataset.nims_metadata_status is not None:
parent_key = parent_key and parent_key + '.'
for key, attributes in properties.iteritems():
if attributes['type'] == 'object':
metadata.update(_entity_metadata(dataset, attributes['properties'], parent_key=key))
else:
value = getattr(dataset, attributes['field']) if 'field' in attributes else None
if value or value == 0: # drop Nones and empty iterables
metadata[parent_key + key] = value
return metadata
def identical_content(filepath1, digest1, filepath2, digest2):
    """Return True when the two files have identical content.

    Non-zip files are compared by their precomputed digests. Zip files are
    compared structurally (archive comment, member count, per-member CRCs with
    members paired by sorted filename), so differing zip metadata such as
    timestamps does not break equality.
    """
    if not (zipfile.is_zipfile(filepath1) and zipfile.is_zipfile(filepath2)):
        return digest1 == digest2
    with zipfile.ZipFile(filepath1) as zf1, zipfile.ZipFile(filepath2) as zf2:
        if zf1.comment != zf2.comment:
            return False
        members1 = sorted(zf1.infolist(), key=lambda zi: zi.filename)
        members2 = sorted(zf2.infolist(), key=lambda zi: zi.filename)
        if len(members1) != len(members2):
            return False
        return all(m1.CRC == m2.CRC for m1, m2 in zip(members1, members2))
def hrsize(size):
    """Render a byte count as a short human-readable string (powers of 1024).

    Values below 1000 are shown in bytes; larger values get one decimal place
    below 10 units and none below 1000 units, stepping through K..Y suffixes.
    """
    if size < 1000:
        return '%d%s' % (size, 'B')
    value = float(size)
    for unit in 'KMGTPEZY':
        value /= 1024.
        if value < 10.:
            return '%.1f%sB' % (value, unit)
        if value < 1000.:
            return '%.0f%sB' % (value, unit)
    # Beyond ~1000 yottabytes: clamp to the largest suffix.
    return '%.0f%sB' % (value, 'Y')
def mongo_dict(d):
    """Flatten a nested dict into MongoDB dotted-key form.

    Example: {'a': {'b': 1}, 'c': 2} -> {'a.b': 1, 'c': 2}

    Parameters
    d -- dict whose nested dict values are flattened; non-dict values are kept as-is

    Returns a new flat dict.
    """
    def _flatten(node, pk=''):
        # One list accumulated via extend (the old `sum(lists, [])` was
        # quadratic); .items() instead of py2-only .iteritems().
        pk = pk and pk + '.'
        pairs = []
        for k, v in node.items():
            if isinstance(v, dict):
                pairs.extend(_flatten(v, pk + k))
            else:
                pairs.append((pk + k, v))
        return pairs
    return dict(_flatten(d))
def user_perm(permissions, _id, site=None):
    """Return the permission entry matching user _id and site, or {} if none."""
    matches = (perm for perm in permissions if perm['_id'] == _id and perm.get('site') == site)
    return next(matches, {})
def container_fileinfo(container, filename):
    """Return the entry named filename from container['files'], or None if absent."""
    candidates = container.get('files', [])
    return next((fi for fi in candidates if fi['filename'] == filename), None)
def download_ticket(ip, type_, target, filename, size):
    """Build a download-ticket document with a fresh UUID id and UTC timestamp."""
    ticket = dict(
        timestamp=datetime.datetime.utcnow(),
        ip=ip,
        type=type_,
        target=target,
        filename=filename,
        size=size,
    )
    ticket['_id'] = str(uuid.uuid4())
    return ticket
def receive_stream_and_validate(stream, filepath, received_md5):
    """Write stream to filepath in 1 MiB chunks, hashing as it goes.

    Parameters
    stream -- binary readable object (read(n) returning bytes)
    filepath -- destination path, opened 'wb'
    received_md5 -- hex MD5 claimed by the sender, or None to skip validation

    Returns (md5_ok, sha384_hexdigest, filesize, duration); md5_ok is True
    when validation was skipped or the digests match.
    """
    skip_md5 = received_md5 is None
    if not skip_md5:
        md5 = hashlib.md5()
    sha384 = hashlib.sha384()
    filesize = 0
    start_time = datetime.datetime.utcnow()
    with open(filepath, 'wb') as fd:
        # Sentinel fix: a binary stream yields b'' at EOF, which the old ''
        # sentinel never matched on Python 3 (infinite loop). b'' == '' on
        # Python 2, so behavior there is unchanged.
        for chunk in iter(lambda: stream.read(2**20), b''):
            if not skip_md5:
                md5.update(chunk)
            sha384.update(chunk)
            filesize += len(chunk)
            fd.write(chunk)
    duration = datetime.datetime.utcnow() - start_time
    # Short-circuit keeps the original behavior: md5 is only touched when it exists.
    return skip_md5 or (md5.hexdigest() == received_md5), sha384.hexdigest(), filesize, duration
def guess_mimetype(filepath):
    """Guess MIME type based on filename; fall back to application/octet-stream."""
    guessed, _ = mimetypes.guess_type(filepath)
    if guessed:
        return guessed
    return 'application/octet-stream'
def guess_filetype(filepath, mimetype):
    """Guess file type based on filename and MIME type.

    NIfTI and montage files are detected by filename; 'text/plain' maps to
    'text'; everything else falls back to the MIME subtype.
    """
    type_, subtype = mimetype.split('/')
    if filepath.endswith(('.nii', '.nii.gz')):
        return 'nifti'
    if filepath.endswith('_montage.zip'):
        return 'montage'
    if type_ == 'text' and subtype == 'plain':
        return 'text'
    return subtype