From: RudenkoAD Date: Fri, 15 Nov 2024 18:46:53 +0000 (+0300) Subject: Add tests X-Git-Url: http://www.git.cypherpunks.su/?a=commitdiff_plain;h=e2f4892f8602da3d2cdde0b55178f7511a48b148784054dc633fcb489c6366f5;p=keks.git Add tests --- diff --git a/pyac/.gitignore b/pyac/.gitignore new file mode 100644 index 0000000..5cdbb1e --- /dev/null +++ b/pyac/.gitignore @@ -0,0 +1,5 @@ +__pycache__ +*.pyc +.vscode +.coverage +.hypothesis \ No newline at end of file diff --git a/pyac/pyac.py b/pyac/pyac.py index d3708ef..5c670cd 100755 --- a/pyac/pyac.py +++ b/pyac/pyac.py @@ -217,8 +217,11 @@ def dumps(v): return dumps({i: None for i in v}) if isinstance(v, dict): raws = [_byte(TagMap)] + if not all(isinstance(k, str) for k in v.keys()): + raise ValueError("map keys can contain only strings") for k in sorted(v.keys(), key=LenFirstSort): - assert isinstance(k, str) + if len(k) == 0: + raise ValueError("map keys can not be empty") raws.append(dumps(k)) raws.append(dumps(v[k])) raws.append(_byte(TagEOC)) @@ -370,6 +373,8 @@ def loads(v, sets=False, leapsecUTCAllow=False): break if not isinstance(k, str): raise DecodeError("non-string key") + if len(k) == 0: + raise DecodeError("empty key") if (len(k) < len(kPrev)) or ((len(k) == len(kPrev)) and (k <= kPrev)): raise DecodeError("unsorted keys") i, v = loads(v, sets=sets) diff --git a/pyac/pyac_tests/__init__.py b/pyac/pyac_tests/__init__.py new file mode 100644 index 0000000..473a0f4 diff --git a/pyac/pyac_tests/test_blob.py b/pyac/pyac_tests/test_blob.py new file mode 100644 index 0000000..ffef43a --- /dev/null +++ b/pyac/pyac_tests/test_blob.py @@ -0,0 +1,55 @@ +import unittest + +from hypothesis import given +from hypothesis.strategies import binary + +from pyac import Blob +from pyac import dumps +from pyac import DecodeError +from pyac import loads +from pyac import NotEnoughData + + +class TestBlob(unittest.TestCase): + def test_blob_encode(self) -> None: + blob = Blob(4, b"testdata") + encoded = dumps(blob) + self.assertEqual( + encoded, b"\x0b\x00\x00\x00\x00\x00\x00\x00\x03\x01test\x01data\x80" + ) + + @given(binary(max_size=20)) + def test_blob_decode(self, junk: bytes) -> None: + encoded = b"\x0B\x00\x00\x00\x00\x00\x00\x00\x03\x01test\x01data\x80" + junk + decoded, remaining = loads(encoded) + self.assertEqual(decoded.l, 4) + self.assertEqual(decoded.v, b"testdata") + self.assertEqual(remaining, junk) + + def test_throws_when_not_enough_data(self) -> None: + encoded = b"\x0B\x00\x00\x00\x00\x00\x00\x00\x03\x01test\x01da" + with self.assertRaises(NotEnoughData) as cm: + loads(encoded) + self.assertEqual(cm.exception.n, 4) + + def test_throws_when_not_enough_data_for_length(self) -> None: + encoded = b"\x0B\x00\x00\x00\x00" + with self.assertRaises(NotEnoughData) as cm: + loads(encoded) + self.assertEqual(cm.exception.n, 9) + + def test_throws_when_wrong_terminator_length(self) -> None: + encoded = b"\x0B\x00\x00\x00\x00\x00\x00\x00\x03\x01test\x01data\x8Aterminator" + with self.assertRaises(DecodeError) as cm: + loads(encoded) + self.assertEqual(str(cm.exception), "wrong terminator len") + + def test_throws_when_wrong_terminator_tag(self) -> None: + encoded = b"\x0B\x00\x00\x00\x00\x00\x00\x00\x03\x01test\x01data\x04that was a wrong tag" + with self.assertRaises(DecodeError) as cm: + loads(encoded) + self.assertEqual(str(cm.exception), "unexpected tag") + + +if __name__ == "__main__": + unittest.main() diff --git a/pyac/pyac_tests/test_bool.py b/pyac/pyac_tests/test_bool.py new file mode 100644 index 0000000..586deb7 --- /dev/null +++ b/pyac/pyac_tests/test_bool.py @@ -0,0 +1,37 @@ +import unittest + +from hypothesis import given +from hypothesis.strategies import binary + +from pyac import dumps +from pyac import loads + + +class TestBool(unittest.TestCase): + def test_bool_encode_true( + self, + ): + encoded = dumps(True) + self.assertEqual(encoded, b"\x03") + + def test_bool_encode_false(self): + encoded = dumps(False) + self.assertEqual(encoded, b"\x02") + + @given(binary(max_size=20)) + def test_bool_decode_true(self, junk): + encoded = b"\x03" + junk + decoded, remaining = loads(encoded) + self.assertIs(decoded, True) + self.assertEqual(remaining, junk) + + @given(binary(max_size=20)) + def test_bool_decode_false(self, junk): + encoded = b"\x02" + junk + decoded, remaining = loads(encoded) + self.assertIs(decoded, False) + self.assertEqual(remaining, junk) + + +if __name__ == "__main__": + unittest.main() diff --git a/pyac/pyac_tests/test_errors.py b/pyac/pyac_tests/test_errors.py new file mode 100644 index 0000000..b816910 --- /dev/null +++ b/pyac/pyac_tests/test_errors.py @@ -0,0 +1,47 @@ +import unittest + +from hypothesis import given +from hypothesis.strategies import integers +from typing import Any + +from pyac import dumps +from pyac import DecodeError +from pyac import loads +from pyac import NotEnoughData + + +class TestError(unittest.TestCase): + @given(integers(min_value=1, max_value=100)) + def test_not_enough_data_str(self, integer: int) -> None: + self.assertEqual(str(NotEnoughData(integer)), "{} bytes expected".format(integer)) + + @given(integers(min_value=1, max_value=100)) + def test_not_enough_data_repr(self, integer: int) -> None: + self.assertEqual(NotEnoughData(integer).__repr__(), "NotEnoughData({} bytes expected)".format(integer)) + + @given(integers(min_value=1, max_value=100)) + def test_not_enough_data_init(self, integer: int) -> None: + self.assertEqual(NotEnoughData(integer).n, integer) + + def test_throws_when_unknown_class(self) -> None: + with self.assertRaises(NotImplementedError) as cm: + class a: + pass + a_instance: Any = a() + dumps(a_instance) + self.assertEqual(str(cm.exception), "unsupported type") + + def test_throws_when_unknown_tag(self) -> None: + with self.assertRaises(DecodeError) as cm: + tag: bytes = b"\x05" + loads(tag) + self.assertEqual(str(cm.exception), "unknown tag") + + def test_throws_when_empty(self) -> None: + with self.assertRaises(NotEnoughData): + encoded: bytes = b"" + loads(encoded) + + +if __name__ == "__main__": + unittest.main() diff --git a/pyac/pyac_tests/test_float.py b/pyac/pyac_tests/test_float.py new file mode 100644 index 0000000..cccf460 --- /dev/null +++ b/pyac/pyac_tests/test_float.py @@ -0,0 +1,51 @@ +import unittest + +from typing import List + +from pyac import dumps +from pyac import loads +from pyac import NotEnoughData +from pyac import Raw + + +class TestFloat(unittest.TestCase): + def test_throws_when_dumps_float(self) -> None: + with self.assertRaises(NotImplementedError) as cm: + dumps(1.5) + self.assertEqual(str(cm.exception), "no FLOAT* support") + + def test_float_loads(self) -> None: + floats: List[bytes] = [ + b"\x10" + b"\x11" * 2, + b"\x11" + b"\x11" * 4, + b"\x12" + b"\x11" * 8, + b"\x13" + b"\x11" * 16, + b"\x14" + b"\x11" * 32, + ] + expecteds: List[Raw] = [ + Raw(0x10, b"\x11" * 2), + Raw(0x11, b"\x11" * 4), + Raw(0x12, b"\x11" * 8), + Raw(0x13, b"\x11" * 16), + Raw(0x14, b"\x11" * 32), + ] + for _float, expected in zip(floats, expecteds): + decoded, remaining = loads(_float) + self.assertEqual(decoded, expected) + self.assertEqual(remaining, b"") + + def test_float_not_enough_data(self) -> None: + floats: List[bytes] = [ + b"\x10" + b"\x11" * 1, + b"\x11" + b"\x11" * 3, + b"\x12" + b"\x11" * 7, + b"\x13" + b"\x11" * 15, + b"\x14" + b"\x11" * 31, + ] + for fl in floats: + with self.assertRaises(NotEnoughData): + loads(fl) + + +if __name__ == "__main__": + unittest.main() diff --git a/pyac/pyac_tests/test_int.py b/pyac/pyac_tests/test_int.py new file mode 100644 index 0000000..768a6c8 --- /dev/null +++ b/pyac/pyac_tests/test_int.py @@ -0,0 +1,61 @@ +import unittest + +from pyac import dumps +from pyac import DecodeError +from pyac import loads +from pyac import NotEnoughData + + +class TestInt(unittest.TestCase): + def test_int_positive(self) -> None: + ints: list[int] = [1, 123, 1 << 64, 1 << 130] + expected: list[bytes] = [ + b"\x0C\x81\x01", + b"\x0C\x81\x7B", + b"\x0c\x89\x01\x00\x00\x00\x00\x00\x00\x00\x00", + b"\x0c\x91\x04\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00", + ] + for integer, encoded in zip(ints, expected): + self.assertEqual(dumps(integer), encoded) + decoded, remaining = loads(encoded) + self.assertEqual(decoded, integer) + self.assertEqual(remaining, b"") + + def test_int_negative(self) -> None: + ints: list[int] = [-1, -123, -(1 << 64), -(1 << 130)] + expected: list[bytes] = [ + b"\x0D\x80", + b"\x0D\x81\x7a", + b"\x0D\x88\xff\xff\xff\xff\xff\xff\xff\xff", + b"\x0D\x91\x03\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff", + ] + for integer, encoded in zip(ints, expected): + self.assertEqual(dumps(integer), encoded) + decoded, remaining = loads(encoded) + self.assertEqual(decoded, integer) + self.assertEqual(remaining, b"") + + def test_int_zero(self) -> None: + encoded: bytes = dumps(0) + self.assertEqual(encoded, b"\x0C\x80") + decoded, remaining = loads(encoded) + self.assertEqual(decoded, 0) + self.assertEqual(remaining, b"") + + def test_int_decode_not_enough_data(self) -> None: + encoded: bytes = b"\x0C\x81" + with self.assertRaises(NotEnoughData) as cm: + loads(encoded) + self.assertEqual(cm.exception.n, 2) + + def test_throws_when_unminimal_int(self) -> None: + with self.assertRaises(DecodeError) as cm: + encoded: bytes = b"\x0C\x81\x00\x7B" + loads(encoded) + self.assertEqual(str(cm.exception), "non-minimal encoding") + + def test_throws_when_non_bin_in_int(self) -> None: + with self.assertRaises(DecodeError) as cm: + encoded: bytes = b"\x0C\x01\x7B" + loads(encoded) + self.assertEqual(str(cm.exception), "non-BIN in INT") diff --git a/pyac/pyac_tests/test_list.py b/pyac/pyac_tests/test_list.py new file mode 100644 index 0000000..6fffd04 --- /dev/null +++ b/pyac/pyac_tests/test_list.py @@ -0,0 +1,80 @@ +import unittest + +from hypothesis import given +from hypothesis.strategies import binary +from hypothesis.strategies import booleans +from hypothesis.strategies import datetimes +from hypothesis.strategies import deferred +from hypothesis.strategies import dictionaries +from hypothesis.strategies import integers +from hypothesis.strategies import just +from hypothesis.strategies import lists +from hypothesis.strategies import none +from hypothesis.strategies import one_of +from hypothesis.strategies import text +from hypothesis.strategies import tuples +from hypothesis.strategies import uuids +from hypothesis.strategies import characters +from typing import List + +from pyac import Blob +from pyac import dumps +from pyac import loads +from pyac import NotEnoughData + + +blobs_st = tuples(integers(1, 20), binary(max_size=60)).map(lambda x: Blob(*x)) + +text_st = text( + alphabet=characters(exclude_characters="\x00", exclude_categories=("Cs",)), + max_size=32, +) + +any_st = one_of( + booleans(), + integers(), + just(0), + just(-1), + binary(max_size=32), + text_st, + none(), + uuids(), + datetimes(), +) + + +class TestList(unittest.TestCase): + def test_list_encode_empty(self) -> None: + encoded = dumps([]) + self.assertEqual(encoded, b"\x08\x00") + + @given(lists(any_st)) + def test_list_encode_non_empty(self, test_list: List) -> None: + encoded = dumps(test_list) + self.assertEqual( + encoded, b"\x08" + b"".join(dumps(i) for i in test_list) + b"\x00" + ) + + def test_list_decode_empty(self) -> None: + encoded = b"\x08\x00" + decoded, remaining = loads(encoded) + self.assertEqual(decoded, []) + self.assertEqual(remaining, b"") + + @given(lists(any_st), binary(max_size=20)) + def test_list_decode_non_empty(self, test_list: List, junk: bytes) -> None: + encoded = b"\x08" + b"".join(dumps(i) for i in test_list) + b"\x00" + junk + decoded, remaining = loads(encoded) + self.assertEqual(decoded, test_list) + self.assertEqual(remaining, junk) + + @given(lists(any_st)) + def test_no_eoc(self, test_list: List) -> None: + encoded = dumps(test_list)[:-1] + with self.assertRaises(NotEnoughData) as cm: + loads(encoded) + self.assertEqual(cm.exception.n, 1) + + +if __name__ == "__main__": + unittest.main() diff --git a/pyac/pyac_tests/test_map.py b/pyac/pyac_tests/test_map.py new file mode 100644 index 0000000..afaac01 --- /dev/null +++ b/pyac/pyac_tests/test_map.py @@ -0,0 +1,132 @@ +import unittest + +from hypothesis import given +from hypothesis.strategies import binary +from hypothesis.strategies import booleans +from hypothesis.strategies import datetimes +from hypothesis.strategies import dictionaries +from hypothesis.strategies import integers +from hypothesis.strategies import just +from hypothesis.strategies import lists +from hypothesis.strategies import none +from hypothesis.strategies import one_of +from hypothesis.strategies import text +from hypothesis.strategies import tuples +from hypothesis.strategies import uuids +from hypothesis.strategies import characters + +from pyac import Blob +from pyac import dumps +from pyac import DecodeError +from pyac import loads + +blobs_st = tuples(integers(1, 20), binary(max_size=60)).map(lambda x: Blob(*x)) + +text_st = text( + alphabet=characters(exclude_characters="\x00", exclude_categories=("Cs",)), + max_size=32, +) + +any_st = one_of( + booleans(), + integers(), + just(0), + just(-1), + binary(max_size=32), + text_st, + none(), + uuids(), + datetimes(), +) + +mapkey_st = text( + alphabet=characters(exclude_characters="\x00", exclude_categories=("Cs",)), + min_size=1, + max_size=8, +) + + +class TestMap(unittest.TestCase): + @given(dictionaries(keys=mapkey_st, values=any_st, max_size=4)) + def test_map_encode(self, test_map): + encoded = dumps(test_map) + expected = ( + b"\x09" + + b"".join( + [ + b"".join([dumps(key), dumps(test_map[key])]) + for key in sorted(test_map.keys(), key=lambda x: [len(x), x]) + ] + ) + + b"\x00" + ) + self.assertEqual(encoded, expected) + + @given(dictionaries(keys=mapkey_st, values=any_st, max_size=4), binary(max_size=20)) + def test_map_decode(self, test_map, junk): + encoded = ( + b"\x09" + + b"".join( + [ + b"".join([dumps(key), dumps(test_map[key])]) + for key in sorted(test_map.keys(), key=lambda x: [len(x), x]) + ] + ) + + b"\x00" + + junk + ) + decoded, remaining = loads(encoded) + self.assertEqual(decoded, test_map) + self.assertEqual(remaining, junk) + + @given(binary(max_size=20)) + def test_map_empty(self, junk): + test_map = {} + encoded = dumps(test_map) + junk + expected = b"\x09\x00" + junk + self.assertEqual(encoded, expected) + decoded, remaining = loads(encoded) + self.assertEqual(decoded, test_map) + self.assertEqual(remaining, junk) + + @given(lists(mapkey_st, max_size=4), binary(max_size=20)) + def test_decode_to_set(self, keys, junk): + test_map = {key: None for key in keys} + encoded = dumps(test_map) + junk + decoded, remaining = loads(encoded, sets=True) + self.assertEqual(decoded, set(keys)) + self.assertEqual(remaining, junk) + + def test_map_throws_when_decoding_unsorted_keys(self): + encoded = b"\x09\xc4key2\x0c\x81\x01\xc4key1\xc6value1\x00" + with self.assertRaises(DecodeError) as cm: + loads(encoded) + self.assertEqual(str(cm.exception), "unsorted keys") + + def test_map_throws_when_encoding_non_string_key(self): + with self.assertRaises(ValueError) as cm: + dumps({1: "a"}) + self.assertEqual(str(cm.exception), "map keys can contain only strings") + + def test_map_throws_when_decoding_non_string_key(self): + encoded = b"\x09\x0c\x80\xc6value2\x00" + with self.assertRaises(DecodeError) as cm: + loads(encoded) + self.assertEqual(str(cm.exception), "non-string key") + + def test_map_throws_when_unexpected_eoc(self): + encoded = b"\x09\xc4key1\x00\x00" + with self.assertRaises(DecodeError) as cm: + decoded, remaining = loads(encoded) + self.assertEqual(str(cm.exception), "unexpected EOC") + + def test_map_throws_when_encoding_empty_str_as_key(self): + with self.assertRaises(ValueError) as cm: + dumps({"": "a"}) + self.assertEqual(str(cm.exception), "map keys can not be empty") + + def test_map_throws_when_decoding_empty_str_as_key(self): + encoded = b"\x09\xc0\x0c\x81\x01\xc4key1\xc6value1\x00" + with self.assertRaises(DecodeError) as cm: + decoded, remaining = loads(encoded) + self.assertEqual(str(cm.exception), "empty key") diff --git a/pyac/pyac_tests/test_misc.py b/pyac/pyac_tests/test_misc.py new file mode 100644 index 0000000..3c34136 --- /dev/null +++ b/pyac/pyac_tests/test_misc.py @@ -0,0 +1,17 @@ +# import unittest +# from pyac import LenFirstSort, _byte + + +# class TestMisc(unittest.TestCase): +# def test_len_first_sort(self): +# v = "some_string" +# self.assertEqual(LenFirstSort(v), (len(v), v)) + +# def test_ +# byte(self): +# v = 0x01 +# self.assertEqual(_byte(v), v.to_bytes(1, "big")) + + +# if __name__ == "__main__": +# unittest.main() diff --git a/pyac/pyac_tests/test_raw.py b/pyac/pyac_tests/test_raw.py new file mode 100644 index 0000000..c8268b5 --- /dev/null +++ b/pyac/pyac_tests/test_raw.py @@ -0,0 +1,24 @@ +# import unittest + +# from hypothesis.strategies import integers +# from hypothesis.strategies import binary + +# from pyac import dumps +# from pyac import loads +# from pyac import Raw + + +# class TestRaw(unittest.TestCase): +# def test_raw_encode(self): +# raw = Raw(2, b"data") +# self.assertEqual(dumps(raw), b"\x02data") + +# def test_raw_decode(self): +# encoded = b"\x02data" +# decoded, remaining = loads(encoded) +# self.assertEqual(decoded, Raw(1, data)) +# self.assertEqual(remaining, b"") + + +# if __name__ == "__main__": +# unittest.main() diff --git a/pyac/pyac_tests/test_str.py b/pyac/pyac_tests/test_str.py new file mode 100644 index 0000000..fd594be --- /dev/null +++ b/pyac/pyac_tests/test_str.py @@ -0,0 +1,92 @@ +import unittest + +from hypothesis import given +from hypothesis.strategies import integers +from hypothesis.strategies import text + +from pyac import dumps +from pyac import DecodeError +from pyac import loads +from pyac import NotEnoughData + + +TagStr: int = 0x80 +TagUTF8: int = 0x40 + +invalid_utf8_2byte_s = integers(min_value=0, max_value=(1 << 5) - 1).map( + lambda v: bytes(bytearray([1 << 7 | 1 << 6 | v])) +) + +invalid_utf8_3byte_s = integers(min_value=0, max_value=(1 << 10) - 1).map( + lambda v: bytes(bytearray([ + (1 << 7 | 1 << 6 | 1 << 5 | (v >> 6)), + (1 << 7 | ((1 << 6) - 1) & v), + ])) +) + + +class TestString(unittest.TestCase): + @given(text(max_size=60)) + def test_encode_utf8(self, test_str: str) -> None: + if(len(test_str.encode("utf-8")) > 60): + return + encoded = dumps(test_str) + tag = (TagStr | TagUTF8 | len(test_str.encode("utf-8"))).to_bytes(1, "big") + self.assertEqual(encoded, tag + test_str.encode("utf-8")) + + def test_long_utf8(self) -> None: + long_strings = ["a" * 62, "a" * 318, "a" * 65853] + for long_string in long_strings: + encoded = dumps(long_string) + decoded, remaining = loads(encoded) + self.assertEqual(decoded, long_string) + self.assertEqual(remaining, b"") + + @given(text(max_size=60)) + def test_encode_non_utf8(self) -> None: + bs = b"\x00\x01\x02" + encoded = dumps(bs) + self.assertEqual(encoded, b"\x83\x00\x01\x02") + decoded, remaining = loads(encoded) + self.assertEqual(decoded, b"\x00\x01\x02") + self.assertEqual(remaining, b"") + + def test_long_non_utf8(self) -> None: + long_bss = [b"\x01" * 62, b"\x01" * 318, b"\x01" * 65853] + for long_bs in long_bss: + encoded = dumps(long_bs) + decoded, remaining = loads(encoded) + self.assertEqual(decoded, long_bs) + self.assertEqual(remaining, b"") + + def test_throws_when_not_enough_data(self) -> None: + encoded = b"\x85he" + with self.assertRaises(NotEnoughData) as cm: + loads(encoded) + self.assertEqual(cm.exception.n, 6) + + def test_throws_when_not_enough_data_for_length(self) -> None: + long_string = "a" * 318 + encoded = dumps(long_string)[:2] + with self.assertRaises(NotEnoughData) as cm: + loads(encoded) + self.assertEqual(cm.exception.n, 3) + + def test_throws_when_null_byte_in_utf(self) -> None: + with self.assertRaises(DecodeError) as cm: + result, remaining = loads(b"\xc5he\x00\x01\x02") + self.assertEqual(str(cm.exception), "null byte in UTF-8") + + @given(invalid_utf8_2byte_s) + def test_throws_when_invalid_utf_2bytes(self, invalid_utf: bytes) -> None: + with self.assertRaises(DecodeError) as cm: + encoded = dumps("hello")[:-1] + invalid_utf + result, remaining = loads(encoded) + self.assertEqual(str(cm.exception), "invalid UTF-8") + + @given(invalid_utf8_3byte_s) + def test_throws_when_invalid_utf_3bytes(self, invalid_utf: bytes) -> None: + with self.assertRaises(DecodeError) as cm: + encoded = dumps("hello")[:-2] + invalid_utf + result, remaining = loads(encoded) + self.assertEqual(str(cm.exception), "invalid UTF-8") diff --git a/pyac/pyac_tests/test_symmetric.py b/pyac/pyac_tests/test_symmetric.py new file mode 100644 index 0000000..6cf674f --- /dev/null +++ b/pyac/pyac_tests/test_symmetric.py @@ -0,0 +1,68 @@ +import unittest + +from typing import Any +from hypothesis import given +from hypothesis.strategies import binary +from hypothesis.strategies import booleans +from hypothesis.strategies import datetimes +from hypothesis.strategies import deferred +from hypothesis.strategies import dictionaries +from hypothesis.strategies import integers +from hypothesis.strategies import just +from hypothesis.strategies import lists +from hypothesis.strategies import none +from hypothesis.strategies import one_of +from hypothesis.strategies import text +from hypothesis.strategies import tuples +from hypothesis.strategies import uuids +from hypothesis.strategies import characters + +from pyac import dumps +from pyac import loads +from pyac import Blob + +blobs_st = tuples(integers(1, 20), binary(max_size=60)).map(lambda x: Blob(*x)) + +text_st = text( + alphabet=characters(exclude_characters="\x00", exclude_categories=("Cs",)), + max_size=32, +) + +mapkey_st = text( + alphabet=characters(exclude_characters="\x00", exclude_categories=("Cs",)), + min_size=1, + max_size=8, +) + +any_st = one_of( + booleans(), + integers(), + just(0), + just(-1), + binary(max_size=32), + text_st, + none(), + uuids(), + datetimes(), +) + +everything_st = deferred( + lambda: any_st | + lists(everything_st, max_size=4) | + dictionaries( + mapkey_st, + everything_st, + max_size=4, + ) +) + + +class TestSymmetric(unittest.TestCase): + @given(everything_st, binary(max_size=20)) + def test_symmetric(self, obj: Any, junk: bytes) -> None: + encoded: bytes = dumps(obj) + junk + decoded: Any + remaining: bytes + decoded, remaining = loads(encoded) + self.assertSequenceEqual(remaining, junk) + self.assertEqual(decoded, obj) diff --git a/pyac/pyac_tests/test_tai.py b/pyac/pyac_tests/test_tai.py new file mode 100644 index 0000000..034b250 --- /dev/null +++ b/pyac/pyac_tests/test_tai.py @@ -0,0 +1,149 @@ +import unittest + +from datetime import datetime +from datetime import timedelta +from hypothesis import given +from hypothesis.strategies import binary +from uuid import UUID + +from pyac import dumps +from pyac import DecodeError +from pyac import Leapsecs +from pyac import loads +from pyac import NotEnoughData +from pyac import Raw +from pyac import TAI64Base + +TagTAI64 = 0x18 +TagTAI64N = 0x19 +TagTAI64NA = 0x1A + +DJB_Leapsecs = ( + "4000000004b2580a", + "4000000005a4ec0b", + "4000000007861f8c", + "400000000967530d", + "400000000b48868e", + "400000000d2b0b8f", + "400000000f0c3f10", + "4000000010ed7291", + "4000000012cea612", + "40000000159fca93", + "400000001780fe14", + "4000000019623195", + "400000001d25ea16", + "4000000021dae517", + "40000000259e9d98", + "40000000277fd119", + "400000002a50f59a", + "400000002c32291b", + "400000002e135c9c", + "4000000030e7241d", + "4000000033b8489e", + "40000000368c101f", + "4000000043b71ba0", + "40000000495c07a1", + "400000004fef9322", + "4000000055932da3", + "40000000586846a4", +) + + +class TestTAI64(unittest.TestCase): + @given(binary(max_size=20)) + def test_encode_decode_tai64(self, junk: bytes) -> None: + dt = datetime(2023, 10, 1, 12, 0, 0) + encoded = dumps(dt) + self.assertEqual(encoded, b"\x18\x40\x00\x00\x00\x65\x19\x5f\x65") + decoded, remaining = loads(encoded + junk) + self.assertEqual(dt, decoded) + self.assertEqual(remaining, junk) + + @given(binary(max_size=20)) + def test_encode_decode_tai64n(self, junk: bytes) -> None: + dt = datetime(2023, 10, 1, 12, 0, 0, 123456) + encoded = dumps(dt) + junk + self.assertEqual( + encoded, b"\x19\x40\x00\x00\x00\x65\x19\x5f\x65\x07\x5b\xca\x00" + junk + ) + decoded, remaining = loads(encoded) + self.assertEqual(dt, decoded) + self.assertEqual(remaining, junk) + + @given(binary(max_size=20)) + def test_decode_tai64na(self, junk: bytes) -> None: + encoded = ( + b"\x1A\x40\x00\x00\x00\x65\x19\x5f\x65\x07\x5b\xca\x00\x07\x5b\xca\x00" + + junk + ) + expected = Raw( + 26, b"\x40\x00\x00\x00\x65\x19\x5f\x65\x07\x5b\xca\x00\x07\x5b\xca\x00" + ) + decoded, remaining = loads(encoded) + self.assertEqual(decoded, expected) + self.assertEqual(remaining, junk) + + def test_throws_when_not_enough_data_for_tai64(self) -> None: + with self.assertRaises(NotEnoughData): + loads(b"\x18" + b"\x00" * 7) + + def test_throws_when_not_enough_data_for_tai64n(self) -> None: + with self.assertRaises(NotEnoughData): + loads(b"\x19" + b"\x00" * 11) + + def test_throws_when_not_enough_data_for_tai64na(self) -> None: + with self.assertRaises(NotEnoughData): + loads(b"\x1A" + b"\x00" * 15) + + def test_large_number_of_secs(self) -> None: + decoded, remaining = loads(b"\x18\x70\x00\x00\x00\x65\x19\x5f\x65") + self.assertEqual(decoded, Raw(t=24, v=b"p\x00\x00\x00e\x19_e")) + self.assertEqual(remaining, b"") + + def test_nanoseconds_not_convertible_to_microseconds(self) -> None: + decoded, remaining = loads( + b"\x19\x40\x00\x00\x00\x65\x19\x5f\x65\x07\x5b\xca\x01" + ) + self.assertEqual(decoded, Raw(t=25, v=b"@\x00\x00\x00e\x19_e\x07[\xca\x01")) + self.assertEqual(remaining, b"") + + def test_throws_when_first_bit_is_in_use(self) -> None: + with self.assertRaises(DecodeError) as cm: + loads(b"\x18\x80\x00\x00\x00\x65\x19\x5f\x65") + self.assertEqual(str(cm.exception), "reserved TAI64 value is in use") + + def test_throws_when_too_many_nanosecs(self) -> None: + with self.assertRaises(DecodeError) as cm: + loads(b"\x19\x40\x00\x00\x00\x65\x19\x5f\x65" + (999999999 + 1).to_bytes(4, "big")) + self.assertEqual(str(cm.exception), "too many nanoseconds") + + def test_throws_when_too_many_attosecs(self) -> None: + with self.assertRaises(DecodeError) as cm: + loads( + b"\x1A\x40\x00\x00\x00\x65\x19\x5f\x65\x07\x5b\xca\x00\xA7\x5b\xca\x00" + ) + self.assertEqual(str(cm.exception), "too many attoseconds") + + def test_leapseconds_match_DJB(self) -> None: + """Check that our pyac.Leapsecs is equally calculated to DJB's + ``curl http://cr.yp.to/libtai/leapsecs.dat | xxd -c 8 -p`` + """ + for our, their in zip( + Leapsecs, + DJB_Leapsecs + ): + self.assertEqual( + our, + int.from_bytes(bytes.fromhex(their), "big") - TAI64Base, + ) + + def test_decode_leapsecond(self) -> None: + for leapsec in DJB_Leapsecs: + decoded, remaining = loads(v=b'\x18' + bytes.fromhex(leapsec), leapsecUTCAllow=True) + self.assertIsInstance(decoded, datetime) + decoded, remaining = loads(v=b'\x18' + bytes.fromhex(leapsec), leapsecUTCAllow=False) + self.assertIsInstance(decoded, Raw) + + +if __name__ == "__main__": + unittest.main() diff --git a/pyac/pyac_tests/test_uuid.py b/pyac/pyac_tests/test_uuid.py new file mode 100644 index 0000000..806f664 --- /dev/null +++ b/pyac/pyac_tests/test_uuid.py @@ -0,0 +1,31 @@ +import unittest + +from uuid import UUID + +from pyac import dumps +from pyac import loads +from pyac import NotEnoughData + + +class TestUUID(unittest.TestCase): + def test_uuid_encode(self) -> None: + uuid_str: str = "12345678-1234-5678-1234-567812345678" + uuid_obj: UUID = UUID(uuid_str) + encoded: bytes = dumps(uuid_obj) + self.assertEqual(encoded, b"\x04\x124Vx\x124Vx\x124Vx\x124Vx") + + def test_uuid_decode(self) -> None: + uuid_str: str = "12345678-1234-5678-1234-567812345678" + encoded: bytes = b"\x04\x124Vx\x124Vx\x124Vx\x124Vx" + decoded: UUID + decoded, _ = loads(encoded) + self.assertEqual(decoded, UUID(uuid_str)) + + def test_uuid_not_enough_data(self) -> None: + encoded: bytes = b"\x04\x124Vx\x124Vx\x124Vx\x124V" + with self.assertRaises(NotEnoughData): + loads(encoded) + + +if __name__ == "__main__": + unittest.main() diff --git a/pyac/test_leapsecs.py b/pyac/test_leapsecs.py deleted file mode 100644 index eff9435..0000000 --- a/pyac/test_leapsecs.py +++ /dev/null @@ -1,45 +0,0 @@ -from unittest import TestCase - -from pyac import Leapsecs -from pyac import TAI64Base - - -class TestLeapsecs(TestCase): - """Check that our pyac.Leapsecs is equally calculated to DJB's - ``curl http://cr.yp.to/libtai/leapsecs.dat | xxd -c 8 -p`` - """ - - def runTest(self): - for our, their in zip(Leapsecs, ( - "4000000004b2580a", - "4000000005a4ec0b", - "4000000007861f8c", - "400000000967530d", - "400000000b48868e", - "400000000d2b0b8f", - "400000000f0c3f10", - "4000000010ed7291", - "4000000012cea612", - "40000000159fca93", - "400000001780fe14", - "4000000019623195", - "400000001d25ea16", - "4000000021dae517", - "40000000259e9d98", - "40000000277fd119", - "400000002a50f59a", - "400000002c32291b", - "400000002e135c9c", - "4000000030e7241d", - "4000000033b8489e", - "40000000368c101f", - "4000000043b71ba0", - "40000000495c07a1", - "400000004fef9322", - "4000000055932da3", - "40000000586846a4", - )): - self.assertEqual( - our, - int.from_bytes(bytes.fromhex(their), "big") - TAI64Base, - )