"os"
"reflect"
"sync"
+ "time"
)
// Import
chans map[int]*netChan
errors chan os.Error
maxId int
+ mu sync.Mutex // protects remaining fields
+ unacked int64 // number of unacknowledged sends.
+ seqLock sync.Mutex // guarantees messages are in sequence, only locked under mu
}
// NewImporter creates a new Importer object to import a set of channels
imp.chans = make(map[int]*netChan)
imp.names = make(map[string]*netChan)
imp.errors = make(chan os.Error, 10)
+ imp.unacked = 0
go imp.run()
return imp
}
for {
*hdr = header{}
if e := imp.decode(hdrValue); e != nil {
- impLog("header:", e)
- imp.shutdown()
+ if e != os.EOF {
+ impLog("header:", e)
+ imp.shutdown()
+ }
return
}
switch hdr.PayloadType {
nch := imp.getChan(hdr.Id, true)
if nch != nil {
nch.acked()
+ imp.mu.Lock()
+ imp.unacked--
+ imp.mu.Unlock()
}
continue
default:
}
return
}
+ // We hold the lock during transmission to guarantee messages are
+ // sent in order.
+ imp.mu.Lock()
+ imp.unacked++
+ imp.seqLock.Lock()
+ imp.mu.Unlock()
if err = imp.encode(hdr, payData, val.Interface()); err != nil {
impLog("error encoding client send:", err)
return
}
+ imp.seqLock.Unlock()
}
}()
}
nc.close()
return nil
}
+
+func (imp *Importer) unackedCount() int64 {
+ imp.mu.Lock()
+ n := imp.unacked
+ imp.mu.Unlock()
+ return n
+}
+
+// Drain waits until all messages sent from this exporter/importer, including
+// those not yet sent to any server and possibly including those sent while
+// Drain was executing, have been received by the exporter. In short, it
+// waits until all the importer's messages have been received.
+// If the timeout (measured in nanoseconds) is positive and Drain takes
+// longer than that to complete, an error is returned.
+func (imp *Importer) Drain(timeout int64) os.Error {
+ startTime := time.Nanoseconds()
+ for imp.unackedCount() > 0 {
+ if timeout > 0 && time.Nanoseconds()-startTime >= timeout {
+ return os.ErrorString("timeout")
+ }
+ time.Sleep(100 * 1e6)
+ }
+ return nil
+}
<-done
}
+// Not a great test but it does at least invoke Drain.
+func TestImportDrain(t *testing.T) {
+ exp, imp := pair(t)
+ expDone := make(chan bool)
+ go exportReceive(exp, t, expDone)
+ <-expDone
+ importSend(imp, closeCount, t, nil)
+ imp.Drain(0)
+}
+
// Not a great test but it does at least invoke Sync.
func TestExportSync(t *testing.T) {
exp, imp := pair(t)