"testing"
"time"
+ "github.com/google/uuid"
"go.cypherpunks.su/keks"
)
defer cli.Close()
go ServeConn(srv)
for i := range 10 {
- _, err := keks.Encode(cli, []any{i + 1, "Arith.Add"}, nil)
+ id, err := uuid.NewRandom()
if err != nil {
panic(err)
}
- _, err = keks.Encode(cli, map[string]int{"A": i, "B": i + 1}, nil)
+ _, err = keks.Encode(cli, []any{"c", id, "Arith.Add", Args{i, i + 1}}, nil)
if err != nil {
panic(err)
}
t.Fatal(err)
}
list := reply.([]any)
- if list[0] != uint64(i+1) {
- t.Fatal("bad id")
+ if list[0].(string) != "r" {
+ t.Fatal("bad r")
}
- if len(list[1].(string)) != 0 {
- t.Fatal("has err", list[1])
+ if list[1].(*keks.Hexlet).UUID() != id {
+ t.Fatal("bad id")
}
- reply, err = keks.NewDecoderFromReader(cli, nil).Decode()
- if err != nil {
- t.Fatal(err)
+ if list[2].(string) != "" {
+ t.Fatal("has err", list[2])
}
var r Reply
- err = keks.Map2Struct(&r, reply.(map[string]any))
+ err = keks.Map2Struct(&r, list[3].(map[string]any))
if err != nil {
t.Fatal(err)
}
package rpc
import (
+ "bytes"
+ "crypto/rand"
"fmt"
"io"
"net/rpc"
"sync"
+ "github.com/google/uuid"
+
"go.cypherpunks.su/keks"
"go.cypherpunks.su/keks/schema"
)
+type SeqAndMethod struct {
+ method string
+ seq uint64
+}
+
type ClientCodec struct {
conn io.ReadWriteCloser
- pending map[uint64]string
+ body map[string]any
+ pending map[uuid.UUID]SeqAndMethod
+ rnd [16]byte
pendingM sync.Mutex
}
func NewClientCodec(conn io.ReadWriteCloser) *ClientCodec {
- return &ClientCodec{conn: conn, pending: make(map[uint64]string)}
+ cc := ClientCodec{
+ conn: conn,
+ pending: make(map[uuid.UUID]SeqAndMethod),
+ }
+ rand.Read(cc.rnd[:])
+ return &cc
}
func (cc *ClientCodec) WriteRequest(req *rpc.Request, data any) (err error) {
+ var id uuid.UUID
+ id, err = uuid.NewV7FromReader(bytes.NewReader(cc.rnd[:]))
+ if err != nil {
+ panic(err)
+ }
cc.pendingM.Lock()
- cc.pending[req.Seq] = req.ServiceMethod
+ cc.pending[id] = SeqAndMethod{seq: req.Seq, method: req.ServiceMethod}
cc.pendingM.Unlock()
- if req.Seq == 0xFFFFFFFFFFFFFFFF {
- panic("keks/rpc: WriteRequest: req.Seq overflow")
- }
- if _, err = keks.Encode(cc.conn, []any{req.Seq + 1, req.ServiceMethod}, nil); err != nil {
- return fmt.Errorf("keks/rpc: WriteRequest: %w", err)
- }
- if _, err = keks.Encode(cc.conn, data, nil); err != nil {
+ if _, err = keks.Encode(cc.conn, []any{"c", id, req.ServiceMethod, data}, nil); err != nil {
return fmt.Errorf("keks/rpc: WriteRequest: %w", err)
}
return nil
if r, err = d.Decode(); err != nil {
return fmt.Errorf("keks/rpc: ReadResponseHeader: %w", err)
}
- if err = schema.Check("resp", RPCSchemas, r); err != nil {
+ if err = schema.Check("reply", RPCSchemas, r); err != nil {
return fmt.Errorf("keks/rpc: ReadResponseHeader: %w", err)
}
list := r.([]any)
- resp.Seq = list[0].(uint64)
- if resp.Seq == 0 {
- panic("keks/rpc: ReadResponseHeader: resp.Seq is zero")
- }
- resp.Seq--
- var ok bool
+ id := list[1].(*keks.Hexlet).UUID()
cc.pendingM.Lock()
- resp.ServiceMethod, ok = cc.pending[resp.Seq]
+ seqAndMethod, ok := cc.pending[id]
if ok {
- delete(cc.pending, resp.Seq)
+ delete(cc.pending, id)
}
cc.pendingM.Unlock()
if !ok {
- return fmt.Errorf("keks/rpc: ReadResponseHeader: unknown id: %d", resp.Seq)
+ return fmt.Errorf("keks/rpc: ReadResponseHeader: unknown id: %s", id)
}
- resp.Error = list[1].(string)
+ resp.Seq, resp.ServiceMethod = seqAndMethod.seq, seqAndMethod.method
+ resp.Error = list[2].(string)
+ cc.body = list[3].(map[string]any)
return nil
}
func (cc *ClientCodec) ReadResponseBody(dst any) (err error) {
- d := keks.NewDecoderFromReader(cc.conn, nil)
- if _, err = d.Parse(); err != nil {
- return fmt.Errorf("keks/rpc: ReadResponseBody: %w", err)
- }
if dst == nil {
return nil
}
- if err = d.UnmarshalStruct(dst); err != nil {
+ if err = keks.Map2Struct(dst, cc.body); err != nil {
return fmt.Errorf("keks/rpc: ReadResponseBody: %w", err)
}
return nil
"net/rpc"
"sync"
+ "github.com/google/uuid"
"go.cypherpunks.su/keks"
"go.cypherpunks.su/keks/schema"
)
type ServerCodec struct {
- conn io.ReadWriteCloser
- pending map[uint64]uint64
- pendingM sync.Mutex
- pendingSeq uint64
+ conn io.ReadWriteCloser
+ ids map[uint64]uuid.UUID
+ body map[string]any
+ seq uint64
+ idsM sync.Mutex
}
func NewServerCodec(conn io.ReadWriteCloser) *ServerCodec {
- return &ServerCodec{conn: conn, pending: make(map[uint64]uint64)}
+ return &ServerCodec{conn: conn, ids: make(map[uint64]uuid.UUID)}
}
func (sc *ServerCodec) ReadRequestHeader(req *rpc.Request) (err error) {
if r, err = d.Decode(); err != nil {
return fmt.Errorf("keks/rpc: ReadRequestHeader: %w", err)
}
- if err = schema.Check("req", RPCSchemas, r); err != nil {
+ if err = schema.Check("call", RPCSchemas, r); err != nil {
return fmt.Errorf("keks/rpc: ReadRequestHeader: %w", err)
}
list := r.([]any)
- req.Seq = list[0].(uint64)
- sc.pendingM.Lock()
- sc.pendingSeq++
- sc.pending[sc.pendingSeq] = req.Seq
- req.Seq = sc.pendingSeq
- sc.pendingM.Unlock()
- req.ServiceMethod = list[1].(string)
+ sc.idsM.Lock()
+ sc.seq++
+ req.Seq = sc.seq
+ sc.ids[sc.seq] = list[1].(*keks.Hexlet).UUID()
+ sc.idsM.Unlock()
+ req.ServiceMethod = list[2].(string)
+ sc.body = list[3].(map[string]any)
return nil
}
func (sc *ServerCodec) ReadRequestBody(dst any) (err error) {
- d := keks.NewDecoderFromReader(sc.conn, nil)
- if _, err = d.Parse(); err != nil {
- return fmt.Errorf("keks/rpc: ReadRequestBody: %w", err)
- }
if dst == nil {
return nil
}
- if err = d.UnmarshalStruct(dst); err != nil {
+ if err = keks.Map2Struct(dst, sc.body); err != nil {
return fmt.Errorf("keks/rpc: ReadRequestBody: %w", err)
}
return nil
}
func (sc *ServerCodec) WriteResponse(resp *rpc.Response, data any) (err error) {
- sc.pendingM.Lock()
- id, ok := sc.pending[resp.Seq]
+ sc.idsM.Lock()
+ id, ok := sc.ids[resp.Seq]
if !ok {
panic("unknown id")
}
- delete(sc.pending, resp.Seq)
- sc.pendingM.Unlock()
- if _, err = keks.Encode(sc.conn, []any{id, resp.Error}, nil); err != nil {
- return fmt.Errorf("keks/rpc: WriteResponse: %w", err)
- }
- if _, err = keks.Encode(sc.conn, data, nil); err != nil {
+ delete(sc.ids, resp.Seq)
+ sc.idsM.Unlock()
+ if _, err = keks.Encode(sc.conn, []any{"r", id, resp.Error, data}, nil); err != nil {
return fmt.Errorf("keks/rpc: WriteResponse: %w", err)
}
return nil
Here is suggestion to use KEKS codec with a simple client-server
-Remote Procedure Call interface. It is very similar to JSON-RPC 2.0.
+Remote Procedure Call interface. It is very similar to
=> https://www.jsonrpc.org/ JSON-RPC\r
+=> https://github.com/msgpack-rpc/msgpack-rpc MessagePack-RPC\r
-Request object is a LIST followed by MAP with arbitrary values, maybe
-empty. "id" is a positive 64-bit integer, that must be unique during the
-whole session's connection.
-
-Response object is also a LIST followed by MAP with arbitrary values,
-corresponding error additional data. Response's "id" must be the same as
-in corresponding request. Empty error string means no error occurred.
+Call, reply and notify message headers ([schema/tcl] format):
<< [schemas/rpc.tcl]\r
+
+It is advisable to use UUIDv7 for call/notify's id.
+Reply's id must be the same as in corresponding call.
+Empty error string means no error occurred.
+
+Notification differs only by its type and no reply expectation.
integer values, you choose one of: s, ms, us, ns, ps, fs, as.
"utc" issues UTC command.
-"of s" argument issues checking of EACH element of the list or map
+{of s} argument issues checking of EACH element of the list or map
against the specified schema, or against specified type if "s" is a
known type.
-req {
- {field . {list} len=2}
- {field 0 {int} >0} {# id}
- {field 1 {str} >0} {# method}
+call {
+ {field . {list} len=4}
+ {field 0 {str} =c}
+ {field 1 {hexlet}} {# id}
+ {field 2 {str} >0} {# method}
+ {field 3 {map}} {# params}
}
-resp {
- {field . {list} len=2}
- {field 0 {int} >0} {# id}
- {field 1 {str}} {# error}
+reply {
+ {field . {list} len=4}
+ {field 0 {str} =r}
+ {field 1 {hexlet}} {# id}
+ {field 2 {str}} {# error}
+ {field 3 {map}} {# body}
+}
+
+notify {
+ {field . {list} len=4}
+ {field 0 {str} =n}
+ {field 1 {hexlet}} {# id}
+ {field 2 {str} >0} {# method}
+ {field 3 {map}} {# body}
}