1
0
mirror of https://github.com/ipfs/kubo.git synced 2025-06-29 17:36:38 +08:00

Merge pull request #1539 from ipfs/log-api-route

fix log tail command
This commit is contained in:
Juan Benet
2015-07-29 15:22:49 -07:00
51 changed files with 18 additions and 5167 deletions

13
Godeps/Godeps.json generated
View File

@ -14,10 +14,6 @@
"Comment": "null-5", "Comment": "null-5",
"Rev": "75cd24fc2f2c2a2088577d12123ddee5f54e0675" "Rev": "75cd24fc2f2c2a2088577d12123ddee5f54e0675"
}, },
{
"ImportPath": "github.com/ActiveState/tail",
"Rev": "068b72961a6bc5b4a82cf4fc14ccc724c0cfa73a"
},
{ {
"ImportPath": "github.com/Sirupsen/logrus", "ImportPath": "github.com/Sirupsen/logrus",
"Comment": "v0.7.3-2-g26709e2", "Comment": "v0.7.3-2-g26709e2",
@ -111,11 +107,6 @@
"ImportPath": "github.com/hashicorp/yamux", "ImportPath": "github.com/hashicorp/yamux",
"Rev": "9feabe6854fadca1abec9cd3bd2a613fe9a34000" "Rev": "9feabe6854fadca1abec9cd3bd2a613fe9a34000"
}, },
{
"ImportPath": "github.com/howeyc/fsnotify",
"Comment": "v0.9.0-11-g6b1ef89",
"Rev": "6b1ef893dc11e0447abda6da20a5203481878dda"
},
{ {
"ImportPath": "github.com/huin/goupnp", "ImportPath": "github.com/huin/goupnp",
"Rev": "223008361153d7d434c1f0ac990cd3fcae6931f5" "Rev": "223008361153d7d434c1f0ac990cd3fcae6931f5"
@ -327,10 +318,6 @@
"ImportPath": "gopkg.in/fsnotify.v1", "ImportPath": "gopkg.in/fsnotify.v1",
"Comment": "v1.2.0", "Comment": "v1.2.0",
"Rev": "96c060f6a6b7e0d6f75fddd10efeaca3e5d1bcb0" "Rev": "96c060f6a6b7e0d6f75fddd10efeaca3e5d1bcb0"
},
{
"ImportPath": "gopkg.in/tomb.v1",
"Rev": "dd632973f1e7218eb1089048e0798ec9ae7dceb8"
} }
] ]
} }

View File

@ -1,3 +0,0 @@
.test
.go

View File

@ -1,10 +0,0 @@
language: go
# depman needs to be installed with GOPATH pointing to a single directory.
script:
- GOPATH=$HOME/gopath go get github.com/vube/depman
- GOPATH=$HOME/gopath $HOME/gopath/bin/depman
- go test -v ./...
go:
- 1.2.1

View File

@ -1,44 +0,0 @@
# July, 2014
* Fix tail for Windows (PR #36)
# May, 2014
* Improved rate limiting using leaky bucket (PR #29)
* Fix odd line splitting (PR #30)
# Apr, 2014
* LimitRate now discards read buffer (PR #28)
* allow reading of longer lines if MaxLineSize is unset (PR #24)
* updated deps.json to latest fsnotify (441bbc86b1)
# Feb, 2014
* added `Config.Logger` to suppress library logging
# Nov, 2013
* add Cleanup to remove leaky inotify watches (PR #20)
# Aug, 2013
* redesigned Location field (PR #12)
* add tail.Tell (PR #14)
# July, 2013
* Rate limiting (PR #10)
# May, 2013
* Detect file deletions/renames in polling file watcher (PR #1)
* Detect file truncation
* Fix potential race condition when reopening the file (issue 5)
* Fix potential blocking of `tail.Stop` (issue 4)
* Fix uncleaned up ChangeEvents goroutines after calling tail.Stop
* Support Follow=false
# Feb, 2013
* Initial open source release

View File

@ -1,27 +0,0 @@
FROM ubuntu
RUN apt-get -qy update
RUN apt-get -y install golang-go
RUN apt-get -y install git
RUN apt-get -y install mercurial subversion
ENV GOPATH $HOME/go
RUN mkdir -p $GOPATH/src/github.com/ActiveState/tail/
ADD . $GOPATH/src/github.com/ActiveState/tail/
# expecting to fetch dependencies successfully.
RUN go get -v github.com/ActiveState/tail
# expecting to run the test successfully.
RUN go test -v github.com/ActiveState/tail
# expecting to install successfully
RUN go install -v github.com/ActiveState/tail
RUN go install -v github.com/ActiveState/tail/cmd/gotail
RUN $GOPATH/bin/gotail -h || true
ENV PATH $GOPATH/bin:$PATH
CMD ["gotail"]

View File

@ -1,23 +0,0 @@
# This is the MIT license
# Copyright (c) 2013 ActiveState Software Inc. All rights reserved.
Permission is hereby granted, free of charge, to any person obtaining a
copy of this software and associated documentation files (the
"Software"), to deal in the Software without restriction, including
without limitation the rights to use, copy, modify, merge, publish,
distribute, sublicense, and/or sell copies of the Software, and to
permit persons to whom the Software is furnished to do so, subject to
the following conditions:
The above copyright notice and this permission notice shall be included
in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

View File

@ -1,11 +0,0 @@
default: test
test: *.go
go test -v ./...
fmt:
gofmt -w .
# Run the test in an isolated environment.
fulltest:
docker build -t ActiveState/tail .

View File

@ -1,24 +0,0 @@
[![Build Status](https://travis-ci.org/ActiveState/tail.svg)](https://travis-ci.org/ActiveState/tail)
# Go package for tail-ing files
A Go package striving to emulate the features of the BSD `tail` program.
```Go
t, err := tail.TailFile("/var/log/nginx.log", tail.Config{Follow: true})
for line := range t.Lines {
fmt.Println(line.Text)
}
```
See [API documentation](http://godoc.org/github.com/ActiveState/tail).
## Log rotation
Tail comes with full support for truncation/move detection as it is
designed to work with log rotation tools.
## Installing
go get github.com/ActiveState/tail/...

View File

@ -1 +0,0 @@
gotail

View File

@ -1,4 +0,0 @@
default: gotail
gotail: *.go ../../*.go
go build

View File

@ -1,64 +0,0 @@
// Copyright (c) 2013 ActiveState Software Inc. All rights reserved.
package main
import (
"flag"
"fmt"
"github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/ActiveState/tail"
"os"
)
func args2config() (tail.Config, int64) {
config := tail.Config{Follow: true}
n := int64(0)
maxlinesize := int(0)
flag.Int64Var(&n, "n", 0, "tail from the last Nth location")
flag.IntVar(&maxlinesize, "max", 0, "max line size")
flag.BoolVar(&config.Follow, "f", false, "wait for additional data to be appended to the file")
flag.BoolVar(&config.ReOpen, "F", false, "follow, and track file rename/rotation")
flag.BoolVar(&config.Poll, "p", false, "use polling, instead of inotify")
flag.Parse()
if config.ReOpen {
config.Follow = true
}
config.MaxLineSize = maxlinesize
return config, n
}
func main() {
config, n := args2config()
if flag.NFlag() < 1 {
fmt.Println("need one or more files as arguments")
os.Exit(1)
}
if n != 0 {
config.Location = &tail.SeekInfo{-n, os.SEEK_END}
}
done := make(chan bool)
for _, filename := range flag.Args() {
go tailFile(filename, config, done)
}
for _, _ = range flag.Args() {
<-done
}
}
func tailFile(filename string, config tail.Config, done chan bool) {
defer func() { done <- true }()
t, err := tail.TailFile(filename, config)
if err != nil {
fmt.Println(err)
return
}
for line := range t.Lines {
fmt.Println(line.Text)
}
err = t.Wait()
if err != nil {
fmt.Println(err)
}
}

View File

@ -1,14 +0,0 @@
{
"github.com/howeyc/fsnotify": {
"repo": "http://github.com/howeyc/fsnotify.git",
"version": "441bbc86b167",
"type": "git-clone",
"alias": "github.com/howeyc/fsnotify"
},
"gopkg.in/tomb.v1": {
"repo": "https://github.com/go-tomb/tomb.git",
"version": "c131134a1947e9afd9cecfe11f4c6dff0732ae58",
"type": "git-clone",
"alias": "gopkg.in/tomb.v1"
}
}

View File

@ -1,7 +0,0 @@
Copyright (C) 2013 99designs
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

View File

@ -1,97 +0,0 @@
// Package ratelimiter implements the Leaky Bucket ratelimiting algorithm with memcached and in-memory backends.
package ratelimiter
import (
"time"
)
type LeakyBucket struct {
Size uint16
Fill float64
LeakInterval time.Duration // time.Duration for 1 unit of size to leak
Lastupdate time.Time
Now func() time.Time
}
func NewLeakyBucket(size uint16, leakInterval time.Duration) *LeakyBucket {
bucket := LeakyBucket{
Size: size,
Fill: 0,
LeakInterval: leakInterval,
Now: time.Now,
Lastupdate: time.Now(),
}
return &bucket
}
func (b *LeakyBucket) updateFill() {
now := b.Now()
if b.Fill > 0 {
elapsed := now.Sub(b.Lastupdate)
b.Fill -= float64(elapsed) / float64(b.LeakInterval)
if b.Fill < 0 {
b.Fill = 0
}
}
b.Lastupdate = now
}
func (b *LeakyBucket) Pour(amount uint16) bool {
b.updateFill()
var newfill float64 = b.Fill + float64(amount)
if newfill > float64(b.Size) {
return false
}
b.Fill = newfill
return true
}
// The time at which this bucket will be completely drained
func (b *LeakyBucket) DrainedAt() time.Time {
return b.Lastupdate.Add(time.Duration(b.Fill * float64(b.LeakInterval)))
}
// The duration until this bucket is completely drained
func (b *LeakyBucket) TimeToDrain() time.Duration {
return b.DrainedAt().Sub(b.Now())
}
func (b *LeakyBucket) TimeSinceLastUpdate() time.Duration {
return b.Now().Sub(b.Lastupdate)
}
type LeakyBucketSer struct {
Size uint16
Fill float64
LeakInterval time.Duration // time.Duration for 1 unit of size to leak
Lastupdate time.Time
}
func (b *LeakyBucket) Serialise() *LeakyBucketSer {
bucket := LeakyBucketSer{
Size: b.Size,
Fill: b.Fill,
LeakInterval: b.LeakInterval,
Lastupdate: b.Lastupdate,
}
return &bucket
}
func (b *LeakyBucketSer) DeSerialise() *LeakyBucket {
bucket := LeakyBucket{
Size: b.Size,
Fill: b.Fill,
LeakInterval: b.LeakInterval,
Lastupdate: b.Lastupdate,
Now: time.Now,
}
return &bucket
}

View File

@ -1,73 +0,0 @@
package ratelimiter
import (
"testing"
"time"
)
func TestPour(t *testing.T) {
bucket := NewLeakyBucket(60, time.Second)
bucket.Lastupdate = time.Unix(0, 0)
bucket.Now = func() time.Time { return time.Unix(1, 0) }
if bucket.Pour(61) {
t.Error("Expected false")
}
if !bucket.Pour(10) {
t.Error("Expected true")
}
if !bucket.Pour(49) {
t.Error("Expected true")
}
if bucket.Pour(2) {
t.Error("Expected false")
}
bucket.Now = func() time.Time { return time.Unix(61, 0) }
if !bucket.Pour(60) {
t.Error("Expected true")
}
if bucket.Pour(1) {
t.Error("Expected false")
}
bucket.Now = func() time.Time { return time.Unix(70, 0) }
if !bucket.Pour(1) {
t.Error("Expected true")
}
}
func TestTimeSinceLastUpdate(t *testing.T) {
bucket := NewLeakyBucket(60, time.Second)
bucket.Now = func() time.Time { return time.Unix(1, 0) }
bucket.Pour(1)
bucket.Now = func() time.Time { return time.Unix(2, 0) }
sinceLast := bucket.TimeSinceLastUpdate()
if sinceLast != time.Second*1 {
t.Errorf("Expected time since last update to be less than 1 second, got %d", sinceLast)
}
}
func TestTimeToDrain(t *testing.T) {
bucket := NewLeakyBucket(60, time.Second)
bucket.Now = func() time.Time { return time.Unix(1, 0) }
bucket.Pour(10)
if bucket.TimeToDrain() != time.Second*10 {
t.Error("Time to drain should be 10 seconds")
}
bucket.Now = func() time.Time { return time.Unix(2, 0) }
if bucket.TimeToDrain() != time.Second*9 {
t.Error("Time to drain should be 9 seconds")
}
}

View File

@ -1,58 +0,0 @@
package ratelimiter
import (
"errors"
"time"
)
const GC_SIZE int = 100
type Memory struct {
store map[string]LeakyBucket
lastGCCollected time.Time
}
func NewMemory() *Memory {
m := new(Memory)
m.store = make(map[string]LeakyBucket)
m.lastGCCollected = time.Now()
return m
}
func (m *Memory) GetBucketFor(key string) (*LeakyBucket, error) {
bucket, ok := m.store[key]
if !ok {
return nil, errors.New("miss")
}
return &bucket, nil
}
func (m *Memory) SetBucketFor(key string, bucket LeakyBucket) error {
if len(m.store) > GC_SIZE {
m.GarbageCollect()
}
m.store[key] = bucket
return nil
}
func (m *Memory) GarbageCollect() {
now := time.Now()
// rate limit GC to once per minute
if now.Add(60*time.Second).Unix() > m.lastGCCollected.Unix() {
for key, bucket := range m.store {
// if the bucket is drained, then GC
if bucket.DrainedAt().Unix() > now.Unix() {
delete(m.store, key)
}
}
m.lastGCCollected = now
}
}

View File

@ -1,6 +0,0 @@
package ratelimiter
type Storage interface {
GetBucketFor(string) (*LeakyBucket, error)
SetBucketFor(string, LeakyBucket) error
}

View File

@ -1,361 +0,0 @@
// Copyright (c) 2013 ActiveState Software Inc. All rights reserved.
package tail
import (
"bufio"
"fmt"
"github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/ActiveState/tail/ratelimiter"
"github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/ActiveState/tail/util"
"github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/ActiveState/tail/watch"
"github.com/ipfs/go-ipfs/Godeps/_workspace/src/gopkg.in/tomb.v1"
"io"
"io/ioutil"
"log"
"os"
"strings"
"time"
)
var (
ErrStop = fmt.Errorf("tail should now stop")
)
type Line struct {
Text string
Time time.Time
Err error // Error from tail
}
// NewLine returns a Line with present time.
func NewLine(text string) *Line {
return &Line{text, time.Now(), nil}
}
// SeekInfo represents arguments to `os.Seek`
type SeekInfo struct {
Offset int64
Whence int // os.SEEK_*
}
// Config is used to specify how a file must be tailed.
type Config struct {
// File-specifc
Location *SeekInfo // Seek to this location before tailing
ReOpen bool // Reopen recreated files (tail -F)
MustExist bool // Fail early if the file does not exist
Poll bool // Poll for file changes instead of using inotify
RateLimiter *ratelimiter.LeakyBucket
// Generic IO
Follow bool // Continue looking for new lines (tail -f)
MaxLineSize int // If non-zero, split longer lines into multiple lines
// Logger, when nil, is set to tail.DefaultLogger
// To disable logging: set field to tail.DiscardingLogger
Logger *log.Logger
}
type Tail struct {
Filename string
Lines chan *Line
Config
file *os.File
reader *bufio.Reader
watcher watch.FileWatcher
changes *watch.FileChanges
tomb.Tomb // provides: Done, Kill, Dying
}
var (
// DefaultLogger is used when Config.Logger == nil
DefaultLogger = log.New(os.Stderr, "", log.LstdFlags)
// DiscardingLogger can be used to disable logging output
DiscardingLogger = log.New(ioutil.Discard, "", 0)
)
// TailFile begins tailing the file. Output stream is made available
// via the `Tail.Lines` channel. To handle errors during tailing,
// invoke the `Wait` or `Err` method after finishing reading from the
// `Lines` channel.
func TailFile(filename string, config Config) (*Tail, error) {
if config.ReOpen && !config.Follow {
util.Fatal("cannot set ReOpen without Follow.")
}
t := &Tail{
Filename: filename,
Lines: make(chan *Line),
Config: config,
}
// when Logger was not specified in config, use default logger
if t.Logger == nil {
t.Logger = log.New(os.Stderr, "", log.LstdFlags)
}
if t.Poll {
t.watcher = watch.NewPollingFileWatcher(filename)
} else {
t.watcher = watch.NewInotifyFileWatcher(filename)
}
if t.MustExist {
var err error
t.file, err = OpenFile(t.Filename)
if err != nil {
return nil, err
}
}
go t.tailFileSync()
return t, nil
}
// Return the file's current position, like stdio's ftell().
// But this value is not very accurate.
// it may readed one line in the chan(tail.Lines),
// so it may lost one line.
func (tail *Tail) Tell() (offset int64, err error) {
if tail.file == nil {
return
}
offset, err = tail.file.Seek(0, os.SEEK_CUR)
if err == nil {
offset -= int64(tail.reader.Buffered())
}
return
}
// Stop stops the tailing activity.
func (tail *Tail) Stop() error {
tail.Kill(nil)
return tail.Wait()
}
func (tail *Tail) close() {
close(tail.Lines)
if tail.file != nil {
tail.file.Close()
}
}
func (tail *Tail) reopen() error {
if tail.file != nil {
tail.file.Close()
}
for {
var err error
tail.file, err = OpenFile(tail.Filename)
if err != nil {
if os.IsNotExist(err) {
tail.Logger.Printf("Waiting for %s to appear...", tail.Filename)
if err := tail.watcher.BlockUntilExists(&tail.Tomb); err != nil {
if err == tomb.ErrDying {
return err
}
return fmt.Errorf("Failed to detect creation of %s: %s", tail.Filename, err)
}
continue
}
return fmt.Errorf("Unable to open file %s: %s", tail.Filename, err)
}
break
}
return nil
}
func (tail *Tail) readLine() (string, error) {
line, err := tail.reader.ReadString('\n')
if err != nil {
// Note ReadString "returns the data read before the error" in
// case of an error, including EOF, so we return it as is. The
// caller is expected to process it if err is EOF.
return line, err
}
line = strings.TrimRight(line, "\n")
return line, err
}
func (tail *Tail) tailFileSync() {
defer tail.Done()
defer tail.close()
if !tail.MustExist {
// deferred first open.
err := tail.reopen()
if err != nil {
if err != tomb.ErrDying {
tail.Kill(err)
}
return
}
}
// Seek to requested location on first open of the file.
if tail.Location != nil {
_, err := tail.file.Seek(tail.Location.Offset, tail.Location.Whence)
tail.Logger.Printf("Seeked %s - %+v\n", tail.Filename, tail.Location)
if err != nil {
tail.Killf("Seek error on %s: %s", tail.Filename, err)
return
}
}
tail.openReader()
// Read line by line.
for {
line, err := tail.readLine()
// Process `line` even if err is EOF.
if err == nil || (err == io.EOF && line != "") {
cooloff := !tail.sendLine(line)
if cooloff {
// Wait a second before seeking till the end of
// file when rate limit is reached.
msg := fmt.Sprintf(
"Too much log activity; waiting a second " +
"before resuming tailing")
tail.Lines <- &Line{msg, time.Now(), fmt.Errorf(msg)}
select {
case <-time.After(time.Second):
case <-tail.Dying():
return
}
err = tail.seekEnd()
if err != nil {
tail.Kill(err)
return
}
}
} else if err == io.EOF {
if !tail.Follow {
return
}
// When EOF is reached, wait for more data to become
// available. Wait strategy is based on the `tail.watcher`
// implementation (inotify or polling).
err := tail.waitForChanges()
if err != nil {
if err != ErrStop {
tail.Kill(err)
}
return
}
} else {
// non-EOF error
tail.Killf("Error reading %s: %s", tail.Filename, err)
return
}
select {
case <-tail.Dying():
return
default:
}
}
}
// waitForChanges waits until the file has been appended, deleted,
// moved or truncated. When moved or deleted - the file will be
// reopened if ReOpen is true. Truncated files are always reopened.
func (tail *Tail) waitForChanges() error {
if tail.changes == nil {
st, err := tail.file.Stat()
if err != nil {
return err
}
tail.changes = tail.watcher.ChangeEvents(&tail.Tomb, st)
}
select {
case <-tail.changes.Modified:
return nil
case <-tail.changes.Deleted:
tail.changes = nil
if tail.ReOpen {
// XXX: we must not log from a library.
tail.Logger.Printf("Re-opening moved/deleted file %s ...", tail.Filename)
if err := tail.reopen(); err != nil {
return err
}
tail.Logger.Printf("Successfully reopened %s", tail.Filename)
tail.openReader()
return nil
} else {
tail.Logger.Printf("Stopping tail as file no longer exists: %s", tail.Filename)
return ErrStop
}
case <-tail.changes.Truncated:
// Always reopen truncated files (Follow is true)
tail.Logger.Printf("Re-opening truncated file %s ...", tail.Filename)
if err := tail.reopen(); err != nil {
return err
}
tail.Logger.Printf("Successfully reopened truncated %s", tail.Filename)
tail.openReader()
return nil
case <-tail.Dying():
return ErrStop
}
panic("unreachable")
}
func (tail *Tail) openReader() {
if tail.MaxLineSize > 0 {
// add 2 to account for newline characters
tail.reader = bufio.NewReaderSize(tail.file, tail.MaxLineSize+2)
} else {
tail.reader = bufio.NewReader(tail.file)
}
}
func (tail *Tail) seekEnd() error {
_, err := tail.file.Seek(0, 2)
if err != nil {
return fmt.Errorf("Seek error on %s: %s", tail.Filename, err)
}
// Reset the read buffer whenever the file is re-seek'ed
tail.reader.Reset(tail.file)
return nil
}
// sendLine sends the line(s) to Lines channel, splitting longer lines
// if necessary. Return false if rate limit is reached.
func (tail *Tail) sendLine(line string) bool {
now := time.Now()
lines := []string{line}
// Split longer lines
if tail.MaxLineSize > 0 && len(line) > tail.MaxLineSize {
lines = util.PartitionString(line, tail.MaxLineSize)
}
for _, line := range lines {
tail.Lines <- &Line{line, now, nil}
}
if tail.Config.RateLimiter != nil {
ok := tail.Config.RateLimiter.Pour(uint16(len(lines)))
if !ok {
tail.Logger.Printf("Leaky bucket full (%v); entering 1s cooloff period.\n",
tail.Filename)
return false
}
}
return true
}
// Cleanup removes inotify watches added by the tail package. This function is
// meant to be invoked from a process's exit handler. Linux kernel may not
// automatically remove inotify watches after the process exits.
func Cleanup() {
watch.Cleanup()
}

View File

@ -1,11 +0,0 @@
// +build linux darwin freebsd
package tail
import (
"os"
)
func OpenFile(name string) (file *os.File, err error) {
return os.Open(name)
}

View File

@ -1,428 +0,0 @@
// Copyright (c) 2013 ActiveState Software Inc. All rights reserved.
// TODO:
// * repeat all the tests with Poll:true
package tail
import (
"./watch"
_ "fmt"
"github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/ActiveState/tail/ratelimiter"
"io/ioutil"
"os"
"strings"
"testing"
"time"
)
func init() {
// Clear the temporary test directory
err := os.RemoveAll(".test")
if err != nil {
panic(err)
}
}
func TestMustExist(t *testing.T) {
tail, err := TailFile("/no/such/file", Config{Follow: true, MustExist: true})
if err == nil {
t.Error("MustExist:true is violated")
tail.Stop()
}
tail, err = TailFile("/no/such/file", Config{Follow: true, MustExist: false})
if err != nil {
t.Error("MustExist:false is violated")
}
tail.Stop()
_, err = TailFile("README.md", Config{Follow: true, MustExist: true})
if err != nil {
t.Error("MustExist:true on an existing file is violated")
}
tail.Stop()
Cleanup()
}
func TestStop(t *testing.T) {
tail, err := TailFile("_no_such_file", Config{Follow: true, MustExist: false})
if err != nil {
t.Error("MustExist:false is violated")
}
if tail.Stop() != nil {
t.Error("Should be stoped successfully")
}
Cleanup()
}
func TestMaxLineSize(_t *testing.T) {
t := NewTailTest("maxlinesize", _t)
t.CreateFile("test.txt", "hello\nworld\nfin\nhe")
tail := t.StartTail("test.txt", Config{Follow: true, Location: nil, MaxLineSize: 3})
go t.VerifyTailOutput(tail, []string{"hel", "lo", "wor", "ld", "fin", "he"})
// Delete after a reasonable delay, to give tail sufficient time
// to read all lines.
<-time.After(100 * time.Millisecond)
t.RemoveFile("test.txt")
tail.Stop()
Cleanup()
}
func TestOver4096ByteLine(_t *testing.T) {
t := NewTailTest("Over4096ByteLine", _t)
testString := strings.Repeat("a", 4097)
t.CreateFile("test.txt", "test\n"+testString+"\nhello\nworld\n")
tail := t.StartTail("test.txt", Config{Follow: true, Location: nil})
go t.VerifyTailOutput(tail, []string{"test", testString, "hello", "world"})
// Delete after a reasonable delay, to give tail sufficient time
// to read all lines.
<-time.After(100 * time.Millisecond)
t.RemoveFile("test.txt")
tail.Stop()
Cleanup()
}
func TestOver4096ByteLineWithSetMaxLineSize(_t *testing.T) {
t := NewTailTest("Over4096ByteLineMaxLineSize", _t)
testString := strings.Repeat("a", 4097)
t.CreateFile("test.txt", "test\n"+testString+"\nhello\nworld\n")
tail := t.StartTail("test.txt", Config{Follow: true, Location: nil, MaxLineSize: 4097})
go t.VerifyTailOutput(tail, []string{"test", testString, "hello", "world"})
// Delete after a reasonable delay, to give tail sufficient time
// to read all lines.
<-time.After(100 * time.Millisecond)
t.RemoveFile("test.txt")
// tail.Stop()
Cleanup()
}
func TestLocationFull(_t *testing.T) {
t := NewTailTest("location-full", _t)
t.CreateFile("test.txt", "hello\nworld\n")
tail := t.StartTail("test.txt", Config{Follow: true, Location: nil})
go t.VerifyTailOutput(tail, []string{"hello", "world"})
// Delete after a reasonable delay, to give tail sufficient time
// to read all lines.
<-time.After(100 * time.Millisecond)
t.RemoveFile("test.txt")
tail.Stop()
Cleanup()
}
func TestLocationFullDontFollow(_t *testing.T) {
t := NewTailTest("location-full-dontfollow", _t)
t.CreateFile("test.txt", "hello\nworld\n")
tail := t.StartTail("test.txt", Config{Follow: false, Location: nil})
go t.VerifyTailOutput(tail, []string{"hello", "world"})
// Add more data only after reasonable delay.
<-time.After(100 * time.Millisecond)
t.AppendFile("test.txt", "more\ndata\n")
<-time.After(100 * time.Millisecond)
tail.Stop()
Cleanup()
}
func TestLocationEnd(_t *testing.T) {
t := NewTailTest("location-end", _t)
t.CreateFile("test.txt", "hello\nworld\n")
tail := t.StartTail("test.txt", Config{Follow: true, Location: &SeekInfo{0, os.SEEK_END}})
go t.VerifyTailOutput(tail, []string{"more", "data"})
<-time.After(100 * time.Millisecond)
t.AppendFile("test.txt", "more\ndata\n")
// Delete after a reasonable delay, to give tail sufficient time
// to read all lines.
<-time.After(100 * time.Millisecond)
t.RemoveFile("test.txt")
tail.Stop()
Cleanup()
}
func TestLocationMiddle(_t *testing.T) {
// Test reading from middle.
t := NewTailTest("location-end", _t)
t.CreateFile("test.txt", "hello\nworld\n")
tail := t.StartTail("test.txt", Config{Follow: true, Location: &SeekInfo{-6, os.SEEK_END}})
go t.VerifyTailOutput(tail, []string{"world", "more", "data"})
<-time.After(100 * time.Millisecond)
t.AppendFile("test.txt", "more\ndata\n")
// Delete after a reasonable delay, to give tail sufficient time
// to read all lines.
<-time.After(100 * time.Millisecond)
t.RemoveFile("test.txt")
tail.Stop()
Cleanup()
}
func _TestReOpen(_t *testing.T, poll bool) {
var name string
var delay time.Duration
if poll {
name = "reopen-polling"
delay = 300 * time.Millisecond // account for POLL_DURATION
} else {
name = "reopen-inotify"
delay = 100 * time.Millisecond
}
t := NewTailTest(name, _t)
t.CreateFile("test.txt", "hello\nworld\n")
tail := t.StartTail(
"test.txt",
Config{Follow: true, ReOpen: true, Poll: poll})
go t.VerifyTailOutput(tail, []string{"hello", "world", "more", "data", "endofworld"})
// deletion must trigger reopen
<-time.After(delay)
t.RemoveFile("test.txt")
<-time.After(delay)
t.CreateFile("test.txt", "more\ndata\n")
// rename must trigger reopen
<-time.After(delay)
t.RenameFile("test.txt", "test.txt.rotated")
<-time.After(delay)
t.CreateFile("test.txt", "endofworld")
// Delete after a reasonable delay, to give tail sufficient time
// to read all lines.
<-time.After(delay)
t.RemoveFile("test.txt")
<-time.After(delay)
// Do not bother with stopping as it could kill the tomb during
// the reading of data written above. Timings can vary based on
// test environment.
// tail.Stop()
Cleanup()
}
// The use of polling file watcher could affect file rotation
// (detected via renames), so test these explicitly.
func TestReOpenInotify(_t *testing.T) {
_TestReOpen(_t, false)
}
func TestReOpenPolling(_t *testing.T) {
_TestReOpen(_t, true)
}
func _TestReSeek(_t *testing.T, poll bool) {
var name string
if poll {
name = "reseek-polling"
} else {
name = "reseek-inotify"
}
t := NewTailTest(name, _t)
t.CreateFile("test.txt", "a really long string goes here\nhello\nworld\n")
tail := t.StartTail(
"test.txt",
Config{Follow: true, ReOpen: false, Poll: poll})
go t.VerifyTailOutput(tail, []string{
"a really long string goes here", "hello", "world", "h311o", "w0r1d", "endofworld"})
// truncate now
<-time.After(100 * time.Millisecond)
t.TruncateFile("test.txt", "h311o\nw0r1d\nendofworld\n")
// Delete after a reasonable delay, to give tail sufficient time
// to read all lines.
<-time.After(100 * time.Millisecond)
t.RemoveFile("test.txt")
// Do not bother with stopping as it could kill the tomb during
// the reading of data written above. Timings can vary based on
// test environment.
// tail.Stop()
Cleanup()
}
// The use of polling file watcher could affect file rotation
// (detected via renames), so test these explicitly.
func TestReSeekInotify(_t *testing.T) {
_TestReSeek(_t, false)
}
func TestReSeekPolling(_t *testing.T) {
_TestReSeek(_t, true)
}
func TestRateLimiting(_t *testing.T) {
t := NewTailTest("rate-limiting", _t)
t.CreateFile("test.txt", "hello\nworld\nagain\nextra\n")
config := Config{
Follow: true,
RateLimiter: ratelimiter.NewLeakyBucket(2, time.Second)}
leakybucketFull := "Too much log activity; waiting a second before resuming tailing"
tail := t.StartTail("test.txt", config)
// TODO: also verify that tail resumes after the cooloff period.
go t.VerifyTailOutput(tail, []string{
"hello", "world", "again",
leakybucketFull,
"more", "data",
leakybucketFull})
// Add more data only after reasonable delay.
<-time.After(1200 * time.Millisecond)
t.AppendFile("test.txt", "more\ndata\n")
// Delete after a reasonable delay, to give tail sufficient time
// to read all lines.
<-time.After(100 * time.Millisecond)
t.RemoveFile("test.txt")
// tail.Stop()
Cleanup()
}
func TestTell(_t *testing.T) {
t := NewTailTest("tell-position", _t)
t.CreateFile("test.txt", "hello\nworld\nagain\nmore\n")
config := Config{
Follow: false,
Location: &SeekInfo{0, os.SEEK_SET}}
tail := t.StartTail("test.txt", config)
// read noe line
<-tail.Lines
offset, err := tail.Tell()
if err != nil {
t.Errorf("Tell return error: %s", err.Error())
}
tail.Done()
// tail.close()
config = Config{
Follow: false,
Location: &SeekInfo{offset, os.SEEK_SET}}
tail = t.StartTail("test.txt", config)
for l := range tail.Lines {
// it may readed one line in the chan(tail.Lines),
// so it may lost one line.
if l.Text != "world" && l.Text != "again" {
t.Fatalf("mismatch; expected world or again, but got %s",
l.Text)
}
break
}
t.RemoveFile("test.txt")
tail.Done()
Cleanup()
}
// Test library
type TailTest struct {
Name string
path string
*testing.T
}
func NewTailTest(name string, t *testing.T) TailTest {
tt := TailTest{name, ".test/" + name, t}
err := os.MkdirAll(tt.path, os.ModeTemporary|0700)
if err != nil {
tt.Fatal(err)
}
// Use a smaller poll duration for faster test runs. Keep it below
// 100ms (which value is used as common delays for tests)
watch.POLL_DURATION = 5 * time.Millisecond
return tt
}
func (t TailTest) CreateFile(name string, contents string) {
err := ioutil.WriteFile(t.path+"/"+name, []byte(contents), 0600)
if err != nil {
t.Fatal(err)
}
}
func (t TailTest) RemoveFile(name string) {
err := os.Remove(t.path + "/" + name)
if err != nil {
t.Fatal(err)
}
}
func (t TailTest) RenameFile(oldname string, newname string) {
oldname = t.path + "/" + oldname
newname = t.path + "/" + newname
err := os.Rename(oldname, newname)
if err != nil {
t.Fatal(err)
}
}
func (t TailTest) AppendFile(name string, contents string) {
f, err := os.OpenFile(t.path+"/"+name, os.O_APPEND|os.O_WRONLY, 0600)
if err != nil {
t.Fatal(err)
}
defer f.Close()
_, err = f.WriteString(contents)
if err != nil {
t.Fatal(err)
}
}
func (t TailTest) TruncateFile(name string, contents string) {
f, err := os.OpenFile(t.path+"/"+name, os.O_TRUNC|os.O_WRONLY, 0600)
if err != nil {
t.Fatal(err)
}
defer f.Close()
_, err = f.WriteString(contents)
if err != nil {
t.Fatal(err)
}
}
func (t TailTest) StartTail(name string, config Config) *Tail {
tail, err := TailFile(t.path+"/"+name, config)
if err != nil {
t.Fatal(err)
}
return tail
}
func (t TailTest) VerifyTailOutput(tail *Tail, lines []string) {
for idx, line := range lines {
tailedLine, ok := <-tail.Lines
if !ok {
// tail.Lines is closed and empty.
err := tail.Err()
if err != nil {
t.Fatalf("tail ended with error: %v", err)
}
t.Fatalf("tail ended early; expecting more: %v", lines[idx:])
}
if tailedLine == nil {
t.Fatalf("tail.Lines returned nil; not possible")
}
// Note: not checking .Err as the `lines` argument is designed
// to match error strings as well.
if tailedLine.Text != line {
t.Fatalf(
"unexpected line/err from tail: "+
"expecting <<%s>>>, but got <<<%s>>>",
line, tailedLine.Text)
}
}
line, ok := <-tail.Lines
if ok {
t.Fatalf("more content from tail: %+v", line)
}
}

View File

@ -1,12 +0,0 @@
// +build windows
package tail
import (
"github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/ActiveState/tail/winfile"
"os"
)
func OpenFile(name string) (file *os.File, err error) {
return winfile.OpenFile(name, os.O_RDONLY, 0)
}

View File

@ -1,47 +0,0 @@
// Copyright (c) 2013 ActiveState Software Inc. All rights reserved.
package util
import (
"fmt"
"log"
"os"
"runtime/debug"
)
type Logger struct {
*log.Logger
}
var LOGGER = &Logger{log.New(os.Stderr, "", log.LstdFlags)}
// fatal is like panic except it displays only the current goroutine's stack.
func Fatal(format string, v ...interface{}) {
// https://github.com/ActiveState/log/blob/master/log.go#L45
LOGGER.Output(2, fmt.Sprintf("FATAL -- "+format, v...)+"\n"+string(debug.Stack()))
os.Exit(1)
}
// partitionString partitions the string into chunks of given size,
// with the last chunk of variable size.
func PartitionString(s string, chunkSize int) []string {
if chunkSize <= 0 {
panic("invalid chunkSize")
}
length := len(s)
chunks := 1 + length/chunkSize
start := 0
end := chunkSize
parts := make([]string, 0, chunks)
for {
if end > length {
end = length
}
parts = append(parts, s[start:end])
if end == length {
break
}
start, end = end, end+chunkSize
}
return parts
}

View File

@ -1,42 +0,0 @@
package watch
type FileChanges struct {
Modified chan bool // Channel to get notified of modifications
Truncated chan bool // Channel to get notified of truncations
Deleted chan bool // Channel to get notified of deletions/renames
}
func NewFileChanges() *FileChanges {
return &FileChanges{
make(chan bool), make(chan bool), make(chan bool)}
}
func (fc *FileChanges) NotifyModified() {
sendOnlyIfEmpty(fc.Modified)
}
func (fc *FileChanges) NotifyTruncated() {
sendOnlyIfEmpty(fc.Truncated)
}
func (fc *FileChanges) NotifyDeleted() {
sendOnlyIfEmpty(fc.Deleted)
}
func (fc *FileChanges) Close() {
close(fc.Modified)
close(fc.Truncated)
close(fc.Deleted)
}
// sendOnlyIfEmpty sends on a bool channel only if the channel has no
// backlog to be read by other goroutines. This concurrency pattern
// can be used to notify other goroutines if and only if they are
// looking for it (i.e., subsequent notifications can be compressed
// into one).
func sendOnlyIfEmpty(ch chan bool) {
select {
case ch <- true:
default:
}
}

View File

@ -1,136 +0,0 @@
// Copyright (c) 2013 ActiveState Software Inc. All rights reserved.
package watch
import (
"fmt"
"github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/ActiveState/tail/util"
"github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/howeyc/fsnotify"
"github.com/ipfs/go-ipfs/Godeps/_workspace/src/gopkg.in/tomb.v1"
"os"
"path/filepath"
)
var inotifyTracker *InotifyTracker
// InotifyFileWatcher uses inotify to monitor file changes.
type InotifyFileWatcher struct {
Filename string
Size int64
}
func NewInotifyFileWatcher(filename string) *InotifyFileWatcher {
fw := &InotifyFileWatcher{filename, 0}
return fw
}
func (fw *InotifyFileWatcher) BlockUntilExists(t *tomb.Tomb) error {
w, err := inotifyTracker.NewWatcher()
if err != nil {
return err
}
defer inotifyTracker.CloseWatcher(w)
dirname := filepath.Dir(fw.Filename)
// Watch for new files to be created in the parent directory.
err = w.WatchFlags(dirname, fsnotify.FSN_CREATE)
if err != nil {
return err
}
// Do a real check now as the file might have been created before
// calling `WatchFlags` above.
if _, err = os.Stat(fw.Filename); !os.IsNotExist(err) {
// file exists, or stat returned an error.
return err
}
for {
select {
case evt, ok := <-w.Event:
if !ok {
return fmt.Errorf("inotify watcher has been closed")
} else if evt.Name == fw.Filename {
return nil
}
case <-t.Dying():
return tomb.ErrDying
}
}
panic("unreachable")
}
func (fw *InotifyFileWatcher) ChangeEvents(t *tomb.Tomb, fi os.FileInfo) *FileChanges {
changes := NewFileChanges()
w, err := inotifyTracker.NewWatcher()
if err != nil {
util.Fatal("Error creating fsnotify watcher: %v", err)
}
err = w.Watch(fw.Filename)
if err != nil {
util.Fatal("Error watching %v: %v", fw.Filename, err)
}
fw.Size = fi.Size()
go func() {
defer inotifyTracker.CloseWatcher(w)
defer changes.Close()
for {
prevSize := fw.Size
var evt *fsnotify.FileEvent
var ok bool
select {
case evt, ok = <-w.Event:
if !ok {
return
}
case <-t.Dying():
return
}
switch {
case evt.IsDelete():
fallthrough
case evt.IsRename():
changes.NotifyDeleted()
return
case evt.IsModify():
fi, err := os.Stat(fw.Filename)
if err != nil {
if os.IsNotExist(err) {
changes.NotifyDeleted()
return
}
// XXX: report this error back to the user
util.Fatal("Failed to stat file %v: %v", fw.Filename, err)
}
fw.Size = fi.Size()
if prevSize > 0 && prevSize > fw.Size {
changes.NotifyTruncated()
} else {
changes.NotifyModified()
}
prevSize = fw.Size
}
}
}()
return changes
}
func Cleanup() {
inotifyTracker.CloseAll()
}
func init() {
inotifyTracker = NewInotifyTracker()
}

View File

@ -1,51 +0,0 @@
// Copyright (c) 2013 ActiveState Software Inc. All rights reserved.
package watch
import (
"github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/howeyc/fsnotify"
"log"
"sync"
)
type InotifyTracker struct {
mux sync.Mutex
watchers map[*fsnotify.Watcher]bool
}
func NewInotifyTracker() *InotifyTracker {
t := new(InotifyTracker)
t.watchers = make(map[*fsnotify.Watcher]bool)
return t
}
func (t *InotifyTracker) NewWatcher() (*fsnotify.Watcher, error) {
t.mux.Lock()
defer t.mux.Unlock()
w, err := fsnotify.NewWatcher()
if err == nil {
t.watchers[w] = true
}
return w, err
}
func (t *InotifyTracker) CloseWatcher(w *fsnotify.Watcher) (err error) {
t.mux.Lock()
defer t.mux.Unlock()
if _, ok := t.watchers[w]; ok {
err = w.Close()
delete(t.watchers, w)
}
return
}
func (t *InotifyTracker) CloseAll() {
t.mux.Lock()
defer t.mux.Unlock()
for w, _ := range t.watchers {
if err := w.Close(); err != nil {
log.Printf("Error closing watcher: %v", err)
}
delete(t.watchers, w)
}
}

View File

@ -1,110 +0,0 @@
// Copyright (c) 2013 ActiveState Software Inc. All rights reserved.
package watch
import (
"github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/ActiveState/tail/util"
"github.com/ipfs/go-ipfs/Godeps/_workspace/src/gopkg.in/tomb.v1"
"os"
"time"
)
// PollingFileWatcher polls the file for changes.
type PollingFileWatcher struct {
Filename string
Size int64
}
func NewPollingFileWatcher(filename string) *PollingFileWatcher {
fw := &PollingFileWatcher{filename, 0}
return fw
}
var POLL_DURATION time.Duration
func (fw *PollingFileWatcher) BlockUntilExists(t *tomb.Tomb) error {
for {
if _, err := os.Stat(fw.Filename); err == nil {
return nil
} else if !os.IsNotExist(err) {
return err
}
select {
case <-time.After(POLL_DURATION):
continue
case <-t.Dying():
return tomb.ErrDying
}
}
panic("unreachable")
}
func (fw *PollingFileWatcher) ChangeEvents(t *tomb.Tomb, origFi os.FileInfo) *FileChanges {
changes := NewFileChanges()
var prevModTime time.Time
// XXX: use tomb.Tomb to cleanly manage these goroutines. replace
// the fatal (below) with tomb's Kill.
fw.Size = origFi.Size()
go func() {
defer changes.Close()
var retry int = 0
prevSize := fw.Size
for {
select {
case <-t.Dying():
return
default:
}
time.Sleep(POLL_DURATION)
fi, err := os.Stat(fw.Filename)
if err != nil {
if os.IsNotExist(err) {
// File does not exist (has been deleted).
changes.NotifyDeleted()
return
}
if permissionErrorRetry(err, &retry) {
continue
}
// XXX: report this error back to the user
util.Fatal("Failed to stat file %v: %v", fw.Filename, err)
}
// File got moved/renamed?
if !os.SameFile(origFi, fi) {
changes.NotifyDeleted()
return
}
// File got truncated?
fw.Size = fi.Size()
if prevSize > 0 && prevSize > fw.Size {
changes.NotifyTruncated()
prevSize = fw.Size
continue
}
prevSize = fw.Size
// File was appended to (changed)?
modTime := fi.ModTime()
if modTime != prevModTime {
prevModTime = modTime
changes.NotifyModified()
}
}
}()
return changes
}
func init() {
POLL_DURATION = 250 * time.Millisecond
}

View File

@ -1,9 +0,0 @@
// Copyright (c) 2013 ActiveState Software Inc. All rights reserved.
// +build linux darwin freebsd
package watch
func permissionErrorRetry(err error, retry *int) bool {
// No need for this on linux, don't retry
return false
}

View File

@ -1,18 +0,0 @@
// +build windows
package watch
import (
"os"
)
const permissionDeniedRetryCount int = 5
func permissionErrorRetry(err error, retry *int) bool {
if os.IsPermission(err) && *retry < permissionDeniedRetryCount {
// While pooling a file that does not exist yet, but will be created by another process we can get Permission Denied
(*retry)++
return true
}
return false
}

View File

@ -1,20 +0,0 @@
// Copyright (c) 2013 ActiveState Software Inc. All rights reserved.
package watch
import (
"github.com/ipfs/go-ipfs/Godeps/_workspace/src/gopkg.in/tomb.v1"
"os"
)
// FileWatcher monitors file-level events.
type FileWatcher interface {
// BlockUntilExists blocks until the file comes into existence.
BlockUntilExists(*tomb.Tomb) error
// ChangeEvents reports on changes to a file, be it modification,
// deletion, renames or truncations. Returned FileChanges group of
// channels will be closed, thus become unusable, after a deletion
// or truncation event.
ChangeEvents(*tomb.Tomb, os.FileInfo) *FileChanges
}

View File

@ -1,92 +0,0 @@
// +build windows
package winfile
import (
"os"
"syscall"
"unsafe"
)
// issue also described here
//https://codereview.appspot.com/8203043/
// https://github.com/jnwhiteh/golang/blob/master/src/pkg/syscall/syscall_windows.go#L218
func Open(path string, mode int, perm uint32) (fd syscall.Handle, err error) {
if len(path) == 0 {
return syscall.InvalidHandle, syscall.ERROR_FILE_NOT_FOUND
}
pathp, err := syscall.UTF16PtrFromString(path)
if err != nil {
return syscall.InvalidHandle, err
}
var access uint32
switch mode & (syscall.O_RDONLY | syscall.O_WRONLY | syscall.O_RDWR) {
case syscall.O_RDONLY:
access = syscall.GENERIC_READ
case syscall.O_WRONLY:
access = syscall.GENERIC_WRITE
case syscall.O_RDWR:
access = syscall.GENERIC_READ | syscall.GENERIC_WRITE
}
if mode&syscall.O_CREAT != 0 {
access |= syscall.GENERIC_WRITE
}
if mode&syscall.O_APPEND != 0 {
access &^= syscall.GENERIC_WRITE
access |= syscall.FILE_APPEND_DATA
}
sharemode := uint32(syscall.FILE_SHARE_READ | syscall.FILE_SHARE_WRITE | syscall.FILE_SHARE_DELETE)
var sa *syscall.SecurityAttributes
if mode&syscall.O_CLOEXEC == 0 {
sa = makeInheritSa()
}
var createmode uint32
switch {
case mode&(syscall.O_CREAT|syscall.O_EXCL) == (syscall.O_CREAT | syscall.O_EXCL):
createmode = syscall.CREATE_NEW
case mode&(syscall.O_CREAT|syscall.O_TRUNC) == (syscall.O_CREAT | syscall.O_TRUNC):
createmode = syscall.CREATE_ALWAYS
case mode&syscall.O_CREAT == syscall.O_CREAT:
createmode = syscall.OPEN_ALWAYS
case mode&syscall.O_TRUNC == syscall.O_TRUNC:
createmode = syscall.TRUNCATE_EXISTING
default:
createmode = syscall.OPEN_EXISTING
}
h, e := syscall.CreateFile(pathp, access, sharemode, sa, createmode, syscall.FILE_ATTRIBUTE_NORMAL, 0)
return h, e
}
// https://github.com/jnwhiteh/golang/blob/master/src/pkg/syscall/syscall_windows.go#L211
func makeInheritSa() *syscall.SecurityAttributes {
var sa syscall.SecurityAttributes
sa.Length = uint32(unsafe.Sizeof(sa))
sa.InheritHandle = 1
return &sa
}
// https://github.com/jnwhiteh/golang/blob/master/src/pkg/os/file_windows.go#L133
func OpenFile(name string, flag int, perm os.FileMode) (file *os.File, err error) {
r, e := Open(name, flag|syscall.O_CLOEXEC, syscallMode(perm))
if e != nil {
return nil, e
}
return os.NewFile(uintptr(r), name), nil
}
// https://github.com/jnwhiteh/golang/blob/master/src/pkg/os/file_posix.go#L61
func syscallMode(i os.FileMode) (o uint32) {
o |= uint32(i.Perm())
if i&os.ModeSetuid != 0 {
o |= syscall.S_ISUID
}
if i&os.ModeSetgid != 0 {
o |= syscall.S_ISGID
}
if i&os.ModeSticky != 0 {
o |= syscall.S_ISVTX
}
// No mapping for Go's ModeTemporary (plan9 only).
return
}

View File

@ -1,5 +0,0 @@
# Setup a Global .gitignore for OS and editor generated files:
# https://help.github.com/articles/ignoring-files
# git config --global core.excludesfile ~/.gitignore_global
.vagrant

View File

@ -1,28 +0,0 @@
# Names should be added to this file as
# Name or Organization <email address>
# The email address is not required for organizations.
# You can update this list using the following command:
#
# $ git shortlog -se | awk '{print $2 " " $3 " " $4}'
# Please keep the list sorted.
Adrien Bustany <adrien@bustany.org>
Caleb Spare <cespare@gmail.com>
Case Nelson <case@teammating.com>
Chris Howey <howeyc@gmail.com> <chris@howey.me>
Christoffer Buchholz <christoffer.buchholz@gmail.com>
Dave Cheney <dave@cheney.net>
Francisco Souza <f@souza.cc>
John C Barstow
Kelvin Fo <vmirage@gmail.com>
Nathan Youngman <git@nathany.com>
Paul Hammond <paul@paulhammond.org>
Pursuit92 <JoshChase@techpursuit.net>
Rob Figueiredo <robfig@gmail.com>
Travis Cline <travis.cline@gmail.com>
Tudor Golubenco <tudor.g@gmail.com>
bronze1man <bronze1man@gmail.com>
debrando <denis.brandolini@gmail.com>
henrikedwards <henrik.edwards@gmail.com>

View File

@ -1,160 +0,0 @@
# Changelog
## v0.9.0 / 2014-01-17
* IsAttrib() for events that only concern a file's metadata [#79][] (thanks @abustany)
* [Fix] kqueue: fix deadlock [#77][] (thanks @cespare)
* [NOTICE] Development has moved to `code.google.com/p/go.exp/fsnotify` in preparation for inclusion in the Go standard library.
## v0.8.12 / 2013-11-13
* [API] Remove FD_SET and friends from Linux adapter
## v0.8.11 / 2013-11-02
* [Doc] Add Changelog [#72][] (thanks @nathany)
* [Doc] Spotlight and double modify events on OS X [#62][] (reported by @paulhammond)
## v0.8.10 / 2013-10-19
* [Fix] kqueue: remove file watches when parent directory is removed [#71][] (reported by @mdwhatcott)
* [Fix] kqueue: race between Close and readEvents [#70][] (reported by @bernerdschaefer)
* [Doc] specify OS-specific limits in README (thanks @debrando)
## v0.8.9 / 2013-09-08
* [Doc] Contributing (thanks @nathany)
* [Doc] update package path in example code [#63][] (thanks @paulhammond)
* [Doc] GoCI badge in README (Linux only) [#60][]
* [Doc] Cross-platform testing with Vagrant [#59][] (thanks @nathany)
## v0.8.8 / 2013-06-17
* [Fix] Windows: handle `ERROR_MORE_DATA` on Windows [#49][] (thanks @jbowtie)
## v0.8.7 / 2013-06-03
* [API] Make syscall flags internal
* [Fix] inotify: ignore event changes
* [Fix] race in symlink test [#45][] (reported by @srid)
* [Fix] tests on Windows
* lower case error messages
## v0.8.6 / 2013-05-23
* kqueue: Use EVT_ONLY flag on Darwin
* [Doc] Update README with full example
## v0.8.5 / 2013-05-09
* [Fix] inotify: allow monitoring of "broken" symlinks (thanks @tsg)
## v0.8.4 / 2013-04-07
* [Fix] kqueue: watch all file events [#40][] (thanks @ChrisBuchholz)
## v0.8.3 / 2013-03-13
* [Fix] inoitfy/kqueue memory leak [#36][] (reported by @nbkolchin)
* [Fix] kqueue: use fsnFlags for watching a directory [#33][] (reported by @nbkolchin)
## v0.8.2 / 2013-02-07
* [Doc] add Authors
* [Fix] fix data races for map access [#29][] (thanks @fsouza)
## v0.8.1 / 2013-01-09
* [Fix] Windows path separators
* [Doc] BSD License
## v0.8.0 / 2012-11-09
* kqueue: directory watching improvements (thanks @vmirage)
* inotify: add `IN_MOVED_TO` [#25][] (requested by @cpisto)
* [Fix] kqueue: deleting watched directory [#24][] (reported by @jakerr)
## v0.7.4 / 2012-10-09
* [Fix] inotify: fixes from https://codereview.appspot.com/5418045/ (ugorji)
* [Fix] kqueue: preserve watch flags when watching for delete [#21][] (reported by @robfig)
* [Fix] kqueue: watch the directory even if it isn't a new watch (thanks @robfig)
* [Fix] kqueue: modify after recreation of file
## v0.7.3 / 2012-09-27
* [Fix] kqueue: watch with an existing folder inside the watched folder (thanks @vmirage)
* [Fix] kqueue: no longer get duplicate CREATE events
## v0.7.2 / 2012-09-01
* kqueue: events for created directories
## v0.7.1 / 2012-07-14
* [Fix] for renaming files
## v0.7.0 / 2012-07-02
* [Feature] FSNotify flags
* [Fix] inotify: Added file name back to event path
## v0.6.0 / 2012-06-06
* kqueue: watch files after directory created (thanks @tmc)
## v0.5.1 / 2012-05-22
* [Fix] inotify: remove all watches before Close()
## v0.5.0 / 2012-05-03
* [API] kqueue: return errors during watch instead of sending over channel
* kqueue: match symlink behavior on Linux
* inotify: add `DELETE_SELF` (requested by @taralx)
* [Fix] kqueue: handle EINTR (reported by @robfig)
* [Doc] Godoc example [#1][] (thanks @davecheney)
## v0.4.0 / 2012-03-30
* Go 1 released: build with go tool
* [Feature] Windows support using winfsnotify
* Windows does not have attribute change notifications
* Roll attribute notifications into IsModify
## v0.3.0 / 2012-02-19
* kqueue: add files when watch directory
## v0.2.0 / 2011-12-30
* update to latest Go weekly code
## v0.1.0 / 2011-10-19
* kqueue: add watch on file creation to match inotify
* kqueue: create file event
* inotify: ignore `IN_IGNORED` events
* event String()
* linux: common FileEvent functions
* initial commit
[#79]: https://github.com/howeyc/fsnotify/pull/79
[#77]: https://github.com/howeyc/fsnotify/pull/77
[#72]: https://github.com/howeyc/fsnotify/issues/72
[#71]: https://github.com/howeyc/fsnotify/issues/71
[#70]: https://github.com/howeyc/fsnotify/issues/70
[#63]: https://github.com/howeyc/fsnotify/issues/63
[#62]: https://github.com/howeyc/fsnotify/issues/62
[#60]: https://github.com/howeyc/fsnotify/issues/60
[#59]: https://github.com/howeyc/fsnotify/issues/59
[#49]: https://github.com/howeyc/fsnotify/issues/49
[#45]: https://github.com/howeyc/fsnotify/issues/45
[#40]: https://github.com/howeyc/fsnotify/issues/40
[#36]: https://github.com/howeyc/fsnotify/issues/36
[#33]: https://github.com/howeyc/fsnotify/issues/33
[#29]: https://github.com/howeyc/fsnotify/issues/29
[#25]: https://github.com/howeyc/fsnotify/issues/25
[#24]: https://github.com/howeyc/fsnotify/issues/24
[#21]: https://github.com/howeyc/fsnotify/issues/21
[#1]: https://github.com/howeyc/fsnotify/issues/1

View File

@ -1,7 +0,0 @@
# Contributing
## Moving Notice
There is a fork being actively developed with a new API in preparation for the Go Standard Library:
[github.com/go-fsnotify/fsnotify](https://github.com/go-fsnotify/fsnotify)

View File

@ -1,28 +0,0 @@
Copyright (c) 2012 The Go Authors. All rights reserved.
Copyright (c) 2012 fsnotify Authors. All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are
met:
* Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above
copyright notice, this list of conditions and the following disclaimer
in the documentation and/or other materials provided with the
distribution.
* Neither the name of Google Inc. nor the names of its
contributors may be used to endorse or promote products derived from
this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

View File

@ -1,92 +0,0 @@
# File system notifications for Go
[![GoDoc](https://godoc.org/github.com/howeyc/fsnotify?status.png)](http://godoc.org/github.com/howeyc/fsnotify)
Cross platform: Windows, Linux, BSD and OS X.
## Moving Notice
There is a fork being actively developed with a new API in preparation for the Go Standard Library:
[github.com/go-fsnotify/fsnotify](https://github.com/go-fsnotify/fsnotify)
## Example:
```go
package main
import (
"log"
"github.com/howeyc/fsnotify"
)
func main() {
watcher, err := fsnotify.NewWatcher()
if err != nil {
log.Fatal(err)
}
done := make(chan bool)
// Process events
go func() {
for {
select {
case ev := <-watcher.Event:
log.Println("event:", ev)
case err := <-watcher.Error:
log.Println("error:", err)
}
}
}()
err = watcher.Watch("testDir")
if err != nil {
log.Fatal(err)
}
<-done
/* ... do stuff ... */
watcher.Close()
}
```
For each event:
* Name
* IsCreate()
* IsDelete()
* IsModify()
* IsRename()
## FAQ
**When a file is moved to another directory is it still being watched?**
No (it shouldn't be, unless you are watching where it was moved to).
**When I watch a directory, are all subdirectories watched as well?**
No, you must add watches for any directory you want to watch (a recursive watcher is in the works [#56][]).
**Do I have to watch the Error and Event channels in a separate goroutine?**
As of now, yes. Looking into making this single-thread friendly (see [#7][])
**Why am I receiving multiple events for the same file on OS X?**
Spotlight indexing on OS X can result in multiple events (see [#62][]). A temporary workaround is to add your folder(s) to the *Spotlight Privacy settings* until we have a native FSEvents implementation (see [#54][]).
**How many files can be watched at once?**
There are OS-specific limits as to how many watches can be created:
* Linux: /proc/sys/fs/inotify/max_user_watches contains the limit,
reaching this limit results in a "no space left on device" error.
* BSD / OSX: sysctl variables "kern.maxfiles" and "kern.maxfilesperproc", reaching these limits results in a "too many open files" error.
[#62]: https://github.com/howeyc/fsnotify/issues/62
[#56]: https://github.com/howeyc/fsnotify/issues/56
[#54]: https://github.com/howeyc/fsnotify/issues/54
[#7]: https://github.com/howeyc/fsnotify/issues/7

View File

@ -1,34 +0,0 @@
// Copyright 2012 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package fsnotify_test
import (
"log"
"github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/howeyc/fsnotify"
)
func ExampleNewWatcher() {
watcher, err := fsnotify.NewWatcher()
if err != nil {
log.Fatal(err)
}
go func() {
for {
select {
case ev := <-watcher.Event:
log.Println("event:", ev)
case err := <-watcher.Error:
log.Println("error:", err)
}
}
}()
err = watcher.Watch("/tmp/foo")
if err != nil {
log.Fatal(err)
}
}

View File

@ -1,111 +0,0 @@
// Copyright 2012 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Package fsnotify implements file system notification.
package fsnotify
import "fmt"
const (
FSN_CREATE = 1
FSN_MODIFY = 2
FSN_DELETE = 4
FSN_RENAME = 8
FSN_ALL = FSN_MODIFY | FSN_DELETE | FSN_RENAME | FSN_CREATE
)
// Purge events from interal chan to external chan if passes filter
func (w *Watcher) purgeEvents() {
for ev := range w.internalEvent {
sendEvent := false
w.fsnmut.Lock()
fsnFlags := w.fsnFlags[ev.Name]
w.fsnmut.Unlock()
if (fsnFlags&FSN_CREATE == FSN_CREATE) && ev.IsCreate() {
sendEvent = true
}
if (fsnFlags&FSN_MODIFY == FSN_MODIFY) && ev.IsModify() {
sendEvent = true
}
if (fsnFlags&FSN_DELETE == FSN_DELETE) && ev.IsDelete() {
sendEvent = true
}
if (fsnFlags&FSN_RENAME == FSN_RENAME) && ev.IsRename() {
sendEvent = true
}
if sendEvent {
w.Event <- ev
}
// If there's no file, then no more events for user
// BSD must keep watch for internal use (watches DELETEs to keep track
// what files exist for create events)
if ev.IsDelete() {
w.fsnmut.Lock()
delete(w.fsnFlags, ev.Name)
w.fsnmut.Unlock()
}
}
close(w.Event)
}
// Watch a given file path
func (w *Watcher) Watch(path string) error {
return w.WatchFlags(path, FSN_ALL)
}
// Watch a given file path for a particular set of notifications (FSN_MODIFY etc.)
func (w *Watcher) WatchFlags(path string, flags uint32) error {
w.fsnmut.Lock()
w.fsnFlags[path] = flags
w.fsnmut.Unlock()
return w.watch(path)
}
// Remove a watch on a file
func (w *Watcher) RemoveWatch(path string) error {
w.fsnmut.Lock()
delete(w.fsnFlags, path)
w.fsnmut.Unlock()
return w.removeWatch(path)
}
// String formats the event e in the form
// "filename: DELETE|MODIFY|..."
func (e *FileEvent) String() string {
var events string = ""
if e.IsCreate() {
events += "|" + "CREATE"
}
if e.IsDelete() {
events += "|" + "DELETE"
}
if e.IsModify() {
events += "|" + "MODIFY"
}
if e.IsRename() {
events += "|" + "RENAME"
}
if e.IsAttrib() {
events += "|" + "ATTRIB"
}
if len(events) > 0 {
events = events[1:]
}
return fmt.Sprintf("%q: %s", e.Name, events)
}

View File

@ -1,496 +0,0 @@
// Copyright 2010 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// +build freebsd openbsd netbsd darwin
package fsnotify
import (
"errors"
"fmt"
"io/ioutil"
"os"
"path/filepath"
"sync"
"syscall"
)
const (
// Flags (from <sys/event.h>)
sys_NOTE_DELETE = 0x0001 /* vnode was removed */
sys_NOTE_WRITE = 0x0002 /* data contents changed */
sys_NOTE_EXTEND = 0x0004 /* size increased */
sys_NOTE_ATTRIB = 0x0008 /* attributes changed */
sys_NOTE_LINK = 0x0010 /* link count changed */
sys_NOTE_RENAME = 0x0020 /* vnode was renamed */
sys_NOTE_REVOKE = 0x0040 /* vnode access was revoked */
// Watch all events
sys_NOTE_ALLEVENTS = sys_NOTE_DELETE | sys_NOTE_WRITE | sys_NOTE_ATTRIB | sys_NOTE_RENAME
// Block for 100 ms on each call to kevent
keventWaitTime = 100e6
)
type FileEvent struct {
mask uint32 // Mask of events
Name string // File name (optional)
create bool // set by fsnotify package if found new file
}
// IsCreate reports whether the FileEvent was triggered by a creation
func (e *FileEvent) IsCreate() bool { return e.create }
// IsDelete reports whether the FileEvent was triggered by a delete
func (e *FileEvent) IsDelete() bool { return (e.mask & sys_NOTE_DELETE) == sys_NOTE_DELETE }
// IsModify reports whether the FileEvent was triggered by a file modification
func (e *FileEvent) IsModify() bool {
return ((e.mask&sys_NOTE_WRITE) == sys_NOTE_WRITE || (e.mask&sys_NOTE_ATTRIB) == sys_NOTE_ATTRIB)
}
// IsRename reports whether the FileEvent was triggered by a change name
func (e *FileEvent) IsRename() bool { return (e.mask & sys_NOTE_RENAME) == sys_NOTE_RENAME }
// IsAttrib reports whether the FileEvent was triggered by a change in the file metadata.
func (e *FileEvent) IsAttrib() bool {
return (e.mask & sys_NOTE_ATTRIB) == sys_NOTE_ATTRIB
}
type Watcher struct {
mu sync.Mutex // Mutex for the Watcher itself.
kq int // File descriptor (as returned by the kqueue() syscall)
watches map[string]int // Map of watched file descriptors (key: path)
wmut sync.Mutex // Protects access to watches.
fsnFlags map[string]uint32 // Map of watched files to flags used for filter
fsnmut sync.Mutex // Protects access to fsnFlags.
enFlags map[string]uint32 // Map of watched files to evfilt note flags used in kqueue
enmut sync.Mutex // Protects access to enFlags.
paths map[int]string // Map of watched paths (key: watch descriptor)
finfo map[int]os.FileInfo // Map of file information (isDir, isReg; key: watch descriptor)
pmut sync.Mutex // Protects access to paths and finfo.
fileExists map[string]bool // Keep track of if we know this file exists (to stop duplicate create events)
femut sync.Mutex // Protects access to fileExists.
externalWatches map[string]bool // Map of watches added by user of the library.
ewmut sync.Mutex // Protects access to externalWatches.
Error chan error // Errors are sent on this channel
internalEvent chan *FileEvent // Events are queued on this channel
Event chan *FileEvent // Events are returned on this channel
done chan bool // Channel for sending a "quit message" to the reader goroutine
isClosed bool // Set to true when Close() is first called
}
// NewWatcher creates and returns a new kevent instance using kqueue(2)
func NewWatcher() (*Watcher, error) {
fd, errno := syscall.Kqueue()
if fd == -1 {
return nil, os.NewSyscallError("kqueue", errno)
}
w := &Watcher{
kq: fd,
watches: make(map[string]int),
fsnFlags: make(map[string]uint32),
enFlags: make(map[string]uint32),
paths: make(map[int]string),
finfo: make(map[int]os.FileInfo),
fileExists: make(map[string]bool),
externalWatches: make(map[string]bool),
internalEvent: make(chan *FileEvent),
Event: make(chan *FileEvent),
Error: make(chan error),
done: make(chan bool, 1),
}
go w.readEvents()
go w.purgeEvents()
return w, nil
}
// Close closes a kevent watcher instance
// It sends a message to the reader goroutine to quit and removes all watches
// associated with the kevent instance
func (w *Watcher) Close() error {
w.mu.Lock()
if w.isClosed {
w.mu.Unlock()
return nil
}
w.isClosed = true
w.mu.Unlock()
// Send "quit" message to the reader goroutine
w.done <- true
w.wmut.Lock()
ws := w.watches
w.wmut.Unlock()
for path := range ws {
w.removeWatch(path)
}
return nil
}
// AddWatch adds path to the watched file set.
// The flags are interpreted as described in kevent(2).
func (w *Watcher) addWatch(path string, flags uint32) error {
w.mu.Lock()
if w.isClosed {
w.mu.Unlock()
return errors.New("kevent instance already closed")
}
w.mu.Unlock()
watchDir := false
w.wmut.Lock()
watchfd, found := w.watches[path]
w.wmut.Unlock()
if !found {
fi, errstat := os.Lstat(path)
if errstat != nil {
return errstat
}
// don't watch socket
if fi.Mode()&os.ModeSocket == os.ModeSocket {
return nil
}
// Follow Symlinks
// Unfortunately, Linux can add bogus symlinks to watch list without
// issue, and Windows can't do symlinks period (AFAIK). To maintain
// consistency, we will act like everything is fine. There will simply
// be no file events for broken symlinks.
// Hence the returns of nil on errors.
if fi.Mode()&os.ModeSymlink == os.ModeSymlink {
path, err := filepath.EvalSymlinks(path)
if err != nil {
return nil
}
fi, errstat = os.Lstat(path)
if errstat != nil {
return nil
}
}
fd, errno := syscall.Open(path, open_FLAGS, 0700)
if fd == -1 {
return errno
}
watchfd = fd
w.wmut.Lock()
w.watches[path] = watchfd
w.wmut.Unlock()
w.pmut.Lock()
w.paths[watchfd] = path
w.finfo[watchfd] = fi
w.pmut.Unlock()
}
// Watch the directory if it has not been watched before.
w.pmut.Lock()
w.enmut.Lock()
if w.finfo[watchfd].IsDir() &&
(flags&sys_NOTE_WRITE) == sys_NOTE_WRITE &&
(!found || (w.enFlags[path]&sys_NOTE_WRITE) != sys_NOTE_WRITE) {
watchDir = true
}
w.enmut.Unlock()
w.pmut.Unlock()
w.enmut.Lock()
w.enFlags[path] = flags
w.enmut.Unlock()
var kbuf [1]syscall.Kevent_t
watchEntry := &kbuf[0]
watchEntry.Fflags = flags
syscall.SetKevent(watchEntry, watchfd, syscall.EVFILT_VNODE, syscall.EV_ADD|syscall.EV_CLEAR)
entryFlags := watchEntry.Flags
success, errno := syscall.Kevent(w.kq, kbuf[:], nil, nil)
if success == -1 {
return errno
} else if (entryFlags & syscall.EV_ERROR) == syscall.EV_ERROR {
return errors.New("kevent add error")
}
if watchDir {
errdir := w.watchDirectoryFiles(path)
if errdir != nil {
return errdir
}
}
return nil
}
// Watch adds path to the watched file set, watching all events.
func (w *Watcher) watch(path string) error {
w.ewmut.Lock()
w.externalWatches[path] = true
w.ewmut.Unlock()
return w.addWatch(path, sys_NOTE_ALLEVENTS)
}
// RemoveWatch removes path from the watched file set.
func (w *Watcher) removeWatch(path string) error {
w.wmut.Lock()
watchfd, ok := w.watches[path]
w.wmut.Unlock()
if !ok {
return errors.New(fmt.Sprintf("can't remove non-existent kevent watch for: %s", path))
}
var kbuf [1]syscall.Kevent_t
watchEntry := &kbuf[0]
syscall.SetKevent(watchEntry, watchfd, syscall.EVFILT_VNODE, syscall.EV_DELETE)
entryFlags := watchEntry.Flags
success, errno := syscall.Kevent(w.kq, kbuf[:], nil, nil)
if success == -1 {
return os.NewSyscallError("kevent_rm_watch", errno)
} else if (entryFlags & syscall.EV_ERROR) == syscall.EV_ERROR {
return errors.New("kevent rm error")
}
syscall.Close(watchfd)
w.wmut.Lock()
delete(w.watches, path)
w.wmut.Unlock()
w.enmut.Lock()
delete(w.enFlags, path)
w.enmut.Unlock()
w.pmut.Lock()
delete(w.paths, watchfd)
fInfo := w.finfo[watchfd]
delete(w.finfo, watchfd)
w.pmut.Unlock()
// Find all watched paths that are in this directory that are not external.
if fInfo.IsDir() {
var pathsToRemove []string
w.pmut.Lock()
for _, wpath := range w.paths {
wdir, _ := filepath.Split(wpath)
if filepath.Clean(wdir) == filepath.Clean(path) {
w.ewmut.Lock()
if !w.externalWatches[wpath] {
pathsToRemove = append(pathsToRemove, wpath)
}
w.ewmut.Unlock()
}
}
w.pmut.Unlock()
for _, p := range pathsToRemove {
// Since these are internal, not much sense in propagating error
// to the user, as that will just confuse them with an error about
// a path they did not explicitly watch themselves.
w.removeWatch(p)
}
}
return nil
}
// readEvents reads from the kqueue file descriptor, converts the
// received events into Event objects and sends them via the Event channel
func (w *Watcher) readEvents() {
var (
eventbuf [10]syscall.Kevent_t // Event buffer
events []syscall.Kevent_t // Received events
twait *syscall.Timespec // Time to block waiting for events
n int // Number of events returned from kevent
errno error // Syscall errno
)
events = eventbuf[0:0]
twait = new(syscall.Timespec)
*twait = syscall.NsecToTimespec(keventWaitTime)
for {
// See if there is a message on the "done" channel
var done bool
select {
case done = <-w.done:
default:
}
// If "done" message is received
if done {
errno := syscall.Close(w.kq)
if errno != nil {
w.Error <- os.NewSyscallError("close", errno)
}
close(w.internalEvent)
close(w.Error)
return
}
// Get new events
if len(events) == 0 {
n, errno = syscall.Kevent(w.kq, nil, eventbuf[:], twait)
// EINTR is okay, basically the syscall was interrupted before
// timeout expired.
if errno != nil && errno != syscall.EINTR {
w.Error <- os.NewSyscallError("kevent", errno)
continue
}
// Received some events
if n > 0 {
events = eventbuf[0:n]
}
}
// Flush the events we received to the events channel
for len(events) > 0 {
fileEvent := new(FileEvent)
watchEvent := &events[0]
fileEvent.mask = uint32(watchEvent.Fflags)
w.pmut.Lock()
fileEvent.Name = w.paths[int(watchEvent.Ident)]
fileInfo := w.finfo[int(watchEvent.Ident)]
w.pmut.Unlock()
if fileInfo != nil && fileInfo.IsDir() && !fileEvent.IsDelete() {
// Double check to make sure the directory exist. This can happen when
// we do a rm -fr on a recursively watched folders and we receive a
// modification event first but the folder has been deleted and later
// receive the delete event
if _, err := os.Lstat(fileEvent.Name); os.IsNotExist(err) {
// mark is as delete event
fileEvent.mask |= sys_NOTE_DELETE
}
}
if fileInfo != nil && fileInfo.IsDir() && fileEvent.IsModify() && !fileEvent.IsDelete() {
w.sendDirectoryChangeEvents(fileEvent.Name)
} else {
// Send the event on the events channel
w.internalEvent <- fileEvent
}
// Move to next event
events = events[1:]
if fileEvent.IsRename() {
w.removeWatch(fileEvent.Name)
w.femut.Lock()
delete(w.fileExists, fileEvent.Name)
w.femut.Unlock()
}
if fileEvent.IsDelete() {
w.removeWatch(fileEvent.Name)
w.femut.Lock()
delete(w.fileExists, fileEvent.Name)
w.femut.Unlock()
// Look for a file that may have overwritten this
// (ie mv f1 f2 will delete f2 then create f2)
fileDir, _ := filepath.Split(fileEvent.Name)
fileDir = filepath.Clean(fileDir)
w.wmut.Lock()
_, found := w.watches[fileDir]
w.wmut.Unlock()
if found {
// make sure the directory exist before we watch for changes. When we
// do a recursive watch and perform rm -fr, the parent directory might
// have gone missing, ignore the missing directory and let the
// upcoming delete event remove the watch form the parent folder
if _, err := os.Lstat(fileDir); !os.IsNotExist(err) {
w.sendDirectoryChangeEvents(fileDir)
}
}
}
}
}
}
func (w *Watcher) watchDirectoryFiles(dirPath string) error {
// Get all files
files, err := ioutil.ReadDir(dirPath)
if err != nil {
return err
}
// Search for new files
for _, fileInfo := range files {
filePath := filepath.Join(dirPath, fileInfo.Name())
// Inherit fsnFlags from parent directory
w.fsnmut.Lock()
if flags, found := w.fsnFlags[dirPath]; found {
w.fsnFlags[filePath] = flags
} else {
w.fsnFlags[filePath] = FSN_ALL
}
w.fsnmut.Unlock()
if fileInfo.IsDir() == false {
// Watch file to mimic linux fsnotify
e := w.addWatch(filePath, sys_NOTE_ALLEVENTS)
if e != nil {
return e
}
} else {
// If the user is currently watching directory
// we want to preserve the flags used
w.enmut.Lock()
currFlags, found := w.enFlags[filePath]
w.enmut.Unlock()
var newFlags uint32 = sys_NOTE_DELETE
if found {
newFlags |= currFlags
}
// Linux gives deletes if not explicitly watching
e := w.addWatch(filePath, newFlags)
if e != nil {
return e
}
}
w.femut.Lock()
w.fileExists[filePath] = true
w.femut.Unlock()
}
return nil
}
// sendDirectoryEvents searches the directory for newly created files
// and sends them over the event channel. This functionality is to have
// the BSD version of fsnotify match linux fsnotify which provides a
// create event for files created in a watched directory.
func (w *Watcher) sendDirectoryChangeEvents(dirPath string) {
// Get all files
files, err := ioutil.ReadDir(dirPath)
if err != nil {
w.Error <- err
}
// Search for new files
for _, fileInfo := range files {
filePath := filepath.Join(dirPath, fileInfo.Name())
w.femut.Lock()
_, doesExist := w.fileExists[filePath]
w.femut.Unlock()
if !doesExist {
// Inherit fsnFlags from parent directory
w.fsnmut.Lock()
if flags, found := w.fsnFlags[dirPath]; found {
w.fsnFlags[filePath] = flags
} else {
w.fsnFlags[filePath] = FSN_ALL
}
w.fsnmut.Unlock()
// Send create event
fileEvent := new(FileEvent)
fileEvent.Name = filePath
fileEvent.create = true
w.internalEvent <- fileEvent
}
w.femut.Lock()
w.fileExists[filePath] = true
w.femut.Unlock()
}
w.watchDirectoryFiles(dirPath)
}

View File

@ -1,304 +0,0 @@
// Copyright 2010 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// +build linux
package fsnotify
import (
"errors"
"fmt"
"os"
"strings"
"sync"
"syscall"
"unsafe"
)
const (
// Options for inotify_init() are not exported
// sys_IN_CLOEXEC uint32 = syscall.IN_CLOEXEC
// sys_IN_NONBLOCK uint32 = syscall.IN_NONBLOCK
// Options for AddWatch
sys_IN_DONT_FOLLOW uint32 = syscall.IN_DONT_FOLLOW
sys_IN_ONESHOT uint32 = syscall.IN_ONESHOT
sys_IN_ONLYDIR uint32 = syscall.IN_ONLYDIR
// The "sys_IN_MASK_ADD" option is not exported, as AddWatch
// adds it automatically, if there is already a watch for the given path
// sys_IN_MASK_ADD uint32 = syscall.IN_MASK_ADD
// Events
sys_IN_ACCESS uint32 = syscall.IN_ACCESS
sys_IN_ALL_EVENTS uint32 = syscall.IN_ALL_EVENTS
sys_IN_ATTRIB uint32 = syscall.IN_ATTRIB
sys_IN_CLOSE uint32 = syscall.IN_CLOSE
sys_IN_CLOSE_NOWRITE uint32 = syscall.IN_CLOSE_NOWRITE
sys_IN_CLOSE_WRITE uint32 = syscall.IN_CLOSE_WRITE
sys_IN_CREATE uint32 = syscall.IN_CREATE
sys_IN_DELETE uint32 = syscall.IN_DELETE
sys_IN_DELETE_SELF uint32 = syscall.IN_DELETE_SELF
sys_IN_MODIFY uint32 = syscall.IN_MODIFY
sys_IN_MOVE uint32 = syscall.IN_MOVE
sys_IN_MOVED_FROM uint32 = syscall.IN_MOVED_FROM
sys_IN_MOVED_TO uint32 = syscall.IN_MOVED_TO
sys_IN_MOVE_SELF uint32 = syscall.IN_MOVE_SELF
sys_IN_OPEN uint32 = syscall.IN_OPEN
sys_AGNOSTIC_EVENTS = sys_IN_MOVED_TO | sys_IN_MOVED_FROM | sys_IN_CREATE | sys_IN_ATTRIB | sys_IN_MODIFY | sys_IN_MOVE_SELF | sys_IN_DELETE | sys_IN_DELETE_SELF
// Special events
sys_IN_ISDIR uint32 = syscall.IN_ISDIR
sys_IN_IGNORED uint32 = syscall.IN_IGNORED
sys_IN_Q_OVERFLOW uint32 = syscall.IN_Q_OVERFLOW
sys_IN_UNMOUNT uint32 = syscall.IN_UNMOUNT
)
type FileEvent struct {
mask uint32 // Mask of events
cookie uint32 // Unique cookie associating related events (for rename(2))
Name string // File name (optional)
}
// IsCreate reports whether the FileEvent was triggered by a creation
func (e *FileEvent) IsCreate() bool {
return (e.mask&sys_IN_CREATE) == sys_IN_CREATE || (e.mask&sys_IN_MOVED_TO) == sys_IN_MOVED_TO
}
// IsDelete reports whether the FileEvent was triggered by a delete
func (e *FileEvent) IsDelete() bool {
return (e.mask&sys_IN_DELETE_SELF) == sys_IN_DELETE_SELF || (e.mask&sys_IN_DELETE) == sys_IN_DELETE
}
// IsModify reports whether the FileEvent was triggered by a file modification or attribute change
func (e *FileEvent) IsModify() bool {
return ((e.mask&sys_IN_MODIFY) == sys_IN_MODIFY || (e.mask&sys_IN_ATTRIB) == sys_IN_ATTRIB)
}
// IsRename reports whether the FileEvent was triggered by a change name
func (e *FileEvent) IsRename() bool {
return ((e.mask&sys_IN_MOVE_SELF) == sys_IN_MOVE_SELF || (e.mask&sys_IN_MOVED_FROM) == sys_IN_MOVED_FROM)
}
// IsAttrib reports whether the FileEvent was triggered by a change in the file metadata.
func (e *FileEvent) IsAttrib() bool {
return (e.mask & sys_IN_ATTRIB) == sys_IN_ATTRIB
}
type watch struct {
wd uint32 // Watch descriptor (as returned by the inotify_add_watch() syscall)
flags uint32 // inotify flags of this watch (see inotify(7) for the list of valid flags)
}
type Watcher struct {
mu sync.Mutex // Map access
fd int // File descriptor (as returned by the inotify_init() syscall)
watches map[string]*watch // Map of inotify watches (key: path)
fsnFlags map[string]uint32 // Map of watched files to flags used for filter
fsnmut sync.Mutex // Protects access to fsnFlags.
paths map[int]string // Map of watched paths (key: watch descriptor)
Error chan error // Errors are sent on this channel
internalEvent chan *FileEvent // Events are queued on this channel
Event chan *FileEvent // Events are returned on this channel
done chan bool // Channel for sending a "quit message" to the reader goroutine
isClosed bool // Set to true when Close() is first called
}
// NewWatcher creates and returns a new inotify instance using inotify_init(2)
func NewWatcher() (*Watcher, error) {
fd, errno := syscall.InotifyInit()
if fd == -1 {
return nil, os.NewSyscallError("inotify_init", errno)
}
w := &Watcher{
fd: fd,
watches: make(map[string]*watch),
fsnFlags: make(map[string]uint32),
paths: make(map[int]string),
internalEvent: make(chan *FileEvent),
Event: make(chan *FileEvent),
Error: make(chan error),
done: make(chan bool, 1),
}
go w.readEvents()
go w.purgeEvents()
return w, nil
}
// Close closes an inotify watcher instance
// It sends a message to the reader goroutine to quit and removes all watches
// associated with the inotify instance
func (w *Watcher) Close() error {
if w.isClosed {
return nil
}
w.isClosed = true
// Remove all watches
for path := range w.watches {
w.RemoveWatch(path)
}
// Send "quit" message to the reader goroutine
w.done <- true
return nil
}
// AddWatch adds path to the watched file set.
// The flags are interpreted as described in inotify_add_watch(2).
func (w *Watcher) addWatch(path string, flags uint32) error {
if w.isClosed {
return errors.New("inotify instance already closed")
}
w.mu.Lock()
watchEntry, found := w.watches[path]
w.mu.Unlock()
if found {
watchEntry.flags |= flags
flags |= syscall.IN_MASK_ADD
}
wd, errno := syscall.InotifyAddWatch(w.fd, path, flags)
if wd == -1 {
return errno
}
w.mu.Lock()
w.watches[path] = &watch{wd: uint32(wd), flags: flags}
w.paths[wd] = path
w.mu.Unlock()
return nil
}
// Watch adds path to the watched file set, watching all events.
func (w *Watcher) watch(path string) error {
return w.addWatch(path, sys_AGNOSTIC_EVENTS)
}
// RemoveWatch removes path from the watched file set.
func (w *Watcher) removeWatch(path string) error {
w.mu.Lock()
defer w.mu.Unlock()
watch, ok := w.watches[path]
if !ok {
return errors.New(fmt.Sprintf("can't remove non-existent inotify watch for: %s", path))
}
success, errno := syscall.InotifyRmWatch(w.fd, watch.wd)
if success == -1 {
return os.NewSyscallError("inotify_rm_watch", errno)
}
delete(w.watches, path)
return nil
}
// readEvents reads from the inotify file descriptor, converts the
// received events into Event objects and sends them via the Event channel
func (w *Watcher) readEvents() {
var (
buf [syscall.SizeofInotifyEvent * 4096]byte // Buffer for a maximum of 4096 raw events
n int // Number of bytes read with read()
errno error // Syscall errno
)
for {
// See if there is a message on the "done" channel
select {
case <-w.done:
syscall.Close(w.fd)
close(w.internalEvent)
close(w.Error)
return
default:
}
n, errno = syscall.Read(w.fd, buf[:])
// If EOF is received
if n == 0 {
syscall.Close(w.fd)
close(w.internalEvent)
close(w.Error)
return
}
if n < 0 {
w.Error <- os.NewSyscallError("read", errno)
continue
}
if n < syscall.SizeofInotifyEvent {
w.Error <- errors.New("inotify: short read in readEvents()")
continue
}
var offset uint32 = 0
// We don't know how many events we just read into the buffer
// While the offset points to at least one whole event...
for offset <= uint32(n-syscall.SizeofInotifyEvent) {
// Point "raw" to the event in the buffer
raw := (*syscall.InotifyEvent)(unsafe.Pointer(&buf[offset]))
event := new(FileEvent)
event.mask = uint32(raw.Mask)
event.cookie = uint32(raw.Cookie)
nameLen := uint32(raw.Len)
// If the event happened to the watched directory or the watched file, the kernel
// doesn't append the filename to the event, but we would like to always fill the
// the "Name" field with a valid filename. We retrieve the path of the watch from
// the "paths" map.
w.mu.Lock()
event.Name = w.paths[int(raw.Wd)]
w.mu.Unlock()
watchedName := event.Name
if nameLen > 0 {
// Point "bytes" at the first byte of the filename
bytes := (*[syscall.PathMax]byte)(unsafe.Pointer(&buf[offset+syscall.SizeofInotifyEvent]))
// The filename is padded with NUL bytes. TrimRight() gets rid of those.
event.Name += "/" + strings.TrimRight(string(bytes[0:nameLen]), "\000")
}
// Send the events that are not ignored on the events channel
if !event.ignoreLinux() {
// Setup FSNotify flags (inherit from directory watch)
w.fsnmut.Lock()
if _, fsnFound := w.fsnFlags[event.Name]; !fsnFound {
if fsnFlags, watchFound := w.fsnFlags[watchedName]; watchFound {
w.fsnFlags[event.Name] = fsnFlags
} else {
w.fsnFlags[event.Name] = FSN_ALL
}
}
w.fsnmut.Unlock()
w.internalEvent <- event
}
// Move to the next event in the buffer
offset += syscall.SizeofInotifyEvent + nameLen
}
}
}
// Certain types of events can be "ignored" and not sent over the Event
// channel. Such as events marked ignore by the kernel, or MODIFY events
// against files that do not exist.
func (e *FileEvent) ignoreLinux() bool {
// Ignore anything the inotify API says to ignore
if e.mask&sys_IN_IGNORED == sys_IN_IGNORED {
return true
}
// If the event is not a DELETE or RENAME, the file must exist.
// Otherwise the event is ignored.
// *Note*: this was put in place because it was seen that a MODIFY
// event was sent after the DELETE. This ignores that MODIFY and
// assumes a DELETE will come or has come if the file doesn't exist.
if !(e.IsDelete() || e.IsRename()) {
_, statErr := os.Lstat(e.Name)
return os.IsNotExist(statErr)
}
return false
}

View File

@ -1,11 +0,0 @@
// Copyright 2013 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// +build freebsd openbsd netbsd
package fsnotify
import "syscall"
const open_FLAGS = syscall.O_NONBLOCK | syscall.O_RDONLY

View File

@ -1,11 +0,0 @@
// Copyright 2013 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// +build darwin
package fsnotify
import "syscall"
const open_FLAGS = syscall.O_EVTONLY

View File

@ -1,74 +0,0 @@
// Copyright 2010 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// +build freebsd openbsd netbsd darwin linux
package fsnotify
import (
"os"
"path/filepath"
"testing"
"time"
)
func TestFsnotifyFakeSymlink(t *testing.T) {
watcher := newWatcher(t)
// Create directory to watch
testDir := tempMkdir(t)
defer os.RemoveAll(testDir)
var errorsReceived counter
// Receive errors on the error channel on a separate goroutine
go func() {
for errors := range watcher.Error {
t.Logf("Received error: %s", errors)
errorsReceived.increment()
}
}()
// Count the CREATE events received
var createEventsReceived, otherEventsReceived counter
go func() {
for ev := range watcher.Event {
t.Logf("event received: %s", ev)
if ev.IsCreate() {
createEventsReceived.increment()
} else {
otherEventsReceived.increment()
}
}
}()
addWatch(t, watcher, testDir)
if err := os.Symlink(filepath.Join(testDir, "zzz"), filepath.Join(testDir, "zzznew")); err != nil {
t.Fatalf("Failed to create bogus symlink: %s", err)
}
t.Logf("Created bogus symlink")
// We expect this event to be received almost immediately, but let's wait 500 ms to be sure
time.Sleep(500 * time.Millisecond)
// Should not be error, just no events for broken links (watching nothing)
if errorsReceived.value() > 0 {
t.Fatal("fsnotify errors have been received.")
}
if otherEventsReceived.value() > 0 {
t.Fatal("fsnotify other events received on the broken link")
}
// Except for 1 create event (for the link itself)
if createEventsReceived.value() == 0 {
t.Fatal("fsnotify create events were not received after 500 ms")
}
if createEventsReceived.value() > 1 {
t.Fatal("fsnotify more create events received than expected")
}
// Try closing the fsnotify instance
t.Log("calling Close()")
watcher.Close()
}

File diff suppressed because it is too large Load Diff

View File

@ -1,598 +0,0 @@
// Copyright 2011 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// +build windows
package fsnotify
import (
"errors"
"fmt"
"os"
"path/filepath"
"runtime"
"sync"
"syscall"
"unsafe"
)
const (
// Options for AddWatch
sys_FS_ONESHOT = 0x80000000
sys_FS_ONLYDIR = 0x1000000
// Events
sys_FS_ACCESS = 0x1
sys_FS_ALL_EVENTS = 0xfff
sys_FS_ATTRIB = 0x4
sys_FS_CLOSE = 0x18
sys_FS_CREATE = 0x100
sys_FS_DELETE = 0x200
sys_FS_DELETE_SELF = 0x400
sys_FS_MODIFY = 0x2
sys_FS_MOVE = 0xc0
sys_FS_MOVED_FROM = 0x40
sys_FS_MOVED_TO = 0x80
sys_FS_MOVE_SELF = 0x800
// Special events
sys_FS_IGNORED = 0x8000
sys_FS_Q_OVERFLOW = 0x4000
)
const (
// TODO(nj): Use syscall.ERROR_MORE_DATA from ztypes_windows in Go 1.3+
sys_ERROR_MORE_DATA syscall.Errno = 234
)
// Event is the type of the notification messages
// received on the watcher's Event channel.
type FileEvent struct {
mask uint32 // Mask of events
cookie uint32 // Unique cookie associating related events (for rename)
Name string // File name (optional)
}
// IsCreate reports whether the FileEvent was triggered by a creation
func (e *FileEvent) IsCreate() bool { return (e.mask & sys_FS_CREATE) == sys_FS_CREATE }
// IsDelete reports whether the FileEvent was triggered by a delete
func (e *FileEvent) IsDelete() bool {
return ((e.mask&sys_FS_DELETE) == sys_FS_DELETE || (e.mask&sys_FS_DELETE_SELF) == sys_FS_DELETE_SELF)
}
// IsModify reports whether the FileEvent was triggered by a file modification or attribute change
func (e *FileEvent) IsModify() bool {
return ((e.mask&sys_FS_MODIFY) == sys_FS_MODIFY || (e.mask&sys_FS_ATTRIB) == sys_FS_ATTRIB)
}
// IsRename reports whether the FileEvent was triggered by a change name
func (e *FileEvent) IsRename() bool {
return ((e.mask&sys_FS_MOVE) == sys_FS_MOVE || (e.mask&sys_FS_MOVE_SELF) == sys_FS_MOVE_SELF || (e.mask&sys_FS_MOVED_FROM) == sys_FS_MOVED_FROM || (e.mask&sys_FS_MOVED_TO) == sys_FS_MOVED_TO)
}
// IsAttrib reports whether the FileEvent was triggered by a change in the file metadata.
func (e *FileEvent) IsAttrib() bool {
return (e.mask & sys_FS_ATTRIB) == sys_FS_ATTRIB
}
const (
opAddWatch = iota
opRemoveWatch
)
const (
provisional uint64 = 1 << (32 + iota)
)
type input struct {
op int
path string
flags uint32
reply chan error
}
type inode struct {
handle syscall.Handle
volume uint32
index uint64
}
type watch struct {
ov syscall.Overlapped
ino *inode // i-number
path string // Directory path
mask uint64 // Directory itself is being watched with these notify flags
names map[string]uint64 // Map of names being watched and their notify flags
rename string // Remembers the old name while renaming a file
buf [4096]byte
}
type indexMap map[uint64]*watch
type watchMap map[uint32]indexMap
// A Watcher waits for and receives event notifications
// for a specific set of files and directories.
type Watcher struct {
mu sync.Mutex // Map access
port syscall.Handle // Handle to completion port
watches watchMap // Map of watches (key: i-number)
fsnFlags map[string]uint32 // Map of watched files to flags used for filter
fsnmut sync.Mutex // Protects access to fsnFlags.
input chan *input // Inputs to the reader are sent on this channel
internalEvent chan *FileEvent // Events are queued on this channel
Event chan *FileEvent // Events are returned on this channel
Error chan error // Errors are sent on this channel
isClosed bool // Set to true when Close() is first called
quit chan chan<- error
cookie uint32
}
// NewWatcher creates and returns a Watcher.
func NewWatcher() (*Watcher, error) {
port, e := syscall.CreateIoCompletionPort(syscall.InvalidHandle, 0, 0, 0)
if e != nil {
return nil, os.NewSyscallError("CreateIoCompletionPort", e)
}
w := &Watcher{
port: port,
watches: make(watchMap),
fsnFlags: make(map[string]uint32),
input: make(chan *input, 1),
Event: make(chan *FileEvent, 50),
internalEvent: make(chan *FileEvent),
Error: make(chan error),
quit: make(chan chan<- error, 1),
}
go w.readEvents()
go w.purgeEvents()
return w, nil
}
// Close closes a Watcher.
// It sends a message to the reader goroutine to quit and removes all watches
// associated with the watcher.
func (w *Watcher) Close() error {
if w.isClosed {
return nil
}
w.isClosed = true
// Send "quit" message to the reader goroutine
ch := make(chan error)
w.quit <- ch
if err := w.wakeupReader(); err != nil {
return err
}
return <-ch
}
// AddWatch adds path to the watched file set.
func (w *Watcher) AddWatch(path string, flags uint32) error {
if w.isClosed {
return errors.New("watcher already closed")
}
in := &input{
op: opAddWatch,
path: filepath.Clean(path),
flags: flags,
reply: make(chan error),
}
w.input <- in
if err := w.wakeupReader(); err != nil {
return err
}
return <-in.reply
}
// Watch adds path to the watched file set, watching all events.
func (w *Watcher) watch(path string) error {
return w.AddWatch(path, sys_FS_ALL_EVENTS)
}
// RemoveWatch removes path from the watched file set.
func (w *Watcher) removeWatch(path string) error {
in := &input{
op: opRemoveWatch,
path: filepath.Clean(path),
reply: make(chan error),
}
w.input <- in
if err := w.wakeupReader(); err != nil {
return err
}
return <-in.reply
}
func (w *Watcher) wakeupReader() error {
e := syscall.PostQueuedCompletionStatus(w.port, 0, 0, nil)
if e != nil {
return os.NewSyscallError("PostQueuedCompletionStatus", e)
}
return nil
}
func getDir(pathname string) (dir string, err error) {
attr, e := syscall.GetFileAttributes(syscall.StringToUTF16Ptr(pathname))
if e != nil {
return "", os.NewSyscallError("GetFileAttributes", e)
}
if attr&syscall.FILE_ATTRIBUTE_DIRECTORY != 0 {
dir = pathname
} else {
dir, _ = filepath.Split(pathname)
dir = filepath.Clean(dir)
}
return
}
func getIno(path string) (ino *inode, err error) {
h, e := syscall.CreateFile(syscall.StringToUTF16Ptr(path),
syscall.FILE_LIST_DIRECTORY,
syscall.FILE_SHARE_READ|syscall.FILE_SHARE_WRITE|syscall.FILE_SHARE_DELETE,
nil, syscall.OPEN_EXISTING,
syscall.FILE_FLAG_BACKUP_SEMANTICS|syscall.FILE_FLAG_OVERLAPPED, 0)
if e != nil {
return nil, os.NewSyscallError("CreateFile", e)
}
var fi syscall.ByHandleFileInformation
if e = syscall.GetFileInformationByHandle(h, &fi); e != nil {
syscall.CloseHandle(h)
return nil, os.NewSyscallError("GetFileInformationByHandle", e)
}
ino = &inode{
handle: h,
volume: fi.VolumeSerialNumber,
index: uint64(fi.FileIndexHigh)<<32 | uint64(fi.FileIndexLow),
}
return ino, nil
}
// Must run within the I/O thread.
func (m watchMap) get(ino *inode) *watch {
if i := m[ino.volume]; i != nil {
return i[ino.index]
}
return nil
}
// Must run within the I/O thread.
func (m watchMap) set(ino *inode, watch *watch) {
i := m[ino.volume]
if i == nil {
i = make(indexMap)
m[ino.volume] = i
}
i[ino.index] = watch
}
// Must run within the I/O thread.
func (w *Watcher) addWatch(pathname string, flags uint64) error {
dir, err := getDir(pathname)
if err != nil {
return err
}
if flags&sys_FS_ONLYDIR != 0 && pathname != dir {
return nil
}
ino, err := getIno(dir)
if err != nil {
return err
}
w.mu.Lock()
watchEntry := w.watches.get(ino)
w.mu.Unlock()
if watchEntry == nil {
if _, e := syscall.CreateIoCompletionPort(ino.handle, w.port, 0, 0); e != nil {
syscall.CloseHandle(ino.handle)
return os.NewSyscallError("CreateIoCompletionPort", e)
}
watchEntry = &watch{
ino: ino,
path: dir,
names: make(map[string]uint64),
}
w.mu.Lock()
w.watches.set(ino, watchEntry)
w.mu.Unlock()
flags |= provisional
} else {
syscall.CloseHandle(ino.handle)
}
if pathname == dir {
watchEntry.mask |= flags
} else {
watchEntry.names[filepath.Base(pathname)] |= flags
}
if err = w.startRead(watchEntry); err != nil {
return err
}
if pathname == dir {
watchEntry.mask &= ^provisional
} else {
watchEntry.names[filepath.Base(pathname)] &= ^provisional
}
return nil
}
// Must run within the I/O thread.
func (w *Watcher) remWatch(pathname string) error {
dir, err := getDir(pathname)
if err != nil {
return err
}
ino, err := getIno(dir)
if err != nil {
return err
}
w.mu.Lock()
watch := w.watches.get(ino)
w.mu.Unlock()
if watch == nil {
return fmt.Errorf("can't remove non-existent watch for: %s", pathname)
}
if pathname == dir {
w.sendEvent(watch.path, watch.mask&sys_FS_IGNORED)
watch.mask = 0
} else {
name := filepath.Base(pathname)
w.sendEvent(watch.path+"\\"+name, watch.names[name]&sys_FS_IGNORED)
delete(watch.names, name)
}
return w.startRead(watch)
}
// Must run within the I/O thread.
func (w *Watcher) deleteWatch(watch *watch) {
for name, mask := range watch.names {
if mask&provisional == 0 {
w.sendEvent(watch.path+"\\"+name, mask&sys_FS_IGNORED)
}
delete(watch.names, name)
}
if watch.mask != 0 {
if watch.mask&provisional == 0 {
w.sendEvent(watch.path, watch.mask&sys_FS_IGNORED)
}
watch.mask = 0
}
}
// Must run within the I/O thread.
func (w *Watcher) startRead(watch *watch) error {
if e := syscall.CancelIo(watch.ino.handle); e != nil {
w.Error <- os.NewSyscallError("CancelIo", e)
w.deleteWatch(watch)
}
mask := toWindowsFlags(watch.mask)
for _, m := range watch.names {
mask |= toWindowsFlags(m)
}
if mask == 0 {
if e := syscall.CloseHandle(watch.ino.handle); e != nil {
w.Error <- os.NewSyscallError("CloseHandle", e)
}
w.mu.Lock()
delete(w.watches[watch.ino.volume], watch.ino.index)
w.mu.Unlock()
return nil
}
e := syscall.ReadDirectoryChanges(watch.ino.handle, &watch.buf[0],
uint32(unsafe.Sizeof(watch.buf)), false, mask, nil, &watch.ov, 0)
if e != nil {
err := os.NewSyscallError("ReadDirectoryChanges", e)
if e == syscall.ERROR_ACCESS_DENIED && watch.mask&provisional == 0 {
// Watched directory was probably removed
if w.sendEvent(watch.path, watch.mask&sys_FS_DELETE_SELF) {
if watch.mask&sys_FS_ONESHOT != 0 {
watch.mask = 0
}
}
err = nil
}
w.deleteWatch(watch)
w.startRead(watch)
return err
}
return nil
}
// readEvents reads from the I/O completion port, converts the
// received events into Event objects and sends them via the Event channel.
// Entry point to the I/O thread.
func (w *Watcher) readEvents() {
var (
n, key uint32
ov *syscall.Overlapped
)
runtime.LockOSThread()
for {
e := syscall.GetQueuedCompletionStatus(w.port, &n, &key, &ov, syscall.INFINITE)
watch := (*watch)(unsafe.Pointer(ov))
if watch == nil {
select {
case ch := <-w.quit:
w.mu.Lock()
var indexes []indexMap
for _, index := range w.watches {
indexes = append(indexes, index)
}
w.mu.Unlock()
for _, index := range indexes {
for _, watch := range index {
w.deleteWatch(watch)
w.startRead(watch)
}
}
var err error
if e := syscall.CloseHandle(w.port); e != nil {
err = os.NewSyscallError("CloseHandle", e)
}
close(w.internalEvent)
close(w.Error)
ch <- err
return
case in := <-w.input:
switch in.op {
case opAddWatch:
in.reply <- w.addWatch(in.path, uint64(in.flags))
case opRemoveWatch:
in.reply <- w.remWatch(in.path)
}
default:
}
continue
}
switch e {
case sys_ERROR_MORE_DATA:
if watch == nil {
w.Error <- errors.New("ERROR_MORE_DATA has unexpectedly null lpOverlapped buffer")
} else {
// The i/o succeeded but the buffer is full.
// In theory we should be building up a full packet.
// In practice we can get away with just carrying on.
n = uint32(unsafe.Sizeof(watch.buf))
}
case syscall.ERROR_ACCESS_DENIED:
// Watched directory was probably removed
w.sendEvent(watch.path, watch.mask&sys_FS_DELETE_SELF)
w.deleteWatch(watch)
w.startRead(watch)
continue
case syscall.ERROR_OPERATION_ABORTED:
// CancelIo was called on this handle
continue
default:
w.Error <- os.NewSyscallError("GetQueuedCompletionPort", e)
continue
case nil:
}
var offset uint32
for {
if n == 0 {
w.internalEvent <- &FileEvent{mask: sys_FS_Q_OVERFLOW}
w.Error <- errors.New("short read in readEvents()")
break
}
// Point "raw" to the event in the buffer
raw := (*syscall.FileNotifyInformation)(unsafe.Pointer(&watch.buf[offset]))
buf := (*[syscall.MAX_PATH]uint16)(unsafe.Pointer(&raw.FileName))
name := syscall.UTF16ToString(buf[:raw.FileNameLength/2])
fullname := watch.path + "\\" + name
var mask uint64
switch raw.Action {
case syscall.FILE_ACTION_REMOVED:
mask = sys_FS_DELETE_SELF
case syscall.FILE_ACTION_MODIFIED:
mask = sys_FS_MODIFY
case syscall.FILE_ACTION_RENAMED_OLD_NAME:
watch.rename = name
case syscall.FILE_ACTION_RENAMED_NEW_NAME:
if watch.names[watch.rename] != 0 {
watch.names[name] |= watch.names[watch.rename]
delete(watch.names, watch.rename)
mask = sys_FS_MOVE_SELF
}
}
sendNameEvent := func() {
if w.sendEvent(fullname, watch.names[name]&mask) {
if watch.names[name]&sys_FS_ONESHOT != 0 {
delete(watch.names, name)
}
}
}
if raw.Action != syscall.FILE_ACTION_RENAMED_NEW_NAME {
sendNameEvent()
}
if raw.Action == syscall.FILE_ACTION_REMOVED {
w.sendEvent(fullname, watch.names[name]&sys_FS_IGNORED)
delete(watch.names, name)
}
if w.sendEvent(fullname, watch.mask&toFSnotifyFlags(raw.Action)) {
if watch.mask&sys_FS_ONESHOT != 0 {
watch.mask = 0
}
}
if raw.Action == syscall.FILE_ACTION_RENAMED_NEW_NAME {
fullname = watch.path + "\\" + watch.rename
sendNameEvent()
}
// Move to the next event in the buffer
if raw.NextEntryOffset == 0 {
break
}
offset += raw.NextEntryOffset
// Error!
if offset >= n {
w.Error <- errors.New("Windows system assumed buffer larger than it is, events have likely been missed.")
break
}
}
if err := w.startRead(watch); err != nil {
w.Error <- err
}
}
}
func (w *Watcher) sendEvent(name string, mask uint64) bool {
if mask == 0 {
return false
}
event := &FileEvent{mask: uint32(mask), Name: name}
if mask&sys_FS_MOVE != 0 {
if mask&sys_FS_MOVED_FROM != 0 {
w.cookie++
}
event.cookie = w.cookie
}
select {
case ch := <-w.quit:
w.quit <- ch
case w.Event <- event:
}
return true
}
func toWindowsFlags(mask uint64) uint32 {
var m uint32
if mask&sys_FS_ACCESS != 0 {
m |= syscall.FILE_NOTIFY_CHANGE_LAST_ACCESS
}
if mask&sys_FS_MODIFY != 0 {
m |= syscall.FILE_NOTIFY_CHANGE_LAST_WRITE
}
if mask&sys_FS_ATTRIB != 0 {
m |= syscall.FILE_NOTIFY_CHANGE_ATTRIBUTES
}
if mask&(sys_FS_MOVE|sys_FS_CREATE|sys_FS_DELETE) != 0 {
m |= syscall.FILE_NOTIFY_CHANGE_FILE_NAME | syscall.FILE_NOTIFY_CHANGE_DIR_NAME
}
return m
}
func toFSnotifyFlags(action uint32) uint64 {
switch action {
case syscall.FILE_ACTION_ADDED:
return sys_FS_CREATE
case syscall.FILE_ACTION_REMOVED:
return sys_FS_DELETE
case syscall.FILE_ACTION_MODIFIED:
return sys_FS_MODIFY
case syscall.FILE_ACTION_RENAMED_OLD_NAME:
return sys_FS_MOVED_FROM
case syscall.FILE_ACTION_RENAMED_NEW_NAME:
return sys_FS_MOVED_TO
}
return 0
}

View File

@ -1,29 +0,0 @@
tomb - support for clean goroutine termination in Go.
Copyright (c) 2010-2011 - Gustavo Niemeyer <gustavo@niemeyer.net>
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
* Redistributions of source code must retain the above copyright notice,
this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above copyright notice,
this list of conditions and the following disclaimer in the documentation
and/or other materials provided with the distribution.
* Neither the name of the copyright holder nor the names of its
contributors may be used to endorse or promote products derived from
this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

View File

@ -1,4 +0,0 @@
Installation and usage
----------------------
See [gopkg.in/tomb.v1](https://gopkg.in/tomb.v1) for documentation and usage details.

View File

@ -1,176 +0,0 @@
// Copyright (c) 2011 - Gustavo Niemeyer <gustavo@niemeyer.net>
//
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are met:
//
// * Redistributions of source code must retain the above copyright notice,
// this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above copyright notice,
// this list of conditions and the following disclaimer in the documentation
// and/or other materials provided with the distribution.
// * Neither the name of the copyright holder nor the names of its
// contributors may be used to endorse or promote products derived from
// this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
// The tomb package offers a conventional API for clean goroutine termination.
//
// A Tomb tracks the lifecycle of a goroutine as alive, dying or dead,
// and the reason for its death.
//
// The zero value of a Tomb assumes that a goroutine is about to be
// created or already alive. Once Kill or Killf is called with an
// argument that informs the reason for death, the goroutine is in
// a dying state and is expected to terminate soon. Right before the
// goroutine function or method returns, Done must be called to inform
// that the goroutine is indeed dead and about to stop running.
//
// A Tomb exposes Dying and Dead channels. These channels are closed
// when the Tomb state changes in the respective way. They enable
// explicit blocking until the state changes, and also to selectively
// unblock select statements accordingly.
//
// When the tomb state changes to dying and there's still logic going
// on within the goroutine, nested functions and methods may choose to
// return ErrDying as their error value, as this error won't alter the
// tomb state if provided to the Kill method. This is a convenient way to
// follow standard Go practices in the context of a dying tomb.
//
// For background and a detailed example, see the following blog post:
//
// http://blog.labix.org/2011/10/09/death-of-goroutines-under-control
//
// For a more complex code snippet demonstrating the use of multiple
// goroutines with a single Tomb, see:
//
// http://play.golang.org/p/Xh7qWsDPZP
//
package tomb
import (
"errors"
"fmt"
"sync"
)
// A Tomb tracks the lifecycle of a goroutine as alive, dying or dead,
// and the reason for its death.
//
// See the package documentation for details.
type Tomb struct {
m sync.Mutex
dying chan struct{}
dead chan struct{}
reason error
}
var (
ErrStillAlive = errors.New("tomb: still alive")
ErrDying = errors.New("tomb: dying")
)
func (t *Tomb) init() {
t.m.Lock()
if t.dead == nil {
t.dead = make(chan struct{})
t.dying = make(chan struct{})
t.reason = ErrStillAlive
}
t.m.Unlock()
}
// Dead returns the channel that can be used to wait
// until t.Done has been called.
func (t *Tomb) Dead() <-chan struct{} {
t.init()
return t.dead
}
// Dying returns the channel that can be used to wait
// until t.Kill or t.Done has been called.
func (t *Tomb) Dying() <-chan struct{} {
t.init()
return t.dying
}
// Wait blocks until the goroutine is in a dead state and returns the
// reason for its death.
func (t *Tomb) Wait() error {
t.init()
<-t.dead
t.m.Lock()
reason := t.reason
t.m.Unlock()
return reason
}
// Done flags the goroutine as dead, and should be called a single time
// right before the goroutine function or method returns.
// If the goroutine was not already in a dying state before Done is
// called, it will be flagged as dying and dead at once with no
// error.
func (t *Tomb) Done() {
t.Kill(nil)
close(t.dead)
}
// Kill flags the goroutine as dying for the given reason.
// Kill may be called multiple times, but only the first
// non-nil error is recorded as the reason for termination.
//
// If reason is ErrDying, the previous reason isn't replaced
// even if it is nil. It's a runtime error to call Kill with
// ErrDying if t is not in a dying state.
func (t *Tomb) Kill(reason error) {
t.init()
t.m.Lock()
defer t.m.Unlock()
if reason == ErrDying {
if t.reason == ErrStillAlive {
panic("tomb: Kill with ErrDying while still alive")
}
return
}
if t.reason == nil || t.reason == ErrStillAlive {
t.reason = reason
}
// If the receive on t.dying succeeds, then
// it can only be because we have already closed it.
// If it blocks, then we know that it needs to be closed.
select {
case <-t.dying:
default:
close(t.dying)
}
}
// Killf works like Kill, but builds the reason providing the received
// arguments to fmt.Errorf. The generated error is also returned.
func (t *Tomb) Killf(f string, a ...interface{}) error {
err := fmt.Errorf(f, a...)
t.Kill(err)
return err
}
// Err returns the reason for the goroutine death provided via Kill
// or Killf, or ErrStillAlive when the goroutine is still alive.
func (t *Tomb) Err() (reason error) {
t.init()
t.m.Lock()
reason = t.reason
t.m.Unlock()
return
}

View File

@ -1,114 +0,0 @@
package tomb_test
import (
"errors"
"github.com/ipfs/go-ipfs/Godeps/_workspace/src/gopkg.in/tomb.v1"
"reflect"
"testing"
)
func TestNewTomb(t *testing.T) {
tb := &tomb.Tomb{}
testState(t, tb, false, false, tomb.ErrStillAlive)
tb.Done()
testState(t, tb, true, true, nil)
}
func TestKill(t *testing.T) {
// a nil reason flags the goroutine as dying
tb := &tomb.Tomb{}
tb.Kill(nil)
testState(t, tb, true, false, nil)
// a non-nil reason now will override Kill
err := errors.New("some error")
tb.Kill(err)
testState(t, tb, true, false, err)
// another non-nil reason won't replace the first one
tb.Kill(errors.New("ignore me"))
testState(t, tb, true, false, err)
tb.Done()
testState(t, tb, true, true, err)
}
func TestKillf(t *testing.T) {
tb := &tomb.Tomb{}
err := tb.Killf("BO%s", "OM")
if s := err.Error(); s != "BOOM" {
t.Fatalf(`Killf("BO%s", "OM"): want "BOOM", got %q`, s)
}
testState(t, tb, true, false, err)
// another non-nil reason won't replace the first one
tb.Killf("ignore me")
testState(t, tb, true, false, err)
tb.Done()
testState(t, tb, true, true, err)
}
func TestErrDying(t *testing.T) {
// ErrDying being used properly, after a clean death.
tb := &tomb.Tomb{}
tb.Kill(nil)
tb.Kill(tomb.ErrDying)
testState(t, tb, true, false, nil)
// ErrDying being used properly, after an errorful death.
err := errors.New("some error")
tb.Kill(err)
tb.Kill(tomb.ErrDying)
testState(t, tb, true, false, err)
// ErrDying being used badly, with an alive tomb.
tb = &tomb.Tomb{}
defer func() {
err := recover()
if err != "tomb: Kill with ErrDying while still alive" {
t.Fatalf("Wrong panic on Kill(ErrDying): %v", err)
}
testState(t, tb, false, false, tomb.ErrStillAlive)
}()
tb.Kill(tomb.ErrDying)
}
func testState(t *testing.T, tb *tomb.Tomb, wantDying, wantDead bool, wantErr error) {
select {
case <-tb.Dying():
if !wantDying {
t.Error("<-Dying: should block")
}
default:
if wantDying {
t.Error("<-Dying: should not block")
}
}
seemsDead := false
select {
case <-tb.Dead():
if !wantDead {
t.Error("<-Dead: should block")
}
seemsDead = true
default:
if wantDead {
t.Error("<-Dead: should not block")
}
}
if err := tb.Err(); err != wantErr {
t.Errorf("Err: want %#v, got %#v", wantErr, err)
}
if wantDead && seemsDead {
waitErr := tb.Wait()
switch {
case waitErr == tomb.ErrStillAlive:
t.Errorf("Wait should not return ErrStillAlive")
case !reflect.DeepEqual(waitErr, wantErr):
t.Errorf("Wait: want %#v, got %#v", wantErr, waitErr)
}
}
}

View File

@ -10,6 +10,7 @@ import (
"strings" "strings"
cors "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/rs/cors" cors "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/rs/cors"
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
cmds "github.com/ipfs/go-ipfs/commands" cmds "github.com/ipfs/go-ipfs/commands"
u "github.com/ipfs/go-ipfs/util" u "github.com/ipfs/go-ipfs/util"
@ -150,7 +151,11 @@ func (i internalHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
//ps: take note of the name clash - commands.Context != context.Context //ps: take note of the name clash - commands.Context != context.Context
req.SetInvocContext(i.ctx) req.SetInvocContext(i.ctx)
err = req.SetRootContext(node.Context())
ctx, cancel := context.WithCancel(node.Context())
defer cancel()
err = req.SetRootContext(ctx)
if err != nil { if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError) http.Error(w, err.Error(), http.StatusInternalServerError)
return return
@ -246,7 +251,12 @@ func sendResponse(w http.ResponseWriter, r *http.Request, res cmds.Response, req
} }
if err := writeResponse(status, w, out); err != nil { if err := writeResponse(status, w, out); err != nil {
log.Error("error while writing stream", err) if strings.Contains(err.Error(), "broken pipe") {
log.Info("client disconnect while writing stream ", err)
return
}
log.Error("error while writing stream ", err)
} }
} }

View File

@ -3,12 +3,10 @@ package commands
import ( import (
"fmt" "fmt"
"io" "io"
"strings"
cmds "github.com/ipfs/go-ipfs/commands" cmds "github.com/ipfs/go-ipfs/commands"
"github.com/ipfs/go-ipfs/thirdparty/eventlog"
u "github.com/ipfs/go-ipfs/util" u "github.com/ipfs/go-ipfs/util"
tail "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/ActiveState/tail"
) )
// Golang os.Args overrides * and replaces the character argument with // Golang os.Args overrides * and replaces the character argument with
@ -80,61 +78,12 @@ var logTailCmd = &cmds.Command{
}, },
Run: func(req cmds.Request, res cmds.Response) { Run: func(req cmds.Request, res cmds.Response) {
path := fmt.Sprintf("%s/logs/events.log", req.InvocContext().ConfigRoot) r, w := io.Pipe()
eventlog.WriterGroup.AddWriter(w)
outChan := make(chan interface{})
go func() { go func() {
defer close(outChan) <-req.Context().Done()
w.Close()
t, err := tail.TailFile(path, tail.Config{
Location: &tail.SeekInfo{0, 2},
Follow: true,
MustExist: true,
Logger: tail.DiscardingLogger,
})
if err != nil {
fmt.Println(err.Error())
return
}
defer t.Stop()
done := req.Context().Done()
for line := range t.Lines {
// return when context closes
select {
case <-done:
return
default:
}
if line.Err != nil {
fmt.Println(err.Error())
return
}
// TODO: unpack the line text into a struct and output that
outChan <- &MessageOutput{line.Text}
}
}() }()
res.SetOutput(r)
res.SetOutput((<-chan interface{})(outChan))
}, },
Marshalers: cmds.MarshalerMap{
cmds.Text: func(res cmds.Response) (io.Reader, error) {
outChan, ok := res.Output().(<-chan interface{})
if !ok {
return nil, u.ErrCast()
}
return &cmds.ChannelMarshaler{
Channel: outChan,
Marshaler: func(v interface{}) (io.Reader, error) {
output := v.(*MessageOutput)
return strings.NewReader(output.Message + "\n"), nil
},
}, nil
},
},
Type: MessageOutput{},
} }