From: Russ Cox Date: Tue, 17 Feb 2009 00:32:30 +0000 (-0800) Subject: io.Pipe X-Git-Tag: weekly.2009-11-06~2169 X-Git-Url: http://www.git.cypherpunks.su/?a=commitdiff_plain;h=78906c38367d16b5c189163d6f6d60b2e99370f3;p=gostls13.git io.Pipe assorted underscore cleanup R=r DELTA=488 (410 added, 3 deleted, 75 changed) OCL=25070 CL=25070 --- diff --git a/src/lib/Makefile b/src/lib/Makefile index 93aa95f00d..148a6dc557 100644 --- a/src/lib/Makefile +++ b/src/lib/Makefile @@ -104,7 +104,7 @@ testing.6: flag.install fmt.dirinstall fmt.dirinstall: io.dirinstall reflect.dirinstall strconv.dirinstall hash.dirinstall: os.dirinstall http.dirinstall: bufio.install io.dirinstall net.dirinstall os.dirinstall strings.install log.install -io.dirinstall: os.dirinstall syscall.dirinstall +io.dirinstall: os.dirinstall syscall.dirinstall sync.dirinstall json.dirinstall: container/array.dirinstall fmt.dirinstall io.dirinstall math.dirinstall \ strconv.dirinstall strings.install utf8.install # TODO(rsc): net is not supposed to depend on fmt or strings or strconv diff --git a/src/lib/io/Makefile b/src/lib/io/Makefile index 39640d164d..4861fd5d68 100644 --- a/src/lib/io/Makefile +++ b/src/lib/io/Makefile @@ -32,19 +32,27 @@ coverage: packages $(AS) $*.s O1=\ - io.$O\ bytebuffer.$O\ + io.$O\ -io.a: a1 +O2=\ + pipe.$O\ + +io.a: a1 a2 a1: $(O1) - $(AR) grc io.a io.$O bytebuffer.$O + $(AR) grc io.a bytebuffer.$O io.$O rm -f $(O1) +a2: $(O2) + $(AR) grc io.a pipe.$O + rm -f $(O2) + newpkg: clean $(AR) grc io.a $(O1): newpkg +$(O2): a1 nuke: clean rm -f $(GOROOT)/pkg/io.a diff --git a/src/lib/io/io.go b/src/lib/io/io.go index cda7a9bbbe..5f5966d2a2 100644 --- a/src/lib/io/io.go +++ b/src/lib/io/io.go @@ -19,18 +19,28 @@ type Write interface { Write(p []byte) (n int, err *os.Error); } +type Close interface { + Close() *os.Error; +} + type ReadWrite interface { Read(p []byte) (n int, err *os.Error); Write(p []byte) (n int, err *os.Error); } -type ReadWriteClose interface { +type ReadClose interface { Read(p []byte) (n int, err *os.Error); + Close() *os.Error; +} + +type WriteClose interface { Write(p []byte) (n int, err *os.Error); Close() *os.Error; } -type Close interface { +type ReadWriteClose interface { + Read(p []byte) (n int, err *os.Error); + Write(p []byte) (n int, err *os.Error); Close() *os.Error; } @@ -69,21 +79,21 @@ func Readn(fd Read, buf []byte) (n int, err *os.Error) { // Convert something that implements Read into something // whose Reads are always Readn -type _FullRead struct { +type fullRead struct { fd Read; } -func (fd *_FullRead) Read(p []byte) (n int, err *os.Error) { +func (fd *fullRead) Read(p []byte) (n int, err *os.Error) { n, err = Readn(fd.fd, p); return n, err } -func Make_FullReader(fd Read) Read { - if fr, ok := fd.(*_FullRead); ok { - // already a _FullRead +func MakeFullReader(fd Read) Read { + if fr, ok := fd.(*fullRead); ok { + // already a fullRead return fd } - return &_FullRead(fd) + return &fullRead(fd) } // Copies n bytes (or until EOF is reached) from src to dst. diff --git a/src/lib/io/pipe.go b/src/lib/io/pipe.go new file mode 100644 index 0000000000..736f49798c --- /dev/null +++ b/src/lib/io/pipe.go @@ -0,0 +1,188 @@ +// Copyright 2009 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// Pipe adapter to connect code expecting an io.Read +// with code expecting an io.Write. + +package io + +import ( + "io"; + "os"; + "sync"; +) + +type pipeReturn struct { + n int; + err *os.Error; +} + +// Shared pipe structure. +type pipe struct { + rclosed bool; // Read end closed? + wclosed bool; // Write end closed? + wpend []byte; // Written data waiting to be read. + wtot int; // Bytes consumed so far in current write. + cr chan []byte; // Write sends data here... + cw chan pipeReturn; // ... and reads the n, err back from here. +} + +func (p *pipe) Read(data []byte) (n int, err *os.Error) { + if p == nil || p.rclosed { + return 0, os.EINVAL; + } + + // Wait for next write block if necessary. + if p.wpend == nil { + if !p.wclosed { + p.wpend = <-p.cr; + } + if p.wpend == nil { + return 0, nil; + } + p.wtot = 0; + } + + // Read from current write block. + n = len(data); + if n > len(p.wpend) { + n = len(p.wpend); + } + for i := 0; i < n; i++ { + data[i] = p.wpend[i]; + } + p.wtot += n; + p.wpend = p.wpend[n:len(p.wpend)]; + + // If write block is done, finish the write. + if len(p.wpend) == 0 { + p.wpend = nil; + p.cw <- pipeReturn(p.wtot, nil); + p.wtot = 0; + } + + return n, nil; +} + +func (p *pipe) Write(data []byte) (n int, err *os.Error) { + if p == nil || p.wclosed { + return 0, os.EINVAL; + } + if p.rclosed { + return 0, os.EPIPE; + } + + // Send data to reader. + p.cr <- data; + + // Wait for reader to finish copying it. + res := <-p.cw; + return res.n, res.err; +} + +func (p *pipe) CloseReader() *os.Error { + if p == nil || p.rclosed { + return os.EINVAL; + } + + // Stop any future writes. + p.rclosed = true; + + // Stop the current write. + if !p.wclosed { + p.cw <- pipeReturn(p.wtot, os.EPIPE); + } + + return nil; +} + +func (p *pipe) CloseWriter() *os.Error { + if p == nil || p.wclosed { + return os.EINVAL; + } + + // Stop any future reads. + p.wclosed = true; + + // Stop the current read. + if !p.rclosed { + p.cr <- nil; + } + + return nil; +} + +// Read/write halves of the pipe. +// They are separate structures for two reasons: +// 1. If one end becomes garbage without being Closed, +// its finisher can Close so that the other end +// does not hang indefinitely. +// 2. Clients cannot use interface conversions on the +// read end to find the Write method, and vice versa. + +// Read half of pipe. +type pipeRead struct { + lock sync.Mutex; + p *pipe; +} + +func (r *pipeRead) Read(data []byte) (n int, err *os.Error) { + r.lock.Lock(); + defer r.lock.Unlock(); + + return r.p.Read(data); +} + +func (r *pipeRead) Close() *os.Error { + r.lock.Lock(); + defer r.lock.Unlock(); + + return r.p.CloseReader(); +} + +func (r *pipeRead) finish() { + r.Close(); +} + +// Write half of pipe. +type pipeWrite struct { + lock sync.Mutex; + p *pipe; +} + +func (w *pipeWrite) Write(data []byte) (n int, err *os.Error) { + w.lock.Lock(); + defer w.lock.Unlock(); + + return w.p.Write(data); +} + +func (w *pipeWrite) Close() *os.Error { + w.lock.Lock(); + defer w.lock.Unlock(); + + return w.p.CloseWriter(); +} + +func (w *pipeWrite) finish() { + w.Close(); +} + +// Create a synchronous in-memory pipe. +// Reads on one end are matched by writes on the other. +// Writes don't complete until all the data has been +// written or the read end is closed. Reads return +// any available data or block until the next write +// or the write end is closed. +func Pipe() (io.ReadClose, io.WriteClose) { + p := new(pipe); + p.cr = make(chan []byte, 1); + p.cw = make(chan pipeReturn, 1); + r := new(pipeRead); + r.p = p; + w := new(pipeWrite); + w.p = p; + return r, w; +} + diff --git a/src/lib/io/pipe_test.go b/src/lib/io/pipe_test.go new file mode 100644 index 0000000000..c187702f2b --- /dev/null +++ b/src/lib/io/pipe_test.go @@ -0,0 +1,203 @@ +// Copyright 2009 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package io + +import ( + "io"; + "os"; + "testing"; + "time"; +) + +func checkWrite(t *testing.T, w io.Write, data []byte, c chan int) { + n, err := w.Write(data); + if err != nil { + t.Errorf("write: %v", err); + } + if n != len(data) { + t.Errorf("short write: %d != %d", n, len(data)); + } + c <- 0; +} + +// Test a single read/write pair. +func TestPipe1(t *testing.T) { + c := make(chan int); + r, w := io.Pipe(); + var buf [64]byte; + go checkWrite(t, w, io.StringBytes("hello, world"), c); + n, err := r.Read(buf); + if err != nil { + t.Errorf("read: %v", err); + } + else if n != 12 || string(buf[0:12]) != "hello, world" { + t.Errorf("bad read: got %q", buf[0:n]); + } + <-c; + r.Close(); + w.Close(); +} + +func reader(t *testing.T, r io.Read, c chan int) { + var buf [64]byte; + for { + n, err := r.Read(buf); + if err != nil { + t.Errorf("read: %v", err); + } + c <- n; + if n == 0 { + break; + } + } +} + +// Test a sequence of read/write pairs. +func TestPipe2(t *testing.T) { + c := make(chan int); + r, w := io.Pipe(); + go reader(t, r, c); + var buf [64]byte; + for i := 0; i < 5; i++ { + p := buf[0:5+i*10]; + n, err := w.Write(p); + if n != len(p) { + t.Errorf("wrote %d, got %d", len(p), n); + } + if err != nil { + t.Errorf("write: %v", err); + } + nn := <-c; + if nn != n { + t.Errorf("wrote %d, read got %d", n, nn); + } + } + w.Close(); + nn := <-c; + if nn != 0 { + t.Errorf("final read got %d", nn); + } +} + +// Test a large write that requires multiple reads to satisfy. +func writer(w io.WriteClose, buf []byte, c chan pipeReturn) { + n, err := w.Write(buf); + w.Close(); + c <- pipeReturn(n, err); +} + +func TestPipe3(t *testing.T) { + c := make(chan pipeReturn); + r, w := io.Pipe(); + var wdat [128]byte; + for i := 0; i < len(wdat); i++ { + wdat[i] = byte(i); + } + go writer(w, wdat, c); + var rdat [1024]byte; + tot := 0; + for n := 1; n <= 256; n *= 2 { + nn, err := r.Read(rdat[tot:tot+n]); + if err != nil { + t.Fatalf("read: %v", err); + } + + // only final two reads should be short - 1 byte, then 0 + expect := n; + if n == 128 { + expect = 1; + } else if n == 256 { + expect = 0; + } + if nn != expect { + t.Fatalf("read %d, expected %d, got %d", n, expect, nn); + } + tot += nn; + } + pr := <-c; + if pr.n != 128 || pr.err != nil { + t.Fatalf("write 128: %d, %v", pr.n, pr.err); + } + if tot != 128 { + t.Fatalf("total read %d != 128", tot); + } + for i := 0; i < 128; i++ { + if rdat[i] != byte(i) { + t.Fatalf("rdat[%d] = %d", i, rdat[i]); + } + } +} + +// Test read after/before writer close. + +func delayClose(t *testing.T, cl io.Close, ch chan int) { + time.Sleep(1000*1000); // 1 ms + if err := cl.Close(); err != nil { + t.Errorf("delayClose: %v", err); + } + ch <- 0; +} + +func testPipeReadClose(t *testing.T, async bool) { + c := make(chan int, 1); + r, w := io.Pipe(); + if async { + go delayClose(t, w, c); + } else { + delayClose(t, w, c); + } + var buf [64]int; + n, err := r.Read(buf); + <-c; + if err != nil { + t.Errorf("read from closed pipe: %v", err); + } + if n != 0 { + t.Errorf("read on closed pipe returned %d", n); + } + if err = r.Close(); err != nil { + t.Errorf("r.Close: %v", err); + } +} + +// Test write after/before reader close. + +func testPipeWriteClose(t *testing.T, async bool) { + c := make(chan int, 1); + r, w := io.Pipe(); + if async { + go delayClose(t, r, c); + } else { + delayClose(t, r, c); + } + n, err := io.WriteString(w, "hello, world"); + <-c; + if err != os.EPIPE { + t.Errorf("write on closed pipe: %v", err); + } + if n != 0 { + t.Errorf("write on closed pipe returned %d", n); + } + if err = w.Close(); err != nil { + t.Errorf("w.Close: %v", err); + } +} + +func TestPipeReadCloseAsync(t *testing.T) { + testPipeReadClose(t, true); +} + +func TestPipeReadCloseSync(t *testing.T) { + testPipeReadClose(t, false); +} + +func TestPipeWriteCloseAsync(t *testing.T) { + testPipeWriteClose(t, true); +} + +func TestPipeWriteCloseSync(t *testing.T) { + testPipeWriteClose(t, false); +} + diff --git a/src/lib/time/Makefile b/src/lib/time/Makefile index 9f42265a86..dab4bbfe90 100644 --- a/src/lib/time/Makefile +++ b/src/lib/time/Makefile @@ -33,6 +33,7 @@ coverage: packages O1=\ zoneinfo.$O\ + sleep.$O\ O2=\ time.$O\ @@ -43,7 +44,7 @@ O3=\ time.a: a1 a2 a3 a1: $(O1) - $(AR) grc time.a zoneinfo.$O + $(AR) grc time.a zoneinfo.$O sleep.$O rm -f $(O1) a2: $(O2) diff --git a/src/lib/time/tick.go b/src/lib/time/tick.go index 814e2bac1f..cb76b7cd4a 100644 --- a/src/lib/time/tick.go +++ b/src/lib/time/tick.go @@ -42,8 +42,7 @@ func ticker(ns int64, c chan int64) { when += ns } - syscall.Nstotimeval(when - now, &tv); - syscall.Syscall6(syscall.SYS_SELECT, 0, 0, 0, 0, int64(uintptr(unsafe.Pointer(&tv))), 0); + time.Sleep(when - now); now = time.Nanoseconds(); c <- now; } diff --git a/src/lib/time/time.go b/src/lib/time/time.go index 205a314c22..abc1b56438 100644 --- a/src/lib/time/time.go +++ b/src/lib/time/time.go @@ -9,7 +9,7 @@ import ( "time" ) -// Seconds since January 1, 1970 00:00:00 GMT +// Seconds since January 1, 1970 00:00:00 UTC func Seconds() int64 { sec, nsec, err := os.Time(); if err != nil { @@ -18,7 +18,7 @@ func Seconds() int64 { return sec } -// Nanoseconds since January 1, 1970 00:00:00 GMT +// Nanoseconds since January 1, 1970 00:00:00 UTC func Nanoseconds() int64 { sec, nsec, err := os.Time(); if err != nil { @@ -61,24 +61,24 @@ func months(year int64) []int { } const ( - _SecondsPerDay = 24*60*60; + secondsPerDay = 24*60*60; - _DaysPer400Years = 365*400+97; - _DaysPer100Years = 365*100+24; - _DaysPer4Years = 365*4+1; + daysPer400Years = 365*400+97; + daysPer100Years = 365*100+24; + daysPer4Years = 365*4+1; - _Days1970To2001 = 31*365+8; + days1970To2001 = 31*365+8; ) func SecondsToUTC(sec int64) *Time { t := new(Time); // Split into time and day. - day := sec/_SecondsPerDay; - sec -= day*_SecondsPerDay; + day := sec/secondsPerDay; + sec -= day*secondsPerDay; if sec < 0 { day--; - sec += _SecondsPerDay + sec += secondsPerDay } // Time @@ -95,30 +95,30 @@ func SecondsToUTC(sec int64) *Time { // Change day from 0 = 1970 to 0 = 2001, // to make leap year calculations easier // (2001 begins 4-, 100-, and 400-year cycles ending in a leap year.) - day -= _Days1970To2001; + day -= days1970To2001; year := int64(2001); if day < 0 { // Go back enough 400 year cycles to make day positive. - n := -day/_DaysPer400Years + 1; + n := -day/daysPer400Years + 1; year -= 400*n; - day += _DaysPer400Years*n; + day += daysPer400Years*n; } else { // Cut off 400 year cycles. - n := day/_DaysPer400Years; + n := day/daysPer400Years; year += 400*n; - day -= _DaysPer400Years*n; + day -= daysPer400Years*n; } // Cut off 100-year cycles - n := day/_DaysPer100Years; + n := day/daysPer100Years; year += 100*n; - day -= _DaysPer100Years*n; + day -= daysPer100Years*n; // Cut off 4-year cycles - n = day/_DaysPer4Years; + n = day/daysPer4Years; year += 4*n; - day -= _DaysPer4Years*n; + day -= daysPer4Years*n; // Cut off non-leap years. n = day/365; @@ -147,7 +147,6 @@ func UTC() *Time { return SecondsToUTC(Seconds()) } -// TODO: Should this return an error? func SecondsToLocalTime(sec int64) *Time { z, offset, err := time.LookupTimezone(sec); if err != nil { @@ -176,23 +175,23 @@ func (t *Time) Seconds() int64 { if year < 2001 { n := (2001 - year)/400 + 1; year += 400*n; - day -= _DaysPer400Years*n; + day -= daysPer400Years*n; } // Add in days from 400-year cycles. n := (year - 2001) / 400; year -= 400*n; - day += _DaysPer400Years*n; + day += daysPer400Years*n; // Add in 100-year cycles. n = (year - 2001) / 100; year -= 100*n; - day += _DaysPer100Years*n; + day += daysPer100Years*n; // Add in 4-year cycles. n = (year - 2001) / 4; year -= 4*n; - day += _DaysPer4Years*n; + day += daysPer4Years*n; // Add in non-leap years. n = year - 2001; @@ -206,7 +205,7 @@ func (t *Time) Seconds() int64 { day += int64(t.Day - 1); // Convert days to seconds since January 1, 2001. - sec := day * _SecondsPerDay; + sec := day * secondsPerDay; // Add in time elapsed today. sec += int64(t.Hour) * 3600; @@ -214,14 +213,14 @@ func (t *Time) Seconds() int64 { sec += int64(t.Second); // Convert from seconds since 2001 to seconds since 1970. - sec += _Days1970To2001 * _SecondsPerDay; + sec += days1970To2001 * secondsPerDay; // Account for local time zone. sec -= int64(t.ZoneOffset); return sec } -var _LongDayNames = []string( +var longDayNames = []string( "Sunday", "Monday", "Tuesday", @@ -231,7 +230,7 @@ var _LongDayNames = []string( "Saturday" ) -var _ShortDayNames = []string( +var shortDayNames = []string( "Sun", "Mon", "Tue", @@ -241,7 +240,7 @@ var _ShortDayNames = []string( "Sat" ) -var _ShortMonthNames = []string( +var shortMonthNames = []string( "Jan", "Feb", "Mar", @@ -256,13 +255,13 @@ var _ShortMonthNames = []string( "Dec" ) -func _Copy(dst []byte, s string) { +func copy(dst []byte, s string) { for i := 0; i < len(s); i++ { dst[i] = s[i] } } -func _Decimal(dst []byte, n int) { +func decimal(dst []byte, n int) { if n < 0 { n = 0 } @@ -272,15 +271,15 @@ func _Decimal(dst []byte, n int) { } } -func _AddString(buf []byte, bp int, s string) int { +func addString(buf []byte, bp int, s string) int { n := len(s); - _Copy(buf[bp:bp+n], s); + copy(buf[bp:bp+n], s); return bp+n } // Just enough of strftime to implement the date formats below. // Not exported. -func _Format(t *Time, fmt string) string { +func format(t *Time, fmt string) string { buf := make([]byte, 128); bp := 0; @@ -289,39 +288,39 @@ func _Format(t *Time, fmt string) string { i++; switch fmt[i] { case 'A': // %A full weekday name - bp = _AddString(buf, bp, _LongDayNames[t.Weekday]); + bp = addString(buf, bp, longDayNames[t.Weekday]); case 'a': // %a abbreviated weekday name - bp = _AddString(buf, bp, _ShortDayNames[t.Weekday]); + bp = addString(buf, bp, shortDayNames[t.Weekday]); case 'b': // %b abbreviated month name - bp = _AddString(buf, bp, _ShortMonthNames[t.Month-1]); + bp = addString(buf, bp, shortMonthNames[t.Month-1]); case 'd': // %d day of month (01-31) - _Decimal(buf[bp:bp+2], t.Day); + decimal(buf[bp:bp+2], t.Day); bp += 2; case 'e': // %e day of month ( 1-31) if t.Day >= 10 { - _Decimal(buf[bp:bp+2], t.Day) + decimal(buf[bp:bp+2], t.Day) } else { buf[bp] = ' '; buf[bp+1] = byte(t.Day + '0') } bp += 2; case 'H': // %H hour 00-23 - _Decimal(buf[bp:bp+2], t.Hour); + decimal(buf[bp:bp+2], t.Hour); bp += 2; case 'M': // %M minute 00-59 - _Decimal(buf[bp:bp+2], t.Minute); + decimal(buf[bp:bp+2], t.Minute); bp += 2; case 'S': // %S second 00-59 - _Decimal(buf[bp:bp+2], t.Second); + decimal(buf[bp:bp+2], t.Second); bp += 2; case 'Y': // %Y year 2008 - _Decimal(buf[bp:bp+4], int(t.Year)); + decimal(buf[bp:bp+4], int(t.Year)); bp += 4; case 'y': // %y year 08 - _Decimal(buf[bp:bp+2], int(t.Year%100)); + decimal(buf[bp:bp+2], int(t.Year%100)); bp += 2; case 'Z': - bp = _AddString(buf, bp, t.Zone); + bp = addString(buf, bp, t.Zone); default: buf[bp] = '%'; buf[bp+1] = fmt[i]; @@ -337,21 +336,21 @@ func _Format(t *Time, fmt string) string { // ANSI C asctime: Sun Nov 6 08:49:37 1994 func (t *Time) Asctime() string { - return _Format(t, "%a %b %e %H:%M:%S %Y") + return format(t, "%a %b %e %H:%M:%S %Y") } -// RFC 850: Sunday, 06-Nov-94 08:49:37 GMT +// RFC 850: Sunday, 06-Nov-94 08:49:37 UTC func (t *Time) RFC850() string { - return _Format(t, "%A, %d-%b-%y %H:%M:%S %Z") + return format(t, "%A, %d-%b-%y %H:%M:%S %Z") } -// RFC 1123: Sun, 06 Nov 1994 08:49:37 GMT +// RFC 1123: Sun, 06 Nov 1994 08:49:37 UTC func (t *Time) RFC1123() string { - return _Format(t, "%a, %d %b %Y %H:%M:%S %Z") + return format(t, "%a, %d %b %Y %H:%M:%S %Z") } -// date(1) - Sun Nov 6 08:49:37 GMT 1994 +// date(1) - Sun Nov 6 08:49:37 UTC 1994 func (t *Time) String() string { - return _Format(t, "%a %b %e %H:%M:%S %Z %Y") + return format(t, "%a %b %e %H:%M:%S %Z %Y") } diff --git a/src/run.bash b/src/run.bash index 250b9efbdb..2c5636cfa4 100755 --- a/src/run.bash +++ b/src/run.bash @@ -26,6 +26,7 @@ maketest() { maketest \ lib/fmt\ lib/hash\ + lib/io\ lib/json\ lib/math\ lib/net\