From: Sergey Matveev Date: Sun, 8 Jan 2017 09:01:52 +0000 (+0300) Subject: Rename LLP to SP (some kind of Sync Protocol) X-Git-Tag: 0.1~29 X-Git-Url: http://www.git.cypherpunks.su/?a=commitdiff_plain;h=db1eac8703e54a9dd5ddfcefba215fa1236a9ea7;p=nncp.git Rename LLP to SP (some kind of Sync Protocol) --- diff --git a/src/cypherpunks.ru/nncp/ctx.go b/src/cypherpunks.ru/nncp/ctx.go index 0d00803..1122f45 100644 --- a/src/cypherpunks.ru/nncp/ctx.go +++ b/src/cypherpunks.ru/nncp/ctx.go @@ -57,12 +57,12 @@ func (ctx *Ctx) FindNode(id string) (*Node, error) { func (ctx *Ctx) ensureRxDir(nodeId *NodeId) error { dirPath := filepath.Join(ctx.Spool, nodeId.String(), string(TRx)) if err := os.MkdirAll(dirPath, os.FileMode(0700)); err != nil { - ctx.LogE("llp-ensure", SDS{"dir": dirPath, "err": err}, "") + ctx.LogE("dir-ensure", SDS{"dir": dirPath, "err": err}, "") return err } fd, err := os.Open(dirPath) if err != nil { - ctx.LogE("llp-ensure", SDS{"dir": dirPath, "err": err}, "") + ctx.LogE("dir-ensure", SDS{"dir": dirPath, "err": err}, "") return err } fd.Close() diff --git a/src/cypherpunks.ru/nncp/humanizer.go b/src/cypherpunks.ru/nncp/humanizer.go index a3ebc8d..fc2ecd0 100644 --- a/src/cypherpunks.ru/nncp/humanizer.go +++ b/src/cypherpunks.ru/nncp/humanizer.go @@ -177,7 +177,7 @@ func (ctx *Ctx) Humanize(s string) string { humanize.IBytes(uint64(rx)), humanize.IBytes(uint64(rxs)), humanize.IBytes(uint64(tx)), humanize.IBytes(uint64(txs)), ) - case "llp-infos": + case "sp-infos": switch sds["xx"] { case "rx": msg = fmt.Sprintf("%s has got for us: ", nodeS) @@ -187,7 +187,7 @@ func (ctx *Ctx) Humanize(s string) string { return s } msg += fmt.Sprintf("%s packets, %s", sds["pkts"], size) - case "llp-file": + case "sp-file": switch sds["xx"] { case "rx": msg = "Got file " @@ -211,7 +211,7 @@ func (ctx *Ctx) Humanize(s string) string { humanize.IBytes(uint64(sizeParsed)), humanize.IBytes(uint64(fullsize)), ) - case "llp-done": + case "sp-done": switch sds["xx"] { case "rx": msg = fmt.Sprintf("File %s is retreived (%s)", sds["hash"], size) diff --git a/src/cypherpunks.ru/nncp/sortbynice.go b/src/cypherpunks.ru/nncp/sortbynice.go index 9e68f52..1e02796 100644 --- a/src/cypherpunks.ru/nncp/sortbynice.go +++ b/src/cypherpunks.ru/nncp/sortbynice.go @@ -1,6 +1,6 @@ package nncp -type ByNice []*LLPInfo +type ByNice []*SPInfo func (a ByNice) Len() int { return len(a) diff --git a/src/cypherpunks.ru/nncp/llp.go b/src/cypherpunks.ru/nncp/sp.go similarity index 69% rename from src/cypherpunks.ru/nncp/llp.go rename to src/cypherpunks.ru/nncp/sp.go index 211c9a9..f5775f9 100644 --- a/src/cypherpunks.ru/nncp/llp.go +++ b/src/cypherpunks.ru/nncp/sp.go @@ -36,7 +36,7 @@ import ( ) const ( - MaxLLPSize = 2<<15 - 256 + MaxSPSize = 2<<15 - 256 PartSuffix = ".part" DeadlineDuration = 10 ) @@ -44,11 +44,11 @@ const ( var ( MagicNNCPLv1 [8]byte = [8]byte{'N', 'N', 'C', 'P', 'L', 1, 0, 0} - LLPHeadOverhead int - LLPInfoOverhead int - LLPFreqOverhead int - LLPFileOverhead int - LLPHaltMarshalized []byte + SPHeadOverhead int + SPInfoOverhead int + SPFreqOverhead int + SPFileOverhead int + SPHaltMarshalized []byte NoiseCipherSuite noise.CipherSuite = noise.NewCipherSuite( noise.DH25519, @@ -57,84 +57,84 @@ var ( ) ) -type LLPType uint8 +type SPType uint8 const ( - LLPTypeInfo LLPType = iota - LLPTypeFreq LLPType = iota - LLPTypeFile LLPType = iota - LLPTypeDone LLPType = iota - LLPTypeHalt LLPType = iota + SPTypeInfo SPType = iota + SPTypeFreq SPType = iota + SPTypeFile SPType = iota + SPTypeDone SPType = iota + SPTypeHalt SPType = iota ) -type LLPHead struct { - Type LLPType +type SPHead struct { + Type SPType } -type LLPInfo struct { +type SPInfo struct { Nice uint8 Size uint64 Hash *[32]byte } -type LLPFreq struct { +type SPFreq struct { Hash *[32]byte Offset uint64 } -type LLPFile struct { +type SPFile struct { Hash *[32]byte Offset uint64 Payload []byte } -type LLPDone struct { +type SPDone struct { Hash *[32]byte } -type LLPRaw struct { +type SPRaw struct { Magic [8]byte Payload []byte } func init() { var buf bytes.Buffer - llpHead := LLPHead{Type: LLPTypeHalt} - if _, err := xdr.Marshal(&buf, llpHead); err != nil { + spHead := SPHead{Type: SPTypeHalt} + if _, err := xdr.Marshal(&buf, spHead); err != nil { panic(err) } - copy(LLPHaltMarshalized, buf.Bytes()) - LLPHeadOverhead = buf.Len() + copy(SPHaltMarshalized, buf.Bytes()) + SPHeadOverhead = buf.Len() buf.Reset() - llpInfo := LLPInfo{Nice: 123, Size: 123, Hash: new([32]byte)} - if _, err := xdr.Marshal(&buf, llpInfo); err != nil { + spInfo := SPInfo{Nice: 123, Size: 123, Hash: new([32]byte)} + if _, err := xdr.Marshal(&buf, spInfo); err != nil { panic(err) } - LLPInfoOverhead = buf.Len() + SPInfoOverhead = buf.Len() buf.Reset() - llpFreq := LLPFreq{Hash: new([32]byte), Offset: 123} - if _, err := xdr.Marshal(&buf, llpFreq); err != nil { + spFreq := SPFreq{Hash: new([32]byte), Offset: 123} + if _, err := xdr.Marshal(&buf, spFreq); err != nil { panic(err) } - LLPFreqOverhead = buf.Len() + SPFreqOverhead = buf.Len() buf.Reset() - llpFile := LLPFile{Hash: new([32]byte), Offset: 123} - if _, err := xdr.Marshal(&buf, llpFile); err != nil { + spFile := SPFile{Hash: new([32]byte), Offset: 123} + if _, err := xdr.Marshal(&buf, spFile); err != nil { panic(err) } - LLPFileOverhead = buf.Len() + SPFileOverhead = buf.Len() } -func MarshalLLP(typ LLPType, llp interface{}) []byte { +func MarshalSP(typ SPType, sp interface{}) []byte { var buf bytes.Buffer var err error - if _, err = xdr.Marshal(&buf, LLPHead{typ}); err != nil { + if _, err = xdr.Marshal(&buf, SPHead{typ}); err != nil { panic(err) } - if _, err = xdr.Marshal(&buf, llp); err != nil { + if _, err = xdr.Marshal(&buf, sp); err != nil { panic(err) } return buf.Bytes() @@ -142,12 +142,12 @@ func MarshalLLP(typ LLPType, llp interface{}) []byte { func payloadsSplit(payloads [][]byte) [][]byte { var outbounds [][]byte - outbound := make([]byte, 0, MaxLLPSize) + outbound := make([]byte, 0, MaxSPSize) for i, payload := range payloads { outbound = append(outbound, payload...) - if i+1 < len(payloads) && len(outbound)+len(payloads[i+1]) > MaxLLPSize { + if i+1 < len(payloads) && len(outbound)+len(payloads[i+1]) > MaxSPSize { outbounds = append(outbounds, outbound) - outbound = make([]byte, 0, MaxLLPSize) + outbound = make([]byte, 0, MaxSPSize) } } if len(outbound) > 0 { @@ -156,7 +156,7 @@ func payloadsSplit(payloads [][]byte) [][]byte { return outbounds } -type LLPState struct { +type SPState struct { ctx *Ctx NodeId *NodeId nice uint8 @@ -164,8 +164,8 @@ type LLPState struct { csOur *noise.CipherState csTheir *noise.CipherState payloads chan []byte - infosTheir map[[32]byte]*LLPInfo - queueTheir []*LLPFreq + infosTheir map[[32]byte]*SPInfo + queueTheir []*SPFreq wg sync.WaitGroup RxBytes int64 RxLastSeen time.Time @@ -181,18 +181,18 @@ type LLPState struct { sync.RWMutex } -func (state *LLPState) isDead() bool { +func (state *SPState) isDead() bool { now := time.Now() return now.Sub(state.RxLastSeen).Seconds() >= DeadlineDuration && now.Sub(state.TxLastSeen).Seconds() >= DeadlineDuration } -func (state *LLPState) dirUnlock() { +func (state *SPState) dirUnlock() { state.ctx.UnlockDir(state.rxLock) state.ctx.UnlockDir(state.txLock) } -func (state *LLPState) WriteLLP(dst io.Writer, payload []byte) error { - n, err := xdr.Marshal(dst, LLPRaw{Magic: MagicNNCPLv1, Payload: payload}) +func (state *SPState) WriteSP(dst io.Writer, payload []byte) error { + n, err := xdr.Marshal(dst, SPRaw{Magic: MagicNNCPLv1, Payload: payload}) if err == nil { state.TxLastSeen = time.Now() state.TxBytes += int64(n) @@ -200,22 +200,22 @@ func (state *LLPState) WriteLLP(dst io.Writer, payload []byte) error { return err } -func (state *LLPState) ReadLLP(src io.Reader) ([]byte, error) { - var llp LLPRaw - n, err := xdr.Unmarshal(src, &llp) +func (state *SPState) ReadSP(src io.Reader) ([]byte, error) { + var sp SPRaw + n, err := xdr.Unmarshal(src, &sp) if err != nil { return nil, err } state.RxLastSeen = time.Now() state.RxBytes += int64(n) - if llp.Magic != MagicNNCPLv1 { + if sp.Magic != MagicNNCPLv1 { return nil, BadMagic } - return llp.Payload, nil + return sp.Payload, nil } func (ctx *Ctx) infosOur(nodeId *NodeId, nice uint8) [][]byte { - var infos []*LLPInfo + var infos []*SPInfo var totalSize int64 for job := range ctx.Jobs(nodeId, TTx) { job.Fd.Close() @@ -223,7 +223,7 @@ func (ctx *Ctx) infosOur(nodeId *NodeId, nice uint8) [][]byte { continue } totalSize += job.Size - infos = append(infos, &LLPInfo{ + infos = append(infos, &SPInfo{ Nice: job.PktEnc.Nice, Size: uint64(job.Size), Hash: job.HshValue, @@ -232,14 +232,14 @@ func (ctx *Ctx) infosOur(nodeId *NodeId, nice uint8) [][]byte { sort.Sort(ByNice(infos)) var payloads [][]byte for _, info := range infos { - payloads = append(payloads, MarshalLLP(LLPTypeInfo, info)) - ctx.LogD("llp-info-our", SDS{ + payloads = append(payloads, MarshalSP(SPTypeInfo, info)) + ctx.LogD("sp-info-our", SDS{ "node": nodeId, "name": ToBase32(info.Hash[:]), "size": strconv.FormatInt(int64(info.Size), 10), }, "") } - ctx.LogI("llp-infos", SDS{ + ctx.LogI("sp-infos", SDS{ "xx": string(TTx), "node": nodeId, "pkts": strconv.Itoa(len(payloads)), @@ -248,7 +248,7 @@ func (ctx *Ctx) infosOur(nodeId *NodeId, nice uint8) [][]byte { return payloadsSplit(payloads) } -func (ctx *Ctx) StartI(conn net.Conn, nodeId *NodeId, nice uint8, xxOnly *TRxTx) (*LLPState, error) { +func (ctx *Ctx) StartI(conn net.Conn, nodeId *NodeId, nice uint8, xxOnly *TRxTx) (*SPState, error) { err := ctx.ensureRxDir(nodeId) if err != nil { return nil, err @@ -278,13 +278,13 @@ func (ctx *Ctx) StartI(conn net.Conn, nodeId *NodeId, nice uint8, xxOnly *TRxTx) }, PeerStatic: ctx.Neigh[*nodeId].NoisePub[:], } - state := LLPState{ + state := SPState{ ctx: ctx, hs: noise.NewHandshakeState(conf), NodeId: nodeId, nice: nice, payloads: make(chan []byte), - infosTheir: make(map[[32]byte]*LLPInfo), + infosTheir: make(map[[32]byte]*SPInfo), started: started, rxLock: rxLock, txLock: txLock, @@ -300,45 +300,45 @@ func (ctx *Ctx) StartI(conn net.Conn, nodeId *NodeId, nice uint8, xxOnly *TRxTx) firstPayload = infosPayloads[0] } // Pad first payload, to hide actual existing files - for i := 0; i < (MaxLLPSize-len(firstPayload))/LLPHeadOverhead; i++ { - firstPayload = append(firstPayload, LLPHaltMarshalized...) + for i := 0; i < (MaxSPSize-len(firstPayload))/SPHeadOverhead; i++ { + firstPayload = append(firstPayload, SPHaltMarshalized...) } var buf []byte var payload []byte buf, _, _ = state.hs.WriteMessage(nil, firstPayload) sds := SDS{"node": nodeId, "nice": strconv.Itoa(int(nice))} - ctx.LogD("llp-start", sds, "sending first message") + ctx.LogD("sp-start", sds, "sending first message") conn.SetWriteDeadline(time.Now().Add(DeadlineDuration * time.Second)) - if err = state.WriteLLP(conn, buf); err != nil { - ctx.LogE("llp-start", SdsAdd(sds, SDS{"err": err}), "") + if err = state.WriteSP(conn, buf); err != nil { + ctx.LogE("sp-start", SdsAdd(sds, SDS{"err": err}), "") state.dirUnlock() return nil, err } - ctx.LogD("llp-start", sds, "waiting for first message") + ctx.LogD("sp-start", sds, "waiting for first message") conn.SetReadDeadline(time.Now().Add(DeadlineDuration * time.Second)) - if buf, err = state.ReadLLP(conn); err != nil { - ctx.LogE("llp-start", SdsAdd(sds, SDS{"err": err}), "") + if buf, err = state.ReadSP(conn); err != nil { + ctx.LogE("sp-start", SdsAdd(sds, SDS{"err": err}), "") state.dirUnlock() return nil, err } payload, state.csOur, state.csTheir, err = state.hs.ReadMessage(nil, buf) if err != nil { - ctx.LogE("llp-start", SdsAdd(sds, SDS{"err": err}), "") + ctx.LogE("sp-start", SdsAdd(sds, SDS{"err": err}), "") state.dirUnlock() return nil, err } - ctx.LogD("llp-start", sds, "starting workers") + ctx.LogD("sp-start", sds, "starting workers") err = state.StartWorkers(conn, infosPayloads, payload) if err != nil { - ctx.LogE("llp-start", SdsAdd(sds, SDS{"err": err}), "") + ctx.LogE("sp-start", SdsAdd(sds, SDS{"err": err}), "") state.dirUnlock() return nil, err } return &state, err } -func (ctx *Ctx) StartR(conn net.Conn, nice uint8, xxOnly *TRxTx) (*LLPState, error) { +func (ctx *Ctx) StartR(conn net.Conn, nice uint8, xxOnly *TRxTx) (*SPState, error) { started := time.Now() conf := noise.Config{ CipherSuite: NoiseCipherSuite, @@ -349,12 +349,12 @@ func (ctx *Ctx) StartR(conn net.Conn, nice uint8, xxOnly *TRxTx) (*LLPState, err Public: ctx.Self.NoisePub[:], }, } - state := LLPState{ + state := SPState{ ctx: ctx, hs: noise.NewHandshakeState(conf), nice: nice, payloads: make(chan []byte), - infosTheir: make(map[[32]byte]*LLPInfo), + infosTheir: make(map[[32]byte]*SPInfo), started: started, xxOnly: xxOnly, } @@ -362,17 +362,17 @@ func (ctx *Ctx) StartR(conn net.Conn, nice uint8, xxOnly *TRxTx) (*LLPState, err var payload []byte var err error ctx.LogD( - "llp-start", + "sp-start", SDS{"nice": strconv.Itoa(int(nice))}, "waiting for first message", ) conn.SetReadDeadline(time.Now().Add(DeadlineDuration * time.Second)) - if buf, err = state.ReadLLP(conn); err != nil { - ctx.LogE("llp-start", SDS{"err": err}, "") + if buf, err = state.ReadSP(conn); err != nil { + ctx.LogE("sp-start", SDS{"err": err}, "") return nil, err } if payload, _, _, err = state.hs.ReadMessage(nil, buf); err != nil { - ctx.LogE("llp-start", SDS{"err": err}, "") + ctx.LogE("sp-start", SDS{"err": err}, "") return nil, err } @@ -385,7 +385,7 @@ func (ctx *Ctx) StartR(conn net.Conn, nice uint8, xxOnly *TRxTx) (*LLPState, err } if nodeId == nil { peerId := ToBase32(state.hs.PeerStatic()) - ctx.LogE("llp-start", SDS{"peer": peerId}, "unknown") + ctx.LogE("sp-start", SDS{"peer": peerId}, "unknown") return nil, errors.New("Unknown peer: " + peerId) } state.NodeId = nodeId @@ -420,19 +420,19 @@ func (ctx *Ctx) StartR(conn net.Conn, nice uint8, xxOnly *TRxTx) (*LLPState, err firstPayload = infosPayloads[0] } // Pad first payload, to hide actual existing files - for i := 0; i < (MaxLLPSize-len(firstPayload))/LLPHeadOverhead; i++ { - firstPayload = append(firstPayload, LLPHaltMarshalized...) + for i := 0; i < (MaxSPSize-len(firstPayload))/SPHeadOverhead; i++ { + firstPayload = append(firstPayload, SPHaltMarshalized...) } - ctx.LogD("llp-start", sds, "sending first message") + ctx.LogD("sp-start", sds, "sending first message") buf, state.csTheir, state.csOur = state.hs.WriteMessage(nil, firstPayload) conn.SetWriteDeadline(time.Now().Add(DeadlineDuration * time.Second)) - if err = state.WriteLLP(conn, buf); err != nil { - ctx.LogE("llp-start", SdsAdd(sds, SDS{"err": err}), "") + if err = state.WriteSP(conn, buf); err != nil { + ctx.LogE("sp-start", SdsAdd(sds, SDS{"err": err}), "") state.dirUnlock() return nil, err } - ctx.LogD("llp-start", sds, "starting workers") + ctx.LogD("sp-start", sds, "starting workers") err = state.StartWorkers(conn, infosPayloads, payload) if err != nil { state.dirUnlock() @@ -441,13 +441,13 @@ func (ctx *Ctx) StartR(conn net.Conn, nice uint8, xxOnly *TRxTx) (*LLPState, err return &state, err } -func (state *LLPState) StartWorkers(conn net.Conn, infosPayloads [][]byte, payload []byte) error { +func (state *SPState) StartWorkers(conn net.Conn, infosPayloads [][]byte, payload []byte) error { sds := SDS{"node": state.NodeId, "nice": strconv.Itoa(int(state.nice))} if len(infosPayloads) > 1 { go func() { for _, payload := range infosPayloads[1:] { state.ctx.LogD( - "llp-work", + "sp-work", SdsAdd(sds, SDS{"size": strconv.Itoa(len(payload))}), "queuing remaining payload", ) @@ -456,19 +456,19 @@ func (state *LLPState) StartWorkers(conn net.Conn, infosPayloads [][]byte, paylo }() } state.ctx.LogD( - "llp-work", + "sp-work", SdsAdd(sds, SDS{"size": strconv.Itoa(len(payload))}), "processing first payload", ) - replies, err := state.ProcessLLP(payload) + replies, err := state.ProcessSP(payload) if err != nil { - state.ctx.LogE("llp-work", SdsAdd(sds, SDS{"err": err}), "") + state.ctx.LogE("sp-work", SdsAdd(sds, SDS{"err": err}), "") return err } go func() { for _, reply := range replies { state.ctx.LogD( - "llp-work", + "sp-work", SdsAdd(sds, SDS{"size": strconv.Itoa(len(reply))}), "queuing reply", ) @@ -486,7 +486,7 @@ func (state *LLPState) StartWorkers(conn net.Conn, infosPayloads [][]byte, paylo select { case payload = <-state.payloads: state.ctx.LogD( - "llp-xmit", + "sp-xmit", SdsAdd(sds, SDS{"size": strconv.Itoa(len(payload))}), "got payload", ) @@ -495,7 +495,7 @@ func (state *LLPState) StartWorkers(conn net.Conn, infosPayloads [][]byte, paylo if payload == nil { state.RLock() if len(state.queueTheir) == 0 { - state.ctx.LogD("llp-xmit", sds, "file queue is empty") + state.ctx.LogD("sp-xmit", sds, "file queue is empty") state.RUnlock() time.Sleep(100 * time.Millisecond) continue @@ -507,7 +507,7 @@ func (state *LLPState) StartWorkers(conn net.Conn, infosPayloads [][]byte, paylo "hash": ToBase32(freq.Hash[:]), "size": strconv.FormatInt(int64(freq.Offset), 10), }) - state.ctx.LogD("llp-file", sdsp, "queueing") + state.ctx.LogD("sp-file", sdsp, "queueing") fd, err := os.Open(filepath.Join( state.ctx.Spool, state.NodeId.String(), @@ -515,37 +515,37 @@ func (state *LLPState) StartWorkers(conn net.Conn, infosPayloads [][]byte, paylo ToBase32(freq.Hash[:]), )) if err != nil { - state.ctx.LogE("llp-file", SdsAdd(sdsp, SDS{"err": err}), "") + state.ctx.LogE("sp-file", SdsAdd(sdsp, SDS{"err": err}), "") break } fi, err := fd.Stat() if err != nil { - state.ctx.LogE("llp-file", SdsAdd(sdsp, SDS{"err": err}), "") + state.ctx.LogE("sp-file", SdsAdd(sdsp, SDS{"err": err}), "") break } fullSize := uint64(fi.Size()) var buf []byte if freq.Offset < fullSize { - state.ctx.LogD("llp-file", sdsp, "seeking") + state.ctx.LogD("sp-file", sdsp, "seeking") if _, err = fd.Seek(int64(freq.Offset), 0); err != nil { - state.ctx.LogE("llp-file", SdsAdd(sdsp, SDS{"err": err}), "") + state.ctx.LogE("sp-file", SdsAdd(sdsp, SDS{"err": err}), "") break } - buf = make([]byte, MaxLLPSize-LLPHeadOverhead-LLPFileOverhead) + buf = make([]byte, MaxSPSize-SPHeadOverhead-SPFileOverhead) n, err := fd.Read(buf) if err != nil { - state.ctx.LogE("llp-file", SdsAdd(sdsp, SDS{"err": err}), "") + state.ctx.LogE("sp-file", SdsAdd(sdsp, SDS{"err": err}), "") break } buf = buf[:n] state.ctx.LogD( - "llp-file", + "sp-file", SdsAdd(sdsp, SDS{"size": strconv.Itoa(n)}), "read", ) } fd.Close() - payload = MarshalLLP(LLPTypeFile, LLPFile{ + payload = MarshalSP(SPTypeFile, SPFile{ Hash: freq.Hash, Offset: freq.Offset, Payload: buf, @@ -553,11 +553,11 @@ func (state *LLPState) StartWorkers(conn net.Conn, infosPayloads [][]byte, paylo ourSize := freq.Offset + uint64(len(buf)) sdsp["size"] = strconv.FormatInt(int64(ourSize), 10) sdsp["fullsize"] = strconv.FormatInt(int64(fullSize), 10) - state.ctx.LogP("llp-file", sdsp, "") + state.ctx.LogP("sp-file", sdsp, "") state.Lock() if len(state.queueTheir) > 0 && *state.queueTheir[0].Hash == *freq.Hash { if ourSize == fullSize { - state.ctx.LogD("llp-file", sdsp, "finished") + state.ctx.LogD("sp-file", sdsp, "finished") if len(state.queueTheir) > 1 { state.queueTheir = state.queueTheir[1:] } else { @@ -567,18 +567,18 @@ func (state *LLPState) StartWorkers(conn net.Conn, infosPayloads [][]byte, paylo state.queueTheir[0].Offset += uint64(len(buf)) } } else { - state.ctx.LogD("llp-file", sdsp, "queue disappeared") + state.ctx.LogD("sp-file", sdsp, "queue disappeared") } state.Unlock() } state.ctx.LogD( - "llp-xmit", + "sp-xmit", SdsAdd(sds, SDS{"size": strconv.Itoa(len(payload))}), "sending", ) conn.SetWriteDeadline(time.Now().Add(DeadlineDuration * time.Second)) - if err := state.WriteLLP(conn, state.csOur.Encrypt(nil, nil, payload)); err != nil { - state.ctx.LogE("llp-xmit", SdsAdd(sds, SDS{"err": err}), "") + if err := state.WriteSP(conn, state.csOur.Encrypt(nil, nil, payload)); err != nil { + state.ctx.LogE("sp-xmit", SdsAdd(sds, SDS{"err": err}), "") break } } @@ -590,43 +590,43 @@ func (state *LLPState) StartWorkers(conn net.Conn, infosPayloads [][]byte, paylo if state.isDead() { return } - state.ctx.LogD("llp-recv", sds, "waiting for payload") + state.ctx.LogD("sp-recv", sds, "waiting for payload") conn.SetReadDeadline(time.Now().Add(DeadlineDuration * time.Second)) - payload, err := state.ReadLLP(conn) + payload, err := state.ReadSP(conn) if err != nil { unmarshalErr := err.(*xdr.UnmarshalError) netErr, ok := unmarshalErr.Err.(net.Error) if (ok && netErr.Timeout()) || unmarshalErr.ErrorCode == xdr.ErrIO { continue } else { - state.ctx.LogE("llp-recv", SdsAdd(sds, SDS{"err": err}), "") + state.ctx.LogE("sp-recv", SdsAdd(sds, SDS{"err": err}), "") break } } state.ctx.LogD( - "llp-recv", + "sp-recv", SdsAdd(sds, SDS{"size": strconv.Itoa(len(payload))}), "got payload", ) payload, err = state.csTheir.Decrypt(nil, nil, payload) if err != nil { - state.ctx.LogE("llp-recv", SdsAdd(sds, SDS{"err": err}), "") + state.ctx.LogE("sp-recv", SdsAdd(sds, SDS{"err": err}), "") break } state.ctx.LogD( - "llp-recv", + "sp-recv", SdsAdd(sds, SDS{"size": strconv.Itoa(len(payload))}), "processing", ) - replies, err := state.ProcessLLP(payload) + replies, err := state.ProcessSP(payload) if err != nil { - state.ctx.LogE("llp-recv", SdsAdd(sds, SDS{"err": err}), "") + state.ctx.LogE("sp-recv", SdsAdd(sds, SDS{"err": err}), "") break } go func() { for _, reply := range replies { state.ctx.LogD( - "llp-recv", + "sp-recv", SdsAdd(sds, SDS{"size": strconv.Itoa(len(reply))}), "queuing reply", ) @@ -638,7 +638,7 @@ func (state *LLPState) StartWorkers(conn net.Conn, infosPayloads [][]byte, paylo return nil } -func (state *LLPState) Wait() { +func (state *SPState) Wait() { state.wg.Wait() state.dirUnlock() state.Duration = time.Now().Sub(state.started) @@ -654,27 +654,27 @@ func (state *LLPState) Wait() { } } -func (state *LLPState) ProcessLLP(payload []byte) ([][]byte, error) { +func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) { sds := SDS{"node": state.NodeId, "nice": strconv.Itoa(int(state.nice))} r := bytes.NewReader(payload) var err error var replies [][]byte var infosGot bool for r.Len() > 0 { - state.ctx.LogD("llp-process", sds, "unmarshaling header") - var head LLPHead + state.ctx.LogD("sp-process", sds, "unmarshaling header") + var head SPHead if _, err = xdr.Unmarshal(r, &head); err != nil { - state.ctx.LogE("llp-process", SdsAdd(sds, SDS{"err": err}), "") + state.ctx.LogE("sp-process", SdsAdd(sds, SDS{"err": err}), "") return nil, err } switch head.Type { - case LLPTypeInfo: + case SPTypeInfo: infosGot = true sdsp := SdsAdd(sds, SDS{"type": "info"}) - state.ctx.LogD("llp-process", sdsp, "unmarshaling packet") - var info LLPInfo + state.ctx.LogD("sp-process", sdsp, "unmarshaling packet") + var info SPInfo if _, err = xdr.Unmarshal(r, &info); err != nil { - state.ctx.LogE("llp-process", SdsAdd(sdsp, SDS{"err": err}), "") + state.ctx.LogE("sp-process", SdsAdd(sdsp, SDS{"err": err}), "") return nil, err } sdsp = SdsAdd(sds, SDS{ @@ -682,25 +682,25 @@ func (state *LLPState) ProcessLLP(payload []byte) ([][]byte, error) { "size": strconv.FormatInt(int64(info.Size), 10), }) if info.Nice > state.nice { - state.ctx.LogD("llp-process", sdsp, "too nice") + state.ctx.LogD("sp-process", sdsp, "too nice") continue } - state.ctx.LogD("llp-process", sdsp, "received") + state.ctx.LogD("sp-process", sdsp, "received") if state.xxOnly != nil && *state.xxOnly == TTx { continue } state.Lock() state.infosTheir[*info.Hash] = &info state.Unlock() - state.ctx.LogD("llp-process", sdsp, "stating part") + state.ctx.LogD("sp-process", sdsp, "stating part") if _, err = os.Stat(filepath.Join( state.ctx.Spool, state.NodeId.String(), string(TRx), ToBase32(info.Hash[:]), )); err == nil { - state.ctx.LogD("llp-process", sdsp, "already done") - replies = append(replies, MarshalLLP(LLPTypeDone, LLPDone{info.Hash})) + state.ctx.LogD("sp-process", sdsp, "already done") + replies = append(replies, MarshalSP(SPTypeDone, SPDone{info.Hash})) continue } fi, err := os.Stat(filepath.Join( @@ -713,24 +713,24 @@ func (state *LLPState) ProcessLLP(payload []byte) ([][]byte, error) { if err == nil { offset = fi.Size() state.ctx.LogD( - "llp-process", + "sp-process", SdsAdd(sdsp, SDS{"offset": strconv.FormatInt(offset, 10)}), "part exists", ) } - replies = append(replies, MarshalLLP( - LLPTypeFreq, - LLPFreq{info.Hash, uint64(offset)}, + replies = append(replies, MarshalSP( + SPTypeFreq, + SPFreq{info.Hash, uint64(offset)}, )) - case LLPTypeFile: + case SPTypeFile: state.ctx.LogD( - "llp-process", + "sp-process", SdsAdd(sds, SDS{"type": "file"}), "unmarshaling packet", ) - var file LLPFile + var file SPFile if _, err = xdr.Unmarshal(r, &file); err != nil { - state.ctx.LogE("llp-process", SdsAdd(sds, SDS{ + state.ctx.LogE("sp-process", SdsAdd(sds, SDS{ "err": err, "type": "file", }), "") @@ -747,77 +747,77 @@ func (state *LLPState) ProcessLLP(payload []byte) ([][]byte, error) { string(TRx), ToBase32(file.Hash[:]), ) - state.ctx.LogD("llp-file", sdsp, "opening part") + state.ctx.LogD("sp-file", sdsp, "opening part") fd, err := os.OpenFile( filePath+PartSuffix, os.O_RDWR|os.O_CREATE, os.FileMode(0600), ) if err != nil { - state.ctx.LogE("llp-file", SdsAdd(sdsp, SDS{"err": err}), "") + state.ctx.LogE("sp-file", SdsAdd(sdsp, SDS{"err": err}), "") return nil, err } state.ctx.LogD( - "llp-file", + "sp-file", SdsAdd(sdsp, SDS{"offset": strconv.FormatInt(int64(file.Offset), 10)}), "seeking", ) if _, err = fd.Seek(int64(file.Offset), 0); err != nil { - state.ctx.LogE("llp-file", SdsAdd(sdsp, SDS{"err": err}), "") + state.ctx.LogE("sp-file", SdsAdd(sdsp, SDS{"err": err}), "") fd.Close() return nil, err } - state.ctx.LogD("llp-file", sdsp, "writing") + state.ctx.LogD("sp-file", sdsp, "writing") _, err = fd.Write(file.Payload) if err != nil { - state.ctx.LogE("llp-file", SdsAdd(sdsp, SDS{"err": err}), "") + state.ctx.LogE("sp-file", SdsAdd(sdsp, SDS{"err": err}), "") fd.Close() return nil, err } ourSize := uint64(file.Offset) + uint64(len(file.Payload)) sdsp["fullsize"] = strconv.FormatInt(int64(state.infosTheir[*file.Hash].Size), 10) sdsp["size"] = strconv.FormatInt(int64(ourSize), 10) - state.ctx.LogP("llp-file", sdsp, "") + state.ctx.LogP("sp-file", sdsp, "") if state.infosTheir[*file.Hash].Size != ourSize { fd.Close() continue } go func() { if err := fd.Sync(); err != nil { - state.ctx.LogE("llp-file", SdsAdd(sdsp, SDS{"err": err}), "sync") + state.ctx.LogE("sp-file", SdsAdd(sdsp, SDS{"err": err}), "sync") fd.Close() return } state.wg.Add(1) defer state.wg.Done() fd.Seek(0, 0) - state.ctx.LogD("llp-file", sdsp, "checking") + state.ctx.LogD("sp-file", sdsp, "checking") gut, err := Check(fd, file.Hash[:]) fd.Close() if err != nil || !gut { - state.ctx.LogE("llp-file", sdsp, "checksum mismatch") + state.ctx.LogE("sp-file", sdsp, "checksum mismatch") return } - state.ctx.LogI("llp-done", SdsAdd(sdsp, SDS{"xx": string(TRx)}), "") + state.ctx.LogI("sp-done", SdsAdd(sdsp, SDS{"xx": string(TRx)}), "") os.Rename(filePath+PartSuffix, filePath) - state.payloads <- MarshalLLP(LLPTypeDone, LLPDone{file.Hash}) + state.payloads <- MarshalSP(SPTypeDone, SPDone{file.Hash}) }() - case LLPTypeDone: + case SPTypeDone: state.ctx.LogD( - "llp-process", + "sp-process", SdsAdd(sds, SDS{"type": "done"}), "unmarshaling packet", ) - var done LLPDone + var done SPDone if _, err = xdr.Unmarshal(r, &done); err != nil { - state.ctx.LogE("llp-process", SdsAdd(sds, SDS{ + state.ctx.LogE("sp-process", SdsAdd(sds, SDS{ "type": "done", "err": err, }), "") return nil, err } sdsp := SdsAdd(sds, SDS{"hash": ToBase32(done.Hash[:])}) - state.ctx.LogD("llp-done", sdsp, "removing") + state.ctx.LogD("sp-done", sdsp, "removing") err := os.Remove(filepath.Join( state.ctx.Spool, state.NodeId.String(), @@ -825,34 +825,34 @@ func (state *LLPState) ProcessLLP(payload []byte) ([][]byte, error) { ToBase32(done.Hash[:]), )) if err == nil { - state.ctx.LogI("llp-done", SdsAdd(sdsp, SDS{"xx": string(TTx)}), "") + state.ctx.LogI("sp-done", SdsAdd(sdsp, SDS{"xx": string(TTx)}), "") } else { - state.ctx.LogE("llp-done", SdsAdd(sdsp, SDS{"xx": string(TTx)}), "") + state.ctx.LogE("sp-done", SdsAdd(sdsp, SDS{"xx": string(TTx)}), "") } - case LLPTypeFreq: + case SPTypeFreq: sdsp := SdsAdd(sds, SDS{"type": "freq"}) - state.ctx.LogD("llp-process", sdsp, "unmarshaling packet") - var freq LLPFreq + state.ctx.LogD("sp-process", sdsp, "unmarshaling packet") + var freq SPFreq if _, err = xdr.Unmarshal(r, &freq); err != nil { - state.ctx.LogE("llp-process", SdsAdd(sdsp, SDS{"err": err}), "") + state.ctx.LogE("sp-process", SdsAdd(sdsp, SDS{"err": err}), "") return nil, err } - state.ctx.LogD("llp-process", SdsAdd(sdsp, SDS{ + state.ctx.LogD("sp-process", SdsAdd(sdsp, SDS{ "hash": ToBase32(freq.Hash[:]), "offset": strconv.FormatInt(int64(freq.Offset), 10), }), "queueing") state.Lock() state.queueTheir = append(state.queueTheir, &freq) state.Unlock() - case LLPTypeHalt: + case SPTypeHalt: sdsp := SdsAdd(sds, SDS{"type": "halt"}) - state.ctx.LogD("llp-process", sdsp, "") + state.ctx.LogD("sp-process", sdsp, "") state.Lock() state.queueTheir = nil state.Unlock() default: state.ctx.LogE( - "llp-process", + "sp-process", SdsAdd(sds, SDS{"type": head.Type}), "unknown", ) @@ -866,7 +866,7 @@ func (state *LLPState) ProcessLLP(payload []byte) ([][]byte, error) { pkts++ size += info.Size } - state.ctx.LogI("llp-infos", SDS{ + state.ctx.LogI("sp-infos", SDS{ "xx": string(TRx), "node": state.NodeId, "pkts": strconv.Itoa(pkts),