总结下实现思路:
创新互联长期为超过千家客户提供的网站建设服务,团队从业经验10年,关注不同地域、不同群体,并针对不同对象提供差异化的产品和服务;打造开放共赢平台,与合作伙伴共同营造健康的互联网生态环境。为资源企业提供专业的成都网站设计、成都网站建设,资源网站改版等技术服务。拥有10多年丰富建站经验和众多成功案例,为您定制开发。
package pipeline import ( "encoding/binary" "fmt" "io" "math/rand" "sort" "time" ) var startTime time.Time func Init() { startTime = time.Now() } //内部处理方法 //这里是排序:异步处理容器元素排序 func InMemSort(in <-chan int) <-chan int { out := make(chan int, 1024) go func() { a := []int{} for v := range in { a = append(a, v) } fmt.Println("Read done:", time.Since(startTime)) sort.Ints(a) fmt.Println("InMemSort done:", time.Since(startTime)) for _, v := range a { out <- v } close(out) }() return out } //两路和并,每路通过内部方法异步处理 //这里是排序:in1,in2元素需要排好序(经过内部方法InMemSort异步处理)的容器单元(channel 异步容器/队列) func Merge(in1, in2 <-chan int) <-chan int { out := make(chan int, 1024) // go func() { // v1, ok1 := <-in1 // v2, ok2 := <-in2 // for { // if ok1 || ok2 { // if !ok2 || (ok1 && v1 <= v2) { //v2无值或v1值比v2大 // out <- v1 // v1, ok1 = <-in1 // } else { // out <- v2 // v2, ok2 = <-in2 // } // } else { // close(out) // break // } // } // }() go func() { v1, ok1 := <-in1 v2, ok2 := <-in2 for ok1 || ok2 { if !ok2 || (ok1 && v1 <= v2) { //v2无值或v1值比v2大 out <- v1 v1, ok1 = <-in1 } else { out <- v2 v2, ok2 = <-in2 } } close(out) fmt.Println("Merge done:", time.Since(startTime)) }() return out } //读取原数据 //chunkSize=-1全读 func ReadSource(r io.Reader, chunkSize int) <-chan int { out := make(chan int, 1024) go func() { buffer := make([]byte, 8) //int长度根据操作系统来的,64位为int64,64位8个字节 bytesRead := 0 for { //持续读取 n, err := r.Read(buffer) //读取一个int 8byte bytesRead += n if n > 0 { out <- int(binary.BigEndian.Uint64(buffer)) //字节数组转int } if err != nil || (chunkSize != -1 && bytesRead >= chunkSize) { //-1全读 break } } close(out) }() return out } //写处理后(排序)数据 func WriteSink(w io.Writer, in <-chan int) { for v := range in { buffer := make([]byte, 8) binary.BigEndian.PutUint64(buffer, uint64(v)) w.Write(buffer) } } //随机生成数据源 func RandomSource(count int) <-chan int { out := make(chan int) go func() { for i := 0; i < count; i++ { out <- rand.Int() } close(out) }() return out } //多路两两归并,每路通过内部方法异步处理 //这里是排序:ins元素需要排好序(经过内部方法InMemSort异步处理)的容器单元(channel 异步容器/队列) func MergeN(ins ...<-chan int) <-chan int { if len(ins) == 1 { return ins[0] } m := len(ins) / 2 return Merge( MergeN(ins[:m]...), MergeN(ins[m:]...)) //chennel异步并发归并 }