import os
import numpy as np
import math
import cv2
import skimage
from PIL import Image
from torchvision import transforms
from constants import gt_categories_mapping
from agents.utils.semantic_prediction import SemanticPredMaskRCNN
import envs.utils.pose as pu
import agents.utils.visualization as vu
from constants import color_palette
from envs.utils.fmm_planner import FMMPlanner
import torch
import time
from constants import ade_class
class PlanClass:
def __init__(self, args,device=None):
self.num_scenes = 1
self.args = args
self.col_width = None
self.curr_loc = None
self.count_forward_actions = None
self.last_action = None
self.last_loc = None
self.device = device
map_shape = (args.map_size_cm // args.map_resolution,
args.map_size_cm // args.map_resolution)
self.collision_map = np.zeros(map_shape)
self.legend = cv2.imread('docs/legend.png')
self.vis_image=vu.init_vis_image(self.goal_name, self.legend)
self.visited = np.zeros(map_shape)
self.visited_vis = np.zeros(map_shape)
self.metricsByCategorykey = {}
self.res = transforms.Compose(
transforms.Resize((args.frame_height, args.frame_width),
self.sem_pred = SemanticPredMaskRCNN(args)
self.selem = skimage.morphology.disk(3)
def reset(self):
args = self.args
# Episode initializations
map_shape = (args.map_size_cm // args.map_resolution,
args.map_size_cm // args.map_resolution)
self.collision_map = np.zeros(map_shape)
self.visited = np.zeros(map_shape)
self.visited_vis = np.zeros(map_shape)
self.col_width = 1
self.count_forward_actions = 0
self.curr_loc = [0., 0., 0.]
self.curr_loc = [args.map_size_cm / 100.0 / 2.0,
args.map_size_cm / 100.0 / 2.0, 0.]
self.last_action = None
def plan(self, planner_inputs):
args = self.args
self.last_loc = self.curr_loc
# Get Map prediction
map_pred = np.rint(planner_inputs['map_pred'])
goal = planner_inputs['goal']
# Get pose prediction and global policy planning window
start_x, start_y, start_o, gx1, gx2, gy1, gy2 = \
gx1, gx2, gy1, gy2 = int(gx1), int(gx2), int(gy1), int(gy2)
planning_window = [gx1, gx2, gy1, gy2]
# Get curr loc
self.curr_loc = [start_x, start_y, start_o]
r, c = start_y, start_x
# print("curr_loc", self.curr_loc)
start = [int(r * 100.0 / args.map_resolution - gx1),
int(c * 100.0 / args.map_resolution - gy1)]
# print(start)
start = pu.threshold_poses(start, map_pred.shape)
# print(start)
self.visited[gx1:gx2, gy1:gy2][start[0] - 0:start[0] + 1,
start[1] - 0:start[1] + 1] = 1
#if args.visualize or args.print_images:
# Get last loc
last_start_x, last_start_y = self.last_loc[0], self.last_loc[1]
r, c = last_start_y, last_start_x
last_start = [int(r * 100.0 / args.map_resolution - gx1),
int(c * 100.0 / args.map_resolution - gy1)]
last_start = pu.threshold_poses(last_start, map_pred.shape)
self.visited_vis[gx1:gx2, gy1:gy2] = \
vu.draw_line(last_start, start,
self.visited_vis[gx1:gx2, gy1:gy2])
# Collision check
if self.last_action == 1:
x1, y1, t1 = self.last_loc
x2, y2, _ = self.curr_loc
buf = 4
length = 2
if abs(x1 - x2) < 0.05 and abs(y1 - y2) < 0.05:
self.col_width += 2
if self.col_width == 7:
length = 4
buf = 3
self.col_width = min(self.col_width, 5)
self.col_width = 1
dist = pu.get_l2_distance(x1, x2, y1, y2)
if dist < args.collision_threshold: # Collision
width = self.col_width
for i in range(length):
for j in range(width):
wx = x1 + 0.05 * \
((i + buf) * np.cos(np.deg2rad(t1))
+ (j - width // 2) * np.sin(np.deg2rad(t1)))
wy = y1 + 0.05 * \
((i + buf) * np.sin(np.deg2rad(t1))
- (j - width // 2) * np.cos(np.deg2rad(t1)))
r, c = wy, wx
r, c = int(r * 100 / args.map_resolution), \
int(c * 100 / args.map_resolution)
[r, c] = pu.threshold_poses([r, c],
self.collision_map[r, c] = 1
stg, stop = self._get_stg(map_pred, start, np.copy(goal),
# Deterministic Local Policy
if stop and planner_inputs['found_goal'] == 1:
action = 0 # Stop
(stg_x, stg_y) = stg
angle_st_goal = math.degrees(math.atan2(stg_x - start[0],
stg_y - start[1]))
# print("stg", stg)
# print("start", start)
angle_agent = start_o % 360.0
if angle_agent > 180:
angle_agent -= 360
# print("angle_agent=", angle_agent)
# print("angle_st_goal=", angle_st_goal)
relative_angle = (angle_agent - angle_st_goal) % 360.0
# print("relative_angle=", relative_angle)
if relative_angle > 180:
relative_angle -= 360
if relative_angle > self.args.turn_angle / 2.:
action = 3 # Right
elif relative_angle < -self.args.turn_angle / 2.:
action = 2 # Left
action = 1 # Forward
# if self.args.visualize or self.args.print_images:
self.last_action = action
# if action==0:
return action
def _get_stg(self, grid, start, goal, planning_window):
"""Get short-term goal"""
[gx1, gx2, gy1, gy2] = planning_window
x1, y1, = 0, 0
x2, y2 = grid.shape
def add_boundary(mat, value=1):
if (len(mat.shape)) == 3:
mat = mat[0]
h, w = mat.shape
new_mat = np.zeros((h + 2, w + 2)) + value
new_mat[1:h + 1, 1:w + 1] = mat
return new_mat
traversible = skimage.morphology.binary_dilation(
grid[x1:x2, y1:y2],
self.selem) != True
traversible[self.collision_map[gx1:gx2, gy1:gy2]
[x1:x2, y1:y2] == 1] = 0
traversible[self.visited[gx1:gx2, gy1:gy2][x1:x2, y1:y2] == 1] = 1
traversible[int(start[0] - x1) - 1:int(start[0] - x1) + 2,
int(start[1] - y1) - 1:int(start[1] - y1) + 2] = 1
traversible = add_boundary(traversible)
goal = add_boundary(goal, value=0)
planner = FMMPlanner(traversible)
selem = skimage.morphology.disk(10)
goal = skimage.morphology.binary_dilation(
goal, selem) != True
goal = 1 - goal * 1.
state = [start[0] - x1 + 1, start[1] - y1 + 1]
stg_x, stg_y, _, stop = planner.get_short_term_goal(state)
stg_x, stg_y = stg_x + x1 - 1, stg_y + y1 - 1
return (stg_x, stg_y), stop
def preprocess_obs(self, observations, use_seg=True):
args = self.args
rgb = observations['rgb']
depth = observations['depth']
if use_seg:
sem = observations['semantic']#GT
metricsByCategoryvalue = {0:"chair", 1:"bed", 2:"plant", 3:"toilet",
4:"tv_monitor", 5:"sofa"}
if not self.vis_image_check:
metricsByCategoryvalue = {0:"chair", 1:"bed", 2:"plant", 3:"toilet",
4:"tv_monitor", 5:"sofa"}
self.vis_image=vu.init_vis_image(self.goal_name, self.legend)
sem_seg_pred = self._get_sem_pred(
depth = self._preprocess_depth(depth, args.min_depth, args.max_depth)
ds = args.env_frame_width // args.frame_width # Downscaling factor
if ds != 1:
rgb = np.asarray(self.res(rgb.astype(np.uint8)))
depth = depth[ds // 2::ds, ds // 2::ds]
sem_seg_pred = sem_seg_pred[ds // 2::ds, ds // 2::ds]
depth = np.expand_dims(depth, axis=2)
state = np.concatenate((rgb, depth, sem_seg_pred),
axis=2).transpose(2, 0, 1)
return state[np.newaxis]
def _preprocess_depth(self, depth, min_d, max_d):
depth = depth[:, :, 0] * 1
for i in range(depth.shape[1]):
depth[:, i][depth[:, i] == 0.] = depth[:, i].max()
mask2 = depth > 0.99
depth[mask2] = 0.
mask1 = depth == 0
depth[mask1] = 100.0
depth = min_d * 100.0 + depth * max_d * 100.0
return depth
def _get_sem_pred(self, rgb, depth, use_seg=True):
if use_seg:
semantic_pred, self.rgb_vis = self.sem_pred.get_prediction(rgb.astype(np.uint8))
semantic_pred = semantic_pred.astype(np.float32)
semantic_pred = np.zeros((rgb.shape[0], rgb.shape[1], 16))
self.rgb_vis = rgb[:, :, ::-1]
return semantic_pred
def _visualize(self, inputs):
args = self.args
dump_dir = "{}/dump/{}/".format(args.dump_location,
ep_dir = '{}/episodes/thread_{}/eps_{}/'.format(
dump_dir, 0, 0)
if not os.path.exists(ep_dir):
map_pred = inputs['map_pred']
exp_pred = inputs['exp_pred']
start_x, start_y, start_o, gx1, gx2, gy1, gy2 = inputs['pose_pred']
goal = inputs['goal']
if 'sem_map_pred' in inputs.keys():
sem_map = inputs['sem_map_pred']
gx1, gx2, gy1, gy2 = int(gx1), int(gx2), int(gy1), int(gy2)
#sem_map += 5
sem_map += 4
no_cat_mask = sem_map == 20
map_mask = np.rint(map_pred) == 1
exp_mask = np.rint(exp_pred) == 1
vis_mask = self.visited_vis[gx1:gx2, gy1:gy2] == 1
sem_map[no_cat_mask] = 0
m1 = np.logical_and(no_cat_mask, exp_mask)
sem_map[m1] = 2
m2 = np.logical_and(no_cat_mask, map_mask)
sem_map[m2] = 1
sem_map[vis_mask] = 3
selem = skimage.morphology.disk(4)
if len(goal.shape)==2:
goal_mat = 1 - skimage.morphology.binary_dilation(
goal, selem) != True
goal_mask = goal_mat == 1
sem_map[goal_mask] = 5
color_pal = [int(x * 255.) for x in color_palette]
sem_map_vis ="P", (sem_map.shape[1],
sem_map_vis = sem_map_vis.convert("RGB")
sem_map_vis = np.flipud(sem_map_vis)
sem_map_vis = sem_map_vis[:, :, [2, 1, 0]]
sem_map_vis = cv2.resize(sem_map_vis, (480, 480),
self.vis_image[50:530, 15:655] = self.rgb_vis
self.vis_image[50:530, 670:1150] = sem_map_vis
#self.vis_image[50:530, 670:1310] = self.sem_vis
pos = (
(start_x * 100. / args.map_resolution - gy1)
* 480 / map_pred.shape[0],
(map_pred.shape[1] - start_y * 100. / args.map_resolution + gx1)
* 480 / map_pred.shape[1],
agent_arrow = vu.get_contour_points(pos, origin=(670, 50))
color = (int(color_palette[11] * 255),
int(color_palette[10] * 255),
int(color_palette[9] * 255))
cv2.drawContours(self.vis_image, [agent_arrow], 0, color, -1)
fontScale = 1
color = (20, 20, 20) # BGR
thickness = 2
text = "Observations (Goal: {})".format(self.goal_name)
textsize = cv2.getTextSize(text, font, fontScale, thickness)[0]
textX = (640 - textsize[0]) // 2 + 15
textY = (50 + textsize[1]) // 2
vis_image = cv2.putText(self.vis_image, text, (textX, textY),
font, fontScale, color, thickness,
cv2.imshow("image", self.vis_image)
color = (255, 255, 255) # BGR
vis_image = cv2.putText(self.vis_image, text, (textX, textY),
font, fontScale, color, thickness,
