channel.go (4542B)
1 package iter 2 3 import "log" 4 5 // A result from a iterator.Go() channel. 6 type ChannelItem struct { 7 Value interface{} 8 Error error 9 } 10 11 // Runs the iterator in a goroutine, sending to a channel. 12 // 13 // The returned items channel will send ChannelItems to you, which can indicate 14 // either a value or an error. If item.Error is set, iteration is ready 15 // to terminate. Otherwise item.Value is the next value of the iteration. 16 // 17 // When you are done iterating, you must close the done channel. Even if iteration 18 // was terminated early, this will ensure that the goroutine and channel are 19 // properly cleaned up. 20 // 21 // channel, done := iter.Go(iterator) 22 // defer close(done) // if early return or panic happens, this will clean up the goroutine 23 // for item := range channel { 24 // if item.Error != nil { 25 // // Iteration failed; handle the error and exit the loop 26 // ... 27 // } 28 // value := item.Value 29 // ... 30 // } 31 func (iterator Iterator) Go() (items <-chan ChannelItem, done chan<- bool) { 32 itemsChannel := make(chan ChannelItem) 33 doneChannel := make(chan bool) 34 go iterator.IterateToChannel(itemsChannel, doneChannel) 35 return itemsChannel, doneChannel 36 } 37 38 // Iterates all items and sends them to the given channel. Runs on the current 39 // goroutine (call go iterator.IterateToChannel to set it up on a new goroutine). 40 // This will close the items channel when done. If the done channel is closed, 41 // iteration will terminate. 42 func (iterator Iterator) IterateToChannel(items chan<- ChannelItem, done <-chan bool) { 43 defer close(items) 44 err := iterator.EachWithError(func(result interface{}) error { 45 select { 46 case items <- ChannelItem{Value: result}: 47 return nil 48 case _, _ = <-done: 49 // If we are told we're done early, we finish quietly. 50 return FINISHED 51 } 52 }) 53 if err != nil { 54 items <- ChannelItem{Error: err} 55 } 56 } 57 58 // Iterate over the channels from a Go(), calling a user-defined function for each value. 59 // This function handles all anomalous conditions including errors, early 60 // termination and safe cleanup of the goroutine and channels. 61 func EachFromChannel(items <-chan ChannelItem, done chan<- bool, processor func(interface{}) error) error { 62 defer close(done) // if early return or panic happens, this will clean up the goroutine 63 for item := range items { 64 if item.Error != nil { 65 return item.Error 66 } 67 err := processor(item.Value) 68 if err != nil { 69 return err 70 } 71 } 72 return nil 73 } 74 75 // Perform the iteration in the background concurrently with the Each() statement. 76 // Useful when the iterator or iteratee will be doing blocking work. 77 // 78 // The bufferSize parameter lets you set how far ahead the background goroutine can 79 // get. 80 // 81 // iterator.BackgroundEach(100, func(item interface{}) { ... }) 82 func (iterator Iterator) BackgroundEach(bufferSize int, processor func(interface{}) error) error { 83 itemsChannel := make(chan ChannelItem, bufferSize) 84 doneChannel := make(chan bool) 85 go iterator.IterateToChannel(itemsChannel, doneChannel) 86 return EachFromChannel(itemsChannel, doneChannel, processor) 87 } 88 89 // Iterate to a channel in the background. 90 // 91 // for value := range iter.GoSimple(iterator) { 92 // ... 93 // } 94 // 95 // With this method, two undesirable things can happen: 96 // - if the iteration stops early due to an error, you will not be able to handle 97 // it (the goroutine will log and panic, and the program will exit). 98 // - if callers panic or exit early without retrieving all values from the channel, 99 // the goroutine is blocked forever and leaks. 100 // 101 // The Go() routine allows you to handle both of these issues, at a small cost to 102 // caller complexity. BackgroundEach() provides a simple way to use Go(), as 103 // well. 104 // 105 // That said, if you can make guarantees about no panics or don't care, this 106 // method can make calling code easier to read. 107 func (iterator Iterator) GoSimple() (values <-chan interface{}) { 108 mainChannel := make(chan interface{}) 109 go iterator.IterateToChannelSimple(mainChannel) 110 return mainChannel 111 } 112 113 // Iterates all items and sends them to the given channel. Runs on the current 114 // goroutine (call go iterator.IterateToChannelSimple() to set it up on a new goroutine). 115 // This will close the values channel when done. See warnings about GoSimple() 116 // vs. Go() in the GoSimple() method. 117 func (iterator Iterator) IterateToChannelSimple(values chan<- interface{}) { 118 defer close(values) 119 err := iterator.Each(func(item interface{}) { 120 values <- item 121 }) 122 if err != nil { 123 log.Fatalf("Iterator returned an error in GoSimple: %v", err) 124 } 125 }