-
Nathaniel Kofalt authoredNathaniel Kofalt authored
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
encoder.py 1.18 KiB
import bson.objectid
import datetime
import json
import pytz
from .jobs.jobs import Job
def custom_json_serializer(obj):
if isinstance(obj, bson.objectid.ObjectId):
return str(obj)
elif isinstance(obj, datetime.datetime):
return pytz.timezone('UTC').localize(obj).isoformat()
elif isinstance(obj, Job):
return obj.map()
raise TypeError(repr(obj) + " is not JSON serializable")
def sse_pack(d):
"""
Format a map with Server-Sent-Event-meaningful keys into a string for transport.
Happily borrowed from: http://taoofmac.com/space/blog/2014/11/16/1940
For reading on web usage: http://www.html5rocks.com/en/tutorials/eventsource/basics
For reading on the format: https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events#Event_stream_format
"""
buffer = ''
for k in ['retry', 'id', 'event', 'data']:
if k in d.keys():
buffer += '%s: %s\n' % (k, d[k])
return buffer + '\n'
def json_sse_pack(d):
"""
Variant of sse_pack that will json-encode your data blob.
"""
d['data'] = json.dumps(d['data'], default=custom_json_serializer)
return sse_pack(d)