]> Cypherpunks repositories - keks.git/commitdiff
RPC
authorSergey Matveev <stargrave@stargrave.org>
Fri, 30 May 2025 10:46:07 +0000 (13:46 +0300)
committerSergey Matveev <stargrave@stargrave.org>
Fri, 30 May 2025 10:46:07 +0000 (13:46 +0300)
go/default.schema.keks.do [moved from go/cm/default.schema.keks.do with 88% similarity]
go/rpc/.gitignore [new file with mode: 0644]
go/rpc/all_test.go [new file with mode: 0644]
go/rpc/client.go [new file with mode: 0644]
go/rpc/err.go [new file with mode: 0644]
go/rpc/schema.go [new file with mode: 0644]
go/rpc/server.go [new file with mode: 0644]
spec/RPC [new file with mode: 0644]
spec/index
tcl/schemas/rpc.tcl [new file with mode: 0644]

similarity index 88%
rename from go/cm/default.schema.keks.do
rename to go/default.schema.keks.do
index fb10673f456bcd1848d9adfdec004352842c14a3dc37fecffdd1c1904fe51640..8fbe5385663995f2958b66bd650de37a6538dcbb028d8292c99e85a5e7e9265a 100644 (file)
@@ -1,5 +1,5 @@
 n=${2##*/}.tcl
-cd ../../tcl/schemas
+cd ../tcl/schemas
 {
     echo  ../schema.tcl $n
     sed -n "s/^schema-include \(.*\)$/\1/p" <$n
diff --git a/go/rpc/.gitignore b/go/rpc/.gitignore
new file mode 100644 (file)
index 0000000..62d48e6
--- /dev/null
@@ -0,0 +1 @@
+/rpc.schema.keks
diff --git a/go/rpc/all_test.go b/go/rpc/all_test.go
new file mode 100644 (file)
index 0000000..b936002
--- /dev/null
@@ -0,0 +1,218 @@
+// Most of that is copied from go/src/net/rpc/jsonrpc/all_test.go
+
+package rpc
+
+import (
+       "errors"
+       "io"
+       "net"
+       "net/rpc"
+       "testing"
+       "time"
+
+       "go.cypherpunks.su/keks"
+)
+
+type Args struct {
+       A, B int
+}
+
+type Reply struct {
+       C int
+}
+
+type Arith int
+
+func (t *Arith) Add(args *Args, reply *Reply) error {
+       reply.C = args.A + args.B
+       return nil
+}
+
+func (t *Arith) Mul(args *Args, reply *Reply) error {
+       reply.C = args.A * args.B
+       return nil
+}
+
+func (t *Arith) Div(args *Args, reply *Reply) error {
+       if args.B == 0 {
+               return errors.New("divide by zero")
+       }
+       reply.C = args.A / args.B
+       return nil
+}
+
+func (t *Arith) Error(args *Args, reply *Reply) error {
+       panic("ERROR")
+}
+
+func init() {
+       rpc.Register(new(Arith))
+}
+
+func TestServerNoParams(t *testing.T) {
+       cli, srv := net.Pipe()
+       defer cli.Close()
+       srv.SetDeadline(time.Now().Add(10 * time.Millisecond))
+       go ServeConn(srv)
+       _, err := keks.Encode(cli, []any{123, "Arith.Add"}, nil)
+       if err != nil {
+               panic(err)
+       }
+       _, err = keks.NewDecoderFromReader(cli, nil).Decode()
+       if err != io.EOF {
+               t.Fatal("no EOF")
+       }
+}
+
+func TestServerEmptyMessage(t *testing.T) {
+       cli, srv := net.Pipe()
+       defer cli.Close()
+       go ServeConn(srv)
+       _, err := keks.Encode(cli, struct{}{}, nil)
+       if err != nil {
+               panic(err)
+       }
+       _, err = keks.NewDecoderFromReader(cli, nil).Decode()
+       if err != io.EOF {
+               t.Fatal("no EOF")
+       }
+}
+
+func TestServer(t *testing.T) {
+       cli, srv := net.Pipe()
+       defer cli.Close()
+       go ServeConn(srv)
+       for i := range 10 {
+               _, err := keks.Encode(cli, []any{i + 1, "Arith.Add"}, nil)
+               if err != nil {
+                       panic(err)
+               }
+               _, err = keks.Encode(cli, map[string]int{"A": i, "B": i + 1}, nil)
+               if err != nil {
+                       panic(err)
+               }
+               reply, err := keks.NewDecoderFromReader(cli, nil).Decode()
+               if err != nil {
+                       t.Fatal(err)
+               }
+               list := reply.([]any)
+               if list[0] != uint64(i+1) {
+                       t.Fatal("bad id")
+               }
+               if len(list[1].(map[string]any)) != 0 {
+                       t.Fatal("has err", list[1])
+               }
+               reply, err = keks.NewDecoderFromReader(cli, nil).Decode()
+               if err != nil {
+                       t.Fatal(err)
+               }
+               var r Reply
+               err = keks.Map2Struct(&r, reply.(map[string]any))
+               if err != nil {
+                       t.Fatal(err)
+               }
+               if r.C != 2*i+1 {
+                       t.Fatal("bad result", r.C)
+               }
+       }
+}
+
+func TestClient(t *testing.T) {
+       cli, srv := net.Pipe()
+       go ServeConn(srv)
+       client := NewClient(cli)
+       defer client.Close()
+
+       args := &Args{7, 8}
+       reply := new(Reply)
+       err := client.Call("Arith.Add", args, reply)
+       if err != nil {
+               t.Error(err.Error())
+       }
+       if reply.C != args.A+args.B {
+               t.Error(reply.C, args.A+args.B)
+       }
+
+       args = &Args{7, 8}
+       reply = new(Reply)
+       err = client.Call("Arith.Mul", args, reply)
+       if err != nil {
+               t.Error(err.Error())
+       }
+       if reply.C != args.A*args.B {
+               t.Error(reply.C, args.A*args.B)
+       }
+
+       // Out of order.
+       args = &Args{7, 8}
+       mulReply := new(Reply)
+       mulCall := client.Go("Arith.Mul", args, mulReply, nil)
+       addReply := new(Reply)
+       addCall := client.Go("Arith.Add", args, addReply, nil)
+
+       addCall = <-addCall.Done
+       if addCall.Error != nil {
+               t.Error(addCall.Error.Error())
+       }
+       if addReply.C != args.A+args.B {
+               t.Error(addReply.C, args.A+args.B)
+       }
+
+       mulCall = <-mulCall.Done
+       if mulCall.Error != nil {
+               t.Error(mulCall.Error.Error())
+       }
+       if mulReply.C != args.A*args.B {
+               t.Error(mulReply.C, args.A*args.B)
+       }
+
+       // Error test
+       args = &Args{7, 0}
+       reply = new(Reply)
+       err = client.Call("Arith.Div", args, reply)
+       // expect an error: zero divide
+       if err == nil {
+               t.Error("expected error")
+       } else if err.Error() != "divide by zero" {
+               t.Error("expected divide by zero error; got", err)
+       }
+
+       args = &Args{7, 7}
+       reply = new(Reply)
+       err = client.Call("Arith.Div", args, reply)
+       if err != nil {
+               t.Error("unexpected error")
+       }
+}
+
+func TestMalformedInput(t *testing.T) {
+       cli, srv := net.Pipe()
+       go cli.Write([]byte(`{id:1}`)) // invalid json
+       ServeConn(srv)                 // must return, not loop
+}
+
+func TestMalformedOutput(t *testing.T) {
+       cli, srv := net.Pipe()
+       go func() {
+               buf0, err := keks.EncodeBuf([]any{1, nil}, nil)
+               if err != nil {
+                       panic(err)
+               }
+               buf1, err := keks.EncodeBuf(struct{}{}, nil)
+               if err != nil {
+                       panic(err)
+               }
+               srv.Write(append(buf0, buf1...))
+       }()
+       go io.ReadAll(srv)
+
+       client := NewClient(cli)
+       defer client.Close()
+
+       args := &Args{7, 8}
+       reply := new(Reply)
+       err := client.Call("Arith.Add", args, reply)
+       if err == nil {
+               t.Error("expected error")
+       }
+}
diff --git a/go/rpc/client.go b/go/rpc/client.go
new file mode 100644 (file)
index 0000000..c556dc6
--- /dev/null
@@ -0,0 +1,112 @@
+// GoKEKS -- Go KEKS codec implementation
+// Copyright (C) 2024-2025 Sergey Matveev <stargrave@stargrave.org>
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as
+// published by the Free Software Foundation, version 3 of the License.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public
+// License along with this program.  If not, see <http://www.gnu.org/licenses/>.
+
+package rpc
+
+import (
+       "fmt"
+       "io"
+       "net/rpc"
+       "sync"
+
+       "go.cypherpunks.su/keks"
+       "go.cypherpunks.su/keks/schema"
+)
+
+type ClientCodec struct {
+       conn     io.ReadWriteCloser
+       pending  map[uint64]string
+       pendingM sync.Mutex
+}
+
+func NewClientCodec(conn io.ReadWriteCloser) *ClientCodec {
+       return &ClientCodec{conn: conn, pending: make(map[uint64]string)}
+}
+
+func (cc *ClientCodec) WriteRequest(req *rpc.Request, data any) (err error) {
+       cc.pendingM.Lock()
+       cc.pending[req.Seq] = req.ServiceMethod
+       cc.pendingM.Unlock()
+       if _, err = keks.Encode(cc.conn, []any{req.Seq + 1, req.ServiceMethod}, nil); err != nil {
+               return fmt.Errorf("keks/rpc: WriteRequest: %w", err)
+       }
+       if _, err = keks.Encode(cc.conn, data, nil); err != nil {
+               return fmt.Errorf("keks/rpc: WriteRequest: %w", err)
+       }
+       return nil
+}
+
+func (cc *ClientCodec) ReadResponseHeader(resp *rpc.Response) (err error) {
+       d := keks.NewDecoderFromReader(cc.conn, nil)
+       var r any
+       if r, err = d.Decode(); err != nil {
+               return fmt.Errorf("keks/rpc: ReadResponseHeader: %w", err)
+       }
+       if err = schema.Check("resp", RPCSchemas, r); err != nil {
+               return fmt.Errorf("keks/rpc: ReadResponseHeader: %w", err)
+       }
+       list := r.([]any)
+       resp.Seq = list[0].(uint64) - 1
+       var ok bool
+       cc.pendingM.Lock()
+       resp.ServiceMethod, ok = cc.pending[resp.Seq]
+       if ok {
+               delete(cc.pending, resp.Seq)
+       }
+       cc.pendingM.Unlock()
+       if !ok {
+               return fmt.Errorf("keks/rpc: ReadResponseHeader: unknown id: %d", resp.Seq)
+       }
+       e := list[1].(map[string]any)
+       if len(e) > 0 {
+               msg, ok := e["msg"]
+               if ok {
+                       resp.Error = msg.(string)
+                       if len(resp.Error) == 0 {
+                               resp.Error = "unspecified"
+                       }
+               } else {
+                       code, ok := e["code"]
+                       if ok {
+                               resp.Error = fmt.Sprintf("code: %d", code.(uint64))
+                       } else {
+                               resp.Error = "unspecified"
+                       }
+               }
+       }
+       return nil
+}
+
+func (cc *ClientCodec) ReadResponseBody(dst any) (err error) {
+       d := keks.NewDecoderFromReader(cc.conn, nil)
+       if _, err = d.Parse(); err != nil {
+               return fmt.Errorf("keks/rpc: ReadResponseBody: %w", err)
+       }
+       if dst == nil {
+               return nil
+       }
+       if err = d.UnmarshalStruct(dst); err != nil {
+               return fmt.Errorf("keks/rpc: ReadResponseBody: %w", err)
+       }
+       return nil
+}
+
+func (cc *ClientCodec) Close() error {
+       return cc.conn.Close()
+}
+
+func NewClient(conn io.ReadWriteCloser) *rpc.Client {
+       return rpc.NewClientWithCodec(NewClientCodec(conn))
+}
diff --git a/go/rpc/err.go b/go/rpc/err.go
new file mode 100644 (file)
index 0000000..51ca34c
--- /dev/null
@@ -0,0 +1,6 @@
+package rpc
+
+type Error struct {
+       Msg  *string `keks:"msg,omitempty"`
+       Code *uint64 `keks:"code,omitempty"`
+}
diff --git a/go/rpc/schema.go b/go/rpc/schema.go
new file mode 100644 (file)
index 0000000..fedfdce
--- /dev/null
@@ -0,0 +1,41 @@
+// GoKEKS/CM -- KEKS-encoded cryptographic messages
+// Copyright (C) 2024-2025 Sergey Matveev <stargrave@stargrave.org>
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as
+// published by the Free Software Foundation, version 3 of the License.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public
+// License along with this program.  If not, see <http://www.gnu.org/licenses/>.
+
+package rpc
+
+import (
+       _ "embed"
+
+       "go.cypherpunks.su/keks"
+       "go.cypherpunks.su/keks/schema"
+)
+
+//go:embed rpc.schema.keks
+var RPCSchemasRaw []byte
+
+var RPCSchemas map[string][][]any
+
+func init() {
+       var magic keks.Magic
+       magic, RPCSchemasRaw = keks.StripMagic(RPCSchemasRaw)
+       if magic != schema.Magic {
+               panic("wrong magic in rpc.schema.keks")
+       }
+       if err := keks.NewDecoderFromBytes(
+               RPCSchemasRaw, nil,
+       ).DecodeStruct(&RPCSchemas); err != nil {
+               panic(err)
+       }
+}
diff --git a/go/rpc/server.go b/go/rpc/server.go
new file mode 100644 (file)
index 0000000..9e0aadc
--- /dev/null
@@ -0,0 +1,100 @@
+// GoKEKS -- Go KEKS codec implementation
+// Copyright (C) 2024-2025 Sergey Matveev <stargrave@stargrave.org>
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as
+// published by the Free Software Foundation, version 3 of the License.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public
+// License along with this program.  If not, see <http://www.gnu.org/licenses/>.
+
+package rpc
+
+import (
+       "fmt"
+       "io"
+       "net/rpc"
+       "sync"
+
+       "go.cypherpunks.su/keks"
+       "go.cypherpunks.su/keks/schema"
+)
+
+type ServerCodec struct {
+       conn       io.ReadWriteCloser
+       pending    map[uint64]uint64
+       pendingM   sync.Mutex
+       pendingSeq uint64
+}
+
+func NewServerCodec(conn io.ReadWriteCloser) *ServerCodec {
+       return &ServerCodec{conn: conn, pending: make(map[uint64]uint64)}
+}
+
+func (sc *ServerCodec) ReadRequestHeader(req *rpc.Request) (err error) {
+       d := keks.NewDecoderFromReader(sc.conn, nil)
+       var r any
+       if r, err = d.Decode(); err != nil {
+               return fmt.Errorf("keks/rpc: ReadRequestHeader: %w", err)
+       }
+       if err = schema.Check("req", RPCSchemas, r); err != nil {
+               return fmt.Errorf("keks/rpc: ReadRequestHeader: %w", err)
+       }
+       list := r.([]any)
+       req.Seq = list[0].(uint64)
+       sc.pendingM.Lock()
+       sc.pendingSeq++
+       sc.pending[sc.pendingSeq] = req.Seq
+       req.Seq = sc.pendingSeq
+       sc.pendingM.Unlock()
+       req.ServiceMethod = list[1].(string)
+       return nil
+}
+
+func (sc *ServerCodec) ReadRequestBody(dst any) (err error) {
+       d := keks.NewDecoderFromReader(sc.conn, nil)
+       if _, err = d.Parse(); err != nil {
+               return fmt.Errorf("keks/rpc: ReadRequestBody: %w", err)
+       }
+       if dst == nil {
+               return nil
+       }
+       if err = d.UnmarshalStruct(dst); err != nil {
+               return fmt.Errorf("keks/rpc: ReadRequestBody: %w", err)
+       }
+       return nil
+}
+
+func (sc *ServerCodec) WriteResponse(resp *rpc.Response, data any) (err error) {
+       sc.pendingM.Lock()
+       id, ok := sc.pending[resp.Seq]
+       if !ok {
+               panic("unknown id")
+       }
+       delete(sc.pending, resp.Seq)
+       sc.pendingM.Unlock()
+       e := Error{}
+       if resp.Error != "" {
+               e.Msg = &resp.Error
+       }
+       if _, err = keks.Encode(sc.conn, []any{id, e}, nil); err != nil {
+               return fmt.Errorf("keks/rpc: WriteResponse: %w", err)
+       }
+       if _, err = keks.Encode(sc.conn, data, nil); err != nil {
+               return fmt.Errorf("keks/rpc: WriteResponse: %w", err)
+       }
+       return nil
+}
+
+func (sc *ServerCodec) Close() error {
+       return sc.conn.Close()
+}
+
+func ServeConn(conn io.ReadWriteCloser) {
+       rpc.ServeCodec(NewServerCodec(conn))
+}
diff --git a/spec/RPC b/spec/RPC
new file mode 100644 (file)
index 0000000..acc0186
--- /dev/null
+++ b/spec/RPC
@@ -0,0 +1,13 @@
+Here is suggestion to use KEKS codec with a simple client-server
+Remote Procedure Call interface. It is very similar to JSON-RPC 2.0.
+=> https://www.jsonrpc.org/ JSON-RPC\r
+
+Request object is a LIST followed by MAP with arbitrary values, maybe
+empty. "id" is a positive 64-bit integer, that must be unique during the
+whole session's connection.
+
+Response object is also a LIST followed by MAP with arbitrary values.
+Response's "id" must be the same as in corresponding request. Empty
+error map means no error occurred.
+
+<<    [schemas/rpc.tcl]\r
index 1d4926c4c627e7203b0e0714a60059d8361438b917b83290375ea003e3f624e3..985681e7d06f361882d394525116747cf6145935d3a78566e167fe4ca1b608de 100644 (file)
@@ -29,10 +29,11 @@ requirements below.
 
 <<[ComparisonWithOtherCodecs]\r
 
-[INSTALL]
+  [INSTALL]
 [encoding/]
-[schema/] -- structure validation against schemas
-[cm/]     -- cryptographic messages
-[THANKS]
+  [schema/] -- Structure validation against schemas
+      [cm/] -- Cryptographic Messages
+      [RPC] -- Remote Procedure Call
+   [THANKS]
 
 Copyright © 2024-2025 Sergey Matveev <stargrave@stargrave.org>
diff --git a/tcl/schemas/rpc.tcl b/tcl/schemas/rpc.tcl
new file mode 100644 (file)
index 0000000..505a6b3
--- /dev/null
@@ -0,0 +1,16 @@
+req {
+    {field . {list} len=2}
+    {field 0 {int} >0} {# id}
+    {field 1 {str} >0} {# method}
+}
+
+resp {
+    {field . {list} len=2}
+    {field 0 {int} >0} {# id}
+    {field 1 {with err}}
+}
+
+err {
+    {field msg {str} >0 optional}
+    {field code {int} >0 optional}
+}