Skip to content
Snippets Groups Projects
Commit 7a246b1d authored by Harsha Kethineni
Browse files

modified process cursor to take optional params

deleted redundant closures
parent e4690c7f
No related branches found
No related tags found
No related merge requests found
......@@ -58,7 +58,7 @@ def getMonotonicTime():
# http://stackoverflow.com/a/7424304
return os.times()[4]
def process_cursor(cursor, closure):
def process_cursor(cursor, closure, context = None):
"""
Given an iterable (say, a mongo cursor) and a closure, call that closure in parallel over the iterable.
Call order is undefined. Currently launches N python process workers, where N is the number of vcpu cores.
......@@ -110,7 +110,10 @@ def process_cursor(cursor, closure):
if 100 * (cursor_index / cursor_size) >= next_percent:
logging.info('{} percent complete ...'.format(next_percent))
next_percent = next_percent + percent_increment
result = closure(document)
# `context` is the optional third argument of process_cursor. When the caller
# supplies none, the closure keeps its one-argument form; otherwise the
# context is forwarded as the closure's second argument.
if context is None:
    result = closure(document)
else:
    result = closure(document, context)
cursor_index = cursor_index + 1
if result != True:
failed = True
......@@ -1054,91 +1057,32 @@ def upgrade_to_29():
users = config.db.users.find({})
process_cursor(users, upgrade_to_29_closure)
def upgrade_to_30_closure_analysis(coll_item, coll):
    """
    Backfill a 'created' timestamp on every analysis file of one document.

    Each file under coll_item['analyses'][*]['files'] that has no 'created'
    key inherits the 'created' value of its analysis, or 1970-01-01 when the
    analysis has none. The whole 'analyses' list is then written back with a
    single update_one call on `coll`.

    :param coll_item: the document being upgraded (dict with '_id' and,
                      optionally, 'analyses')
    :param coll: object exposing update_one(filter, update), e.g. the
                 database collection that coll_item was read from
    :return: True when the update reports exactly one modified document,
             otherwise a failure message string
    """
    analyses = coll_item.get('analyses', None)
    if analyses is not None:
        for analysis_ in analyses:
            for file_ in analysis_.get('files', []):
                if 'created' not in file_:
                    file_['created'] = analysis_.get('created', datetime.datetime(1970, 1, 1))

    # NOTE(review): indentation was lost in the source; the write-back is
    # assumed to sit at function level, after the loop -- confirm.
    result = coll.update_one({'_id': coll_item['_id']}, {'$set': {'analyses': analyses}})
    if result.modified_count == 1:
        return True

    # The message is built only from names that are always bound. The earlier
    # version referenced an undefined `session` and loop variables that do not
    # exist when the lists are empty, so the failure path itself raised.
    return "File timestamp creation failed for:" + str(coll_item) + '/analyses'
def upgrade_to_30_closure_collection_analysis(collection):
    """
    Backfill a 'created' timestamp on every analysis file of one collection.

    Each file under collection['analyses'][*]['files'] that has no 'created'
    key inherits the 'created' value of its analysis, or the current UTC time
    when the analysis has none. The 'analyses' list is then written back to
    config.db.collections.

    :param collection: the collection document being upgraded
    :return: True when the update reports exactly one modified document,
             otherwise a failure message string
    """
    analyses = collection.get('analyses', None)
    if analyses is not None:
        for analysis_ in analyses:
            for file_ in analysis_.get('files', []):
                if 'created' not in file_:
                    file_['created'] = analysis_.get('created', datetime.datetime.utcnow())

    # NOTE(review): indentation was lost in the source; the write-back is
    # assumed to sit at function level, after the loop -- confirm.
    result = config.db.collections.update_one({'_id': collection['_id']}, {'$set': {'analyses': analyses}})
    if result.modified_count == 1:
        return True

    # The earlier message concatenated the quoted literals 'str(collection)',
    # 'str(analysis_)' and 'str(file_)' instead of the values; report the
    # actual document so the failure can be traced.
    return "File timestamp creation failed for:" + str(collection) + '/analyses'
def upgrade_to_30_closure_sessions(session):
    """
    Backfill a 'created' timestamp on every file attached to one session.

    Files without a 'created' key inherit the session's 'created' value, or
    the current UTC time when the session has none. The 'files' list is then
    written back to config.db.sessions.

    :param session: the session document being upgraded
    :return: True when the update reports exactly one modified document,
             otherwise a failure message string
    """
    session_files = session.get('files', [])
    for sfile_ in session_files:
        if 'created' not in sfile_:
            sfile_['created'] = session.get('created', datetime.datetime.utcnow())

    result = config.db.sessions.update_one({'_id': session['_id']}, {'$set': {'files': session_files}})
    if result.modified_count == 1:
        return True

    # The earlier message referenced `file_`, a name this function never
    # binds (the loop variable is `sfile_`), so the failure path raised
    # NameError instead of returning the message.
    return "File timestamp creation failed for:" + str(session) + '/files'
def upgrade_to_30_closure_collections(collection):
    """
    Backfill a 'created' timestamp on every file attached to one collection.

    Files without a 'created' key inherit the collection's 'created' value,
    or the current UTC time when the collection has none. The 'files' list is
    then written back to config.db.collections.

    :param collection: the collection document being upgraded
    :return: True when the update reports exactly one modified document,
             otherwise a failure message string
    """
    collection_files = collection.get('files', [])
    for sfile_ in collection_files:
        if 'created' not in sfile_:
            sfile_['created'] = collection.get('created', datetime.datetime.utcnow())

    result = config.db.collections.update_one({'_id': collection['_id']}, {'$set': {'files': collection_files}})
    if result.modified_count == 1:
        return True

    # The earlier message referenced `file_`, a name this function never
    # binds (the loop variable is `sfile_`), so the failure path raised
    # NameError instead of returning the message.
    return "File timestamp creation failed for:" + str(collection) + '/files'
def upgrade_to_30_closure_projects(project):
    """
    Backfill a 'created' timestamp on every file attached to one project.

    Files without a 'created' key inherit the project's 'created' value, or
    the current UTC time when the project has none. The 'files' list is then
    written back to config.db.projects.

    :param project: the project document being upgraded
    :return: True when the update reports exactly one modified document,
             otherwise a failure message string
    """
    files = project.get('files', [])
    for file_ in files:
        if 'created' not in file_:
            file_['created'] = project.get('created', datetime.datetime.utcnow())

    result = config.db.projects.update_one({'_id': project['_id']}, {'$set': {'files': files}})
    if result.modified_count == 1:
        return True

    # `file_` is unbound when `files` is empty, so the message no longer
    # depends on the loop variable; the document itself identifies the failure.
    return "File timestamp creation failed for:" + str(project) + '/files'
def upgrade_to_30_closure_coll(coll_item, coll):
    """
    Backfill a 'created' timestamp on every file attached to one document.

    Files without a 'created' key inherit the document's own 'created' value,
    or 1970-01-01 when the document has none. The 'files' list is then written
    back with a single update_one call on `coll`.

    :param coll_item: the document being upgraded (dict with '_id' and,
                      optionally, 'files' and 'created')
    :param coll: object exposing update_one(filter, update), e.g. the
                 database collection that coll_item was read from
    :return: True when the update reports exactly one modified document,
             otherwise a failure message string
    """
    files = coll_item.get('files', [])
    for file_ in files:
        if 'created' not in file_:
            file_['created'] = coll_item.get('created', datetime.datetime(1970, 1, 1))

    result = coll.update_one({'_id': coll_item['_id']}, {'$set': {'files': files}})
    if result.modified_count == 1:
        return True

    # The earlier message referenced `session`, which is not defined in this
    # function, and `file_`, which is unbound for an empty file list; both
    # made the failure path raise instead of returning the message.
    return "File timestamp creation failed for:" + str(coll_item) + '/files'
def upgrade_to_30_test(coll_item):
    """
    Log every analysis file of a document that still lacks a 'created' key.

    This is a read-only check: nothing in coll_item is modified.

    :param coll_item: document (dict) that may carry an 'analyses' list whose
                      entries may each carry a 'files' list
    :return: always True
    """
    analyses = coll_item.get('analyses', None)
    if analyses is not None:
        for analysis_ in analyses:
            files = analysis_.get('files')
            if files is None:
                continue
            for file_ in files:
                if 'created' not in file_:
                    # Let logging format the values: the earlier string
                    # concatenation of dicts raised TypeError on the first hit.
                    logging.info('%s, %s, %s has no created timestamp', coll_item, analysis_, file_)
    return True
def upgrade_to_30():
"""
......@@ -1156,23 +1100,23 @@ def upgrade_to_30():
cursor = config.db.collections.find({'analyses': {'$exists': True},
'analyses.files.created': {'$exists': False}})
process_cursor(cursor, upgrade_to_30_closure_collection_analysis)
process_cursor(cursor, upgrade_to_30_closure_analysis, config.db.collections)
cursor = config.db.sessions.find({'analyses': {'$exists': True},
'analyses.files.created': {'$exists': False}})
process_cursor(cursor, upgrade_to_30_closure_session_analysis)
process_cursor(cursor, upgrade_to_30_closure_analysis, config.db.sessions)
cursor = config.db.sessions.find({'files': {'$exists': True}, 'files.created': {'$exists': False}})
process_cursor(cursor, upgrade_to_30_closure_sessions)
process_cursor(cursor, upgrade_to_30_closure_coll, config.db.sessions)
cursor = config.db.collections.find({'files': {'$exists': True}, 'files.created': {'$exists': False}})
process_cursor(cursor, upgrade_to_30_closure_collections)
process_cursor(cursor, upgrade_to_30_closure_coll, config.db.collections)
cursor = config.db.acquisitions.find({'files': {'$exists': True}, 'files.created': {'$exists': False}})
process_cursor(cursor, upgrade_to_30_closure_acquisitions)
process_cursor(cursor, upgrade_to_30_closure_coll, config.db.acquisitions)
cursor = config.db.projects.find({'files': {'$exists': True}, 'files.created': {'$exists': False}})
process_cursor(cursor, upgrade_to_30_closure_projects)
process_cursor(cursor, upgrade_to_30_closure_coll, config.db.projects)
def upgrade_schema():
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment