]> Cypherpunks repositories - keks.git/commitdiff
UUID-ed RPC
authorSergey Matveev <stargrave@stargrave.org>
Wed, 4 Jun 2025 08:59:30 +0000 (11:59 +0300)
committerSergey Matveev <stargrave@stargrave.org>
Wed, 4 Jun 2025 09:02:34 +0000 (12:02 +0300)
go/rpc/all_test.go
go/rpc/client.go
go/rpc/server.go
spec/RPC
spec/schema/tcl
tcl/schemas/rpc.tcl

index 624d160c0d7ba1becf02f2d3a10d07923b4ca22d879e70953b1e8660a96c2677..7f5b5a88ebe5d66fb989bae02738c35eeeda416af68ede8ec45b2823feadfdb5 100644 (file)
@@ -10,6 +10,7 @@ import (
        "testing"
        "time"
 
+       "github.com/google/uuid"
        "go.cypherpunks.su/keks"
 )
 
@@ -83,11 +84,11 @@ func TestServer(t *testing.T) {
        defer cli.Close()
        go ServeConn(srv)
        for i := range 10 {
-               _, err := keks.Encode(cli, []any{i + 1, "Arith.Add"}, nil)
+               id, err := uuid.NewRandom()
                if err != nil {
                        panic(err)
                }
-               _, err = keks.Encode(cli, map[string]int{"A": i, "B": i + 1}, nil)
+               _, err = keks.Encode(cli, []any{"c", id, "Arith.Add", Args{i, i + 1}}, nil)
                if err != nil {
                        panic(err)
                }
@@ -96,18 +97,17 @@ func TestServer(t *testing.T) {
                        t.Fatal(err)
                }
                list := reply.([]any)
-               if list[0] != uint64(i+1) {
-                       t.Fatal("bad id")
+               if list[0].(string) != "r" {
+                       t.Fatal("bad r")
                }
-               if len(list[1].(string)) != 0 {
-                       t.Fatal("has err", list[1])
+               if list[1].(*keks.Hexlet).UUID() != id {
+                       t.Fatal("bad id")
                }
-               reply, err = keks.NewDecoderFromReader(cli, nil).Decode()
-               if err != nil {
-                       t.Fatal(err)
+               if list[2].(string) != "" {
+                       t.Fatal("has err", list[2])
                }
                var r Reply
-               err = keks.Map2Struct(&r, reply.(map[string]any))
+               err = keks.Map2Struct(&r, list[3].(map[string]any))
                if err != nil {
                        t.Fatal(err)
                }
index 309f13ae93d5a3b52d6bfcf64df19c80acf22fa82d25454fe5030382931846de..3eaa3dd6047ae0ec18b43ad8a60333eda110a8fa2d0c7551993b5b63fa325ae8 100644 (file)
 package rpc
 
 import (
+       "bytes"
+       "crypto/rand"
        "fmt"
        "io"
        "net/rpc"
        "sync"
 
+       "github.com/google/uuid"
+
        "go.cypherpunks.su/keks"
        "go.cypherpunks.su/keks/schema"
 )
 
+type SeqAndMethod struct {
+       method string
+       seq    uint64
+}
+
 type ClientCodec struct {
        conn     io.ReadWriteCloser
-       pending  map[uint64]string
+       body     map[string]any
+       pending  map[uuid.UUID]SeqAndMethod
+       rnd      [16]byte
        pendingM sync.Mutex
 }
 
 func NewClientCodec(conn io.ReadWriteCloser) *ClientCodec {
-       return &ClientCodec{conn: conn, pending: make(map[uint64]string)}
+       cc := ClientCodec{
+               conn:    conn,
+               pending: make(map[uuid.UUID]SeqAndMethod),
+       }
+       rand.Read(cc.rnd[:])
+       return &cc
 }
 
 func (cc *ClientCodec) WriteRequest(req *rpc.Request, data any) (err error) {
+       var id uuid.UUID
+       id, err = uuid.NewV7FromReader(bytes.NewReader(cc.rnd[:]))
+       if err != nil {
+               panic(err)
+       }
        cc.pendingM.Lock()
-       cc.pending[req.Seq] = req.ServiceMethod
+       cc.pending[id] = SeqAndMethod{seq: req.Seq, method: req.ServiceMethod}
        cc.pendingM.Unlock()
-       if req.Seq == 0xFFFFFFFFFFFFFFFF {
-               panic("keks/rpc: WriteRequest: req.Seq overflow")
-       }
-       if _, err = keks.Encode(cc.conn, []any{req.Seq + 1, req.ServiceMethod}, nil); err != nil {
-               return fmt.Errorf("keks/rpc: WriteRequest: %w", err)
-       }
-       if _, err = keks.Encode(cc.conn, data, nil); err != nil {
+       if _, err = keks.Encode(cc.conn, []any{"c", id, req.ServiceMethod, data}, nil); err != nil {
                return fmt.Errorf("keks/rpc: WriteRequest: %w", err)
        }
        return nil
@@ -57,38 +72,31 @@ func (cc *ClientCodec) ReadResponseHeader(resp *rpc.Response) (err error) {
        if r, err = d.Decode(); err != nil {
                return fmt.Errorf("keks/rpc: ReadResponseHeader: %w", err)
        }
-       if err = schema.Check("resp", RPCSchemas, r); err != nil {
+       if err = schema.Check("reply", RPCSchemas, r); err != nil {
                return fmt.Errorf("keks/rpc: ReadResponseHeader: %w", err)
        }
        list := r.([]any)
-       resp.Seq = list[0].(uint64)
-       if resp.Seq == 0 {
-               panic("keks/rpc: ReadResponseHeader: resp.Seq is zero")
-       }
-       resp.Seq--
-       var ok bool
+       id := list[1].(*keks.Hexlet).UUID()
        cc.pendingM.Lock()
-       resp.ServiceMethod, ok = cc.pending[resp.Seq]
+       seqAndMethod, ok := cc.pending[id]
        if ok {
-               delete(cc.pending, resp.Seq)
+               delete(cc.pending, id)
        }
        cc.pendingM.Unlock()
        if !ok {
-               return fmt.Errorf("keks/rpc: ReadResponseHeader: unknown id: %d", resp.Seq)
+               return fmt.Errorf("keks/rpc: ReadResponseHeader: unknown id: %s", id)
        }
-       resp.Error = list[1].(string)
+       resp.Seq, resp.ServiceMethod = seqAndMethod.seq, seqAndMethod.method
+       resp.Error = list[2].(string)
+       cc.body = list[3].(map[string]any)
        return nil
 }
 
 func (cc *ClientCodec) ReadResponseBody(dst any) (err error) {
-       d := keks.NewDecoderFromReader(cc.conn, nil)
-       if _, err = d.Parse(); err != nil {
-               return fmt.Errorf("keks/rpc: ReadResponseBody: %w", err)
-       }
        if dst == nil {
                return nil
        }
-       if err = d.UnmarshalStruct(dst); err != nil {
+       if err = keks.Map2Struct(dst, cc.body); err != nil {
                return fmt.Errorf("keks/rpc: ReadResponseBody: %w", err)
        }
        return nil
index 1a9f936e6d8281a8d39cd8c66843a481b228a3897f7fe40ccef3deb7d0ff8c1d..32e81d390fd0224de5a0a176de03802d81575255086eccc4d003332439c956b8 100644 (file)
@@ -21,19 +21,21 @@ import (
        "net/rpc"
        "sync"
 
+       "github.com/google/uuid"
        "go.cypherpunks.su/keks"
        "go.cypherpunks.su/keks/schema"
 )
 
 type ServerCodec struct {
-       conn       io.ReadWriteCloser
-       pending    map[uint64]uint64
-       pendingM   sync.Mutex
-       pendingSeq uint64
+       conn io.ReadWriteCloser
+       ids  map[uint64]uuid.UUID
+       body map[string]any
+       seq  uint64
+       idsM sync.Mutex
 }
 
 func NewServerCodec(conn io.ReadWriteCloser) *ServerCodec {
-       return &ServerCodec{conn: conn, pending: make(map[uint64]uint64)}
+       return &ServerCodec{conn: conn, ids: make(map[uint64]uuid.UUID)}
 }
 
 func (sc *ServerCodec) ReadRequestHeader(req *rpc.Request) (err error) {
@@ -42,46 +44,39 @@ func (sc *ServerCodec) ReadRequestHeader(req *rpc.Request) (err error) {
        if r, err = d.Decode(); err != nil {
                return fmt.Errorf("keks/rpc: ReadRequestHeader: %w", err)
        }
-       if err = schema.Check("req", RPCSchemas, r); err != nil {
+       if err = schema.Check("call", RPCSchemas, r); err != nil {
                return fmt.Errorf("keks/rpc: ReadRequestHeader: %w", err)
        }
        list := r.([]any)
-       req.Seq = list[0].(uint64)
-       sc.pendingM.Lock()
-       sc.pendingSeq++
-       sc.pending[sc.pendingSeq] = req.Seq
-       req.Seq = sc.pendingSeq
-       sc.pendingM.Unlock()
-       req.ServiceMethod = list[1].(string)
+       sc.idsM.Lock()
+       sc.seq++
+       req.Seq = sc.seq
+       sc.ids[sc.seq] = list[1].(*keks.Hexlet).UUID()
+       sc.idsM.Unlock()
+       req.ServiceMethod = list[2].(string)
+       sc.body = list[3].(map[string]any)
        return nil
 }
 
 func (sc *ServerCodec) ReadRequestBody(dst any) (err error) {
-       d := keks.NewDecoderFromReader(sc.conn, nil)
-       if _, err = d.Parse(); err != nil {
-               return fmt.Errorf("keks/rpc: ReadRequestBody: %w", err)
-       }
        if dst == nil {
                return nil
        }
-       if err = d.UnmarshalStruct(dst); err != nil {
+       if err = keks.Map2Struct(dst, sc.body); err != nil {
                return fmt.Errorf("keks/rpc: ReadRequestBody: %w", err)
        }
        return nil
 }
 
 func (sc *ServerCodec) WriteResponse(resp *rpc.Response, data any) (err error) {
-       sc.pendingM.Lock()
-       id, ok := sc.pending[resp.Seq]
+       sc.idsM.Lock()
+       id, ok := sc.ids[resp.Seq]
        if !ok {
                panic("unknown id")
        }
-       delete(sc.pending, resp.Seq)
-       sc.pendingM.Unlock()
-       if _, err = keks.Encode(sc.conn, []any{id, resp.Error}, nil); err != nil {
-               return fmt.Errorf("keks/rpc: WriteResponse: %w", err)
-       }
-       if _, err = keks.Encode(sc.conn, data, nil); err != nil {
+       delete(sc.ids, resp.Seq)
+       sc.idsM.Unlock()
+       if _, err = keks.Encode(sc.conn, []any{"r", id, resp.Error, data}, nil); err != nil {
                return fmt.Errorf("keks/rpc: WriteResponse: %w", err)
        }
        return nil
index e7056244fcba2f57829691eead8d9df1f3caa09d0eb6bb5ff0888dc1a2bffca7..a8412560258bbc51b7ee123c647b68177d99d6889d9e2b9e1e62df729735d3c9 100644 (file)
--- a/spec/RPC
+++ b/spec/RPC
@@ -1,13 +1,14 @@
 Here is suggestion to use KEKS codec with a simple client-server
-Remote Procedure Call interface. It is very similar to JSON-RPC 2.0.
+Remote Procedure Call interface. It is very similar to
 => https://www.jsonrpc.org/ JSON-RPC\r
+=> https://github.com/msgpack-rpc/msgpack-rpc MessagePack-RPC\r
 
-Request object is a LIST followed by MAP with arbitrary values, maybe
-empty. "id" is a positive 64-bit integer, that must be unique during the
-whole session's connection.
-
-Response object is also a LIST followed by MAP with arbitrary values,
-corresponding error additional data. Response's "id" must be the same as
-in corresponding request. Empty error string means no error occurred.
+Call, reply and notify message headers ([schema/tcl] format):
 
 <<    [schemas/rpc.tcl]\r
+
+It is advisable to use UUIDv7 for call/notify's id.
+Reply's id must be the same as in corresponding call.
+Empty error string means no error occurred.
+
+Notification differs only by its type and no reply expectation.
index 51f65eb5b1c04395f3f6b08cf5aa6d828cb1cbd62c88c966423e03c03e155190..73b8555ade64961b0945a0cca7598413901596872369da1d08df19eba722eac4 100644 (file)
@@ -51,7 +51,7 @@ element has specified string/binary value (use "len=" for integers).
 integer values, you choose one of: s, ms, us, ns, ps, fs, as.
 "utc" issues UTC command.
 
-"of s" argument issues checking of EACH element of the list or map
+{of s} argument issues checking of EACH element of the list or map
 against the specified schema, or against specified type if "s" is a
 known type.
 
index 6780272fc6357128f8f73dbe9760e8848d375560b004d806b7166eb57d558713..ecf80508e5870532fdc58bdba9460a26964eab9036b24982a4370fd9d0f0c53e 100644 (file)
@@ -1,11 +1,23 @@
-req {
-    {field . {list} len=2}
-    {field 0 {int} >0} {# id}
-    {field 1 {str} >0} {# method}
+call {
+    {field . {list} len=4}
+    {field 0 {str} =c}
+    {field 1 {hexlet}} {# id}
+    {field 2 {str} >0} {# method}
+    {field 3 {map}}    {# params}
 }
 
-resp {
-    {field . {list} len=2}
-    {field 0 {int} >0} {# id}
-    {field 1 {str}} {# error}
+reply {
+    {field . {list} len=4}
+    {field 0 {str} =r}
+    {field 1 {hexlet}} {# id}
+    {field 2 {str}}    {# error}
+    {field 3 {map}}    {# body}
+}
+
+notify {
+    {field . {list} len=4}
+    {field 0 {str} =n}
+    {field 1 {hexlet}} {# id}
+    {field 2 {str} >0} {# method}
+    {field 3 {map}}    {# body}
 }