mirror of
https://github.com/owncast/owncast.git
synced 2025-11-01 19:32:20 +08:00
⚡ execute webhooks in parallel (and 🐛 update webhooks.last_used correctly) (#1505)
* ⚡ execute webhooks in parallel * 🐛 update the webhooks.last_used column correctly
This commit is contained in:
@ -102,15 +102,15 @@ func GetWebhooksForEvent(event models.EventType) []models.Webhook {
|
|||||||
webhooks := make([]models.Webhook, 0)
|
webhooks := make([]models.Webhook, 0)
|
||||||
|
|
||||||
var query = `SELECT * FROM (
|
var query = `SELECT * FROM (
|
||||||
WITH RECURSIVE split(url, event, rest) AS (
|
WITH RECURSIVE split(id, url, event, rest) AS (
|
||||||
SELECT url, '', events || ',' FROM webhooks
|
SELECT id, url, '', events || ',' FROM webhooks
|
||||||
UNION ALL
|
UNION ALL
|
||||||
SELECT url,
|
SELECT id, url,
|
||||||
substr(rest, 0, instr(rest, ',')),
|
substr(rest, 0, instr(rest, ',')),
|
||||||
substr(rest, instr(rest, ',')+1)
|
substr(rest, instr(rest, ',')+1)
|
||||||
FROM split
|
FROM split
|
||||||
WHERE rest <> '')
|
WHERE rest <> '')
|
||||||
SELECT url, event
|
SELECT id, url, event
|
||||||
FROM split
|
FROM split
|
||||||
WHERE event <> ''
|
WHERE event <> ''
|
||||||
) AS webhook WHERE event IS "` + event + `"`
|
) AS webhook WHERE event IS "` + event + `"`
|
||||||
@ -123,15 +123,17 @@ func GetWebhooksForEvent(event models.EventType) []models.Webhook {
|
|||||||
defer rows.Close()
|
defer rows.Close()
|
||||||
|
|
||||||
for rows.Next() {
|
for rows.Next() {
|
||||||
|
var id int
|
||||||
var url string
|
var url string
|
||||||
|
|
||||||
if err := rows.Scan(&url, &event); err != nil {
|
if err := rows.Scan(&id, &url, &event); err != nil {
|
||||||
log.Debugln(err)
|
log.Debugln(err)
|
||||||
log.Error("There is a problem with the database.")
|
log.Error("There is a problem with the database.")
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
|
||||||
singleWebhook := models.Webhook{
|
singleWebhook := models.Webhook{
|
||||||
|
ID: id,
|
||||||
URL: url,
|
URL: url,
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -194,7 +196,7 @@ func GetWebhooks() ([]models.Webhook, error) { //nolint
|
|||||||
}
|
}
|
||||||
|
|
||||||
// SetWebhookAsUsed will update the last used time for a webhook.
|
// SetWebhookAsUsed will update the last used time for a webhook.
|
||||||
func SetWebhookAsUsed(id string) error {
|
func SetWebhookAsUsed(webhook models.Webhook) error {
|
||||||
tx, err := _db.Begin()
|
tx, err := _db.Begin()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
@ -206,7 +208,7 @@ func SetWebhookAsUsed(id string) error {
|
|||||||
}
|
}
|
||||||
defer stmt.Close()
|
defer stmt.Close()
|
||||||
|
|
||||||
if _, err := stmt.Exec(id); err != nil {
|
if _, err := stmt.Exec(webhook.ID); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -36,20 +36,22 @@ func SendEventToWebhooks(payload WebhookEvent) {
|
|||||||
webhooks := data.GetWebhooksForEvent(payload.Type)
|
webhooks := data.GetWebhooksForEvent(payload.Type)
|
||||||
|
|
||||||
for _, webhook := range webhooks {
|
for _, webhook := range webhooks {
|
||||||
log.Debugf("Event %s sent to Webhook %s", payload.Type, webhook.URL)
|
go func(webhook models.Webhook, payload WebhookEvent) {
|
||||||
if err := sendWebhook(webhook.URL, payload); err != nil {
|
log.Debugf("Event %s sent to Webhook %s", payload.Type, webhook.URL)
|
||||||
log.Errorf("Event: %s failed to send to webhook: %s Error: %s", payload.Type, webhook.URL, err)
|
if err := sendWebhook(webhook, payload); err != nil {
|
||||||
}
|
log.Errorf("Event: %s failed to send to webhook: %s Error: %s", payload.Type, webhook.URL, err)
|
||||||
|
}
|
||||||
|
}(webhook, payload)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func sendWebhook(url string, payload WebhookEvent) error {
|
func sendWebhook(webhook models.Webhook, payload WebhookEvent) error {
|
||||||
jsonText, err := json.Marshal(payload)
|
jsonText, err := json.Marshal(payload)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
req, err := http.NewRequest("POST", url, bytes.NewReader(jsonText))
|
req, err := http.NewRequest("POST", webhook.URL, bytes.NewReader(jsonText))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -65,7 +67,7 @@ func sendWebhook(url string, payload WebhookEvent) error {
|
|||||||
|
|
||||||
defer resp.Body.Close()
|
defer resp.Body.Close()
|
||||||
|
|
||||||
if err := data.SetWebhookAsUsed(url); err != nil {
|
if err := data.SetWebhookAsUsed(webhook); err != nil {
|
||||||
log.Warnln(err)
|
log.Warnln(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user