From 137c819a92961066efc7a7e5ef1999ad7f579113 Mon Sep 17 00:00:00 2001 From: Sergey Matveev Date: Sat, 14 Jan 2017 22:32:20 +0300 Subject: [PATCH] Ability to configure online deadline timeout --- src/cypherpunks.ru/nncp/cfg.go | 25 +++++--- src/cypherpunks.ru/nncp/cmd/nncp-call/main.go | 12 +++- .../nncp/cmd/nncp-daemon/main.go | 8 +-- src/cypherpunks.ru/nncp/node.go | 21 +++--- src/cypherpunks.ru/nncp/sp.go | 64 +++++++++---------- 5 files changed, 73 insertions(+), 57 deletions(-) diff --git a/src/cypherpunks.ru/nncp/cfg.go b/src/cypherpunks.ru/nncp/cfg.go index 1bb9c02..a10c670 100644 --- a/src/cypherpunks.ru/nncp/cfg.go +++ b/src/cypherpunks.ru/nncp/cfg.go @@ -47,6 +47,8 @@ type NodeYAML struct { Via []string `via,omitempty` Addrs map[string]string `addrs,omitempty` + + OnlineDeadline *int `onlinedeadline,omitempty` } type NodeOurYAML struct { @@ -130,20 +132,27 @@ func NewNode(name string, yml NodeYAML) (*Node, error) { } node := Node{ - Name: name, - Id: nodeId, - ExchPub: new([32]byte), - SignPub: ed25519.PublicKey(signPub), - Sendmail: yml.Sendmail, - Incoming: incoming, - Freq: freq, - Addrs: yml.Addrs, + Name: name, + Id: nodeId, + ExchPub: new([32]byte), + SignPub: ed25519.PublicKey(signPub), + Sendmail: yml.Sendmail, + Incoming: incoming, + Freq: freq, + Addrs: yml.Addrs, + OnlineDeadline: DefaultDeadline, } copy(node.ExchPub[:], exchPub) if len(noisePub) > 0 { node.NoisePub = new([32]byte) copy(node.NoisePub[:], noisePub) } + if yml.OnlineDeadline != nil { + if *yml.OnlineDeadline <= 0 { + return nil, errors.New("OnlineDeadline must be at least 1 second") + } + node.OnlineDeadline = *yml.OnlineDeadline + } return &node, nil } diff --git a/src/cypherpunks.ru/nncp/cmd/nncp-call/main.go b/src/cypherpunks.ru/nncp/cmd/nncp-call/main.go index a3861dc..771582b 100644 --- a/src/cypherpunks.ru/nncp/cmd/nncp-call/main.go +++ b/src/cypherpunks.ru/nncp/cmd/nncp-call/main.go @@ -50,6 +50,8 @@ func main() { debug = flag.Bool("debug", false, "Print debug messages") version = flag.Bool("version", false, "Print version information") warranty = flag.Bool("warranty", false, "Print warranty information") + + onlineDeadline = flag.Int("onlinedeadline", 0, "Override onlinedeadline option") ) flag.Usage = usage flag.Parse() @@ -93,6 +95,10 @@ func main() { log.Fatalln("Node does not have online communication capability") } + if *onlineDeadline > 0 { + node.OnlineDeadline = *onlineDeadline + } + var xxOnly nncp.TRxTx if *rxOnly { xxOnly = nncp.TRx @@ -126,10 +132,10 @@ func main() { ctx.LogD("call", nncp.SDS{"addr": addr}, "connected") state, err := ctx.StartI(conn, node.Id, nice, &xxOnly) if err == nil { - ctx.LogI("call-start", nncp.SDS{"node": state.NodeId}, "connected") + ctx.LogI("call-start", nncp.SDS{"node": state.Node.Id}, "connected") state.Wait() ctx.LogI("call-finish", nncp.SDS{ - "node": state.NodeId, + "node": state.Node.Id, "duration": strconv.FormatInt(int64(state.Duration.Seconds()), 10), "rxbytes": strconv.FormatInt(state.RxBytes, 10), "txbytes": strconv.FormatInt(state.TxBytes, 10), @@ -140,7 +146,7 @@ func main() { conn.Close() break } else { - ctx.LogE("call-start", nncp.SDS{"node": state.NodeId, "err": err}, "") + ctx.LogE("call-start", nncp.SDS{"node": state.Node.Id, "err": err}, "") conn.Close() } } diff --git a/src/cypherpunks.ru/nncp/cmd/nncp-daemon/main.go b/src/cypherpunks.ru/nncp/cmd/nncp-daemon/main.go index 890c823..c1355c8 100644 --- a/src/cypherpunks.ru/nncp/cmd/nncp-daemon/main.go +++ b/src/cypherpunks.ru/nncp/cmd/nncp-daemon/main.go @@ -90,10 +90,10 @@ func main() { go func(conn net.Conn) { state, err := ctx.StartR(conn, nice, nil) if err == nil { - ctx.LogI("call-start", nncp.SDS{"node": state.NodeId}, "connected") + ctx.LogI("call-start", nncp.SDS{"node": state.Node.Id}, "connected") state.Wait() ctx.LogI("call-finish", nncp.SDS{ - "node": state.NodeId, + "node": state.Node.Id, "duration": strconv.FormatInt(int64(state.Duration.Seconds()), 10), "rxbytes": strconv.FormatInt(state.RxBytes, 10), "txbytes": strconv.FormatInt(state.TxBytes, 10), @@ -102,8 +102,8 @@ func main() { }, "") } else { nodeId := "unknown" - if state != nil && state.NodeId != nil { - nodeId = state.NodeId.String() + if state != nil && state.Node != nil { + nodeId = state.Node.Id.String() } ctx.LogE("call-start", nncp.SDS{"node": nodeId, "err": err}, "") } diff --git a/src/cypherpunks.ru/nncp/node.go b/src/cypherpunks.ru/nncp/node.go index 8b3a012..5edd5ea 100644 --- a/src/cypherpunks.ru/nncp/node.go +++ b/src/cypherpunks.ru/nncp/node.go @@ -35,16 +35,17 @@ func (id NodeId) String() string { } type Node struct { - Name string - Id *NodeId - ExchPub *[32]byte - SignPub ed25519.PublicKey - NoisePub *[32]byte - Sendmail []string - Incoming *string - Freq *string - Via []*NodeId - Addrs map[string]string + Name string + Id *NodeId + ExchPub *[32]byte + SignPub ed25519.PublicKey + NoisePub *[32]byte + Sendmail []string + Incoming *string + Freq *string + Via []*NodeId + Addrs map[string]string + OnlineDeadline int } type NodeOur struct { diff --git a/src/cypherpunks.ru/nncp/sp.go b/src/cypherpunks.ru/nncp/sp.go index 235770d..27a62eb 100644 --- a/src/cypherpunks.ru/nncp/sp.go +++ b/src/cypherpunks.ru/nncp/sp.go @@ -36,9 +36,9 @@ import ( ) const ( - MaxSPSize = 2<<16 - 256 - PartSuffix = ".part" - DeadlineDuration = 10 + MaxSPSize = 2<<16 - 256 + PartSuffix = ".part" + DefaultDeadline = 10 ) var ( @@ -158,7 +158,7 @@ func payloadsSplit(payloads [][]byte) [][]byte { type SPState struct { ctx *Ctx - NodeId *NodeId + Node *Node nice uint8 hs *noise.HandshakeState csOur *noise.CipherState @@ -184,7 +184,7 @@ type SPState struct { func (state *SPState) isDead() bool { now := time.Now() - return now.Sub(state.RxLastSeen).Seconds() >= DeadlineDuration && now.Sub(state.TxLastSeen).Seconds() >= DeadlineDuration + return int(now.Sub(state.RxLastSeen).Seconds()) >= state.Node.OnlineDeadline && int(now.Sub(state.TxLastSeen).Seconds()) >= state.Node.OnlineDeadline } func (state *SPState) dirUnlock() { @@ -275,6 +275,7 @@ func (ctx *Ctx) StartI(conn net.Conn, nodeId *NodeId, nice uint8, xxOnly *TRxTx) } } started := time.Now() + node := ctx.Neigh[*nodeId] conf := noise.Config{ CipherSuite: NoiseCipherSuite, Pattern: noise.HandshakeIK, @@ -283,12 +284,12 @@ func (ctx *Ctx) StartI(conn net.Conn, nodeId *NodeId, nice uint8, xxOnly *TRxTx) Private: ctx.Self.NoisePrv[:], Public: ctx.Self.NoisePub[:], }, - PeerStatic: ctx.Neigh[*nodeId].NoisePub[:], + PeerStatic: node.NoisePub[:], } state := SPState{ ctx: ctx, hs: noise.NewHandshakeState(conf), - NodeId: nodeId, + Node: node, nice: nice, payloads: make(chan []byte), infosTheir: make(map[[32]byte]*SPInfo), @@ -317,14 +318,14 @@ func (ctx *Ctx) StartI(conn net.Conn, nodeId *NodeId, nice uint8, xxOnly *TRxTx) buf, _, _ = state.hs.WriteMessage(nil, firstPayload) sds := SDS{"node": nodeId, "nice": strconv.Itoa(int(nice))} ctx.LogD("sp-start", sds, "sending first message") - conn.SetWriteDeadline(time.Now().Add(DeadlineDuration * time.Second)) + conn.SetWriteDeadline(time.Now().Add(DefaultDeadline * time.Second)) if err = state.WriteSP(conn, buf); err != nil { ctx.LogE("sp-start", SdsAdd(sds, SDS{"err": err}), "") state.dirUnlock() return nil, err } ctx.LogD("sp-start", sds, "waiting for first message") - conn.SetReadDeadline(time.Now().Add(DeadlineDuration * time.Second)) + conn.SetReadDeadline(time.Now().Add(DefaultDeadline * time.Second)) if buf, err = state.ReadSP(conn); err != nil { ctx.LogE("sp-start", SdsAdd(sds, SDS{"err": err}), "") state.dirUnlock() @@ -375,7 +376,7 @@ func (ctx *Ctx) StartR(conn net.Conn, nice uint8, xxOnly *TRxTx) (*SPState, erro SDS{"nice": strconv.Itoa(int(nice))}, "waiting for first message", ) - conn.SetReadDeadline(time.Now().Add(DeadlineDuration * time.Second)) + conn.SetReadDeadline(time.Now().Add(DefaultDeadline * time.Second)) if buf, err = state.ReadSP(conn); err != nil { ctx.LogE("sp-start", SDS{"err": err}, "") return nil, err @@ -385,27 +386,26 @@ func (ctx *Ctx) StartR(conn net.Conn, nice uint8, xxOnly *TRxTx) (*SPState, erro return nil, err } - var nodeId *NodeId - for _, node := range ctx.Neigh { + var node *Node + for _, node = range ctx.Neigh { if subtle.ConstantTimeCompare(state.hs.PeerStatic(), node.NoisePub[:]) == 1 { - nodeId = node.Id break } } - if nodeId == nil { + if node == nil { peerId := ToBase32(state.hs.PeerStatic()) ctx.LogE("sp-start", SDS{"peer": peerId}, "unknown") return nil, errors.New("Unknown peer: " + peerId) } - state.NodeId = nodeId - sds := SDS{"node": nodeId, "nice": strconv.Itoa(int(nice))} + state.Node = node + sds := SDS{"node": node.Id, "nice": strconv.Itoa(int(nice))} - if ctx.ensureRxDir(nodeId); err != nil { + if ctx.ensureRxDir(node.Id); err != nil { return nil, err } var rxLock *os.File if xxOnly != nil && *xxOnly == TRx { - rxLock, err = ctx.LockDir(nodeId, TRx) + rxLock, err = ctx.LockDir(node.Id, TRx) if err != nil { return nil, err } @@ -413,7 +413,7 @@ func (ctx *Ctx) StartR(conn net.Conn, nice uint8, xxOnly *TRxTx) (*SPState, erro state.rxLock = rxLock var txLock *os.File if xxOnly != nil && *xxOnly == TTx { - txLock, err = ctx.LockDir(nodeId, TTx) + txLock, err = ctx.LockDir(node.Id, TTx) if err != nil { return nil, err } @@ -422,7 +422,7 @@ func (ctx *Ctx) StartR(conn net.Conn, nice uint8, xxOnly *TRxTx) (*SPState, erro var infosPayloads [][]byte if xxOnly == nil || *xxOnly != TTx { - infosPayloads = ctx.infosOur(nodeId, nice, &state.infosOurSeen) + infosPayloads = ctx.infosOur(node.Id, nice, &state.infosOurSeen) } var firstPayload []byte if len(infosPayloads) > 0 { @@ -435,7 +435,7 @@ func (ctx *Ctx) StartR(conn net.Conn, nice uint8, xxOnly *TRxTx) (*SPState, erro ctx.LogD("sp-start", sds, "sending first message") buf, state.csTheir, state.csOur = state.hs.WriteMessage(nil, firstPayload) - conn.SetWriteDeadline(time.Now().Add(DeadlineDuration * time.Second)) + conn.SetWriteDeadline(time.Now().Add(DefaultDeadline * time.Second)) if err = state.WriteSP(conn, buf); err != nil { ctx.LogE("sp-start", SdsAdd(sds, SDS{"err": err}), "") state.dirUnlock() @@ -451,7 +451,7 @@ func (ctx *Ctx) StartR(conn net.Conn, nice uint8, xxOnly *TRxTx) (*SPState, erro } func (state *SPState) StartWorkers(conn net.Conn, infosPayloads [][]byte, payload []byte) error { - sds := SDS{"node": state.NodeId, "nice": strconv.Itoa(int(state.nice))} + sds := SDS{"node": state.Node.Id, "nice": strconv.Itoa(int(state.nice))} if len(infosPayloads) > 1 { go func() { for _, payload := range infosPayloads[1:] { @@ -489,7 +489,7 @@ func (state *SPState) StartWorkers(conn net.Conn, infosPayloads [][]byte, payloa go func() { for range time.Tick(time.Second) { for _, payload := range state.ctx.infosOur( - state.NodeId, + state.Node.Id, state.nice, &state.infosOurSeen, ) { @@ -538,7 +538,7 @@ func (state *SPState) StartWorkers(conn net.Conn, infosPayloads [][]byte, payloa state.ctx.LogD("sp-file", sdsp, "queueing") fd, err := os.Open(filepath.Join( state.ctx.Spool, - state.NodeId.String(), + state.Node.Id.String(), string(TTx), ToBase32(freq.Hash[:]), )) @@ -604,7 +604,7 @@ func (state *SPState) StartWorkers(conn net.Conn, infosPayloads [][]byte, payloa SdsAdd(sds, SDS{"size": strconv.Itoa(len(payload))}), "sending", ) - conn.SetWriteDeadline(time.Now().Add(DeadlineDuration * time.Second)) + conn.SetWriteDeadline(time.Now().Add(10 * time.Second)) if err := state.WriteSP(conn, state.csOur.Encrypt(nil, nil, payload)); err != nil { state.ctx.LogE("sp-xmit", SdsAdd(sds, SDS{"err": err}), "") break @@ -620,7 +620,7 @@ func (state *SPState) StartWorkers(conn net.Conn, infosPayloads [][]byte, payloa return } state.ctx.LogD("sp-recv", sds, "waiting for payload") - conn.SetReadDeadline(time.Now().Add(DeadlineDuration * time.Second)) + conn.SetReadDeadline(time.Now().Add(DefaultDeadline * time.Second)) payload, err := state.ReadSP(conn) if err != nil { unmarshalErr := err.(*xdr.UnmarshalError) @@ -685,7 +685,7 @@ func (state *SPState) Wait() { } func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) { - sds := SDS{"node": state.NodeId, "nice": strconv.Itoa(int(state.nice))} + sds := SDS{"node": state.Node.Id, "nice": strconv.Itoa(int(state.nice))} r := bytes.NewReader(payload) var err error var replies [][]byte @@ -725,7 +725,7 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) { state.ctx.LogD("sp-process", sdsp, "stating part") if _, err = os.Stat(filepath.Join( state.ctx.Spool, - state.NodeId.String(), + state.Node.Id.String(), string(TRx), ToBase32(info.Hash[:]), )); err == nil { @@ -735,7 +735,7 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) { } fi, err := os.Stat(filepath.Join( state.ctx.Spool, - state.NodeId.String(), + state.Node.Id.String(), string(TRx), ToBase32(info.Hash[:])+PartSuffix, )) @@ -773,7 +773,7 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) { }) filePath := filepath.Join( state.ctx.Spool, - state.NodeId.String(), + state.Node.Id.String(), string(TRx), ToBase32(file.Hash[:]), ) @@ -852,7 +852,7 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) { state.ctx.LogD("sp-done", sdsp, "removing") err := os.Remove(filepath.Join( state.ctx.Spool, - state.NodeId.String(), + state.Node.Id.String(), string(TTx), ToBase32(done.Hash[:]), )) @@ -900,7 +900,7 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) { } state.ctx.LogI("sp-infos", SDS{ "xx": string(TRx), - "node": state.NodeId, + "node": state.Node.Id, "pkts": strconv.Itoa(pkts), "size": strconv.FormatInt(int64(size), 10), }, "") -- 2.48.1