]> Cypherpunks repositories - nncp.git/commitdiff
Initial nncp-reass utility
authorSergey Matveev <stargrave@stargrave.org>
Thu, 27 Apr 2017 16:02:50 +0000 (19:02 +0300)
committerSergey Matveev <stargrave@stargrave.org>
Thu, 27 Apr 2017 19:30:38 +0000 (22:30 +0300)
VERSION
common.mk
src/cypherpunks.ru/nncp/chunked.go [new file with mode: 0644]
src/cypherpunks.ru/nncp/cmd/nncp-file/main.go
src/cypherpunks.ru/nncp/cmd/nncp-reass/main.go [new file with mode: 0644]
src/cypherpunks.ru/nncp/humanizer.go
src/cypherpunks.ru/nncp/tx.go

diff --git a/VERSION b/VERSION
index 5a2a5806df6e909afe3609b5706cb1012913ca0e..eb49d7c7fdcbb1b4745de39837864aa7f78570ac 100644 (file)
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-0.6
+0.7
index e949d1283c2b482f89b1612a2a07352438fc068f..4f12b1be97b45ae40f76868e87cc0716755f201a 100644 (file)
--- a/common.mk
+++ b/common.mk
@@ -28,6 +28,7 @@ ALL = \
        nncp-mincfg \
        nncp-newcfg \
        nncp-pkt \
+       nncp-reass \
        nncp-rm \
        nncp-stat \
        nncp-toss \
@@ -68,6 +69,9 @@ nncp-newcfg:
 nncp-pkt:
        GOPATH=$(GOPATH) go build -ldflags "$(LDFLAGS)" cypherpunks.ru/nncp/cmd/nncp-pkt
 
+nncp-reass:
+       GOPATH=$(GOPATH) go build -ldflags "$(LDFLAGS)" cypherpunks.ru/nncp/cmd/nncp-reass
+
 nncp-rm:
        GOPATH=$(GOPATH) go build -ldflags "$(LDFLAGS)" cypherpunks.ru/nncp/cmd/nncp-rm
 
diff --git a/src/cypherpunks.ru/nncp/chunked.go b/src/cypherpunks.ru/nncp/chunked.go
new file mode 100644 (file)
index 0000000..abcc6fa
--- /dev/null
@@ -0,0 +1,33 @@
+/*
+NNCP -- Node to Node copy, utilities for store-and-forward data exchange
+Copyright (C) 2016-2017 Sergey Matveev <stargrave@stargrave.org>
+
+This program is free software: you can redistribute it and/or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation, either version 3 of the License, or
+(at your option) any later version.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program.  If not, see <http://www.gnu.org/licenses/>.
+*/
+
+package nncp
+
+var (
+       MagicNNCPMv1 [8]byte = [8]byte{'N', 'N', 'C', 'P', 'M', 0, 0, 1}
+
+       ChunkedSuffixMeta = ".nncp.meta"
+       ChunkedSuffixPart = ".nncp.part"
+)
+
+type ChunkedMeta struct {
+       Magic     [8]byte
+       FileSize  uint64
+       ChunkSize uint64
+       Checksums [][32]byte
+}
index 4fafc4834b845dd2965f47b2fb17d54216b6e79d..824cdb0a2a7820308a4e4b404d8276bbc808d906 100644 (file)
@@ -39,13 +39,14 @@ func usage() {
 
 func main() {
        var (
-               cfgPath  = flag.String("cfg", nncp.DefaultCfgPath, "Path to configuration file")
-               niceRaw  = flag.Int("nice", nncp.DefaultNiceFile, "Outbound packet niceness")
-               minSize  = flag.Uint64("minsize", 0, "Minimal required resulting packet size")
-               quiet    = flag.Bool("quiet", false, "Print only errors")
-               debug    = flag.Bool("debug", false, "Print debug messages")
-               version  = flag.Bool("version", false, "Print version information")
-               warranty = flag.Bool("warranty", false, "Print warranty information")
+               cfgPath   = flag.String("cfg", nncp.DefaultCfgPath, "Path to configuration file")
+               niceRaw   = flag.Int("nice", nncp.DefaultNiceFile, "Outbound packet niceness")
+               minSize   = flag.Uint64("minsize", 0, "Minimal required resulting packet size")
+               chunkSize = flag.Uint64("chunk", 0, "Split file on specified size chunks, in KiB")
+               quiet     = flag.Bool("quiet", false, "Print only errors")
+               debug     = flag.Bool("debug", false, "Print debug messages")
+               version   = flag.Bool("version", false, "Print version information")
+               warranty  = flag.Bool("warranty", false, "Print warranty information")
        )
        flag.Usage = usage
        flag.Parse()
@@ -90,7 +91,19 @@ func main() {
                log.Fatalln("Invalid NODE specified:", err)
        }
 
-       if err = ctx.TxFile(node, nice, flag.Arg(0), splitted[1], int64(*minSize)); err != nil {
+       if *chunkSize == 0 {
+               err = ctx.TxFile(node, nice, flag.Arg(0), splitted[1], int64(*minSize))
+       } else {
+               err = ctx.TxFileChunked(
+                       node,
+                       nice,
+                       flag.Arg(0),
+                       splitted[1],
+                       int64(*minSize),
+                       int64(*chunkSize) * 1024,
+               )
+       }
+       if err != nil {
                log.Fatalln(err)
        }
 }
diff --git a/src/cypherpunks.ru/nncp/cmd/nncp-reass/main.go b/src/cypherpunks.ru/nncp/cmd/nncp-reass/main.go
new file mode 100644 (file)
index 0000000..1dbe56e
--- /dev/null
@@ -0,0 +1,311 @@
+/*
+NNCP -- Node to Node copy, utilities for store-and-forward data exchange
+Copyright (C) 2016-2017 Sergey Matveev <stargrave@stargrave.org>
+
+This program is free software: you can redistribute it and/or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation, either version 3 of the License, or
+(at your option) any later version.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program.  If not, see <http://www.gnu.org/licenses/>.
+*/
+
+// Send file via NNCP
+package main
+
+import (
+       "bufio"
+       "bytes"
+       "flag"
+       "fmt"
+       "hash"
+       "io"
+       "io/ioutil"
+       "log"
+       "os"
+       "path/filepath"
+       "strconv"
+       "strings"
+
+       "cypherpunks.ru/nncp"
+       "github.com/davecgh/go-xdr/xdr2"
+       "golang.org/x/crypto/blake2b"
+)
+
+func usage() {
+       fmt.Fprintf(os.Stderr, nncp.UsageHeader())
+       fmt.Fprintln(os.Stderr, "nncp-reass -- reassemble chunked files\n")
+       fmt.Fprintf(os.Stderr, "Usage: %s [options] [FILE]\nOptions:\n", os.Args[0])
+       flag.PrintDefaults()
+       fmt.Fprint(os.Stderr, `
+Neither FILE, nor -node nor -all can be set simultaneously,
+but at least one of them must be specified.
+`)
+}
+
+func process(ctx *nncp.Ctx, path string, keep, dryRun bool) bool {
+       fd, err := os.Open(path)
+       defer fd.Close()
+       if err != nil {
+               log.Fatalln("Can not open file:", err)
+       }
+       var metaPkt nncp.ChunkedMeta
+       if _, err = xdr.Unmarshal(fd, &metaPkt); err != nil {
+               ctx.LogE("nncp-reass", nncp.SDS{"path": path, "err": err}, "bad meta file")
+               return false
+       }
+       fd.Close()
+       if metaPkt.Magic != nncp.MagicNNCPMv1 {
+               ctx.LogE("nncp-reass", nncp.SDS{"path": path, "err": nncp.BadMagic}, "")
+               return false
+       }
+       metaName := filepath.Base(path)
+       if !strings.HasSuffix(metaName, nncp.ChunkedSuffixMeta) {
+               ctx.LogE("nncp-reass", nncp.SDS{
+                       "path": path,
+                       "err":  "invalid filename suffix",
+               }, "")
+               return false
+       }
+       mainName := strings.TrimSuffix(metaName, nncp.ChunkedSuffixMeta)
+       mainDir := filepath.Dir(path)
+
+       chunksPaths := make([]string, 0, len(metaPkt.Checksums))
+       for i := 0; i < len(metaPkt.Checksums); i++ {
+               chunksPaths = append(
+                       chunksPaths,
+                       filepath.Join(mainDir, mainName+nncp.ChunkedSuffixPart+strconv.Itoa(i)),
+               )
+       }
+
+       allChunksExist := true
+       for chunkNum, chunkPath := range chunksPaths {
+               fi, err := os.Stat(chunkPath)
+               if err != nil && os.IsNotExist(err) {
+                       ctx.LogI("nncp-reass", nncp.SDS{
+                               "path":  path,
+                               "chunk": strconv.Itoa(chunkNum),
+                       }, "missing")
+                       allChunksExist = false
+                       continue
+               }
+               if chunkNum+1 != len(chunksPaths) && uint64(fi.Size()) != metaPkt.ChunkSize {
+                       ctx.LogE("nncp-reass", nncp.SDS{
+                               "path":  path,
+                               "chunk": strconv.Itoa(chunkNum),
+                       }, "invalid size")
+                       allChunksExist = false
+               }
+       }
+       if !allChunksExist {
+               return false
+       }
+
+       var hsh hash.Hash
+       allChecksumsGood := true
+       for chunkNum, chunkPath := range chunksPaths {
+               fd, err = os.Open(chunkPath)
+               if err != nil {
+                       log.Fatalln("Can not open file:", err)
+               }
+               hsh, err = blake2b.New256(nil)
+               if err != nil {
+                       log.Fatalln(err)
+               }
+               if _, err = io.Copy(hsh, bufio.NewReader(fd)); err != nil {
+                       log.Fatalln(err)
+               }
+               fd.Close()
+               if bytes.Compare(hsh.Sum(nil), metaPkt.Checksums[chunkNum][:]) != 0 {
+                       ctx.LogE("nncp-reass", nncp.SDS{
+                               "path":  path,
+                               "chunk": strconv.Itoa(chunkNum),
+                               "err":   "checksum is bad",
+                       }, "")
+                       allChecksumsGood = false
+               }
+       }
+       if !allChecksumsGood {
+               return false
+       }
+       if dryRun {
+               ctx.LogI("nncp-reass", nncp.SDS{"path": path}, "ready")
+               return true
+       }
+
+       tmp, err := ioutil.TempFile(mainDir, "nncp-reass")
+       if err != nil {
+               log.Fatalln(err)
+       }
+       sds := nncp.SDS{"path": path, "tmp": tmp.Name()}
+       ctx.LogD("nncp-reass", sds, "created")
+       tmpW := bufio.NewWriter(tmp)
+
+       hasErrors := false
+       for chunkNum, chunkPath := range chunksPaths {
+               fd, err = os.Open(chunkPath)
+               if err != nil {
+                       log.Fatalln("Can not open file:", err)
+               }
+               if _, err = io.Copy(tmpW, bufio.NewReader(fd)); err != nil {
+                       log.Fatalln(err)
+               }
+               fd.Close()
+               if !keep {
+                       if err = os.Remove(chunkPath); err != nil {
+                               ctx.LogE("nncp-reass", nncp.SdsAdd(sds, nncp.SDS{
+                                       "chunk": strconv.Itoa(chunkNum),
+                                       "err":   err,
+                               }), "")
+                               hasErrors = true
+                       }
+               }
+       }
+       tmpW.Flush()
+       tmp.Sync()
+       tmp.Close()
+       ctx.LogD("nncp-reass", sds, "written")
+       if !keep {
+               if err = os.Remove(path); err != nil {
+                       ctx.LogE("nncp-reass", nncp.SdsAdd(sds, nncp.SDS{"err": err}), "")
+                       hasErrors = true
+               }
+       }
+
+       dstPathOrig := filepath.Join(mainDir, mainName)
+       dstPath := dstPathOrig
+       dstPathCtr := 0
+       for {
+               if _, err = os.Stat(dstPath); err != nil {
+                       if os.IsNotExist(err) {
+                               break
+                       }
+                       log.Fatalln(err)
+               }
+               dstPath = dstPathOrig + strconv.Itoa(dstPathCtr)
+               dstPathCtr++
+       }
+       if err = os.Rename(tmp.Name(), dstPath); err != nil {
+               log.Fatalln(err)
+       }
+       ctx.LogI("nncp-reass", nncp.SDS{"path": path}, "done")
+       return !hasErrors
+}
+
+func findMetas(ctx *nncp.Ctx, dirPath string) []string {
+       dir, err := os.Open(dirPath)
+       defer dir.Close()
+       if err != nil {
+               ctx.LogE("nncp-reass", nncp.SDS{"path": dirPath, "err": err}, "")
+               return nil
+       }
+       fis, err := dir.Readdir(0)
+       dir.Close()
+       if err != nil {
+               ctx.LogE("nncp-reass", nncp.SDS{"path": dirPath, "err": err}, "")
+               return nil
+       }
+       metaPaths := make([]string, 0)
+       for _, fi := range fis {
+               if strings.HasSuffix(fi.Name(), nncp.ChunkedSuffixMeta) {
+                       metaPaths = append(metaPaths, filepath.Join(dirPath, fi.Name()))
+               }
+       }
+       return metaPaths
+}
+
+func main() {
+       var (
+               cfgPath  = flag.String("cfg", nncp.DefaultCfgPath, "Path to configuration file")
+               allNodes = flag.Bool("all", false, "Process all found chunked files for all nodes")
+               nodeRaw  = flag.String("node", "", "Process all found chunked files for that node")
+               keep     = flag.Bool("keep", false, "Do not remove chunks while assembling")
+               dryRun   = flag.Bool("dryrun", false, "Do not assemble whole file")
+               quiet    = flag.Bool("quiet", false, "Print only errors")
+               debug    = flag.Bool("debug", false, "Print debug messages")
+               version  = flag.Bool("version", false, "Print version information")
+               warranty = flag.Bool("warranty", false, "Print warranty information")
+       )
+       flag.Usage = usage
+       flag.Parse()
+       if *warranty {
+               fmt.Println(nncp.Warranty)
+               return
+       }
+       if *version {
+               fmt.Println(nncp.VersionGet())
+               return
+       }
+
+       cfgRaw, err := ioutil.ReadFile(nncp.CfgPathFromEnv(cfgPath))
+       if err != nil {
+               log.Fatalln("Can not read config:", err)
+       }
+       ctx, err := nncp.CfgParse(cfgRaw)
+       if err != nil {
+               log.Fatalln("Can not parse config:", err)
+       }
+       ctx.Quiet = *quiet
+       ctx.Debug = *debug
+
+       var nodeOnly *nncp.Node
+       if *nodeRaw != "" {
+               nodeOnly, err = ctx.FindNode(*nodeRaw)
+               if err != nil {
+                       log.Fatalln("Invalid -node specified:", err)
+               }
+       }
+
+       if !(*allNodes || nodeOnly != nil || flag.NArg() > 0) {
+               usage()
+               os.Exit(1)
+       }
+       if flag.NArg() > 0 && (*allNodes || nodeOnly != nil) {
+               usage()
+               os.Exit(1)
+       }
+       if *allNodes && nodeOnly != nil {
+               usage()
+               os.Exit(1)
+       }
+
+       if flag.NArg() > 0 {
+               if !process(ctx, flag.Arg(0), *keep, *dryRun) {
+                       os.Exit(1)
+               }
+               return
+       }
+
+       hasErrors := false
+       if nodeOnly == nil {
+               seenMetaPaths := make(map[string]struct{})
+               for _, node := range ctx.Neigh {
+                       if node.Incoming == nil {
+                               continue
+                       }
+                       for _, metaPath := range findMetas(ctx, *node.Incoming) {
+                               if _, seen := seenMetaPaths[metaPath]; seen {
+                                       continue
+                               }
+                               hasErrors = hasErrors || !process(ctx, metaPath, *keep, *dryRun)
+                               seenMetaPaths[metaPath] = struct{}{}
+                       }
+               }
+       } else {
+               if nodeOnly.Incoming == nil {
+                       log.Fatalln("Specified -node does not allow incoming")
+               }
+               for _, metaPath := range findMetas(ctx, *nodeOnly.Incoming) {
+                       hasErrors = hasErrors || !process(ctx, metaPath, *keep, *dryRun)
+               }
+       }
+       if hasErrors {
+               os.Exit(1)
+       }
+}
index 06b33bd278783573e0db04579c46d0a21d062179..546b71fea60ee8e49e49f67da764aaad7fa17571 100644 (file)
@@ -218,6 +218,22 @@ func (ctx *Ctx) Humanize(s string) string {
                default:
                        return s
                }
+       case "nncp-reass":
+               chunkNum, exists := sds["chunk"]
+               if exists {
+                       msg = fmt.Sprintf(
+                               "Reassembling chunked file \"%s\" (chunk %s): %s",
+                               sds["path"],
+                               chunkNum,
+                               rem,
+                       )
+               } else {
+                       msg = fmt.Sprintf(
+                               "Reassembling chunked file \"%s\": %s",
+                               sds["path"],
+                               rem,
+                       )
+               }
        default:
                return s
        }
index 65ad7ec3b58fbcadb1fcce340b7f2bdcf65e144d..a04f5bdb2dadaaf59012b3ab54939d14bfaff95c 100644 (file)
@@ -23,12 +23,14 @@ import (
        "bytes"
        "compress/zlib"
        "errors"
+       "hash"
        "io"
        "os"
        "path/filepath"
        "strconv"
        "strings"
 
+       "github.com/davecgh/go-xdr/xdr2"
        "golang.org/x/crypto/blake2b"
 )
 
@@ -146,6 +148,135 @@ func (ctx *Ctx) TxFile(node *Node, nice uint8, srcPath, dstPath string, minSize
        return err
 }
 
+func (ctx *Ctx) TxFileChunked(node *Node, nice uint8, srcPath, dstPath string, minSize int64, chunkSize int64) error {
+       if dstPath == "" {
+               dstPath = filepath.Base(srcPath)
+       }
+       dstPath = filepath.Clean(dstPath)
+       if filepath.IsAbs(dstPath) {
+               return errors.New("Relative destination path required")
+       }
+       src, err := os.Open(srcPath)
+       if err != nil {
+               return err
+       }
+       defer src.Close()
+       srcStat, err := src.Stat()
+       if err != nil {
+               return err
+       }
+       srcReader := bufio.NewReader(src)
+       fileSize := srcStat.Size()
+       leftSize := fileSize
+       metaPkt := ChunkedMeta{
+               Magic:     MagicNNCPMv1,
+               FileSize:  uint64(fileSize),
+               ChunkSize: uint64(chunkSize),
+               Checksums: make([][32]byte, 0, (fileSize/chunkSize)+1),
+       }
+       for i := int64(0); i < (fileSize/chunkSize)+1; i++ {
+               hsh := new([32]byte)
+               metaPkt.Checksums = append(metaPkt.Checksums, *hsh)
+       }
+       var sizeToSend int64
+       var hsh hash.Hash
+       var pkt *Pkt
+       var chunkNum int
+       var path string
+       for {
+               if leftSize <= chunkSize {
+                       sizeToSend = leftSize
+               } else {
+                       sizeToSend = chunkSize
+               }
+               path = dstPath + ChunkedSuffixPart + strconv.Itoa(chunkNum)
+               pkt, err = NewPkt(PktTypeFile, path)
+               if err != nil {
+                       return err
+               }
+               hsh, err = blake2b.New256(nil)
+               if err != nil {
+                       return err
+               }
+               _, err = ctx.Tx(
+                       node,
+                       pkt,
+                       nice,
+                       sizeToSend,
+                       minSize,
+                       io.TeeReader(srcReader, hsh),
+               )
+               if err == nil {
+                       ctx.LogD("tx", SDS{
+                               "type": "file",
+                               "node": node.Id,
+                               "nice": strconv.Itoa(int(nice)),
+                               "src":  srcPath,
+                               "dst":  path,
+                               "size": strconv.FormatInt(sizeToSend, 10),
+                       }, "sent")
+               } else {
+                       ctx.LogE("tx", SDS{
+                               "type": "file",
+                               "node": node.Id,
+                               "nice": strconv.Itoa(int(nice)),
+                               "src":  srcPath,
+                               "dst":  path,
+                               "size": strconv.FormatInt(sizeToSend, 10),
+                               "err":  err,
+                       }, "sent")
+                       return err
+               }
+               hsh.Sum(metaPkt.Checksums[chunkNum][:0])
+               leftSize -= sizeToSend
+               chunkNum++
+               if leftSize == 0 {
+                       break
+               }
+       }
+       var metaBuf bytes.Buffer
+       _, err = xdr.Marshal(&metaBuf, metaPkt)
+       if err != nil {
+               return err
+       }
+       path = dstPath + ChunkedSuffixMeta
+       pkt, err = NewPkt(PktTypeFile, path)
+       if err != nil {
+               return err
+       }
+       metaPktSize := int64(metaBuf.Len())
+       _, err = ctx.Tx(node, pkt, nice, metaPktSize, minSize, &metaBuf)
+       if err == nil {
+               ctx.LogD("tx", SDS{
+                       "type": "file",
+                       "node": node.Id,
+                       "nice": strconv.Itoa(int(nice)),
+                       "src":  srcPath,
+                       "dst":  path,
+                       "size": strconv.FormatInt(metaPktSize, 10),
+               }, "sent")
+               ctx.LogI("tx", SDS{
+                       "type": "file",
+                       "node": node.Id,
+                       "nice": strconv.Itoa(int(nice)),
+                       "src":  srcPath,
+                       "dst":  dstPath,
+                       "size": strconv.FormatInt(fileSize, 10),
+               }, "sent")
+       } else {
+               ctx.LogE("tx", SDS{
+                       "type": "file",
+                       "node": node.Id,
+                       "nice": strconv.Itoa(int(nice)),
+                       "src":  srcPath,
+                       "dst":  path,
+                       "size": strconv.FormatInt(metaPktSize, 10),
+                       "err":  err,
+               }, "sent")
+       }
+       return err
+}
+
 func (ctx *Ctx) TxFreq(node *Node, nice uint8, srcPath, dstPath string, minSize int64) error {
        dstPath = filepath.Clean(dstPath)
        if filepath.IsAbs(dstPath) {