feat(ap): add retryable activitypub outbound delivery client

This commit is contained in:
Gabe Kangas
2026-01-21 20:29:06 -08:00
parent 9f30aa37ce
commit 488ef058f0
5 changed files with 104 additions and 18 deletions

View File

@@ -11,6 +11,7 @@ import (
"github.com/owncast/owncast/activitypub/apmodels"
"github.com/owncast/owncast/activitypub/crypto"
"github.com/owncast/owncast/persistence/configrepository"
"github.com/owncast/owncast/utils"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
)
@@ -45,31 +46,37 @@ func Resolve(c context.Context, data []byte, callbacks ...interface{}) error {
return nil
}
// ResolveIRI will resolve an IRI and call the correct callback for the resolved type.
//
// It issues a GET for iri that is signed on behalf of the default federation
// account, reads the whole response body and passes it on to Resolve together
// with the supplied callbacks. The request is sent through the retryable HTTP
// client from utils for resilience against transient failures.
//
// An error is returned when the request cannot be built, signed or sent, when
// the remote answers with a status code of 400 or above, when the body cannot
// be read, or when Resolve itself fails.
func ResolveIRI(c context.Context, iri string, callbacks ...interface{}) error {
	configRepository := configrepository.Get()

	log.Debugln("Resolving", iri)

	// A malformed IRI makes NewRequest return a nil request; bail out here
	// instead of dereferencing it when setting headers below.
	req, err := http.NewRequest(http.MethodGet, iri, nil)
	if err != nil {
		return errors.Wrap(err, "unable to create request for IRI "+iri)
	}
	req.Header.Set("Accept", "application/activity+json, application/ld+json")

	actor := apmodels.MakeLocalIRIForAccount(configRepository.GetDefaultFederationUsername())
	if err := crypto.SignRequest(req, nil, actor); err != nil {
		return err
	}

	client := utils.GetRetryableHTTPClient()
	response, err := client.Do(req)
	if err != nil {
		return err
	}
	defer response.Body.Close()

	if response.StatusCode >= 400 {
		return errors.New("request failed with status: " + response.Status)
	}

	data, err := io.ReadAll(response.Body)
	if err != nil {
		return err
	}

	return Resolve(c, data, callbacks...)
}

View File

@@ -5,6 +5,7 @@ import (
"sync"
"time"
"github.com/owncast/owncast/utils"
log "github.com/sirupsen/logrus"
)
@@ -41,19 +42,18 @@ type domainFailure struct {
// InitOutboundWorkerPool starts n go routines that await ActivityPub jobs.
func InitOutboundWorkerPool(workerPoolSize int) {
queue = make(chan Job, workerPoolSize)
// Initialize HTTP client with conservative timeouts and connection limits
// to prevent resource exhaustion and hanging requests
httpClient = &http.Client{
Timeout: 15 * time.Second, // Reduced from 30s for faster failure detection
Transport: &http.Transport{
MaxIdleConns: 20, // Reduced from 100 to limit resource usage
MaxIdleConnsPerHost: 2, // Reduced from 10 to be more conservative
IdleConnTimeout: 10 * time.Second, // Reduced from 30s for faster cleanup
DisableKeepAlives: false,
},
// Use a larger buffer to decouple request creation from processing.
// This prevents SendToFollowers from blocking when many followers need updates.
const minQueueBuffer = 500
queueBuffer := workerPoolSize * 10
if queueBuffer < minQueueBuffer {
queueBuffer = minQueueBuffer
}
queue = make(chan Job, queueBuffer)
// Initialize HTTP client with retry logic for transient failures
// The retryable client handles 502/503/504 errors automatically
httpClient = utils.GetRetryableHTTPClient()
// start workers
for i := 1; i <= workerPoolSize; i++ {
@@ -64,7 +64,7 @@ func InitOutboundWorkerPool(workerPoolSize int) {
// AddToOutboundQueue will queue up an outbound http request.
func AddToOutboundQueue(req *http.Request) {
// Check if domain should be skipped due to circuit breaker
if shouldSkipDomain(req.URL.Host) {
if ShouldSkipDomain(req.URL.Host) {
log.Debugf("Skipping request to %s due to circuit breaker", req.URL.Host)
return
}
@@ -119,8 +119,9 @@ func (e *httpError) Error() string {
return e.message
}
// shouldSkipDomain checks if a domain should be skipped due to circuit breaker.
func shouldSkipDomain(domain string) bool {
// ShouldSkipDomain checks if a domain should be skipped due to circuit breaker.
// This is exported so callers can check before expensive operations like request signing.
func ShouldSkipDomain(domain string) bool {
failedDomainsMutex.RLock()
defer failedDomainsMutex.RUnlock()

2
go.mod
View File

@@ -54,6 +54,8 @@ require (
github.com/golang-jwt/jwt/v5 v5.2.1 // indirect
github.com/google/uuid v1.5.0 // indirect
github.com/gorilla/css v1.0.1 // indirect
github.com/hashicorp/go-cleanhttp v0.5.2 // indirect
github.com/hashicorp/go-retryablehttp v0.7.8
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/jonboulle/clockwork v0.5.0 // indirect
github.com/klauspost/compress v1.18.0 // indirect

4
go.sum
View File

@@ -52,6 +52,10 @@ github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aN
github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/grafov/m3u8 v0.12.1 h1:DuP1uA1kvRRmGNAZ0m+ObLv1dvrfNO0TPx0c/enNk0s=
github.com/grafov/m3u8 v0.12.1/go.mod h1:nqzOkfBiZJENr52zTVd/Dcl03yzphIMbJqkXGu+u080=
github.com/hashicorp/go-cleanhttp v0.5.2 h1:035FKYIWjmULyFRBKPs8TBQoi0x6d9G4xc9neXJWAZQ=
github.com/hashicorp/go-cleanhttp v0.5.2/go.mod h1:kO/YDlP8L1346E6Sodw+PrpBSV4/SoxCXGY6BqNFT48=
github.com/hashicorp/go-retryablehttp v0.7.8 h1:ylXZWnqa7Lhqpk0L1P1LzDtGcCR0rPVUrx/c8Unxc48=
github.com/hashicorp/go-retryablehttp v0.7.8/go.mod h1:rjiScheydd+CxvumBsIrFKlx3iS0jrZ7LvzFGFmuKbw=
github.com/jellydator/ttlcache/v3 v3.4.0 h1:YS4P125qQS0tNhtL6aeYkheEaB/m8HCqdMMP4mnWdTY=
github.com/jellydator/ttlcache/v3 v3.4.0/go.mod h1:Hw9EgjymziQD3yGsQdf1FqFdpp7YjFMd4Srg5EJlgD4=
github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg=

72
utils/tlsconfig.go Normal file
View File

@@ -0,0 +1,72 @@
package utils
import (
"crypto/tls"
"net/http"
"os"
"sync"
"time"
"github.com/hashicorp/go-retryablehttp"
log "github.com/sirupsen/logrus"
)
// insecureSkipVerify caches the parsed OWNCAST_INSECURE_SKIP_VERIFY setting.
var insecureSkipVerify bool

// insecureSkipVerifyOnce guards the one-time read of the environment variable.
var insecureSkipVerifyOnce sync.Once

// IsInsecureSkipVerifyEnabled reports whether the OWNCAST_INSECURE_SKIP_VERIFY
// environment variable is set to exactly "true". The variable is read on the
// first call only; later calls return the cached answer. A warning is logged
// once when the setting is enabled. This is intended for testing only.
func IsInsecureSkipVerifyEnabled() bool {
	insecureSkipVerifyOnce.Do(func() {
		enabled := os.Getenv("OWNCAST_INSECURE_SKIP_VERIFY") == "true"
		insecureSkipVerify = enabled
		if !enabled {
			return
		}
		log.Warnln("OWNCAST_INSECURE_SKIP_VERIFY is enabled - TLS certificate verification disabled (testing only)")
	})

	return insecureSkipVerify
}
// GetTLSConfig builds the TLS configuration implied by the
// OWNCAST_INSECURE_SKIP_VERIFY environment variable. It returns nil when the
// variable is not enabled, and a config with certificate verification turned
// off when it is.
func GetTLSConfig() *tls.Config {
	if !IsInsecureSkipVerifyEnabled() {
		return nil
	}

	cfg := new(tls.Config)
	cfg.InsecureSkipVerify = true // #nosec G402 - intentional for testing
	return cfg
}
// GetHTTPTransportWithTLS returns an http.Transport configured with TLS settings.
// If OWNCAST_INSECURE_SKIP_VERIFY is set, certificate verification is skipped.
//
// baseTransport is modified in place and returned; when it is nil a new empty
// http.Transport is created instead. A TLSClientConfig already present on
// baseTransport is preserved: it is left untouched when no override applies,
// and otherwise a clone of it is installed with only InsecureSkipVerify changed.
func GetHTTPTransportWithTLS(baseTransport *http.Transport) *http.Transport {
	if baseTransport == nil {
		baseTransport = &http.Transport{}
	}

	override := GetTLSConfig()
	if override == nil {
		// Nothing to apply; do not wipe out a config the caller provided.
		return baseTransport
	}

	if baseTransport.TLSClientConfig == nil {
		baseTransport.TLSClientConfig = override
		return baseTransport
	}

	// Clone before changing the flag so a config shared with other transports
	// is not modified behind the caller's back.
	merged := baseTransport.TLSClientConfig.Clone()
	merged.InsecureSkipVerify = override.InsecureSkipVerify
	baseTransport.TLSClientConfig = merged
	return baseTransport
}
// GetRetryableHTTPClient returns an http.Client with retry logic for transient failures.
// It uses hashicorp/go-retryablehttp with exponential backoff for 502, 503, 504 errors.
func GetRetryableHTTPClient() *http.Client {
retryClient := retryablehttp.NewClient()
retryClient.RetryMax = 3
retryClient.RetryWaitMin = 100 * time.Millisecond
retryClient.RetryWaitMax = 1 * time.Second
retryClient.Logger = nil // Disable default logging
// Configure transport with connection pooling limits
transport := GetHTTPTransportWithTLS(&http.Transport{
MaxIdleConns: 20, // Limit resource usage
MaxIdleConnsPerHost: 2, // Conservative per-host limit
IdleConnTimeout: 10 * time.Second, // Fast cleanup of idle connections
DisableKeepAlives: false,
})
retryClient.HTTPClient.Transport = transport
retryClient.HTTPClient.Timeout = 8 * time.Second // Short timeout - legitimate servers respond quickly
return retryClient.StandardClient()
}