From: Sergey Matveev Date: Wed, 4 Jun 2025 08:59:30 +0000 (+0300) Subject: UUID-ed RPC X-Git-Url: http://www.git.cypherpunks.su/?a=commitdiff_plain;h=f90241c2dec8ddc1160dad51e9e4497fd7765a5ed7ffb7acbe6f865f3f612897;p=keks.git UUID-ed RPC --- diff --git a/go/rpc/all_test.go b/go/rpc/all_test.go index 624d160..7f5b5a8 100644 --- a/go/rpc/all_test.go +++ b/go/rpc/all_test.go @@ -10,6 +10,7 @@ import ( "testing" "time" + "github.com/google/uuid" "go.cypherpunks.su/keks" ) @@ -83,11 +84,11 @@ func TestServer(t *testing.T) { defer cli.Close() go ServeConn(srv) for i := range 10 { - _, err := keks.Encode(cli, []any{i + 1, "Arith.Add"}, nil) + id, err := uuid.NewRandom() if err != nil { panic(err) } - _, err = keks.Encode(cli, map[string]int{"A": i, "B": i + 1}, nil) + _, err = keks.Encode(cli, []any{"c", id, "Arith.Add", Args{i, i + 1}}, nil) if err != nil { panic(err) } @@ -96,18 +97,17 @@ func TestServer(t *testing.T) { t.Fatal(err) } list := reply.([]any) - if list[0] != uint64(i+1) { - t.Fatal("bad id") + if list[0].(string) != "r" { + t.Fatal("bad r") } - if len(list[1].(string)) != 0 { - t.Fatal("has err", list[1]) + if list[1].(*keks.Hexlet).UUID() != id { + t.Fatal("bad id") } - reply, err = keks.NewDecoderFromReader(cli, nil).Decode() - if err != nil { - t.Fatal(err) + if list[2].(string) != "" { + t.Fatal("has err", list[2]) } var r Reply - err = keks.Map2Struct(&r, reply.(map[string]any)) + err = keks.Map2Struct(&r, list[3].(map[string]any)) if err != nil { t.Fatal(err) } diff --git a/go/rpc/client.go b/go/rpc/client.go index 309f13a..3eaa3dd 100644 --- a/go/rpc/client.go +++ b/go/rpc/client.go @@ -16,36 +16,51 @@ package rpc import ( + "bytes" + "crypto/rand" "fmt" "io" "net/rpc" "sync" + "github.com/google/uuid" + "go.cypherpunks.su/keks" "go.cypherpunks.su/keks/schema" ) +type SeqAndMethod struct { + method string + seq uint64 +} + type ClientCodec struct { conn io.ReadWriteCloser - pending map[uint64]string + body map[string]any + pending map[uuid.UUID]SeqAndMethod + rnd [16]byte pendingM sync.Mutex } func NewClientCodec(conn io.ReadWriteCloser) *ClientCodec { - return &ClientCodec{conn: conn, pending: make(map[uint64]string)} + cc := ClientCodec{ + conn: conn, + pending: make(map[uuid.UUID]SeqAndMethod), + } + rand.Read(cc.rnd[:]) + return &cc } func (cc *ClientCodec) WriteRequest(req *rpc.Request, data any) (err error) { + var id uuid.UUID + id, err = uuid.NewV7FromReader(bytes.NewReader(cc.rnd[:])) + if err != nil { + panic(err) + } cc.pendingM.Lock() - cc.pending[req.Seq] = req.ServiceMethod + cc.pending[id] = SeqAndMethod{seq: req.Seq, method: req.ServiceMethod} cc.pendingM.Unlock() - if req.Seq == 0xFFFFFFFFFFFFFFFF { - panic("keks/rpc: WriteRequest: req.Seq overflow") - } - if _, err = keks.Encode(cc.conn, []any{req.Seq + 1, req.ServiceMethod}, nil); err != nil { - return fmt.Errorf("keks/rpc: WriteRequest: %w", err) - } - if _, err = keks.Encode(cc.conn, data, nil); err != nil { + if _, err = keks.Encode(cc.conn, []any{"c", id, req.ServiceMethod, data}, nil); err != nil { return fmt.Errorf("keks/rpc: WriteRequest: %w", err) } return nil @@ -57,38 +72,31 @@ func (cc *ClientCodec) ReadResponseHeader(resp *rpc.Response) (err error) { if r, err = d.Decode(); err != nil { return fmt.Errorf("keks/rpc: ReadResponseHeader: %w", err) } - if err = schema.Check("resp", RPCSchemas, r); err != nil { + if err = schema.Check("reply", RPCSchemas, r); err != nil { return fmt.Errorf("keks/rpc: ReadResponseHeader: %w", err) } list := r.([]any) - resp.Seq = list[0].(uint64) - if resp.Seq == 0 { - panic("keks/rpc: ReadResponseHeader: resp.Seq is zero") - } - resp.Seq-- - var ok bool + id := list[1].(*keks.Hexlet).UUID() cc.pendingM.Lock() - resp.ServiceMethod, ok = cc.pending[resp.Seq] + seqAndMethod, ok := cc.pending[id] if ok { - delete(cc.pending, resp.Seq) + delete(cc.pending, id) } cc.pendingM.Unlock() if !ok { - return fmt.Errorf("keks/rpc: ReadResponseHeader: unknown id: %d", resp.Seq) + return fmt.Errorf("keks/rpc: ReadResponseHeader: unknown id: %s", id) } - resp.Error = list[1].(string) + resp.Seq, resp.ServiceMethod = seqAndMethod.seq, seqAndMethod.method + resp.Error = list[2].(string) + cc.body = list[3].(map[string]any) return nil } func (cc *ClientCodec) ReadResponseBody(dst any) (err error) { - d := keks.NewDecoderFromReader(cc.conn, nil) - if _, err = d.Parse(); err != nil { - return fmt.Errorf("keks/rpc: ReadResponseBody: %w", err) - } if dst == nil { return nil } - if err = d.UnmarshalStruct(dst); err != nil { + if err = keks.Map2Struct(dst, cc.body); err != nil { return fmt.Errorf("keks/rpc: ReadResponseBody: %w", err) } return nil diff --git a/go/rpc/server.go b/go/rpc/server.go index 1a9f936..32e81d3 100644 --- a/go/rpc/server.go +++ b/go/rpc/server.go @@ -21,19 +21,21 @@ import ( "net/rpc" "sync" + "github.com/google/uuid" "go.cypherpunks.su/keks" "go.cypherpunks.su/keks/schema" ) type ServerCodec struct { - conn io.ReadWriteCloser - pending map[uint64]uint64 - pendingM sync.Mutex - pendingSeq uint64 + conn io.ReadWriteCloser + ids map[uint64]uuid.UUID + body map[string]any + seq uint64 + idsM sync.Mutex } func NewServerCodec(conn io.ReadWriteCloser) *ServerCodec { - return &ServerCodec{conn: conn, pending: make(map[uint64]uint64)} + return &ServerCodec{conn: conn, ids: make(map[uint64]uuid.UUID)} } func (sc *ServerCodec) ReadRequestHeader(req *rpc.Request) (err error) { @@ -42,46 +44,39 @@ func (sc *ServerCodec) ReadRequestHeader(req *rpc.Request) (err error) { if r, err = d.Decode(); err != nil { return fmt.Errorf("keks/rpc: ReadRequestHeader: %w", err) } - if err = schema.Check("req", RPCSchemas, r); err != nil { + if err = schema.Check("call", RPCSchemas, r); err != nil { return fmt.Errorf("keks/rpc: ReadRequestHeader: %w", err) } list := r.([]any) - req.Seq = list[0].(uint64) - sc.pendingM.Lock() - sc.pendingSeq++ - sc.pending[sc.pendingSeq] = req.Seq - req.Seq = sc.pendingSeq - sc.pendingM.Unlock() - req.ServiceMethod = list[1].(string) + sc.idsM.Lock() + sc.seq++ + req.Seq = sc.seq + sc.ids[sc.seq] = list[1].(*keks.Hexlet).UUID() + sc.idsM.Unlock() + req.ServiceMethod = list[2].(string) + sc.body = list[3].(map[string]any) return nil } func (sc *ServerCodec) ReadRequestBody(dst any) (err error) { - d := keks.NewDecoderFromReader(sc.conn, nil) - if _, err = d.Parse(); err != nil { - return fmt.Errorf("keks/rpc: ReadRequestBody: %w", err) - } if dst == nil { return nil } - if err = d.UnmarshalStruct(dst); err != nil { + if err = keks.Map2Struct(dst, sc.body); err != nil { return fmt.Errorf("keks/rpc: ReadRequestBody: %w", err) } return nil } func (sc *ServerCodec) WriteResponse(resp *rpc.Response, data any) (err error) { - sc.pendingM.Lock() - id, ok := sc.pending[resp.Seq] + sc.idsM.Lock() + id, ok := sc.ids[resp.Seq] if !ok { panic("unknown id") } - delete(sc.pending, resp.Seq) - sc.pendingM.Unlock() - if _, err = keks.Encode(sc.conn, []any{id, resp.Error}, nil); err != nil { - return fmt.Errorf("keks/rpc: WriteResponse: %w", err) - } - if _, err = keks.Encode(sc.conn, data, nil); err != nil { + delete(sc.ids, resp.Seq) + sc.idsM.Unlock() + if _, err = keks.Encode(sc.conn, []any{"r", id, resp.Error, data}, nil); err != nil { return fmt.Errorf("keks/rpc: WriteResponse: %w", err) } return nil diff --git a/spec/RPC b/spec/RPC index e705624..a841256 100644 --- a/spec/RPC +++ b/spec/RPC @@ -1,13 +1,14 @@ Here is suggestion to use KEKS codec with a simple client-server -Remote Procedure Call interface. It is very similar to JSON-RPC 2.0. +Remote Procedure Call interface. It is very similar to => https://www.jsonrpc.org/ JSON-RPC +=> https://github.com/msgpack-rpc/msgpack-rpc MessagePack-RPC -Request object is a LIST followed by MAP with arbitrary values, maybe -empty. "id" is a positive 64-bit integer, that must be unique during the -whole session's connection. - -Response object is also a LIST followed by MAP with arbitrary values, -corresponding error additional data. Response's "id" must be the same as -in corresponding request. Empty error string means no error occurred. +Call, reply and notify message headers ([schema/tcl] format): << [schemas/rpc.tcl] + +It is advisable to use UUIDv7 for call/notify's id. +Reply's id must be the same as in corresponding call. +Empty error string means no error occurred. + +Notification differs only by its type and no reply expectation. diff --git a/spec/schema/tcl b/spec/schema/tcl index 51f65eb..73b8555 100644 --- a/spec/schema/tcl +++ b/spec/schema/tcl @@ -51,7 +51,7 @@ element has specified string/binary value (use "len=" for integers). integer values, you choose one of: s, ms, us, ns, ps, fs, as. "utc" issues UTC command. -"of s" argument issues checking of EACH element of the list or map +{of s} argument issues checking of EACH element of the list or map against the specified schema, or against specified type if "s" is a known type. diff --git a/tcl/schemas/rpc.tcl b/tcl/schemas/rpc.tcl index 6780272..ecf8050 100644 --- a/tcl/schemas/rpc.tcl +++ b/tcl/schemas/rpc.tcl @@ -1,11 +1,23 @@ -req { - {field . {list} len=2} - {field 0 {int} >0} {# id} - {field 1 {str} >0} {# method} +call { + {field . {list} len=4} + {field 0 {str} =c} + {field 1 {hexlet}} {# id} + {field 2 {str} >0} {# method} + {field 3 {map}} {# params} } -resp { - {field . {list} len=2} - {field 0 {int} >0} {# id} - {field 1 {str}} {# error} +reply { + {field . {list} len=4} + {field 0 {str} =r} + {field 1 {hexlet}} {# id} + {field 2 {str}} {# error} + {field 3 {map}} {# body} +} + +notify { + {field . {list} len=4} + {field 0 {str} =n} + {field 1 {hexlet}} {# id} + {field 2 {str} >0} {# method} + {field 3 {map}} {# body} }