1
0
mirror of https://github.com/go-micro/go-micro.git synced 2024-12-30 10:10:44 +02:00

Merge pull request #852 from micro/mutex

add mutex lock implementation
This commit is contained in:
Asim Aslam 2019-10-14 15:23:59 +01:00 committed by GitHub
commit 607fdb3fcb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 147 additions and 0 deletions

View File

@ -2,9 +2,14 @@
package lock package lock
import ( import (
"errors"
"time" "time"
) )
var (
ErrLockTimeout = errors.New("lock timeout")
)
// Lock is a distributed locking interface // Lock is a distributed locking interface
type Lock interface { type Lock interface {
// Acquire a lock with given id // Acquire a lock with given id

142
sync/lock/mutex/mutex.go Normal file
View File

@ -0,0 +1,142 @@
// Package mutex provides a sync.Mutex implementation of the lock for local use
package mutex
import (
"sync"
"time"
lock "github.com/micro/go-micro/sync/lock"
)
type mutexLock struct {
sync.RWMutex
locks map[string]*mlock
}
type mlock struct {
id string
time time.Time
ttl time.Duration
release chan bool
}
func (m *mutexLock) Acquire(id string, opts ...lock.AcquireOption) error {
// lock our access
m.Lock()
var options lock.AcquireOptions
for _, o := range opts {
o(&options)
}
lk, ok := m.locks[id]
if !ok {
m.locks[id] = &mlock{
id: id,
time: time.Now(),
ttl: options.TTL,
release: make(chan bool),
}
// unlock
m.Unlock()
return nil
}
m.Unlock()
// set wait time
var wait <-chan time.Time
var ttl <-chan time.Time
// decide if we should wait
if options.Wait > time.Duration(0) {
wait = time.After(options.Wait)
}
// check the ttl of the lock
if lk.ttl > time.Duration(0) {
// time lived for the lock
live := time.Since(lk.time)
// set a timer for the leftover ttl
if live > lk.ttl {
// release the lock if it expired
m.Release(id)
} else {
ttl = time.After(live)
}
}
lockLoop:
for {
// wait for the lock to be released
select {
case <-lk.release:
m.Lock()
// someone locked before us
lk, ok = m.locks[id]
if ok {
m.Unlock()
continue
}
// got chance to lock
m.locks[id] = &mlock{
id: id,
time: time.Now(),
ttl: options.TTL,
release: make(chan bool),
}
m.Unlock()
break lockLoop
case <-ttl:
// ttl exceeded
m.Release(id)
// TODO: check the ttl again above
ttl = nil
// try acquire
continue
case <-wait:
return lock.ErrLockTimeout
}
}
return nil
}
func (m *mutexLock) Release(id string) error {
m.Lock()
defer m.Unlock()
lk, ok := m.locks[id]
// no lock exists
if !ok {
return nil
}
// delete the lock
delete(m.locks, id)
select {
case <-lk.release:
return nil
default:
close(lk.release)
}
return nil
}
func NewLock(opts ...lock.Option) lock.Lock {
var options lock.Options
for _, o := range opts {
o(&options)
}
return &mutexLock{
locks: make(map[string]*mlock),
}
}