Task planning » History » Revision 2
Revision 1 (Florian Seguin, 2020-09-10 11:41) → Revision 2/3 (Florian Seguin, 2020-09-10 11:41)
h1. Task planification The Planner module can be used to create a Plan using a modified version of "Dana Nau's pyhop":http://www.cs.umd.edu/projects/shop/index.html. The plan created is then translated into a sequence of mission types objects from the base module. This sequence can be sent to the autopilot and executed from there. Right now, the module is under heavy development and thus may contain bugs, missing features, wrong models... h2. HTN, pyhop and plans Hierarchical task network (HTN) is a planification method where every action is structured. There are three main parts that compose the network : - Primitive tasks : they represent roughly an action of the UAV - Compound tasks : they represent a sequence of actions ordered. The compound task is only feasible when all actions can be done in the order given by the compound task - Decision tasks : they represent a choice between different ways of doing the task. A decision task is feasible whenever a way of doing the task is feasible. These actions are then structured into a tree, where each leaf is a primitive task and every other node is a decision or a compound task. For modeling our tree of decisions and solving we use a modified version of pyhop, a planner written by Dana Nau. We need to first create a description of our "world", and define each primitive, compound and decision tasks. <pre><code class="python"># Creating primitive tasks def simple_task1(state, goal): # Doing stuff with state return state def simple_task2(state, goal): # Doing other stuff with state return state # Creating compound tasks def compound_task1(state, goal): return [('simple_task1', goal), ('simple_task2', goal)] def compound_task2(state, goal): return [('simple_task2', goal), ('simple_task1', goal)] # Pyhop needs to know the primitive tasks pyhop.declare_operators(simple_task1, simple_task2) # You can declare your decisions methods this way pyhop.declare_methods('objective', compound_task1, compound_task2) </code></pre> Once the "world" is described, we can try to solve the problem. <pre><code class="python"># We solve the problem by calling solve method of pyhop pyhop.solve(state, ['objective', goal]) </code></pre> Using our precedent examples, we can draw our tree and explore it. Instead of returning a plan after one is found, it explores always all the tree and returns the plan with a the minimum cost based on a criteria. Since we are working with UAVs, these criterias can be the battery ('bat') or the time ('time'). h2. Translation and transmitting plans to the UAV To translate and transmit a plan to the UAV, we use the PlanManager class. The PlanManager class is a centralized class where every UAV using a specific plugin (the PlanUpdater plugin) is registered. We will get into it later. Since it is centralized, the class itself is a Singleton, meaning there can only be one instance of the object at time. It contains the current state of the UAVs, and other objects related to UAVs. To create a plan, you can use the method create_plan. <pre><code class="python">def create_plan(self, uavId, objective, goalParams, homeParams, minimizeBy='bat', verbose=0): """ Create a plan using the different informations passed in parameters. The plan is created via a PlanObjective object. Translates the plan and updates the uavsPlan attribute Parameters ---------- uavId : int The id of the UAV objective : str The string of the objective we want to fulfill goalParams : dict The goal parameters (i.e. the position of the cloud we target) homeParams : dict The home parameters (i.e. the position where UAVs are launched) minimizeBy : str The parameter to minimize (time, bat...) verbose : int Used for printing """ initState = {} for uId, uStatus in self.uavsStatus.items(): initState[uId] = copy.deepcopy(self.uavsStatus[uId]).to_dict() initState[uId].update(self.uavsParameters[uId]) planObject = PlanObjective(uavId, objective, state=initState, goal=goalParams, home=homeParams) self.uavsPlan[uavId] = self.translate_plan(uavId, planObject.solve(minimizeBy, verbose=verbose)) </code></pre> Note : we use an object called PlanObjective to translate our plan into pyhop goal and state objects. We also translate immediately the plan into a sequence of nephelae.mission.types.MissionTypes. Once translated, the plan is stored into the associated variable. When a UAV is accepting or deleting a plan computed, this variable becomes empty. **Note that if another plan is computed for the same UAV, the previous plan that was contained into the associated variable is erased and replaced by the new one**. h2. Executing a UAV plan After a plan has been accepted by the UAV (note that you need to plug the PlanUpdater plugin to the UAV to accept/reject plans), it is stored inside a dedicated variable called currentPlan. This plan can be accepted and sent to the autopilot. To accept a plan you can use the __accept_plan()__ method of the PlanUpdater plugin. To reject a plan you can use the __reject_plan()__ method of the PlanUpdater plugin. To send the missions to the autopilot and update the executingPlan variable, you can use the __execute_plan()__ method of the PlanUpdater plugin. The indices of the missions are duplicated and stored into another variable called executingPlan. This variable is then updated periodically with the missionStatus messages emitting from the autopilot. The callback used to update the executingPlan : <pre><code class="python">def mission_status_callback(self, missionStatus): """ Callback called whenever a message missionStatus is received. Updates the executingPlan attribute by popping missions from it (Note : the callback seems buggy to me, TBD) Parameters ---------- missionStatus : dict The messages containing the current indexes of the missions being executed and the remaining time of the current mission. """ index_list = missionStatus['index_list'] remaining_time = missionStatus['remaining_time'] popped_missions = [] for index in self.executingPlan: # Popping loop : removing finished tasks if index not in index_list: popped_missions.append(self.pop_plan().missionId) self.remainingTime = remaining_time print(popped_missions) </code></pre> This callback is contained inside the PlanUpdater plugin. Note that it might contain some bugs (for instance, a discrepancy between an old missionStatus message and a new executingPlan could empty the executingPlan variable).