]> Cypherpunks repositories - gostls13.git/commitdiff
sort: implement stable sorting
authorVolker Dobler <dr.volker.dobler@gmail.com>
Tue, 2 Jul 2013 01:20:33 +0000 (21:20 -0400)
committerRuss Cox <rsc@golang.org>
Tue, 2 Jul 2013 01:20:33 +0000 (21:20 -0400)
This CL provides stable in-place sorting by use of
bottom up merge sort with in-place merging done by
the SymMerge algorithm from P.-S. Kim and A. Kutzner.

The additional space needed for stable sorting (in the form of
stack space) is logarithmic in the inputs size n.
Number of calls to Less and Swap grow like O(n * log n) and
O(n * log n * log n):
Stable sorting random data uses significantly more calls
to Swap than the unstable quicksort implementation (5 times more
on n=100, 10 times more on n=1e4 and 23 times more on n=1e8).
The number of calls to Less is practically the same for Sort and
Stable.

Stable sorting 1 million random integers takes 5 times longer
than using Sort.

BenchmarkSortString1K      50000       328662 ns/op
BenchmarkStableString1K    50000       380231 ns/op  1.15 slower
BenchmarkSortInt1K         50000       157336 ns/op
BenchmarkStableInt1K       50000       191167 ns/op  1.22 slower
BenchmarkSortInt64K         1000     14466297 ns/op
BenchmarkStableInt64K        500     16190266 ns/op  1.12 slower

BenchmarkSort1e2          200000        64923 ns/op
BenchmarkStable1e2         50000       167128 ns/op  2.57 slower
BenchmarkSort1e4            1000     14540613 ns/op
BenchmarkStable1e4           100     58117289 ns/op  4.00 slower
BenchmarkSort1e6               5   2429631508 ns/op
BenchmarkStable1e6             1  12077036952 ns/op  4.97 slower

R=golang-dev, bradfitz, iant, 0xjnml, rsc
CC=golang-dev
https://golang.org/cl/9612044

src/pkg/sort/sort.go
src/pkg/sort/sort_test.go

index d3092e8019171d11d76df5d198b87b8739766f99..edef06ff363c9b53bd4cc45850dbb055f48edc0f 100644 (file)
@@ -283,3 +283,192 @@ func Float64sAreSorted(a []float64) bool { return IsSorted(Float64Slice(a)) }
 
 // StringsAreSorted tests whether a slice of strings is sorted in increasing order.
 func StringsAreSorted(a []string) bool { return IsSorted(StringSlice(a)) }
+
+// Notes on stable sorting:
+// The used algorithms are simple and provable correct on all input and use
+// only logarithmic additional stack space.  They perform well if compared
+// experimentaly to other stable in-place sorting algorithms.
+//
+// Remarks on other algoritms evaluated:
+//  - GCC's 4.6.3 stable_sort with merge_without_buffer from libstdc++:
+//    Not faster.
+//  - GCC's __rotate for block rotations: Not faster.
+//  - "Practical in-place mergesort" from  Jyrki Katajainen, Tomi A. Pasanen
+//    and Jukka Teuhola; Nordic Journal of Computing 3,1 (1996), 27-40:
+//    The given algorithms are in-place, number of Swap and Assignments
+//    grow as n log n but the algorithm is not stable.
+//  - "Fast Stable In-Plcae Sorting with O(n) Data Moves" J.I. Munro and
+//    V. Raman in Algorithmica (1996) 16, 115-160:
+//    This algorithm either needs additional 2n bits or works only if there
+//    are enough different elements available to encode some permutations
+//    which have to be undone later (so not stable an any input).
+//  - All the optimal in-place sorting/merging algorithms I found are either
+//    unstable or rely on enough different elements in each step to encode the
+//    performed block rearrangements. See also "In-Place Merging Algorithms",
+//    Denham Coates-Evely, Department of Computer Science, Kings College,
+//    January 2004 and the reverences in there.
+//  - Often "optimal" algorithms are optimal in the number of assignments
+//    but Interface has only Swap as operation.
+
+// Stable sorts data while keeping the original order of equal elements.
+//
+// It makes one call to data.Len to determine n, O(n*log(n)) calls to
+// data.Less and O(n*log(n)*log(n)) calls to data.Swap.
+func Stable(data Interface) {
+       n := data.Len()
+       blockSize := 20
+       a, b := 0, blockSize
+       for b <= n {
+               insertionSort(data, a, b)
+               a = b
+               b += blockSize
+       }
+       insertionSort(data, a, n)
+
+       for blockSize < n {
+               a, b = 0, 2*blockSize
+               for b <= n {
+                       symMerge(data, a, a+blockSize, b)
+                       a = b
+                       b += 2 * blockSize
+               }
+               symMerge(data, a, a+blockSize, n)
+               blockSize *= 2
+       }
+}
+
+// SymMerge merges the two sorted subsequences data[a:m] and data[m:b] using
+// the SymMerge algorithm from Pok-Son Kim and Arne Kutzner, "Stable Minimum
+// Storage Merging by Symmetric Comparisons", in Susanne Albers and Tomasz
+// Radzik, editors, Algorithms - ESA 2004, volume 3221 of Lecture Notes in
+// Computer Science, pages 714-723. Springer, 2004.
+//
+// Let M = m-a and N = b-n. Wolog M < N.
+// The recursion depth is bound by ceil(log(N+M)).
+// The algorithm needs O(M*log(N/M + 1)) calls to data.Less.
+// The algorithm needs O((M+N)*log(M)) calls to data.Swap.
+//
+// The paper gives O((M+N)*log(M)) as the number of assignments assuming a
+// rotation algorithm wich uses O(M+N+gcd(M+N)) assignments. The argumentation
+// in the paper carries through for Swap operations, especially as the block
+// swapping rotate uses only O(M+N) Swaps.
+func symMerge(data Interface, a, m, b int) {
+       if a >= m || m >= b {
+               return
+       }
+
+       mid := a + (b-a)/2
+       n := mid + m
+       start := 0
+       if m > mid {
+               start = n - b
+               r, p := mid, n-1
+               for start < r {
+                       c := start + (r-start)/2
+                       if !data.Less(p-c, c) {
+                               start = c + 1
+                       } else {
+                               r = c
+                       }
+               }
+       } else {
+               start = a
+               r, p := m, n-1
+               for start < r {
+                       c := start + (r-start)/2
+                       if !data.Less(p-c, c) {
+                               start = c + 1
+                       } else {
+                               r = c
+                       }
+               }
+       }
+       end := n - start
+       rotate(data, start, m, end)
+       symMerge(data, a, start, mid)
+       symMerge(data, mid, end, b)
+}
+
+// Rotate two consecutives blocks u = data[a:m] and v = data[m:b] in data:
+// Data of the form 'x u v y' is changed to 'x v u y'.
+// Rotate performs at most b-a many calls to data.Swap.
+func rotate(data Interface, a, m, b int) {
+       i := m - a
+       if i == 0 {
+               return
+       }
+       j := b - m
+       if j == 0 {
+               return
+       }
+
+       if i == j {
+               swapRange(data, a, m, i)
+               return
+       }
+
+       p := a + i
+       for i != j {
+               if i > j {
+                       swapRange(data, p-i, p, j)
+                       i -= j
+               } else {
+                       swapRange(data, p-i, p+j-i, i)
+                       j -= i
+               }
+       }
+       swapRange(data, p-i, p, i)
+}
+
+/*
+Complexity of Stable Sorting
+
+
+Complexity of block swapping rotation
+
+Each Swap puts one new element into its correct, final position.
+Elements which reach their final position are no longer moved.
+Thus block swapping rotation needs |u|+|v| calls to Swaps.
+This is best possible as each element might need a move.
+
+Pay attention when comparing to other optimal algorithms which
+typically count the number of assignments instead of swaps:
+E.g. the optimal algorithm of Dudzinski and Dydek for in-place
+rotations uses O(u + v + gcd(u,v)) assignments which is
+better than our O(3 * (u+v)) as gcd(u,v) <= u.
+
+
+Stable sorting by SymMerge and BlockSwap rotations
+
+SymMerg complexity for same size input M = N:
+Calls to Less:  O(M*log(N/M+1)) = O(N*log(2)) = O(N)
+Calls to Swap:  O((M+N)*log(M)) = O(2*N*log(N)) = O(N*log(N))
+
+(The following argument does not fuzz over a missing -1 or
+other stuff which does not impact the final result).
+
+Let n = data.Len(). Assume n = 2^k.
+
+Plain merge sort performs log(n) = k iterations.
+On iteration i the algorithm merges 2^(k-i) blocks, each of size 2^i.
+
+Thus iteration i of merge sort performs:
+Calls to Less  O(2^(k-i) * 2^i) = O(2^k) = O(2^log(n)) = O(n)
+Calls to Swap  O(2^(k-i) * 2^i * log(2^i)) = O(2^k * i) = O(n*i)
+
+In total k = log(n) iterations are performed; so in total:
+Calls to Less O(log(n) * n)
+Calls to Swap O(n + 2*n + 3*n + ... + (k-1)*n + k*n)
+   = O((k/2) * k * n) = O(n * k^2) = O(n * log^2(n))
+
+
+Above results should generalize to arbitrary n = 2^k + p
+and should not be influenced by the initial insertion sort phase:
+Insertion sort is O(n^2) on Swap and Less, thus O(bs^2) per block of
+size bs at n/bs blocks:  O(bs*n) Swaps and Less during insertion sort.
+Merge sort iterations start at i = log(bs). With t = log(bs) constant:
+Calls to Less O((log(n)-t) * n + bs*n) = O(log(n)*n + (bs-t)*n)
+   = O(n * log(n))
+Calls to Swap O(n * log^2(n) - (t^2+t)/2*n) = O(n * log^2(n))
+
+*/
index 5daf8482b9c271581f62948f7a1d2ff6b41f4bb3..2dd65c44366a40b11cd6b041de8cc6b9be2a61ee 100644 (file)
@@ -122,6 +122,19 @@ func BenchmarkSortString1K(b *testing.B) {
        }
 }
 
+func BenchmarkStableString1K(b *testing.B) {
+       b.StopTimer()
+       for i := 0; i < b.N; i++ {
+               data := make([]string, 1<<10)
+               for i := 0; i < len(data); i++ {
+                       data[i] = strconv.Itoa(i ^ 0x2cc)
+               }
+               b.StartTimer()
+               Stable(StringSlice(data))
+               b.StopTimer()
+       }
+}
+
 func BenchmarkSortInt1K(b *testing.B) {
        b.StopTimer()
        for i := 0; i < b.N; i++ {
@@ -135,6 +148,19 @@ func BenchmarkSortInt1K(b *testing.B) {
        }
 }
 
+func BenchmarkStableInt1K(b *testing.B) {
+       b.StopTimer()
+       for i := 0; i < b.N; i++ {
+               data := make([]int, 1<<10)
+               for i := 0; i < len(data); i++ {
+                       data[i] = i ^ 0x2cc
+               }
+               b.StartTimer()
+               Stable(IntSlice(data))
+               b.StopTimer()
+       }
+}
+
 func BenchmarkSortInt64K(b *testing.B) {
        b.StopTimer()
        for i := 0; i < b.N; i++ {
@@ -148,6 +174,19 @@ func BenchmarkSortInt64K(b *testing.B) {
        }
 }
 
+func BenchmarkStableInt64K(b *testing.B) {
+       b.StopTimer()
+       for i := 0; i < b.N; i++ {
+               data := make([]int, 1<<16)
+               for i := 0; i < len(data); i++ {
+                       data[i] = i ^ 0xcccc
+               }
+               b.StartTimer()
+               Stable(IntSlice(data))
+               b.StopTimer()
+       }
+}
+
 const (
        _Sawtooth = iota
        _Rand
@@ -204,7 +243,7 @@ func lg(n int) int {
        return i
 }
 
-func testBentleyMcIlroy(t *testing.T, sort func(Interface)) {
+func testBentleyMcIlroy(t *testing.T, sort func(Interface), maxswap func(int) int) {
        sizes := []int{100, 1023, 1024, 1025}
        if testing.Short() {
                sizes = []int{100, 127, 128, 129}
@@ -278,7 +317,7 @@ func testBentleyMcIlroy(t *testing.T, sort func(Interface)) {
                                        }
 
                                        desc := fmt.Sprintf("n=%d m=%d dist=%s mode=%s", n, m, dists[dist], modes[mode])
-                                       d := &testingData{desc: desc, t: t, data: mdata[0:n], maxswap: n * lg(n) * 12 / 10}
+                                       d := &testingData{desc: desc, t: t, data: mdata[0:n], maxswap: maxswap(n)}
                                        sort(d)
                                        // Uncomment if you are trying to improve the number of compares/swaps.
                                        //t.Logf("%s: ncmp=%d, nswp=%d", desc, d.ncmp, d.nswap)
@@ -303,11 +342,15 @@ func testBentleyMcIlroy(t *testing.T, sort func(Interface)) {
 }
 
 func TestSortBM(t *testing.T) {
-       testBentleyMcIlroy(t, Sort)
+       testBentleyMcIlroy(t, Sort, func(n int) int { return n * lg(n) * 12 / 10 })
 }
 
 func TestHeapsortBM(t *testing.T) {
-       testBentleyMcIlroy(t, Heapsort)
+       testBentleyMcIlroy(t, Heapsort, func(n int) int { return n * lg(n) * 12 / 10 })
+}
+
+func TestStableBM(t *testing.T) {
+       testBentleyMcIlroy(t, Stable, func(n int) int { return n * lg(n) * lg(n) })
 }
 
 // This is based on the "antiquicksort" implementation by M. Douglas McIlroy.
@@ -357,3 +400,148 @@ func TestAdversary(t *testing.T) {
        d := &adversaryTestingData{data, make(map[int]int), 0}
        Sort(d) // This should degenerate to heapsort.
 }
+
+func TestStableInts(t *testing.T) {
+       data := ints
+       Stable(IntSlice(data[0:]))
+       if !IntsAreSorted(data[0:]) {
+               t.Errorf("nsorted %v\n   got %v", ints, data)
+       }
+}
+
+type intPairs []struct {
+       a, b int
+}
+
+// IntPairs compare on a only.
+func (d intPairs) Len() int           { return len(d) }
+func (d intPairs) Less(i, j int) bool { return d[i].a < d[j].a }
+func (d intPairs) Swap(i, j int)      { d[i], d[j] = d[j], d[i] }
+
+// Record initial order in B.
+func (d intPairs) initB() {
+       for i := range d {
+               d[i].b = i
+       }
+}
+
+// InOrder checks if a-equal elements were not reordered.
+func (d intPairs) inOrder() bool {
+       lastA, lastB := -1, 0
+       for i := 0; i < len(d); i++ {
+               if lastA != d[i].a {
+                       lastA = d[i].a
+                       lastB = d[i].b
+                       continue
+               }
+               if d[i].b <= lastB {
+                       return false
+               }
+               lastB = d[i].b
+       }
+       return true
+}
+
+func TestStability(t *testing.T) {
+       n, m := 100000, 1000
+       if testing.Short() {
+               n, m = 1000, 100
+       }
+       data := make(intPairs, n)
+
+       // random distribution
+       for i := 0; i < len(data); i++ {
+               data[i].a = rand.Intn(m)
+       }
+       if IsSorted(data) {
+               t.Fatalf("terrible rand.rand")
+       }
+       data.initB()
+       Stable(data)
+       if !IsSorted(data) {
+               t.Errorf("Stable didn't sort %d ints", n)
+       }
+       if !data.inOrder() {
+               t.Errorf("Stable wasn't stable on %d ints", n)
+       }
+
+       // already sorted
+       data.initB()
+       Stable(data)
+       if !IsSorted(data) {
+               t.Errorf("Stable shuffeled sorted %d ints (order)", n)
+       }
+       if !data.inOrder() {
+               t.Errorf("Stable shuffeled sorted %d ints (stability)", n)
+       }
+
+       // sorted reversed
+       for i := 0; i < len(data); i++ {
+               data[i].a = len(data) - i
+       }
+       data.initB()
+       Stable(data)
+       if !IsSorted(data) {
+               t.Errorf("Stable didn't sort %d ints", n)
+       }
+       if !data.inOrder() {
+               t.Errorf("Stable wasn't stable on %d ints", n)
+       }
+}
+
+var countOpsSizes = []int{1e2, 3e2, 1e3, 3e3, 1e4, 3e4, 1e5, 3e5, 1e6}
+
+func countOps(t *testing.T, algo func(Interface), name string) {
+       sizes := countOpsSizes
+       if testing.Short() {
+               sizes = sizes[:5]
+       }
+       if !testing.Verbose() {
+               t.Skip("Counting skipped as non-verbose mode.")
+       }
+       for _, n := range sizes {
+               td := testingData{
+                       desc:    name,
+                       t:       t,
+                       data:    make([]int, n),
+                       maxswap: 1 << 31,
+               }
+               for i := 0; i < n; i++ {
+                       td.data[i] = rand.Intn(n / 5)
+               }
+               algo(&td)
+               t.Logf("%s %8d elements: %11d Swap, %10d Less", name, n, td.nswap, td.ncmp)
+       }
+}
+
+func TestCountStableOps(t *testing.T) { countOps(t, Stable, "Stable") }
+func TestCountSortOps(t *testing.T)   { countOps(t, Sort, "Sort  ") }
+
+func bench(b *testing.B, size int, algo func(Interface), name string) {
+       b.StopTimer()
+       data := make(intPairs, size)
+       for i := 0; i < b.N; i++ {
+               for n := size - 3; n <= size+3; n++ {
+                       for i := 0; i < len(data); i++ {
+                               data[i].a = rand.Intn(n / 5)
+                       }
+                       data.initB()
+                       b.StartTimer()
+                       algo(data)
+                       b.StopTimer()
+                       if !IsSorted(data) {
+                               b.Errorf("%s did not sort %d ints", name, n)
+                       }
+                       if name == "Stable" && !data.inOrder() {
+                               b.Errorf("%s unstable on %d ints", name, n)
+                       }
+               }
+       }
+}
+
+func BenchmarkSort1e2(b *testing.B)   { bench(b, 1e2, Sort, "Sort") }
+func BenchmarkStable1e2(b *testing.B) { bench(b, 1e2, Stable, "Stable") }
+func BenchmarkSort1e4(b *testing.B)   { bench(b, 1e4, Sort, "Sort") }
+func BenchmarkStable1e4(b *testing.B) { bench(b, 1e4, Stable, "Stable") }
+func BenchmarkSort1e6(b *testing.B)   { bench(b, 1e6, Sort, "Sort") }
+func BenchmarkStable1e6(b *testing.B) { bench(b, 1e6, Stable, "Stable") }