fix(kafka): fix kafka timestamps sent from application (#4709)

Co-authored-by: hyperswitch-bot[bot] <148525504+hyperswitch-bot[bot]@users.noreply.github.com>
This commit is contained in:
Sampras Lopes
2024-05-23 14:44:13 +05:30
committed by GitHub
parent ba624d0498
commit c778af26dd
5 changed files with 2 additions and 18 deletions

View File

@ -259,7 +259,7 @@ impl KafkaProducer {
.timestamp( .timestamp(
event event
.creation_timestamp() .creation_timestamp()
.unwrap_or_else(|| OffsetDateTime::now_utc().unix_timestamp()), .unwrap_or_else(|| OffsetDateTime::now_utc().unix_timestamp() * 1_000),
), ),
) )
.map_err(|(error, record)| report!(error).attach_printable(format!("{record:?}"))) .map_err(|(error, record)| report!(error).attach_printable(format!("{record:?}")))
@ -476,7 +476,7 @@ impl MessagingInterface for KafkaProducer {
.key(&data.identifier()) .key(&data.identifier())
.payload(&json_data) .payload(&json_data)
.timestamp( .timestamp(
(timestamp.assume_utc().unix_timestamp_nanos() / 1_000) (timestamp.assume_utc().unix_timestamp_nanos() / 1_000_000)
.to_i64() .to_i64()
.unwrap_or_else(|| { .unwrap_or_else(|| {
// kafka producer accepts milliseconds // kafka producer accepts milliseconds

View File

@ -70,10 +70,6 @@ impl<'a> super::KafkaMessage for KafkaDispute<'a> {
) )
} }
fn creation_timestamp(&self) -> Option<i64> {
Some(self.modified_at.unix_timestamp())
}
fn event_type(&self) -> crate::events::EventType { fn event_type(&self) -> crate::events::EventType {
crate::events::EventType::Dispute crate::events::EventType::Dispute
} }

View File

@ -108,10 +108,6 @@ impl<'a> super::KafkaMessage for KafkaPaymentAttempt<'a> {
) )
} }
fn creation_timestamp(&self) -> Option<i64> {
Some(self.modified_at.unix_timestamp())
}
fn event_type(&self) -> crate::events::EventType { fn event_type(&self) -> crate::events::EventType {
crate::events::EventType::PaymentAttempt crate::events::EventType::PaymentAttempt
} }

View File

@ -66,10 +66,6 @@ impl<'a> super::KafkaMessage for KafkaPaymentIntent<'a> {
format!("{}_{}", self.merchant_id, self.payment_id) format!("{}_{}", self.merchant_id, self.payment_id)
} }
fn creation_timestamp(&self) -> Option<i64> {
Some(self.modified_at.unix_timestamp())
}
fn event_type(&self) -> crate::events::EventType { fn event_type(&self) -> crate::events::EventType {
crate::events::EventType::PaymentIntent crate::events::EventType::PaymentIntent
} }

View File

@ -80,10 +80,6 @@ impl<'a> super::KafkaMessage for KafkaPayout<'a> {
format!("{}_{}", self.merchant_id, self.payout_attempt_id) format!("{}_{}", self.merchant_id, self.payout_attempt_id)
} }
fn creation_timestamp(&self) -> Option<i64> {
Some(self.last_modified_at.unix_timestamp())
}
fn event_type(&self) -> crate::events::EventType { fn event_type(&self) -> crate::events::EventType {
crate::events::EventType::Payout crate::events::EventType::Payout
} }