“The world is made of events, not things.” — Carlo Rovelli
“Every company is becoming software.” — Jay Kreps
In The Order of Time, the physicist Carlo Rovelli argues that the theory of relativity compels us to view the world not as made of things or entities, but as events or processes. In the world of technology, Jay Kreps argues that the core processes that a company executes are increasingly being captured in software, and these processes consume and produce the business events that drive the company. From the viewpoint of both Rovelli and Kreps, one can view a company as made of events or processes.
The dichotomy between events and things (with state) has been noted by many. Martin Kleppmann captured it elegantly in his book Designing Data-Intensive Applications as follows:
One technique often used in Domain-Driven Development (DDD) is event sourcing, which derives application state from immutable business events. Event sourcing often involves arbitrary logic to derive the state. A more formal model would use a finite-state machine (FSM) to derive the state.
In a software architecture involving a network of FSMs, each FSM can perform state transitions when receiving events, produce events for other FSMs to consume, and persist some internal data. With FSMs, one can implement several other models, such as the following:
- Simple CRUD entities. This is the most common use-case for event sourcing.
- Function as a Service (FaaS). In this case, each FSM only has one state, and a single transition to and from that state, during which it performs the function.
- Actor model. In the actor model, actors receive and send events, but are otherwise passive when no events occur.
- Intelligent agents (IA). Intelligent agents are similar to actors in that they receive and send events, but are generally viewed as continuously active in order to achieve some goal.
In the rest of this article I’ll show how to implement a network of intelligent agents using Kafka Streams and finite-state machines.
The implementation is comprised of two parts:
- An FSM implementation that sits atop Kafka Streams, called a KMachine. A KMachine definition is comprised of a set of states, a set of state transitions, some internal data, and a set of functions that can be attached to state transitions. The entire KMachine definition can be expressed in YAML, in which the functions are written as JavaScript.
- A REST-based web application that can be used to create and manage both KMachine definitions and instances. A KMachine instance is created for each unique key in the input stream.
To demonstrate how KMachine can be used to implement a network of intelligent agents, I’ve borrowed an example from “Programming Game AI By Example,” by Mat Buckland. In this example, two intelligent agents inhabit a gaming environment that represents a miners’ town in the Wild West. One agent is a gold miner, and the other agent is the miner’s wife.
As a preview, here is sample output of the interaction between the miner and his wife:
Miner Bob: Walkin' to the goldmine Miner Bob: Pickin' up a nugget Elsa: Makin' the bed Miner Bob: Pickin' up a nugget Elsa: Makin' the bed Miner Bob: Pickin' up a nugget Miner Bob: Ah'm leavin' the goldmine with mah pockets full o' sweet gold Miner Bob: Goin' to the bank. Yes siree Miner Bob: Depositing gold. Total savings now: 3 Miner Bob: Leavin' the bank Miner Bob: Walkin' to the goldmine Miner Bob: Pickin' up a nugget Elsa: Washin' the dishes Miner Bob: Pickin' up a nugget Elsa: Moppin' the floor Miner Bob: Pickin' up a nugget Miner Bob: Ah'm leavin' the goldmine with mah pockets full o' sweet gold Miner Bob: Goin' to the bank. Yes siree Miner Bob: Depositing gold. Total savings now: 6 Miner Bob: WooHoo! Rich enough for now. Back home to mah li'lle lady Miner Bob: Leavin' the bank Miner Bob: Walkin' home Elsa: Hi honey. Let me make you some of mah fine country stew Miner Bob: ZZZZ... Elsa: Putting the stew in the oven Elsa: Fussin' over food Miner Bob: ZZZZ... Elsa: Fussin' over food Elsa: Puttin' the stew on the table Elsa: StewReady! Lets eat Miner Bob: All mah fatigue has drained away. Time to find more gold! Elsa: Time to do some more housework! Miner Bob: Walkin' to the goldmine
Both the miner and his wife are implemented as separate KMachine definitions. Here is the KMachine definition that represents the miner:
name: miner
input: miner
init: goHomeAndSleepTilRested
states:
- name: enterMineAndDigForNugget
onEntry: enterMineAction
onExit: exitMineAction
- name: visitBankAndDepositGold
onEntry: enterBankAction
onExit: exitBankAction
- name: goHomeAndSleepTilRested
onEntry: enterHomeAction
onExit: exitHomeAction
- name: quenchThirst
onEntry: enterSaloonAction
onExit: exitSaloonAction
- name: eatStew
onEntry: startEatingAction
onExit: finishEatingAction
transitions:
- type: stayInMine
from: enterMineAndDigForNugget
to:
guard:
onTransition: stayInMineAction
- type: visitBank
from: enterMineAndDigForNugget
to: visitBankAndDepositGold
guard:
onTransition:
- type: quenchThirst
from: enterMineAndDigForNugget
to: quenchThirst
guard:
onTransition:
- type: goHome
from: visitBankAndDepositGold
to: goHomeAndSleepTilRested
guard:
onTransition:
- type: enterMine
from: visitBankAndDepositGold
to: enterMineAndDigForNugget
guard:
onTransition:
- type: enterMine
from: goHomeAndSleepTilRested
to: enterMineAndDigForNugget
guard:
onTransition:
- type: enterMine
from: quenchThirst
to: enterMineAndDigForNugget
guard:
onTransition:
- type: stayHome
from: goHomeAndSleepTilRested
to:
guard:
onTransition: stayHomeAction
- type: stewReady
from: goHomeAndSleepTilRested
to: eatStew
guard:
onTransition: imComingAction
- type: finishEating
from: eatStew
to: goHomeAndSleepTilRested
guard:
onTransition:
data:
location: shack
goldCarried: 0
moneyInBank: 0
thirst: 0
fatigue: 0
functions:
enterMineAction: >-
(ctx, key, value, data) => {
if (data.location != 'goldMine') {
console.log("Miner " + key + ": Walkin' to the goldmine");
data.location = 'goldMine';
}
ctx.sendMessage(ctx.topic(), key, { type: 'stayInMine' }, 0);
}
stayInMineAction: >-
(ctx, key, value, data) => {
data.goldCarried++;
data.fatigue++;
console.log("Miner " + key + ": Pickin' up a nugget");
if (data.goldCarried >= 3) {
ctx.sendMessage(ctx.topic(), key, { type: 'visitBank' }, 0);
} else if (data.thirst >= 5) {
ctx.sendMessage(ctx.topic(), key, { type: 'quenchThirst' }, 0);
} else {
ctx.sendMessage(ctx.topic(), key, { type: 'stayInMine' }, 1000);
}
}
exitMineAction: >-
(ctx, key, value, data) => {
console.log("Miner " + key + ": Ah'm leavin' the goldmine with mah pockets full o' sweet gold");
}
enterBankAction: >-
(ctx, key, value, data) => {
console.log("Miner " + key + ": Goin' to the bank. Yes siree");
data.location = 'bank';
data.moneyInBank += data.goldCarried;
data.goldCarried = 0;
console.log("Miner " + key + ": Depositing gold. Total savings now: " + data.moneyInBank);
if (data.moneyInBank >= 5) {
console.log("Miner " + key + ": WooHoo! Rich enough for now. Back home to mah li'lle lady");
ctx.sendMessage(ctx.topic(), key, { type: 'goHome' }, 0);
} else {
ctx.sendMessage(ctx.topic(), key, { type: 'enterMine' }, 0);
}
}
exitBankAction: >-
(ctx, key, value, data) => {
console.log("Miner " + key + ": Leavin' the bank");
}
enterHomeAction: >-
(ctx, key, value, data) => {
if (data.location != 'shack') {
console.log("Miner " + key + ": Walkin' home");
data.location = 'shack';
if (data.wife) {
ctx.sendMessage('miners_wife', data.wife, { type: 'hiHoneyImHome' }, 0);
}
}
ctx.sendMessage(ctx.topic(), key, { type: 'stayHome' }, 0);
}
stayHomeAction: >-
(ctx, key, value, data) => {
if (value.wife) {
data.wife = value.wife;
}
if (data.fatigue < 5) {
console.log("Miner " + key + ": All mah fatigue has drained away. Time to find more gold!");
data.location = 'shack';
ctx.sendMessage(ctx.topic(), key, { type: 'enterMine' }, 0);
} else {
data.fatigue--;
console.log("Miner " + key + ": ZZZZ... ");
ctx.sendMessage(ctx.topic(), key, { type: 'stayHome' }, 1000);
}
}
exitHomeAction: >-
(ctx, key, value, data) => {
}
enterSaloonAction: >-
(ctx, key, value, data) => {
if (data.moneyInBank >= 2) {
data.thirst = 0;
data.moneyInBank -= 2;
console.log("Miner " + key + ": That's mighty fine sippin liquer");
}
ctx.sendMessage(ctx.topic(), key, { type: 'enterMine' }, 0);
}
exitSaloonAction: >-
(ctx, key, value, data) => {
console.log("Miner " + key + ": Leavin' the saloon, feelin' good");
}
imComingAction: >-
(ctx, key, value, data) => {
console.log("Miner " + key + ": Okay Hun, ahm a comin'!");
}
startEatingAction: >-
(ctx, key, value, data) => {
console.log("Miner " + key + ": Smells Reaaal goood Elsa!");
console.log("Miner " + key + ": Tastes real good too!");
ctx.sendMessage(ctx.topic(), key, { type: 'finishEating' }, 0);
}
finishEatingAction: >-
(ctx, key, value, data) => {
console.log("Miner " + key + ": Thankya li'lle lady. Ah better get back to whatever ah wuz doin'");
}
The state transition diagram for the miner, generated using the DOT graph language, is shown below.
Next is the KMachine definition that represents the miner’s wife:
name: minersWife
input: miners_wife
init: doHouseWork
states:
- name: doHouseWork
onEntry: startHouseWorkAction
onExit:
- name: visitBathroom
onEntry: enterBathroomAction
onExit: exitBathroomAction
- name: cookStew
onEntry: startCookingAction
onExit: finishCookingAction
transitions:
- type: continueHouseWork
from: doHouseWork
to:
guard:
onTransition: continueHouseWorkAction
- type: natureCalls
from: doHouseWork
to: visitBathroom
guard:
onTransition:
- type: natureCalls
from: cookStew
to: visitBathroom
guard:
onTransition:
- type: continuePrevious
from: visitBathroom
to: revertToPreviousState
toType: Function
guard:
onTransition:
- type: hiHoneyImHome
from: doHouseWork
to: cookStew
guard:
onTransition: hiHoneyAction
- type: hiHoneyImHome
from: visitBathroom
to: cookStew
guard:
onTransition: hiHoneyAction
- type: continueCooking
from: cookStew
to:
guard:
onTransition: continueCookingAction
- type: stewReady
from: cookStew
to: doHouseWork
guard:
onTransition: letsEatAction
data:
location: shack
cooking: false
functions:
startHouseWorkAction: >-
(ctx, key, value, data) => {
console.log(key + ": Time to do some more housework!");
ctx.sendMessage(ctx.topic(), key, { type: 'continueHouseWork' }, 0);
}
continueHouseWorkAction: >-
(ctx, key, value, data) => {
if (value.husband) {
data.husband = value.husband;
}
switch (Math.floor(Math.random() * 3)) {
case 0:
console.log(key + ": Moppin' the floor");
break;
case 1:
console.log(key + ": Washin' the dishes");
break;
case 2:
console.log(key + ": Makin' the bed");
break;
}
if (Math.random() < 0.1) {
ctx.sendMessage(ctx.topic(), key, { type: 'natureCalls' }, 0);
} else {
ctx.sendMessage(ctx.topic(), key, { type: 'continueHouseWork' }, 1000);
}
}
enterBathroomAction: >-
(ctx, key, value, data) => {
console.log(key + ": Walkin' to the can. Need to powda mah pretty li'lle nose");
console.log(key + ": Ahhhhhh! Sweet relief!");
ctx.sendMessage(ctx.topic(), key, { type: 'continuePrevious' }, 0);
}
exitBathroomAction: >-
(ctx, key, value, data) => {
console.log(key + ": Leavin' the Jon");
}
revertToPreviousState: >-
(ctx, key, value, data) => {
return data.cooking ? 'cookStew' : 'doHouseWork'
}
hiHoneyAction: >-
(ctx, key, value, data) => {
console.log(key + ": Hi honey. Let me make you some of mah fine country stew");
}
startCookingAction: >-
(ctx, key, value, data) => {
if (!data.cooking) {
console.log(key + ": Putting the stew in the oven");
ctx.sendMessage(ctx.topic(), key, { type: 'stewReady' }, 2000);
data.cooking = true;
}
ctx.sendMessage(ctx.topic(), key, { type: 'continueCooking' }, 0);
}
continueCookingAction: >-
(ctx, key, value, data) => {
console.log(key + ": Fussin' over food");
if (Math.random() < 0.1) {
ctx.sendMessage(ctx.topic(), key, { type: 'natureCalls' }, 0);
} else {
ctx.sendMessage(ctx.topic(), key, { type: 'continueCooking' }, 1000);
}
}
finishCookingAction: >-
(ctx, key, value, data) => {
console.log(key + ": Puttin' the stew on the table");
}
letsEatAction: >-
(ctx, key, value, data) => {
console.log(key + ": StewReady! Lets eat");
if (data.husband) {
ctx.sendMessage('miner', data.husband, { type: 'stewReady' }, 0);
}
data.cooking = false;
}
The state transition diagram for the miner’s wife is shown below.
Each KMachine definition, for both the miner and his wife, is entirely contained in the YAML above. A KMachine definition describes an FSM as follows:
name– The id of the definition for the FSM.input– The input topic.init– The initial state of the FSM.states– The states of the FSM. Each state can have the following:name– The name of the state.onEntry– The function to invoke on entry to the state.onExit– The function to invoke on exit of the state.
transitions– The state transitions. Each transition can have the following:type– The event type that triggers the transition.from– The source state.to– The destination, which can be- The name of the destination state.
- The function to determine the destination state.
null, which represents an internal state transition, where the state does not change, theonEntryoronExitfunctions are not invoked, but theguardandonTransitionfunctions are invoked.
toType– Either “State” or “Function”.guard– A boolean function to indicate whether the transition should occur.onTransition– The function to invoke on transition.
data– A set of key-value pairs that represent the internal data for the FSM.functions– A set of JavaScript functions that can be attached to states and transitions. Each function takes the following parameters:ctx– A context object that provides the following methods:topic()– The input topic.sendMessage()– Sends a message to a topic.
key– The key of the event, as a JSON message.value– The value of the event, as a JSON message. The value is expected to have a property named “type” to trigger state transitions.data– The local data, which can be mutated.
To see the miner and his wife in action, you’ll first need to start a local instance of Kafka. Then create two topics, one for the miner and one for his wife.
./bin/kafka-topics --create --topic miner --bootstrap-server localhost:9092 ./bin/kafka-topics --create --topic miners_wife --bootstrap-server localhost:9092
Next, clone the project at https://github.com/rayokota/kmachines and start up the web application.
git clone https://github.com/rayokota/kmachines.git cd kmachines mvn clean install -DskipTests mvn -pl kmachines-rest-app compile quarkus:dev -Dquarkus.http.port=8081
In a separate window, create the KMachine definitions for the miner and his wife.
cd kmachines curl -X POST -H "Content-Type: text/yaml" --data-binary @kmachines-rest-app/src/test/resources/miner_messaging.yml "http://localhost:8081/kmachines" curl -X POST -H "Content-Type: text/yaml" --data-binary @kmachines-rest-app/src/test/resources/miners_wife_messaging.yml "http://localhost:8081/kmachines"
Now produce an event to create a KMachine instance for a miner named “Bob”, and another event to create a KMachine instance for his wife named “Elsa”. This can be done with kafkacat, for example. Events are represented as a pair of JSON messages for the key and value. The key corresponds to a unique KMachine instance, while the value is the message used to trigger state transitions, which must have a “type” property to indicate the event type. In the command below, a dot (.) is used to separate the key from the value (using the -K option of kafkacat).
echo '"Bob".{ "type": "stayHome", "wife": "Elsa" }' | kafkacat -b localhost:9092 -K . -P -t miner
echo '"Elsa".{ "type": "continueHouseWork", "husband": "Bob" }' | kafkacat -b localhost:9092 -K . -P -t miners_wife
You should see the miner and his wife interacting as above. You can query the state of the FSM for the miner or the wife at any time.
curl -X POST -H "Content-Type: application/json" http://localhost:8081/kmachines/miner/state --data '"Bob"' curl -X POST -H "Content-Type: application/json" http://localhost:8081/kmachines/minersWife/state --data '"Elsa"'
To stop the agents, run the following commands.
curl -X DELETE "http://localhost:8081/kmachines/minersWife" curl -X DELETE "http://localhost:8081/kmachines/miner"
That’s it!


