1
0
mirror of https://github.com/ipfs/kubo.git synced 2025-06-20 02:21:48 +08:00

fix log hanging issue, and implement close-notify for commands

License: MIT
Signed-off-by: Jeromy <jeromyj@gmail.com>
This commit is contained in:
Jeromy
2015-10-29 21:26:12 -07:00
parent 94bdce63a7
commit 2f5563b3c0
20 changed files with 23 additions and 710 deletions

4
Godeps/Godeps.json generated
View File

@ -144,10 +144,6 @@
"ImportPath": "github.com/inconshreveable/go-update",
"Rev": "68f5725818189545231c1fd8694793d45f2fc529"
},
{
"ImportPath": "github.com/ipfs/go-log",
"Rev": "bf32e06c2f9c81eb33460bc08305aa946f0d893d"
},
{
"ImportPath": "github.com/jackpal/go-nat-pmp",
"Rev": "a45aa3d54aef73b504e15eb71bea0e5565b5e6e1"

View File

@ -1 +0,0 @@
QmTBXYb6y2ZcJmoXVKk3pf9rzSEjbCg7tQaJW7RSuH14nv

View File

@ -1,38 +0,0 @@
package log
import (
"errors"
"github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
)
type key int
const metadataKey key = 0
// ContextWithLoggable returns a derived context which contains the provided
// Loggable. Any Events logged with the derived context will include the
// provided Loggable.
func ContextWithLoggable(ctx context.Context, l Loggable) context.Context {
existing, err := MetadataFromContext(ctx)
if err != nil {
// context does not contain meta. just set the new metadata
child := context.WithValue(ctx, metadataKey, Metadata(l.Loggable()))
return child
}
merged := DeepMerge(existing, l.Loggable())
child := context.WithValue(ctx, metadataKey, merged)
return child
}
func MetadataFromContext(ctx context.Context) (Metadata, error) {
value := ctx.Value(metadataKey)
if value != nil {
metadata, ok := value.(Metadata)
if ok {
return metadata, nil
}
}
return nil, errors.New("context contains no metadata")
}

View File

@ -1,44 +0,0 @@
package log
import (
"testing"
"github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
)
func TestContextContainsMetadata(t *testing.T) {
t.Parallel()
m := Metadata{"foo": "bar"}
ctx := ContextWithLoggable(context.Background(), m)
got, err := MetadataFromContext(ctx)
if err != nil {
t.Fatal(err)
}
_, exists := got["foo"]
if !exists {
t.Fail()
}
}
func TestContextWithPreexistingMetadata(t *testing.T) {
t.Parallel()
ctx := ContextWithLoggable(context.Background(), Metadata{"hello": "world"})
ctx = ContextWithLoggable(ctx, Metadata{"goodbye": "earth"})
got, err := MetadataFromContext(ctx)
if err != nil {
t.Fatal(err)
}
_, exists := got["hello"]
if !exists {
t.Fatal("original key not present")
}
_, exists = got["goodbye"]
if !exists {
t.Fatal("new key not present")
}
}

View File

@ -1,7 +0,0 @@
package log
type entry struct {
loggables []Loggable
system string
event string
}

View File

@ -1,16 +0,0 @@
package log
import "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
func ExampleEventLogger() {
{
log := EventLogger(nil)
e := log.EventBegin(context.Background(), "dial")
e.Done()
}
{
log := EventLogger(nil)
e := log.EventBegin(context.Background(), "dial")
_ = e.Close() // implements io.Closer for convenience
}
}

View File

@ -1,170 +0,0 @@
package log
import (
"encoding/json"
"fmt"
"time"
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
)
// StandardLogger provides API compatibility with standard printf loggers
// eg. go-logging
type StandardLogger interface {
Debug(args ...interface{})
Debugf(format string, args ...interface{})
Error(args ...interface{})
Errorf(format string, args ...interface{})
Fatal(args ...interface{})
Fatalf(format string, args ...interface{})
Info(args ...interface{})
Infof(format string, args ...interface{})
Panic(args ...interface{})
Panicf(format string, args ...interface{})
Warning(args ...interface{})
Warningf(format string, args ...interface{})
}
// EventLogger extends the StandardLogger interface to allow for log items
// containing structured metadata
type EventLogger interface {
StandardLogger
// Event merges structured data from the provided inputs into a single
// machine-readable log event.
//
// If the context contains metadata, a copy of this is used as the base
// metadata accumulator.
//
// If one or more loggable objects are provided, these are deep-merged into base blob.
//
// Next, the event name is added to the blob under the key "event". If
// the key "event" already exists, it will be over-written.
//
// Finally the timestamp and package name are added to the accumulator and
// the metadata is logged.
Event(ctx context.Context, event string, m ...Loggable)
EventBegin(ctx context.Context, event string, m ...Loggable) *EventInProgress
}
// Logger retrieves an event logger by name
func Logger(system string) EventLogger {
// TODO if we would like to adjust log levels at run-time. Store this event
// logger in a map (just like the util.Logger impl)
if len(system) == 0 {
setuplog := getLogger("setup-logger")
setuplog.Warning("Missing name parameter")
system = "undefined"
}
logger := getLogger(system)
return &eventLogger{system: system, StandardLogger: logger}
}
// eventLogger implements the EventLogger and wraps a go-logging Logger
type eventLogger struct {
StandardLogger
system string
// TODO add log-level
}
func (el *eventLogger) EventBegin(ctx context.Context, event string, metadata ...Loggable) *EventInProgress {
start := time.Now()
el.Event(ctx, fmt.Sprintf("%sBegin", event), metadata...)
eip := &EventInProgress{}
eip.doneFunc = func(additional []Loggable) {
metadata = append(metadata, additional...) // anything added during the operation
metadata = append(metadata, LoggableMap(map[string]interface{}{ // finally, duration of event
"duration": time.Now().Sub(start),
}))
el.Event(ctx, event, metadata...)
}
return eip
}
func (el *eventLogger) Event(ctx context.Context, event string, metadata ...Loggable) {
// short circuit if theres nothing to write to
if !WriterGroup.Active() {
return
}
// Collect loggables for later logging
var loggables []Loggable
// get any existing metadata from the context
existing, err := MetadataFromContext(ctx)
if err != nil {
existing = Metadata{}
}
loggables = append(loggables, existing)
for _, datum := range metadata {
loggables = append(loggables, datum)
}
e := entry{
loggables: loggables,
system: el.system,
event: event,
}
accum := Metadata{}
for _, loggable := range e.loggables {
accum = DeepMerge(accum, loggable.Loggable())
}
// apply final attributes to reserved keys
// TODO accum["level"] = level
accum["event"] = e.event
accum["system"] = e.system
accum["time"] = FormatRFC3339(time.Now())
out, err := json.Marshal(accum)
if err != nil {
el.Errorf("ERROR FORMATTING EVENT ENTRY: %s", err)
return
}
WriterGroup.Write(append(out, '\n'))
}
type EventInProgress struct {
loggables []Loggable
doneFunc func([]Loggable)
}
// Append adds loggables to be included in the call to Done
func (eip *EventInProgress) Append(l Loggable) {
eip.loggables = append(eip.loggables, l)
}
// SetError includes the provided error
func (eip *EventInProgress) SetError(err error) {
eip.loggables = append(eip.loggables, LoggableMap{
"error": err.Error(),
})
}
// Done creates a new Event entry that includes the duration and appended
// loggables.
func (eip *EventInProgress) Done() {
eip.doneFunc(eip.loggables) // create final event with extra data
}
// Close is an alias for done
func (eip *EventInProgress) Close() error {
eip.Done()
return nil
}
func FormatRFC3339(t time.Time) string {
return t.UTC().Format(time.RFC3339Nano)
}

View File

@ -1,34 +0,0 @@
package log
// Loggable describes objects that can be marshalled into Metadata for logging
type Loggable interface {
Loggable() map[string]interface{}
}
type LoggableMap map[string]interface{}
func (l LoggableMap) Loggable() map[string]interface{} {
return l
}
// LoggableF converts a func into a Loggable
type LoggableF func() map[string]interface{}
func (l LoggableF) Loggable() map[string]interface{} {
return l()
}
func Deferred(key string, f func() string) Loggable {
function := func() map[string]interface{} {
return map[string]interface{}{
key: f(),
}
}
return LoggableF(function)
}
func Pair(key string, l Loggable) Loggable {
return LoggableMap{
key: l,
}
}

View File

@ -1,82 +0,0 @@
package log
import (
"encoding/json"
"errors"
"reflect"
"github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/satori/go.uuid"
)
// Metadata is a convenience type for generic maps
type Metadata map[string]interface{}
// Uuid returns a Metadata with the string key and UUID value
func Uuid(key string) Metadata {
return Metadata{
key: uuid.NewV4().String(),
}
}
// DeepMerge merges the second Metadata parameter into the first.
// Nested Metadata are merged recursively. Primitives are over-written.
func DeepMerge(b, a Metadata) Metadata {
out := Metadata{}
for k, v := range b {
out[k] = v
}
for k, v := range a {
maybe, err := Metadatify(v)
if err != nil {
// if the new value is not meta. just overwrite the dest vaue
out[k] = v
continue
}
// it is meta. What about dest?
outv, exists := out[k]
if !exists {
// the new value is meta, but there's no dest value. just write it
out[k] = v
continue
}
outMetadataValue, err := Metadatify(outv)
if err != nil {
// the new value is meta and there's a dest value, but the dest
// value isn't meta. just overwrite
out[k] = v
continue
}
// both are meta. merge them.
out[k] = DeepMerge(outMetadataValue, maybe)
}
return out
}
// Loggable implements the Loggable interface
func (m Metadata) Loggable() map[string]interface{} {
// NB: method defined on value to avoid de-referencing nil Metadata
return m
}
func (m Metadata) JsonString() (string, error) {
// NB: method defined on value
b, err := json.Marshal(m)
return string(b), err
}
// Metadatify converts maps into Metadata
func Metadatify(i interface{}) (Metadata, error) {
value := reflect.ValueOf(i)
if value.Kind() == reflect.Map {
m := map[string]interface{}{}
for _, k := range value.MapKeys() {
m[k.String()] = value.MapIndex(k).Interface()
}
return Metadata(m), nil
}
return nil, errors.New("is not a map")
}

View File

@ -1,50 +0,0 @@
package log
import "testing"
func TestOverwrite(t *testing.T) {
t.Parallel()
under := Metadata{
"a": Metadata{
"b": Metadata{
"c": Metadata{
"d": "the original value",
"other": "SURVIVE",
},
},
},
}
over := Metadata{
"a": Metadata{
"b": Metadata{
"c": Metadata{
"d": "a new value",
},
},
},
}
out := DeepMerge(under, over)
dval := out["a"].(Metadata)["b"].(Metadata)["c"].(Metadata)["d"].(string)
if dval != "a new value" {
t.Fatal(dval)
}
surv := out["a"].(Metadata)["b"].(Metadata)["c"].(Metadata)["other"].(string)
if surv != "SURVIVE" {
t.Fatal(surv)
}
}
func TestMarshalJSON(t *testing.T) {
t.Parallel()
bs, _ := Metadata{"a": "b"}.JsonString()
t.Log(bs)
}
func TestMetadataIsLoggable(t *testing.T) {
t.Parallel()
func(l Loggable) {
}(Metadata{})
}

View File

@ -1,104 +0,0 @@
package log
import (
"errors"
"os"
logging "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/whyrusleeping/go-logging"
)
func init() {
SetupLogging()
}
var ansiGray = "\033[0;37m"
var ansiBlue = "\033[0;34m"
var LogFormats = map[string]string{
"nocolor": "%{time:2006-01-02 15:04:05.000000} %{level} %{module} %{shortfile}: %{message}",
"color": ansiGray + "%{time:15:04:05.000} %{color}%{level:5.5s} " + ansiBlue +
"%{module:10.10s}: %{color:reset}%{message} " + ansiGray + "%{shortfile}%{color:reset}",
}
var defaultLogFormat = "color"
// Logging environment variables
const (
envLogging = "IPFS_LOGGING"
envLoggingFmt = "IPFS_LOGGING_FMT"
)
// ErrNoSuchLogger is returned when the util pkg is asked for a non existant logger
var ErrNoSuchLogger = errors.New("Error: No such logger")
// loggers is the set of loggers in the system
var loggers = map[string]*logging.Logger{}
// SetupLogging will initialize the logger backend and set the flags.
func SetupLogging() {
fmt := LogFormats[os.Getenv(envLoggingFmt)]
if fmt == "" {
fmt = LogFormats[defaultLogFormat]
}
backend := logging.NewLogBackend(os.Stderr, "", 0)
logging.SetBackend(backend)
logging.SetFormatter(logging.MustStringFormatter(fmt))
lvl := logging.ERROR
if logenv := os.Getenv(envLogging); logenv != "" {
var err error
lvl, err = logging.LogLevel(logenv)
if err != nil {
}
}
SetAllLoggers(lvl)
}
// SetDebugLogging calls SetAllLoggers with logging.DEBUG
func SetDebugLogging() {
SetAllLoggers(logging.DEBUG)
}
// SetAllLoggers changes the logging.Level of all loggers to lvl
func SetAllLoggers(lvl logging.Level) {
logging.SetLevel(lvl, "")
for n := range loggers {
logging.SetLevel(lvl, n)
}
}
// SetLogLevel changes the log level of a specific subsystem
// name=="*" changes all subsystems
func SetLogLevel(name, level string) error {
lvl, err := logging.LogLevel(level)
if err != nil {
return err
}
// wildcard, change all
if name == "*" {
SetAllLoggers(lvl)
return nil
}
// Check if we have a logger by that name
if _, ok := loggers[name]; !ok {
return ErrNoSuchLogger
}
logging.SetLevel(lvl, name)
return nil
}
func getLogger(name string) *logging.Logger {
log := logging.MustGetLogger(name)
log.ExtraCalldepth = 1
loggers[name] = log
return log
}

View File

@ -1,62 +0,0 @@
package log
import (
"io"
"os"
logging "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/whyrusleeping/go-logging"
)
// init sets up sane defaults
func init() {
Configure(TextFormatter)
Configure(Output(os.Stderr))
// has the effect of disabling logging since we log event entries at Info
// level by convention
Configure(LevelError)
}
// Global writer group for logs to output to
var WriterGroup = new(MirrorWriter)
type Option func()
// Configure applies the provided options sequentially from left to right
func Configure(options ...Option) {
for _, f := range options {
f()
}
}
// LdJSONFormatter Option formats the event log as line-delimited JSON
var LdJSONFormatter = func() {
logging.SetFormatter(&PoliteJSONFormatter{})
}
// TextFormatter Option formats the event log as human-readable plain-text
var TextFormatter = func() {
logging.SetFormatter(logging.DefaultFormatter)
}
func Output(w io.Writer) Option {
return func() {
backend := logging.NewLogBackend(w, "", 0)
logging.SetBackend(backend)
// TODO return previous Output option
}
}
// LevelDebug Option sets the log level to debug
var LevelDebug = func() {
logging.SetLevel(logging.DEBUG, "")
}
// LevelError Option sets the log level to error
var LevelError = func() {
logging.SetLevel(logging.ERROR, "")
}
// LevelInfo Option sets the log level to info
var LevelInfo = func() {
logging.SetLevel(logging.INFO, "")
}

View File

@ -1,5 +0,0 @@
{
"name": "go-log",
"version": "1.0.0",
"language": "go"
}

View File

@ -1,28 +0,0 @@
package log
import (
"encoding/json"
"io"
logging "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/whyrusleeping/go-logging"
)
// PoliteJSONFormatter marshals entries into JSON encoded slices (without
// overwriting user-provided keys). How polite of it!
type PoliteJSONFormatter struct{}
func (f *PoliteJSONFormatter) Format(calldepth int, r *logging.Record, w io.Writer) error {
entry := make(map[string]interface{})
entry["id"] = r.Id
entry["level"] = r.Level
entry["time"] = r.Time
entry["module"] = r.Module
entry["message"] = r.Message()
err := json.NewEncoder(w).Encode(entry)
if err != nil {
return err
}
w.Write([]byte{'\n'})
return nil
}

View File

@ -1,50 +0,0 @@
package log
import (
"io"
"sync"
)
type MirrorWriter struct {
writers []io.Writer
lk sync.Mutex
}
func (mw *MirrorWriter) Write(b []byte) (int, error) {
mw.lk.Lock()
// write to all writers, and nil out the broken ones.
var dropped bool
for i, w := range mw.writers {
_, err := w.Write(b)
if err != nil {
mw.writers[i] = nil
dropped = true
}
}
// consolidate the slice
if dropped {
writers := mw.writers
mw.writers = nil
for _, w := range writers {
if w != nil {
mw.writers = append(mw.writers, w)
}
}
}
mw.lk.Unlock()
return len(b), nil
}
func (mw *MirrorWriter) AddWriter(w io.Writer) {
mw.lk.Lock()
mw.writers = append(mw.writers, w)
mw.lk.Unlock()
}
func (mw *MirrorWriter) Active() (active bool) {
mw.lk.Lock()
active = len(mw.writers) > 0
mw.lk.Unlock()
return
}

View File

@ -12,10 +12,10 @@ import (
"strings"
"time"
logging "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/ipfs/go-log"
"github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
"github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/query"
"github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-os-rename"
logging "github.com/ipfs/go-ipfs/vendor/QmQg1J6vikuXF9oDvm4wpdeAUvvkVEKW1EYDw9HhTMnP2b/go-log"
)
var log = logging.Logger("flatfs")

View File

@ -149,6 +149,15 @@ func (i internalHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
ctx, cancel := context.WithCancel(node.Context())
defer cancel()
if cn, ok := w.(http.CloseNotifier); ok {
go func() {
select {
case <-cn.CloseNotify():
case <-ctx.Done():
}
cancel()
}()
}
err = req.SetRootContext(ctx)
if err != nil {

View File

@ -77,12 +77,13 @@ var logTailCmd = &cmds.Command{
},
Run: func(req cmds.Request, res cmds.Response) {
ctx := req.Context()
r, w := io.Pipe()
logging.WriterGroup.AddWriter(w)
go func() {
<-req.Context().Done()
w.Close()
defer w.Close()
<-ctx.Done()
}()
logging.WriterGroup.AddWriter(w)
res.SetOutput(r)
},
}

View File

@ -14,7 +14,7 @@ type writeErrNotifier struct {
errs chan error
}
func newWriteErrNotifier(w io.Writer) (io.Writer, <-chan error) {
func newWriteErrNotifier(w io.Writer) (io.WriteCloser, <-chan error) {
ch := make(chan error, 1)
return &writeErrNotifier{
w: w,
@ -36,6 +36,14 @@ func (w *writeErrNotifier) Write(b []byte) (int, error) {
return n, err
}
func (w *writeErrNotifier) Close() error {
select {
case w.errs <- io.EOF:
default:
}
return nil
}
func LogOption() ServeOption {
return func(n *core.IpfsNode, _ net.Listener, mux *http.ServeMux) (*http.ServeMux, error) {
mux.HandleFunc("/logs", func(w http.ResponseWriter, r *http.Request) {

View File

@ -26,7 +26,6 @@ import (
u "github.com/ipfs/go-ipfs/util"
util "github.com/ipfs/go-ipfs/util"
ds2 "github.com/ipfs/go-ipfs/util/datastore2"
logging "github.com/ipfs/go-ipfs/vendor/QmQg1J6vikuXF9oDvm4wpdeAUvvkVEKW1EYDw9HhTMnP2b/go-log"
)
// version number that we are currently expecting to see
@ -159,9 +158,6 @@ func open(repoPath string) (repo.Repo, error) {
return nil, err
}
// setup eventlogger
configureEventLoggerAtRepoPath(r.config, r.path)
keepLocked = true
return r, nil
}
@ -401,12 +397,6 @@ func (r *FSRepo) openDatastore() error {
return nil
}
func configureEventLoggerAtRepoPath(c *config.Config, repoPath string) {
logging.Configure(logging.LevelInfo)
logging.Configure(logging.LdJSONFormatter)
logging.Configure(logging.Output(logging.WriterGroup))
}
// Close closes the FSRepo, releasing held resources.
func (r *FSRepo) Close() error {
packageLock.Lock()