Kogito Serverless Workflow Order Processing Example
Description
In this example we will see how to use the Serverless Workflow specification implementation on Kogito to orchestrate events with Knative Eventing.
This use case is about processing an incoming order and producing new events based on the order details. The images below illustrate the workflow:
The main workflow processes the incoming Order event and starts a parallel state that calls two subflows: Fraud Handling and Shipping Handling. The workflow ends once both subflows end.
Fraud Handling produces a new FraudEvaluation event if the order is above 1000 USD. Any other system or service in the architecture can then read this event and react to it, for example by canceling the order.
In parallel, regardless of whether the order needs fraud evaluation, the workflow produces events classifying the required Shipping service: International or Domestic. For this example, domestic shipping is any order with an address within the US.
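The routing rules described above can be sketched in a few lines of Python. This is only an illustration of the decision logic, not the workflow definition itself. The field names total and country come from the sample payloads in this README; the value "United States" for a domestic address and the domesticShipping event type are assumptions, since neither appears in the sample output.

```python
from typing import Dict, List

FRAUD_THRESHOLD_USD = 1000          # orders above this amount need fraud evaluation
DOMESTIC_COUNTRY = "United States"  # assumption: the exact value for US addresses is not shown


def events_for_order(order: Dict) -> List[str]:
    """Return the event types the workflow would produce for one order."""
    events = []
    # Fraud Handling subflow: only orders strictly above the threshold produce an event.
    if order["total"] > FRAUD_THRESHOLD_USD:
        events.append("fraudEvaluation")
    # Shipping Handling subflow: always produces exactly one classification event.
    if order["country"] == DOMESTIC_COUNTRY:
        events.append("domesticShipping")  # assumed event type name
    else:
        events.append("internationalShipping")
    return events


print(events_for_order({"country": "Brazil", "total": 10000}))
# ['fraudEvaluation', 'internationalShipping']
print(events_for_order({"country": "Brazil", "total": 500}))
# ['internationalShipping']
```

Both subflows run for every order, so the shipping event is always present and the fraud event is optional.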
Infrastructure requirements
This quickstart requires Knative Eventing to be available in your cluster:
- Install minikube
- Install Knative Eventing in your minikube cluster
For local testing only, you can use Podman or Docker to simulate an application receiving your events.
Build and run
Prerequisites
You will need:
- Java 17+ installed
- Environment variable JAVA_HOME set accordingly
- Maven 3.9.6+ installed
When using native image compilation, you will also need:
- GraalVM 19.3+ installed
- Environment variable GRAALVM_HOME set accordingly
- GraalVM native image also needs the native-image extension: https://www.graalvm.org/reference-manual/native-image/
- Note that GraalVM native image compilation typically requires other packages (glibc-devel, zlib-devel and gcc) to be installed too; please refer to the GraalVM installation documentation for more details.
Compile and Run in Local Dev Mode
K_SINK=http://localhost:8181 mvn clean compile quarkus:dev
K_SINK is the environment variable that the Knative Eventing platform injects once we deploy the application on a Kubernetes cluster.
Instead of hardcoding the endpoint to which we send our produced messages, we rely on Knative to inject this information at runtime.
The environment variable is assigned to the SmallRye HTTP connector at runtime:
mp.messaging.outgoing.kogito_outgoing_stream.url=${K_SINK}
For local tests we will mock the endpoint where the produced messages are supposed to be delivered.
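As an illustration of the same idea outside Quarkus, the sketch below resolves the sink URL from the environment instead of hardcoding it. The function name resolve_sink_url and the fallback to the local mock endpoint are assumptions of this sketch; the property shown above has no fallback and expects K_SINK to be set.

```python
import os
from typing import Mapping, Optional

LOCAL_SINK_URL = "http://localhost:8181"  # the mocked local endpoint used in this example


def resolve_sink_url(env: Optional[Mapping[str, str]] = None) -> str:
    """Return the injected K_SINK value, or the local mock endpoint if it is unset."""
    env = os.environ if env is None else env
    # An empty value is treated the same as a missing one.
    return env.get("K_SINK") or LOCAL_SINK_URL


# With no K_SINK injected, the sketch falls back to the local mock.
print(resolve_sink_url({}))
# http://localhost:8181
```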
Package and Run in JVM mode
mvn clean package
java -jar target/quarkus-app/quarkus-run.jar
or on Windows
mvn clean package
java -jar target\quarkus-app\quarkus-run.jar
Package and Run using Local Native Image
Note that this requires GRAALVM_HOME to point to a valid GraalVM installation
mvn clean package -Pnative
To run the native executable generated in target/, execute:
./target/serverless-workflow-order-processing-runner
OpenAPI (Swagger) documentation
You can take a look at the OpenAPI definition - automatically generated and included in this service - to determine all available operations exposed by this service.
For easier reading, you can visualize the OpenAPI definition file with a UI tool such as Swagger UI.
In addition, various clients to interact with this service can be easily generated using this OpenAPI definition.
When running in either Quarkus Development or Native mode, the Quarkus OpenAPI extension also exposes Swagger UI, which you can use to browse the available REST endpoints and send test requests.
Use the application locally
First, start the sink application using Podman or Docker:
$ podman run --rm -it -p 8181:8080 gcr.io/knative-releases/knative.dev/eventing-contrib/cmd/event_display
This is the same image used by the Knative Eventing demos. It runs on port 8181 so that it does not clash with the example application.
Then run the application with:
$ K_SINK=http://localhost:8181 mvn clean quarkus:dev
__  ____  __  _____   ___  __ ____  ______
 --/ __ \/ / / / _ | / _ \/ //_/ / / / __/
 -/ /_/ / /_/ / __ |/ , _/ ,< / /_/ /\ \
--\___\_\____/_/ |_/_/|_/_/|_|\____/___/
2021-04-13 17:39:39,488 INFO [org.kie.kog.cod.api.uti.AddonsConfigDiscovery] (build-22) Performed addonsConfig discovery, found: AddonsConfig{usePersistence=false, useTracing=false, useMonitoring=false, usePrometheusMonitoring=false, useKnativeEventing=true, useCloudEvents=true, useExplainability=false}
2021-04-13 17:39:39,612 INFO [io.ser.api.wor.BaseWorkflow] (build-22) Unable to convert as json markup, trying as yaml
2021-04-13 17:39:39,676 INFO [io.ser.api.wor.BaseWorkflow] (build-22) Unable to convert as json markup, trying as yaml
2021-04-13 17:39:39,677 INFO [io.ser.api.wor.BaseWorkflow] (build-22) Unable to convert as json markup, trying as yaml
2021-04-13 17:39:39,746 INFO [org.kie.kog.cod.cor.uti.ApplicationGeneratorDiscovery] (build-22) Generator discovery performed, found [openapispecs, processes]
2021-04-13 17:39:40,193 INFO [org.kie.kog.cod.pro.ProcessCodegen] (build-22) Knative Eventing addon enabled, generating CloudEvent HTTP listener
2021-04-13 17:39:41,034 WARN [io.qua.arc.pro.BeanArchives] (build-45) Failed to index byte: Class does not exist in ClassLoader QuarkusClassLoader:Deployment Class Loader
2021-04-13 17:39:41,754 INFO [org.kie.kog.ser.eve.imp.AbstractMessageConsumer] (Quarkus Main Thread) Consumer for class com.fasterxml.jackson.databind.JsonNode started.
2021-04-13 17:39:41,776 INFO [org.kie.kog.add.clo.qua.QuarkusKogitoExtensionInitializer] (Quarkus Main Thread) Registered Kogito CloudEvent extension
2021-04-13 17:39:41,827 INFO [io.quarkus] (Quarkus Main Thread) serverless-workflow-order-processing 999-SNAPSHOT on JVM (powered by Quarkus 1.11.5.Final) started in 2.815s. Listening on: http://localhost:8080
2021-04-13 17:39:41,827 INFO [io.quarkus] (Quarkus Main Thread) Profile dev activated. Live Coding activated.
2021-04-13 17:39:41,827 INFO [io.quarkus] (Quarkus Main Thread) Installed features: [cdi, mutiny, rest-client, rest-client-jackson, resteasy, resteasy-jackson, servlet, smallrye-context-propagation, smallrye-health, smallrye-openapi, smallrye-reactive-messaging, swagger-ui, vertx]
Now send a message to the application on port 8080 in the CloudEvents format using curl:
$ curl -X POST \
-H "content-type: application/json" \
-H "ce-specversion: 1.0" \
-H "ce-source: /from/localhost" \
-H "ce-type: orderEvent" \
-H "ce-id: f0643c68-609c-48aa-a820-5df423fa4fe0" \
-d ' {"id":"f0643c68-609c-48aa-a820-5df423fa4fe0","country":"Brazil","total":10000,"description":"iPhone 12"}' \
http://localhost:8080
You should see an output like this one in the running container terminal:
☁️ cloudevents.Event
Validation: valid
Context Attributes,
specversion: 1.0
type: fraudEvaluation
source: /process/fraudhandling
id: 2acd4329-a215-4460-9750-d7f07eb2d5e5
time: 2021-04-13T20:40:38.394409Z
Extensions,
kogitoparentprociid: f12e91c0-8980-40b6-a49b-3c35ce435718
kogitoprocid: fraudhandling
kogitoprocinstanceid: b822962b-959d-412c-bb4b-80a0cdcdf7f0
kogitorootprocid: orderworkflow
kogitorootprociid: f12e91c0-8980-40b6-a49b-3c35ce435718
kogitousertaskist: 1
Data,
{"id":"f0643c68-609c-48aa-a820-5df423fa4fe0","country":"Brazil","total":10000,"description":"iPhone 12","fraudEvaluation":true}
☁️ cloudevents.Event
Validation: valid
Context Attributes,
specversion: 1.0
type: internationalShipping
source: /process/shippinghandling
id: d557fad8-81b5-482a-b981-5ecb267a92f9
time: 2021-04-13T20:40:38.431677Z
Extensions,
kogitoparentprociid: f12e91c0-8980-40b6-a49b-3c35ce435718
kogitoprocid: shippinghandling
kogitoprocinstanceid: ba57743d-521f-41f2-864a-a7b3f68d35af
kogitorootprocid: orderworkflow
kogitorootprociid: f12e91c0-8980-40b6-a49b-3c35ce435718
kogitousertaskist: 1
Data,
{"id":"f0643c68-609c-48aa-a820-5df423fa4fe0","country":"Brazil","total":10000,"description":"iPhone 12","fraudEvaluation":true,"shipping":"international"}
You will notice that we received two events: one for Fraud Evaluation and another one for International Shipping processing.
Additionally, some extension attributes start with kogito and provide context about the execution and the event producer.
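One practical use of these attributes is correlation: in the output above, both events carry the same kogitorootprociid, which identifies the orderworkflow instance that produced them. The sketch below groups received events by that attribute. Representing each event as a flat dictionary is an assumption made for illustration; in practice the attributes arrive as CloudEvent extensions.

```python
from collections import defaultdict
from typing import Dict, List


def group_by_root_instance(events: List[Dict]) -> Dict[str, List[str]]:
    """Map each root process instance id to the event types it produced."""
    grouped = defaultdict(list)
    for event in events:
        grouped[event["kogitorootprociid"]].append(event["type"])
    return dict(grouped)


# Values copied from the two events shown above.
received = [
    {"type": "fraudEvaluation", "kogitorootprociid": "f12e91c0-8980-40b6-a49b-3c35ce435718"},
    {"type": "internationalShipping", "kogitorootprociid": "f12e91c0-8980-40b6-a49b-3c35ce435718"},
]
print(group_by_root_instance(received))
# {'f12e91c0-8980-40b6-a49b-3c35ce435718': ['fraudEvaluation', 'internationalShipping']}
```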
To take the other path of the workflow, send this message to your application:
$ curl -X POST \
-H "content-type: application/json" \
-H "ce-specversion: 1.0" \
-H "ce-source: /from/localhost" \
-H "ce-type: orderEvent" \
-H "ce-id: f0643c68-609c-48aa-a820-5df423fa4fe0" \
-d ' {"id":"f0643c68-609c-48aa-a820-5df423fa4fe0","country":"Brazil","total":500,"description":"iPhone 7"}' \
http://localhost:8080
Since the value is under 1000 USD, we will only receive the International Shipping event:
☁️ cloudevents.Event
Validation: valid
Context Attributes,
specversion: 1.0
type: internationalShipping
source: /process/shippinghandling
id: 096d1009-7566-41ac-a3f1-ab0756172adc
time: 2021-04-13T20:44:12.391093Z
Extensions,
kogitoparentprociid: 4ae2d088-5492-417a-b625-3a4ac0e91403
kogitoprocid: shippinghandling
kogitoprocinstanceid: 2e9fca25-c1f9-4e31-bc08-0ad740cfea5d
kogitorootprocid: orderworkflow
kogitorootprociid: 4ae2d088-5492-417a-b625-3a4ac0e91403
kogitousertaskist: 1
Data,
{"id":"f0643c68-609c-48aa-a820-5df423fa4fe0","country":"Brazil","total":500,"description":"iPhone 7","shipping":"international"}
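The two curl requests in this section differ only in their JSON payload. As a rough equivalent, the sketch below builds the same kind of request with Python's standard library, placing the CloudEvent attributes in ce- prefixed HTTP headers and the order in the body. The function name build_order_request is hypothetical, and reusing the order id as ce-id simply mirrors the sample requests.

```python
import json
import urllib.request
from typing import Dict


def build_order_request(order: Dict, url: str = "http://localhost:8080") -> urllib.request.Request:
    """Build a POST request carrying the order as a CloudEvent (attributes in headers)."""
    headers = {
        "content-type": "application/json",
        "ce-specversion": "1.0",
        "ce-source": "/from/localhost",
        "ce-type": "orderEvent",
        "ce-id": order["id"],  # the sample requests reuse the order id as the event id
    }
    body = json.dumps(order).encode("utf-8")
    return urllib.request.Request(url, data=body, headers=headers, method="POST")


order = {
    "id": "f0643c68-609c-48aa-a820-5df423fa4fe0",
    "country": "Brazil",
    "total": 500,
    "description": "iPhone 7",
}
request = build_order_request(order)
# Uncomment to send the event while the application is running locally:
# urllib.request.urlopen(request)
```

Changing total to a value above 1000 should exercise the fraud evaluation path as well.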
Deploying in Minikube
We have prepared a knative Maven profile to build the service image locally and all the Knative resources you need to get started.
Just make sure your cluster has Knative Eventing available:
- Install Knative
- Install the KogitoSource via command line.
- Run eval $(minikube docker-env) to build the image directly into the Minikube registry.
- Run mvn clean install -Pknative -Dnamespace=<your namespace> to build the image and the Knative resources for your application to run.
- Apply the objects created for you with kubectl apply -f target/kubernetes/knative.yml,target/kubernetes/kogito.yml. This deploys the objects from the generated knative.yml and kogito.yml files.
- Run curl from the terminal like you did in the previous steps. To see what's going on, query one of the Knative service sinks created when you applied the generated objects. You should see something like:
☁️ cloudevents.Event
Validation: valid
Context Attributes,
specversion: 1.0
type: internationalShipping
source: /process/shippinghandling
id: f1193deb-9c4e-4e90-8aab-27fdba4ed740
time: 2021-04-15T16:12:04.455905Z
Extensions,
knativearrivaltime: 2021-04-15T16:12:04.657355879Z
kogitoparentprociid: 836fbb3b-a68f-487b-9d96-cc93ca387ab9
kogitoprocid: shippinghandling
kogitoprocinstanceid: 1cc0d2b9-c5fb-445c-8229-5790e7a7b254
kogitorootprocid: orderworkflow
kogitorootprociid: 836fbb3b-a68f-487b-9d96-cc93ca387ab9
kogitousertaskist: 1
Data,
{"id":"f0643c68-609c-48aa-a820-5df423fa4fe0","country":"Brazil","total":500,"description":"iPhone 7","shipping":"international"}
Accessing the Service on Minikube
Ideally, you installed Knative on Minikube via the Knative quickstart. In that case, you will have nip.io DNS installed and will be able to access the services via their exposed Knative Routes.
Alternatively, if you installed via Knative Operators and Istio, you will need to follow this procedure in order to access the service:
- Run minikube tunnel
- Define the Ingress Gateway:
INGRESSGATEWAY=istio-ingressgateway
- Get the Gateway IP with:
export GATEWAY_IP=`kubectl get svc $INGRESSGATEWAY --namespace istio-system \
--output jsonpath="{.status.loadBalancer.ingress[*]['ip']}"`
- Run the curl command using the Gateway URL. For example:
curl -X POST \
-H "content-type: application/json" \
-H "ce-specversion: 1.0" \
-H "ce-source: /from/localhost" \
-H "ce-type: orderEvent" \
-H "ce-id: f0643c68-609c-48aa-a820-5df423fa4fe0" \
-d ' {"id":"f0643c68-609c-48aa-a820-5df423fa4fe0","country":"Brazil","total":15000,"description":"iPhone 7"}' \
http://${GATEWAY_IP} --header "Host:serverless-workflow-order-processing.kogito.example.com"
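The notable part of this request is that the URL points at the gateway IP while the Host header names the service to route to. A minimal sketch of that split follows; the function name gateway_request is hypothetical and 192.0.2.10 is a placeholder documentation address, not a real gateway IP.

```python
import urllib.request


def gateway_request(gateway_ip: str, host: str) -> urllib.request.Request:
    """Target the gateway by IP and select the service through the Host header."""
    return urllib.request.Request(
        f"http://{gateway_ip}",
        headers={"Host": host},
        method="POST",
    )


req = gateway_request("192.0.2.10", "serverless-workflow-order-processing.kogito.example.com")
print(req.full_url, req.get_header("Host"))
# http://192.0.2.10 serverless-workflow-order-processing.kogito.example.com
```

The CloudEvent headers and JSON body are the same as in the local requests and are omitted here.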