import bson
import copy
import datetime
import dateutil
import os
import pymongo
from . import tempdir as tempfile
from . import util
from . import validators
class Placer(object):
    """
    Interface for a placer, which knows how to process files and place them
    where they belong - on disk and database.
    """

    def __init__(self, container_type, container, id_, metadata, timestamp, origin, context):
        """
        Remember the upload target and the request details.

        container_type, container and id_ describe the target container; any
        of them may be None when the caller did not name a target (see
        requireTarget). metadata may be None (see requireMetadata).
        """
        # Upload target. requireTarget() reads all three attributes, so they
        # must exist on every instance.
        self.container_type = container_type
        self.container = container
        self.id_ = id_

        self.metadata = metadata
        self.timestamp = timestamp

        # Origin of the upload, as supplied by the caller.
        self.origin = origin

        # A placer-defined map for breaking the Placer abstraction layer.
        self.context = context

        # Should the caller expect a normal map return, or a generator that
        # gets mapped to Server-Sent Events?
        self.sse = False

        # A list of files that have been saved via save_file(), usually
        # returned by finalize().
        self.saved = []

    def check(self):
        """
        Run any pre-processing checks. Expected to throw on error.
        """
        raise NotImplementedError()

    def process_file_field(self, field, info):
        """
        Process a single file field.
        """
        raise NotImplementedError()

    def finalize(self):
        """
        Run any post-processing work. Expected to return output for the callee.
        """
        raise NotImplementedError()

    def requireTarget(self):
        """
        Helper function that throws unless a container was provided.
        """
        if self.id_ is None or self.container is None or self.container_type is None:
            raise Exception('Must specify a target')

    def requireMetadata(self):
        """
        Helper function that throws unless metadata was provided.
        """
        # Identity check: an empty (falsy) metadata map still counts as provided.
        if self.metadata is None:
            raise Exception('Metadata required')
Helper function that moves a file saved via a form field into our CAS.
May trigger jobs, if applicable, so this should only be called once we're ready for that.
Requires an augmented file field; see process_upload() for details.
# Save file
if field is not None:
hierarchy.upsert_fileinfo(self.container_type, self.id_, info)
# Queue any jobs as a result of this upload
rules.create_jobs(config.db, self.container, self.container_type, info)
class TargetedPlacer(Placer):
# NOTE(review): indentation and the docstring quotes around the next three
# lines were lost in this copy of the file; restore them before running.
A placer that can accept N files to a specific container (acquisition, etc).
LIMITATION: To temporarily avoid messing with the JSON schema, this endpoint can only consume one file :(
An exception is thrown in upload.process_upload() if you try. This could be fixed by making a better schema.
def check(self):
# Validates self.metadata against the 'file.json' input schema; the
# optional=True flag is passed through to validators.validate_data.
validators.validate_data(self.metadata, 'file.json', 'input', 'POST', optional=True)
def process_file_field(self, field, info):
# NOTE(review): the statement guarded by this `if` appears to be missing.
# As the lines stand, save_file() would only run when metadata is truthy,
# and nothing here adds to self.saved — confirm against the upstream source.
if self.metadata:
self.save_file(field, info)
def finalize(self):
# Hands back the list kept in self.saved.
return self.saved
A placer that can accept multiple files.
It uses the method upsert_bottom_up_hierarchy to create its project/session/acquisition hierarchy
Sessions and acquisitions are identified by UID.
metadata_schema = 'uidupload.json'
create_hierarchy = staticmethod(hierarchy.upsert_bottom_up_hierarchy)
def __init__(self, container_type, container, id_, metadata, timestamp, origin, context):
    """
    Initialise the base placer and create the per-file lookup table.
    """
    super(UIDPlacer, self).__init__(container_type, container, id_, metadata, timestamp, origin, context)

    # Maps an uploaded file name to {'container': ..., 'metadata': ...}.
    # check() fills it in and process_file_field() reads it, so it has to
    # exist from construction onwards.
    self.metadata_for_file = {}
def check(self):
    """
    Validate the metadata and work out where each named file belongs.

    The metadata is validated against the input schema named by
    self.metadata_schema, then passed to self.create_hierarchy. Each
    returned target is indexed as (container, {file name: file metadata});
    the result is stored in self.metadata_for_file keyed by file name.
    """
    payload_schema_uri = validators.schema_uri('input', self.metadata_schema)
    metadata_validator = validators.from_schema_path(payload_schema_uri)
    metadata_validator(self.metadata, 'POST')

    targets = self.create_hierarchy(self.metadata)

    # Build the lookup from scratch so it always reflects this metadata and
    # so item assignment below never hits a missing attribute.
    self.metadata_for_file = {}
    for target in targets:
        container = target[0]
        files_metadata = target[1]
        for name in files_metadata:
            self.metadata_for_file[name] = {
                'container': container,
                'metadata': files_metadata[name],
            }
def process_file_field(self, field, info):
# NOTE(review): indentation was lost and several statements appear to be
# missing from this method in this copy of the file; the notes below mark
# the visible gaps. Do not restore indentation without the upstream source.
# For the file, given self.targets, choose a target
name = field.filename
target = self.metadata_for_file.get(name)
# if the file was not included in the metadata skip it
# NOTE(review): the body of this `if` is missing (presumably an early
# return, given the comment above) — confirm.
if not target:
container = target['container']
# NOTE(review): r_metadata is assigned but never used in the visible lines.
r_metadata = target['metadata']
if container.level != 'subject':
self.container_type = container.level
self.container = container.container
self.save_file(field, info)
# NOTE(review): the two conditionals below have no visible bodies; they look
# like the remains of a separate branch for subject-level containers.
if field is not None:
if info is not None:
# # Queue any jobs as a result of this upload
# rules.create_jobs(config.db, self.container, self.container_type, info)
def finalize(self):
    """
    Return the file infos collected in self.saved.
    """
    return self.saved
A placer that creates a hierarchy based on labels.
It uses the method upsert_top_down_hierarchy to create its project/session/acquisition hierarchy
Sessions and acquisitions are identified by label.
metadata_schema = 'labelupload.json'
create_hierarchy = staticmethod(hierarchy.upsert_top_down_hierarchy)
class UIDMatchPlacer(UIDPlacer):
    """
    A placer that uploads to an existing hierarchy it finds based on uid.
    """

    # Input schema used to validate the upload metadata.
    metadata_schema = 'uidmatchupload.json'

    # Hierarchy lookup invoked as self.create_hierarchy(metadata); wrapped in
    # staticmethod so it is not bound to the instance.
    create_hierarchy = staticmethod(hierarchy.find_existing_hierarchy)
A placer that can accept files and/or metadata sent to it from the engine
It uses update_container_hierarchy to update the container and its parents' fields from the metadata
def check(self):
    """
    Validate the metadata against 'enginemetadata.json' when it was sent.

    Metadata is optional here: when self.metadata is None nothing is
    validated.
    """
    if self.metadata is not None:
        validators.validate_data(self.metadata, 'enginemetadata.json', 'input', 'POST', optional=True)
def process_file_field(self, field, info):
# NOTE(review): indentation was lost and statements appear to be missing
# from this span in this copy of the file; see the notes below.
if self.metadata is not None:
# Looks up the 'files' list under the key named by self.container_type,
# defaulting to an empty list when either level is absent.
file_mds = self.metadata.get(self.container_type, {}).get('files', [])
for file_md in file_mds:
# NOTE(review): nothing visible happens when a name matches; the body of
# this `if` is missing.
if file_md['name'] == info['name']:
# NOTE(review): the comment below refers to process_file_field as another
# method, so the remaining lines presumably belong to a separate method
# (likely finalize) whose `def` line is missing — confirm.
# Remove file metadata as it was already updated in process_file_field
for k in self.metadata.keys():
self.metadata[k].pop('files', {})
# NOTE(review): `bid` is not defined anywhere in the visible lines.
hierarchy.update_container_hierarchy(self.metadata, bid, self.container_type)
class TokenPlacer(Placer):
A placer that can accept N files and save them to a persistent directory across multiple requests.
Intended for use with a token that tracks where the files will be stored.
def __init__(self, container_type, container, id_, metadata, timestamp, origin, context):
    """
    Initialise the base placer and the token-specific state.
    """
    super(TokenPlacer, self).__init__(container_type, container, id_, metadata, timestamp, origin, context)

    # Paths of the uploaded files that finalize() will move.
    self.paths = []

    # Destination directory; resolved from the token in check().
    self.folder = None
def check(self):
    """
    Resolve the folder that belongs to the request's token.

    Reads the token from self.context['token'] and sets self.folder to
    <persistent data_path>/tokens/packfile/<token>.

    Raises:
        Exception: if the token is None.
    """
    token = self.context['token']
    if token is None:
        raise Exception('TokenPlacer requires a token')

    # This logic is used by:
    #   TokenPlacer.check
    #   PackfilePlacer.check
    #   upload.clean_packfile_tokens
    # It must be kept in sync between each instance.
    base_path = config.get_item('persistent', 'data_path')
    self.folder = os.path.join(base_path, 'tokens', 'packfile', token)
def process_file_field(self, field, info):
# NOTE(review): the body of this method is missing from this copy of the
# file. finalize() iterates self.paths and returns self.saved, so this method
# presumably records the field's path and its info — confirm against the
# upstream source before restoring.
def finalize(self):
    """
    Move every collected file into the token folder.

    Each path in self.paths is moved to self.folder under its base name.

    Returns:
        self.saved, the list of file infos collected for this request.
    """
    # Imported locally: shutil is not among the imports visible at the top
    # of this file.
    import shutil

    for path in self.paths:
        dest = os.path.join(self.folder, os.path.basename(path))
        shutil.move(path, dest)

    return self.saved
A placer that can accept N files, save them into a zip archive, and place the result on an acquisition.
def __init__(self, container_type, container, id_, metadata, timestamp, origin, context):
    """
    Initialise the base placer and the packfile-specific state.
    """
    super(PackfilePlacer, self).__init__(container_type, container, id_, metadata, timestamp, origin, context)

    # This endpoint is an SSE endpoint
    self.sse = True

    # Populated in check(), used in finalize()
    self.p_id = None
    self.s_label = None
    self.a_label = None
    self.g_id = None

    self.permissions = {}
    self.folder = None
token = self.context['token']
if token is None:
raise Exception('PackfilePlacer requires a token')
# This logic is used by:
# TokenPlacer.check
# PackfilePlacer.check
# upload.clean_packfile_tokens
# It must be kept in sync between each instance.
base_path = config.get_item('persistent', 'data_path')
self.folder = os.path.join(base_path, 'tokens', 'packfile', token)
if not os.path.isdir(self.folder):
raise Exception('Packfile directory does not exist or has been deleted')
validators.validate_data(self.metadata, 'packfile.json', 'input', 'POST')
# Save required fields
self.p_id = self.metadata['project']['_id']
self.s_label = self.metadata['session']['label']
self.a_label = self.metadata['acquisition']['label']
# Get project info that we need later
project = config.db['projects'].find_one({ '_id': bson.ObjectId(self.p_id)})
self.permissions = project.get('permissions', {})
# If a timestamp was provided, use that for zip files. Otherwise use a set date.
# Normally we'd use epoch, but zips cannot support years older than 1980, so let's use that instead.
# Then, given the ISO string, convert it to an epoch integer.
minimum = datetime.datetime(1980, 1, 1).isoformat()
stamp = self.metadata['acquisition'].get('timestamp', minimum)
# If there was metadata sent back that predates the zip minimum, don't use it.
# Dateutil has overloaded the comparison operators, except it's totally useless:
# > TypeError: can't compare offset-naive and offset-aware datetimes
# So instead, epoch-integer both and compare that way.
if int(dateutil.parser.parse(stamp).strftime('%s')) < int(dateutil.parser.parse(minimum).strftime('%s')):
stamp = minimum
# Remember the timestamp integer for later use with os.utime.
self.ziptime = int(dateutil.parser.parse(stamp).strftime('%s'))
# The zipfile is a sanitized acquisition label
self.dir_ = util.sanitize_string_to_filename(self.a_label) = self.dir_ + '.zip'
# Make a tempdir to store zip until moved
# OPPORTUNITY: this is also called elsewhere. Could be a util func.
self.tempdir = tempfile.TemporaryDirectory(prefix='.tmp', dir=config.get_item('persistent', 'data_path'))
# Create a zip in the tempdir that later gets moved into the CAS.
self.path = os.path.join(, '')
self.zip_ = zipfile.ZipFile(self.path, 'w', zipfile.ZIP_DEFLATED, allowZip64=True)
# OPPORTUNITY: add zip comment
# = json.dumps(metadata, default=metadata_encoder)
# Bit of a silly hack: write our tempdir directory into the zip (not including its contents).
# Creates an empty directory entry in the zip which will hold all the files inside.
# This way, when you expand a zip, you'll get folder/things instead of a thousand dicoms splattered everywhere.
self.zip_.write(, self.dir_)
def process_file_field(self, field, info):
    """
    Reject any file sent to this placer.

    Raises:
        Exception: always; files must already be uploaded.
    """
    # Should not be called with any files
    raise Exception('Files must already be uploaded')
# Write all files to zip
complete = 0
for path in paths:
p = os.path.join(self.folder, path)
# Set the file's mtime & atime.
os.utime(p, (self.ziptime, self.ziptime))
# Place file into the zip folder we created before
self.zip_.write(p, os.path.join(self.dir_, os.path.basename(path)))
'data': { 'done': complete, 'total': total, 'percent': (complete / float(total)) * 100 },
# Remove the folder created by TokenPlacer
# Create an anonymous object in the style of our augmented file fields.
# Not a great practice. See process_upload() for details.
cgi_field = util.obj_from_map({
'path': self.path,
'size': os.path.getsize(self.path),
'hash': files.hash_file_formatted(self.path),
'mimetype': util.guess_mimetype(''),
'modified': self.timestamp
# Similarly, create the info map that is consumed by helper funcs. Clear duplication :(
# This could be coalesced into a single map thrown on file fields, for example.
# Used in the API return.
cgi_info = {
'name': cgi_field.filename,
'modified': cgi_field.modified,
'size': cgi_field.size,
'hash': cgi_field.hash,
'type': self.metadata['packfile']['type'],
# OPPORTUNITY: packfile endpoint could be extended someday to take additional metadata.
'instrument': None,
'measurements': [],
'tags': [],
'metadata': {},
# Manually add the file origin to the packfile metadata.
# This is set by upload.process_upload on each file, but we're not storing those.
'origin': self.origin
# Get or create a session based on the hierarchy and provided labels.
'label': self.s_label,
'group': self.g_id
# Updates if existing
updates = {}
updates['permissions'] = self.permissions
updates['modified'] = self.timestamp
updates = util.mongo_dict(updates)
# Extra properties on insert
insert_map = copy.deepcopy(query)
insert_map['created'] = self.timestamp
insert_map['subject'] = containerutil.add_id_to_subject(insert_map.get('subject'), self.p_id)
session = config.db['session' + 's'].find_one_and_update(
query, {
'$set': updates,
'$setOnInsert': insert_map
# Get or create an acquisition based on the hierarchy and provided labels.
# Updates if existing
updates = {}
updates['permissions'] = self.permissions
updates['modified'] = self.timestamp
updates = util.mongo_dict(updates)
# Extra properties on insert
insert_map = copy.deepcopy(query)
insert_map['created'] = self.timestamp
acquisition = config.db['acquisition' + 's'].find_one_and_update(
query, {
'$set': updates,
'$setOnInsert': insert_map
# Set instance target for helper func
self.container_type = 'acquisition'
self.id_ = str(acquisition['_id'])
self.container = acquisition
# Delete token
token = self.context['token']
config.db['tokens'].delete_one({ '_id': token })
'acquisition_id': str(acquisition['_id']),
'session_id': str(session['_id']),
class AnalysisPlacer(Placer):
def check(self):
    """
    Validate self.metadata against the 'analysis.json' input schema.

    The optional=True flag is passed through to validators.validate_data.
    """
    validators.validate_data(self.metadata, 'analysis.json', 'input', 'POST', optional=True)
def process_file_field(self, field, info):
# NOTE(review): the body of this method is missing from this copy of the
# file. finalize() reads self.saved, so this method presumably saves the file
# and records its info there — confirm against the upstream source.
def finalize(self):
    """
    Merge the saved upload infos with the payload's file descriptions.

    The payload's 'inputs' and 'outputs' lists are removed from
    self.metadata and indexed by file name, flagged with 'input': True or
    'output': True. Each entry of self.saved is then combined with the
    payload entry of the same name (values from the upload win) and
    appended to self.metadata['files'].

    Returns:
        self.metadata, now carrying the merged 'files' list.
    """
    # we are going to merge the "hard" infos from the processed upload
    # with the infos from the payload
    metadata_infos = {}
    for info in self.metadata.pop('inputs', []):
        info['input'] = True
        metadata_infos[info['name']] = info
    for info in self.metadata.pop('outputs', []):
        info['output'] = True
        metadata_infos[info['name']] = info

    self.metadata['files'] = []
    for info in self.saved:
        # Start from the payload's description (if any) and let the facts
        # from the processed upload override it.
        merged = dict(metadata_infos.get(info['name'], {}))
        merged.update(info)
        self.metadata['files'].append(merged)

    return self.metadata
class AnalysisJobPlacer(Placer):
if self.id_ is None:
raise Exception('Must specify a target analysis')
def process_file_field(self, field, info):
# NOTE(review): indentation was lost and statements appear to be missing
# from this method in this copy of the file; see the notes below.
if self.metadata is not None:
# Looks up the 'files' list under the 'acquisition' key, defaulting to an
# empty list when either level is absent.
file_mds = self.metadata.get('acquisition', {}).get('files', [])
for file_md in file_mds:
# NOTE(review): the body of this `if` is missing; as the lines stand, the
# assignment below would be its only statement — confirm whether the
# 'output' flag is meant for every file or only for matched ones.
if file_md['name'] == info['name']:
info['output'] = True
# Search the sessions table for analysis, replace file field
u = {'$push': {'analyses.$.files': {'$each': self.saved}}}
if self.context.get('job_id'):
# If the original job failed, update the analysis with the job that succeeded
u['$set'] = {'job': self.context['job_id']}
config.db.sessions.update_one(q, u)