TAI64Base = 0x4000000000000000
Leapsecs1972 = 10
Leapsecs = tuple(
- int(datetime(y, m, 1, 0, 0, 0, 0, tzinfo=timezone.utc).timestamp())
- for y, m in (
- (2017, 1),
- (2015, 7),
- (2012, 7),
- (2009, 1),
- (2006, 1),
- (1999, 1),
- (1997, 7),
- (1996, 1),
- (1994, 7),
- (1993, 7),
- (1992, 7),
- (1991, 1),
- (1990, 1),
- (1988, 1),
- (1985, 7),
- (1983, 7),
- (1982, 7),
- (1981, 7),
- (1980, 1),
- (1979, 1),
- (1978, 1),
- (1977, 1),
- (1976, 1),
- (1975, 1),
- (1974, 1),
- (1973, 1),
- (1972, 7),
+ Leapsecs1972 + i + int(
+ datetime(y, m, 1, 0, 0, 0, 0, tzinfo=timezone.utc).timestamp()
)
+ for i, (y, m) in enumerate((
+ (1972, 7),
+ (1973, 1),
+ (1974, 1),
+ (1975, 1),
+ (1976, 1),
+ (1977, 1),
+ (1978, 1),
+ (1979, 1),
+ (1980, 1),
+ (1981, 7),
+ (1982, 7),
+ (1983, 7),
+ (1985, 7),
+ (1988, 1),
+ (1990, 1),
+ (1991, 1),
+ (1992, 7),
+ (1993, 7),
+ (1994, 7),
+ (1996, 1),
+ (1997, 7),
+ (1999, 1),
+ (2006, 1),
+ (2009, 1),
+ (2012, 7),
+ (2015, 7),
+ (2017, 1),
+ ))
)
raise NotImplementedError("no FLOAT* support")
if isinstance(v, datetime):
secs = int(v.replace(tzinfo=timezone.utc).timestamp())
- diff = Leapsecs1972
- for n, leapsec in enumerate(Leapsecs):
- if secs > leapsec:
- diff += len(Leapsecs) - n
- break
- secs += TAI64Base + diff
+ secs = utc2tai(secs)
if v.microsecond == 0:
return _byte(TagTAI64) + secs.to_bytes(8, "big")
return (
_tais = {TagTAI64: 8, TagTAI64N: 12, TagTAI64NA: 16}
-def loads(v, sets=False):
+def tai2utc(secs, leapsecUTCAllow=False):
+ """Subtract leapseconds from TAI
+
+ :returns: UTC seconds, or None of TAI equals to leap second
+ """
+ secs -= TAI64Base
+ diff = 0
+ for leapsec in Leapsecs:
+ if secs < leapsec:
+ break
+ diff += 1
+ if secs == leapsec:
+ if leapsecUTCAllow:
+ break
+ return None
+ return secs - (diff + Leapsecs1972)
+
+
+def utc2tai(secs):
+ """Add leapseconds to UTC
+ """
+ for diff, leapsec in enumerate(Leapsecs):
+ if secs < leapsec:
+ break
+ secs += Leapsecs1972 + diff
+ if secs in Leapsecs:
+ secs += 1
+ return secs + TAI64Base
+
+
+def loads(v, sets=False, leapsecUTCAllow=False):
+ """Decode YAC-encoded data.
+
+ :param bool sets: transform maps with NIL-only values to set()s
+ :param bool leapsecUTCAllow: allow TAI64 values equal to leap seconds,
+ to be decoded as datetime UTC value. Raw()
+ value will be returned instead
+ :returns: decoded data and the undecoded tail
+ :rtype: (any, bytes)
+ """
if len(v) == 0:
raise NotEnoughData(1)
if v[0] == TagEOC:
secs = int.from_bytes(v[1:1+8], "big")
if secs >= (1 << 63):
raise DecodeError("reserved TAI64 value is in use")
- secs -= TAI64Base
- diff = 0
- for n, leapsec in enumerate(Leapsecs):
- if secs > (leapsec + len(Leapsecs) - n):
- diff = 10 + len(Leapsecs) - n
- break
- secs -= diff
nsecs = 0
if l > 8:
nsecs = int.from_bytes(v[1+8:1+8+4], "big")
asecs = int.from_bytes(v[1+8+4:1+8+4+4], "big")
if asecs > 999999999:
raise DecodeError("too many attoseconds")
+ secs = tai2utc(secs, leapsecUTCAllow)
+ if secs is None:
+ return Raw(v[0], v[1:1+l]), v[1+l:]
if (abs(secs) > (1 << 60)) or (asecs > 0) or ((nsecs % 1000) > 0):
# Python can represent neither big values, nor nanoseconds
return Raw(v[0], v[1:1+l]), v[1+l:]