---
# Vector pipeline configuration.
#
# Data flow, as wired by the `inputs` lists below:
#   kafka_tx_events -> events_create_ts ------------+
#   sessionized_kafka_tx_events                     +-> plus_1_events -> opensearch_events_{1,2,3}
#       -> sessionized_events_create_ts ------------+
#   kafka_tx_events + sessionized_kafka_tx_events -> log_events (Loki)
#   app_logs -> parsed_hs_server_logs -> opensearch_logs, log_app_loki
#   vector_metrics + node_metrics -> metrics (Prometheus exporter)
#   sdk_source -> sdk_transformed -> amend_sdk_logs -> sdk_sink (Kafka)

# End-to-end acknowledgements are switched on globally.
acknowledgements:
  enabled: true

# CSV lookup table used by the `amend_sdk_logs` transform.
enrichment_tables:
  sdk_map:
    type: file
    file:
      path: /etc/vector/config/sdk_map.csv
      encoding:
        type: csv
    schema:
      publishable_key: string
      merchant_id: string

api:
  enabled: true
  address: "0.0.0.0:8686"

sources:
  # Transaction events, decoded as JSON.
  kafka_tx_events:
    type: kafka
    bootstrap_servers: "kafka0:29092"
    group_id: sessionizer
    topics:
      - hyperswitch-payment-attempt-events
      - hyperswitch-payment-intent-events
      - hyperswitch-refund-events
      - hyperswitch-dispute-events
    decoding:
      codec: json

  # Topics matched by a pattern (leading `^`); shares the consumer group
  # id with `kafka_tx_events`.
  sessionized_kafka_tx_events:
    type: kafka
    bootstrap_servers: "kafka0:29092"
    group_id: sessionizer
    topics:
      - "^sessionizer"
    decoding:
      codec: json

  # Docker container logs, restricted to containers labelled `logs=promtail`.
  app_logs:
    type: docker_logs
    include_labels:
      - "logs=promtail"

  vector_metrics:
    type: internal_metrics

  node_metrics:
    type: host_metrics

  # HTTP endpoint that receives SDK log events as JSON.
  sdk_source:
    type: http_server
    address: "0.0.0.0:3103"
    encoding: json

transforms:
  # Sets `.timestamp` and `."@timestamp"` from `.created_at`, read as
  # seconds; falls back to the current time when the conversion fails.
  events_create_ts:
    type: remap
    inputs:
      - kafka_tx_events
    source: |-
      .timestamp = from_unix_timestamp(.created_at, unit: "seconds") ?? now()
      ."@timestamp" = from_unix_timestamp(.created_at, unit: "seconds") ?? now()

  # Keeps only events whose `sign_flag` equals 1.
  plus_1_events:
    type: filter
    inputs:
      - events_create_ts
      - sessionized_events_create_ts
    condition: ".sign_flag == 1"

  # Keeps only logs from the `hyperswitch-server` compose service.
  # NOTE(review): nothing in this file lists `hs_server_logs` as an input,
  # so this filter currently has no effect on any sink. Confirm whether
  # `parsed_hs_server_logs` was meant to read from it.
  hs_server_logs:
    type: filter
    inputs:
      - app_logs
    condition: '.labels."com.docker.compose.service" == "hyperswitch-server"'

  # Replaces `.message` with its parsed JSON value. Reads `app_logs`
  # directly, i.e. every container selected by that source.
  parsed_hs_server_logs:
    type: remap
    inputs:
      - app_logs
    source: |-
      .message = parse_json!(.message)

  # Same as `events_create_ts`, but `.created_at` is read as milliseconds.
  sessionized_events_create_ts:
    type: remap
    inputs:
      - sessionized_kafka_tx_events
    source: |-
      .timestamp = from_unix_timestamp(.created_at, unit: "milliseconds") ?? now()
      ."@timestamp" = from_unix_timestamp(.created_at, unit: "milliseconds") ?? now()

  # Rate limit keyed on payment id + merchant id: threshold 1000 per
  # 60 second window.
  sdk_transformed:
    type: throttle
    inputs:
      - sdk_source
    key_field: "{{ .payment_id }}{{ .merchant_id }}"
    threshold: 1000
    window_secs: 60

  # Looks up the incoming `.merchant_id` as a `publishable_key` in the
  # `sdk_map` table and overwrites `.merchant_id` with the matching row's
  # `merchant_id`. `.before_transform` / `.after_transform` record the
  # time on either side of the lookup.
  amend_sdk_logs:
    type: remap
    inputs:
      - sdk_transformed
    source: |
      .before_transform = now()

      merchant_id = .merchant_id
      row = get_enrichment_table_record!("sdk_map", { "publishable_key" : merchant_id }, case_sensitive: true)
      .merchant_id = row.merchant_id

      .after_transform = now()

sinks:
  # NOTE(review): the three `opensearch_events_*` sinks below are identical
  # except for `bulk.index`, so each event from `plus_1_events` is written
  # to three indices. Confirm that all three are still required.
  # NOTE(review): credentials are stored inline and TLS certificate and
  # hostname verification are disabled on every OpenSearch sink; confirm
  # this file is only used for local development.
  opensearch_events_1:
    type: elasticsearch
    inputs:
      - plus_1_events
    endpoints:
      - "https://opensearch:9200"
    id_key: message_key
    api_version: v7
    tls:
      verify_certificate: false
      verify_hostname: false
    auth:
      strategy: basic
      user: admin
      # Quoted so the trailing `#` can never be read as a comment marker.
      password: "0penS3arc#"
    encoding:
      except_fields:
        - message_key
        - offset
        - partition
        - topic
        - clickhouse_database
        - last_synced
        - sign_flag
    bulk:
      index: "vector-{{ .topic }}"

  opensearch_events_2:
    type: elasticsearch
    inputs:
      - plus_1_events
    endpoints:
      - "https://opensearch:9200"
    id_key: message_key
    api_version: v7
    tls:
      verify_certificate: false
      verify_hostname: false
    auth:
      strategy: basic
      user: admin
      # Quoted so the trailing `#` can never be read as a comment marker.
      password: "0penS3arc#"
    encoding:
      except_fields:
        - message_key
        - offset
        - partition
        - topic
        - clickhouse_database
        - last_synced
        - sign_flag
    bulk:
      # Add a date suffixed index for better grouping
      index: "vector-{{ .topic }}-%Y-%m-%d"

  opensearch_events_3:
    type: elasticsearch
    inputs:
      - plus_1_events
    endpoints:
      - "https://opensearch:9200"
    id_key: message_key
    api_version: v7
    tls:
      verify_certificate: false
      verify_hostname: false
    auth:
      strategy: basic
      user: admin
      # Quoted so the trailing `#` can never be read as a comment marker.
      password: "0penS3arc#"
    encoding:
      except_fields:
        - message_key
        - offset
        - partition
        - topic
        - clickhouse_database
        - last_synced
        - sign_flag
    bulk:
      index: "{{ .topic }}"

  # Parsed application logs, one index per container per day.
  opensearch_logs:
    type: elasticsearch
    inputs:
      - parsed_hs_server_logs
    endpoints:
      - "https://opensearch:9200"
    api_version: v7
    tls:
      verify_certificate: false
      verify_hostname: false
    auth:
      strategy: basic
      user: admin
      # Quoted so the trailing `#` can never be read as a comment marker.
      password: "0penS3arc#"
    bulk:
      # Add a date suffixed index for better grouping
      # index: "vector-{{ .topic }}-%Y-%m-%d"
      index: "logs-{{ .container_name }}-%Y-%m-%d"

  # Raw Kafka events (taken before the timestamp remaps) shipped to Loki,
  # labelled by topic.
  log_events:
    type: loki
    inputs:
      - kafka_tx_events
      - sessionized_kafka_tx_events
    endpoint: http://loki:3100
    labels:
      source: vector
      topic: "{{ .topic }}"
      job: kafka
    encoding:
      codec: json

  # Parsed application logs shipped to Loki, labelled by container and stream.
  log_app_loki:
    type: loki
    inputs:
      - parsed_hs_server_logs
    endpoint: http://loki:3100
    labels:
      source: vector
      job: app_logs
      container: "{{ .container_name }}"
      stream: "{{ .stream }}"
    encoding:
      codec: json

  # Exposes Vector's internal metrics and host metrics for Prometheus.
  metrics:
    type: prometheus_exporter
    inputs:
      - vector_metrics
      - node_metrics

  # Enriched SDK logs published to Kafka, keyed by merchant id; `path` and
  # `source_type` are dropped from the encoded payload.
  sdk_sink:
    type: kafka
    encoding:
      codec: json
      except_fields:
        - "path"
        - "source_type"
    inputs:
      - "amend_sdk_logs"
    bootstrap_servers: "kafka0:29092"
    topic: hyper-sdk-logs
    key_field: ".merchant_id"