import json
import uuid
from datetime import datetime
from typing import Union, List
from dateutil import parser
TimeSeriesDataType = ('NUMBER', 'TEXT', 'TIME', 'COORDINATES')
class CustomDatetimeConverter(json.JSONEncoder):
def default(self, obj):
if isinstance(obj, datetime):
return obj.isoformat(timespec='milliseconds')
# if isinstance(obj, complex):
# return [obj.real, obj.imag]
# Let the base class default method raise the TypeError
return super().default(obj)
[docs]
class TsRecord:
"""
A record of TimeSeries object
:param timestamp: Timestamp
:type timestamp: datetime
:param value: Value. For data type 'NUMBER', value must be int or float.
:type value: Union[float, str, int]
:param quality: Quality
:type quality: int
:param annotation: Annotation
:type annotation: str
"""
def __init__(self, timestamp: datetime, value: Union[float, str, int] = None, quality: int = None,
annotation: str = None):
# TODO 1: enforce value types
self.timestamp = timestamp
self.value = value
self.quality = quality
self.annotation = annotation
[docs]
class TimeSeries:
"""
TimeSeries object
:param data_type: Type of time series. E.g.: 'NUMBER', 'TEXT', 'TIME', 'COORDINATES'
:type data_type: str, optional
:default data_type: 'NUMBER'
:param records: List of records
:type records: list, optional
:param name: Time series name
:type name: str
:param identifier: Time series ID. Autogenerated as UUID4 if not specified
:type identifier: str, optional
"""
def __init__(self, name: str, units: str = None, identifier: str = str(uuid.uuid4()),
data_type: str = 'NUMBER',
records: Union[List[TsRecord], TsRecord] = None):
self.identifier = identifier
self.name = name
self.units = units
self.data_type = data_type
if records is None:
self.records = []
elif isinstance(records, TsRecord):
self.records = [records]
elif isinstance(records, list) and all(isinstance(x, TsRecord) for x in records):
self.records = records
else:
raise TypeError("'records' value must be TsRecord or List[TsRecord]")
for r in self.records:
if (r.value is not None) and not (isinstance(r.value, (float, int))) and (self.data_type == 'NUMBER'):
raise TypeError(
"TimeSeries with data type 'NUMBER' includes TSRecord with value '%s' which is %s. 'NUMBER' "
"values must be int or float" % (r.value, type(r.value))
)
# def __eq__(self, other):
# return self.__dict__ is other.__dict__
[docs]
def insert(self, records: Union[TsRecord, List[TsRecord]]):
"""
Insert single or multiple records
"""
if isinstance(records, list):
self.records.extend(records)
# single instance
else:
self.records.append(records)
# def sort(self):
# pass
#
# def clone(self):
# # ITimeSeries<Type>;
# pass
def __len__(self):
return self.records.__len__()
[docs]
def toJSON(self) -> str:
"""
Outputs formatted JSON
"""
return json.dumps(self)
# TODO METHODS to implement
#
# def get_timestamps:
#
# public sort (): TimeSeries<Type> {
#
# public clone (): TimeSeries<Type> {
#
# private recordToJSON (record: ITimeSeriesRecord<Type>): ITimeSeriesRecordJson<Type> {
#
# private valueToJSON (value: Type | undefined): Type | undefined | string | null {
#
# private cloneRecords (records: ITimeSeriesRecord<Type>[]): ITimeSeriesRecord<Type>[] {
class JtsDocument:
pass
[docs]
class JtsDocument:
"""
JTS document object
:raise [TypeError]: [Value of 'series' must be types of TimeSeries or List[TimeSeries]]
"""
"""
TODO Methods to implement
// Clone document (also clones series)
jtsDocument.clone()
// Create a new jtsDocument from JSON
const jtsDocument = JtsDocument.from('{"docType": "jts", ...}')
Properties
// Get JTS specification version number
jtsDocument.version // 1
// Get
jtsDocument.series // [timeseries1, timeseries2]
"""
def __init__(self, series: Union[List[TimeSeries], TimeSeries] = None, version: str = "1.0"):
# enforce accepted types
if series is not None:
if (not isinstance(series, list) and not isinstance(series, TimeSeries)) \
or (isinstance(series, list) and not all(isinstance(x, TimeSeries) for x in series)):
raise TypeError("Value of 'series' must be types of TimeSeries or List[TimeSeries]")
self.version = version
if isinstance(series, list):
self.series = series
elif isinstance(series, TimeSeries):
self.series = [series]
else:
self.series = []
# def __eq__(self, other):
# return self.__dict__ is other.__dict__
[docs]
def addSeries(self, series: Union[List[TimeSeries], TimeSeries]):
"""
Add single or multiple TimeSeries
"""
if isinstance(series, list):
self.series.extend(series)
# single instance
else:
self.series.append(series)
def __len__(self):
return self.series.__len__()
[docs]
def toJSON(self) -> dict:
"""
Output as dictionary of JSON structure
:return: Python dictionary of JSON structure
:rtype: dict
"""
# return json.dumps(self.__build())
return self.__build()
[docs]
def toJSONString(self) -> str:
"""
Output as stringified JSON (json.dumps)
:return: Output as stringified JSON
:rtype: str
"""
return json.dumps(self.toJSON(), cls=CustomDatetimeConverter)
def __build(self):
doc = dict(docType='jts',
version=self.version)
data = self.__get_data()
if not data:
raise Exception("Cannot build without jts 'data'")
header = self.__get_header(data)
if header:
doc['header'] = header
doc['data'] = data
return doc
def __get_header(self, data):
return dict(startTime=data[0]['ts'],
endTime=data[-1]['ts'],
recordCount=len(data),
columns=self.__getHeaderColumns()
) if data else None
def __getHeaderColumns(self):
column_map = {}
for idx, s in enumerate(self.series):
column_map[idx] = dict(
id=s.identifier,
name=s.name,
dataType=s.data_type,
)
if s.units:
column_map[idx]["units"] = s.units
return column_map
# build "data" section of the document
def __get_data(self):
record_map = {}
for idx, s in enumerate(self.series):
for r in s.records:
if (r.value is None) and (r.annotation is None) and (r.quality is None):
continue
key = r.timestamp.timestamp()
if not record_map.get(key):
# Dirty way to convert timestamp to string here, but it is to avoid datetime serialization upstream
record_map[key] = {"ts": r.timestamp.isoformat(timespec='milliseconds'), "f": {}}
record_map[key]["f"][idx] = self.__getDataColumnFromRecord(r, s.data_type) # dict of entry values
# record_map_sorted = dict(sorted(record_map.items()))
record_map_sorted = [x[1] for x in sorted(record_map.items())] # use tuple here?
return record_map_sorted
def __getDataColumnFromRecord(self, r, data_type):
column = {}
v = r.value
if v is not None:
if data_type == 'NUMBER':
if isinstance(v, float) or isinstance(v, int):
column["v"] = v
else:
raise TypeError("Value of Data Type 'NUMBER' must be float or int")
elif data_type == 'TEXT':
column["v"] = str(v)
# TODO other types below
# case 'TIME': return { $time: (v as Date).toISOString?.() || 'invalid date' }
# case 'COORDINATES': return { $coords: ((v as Array<number>).length === 2 ? v : []) }
if r.quality:
column["q"] = r.quality
if r.annotation:
column["a"] = r.annotation
return column
[docs]
@staticmethod
def fromJSON(json_str: str) -> JtsDocument:
"""
Create a new jtsDocument from JSON
"""
json_obj = json.loads(json_str)
jts_doc = JtsDocument(version=json_obj.get('version'))
# build series from header columns
for idx, c in json_obj['header']["columns"].items():
jts_doc.addSeries(TimeSeries(
identifier=c.get("id"),
name=c.get("name"),
data_type=c.get("dataType"),
units=c.get("units"))
)
# add records to corresponding series
for e in json_obj['data']:
ts = parser.parse(e["ts"])
f = e["f"]
for i, r in f.items():
jts_doc.series[int(i)].insert(
TsRecord(
timestamp=ts,
value=r.get('v'),
quality=r.get('q'),
annotation=r.get('a'))
)
# else:
# raise Exception("Data columns do not match Header columns")
return jts_doc
[docs]
def getSeries(self, identifier: str) -> TimeSeries:
"""
Get series by id
"""
# TODO: return list of found series
return next((x for x in self.series if x.identifier == identifier), None)