From: Sergey Matveev Date: Fri, 30 May 2025 10:46:07 +0000 (+0300) Subject: RPC X-Git-Url: http://www.git.cypherpunks.su/?a=commitdiff_plain;h=6037bdf85a2d283ccba1ba34ee19cf94a8f4b84608a2a73bb6141a914f677a21;p=keks.git RPC --- diff --git a/go/cm/default.schema.keks.do b/go/default.schema.keks.do similarity index 88% rename from go/cm/default.schema.keks.do rename to go/default.schema.keks.do index fb10673..8fbe538 100644 --- a/go/cm/default.schema.keks.do +++ b/go/default.schema.keks.do @@ -1,5 +1,5 @@ n=${2##*/}.tcl -cd ../../tcl/schemas +cd ../tcl/schemas { echo ../schema.tcl $n sed -n "s/^schema-include \(.*\)$/\1/p" <$n diff --git a/go/rpc/.gitignore b/go/rpc/.gitignore new file mode 100644 index 0000000..62d48e6 --- /dev/null +++ b/go/rpc/.gitignore @@ -0,0 +1 @@ +/rpc.schema.keks diff --git a/go/rpc/all_test.go b/go/rpc/all_test.go new file mode 100644 index 0000000..b936002 --- /dev/null +++ b/go/rpc/all_test.go @@ -0,0 +1,218 @@ +// Most of that is copied from go/src/net/rpc/jsonrpc/all_test.go + +package rpc + +import ( + "errors" + "io" + "net" + "net/rpc" + "testing" + "time" + + "go.cypherpunks.su/keks" +) + +type Args struct { + A, B int +} + +type Reply struct { + C int +} + +type Arith int + +func (t *Arith) Add(args *Args, reply *Reply) error { + reply.C = args.A + args.B + return nil +} + +func (t *Arith) Mul(args *Args, reply *Reply) error { + reply.C = args.A * args.B + return nil +} + +func (t *Arith) Div(args *Args, reply *Reply) error { + if args.B == 0 { + return errors.New("divide by zero") + } + reply.C = args.A / args.B + return nil +} + +func (t *Arith) Error(args *Args, reply *Reply) error { + panic("ERROR") +} + +func init() { + rpc.Register(new(Arith)) +} + +func TestServerNoParams(t *testing.T) { + cli, srv := net.Pipe() + defer cli.Close() + srv.SetDeadline(time.Now().Add(10 * time.Millisecond)) + go ServeConn(srv) + _, err := keks.Encode(cli, []any{123, "Arith.Add"}, nil) + if err != nil { + panic(err) + } + _, err = keks.NewDecoderFromReader(cli, nil).Decode() + if err != io.EOF { + t.Fatal("no EOF") + } +} + +func TestServerEmptyMessage(t *testing.T) { + cli, srv := net.Pipe() + defer cli.Close() + go ServeConn(srv) + _, err := keks.Encode(cli, struct{}{}, nil) + if err != nil { + panic(err) + } + _, err = keks.NewDecoderFromReader(cli, nil).Decode() + if err != io.EOF { + t.Fatal("no EOF") + } +} + +func TestServer(t *testing.T) { + cli, srv := net.Pipe() + defer cli.Close() + go ServeConn(srv) + for i := range 10 { + _, err := keks.Encode(cli, []any{i + 1, "Arith.Add"}, nil) + if err != nil { + panic(err) + } + _, err = keks.Encode(cli, map[string]int{"A": i, "B": i + 1}, nil) + if err != nil { + panic(err) + } + reply, err := keks.NewDecoderFromReader(cli, nil).Decode() + if err != nil { + t.Fatal(err) + } + list := reply.([]any) + if list[0] != uint64(i+1) { + t.Fatal("bad id") + } + if len(list[1].(map[string]any)) != 0 { + t.Fatal("has err", list[1]) + } + reply, err = keks.NewDecoderFromReader(cli, nil).Decode() + if err != nil { + t.Fatal(err) + } + var r Reply + err = keks.Map2Struct(&r, reply.(map[string]any)) + if err != nil { + t.Fatal(err) + } + if r.C != 2*i+1 { + t.Fatal("bad result", r.C) + } + } +} + +func TestClient(t *testing.T) { + cli, srv := net.Pipe() + go ServeConn(srv) + client := NewClient(cli) + defer client.Close() + + args := &Args{7, 8} + reply := new(Reply) + err := client.Call("Arith.Add", args, reply) + if err != nil { + t.Error(err.Error()) + } + if reply.C != args.A+args.B { + t.Error(reply.C, args.A+args.B) + } + + args = &Args{7, 8} + reply = new(Reply) + err = client.Call("Arith.Mul", args, reply) + if err != nil { + t.Error(err.Error()) + } + if reply.C != args.A*args.B { + t.Error(reply.C, args.A*args.B) + } + + // Out of order. + args = &Args{7, 8} + mulReply := new(Reply) + mulCall := client.Go("Arith.Mul", args, mulReply, nil) + addReply := new(Reply) + addCall := client.Go("Arith.Add", args, addReply, nil) + + addCall = <-addCall.Done + if addCall.Error != nil { + t.Error(addCall.Error.Error()) + } + if addReply.C != args.A+args.B { + t.Error(addReply.C, args.A+args.B) + } + + mulCall = <-mulCall.Done + if mulCall.Error != nil { + t.Error(mulCall.Error.Error()) + } + if mulReply.C != args.A*args.B { + t.Error(mulReply.C, args.A*args.B) + } + + // Error test + args = &Args{7, 0} + reply = new(Reply) + err = client.Call("Arith.Div", args, reply) + // expect an error: zero divide + if err == nil { + t.Error("expected error") + } else if err.Error() != "divide by zero" { + t.Error("expected divide by zero error; got", err) + } + + args = &Args{7, 7} + reply = new(Reply) + err = client.Call("Arith.Div", args, reply) + if err != nil { + t.Error("unexpected error") + } +} + +func TestMalformedInput(t *testing.T) { + cli, srv := net.Pipe() + go cli.Write([]byte(`{id:1}`)) // invalid json + ServeConn(srv) // must return, not loop +} + +func TestMalformedOutput(t *testing.T) { + cli, srv := net.Pipe() + go func() { + buf0, err := keks.EncodeBuf([]any{1, nil}, nil) + if err != nil { + panic(err) + } + buf1, err := keks.EncodeBuf(struct{}{}, nil) + if err != nil { + panic(err) + } + srv.Write(append(buf0, buf1...)) + }() + go io.ReadAll(srv) + + client := NewClient(cli) + defer client.Close() + + args := &Args{7, 8} + reply := new(Reply) + err := client.Call("Arith.Add", args, reply) + if err == nil { + t.Error("expected error") + } +} diff --git a/go/rpc/client.go b/go/rpc/client.go new file mode 100644 index 0000000..c556dc6 --- /dev/null +++ b/go/rpc/client.go @@ -0,0 +1,112 @@ +// GoKEKS -- Go KEKS codec implementation +// Copyright (C) 2024-2025 Sergey Matveev +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as +// published by the Free Software Foundation, version 3 of the License. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public +// License along with this program. If not, see . + +package rpc + +import ( + "fmt" + "io" + "net/rpc" + "sync" + + "go.cypherpunks.su/keks" + "go.cypherpunks.su/keks/schema" +) + +type ClientCodec struct { + conn io.ReadWriteCloser + pending map[uint64]string + pendingM sync.Mutex +} + +func NewClientCodec(conn io.ReadWriteCloser) *ClientCodec { + return &ClientCodec{conn: conn, pending: make(map[uint64]string)} +} + +func (cc *ClientCodec) WriteRequest(req *rpc.Request, data any) (err error) { + cc.pendingM.Lock() + cc.pending[req.Seq] = req.ServiceMethod + cc.pendingM.Unlock() + if _, err = keks.Encode(cc.conn, []any{req.Seq + 1, req.ServiceMethod}, nil); err != nil { + return fmt.Errorf("keks/rpc: WriteRequest: %w", err) + } + if _, err = keks.Encode(cc.conn, data, nil); err != nil { + return fmt.Errorf("keks/rpc: WriteRequest: %w", err) + } + return nil +} + +func (cc *ClientCodec) ReadResponseHeader(resp *rpc.Response) (err error) { + d := keks.NewDecoderFromReader(cc.conn, nil) + var r any + if r, err = d.Decode(); err != nil { + return fmt.Errorf("keks/rpc: ReadResponseHeader: %w", err) + } + if err = schema.Check("resp", RPCSchemas, r); err != nil { + return fmt.Errorf("keks/rpc: ReadResponseHeader: %w", err) + } + list := r.([]any) + resp.Seq = list[0].(uint64) - 1 + var ok bool + cc.pendingM.Lock() + resp.ServiceMethod, ok = cc.pending[resp.Seq] + if ok { + delete(cc.pending, resp.Seq) + } + cc.pendingM.Unlock() + if !ok { + return fmt.Errorf("keks/rpc: ReadResponseHeader: unknown id: %d", resp.Seq) + } + e := list[1].(map[string]any) + if len(e) > 0 { + msg, ok := e["msg"] + if ok { + resp.Error = msg.(string) + if len(resp.Error) == 0 { + resp.Error = "unspecified" + } + } else { + code, ok := e["code"] + if ok { + resp.Error = fmt.Sprintf("code: %d", code.(uint64)) + } else { + resp.Error = "unspecified" + } + } + } + return nil +} + +func (cc *ClientCodec) ReadResponseBody(dst any) (err error) { + d := keks.NewDecoderFromReader(cc.conn, nil) + if _, err = d.Parse(); err != nil { + return fmt.Errorf("keks/rpc: ReadResponseBody: %w", err) + } + if dst == nil { + return nil + } + if err = d.UnmarshalStruct(dst); err != nil { + return fmt.Errorf("keks/rpc: ReadResponseBody: %w", err) + } + return nil +} + +func (cc *ClientCodec) Close() error { + return cc.conn.Close() +} + +func NewClient(conn io.ReadWriteCloser) *rpc.Client { + return rpc.NewClientWithCodec(NewClientCodec(conn)) +} diff --git a/go/rpc/err.go b/go/rpc/err.go new file mode 100644 index 0000000..51ca34c --- /dev/null +++ b/go/rpc/err.go @@ -0,0 +1,6 @@ +package rpc + +type Error struct { + Msg *string `keks:"msg,omitempty"` + Code *uint64 `keks:"code,omitempty"` +} diff --git a/go/rpc/schema.go b/go/rpc/schema.go new file mode 100644 index 0000000..fedfdce --- /dev/null +++ b/go/rpc/schema.go @@ -0,0 +1,41 @@ +// GoKEKS/CM -- KEKS-encoded cryptographic messages +// Copyright (C) 2024-2025 Sergey Matveev +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as +// published by the Free Software Foundation, version 3 of the License. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public +// License along with this program. If not, see . + +package rpc + +import ( + _ "embed" + + "go.cypherpunks.su/keks" + "go.cypherpunks.su/keks/schema" +) + +//go:embed rpc.schema.keks +var RPCSchemasRaw []byte + +var RPCSchemas map[string][][]any + +func init() { + var magic keks.Magic + magic, RPCSchemasRaw = keks.StripMagic(RPCSchemasRaw) + if magic != schema.Magic { + panic("wrong magic in rpc.schema.keks") + } + if err := keks.NewDecoderFromBytes( + RPCSchemasRaw, nil, + ).DecodeStruct(&RPCSchemas); err != nil { + panic(err) + } +} diff --git a/go/rpc/server.go b/go/rpc/server.go new file mode 100644 index 0000000..9e0aadc --- /dev/null +++ b/go/rpc/server.go @@ -0,0 +1,100 @@ +// GoKEKS -- Go KEKS codec implementation +// Copyright (C) 2024-2025 Sergey Matveev +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as +// published by the Free Software Foundation, version 3 of the License. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public +// License along with this program. If not, see . + +package rpc + +import ( + "fmt" + "io" + "net/rpc" + "sync" + + "go.cypherpunks.su/keks" + "go.cypherpunks.su/keks/schema" +) + +type ServerCodec struct { + conn io.ReadWriteCloser + pending map[uint64]uint64 + pendingM sync.Mutex + pendingSeq uint64 +} + +func NewServerCodec(conn io.ReadWriteCloser) *ServerCodec { + return &ServerCodec{conn: conn, pending: make(map[uint64]uint64)} +} + +func (sc *ServerCodec) ReadRequestHeader(req *rpc.Request) (err error) { + d := keks.NewDecoderFromReader(sc.conn, nil) + var r any + if r, err = d.Decode(); err != nil { + return fmt.Errorf("keks/rpc: ReadRequestHeader: %w", err) + } + if err = schema.Check("req", RPCSchemas, r); err != nil { + return fmt.Errorf("keks/rpc: ReadRequestHeader: %w", err) + } + list := r.([]any) + req.Seq = list[0].(uint64) + sc.pendingM.Lock() + sc.pendingSeq++ + sc.pending[sc.pendingSeq] = req.Seq + req.Seq = sc.pendingSeq + sc.pendingM.Unlock() + req.ServiceMethod = list[1].(string) + return nil +} + +func (sc *ServerCodec) ReadRequestBody(dst any) (err error) { + d := keks.NewDecoderFromReader(sc.conn, nil) + if _, err = d.Parse(); err != nil { + return fmt.Errorf("keks/rpc: ReadRequestBody: %w", err) + } + if dst == nil { + return nil + } + if err = d.UnmarshalStruct(dst); err != nil { + return fmt.Errorf("keks/rpc: ReadRequestBody: %w", err) + } + return nil +} + +func (sc *ServerCodec) WriteResponse(resp *rpc.Response, data any) (err error) { + sc.pendingM.Lock() + id, ok := sc.pending[resp.Seq] + if !ok { + panic("unknown id") + } + delete(sc.pending, resp.Seq) + sc.pendingM.Unlock() + e := Error{} + if resp.Error != "" { + e.Msg = &resp.Error + } + if _, err = keks.Encode(sc.conn, []any{id, e}, nil); err != nil { + return fmt.Errorf("keks/rpc: WriteResponse: %w", err) + } + if _, err = keks.Encode(sc.conn, data, nil); err != nil { + return fmt.Errorf("keks/rpc: WriteResponse: %w", err) + } + return nil +} + +func (sc *ServerCodec) Close() error { + return sc.conn.Close() +} + +func ServeConn(conn io.ReadWriteCloser) { + rpc.ServeCodec(NewServerCodec(conn)) +} diff --git a/spec/RPC b/spec/RPC new file mode 100644 index 0000000..acc0186 --- /dev/null +++ b/spec/RPC @@ -0,0 +1,13 @@ +Here is suggestion to use KEKS codec with a simple client-server +Remote Procedure Call interface. It is very similar to JSON-RPC 2.0. +=> https://www.jsonrpc.org/ JSON-RPC + +Request object is a LIST followed by MAP with arbitrary values, maybe +empty. "id" is a positive 64-bit integer, that must be unique during the +whole session's connection. + +Response object is also a LIST followed by MAP with arbitrary values. +Response's "id" must be the same as in corresponding request. Empty +error map means no error occurred. + +<< [schemas/rpc.tcl] diff --git a/spec/index b/spec/index index 1d4926c..985681e 100644 --- a/spec/index +++ b/spec/index @@ -29,10 +29,11 @@ requirements below. <<[ComparisonWithOtherCodecs] -[INSTALL] + [INSTALL] [encoding/] -[schema/] -- structure validation against schemas -[cm/] -- cryptographic messages -[THANKS] + [schema/] -- Structure validation against schemas + [cm/] -- Cryptographic Messages + [RPC] -- Remote Procedure Call + [THANKS] Copyright © 2024-2025 Sergey Matveev diff --git a/tcl/schemas/rpc.tcl b/tcl/schemas/rpc.tcl new file mode 100644 index 0000000..505a6b3 --- /dev/null +++ b/tcl/schemas/rpc.tcl @@ -0,0 +1,16 @@ +req { + {field . {list} len=2} + {field 0 {int} >0} {# id} + {field 1 {str} >0} {# method} +} + +resp { + {field . {list} len=2} + {field 0 {int} >0} {# id} + {field 1 {with err}} +} + +err { + {field msg {str} >0 optional} + {field code {int} >0 optional} +}