# Source code for json_timeseries.jts

import json
import uuid
from datetime import datetime
from typing import Union, List

from dateutil import parser

# Names of the data types a time series may declare through its ``data_type``
# argument. NOTE(review): nothing in the visible part of this module validates
# against this tuple yet -- confirm whether enforcement is intended.
TimeSeriesDataType = ('NUMBER', 'TEXT', 'TIME', 'COORDINATES')


class CustomDatetimeConverter(json.JSONEncoder):
    """
    JSON encoder that also understands ``datetime`` objects.

    ``datetime`` values are emitted as ISO 8601 strings with millisecond
    precision; every other type is left to :class:`json.JSONEncoder`.
    """

    def default(self, obj):
        """
        Convert an object the stock encoder cannot serialise.

        :param obj: Object handed over by ``json`` for conversion
        :return: ISO 8601 string (millisecond precision) for ``datetime`` input
        :raise TypeError: Raised by the base class for any other type
        """
        if not isinstance(obj, datetime):
            # Not ours to handle: the base implementation raises the TypeError.
            return super().default(obj)

        return obj.isoformat(timespec='milliseconds')


class TsRecord:
    """
    One timestamped entry of a TimeSeries object.

    The constructor stores its arguments as attributes of the same name
    without validating or converting them.

    :param timestamp: Timestamp
    :type timestamp: datetime
    :param value: Value. For data type 'NUMBER', value must be int or float.
    :type value: Union[float, str, int]
    :param quality: Quality
    :type quality: int
    :param annotation: Annotation
    :type annotation: str
    """

    def __init__(
        self,
        timestamp: datetime,
        value: Union[float, str, int] = None,
        quality: int = None,
        annotation: str = None,
    ):
        # TODO 1: enforce value types
        self.timestamp, self.value = timestamp, value
        self.quality, self.annotation = quality, annotation
class TimeSeries:
    """
    TimeSeries object

    :param name: Time series name
    :type name: str
    :param units: Units of the values
    :type units: str, optional
    :param identifier: Time series ID. A new UUID4 is generated for each instance if not specified
    :type identifier: str, optional
    :param data_type: Type of time series. E.g.: 'NUMBER', 'TEXT', 'TIME', 'COORDINATES'
    :type data_type: str, optional
    :default data_type: 'NUMBER'
    :param records: A single record or a list of records
    :type records: Union[List[TsRecord], TsRecord], optional
    :raise TypeError: If 'records' is neither a TsRecord nor a list of TsRecord, or if a
        'NUMBER' series is given a record whose value is not int or float
    """

    def __init__(self, name: str, units: str = None, identifier: str = None,
                 data_type: str = 'NUMBER',
                 records: Union[List["TsRecord"], "TsRecord"] = None):
        # The UUID has to be created here rather than as the parameter default:
        # a default expression is evaluated only once, when the function is
        # defined, which made every series without an explicit id share one UUID.
        self.identifier = str(uuid.uuid4()) if identifier is None else identifier
        self.name = name
        self.units = units
        self.data_type = data_type

        if records is None:
            self.records = []
        elif isinstance(records, TsRecord):
            self.records = [records]
        elif isinstance(records, list) and all(isinstance(x, TsRecord) for x in records):
            # Copy so that later insert() calls do not mutate the caller's list.
            self.records = list(records)
        else:
            raise TypeError("'records' value must be TsRecord or List[TsRecord]")

        # Only 'NUMBER' series restrict the value type; a missing value is allowed.
        if self.data_type == 'NUMBER':
            for r in self.records:
                if r.value is not None and not isinstance(r.value, (float, int)):
                    raise TypeError(
                        "TimeSeries with data type 'NUMBER' includes TSRecord with value '%s' which is %s. 'NUMBER' "
                        "values must be int or float" % (r.value, type(r.value))
                    )

    def insert(self, records: Union["TsRecord", List["TsRecord"]]):
        """
        Insert single or multiple records

        A list is appended element by element; anything else is appended as a
        single record. No type validation is performed here.

        :param records: A record or a list of records
        """
        if isinstance(records, list):
            self.records.extend(records)
        else:
            # single instance
            self.records.append(records)

    def __len__(self):
        """Return the number of records held by the series."""
        return len(self.records)

    def toJSON(self) -> str:
        """
        Outputs the series as a JSON string

        The previous implementation passed the TimeSeries object itself to
        ``json.dumps`` and therefore always raised ``TypeError``. The series
        is now converted to plain dictionaries first.

        NOTE(review): the layout reuses the key names JtsDocument emits
        ("id", "name", "dataType", "units" for the series and "ts", "v",
        "q", "a" for records) -- confirm it matches what consumers expect.

        :return: JSON string describing the series and its records
        :rtype: str
        """
        doc = {"id": self.identifier, "name": self.name, "dataType": self.data_type}
        if self.units:
            doc["units"] = self.units
        doc["records"] = [self._record_to_dict(r) for r in self.records]
        return json.dumps(doc)

    @staticmethod
    def _record_to_dict(record) -> dict:
        """Convert one record to a JSON-ready dict, leaving out unset fields."""
        entry = {"ts": record.timestamp.isoformat(timespec='milliseconds')}
        value = record.value
        if value is not None:
            # datetime values are not JSON serialisable; use the same ISO format as "ts".
            entry["v"] = value.isoformat(timespec='milliseconds') if isinstance(value, datetime) else value
        if record.quality is not None:
            entry["q"] = record.quality
        if record.annotation is not None:
            entry["a"] = record.annotation
        return entry

    # TODO METHODS to implement
    #
    # def get_timestamps:
    #
    # public sort (): TimeSeries<Type> {
    #
    # public clone (): TimeSeries<Type> {
    #
    # private recordToJSON (record: ITimeSeriesRecord<Type>): ITimeSeriesRecordJson<Type> {
    #
    # private valueToJSON (value: Type | undefined): Type | undefined | string | null {
    #
    # private cloneRecords (records: ITimeSeriesRecord<Type>[]): ITimeSeriesRecord<Type>[] {


# Forward declaration; the full JtsDocument definition follows and rebinds this name.
class JtsDocument:
    pass
class JtsDocument:
    """
    JTS document object

    :param series: A single TimeSeries or a list of TimeSeries
    :type series: Union[List[TimeSeries], TimeSeries], optional
    :param version: JTS specification version number
    :type version: str, optional
    :raise TypeError: Value of 'series' must be types of TimeSeries or List[TimeSeries]
    """

    # TODO: clone() (clone the document together with its series) is not implemented yet.

    def __init__(self, series: Union[List["TimeSeries"], "TimeSeries"] = None, version: str = "1.0"):
        self.version = version

        # enforce accepted types
        if series is None:
            self.series = []
        elif isinstance(series, TimeSeries):
            self.series = [series]
        elif isinstance(series, list) and all(isinstance(x, TimeSeries) for x in series):
            # Copy so that addSeries() does not mutate the caller's list.
            self.series = list(series)
        else:
            raise TypeError("Value of 'series' must be types of TimeSeries or List[TimeSeries]")

    def addSeries(self, series: Union[List["TimeSeries"], "TimeSeries"]):
        """
        Add single or multiple TimeSeries

        A list is appended element by element; anything else is appended as a
        single series. No type validation is performed here.

        :param series: A TimeSeries or a list of TimeSeries
        """
        if isinstance(series, list):
            self.series.extend(series)
        else:
            # single instance
            self.series.append(series)

    def __len__(self):
        """Return the number of series in the document."""
        return len(self.series)

    def toJSON(self) -> dict:
        """
        Output as dictionary of JSON structure

        :return: Python dictionary of JSON structure
        :rtype: dict
        :raise ValueError: If no series contributes a record to the 'data' section
        """
        return self.__build()

    def toJSONString(self) -> str:
        """
        Output as stringified JSON (json.dumps)

        :return: Output as stringified JSON
        :rtype: str
        :raise ValueError: If no series contributes a record to the 'data' section
        """
        return json.dumps(self.toJSON(), cls=CustomDatetimeConverter)

    def __build(self):
        """Assemble the document dictionary: docType, version, header and data."""
        doc = dict(docType='jts', version=self.version)
        data = self.__get_data()
        if not data:
            # ValueError is still an Exception, so existing handlers keep working.
            raise ValueError("Cannot build without jts 'data'")
        header = self.__get_header(data)
        if header:
            doc['header'] = header
        doc['data'] = data
        return doc

    def __get_header(self, data):
        """Build the header from the time-sorted data rows, or None without rows."""
        if not data:
            return None
        return dict(
            startTime=data[0]['ts'],
            endTime=data[-1]['ts'],
            recordCount=len(data),
            columns=self.__getHeaderColumns(),
        )

    def __getHeaderColumns(self):
        """Describe every series, keyed by its position in ``self.series``."""
        column_map = {}
        for idx, s in enumerate(self.series):
            column = dict(id=s.identifier, name=s.name, dataType=s.data_type)
            if s.units:
                column["units"] = s.units
            column_map[idx] = column
        return column_map

    # build "data" section of the document
    def __get_data(self):
        """
        Merge the records of all series into rows sharing a timestamp.

        :return: List of ``{"ts": ..., "f": {column index: column}}`` rows sorted by time
        """
        record_map = {}
        for idx, s in enumerate(self.series):
            for r in s.records:
                # A record carrying no information at all adds nothing to the document.
                if r.value is None and r.annotation is None and r.quality is None:
                    continue
                key = r.timestamp.timestamp()
                if key not in record_map:
                    # The timestamp is stringified here to avoid datetime serialization upstream.
                    record_map[key] = {"ts": r.timestamp.isoformat(timespec='milliseconds'), "f": {}}
                record_map[key]["f"][idx] = self.__getDataColumnFromRecord(r, s.data_type)
        return [record_map[key] for key in sorted(record_map)]

    def __getDataColumnFromRecord(self, r, data_type):
        """
        Convert one record into its data column ("v", "q" and "a" entries).

        :raise TypeError: If a 'NUMBER' series holds a value that is not int or float
        """
        column = {}
        v = r.value
        if v is not None:
            if data_type == 'NUMBER':
                if not isinstance(v, (float, int)):
                    raise TypeError("Value of Data Type 'NUMBER' must be float or int")
                column["v"] = v
            elif data_type == 'TEXT':
                column["v"] = str(v)
            # TODO other types below; until then their values are not emitted
            # case 'TIME': return { $time: (v as Date).toISOString?.() || 'invalid date' }
            # case 'COORDINATES': return { $coords: ((v as Array<number>).length === 2 ? v : []) }
        # Compare against None: a quality code of 0 would be dropped by a truthiness test.
        if r.quality is not None:
            column["q"] = r.quality
        if r.annotation:
            column["a"] = r.annotation
        return column

    @staticmethod
    def fromJSON(json_str: str) -> "JtsDocument":
        """
        Create a new jtsDocument from JSON

        :param json_str: JSON text holding 'header' -> 'columns' and 'data' sections
        :type json_str: str
        :return: Document with one TimeSeries per header column
        :rtype: JtsDocument
        :raise KeyError: If 'header', 'columns', 'data', 'ts' or 'f' is missing
        :raise IndexError: If a data row refers to a column that the header does not declare
        """
        json_obj = json.loads(json_str)
        version = json_obj.get('version')
        # Keep the constructor default when the document carries no version.
        jts_doc = JtsDocument() if version is None else JtsDocument(version=version)

        # build series from header columns; they are looked up by their declared
        # column index so that out-of-order or sparse keys still match the data
        columns = json_obj['header']["columns"]
        series_by_column = {}
        for column_key in sorted(columns, key=int):
            c = columns[column_key]
            ts_series = TimeSeries(
                identifier=c.get("id"),
                name=c.get("name"),
                data_type=c.get("dataType"),
                units=c.get("units"),
            )
            series_by_column[int(column_key)] = ts_series
            jts_doc.addSeries(ts_series)

        # add records to corresponding series
        for e in json_obj['data']:
            ts = parser.parse(e["ts"])
            for column_key, r in e["f"].items():
                target = series_by_column.get(int(column_key))
                if target is None:
                    raise IndexError("Data columns do not match Header columns")
                target.insert(
                    TsRecord(
                        timestamp=ts,
                        value=r.get('v'),
                        quality=r.get('q'),
                        annotation=r.get('a'),
                    )
                )
        return jts_doc

    def getSeries(self, identifier: str) -> "TimeSeries":
        """
        Get series by id

        :param identifier: Identifier of the wanted series
        :type identifier: str
        :return: First series with that identifier, or None when there is none
        """
        # TODO: return list of found series
        return next((x for x in self.series if x.identifier == identifier), None)