diff --git a/pkg/api/org_invite.go b/pkg/api/org_invite.go index d6ab1c9d372..9f4f714af45 100644 --- a/pkg/api/org_invite.go +++ b/pkg/api/org_invite.go @@ -60,7 +60,9 @@ func AddOrgInvite(c *m.ReqContext, inviteDto dtos.AddInviteForm) Response { } // send invite email + c.Logger.Error("sending?") if inviteDto.SendEmail && util.IsEmail(inviteDto.LoginOrEmail) { + c.Logger.Error("yes sending?") emailCmd := m.SendEmailCommand{ To: []string{inviteDto.LoginOrEmail}, Template: "new_user_invite.html", diff --git a/pkg/cmd/grafana-server/server.go b/pkg/cmd/grafana-server/server.go index 1bf0e90915f..3d4f75978bd 100644 --- a/pkg/cmd/grafana-server/server.go +++ b/pkg/cmd/grafana-server/server.go @@ -26,16 +26,17 @@ import ( "github.com/grafana/grafana/pkg/login" "github.com/grafana/grafana/pkg/metrics" "github.com/grafana/grafana/pkg/plugins" - "github.com/grafana/grafana/pkg/services/notifications" "github.com/grafana/grafana/pkg/services/sqlstore" "github.com/grafana/grafana/pkg/setting" "github.com/grafana/grafana/pkg/social" "github.com/grafana/grafana/pkg/tracing" + // self registering services _ "github.com/grafana/grafana/pkg/extensions" _ "github.com/grafana/grafana/pkg/services/alerting" _ "github.com/grafana/grafana/pkg/services/cleanup" + _ "github.com/grafana/grafana/pkg/services/notifications" _ "github.com/grafana/grafana/pkg/services/search" ) @@ -56,9 +57,9 @@ type GrafanaServerImpl struct { shutdownFn context.CancelFunc childRoutines *errgroup.Group log log.Logger - RouteRegister api.RouteRegister `inject:""` - HttpServer *api.HTTPServer `inject:""` + RouteRegister api.RouteRegister `inject:""` + HttpServer *api.HTTPServer `inject:""` } func (g *GrafanaServerImpl) Start() error { @@ -89,10 +90,6 @@ func (g *GrafanaServerImpl) Start() error { } defer tracingCloser.Close() - if err = notifications.Init(); err != nil { - return fmt.Errorf("Notification service failed to initialize. error: %v", err) - } - serviceGraph := inject.Graph{} serviceGraph.Provide(&inject.Object{Value: bus.GetBus()}) serviceGraph.Provide(&inject.Object{Value: dashboards.NewProvisioningService()}) diff --git a/pkg/services/notifications/mailer.go b/pkg/services/notifications/mailer.go index 1bac5025244..37169661d73 100644 --- a/pkg/services/notifications/mailer.go +++ b/pkg/services/notifications/mailer.go @@ -11,44 +11,12 @@ import ( "html/template" "net" "strconv" - "strings" - "github.com/grafana/grafana/pkg/log" m "github.com/grafana/grafana/pkg/models" "github.com/grafana/grafana/pkg/setting" gomail "gopkg.in/mail.v2" ) -var mailQueue chan *Message - -func initMailQueue() { - mailQueue = make(chan *Message, 10) - go processMailQueue() -} - -func processMailQueue() { - for { - select { - case msg := <-mailQueue: - num, err := send(msg) - tos := strings.Join(msg.To, "; ") - info := "" - if err != nil { - if len(msg.Info) > 0 { - info = ", info: " + msg.Info - } - log.Error(4, fmt.Sprintf("Async sent email %d succeed, not send emails: %s%s err: %s", num, tos, info, err)) - } else { - log.Trace(fmt.Sprintf("Async sent email %d succeed, sent emails: %s%s", num, tos, info)) - } - } - } -} - -var addToMailQueue = func(msg *Message) { - mailQueue <- msg -} - func send(msg *Message) (int, error) { dialer, err := createDialer() if err != nil { diff --git a/pkg/services/notifications/notifications.go b/pkg/services/notifications/notifications.go index 25eb2b5936a..ad776057ad7 100644 --- a/pkg/services/notifications/notifications.go +++ b/pkg/services/notifications/notifications.go @@ -7,11 +7,13 @@ import ( "html/template" "net/url" "path/filepath" + "strings" "github.com/grafana/grafana/pkg/bus" "github.com/grafana/grafana/pkg/events" "github.com/grafana/grafana/pkg/log" m "github.com/grafana/grafana/pkg/models" + "github.com/grafana/grafana/pkg/registry" "github.com/grafana/grafana/pkg/setting" "github.com/grafana/grafana/pkg/util" ) @@ -21,20 +23,31 @@ var tmplResetPassword = "reset_password.html" var tmplSignUpStarted = "signup_started.html" var tmplWelcomeOnSignUp = "welcome_on_signup.html" -func Init() error { - initMailQueue() - initWebhookQueue() +func init() { + registry.RegisterService(&NotificationService{}) +} - bus.AddHandler("email", sendResetPasswordEmail) - bus.AddHandler("email", validateResetPasswordCode) - bus.AddHandler("email", sendEmailCommandHandler) +type NotificationService struct { + Bus bus.Bus `inject:""` + mailQueue chan *Message + webhookQueue chan *Webhook + log log.Logger +} - bus.AddCtxHandler("email", sendEmailCommandHandlerSync) +func (ns *NotificationService) Init() error { + ns.log = log.New("notifications") + ns.mailQueue = make(chan *Message, 10) + ns.webhookQueue = make(chan *Webhook, 10) - bus.AddCtxHandler("webhook", SendWebhookSync) + ns.Bus.AddHandler(ns.sendResetPasswordEmail) + ns.Bus.AddHandler(ns.validateResetPasswordCode) + ns.Bus.AddHandler(ns.sendEmailCommandHandler) - bus.AddEventListener(signUpStartedHandler) - bus.AddEventListener(signUpCompletedHandler) + ns.Bus.AddCtxHandler(ns.sendEmailCommandHandlerSync) + ns.Bus.AddCtxHandler(ns.SendWebhookSync) + + ns.Bus.AddEventListener(ns.signUpStartedHandler) + ns.Bus.AddEventListener(ns.signUpCompletedHandler) mailTemplates = template.New("name") mailTemplates.Funcs(template.FuncMap{ @@ -58,8 +71,37 @@ func Init() error { return nil } -func SendWebhookSync(ctx context.Context, cmd *m.SendWebhookSync) error { - return sendWebRequestSync(ctx, &Webhook{ +func (ns *NotificationService) Run(ctx context.Context) error { + for { + select { + case webhook := <-ns.webhookQueue: + err := ns.sendWebRequestSync(context.Background(), webhook) + + if err != nil { + ns.log.Error("Failed to send webrequest ", "error", err) + } + case msg := <-ns.mailQueue: + num, err := send(msg) + tos := strings.Join(msg.To, "; ") + info := "" + if err != nil { + if len(msg.Info) > 0 { + info = ", info: " + msg.Info + } + ns.log.Error(fmt.Sprintf("Async sent email %d succeed, not send emails: %s%s err: %s", num, tos, info, err)) + } else { + ns.log.Debug(fmt.Sprintf("Async sent email %d succeed, sent emails: %s%s", num, tos, info)) + } + case <-ctx.Done(): + return ctx.Err() + } + } + + return nil +} + +func (ns *NotificationService) SendWebhookSync(ctx context.Context, cmd *m.SendWebhookSync) error { + return ns.sendWebRequestSync(ctx, &Webhook{ Url: cmd.Url, User: cmd.User, Password: cmd.Password, @@ -74,7 +116,7 @@ func subjectTemplateFunc(obj map[string]interface{}, value string) string { return "" } -func sendEmailCommandHandlerSync(ctx context.Context, cmd *m.SendEmailCommandSync) error { +func (ns *NotificationService) sendEmailCommandHandlerSync(ctx context.Context, cmd *m.SendEmailCommandSync) error { message, err := buildEmailMessage(&m.SendEmailCommand{ Data: cmd.Data, Info: cmd.Info, @@ -89,24 +131,22 @@ func sendEmailCommandHandlerSync(ctx context.Context, cmd *m.SendEmailCommandSyn } _, err = send(message) - return err } -func sendEmailCommandHandler(cmd *m.SendEmailCommand) error { +func (ns *NotificationService) sendEmailCommandHandler(cmd *m.SendEmailCommand) error { message, err := buildEmailMessage(cmd) if err != nil { return err } - addToMailQueue(message) - + ns.mailQueue <- message return nil } -func sendResetPasswordEmail(cmd *m.SendResetPasswordEmailCommand) error { - return sendEmailCommandHandler(&m.SendEmailCommand{ +func (ns *NotificationService) sendResetPasswordEmail(cmd *m.SendResetPasswordEmailCommand) error { + return ns.sendEmailCommandHandler(&m.SendEmailCommand{ To: []string{cmd.User.Email}, Template: tmplResetPassword, Data: map[string]interface{}{ @@ -116,7 +156,7 @@ func sendResetPasswordEmail(cmd *m.SendResetPasswordEmailCommand) error { }) } -func validateResetPasswordCode(query *m.ValidateResetPasswordCodeQuery) error { +func (ns *NotificationService) validateResetPasswordCode(query *m.ValidateResetPasswordCodeQuery) error { login := getLoginForEmailCode(query.Code) if login == "" { return m.ErrInvalidEmailCode @@ -135,18 +175,18 @@ func validateResetPasswordCode(query *m.ValidateResetPasswordCodeQuery) error { return nil } -func signUpStartedHandler(evt *events.SignUpStarted) error { +func (ns *NotificationService) signUpStartedHandler(evt *events.SignUpStarted) error { if !setting.VerifyEmailEnabled { return nil } - log.Info("User signup started: %s", evt.Email) + ns.log.Info("User signup started", "email", evt.Email) if evt.Email == "" { return nil } - err := sendEmailCommandHandler(&m.SendEmailCommand{ + err := ns.sendEmailCommandHandler(&m.SendEmailCommand{ To: []string{evt.Email}, Template: tmplSignUpStarted, Data: map[string]interface{}{ @@ -155,6 +195,7 @@ func signUpStartedHandler(evt *events.SignUpStarted) error { "SignUpUrl": setting.ToAbsUrl(fmt.Sprintf("signup/?email=%s&code=%s", url.QueryEscape(evt.Email), url.QueryEscape(evt.Code))), }, }) + if err != nil { return err } @@ -163,12 +204,12 @@ func signUpStartedHandler(evt *events.SignUpStarted) error { return bus.Dispatch(&emailSentCmd) } -func signUpCompletedHandler(evt *events.SignUpCompleted) error { +func (ns *NotificationService) signUpCompletedHandler(evt *events.SignUpCompleted) error { if evt.Email == "" || !setting.Smtp.SendWelcomeEmailOnSignUp { return nil } - return sendEmailCommandHandler(&m.SendEmailCommand{ + return ns.sendEmailCommandHandler(&m.SendEmailCommand{ To: []string{evt.Email}, Template: tmplWelcomeOnSignUp, Data: map[string]interface{}{ diff --git a/pkg/services/notifications/notifications_test.go b/pkg/services/notifications/notifications_test.go index 3a5ff5fedb7..a86bd3b19ed 100644 --- a/pkg/services/notifications/notifications_test.go +++ b/pkg/services/notifications/notifications_test.go @@ -3,6 +3,7 @@ package notifications import ( "testing" + "github.com/grafana/grafana/pkg/bus" m "github.com/grafana/grafana/pkg/models" "github.com/grafana/grafana/pkg/setting" . "github.com/smartystreets/goconvey/convey" @@ -17,25 +18,23 @@ type testTriggeredAlert struct { func TestNotifications(t *testing.T) { Convey("Given the notifications service", t, func() { - //bus.ClearBusHandlers() - setting.StaticRootPath = "../../../public/" setting.Smtp.Enabled = true setting.Smtp.TemplatesPattern = "emails/*.html" setting.Smtp.FromAddress = "from@address.com" setting.Smtp.FromName = "Grafana Admin" - err := Init() + ns := &NotificationService{} + ns.Bus = bus.New() + + err := ns.Init() So(err, ShouldBeNil) - var sentMsg *Message - addToMailQueue = func(msg *Message) { - sentMsg = msg - } - Convey("When sending reset email password", func() { - err := sendResetPasswordEmail(&m.SendResetPasswordEmailCommand{User: &m.User{Email: "asd@asd.com"}}) + err := ns.sendResetPasswordEmail(&m.SendResetPasswordEmailCommand{User: &m.User{Email: "asd@asd.com"}}) So(err, ShouldBeNil) + + sentMsg := <-ns.mailQueue So(sentMsg.Body, ShouldContainSubstring, "body") So(sentMsg.Subject, ShouldEqual, "Reset your Grafana password - asd@asd.com") So(sentMsg.Body, ShouldNotContainSubstring, "Subject") diff --git a/pkg/services/notifications/send_email_integration_test.go b/pkg/services/notifications/send_email_integration_test.go index a9a5215d3ca..a9f37018a3a 100644 --- a/pkg/services/notifications/send_email_integration_test.go +++ b/pkg/services/notifications/send_email_integration_test.go @@ -12,8 +12,6 @@ import ( func TestEmailIntegrationTest(t *testing.T) { SkipConvey("Given the notifications service", t, func() { - bus.ClearBusHandlers() - setting.StaticRootPath = "../../../public/" setting.Smtp.Enabled = true setting.Smtp.TemplatesPattern = "emails/*.html" @@ -21,14 +19,11 @@ func TestEmailIntegrationTest(t *testing.T) { setting.Smtp.FromName = "Grafana Admin" setting.BuildVersion = "4.0.0" - err := Init() - So(err, ShouldBeNil) + ns := &NotificationService{} + ns.Bus = bus.New() - addToMailQueue = func(msg *Message) { - So(msg.From, ShouldEqual, "Grafana Admin ") - So(msg.To[0], ShouldEqual, "asdf@asdf.com") - ioutil.WriteFile("../../../tmp/test_email.html", []byte(msg.Body), 0777) - } + err := ns.Init() + So(err, ShouldBeNil) Convey("When sending reset email password", func() { cmd := &m.SendEmailCommand{ @@ -59,8 +54,13 @@ func TestEmailIntegrationTest(t *testing.T) { Template: "alert_notification.html", } - err := sendEmailCommandHandler(cmd) + err := ns.sendEmailCommandHandler(cmd) So(err, ShouldBeNil) + + sentMsg := <-ns.mailQueue + So(sentMsg.From, ShouldEqual, "Grafana Admin ") + So(sentMsg.To[0], ShouldEqual, "asdf@asdf.com") + ioutil.WriteFile("../../../tmp/test_email.html", []byte(sentMsg.Body), 0777) }) }) } diff --git a/pkg/services/notifications/webhook.go b/pkg/services/notifications/webhook.go index 0636a6adadc..01db2d56471 100644 --- a/pkg/services/notifications/webhook.go +++ b/pkg/services/notifications/webhook.go @@ -11,7 +11,6 @@ import ( "golang.org/x/net/context/ctxhttp" - "github.com/grafana/grafana/pkg/log" "github.com/grafana/grafana/pkg/util" ) @@ -37,32 +36,8 @@ var netClient = &http.Client{ Transport: netTransport, } -var ( - webhookQueue chan *Webhook - webhookLog log.Logger -) - -func initWebhookQueue() { - webhookLog = log.New("notifications.webhook") - webhookQueue = make(chan *Webhook, 10) - go processWebhookQueue() -} - -func processWebhookQueue() { - for { - select { - case webhook := <-webhookQueue: - err := sendWebRequestSync(context.Background(), webhook) - - if err != nil { - webhookLog.Error("Failed to send webrequest ", "error", err) - } - } - } -} - -func sendWebRequestSync(ctx context.Context, webhook *Webhook) error { - webhookLog.Debug("Sending webhook", "url", webhook.Url, "http method", webhook.HttpMethod) +func (ns *NotificationService) sendWebRequestSync(ctx context.Context, webhook *Webhook) error { + ns.log.Debug("Sending webhook", "url", webhook.Url, "http method", webhook.HttpMethod) if webhook.HttpMethod == "" { webhook.HttpMethod = http.MethodPost @@ -98,6 +73,6 @@ func sendWebRequestSync(ctx context.Context, webhook *Webhook) error { return err } - webhookLog.Debug("Webhook failed", "statuscode", resp.Status, "body", string(body)) + ns.log.Debug("Webhook failed", "statuscode", resp.Status, "body", string(body)) return fmt.Errorf("Webhook response status %v", resp.Status) }