--- /dev/null
+__pycache__
+*.pyc
+.vscode
+.coverage
+.hypothesis
\ No newline at end of file
return dumps({i: None for i in v})
if isinstance(v, dict):
raws = [_byte(TagMap)]
+ if not all(isinstance(k, str) for k in v.keys()):
+ raise ValueError("map keys can contain only strings")
for k in sorted(v.keys(), key=LenFirstSort):
- assert isinstance(k, str)
+ if len(k) == 0:
+ raise ValueError("map keys can not be empty")
raws.append(dumps(k))
raws.append(dumps(v[k]))
raws.append(_byte(TagEOC))
break
if not isinstance(k, str):
raise DecodeError("non-string key")
+ if len(k) == 0:
+ raise DecodeError("empty key")
if (len(k) < len(kPrev)) or ((len(k) == len(kPrev)) and (k <= kPrev)):
raise DecodeError("unsorted keys")
i, v = loads(v, sets=sets)
--- /dev/null
+import unittest
+
+from hypothesis import given
+from hypothesis.strategies import binary
+
+from pyac import Blob
+from pyac import dumps
+from pyac import DecodeError
+from pyac import loads
+from pyac import NotEnoughData
+
+
+class TestBlob(unittest.TestCase):
+ def test_blob_encode(self) -> None:
+ blob = Blob(4, b"testdata")
+ encoded = dumps(blob)
+ self.assertEqual(
+ encoded, b"\x0b\x00\x00\x00\x00\x00\x00\x00\x03\x01test\x01data\x80"
+ )
+
+ @given(binary(max_size=20))
+ def test_blob_decode(self, junk: bytes) -> None:
+ encoded = b"\x0B\x00\x00\x00\x00\x00\x00\x00\x03\x01test\x01data\x80" + junk
+ decoded, remaining = loads(encoded)
+ self.assertEqual(decoded.l, 4)
+ self.assertEqual(decoded.v, b"testdata")
+ self.assertEqual(remaining, junk)
+
+ def test_throws_when_not_enough_data(self) -> None:
+ encoded = b"\x0B\x00\x00\x00\x00\x00\x00\x00\x03\x01test\x01da"
+ with self.assertRaises(NotEnoughData) as cm:
+ loads(encoded)
+ self.assertEqual(cm.exception.n, 4)
+
+ def test_throws_when_not_enough_data_for_length(self) -> None:
+ encoded = b"\x0B\x00\x00\x00\x00"
+ with self.assertRaises(NotEnoughData) as cm:
+ loads(encoded)
+ self.assertEqual(cm.exception.n, 9)
+
+ def test_throws_when_wrong_terminator_length(self) -> None:
+ encoded = b"\x0B\x00\x00\x00\x00\x00\x00\x00\x03\x01test\x01data\x8Aterminator"
+ with self.assertRaises(DecodeError) as cm:
+ loads(encoded)
+ self.assertEqual(str(cm.exception), "wrong terminator len")
+
+ def test_throws_when_wrong_terminator_tag(self) -> None:
+ encoded = b"\x0B\x00\x00\x00\x00\x00\x00\x00\x03\x01test\x01data\x04that was a wrong tag"
+ with self.assertRaises(DecodeError) as cm:
+ loads(encoded)
+ self.assertEqual(str(cm.exception), "unexpected tag")
+
+
+if __name__ == "__main__":
+ unittest.main()
--- /dev/null
+import unittest
+
+from hypothesis import given
+from hypothesis.strategies import binary
+
+from pyac import dumps
+from pyac import loads
+
+
+class TestBool(unittest.TestCase):
+ def test_bool_encode_true(
+ self,
+ ):
+ encoded = dumps(True)
+ self.assertEqual(encoded, b"\x03")
+
+ def test_bool_encode_false(self):
+ encoded = dumps(False)
+ self.assertEqual(encoded, b"\x02")
+
+ @given(binary(max_size=20))
+ def test_bool_decode_true(self, junk):
+ encoded = b"\x03" + junk
+ decoded, remaining = loads(encoded)
+ self.assertIs(decoded, True)
+ self.assertEqual(remaining, junk)
+
+ @given(binary(max_size=20))
+ def test_bool_decode_false(self, junk):
+ encoded = b"\x02" + junk
+ decoded, remaining = loads(encoded)
+ self.assertIs(decoded, False)
+ self.assertEqual(remaining, junk)
+
+
+if __name__ == "__main__":
+ unittest.main()
--- /dev/null
+import unittest
+
+from hypothesis import given
+from hypothesis.strategies import integers
+from typing import Any
+
+from pyac import dumps
+from pyac import DecodeError
+from pyac import loads
+from pyac import NotEnoughData
+
+
+class TestError(unittest.TestCase):
+ @given(integers(min_value=1, max_value=100))
+ def test_not_enough_data_str(self, integer: int) -> None:
+ self.assertEqual(str(NotEnoughData(integer)), "{} bytes expected".format(integer))
+
+ @given(integers(min_value=1, max_value=100))
+ def test_not_enough_data_repr(self, integer: int) -> None:
+ self.assertEqual(NotEnoughData(integer).__repr__(), "NotEnoughData({} bytes expected)".format(integer))
+
+ @given(integers(min_value=1, max_value=100))
+ def test_not_enough_data_init(self, integer: int) -> None:
+ self.assertEqual(NotEnoughData(integer).n, integer)
+
+ def test_throws_when_unknown_class(self) -> None:
+ with self.assertRaises(NotImplementedError) as cm:
+ class a:
+ pass
+ a_instance: Any = a()
+ dumps(a_instance)
+ self.assertEqual(str(cm.exception), "unsupported type")
+
+ def test_throws_when_unknown_tag(self) -> None:
+ with self.assertRaises(DecodeError) as cm:
+ tag: bytes = b"\x05"
+ loads(tag)
+ self.assertEqual(str(cm.exception), "unknown tag")
+
+ def test_throws_when_empty(self) -> None:
+ with self.assertRaises(NotEnoughData):
+ encoded: bytes = b""
+ loads(encoded)
+
+
+if __name__ == "__main__":
+ unittest.main()
--- /dev/null
+import unittest
+
+from typing import List
+
+from pyac import dumps
+from pyac import loads
+from pyac import NotEnoughData
+from pyac import Raw
+
+
+class TestFloat(unittest.TestCase):
+ def test_throws_when_dumps_float(self) -> None:
+ with self.assertRaises(NotImplementedError) as cm:
+ dumps(1.5)
+ self.assertEqual(str(cm.exception), "no FLOAT* support")
+
+ def test_float_loads(self) -> None:
+ floats: List[bytes] = [
+ b"\x10" + b"\x11" * 2,
+ b"\x11" + b"\x11" * 4,
+ b"\x12" + b"\x11" * 8,
+ b"\x13" + b"\x11" * 16,
+ b"\x14" + b"\x11" * 32,
+ ]
+ expecteds: List[Raw] = [
+ Raw(0x10, b"\x11" * 2),
+ Raw(0x11, b"\x11" * 4),
+ Raw(0x12, b"\x11" * 8),
+ Raw(0x13, b"\x11" * 16),
+ Raw(0x14, b"\x11" * 32),
+ ]
+ for _float, expected in zip(floats, expecteds):
+ decoded, remaining = loads(_float)
+ self.assertEqual(decoded, expected)
+ self.assertEqual(remaining, b"")
+
+ def test_float_not_enough_data(self) -> None:
+ floats: List[bytes] = [
+ b"\x10" + b"\x11" * 1,
+ b"\x11" + b"\x11" * 3,
+ b"\x12" + b"\x11" * 7,
+ b"\x13" + b"\x11" * 15,
+ b"\x14" + b"\x11" * 31,
+ ]
+ for fl in floats:
+ with self.assertRaises(NotEnoughData):
+ loads(fl)
+
+
+if __name__ == "__main__":
+ unittest.main()
--- /dev/null
+import unittest
+
+from pyac import dumps
+from pyac import DecodeError
+from pyac import loads
+from pyac import NotEnoughData
+
+
+class TestInt(unittest.TestCase):
+ def test_int_positive(self) -> None:
+ ints: list[int] = [1, 123, 1 << 64, 1 << 130]
+ expected: list[bytes] = [
+ b"\x0C\x81\x01",
+ b"\x0C\x81\x7B",
+ b"\x0c\x89\x01\x00\x00\x00\x00\x00\x00\x00\x00",
+ b"\x0c\x91\x04\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00",
+ ]
+ for integer, encoded in zip(ints, expected):
+ self.assertEqual(dumps(integer), encoded)
+ decoded, remaining = loads(encoded)
+ self.assertEqual(decoded, integer)
+ self.assertEqual(remaining, b"")
+
+ def test_int_negative(self) -> None:
+ ints: list[int] = [-1, -123, -(1 << 64), -(1 << 130)]
+ expected: list[bytes] = [
+ b"\x0D\x80",
+ b"\x0D\x81\x7a",
+ b"\x0D\x88\xff\xff\xff\xff\xff\xff\xff\xff",
+ b"\x0D\x91\x03\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff",
+ ]
+ for integer, encoded in zip(ints, expected):
+ self.assertEqual(dumps(integer), encoded)
+ decoded, remaining = loads(encoded)
+ self.assertEqual(decoded, integer)
+ self.assertEqual(remaining, b"")
+
+ def test_int_zero(self) -> None:
+ encoded: bytes = dumps(0)
+ self.assertEqual(encoded, b"\x0C\x80")
+ decoded, remaining = loads(encoded)
+ self.assertEqual(decoded, 0)
+ self.assertEqual(remaining, b"")
+
+ def test_int_decode_not_enough_data(self) -> None:
+ encoded: bytes = b"\x0C\x81"
+ with self.assertRaises(NotEnoughData) as cm:
+ loads(encoded)
+ self.assertEqual(cm.exception.n, 2)
+
+ def test_throws_when_unminimal_int(self) -> None:
+ with self.assertRaises(DecodeError) as cm:
+ encoded: bytes = b"\x0C\x81\x00\x7B"
+ loads(encoded)
+ self.assertEqual(str(cm.exception), "non-minimal encoding")
+
+ def test_throws_when_non_bin_in_int(self) -> None:
+ with self.assertRaises(DecodeError) as cm:
+ encoded: bytes = b"\x0C\x01\x7B"
+ loads(encoded)
+ self.assertEqual(str(cm.exception), "non-BIN in INT")
--- /dev/null
+import unittest
+
+from hypothesis import given
+from hypothesis.strategies import binary
+from hypothesis.strategies import booleans
+from hypothesis.strategies import datetimes
+from hypothesis.strategies import deferred
+from hypothesis.strategies import dictionaries
+from hypothesis.strategies import integers
+from hypothesis.strategies import just
+from hypothesis.strategies import lists
+from hypothesis.strategies import none
+from hypothesis.strategies import one_of
+from hypothesis.strategies import text
+from hypothesis.strategies import tuples
+from hypothesis.strategies import uuids
+from hypothesis.strategies import characters
+from typing import List
+
+from pyac import Blob
+from pyac import dumps
+from pyac import loads
+from pyac import NotEnoughData
+
+
+blobs_st = tuples(integers(1, 20), binary(max_size=60)).map(lambda x: Blob(*x))
+
+text_st = text(
+ alphabet=characters(exclude_characters="\x00", exclude_categories=("Cs",)),
+ max_size=32,
+)
+
+any_st = one_of(
+ booleans(),
+ integers(),
+ just(0),
+ just(-1),
+ binary(max_size=32),
+ text_st,
+ none(),
+ uuids(),
+ datetimes(),
+)
+
+
+class TestList(unittest.TestCase):
+ def test_list_encode_empty(self) -> None:
+ encoded = dumps([])
+ self.assertEqual(encoded, b"\x08\x00")
+
+ @given(lists(any_st))
+ def test_list_encode_non_empty(self, test_list: List) -> None:
+ encoded = dumps(test_list)
+ self.assertEqual(
+ encoded, b"\x08" + b"".join(dumps(i) for i in test_list) + b"\x00"
+ )
+
+ def test_list_decode_empty(self) -> None:
+ encoded = b"\x08\x00"
+ decoded, remaining = loads(encoded)
+ self.assertEqual(decoded, [])
+ self.assertEqual(remaining, b"")
+
+ @given(lists(any_st), binary(max_size=20))
+ def test_list_decode_non_empty(self, test_list: List, junk: bytes) -> None:
+ encoded = b"\x08" + b"".join(dumps(i) for i in test_list) + b"\x00" + junk
+ decoded, remaining = loads(encoded)
+ self.assertEqual(decoded, test_list)
+ self.assertEqual(remaining, junk)
+
+ @given(lists(any_st))
+ def test_no_eoc(self, test_list: List) -> None:
+ encoded = dumps(test_list)[:-1]
+ with self.assertRaises(NotEnoughData) as cm:
+ loads(encoded)
+ self.assertEqual(cm.exception.n, 1)
+
+
+if __name__ == "__main__":
+ unittest.main()
--- /dev/null
+import unittest
+
+from hypothesis import given
+from hypothesis.strategies import binary
+from hypothesis.strategies import booleans
+from hypothesis.strategies import datetimes
+from hypothesis.strategies import dictionaries
+from hypothesis.strategies import integers
+from hypothesis.strategies import just
+from hypothesis.strategies import lists
+from hypothesis.strategies import none
+from hypothesis.strategies import one_of
+from hypothesis.strategies import text
+from hypothesis.strategies import tuples
+from hypothesis.strategies import uuids
+from hypothesis.strategies import characters
+
+from pyac import Blob
+from pyac import dumps
+from pyac import DecodeError
+from pyac import loads
+
+blobs_st = tuples(integers(1, 20), binary(max_size=60)).map(lambda x: Blob(*x))
+
+text_st = text(
+ alphabet=characters(exclude_characters="\x00", exclude_categories=("Cs",)),
+ max_size=32,
+)
+
+any_st = one_of(
+ booleans(),
+ integers(),
+ just(0),
+ just(-1),
+ binary(max_size=32),
+ text_st,
+ none(),
+ uuids(),
+ datetimes(),
+)
+
+mapkey_st = text(
+ alphabet=characters(exclude_characters="\x00", exclude_categories=("Cs",)),
+ min_size=1,
+ max_size=8,
+)
+
+
+class TestMap(unittest.TestCase):
+ @given(dictionaries(keys=mapkey_st, values=any_st, max_size=4))
+ def test_map_encode(self, test_map):
+ encoded = dumps(test_map)
+ expected = (
+ b"\x09" +
+ b"".join(
+ [
+ b"".join([dumps(key), dumps(test_map[key])])
+ for key in sorted(test_map.keys(), key=lambda x: [len(x), x])
+ ]
+ ) +
+ b"\x00"
+ )
+ self.assertEqual(encoded, expected)
+
+ @given(dictionaries(keys=mapkey_st, values=any_st, max_size=4), binary(max_size=20))
+ def test_map_decode(self, test_map, junk):
+ encoded = (
+ b"\x09" +
+ b"".join(
+ [
+ b"".join([dumps(key), dumps(test_map[key])])
+ for key in sorted(test_map.keys(), key=lambda x: [len(x), x])
+ ]
+ ) +
+ b"\x00" +
+ junk
+ )
+ decoded, remaining = loads(encoded)
+ self.assertEqual(decoded, test_map)
+ self.assertEqual(remaining, junk)
+
+ @given(binary(max_size=20))
+ def test_map_empty(self, junk):
+ test_map = {}
+ encoded = dumps(test_map) + junk
+ expected = b"\x09\x00" + junk
+ self.assertEqual(encoded, expected)
+ decoded, remaining = loads(encoded)
+ self.assertEqual(decoded, test_map)
+ self.assertEqual(remaining, junk)
+
+ @given(lists(mapkey_st, max_size=4), binary(max_size=20))
+ def test_decode_to_set(self, keys, junk):
+ test_map = {key: None for key in keys}
+ encoded = dumps(test_map) + junk
+ decoded, remaining = loads(encoded, sets=True)
+ self.assertEqual(decoded, set(keys))
+ self.assertEqual(remaining, junk)
+
+ def test_map_throws_when_decoding_unsorted_keys(self):
+ encoded = b"\x09\xc4key2\x0c\x81\x01\xc4key1\xc6value1\x00"
+ with self.assertRaises(DecodeError) as cm:
+ loads(encoded)
+ self.assertEqual(str(cm.exception), "unsorted keys")
+
+ def test_map_throws_when_encoding_non_string_key(self):
+ with self.assertRaises(ValueError) as cm:
+ dumps({1: "a"})
+ self.assertEqual(str(cm.exception), "map keys can contain only strings")
+
+ def test_map_throws_when_decoding_non_string_key(self):
+ encoded = b"\x09\x0c\x80\xc6value2\x00"
+ with self.assertRaises(DecodeError) as cm:
+ loads(encoded)
+ self.assertEqual(str(cm.exception), "non-string key")
+
+ def test_map_throws_when_unexpected_eoc(self):
+ encoded = b"\x09\xc4key1\x00\x00"
+ with self.assertRaises(DecodeError) as cm:
+ decoded, remaining = loads(encoded)
+ self.assertEqual(str(cm.exception), "unexpected EOC")
+
+ def test_map_throws_when_encoding_empty_str_as_key(self):
+ with self.assertRaises(ValueError) as cm:
+ dumps({"": "a"})
+ self.assertEqual(str(cm.exception), "map keys can not be empty")
+
+ def test_map_throws_when_decoding_empty_str_as_key(self):
+ encoded = b"\x09\xc0\x0c\x81\x01\xc4key1\xc6value1\x00"
+ with self.assertRaises(DecodeError) as cm:
+ decoded, remaining = loads(encoded)
+ self.assertEqual(str(cm.exception), "empty key")
--- /dev/null
+# import unittest
+# from pyac import LenFirstSort, _byte
+
+
+# class TestMisc(unittest.TestCase):
+# def test_len_first_sort(self):
+# v = "some_string"
+# self.assertEqual(LenFirstSort(v), (len(v), v))
+
+# def test_
+# byte(self):
+# v = 0x01
+# self.assertEqual(_byte(v), v.to_bytes(1, "big"))
+
+
+# if __name__ == "__main__":
+# unittest.main()
--- /dev/null
+# import unittest
+
+# from hypothesis.strategies import integers
+# from hypothesis.strategies import binary
+
+# from pyac import dumps
+# from pyac import loads
+# from pyac import Raw
+
+
+# class TestRaw(unittest.TestCase):
+# def test_raw_encode(self):
+# raw = Raw(2, b"data")
+# self.assertEqual(dumps(raw), b"\x02data")
+
+# def test_raw_decode(self):
+# encoded = b"\x02data"
+# decoded, remaining = loads(encoded)
+# self.assertEqual(decoded, Raw(1, data))
+# self.assertEqual(remaining, b"")
+
+
+# if __name__ == "__main__":
+# unittest.main()
--- /dev/null
+import unittest
+
+from hypothesis import given
+from hypothesis.strategies import integers
+from hypothesis.strategies import text
+
+from pyac import dumps
+from pyac import DecodeError
+from pyac import loads
+from pyac import NotEnoughData
+
+
+TagStr: int = 0x80
+TagUTF8: int = 0x40
+
+invalid_utf8_2byte_s = integers(min_value=0, max_value=(1 << 5) - 1).map(
+ lambda v: bytes(bytearray([1 << 7 | 1 << 6 | v]))
+)
+
+invalid_utf8_3byte_s = integers(min_value=0, max_value=(1 << 10) - 1).map(
+ lambda v: bytes(bytearray([
+ (1 << 7 | 1 << 6 | 1 << 5 | (v >> 6)),
+ (1 << 7 | ((1 << 6) - 1) & v),
+ ]))
+)
+
+
+class TestString(unittest.TestCase):
+ @given(text(max_size=60))
+ def test_encode_utf8(self, test_str: str) -> None:
+ if(len(test_str.encode("utf-8")) > 60):
+ return
+ encoded = dumps(test_str)
+ tag = (TagStr | TagUTF8 | len(test_str.encode("utf-8"))).to_bytes(1, "big")
+ self.assertEqual(encoded, tag + test_str.encode("utf-8"))
+
+ def test_long_utf8(self) -> None:
+ long_strings = ["a" * 62, "a" * 318, "a" * 65853]
+ for long_string in long_strings:
+ encoded = dumps(long_string)
+ decoded, remaining = loads(encoded)
+ self.assertEqual(decoded, long_string)
+ self.assertEqual(remaining, b"")
+
+ @given(text(max_size=60))
+ def test_encode_non_utf8(self) -> None:
+ bs = b"\x00\x01\x02"
+ encoded = dumps(bs)
+ self.assertEqual(encoded, b"\x83\x00\x01\x02")
+ decoded, remaining = loads(encoded)
+ self.assertEqual(decoded, b"\x00\x01\x02")
+ self.assertEqual(remaining, b"")
+
+ def test_long_non_utf8(self) -> None:
+ long_bss = [b"\x01" * 62, b"\x01" * 318, b"\x01" * 65853]
+ for long_bs in long_bss:
+ encoded = dumps(long_bs)
+ decoded, remaining = loads(encoded)
+ self.assertEqual(decoded, long_bs)
+ self.assertEqual(remaining, b"")
+
+ def test_throws_when_not_enough_data(self) -> None:
+ encoded = b"\x85he"
+ with self.assertRaises(NotEnoughData) as cm:
+ loads(encoded)
+ self.assertEqual(cm.exception.n, 6)
+
+ def test_throws_when_not_enough_data_for_length(self) -> None:
+ long_string = "a" * 318
+ encoded = dumps(long_string)[:2]
+ with self.assertRaises(NotEnoughData) as cm:
+ loads(encoded)
+ self.assertEqual(cm.exception.n, 3)
+
+ def test_throws_when_null_byte_in_utf(self) -> None:
+ with self.assertRaises(DecodeError) as cm:
+ result, remaining = loads(b"\xc5he\x00\x01\x02")
+ self.assertEqual(str(cm.exception), "null byte in UTF-8")
+
+ @given(invalid_utf8_2byte_s)
+ def test_throws_when_invalid_utf_2bytes(self, invalid_utf: bytes) -> None:
+ with self.assertRaises(DecodeError) as cm:
+ encoded = dumps("hello")[:-1] + invalid_utf
+ result, remaining = loads(encoded)
+ self.assertEqual(str(cm.exception), "invalid UTF-8")
+
+ @given(invalid_utf8_3byte_s)
+ def test_throws_when_invalid_utf_3bytes(self, invalid_utf: bytes) -> None:
+ with self.assertRaises(DecodeError) as cm:
+ encoded = dumps("hello")[:-2] + invalid_utf
+ result, remaining = loads(encoded)
+ self.assertEqual(str(cm.exception), "invalid UTF-8")
--- /dev/null
+import unittest
+
+from typing import Any
+from hypothesis import given
+from hypothesis.strategies import binary
+from hypothesis.strategies import booleans
+from hypothesis.strategies import datetimes
+from hypothesis.strategies import deferred
+from hypothesis.strategies import dictionaries
+from hypothesis.strategies import integers
+from hypothesis.strategies import just
+from hypothesis.strategies import lists
+from hypothesis.strategies import none
+from hypothesis.strategies import one_of
+from hypothesis.strategies import text
+from hypothesis.strategies import tuples
+from hypothesis.strategies import uuids
+from hypothesis.strategies import characters
+
+from pyac import dumps
+from pyac import loads
+from pyac import Blob
+
+blobs_st = tuples(integers(1, 20), binary(max_size=60)).map(lambda x: Blob(*x))
+
+text_st = text(
+ alphabet=characters(exclude_characters="\x00", exclude_categories=("Cs",)),
+ max_size=32,
+)
+
+mapkey_st = text(
+ alphabet=characters(exclude_characters="\x00", exclude_categories=("Cs",)),
+ min_size=1,
+ max_size=8,
+)
+
+any_st = one_of(
+ booleans(),
+ integers(),
+ just(0),
+ just(-1),
+ binary(max_size=32),
+ text_st,
+ none(),
+ uuids(),
+ datetimes(),
+)
+
+everything_st = deferred(
+ lambda: any_st |
+ lists(everything_st, max_size=4) |
+ dictionaries(
+ mapkey_st,
+ everything_st,
+ max_size=4,
+ )
+)
+
+
+class TestSymmetric(unittest.TestCase):
+ @given(everything_st, binary(max_size=20))
+ def test_symmetric(self, obj: Any, junk: bytes) -> None:
+ encoded: bytes = dumps(obj) + junk
+ decoded: Any
+ remaining: bytes
+ decoded, remaining = loads(encoded)
+ self.assertSequenceEqual(remaining, junk)
+ self.assertEqual(decoded, obj)
--- /dev/null
+import unittest
+
+from datetime import datetime
+from datetime import timedelta
+from hypothesis import given
+from hypothesis.strategies import binary
+from uuid import UUID
+
+from pyac import dumps
+from pyac import DecodeError
+from pyac import Leapsecs
+from pyac import loads
+from pyac import NotEnoughData
+from pyac import Raw
+from pyac import TAI64Base
+
+TagTAI64 = 0x18
+TagTAI64N = 0x19
+TagTAI64NA = 0x1A
+
+DJB_Leapsecs = (
+ "4000000004b2580a",
+ "4000000005a4ec0b",
+ "4000000007861f8c",
+ "400000000967530d",
+ "400000000b48868e",
+ "400000000d2b0b8f",
+ "400000000f0c3f10",
+ "4000000010ed7291",
+ "4000000012cea612",
+ "40000000159fca93",
+ "400000001780fe14",
+ "4000000019623195",
+ "400000001d25ea16",
+ "4000000021dae517",
+ "40000000259e9d98",
+ "40000000277fd119",
+ "400000002a50f59a",
+ "400000002c32291b",
+ "400000002e135c9c",
+ "4000000030e7241d",
+ "4000000033b8489e",
+ "40000000368c101f",
+ "4000000043b71ba0",
+ "40000000495c07a1",
+ "400000004fef9322",
+ "4000000055932da3",
+ "40000000586846a4",
+)
+
+
+class TestTAI64(unittest.TestCase):
+ @given(binary(max_size=20))
+ def test_encode_decode_tai64(self, junk: bytes) -> None:
+ dt = datetime(2023, 10, 1, 12, 0, 0)
+ encoded = dumps(dt)
+ self.assertEqual(encoded, b"\x18\x40\x00\x00\x00\x65\x19\x5f\x65")
+ decoded, remaining = loads(encoded + junk)
+ self.assertEqual(dt, decoded)
+ self.assertEqual(remaining, junk)
+
+ @given(binary(max_size=20))
+ def test_encode_decode_tai64n(self, junk: bytes) -> None:
+ dt = datetime(2023, 10, 1, 12, 0, 0, 123456)
+ encoded = dumps(dt) + junk
+ self.assertEqual(
+ encoded, b"\x19\x40\x00\x00\x00\x65\x19\x5f\x65\x07\x5b\xca\x00" + junk
+ )
+ decoded, remaining = loads(encoded)
+ self.assertEqual(dt, decoded)
+ self.assertEqual(remaining, junk)
+
+ @given(binary(max_size=20))
+ def test_decode_tai64na(self, junk: bytes) -> None:
+ encoded = (
+ b"\x1A\x40\x00\x00\x00\x65\x19\x5f\x65\x07\x5b\xca\x00\x07\x5b\xca\x00" +
+ junk
+ )
+ expected = Raw(
+ 26, b"\x40\x00\x00\x00\x65\x19\x5f\x65\x07\x5b\xca\x00\x07\x5b\xca\x00"
+ )
+ decoded, remaining = loads(encoded)
+ self.assertEqual(decoded, expected)
+ self.assertEqual(remaining, junk)
+
+ def test_throws_when_not_enough_data_for_tai64(self) -> None:
+ with self.assertRaises(NotEnoughData):
+ loads(b"\x18" + b"\x00" * 7)
+
+ def test_throws_when_not_enough_data_for_tai64n(self) -> None:
+ with self.assertRaises(NotEnoughData):
+ loads(b"\x19" + b"\x00" * 11)
+
+ def test_throws_when_not_enough_data_for_tai64na(self) -> None:
+ with self.assertRaises(NotEnoughData):
+ loads(b"\x1A" + b"\x00" * 15)
+
+ def test_large_number_of_secs(self) -> None:
+ decoded, remaining = loads(b"\x18\x70\x00\x00\x00\x65\x19\x5f\x65")
+ self.assertEqual(decoded, Raw(t=24, v=b"p\x00\x00\x00e\x19_e"))
+ self.assertEqual(remaining, b"")
+
+ def test_nanoseconds_not_convertible_to_microseconds(self) -> None:
+ decoded, remaining = loads(
+ b"\x19\x40\x00\x00\x00\x65\x19\x5f\x65\x07\x5b\xca\x01"
+ )
+ self.assertEqual(decoded, Raw(t=25, v=b"@\x00\x00\x00e\x19_e\x07[\xca\x01"))
+ self.assertEqual(remaining, b"")
+
+ def test_throws_when_first_bit_is_in_use(self) -> None:
+ with self.assertRaises(DecodeError) as cm:
+ loads(b"\x18\x80\x00\x00\x00\x65\x19\x5f\x65")
+ self.assertEqual(str(cm.exception), "reserved TAI64 value is in use")
+
+ def test_throws_when_too_many_nanosecs(self) -> None:
+ with self.assertRaises(DecodeError) as cm:
+ loads(b"\x19\x40\x00\x00\x00\x65\x19\x5f\x65" + (999999999 + 1).to_bytes(4, "big"))
+ self.assertEqual(str(cm.exception), "too many nanoseconds")
+
+ def test_throws_when_too_many_attosecs(self) -> None:
+ with self.assertRaises(DecodeError) as cm:
+ loads(
+ b"\x1A\x40\x00\x00\x00\x65\x19\x5f\x65\x07\x5b\xca\x00\xA7\x5b\xca\x00"
+ )
+ self.assertEqual(str(cm.exception), "too many attoseconds")
+
+ def test_leapseconds_match_DJB(self) -> None:
+ """Check that our pyac.Leapsecs is equally calculated to DJB's
+ ``curl http://cr.yp.to/libtai/leapsecs.dat | xxd -c 8 -p``
+ """
+ for our, their in zip(
+ Leapsecs,
+ DJB_Leapsecs
+ ):
+ self.assertEqual(
+ our,
+ int.from_bytes(bytes.fromhex(their), "big") - TAI64Base,
+ )
+
+ def test_decode_leapsecond(self) -> None:
+ for leapsec in DJB_Leapsecs:
+ decoded, remaining = loads(v=b'\x18' + bytes.fromhex(leapsec), leapsecUTCAllow=True)
+ self.assertIsInstance(decoded, datetime)
+ decoded, remaining = loads(v=b'\x18' + bytes.fromhex(leapsec), leapsecUTCAllow=False)
+ self.assertIsInstance(decoded, Raw)
+
+
+if __name__ == "__main__":
+ unittest.main()
--- /dev/null
+import unittest
+
+from uuid import UUID
+
+from pyac import dumps
+from pyac import loads
+from pyac import NotEnoughData
+
+
+class TestUUID(unittest.TestCase):
+ def test_uuid_encode(self) -> None:
+ uuid_str: str = "12345678-1234-5678-1234-567812345678"
+ uuid_obj: UUID = UUID(uuid_str)
+ encoded: bytes = dumps(uuid_obj)
+ self.assertEqual(encoded, b"\x04\x124Vx\x124Vx\x124Vx\x124Vx")
+
+ def test_uuid_decode(self) -> None:
+ uuid_str: str = "12345678-1234-5678-1234-567812345678"
+ encoded: bytes = b"\x04\x124Vx\x124Vx\x124Vx\x124Vx"
+ decoded: UUID
+ decoded, _ = loads(encoded)
+ self.assertEqual(decoded, UUID(uuid_str))
+
+ def test_uuid_not_enough_data(self) -> None:
+ encoded: bytes = b"\x04\x124Vx\x124Vx\x124Vx\x124V"
+ with self.assertRaises(NotEnoughData):
+ loads(encoded)
+
+
+if __name__ == "__main__":
+ unittest.main()
+++ /dev/null
-from unittest import TestCase
-
-from pyac import Leapsecs
-from pyac import TAI64Base
-
-
-class TestLeapsecs(TestCase):
- """Check that our pyac.Leapsecs is equally calculated to DJB's
- ``curl http://cr.yp.to/libtai/leapsecs.dat | xxd -c 8 -p``
- """
-
- def runTest(self):
- for our, their in zip(Leapsecs, (
- "4000000004b2580a",
- "4000000005a4ec0b",
- "4000000007861f8c",
- "400000000967530d",
- "400000000b48868e",
- "400000000d2b0b8f",
- "400000000f0c3f10",
- "4000000010ed7291",
- "4000000012cea612",
- "40000000159fca93",
- "400000001780fe14",
- "4000000019623195",
- "400000001d25ea16",
- "4000000021dae517",
- "40000000259e9d98",
- "40000000277fd119",
- "400000002a50f59a",
- "400000002c32291b",
- "400000002e135c9c",
- "4000000030e7241d",
- "4000000033b8489e",
- "40000000368c101f",
- "4000000043b71ba0",
- "40000000495c07a1",
- "400000004fef9322",
- "4000000055932da3",
- "40000000586846a4",
- )):
- self.assertEqual(
- our,
- int.from_bytes(bytes.fromhex(their), "big") - TAI64Base,
- )