1
0
mirror of https://github.com/google/gops.git synced 2024-11-24 08:22:25 +02:00

Back to using semaphores removed by #126.

This solution is a bit more idiomatic and resonates with "Don't communicate by
sharing memory, share memory by communicating." Go proverb[^1].

Similar approach to use buffered channel as a semaphore to limit throughput is
discussed in Effective Go[^2].

[^1]: https://go-proverbs.github.io/
[^2]: https://golang.org/doc/effective_go.html#channels
This commit is contained in:
Jaroslavs Samcuks 2020-09-09 21:45:47 +02:00
parent 5d514cabbb
commit ba776bfd3b

View File

@ -7,7 +7,6 @@ package goprocess
import (
"os"
"sync"
goversion "rsc.io/goversion/version"
@ -39,51 +38,36 @@ func FindAll() []P {
type isGoFunc func(ps.Process) (path, version string, agent, ok bool, err error)
func findAll(pss []ps.Process, isGo isGoFunc, concurrencyLimit int) []P {
input := make(chan ps.Process, len(pss))
output := make(chan P, len(pss))
for _, ps := range pss {
input <- ps
}
close(input)
var wg sync.WaitGroup
wg.Add(concurrencyLimit) // used to wait for workers to be finished
// Run concurrencyLimit of workers until there
// is no more processes to be checked in the input channel.
for i := 0; i < concurrencyLimit; i++ {
output := make(chan []P, 1)
output <- nil
// Using buffered channel as a semaphore to limit throughput.
// See https://golang.org/doc/effective_go.html#channels
type token struct{}
sem := make(chan token, concurrencyLimit)
for _, pr := range pss {
sem <- token{}
pr := pr
go func() {
defer wg.Done()
for pr := range input {
path, version, agent, ok, err := isGo(pr)
if err != nil {
// TODO(jbd): Return a list of errors.
continue
}
if !ok {
continue
}
output <- P{
defer func() { <-sem }()
if path, version, agent, ok, err := isGo(pr); err != nil {
// TODO(jbd): Return a list of errors.
} else if ok {
output <- append(<-output, P{
PID: pr.Pid(),
PPID: pr.PPid(),
Exec: pr.Executable(),
Path: path,
BuildVersion: version,
Agent: agent,
}
})
}
}()
}
wg.Wait() // wait until all workers are finished
close(output) // no more results to be waited for
var results []P
for p := range output {
results = append(results, p)
// Acquire all semaphore slots to wait for work to complete.
for n := cap(sem); n > 0; n-- {
sem <- token{}
}
return results
return <-output
}
// Find finds info about the process identified with the given PID.