Channels are a powerful feature in Go for synchronizing and communicating between goroutines. Mastering advanced channel patterns is essential for building robust, concurrent applications. This section explores sophisticated channel usage, including patterns for coordination, communication, and resource management.
Channel Types
Channel Operations
Use `ch <- value` to send and `value := <-ch` to receive.
Use `close(ch)` to indicate that no more values will be sent on the channel.
Fan-Out
// worker consumes jobs until the jobs channel is closed and drained. For each
// job it logs the worker id and the job number, sleeps for one second to
// simulate work, and then sends job*2 on results.
//
// The send on results blocks while results has no free capacity, so the
// caller must keep receiving (or size the buffer) for the worker to progress.
func worker(id int, jobs <-chan int, results chan<- int) {
	for job := range jobs {
		fmt.Printf("Worker %d processing job %d\n", id, job)
		time.Sleep(time.Second) // Simulate work
		results <- job * 2
	}
}
// main fans nine jobs out to three workers and prints the nine results in
// whatever order the workers deliver them.
func main() {
	const (
		workerCount = 3
		jobCount    = 9
	)

	// Both channels have capacity 100, which is more than jobCount, so none
	// of the sends below can block.
	jobs := make(chan int, 100)
	results := make(chan int, 100)

	for id := 1; id <= workerCount; id++ {
		go worker(id, jobs, results)
	}

	for job := 1; job <= jobCount; job++ {
		jobs <- job
	}
	// Closing jobs ends each worker's range loop once the queue is drained.
	close(jobs)

	// Receive exactly one result per submitted job.
	for received := 0; received < jobCount; received++ {
		fmt.Println(<-results)
	}
}
Fan-In
// producer sends five values on ch: id*10+0 through id*10+4. It sleeps for
// 100ms after each send to simulate work.
//
// producer never closes ch; with several producers sharing one channel, the
// receiver has to know how many values to expect.
func producer(id int, ch chan<- int) {
	for i := 0; i < 5; i++ {
		ch <- id*10 + i
		time.Sleep(time.Millisecond * 100) // Simulate work
	}
}
// main fans the output of three producers into a single channel and prints
// every value a dedicated consumer goroutine receives.
func main() {
	const (
		producerCount = 3
		totalValues   = 15 // the consumer stops after this many receives
	)

	values := make(chan int)
	finished := make(chan bool)

	// Consumer: print a fixed number of values, then signal completion.
	go func() {
		for received := 0; received < totalValues; received++ {
			fmt.Println(<-values)
		}
		finished <- true
	}()

	for id := 1; id <= producerCount; id++ {
		go producer(id, values)
	}

	// Block until the consumer has printed everything.
	<-finished
}
Basic Select Usage
// Use select to wait on multiple channel operations.
//
// main starts two goroutines that each send one message (after one and two
// seconds respectively) and runs select twice, printing whichever message
// becomes available on each pass.
func main() {
	ch1 := make(chan string)
	ch2 := make(chan string)

	go func() {
		time.Sleep(time.Second)
		ch1 <- "Message from ch1"
	}()
	go func() {
		time.Sleep(2 * time.Second)
		ch2 <- "Message from ch2"
	}()

	// Two messages are expected in total, one per channel.
	for i := 0; i < 2; i++ {
		select {
		case msg1 := <-ch1:
			fmt.Println(msg1)
		case msg2 := <-ch2:
			fmt.Println(msg2)
		}
	}
}
Timeouts and Default Case
// Use select with time.After to handle timeouts. Use a default case to avoid
// blocking.
//
// main waits up to one second for a message that is only sent after two
// seconds, so the timeout branch runs and "Timeout!" is printed.
func main() {
	// Capacity 1 lets the late sender complete its send even though nobody
	// receives after the timeout; with an unbuffered channel that goroutine
	// would block forever.
	ch := make(chan string, 1)

	go func() {
		time.Sleep(2 * time.Second)
		ch <- "Hello, World!"
	}()

	select {
	case msg := <-ch:
		fmt.Println(msg)
	case <-time.After(1 * time.Second):
		fmt.Println("Timeout!")
	}
}
Handling Multiple Channels
// Use select to handle multiple channels.
//
// main reads from two producers until both have closed their channels. When a
// receive reports the channel closed (ok == false) the local variable is set
// to nil; a nil channel never becomes ready in a select, so that case drops
// out and the loop ends once both variables are nil.
//
// Note: when several cases are ready, select picks one of them at random, so
// this loop does not by itself prioritize one channel over the other.
func main() {
	ch1 := make(chan int)
	ch2 := make(chan int)

	go func() {
		for i := 0; i < 5; i++ {
			ch1 <- i
			time.Sleep(time.Millisecond * 100)
		}
		close(ch1)
	}()
	go func() {
		for i := 0; i < 5; i++ {
			ch2 <- i * 10
			time.Sleep(time.Millisecond * 150)
		}
		close(ch2)
	}()

	for ch1 != nil || ch2 != nil {
		select {
		case val, ok := <-ch1:
			if ok {
				fmt.Println("From ch1:", val)
			} else {
				ch1 = nil // closed: disable this case
			}
		case val, ok := <-ch2:
			if ok {
				fmt.Println("From ch2:", val)
			} else {
				ch2 = nil // closed: disable this case
			}
		}
	}
}
Directional Channels
// sendData sends the integers 0 through 4 on ch and then closes it. The
// send-only channel type (chan<- int) prevents this function from receiving.
func sendData(ch chan<- int) {
	for i := 0; i < 5; i++ {
		ch <- i
	}
	// The sender owns the close; it tells the receiver's range loop to stop.
	close(ch)
}
// receiveData prints every value received from ch and returns once ch has
// been closed and drained. The receive-only channel type (<-chan int)
// prevents this function from sending or closing.
func receiveData(ch <-chan int) {
	for {
		val, open := <-ch
		if !open {
			return
		}
		fmt.Println("Received:", val)
	}
}
// main wires a sender goroutine to a receiver running on the main goroutine
// through one unbuffered channel. The bidirectional channel converts
// implicitly to the directional type each function declares.
func main() {
	numbers := make(chan int)

	go sendData(numbers)

	// Returns once sendData has closed the channel.
	receiveData(numbers)
}
Channel Composition
// merge forwards every value received from ch1 and ch2 to out, and closes out
// once both inputs have been closed and drained.
//
// When an input reports closed (ok == false) the local variable is set to
// nil; a nil channel never becomes ready in a select, so only the remaining
// input is serviced. Values from the two inputs are interleaved in arrival
// order; no ordering between the inputs is guaranteed.
func merge(ch1, ch2 <-chan int, out chan<- int) {
	for ch1 != nil || ch2 != nil {
		select {
		case val, ok := <-ch1:
			if ok {
				out <- val
			} else {
				ch1 = nil
			}
		case val, ok := <-ch2:
			if ok {
				out <- val
			} else {
				ch2 = nil
			}
		}
	}
	close(out)
}
// main runs two producers at different rates, merges their output into one
// channel and prints each merged value until that channel is closed.
func main() {
	first := make(chan int)
	second := make(chan int)
	merged := make(chan int)

	// emit sends i*scale for i in 0..4 on dst, pausing after each send, and
	// closes dst when done.
	emit := func(dst chan<- int, scale int, pause time.Duration) {
		for i := 0; i < 5; i++ {
			dst <- i * scale
			time.Sleep(pause)
		}
		close(dst)
	}

	go emit(first, 1, time.Millisecond*100)
	go emit(second, 10, time.Millisecond*150)
	go merge(first, second, merged)

	for val := range merged {
		fmt.Println("Merged:", val)
	}
}
Using Context for Cancellation
// Use the context package to manage cancellation and timeouts for goroutines.
//
// worker repeatedly sends its id on ch, sleeping 100ms after each successful
// send, until ctx is done. On cancellation it prints an exit message and
// returns. Because the send is a select case, a worker with no receiver
// still notices cancellation instead of blocking forever.
func worker(ctx context.Context, id int, ch chan<- int) {
	for {
		select {
		case <-ctx.Done():
			fmt.Printf("Worker %d exiting\n", id)
			return
		case ch <- id:
			time.Sleep(100 * time.Millisecond)
		}
	}
}
// main starts three workers that share one channel and prints what they send
// until the 500ms context deadline passes.
//
// The receive is paired with ctx.Done() in a select rather than ranging over
// ch: ch is never closed, so once every worker has exited on cancellation a
// plain receive would block forever and the runtime would report a deadlock.
func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
	defer cancel()

	ch := make(chan int)
	for i := 1; i <= 3; i++ {
		go worker(ctx, i, ch)
	}

	for {
		select {
		case val := <-ch:
			fmt.Println("Received:", val)
		case <-ctx.Done():
			return
		}
	}
}
Context and Select
// Use context with select to handle multiple cancellation scenarios.
//
// main listens on two message channels and on ctx.Done(). The first message
// arrives after 500ms and is printed; the second would arrive after two
// seconds, but the one-second context deadline fires first and main returns.
func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
	defer cancel()

	// Capacity 1 lets each sender finish its single send even if main has
	// already stopped receiving, so no sender is left blocked.
	ch1 := make(chan string, 1)
	ch2 := make(chan string, 1)

	go func() {
		time.Sleep(500 * time.Millisecond)
		ch1 <- "From ch1"
	}()
	go func() {
		time.Sleep(2 * time.Second)
		ch2 <- "From ch2"
	}()

	for {
		select {
		case msg := <-ch1:
			fmt.Println(msg)
		case msg := <-ch2:
			fmt.Println(msg)
		case <-ctx.Done():
			fmt.Println("Timeout or cancellation")
			return
		}
	}
}
Error Channels
// worker reports its outcome on errCh exactly once: an error for id 2, nil
// for every other id (after printing a success message).
//
// Sending nil on success matters: a caller that receives one value per
// worker would otherwise block forever waiting on workers that succeeded.
func worker(id int, errCh chan<- error) {
	if id == 2 {
		errCh <- fmt.Errorf("worker %d encountered an error", id)
		return
	}
	fmt.Printf("Worker %d completed successfully\n", id)
	errCh <- nil
}
// main starts three workers and collects one report per worker from a shared
// error channel, printing any non-nil error.
//
// The collection loop receives exactly once per started worker, so it only
// finishes if every worker sends exactly one value (nil on success).
func main() {
	const workerCount = 3

	errCh := make(chan error)

	for id := 1; id <= workerCount; id++ {
		go worker(id, errCh)
	}

	for collected := 0; collected < workerCount; collected++ {
		err := <-errCh
		if err != nil {
			fmt.Println("Error:", err)
		}
	}
}
Context with Error Handling
// worker simulates id*500ms of work and reports its outcome on errCh exactly
// once:
//   - ctx.Err() if ctx is done before the work finishes,
//   - an error if id is 2,
//   - nil otherwise (after printing a success message).
func worker(ctx context.Context, id int, errCh chan<- error) {
	select {
	case <-ctx.Done():
		errCh <- ctx.Err()
		return
	case <-time.After(time.Duration(id) * 500 * time.Millisecond):
		if id == 2 {
			errCh <- fmt.Errorf("worker %d encountered an error", id)
			return
		}
		fmt.Printf("Worker %d completed successfully\n", id)
		errCh <- nil
	}
}
// main runs three workers under a two-second deadline and prints every
// non-nil outcome they report.
func main() {
	const workerCount = 3

	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()

	// One buffer slot per worker, so a worker's single send never blocks.
	errCh := make(chan error, 3)

	for id := 1; id <= workerCount; id++ {
		go worker(ctx, id, errCh)
	}

	// Receive exactly one report per worker.
	for collected := 0; collected < workerCount; collected++ {
		err := <-errCh
		if err != nil {
			fmt.Println("Error:", err)
		}
	}
}
By mastering these advanced channel patterns, you can build sophisticated, high-performance concurrent systems in Go. These patterns enable you to handle complex coordination, improve communication efficiency, and manage resources effectively in your Go applications.