Application Deployment Requirement
The command line tool csruntime takes the node attribute data in JSON-coded form, either directly on the command line or as a file:
csruntime -n localhost --attr-file test_attr.json
csruntime -n localhost --attr '{"indexed_public": {"owner": {"personOrGroup": "Me"}}}'
The Python functions start_node and dispatch_node take the attributes as a Python object.
The data structure is as follows:
{
"public": # Any public info stored in storage when requesting the node id
{ # TODO formalize these values, e.g. public key
},
"private": # Any configuration of the node that is NOT in storage only kept in node
{ # TODO formalize these values, e.g. private key
},
"indexed_public": # Any public info that is also searchable by the index, also in storage with node id
# The index is prefix searchable by higher level keywords. It is OK to skip levels.
# This list is formal and is intended to be extended as needed
{
"owner": {# The node's affilation
"organization": (reversed DNS) name of organisation,
"organizationalUnit": Sub-unit name,
"role": The title of owner e.g. "Site owner", "admin",
"personOrGroup": The name of the owner(s), e.g. person name or responsible group name
},
"address": {# The node's (static) address
"country": ISO 3166-1 alpha2 coded country name string,
"stateOrProvince": ISO 3166-2 coded sub-country name string,
"locality": Name of e.g. city,
"street": Street name,
"streetNumber": String or number for street number,
"building": Some buildings have names (maybe instead of number),
"floor": String or number specifying floor of building,
"room": String or number specifying room or flat name
},
"node_name": { # The node's static easy identification
"organization": (reversed DNS) name of organisation,
"organizationalUnit": Sub-unit name,
"purpose": If specific purpose of node, e.g. test, production,
"group": Name of node group e.g. "project" name,
"name": Name of node
},
"user_extra": {# Any user specific extra attributes, as a list of list with index words, not possible to skip levels
}
}
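As a sketch, the same attribute structure can be built as a Python dict and either passed to the dispatch functions or serialized for csruntime --attr. The dispatch_node call shown in the comment is illustrative; its exact signature may differ between Calvin versions.

```python
import json

# Node attributes following the indexed_public structure above.
attributes = {
    "indexed_public": {
        "owner": {"organization": "org.testorg", "personOrGroup": "Me"},
        "node_name": {"organization": "org.testorg", "name": "testNode1"},
    }
}

# Serialized form, suitable for: csruntime -n localhost --attr '<json>'
attr_json = json.dumps(attributes)
print(attr_json)

# Programmatic use (illustrative only, check your Calvin version):
# from calvin.utilities.nodecontrol import dispatch_node
# node = dispatch_node(uri=["calvinip://localhost:5000"],
#                      control_uri="http://localhost:5001",
#                      attributes=attributes)
```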
The public indexed values can be obtained by the get_index function or the corresponding control API.
To format the index search string, an attribute resolver function needs to be used:
from calvin.utilities.attribute_resolver import format_index_string
format_index_string(attr_obj, trim=True)
where the attr_obj should only contain ONE attribute e.g.
{"owner": {"organization": "org.testorg", "role": "admin"}}
alternatively the attr is a tuple e.g.:
("owner", {"organization": "org.testorg", "role": "admin"})
The trim parameter, when true, removes trailing empty keys instead of leaving them empty; this allows the prefix search to find nodes based only on the included higher-level keys.
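The trimming behaviour can be illustrated with a toy re-implementation (NOT the real calvin.utilities.attribute_resolver code; the actual index string format is internal to Calvin): keys are laid out in a fixed order, skipped intermediate levels stay as empty slots, and trim only drops the trailing empty ones.

```python
# Fixed key order for the "owner" attribute, per the structure above.
OWNER_KEYS = ["organization", "organizationalUnit", "role", "personOrGroup"]

def format_owner_index(owner, trim=True):
    """Toy illustration of index-string trimming (not Calvin's real format)."""
    parts = [owner.get(key, "") for key in OWNER_KEYS]
    if trim:
        # Drop only trailing empty keys; skipped middle levels stay empty.
        while parts and parts[-1] == "":
            parts.pop()
    return "/".join(["", "owner"] + parts)

# organizationalUnit is skipped (kept as empty slot), personOrGroup is trimmed.
print(format_owner_index({"organization": "org.testorg", "role": "admin"}))
# -> /owner/org.testorg//admin
```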
We want to extend the list of formalized indexed_public index keys as they are needed; user_extra is to be used sparingly.
A Calvin application (a Calvin script) can have a corresponding deployment requirement specification. Currently there is no parser/compiler for the deployment script, so the requirements have to be specified in a somewhat cumbersome JSON format. The requirements can be passed in as a file using the cscontrol command line tool and the --reqs option:
cscontrol <control uri> deploy --reqs <reqs> <calvin script>
Alternatively, the control API can be used directly (it should be used on the same node where the application was started, before any other commands that could affect the application are issued):
POST /application/{application-id}/migrate
Body: Requirements JSON-file
The structure of the JSON-file is shown here:
{
    "requirements": {
        "<actor instance 1 name>": [ {"op": "<matching rule name>",
                                      "kwargs": {<rule param key>: <rule param value>, ...},
                                      "type": either "+" or "-" for set selection or removal, respectively
                                     }, ...
                                   ],
        ...
    }
}
The matching rules are implemented as plug-ins and are intended to be extended. The matching rules currently available are current_node, all_nodes and node_attr_match. The last rule takes an index parameter, which needs to be attribute formatted, e.g.
{"op": "node_attr_match",
"kwargs": {"index": ["node_name", {"organization": "org.testexample", "name": "testNode1"}]}
"type": "+"
}
All rules must return a list of possible node ids to deploy on, no preference is attached to the order of the list.
Each actor instance can have several rules, hence the list of rules. A final set of possible nodes is built up from the returned results of the individual rules. A type "+" marks that an intersection is to be made between the final set and the individual rule's returned nodes. Accordingly, for each "+" rule the final set shrinks to only the common nodes. A type "-" marks that the relative complement of the individual rule in the final set should be made; in other words, any returned nodes are impossible to deploy to and are "subtracted" from the final set. Note that only "-"-type rules will result in an empty set of possible nodes, i.e. there is no implied 'all but these.'
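The set arithmetic described above can be sketched as follows (a simplified stand-in for the runtime's placement logic, with rule evaluation reduced to precomputed node-id lists):

```python
def combine_rules(all_node_ids, rule_results):
    """Fold per-rule node lists into a final set of candidate nodes.

    rule_results is a list of (type, node_ids) pairs, where type is
    "+" (intersect with the final set) or "-" (subtract from it).
    """
    final = set(all_node_ids)
    for rule_type, node_ids in rule_results:
        if rule_type == "+":
            final &= set(node_ids)   # keep only the common nodes
        elif rule_type == "-":
            final -= set(node_ids)   # matched nodes are excluded
    return final

nodes = {"n1", "n2", "n3"}
rules = [("+", ["n1", "n2"]),   # e.g. a node_attr_match rule
         ("-", ["n1"])]         # e.g. a current_node rule with type "-"
print(sorted(combine_rules(nodes, rules)))  # ['n2']
```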
Since it is also necessary to be able to express alternative rules, usually between a subset of the rules, a special matching rule exists that first forms a union between sub-rules. This is useful for e.g. alternative names, ownerships, or specifying either of two specific nodes. Such a rule looks as follows:
{"op": "union_group",
"requirements": [
{"op": "node_attr_match",
"kwargs": {"index": ["node_name", {"organization": "org.testexample", "name": "testNode1"}]}
},
{"op": "node_attr_match",
"kwargs": {"index": ["node_name", {"organization": "org.testexample", "name": "testNode2"}]}
}
],
"type": "+"
}
As can be seen, this rule does not have kwargs but instead has requirements. For consistency it still has the type field, but it only makes sense to use "+". The requirements field takes a list of matching rules, each with an op field and a kwargs field.
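A union_group rule fits into the same set arithmetic: the sub-rules' node lists are united first, and the resulting set is then combined with the final set according to the type field. A simplified sketch (not the runtime's actual code; the match helper is a hypothetical stand-in for real rule evaluation):

```python
def evaluate(rule, match):
    """Evaluate one requirement rule to a set of node ids.

    match(op, kwargs) is assumed to return the node ids matched by a
    leaf rule; union_group unites its sub-rules' results first.
    """
    if rule["op"] == "union_group":
        result = set()
        for sub in rule["requirements"]:
            result |= evaluate(sub, match)
        return result
    return set(match(rule["op"], rule.get("kwargs", {})))

# Toy matcher: pretend each node_attr_match matches exactly one node id.
def match(op, kwargs):
    name = kwargs["index"][1]["name"]
    return {"testNode1": ["id-1"], "testNode2": ["id-2"]}[name]

rule = {"op": "union_group",
        "requirements": [
            {"op": "node_attr_match",
             "kwargs": {"index": ["node_name", {"name": "testNode1"}]}},
            {"op": "node_attr_match",
             "kwargs": {"index": ["node_name", {"name": "testNode2"}]}}],
        "type": "+"}
print(sorted(evaluate(rule, match)))  # ['id-1', 'id-2']
```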
Here we show a short example, which assumes you are working with a Calvin script myapp.calvin:
src : std.CountTimer()
sum : std.Sum()
snk : io.Print()
src.integer > sum.integer
sum.integer > snk.token
and a deployment requirement JSON-file myapp.deployjson:
{
"groups": {"firstgroup": ["src", "sum"]},
"requirements": {
"src": [{"op": "union_group",
"requirements": [{"op": "node_attr_match",
"kwargs": {"index": ["node_name", {"organization": "org.testexample", "name": "testNode1"}]}
},
{"op": "node_attr_match",
"kwargs": {"index": ["node_name", {"organization": "org.testexample", "name": "testNode2"}]}
}],
"type": "+"
},
{"op": "current_node",
"kwargs": {},
"type": "-"
}],
"sum": [{"op": "node_attr_match",
"kwargs": {"index": ["node_name", {"organization": "org.testexample", "name": "testNode2"}]},
"type": "+"
}],
"snk": [{"op": "node_attr_match",
"kwargs": {"index": ["node_name", {"organization": "org.testexample", "name": "testNode3"}]},
"type": "+"
},
{"op": "current_node",
"kwargs": {},
"type": "-"
}]
}
}
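Before deploying, it can help to verify that the requirement file parses and that its keys match actor names from the script. A small helper sketch (check_reqs is a hypothetical name; the file name follows the example above):

```python
import json

def check_reqs(path, actor_names):
    """Load a deployment requirement file and flag unknown actor names."""
    with open(path) as f:
        reqs = json.load(f)  # raises ValueError on malformed JSON
    unknown = set(reqs.get("requirements", {})) - set(actor_names)
    if unknown:
        raise ValueError("requirements for unknown actors: %s" % sorted(unknown))
    return reqs

# Usage with the example above:
# check_reqs("myapp.deployjson", ["src", "sum", "snk"])
```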
First, start three nodes with node attributes:
csruntime -n <ip-addr> -p 5000 -c 5001 -w 0 --attr '{"indexed_public": {"node_name": {"organization": "org.testexample", "name": "testNode1"}}}' &
csruntime -n <ip-addr> -p 5002 -c 5003 -w 0 --attr '{"indexed_public": {"node_name": {"organization": "org.testexample", "name": "testNode2"}}}' &
csruntime -n <ip-addr> -p 5004 -c 5005 -w 0 --attr '{"indexed_public": {"node_name": {"organization": "org.testexample", "name": "testNode3"}}}' &
Obviously, replace <ip-addr> with e.g. your local machine's IP address.
Then deploy the application, assuming the above listed script and json files are in your current directory:
cscontrol http://<ip-addr>:5001 deploy --reqs myapp.deployjson myapp.calvin
(Again, replace <ip-addr> with e.g. your local machine's IP address.)
Now the application is first built up on the testNode1 node, and then the actors are migrated according to the requirements: src to testNode2, sum to testNode2 and snk to testNode3.
For this to work the runtime storage has to be up and running. At times it takes a while for the distributed hash table (DHT) to connect the nodes, so you may need to wait a second or two between starting the nodes and deploying the application. Also note that the IP addresses used above do not need to be the same, as long as the DHT nodes can find each other, i.e. as long as broadcasts over IP can currently be sent and heard.
Here we list a few requirement expressions that can be used in CalvinScript to control actor placement and scaling. For matching runtime attributes for placement, these correspond to the requirement expressions listed in the previous section.
node_attr_match(index=["node_name", {"organization": "com.ericsson", "purpose": "distributed-test", "group": "first"}])
all_nodes()
etc.
The actors may also be replicated according to requirement expressions. Two main classes exist: performance scaling, which scales to avoid stalling upstream actors, and device coverage scaling, which scales to any matching devices. When scaling, it is important to use port properties on the ports of the upstream, downstream and replicating actors. This allows tokens to be distributed in a balanced way between replicas, and resulting tokens to be collected from all of them.
For performance scaling, the runtime holding the origin (master) actor will measure how often upstream tokens can't be received due to queues being full. When this happens often enough the runtime will replicate the actor, or vice versa dereplicate the last replica.
performance_scaling(max=<upper limit>, alone=<true/false>)
- max sets an upper limit on the number of replicas that can be created simultaneously.
- alone decides if replicas should only be placed on runtimes with or without other replicas of the same actor.
Any upstream actors should use the port property routing="balanced" to make sure that the tokens are delivered to available replicas. Any downstream actors should use a port property that collects tokens from several ports, e.g. routing="collect-tagged".
For device scaling, the runtime holding the origin (master) actor will periodically check if the placement requirement expression matches new runtimes, in which case a replica is created on that runtime. Hence, device scaling is best combined with a placement requirement expression, but sometimes the actor's requires attribute is enough to achieve the wanted operation.
device_scaling(max=<upper limit>)
- max sets an upper limit on the number of replicas that can be created simultaneously.
Any upstream actors could use the port property routing="balanced" or routing="random" to avoid delivering all tokens to all replicas. Any downstream actors should use a port property that collects tokens from several ports, e.g. routing="collect-tagged".
sensor : std.CountTimer(sleep=0.01)
analyze : std.Burn(duration=0.03)
accumulate : std.Identity()
show : io.Print()
sensor.integer(routing="balanced")
accumulate.token[in](routing="collect-unordered")
analyze.token[in](routing="collect-tagged")
sensor.integer > analyze.token
analyze.token > accumulate.token
accumulate.token > show.token
rule first: node_attr_match(index=["node_name", {"organization": "com.ericsson", "purpose": "distributed-test", "group": "first"}])
rule rest: node_attr_match(index=["node_name", {"organization": "com.ericsson", "purpose": "distributed-test", "group": "rest"}]) & performance_scaling(max=20, alone=true)
rule device: node_attr_match(index=["node_name", {"organization": "com.ericsson", "purpose": "distributed-test", "group": "device"}]) & device_scaling(max=20)
apply show, accumulate: first
apply sensor: device
apply analyze: rest