From: Sergey Matveev Date: Tue, 5 Jan 2021 17:08:11 +0000 (+0300) Subject: Fix invalidly used spWorkersGroup X-Git-Tag: v5.5.0^2~12 X-Git-Url: http://www.git.cypherpunks.su/?a=commitdiff_plain;h=ddf667a168c7572f48770c2756fbb97217c6a5e2;p=nncp.git Fix invalidly used spWorkersGroup * It leads to races among several SP processes * No .Done() was called when error occurs inside checker gorouting --- diff --git a/src/sp.go b/src/sp.go index 56db462..72ae7fb 100644 --- a/src/sp.go +++ b/src/sp.go @@ -56,7 +56,7 @@ var ( DefaultDeadline = 10 * time.Second PingTimeout = time.Minute - spWorkersGroup sync.WaitGroup + spCheckerToken chan struct{} ) type SPType uint8 @@ -148,6 +148,8 @@ func init() { panic(err) } SPFileOverhead = buf.Len() + spCheckerToken = make(chan struct{}, 1) + spCheckerToken <- struct{}{} } func MarshalSP(typ SPType, sp interface{}) []byte { @@ -989,9 +991,11 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) { fd.Close() // #nosec G104 continue } - spWorkersGroup.Wait() - spWorkersGroup.Add(1) + <-spCheckerToken go func() { + defer func() { + spCheckerToken <- struct{}{} + }() if err := fd.Sync(); err != nil { state.Ctx.LogE("sp-file", sdsp, err, "sync") fd.Close() // #nosec G104 @@ -1023,7 +1027,6 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) { state.Lock() delete(state.infosTheir, *file.Hash) state.Unlock() - spWorkersGroup.Done() state.wg.Add(1) go func() { state.payloads <- MarshalSP(SPTypeDone, SPDone{file.Hash})