“The world is made of events, not things.” — Carlo Rovelli
“Every company is becoming software.” — Jay Kreps
In The Order of Time, the physicist Carlo Rovelli argues that the theory of relativity compels us to view the world not as made of things or entities, but as events or processes. In the world of technology, Jay Kreps argues that the core processes that a company executes are increasingly being captured in software, and these processes consume and produce the business events that drive the company. From the viewpoint of both Rovelli and Kreps, one can view a company as made of events or processes.
The dichotomy between events and things (with state) has been noted by many. Martin Kleppmann captured it elegantly in his book Designing Data-Intensive Applications as follows:
One technique often used in Domain-Driven Development (DDD) is event sourcing, which derives application state from immutable business events. Event sourcing often involves arbitrary logic to derive the state. A more formal model would use a finite-state machine (FSM) to derive the state.
In a software architecture involving a network of FSMs, each FSM can perform state transitions when receiving events, produce events for other FSMs to consume, and persist some internal data. With FSMs, one can implement several other models, such as the following:
- Simple CRUD entities. This is the most common use-case for event sourcing.
- Function as a Service (FaaS). In this case, each FSM only has one state, and a single transition to and from that state, during which it performs the function.
- Actor model. In the actor model, actors receive and send events, but are otherwise passive when no events occur.
- Intelligent agents (IA). Intelligent agents are similar to actors in that they receive and send events, but are generally viewed as continuously active in order to achieve some goal.
In the rest of this article I’ll show how to implement a network of intelligent agents using Kafka Streams and finite-state machines.
The implementation is comprised of two parts:
- An FSM implementation that sits atop Kafka Streams, called a KMachine. A KMachine definition is comprised of a set of states, a set of state transitions, some internal data, and a set of functions that can be attached to state transitions. The entire KMachine definition can be expressed in YAML, in which the functions are written as JavaScript.
- A REST-based web application that can be used to create and manage both KMachine definitions and instances. A KMachine instance is created for each unique key in the input stream.
To demonstrate how KMachine can be used to implement a network of intelligent agents, I’ve borrowed an example from “Programming Game AI By Example,” by Mat Buckland. In this example, two intelligent agents inhabit a gaming environment that represents a miners’ town in the Wild West. One agent is a gold miner, and the other agent is the miner’s wife.
As a preview, here is sample output of the interaction between the miner and his wife:
Miner Bob: Walkin' to the goldmine Miner Bob: Pickin' up a nugget Elsa: Makin' the bed Miner Bob: Pickin' up a nugget Elsa: Makin' the bed Miner Bob: Pickin' up a nugget Miner Bob: Ah'm leavin' the goldmine with mah pockets full o' sweet gold Miner Bob: Goin' to the bank. Yes siree Miner Bob: Depositing gold. Total savings now: 3 Miner Bob: Leavin' the bank Miner Bob: Walkin' to the goldmine Miner Bob: Pickin' up a nugget Elsa: Washin' the dishes Miner Bob: Pickin' up a nugget Elsa: Moppin' the floor Miner Bob: Pickin' up a nugget Miner Bob: Ah'm leavin' the goldmine with mah pockets full o' sweet gold Miner Bob: Goin' to the bank. Yes siree Miner Bob: Depositing gold. Total savings now: 6 Miner Bob: WooHoo! Rich enough for now. Back home to mah li'lle lady Miner Bob: Leavin' the bank Miner Bob: Walkin' home Elsa: Hi honey. Let me make you some of mah fine country stew Miner Bob: ZZZZ... Elsa: Putting the stew in the oven Elsa: Fussin' over food Miner Bob: ZZZZ... Elsa: Fussin' over food Elsa: Puttin' the stew on the table Elsa: StewReady! Lets eat Miner Bob: All mah fatigue has drained away. Time to find more gold! Elsa: Time to do some more housework! Miner Bob: Walkin' to the goldmine
Both the miner and his wife are implemented as separate KMachine definitions. Here is the KMachine definition that represents the miner:
name: miner input: miner init: goHomeAndSleepTilRested states: - name: enterMineAndDigForNugget onEntry: enterMineAction onExit: exitMineAction - name: visitBankAndDepositGold onEntry: enterBankAction onExit: exitBankAction - name: goHomeAndSleepTilRested onEntry: enterHomeAction onExit: exitHomeAction - name: quenchThirst onEntry: enterSaloonAction onExit: exitSaloonAction - name: eatStew onEntry: startEatingAction onExit: finishEatingAction transitions: - type: stayInMine from: enterMineAndDigForNugget to: guard: onTransition: stayInMineAction - type: visitBank from: enterMineAndDigForNugget to: visitBankAndDepositGold guard: onTransition: - type: quenchThirst from: enterMineAndDigForNugget to: quenchThirst guard: onTransition: - type: goHome from: visitBankAndDepositGold to: goHomeAndSleepTilRested guard: onTransition: - type: enterMine from: visitBankAndDepositGold to: enterMineAndDigForNugget guard: onTransition: - type: enterMine from: goHomeAndSleepTilRested to: enterMineAndDigForNugget guard: onTransition: - type: enterMine from: quenchThirst to: enterMineAndDigForNugget guard: onTransition: - type: stayHome from: goHomeAndSleepTilRested to: guard: onTransition: stayHomeAction - type: stewReady from: goHomeAndSleepTilRested to: eatStew guard: onTransition: imComingAction - type: finishEating from: eatStew to: goHomeAndSleepTilRested guard: onTransition: data: location: shack goldCarried: 0 moneyInBank: 0 thirst: 0 fatigue: 0 functions: enterMineAction: >- (ctx, key, value, data) => { if (data.location != 'goldMine') { console.log("Miner " + key + ": Walkin' to the goldmine"); data.location = 'goldMine'; } ctx.sendMessage(ctx.topic(), key, { type: 'stayInMine' }, 0); } stayInMineAction: >- (ctx, key, value, data) => { data.goldCarried++; data.fatigue++; console.log("Miner " + key + ": Pickin' up a nugget"); if (data.goldCarried >= 3) { ctx.sendMessage(ctx.topic(), key, { type: 'visitBank' }, 0); } else if (data.thirst >= 5) { ctx.sendMessage(ctx.topic(), key, { type: 'quenchThirst' }, 0); } else { ctx.sendMessage(ctx.topic(), key, { type: 'stayInMine' }, 1000); } } exitMineAction: >- (ctx, key, value, data) => { console.log("Miner " + key + ": Ah'm leavin' the goldmine with mah pockets full o' sweet gold"); } enterBankAction: >- (ctx, key, value, data) => { console.log("Miner " + key + ": Goin' to the bank. Yes siree"); data.location = 'bank'; data.moneyInBank += data.goldCarried; data.goldCarried = 0; console.log("Miner " + key + ": Depositing gold. Total savings now: " + data.moneyInBank); if (data.moneyInBank >= 5) { console.log("Miner " + key + ": WooHoo! Rich enough for now. Back home to mah li'lle lady"); ctx.sendMessage(ctx.topic(), key, { type: 'goHome' }, 0); } else { ctx.sendMessage(ctx.topic(), key, { type: 'enterMine' }, 0); } } exitBankAction: >- (ctx, key, value, data) => { console.log("Miner " + key + ": Leavin' the bank"); } enterHomeAction: >- (ctx, key, value, data) => { if (data.location != 'shack') { console.log("Miner " + key + ": Walkin' home"); data.location = 'shack'; if (data.wife) { ctx.sendMessage('miners_wife', data.wife, { type: 'hiHoneyImHome' }, 0); } } ctx.sendMessage(ctx.topic(), key, { type: 'stayHome' }, 0); } stayHomeAction: >- (ctx, key, value, data) => { if (value.wife) { data.wife = value.wife; } if (data.fatigue < 5) { console.log("Miner " + key + ": All mah fatigue has drained away. Time to find more gold!"); data.location = 'shack'; ctx.sendMessage(ctx.topic(), key, { type: 'enterMine' }, 0); } else { data.fatigue--; console.log("Miner " + key + ": ZZZZ... "); ctx.sendMessage(ctx.topic(), key, { type: 'stayHome' }, 1000); } } exitHomeAction: >- (ctx, key, value, data) => { } enterSaloonAction: >- (ctx, key, value, data) => { if (data.moneyInBank >= 2) { data.thirst = 0; data.moneyInBank -= 2; console.log("Miner " + key + ": That's mighty fine sippin liquer"); } ctx.sendMessage(ctx.topic(), key, { type: 'enterMine' }, 0); } exitSaloonAction: >- (ctx, key, value, data) => { console.log("Miner " + key + ": Leavin' the saloon, feelin' good"); } imComingAction: >- (ctx, key, value, data) => { console.log("Miner " + key + ": Okay Hun, ahm a comin'!"); } startEatingAction: >- (ctx, key, value, data) => { console.log("Miner " + key + ": Smells Reaaal goood Elsa!"); console.log("Miner " + key + ": Tastes real good too!"); ctx.sendMessage(ctx.topic(), key, { type: 'finishEating' }, 0); } finishEatingAction: >- (ctx, key, value, data) => { console.log("Miner " + key + ": Thankya li'lle lady. Ah better get back to whatever ah wuz doin'"); }
The state transition diagram for the miner, generated using the DOT graph language, is shown below.
Next is the KMachine definition that represents the miner’s wife:
name: minersWife input: miners_wife init: doHouseWork states: - name: doHouseWork onEntry: startHouseWorkAction onExit: - name: visitBathroom onEntry: enterBathroomAction onExit: exitBathroomAction - name: cookStew onEntry: startCookingAction onExit: finishCookingAction transitions: - type: continueHouseWork from: doHouseWork to: guard: onTransition: continueHouseWorkAction - type: natureCalls from: doHouseWork to: visitBathroom guard: onTransition: - type: natureCalls from: cookStew to: visitBathroom guard: onTransition: - type: continuePrevious from: visitBathroom to: revertToPreviousState toType: Function guard: onTransition: - type: hiHoneyImHome from: doHouseWork to: cookStew guard: onTransition: hiHoneyAction - type: hiHoneyImHome from: visitBathroom to: cookStew guard: onTransition: hiHoneyAction - type: continueCooking from: cookStew to: guard: onTransition: continueCookingAction - type: stewReady from: cookStew to: doHouseWork guard: onTransition: letsEatAction data: location: shack cooking: false functions: startHouseWorkAction: >- (ctx, key, value, data) => { console.log(key + ": Time to do some more housework!"); ctx.sendMessage(ctx.topic(), key, { type: 'continueHouseWork' }, 0); } continueHouseWorkAction: >- (ctx, key, value, data) => { if (value.husband) { data.husband = value.husband; } switch (Math.floor(Math.random() * 3)) { case 0: console.log(key + ": Moppin' the floor"); break; case 1: console.log(key + ": Washin' the dishes"); break; case 2: console.log(key + ": Makin' the bed"); break; } if (Math.random() < 0.1) { ctx.sendMessage(ctx.topic(), key, { type: 'natureCalls' }, 0); } else { ctx.sendMessage(ctx.topic(), key, { type: 'continueHouseWork' }, 1000); } } enterBathroomAction: >- (ctx, key, value, data) => { console.log(key + ": Walkin' to the can. Need to powda mah pretty li'lle nose"); console.log(key + ": Ahhhhhh! Sweet relief!"); ctx.sendMessage(ctx.topic(), key, { type: 'continuePrevious' }, 0); } exitBathroomAction: >- (ctx, key, value, data) => { console.log(key + ": Leavin' the Jon"); } revertToPreviousState: >- (ctx, key, value, data) => { return data.cooking ? 'cookStew' : 'doHouseWork' } hiHoneyAction: >- (ctx, key, value, data) => { console.log(key + ": Hi honey. Let me make you some of mah fine country stew"); } startCookingAction: >- (ctx, key, value, data) => { if (!data.cooking) { console.log(key + ": Putting the stew in the oven"); ctx.sendMessage(ctx.topic(), key, { type: 'stewReady' }, 2000); data.cooking = true; } ctx.sendMessage(ctx.topic(), key, { type: 'continueCooking' }, 0); } continueCookingAction: >- (ctx, key, value, data) => { console.log(key + ": Fussin' over food"); if (Math.random() < 0.1) { ctx.sendMessage(ctx.topic(), key, { type: 'natureCalls' }, 0); } else { ctx.sendMessage(ctx.topic(), key, { type: 'continueCooking' }, 1000); } } finishCookingAction: >- (ctx, key, value, data) => { console.log(key + ": Puttin' the stew on the table"); } letsEatAction: >- (ctx, key, value, data) => { console.log(key + ": StewReady! Lets eat"); if (data.husband) { ctx.sendMessage('miner', data.husband, { type: 'stewReady' }, 0); } data.cooking = false; }
The state transition diagram for the miner’s wife is shown below.
Each KMachine definition, for both the miner and his wife, is entirely contained in the YAML above. A KMachine definition describes an FSM as follows:
name
– The id of the definition for the FSM.input
– The input topic.init
– The initial state of the FSM.states
– The states of the FSM. Each state can have the following:name
– The name of the state.onEntry
– The function to invoke on entry to the state.onExit
– The function to invoke on exit of the state.
transitions
– The state transitions. Each transition can have the following:type
– The event type that triggers the transition.from
– The source state.to
– The destination, which can be- The name of the destination state.
- The function to determine the destination state.
null
, which represents an internal state transition, where the state does not change, theonEntry
oronExit
functions are not invoked, but theguard
andonTransition
functions are invoked.
toType
– Either “State” or “Function”.guard
– A boolean function to indicate whether the transition should occur.onTransition
– The function to invoke on transition.
data
– A set of key-value pairs that represent the internal data for the FSM.functions
– A set of JavaScript functions that can be attached to states and transitions. Each function takes the following parameters:ctx
– A context object that provides the following methods:topic()
– The input topic.sendMessage()
– Sends a message to a topic.
key
– The key of the event, as a JSON message.value
– The value of the event, as a JSON message. The value is expected to have a property named “type” to trigger state transitions.data
– The local data, which can be mutated.
To see the miner and his wife in action, you’ll first need to start a local instance of Kafka. Then create two topics, one for the miner and one for his wife.
./bin/kafka-topics --create --topic miner --bootstrap-server localhost:9092 ./bin/kafka-topics --create --topic miners_wife --bootstrap-server localhost:9092
Next, clone the project at https://github.com/rayokota/kmachines and start up the web application.
git clone https://github.com/rayokota/kmachines.git cd kmachines mvn clean install -DskipTests mvn -pl kmachines-rest-app compile quarkus:dev -Dquarkus.http.port=8081
In a separate window, create the KMachine definitions for the miner and his wife.
cd kmachines curl -X POST -H "Content-Type: text/yaml" --data-binary @kmachines-rest-app/src/test/resources/miner_messaging.yml "http://localhost:8081/kmachines" curl -X POST -H "Content-Type: text/yaml" --data-binary @kmachines-rest-app/src/test/resources/miners_wife_messaging.yml "http://localhost:8081/kmachines"
Now produce an event to create a KMachine instance for a miner named “Bob”, and another event to create a KMachine instance for his wife named “Elsa”. This can be done with kafkacat
, for example. Events are represented as a pair of JSON messages for the key and value. The key corresponds to a unique KMachine instance, while the value is the message used to trigger state transitions, which must have a “type” property to indicate the event type. In the command below, a dot (.) is used to separate the key from the value (using the -K
option of kafkacat
).
echo '"Bob".{ "type": "stayHome", "wife": "Elsa" }' | kafkacat -b localhost:9092 -K . -P -t miner echo '"Elsa".{ "type": "continueHouseWork", "husband": "Bob" }' | kafkacat -b localhost:9092 -K . -P -t miners_wife
You should see the miner and his wife interacting as above. You can query the state of the FSM for the miner or the wife at any time.
curl -X POST -H "Content-Type: application/json" http://localhost:8081/kmachines/miner/state --data '"Bob"' curl -X POST -H "Content-Type: application/json" http://localhost:8081/kmachines/minersWife/state --data '"Elsa"'
To stop the agents, run the following commands.
curl -X DELETE "http://localhost:8081/kmachines/minersWife" curl -X DELETE "http://localhost:8081/kmachines/miner"
That’s it!