From 03e9cc2ab96c2e3063638272364acab3ae61b1ff Mon Sep 17 00:00:00 2001 From: Sergey Matveev Date: Sat, 7 Jan 2017 15:12:51 +0300 Subject: [PATCH] Wait for transfer to complete --- src/cypherpunks.ru/nncp/llp.go | 49 ++++++++++++++++++++++------------ 1 file changed, 32 insertions(+), 17 deletions(-) diff --git a/src/cypherpunks.ru/nncp/llp.go b/src/cypherpunks.ru/nncp/llp.go index 26e982d..6b91f2c 100644 --- a/src/cypherpunks.ru/nncp/llp.go +++ b/src/cypherpunks.ru/nncp/llp.go @@ -36,8 +36,9 @@ import ( ) const ( - MaxLLPSize = 2<<15 - 256 - PartSuffix = ".part" + MaxLLPSize = 2<<15 - 256 + PartSuffix = ".part" + DeadlineDuration = 10 ) var ( @@ -54,8 +55,6 @@ var ( noise.CipherChaChaPoly, noise.HashBLAKE2b, ) - - DeadlineDuration time.Duration = 10 * time.Second ) type LLPType uint8 @@ -167,7 +166,6 @@ type LLPState struct { payloads chan []byte infosTheir map[[32]byte]*LLPInfo queueTheir []*LLPFreq - isDead bool wg sync.WaitGroup RxBytes int64 RxLastSeen time.Time @@ -183,6 +181,11 @@ type LLPState struct { sync.RWMutex } +func (state *LLPState) isDead() bool { + now := time.Now() + return now.Sub(state.RxLastSeen).Seconds() >= DeadlineDuration && now.Sub(state.TxLastSeen).Seconds() >= DeadlineDuration +} + func (state *LLPState) dirUnlock() { state.ctx.UnlockDir(state.rxLock) state.ctx.UnlockDir(state.txLock) @@ -320,14 +323,14 @@ func (ctx *Ctx) StartI(conn net.Conn, nodeId *NodeId, nice uint8, xxOnly *TRxTx) buf, _, _ = state.hs.WriteMessage(nil, firstPayload) sds := SDS{"node": nodeId, "nice": strconv.Itoa(int(nice))} ctx.LogD("llp-start", sds, "sending first message") - conn.SetWriteDeadline(time.Now().Add(DeadlineDuration)) + conn.SetWriteDeadline(time.Now().Add(DeadlineDuration * time.Second)) if err = state.WriteLLP(conn, buf); err != nil { ctx.LogE("llp-start", SdsAdd(sds, SDS{"err": err}), "") state.dirUnlock() return nil, err } ctx.LogD("llp-start", sds, "waiting for first message") - conn.SetReadDeadline(time.Now().Add(DeadlineDuration)) + conn.SetReadDeadline(time.Now().Add(DeadlineDuration * time.Second)) if buf, err = state.ReadLLP(conn); err != nil { ctx.LogE("llp-start", SdsAdd(sds, SDS{"err": err}), "") state.dirUnlock() @@ -377,7 +380,7 @@ func (ctx *Ctx) StartR(conn net.Conn, nice uint8, xxOnly *TRxTx) (*LLPState, err SDS{"nice": strconv.Itoa(int(nice))}, "waiting for first message", ) - conn.SetReadDeadline(time.Now().Add(DeadlineDuration)) + conn.SetReadDeadline(time.Now().Add(DeadlineDuration * time.Second)) if buf, err = state.ReadLLP(conn); err != nil { ctx.LogE("llp-start", SDS{"err": err}, "") return nil, err @@ -436,7 +439,7 @@ func (ctx *Ctx) StartR(conn net.Conn, nice uint8, xxOnly *TRxTx) (*LLPState, err ctx.LogD("llp-start", sds, "sending first message") buf, state.csTheir, state.csOur = state.hs.WriteMessage(nil, firstPayload) - conn.SetWriteDeadline(time.Now().Add(DeadlineDuration)) + conn.SetWriteDeadline(time.Now().Add(DeadlineDuration * time.Second)) if err = state.WriteLLP(conn, buf); err != nil { ctx.LogE("llp-start", SdsAdd(sds, SDS{"err": err}), "") state.dirUnlock() @@ -489,7 +492,7 @@ func (state *LLPState) StartWorkers(conn net.Conn, infosPayloads [][]byte, paylo go func() { defer state.wg.Done() for { - if state.isDead { + if state.isDead() { return } var payload []byte @@ -586,31 +589,32 @@ func (state *LLPState) StartWorkers(conn net.Conn, infosPayloads [][]byte, paylo SdsAdd(sds, SDS{"size": strconv.Itoa(len(payload))}), "sending", ) - conn.SetWriteDeadline(time.Now().Add(DeadlineDuration)) + conn.SetWriteDeadline(time.Now().Add(DeadlineDuration * time.Second)) if err := state.WriteLLP(conn, state.csOur.Encrypt(nil, nil, payload)); err != nil { state.ctx.LogE("llp-xmit", SdsAdd(sds, SDS{"err": err}), "") break } } - state.isDead = true }() state.wg.Add(1) go func() { defer state.wg.Done() for { - if state.isDead { + if state.isDead() { return } state.ctx.LogD("llp-recv", sds, "waiting for payload") - conn.SetReadDeadline(time.Now().Add(DeadlineDuration)) + conn.SetReadDeadline(time.Now().Add(DeadlineDuration * time.Second)) payload, err := state.ReadLLP(conn) if err != nil { unmarshalErr := err.(*xdr.UnmarshalError) netErr, ok := unmarshalErr.Err.(net.Error) - if !((ok && netErr.Timeout()) || unmarshalErr.ErrorCode == xdr.ErrIO) { + if (ok && netErr.Timeout()) || unmarshalErr.ErrorCode == xdr.ErrIO { + continue + } else { state.ctx.LogE("llp-recv", SdsAdd(sds, SDS{"err": err}), "") + break } - break } state.ctx.LogD( "llp-recv", @@ -643,7 +647,6 @@ func (state *LLPState) StartWorkers(conn net.Conn, infosPayloads [][]byte, paylo } }() } - state.isDead = true }() return nil } @@ -703,6 +706,16 @@ func (state *LLPState) ProcessLLP(payload []byte) ([][]byte, error) { state.infosTheir[*info.Hash] = &info state.Unlock() state.ctx.LogD("llp-process", sdsp, "stating part") + if _, err = os.Stat(filepath.Join( + state.ctx.Spool, + state.NodeId.String(), + string(TRx), + ToBase32(info.Hash[:]), + )); err == nil { + state.ctx.LogD("llp-process", sdsp, "already done") + replies = append(replies, MarshalLLP(LLPTypeDone, LLPDone{info.Hash})) + continue + } fi, err := os.Stat(filepath.Join( state.ctx.Spool, state.NodeId.String(), @@ -788,6 +801,8 @@ func (state *LLPState) ProcessLLP(payload []byte) ([][]byte, error) { fd.Close() return } + state.wg.Add(1) + defer state.wg.Done() fd.Seek(0, 0) state.ctx.LogD("llp-file", sdsp, "checking") gut, err := Check(fd, file.Hash[:]) -- 2.48.1