# Amazon Simple Workflow Services
- July 29, 2012 | 3 min read

I’ve been researching Amazon Simple Workflow Service (SWF), a product that is like Simple Queue Service (SQS) on steroids. It lets you build complex workflows and scale out each part of the workflow independently as you need. Amazon lists a number of use cases on its site; the one that caught my eye is NASA, which uses Amazon Simple Workflow to process images captured by the Mars Exploration Rovers.

I originally looked into Amazon Simple Workflow some time ago, when it first came out. At the time I was very confused by the API and didn’t feel that the documentation was complete. Looking at it again with fresh eyes for a project at work, I was able to understand the API a bit better than last time. I still feel that the documentation is a bit lacking and that some useful API calls are missing. Because of this, I think there is a steep learning curve, and converting an app to use Simple Workflow would take some time.

After my research, I presented my findings to my teammates. I wanted a demo that I could use to point out the different parts of a workflow, so I made a sandwich maker. The code lets you build a sandwich from a web interface and delivers the order to the backend. From there, the decision part of the code, which monitors the entire workflow, assigns tasks to workers.
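
To make the hand-off from the web interface concrete, here is a minimal sketch of the order payload. The keys are the ones the decision code reads; the sample values and the workflow type name in the comment are assumptions for illustration, not taken from the project.

```ruby
require 'json'

# Hypothetical order as the web form might submit it. The keys match what
# the decision code reads: name, bread, fillings, spread, toasted.
order = {
  'name'     => 'Sam',
  'bread'    => 'rye',
  'fillings' => ['turkey', 'swiss'],
  'spread'   => 'None',
  'toasted'  => 'true' # the decider compares against the string "true"
}

# SWF carries workflow input as an opaque string, so the order is serialized.
input_json = JSON.generate(order)

# With the v1 aws-sdk gem, starting the workflow would look roughly like
# this (the type name and version here are assumptions):
#   domain.workflow_types['make-sandwich', '1'].start_execution(:input => input_json)

# The decider later parses the same string back into a hash.
parsed = JSON.parse(input_json)
puts "#{parsed['name']} ordered #{parsed['fillings'].join(', ')} on #{parsed['bread']}"
```

Keeping the input as plain JSON means the web server, the decider, and the workers only need to agree on the key names.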

In my example, one worker handles all the tasks. To scale this out, I could increase the number of workers and process more tasks at once. To fine-tune the example, you could break the code from the one worker into multiple workers, each dedicated to a specific task, which would let you scale individual tasks separately. This gives you a lot of flexibility in how you scale your application and how you lay out your system.
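
The difference between one general worker and several specialized ones can be sketched without touching AWS at all. The handlers and the `make_worker` helper below are hypothetical stand-ins. In SWF itself the routing would come from which task list each worker polls.

```ruby
# Hypothetical handlers, one per activity type used in the demo.
HANDLERS = {
  'Get-bread'    => lambda { |order| "got #{order['bread']} bread" },
  'Add-spread'   => lambda { |order| "added #{order['spread']}" },
  'Add-fillings' => lambda { |order| "added #{order['fillings'].join(', ')}" },
  'toasted'      => lambda { |order| 'toasted the sandwich' }
}

# A worker is modeled as the set of activity names it is willing to handle.
# It returns nil for any activity outside that set.
def make_worker(*activity_names)
  lambda do |activity_name, order|
    if activity_names.include?(activity_name)
      HANDLERS.fetch(activity_name).call(order)
    end
  end
end

general_worker = make_worker(*HANDLERS.keys) # handles everything
toaster_worker = make_worker('toasted')      # handles one precise task

order = { 'bread' => 'rye', 'spread' => 'mayo', 'fillings' => ['ham'] }
puts general_worker.call('Get-bread', order)
puts toaster_worker.call('toasted', order)
```

With this split, adding more toaster workers would increase toasting capacity without touching the other steps.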

Decision code:

```ruby
require 'rubygems'
require 'aws-sdk'
require 'json'
require 'yaml'

# Load the configuration file.
CONFIG = YAML.load_file('config.yml') unless defined? CONFIG

# Flush output immediately so Foreman can capture it.
$stdout.sync = true

AWS.config(:access_key_id => CONFIG['access_key_id'], :secret_access_key => CONFIG['secret_access_key'])
swf = AWS::SimpleWorkflow.new
domain = swf.domains[CONFIG['swf_domains']]

# Returns the names of the activities that have completed.
# Expects events newest-first, so each ActivityTaskCompleted event is seen
# before the ActivityTaskScheduled event it points back to.
def done_task(events)
  ids = []
  task = []
  events.each do |e|
    event = e.to_h
    if event[:event_type] == 'ActivityTaskCompleted'
      ids << event[:attributes][:scheduled_event_id]
    elsif event[:event_type] == 'ActivityTaskScheduled' && ids.include?(event[:event_id])
      task << event[:attributes][:activity_type].name
    end
  end
  task
end

# Returns the original input sent from the web server, or "" if none is found.
def get_input(events)
  events.each do |e|
    event = e.to_h
    return event[:attributes][:input] if event[:event_type] == 'WorkflowExecutionStarted'
  end
  ''
end

domain.decision_tasks.poll(CONFIG['swf_task_list']) do |task|
  events_list = task.workflow_execution.events.reverse_order
  done_task_list = done_task(events_list)
  input_json = get_input(events_list)
  begin
    input = JSON.parse(input_json)
  rescue JSON::ParserError, TypeError
    # The order is unreadable, so give up on this workflow.
    task.cancel_workflow_execution
    next
  end

  if !done_task_list.include?('Get-bread')
    # First decision for this order: log it and start with the bread.
    order_text = "Starting new order for #{input['name']}, a #{input['fillings'].join(', ')} on #{input['bread']}"
    order_text += " with #{input['spread']}" if input['spread'] != 'None'
    order_text += ' toasted' if input['toasted'] == 'true'
    puts order_text
    task.schedule_activity_task domain.activity_types['Get-bread', '1'], :input => input_json
  elsif input['spread'] != 'None' && done_task_list.include?('Get-bread') && !done_task_list.include?('Add-spread')
    task.schedule_activity_task domain.activity_types['Add-spread', '1'], :input => input_json
  elsif !done_task_list.include?('Add-fillings')
    task.schedule_activity_task domain.activity_types['Add-fillings', '1'], :input => input_json
  elsif done_task_list.include?('Add-fillings') && !done_task_list.include?('toasted')
    if input['toasted'] == 'true'
      task.schedule_activity_task domain.activity_types['toasted', '1'], :input => input_json
    else
      puts "#{input['name']}'s sandwich is made!"
      task.complete_workflow_execution
    end
  elsif done_task_list.include?('toasted')
    puts "#{input['name']}'s toasted sandwich is made!"
    task.complete_workflow_execution
  end
end
```
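
The branching in the poll loop is easier to follow when pulled out into a pure function. The sketch below is a restatement for illustration, not code from the project: `next_step` and `plan` are hypothetical names, and `plan` simply pretends that every scheduled activity completes before the next decision.

```ruby
# Returns the name of the next activity to schedule, or :complete when the
# sandwich is finished. `done` is the list of completed activity names.
def next_step(done, order)
  if !done.include?('Get-bread')
    'Get-bread'
  elsif order['spread'] != 'None' && !done.include?('Add-spread')
    'Add-spread'
  elsif !done.include?('Add-fillings')
    'Add-fillings'
  elsif order['toasted'] == 'true' && !done.include?('toasted')
    'toasted'
  else
    :complete
  end
end

# Simulates repeated decision tasks: each pass asks for the next step and
# then marks it as completed, as if a worker had finished it.
def plan(order)
  done = []
  steps = []
  loop do
    step = next_step(done, order)
    steps << step
    break if step == :complete
    done << step
  end
  steps
end

p plan({ 'spread' => 'mayo', 'toasted' => 'true' })
p plan({ 'spread' => 'None', 'toasted' => 'false' })
```

The first order goes through all four activities, while the second skips the spread and the toasting. A function like this can be checked without an AWS account.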

The full project is available on GitHub.

As the code shows, I needed to write two functions of my own: one that tells me which tasks have been completed, and another that retrieves the original input from the order.
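
Here is a small sketch of what those two helpers do, run against a hand-built, trimmed history instead of a live workflow. Plain hashes stand in for the SDK's event objects, the activity type is a bare string instead of an object with a `name`, and the function names are illustrative.

```ruby
# Hand-built history, newest event first. The scan below relies on that
# order: a completion is seen before the scheduling event it refers to.
EVENTS = [
  { :event_id => 7, :event_type => 'ActivityTaskScheduled',
    :attributes => { :activity_type => 'Add-fillings' } },
  { :event_id => 5, :event_type => 'ActivityTaskCompleted',
    :attributes => { :scheduled_event_id => 3 } },
  { :event_id => 3, :event_type => 'ActivityTaskScheduled',
    :attributes => { :activity_type => 'Get-bread' } },
  { :event_id => 1, :event_type => 'WorkflowExecutionStarted',
    :attributes => { :input => '{"name":"Sam"}' } }
]

# Collects the names of activities whose scheduling event was later completed.
def completed_activities(events)
  completed_ids = []
  names = []
  events.each do |e|
    case e[:event_type]
    when 'ActivityTaskCompleted'
      completed_ids << e[:attributes][:scheduled_event_id]
    when 'ActivityTaskScheduled'
      names << e[:attributes][:activity_type] if completed_ids.include?(e[:event_id])
    end
  end
  names
end

# Finds the input recorded when the workflow execution started.
def original_input(events)
  started = events.find { |e| e[:event_type] == 'WorkflowExecutionStarted' }
  started ? started[:attributes][:input] : ''
end

p completed_activities(EVENTS)
puts original_input(EVENTS)
```

In this history `Add-fillings` has been scheduled but not completed, so only `Get-bread` is reported as done.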


If you have any questions, comments, or ways to improve this code, please feel free to post in the comments.