M
M
Maxim Fedorov2021-05-26 11:48:38
go
Maxim Fedorov, 2021-05-26 11:48:38

How to notify a bunch of goroutines about the completion of work?

20 goroutines of this function are launched.

func (w *Writer) Work(ctx context.Context) {
  // start

  f, err := os.Create("./tmp/" + filename)
  if err != nil {
    panic(err)
  }

  defer f.Close()


  // Запись шапки в файл

  for {
    select {
    case <-ctx.Done():
      // Как  выполнить тут работу в конце работы горутины? сейчас не работает
    case it := <-*w.Channel:
      // Запись элементов в файл
    }
  }

  // Или может тут  выполнить  работу в конце работы горутины ? тоже не работает
}


In the loop, I send messages to each of them (through a channel for each, 20 goroutines means 20 channels), and each writes to its own file

ctx := context.Background()
  ctx, cancel := context.WithCancel(ctx)

defer func() {
    fmt.Println("Finish items loading!") // это сообщеие выводится
    cancel()
  }()

// тут стартанули нашу пачку горутин
 
  for {
    res, err := esScroller.Do(ctx)
    
    for _, hit := range res.Hits.Hits {

    		// ... билд item

    		//  отправляем во все каналы элемент
    		for _, w := range f.Writers {
        		*w.Channel <- item
   		 }
      }
}


When the messages are over - how to complete everything and do useful final work in each of them?

Answer the question

In order to leave comments, you need to log in

1 answer(s)
E
Evgeny Mamonov, 2021-05-26
@Maksclub

Try like this

package main

import (
    "context"
    "fmt"
    "sync"
)

func worker(ctx context.Context, worderID int, data chan int, wg *sync.WaitGroup) {
    defer wg.Done()

    fmt.Printf("worker %d started\n", worderID)
    for {
        fmt.Printf("worker %d enter for\n", worderID)
        select {
        case <-ctx.Done():
            fmt.Printf("worker %d cancelled\n", worderID)
            return
        case v, ok := <-data:
            fmt.Printf("worker %d got data: %v, ok: %v\n", worderID, v, ok)
            if !ok {
                fmt.Printf("worker %d data channel was closed\n", worderID)
                return
            }
        }
    }
}

func main() {
    var wg sync.WaitGroup
    ctx, cancel := context.WithCancel(context.Background())
    channels := make([]chan int, 10)

    for i := 0; i < 10; i++ {
        wg.Add(1)
        channels[i] = make(chan int)
        go worker(ctx, i, channels[i], &wg)
    }

    for i := 0; i < 10; i++ {
        channels[i] <- i
    }

    cancel()
    wg.Wait()
}

Didn't find what you were looking for?

Ask your question

Ask a Question

731 491 924 answers to any question