From 2aed185474d8f57e910db16fac60d700589c6110 Mon Sep 17 00:00:00 2001 From: Sergey Matveev Date: Fri, 20 Dec 2019 11:05:58 +0300 Subject: [PATCH] Finish SP related goroutines --- doc/news.ru.texi | 3 ++ doc/news.texi | 3 ++ src/sp.go | 98 +++++++++++++++++++++++++++++++++--------------- 3 files changed, 74 insertions(+), 30 deletions(-) diff --git a/doc/news.ru.texi b/doc/news.ru.texi index 28c0922..71c82d8 100644 --- a/doc/news.ru.texi +++ b/doc/news.ru.texi @@ -11,6 +11,9 @@ @item Исправлено не происходящее дополнение (padding) handshake сообщений. +@item +Завершать все порождаемые в SP протоколе горутины, меньше утечек памяти. + @end itemize @node Релиз 5.2.1 diff --git a/doc/news.texi b/doc/news.texi index 9af9ba4..7c72540 100644 --- a/doc/news.texi +++ b/doc/news.texi @@ -13,6 +13,9 @@ Progress messages contain prefix, describing the running action. @item Fixed not occurring handshake messages padding. +@item +Finish all SP protocol related goroutines, less memory leak. + @end itemize @node Release 5.2.1 diff --git a/src/sp.go b/src/sp.go index 580b97e..2aebb07 100644 --- a/src/sp.go +++ b/src/sp.go @@ -38,7 +38,7 @@ const ( PartSuffix = ".part" DefaultDeadline = 10 - SPHeadOverhead = 4 + SPHeadOverhead = 4 ) var ( @@ -195,15 +195,37 @@ type SPState struct { xxOnly TRxTx rxRate int txRate int - isDead bool + isDead chan struct{} listOnly bool onlyPkts map[[32]byte]bool sync.RWMutex } +func (state *SPState) SetDead() { + state.Lock() + defer state.Unlock() + select { + case _, ok := <-state.isDead: + if !ok { + // Already closed channel, dead + return + } + default: + } + close(state.isDead) + go func() { + for _ = range state.payloads { + } + }() +} + func (state *SPState) NotAlive() bool { - if state.isDead { - return true + select { + case _, ok := <-state.isDead: + if !ok { + return true + } + default: } now := time.Now() if state.maxOnlineTime > 0 && state.started.Add(time.Duration(state.maxOnlineTime)*time.Second).Before(now) { @@ -488,9 +510,13 @@ func (state *SPState) StartR(conn ConnDeadlined) error { func (state *SPState) StartWorkers( conn ConnDeadlined, infosPayloads [][]byte, - payload []byte) error { + payload []byte, +) error { + state.isDead = make(chan struct{}) sds := SDS{"node": state.Node.Id, "nice": int(state.Nice)} + if len(infosPayloads) > 1 { + state.wg.Add(1) go func() { for _, payload := range infosPayloads[1:] { state.Ctx.LogD( @@ -500,6 +526,7 @@ func (state *SPState) StartWorkers( ) state.payloads <- payload } + state.wg.Done() }() } state.Ctx.LogD( @@ -513,6 +540,7 @@ func (state *SPState) StartWorkers( return err } + state.wg.Add(1) go func() { for _, reply := range replies { state.Ctx.LogD( @@ -522,25 +550,34 @@ func (state *SPState) StartWorkers( ) state.payloads <- reply } + state.wg.Done() }() if !state.listOnly && (state.xxOnly == "" || state.xxOnly == TTx) { + state.wg.Add(1) go func() { - for range time.Tick(time.Second) { - if state.NotAlive() { - return - } - for _, payload := range state.Ctx.infosOur( - state.Node.Id, - state.Nice, - &state.infosOurSeen, - ) { - state.Ctx.LogD( - "sp-work", - SdsAdd(sds, SDS{"size": len(payload)}), - "queuing new info", - ) - state.payloads <- payload + ticker := time.NewTicker(time.Second) + for { + select { + case _, ok := <-state.isDead: + if !ok { + state.wg.Done() + ticker.Stop() + return + } + case <-ticker.C: + for _, payload := range state.Ctx.infosOur( + state.Node.Id, + state.Nice, + &state.infosOurSeen, + ) { + state.Ctx.LogD( + "sp-work", + SdsAdd(sds, SDS{"size": len(payload)}), + "queuing new info", + ) + state.payloads <- payload + } } } }() @@ -548,13 +585,9 @@ func (state *SPState) StartWorkers( state.wg.Add(1) go func() { - defer func() { - state.isDead = true - state.wg.Done() - }() for { if state.NotAlive() { - return + break } var payload []byte select { @@ -662,17 +695,15 @@ func (state *SPState) StartWorkers( break } } + state.SetDead() + state.wg.Done() }() state.wg.Add(1) go func() { - defer func() { - state.isDead = true - state.wg.Done() - }() for { if state.NotAlive() { - return + break } state.Ctx.LogD("sp-recv", sds, "waiting for payload") conn.SetReadDeadline(time.Now().Add(DefaultDeadline * time.Second)) @@ -712,6 +743,7 @@ func (state *SPState) StartWorkers( state.Ctx.LogE("sp-recv", sds, err, "") break } + state.wg.Add(1) go func() { for _, reply := range replies { state.Ctx.LogD( @@ -721,11 +753,14 @@ func (state *SPState) StartWorkers( ) state.payloads <- reply } + state.wg.Done() }() if state.rxRate > 0 { time.Sleep(time.Second / time.Duration(state.rxRate)) } } + state.SetDead() + state.wg.Done() }() return nil @@ -733,6 +768,7 @@ func (state *SPState) StartWorkers( func (state *SPState) Wait() { state.wg.Wait() + close(state.payloads) state.dirUnlock() state.Duration = time.Now().Sub(state.started) state.RxSpeed = state.RxBytes @@ -915,8 +951,10 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) { delete(state.infosTheir, *file.Hash) state.Unlock() spWorkersGroup.Done() + state.wg.Add(1) go func() { state.payloads <- MarshalSP(SPTypeDone, SPDone{file.Hash}) + state.wg.Done() }() }() case SPTypeDone: -- 2.48.1