Skip to content
Snippets Groups Projects
Commit 2eec5918 authored by Nathaniel Kofalt
Browse files

Swap synthetic upgrade for real one

parent 717f0b9f
No related branches found
No related tags found
No related merge requests found
......@@ -53,6 +53,44 @@ def confirm_schema_match():
else:
sys.exit(0)
def getMonotonicTime():
    """
    Return a timestamp suitable for measuring elapsed wall-clock intervals.

    The value is the fifth field of os.times() (elapsed real time); only the
    difference between two calls is meaningful.
    """
    # http://stackoverflow.com/a/7424304
    process_times = os.times()
    return process_times[4]
def process_cursor(cursor, closure):
    """
    Given an iterable (say, a mongo cursor) and a closure, call that closure in parallel over the iterable.
    Call order is undefined. Currently launches N python process workers, where N is the number of vcpu cores.
    Useful for upgrades that need to touch each document in a database, and don't need an iteration order.

    Each document is passed to `closure` as its only argument. Any return value other than True is
    logged as a failed upgrade. An exception raised inside a worker is re-raised here when its
    result is read; in that case the pool is terminated and joined before the exception propagates,
    so no worker processes are left behind.
    """

    begin = getMonotonicTime()

    cores = multiprocessing.cpu_count()
    pool = multiprocessing.Pool(cores)

    logging.info('Iterating over cursor with ' + str(cores) + ' workers')

    try:
        # Launch all work, iterating over the cursor
        # Note that this creates a list of N multiprocessing.pool.AsyncResults, where N is table size.
        # Memory usage concern in the future? Doesn't seem to be an issue with ~120K records.
        # Could be upgraded later with some yield trickery.
        results = [pool.apply_async(closure, (document,)) for document in cursor]

        # Read the results back in submission order; get() blocks until that task is done
        # and re-raises anything the worker raised.
        for res in results:
            result = res.get()
            # Deliberately an equality check: closures signal success by returning True.
            if result != True:
                logging.info('Upgrade failed: ' + str(result))

        logging.info('Waiting for workers to complete')
        pool.close()
    except BaseException:
        # Something failed part-way through: stop outstanding work instead of leaking workers.
        pool.terminate()
        raise
    finally:
        # join() is only legal after close() or terminate(); one of them has run by now.
        pool.join()

    end = getMonotonicTime()
    elapsed = end - begin
    logging.info('Parallel cursor iteration took ' + ('%.2f' % elapsed))
def upgrade_to_1():
"""
scitran/core issue #206
......@@ -878,45 +916,13 @@ def upgrade_to_25():
config.db.authtokens.update_many({'refresh_token': {'$exists': True}}, {'$unset': {'refresh_token': ''}})
def getMonotonicTime():
    """
    Return the elapsed-real-time field of os.times(), for timing intervals.

    Callers subtract two readings; the absolute value has no meaning on its own.
    """
    # http://stackoverflow.com/a/7424304
    elapsed_real_time = os.times()[4]
    return elapsed_real_time
def upgrade_to_26_closure(job):
def process_cursor(cursor, closure):
    """
    Call `closure` once per item of `cursor`, spread across a pool of worker processes.

    One worker is started per CPU core. Each item is passed to `closure` as its only argument,
    and any return value other than True is logged as a failed upgrade. An exception raised in
    a worker is re-raised here when its result is read; the pool is terminated and joined before
    the exception propagates, so no worker processes are left behind.
    """

    begin = getMonotonicTime()

    cores = multiprocessing.cpu_count()
    pool = multiprocessing.Pool(cores)

    logging.info('Iterating over cursor with ' + str(cores) + ' workers')

    try:
        # Launch all work, iterating over the cursor
        # Note that this creates a list of N multiprocessing.pool.AsyncResults, where N is table size.
        # Memory usage concern in the future? Doesn't seem to be an issue with ~120K records.
        # Could be upgraded later with some yield trickery.
        results = [pool.apply_async(closure, (document,)) for document in cursor]

        # Read the results back in submission order; get() blocks until that task is done
        # and re-raises anything the worker raised.
        for res in results:
            result = res.get()
            # Deliberately an equality check: closures signal success by returning True.
            if result != True:
                logging.info('Upgrade failed: ' + str(result))

        logging.info('Waiting for workers to complete')
        pool.close()
    except BaseException:
        # Something failed part-way through: stop outstanding work instead of leaking workers.
        pool.terminate()
        raise
    finally:
        # join() is only legal after close() or terminate(); one of them has run by now.
        pool.join()

    end = getMonotonicTime()
    elapsed = end - begin
    logging.info('Parallel cursor iteration took ' + ('%.3f' % elapsed))
def upgrade(job):
# Semi-worst-case: single uncacheable lookup per migrated job
gear = config.db.gears.find_one({'_id': bson.ObjectId(job['gear_id'])})
gear = config.db.gears.find_one({'_id': bson.ObjectId(job['gear_id'])}, {'gear.name': 1})
gear_name = gear['gear']['name']
# Update doc
result = config.db.jobs.update_one({'_id': job['_id']}, {'$push': {'tags': 'parallel-approach'}})
result = config.db.jobs.update_one({'_id': job['_id']}, {'$push': {'tags': gear_name }})
if result.modified_count == 1:
return True
else:
......@@ -929,33 +935,8 @@ def upgrade_to_26():
Add job tags back to the job document, and use a faster cursor-walking update method
"""
logging.info('Upgrade v26, removing tags')
config.db.jobs.update_many({}, {"$pull": {"tags": "normal-approach"}})
config.db.jobs.update_many({}, {"$pull": {"tags": "parallel-approach"}})
logging.info('Upgrade v26, removed')
# Normal approach
begin = getMonotonicTime()
cursor = config.db.jobs.find({})
for job in cursor:
# Semi-worst-case: single uncacheable lookup per migrated job
gear = config.db.gears.find_one({'_id': bson.ObjectId(job['gear_id'])})
gear_name = gear['gear']['name']
# Update doc
result = config.db.jobs.update_one({'_id': job['_id']}, {'$push': {'tags': 'normal-approach'}})
if result.modified_count != 1:
logging.info('Parallel failed: update doc ' + str(job['_id']) + ' resulted modified ' + str(result.modified_count))
end = getMonotonicTime()
elapsed = end - begin
logging.info('Upgrade v26, normal approach took ' + ('%.3f' % elapsed))
# Parallel approach
cursor = config.db.jobs.find({})
process_cursor(cursor, upgrade)
process_cursor(cursor, upgrade_to_26_closure)
def upgrade_schema():
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment