2013-04-19 27 views
10

Trong trường hợp của tôi, tôi có hàng ngàn goroutines làm việc đồng thời như work(). Tôi cũng có một goroutine sync(). Khi bắt đầu sync, tôi cần bất kỳ goroutine nào khác để tạm dừng một lúc sau khi hoàn thành công việc đồng bộ hóa. Đây là mã của tôi:Có cách nào thanh lịch để tạm dừng và tiếp tục bất kỳ goroutine nào khác trong golang không?

var channels []chan int 
var channels_mutex sync.Mutex 

func work() { 
    channel := make(chan int, 1) 
    channels_mutex.Lock() 
    channels = append(channels, channel) 
    channels_mutex.Unlock() 
    for { 
    for { 
     sync_stat := <- channel // blocked here 
     if sync_stat == 0 { // if sync complete 
     break 
     } 
    } 
    // Do some jobs 
    if (some condition) { 
     return 
    } 
    } 
} 

func sync() { 
    channels_mutex.Lock() 
    // do some sync 

    for int i := 0; i != len(channels); i++ { 
    channels[i] <- 0 
    } 
    channels_mutex.Unlock() 
} 

Bây giờ vấn đề là, kể từ <- luôn chặn trên đọc, mỗi lần đi vào sync_stat := <- channel chặn. Tôi biết nếu kênh đã bị đóng, kênh sẽ không bị chặn nhưng vì tôi phải sử dụng kênh này cho đến khi thoát work() và tôi không tìm thấy bất kỳ cách nào để mở lại kênh đã đóng.

Tôi nghi ngờ bản thân mình sai hướng, vì vậy mọi trợ giúp đều được đánh giá cao. Có cách nào "thanh lịch" để tạm dừng & tiếp tục bất kỳ goroutine nào khác trong golang không?

Trả lời

16

Nếu tôi hiểu chính xác, bạn muốn N số lượng công nhân và một bộ điều khiển, có thể tạm dừng, tiếp tục và dừng công nhân theo ý muốn. Các mã sau đây sẽ làm điều đó.

package main 

import (
    "fmt" 
    "runtime" 
    "sync" 
) 

// Possible worker states. 
const (
    Stopped = 0 
    Paused = 1 
    Running = 2 
) 

// Maximum number of workers. 
const WorkerCount = 1000 

func main() { 
    // Launch workers. 
    var wg sync.WaitGroup 
    wg.Add(WorkerCount + 1) 

    workers := make([]chan int, WorkerCount) 
    for i := range workers { 
     workers[i] = make(chan int, 1) 

     go func(i int) { 
      worker(i, workers[i]) 
      wg.Done() 
     }(i) 
    } 

    // Launch controller routine. 
    go func() { 
     controller(workers) 
     wg.Done() 
    }() 

    // Wait for all goroutines to finish. 
    wg.Wait() 
} 

func worker(id int, ws <-chan int) { 
    state := Paused // Begin in the paused state. 

    for { 
     select { 
     case state = <-ws: 
      switch state { 
      case Stopped: 
       fmt.Printf("Worker %d: Stopped\n", id) 
       return 
      case Running: 
       fmt.Printf("Worker %d: Running\n", id) 
      case Paused: 
       fmt.Printf("Worker %d: Paused\n", id) 
      } 

     default: 
      // We use runtime.Gosched() to prevent a deadlock in this case. 
      // It will not be needed of work is performed here which yields 
      // to the scheduler. 
      runtime.Gosched() 

      if state == Paused { 
       break 
      } 

      // Do actual work here. 
     } 
    } 
} 

// controller handles the current state of all workers. They can be 
// instructed to be either running, paused or stopped entirely. 
func controller(workers []chan int) { 
    // Start workers 
    setState(workers, Running) 

    // Pause workers. 
    setState(workers, Paused) 

    // Unpause workers. 
    setState(workers, Running) 

    // Shutdown workers. 
    setState(workers, Stopped) 
} 

// setState changes the state of all given workers. 
func setState(workers []chan int, state int) { 
    for _, w := range workers { 
     w <- state 
    } 
} 
+1

'<-time.After (1e1)' có nghĩa là gì? –

+0

Nó tạm dừng goroutine trong 1 giây (1e9 nano giây). Nó được sử dụng trong ví dụ này để làm cho nó trông giống như bộ điều khiển đang làm một số công việc thực tế. 'time.After()' trả về một kênh, gửi một tín hiệu sau thời gian chờ đã cho. '<-time.After (N)', chỉ đơn giản là chặn trên kênh đó cho đến khi nhận được tín hiệu đó. – jimt

+0

Và điều này làm cho tôi nhận ra một suy nghĩ khác: tại sao chúng ta không chỉ sử dụng một giá trị toàn cầu đại diện cho trạng thái điều khiển, và 'công nhân' kiểm tra giá trị toàn cầu mỗi lần? Tôi biết nó không phải là một thực hành tốt, nhưng tôi muốn biết lý do. –