# @author: Gunnar Schaefer
import logging
log = logging.getLogger('scitran.api')

import os
import copy
import bson
import shutil
import difflib
import datetime
import tempfile
import mimetypes

import scitran.data
import scitran.data.medimg.montage

mimetypes.types_map.update({'.bvec': 'text/plain'})
mimetypes.types_map.update({'.bval': 'text/plain'})

get_info = scitran.data.medimg.montage.get_info
get_tile = scitran.data.medimg.montage.get_tile

PROJECTION_FIELDS = ['timestamp', 'permissions', 'public']

def guess_mime(fn):
    """Guess mimetype based on filename."""
    # TODO: mime types could move to scitran.data, but that would only work well if ALL
    # files went through scitran.data. We can only guarantee that all files go through
    # the API during upload or download, so the API seems the right place to determine
    # mime information.
    mime, enc = mimetypes.guess_type(fn)
    if not mime:
        mime = 'application/octet-stream'
    return mime
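# Illustrative usage (doctest-style comments, not executed here): '.bvec' resolves to
# 'text/plain' because of the types_map updates above, and unknown extensions fall back
# to the generic binary type.
#
#   >>> guess_mime('fieldmap.bvec')
#   'text/plain'
#   >>> guess_mime('raw.unknownext')
#   'application/octet-stream'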

def insert_file(dbc, _id, file_info, filepath, digest, data_path, quarantine_path, flavor='file'):
    """Insert a file as an attachment or as a file."""
    filename = os.path.basename(filepath)
    flavor += 's'  # 'file' -> 'files', 'attachment' -> 'attachments' (container array names)
    dataset = None
    if _id is None:
        try:
            log.info('Parsing %s' % filename)
            dataset = scitran.data.parse(filepath)
        except scitran.data.DataError:
            q_path = tempfile.mkdtemp(prefix=datetime.datetime.now().strftime('%Y%m%d_%H%M%S_'), dir=quarantine_path)
            shutil.move(filepath, q_path)
            return 202, 'Quarantining %s (unparsable)' % filename
        log.info('Sorting %s' % filename)
        _id = _update_db(dbc.database, dataset)
        file_spec = dict(
            _id=_id,
            files={'$elemMatch': {
                'type': dataset.nims_file_type,
                'kinds': dataset.nims_file_kinds,
                'state': dataset.nims_file_state,
            }},
        )
        file_info = dict(
            name=dataset.nims_file_name,
            ext=dataset.nims_file_ext,
            size=os.path.getsize(filepath),
            sha1=digest,
            #hash=dataset.nims_hash, TODO: datasets should be able to hash themselves (but not here)
            type=dataset.nims_file_type,
            kinds=dataset.nims_file_kinds,
            state=dataset.nims_file_state,
        )
        filename = dataset.nims_file_name + dataset.nims_file_ext
    else:
        file_spec = {
            '_id': _id,
            flavor: {'$elemMatch': {
                'type': file_info.get('type'),
                'kinds': file_info.get('kinds'),
                'state': file_info.get('state'),
            }},
        }
        if flavor == 'attachments':
            file_spec[flavor]['$elemMatch'].update({'name': file_info.get('name'), 'ext': file_info.get('ext')})
    container_path = os.path.join(data_path, str(_id)[-3:] + '/' + str(_id))
    if not os.path.exists(container_path):
        os.makedirs(container_path)
    success = dbc.update(file_spec, {'$set': {flavor + '.$': file_info}})
    if not success['updatedExisting']:  # no matching array element was updated in place, so append one
        dbc.update({'_id': _id}, {'$push': {flavor: file_info}})
    shutil.move(filepath, container_path + '/' + filename)
    if dataset is not None:  # only create jobs if dataset is parseable
        create_job(dbc, dataset)
    log.debug('Done %s' % os.path.basename(filepath))  # must use filepath, since filename is updated for sorted files
    return 200, 'Success'

def _update_db(db, dataset):
    # TODO: possibly try to keep a list of session IDs on the project, instead of having the session point to the project
    #       same for the session and acquisition
    #       queries might be more efficient that way
    session_spec = {'uid': dataset.nims_session_id}
    session = db.sessions.find_one(session_spec, ['project'])
    if session:  # skip project creation, if session exists
        project = db.projects.find_one({'_id': session['project']}, fields=PROJECTION_FIELDS)
    else:
        existing_group_ids = [g['_id'] for g in db.groups.find(None, ['_id'])]
        group_id_matches = difflib.get_close_matches(dataset.nims_group_id, existing_group_ids, cutoff=0.8)
        if len(group_id_matches) == 1:
            group_id = group_id_matches[0]
            project_name = dataset.nims_project or 'untitled'
        else:
            group_id = 'unknown'
            project_name = dataset.nims_group_id + ('/' + dataset.nims_project if dataset.nims_project else '')
        group = db.groups.find_one({'_id': group_id})
        project_spec = {'group_id': group['_id'], 'name': project_name}
        project = db.projects.find_and_modify(
            project_spec,
            {'$setOnInsert': {'permissions': group['roles'], 'public': False, 'files': []}},
            upsert=True,
            new=True,
            fields=PROJECTION_FIELDS,
        )
    session = db.sessions.find_and_modify(
        session_spec,
        {
            '$setOnInsert': dict(project=project['_id'], permissions=project['permissions'], public=project['public'], files=[]),
            '$set': _entity_metadata(dataset, dataset.session_properties, session_spec),  # session_spec ensures non-empty $set
            '$addToSet': {'domains': dataset.nims_file_domain},
        },
        upsert=True,
        new=True,
        fields=PROJECTION_FIELDS,
    )
    acquisition_spec = {'uid': dataset.nims_acquisition_id}
    acquisition = db.acquisitions.find_and_modify(
        acquisition_spec,
        {
            '$setOnInsert': dict(session=session['_id'], permissions=session['permissions'], public=session['public'], files=[]),
            '$set': _entity_metadata(dataset, dataset.acquisition_properties, acquisition_spec),  # acquisition_spec ensures non-empty $set
            '$addToSet': {'types': {'$each': [{'domain': dataset.nims_file_domain, 'kind': kind} for kind in dataset.nims_file_kinds]}},
        },
        upsert=True,
        new=True,
        fields=[],
    )
    if dataset.nims_timestamp:
        db.projects.update({'_id': project['_id']}, {'$max': dict(timestamp=dataset.nims_timestamp)})
        db.sessions.update({'_id': session['_id']}, {'$min': dict(timestamp=dataset.nims_timestamp), '$set': dict(timezone=dataset.nims_timezone)})
    return acquisition['_id']  # files land on the acquisition, so hand its _id back to insert_file

# TODO: create_job should be usable from bootstrap.py with only database information
def create_job(dbc, dataset):
    # TODO: this should search the 'apps' db collection.
    # each 'app' must define the type, state and kind of its expected inputs.
    # some apps are special defaults: one default per data-specific triple.
    # allow apps to have a settable 'state' that is appended to the file at the
    # end of processing.
    #
    # desired query to find the default app for a specific data variety would be:
    # app_id = self.app.db.apps.find({
    #     'default': True,
    #     'type': ftype,          # string
    #     'kinds': fkinds,        # list
    #     'state_': fstate[-1],   # string
    # })
    # apps specify the last state of their desired input file.
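    # For illustration only: a hypothetical 'apps' document that the query sketched
    # above could match. Field names follow the sketch, not a confirmed schema.
    #
    #   {
    #       '_id': 'scitran/dcm2nii:latest',
    #       'default': True,
    #       'type': 'dicom',
    #       'kinds': ['anatomy'],
    #       'state_': 'orig',
    #   }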
    db = dbc.database
    type_ = dataset.nims_file_type
    kinds_ = dataset.nims_file_kinds
    state_ = dataset.nims_file_state
    app_id = None
    if type_ == 'dicom' and state_ == ['orig']:
        if kinds_ != ['screenshot']:
            # could ship a script that gets mounted into the container.
            # but then the script would also need to specify what base image it needs.
            app_id = 'scitran/dcm2nii:latest'
            # app_input is implied; type = 'dicom', state = ['orig'] and kinds != 'screenshot'
            app_outputs = [
                {
                    'fext': '.nii.gz',
                    'state': ['derived', ],
                    'type': 'nifti',
                    'kinds': dataset.nims_file_kinds,  # there should be some way to indicate 'from parent file'
                },
                {
                    'fext': '.bvec',
                    'state': ['derived', ],
                    'type': 'text',
                    'kinds': ['bvec', ],
                },
                {
                    'fext': '.bval',
                    'state': ['derived', ],
                    'type': 'text',
                    'kinds': ['bval', ],
                },
            ]
            # force the acquisition's dicom file to be marked as 'optional = True'
            db.acquisitions.find_and_modify(
                {'uid': dataset.nims_acquisition_id, 'files.type': 'dicom'},
                {'$set': {'files.$.optional': True}},
            )
    if not app_id:
        log.info('no app for type=%s, state=%s, kinds=%s, default=True. no job created.' % (type_, state_, kinds_))
    else:
        # TODO: check if there are 'default apps' set for this project/session/acquisition
        acquisition = db.acquisitions.find_one({'uid': dataset.nims_acquisition_id})
        session = db.sessions.find_one({'_id': bson.ObjectId(acquisition.get('session'))})
        project = db.projects.find_one({'_id': bson.ObjectId(session.get('project'))})
        aid = acquisition.get('_id')
        # TODO: job description needs more metadata to be searchable in a useful way
        output_url = '%s/%s/%s' % ('acquisitions', aid, 'file')
        job_id = db.jobs.count() + 1  # compute once, so the query and the upserted document agree
        job = db.jobs.find_and_modify(
            {
                '_id': job_id,
            },
            {
                '_id': job_id,
                'group': project.get('group_id'),
                'project': {
                    '_id': project.get('_id'),
                    'name': project.get('name'),
                },
                'app': {
                    '_id': app_id,
                    'type': 'docker',
                },
                'inputs': [
                    {
                        'url': '%s/%s/%s' % ('acquisitions', aid, 'file'),
                        'payload': {
                            'type': dataset.nims_file_type,
                            'state': dataset.nims_file_state,
                            'kinds': dataset.nims_file_kinds,
                        },
                    }
                ],
                'outputs': [{'url': output_url, 'payload': i} for i in app_outputs],
                'status': 'pending',  # queued
                'activity': None,
                'added': datetime.datetime.now(),
                'timestamp': datetime.datetime.now(),
            },
            upsert=True,
            new=True,
        )
        log.info('created job %d, group: %s, project %s' % (job['_id'], job['group'], job['project']))

def _entity_metadata(dataset, properties, metadata={}, parent_key=''):
    metadata = copy.deepcopy(metadata)
    if dataset.nims_metadata_status is not None:
        parent_key = parent_key and parent_key + '.'
        for key, attributes in properties.iteritems():
            if attributes['type'] == 'object':
                metadata.update(_entity_metadata(dataset, attributes['properties'], parent_key=key))
            else:
                value = getattr(dataset, attributes['field']) if 'field' in attributes else None
                if value or value == 0:  # drop Nones and empty iterables
                    metadata[parent_key + key] = value
    return metadata
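# Illustrative sketch of the expected shapes (assumed, not a confirmed scitran.data
# schema): given a properties dict like
#
#   {'subject': {'type': 'object', 'properties': {
#        'code': {'type': 'string', 'field': 'nims_subject_code'}}},
#    'timestamp': {'type': 'string', 'field': 'nims_timestamp'}}
#
# the function reads the named dataset attributes and returns flat, dot-notation keys
# suitable for a MongoDB $set, e.g. {'subject.code': 's001', 'timestamp': ...}.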

def hrsize(size):
    if size < 1000:
        return '%d%s' % (size, 'B')
    for suffix in 'KMGTPEZY':
        size /= 1024.
        if size < 10.:
            return '%.1f%s' % (size, suffix)
        if size < 1000.:
            return '%.0f%s' % (size, suffix)
    return '%.0f%s' % (size, 'Y')
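# Worked examples (values follow directly from the thresholds above):
#
#   >>> hrsize(999)
#   '999B'
#   >>> hrsize(4096)
#   '4.0K'
#   >>> hrsize(1234567890)
#   '1.1G'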

def mongo_dict(d):
    def _mongo_list(d, pk=''):
        pk = pk and pk + '.'
        return sum([_mongo_list(v, pk+k) if isinstance(v, dict) else [(pk+k, v)] for k, v in d.iteritems()], [])
    return dict(_mongo_list(d))
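# Illustrative example: nested dicts are flattened into MongoDB dot-notation keys,
# e.g. for building an update document (key order shown here is arbitrary).
#
#   >>> mongo_dict({'subject': {'code': 's001', 'age': 34}, 'public': False})
#   {'subject.code': 's001', 'subject.age': 34, 'public': False}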

def user_perm(permissions, _id, site=None):
    for perm in permissions:
        if perm['_id'] == _id and perm.get('site') == site:
            return perm
    else:
        return {}
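# Illustrative example: returns the matching permission document, or {} when the
# user (optionally scoped to a site) has no entry.
#
#   >>> perms = [{'_id': 'alice'}]
#   >>> user_perm(perms, 'alice')
#   {'_id': 'alice'}
#   >>> user_perm(perms, 'bob')
#   {}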

def download_ticket(type_, target, filename, size):
    return {
        '_id': str(bson.ObjectId()),  # FIXME: use better ticket ID
        'timestamp': datetime.datetime.utcnow(),
        'type': type_,
        'target': target,
        'filename': filename,
        'size': size,
    }