How to Orchestrate Data Flows in a Hybrid Cloud with AWS Step Functions
Team Kranio · March 3, 2021
This article is based on the webinar “Automate Data Flows between Legacy Systems and Cloud”. It goes into more detail, step by step and with code. If you prefer to watch the 1-hour webinar, click here and choose 'watch replay' at the end.
Why orchestrate processes?
Orchestrating and automating processes is part of companies' goals in their digital transformation phase, because many long-established companies have legacy systems that have fulfilled essential roles for decades. When these companies seek to modernize their processes, the correct approach is to do it incrementally, with decoupled services deployed in a hybrid cloud: cloud and on-premise components working together.
One of the Amazon Web Services offerings we like most at Kranio, and in which we are experts, is Step Functions. It consists of a state machine, very similar to a flowchart with sequential inputs and outputs, where each output depends on its input.
In our case, each step is a Lambda function, that is, serverless code that runs only when needed. AWS provides the runtime, and we do not have to manage any servers.
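For illustration, a minimal state-machine definition in Amazon States Language might look like this (the function names and ARNs are placeholders, not part of this project):

```json
{
  "Comment": "Minimal sequential flow: the output of StepA becomes the input of StepB",
  "StartAt": "StepA",
  "States": {
    "StepA": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:us-east-1:123456789012:function:stepA",
      "Next": "StepB"
    },
    "StepB": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:us-east-1:123456789012:function:stepB",
      "End": true
    }
  }
}
```

Each Task state invokes one Lambda; when a state declares `"End": true`, the machine finishes there.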
Use case
A case that helps us understand how to apply Step Functions is creating sequential records in multiple tables of an on-premise DB from a cloud application, through a REST API with an event-based architecture.
This case can be summarized in a diagram like this:
Data payload: these are the data we need to register in the DB.
CloudWatch Events: now called Amazon EventBridge, this service delivers events that can trigger AWS services, in this case, the State Machine.
API Gateway: It is the AWS service that allows creating, publishing, maintaining, and monitoring REST, HTTP, or WebSocket APIs.
A relational database.
Advantages
The advantages of orchestrating on-premise from the cloud are:
Reuse of existing components without leaving them aside
The solution is decoupled: each action has its own development, which simplifies maintenance and error identification.
If business requirements change, we know exactly what needs to be modified, or between which steps a new state should be added.
In case changes are required, the impact on on-premise systems is mitigated since orchestration is in the cloud.
With serverless alternatives, it is not necessary to manage servers or their operating systems.
They are low-cost solutions. If you want to learn more, check the prices for using Lambda, API Gateway, SNS, and CloudWatch Events.
And now what?
You already know the theory about orchestrating a data flow. Now we will show you the considerations and steps you must take into account to put it into practice.
Development
The resources to use are:
An Amazon Web Services account and AWS CLI configured as in this link
Python 3.7+
The Serverless framework (learn here how to set it up)
Since it is an orchestration, we will need to identify the sequential steps we want to orchestrate. And since orchestration is for automation, the flow must also start automatically.
For this, we will build on the use case presented above, assuming that the DB we write to belongs to a company's CRM, that is, one of the technologies used to manage its customer base.
We will create an event-based solution, starting the flow with the reception of a message originating from some source (such as a web form).
After the event is received, its content (payload) must be sent via POST to an endpoint to enter the database. This DB can be cloud or on-premise and the endpoint must have a backend that can perform limited operations on the DB.
To facilitate deployment, we use the Serverless framework, which lets us define our resources as code and deploy them with a single command.
The project will be divided into 3 parts:
API Gateway: an API in API Gateway that will be responsible for creating records in the DB.
Infrastructure: here we will simulate an on-premise DB and create an Event Bus in EventBridge to initialize the flow.
Orchestration: the Step Functions code.
These projects are then deployed in the order infrastructure >> step-functions >> api-gateway.
They can live in the same directory, split into 3 folders. The structure would be as follows:
├──api-gateway
│ ├── scripts-database
│ │ ├── db.py
│ │ └── script.py
│ ├── libs
│ │ └── api_responses.py
│ ├── serverless.yml
│ └── service
│ ├── create_benefit_back.py
│ ├── create_client_back.py
│ └──create_partner_back.py
├──infrastructure
│ └── serverless.yml
└── step-functions
├── functions
│ ├── catch_errors.py
│ ├── create_benefit.py
│ ├── create_client.py
│ ├── create_partner.py
│ └── receive_lead.py
├── serverless.yml
└── services
└──crm_service.py
Talk is cheap. Show me the code.
And with this famous phrase by Linus Torvalds, we will see the essential code of the project we are creating. You can see the details here.
Backend
Endpoints are useless if they do not have a backend. To link each endpoint with a backend, we create Lambda functions that write the parameters each endpoint receives to the database. Once the Lambda functions are created, we enter their ARNs in the “uri” parameter inside “x-amazon-apigateway-integration“.
A key aspect of Lambda functions is that they consist of a main method called handler, which receives 2 parameters: message and context. Message is the input payload, and context contains data about the invocation and execution of the function itself. Every Lambda function receives an input and generates an output. You can learn more here.
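As a minimal illustration of that contract (the names and return value here are ours, purely for demonstration):

```python
def handler(message, context):
    # message: the input payload (a dict when the event is JSON)
    # context: metadata about the invocation (request id, remaining time, etc.)
    # whatever the function returns becomes this step's output
    return {"status": "ok", "received": message}
```

Locally you can exercise it directly, e.g. `handler({"name": "Ada"}, None)`, passing `None` where AWS would inject the context object.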
The functions for each endpoint are very similar and only vary in the data the function needs to write to the corresponding table.
Function: createClient
Role: creates a record in the CLIENTS table of our DB
Function: createPartner
Role: creates a record in the PARTNER table of our DB
def handler(message, context):
    try:
        crm_service = CRMService(message)
        crm_service.create_partner()
        return message
    except Exception as e:
        print(e)
        # re-raise so the State Machine's Catch clause can handle the failure
        raise
Function: createBenefit
Role: creates a record in the BENEFIT table of our DB
def handler(message, context):
    try:
        crm_service = CRMService(message)
        r = crm_service.create_benefit()
        return r
    except Exception as e:
        print(e)
        # re-raise so the State Machine's Catch clause can handle the failure
        raise
IaC - Infrastructure as Code
In serverless.yml we declare all the resources we are defining. For deployment, the AWS CLI must be properly configured; then execute the command
$ sls deploy -s {stage} -r {my AWS region}
This generates a CloudFormation stack that groups all the resources you declared. Learn more here.
In the Serverless.yml files you will see values like these:
${file(${self:provider.stage}.yml):db_pass}
These are references to values in other yml documents in the same path, pointing to a specific key. You can learn more about this way of working here.
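For example, a hypothetical dev.yml for the dev stage might contain (the keys match the references used in this project; the values are placeholders you must replace):

```yaml
# dev.yml - one file per stage (dev.yml, qa.yml, prd.yml)
db_user: webinar_user
db_pass: a-secret-password
arn_prefix: arn:aws:lambda:us-east-1:123456789012:function:webinar-step-functions-dev
```

This way, each environment keeps its own values without touching serverless.yml.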
API Gateway
For the REST API we will set up an API Gateway with a Serverless project.
The goal of the API is to receive requests from Step Functions, registering data in the database.
API Gateway will allow us to expose endpoints to which methods can be applied. In this project, we will only create POST methods.
We will show you the essentials of the project and you can see the details here.
OpenAPI Specification
An alternative to declare the API, its resources, and methods is to do it with OpenAPI. To learn more about OpenAPI, read this article we made about it.
This file is read by the API Gateway service and generates the API.
Important: if we want to create an API Gateway, it is necessary to add to OpenAPI an extension with information that only AWS can interpret. For example: the create_client endpoint that we call via POST receives a request body that a specific backend must process. That backend is a Lambda. The relationship between the endpoint and the Lambda function is declared in this extension. You can learn more about this here.
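As a sketch of what that extension looks like inside the OpenAPI document (region, account id, and function name here are placeholders), a POST endpoint wired to its Lambda backend could be declared like this:

```yaml
paths:
  /create_client:
    post:
      responses:
        '200':
          description: client created
      # AWS-specific extension: links this endpoint to its Lambda backend
      x-amazon-apigateway-integration:
        type: aws_proxy
        httpMethod: POST  # Lambda invocations are always POST, regardless of the endpoint's method
        uri: arn:aws:apigateway:us-east-1:lambda:path/2015-03-31/functions/arn:aws:lambda:us-east-1:123456789012:function:create_client_back/invocations
```

The `uri` embeds the ARN of the backend Lambda, which is what the article refers to when it says to enter the ARN in the “uri” parameter.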
When the project is deployed, API Gateway interprets this file and creates this in your AWS console:
To find out the URL of the deployed API, you must go to the Stages menu. The stage is a state of your API at a given moment (tip: you can have as many stages as you want with different versions of your API). Here you can indicate an abbreviation for the environment you are working in (dev, qa, prd), you can indicate the version of the API you are making (v1, v2), or indicate that it corresponds to a test version (test).
In the API Gateway console, we indicated that we would deploy with the stage name “dev”, so when you go to Stages you will see something like this:
You can find out the URL of each endpoint by clicking on the listed names. This is how the create_client endpoint looks:
Infrastructure
Here we will create the relational database and the Event Bridge event bus.
For now, the DB will be in the AWS cloud, but it could be a database in your own data center or in another cloud.
The EventBridge event bus allows us to connect two isolated components that may even live in different architectures. Learn more about this service here.
This repository is smaller than the previous one, as it only declares 2 resources.
Serverless.yml
# serverless.yml
service: webinar-iass
provider:
  name: aws
  runtime: nodejs12.x
  stage: ${opt:stage, 'dev'}
  region: {YOUR-REGION-HERE}
  versionFunctions: false
  deploymentBucket:
    # here you must put the name of an S3 bucket you determine for deployment. Example: my_project_serverless_s3
    # if you put the same in each serverless file, your deploys will be organized per project within the same bucket.
    name: kranio-webinar
resources:
  - Resources:
      WebinarMariaDB:
        Type: AWS::RDS::DBInstance
        Properties:
          DBName: WebinarMariaDB
          AllocatedStorage: '5'
          Engine: MariaDB
          DBInstanceClass: db.t2.micro
          # here go the database user and password. self:provider.stage takes the environment parameter from the deploy
          # and opens the file with that name (e.g. dev.yml). This way you can parameterize your values.
          MasterUsername: ${file(${self:provider.stage}.yml):db_user}
          MasterUserPassword: ${file(${self:provider.stage}.yml):db_pass}
        DeletionPolicy: Snapshot
      # here we declare the EventBridge event bus we will write to.
      WebinarEventBus:
        Type: AWS::Events::EventBus
        Properties:
          Name: WebinarEventBus
You need to create the following tables in your DB. You can use these database scripts as a guide here.
CLIENTS: name VARCHAR(25), lastname VARCHAR(25), rut VARCHAR(25), mail VARCHAR(25), phone VARCHAR(25)
PARTNERS: rut VARCHAR(25), store VARCHAR(25)
BENEFIT: rut VARCHAR(25), wantsBenefit BOOL
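The linked scripts are not reproduced here, but DDL along these lines (MariaDB syntax, with the columns and sizes from the tables above) would create them:

```sql
CREATE TABLE CLIENTS (
  name     VARCHAR(25),
  lastname VARCHAR(25),
  rut      VARCHAR(25),
  mail     VARCHAR(25),
  phone    VARCHAR(25)
);

CREATE TABLE PARTNERS (
  rut   VARCHAR(25),
  store VARCHAR(25)
);

CREATE TABLE BENEFIT (
  rut          VARCHAR(25),
  wantsBenefit BOOL
);
```

Adjust names, keys, and constraints to match your own scripts.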
With these steps, we conclude the creation of the infrastructure.
Step Functions
Each “step” of the State Machine we will create is a Lambda function. Unlike the Lambdas mentioned in the API Gateway section, whose role is to write to the DB, these make requests to the API Gateway endpoints.
According to the architecture indicated above based on a sequential flow, the State Machine should have these steps:
Receive data from a source (e.g. web form) through the Event Bridge event.
Take the event data and build a payload with the name, last name, phone, branch, and rut for the create_client endpoint, so the backend writes it to the CLIENTS table
Take the event data and build a payload with the rut and branch for the create_partner endpoint, so the backend writes it to the PARTNERS table
Take the event data and build a payload with the rut and wantsBenefit for the create_benefit endpoint, so the backend writes it to the BENEFIT table
You can create an additional Lambda where the flow arrives in case of an execution error (example: the endpoint is down). In this project, it is called catch_errors.
Therefore, one Lambda is made per action of each step.
Function: receive_lead
Role: receives the Event Bridge event. It cleans it and passes it to the next Step. This step is important because when an event is received, it arrives in a JSON document with attributes defined by Amazon, and the event content (the form JSON) is nested inside an attribute called “detail”.
When a source sends you an event through Event Bridge, the payload looks like this:
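EventBridge delivers events in a standard envelope, with fields such as version, id, detail-type, source, account, time, region, and resources, and the original payload nested under detail. A minimal sketch of a receive_lead handler along those lines (the actual code is in the repository; the sample event below is hypothetical) could be:

```python
def handler(message, context):
    # EventBridge wraps the original payload in its standard envelope;
    # the form data travels nested under the "detail" attribute,
    # so we unwrap it and pass only the form JSON to the next step
    return message["detail"]


# hypothetical example of an incoming event (abridged envelope):
event = {
    "version": "0",
    "source": "kranio.event.crm",
    "detail-type": "CRM Registration",
    "detail": {"name": "Ada", "rut": "11111111-1"},
}
```

Calling `handler(event, None)` returns only the nested form data, which is what the following steps consume.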
Function: create_client
Role: receives in message the content from the previous step's Lambda. It takes the content and passes it as an argument to the CRMService class instance.
def handler(message, context):
    try:
        crm_service = CRMService(message)
        crm_service.create_client()
        return message
    except Exception as e:
        print("Exception: ", e)
        # re-raise so the State Machine's Catch clause can handle the failure
        raise
In the CRMService class, we declare the methods that will make the requests according to the endpoint. In this example, the request is to the create_client endpoint. For API calls, the Python Requests library was used:
import json

import requests

# URL and CREATE_CLIENT are module-level constants holding the API base URL and the endpoint path
class CRMService:
    def __init__(self, payload):
        self.payload = payload

    def create_client(self):
        r = requests.post(url=URL + CREATE_CLIENT, data=json.dumps(self.payload))
        # if the response code is different from 200, raise so the failure propagates to Step Functions
        if r.status_code != 200:
            raise Exception(r.text)
        return json.loads(r.text)
The Lambda functions for create_partner and create_benefit are similar to create_client, with the difference that they call the corresponding endpoints. You can review case by case in this part of the repository.
Function: catch_errors.py
Role: takes the errors that occur and returns them so you can diagnose what happened. It is a Lambda function like any other, so it also has a handler, receives a context, and returns a JSON.
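A minimal sketch of what such a function could look like (the actual code is in the repository): when a Catch clause routes execution here, Step Functions passes an object with Error and Cause fields describing the failure.

```python
def handler(message, context):
    # message carries the error output of the failed step:
    # "Error" holds the error name and "Cause" the details, both as strings
    print("Caught error:", message)
    return {
        "error": message.get("Error", "Unknown"),
        "cause": message.get("Cause", ""),
    }
```

From here you could also notify a channel (e.g. via SNS) instead of just logging.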
Next, we declare the serverless.yml of this project:
service: webinar-step-functions
# frameworkVersion: '2.3.0'
provider:
  name: aws
  runtime: python3.7
  stage: ${opt:stage, 'dev'}
  region: {YOUR-REGION-HERE}
  prefix: ${self:service}-${self:provider.stage}
  versionFunctions: false
  deploymentBucket:
    # here you must put the name of an S3 bucket you determine for deployment. Example: my_project_serverless_s3
    # if you put the same in each serverless file, your deploys will be organized per project within the same bucket.
    name: kranio-webinar
package:
  excludeDevDependencies: false
custom:
  prefix: '${self:service}-${self:provider.stage}'
  # arn_prefix is a string that would be repeated many times if not parameterized. In dev.yml you can see what it contains.
  # create one such document per environment if necessary.
  arn_prefix: '${file(${self:provider.stage}.yml):arn_prefix}'
  defaultErrorHandler:
    ErrorEquals: ["States.ALL"]
    Next: CatchError
# declaration of all lambda functions
functions:
  receiveLead:
    handler: functions/receive_lead.handler
  createClient:
    handler: functions/create_client.handler
  createPartner:
    handler: functions/create_partner.handler
  createBenefit:
    handler: functions/create_benefit.handler
  catchErrors:
    handler: functions/catch_errors.handler
# the step functions
stepFunctions:
  stateMachines:
    CreateClientCRM:
      # here we indicate that the state machine starts when
      # an EventBridge event arrives on that eventBusName with the
      # specified source
      name: CreateClientCRM
      events:
        - eventBridge:
            eventBusName: WebinarEventBus
            event:
              source:
                - "kranio.event.crm"
      definition:
        # here we indicate which step the state machine starts with.
        Comment: starts client registration process
        StartAt: ReceiveLead
        States:
          # now we declare the states.
          # Type indicates that these steps are tasks.
          # Resource indicates the ARN of the Lambda function to
          # execute in this step.
          # Next indicates the next step.
          # Catch indicates which state we go to if an error occurs, in this case CatchError.
          ReceiveLead:
            Type: Task
            Resource: '${self:custom.arn_prefix}-receiveLead'
            Next: CreateClient
            Catch:
              - ${self:custom.defaultErrorHandler}
          CreateClient:
            Type: Task
            Resource: '${self:custom.arn_prefix}-createClient'
            Next: CreatePartner
            Catch:
              - ${self:custom.defaultErrorHandler}
          CreatePartner:
            Type: Task
            Resource: '${self:custom.arn_prefix}-createPartner'
            Next: CreateBenefit
            Catch:
              - ${self:custom.defaultErrorHandler}
          CreateBenefit:
            Type: Task
            Resource: '${self:custom.arn_prefix}-createBenefit'
            Catch:
              - ${self:custom.defaultErrorHandler}
            End: true
          CatchError:
            Type: Task
            Resource: '${self:custom.arn_prefix}-catchErrors'
            End: true
Now we have the Lambda functions for each step of the State Machine, we have the API that writes to the DB, and we have the exposed endpoint to make requests.
Sending a message to the Event Bridge event bus
For all this to start interacting, it is necessary to send the event that initializes the flow.
Assuming you are working on your company's CRM and getting the initial data from a web form, the way to write to the event bus that initializes the flow is through the AWS SDK. See the languages it is available for here.
If you are working with Python, sending the form data would look like this:
import json
from datetime import datetime

import boto3

client = boto3.client('events')
# running this script sends an event to EventBridge.
# Source and EventBusName must match what you declared in serverless.yml
response = client.put_events(
    Entries=[
        {
            'Time': datetime.now(),
            # event source
            'Source': 'kranio.event.crm',
            'Resources': [
                'string',
            ],
            'DetailType': 'CRM Registration',
            # payload is the form data.
            'Detail': json.dumps(payload),
            'EventBusName': 'WebinarEventBus'
        },
    ]
)
Having configured everything correctly, go to the Step Functions service in your AWS console and open the list of executions:
If you choose the most recent execution, you will see the execution sequence of the Step Functions and the details of each step's inputs and outputs:
When you choose the ReceiveLead step, the input corresponds to the payload sent as an event via EventBridge.
The real test
If you access your database (either with a terminal client or an intuitive visual client), you will see that the data has been written to each of the corresponding tables.
Conclusions
Step Functions is a very powerful service if what you need is to automate a sequential flow of actions. We have now created a simple example, but it is highly scalable. Also, working with Step Functions is an invitation to decouple the requirements of the solution you need to implement, which makes it easier to identify failure points.
This type of orchestration is totally serverless, therefore, it is much more economical than developing an application that runs on a server just to fulfill this role.
It is an excellent way to experiment with a hybrid cloud, reusing and integrating applications from your data center, and interacting with cloud services.
Ready to optimize your data flows in hybrid environments?
At Kranio, we have experts in system integration and serverless solutions who will help you orchestrate efficient processes between your on-premise systems and the cloud. Contact us and discover how we can drive your company's digital transformation.