feat: add hyperswitch ai chats table (#8831)

Co-authored-by: Apoorv Dixit <apoorv.dixit@juspay.in>
Co-authored-by: hyperswitch-bot[bot] <148525504+hyperswitch-bot[bot]@users.noreply.github.com>
This commit is contained in:
Jeeva Ramachandran
2025-09-16 13:55:41 +05:30
committed by GitHub
parent d93e73dd83
commit 8ed3f7dbf2
34 changed files with 743 additions and 37 deletions

View File

@ -298,6 +298,29 @@ impl SecretsHandler for settings::UserAuthMethodSettings {
}
}
#[async_trait::async_trait]
impl SecretsHandler for settings::ChatSettings {
async fn convert_to_raw_secret(
value: SecretStateContainer<Self, SecuredSecret>,
secret_management_client: &dyn SecretManagementInterface,
) -> CustomResult<SecretStateContainer<Self, RawSecret>, SecretsManagementError> {
let chat_settings = value.get_inner();
let encryption_key = if chat_settings.enabled {
secret_management_client
.get_secret(chat_settings.encryption_key.clone())
.await?
} else {
chat_settings.encryption_key.clone()
};
Ok(value.transition_state(|chat_settings| Self {
encryption_key,
..chat_settings
}))
}
}
#[async_trait::async_trait]
impl SecretsHandler for settings::NetworkTokenizationService {
async fn convert_to_raw_secret(
@ -450,9 +473,14 @@ pub(crate) async fn fetch_raw_secrets(
})
.await;
#[allow(clippy::expect_used)]
let chat = settings::ChatSettings::convert_to_raw_secret(conf.chat, secret_management_client)
.await
.expect("Failed to decrypt chat configs");
Settings {
server: conf.server,
chat: conf.chat,
chat,
master_database,
redis: conf.redis,
log: conf.log,

View File

@ -71,7 +71,7 @@ pub struct Settings<S: SecretState> {
pub server: Server,
pub proxy: Proxy,
pub env: Env,
pub chat: ChatSettings,
pub chat: SecretStateContainer<ChatSettings, S>,
pub master_database: SecretStateContainer<Database, S>,
#[cfg(feature = "olap")]
pub replica_database: SecretStateContainer<Database, S>,
@ -207,6 +207,7 @@ pub struct Platform {
pub struct ChatSettings {
pub enabled: bool,
pub hyperswitch_ai_host: String,
pub encryption_key: Secret<String>,
}
#[derive(Debug, Clone, Default, Deserialize)]
@ -1048,8 +1049,7 @@ impl Settings<SecuredSecret> {
self.secrets.get_inner().validate()?;
self.locker.validate()?;
self.connectors.validate("connectors")?;
self.chat.validate()?;
self.chat.get_inner().validate()?;
self.cors.validate()?;
self.scheduler

View File

@ -1,18 +1,24 @@
use api_models::chat as chat_api;
use common_utils::{
consts,
crypto::{DecodeMessage, GcmAes256},
errors::CustomResult,
request::{Method, RequestBuilder, RequestContent},
};
use error_stack::ResultExt;
use external_services::http_client;
use hyperswitch_domain_models::chat as chat_domain;
use router_env::{instrument, logger, tracing};
use masking::ExposeInterface;
use router_env::{
instrument, logger,
tracing::{self, Instrument},
};
use crate::{
db::errors::chat::ChatErrors,
routes::{app::SessionStateInfo, SessionState},
services::{authentication as auth, ApplicationResponse},
services::{authentication as auth, authorization::roles, ApplicationResponse},
utils,
};
#[instrument(skip_all, fields(?session_id))]
@ -22,15 +28,34 @@ pub async fn get_data_from_hyperswitch_ai_workflow(
req: chat_api::ChatRequest,
session_id: Option<&str>,
) -> CustomResult<ApplicationResponse<chat_api::ChatResponse>, ChatErrors> {
let url = format!("{}/webhook", state.conf.chat.hyperswitch_ai_host);
let request_id = state.get_request_id();
let role_info = roles::RoleInfo::from_role_id_org_id_tenant_id(
&state,
&user_from_token.role_id,
&user_from_token.org_id,
user_from_token
.tenant_id
.as_ref()
.unwrap_or(&state.tenant.tenant_id),
)
.await
.change_context(ChatErrors::InternalServerError)
.attach_printable("Failed to retrieve role information")?;
let url = format!(
"{}/webhook",
state.conf.chat.get_inner().hyperswitch_ai_host
);
let request_id = state
.get_request_id()
.unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
let request_body = chat_domain::HyperswitchAiDataRequest {
query: chat_domain::GetDataMessage {
message: req.message,
message: req.message.clone(),
},
org_id: user_from_token.org_id,
merchant_id: user_from_token.merchant_id,
profile_id: user_from_token.profile_id,
org_id: user_from_token.org_id.clone(),
merchant_id: user_from_token.merchant_id.clone(),
profile_id: user_from_token.profile_id.clone(),
entity_type: role_info.get_entity_type(),
};
logger::info!("Request for AI service: {:?}", request_body);
@ -38,11 +63,9 @@ pub async fn get_data_from_hyperswitch_ai_workflow(
.method(Method::Post)
.url(&url)
.attach_default_headers()
.header(consts::X_REQUEST_ID, &request_id)
.set_body(RequestContent::Json(Box::new(request_body.clone())));
if let Some(request_id) = request_id {
request_builder = request_builder.header(consts::X_REQUEST_ID, &request_id);
}
if let Some(session_id) = session_id {
request_builder = request_builder.header(consts::X_CHAT_SESSION_ID, session_id);
}
@ -57,10 +80,132 @@ pub async fn get_data_from_hyperswitch_ai_workflow(
.await
.change_context(ChatErrors::InternalServerError)
.attach_printable("Error when sending request to AI service")?
.json::<_>()
.json::<chat_api::ChatResponse>()
.await
.change_context(ChatErrors::InternalServerError)
.attach_printable("Error when deserializing response from AI service")?;
Ok(ApplicationResponse::Json(response))
let response_to_return = response.clone();
tokio::spawn(
async move {
let new_hyperswitch_ai_interaction = utils::chat::construct_hyperswitch_ai_interaction(
&state,
&user_from_token,
&req,
&response,
&request_id,
)
.await;
match new_hyperswitch_ai_interaction {
Ok(interaction) => {
let db = state.store.as_ref();
if let Err(e) = db.insert_hyperswitch_ai_interaction(interaction).await {
logger::error!("Failed to insert hyperswitch_ai_interaction: {:?}", e);
}
}
Err(e) => {
logger::error!("Failed to construct hyperswitch_ai_interaction: {:?}", e);
}
}
}
.in_current_span(),
);
Ok(ApplicationResponse::Json(response_to_return))
}
#[instrument(skip_all)]
pub async fn list_chat_conversations(
state: SessionState,
user_from_token: auth::UserFromToken,
req: chat_api::ChatListRequest,
) -> CustomResult<ApplicationResponse<chat_api::ChatListResponse>, ChatErrors> {
let role_info = roles::RoleInfo::from_role_id_org_id_tenant_id(
&state,
&user_from_token.role_id,
&user_from_token.org_id,
user_from_token
.tenant_id
.as_ref()
.unwrap_or(&state.tenant.tenant_id),
)
.await
.change_context(ChatErrors::InternalServerError)
.attach_printable("Failed to retrieve role information")?;
if !role_info.is_internal() {
return Err(error_stack::Report::new(ChatErrors::UnauthorizedAccess)
.attach_printable("Only internal roles are allowed for this operation"));
}
let db = state.store.as_ref();
let hyperswitch_ai_interactions = db
.list_hyperswitch_ai_interactions(
req.merchant_id,
req.limit.unwrap_or(consts::DEFAULT_LIST_LIMIT),
req.offset.unwrap_or(consts::DEFAULT_LIST_OFFSET),
)
.await
.change_context(ChatErrors::InternalServerError)
.attach_printable("Error when fetching hyperswitch_ai_interactions")?;
let encryption_key = state.conf.chat.get_inner().encryption_key.clone().expose();
let key = match hex::decode(&encryption_key) {
Ok(key) => key,
Err(e) => {
router_env::logger::error!("Failed to decode encryption key: {}", e);
encryption_key.as_bytes().to_vec()
}
};
let mut conversations = Vec::new();
for interaction in hyperswitch_ai_interactions {
let user_query_encrypted = interaction
.user_query
.ok_or(ChatErrors::InternalServerError)
.attach_printable("Missing user_query field in hyperswitch_ai_interaction")?;
let response_encrypted = interaction
.response
.ok_or(ChatErrors::InternalServerError)
.attach_printable("Missing response field in hyperswitch_ai_interaction")?;
let user_query_decrypted_bytes = GcmAes256
.decode_message(&key, user_query_encrypted.into_inner())
.change_context(ChatErrors::InternalServerError)
.attach_printable("Failed to decrypt user query")?;
let response_decrypted_bytes = GcmAes256
.decode_message(&key, response_encrypted.into_inner())
.change_context(ChatErrors::InternalServerError)
.attach_printable("Failed to decrypt response")?;
let user_query_decrypted = String::from_utf8(user_query_decrypted_bytes)
.change_context(ChatErrors::InternalServerError)
.attach_printable("Failed to convert decrypted user query to string")?;
let response_decrypted = serde_json::from_slice(&response_decrypted_bytes)
.change_context(ChatErrors::InternalServerError)
.attach_printable("Failed to deserialize decrypted response")?;
conversations.push(chat_api::ChatConversation {
id: interaction.id,
session_id: interaction.session_id,
user_id: interaction.user_id,
merchant_id: interaction.merchant_id,
profile_id: interaction.profile_id,
org_id: interaction.org_id,
role_id: interaction.role_id,
user_query: user_query_decrypted.into(),
response: response_decrypted,
database_query: interaction.database_query,
interaction_status: interaction.interaction_status,
created_at: interaction.created_at,
});
}
return Ok(ApplicationResponse::Json(chat_api::ChatListResponse {
conversations,
}));
}

View File

@ -6,6 +6,8 @@ pub enum ChatErrors {
MissingConfigError,
#[error("Chat response deserialization failed")]
ChatResponseDeserializationFailed,
#[error("Unauthorized access")]
UnauthorizedAccess,
}
impl common_utils::errors::ErrorSwitch<api_models::errors::types::ApiErrorResponse> for ChatErrors {
@ -22,6 +24,9 @@ impl common_utils::errors::ErrorSwitch<api_models::errors::types::ApiErrorRespon
Self::ChatResponseDeserializationFailed => {
AER::BadRequest(ApiError::new(sub_code, 2, self.get_error_message(), None))
}
Self::UnauthorizedAccess => {
AER::Unauthorized(ApiError::new(sub_code, 3, self.get_error_message(), None))
}
}
}
}
@ -32,6 +37,7 @@ impl ChatErrors {
Self::InternalServerError => "Something went wrong".to_string(),
Self::MissingConfigError => "Missing webhook url".to_string(),
Self::ChatResponseDeserializationFailed => "Failed to parse chat response".to_string(),
Self::UnauthorizedAccess => "Not authorized to access the resource".to_string(),
}
}
}

View File

@ -20,6 +20,7 @@ pub mod fraud_check;
pub mod generic_link;
pub mod gsm;
pub mod health_check;
pub mod hyperswitch_ai_interaction;
pub mod kafka_store;
pub mod locker_mock_up;
pub mod mandate;
@ -137,6 +138,7 @@ pub trait StorageInterface:
+ user::sample_data::BatchSampleDataInterface
+ health_check::HealthCheckDbInterface
+ user_authentication_method::UserAuthenticationMethodInterface
+ hyperswitch_ai_interaction::HyperswitchAiInteractionInterface
+ authentication::AuthenticationInterface
+ generic_link::GenericLinkInterface
+ relay::RelayInterface

View File

@ -0,0 +1,123 @@
use diesel_models::hyperswitch_ai_interaction as storage;
use error_stack::report;
use router_env::{instrument, tracing};
use super::MockDb;
use crate::{
connection,
core::errors::{self, CustomResult},
services::Store,
};
#[async_trait::async_trait]
pub trait HyperswitchAiInteractionInterface {
async fn insert_hyperswitch_ai_interaction(
&self,
hyperswitch_ai_interaction: storage::HyperswitchAiInteractionNew,
) -> CustomResult<storage::HyperswitchAiInteraction, errors::StorageError>;
async fn list_hyperswitch_ai_interactions(
&self,
merchant_id: Option<common_utils::id_type::MerchantId>,
limit: i64,
offset: i64,
) -> CustomResult<Vec<storage::HyperswitchAiInteraction>, errors::StorageError>;
}
#[async_trait::async_trait]
impl HyperswitchAiInteractionInterface for Store {
#[instrument(skip_all)]
async fn insert_hyperswitch_ai_interaction(
&self,
hyperswitch_ai_interaction: storage::HyperswitchAiInteractionNew,
) -> CustomResult<storage::HyperswitchAiInteraction, errors::StorageError> {
let conn = connection::pg_connection_write(self).await?;
hyperswitch_ai_interaction
.insert(&conn)
.await
.map_err(|error| report!(errors::StorageError::from(error)))
}
#[instrument(skip_all)]
async fn list_hyperswitch_ai_interactions(
&self,
merchant_id: Option<common_utils::id_type::MerchantId>,
limit: i64,
offset: i64,
) -> CustomResult<Vec<storage::HyperswitchAiInteraction>, errors::StorageError> {
let conn = connection::pg_connection_read(self).await?;
storage::HyperswitchAiInteraction::filter_by_optional_merchant_id(
&conn,
merchant_id.as_ref(),
limit,
offset,
)
.await
.map_err(|error| report!(errors::StorageError::from(error)))
}
}
#[async_trait::async_trait]
impl HyperswitchAiInteractionInterface for MockDb {
async fn insert_hyperswitch_ai_interaction(
&self,
hyperswitch_ai_interaction: storage::HyperswitchAiInteractionNew,
) -> CustomResult<storage::HyperswitchAiInteraction, errors::StorageError> {
let mut hyperswitch_ai_interactions = self.hyperswitch_ai_interactions.lock().await;
let hyperswitch_ai_interaction = storage::HyperswitchAiInteraction {
id: hyperswitch_ai_interaction.id,
session_id: hyperswitch_ai_interaction.session_id,
user_id: hyperswitch_ai_interaction.user_id,
merchant_id: hyperswitch_ai_interaction.merchant_id,
profile_id: hyperswitch_ai_interaction.profile_id,
org_id: hyperswitch_ai_interaction.org_id,
role_id: hyperswitch_ai_interaction.role_id,
user_query: hyperswitch_ai_interaction.user_query,
response: hyperswitch_ai_interaction.response,
database_query: hyperswitch_ai_interaction.database_query,
interaction_status: hyperswitch_ai_interaction.interaction_status,
created_at: hyperswitch_ai_interaction.created_at,
};
hyperswitch_ai_interactions.push(hyperswitch_ai_interaction.clone());
Ok(hyperswitch_ai_interaction)
}
async fn list_hyperswitch_ai_interactions(
&self,
merchant_id: Option<common_utils::id_type::MerchantId>,
limit: i64,
offset: i64,
) -> CustomResult<Vec<storage::HyperswitchAiInteraction>, errors::StorageError> {
let hyperswitch_ai_interactions = self.hyperswitch_ai_interactions.lock().await;
let offset_usize = offset.try_into().unwrap_or_else(|_| {
common_utils::consts::DEFAULT_LIST_OFFSET
.try_into()
.unwrap_or(usize::MIN)
});
let limit_usize = limit.try_into().unwrap_or_else(|_| {
common_utils::consts::DEFAULT_LIST_LIMIT
.try_into()
.unwrap_or(usize::MAX)
});
let filtered_interactions: Vec<storage::HyperswitchAiInteraction> =
hyperswitch_ai_interactions
.iter()
.filter(
|interaction| match (merchant_id.as_ref(), &interaction.merchant_id) {
(Some(merchant_id), Some(interaction_merchant_id)) => {
interaction_merchant_id == &merchant_id.get_string_repr().to_owned()
}
(None, _) => true,
_ => false,
},
)
.skip(offset_usize)
.take(limit_usize)
.cloned()
.collect();
Ok(filtered_interactions)
}
}

View File

@ -43,6 +43,7 @@ use time::PrimitiveDateTime;
use super::{
dashboard_metadata::DashboardMetadataInterface,
ephemeral_key::ClientSecretInterface,
hyperswitch_ai_interaction::HyperswitchAiInteractionInterface,
role::RoleInterface,
user::{sample_data::BatchSampleDataInterface, theme::ThemeInterface, UserInterface},
user_authentication_method::UserAuthenticationMethodInterface,
@ -4127,6 +4128,29 @@ impl UserAuthenticationMethodInterface for KafkaStore {
}
}
#[async_trait::async_trait]
impl HyperswitchAiInteractionInterface for KafkaStore {
async fn insert_hyperswitch_ai_interaction(
&self,
hyperswitch_ai_interaction: storage::HyperswitchAiInteractionNew,
) -> CustomResult<storage::HyperswitchAiInteraction, errors::StorageError> {
self.diesel_store
.insert_hyperswitch_ai_interaction(hyperswitch_ai_interaction)
.await
}
async fn list_hyperswitch_ai_interactions(
&self,
merchant_id: Option<id_type::MerchantId>,
limit: i64,
offset: i64,
) -> CustomResult<Vec<storage::HyperswitchAiInteraction>, errors::StorageError> {
self.diesel_store
.list_hyperswitch_ai_interactions(merchant_id, limit, offset)
.await
}
}
#[async_trait::async_trait]
impl ThemeInterface for KafkaStore {
async fn insert_theme(

View File

@ -2343,12 +2343,16 @@ pub struct Chat;
impl Chat {
pub fn server(state: AppState) -> Scope {
let mut route = web::scope("/chat").app_data(web::Data::new(state.clone()));
if state.conf.chat.enabled {
if state.conf.chat.get_inner().enabled {
route = route.service(
web::scope("/ai").service(
web::resource("/data")
.route(web::post().to(chat::get_data_from_hyperswitch_ai_workflow)),
),
web::scope("/ai")
.service(
web::resource("/data")
.route(web::post().to(chat::get_data_from_hyperswitch_ai_workflow)),
)
.service(
web::resource("/list").route(web::get().to(chat::get_all_conversations)),
),
);
}
route

View File

@ -45,3 +45,24 @@ pub async fn get_data_from_hyperswitch_ai_workflow(
))
.await
}
#[instrument(skip_all)]
pub async fn get_all_conversations(
state: web::Data<AppState>,
http_req: HttpRequest,
payload: web::Query<chat_api::ChatListRequest>,
) -> HttpResponse {
let flow = Flow::ListAllChatInteractions;
Box::pin(api::server_wrap(
flow.clone(),
state,
&http_req,
payload.into_inner(),
|state, user: auth::UserFromToken, payload, _| {
chat_core::list_chat_conversations(state, user, payload)
},
&auth::DashboardNoPermissionAuth,
api_locking::LockAction::NotApplicable,
))
.await
}

View File

@ -315,7 +315,7 @@ impl From<Flow> for ApiIdentifier {
| Flow::ListAllThemesInLineage
| Flow::CloneConnector => Self::User,
Flow::GetDataFromHyperswitchAiFlow => Self::AiWorkflow,
Flow::GetDataFromHyperswitchAiFlow | Flow::ListAllChatInteractions => Self::AiWorkflow,
Flow::ListRolesV2
| Flow::ListInvitableRolesAtEntityLevel

View File

@ -21,6 +21,7 @@ pub mod file;
pub mod fraud_check;
pub mod generic_link;
pub mod gsm;
pub mod hyperswitch_ai_interaction;
#[cfg(feature = "kv_store")]
pub mod kv;
pub mod locker_mock_up;
@ -75,8 +76,9 @@ pub use self::{
blocklist_fingerprint::*, blocklist_lookup::*, business_profile::*, callback_mapper::*,
capture::*, cards_info::*, configs::*, customers::*, dashboard_metadata::*, dispute::*,
dynamic_routing_stats::*, ephemeral_key::*, events::*, file::*, fraud_check::*,
generic_link::*, gsm::*, locker_mock_up::*, mandate::*, merchant_account::*,
merchant_connector_account::*, merchant_key_store::*, payment_link::*, payment_method::*,
process_tracker::*, refund::*, reverse_lookup::*, role::*, routing_algorithm::*,
subscription::*, unified_translations::*, user::*, user_authentication_method::*, user_role::*,
generic_link::*, gsm::*, hyperswitch_ai_interaction::*, locker_mock_up::*, mandate::*,
merchant_account::*, merchant_connector_account::*, merchant_key_store::*, payment_link::*,
payment_method::*, process_tracker::*, refund::*, reverse_lookup::*, role::*,
routing_algorithm::*, subscription::*, unified_translations::*, user::*,
user_authentication_method::*, user_role::*,
};

View File

@ -0,0 +1 @@
pub use diesel_models::hyperswitch_ai_interaction::*;

View File

@ -1,3 +1,4 @@
pub mod chat;
#[cfg(feature = "olap")]
pub mod connector_onboarding;
pub mod currency;

View File

@ -0,0 +1,70 @@
use api_models::chat as chat_api;
use common_utils::{type_name, types::keymanager::Identifier};
use diesel_models::hyperswitch_ai_interaction::{
HyperswitchAiInteraction, HyperswitchAiInteractionNew,
};
use error_stack::ResultExt;
use hyperswitch_domain_models::type_encryption::{crypto_operation, CryptoOperation};
use masking::ExposeInterface;
use crate::{
core::errors::{self, CustomResult},
routes::SessionState,
services::authentication as auth,
};
pub async fn construct_hyperswitch_ai_interaction(
state: &SessionState,
user_from_token: &auth::UserFromToken,
req: &chat_api::ChatRequest,
response: &chat_api::ChatResponse,
request_id: &str,
) -> CustomResult<HyperswitchAiInteractionNew, errors::ApiErrorResponse> {
let encryption_key = state.conf.chat.get_inner().encryption_key.clone().expose();
let key = match hex::decode(&encryption_key) {
Ok(key) => key,
Err(e) => {
router_env::logger::error!("Failed to decode encryption key: {}", e);
// Fallback to using the string as bytes, which was the previous behavior
encryption_key.as_bytes().to_vec()
}
};
let encrypted_user_query = crypto_operation::<String, masking::WithType>(
&state.into(),
type_name!(HyperswitchAiInteraction),
CryptoOperation::Encrypt(req.message.clone()),
Identifier::Merchant(user_from_token.merchant_id.clone()),
&key,
)
.await
.and_then(|val| val.try_into_operation())
.change_context(errors::ApiErrorResponse::InternalServerError)
.attach_printable("Failed to encrypt user query")?;
let encrypted_response = crypto_operation::<serde_json::Value, masking::WithType>(
&state.into(),
type_name!(HyperswitchAiInteraction),
CryptoOperation::Encrypt(response.response.clone()),
Identifier::Merchant(user_from_token.merchant_id.clone()),
&key,
)
.await
.and_then(|val| val.try_into_operation())
.change_context(errors::ApiErrorResponse::InternalServerError)
.attach_printable("Failed to encrypt response")?;
Ok(HyperswitchAiInteractionNew {
id: request_id.to_owned(),
session_id: Some(request_id.to_string()),
user_id: Some(user_from_token.user_id.clone()),
merchant_id: Some(user_from_token.merchant_id.get_string_repr().to_string()),
profile_id: Some(user_from_token.profile_id.get_string_repr().to_string()),
org_id: Some(user_from_token.org_id.get_string_repr().to_string()),
role_id: Some(user_from_token.role_id.clone()),
user_query: Some(encrypted_user_query.into()),
response: Some(encrypted_response.into()),
database_query: response.query_executed.clone().map(|q| q.expose()),
interaction_status: Some(response.status.clone()),
created_at: common_utils::date_time::now(),
})
}