diff --git a/.vscode/settings.json b/.vscode/settings.json index bf6870d6..a5eb0fe7 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -2,11 +2,12 @@ "editor.formatOnSave": true, "black-formatter.importStrategy": "fromEnvironment", "python.testing.pytestArgs": [ - "." + ".", + "--doctest-modules" ], "python.testing.unittestEnabled": false, "python.testing.pytestEnabled": true, "[python]": { "editor.defaultFormatter": "ms-python.black-formatter" } -} +} \ No newline at end of file diff --git a/examples/README.md b/examples/README.md index 48ca6b67..3b9c7113 100644 --- a/examples/README.md +++ b/examples/README.md @@ -1,3 +1,113 @@ -# Example Experiments +# Experiment Resources + +This guide provides instructions for creating and structuring resource-file.json, which is used to define the behavior tree and experiment flow for robots. Currently, there are three robots with custom contexts: + +1. Atlas +2. Digit +3. Optimus + + + + +## Breakdown of {robot}-resource-file.json + +Resource files are JSON files made up of key-value pairs. The keys represent subgoals, and their values define the subgoal's context, children, behavior library, and interruptions. + +Here’s an example template: + + +```json +{ + "subgoal_A": { + "context": "...", + "children": [], + "behavior_library": [ + { + "id": "...", + "name": "...", + "in_behavior_bank": false + }, + { + "id": "...", + "name": "...", + "in_behavior_bank": true + } + ], + "interruptions": {} + }, + "subgoal_B": { + "context": "...", + "children": [], + "behavior_library": [ + { + "id": "...", + "name": "...", + "in_behavior_bank": false + }, + { + "id": "...", + "name": "...", + "in_behavior_bank": true + } + ], + "interruptions": {} + } +} +``` + +## Subgoal Structure Breakdown +- Each subgoal is represented by a unique key (e.g., "subgoal_A") with the following fields: + + +### Context +- A string describing the subgoal's background and objectives. + +Example: +```json +"context": "Atlas is assigned to retrieve medicine A from the medicine cabinet..." +``` + + +### Children +- An array of behavior IDs (strings) specifying the execution order of the subgoal's behaviors. +- Important: These IDs must exist in the behavior_library field. + +Example: +```json +"children": [ + "take_path_to_medicine_cabinet", + "unlock_cabinet", + "retrieve_medicine" +] +``` + + +### Behavior Library +- A list of all possible behaviors for the subgoal. +- Each behavior is an object with the following fields: + - id (required): Unique identifier. + - name (required): Description of the behavior. + - in_behavior_bank (optional): true if included in the behavior bank. + +Example: +```json +"behavior_library": [ + { + "id": "take_path_to_medicine_cabinet", + "name": "take acceptable path to the medicine cabinet", + "in_behavior_bank": false + }, + { + "id": "request_medicine", + "name": "request medicine from pharmacist", + "in_behavior_bank": true + } +] +``` + +### Interruptions + +Currently not used. Keep as an empty object: {} + + -This directory includes configuration files for example experiments. \ No newline at end of file diff --git a/examples/abstract-experiment.json b/examples/abstract-experiment.json deleted file mode 100644 index 881b77e8..00000000 --- a/examples/abstract-experiment.json +++ /dev/null @@ -1,129 +0,0 @@ -{ - "behavior_tree": { - "name": "Sequence A", - "type": "Sequence", - "children": [ - { - "name": "Behavior A", - "type": "Behavior", - "children": [] - }, - { - "name": "Behavior B", - "type": "Behavior", - "children": [] - }, - { - "name": "Sequence B", - "type": "Sequence", - "children": [ - { - "name": "Behavior C", - "type": "Behavior", - "children": [] - }, - { - "name": "Behavior D", - "type": "Behavior", - "children": [] - } - ] - }, - { - "name": "Sequence C", - "type": "Sequence", - "children": [ - { - "name": "Behavior E", - "type": "Behavior", - "children": [] - }, - { - "name": "Behavior F", - "type": "Behavior", - "children": [] - }, - { - "name": "Behavior G", - "type": "Behavior", - "children": [] - }, - { - "name": "Behavior H", - "type": "Behavior", - "children": [] - } - ] - } - ] - }, - "behavior_library": [ - { - "id": "001", - "display_name": "Behavior A", - "type": "Behavior", - "show": true - }, - { - "id": "002", - "display_name": "Behavior B", - "type": "Behavior", - "show": true - }, - { - "id": "003", - "display_name": "Behavior C", - "type": "Behavior", - "show": true - }, - { - "id": "004", - "display_name": "Behavior D", - "type": "Behavior", - "show": true - }, - { - "id": "005", - "display_name": "Behavior E", - "type": "Behavior", - "show": true - }, - { - "id": "006", - "display_name": "Behavior F", - "type": "Behavior", - "show": true - }, - { - "id": "007", - "display_name": "Behavior G", - "type": "Behavior", - "show": true - }, - { - "id": "008", - "display_name": "Behavior H", - "type": "Behavior", - "show": true - }, - { - "id": "009", - "display_name": "Sequence A", - "type": "Sequence", - "show": false - }, - { - "id": "010", - "display_name": "Sequence B", - "type": "Sequence", - "show": false - }, - { - "id": "011", - "display_name": "Sequence C", - "type": "Sequence", - "show": false - } - ], - "context": "This is a sample paragraph that describes the context of the environment in which the robot is operating. This context might affect the decision making, rendering some actions relevant and others less relevant." -} diff --git a/examples/atlas-resource-file.json b/examples/atlas-resource-file.json new file mode 100644 index 00000000..29e7eb59 --- /dev/null +++ b/examples/atlas-resource-file.json @@ -0,0 +1,140 @@ +{ + "pick_up_medicine": { + "context": "Atlas is assigned to retrieve medicine A from the medicine cabinet. Atlas is currently on the third floor near the nurse's desk. The medicine cabinet is located in the pharmacy room on the third floor. The medicine cabinet has a total 5 stock for medicine A. To retrieve medicine from the pharmacy room, qualified personnel must communicate with the pharmacist at the door and request the medicine they require.", + "children": [ + "take_path_to_medicine_cabinet", + "unlock_cabinet", + "retrieve_medicine" + ], + "behavior_library": [ + { + "id": "take_path_to_medicine_cabinet", + "name": "take acceptable path to the medicine cabinet", + "in_behavior_bank": false + }, + { + "id": "unlock_cabinet", + "name": "unlock the cabinet" + }, + { + "id": "retrieve_medicine", + "name": "retrieve the medicine" + }, + { + "id": "wait_for_cabinet_access", + "name": "wait at medicine cabinet until it is accessible", + "in_behavior_bank": true + }, + { + "id": "alert_adminstrator", + "name": "alert adminstrator for assistance", + "in_behavior_bank": true + }, + { + "id": "request_medicine", + "name": "request required medicine from pharmacist", + "in_behavior_bank": true + } + ], + "interruptions": { + "take_path_to_medicine_cabinet": "While moving toward the medicine cabinet, the robot is interrupted by a visitor: “Hello, I’m looking for room 536 to visit John. Can you help me find the way?”", + "unlock_cabinet": "Upon reaching the cabinet, the robot finds it locked and is unable to electronically open it with the password." + } + }, + "travel_to_desired_floor": { + "context": "Atlas has the medicine in it's posession. Atlas is now assigned to travel to Patient P's Room. Patient P is in room 405, which is on the 4th floor. Atlas is currently located on the 3rd floor in the medicine room. Atlas is designed to fit in conventional elevators. The capacity of the hospital elevator is 1500 lbs based on an average passenger weight of 150 lbs. There are currently 0 occupants in the elevator.", + "children": [ + "take_path_to_elevator", + "get_into_elevator", + "press_correct_floor_button", + "exit_elevator" + ], + "behavior_library": [ + { + "id": "take_path_to_elevator", + "name": "take acceptable path to the elevator", + "in_behavior_bank": false + }, + { + "id": "get_into_elevator", + "name": "get into the elevator", + "in_behavior_bank": false + }, + { + "id": "press_correct_floor_button", + "name": "press the correct floor button", + "in_behavior_bank": false + }, + { + "id": "exit_elevator", + "name": "exit the elevator", + "in_behavior_bank": false + }, + { + "id": "announce_presence_and_task", + "name": "announce its presence and task", + "in_behavior_bank": true + }, + { + "id": "wait_for_empty_elevator", + "name": "wait for elevator that has no occupants", + "in_behavior_bank": true + }, + { + "id": "alert_adminstrator", + "name": "alert adminstrator for assistance", + "in_behavior_bank": true + } + ], + "interruptions": { + "take_path_to_medicine_cabinet": "While moving toward the medicine cabinet, the robot is interrupted by a visitor: “Hello, I’m looking for room 536 to visit John. Can you help me find the way?”", + "unlock_cabinet": "Upon reaching the cabinet, the robot finds it locked and is unable to electronically open it with the password." + } + }, + "deliver_the_medicine": { + "context": "Atlas has the medicine in it's possession. Atlas is now assigned to deliver the medicine to Patient P. Atlas is on the 4th floor, outside the elevator lobby. Patient P is in room 405 and the door to Patient P's room is purposely left open for nurses to keep a constant eye on the patient.", + "children": [ + "take_path_to_patient_room", + "enter_room", + "hand_medicine_to_patient" + ], + "behavior_library": [ + { + "id": "take_path_to_patient_room", + "name": "take acceptable path to the patient room", + "in_behavior_bank": false + }, + { + "id": "enter_room", + "name": "enter the patient's room", + "in_behavior_bank": false + }, + { + "id": "hand_medicine_to_patient", + "name": "hand the medicine to the patient", + "in_behavior_bank": false + }, + { + "id": "announce_presence_and_task", + "name": "announce its presence and task", + "in_behavior_bank": true + }, + { + "id": "knock_on_door", + "name": "knock on the door", + "in_behavior_bank": true + }, + { + "id": "open_room_door", + "name": "open the door", + "in_behavior_bank": true + }, + { + "id": "alert_adminstrator", + "name": "alert adminstrator for assistance", + "in_behavior_bank": true + } + ], + "interruptions": {} + } +} \ No newline at end of file diff --git a/examples/digit-resource-file.json b/examples/digit-resource-file.json new file mode 100644 index 00000000..0ef56d8b --- /dev/null +++ b/examples/digit-resource-file.json @@ -0,0 +1,140 @@ +{ + "pick_up_medicine": { + "context": "Digit is assigned to retrieve medicine A from the medicine cabinet. Digit is currently on the third floor near the nurse's desk. The medicine cabinet is located in the pharmacy room on the third floor. The medicine cabinet has a total 5 stock for medicine A. To retrieve medicine from the pharmacy room, qualified personnel are allowed to enter the pharmacy room and grab the medicine they require.", + "children": [ + "take_path_to_medicine_cabinet", + "unlock_cabinet", + "retrieve_medicine" + ], + "behavior_library": [ + { + "id": "take_path_to_medicine_cabinet", + "name": "take acceptable path to the medicine cabinet", + "in_behavior_bank": false + }, + { + "id": "unlock_cabinet", + "name": "unlock the cabinet" + }, + { + "id": "retrieve_medicine", + "name": "retrieve the medicine" + }, + { + "id": "wait_for_cabinet_access", + "name": "wait at medicine cabinet until it is accessible", + "in_behavior_bank": true + }, + { + "id": "alert_adminstrator", + "name": "alert adminstrator for assistance", + "in_behavior_bank": true + }, + { + "id": "request_medicine", + "name": "request required medicine from pharmacist", + "in_behavior_bank": true + } + ], + "interruptions": { + "take_path_to_medicine_cabinet": "While moving toward the medicine cabinet, the robot is interrupted by a visitor: “Hello, I’m looking for room 536 to visit John. Can you help me find the way?”", + "unlock_cabinet": "Upon reaching the cabinet, the robot finds it locked and is unable to electronically open it with the password." + } + }, + "travel_to_desired_floor": { + "context": "Digit has the medicine in it's posession. Digit is now assigned to travel to Patient P's Room. Patient P is in room 405, which is on the 4th floor. Digit is currently located on the 3rd floor in the medicine room. Digit is designed to fit in conventional elevators. The capacity of the hospital elevator is 1500 lbs based on an average passenger weight of 150 lbs. There are currently 10 occupants in the elevator.", + "children": [ + "take_path_to_elevator", + "get_into_elevator", + "press_correct_floor_button", + "exit_elevator" + ], + "behavior_library": [ + { + "id": "take_path_to_elevator", + "name": "take acceptable path to the elevator", + "in_behavior_bank": false + }, + { + "id": "get_into_elevator", + "name": "get into the elevator", + "in_behavior_bank": false + }, + { + "id": "press_correct_floor_button", + "name": "press the correct floor button", + "in_behavior_bank": false + }, + { + "id": "exit_elevator", + "name": "exit the elevator", + "in_behavior_bank": false + }, + { + "id": "announce_presence_and_task", + "name": "announce its presence and task", + "in_behavior_bank": true + }, + { + "id": "wait_for_empty_elevator", + "name": "wait for elevator that has no occupants", + "in_behavior_bank": true + }, + { + "id": "alert_adminstrator", + "name": "alert adminstrator for assistance", + "in_behavior_bank": true + } + ], + "interruptions": { + "take_path_to_medicine_cabinet": "While moving toward the medicine cabinet, the robot is interrupted by a visitor: “Hello, I’m looking for room 536 to visit John. Can you help me find the way?”", + "unlock_cabinet": "Upon reaching the cabinet, the robot finds it locked and is unable to electronically open it with the password." + } + }, + "deliver_the_medicine": { + "context": "Digit has the medicine in it's possession. Digit is now assigned to deliver the medicine to Patient P. Digit is on the 4th floor, outside the elevator lobby. Patient P is in room 405 and the door to Patient P's room is closed for privacy reasons. Only select personnel are allowed to enter.", + "children": [ + "take_path_to_patient_room", + "enter_room", + "hand_medicine_to_patient" + ], + "behavior_library": [ + { + "id": "take_path_to_patient_room", + "name": "take acceptable path to the patient room", + "in_behavior_bank": false + }, + { + "id": "enter_room", + "name": "enter the patient's room", + "in_behavior_bank": false + }, + { + "id": "hand_medicine_to_patient", + "name": "hand the medicine to the patient", + "in_behavior_bank": false + }, + { + "id": "announce_presence_and_task", + "name": "announce its presence and task", + "in_behavior_bank": true + }, + { + "id": "knock_on_door", + "name": "knock on the door", + "in_behavior_bank": true + }, + { + "id": "open_room_door", + "name": "open the door", + "in_behavior_bank": true + }, + { + "id": "alert_adminstrator", + "name": "alert adminstrator for assistance", + "in_behavior_bank": true + } + ], + "interruptions": {} + } +} \ No newline at end of file diff --git a/examples/entering-a-room.json b/examples/entering-a-room.json deleted file mode 100644 index 2c961464..00000000 --- a/examples/entering-a-room.json +++ /dev/null @@ -1,84 +0,0 @@ -{ - "context": "A nurse assistant robot in a hospital is taking medicine to Mrs. Jones during the day. The robot has already collected the medicine and enters the corridor outside Mrs. Jones' room. It needs to enter the room to take Mrs. Jones her medicine.", - "behavior_tree": { - "name": "", - "type": "Sequence", - "children": [ - { - "name": "Approach the door", - "type": "Behavior" - }, - { - "name": "Open the door", - "type": "Behavior" - }, - { - "name": "Go through the doorway", - "type": "Behavior" - } - ] - }, - "behavior_library": [ - { - "id": "anonymous-sequence", - "name": "", - "type": "Sequence", - "show": false - }, - { - "id": "approach-door", - "name": "Approach the door", - "type": "Behavior", - "show": true - }, - { - "id": "open-door", - "name": "Open the door", - "type": "Behavior", - "show": true - }, - { - "id": "go-through-doorway", - "name": "Go through the doorway", - "type": "Behavior", - "show": true - }, - { - "id": "knock-on-the-door", - "name": "Knock on the door", - "type": "Behavior", - "show": true - }, - { - "id": "sing-a-song", - "name": "Sing a song", - "type": "Behavior", - "show": true - }, - { - "id": "announce-your-presence", - "name": "Announce your presence", - "type": "Behavior", - "show": true - }, - { - "id": "contact-supervisor-guidance", - "name": "Contact supervisor for guidance", - "type": "Behavior", - "show": true - }, - { - "id": "stop", - "name": "Stop", - "type": "Behavior", - "show": true - }, - { - "id": "return-to-charging-station", - "name": "Return to charging station", - "type": "Behavior", - "show": false - } - ] - -} diff --git a/examples/optimus-resource-file.json b/examples/optimus-resource-file.json new file mode 100644 index 00000000..94336f99 --- /dev/null +++ b/examples/optimus-resource-file.json @@ -0,0 +1,140 @@ +{ + "pick_up_medicine": { + "context": "Optimus is assigned to retrieve medicine A from the medicine cabinet. Optimus is currently on the third floor near the nurse's desk. The medicine cabinet is located in the pharmacy room on the third floor. The medicine cabinet has a total 5 stock for medicine A. To retrieve medicine from the pharmacy room, qualified personnel are allowed to enter the pharmacy room and grab the medicine they require.", + "children": [ + "take_path_to_medicine_cabinet", + "unlock_cabinet", + "retrieve_medicine" + ], + "behavior_library": [ + { + "id": "take_path_to_medicine_cabinet", + "name": "take acceptable path to the medicine cabinet", + "in_behavior_bank": false + }, + { + "id": "unlock_cabinet", + "name": "unlock the cabinet" + }, + { + "id": "retrieve_medicine", + "name": "retrieve the medicine" + }, + { + "id": "wait_for_cabinet_access", + "name": "wait at medicine cabinet until it is accessible", + "in_behavior_bank": true + }, + { + "id": "alert_adminstrator", + "name": "alert adminstrator for assistance", + "in_behavior_bank": true + }, + { + "id": "request_medicine", + "name": "request required medicine from pharmacist", + "in_behavior_bank": true + } + ], + "interruptions": { + "take_path_to_medicine_cabinet": "While moving toward the medicine cabinet, the robot is interrupted by a visitor: “Hello, I’m looking for room 536 to visit John. Can you help me find the way?”", + "unlock_cabinet": "Upon reaching the cabinet, the robot finds it locked and is unable to electronically open it with the password." + } + }, + "travel_to_desired_floor": { + "context": "Optimus has the medicine in it's posession. Optimus is now assigned to travel to Patient P's Room. Patient P is in room 405, which is on the 4th floor. Optimus is currently located on the 3rd floor in the medicine room. Optimus is designed to fit in conventional elevators. The capacity of the hospital elevator is 1500 lbs based on an average passenger weight of 150 lbs. There are currently 10 occupants in the elevator.", + "children": [ + "take_path_to_elevator", + "get_into_elevator", + "press_correct_floor_button", + "exit_elevator" + ], + "behavior_library": [ + { + "id": "take_path_to_elevator", + "name": "take acceptable path to the elevator", + "in_behavior_bank": false + }, + { + "id": "get_into_elevator", + "name": "get into the elevator", + "in_behavior_bank": false + }, + { + "id": "press_correct_floor_button", + "name": "press the correct floor button", + "in_behavior_bank": false + }, + { + "id": "exit_elevator", + "name": "exit the elevator", + "in_behavior_bank": false + }, + { + "id": "announce_presence_and_task", + "name": "announce its presence and task", + "in_behavior_bank": true + }, + { + "id": "wait_for_empty_elevator", + "name": "wait for elevator that has no occupants", + "in_behavior_bank": true + }, + { + "id": "alert_adminstrator", + "name": "alert adminstrator for assistance", + "in_behavior_bank": true + } + ], + "interruptions": { + "take_path_to_medicine_cabinet": "While moving toward the medicine cabinet, the robot is interrupted by a visitor: “Hello, I’m looking for room 536 to visit John. Can you help me find the way?”", + "unlock_cabinet": "Upon reaching the cabinet, the robot finds it locked and is unable to electronically open it with the password." + } + }, + "deliver_the_medicine": { + "context": "Optimus has the medicine in it's possession. Optimus is now assigned to deliver the medicine to Patient P. Optimus is on the 4th floor, outside the elevator lobby. Patient P is in room 405 and the door to Patient P's room is closed for privacy reasons. Only select personnel are allowed to enter.", + "children": [ + "take_path_to_patient_room", + "enter_room", + "hand_medicine_to_patient" + ], + "behavior_library": [ + { + "id": "take_path_to_patient_room", + "name": "take acceptable path to the patient room", + "in_behavior_bank": false + }, + { + "id": "enter_room", + "name": "enter the patient's room", + "in_behavior_bank": false + }, + { + "id": "hand_medicine_to_patient", + "name": "hand the medicine to the patient", + "in_behavior_bank": false + }, + { + "id": "announce_presence_and_task", + "name": "announce its presence and task", + "in_behavior_bank": true + }, + { + "id": "knock_on_door", + "name": "knock on the door", + "in_behavior_bank": true + }, + { + "id": "open_room_door", + "name": "open the door", + "in_behavior_bank": true + }, + { + "id": "alert_adminstrator", + "name": "alert adminstrator for assistance", + "in_behavior_bank": true + } + ], + "interruptions": {} + } +} \ No newline at end of file diff --git a/norms/ball.py b/norms/ball.py deleted file mode 100644 index 70321b4c..00000000 --- a/norms/ball.py +++ /dev/null @@ -1,206 +0,0 @@ -#!/usr/bin/env python -# -# License: BSD -# https://raw.githubusercontent.com/splintered-reality/py_trees/devel/LICENSE -# -############################################################################## -# Documentation -############################################################################## - -""" -A py_trees demo. - -.. argparse:: - :module: py_trees.demos.selector - :func: command_line_argument_parser - :prog: py-trees-demo-selector - -.. graphviz:: dot/demo-selector.dot - -.. image:: images/selector.gif - -""" -############################################################################## -# Imports -############################################################################## - -import argparse -import sys -import time -import typing - -import py_trees -import py_trees.console as console - -############################################################################## -# Classes -############################################################################## - - -def description() -> str: - """ - Print description and usage information about the program. - - Returns: - the program description string - """ - content = ( - "Higher priority switching and interruption in the children of a selector.\n" - ) - content += "\n" - content += "In this example the higher priority child is setup to fail initially,\n" - content += "falling back to the continually running second child. On the third\n" - content += ( - "tick, the first child succeeds and cancels the hitherto running child.\n" - ) - if py_trees.console.has_colours: - banner_line = console.green + "*" * 79 + "\n" + console.reset - s = banner_line - s += console.bold_white + "Selectors".center(79) + "\n" + console.reset - s += banner_line - s += "\n" - s += content - s += "\n" - s += banner_line - else: - s = content - return s - - -def epilog() -> typing.Optional[str]: - """ - Print a noodly epilog for --help. - - Returns: - the noodly message - """ - if py_trees.console.has_colours: - return ( - console.cyan - + "And his noodly appendage reached forth to tickle the blessed...\n" - + console.reset - ) - else: - return None - - -def command_line_argument_parser() -> argparse.ArgumentParser: - """ - Process command line arguments. - - Returns: - the argument parser - """ - parser = argparse.ArgumentParser( - description=description(), - epilog=epilog(), - formatter_class=argparse.RawDescriptionHelpFormatter, - ) - group = parser.add_mutually_exclusive_group() - group.add_argument( - "-r", "--render", action="store_true", help="render dot tree to file" - ) - group.add_argument( - "--render-with-blackboard-variables", - action="store_true", - help="render dot tree to file with blackboard variables", - ) - group.add_argument( - "-i", - "--interactive", - action="store_true", - help="pause and wait for keypress at each tick", - ) - return parser - - -def create_root() -> py_trees.behaviour.Behaviour: - """ - Create the root behaviour and it's subtree. - - Returns: - the root behaviour - """ - root = py_trees.composites.Sequence(name="Sequence", memory=False) - - find = py_trees.behaviours.Success(name="Find Ball") - - pick_up_ball = py_trees.composites.Sequence(name="Sequence", memory=False) - - place = py_trees.behaviours.Success(name="Place Ball") - - root.add_children([find, pick_up_ball, place]) - - move_to_ball = py_trees.composites.Selector(name="Selector", memory=False) - - obtain_ball = py_trees.composites.Selector(name="Selector", memory=False) - - pick_up_ball.add_children([move_to_ball, obtain_ball]) - - isClose = py_trees.behaviours.StatusQueue( - name="Ball close?", - queue=[ - py_trees.common.Status.FAILURE, - py_trees.common.Status.FAILURE, - py_trees.common.Status.SUCCESS, - ], - eventually=py_trees.common.Status.SUCCESS, - ) - approach = py_trees.behaviours.Running(name="Approach Ball") - - move_to_ball.add_children([isClose, approach]) - - isGrasped = py_trees.behaviours.StatusQueue( - name="Ball Grasped?", - queue=[ - py_trees.common.Status.FAILURE, - py_trees.common.Status.SUCCESS, - ], - eventually=py_trees.common.Status.SUCCESS, - ) - grasp = py_trees.behaviours.Running(name="Grasp Ball") - - obtain_ball.add_children([isGrasped, grasp]) - - return root - - -############################################################################## -# Main -############################################################################## - - -def main() -> None: - """Entry point for the demo script.""" - args = command_line_argument_parser().parse_args() - print(description()) - py_trees.logging.level = py_trees.logging.Level.DEBUG - - root = create_root() - - #################### - # Rendering - #################### - if args.render: - py_trees.display.render_dot_tree(root) - sys.exit() - - #################### - # Execute - #################### - root.setup_with_descendants() - print(py_trees.display.unicode_tree(root=root, show_status=True)) - for i in range(1, 5): - try: - time.sleep(3.0) - print("\n--------- Tick {0} ---------\n".format(i)) - root.tick_once() - print("\n") - print(py_trees.display.unicode_tree(root=root, show_status=True)) - except KeyboardInterrupt: - break - print("\n") - - -if __name__ == "__main__": - main() diff --git a/norms/ball_blackboard.py b/norms/ball_blackboard.py deleted file mode 100644 index c9a0a5d7..00000000 --- a/norms/ball_blackboard.py +++ /dev/null @@ -1,220 +0,0 @@ -#!/usr/bin/env python -# -# License: BSD -# https://raw.githubusercontent.com/splintered-reality/py_trees/devel/LICENSE -# -############################################################################## -# Documentation -############################################################################## - -""" -A py_trees demo. - -.. argparse:: - :module: py_trees.demos.selector - :func: command_line_argument_parser - :prog: py-trees-demo-selector - -.. graphviz:: dot/demo-selector.dot - -.. image:: images/selector.gif - -""" -############################################################################## -# Imports -############################################################################## - -import argparse -import sys -import time -import typing -import operator - -import py_trees -import py_trees.console as console - -############################################################################## -# Classes -############################################################################## - -py_trees.logging.level = py_trees.logging.Level.DEBUG -blackboard = py_trees.blackboard.Client(name="Client") -blackboard.register_key(key="isBallClose", access=py_trees.common.Access.WRITE) -blackboard.register_key(key="isBallGrasped", access=py_trees.common.Access.WRITE) -blackboard.isBallClose = False -blackboard.isBallGrasped = False - - -def description() -> str: - """ - Print description and usage information about the program. - - Returns: - the program description string - """ - content = ( - "Higher priority switching and interruption in the children of a selector.\n" - ) - content += "\n" - content += "In this example the higher priority child is setup to fail initially,\n" - content += "falling back to the continually running second child. On the third\n" - content += ( - "tick, the first child succeeds and cancels the hitherto running child.\n" - ) - if py_trees.console.has_colours: - banner_line = console.green + "*" * 79 + "\n" + console.reset - s = banner_line - s += console.bold_white + "Selectors".center(79) + "\n" + console.reset - s += banner_line - s += "\n" - s += content - s += "\n" - s += banner_line - else: - s = content - return s - - -def epilog() -> typing.Optional[str]: - """ - Print a noodly epilog for --help. - - Returns: - the noodly message - """ - if py_trees.console.has_colours: - return ( - console.cyan - + "And his noodly appendage reached forth to tickle the blessed...\n" - + console.reset - ) - else: - return None - - -def command_line_argument_parser() -> argparse.ArgumentParser: - """ - Process command line arguments. - - Returns: - the argument parser - """ - parser = argparse.ArgumentParser( - description=description(), - epilog=epilog(), - formatter_class=argparse.RawDescriptionHelpFormatter, - ) - group = parser.add_mutually_exclusive_group() - group.add_argument( - "-r", "--render", action="store_true", help="render dot tree to file" - ) - group.add_argument( - "--render-with-blackboard-variables", - action="store_true", - help="render dot tree to file with blackboard variables", - ) - group.add_argument( - "-i", - "--interactive", - action="store_true", - help="pause and wait for keypress at each tick", - ) - return parser - - -def create_root() -> py_trees.behaviour.Behaviour: - """ - Create the root behaviour and it's subtree. - - Returns: - the root behaviour - """ - root = py_trees.composites.Sequence(name="Sequence", memory=False) - - find = py_trees.behaviours.Success(name="Find Ball") - - pick_up_ball = py_trees.composites.Sequence(name="Sequence", memory=False) - - place = py_trees.behaviours.Success(name="Place Ball") - - root.add_children([find, pick_up_ball, place]) - - move_to_ball = py_trees.composites.Selector(name="Selector", memory=False) - - obtain_ball = py_trees.composites.Selector(name="Selector", memory=False) - - pick_up_ball.add_children([move_to_ball, obtain_ball]) - - print(blackboard) - - isClose = py_trees.behaviours.CheckBlackboardVariableValue( - name="Ball Close?", - check=py_trees.common.ComparisonExpression( - variable="isBallClose", value=True, operator=operator.eq - ), - ) - - approach = py_trees.behaviours.Running(name="Approach Ball") - - move_to_ball.add_children([isClose, approach]) - - isGrasped = py_trees.behaviours.CheckBlackboardVariableValue( - name="Ball Grasped?", - check=py_trees.common.ComparisonExpression( - variable="isBallGrasped", value=True, operator=operator.eq - ), - ) - - grasp = py_trees.behaviours.Running(name="Grasp Ball") - - obtain_ball.add_children([isGrasped, grasp]) - - return root - - -############################################################################## -# Main -############################################################################## - - -def main() -> None: - """Entry point for the demo script.""" - args = command_line_argument_parser().parse_args() - print(description()) - # print(blackboard) - - root = create_root() - - #################### - # Rendering - #################### - if args.render: - py_trees.display.render_dot_tree(root) - sys.exit() - - #################### - # Execute - #################### - root.setup_with_descendants() - print(py_trees.display.unicode_tree(root=root, show_status=True)) - for i in range(1, 6): - try: - time.sleep(1.0) - print("\n--------- Tick {0} ---------\n".format(i)) - if i == 3: - print("Ball is now close\n") - blackboard.isBallClose = True - if i == 5: - print("Ball is now grasped\n") - blackboard.isBallGrasped = True - # blackboard.isBallClose = False - root.tick_once() - print("\n") - print(py_trees.display.unicode_tree(root=root, show_status=True)) - except KeyboardInterrupt: - break - print("\n") - - -if __name__ == "__main__": - main() diff --git a/norms/deliver_medicine.py b/norms/deliver_medicine.py deleted file mode 100644 index 2c61139d..00000000 --- a/norms/deliver_medicine.py +++ /dev/null @@ -1,324 +0,0 @@ -#!/usr/bin/env python -# -# License: BSD -# https://raw.githubusercontent.com/splintered-reality/py_trees/devel/LICENSE -# -############################################################################## -# Documentation -############################################################################## - -""" -A py_trees demo. - -.. argparse:: - :module: py_trees.demos.selector - :func: command_line_argument_parser - :prog: py-trees-demo-selector - -.. graphviz:: dot/demo-selector.dot - -.. image:: images/selector.gif - -""" -############################################################################## -# Imports -############################################################################## - -import argparse -import sys -import time -import typing -import operator - -import py_trees -import py_trees.console as console - -############################################################################## -# Classes -############################################################################## - -py_trees.logging.level = py_trees.logging.Level.INFO -blackboard = py_trees.blackboard.Client(name="Client") -blackboard.register_key(key="isCabinetUnlocked", access=py_trees.common.Access.WRITE) -blackboard.register_key(key="isElevatorOpen", access=py_trees.common.Access.WRITE) -blackboard.register_key(key="elevatorHasSpace", access=py_trees.common.Access.WRITE) -blackboard.register_key(key="canEnterElevator", access=py_trees.common.Access.WRITE) -blackboard.register_key(key="isElevatorOn7th", access=py_trees.common.Access.WRITE) -blackboard.register_key(key="canLeaveElevator", access=py_trees.common.Access.WRITE) -blackboard.isCabinetUnlocked = False -blackboard.isElevatorOpen = False -blackboard.elevatorHasSpace = False -blackboard.canEnterElevator = False -blackboard.isElevatorOn7th = False -blackboard.canLeaveElevator = False - - -def description() -> str: - """ - Print description and usage information about the program. - - Returns: - the program description string - """ - content = ( - "Higher priority switching and interruption in the children of a selector.\n" - ) - content += "\n" - content += "In this example the higher priority child is setup to fail initially,\n" - content += "falling back to the continually running second child. On the third\n" - content += ( - "tick, the first child succeeds and cancels the hitherto running child.\n" - ) - if py_trees.console.has_colours: - banner_line = console.green + "*" * 79 + "\n" + console.reset - s = banner_line - s += console.bold_white + "Selectors".center(79) + "\n" + console.reset - s += banner_line - s += "\n" - s += content - s += "\n" - s += banner_line - else: - s = content - return s - - -def epilog() -> typing.Optional[str]: - """ - Print a noodly epilog for --help. - - Returns: - the noodly message - """ - if py_trees.console.has_colours: - return ( - console.cyan - + "And his noodly appendage reached forth to tickle the blessed...\n" - + console.reset - ) - else: - return None - - -def command_line_argument_parser() -> argparse.ArgumentParser: - """ - Process command line arguments. - - Returns: - the argument parser - """ - parser = argparse.ArgumentParser( - description=description(), - epilog=epilog(), - formatter_class=argparse.RawDescriptionHelpFormatter, - ) - group = parser.add_mutually_exclusive_group() - group.add_argument( - "-r", "--render", action="store_true", help="render dot tree to file" - ) - group.add_argument( - "--render-with-blackboard-variables", - action="store_true", - help="render dot tree to file with blackboard variables", - ) - group.add_argument( - "-i", - "--interactive", - action="store_true", - help="pause and wait for keypress at each tick", - ) - return parser - - -def wait(name, ticks) -> py_trees.behaviours.StatusQueue: - queue = [] - for i in range(0, ticks + 1): - queue.append(py_trees.common.Status.RUNNING) - return py_trees.behaviours.StatusQueue( - name=name, - queue=queue, - eventually=py_trees.common.Status.FAILURE, - ) - - -def create_root() -> py_trees.behaviour.Behaviour: - """ - Create the root behaviour and it's subtree. - - Returns: - the root behaviour - """ - # print(blackboard) - - root = py_trees.composites.Sequence(name="Sequence", memory=False) - - get_medicine = py_trees.composites.Sequence(name="Sequence", memory=False) - - take_elevator = py_trees.composites.Sequence(name="Sequence", memory=False) - - deliver_medicine = py_trees.composites.Sequence(name="Sequence", memory=False) - - root.add_children([get_medicine, take_elevator, deliver_medicine]) - - go_to = py_trees.behaviours.Success(name="Go To Medicine Cabinet") - unlock = py_trees.composites.Selector(name="Selector", memory=False) - take = py_trees.behaviours.Success(name="Take Medicine") - get_medicine.add_children([go_to, unlock, take]) - - unlock_cabinet = py_trees.behaviours.CheckBlackboardVariableValue( - name="Unlock Cabinet", - check=py_trees.common.ComparisonExpression( - variable="isCabinetUnlocked", value=True, operator=operator.eq - ), - ) - wait_cabinet = wait("Wait for at most 3 Ticks", 3) - supervisor = py_trees.behaviours.Success(name="Call Supervisor for Virtual Unlock") - unlock.add_children([unlock_cabinet, wait_cabinet, supervisor]) - - go_to_elevator = py_trees.behaviours.Success(name="Go to Elevator") - click_up_button = py_trees.behaviours.Success(name="Click Up Button") - wait_for_elevator = py_trees.composites.Sequence(name="Sequence", memory=False) - enter_elevator = py_trees.composites.Sequence(name="Sequence", memory=False) - - take_elevator.add_children( - [go_to_elevator, click_up_button, wait_for_elevator, enter_elevator] - ) - - is_elevator_open = py_trees.composites.Selector(name="Selector", memory=False) - has_space_in_elevator = py_trees.composites.Selector(name="Selector", memory=False) - can_enter_elevator = py_trees.composites.Selector(name="Selector", memory=False) - wait_for_elevator.add_children( - [is_elevator_open, has_space_in_elevator, can_enter_elevator] - ) - - elevator_open = py_trees.behaviours.CheckBlackboardVariableValue( - name="Is Elevator Open?", - check=py_trees.common.ComparisonExpression( - variable="isElevatorOpen", value=True, operator=operator.eq - ), - ) - wait_for_open_elevator = wait("Wait for at most 5 Ticks", 5) - supervisor = py_trees.behaviours.Success( - name="Call Supervisor to Virtually Call Elevator" - ) - is_elevator_open.add_children([elevator_open, wait_for_open_elevator, supervisor]) - - elevator_space = py_trees.behaviours.CheckBlackboardVariableValue( - name=">= 9ft^2 of space in the Elevator?", - check=py_trees.common.ComparisonExpression( - variable="elevatorHasSpace", value=True, operator=operator.eq - ), - ) - wait_for_space = py_trees.behaviours.Running(name="State 'I’ll wait'") - has_space_in_elevator.add_children([elevator_space, wait_for_space]) - - elevator_enter = py_trees.behaviours.CheckBlackboardVariableValue( - name="Ask to enter Elevator", - check=py_trees.common.ComparisonExpression( - variable="canEnterElevator", value=True, operator=operator.eq - ), - ) - wait_to_enter_elevator = py_trees.behaviours.Running(name="State I’ll wait") - can_enter_elevator.add_children([elevator_enter, wait_to_enter_elevator]) - - enter_elevator_task = py_trees.behaviours.Success(name="Enter Elevator") - hit_7th_floor_button = py_trees.behaviours.Success(name="Hit 7th Floor Button") - enter_elevator.add_children([enter_elevator_task, hit_7th_floor_button]) - - exit = py_trees.composites.Sequence(name="Sequence", memory=False) - exit_statement = py_trees.behaviours.Success(name="State: I'll exit now") - go_to_patient = py_trees.behaviours.Success(name="Go to Patient") - deliver = py_trees.behaviours.Success(name="Deliver medicine to Patient") - deliver_medicine.add_children([exit, exit_statement, go_to_patient, deliver]) - - get_to_7th = py_trees.composites.Selector(name="Selector", memory=False) - can_exit_now = py_trees.composites.Selector(name="Selector", memory=False) - - exit.add_children([get_to_7th, can_exit_now]) - - is_elevator_on_7th = py_trees.behaviours.CheckBlackboardVariableValue( - name="Is Elevator on 7th?", - check=py_trees.common.ComparisonExpression( - variable="isElevatorOn7th", value=True, operator=operator.eq - ), - ) - elevator_wait = py_trees.behaviours.Running(name="Wait") - get_to_7th.add_children([is_elevator_on_7th, elevator_wait]) - - are_people_on_elevator = py_trees.behaviours.CheckBlackboardVariableValue( - name="Is >= 1 Person in Elevator?", - check=py_trees.common.ComparisonExpression( - variable="canLeaveElevator", value=True, operator=operator.eq - ), - ) - exit_elevator_wait = wait("Wait for at most 3 Ticks", 3) - can_exit_now.add_children([are_people_on_elevator, exit_elevator_wait]) - - return root - - -############################################################################## -# Main -############################################################################## - - -def main() -> None: - """Entry point for the demo script.""" - args = command_line_argument_parser().parse_args() - print(description()) - # print(blackboard) - - root = create_root() - - #################### - # Rendering - #################### - if args.render: - py_trees.display.render_dot_tree(root) - sys.exit() - - #################### - # Execute - #################### - behaviour_tree = py_trees.trees.BehaviourTree(root) - behaviour_tree.visitors.append(py_trees.visitors.DebugVisitor()) - snapshot_visitor = py_trees.visitors.SnapshotVisitor() - behaviour_tree.visitors.append(snapshot_visitor) - print(py_trees.display.unicode_tree(root=root, show_status=True)) - tree_success = False - i = 0 - while not tree_success: - try: - time.sleep(1.0) - print("\n--------- Tick {0} ---------\n".format(i)) - if i == 3: - print("Cabinet is now unlocked\n") - blackboard.isCabinetUnlocked = True - if i == 5: - # print("Ball is now grasped\n") - blackboard.isElevatorOpen = True - if i == 7: - blackboard.elevatorHasSpace = True - if i == 9: - blackboard.canEnterElevator = True - if i == 11: - blackboard.isElevatorOn7th = True - if i == 13: - blackboard.canLeaveElevator = True - - behaviour_tree.tick() - print("\n") - print(py_trees.display.unicode_tree(root=root, show_status=True)) - # ascii_tree = py_trees.display.ascii_tree( - # behaviour_tree.root) - # print(ascii_tree) - if behaviour_tree.root.status == py_trees.common.Status.SUCCESS: - tree_success = True - i += 1 - except KeyboardInterrupt: - break - print("\n") - - -if __name__ == "__main__": - main() diff --git a/norms/deliver_medicine_with_deontic.py b/norms/deliver_medicine_with_deontic.py deleted file mode 100644 index 87fce26b..00000000 --- a/norms/deliver_medicine_with_deontic.py +++ /dev/null @@ -1,375 +0,0 @@ -#!/usr/bin/env python -# -# License: BSD -# https://raw.githubusercontent.com/splintered-reality/py_trees/devel/LICENSE -# -############################################################################## -# Documentation -############################################################################## - -""" -A py_trees demo. - -.. argparse:: - :module: py_trees.demos.selector - :func: command_line_argument_parser - :prog: py-trees-demo-selector - -.. graphviz:: dot/demo-selector.dot - -.. image:: images/selector.gif - -""" -############################################################################## -# Imports -############################################################################## - -import argparse -import sys -import time -import typing -import operator - -import py_trees -import py_trees.console as console - -############################################################################## -# Classes -############################################################################## - -py_trees.logging.level = py_trees.logging.Level.INFO -blackboard = py_trees.blackboard.Client(name="Client") -blackboard.register_key(key="isCabinetUnlocked", access=py_trees.common.Access.WRITE) -blackboard.register_key(key="isElevatorOpen", access=py_trees.common.Access.WRITE) -blackboard.register_key(key="elevatorHasSpace", access=py_trees.common.Access.WRITE) -blackboard.register_key(key="canEnterElevator", access=py_trees.common.Access.WRITE) -blackboard.register_key(key="isElevatorOn7th", access=py_trees.common.Access.WRITE) -blackboard.register_key(key="canLeaveElevator", access=py_trees.common.Access.WRITE) -blackboard.isCabinetUnlocked = False -blackboard.isElevatorOpen = False -blackboard.elevatorHasSpace = False -blackboard.canEnterElevator = False -blackboard.isElevatorOn7th = False -blackboard.canLeaveElevator = False - - -def description() -> str: - """ - Print description and usage information about the program. - - Returns: - the program description string - """ - content = ( - "This is a demonstration of a medicine delivery robot decision tree process.\n" - ) - content += "The robot needs to acquire the medicine, take the elevator to the 7th floor, and then deliver it to the patient.\n" - content += "There are failing nodes within the tree that asks the human for input to demonstrate altering the tree between ticks.\n" - content += "\n" - content += "Key:\n" - content += ( - "'--> ' - A leaf node that can return success, failure, or running when ran.\n" - ) - content += "'\{-\} Sequence' - A sequential operator with children that will be run in sequential order.\n" - content += "'\{o\} Selector' - A fallback operator with children that will be run one at a time if the previous child fails.\n" - content += "'-' - A node that has not been ran in the current tick yet.\n" - content += "'✕' - A node that has ran and failed in the current tick.\n" - content += "'✓' - A node that has ran and succeeded in the current tick.\n" - content += "'*' - A node that has ran and returned running in the current tick.\n" - - if py_trees.console.has_colours: - banner_line = console.green + "*" * 79 + "\n" + console.reset - s = banner_line - s += console.bold_white + "Selectors".center(79) + "\n" + console.reset - s += banner_line - s += "\n" - s += content - s += "\n" - s += banner_line - else: - s = content - return s - - -def epilog() -> typing.Optional[str]: - """ - Print a noodly epilog for --help. - - Returns: - the noodly message - """ - if py_trees.console.has_colours: - return ( - console.cyan - + "And his noodly appendage reached forth to tickle the blessed...\n" - + console.reset - ) - else: - return None - - -def command_line_argument_parser() -> argparse.ArgumentParser: - """ - Process command line arguments. - - Returns: - the argument parser - """ - parser = argparse.ArgumentParser( - description=description(), - epilog=epilog(), - formatter_class=argparse.RawDescriptionHelpFormatter, - ) - group = parser.add_mutually_exclusive_group() - group.add_argument( - "-r", "--render", action="store_true", help="render dot tree to file" - ) - group.add_argument( - "--render-with-blackboard-variables", - action="store_true", - help="render dot tree to file with blackboard variables", - ) - group.add_argument( - "-i", - "--interactive", - action="store_true", - help="pause and wait for keypress at each tick", - ) - return parser - - -def wait(name, ticks) -> py_trees.behaviours.StatusQueue: - queue = [] - for i in range(0, ticks + 1): - queue.append(py_trees.common.Status.RUNNING) - return py_trees.behaviours.StatusQueue( - name=name, - queue=queue, - eventually=py_trees.common.Status.FAILURE, - ) - - -def create_root() -> py_trees.behaviour.Behaviour: - """ - Create the root behaviour and it's subtree. - - Returns: - the root behaviour - """ - # print(blackboard) - - root = py_trees.composites.Sequence(name="Sequence", memory=False) - - get_medicine = py_trees.composites.Sequence(name="Sequence", memory=False) - - take_elevator = py_trees.composites.Sequence(name="Sequence", memory=False) - - deliver_medicine = py_trees.composites.Sequence(name="Sequence", memory=False) - - root.add_children([get_medicine, take_elevator, deliver_medicine]) - - go_to = py_trees.behaviours.Success(name="Go To Medicine Cabinet") - unlock = py_trees.composites.Selector(name="Selector", memory=False) - take = py_trees.behaviours.Success(name="Pickup Medicine") - get_medicine.add_children([go_to, unlock, take]) - - unlock_cabinet = py_trees.behaviours.CheckBlackboardVariableValue( - name="Unlock Cabinet", - check=py_trees.common.ComparisonExpression( - variable="isCabinetUnlocked", value=True, operator=operator.eq - ), - ) - - wait_cabinet = py_trees.behaviours.Running(name="Wait") - supervisor = py_trees.behaviours.Success(name="Call Supervisor for Virtual Unlock") - unlock.add_children([unlock_cabinet, wait_cabinet, supervisor]) - - go_to_elevator = py_trees.behaviours.Success(name="Go to Elevator") - click_up_button = py_trees.behaviours.Success(name="Click Up Button") - wait_for_elevator = py_trees.composites.Sequence(name="Sequence", memory=False) - enter_elevator = py_trees.composites.Sequence(name="Sequence", memory=False) - - take_elevator.add_children( - [go_to_elevator, click_up_button, wait_for_elevator, enter_elevator] - ) - - is_elevator_open = py_trees.composites.Selector(name="Selector", memory=False) - has_space_in_elevator = py_trees.composites.Selector(name="Selector", memory=False) - can_enter_elevator = py_trees.composites.Selector(name="Selector", memory=False) - wait_for_elevator.add_children( - [is_elevator_open, has_space_in_elevator, can_enter_elevator] - ) - - elevator_open = py_trees.behaviours.CheckBlackboardVariableValue( - name="Is Elevator Open?", - check=py_trees.common.ComparisonExpression( - variable="isElevatorOpen", value=True, operator=operator.eq - ), - ) - wait_for_open_elevator = py_trees.behaviours.Running(name="Wait") - supervisor = py_trees.behaviours.Success( - name="Call Supervisor to Virtually Call Elevator" - ) - is_elevator_open.add_children([elevator_open, wait_for_open_elevator, supervisor]) - # is_elevator_open.add_children([elevator_open, wait_for_open_elevator]) - - elevator_space = py_trees.behaviours.CheckBlackboardVariableValue( - name=">= 9ft^2 of space in the Elevator?", - check=py_trees.common.ComparisonExpression( - variable="elevatorHasSpace", value=True, operator=operator.eq - ), - ) - wait_for_space = py_trees.behaviours.Running(name="State I’ll wait") - has_space_in_elevator.add_children([elevator_space, wait_for_space]) - - elevator_enter = py_trees.behaviours.CheckBlackboardVariableValue( - name="Ask to enter Elevator", - check=py_trees.common.ComparisonExpression( - variable="canEnterElevator", value=True, operator=operator.eq - ), - ) - wait_to_enter_elevator = py_trees.behaviours.Running(name="State I’ll wait") - can_enter_elevator.add_children([elevator_enter, wait_to_enter_elevator]) - - enter_elevator_task = py_trees.behaviours.Success(name="Enter Elevator") - hit_7th_floor_button = py_trees.behaviours.Success(name="Hit 7th Floor Button") - enter_elevator.add_children([enter_elevator_task, hit_7th_floor_button]) - - exit = py_trees.composites.Sequence(name="Sequence", memory=False) - exit_statement = py_trees.behaviours.Success(name="State: I'll exit now") - go_to_patient = py_trees.behaviours.Success(name="Go to Patient") - deliver = py_trees.behaviours.Success(name="Deliver medicine to Patient") - deliver_medicine.add_children([exit, exit_statement, go_to_patient, deliver]) - - get_to_7th = py_trees.composites.Selector(name="Selector", memory=False) - can_exit_now = py_trees.composites.Selector(name="Selector", memory=False) - - exit.add_children([get_to_7th, can_exit_now]) - - is_elevator_on_7th = py_trees.behaviours.CheckBlackboardVariableValue( - name="Is Elevator on 7th?", - check=py_trees.common.ComparisonExpression( - variable="isElevatorOn7th", value=True, operator=operator.eq - ), - ) - elevator_wait = py_trees.behaviours.Running(name="Wait") - get_to_7th.add_children([is_elevator_on_7th, elevator_wait]) - - are_people_on_elevator = py_trees.behaviours.CheckBlackboardVariableValue( - name="Is >= 1 Person in Elevator?", - check=py_trees.common.ComparisonExpression( - variable="canLeaveElevator", value=True, operator=operator.eq - ), - ) - exit_elevator_wait = wait("Wait for at most 3 Ticks", 3) - can_exit_now.add_children([are_people_on_elevator, exit_elevator_wait]) - - return root - - -############################################################################## -# Main -############################################################################## - - -def main() -> None: - """Entry point for the demo script.""" - args = command_line_argument_parser().parse_args() - print(description()) - # print(blackboard) - - root = create_root() - - #################### - # Rendering - #################### - if args.render: - py_trees.display.render_dot_tree(root) - sys.exit() - - #################### - # Execute - #################### - behaviour_tree = py_trees.trees.BehaviourTree(root) - behaviour_tree.visitors.append(py_trees.visitors.DebugVisitor()) - snapshot_visitor = py_trees.visitors.SnapshotVisitor() - behaviour_tree.visitors.append(snapshot_visitor) - print(py_trees.display.unicode_tree(root=root, show_status=True)) - tree_success = False - i = 0 - time.sleep(1) - while not tree_success: - try: - time.sleep(0.5) - print("\n--------- Tick {0} ---------\n".format(i)) - # if i == 3: - # print("Cabinet is now unlocked\n") - # blackboard.isCabinetUnlocked = True - # if i == 5: - # blackboard.isElevatorOpen = True - if i == 7: - blackboard.elevatorHasSpace = True - if i == 9: - blackboard.canEnterElevator = True - if i == 11: - blackboard.isElevatorOn7th = True - if i == 13: - blackboard.canLeaveElevator = True - # if i == 15: - # blackboard.isElevatorOpen = True - - behaviour_tree.tick() - print("\n") - print(py_trees.display.unicode_tree(root=root, show_status=True)) - # ascii_tree = py_trees.display.ascii_tree( - # behaviour_tree.root) - # print(ascii_tree) - if behaviour_tree.root.status == py_trees.common.Status.SUCCESS: - tree_success = True - elif behaviour_tree.root.status == py_trees.common.Status.FAILURE: - callSupervisor = input( - "Should the robot call it's supervisor for help on the next tick for the failed task? (1 for Yes 0 for No)\n" - ) - if callSupervisor == "1": - node = behaviour_tree.root - found = False - while not found: - for n in node.children: - if n.status == py_trees.common.Status.FAILURE: - node = n - break - if type(node) == py_trees.composites.Selector: - found = True - # supervisor = py_trees.behaviours.Success(name="Call Supervisor to " + node.name) - supervisor = py_trees.behaviours.Success( - name="Call Supervisor for help" - ) - node.add_children([supervisor]) - elif behaviour_tree.root.status == py_trees.common.Status.RUNNING: - reduceDeontic = input( - "Should the robot reduce the deontic force of the current running task to be less than the next task? (1 for Yes 0 for No)\n" - ) - if reduceDeontic == "1": - node = behaviour_tree.root - found = False - while not found: - for n in node.children: - if n.status == py_trees.common.Status.RUNNING: - node = n - break - if type(node) == py_trees.behaviours.Running: - found = True - # supervisor = py_trees.behaviours.Success(name="Call Supervisor to " + node.name) - parent = node.parent - parent.remove_child(node) - # supervisor = py_trees.behaviours.Success(name="Call Supervisor for help") - parent.add_children([node]) - - i += 1 - except KeyboardInterrupt: - break - print("\n") - - -if __name__ == "__main__": - main() diff --git a/norms/deliver_medicine_with_input.py b/norms/deliver_medicine_with_input.py deleted file mode 100644 index 3f7b0b55..00000000 --- a/norms/deliver_medicine_with_input.py +++ /dev/null @@ -1,353 +0,0 @@ -#!/usr/bin/env python -# -# License: BSD -# https://raw.githubusercontent.com/splintered-reality/py_trees/devel/LICENSE -# -############################################################################## -# Documentation -############################################################################## - -""" -A py_trees demo. - -.. argparse:: - :module: py_trees.demos.selector - :func: command_line_argument_parser - :prog: py-trees-demo-selector - -.. graphviz:: dot/demo-selector.dot - -.. image:: images/selector.gif - -""" -############################################################################## -# Imports -############################################################################## - -import argparse -import sys -import time -import typing -import operator - -import py_trees -import py_trees.console as console - -############################################################################## -# Classes -############################################################################## - -py_trees.logging.level = py_trees.logging.Level.INFO -blackboard = py_trees.blackboard.Client(name="Client") -blackboard.register_key(key="isCabinetUnlocked", access=py_trees.common.Access.WRITE) -blackboard.register_key(key="isElevatorOpen", access=py_trees.common.Access.WRITE) -blackboard.register_key(key="elevatorHasSpace", access=py_trees.common.Access.WRITE) -blackboard.register_key(key="canEnterElevator", access=py_trees.common.Access.WRITE) -blackboard.register_key(key="isElevatorOn7th", access=py_trees.common.Access.WRITE) -blackboard.register_key(key="canLeaveElevator", access=py_trees.common.Access.WRITE) -blackboard.isCabinetUnlocked = False -blackboard.isElevatorOpen = False -blackboard.elevatorHasSpace = False -blackboard.canEnterElevator = False -blackboard.isElevatorOn7th = False -blackboard.canLeaveElevator = False - - -def description() -> str: - """ - Print description and usage information about the program. - - Returns: - the program description string - """ - content = ( - "This is a demonstration of a medicine delivery robot decision tree process.\n" - ) - content += "The robot needs to acquire the medicine, take the elevator to the 7th floor, and then deliver it to the patient.\n" - content += "There are failing nodes within the tree that asks the human for input to demonstrate altering the tree between ticks.\n" - content += "\n" - content += "Key:\n" - content += ( - "'--> ' - A leaf node that can return success, failure, or running when ran.\n" - ) - content += "'\{-\} Sequence' - A sequential operator with children that will be run in sequential order.\n" - content += "'\{o\} Selector' - A fallback operator with children that will be run one at a time if the previous child fails.\n" - content += "'-' - A node that has not been ran in the current tick yet.\n" - content += "'✕' - A node that has ran and failed in the current tick.\n" - content += "'✓' - A node that has ran and succeeded in the current tick.\n" - content += "'*' - A node that has ran and returned running in the current tick.\n" - - if py_trees.console.has_colours: - banner_line = console.green + "*" * 79 + "\n" + console.reset - s = banner_line - s += console.bold_white + "Selectors".center(79) + "\n" + console.reset - s += banner_line - s += "\n" - s += content - s += "\n" - s += banner_line - else: - s = content - return s - - -def epilog() -> typing.Optional[str]: - """ - Print a noodly epilog for --help. - - Returns: - the noodly message - """ - if py_trees.console.has_colours: - return ( - console.cyan - + "And his noodly appendage reached forth to tickle the blessed...\n" - + console.reset - ) - else: - return None - - -def command_line_argument_parser() -> argparse.ArgumentParser: - """ - Process command line arguments. - - Returns: - the argument parser - """ - parser = argparse.ArgumentParser( - description=description(), - epilog=epilog(), - formatter_class=argparse.RawDescriptionHelpFormatter, - ) - group = parser.add_mutually_exclusive_group() - group.add_argument( - "-r", "--render", action="store_true", help="render dot tree to file" - ) - group.add_argument( - "--render-with-blackboard-variables", - action="store_true", - help="render dot tree to file with blackboard variables", - ) - group.add_argument( - "-i", - "--interactive", - action="store_true", - help="pause and wait for keypress at each tick", - ) - return parser - - -def wait(name, ticks) -> py_trees.behaviours.StatusQueue: - queue = [] - for i in range(0, ticks + 1): - queue.append(py_trees.common.Status.RUNNING) - return py_trees.behaviours.StatusQueue( - name=name, - queue=queue, - eventually=py_trees.common.Status.FAILURE, - ) - - -def create_root() -> py_trees.behaviour.Behaviour: - """ - Create the root behaviour and it's subtree. - - Returns: - the root behaviour - """ - # print(blackboard) - - root = py_trees.composites.Sequence(name="Sequence", memory=False) - - get_medicine = py_trees.composites.Sequence(name="Sequence", memory=False) - - take_elevator = py_trees.composites.Sequence(name="Sequence", memory=False) - - deliver_medicine = py_trees.composites.Sequence(name="Sequence", memory=False) - - root.add_children([get_medicine, take_elevator, deliver_medicine]) - - go_to = py_trees.behaviours.Success(name="Go To Medicine Cabinet") - unlock = py_trees.composites.Selector(name="Selector", memory=False) - take = py_trees.behaviours.Success(name="Pickup Medicine") - get_medicine.add_children([go_to, unlock, take]) - - unlock_cabinet = py_trees.behaviours.CheckBlackboardVariableValue( - name="Unlock Cabinet", - check=py_trees.common.ComparisonExpression( - variable="isCabinetUnlocked", value=True, operator=operator.eq - ), - ) - wait_cabinet = wait("Wait for at most 3 Ticks", 3) - # supervisor = py_trees.behaviours.Success(name="Call Supervisor for Virtual Unlock") - # unlock.add_children([unlock_cabinet, wait_cabinet, supervisor]) - unlock.add_children([unlock_cabinet, wait_cabinet]) - - go_to_elevator = py_trees.behaviours.Success(name="Go to Elevator") - click_up_button = py_trees.behaviours.Success(name="Click Up Button") - wait_for_elevator = py_trees.composites.Sequence(name="Sequence", memory=False) - enter_elevator = py_trees.composites.Sequence(name="Sequence", memory=False) - - take_elevator.add_children( - [go_to_elevator, click_up_button, wait_for_elevator, enter_elevator] - ) - - is_elevator_open = py_trees.composites.Selector(name="Selector", memory=False) - has_space_in_elevator = py_trees.composites.Selector(name="Selector", memory=False) - can_enter_elevator = py_trees.composites.Selector(name="Selector", memory=False) - wait_for_elevator.add_children( - [is_elevator_open, has_space_in_elevator, can_enter_elevator] - ) - - elevator_open = py_trees.behaviours.CheckBlackboardVariableValue( - name="Is Elevator Open?", - check=py_trees.common.ComparisonExpression( - variable="isElevatorOpen", value=True, operator=operator.eq - ), - ) - wait_for_open_elevator = wait("Wait for at most 5 Ticks", 5) - # supervisor = py_trees.behaviours.Success(name="Call Supervisor to Virtually Call Elevator") - # is_elevator_open.add_children([elevator_open, wait_for_open_elevator, supervisor]) - is_elevator_open.add_children([elevator_open, wait_for_open_elevator]) - - elevator_space = py_trees.behaviours.CheckBlackboardVariableValue( - name=">= 9ft^2 of space in the Elevator?", - check=py_trees.common.ComparisonExpression( - variable="elevatorHasSpace", value=True, operator=operator.eq - ), - ) - wait_for_space = py_trees.behaviours.Running(name="State 'I’ll wait'") - has_space_in_elevator.add_children([elevator_space, wait_for_space]) - - elevator_enter = py_trees.behaviours.CheckBlackboardVariableValue( - name="Ask to enter Elevator", - check=py_trees.common.ComparisonExpression( - variable="canEnterElevator", value=True, operator=operator.eq - ), - ) - wait_to_enter_elevator = py_trees.behaviours.Running(name="State I’ll wait") - can_enter_elevator.add_children([elevator_enter, wait_to_enter_elevator]) - - enter_elevator_task = py_trees.behaviours.Success(name="Enter Elevator") - hit_7th_floor_button = py_trees.behaviours.Success(name="Hit 7th Floor Button") - enter_elevator.add_children([enter_elevator_task, hit_7th_floor_button]) - - exit = py_trees.composites.Sequence(name="Sequence", memory=False) - exit_statement = py_trees.behaviours.Success(name="State: I'll exit now") - go_to_patient = py_trees.behaviours.Success(name="Go to Patient") - deliver = py_trees.behaviours.Success(name="Deliver medicine to Patient") - deliver_medicine.add_children([exit, exit_statement, go_to_patient, deliver]) - - get_to_7th = py_trees.composites.Selector(name="Selector", memory=False) - can_exit_now = py_trees.composites.Selector(name="Selector", memory=False) - - exit.add_children([get_to_7th, can_exit_now]) - - is_elevator_on_7th = py_trees.behaviours.CheckBlackboardVariableValue( - name="Is Elevator on 7th?", - check=py_trees.common.ComparisonExpression( - variable="isElevatorOn7th", value=True, operator=operator.eq - ), - ) - elevator_wait = py_trees.behaviours.Running(name="Wait") - get_to_7th.add_children([is_elevator_on_7th, elevator_wait]) - - are_people_on_elevator = py_trees.behaviours.CheckBlackboardVariableValue( - name="Is >= 1 Person in Elevator?", - check=py_trees.common.ComparisonExpression( - variable="canLeaveElevator", value=True, operator=operator.eq - ), - ) - exit_elevator_wait = wait("Wait for at most 3 Ticks", 3) - can_exit_now.add_children([are_people_on_elevator, exit_elevator_wait]) - - return root - - -############################################################################## -# Main -############################################################################## - - -def main() -> None: - """Entry point for the demo script.""" - args = command_line_argument_parser().parse_args() - print(description()) - # print(blackboard) - - root = create_root() - - #################### - # Rendering - #################### - if args.render: - py_trees.display.render_dot_tree(root) - sys.exit() - - #################### - # Execute - #################### - behaviour_tree = py_trees.trees.BehaviourTree(root) - behaviour_tree.visitors.append(py_trees.visitors.DebugVisitor()) - snapshot_visitor = py_trees.visitors.SnapshotVisitor() - behaviour_tree.visitors.append(snapshot_visitor) - print(py_trees.display.unicode_tree(root=root, show_status=True)) - tree_success = False - i = 0 - while not tree_success: - try: - time.sleep(0.5) - print("\n--------- Tick {0} ---------\n".format(i)) - # if i == 3: - # print("Cabinet is now unlocked\n") - # blackboard.isCabinetUnlocked = True - # if i == 5: - # blackboard.isElevatorOpen = True - if i == 7: - blackboard.elevatorHasSpace = True - if i == 9: - blackboard.canEnterElevator = True - if i == 11: - blackboard.isElevatorOn7th = True - if i == 13: - blackboard.canLeaveElevator = True - # if i == 15: - # blackboard.isElevatorOpen = True - - behaviour_tree.tick() - print("\n") - print(py_trees.display.unicode_tree(root=root, show_status=True)) - # ascii_tree = py_trees.display.ascii_tree( - # behaviour_tree.root) - # print(ascii_tree) - if behaviour_tree.root.status == py_trees.common.Status.SUCCESS: - tree_success = True - elif behaviour_tree.root.status == py_trees.common.Status.FAILURE: - callSupervisor = input( - "Should the robot call it's supervisor for help on the next tick for the failed task? (1 for Yes 0 for No)\n" - ) - if callSupervisor == "1": - node = behaviour_tree.root - found = False - while not found: - for n in node.children: - if n.status == py_trees.common.Status.FAILURE: - node = n - break - if type(node) == py_trees.composites.Selector: - found = True - # supervisor = py_trees.behaviours.Success(name="Call Supervisor to " + node.name) - supervisor = py_trees.behaviours.Success( - name="Call Supervisor for help" - ) - node.add_children([supervisor]) - - i += 1 - except KeyboardInterrupt: - break - print("\n") - - -if __name__ == "__main__": - main() diff --git a/norms/deliver_medicine_with_reorder.py b/norms/deliver_medicine_with_reorder.py deleted file mode 100644 index 6aa74c6c..00000000 --- a/norms/deliver_medicine_with_reorder.py +++ /dev/null @@ -1,417 +0,0 @@ -#!/usr/bin/env python -# -# License: BSD -# https://raw.githubusercontent.com/splintered-reality/py_trees/devel/LICENSE -# -############################################################################## -# Documentation -############################################################################## - -""" -A py_trees demo. - -.. argparse:: - :module: py_trees.demos.selector - :func: command_line_argument_parser - :prog: py-trees-demo-selector - -.. graphviz:: dot/demo-selector.dot - -.. image:: images/selector.gif - -""" -############################################################################## -# Imports -############################################################################## - -import argparse -import sys -import time -import typing -import operator - -import py_trees -import py_trees.console as console - -############################################################################## -# Classes -############################################################################## - -py_trees.logging.level = py_trees.logging.Level.INFO -blackboard = py_trees.blackboard.Client(name="Client") -blackboard.register_key(key="isCabinetUnlocked", access=py_trees.common.Access.WRITE) -blackboard.register_key(key="isElevatorOpen", access=py_trees.common.Access.WRITE) -blackboard.register_key(key="elevatorHasSpace", access=py_trees.common.Access.WRITE) -blackboard.register_key(key="canEnterElevator", access=py_trees.common.Access.WRITE) -blackboard.register_key(key="isElevatorOn7th", access=py_trees.common.Access.WRITE) -blackboard.register_key(key="canLeaveElevator", access=py_trees.common.Access.WRITE) -blackboard.isCabinetUnlocked = False -blackboard.isElevatorOpen = False -blackboard.elevatorHasSpace = False -blackboard.canEnterElevator = False -blackboard.isElevatorOn7th = False -blackboard.canLeaveElevator = False -# print(blackboard) -# exit() - - -def description() -> str: - """ - Print description and usage information about the program. - - Returns: - the program description string - """ - content = ( - "This is a demonstration of a medicine delivery robot decision tree process.\n" - ) - content += "The robot needs to acquire the medicine, take the elevator to the 7th floor, and then deliver it to the patient.\n" - content += "There are failing nodes within the tree that asks the human for input to demonstrate altering the tree between ticks.\n" - content += "\n" - content += "Key:\n" - content += ( - "'--> ' - A leaf node that can return success, failure, or running when ran.\n" - ) - content += "'\{-\} Sequence' - A sequential operator with children that will be run in sequential order.\n" - content += "'\{o\} Selector' - A fallback operator with children that will be run one at a time if the previous child fails.\n" - content += "'-' - A node that has not been ran in the current tick yet.\n" - content += "'✕' - A node that has ran and failed in the current tick.\n" - content += "'✓' - A node that has ran and succeeded in the current tick.\n" - content += "'*' - A node that has ran and returned running in the current tick.\n" - - if py_trees.console.has_colours: - banner_line = console.green + "*" * 79 + "\n" + console.reset - s = banner_line - s += console.bold_white + "Norms".center(79) + "\n" + console.reset - s += banner_line - s += "\n" - s += content - s += "\n" - s += banner_line - else: - s = content - return s - - -def epilog() -> typing.Optional[str]: - """ - Print a noodly epilog for --help. - - Returns: - the noodly message - """ - if py_trees.console.has_colours: - return ( - console.cyan - + "And his noodly appendage reached forth to tickle the blessed...\n" - + console.reset - ) - else: - return None - - -def command_line_argument_parser() -> argparse.ArgumentParser: - """ - Process command line arguments. - - Returns: - the argument parser - """ - parser = argparse.ArgumentParser( - description=description(), - epilog=epilog(), - formatter_class=argparse.RawDescriptionHelpFormatter, - ) - group = parser.add_mutually_exclusive_group() - group.add_argument( - "-r", "--render", action="store_true", help="render dot tree to file" - ) - group.add_argument( - "--render-with-blackboard-variables", - action="store_true", - help="render dot tree to file with blackboard variables", - ) - group.add_argument( - "-i", - "--interactive", - action="store_true", - help="pause and wait for keypress at each tick", - ) - return parser - - -def wait(name, ticks) -> py_trees.behaviours.StatusQueue: - queue = [] - for i in range(0, ticks + 1): - queue.append(py_trees.common.Status.RUNNING) - return py_trees.behaviours.StatusQueue( - name=name, - queue=queue, - eventually=py_trees.common.Status.FAILURE, - ) - - -def create_root() -> py_trees.behaviour.Behaviour: - """ - Create the root behaviour and it's subtree. - - Returns: - the root behaviour - """ - # print(blackboard) - - root = py_trees.composites.Sequence(name="Sequence", memory=False) - - get_medicine = py_trees.composites.Sequence(name="Sequence", memory=False) - - take_elevator = py_trees.composites.Sequence(name="Sequence", memory=False) - - deliver_medicine = py_trees.composites.Sequence(name="Sequence", memory=False) - - root.add_children([get_medicine, take_elevator, deliver_medicine]) - - go_to = py_trees.behaviours.Success(name="Go To Medicine Cabinet") - unlock = py_trees.composites.Selector(name="Selector", memory=False) - take = py_trees.behaviours.Success(name="Pickup Medicine") - get_medicine.add_children([go_to, unlock, take]) - - unlock_cabinet = py_trees.behaviours.CheckBlackboardVariableValue( - name="Unlock Cabinet", - check=py_trees.common.ComparisonExpression( - variable="isCabinetUnlocked", value=True, operator=operator.eq - ), - ) - - wait_cabinet = py_trees.behaviours.Running(name="Wait") - supervisor = py_trees.behaviours.Success(name="Call Supervisor for Virtual Unlock") - unlock.add_children([unlock_cabinet, wait_cabinet, supervisor]) - - go_to_elevator = py_trees.behaviours.Success(name="Go to Elevator") - click_up_button = py_trees.behaviours.Success(name="Click Up Button") - wait_for_elevator = py_trees.composites.Sequence(name="Sequence", memory=False) - enter_elevator = py_trees.composites.Sequence(name="Sequence", memory=False) - - take_elevator.add_children( - [go_to_elevator, click_up_button, wait_for_elevator, enter_elevator] - ) - - is_elevator_open = py_trees.composites.Selector(name="Selector", memory=False) - has_space_in_elevator = py_trees.composites.Selector(name="Selector", memory=False) - can_enter_elevator = py_trees.composites.Selector(name="Selector", memory=False) - wait_for_elevator.add_children( - [is_elevator_open, has_space_in_elevator, can_enter_elevator] - ) - - elevator_open = py_trees.behaviours.CheckBlackboardVariableValue( - name="Is Elevator Open?", - check=py_trees.common.ComparisonExpression( - variable="isElevatorOpen", value=True, operator=operator.eq - ), - ) - wait_for_open_elevator = py_trees.behaviours.Running(name="Wait") - button_again = py_trees.behaviours.Running(name="Press button again") - supervisor = py_trees.behaviours.Success( - name="Call Supervisor to Virtually Call Elevator" - ) - is_elevator_open.add_children( - [ - elevator_open, - wait_for_open_elevator, - button_again, - supervisor, - ] - ) - # is_elevator_open.add_children([elevator_open, wait_for_open_elevator]) - - elevator_space = py_trees.behaviours.CheckBlackboardVariableValue( - name=">= 9ft^2 of space in the Elevator?", - check=py_trees.common.ComparisonExpression( - variable="elevatorHasSpace", value=True, operator=operator.eq - ), - ) - wait_for_space = py_trees.behaviours.Running(name="State I’ll wait") - has_space_in_elevator.add_children([elevator_space, wait_for_space]) - - elevator_enter = py_trees.behaviours.CheckBlackboardVariableValue( - name="Ask to enter Elevator", - check=py_trees.common.ComparisonExpression( - variable="canEnterElevator", value=True, operator=operator.eq - ), - ) - wait_to_enter_elevator = py_trees.behaviours.Running(name="State I’ll wait") - can_enter_elevator.add_children([elevator_enter, wait_to_enter_elevator]) - - enter_elevator_task = py_trees.behaviours.Success(name="Enter Elevator") - hit_7th_floor_button = py_trees.behaviours.Success(name="Hit 7th Floor Button") - enter_elevator.add_children([enter_elevator_task, hit_7th_floor_button]) - - exit = py_trees.composites.Sequence(name="Sequence", memory=False) - exit_statement = py_trees.behaviours.Success(name="State: I'll exit now") - go_to_patient = py_trees.behaviours.Success(name="Go to Patient") - deliver = py_trees.behaviours.Success(name="Deliver medicine to Patient") - deliver_medicine.add_children([exit, exit_statement, go_to_patient, deliver]) - - get_to_7th = py_trees.composites.Selector(name="Selector", memory=False) - can_exit_now = py_trees.composites.Selector(name="Selector", memory=False) - - exit.add_children([get_to_7th, can_exit_now]) - - is_elevator_on_7th = py_trees.behaviours.CheckBlackboardVariableValue( - name="Is Elevator on 7th?", - check=py_trees.common.ComparisonExpression( - variable="isElevatorOn7th", value=True, operator=operator.eq - ), - ) - elevator_wait = py_trees.behaviours.Running(name="Wait") - get_to_7th.add_children([is_elevator_on_7th, elevator_wait]) - - are_people_on_elevator = py_trees.behaviours.CheckBlackboardVariableValue( - name="Is >= 1 Person in Elevator?", - check=py_trees.common.ComparisonExpression( - variable="canLeaveElevator", value=True, operator=operator.eq - ), - ) - exit_elevator_wait = wait("Wait for at most 3 Ticks", 3) - can_exit_now.add_children([are_people_on_elevator, exit_elevator_wait]) - - return root - - -############################################################################## -# Main -############################################################################## - - -def main() -> None: - """Entry point for the demo script.""" - args = command_line_argument_parser().parse_args() - print(description()) - # print(blackboard) - - root = create_root() - - #################### - # Rendering - #################### - if args.render: - py_trees.display.render_dot_tree(root) - sys.exit() - - #################### - # Execute - #################### - behaviour_tree = py_trees.trees.BehaviourTree(root) - behaviour_tree.visitors.append(py_trees.visitors.DebugVisitor()) - snapshot_visitor = py_trees.visitors.SnapshotVisitor() - behaviour_tree.visitors.append(snapshot_visitor) - print(py_trees.display.unicode_tree(root=root, show_status=True)) - tree_success = False - i = 0 - time.sleep(1) - while not tree_success: - try: - time.sleep(0.5) - print("\n--------- Tick {0} ---------\n".format(i)) - # if i == 3: - # print("Cabinet is now unlocked\n") - # blackboard.isCabinetUnlocked = True - # if i == 5: - # blackboard.isElevatorOpen = True - if i == 7: - blackboard.elevatorHasSpace = True - if i == 9: - blackboard.canEnterElevator = True - if i == 11: - blackboard.isElevatorOn7th = True - if i == 13: - blackboard.canLeaveElevator = True - # if i == 15: - # blackboard.isElevatorOpen = True - - behaviour_tree.tick() - print("\n") - print(py_trees.display.unicode_tree(root=root, show_status=True)) - # ascii_tree = py_trees.display.ascii_tree( - # behaviour_tree.root) - # print(ascii_tree) - if behaviour_tree.root.status == py_trees.common.Status.SUCCESS: - tree_success = True - elif behaviour_tree.root.status == py_trees.common.Status.FAILURE: - callSupervisor = input( - "Should the robot call it's supervisor for help on the next tick for the failed task? (1 for Yes 0 for No)\n" - ) - if callSupervisor == "1": - node = behaviour_tree.root - found = False - while not found: - for n in node.children: - if n.status == py_trees.common.Status.FAILURE: - node = n - break - if type(node) == py_trees.composites.Selector: - found = True - # supervisor = py_trees.behaviours.Success(name="Call Supervisor to " + node.name) - supervisor = py_trees.behaviours.Success( - name="Call Supervisor for help" - ) - node.add_children([supervisor]) - elif behaviour_tree.root.status == py_trees.common.Status.RUNNING: - reorder = input( - "Would you like to reorder the children of the Selector that is running? (1 for Yes 0 for No)\n" - ) - if reorder == "1": - node = behaviour_tree.root - found = False - while not found: - for n in node.children: - if n.status == py_trees.common.Status.RUNNING: - node = n - break - if type(node) == py_trees.behaviours.Running: - found = True - # supervisor = py_trees.behaviours.Success(name="Call Supervisor to " + node.name) - parent = node.parent - stored_children = [] - for i in range(0, len(parent.children)): - child = parent.children[0] - print("Node", i, "-", child.name) - stored_children.append(child) - parent.remove_child(child) - print("\n") - stored = [] - for i in range(0, len(stored_children)): - valid_input = False - while not valid_input: - user_input = input( - "Which node (input number e.g. 0) would you like in position " - + str(i) - + "? " - ) - try: - node_number = int(user_input) - except ValueError: - print( - "Please enter an integer equal to one of the node numbers." - ) - continue - if node_number >= len(stored_children) or node_number < 0: - print( - "Please enter an integer equal to one of the node numbers." - ) - elif node_number in stored: - print( - "Please enter a node that hasn't been used already." - ) - else: - stored.append(node_number) - valid_input = True - parent.add_child(stored_children[node_number]) - - i += 1 - except KeyboardInterrupt: - break - print("\n") - - -if __name__ == "__main__": - main() diff --git a/norms/nest.py b/norms/nest.py deleted file mode 100644 index fc785345..00000000 --- a/norms/nest.py +++ /dev/null @@ -1,195 +0,0 @@ -#!/usr/bin/env python -# -# License: BSD -# https://raw.githubusercontent.com/splintered-reality/py_trees/devel/LICENSE -# -############################################################################## -# Documentation -############################################################################## - -""" -A py_trees demo. - -.. argparse:: - :module: py_trees.demos.selector - :func: command_line_argument_parser - :prog: py-trees-demo-selector - -.. graphviz:: dot/demo-selector.dot - -.. image:: images/selector.gif - -""" -############################################################################## -# Imports -############################################################################## - -import argparse -import sys -import time -import typing - -import py_trees -import py_trees.console as console - -############################################################################## -# Classes -############################################################################## - - -def description() -> str: - """ - Print description and usage information about the program. - - Returns: - the program description string - """ - content = ( - "Higher priority switching and interruption in the children of a selector.\n" - ) - content += "\n" - content += "In this example the higher priority child is setup to fail initially,\n" - content += "falling back to the continually running second child. On the third\n" - content += ( - "tick, the first child succeeds and cancels the hitherto running child.\n" - ) - if py_trees.console.has_colours: - banner_line = console.green + "*" * 79 + "\n" + console.reset - s = banner_line - s += console.bold_white + "Selectors".center(79) + "\n" + console.reset - s += banner_line - s += "\n" - s += content - s += "\n" - s += banner_line - else: - s = content - return s - - -def epilog() -> typing.Optional[str]: - """ - Print a noodly epilog for --help. - - Returns: - the noodly message - """ - if py_trees.console.has_colours: - return ( - console.cyan - + "And his noodly appendage reached forth to tickle the blessed...\n" - + console.reset - ) - else: - return None - - -def command_line_argument_parser() -> argparse.ArgumentParser: - """ - Process command line arguments. - - Returns: - the argument parser - """ - parser = argparse.ArgumentParser( - description=description(), - epilog=epilog(), - formatter_class=argparse.RawDescriptionHelpFormatter, - ) - parser.add_argument( - "-r", "--render", action="store_true", help="render dot tree to file" - ) - return parser - - -def create_root() -> py_trees.behaviour.Behaviour: - """ - Create the root behaviour and it's subtree. - - Returns: - the root behaviour - """ - root = py_trees.composites.Selector(name="Selector", memory=True) - root2 = py_trees.composites.Selector(name="Selector", memory=True) - root3 = py_trees.composites.Selector(name="Selector", memory=True) - ffs = py_trees.behaviours.StatusQueue( - name="FFS", - queue=[ - py_trees.common.Status.FAILURE, - py_trees.common.Status.FAILURE, - py_trees.common.Status.SUCCESS, - ], - eventually=py_trees.common.Status.SUCCESS, - ) - always_running = py_trees.behaviours.Running(name="Running") - ffs3 = py_trees.behaviours.StatusQueue( - name="FFS", - queue=[ - py_trees.common.Status.FAILURE, - py_trees.common.Status.FAILURE, - py_trees.common.Status.SUCCESS, - ], - eventually=py_trees.common.Status.SUCCESS, - ) - always_running3 = py_trees.behaviours.Running(name="Running") - ffs2 = py_trees.behaviours.StatusQueue( - name="FFS", - queue=[ - py_trees.common.Status.FAILURE, - py_trees.common.Status.FAILURE, - py_trees.common.Status.SUCCESS, - ], - eventually=py_trees.common.Status.SUCCESS, - ) - always_running2 = py_trees.behaviours.Running(name="Running") - new_root = py_trees.composites.Selector(name="Selector", memory=False).add_children( - [root, root2, root3] - ) - # new_root.add_children([ffs, always_running]) - root.add_children([ffs]) - root2.add_children([ffs2]) - root3.add_children([ffs3, always_running]) - # root2.add_children([ffs, always_running]) - # root3.add_children([ffs2, always_running2]) - # root.add_children([ffs3, always_running3]) - return new_root - - -############################################################################## -# Main -############################################################################## - - -def main() -> None: - """Entry point for the demo script.""" - args = command_line_argument_parser().parse_args() - print(description()) - py_trees.logging.level = py_trees.logging.Level.DEBUG - - root = create_root() - - #################### - # Rendering - #################### - if args.render: - py_trees.display.render_dot_tree(root) - sys.exit() - - #################### - # Execute - #################### - root.setup_with_descendants() - for i in range(1, 12): - try: - print("\n--------- Tick {0} ---------\n".format(i)) - root.tick_once() - print("\n") - print(py_trees.display.unicode_tree(root=root, show_status=True)) - time.sleep(1.0) - except KeyboardInterrupt: - break - print("\n") - - -if __name__ == "__main__": - main() diff --git a/norms/norms.py b/norms/norms.py deleted file mode 100644 index eee6740b..00000000 --- a/norms/norms.py +++ /dev/null @@ -1,12 +0,0 @@ -print("") -print("Cabinet is locked.") -print("Would you like to change the priorities of actions?") -print("") -print("A - Try Unlocking Cabinet again") -print("B – Wait 10 seconds") -print("C – Call supervisor for remote unlock") -print("") -print("Which action should I take first? (type A, B, or C) _") -print("Which action should I take second? (type A, B, or C) _") -print("") -print("") diff --git a/norms/selector.py b/norms/selector.py deleted file mode 100644 index ba4ed2a8..00000000 --- a/norms/selector.py +++ /dev/null @@ -1,163 +0,0 @@ -#!/usr/bin/env python -# -# License: BSD -# https://raw.githubusercontent.com/splintered-reality/py_trees/devel/LICENSE -# -############################################################################## -# Documentation -############################################################################## - -""" -A py_trees demo. - -.. argparse:: - :module: py_trees.demos.selector - :func: command_line_argument_parser - :prog: py-trees-demo-selector - -.. graphviz:: dot/demo-selector.dot - -.. image:: images/selector.gif - -""" -############################################################################## -# Imports -############################################################################## - -import argparse -import sys -import time -import typing - -import py_trees -import py_trees.console as console - -############################################################################## -# Classes -############################################################################## - - -def description() -> str: - """ - Print description and usage information about the program. - - Returns: - the program description string - """ - content = ( - "Higher priority switching and interruption in the children of a selector.\n" - ) - content += "\n" - content += "In this example the higher priority child is setup to fail initially,\n" - content += "falling back to the continually running second child. On the third\n" - content += ( - "tick, the first child succeeds and cancels the hitherto running child.\n" - ) - if py_trees.console.has_colours: - banner_line = console.green + "*" * 79 + "\n" + console.reset - s = banner_line - s += console.bold_white + "Selectors".center(79) + "\n" + console.reset - s += banner_line - s += "\n" - s += content - s += "\n" - s += banner_line - else: - s = content - return s - - -def epilog() -> typing.Optional[str]: - """ - Print a noodly epilog for --help. - - Returns: - the noodly message - """ - if py_trees.console.has_colours: - return ( - console.cyan - + "And his noodly appendage reached forth to tickle the blessed...\n" - + console.reset - ) - else: - return None - - -def command_line_argument_parser() -> argparse.ArgumentParser: - """ - Process command line arguments. - - Returns: - the argument parser - """ - parser = argparse.ArgumentParser( - description=description(), - epilog=epilog(), - formatter_class=argparse.RawDescriptionHelpFormatter, - ) - parser.add_argument( - "-r", "--render", action="store_true", help="render dot tree to file" - ) - return parser - - -def create_root() -> py_trees.behaviour.Behaviour: - """ - Create the root behaviour and it's subtree. - - Returns: - the root behaviour - """ - root = py_trees.composites.Selector(name="Selector", memory=False) - ffs = py_trees.behaviours.StatusQueue( - name="FFS", - queue=[ - py_trees.common.Status.FAILURE, - py_trees.common.Status.FAILURE, - py_trees.common.Status.SUCCESS, - ], - eventually=py_trees.common.Status.SUCCESS, - ) - always_running = py_trees.behaviours.Running(name="Running") - root.add_children([ffs, always_running]) - return root - - -############################################################################## -# Main -############################################################################## - - -def main() -> None: - """Entry point for the demo script.""" - args = command_line_argument_parser().parse_args() - print(description()) - py_trees.logging.level = py_trees.logging.Level.DEBUG - - root = create_root() - - #################### - # Rendering - #################### - if args.render: - py_trees.display.render_dot_tree(root) - sys.exit() - - #################### - # Execute - #################### - root.setup_with_descendants() - for i in range(1, 4): - try: - print("\n--------- Tick {0} ---------\n".format(i)) - root.tick_once() - print("\n") - print(py_trees.display.unicode_tree(root=root, show_status=True)) - time.sleep(1.0) - except KeyboardInterrupt: - break - print("\n") - - -# main() diff --git a/norms/wsocket.html b/norms/wsocket.html deleted file mode 100644 index 628ee5d6..00000000 --- a/norms/wsocket.html +++ /dev/null @@ -1,28 +0,0 @@ - - - - - - - - WebSocket Test - - - - - - - \ No newline at end of file diff --git a/norms/wsocket.py b/norms/wsocket.py deleted file mode 100644 index ad37930e..00000000 --- a/norms/wsocket.py +++ /dev/null @@ -1,19 +0,0 @@ -#!/usr/bin/env python - -import asyncio -from websockets.server import serve - - -async def echo(websocket): - async for message in websocket: - print("Message from client:", message) - await websocket.send("Hello client") - - -async def main(): - async with serve(echo, "localhost", 8765): - await asyncio.Future() # run forever - - -if __name__ == "__main__": - asyncio.run(main()) diff --git a/pyproject.toml b/pyproject.toml index 3ff2675a..221f7cf6 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -78,6 +78,7 @@ dependencies = [ # Optional "websockets", "click", "typer", + "prompt_toolkit" ] # List additional groups of dependencies here (e.g. development @@ -111,7 +112,7 @@ test = [ "Say Thanks!" = "http://saythanks.io/to/example" "Source" = "https://github.com/pypa/sampleproject/" -[project.scripts] # Optional +[project.scripts] social-norms-trees = "social_norms_trees.ui_wrapper:app" [build-system] @@ -123,3 +124,9 @@ version.source = "vcs" [tool.pytest.ini_options] addopts = "--doctest-modules" + +[tool.hatch.build] +include = [ + "examples/**", + "social_norms_trees/**", +] \ No newline at end of file diff --git a/src/social_norms_trees/__init__.py b/social_norms_trees/__init__.py similarity index 100% rename from src/social_norms_trees/__init__.py rename to social_norms_trees/__init__.py diff --git a/social_norms_trees/atomic_mutations.py b/social_norms_trees/atomic_mutations.py new file mode 100644 index 00000000..68a5b8bb --- /dev/null +++ b/social_norms_trees/atomic_mutations.py @@ -0,0 +1,217 @@ +from collections import namedtuple +import inspect +from functools import partial, wraps +import logging +from types import GenericAlias +from typing import Callable, List, Mapping, NamedTuple, Tuple, TypeVar, Union, Dict + +from social_norms_trees.behavior_tree_library import Behavior, Composite, Sequence + +from pprint import pprint + +_logger = logging.getLogger(__name__) + +# ============================================================================= +# Argument types +# ============================================================================= + +ExistingNode = TypeVar("ExistingNode", bound=Behavior) +NewNode = TypeVar("NewNode", bound=Behavior) +CompositeIndex = TypeVar("CompositeIndex", bound=Tuple[Composite, int]) +BehaviorIdentifier = TypeVar( + "BehaviorIdentifier", bound=Union[ExistingNode, NewNode, CompositeIndex] +) +BehaviorTreeNode = TypeVar("BehaviorTreeNode", bound=Behavior) +BehaviorTree = TypeVar("BehaviorTree", bound=BehaviorTreeNode) +BehaviorLibrary = TypeVar("BehaviorLibrary", bound=List[BehaviorTreeNode]) +TreeOrLibrary = TypeVar("TreeOrLibrary", bound=Union[BehaviorTree, BehaviorLibrary]) + + +# ============================================================================= +# Atomic operations +# ============================================================================= + +# The very top line of each operation's docstring is used as the +# description of the operation in the UI, so it's required. +# The argument annotations are vital, because they tell the UI which prompt +# to use. + + +# TODO: pass in the parent node, and do the action on the parent node directly. +def remove(node: ExistingNode, parent: Composite) -> ExistingNode: + """Remove a node. + Examples: + >>> success_node = Behavior(name="Success") + >>> failure_node = Behavior(name="Failure") + + >>> tree = Sequence("") + >>> tree.add_child(success_node) + >>> tree.add_child(failure_node) + + >>> pprint(tree) + ... # doctest: +NORMALIZE_WHITESPACE + Sequence(name='', + children=[Behavior(name='Success', id=None), + Behavior(name='Failure', id=None)]) + + >>> removed = remove(failure_node, tree) + >>> pprint(tree) + ... # doctest: +NORMALIZE_WHITESPACE + Sequence(name='', + children=[Behavior(name='Success', id=None)]) + """ + parent.remove_child(node) + return node + + +def insert(node: NewNode, where: CompositeIndex) -> None: + """Insert a new node. + Examples: + >>> success_node = Behavior(name="Success") + >>> tree = Sequence("", children=[success_node]) + + >>> pprint(tree) + ... # doctest: +NORMALIZE_WHITESPACE + Sequence(name='', + children=[Behavior(name='Success', id=None)]) + + >>> failure_node = Behavior(name="Failure") + >>> insert(failure_node, (tree, 1)) + + >>> pprint(tree) + ... # doctest: +NORMALIZE_WHITESPACE + Sequence(name='', + children=[Behavior(name='Success', id=None), + Behavior(name='Failure', id=None)]) + + >>> dummy_node = Behavior(name="Dummy") + >>> insert(dummy_node, (tree, 0)) + >>> pprint(tree) + ... # doctest: +NORMALIZE_WHITESPACE + Sequence(name='', + children=[Behavior(name='Dummy', id=None), + Behavior(name='Success', id=None), + Behavior(name='Failure', id=None)]) + """ + + parent, index = where + parent.insert_child(index, node) + return + + +def move( + node: ExistingNode, + where: CompositeIndex, +) -> None: + """Move a node. + Examples: + + >>> success_node = Behavior(name="Success") + >>> failure_node = Behavior(name="Failure") + + >>> tree = Sequence("") + >>> tree.add_child(success_node) + >>> tree.add_child(failure_node) + + >>> pprint(tree) + ... # doctest: +NORMALIZE_WHITESPACE + Sequence(name='', + children=[Behavior(name='Success', id=None), + Behavior(name='Failure', id=None)]) + + >>> move(failure_node, (tree, 0)) + >>> pprint(tree) + ... # doctest: +NORMALIZE_WHITESPACE + Sequence(name='', + children=[Behavior(name='Failure', id=None), + Behavior(name='Success', id=None)]) + """ + parent, index = where + insert(remove(node, parent), (parent, index)) + return + + +# # # ============================================================================= +# # # Node and Position Selectors +# # # ============================================================================= + +from typing import Union, Generator + + +def iterate_nodes(tree: Union[Behavior, Sequence]): + """ + Examples: + >>> dummy_node = Behavior(name="Dummy") + + >>> list(iterate_nodes(dummy_node)) + ... # doctest: +ELLIPSIS + [Behavior(name='Dummy', id=None)] + + >>> sequence = Sequence("", children=[dummy_node]) + >>> pprint(list(iterate_nodes(sequence))) + ... # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE + [Sequence(name='', children=[Behavior(name='Dummy', id=None)]), + Behavior(name='Dummy', id=None)] + + >>> dummy_node_2 = Behavior(name="Dummy") + >>> dummy_node_3 = Behavior(name="Dummy") + >>> sequence_2 = Sequence("", children=[dummy_node_3]) + >>> sequence_3 = Sequence("", children=[dummy_node, dummy_node_2, sequence_2]) + >>> pprint(list(iterate_nodes(sequence_3))) + ... # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE + [Sequence(name='', + children=[Behavior(name='Dummy', id=None), + Behavior(name='Dummy', id=None), + Sequence(name='', + children=[Behavior(name='Dummy', id=None)])]), + Behavior(name='Dummy', id=None), + Behavior(name='Dummy', id=None), + Sequence(name='', children=[Behavior(name='Dummy', id=None)]), + Behavior(name='Dummy', id=None)] + """ + yield tree + + # Check if the node is a Sequence and has children to iterate over + if hasattr(tree, "children"): + for child in tree.children: + yield from iterate_nodes(child) + + +def enumerate_nodes(tree: Behavior): + """ + Examples: + >>> dummy_node = Behavior(name="Dummy") + >>> print(list(enumerate_nodes(dummy_node))) + [(0, Behavior(name='Dummy', id=None))] + + >>> sequence = Sequence("", children=[dummy_node]) + >>> print(list(enumerate_nodes(sequence))) + ... # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE + [(0, Sequence(name='', children=[Behavior(name='Dummy', id=None)])), (1, Behavior(name='Dummy', id=None))] + + >>> success_node = Behavior(name="Success") + >>> dummy_node_2 = Behavior(name="Dummy") + >>> failure_node = Behavior(name="Failure") + >>> success_node_2 = Behavior(name="Success") + >>> sequence_2 = Sequence("", children=[dummy_node_2, success_node_2]) + >>> sequence_3 = Sequence("", children=[failure_node]) + >>> sequence_1 = Sequence("", children=[success_node, sequence_2, sequence_3]) + >>> print(list(enumerate_nodes(sequence_1))) + ... # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE + [(0, Sequence(name='', children=[Behavior(name='Success', id=None), Sequence(name='', children=[Behavior(name='Dummy', id=None), Behavior(name='Success', id=None)]), Sequence(name='', children=[Behavior(name='Failure', id=None)])])), (1, Behavior(name='Success', id=None)), (2, Sequence(name='', children=[Behavior(name='Dummy', id=None), Behavior(name='Success', id=None)])), (3, Behavior(name='Dummy', id=None)), (4, Behavior(name='Success', id=None)), (5, Sequence(name='', children=[Behavior(name='Failure', id=None)])), (6, Behavior(name='Failure', id=None))] + """ + return enumerate(iterate_nodes(tree)) + + +# # ============================================================================= +# # Utility functions +# # ============================================================================= + + +class QuitException(Exception): + pass + + +def end_experiment(): + """I'm done, end the experiment.""" + raise QuitException("User ended the experiment.") diff --git a/social_norms_trees/behavior_tree_library.py b/social_norms_trees/behavior_tree_library.py new file mode 100644 index 00000000..1836cec8 --- /dev/null +++ b/social_norms_trees/behavior_tree_library.py @@ -0,0 +1,35 @@ +from dataclasses import dataclass, field +from typing import Optional, List + + +@dataclass +class Behavior: + name: str + id: Optional[str] = None + + +@dataclass +class Composite: + name: str + children: List[Behavior] = field(default_factory=list) + + def add_child(self, child: Behavior): + self.children.append(child) + + def insert_child(self, index: int, child: Behavior): + if 0 <= index <= len(self.children): + self.children.insert(index, child) + + def remove_child(self, child: Behavior): + if child in self.children: + self.children.remove(child) + + +@dataclass +class Sequence(Composite): + pass + + +@dataclass +class Selector(Composite): + pass diff --git a/social_norms_trees/interactive_ui.py b/social_norms_trees/interactive_ui.py new file mode 100644 index 00000000..21c18470 --- /dev/null +++ b/social_norms_trees/interactive_ui.py @@ -0,0 +1,152 @@ +from prompt_toolkit import Application +from prompt_toolkit.layout import Layout, HSplit, VSplit, Window +from prompt_toolkit.key_binding import KeyBindings +from prompt_toolkit.layout.controls import FormattedTextControl +from prompt_toolkit.formatted_text import FormattedText +from prompt_toolkit.styles import Style + +from typing import Optional, List, Callable + +from social_norms_trees.behavior_tree_library import Behavior + + +def run_interactive_list( + nodes: List, mode: str, new_behavior: Optional[Behavior] = None +): + """ + Runs an interactive list UI to insert a new action. + """ + selected_index = 0 + + fontColors = {"move": "ff8000", "select": "0080ff", "insert": "00ff80"} + + if mode == "move": + selected_index = nodes.index(new_behavior) + nodes.remove(new_behavior) + + def get_display_text_insert(): + result = [] + for i in range(len(nodes) + 1): + if i == selected_index: + result.append( + (f"fg:#{fontColors[mode]}", f"-> {{{new_behavior.name}}}\n") + ) + elif i < selected_index: + result.append(("fg:white", f"-> {nodes[i].name}\n")) + else: + # now that selected index has moved behind this behavior, index is index-1 + result.append(("fg:white", f"-> {nodes[i - 1].name}\n")) + return FormattedText(result) + + def get_display_text_select(): + result = [] + for i in range(len(nodes)): + if i == selected_index: + result.append((f"fg:#{fontColors[mode]}", f"-> {nodes[i].name}\n")) + else: + result.append(("fg:white", f"-> {nodes[i].name}\n")) + + return FormattedText(result) + + instructions_set = { + "insert": "Use the Up/Down arrow keys to select where to insert the action. ", + "select": "Use the Up/Down arrow keys to select the desired action to operate on. ", + "move": "Use the Up/Down arrow keys to select the new position for the action. ", + } + + # FormattedText used to define text and also text styling + instructions = FormattedText( + [ + ( + f"bg:#282c34 fg:#{fontColors[mode]} bold", + instructions_set[mode] + + "Press Enter to confirm. Press esc to exit at anytime.", + ) + ] + ) + + instructions_window = Window( + content=FormattedTextControl(instructions), height=1, align="center" + ) + + # initial window display + display_mode = { + "insert": get_display_text_insert, + "move": get_display_text_insert, + "select": get_display_text_select, + } + + display = Window( + content=FormattedTextControl(display_mode[mode]), + style="class:output", + height=10, + align="center", + ) + + # Key bindings + kb = KeyBindings() + + @kb.add("up") + def move_up(event): + nonlocal selected_index + if selected_index > 0: + selected_index -= 1 + if mode == "insert" or mode == "move": + display.content.text = get_display_text_insert() + elif mode == "select": + display.content.text = get_display_text_select() + + @kb.add("down") + def move_down(event): + nonlocal selected_index + + if mode == "insert" or mode == "move": + if selected_index < len(nodes): + selected_index += 1 + display.content.text = get_display_text_insert() + elif mode == "select": + if selected_index < len(nodes) - 1: + selected_index += 1 + display.content.text = get_display_text_select() + + @kb.add("enter") + def select_action(event): + if mode == "insert" or mode == "move": + # nodes.insert(selected_index, new_behavior) + app.exit(result=selected_index) + # app.exit(result=nodes) + elif mode == "select": + app.exit(result=nodes[selected_index]) + + @kb.add("escape") + def exit_without_changes(event): + app.exit(result=None) + + # For styling the "box" + root_container = HSplit( + [ + Window(height=5), # top padding + VSplit( + [ + Window(), # Left padding + HSplit( + [instructions_window, Window(height=1), display], align="center" + ), + Window(), # Right padding + ], + align="center", + ), + Window(height=2), # Bottom padding + ] + ) + layout = Layout(root_container) + + style = Style.from_dict( + { + "output": "bg:#282c34 #ffffff bold", # Dark grey background, text bold + "instructions": f"fg:#{fontColors[mode]} bold", + } + ) + + app = Application(layout=layout, key_bindings=kb, full_screen=True, style=style) + return app.run() diff --git a/social_norms_trees/ui_wrapper.py b/social_norms_trees/ui_wrapper.py new file mode 100644 index 00000000..1528c88d --- /dev/null +++ b/social_norms_trees/ui_wrapper.py @@ -0,0 +1,402 @@ +import logging +import pathlib +from typing import Annotated, List +import click +from datetime import datetime +import json +import os +import uuid +import traceback +import time +import typer +import importlib.resources as pkg_resources + +from social_norms_trees.behavior_tree_library import Behavior, Sequence +from social_norms_trees.atomic_mutations import remove, insert, move + +from social_norms_trees.interactive_ui import run_interactive_list + +SLEEP_TIME = 2 + + +def load_db(db_file): + if os.path.exists(db_file): + with open(db_file, "r") as f: + return json.load(f) + else: + return {} + + +def save_db(db, db_file): + """Saves the Python dictionary back to db.json.""" + + print(f"Writing results of simulation to {db_file}...") + time.sleep(SLEEP_TIME) + print("Done.") + + json_representation = json.dumps(db, indent=4) + + with open(db_file, "w") as f: + f.write(json_representation) + + +def experiment_setup(db, resource_file): + participant_name = participant_login() + + experiment_id = initialize_experiment_record(db, participant_name, resource_file) + + print("\nSetup Complete.\n") + + return participant_name, experiment_id + + +def participant_login(): + name = click.prompt("Please enter your name", type=str) + + return name + + +def deserialize_behaviors(behaviors): + deserialized_behaviors = {} + + for behavior in behaviors: + deserialized_behaviors[behavior["id"]] = Behavior( + id=behavior["id"], name=behavior["name"] + ) + + return deserialized_behaviors + + +def build_tree(subtree, children, behaviors): + children_behaviors = [] + + parent_node = Sequence( + name=subtree, + ) + + for behavior_id in children: + behaviors[behavior_id].parent = parent_node + children_behaviors.append(behaviors[behavior_id]) + + return Sequence(name=subtree, children=children_behaviors) + + +def serialize_tree(behavior_tree): + children_list = [] + + for node in behavior_tree.children: + children_list.append(node.id) + return children_list + + +# behaviors = deserialized behavious +# behavior_list = array of all behaviors +def build_behavior_bank(behaviors, behavior_list): + behavior_bank = [] + + for behavior in behavior_list: + if "in_behavior_bank" in behavior and behavior["in_behavior_bank"]: + behavior_bank.append(behaviors[behavior["id"]]) + return behavior_bank + + +def display_tree(node, indent=0): + """Recursively display the behavior tree in a readable format.""" + if isinstance(node, Sequence): + print(" " * indent + node.name) + if node.children: + for child in node.children: + display_tree(child, indent + 4) + elif isinstance(node, Behavior): + print(" " * indent + " -> " + node.name) + + +def load_resources(resource_file): + try: + print(f"\nLoading behavior tree and behavior library from {resource_file}...\n") + # Use importlib.resources to access files within the package + resource_file = pkg_resources.files("examples") / resource_file + with resource_file.open() as f: + resources = json.load(f) + + except json.JSONDecodeError: + raise ValueError("Error") + except Exception: + raise RuntimeError("Error") + + all_resources = {} + + for subtree in resources: + children = resources[subtree].get("children") + behavior_list = resources[subtree].get("behavior_library") + context_paragraph = resources[subtree].get("context") + + # deserialize behavior_list + deserialized_behaviors = deserialize_behaviors(behavior_list) + + # then use it to build the subgoal behavior tree + sub_tree = build_tree(subtree, children, deserialized_behaviors) + + behavior_bank = build_behavior_bank(deserialized_behaviors, behavior_list) + + all_resources[subtree] = { + "context": context_paragraph, + "behaviors": behavior_bank, + "sub_tree": sub_tree, + } + + return all_resources + + +def initialize_experiment_record(db, participant_name, resource_file): + experiment_id = str(uuid.uuid4()) + + experiment_record = { + "participant_name": participant_name, + "experiment_start_date": datetime.now().isoformat(), + "experiment_progression": {}, + "resource_file": resource_file, + } + + db[experiment_id] = experiment_record + + return experiment_id + + +def summarize_behaviors_check(subgoal_resources, db): + print("Bot: Here are the actions, in order, that I will take to achieve this goal.") + run_tree_manipulation( + subgoal_resources["behaviors"], subgoal_resources["sub_tree"], db + ) + + +def display_tree_one_level( + node, + indent=0, +): + print(" " * indent + node.name) + + if isinstance(node, Sequence): + if node.children: + for child in node.children: + print(f" " * indent + f" -> {child.name}") + + +def run_tree_manipulation(behavior_library, tree, db): + try: + while True: + print("\n") + display_tree_one_level(tree) + user_choice = click.prompt( + "Would you like to make a change before I begin?", + show_choices=True, + type=click.Choice(["y", "n"], case_sensitive=False), + ) + + if user_choice == "y": + action = click.prompt( + "\n1. move an existing node\n" + + "2. remove an existing node\n" + + "3. add a new node\n" + + "Please select an action to perform on the behavior tree", + type=click.IntRange(min=1, max=4), + show_choices=True, + ) + + if action == 1: + # Select node to be moved + selected_node = run_interactive_list(tree.children, mode="select") + if selected_node == None: + continue + + # Select position of node + selected_index = run_interactive_list( + tree.children, mode="move", new_behavior=selected_node + ) + if selected_index == None: + continue + + # Perform operation + move(selected_node, (tree, selected_index)) + + action_log = { + "type": "move_node", + "nodes": [ + { + "display_name": selected_node.name, + }, + ], + "timestamp": datetime.now().isoformat(), + } + db["action_history"].append(action_log) + + elif action == 2: + # Select node to be removed + selected_node = run_interactive_list(tree.children, mode="select") + if selected_node == None: + continue + + # Perform operation + remove(selected_node, tree) + + action_log = { + "type": "remove_node", + "nodes": [ + {"display_name": selected_node.name}, + ], + "timestamp": datetime.now().isoformat(), + } + db["action_history"].append(action_log) + + elif action == 3: + # TODO: think about where the new action should originally show up in the list. It's original position could + # possible affect participant's decision making + + # Select node to be add + selected_node = run_interactive_list( + behavior_library, mode="select" + ) + if selected_node == None: + continue + + # Select position of node + selected_index = run_interactive_list( + tree.children, mode="insert", new_behavior=selected_node + ) + if selected_index == None: + continue + + # Perform operation + insert(selected_node, (tree, selected_index)) + + action_log = { + "type": "add_node", + "node": {"name": selected_node.name}, + "timestamp": datetime.now().isoformat(), + } + db["action_history"].append(action_log) + + else: + break + + except Exception: + print( + "\nAn error has occured during the tree manipulation, the experiment will now end." + ) + db["error_log"] = traceback.format_exc() + + finally: + return + + +def run_milestone(subgoal_resources, title, db): + db["start_time"] = datetime.now().isoformat() + db["base_subtree"] = serialize_tree(subgoal_resources["sub_tree"]) + db["action_history"] = [] + + # present context for this subgoal + print("\n =========================================================") + time.sleep(SLEEP_TIME) + print("\n" + subgoal_resources["context"]) + + print(f"\nBot: I am starting the following milestone: {title}\n") + time.sleep(SLEEP_TIME) + + summarize_behaviors_check(subgoal_resources, db) + + time.sleep(SLEEP_TIME) + print("\nBot: Okay, I will begin.") + + for action in subgoal_resources["sub_tree"].children: + time.sleep(SLEEP_TIME) + print(f"\nBot: I am about to {action.name}") + + if isinstance(action, Sequence): + print( + "Bot: This is a Sequence type node, would you like to see the sub-behaviors of this node?" + ) + time.sleep(SLEEP_TIME) + print(f"Action in progress..") + + db["final_subtree"] = serialize_tree(subgoal_resources["sub_tree"]) + db["end_time"] = datetime.now().isoformat() + + print(f"\nBot: The following milestone has been reached: {title}\n") + + +def sub_function(): + print("subfunction pressed.") + + +def run_experiment(db, all_resources, experiment_id): + # Loop for the actual experiment part, which takes user input to decide which action to take + + for subgoal in all_resources: + db[experiment_id]["experiment_progression"][subgoal] = {} + run_milestone( + all_resources[subgoal], + subgoal, + db[experiment_id]["experiment_progression"][subgoal], + ) + + return db + + +app = typer.Typer() + + +@app.command() +def main( + robot: Annotated[ + str, + typer.Argument(help="Name of the robot to run experiment on"), + ], + db_file: Annotated[ + pathlib.Path, + typer.Option(help="file where the experimental results will be written"), + ] = "experiment_results.json", + verbose: Annotated[bool, typer.Option("--verbose")] = False, + debug: Annotated[bool, typer.Option("--debug")] = False, +): + if debug: + logging.basicConfig(level=logging.DEBUG) + _logger.debug("debug logging") + elif verbose: + logging.basicConfig(level=logging.INFO) + _logger.debug("verbose logging") + else: + logging.basicConfig() + + print("AIT Prototype #1 Simulator") + + db = load_db(db_file) + + # load robot profile to run experiment on, and behavior library + resource_file = f"{robot}-resource-file.json" + all_resources = load_resources(resource_file) + + name, experiment_id = experiment_setup(db, resource_file) + + # TODO: update the colors of the instructions in the prompt toolkit, change the color + # when we move from first to second interface + print( + f"Bot: Hello {name}, welcome to the agent iteractive training experiment! My name is {robot}. We will be working together to achieve a specific milestone. I will first provide you with a list of actions I plan to take to accomplish the goal. After reviewing the list, you will have the opportunity to make any adjustments to these actions. Once you're satisfied with the plan, I will perform the actions. Let's begin!" + ) + time.sleep(SLEEP_TIME) + db = run_experiment(db, all_resources, experiment_id) + + save_db(db, db_file) + + # TODO: Add more context to simulation ending + print( + f"\nThank you, {name}, for participating in the experiment. " + f"The experiment has concluded, and the results have been recorded in the {db_file} file. " + "We greatly appreciate your time and effort!" + ) + + # TODO: visualize the differences between old and new behavior trees after experiment. + # Potentially use git diff + + +if __name__ == "__main__": + app() diff --git a/src/social_norms_trees/atomic_mutations.py b/src/social_norms_trees/atomic_mutations.py deleted file mode 100644 index 850c319a..00000000 --- a/src/social_norms_trees/atomic_mutations.py +++ /dev/null @@ -1,745 +0,0 @@ -from collections import namedtuple -import inspect -from functools import partial, wraps -import logging -from types import GenericAlias -from typing import Callable, List, Mapping, NamedTuple, Tuple, TypeVar, Union, Dict - -import click -import py_trees -import typer - -_logger = logging.getLogger(__name__) - -# ============================================================================= -# Argument types -# ============================================================================= - -ExistingNode = TypeVar("ExistingNode", bound=py_trees.behaviour.Behaviour) -NewNode = TypeVar("NewNode", bound=py_trees.behaviour.Behaviour) -CompositeIndex = TypeVar( - "CompositeIndex", bound=Tuple[py_trees.composites.Composite, int] -) -BehaviorIdentifier = TypeVar( - "BehaviorIdentifier", bound=Union[ExistingNode, NewNode, CompositeIndex] -) -BehaviorTreeNode = TypeVar("BehaviorTreeNode", bound=py_trees.behaviour.Behaviour) -BehaviorTree = TypeVar("BehaviorTree", bound=BehaviorTreeNode) -BehaviorLibrary = TypeVar("BehaviorLibrary", bound=List[BehaviorTreeNode]) -TreeOrLibrary = TypeVar("TreeOrLibrary", bound=Union[BehaviorTree, BehaviorLibrary]) - - -# ============================================================================= -# Atomic operations -# ============================================================================= - -# The very top line of each operation's docstring is used as the -# description of the operation in the UI, so it's required. -# The argument annotations are vital, because they tell the UI which prompt -# to use. - - -def remove(node: ExistingNode) -> ExistingNode: - """Remove a node. - Examples: - >>> tree = py_trees.composites.Sequence("", False, children=[ - ... py_trees.behaviours.Success(), - ... failure := py_trees.behaviours.Failure()]) - >>> print(py_trees.display.ascii_tree(tree)) - ... # doctest: +NORMALIZE_WHITESPACE - [-] - --> Success - --> Failure - >>> removed = remove(failure) - >>> print(py_trees.display.ascii_tree(tree)) - ... # doctest: +NORMALIZE_WHITESPACE - [-] - --> Success - """ - if node.parent is None: - msg = ( - f"%s's parent is None, so we can't remove it. We can't remove the root node." - % (node) - ) - raise ValueError(msg) - elif isinstance(node.parent, py_trees.composites.Composite): - node.parent.remove_child(node) - else: - raise NotImplementedError() - return node - - -def insert(node: NewNode, where: CompositeIndex) -> None: - """Insert a new node. - Examples: - >>> tree = py_trees.composites.Sequence("", False, children=[ - ... py_trees.behaviours.Success() - ... ]) - >>> print(py_trees.display.ascii_tree(tree)) - ... # doctest: +NORMALIZE_WHITESPACE - [-] - --> Success - - >>> insert(py_trees.behaviours.Failure(), (tree, 1)) - >>> print(py_trees.display.ascii_tree(tree)) - ... # doctest: +NORMALIZE_WHITESPACE - [-] - --> Success - --> Failure - - >>> insert(py_trees.behaviours.Dummy(), (tree, 0)) - >>> print(py_trees.display.ascii_tree(tree)) - ... # doctest: +NORMALIZE_WHITESPACE - [-] - --> Dummy - --> Success - --> Failure - """ - parent, index = where - parent.insert_child(node, index) - return - - -def move( - node: ExistingNode, - where: CompositeIndex, -) -> None: - """Move a node. - Examples: - >>> tree = py_trees.composites.Sequence("", False, children=[ - ... failure_node := py_trees.behaviours.Failure(), - ... success_node := py_trees.behaviours.Success(), - ... ]) - - >>> print(py_trees.display.ascii_tree(tree)) - ... # doctest: +NORMALIZE_WHITESPACE - [-] - --> Failure - --> Success - - >>> move(failure_node, (tree, 1)) - >>> print(py_trees.display.ascii_tree(tree)) - ... # doctest: +NORMALIZE_WHITESPACE - [-] - --> Success - --> Failure - >>> move(failure_node, (tree, 1)) - - >>> print(py_trees.display.ascii_tree(tree)) - ... # doctest: +NORMALIZE_WHITESPACE - [-] - --> Success - --> Failure - """ - parent, index = where - insert(remove(node), (parent, index)) - return - - -def exchange( - node0: ExistingNode, - node1: ExistingNode, -) -> None: - """Exchange two nodes. - Examples: - >>> tree = py_trees.composites.Sequence("", False, children=[ - ... s := py_trees.behaviours.Success(), - ... f := py_trees.behaviours.Failure(), - ... ]) - - >>> print(py_trees.display.ascii_tree(tree)) - ... # doctest: +NORMALIZE_WHITESPACE - [-] - --> Success - --> Failure - - >>> exchange(s, f) - >>> print(py_trees.display.ascii_tree(tree)) - ... # doctest: +NORMALIZE_WHITESPACE - [-] - --> Failure - --> Success - - >>> tree = py_trees.composites.Sequence("", False, children=[ - ... a:= py_trees.composites.Sequence("A", False, children=[ - ... py_trees.behaviours.Dummy() - ... ]), - ... py_trees.composites.Sequence("B", False, children=[ - ... py_trees.behaviours.Success(), - ... c := py_trees.composites.Sequence("C", False, children=[]) - ... ]) - ... ]) - - >>> print(py_trees.display.ascii_tree(tree)) - ... # doctest: +NORMALIZE_WHITESPACE - [-] - [-] A - --> Dummy - [-] B - --> Success - [-] C - - >>> exchange(a, c) - >>> print(py_trees.display.ascii_tree(tree)) - ... # doctest: +NORMALIZE_WHITESPACE - [-] - [-] C - [-] B - --> Success - [-] A - --> Dummy - - >>> tree = py_trees.composites.Sequence("", False, children=[ - ... py_trees.composites.Sequence("1", False, children=[ - ... a := py_trees.behaviours.Dummy("A") - ... ]), - ... py_trees.composites.Sequence("2", False, children=[ - ... b := py_trees.behaviours.Dummy("B") - ... ]) - ... ]) - - >>> print(py_trees.display.ascii_tree(tree)) - ... # doctest: +NORMALIZE_WHITESPACE - [-] - [-] 1 - --> A - [-] 2 - --> B - - >>> exchange(a, b) - >>> print(py_trees.display.ascii_tree(tree)) - ... # doctest: +NORMALIZE_WHITESPACE - [-] - [-] 1 - --> B - [-] 2 - --> A - """ - - node0_parent = node0.parent - node0_index = node0.parent.children.index(node0) - node1_parent = node1.parent - node1_index = node1.parent.children.index(node1) - - move(node0, (node1_parent, node1_index)) - move(node1, (node0_parent, node0_index)) - - return - - -# ============================================================================= -# Node and Position Selectors -# ============================================================================= - - -def iterate_nodes(tree: py_trees.behaviour.Behaviour): - """ - Examples: - >>> list(iterate_nodes(py_trees.behaviours.Dummy())) - ... # doctest: +ELLIPSIS - [] - >>> list(iterate_nodes( - ... py_trees.composites.Sequence("", False, children=[ - ... py_trees.behaviours.Dummy(), - ... ]))) # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE - [, - ] - >>> list(iterate_nodes( - ... py_trees.composites.Sequence("", False, children=[ - ... py_trees.behaviours.Dummy(), - ... py_trees.behaviours.Dummy(), - ... py_trees.composites.Sequence("", False, children=[ - ... py_trees.behaviours.Dummy(), - ... ]), - ... ]))) # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE - [, - , - , - , - ] - """ - yield tree - for child in tree.children: - yield from iterate_nodes(child) - - -def enumerate_nodes(tree: py_trees.behaviour.Behaviour): - """ - Examples: - - >>> list(enumerate_nodes(py_trees.behaviours.Dummy())) - ... # doctest: +ELLIPSIS - [(0, )] - >>> list(enumerate_nodes( - ... py_trees.composites.Sequence("", False, children=[ - ... py_trees.behaviours.Dummy(), - ... ]))) # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE - [(0, ), - (1, )] - >>> list(enumerate_nodes( - ... py_trees.composites.Sequence("s1", False, children=[ - ... py_trees.behaviours.Dummy(), - ... py_trees.behaviours.Success(), - ... py_trees.composites.Sequence("s2", False, children=[ - ... py_trees.behaviours.Dummy(), - ... ]), - ... py_trees.composites.Sequence("", False, children=[ - ... py_trees.behaviours.Failure(), - ... py_trees.behaviours.Periodic("p", n=1), - ... ]), - ... ]))) # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE - [(0, ), - (1, ), - (2, ), - (3, ), - (4, ), - (5, ), - (6, ), - (7, )] - """ - return enumerate(iterate_nodes(tree)) - - -def label_tree_lines( - tree: py_trees.behaviour.Behaviour, - labels: List[str], - representation=py_trees.display.unicode_tree, -) -> str: - """Label the lines of a tree. - Examples: - >>> print(label_tree_lines(py_trees.behaviours.Dummy(), labels=["0"])) - 0: --> Dummy - >>> tree = py_trees.composites.Sequence( - ... "S1", - ... False, - ... children=[ - ... py_trees.behaviours.Dummy(), - ... py_trees.behaviours.Dummy()] - ... ) - >>> print(label_tree_lines(tree, labels=["A", "B", "C"])) - A: [-] S1 - B: --> Dummy - C: --> Dummy - >>> print(label_tree_lines(tree, labels=["AAA", "BB", "C"])) - AAA: [-] S1 - BB: --> Dummy - C: --> Dummy - - If there are more labels than lines, then those are shown too: - >>> print(label_tree_lines(tree, labels=["AAA", "BB", "C", "O"])) - AAA: [-] S1 - BB: --> Dummy - C: --> Dummy - O: - """ - max_len = max([len(s) for s in labels]) - padded_labels = [s.rjust(max_len) for s in labels] - - tree_representation_lines = representation(tree).split("\n") - - enumerated_tree_representation_lines = [ - # Make the line. If `t` is missing, - # then we don't want a trailing space - # so we strip that away - f"{i}: {t}".rstrip() - for i, t in zip(padded_labels, tree_representation_lines) - ] - - output = "\n".join(enumerated_tree_representation_lines) - return output - - -# TODO: Split each of these functions into one which -# returns a labeled representation of the tree, -# a mapping of allowed values to nodes and -# a separate function which does the prompting. -# This should help testing. - -# Edge cases: what happens if the name of a node is really long - does the ascii representation wrap around? - - -class NodeMappingRepresentation(NamedTuple): - mapping: Dict[str, py_trees.behaviour.Behaviour] - labels: List[str] - representation: str - - -def prompt_identify( - tree: TreeOrLibrary, - function: Callable[ - [TreeOrLibrary], Tuple[Mapping[str, BehaviorIdentifier], List[str], str] - ], - message: str = "Which?", - display_nodes: bool = True, -) -> BehaviorIdentifier: - - mapping, labels, representation = function(tree) - - if display_nodes: - text = f"{representation}\n{message}" - else: - text = f"{message}" - - key = click.prompt(text=text, type=click.Choice(labels)) - node = mapping[key] - return node - - -def get_node_mapping(tree: BehaviorTree) -> NodeMappingRepresentation: - """ - Examples: - >>> a = get_node_mapping(py_trees.behaviours.Dummy()) - >>> a.mapping - {'0': } - - >>> a.labels - ['0'] - - >>> print(a.representation) - 0: --> Dummy - - >>> b = get_node_mapping(py_trees.composites.Sequence("", False, children=[py_trees.behaviours.Dummy()])) - >>> b.mapping # doctest: +NORMALIZE_WHITESPACE - {'0': , - '1': } - - >>> b.labels - ['0', '1'] - - >>> print(b.representation) - 0: [-] - 1: --> Dummy - - """ - mapping = {str(i): n for i, n in enumerate_nodes(tree)} - labels = list(mapping.keys()) - representation = label_tree_lines(tree=tree, labels=labels) - return NodeMappingRepresentation(mapping, labels, representation) - - -prompt_identify_node = partial(prompt_identify, function=get_node_mapping) - - -def get_library_mapping(library: BehaviorLibrary) -> NodeMappingRepresentation: - """ - Examples: - >>> from py_trees.behaviours import Success, Failure - >>> n = get_library_mapping([]) - >>> n.mapping - {} - - >>> n.labels - [] - - >>> print(n.representation) - - - >>> a = get_library_mapping([Success(), Failure()]) - >>> a.mapping # doctest: +NORMALIZE_WHITESPACE - {'0': , - '1': } - - >>> a.labels - ['0', '1'] - - >>> print(a.representation) - 0: Success - 1: Failure - """ - - mapping = {str(i): n for i, n in enumerate(library)} - labels = list(mapping.keys()) - representation = "\n".join([f"{i}: {n.name}" for i, n in enumerate(library)]) - return NodeMappingRepresentation(mapping, labels, representation) - - -prompt_identify_library_node = partial(prompt_identify, function=get_library_mapping) - - -def get_composite_mapping(tree: BehaviorTree, skip_label="_"): - """ - Examples: - >>> a = get_composite_mapping(py_trees.behaviours.Dummy()) - >>> a.mapping - {} - - >>> a.labels - [] - - >>> print(a.representation) - _: --> Dummy - - >>> b = get_composite_mapping(py_trees.composites.Sequence("", False, children=[py_trees.behaviours.Dummy()])) - >>> b.mapping # doctest: +NORMALIZE_WHITESPACE - {'0': } - - >>> b.labels - ['0'] - - >>> print(b.representation) - 0: [-] - _: --> Dummy - """ - mapping = {} - display_labels, allowed_labels = [], [] - - for i, node in enumerate_nodes(tree): - label = str(i) - if isinstance(node, py_trees.composites.Composite): - mapping[label] = node - display_labels.append(label) - allowed_labels.append(label) - else: - display_labels.append(skip_label) - representation = label_tree_lines(tree=tree, labels=display_labels) - - return NodeMappingRepresentation(mapping, allowed_labels, representation) - - -prompt_identify_composite = partial(prompt_identify, function=get_composite_mapping) - - -def get_child_index_mapping(tree: BehaviorTree, skip_label="_"): - """ - Examples: - >>> a = get_child_index_mapping(py_trees.behaviours.Dummy()) - >>> a.mapping - {'0': 0} - - >>> a.labels - ['0'] - - >>> print(a.representation) - _: --> Dummy - 0: - - >>> b = get_child_index_mapping(py_trees.composites.Sequence("", False, children=[py_trees.behaviours.Dummy()])) - >>> b.mapping # doctest: +NORMALIZE_WHITESPACE - {'0': 0, '1': 1} - - >>> b.labels - ['0', '1'] - - >>> print(b.representation) - _: [-] - 0: --> Dummy - 1: - """ - mapping = {} - display_labels, allowed_labels = [], [] - - for node in iterate_nodes(tree): - if node in tree.children: - index = tree.children.index(node) - label = str(index) - mapping[label] = index - allowed_labels.append(label) - display_labels.append(label) - else: - display_labels.append(skip_label) - - # Add the "after all the elements" label - post_list_index = len(tree.children) - post_list_label = str(post_list_index) - allowed_labels.append(post_list_label) - display_labels.append(post_list_label) - mapping[post_list_label] = post_list_index - - representation = label_tree_lines(tree=tree, labels=display_labels) - - return NodeMappingRepresentation(mapping, allowed_labels, representation) - - -prompt_identify_child_index = partial(prompt_identify, function=get_child_index_mapping) - - -def get_position_mapping(tree): - """ - - - - [-] S0 - --> {1} - [-] S1 - --> {2} - --> Dummy - --> {3} - --> {4} - [-] S2 - --> {5} - --> Failure - --> {6} - --> {7} - - - - """ - pass - - -# ============================================================================= -# User Interface -# ============================================================================= - - -# Wrapper functions for the atomic operations which give them a UI. -MutationResult = namedtuple("MutationResult", ["result", "tree", "function", "kwargs"]) - - -def mutate_chooser(*fs: Union[Callable], message="Which action?"): - """Prompt the user to choose one of the functions f. - Returns the wrapped version of the function. - """ - n_fs = len(fs) - docstring_summaries = [f_.__doc__.split("\n")[0] for f_ in fs] - text = ( - "\n".join( - [ - f"{i}: {docstring_summary}" - for i, docstring_summary in enumerate(docstring_summaries) - ] - ) - + f"\n{message}" - ) - i = click.prompt(text=text, type=click.IntRange(0, n_fs - 1)) - f = mutate_ui(fs[i]) - - return f - - -def mutate_ui( - f: Callable, -) -> Callable[ - [py_trees.behaviour.Behaviour, List[py_trees.behaviour.Behaviour]], MutationResult -]: - """Factory function for a tree mutator UI. - This creates a version of the atomic function `f` - which prompts the user for the appropriate arguments - based on `f`'s type annotations. - """ - - signature = inspect.signature(f) - - @wraps(f) - def f_inner(tree, library): - kwargs = {} - for parameter_name in signature.parameters.keys(): - annotation = signature.parameters[parameter_name].annotation - _logger.debug(f"getting arguments for {annotation=}") - value = prompt_get_mutate_arguments(annotation, tree, library) - kwargs[parameter_name] = value - inner_result = f(**kwargs) - return_value = MutationResult( - result=inner_result, tree=tree, function=f, kwargs=kwargs - ) - _logger.debug(return_value) - return return_value - - return f_inner - - -def prompt_get_mutate_arguments(annotation: GenericAlias, tree, library): - """Prompt the user to specify nodes and positions in the tree.""" - annotation_ = str(annotation) - assert isinstance(annotation_, str) - - if annotation_ == str(inspect.Parameter.empty): - _logger.debug("No argument annotation, returning None") - return None - elif annotation_ == str(ExistingNode): - _logger.debug("in ExistingNode") - node = prompt_identify_node(tree) - return node - elif annotation_ == str(CompositeIndex): - _logger.debug("in CompositeIndex") - composite_node = prompt_identify_composite(tree, message="Which parent?") - index = prompt_identify_child_index(composite_node) - return composite_node, index - elif annotation_ == str(NewNode): - _logger.debug("in NewNode") - new_node = prompt_identify_library_node( - library, message="Which node from the library?" - ) - return new_node - else: - _logger.debug("in 'else'") - msg = "Can't work out what to do with %s" % annotation - raise NotImplementedError(msg) - - -# ============================================================================= -# Utility functions -# ============================================================================= - - -class QuitException(Exception): - pass - - -def end_experiment(): - """I'm done, end the experiment.""" - raise QuitException("User ended the experiment.") - - -# ============================================================================= -# Main Loop -# ============================================================================= - - -def load_experiment(): - """Placeholder function for loading a tree and library (should come from a file).""" - tree = py_trees.composites.Sequence( - "S0", - False, - children=[ - py_trees.behaviours.Dummy("A"), - py_trees.composites.Sequence( - "S1", - memory=False, - children=[ - py_trees.behaviours.Dummy("B"), - py_trees.behaviours.Dummy("C"), - py_trees.behaviours.Dummy("D"), - ], - ), - py_trees.composites.Selector( - "S2", - memory=False, - children=[ - py_trees.behaviours.Dummy("E"), - py_trees.behaviours.Dummy("F"), - py_trees.behaviours.Failure(), - ], - ), - py_trees.behaviours.Success(), - ], - ) - library = [py_trees.behaviours.Success(), py_trees.behaviours.Failure()] - return tree, library - - -def save_results(tree, protocol): - _logger.info("saving results") - print(f"protocol: {protocol}") - print(f"tree:\n{py_trees.display.unicode_tree(tree)}") - - -app = typer.Typer() - - -@app.command() -def main(): - logging.basicConfig(level=logging.DEBUG) - tree, library = load_experiment() - protocol = [] - - # The main loop of the experiment - while f := mutate_chooser(insert, move, exchange, remove, end_experiment): - results = f(tree, library) - _logger.debug(results) - protocol.append(results) - print(py_trees.display.ascii_tree(tree)) - - -if __name__ == "__main__": - app() diff --git a/src/social_norms_trees/behavior_library.py b/src/social_norms_trees/behavior_library.py deleted file mode 100644 index 4cbd7100..00000000 --- a/src/social_norms_trees/behavior_library.py +++ /dev/null @@ -1,11 +0,0 @@ -class BehaviorLibrary: - def __init__(self, behavior_list): - self.behaviors = behavior_list - self.behavior_from_display_name = { - behavior["name"]: behavior for behavior in behavior_list - } - self.behavior_from_id = {behavior["id"]: behavior for behavior in behavior_list} - - def __iter__(self): - for i in self.behaviors: - yield i diff --git a/src/social_norms_trees/custom_node_library.py b/src/social_norms_trees/custom_node_library.py deleted file mode 100644 index 4b58724e..00000000 --- a/src/social_norms_trees/custom_node_library.py +++ /dev/null @@ -1,19 +0,0 @@ -import py_trees - - -class CustomBehavior(py_trees.behaviours.Dummy): - def __init__(self, name, id_, display_name): - super().__init__(name) - self.id_ = id_ - self.display_name = display_name - - -class CustomSequence(py_trees.composites.Sequence): - def __init__(self, name, id_, display_name, children=None, memory=False): - super().__init__(name=name, memory=memory, children=children) - self.id_ = id_ - self.display_name = display_name - - # id of the behavior within the behavior library (persists) - # but also the unique id for the behavior within the tree (in case there are multiple instances of - # the behavior in one tree) diff --git a/src/social_norms_trees/serialize_tree.py b/src/social_norms_trees/serialize_tree.py deleted file mode 100644 index 930b1077..00000000 --- a/src/social_norms_trees/serialize_tree.py +++ /dev/null @@ -1,182 +0,0 @@ -from social_norms_trees.custom_node_library import CustomBehavior, CustomSequence - - -def serialize_tree(tree, include_children=True): - """ - Examples: - >>> from py_trees.behaviours import Dummy, Success, Failure - >>> from py_trees.composites import Sequence, Selector - - >>> serialize_tree(Dummy()) - {'type': 'Dummy', 'name': 'Dummy'} - - >>> serialize_tree(Success()) - {'type': 'Success', 'name': 'Success'} - - >>> serialize_tree(Failure()) - {'type': 'Failure', 'name': 'Failure'} - - >>> serialize_tree(Sequence("root", True, children=[Dummy()])) - {'type': 'Sequence', 'name': 'root', 'children': [{'type': 'Dummy', 'name': 'Dummy'}]} - - >>> serialize_tree(CustomBehavior("behavior", "theid", "display behavior")) - {'type': 'CustomBehavior', 'name': 'behavior', 'display_name': 'display behavior', 'id_': 'theid'} - - >>> serialize_tree(CustomSequence("root", "theid", "display root", children=[Dummy()])) - {'type': 'CustomSequence', 'name': 'root', 'display_name': 'display root', 'id_': 'theid', 'children': [{'type': 'Dummy', 'name': 'Dummy'}]} - """ - - data = { - "type": tree.__class__.__name__, - "name": tree.name, - } - if hasattr(tree, "display_name"): - data["display_name"] = tree.display_name - if hasattr(tree, "id_"): - data["id_"] = tree.id_ - if include_children and tree.children: - data["children"] = [serialize_tree(child) for child in tree.children] - - return data - - -def deserialize_library_element(description: dict): - """ - Examples: - >>> s = deserialize_library_element({"type": "Sequence", "name": "Sequence 0", "id": "s0"}) - >>> s - - - >>> s.id_ - 's0' - - >>> s.name - 'Sequence 0' - - >>> s.children - [] - - TODO: Implement selectors - >>> deserialize_library_element({"type": "Selector", "name": "Selector 0", "id": "s0"}) - Traceback (most recent call last): - ... - NotImplementedError: node_type=Selector is not implemented - - >>> b = deserialize_library_element({"type": "Behavior", "name": "Behavior 0", "id": "b0"}) - >>> b - - - >>> b.id_ - 'b0' - - >>> b.name - 'Behavior 0' - - - """ - assert isinstance(description["type"], str), ( - f"\nThere was an invalid configuration detected in the inputted behavior tree: " - f"Invalid type for node attribute 'type' found for node '{description['name']}'. " - f"Please ensure that the 'name' attribute is a string." - ) - - node_type = description["type"] - assert node_type in {"Sequence", "Selector", "Behavior"}, ( - f"\nThere was an invalid configuration detected in the inputted behavior tree: " - f"Invalid node type '{node_type}' found for node '{description['name']}'. " - f"Please ensure that all node types are correct and supported." - ) - - if node_type == "Sequence": - if "children" in description.keys(): - children = [ - deserialize_library_element(child) for child in description["children"] - ] - else: - children = [] - - node = CustomSequence( - name=description["name"], - id_=description["id"], - display_name=description["name"], - children=children, - ) - - elif node_type == "Behavior": - assert "children" not in description or len(description["children"]) == 0, ( - f"\nThere was an invalid configuration detected in the inputted behavior tree: " - f"Children were detected for Behavior type node '{description['name']}': " - f"Behavior nodes should not have any children. Please check the structure of your behavior tree." - ) - - node = CustomBehavior( - name=description["name"], - id_=description["id"], - display_name=description["name"], - ) - - else: - msg = "node_type=%s is not implemented" % node_type - raise NotImplementedError(msg) - - return node - - -def deserialize_tree(tree, behavior_library): - def deserialize_node(node): - assert type(node["type"] == str), ( - f"\nThere was an invalid configuration detected in the inputted behavior tree: " - f"Invalid type for node attribute 'type' found for node '{node['name']}'. " - f"Please ensure that the 'name' attribute is a string." - ) - assert type(node["name"] == str), ( - f"\nThere was an invalid configuration detected in the inputted behavior tree: " - f"Invalid type for node attribute 'name' found for node '{node['name']}'. " - f"Please ensure that the 'name' attribute is a string." - ) - - node_type = node["type"] - assert node_type in ["Sequence", "Selector", "Behavior"], ( - f"\nThere was an invalid configuration detected in the inputted behavior tree: " - f"Invalid node type '{node_type}' found for node '{node['name']}'. " - f"Please ensure that all node types are correct and supported." - ) - - behavior = behavior_library.behavior_from_display_name[node["name"]] - - if node_type == "Sequence": - children = [deserialize_node(child) for child in node["children"]] - - if behavior: - return CustomSequence( - name=behavior["name"], - id_=behavior["id"], - display_name=behavior["name"], - children=children, - ) - else: - raise ValueError( - f"Behavior {node['name']} not found in behavior library" - ) - - # TODO: node type Selector - - elif node_type == "Behavior": - assert "children" not in node or len(node["children"]) == 0, ( - f"\nThere was an invalid configuration detected in the inputted behavior tree: " - f"Children were detected for Behavior type node '{node['name']}': " - f"Behavior nodes should not have any children. Please check the structure of your behavior tree." - ) - - if behavior: - return CustomBehavior( - name=behavior["name"], - id_=behavior["id"], - display_name=behavior["name"], - ) - else: - raise ValueError( - f"Behavior {node['name']} not found in behavior library" - ) - - return deserialize_node(tree) diff --git a/src/social_norms_trees/ui_wrapper.py b/src/social_norms_trees/ui_wrapper.py deleted file mode 100644 index a81fe6ef..00000000 --- a/src/social_norms_trees/ui_wrapper.py +++ /dev/null @@ -1,229 +0,0 @@ -import logging -import pathlib -from typing import Annotated, List -import click -from datetime import datetime -import json -import os -import uuid -import py_trees -import traceback - -import typer - -from social_norms_trees.atomic_mutations import ( - QuitException, - exchange, - insert, - move, - mutate_chooser, - remove, - end_experiment, -) -from social_norms_trees.serialize_tree import ( - deserialize_library_element, - serialize_tree, - deserialize_tree, -) - -from social_norms_trees.behavior_library import BehaviorLibrary - -_logger = logging.getLogger(__name__) - - -def load_db(db_file): - if os.path.exists(db_file): - with open(db_file, "r") as f: - return json.load(f) - else: - return {} - - -def save_db(db, db_file): - """Saves the Python dictionary back to db.json.""" - - print(f"\nWriting results of simulation to {db_file}...") - - json_representation = json.dumps(db, indent=4) - - with open(db_file, "w") as f: - f.write(json_representation) - - -def experiment_setup(db, origin_tree): - print("\n") - participant_id = participant_login() - - experiment_id = initialize_experiment_record(db, participant_id, origin_tree) - - print("\nSetup Complete.\n") - - return participant_id, experiment_id - - -def participant_login(): - participant_id = click.prompt("Please enter your participant id", type=str) - - return participant_id - - -def load_resources(file_path): - try: - print(f"\nLoading behavior tree and behavior library from {file_path}...\n") - with open(file_path, "r") as file: - resources = json.load(file) - - except json.JSONDecodeError: - raise ValueError("Error") - except Exception: - raise RuntimeError("Error") - - behavior_tree = resources.get("behavior_tree") - behavior_list = resources.get("behavior_library") - context_paragraph = resources.get("context") - - behavior_tree = deserialize_tree(behavior_tree, BehaviorLibrary(behavior_list)) - - behavior_library = [deserialize_library_element(e) for e in behavior_list] - - print("Loading success.") - return behavior_tree, behavior_library, context_paragraph - - -def initialize_experiment_record(db, participant_id, origin_tree): - experiment_id = str(uuid.uuid4()) - - # TODO: look into python data class - - experiment_record = { - "experiment_id": experiment_id, - "participant_id": participant_id, - "base_behavior_tree": serialize_tree(origin_tree), - "start_date": datetime.now().isoformat(), - "action_history": [], - } - - db[experiment_id] = experiment_record - - return experiment_id - - -def display_tree(tree): - print(py_trees.display.ascii_tree(tree)) - return - - -def serialize_function_arguments(args): - results = {} - if isinstance(args, dict): - for key, value in args.items(): - results[key] = serialize_function_arguments(value) - return results - elif isinstance(args, py_trees.behaviour.Behaviour): - value = serialize_tree(args, include_children=False) - return value - elif isinstance(args, tuple): - value = tuple(serialize_function_arguments(i) for i in args) - return value - elif isinstance(args, list): - value = [serialize_function_arguments(i) for i in args] - return value - else: - return args - - -def run_experiment(tree, library): - # Loop for the actual experiment part, which takes user input to decide which action to take - print("\nExperiment beginning...\n") - - results_dict = { - "start_time": datetime.now().isoformat(), - "initial_behavior_tree": serialize_tree(tree), - "action_log": [], - } - - try: - while True: - display_tree(tree) - f = mutate_chooser(insert, move, exchange, remove, end_experiment) - if f is end_experiment: - break - results = f(tree, library) - results_dict["action_log"].append( - { - "type": results.function.__name__, - "kwargs": serialize_function_arguments(results.kwargs), - "time": datetime.now().isoformat(), - } - ) - - except QuitException: - pass - - except Exception: - print( - "\nAn error has occured during the experiment, the experiment will now end." - ) - results_dict["error_log"] = traceback.format_exc() - - # finally: - results_dict["final_behavior_tree"] = serialize_tree(tree) - results_dict["start_time"] = datetime.now().isoformat() - - return results_dict - - -app = typer.Typer() - - -@app.command() -def main( - resources_file: Annotated[ - pathlib.Path, - typer.Argument( - help="file with the experimental context, behavior tree, and behavior library" - ), - ], - db_file: Annotated[ - pathlib.Path, - typer.Option(help="file where the experimental results will be written"), - ] = "db.json", - verbose: Annotated[bool, typer.Option("--verbose")] = False, - debug: Annotated[bool, typer.Option("--debug")] = False, -): - if debug: - logging.basicConfig(level=logging.DEBUG) - _logger.debug("debug logging") - elif verbose: - logging.basicConfig(level=logging.INFO) - _logger.debug("verbose logging") - else: - logging.basicConfig() - - print("AIT Prototype #1 Simulator") - - # TODO: write up some context, assumptions made in the README - - db = load_db(db_file) - - # load tree to run experiment on, and behavior library - - original_tree, behavior_library, context_paragraph = load_resources(resources_file) - print(f"\nContext of this experiment: {context_paragraph}") - - participant_id, experiment_id = experiment_setup(db, original_tree) - results = run_experiment(original_tree, behavior_library) - db[experiment_id] = results - _logger.debug(db) - save_db(db, db_file) - - # TODO: define export file, that will be where we export the results to - - print("\nSimulation has ended.") - - # TODO: visualize the differences between old and new behavior trees after experiment. - # Potentially use git diff - - -if __name__ == "__main__": - app() diff --git a/tests/__init__.py b/tests/__init__.py deleted file mode 100644 index 4eef093a..00000000 --- a/tests/__init__.py +++ /dev/null @@ -1,2 +0,0 @@ -# The inclusion of the tests module is not meant to offer best practices for -# testing in general. diff --git a/tests/test_stub.py b/tests/test_stub.py deleted file mode 100644 index 3ada1ee4..00000000 --- a/tests/test_stub.py +++ /dev/null @@ -1,2 +0,0 @@ -def test_placeholder(): - assert True