From ddf667a168c7572f48770c2756fbb97217c6a5e2 Mon Sep 17 00:00:00 2001 From: Sergey Matveev Date: Tue, 5 Jan 2021 20:08:11 +0300 Subject: [PATCH] Fix invalidly used spWorkersGroup * It leads to races among several SP processes * No .Done() was called when error occurs inside checker gorouting --- src/sp.go | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/src/sp.go b/src/sp.go index 56db462..72ae7fb 100644 --- a/src/sp.go +++ b/src/sp.go @@ -56,7 +56,7 @@ var ( DefaultDeadline = 10 * time.Second PingTimeout = time.Minute - spWorkersGroup sync.WaitGroup + spCheckerToken chan struct{} ) type SPType uint8 @@ -148,6 +148,8 @@ func init() { panic(err) } SPFileOverhead = buf.Len() + spCheckerToken = make(chan struct{}, 1) + spCheckerToken <- struct{}{} } func MarshalSP(typ SPType, sp interface{}) []byte { @@ -989,9 +991,11 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) { fd.Close() // #nosec G104 continue } - spWorkersGroup.Wait() - spWorkersGroup.Add(1) + <-spCheckerToken go func() { + defer func() { + spCheckerToken <- struct{}{} + }() if err := fd.Sync(); err != nil { state.Ctx.LogE("sp-file", sdsp, err, "sync") fd.Close() // #nosec G104 @@ -1023,7 +1027,6 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) { state.Lock() delete(state.infosTheir, *file.Hash) state.Unlock() - spWorkersGroup.Done() state.wg.Add(1) go func() { state.payloads <- MarshalSP(SPTypeDone, SPDone{file.Hash}) -- 2.48.1