Skip to content
GitLab
Explore
Sign in
Register
Primary navigation
Search or go to…
Project
S
SemEXPforbob
Manage
Activity
Members
Labels
Plan
Issues
Issue boards
Milestones
Wiki
Requirements
Code
Merge requests
Repository
Branches
Commits
Tags
Repository graph
Compare revisions
Snippets
Locked files
Build
Pipelines
Jobs
Pipeline schedules
Test cases
Artifacts
Deploy
Releases
Package Registry
Container Registry
Model registry
Operate
Environments
Terraform modules
Monitor
Incidents
Service Desk
Analyze
Value stream analytics
Contributor analytics
CI/CD analytics
Repository analytics
Code review analytics
Issue analytics
Insights
Model experiments
Help
Help
Support
GitLab documentation
Compare GitLab plans
Community forum
Contribute to JiHu GitLab
Provide feedback
Keyboard shortcuts
?
Snippets
Groups
Projects
Show more breadcrumbs
Liangxinyue
SemEXPforbob
Merge requests
!3
Update myPlanClass.py
Code
Review changes
Check out branch
Download
Patches
Plain diff
Merged
Update myPlanClass.py
Liangsanzhu-master-patch-53864
into
master
Overview
0
Commits
1
Pipelines
0
Changes
1
Merged
Liangxinyue
requested to merge
Liangsanzhu-master-patch-53864
into
master
2 years ago
Overview
0
Commits
1
Pipelines
0
Changes
1
Expand
0
0
Merge request reports
Viewing commit
6f7db2a0
Show latest version
1 file
+
426
−
426
Inline
Compare changes
Side-by-side
Inline
Show whitespace changes
Show one file at a time
6f7db2a0
Update myPlanClass.py
· 6f7db2a0
Liangxinyue
authored
2 years ago
myPlanClass.py
+
426
−
426
Options
import
os
import
numpy
as
np
import
math
import
cv2
import
skimage
from
PIL
import
Image
from
torchvision
import
transforms
from
constants
import
gt_categories_mapping
from
agents.utils.semantic_prediction
import
SemanticPredMaskRCNN
import
envs.utils.pose
as
pu
import
agents.utils.visualization
as
vu
from
constants
import
color_palette
from
envs.utils.fmm_planner
import
FMMPlanner
import
torch
import
time
from
constants
import
ade_class
#主要是做PLAN,并收集
class
PlanClass
:
def
__init__
(
self
,
args
,
device
=
None
):
self
.
num_scenes
=
1
self
.
args
=
args
self
.
sim
=
None
self
.
col_width
=
None
self
.
curr_loc
=
None
self
.
count_forward_actions
=
None
self
.
last_action
=
None
self
.
last_loc
=
None
self
.
device
=
device
map_shape
=
(
args
.
map_size_cm
//
args
.
map_resolution
,
args
.
map_size_cm
//
args
.
map_resolution
)
self
.
collision_map
=
np
.
zeros
(
map_shape
)
self
.
legend
=
cv2
.
imread
(
'
docs/legend.png
'
)
self
.
vis_image
=
None
self
.
vis_image_check
=
True
self
.
cate_list
=
[]
self
.
goal_name
=
"
door
"
self
.
count
=
0
self
.
vis_image
=
vu
.
init_vis_image
(
self
.
goal_name
,
self
.
legend
)
self
.
visited
=
np
.
zeros
(
map_shape
)
self
.
visited_vis
=
np
.
zeros
(
map_shape
)
self
.
metricsByCategorykey
=
{}
self
.
res
=
transforms
.
Compose
(
[
transforms
.
ToPILImage
(),
transforms
.
Resize
((
args
.
frame_height
,
args
.
frame_width
),
interpolation
=
Image
.
NEAREST
)])
self
.
sem_pred
=
SemanticPredMaskRCNN
(
args
)
self
.
selem
=
skimage
.
morphology
.
disk
(
3
)
def
reset
(
self
):
args
=
self
.
args
# Episode initializations
map_shape
=
(
args
.
map_size_cm
//
args
.
map_resolution
,
args
.
map_size_cm
//
args
.
map_resolution
)
self
.
collision_map
=
np
.
zeros
(
map_shape
)
self
.
visited
=
np
.
zeros
(
map_shape
)
self
.
visited_vis
=
np
.
zeros
(
map_shape
)
self
.
col_width
=
1
self
.
count_forward_actions
=
0
self
.
curr_loc
=
[
0.
,
0.
,
0.
]
self
.
curr_loc
=
[
args
.
map_size_cm
/
100.0
/
2.0
,
args
.
map_size_cm
/
100.0
/
2.0
,
0.
]
self
.
last_action
=
None
def
plan
(
self
,
planner_inputs
):
args
=
self
.
args
self
.
last_loc
=
self
.
curr_loc
# Get Map prediction
map_pred
=
np
.
rint
(
planner_inputs
[
'
map_pred
'
])
goal
=
planner_inputs
[
'
goal
'
]
# Get pose prediction and global policy planning window
start_x
,
start_y
,
start_o
,
gx1
,
gx2
,
gy1
,
gy2
=
\
planner_inputs
[
'
pose_pred
'
]
gx1
,
gx2
,
gy1
,
gy2
=
int
(
gx1
),
int
(
gx2
),
int
(
gy1
),
int
(
gy2
)
planning_window
=
[
gx1
,
gx2
,
gy1
,
gy2
]
# Get curr loc
self
.
curr_loc
=
[
start_x
,
start_y
,
start_o
]
r
,
c
=
start_y
,
start_x
# print("curr_loc", self.curr_loc)
start
=
[
int
(
r
*
100.0
/
args
.
map_resolution
-
gx1
),
int
(
c
*
100.0
/
args
.
map_resolution
-
gy1
)]
# print(start)
start
=
pu
.
threshold_poses
(
start
,
map_pred
.
shape
)
# print(start)
self
.
visited
[
gx1
:
gx2
,
gy1
:
gy2
][
start
[
0
]
-
0
:
start
[
0
]
+
1
,
start
[
1
]
-
0
:
start
[
1
]
+
1
]
=
1
#if args.visualize or args.print_images:
# Get last loc
last_start_x
,
last_start_y
=
self
.
last_loc
[
0
],
self
.
last_loc
[
1
]
r
,
c
=
last_start_y
,
last_start_x
last_start
=
[
int
(
r
*
100.0
/
args
.
map_resolution
-
gx1
),
int
(
c
*
100.0
/
args
.
map_resolution
-
gy1
)]
last_start
=
pu
.
threshold_poses
(
last_start
,
map_pred
.
shape
)
self
.
visited_vis
[
gx1
:
gx2
,
gy1
:
gy2
]
=
\
vu
.
draw_line
(
last_start
,
start
,
self
.
visited_vis
[
gx1
:
gx2
,
gy1
:
gy2
])
# Collision check
if
self
.
last_action
==
1
:
x1
,
y1
,
t1
=
self
.
last_loc
x2
,
y2
,
_
=
self
.
curr_loc
buf
=
4
length
=
2
if
abs
(
x1
-
x2
)
<
0.05
and
abs
(
y1
-
y2
)
<
0.05
:
self
.
col_width
+=
2
if
self
.
col_width
==
7
:
length
=
4
buf
=
3
self
.
col_width
=
min
(
self
.
col_width
,
5
)
else
:
self
.
col_width
=
1
dist
=
pu
.
get_l2_distance
(
x1
,
x2
,
y1
,
y2
)
if
dist
<
args
.
collision_threshold
:
# Collision
width
=
self
.
col_width
for
i
in
range
(
length
):
for
j
in
range
(
width
):
wx
=
x1
+
0.05
*
\
((
i
+
buf
)
*
np
.
cos
(
np
.
deg2rad
(
t1
))
+
(
j
-
width
//
2
)
*
np
.
sin
(
np
.
deg2rad
(
t1
)))
wy
=
y1
+
0.05
*
\
((
i
+
buf
)
*
np
.
sin
(
np
.
deg2rad
(
t1
))
-
(
j
-
width
//
2
)
*
np
.
cos
(
np
.
deg2rad
(
t1
)))
r
,
c
=
wy
,
wx
r
,
c
=
int
(
r
*
100
/
args
.
map_resolution
),
\
int
(
c
*
100
/
args
.
map_resolution
)
[
r
,
c
]
=
pu
.
threshold_poses
([
r
,
c
],
self
.
collision_map
.
shape
)
self
.
collision_map
[
r
,
c
]
=
1
#计算短期目标
stg
,
stop
=
self
.
_get_stg
(
map_pred
,
start
,
np
.
copy
(
goal
),
planning_window
)
# Deterministic Local Policy
#找到了
if
stop
and
planner_inputs
[
'
found_goal
'
]
==
1
:
action
=
0
# Stop
else
:
#没找到
(
stg_x
,
stg_y
)
=
stg
angle_st_goal
=
math
.
degrees
(
math
.
atan2
(
stg_x
-
start
[
0
],
stg_y
-
start
[
1
]))
# print("stg", stg)
# print("start", start)
angle_agent
=
start_o
%
360.0
if
angle_agent
>
180
:
angle_agent
-=
360
# print("angle_agent=", angle_agent)
# print("angle_st_goal=", angle_st_goal)
relative_angle
=
(
angle_agent
-
angle_st_goal
)
%
360.0
# print("relative_angle=", relative_angle)
if
relative_angle
>
180
:
relative_angle
-=
360
if
relative_angle
>
self
.
args
.
turn_angle
/
2.
:
action
=
3
# Right
elif
relative_angle
<
-
self
.
args
.
turn_angle
/
2.
:
action
=
2
# Left
else
:
action
=
1
# Forward
# if self.args.visualize or self.args.print_images:
self
.
_visualize
(
planner_inputs
)
self
.
last_action
=
action
# if action==0:
#print("my"+str(action))
return
action
def
_get_stg
(
self
,
grid
,
start
,
goal
,
planning_window
):
"""
Get short-term goal
"""
[
gx1
,
gx2
,
gy1
,
gy2
]
=
planning_window
x1
,
y1
,
=
0
,
0
x2
,
y2
=
grid
.
shape
def
add_boundary
(
mat
,
value
=
1
):
if
(
len
(
mat
.
shape
))
==
3
:
mat
=
mat
[
0
]
h
,
w
=
mat
.
shape
new_mat
=
np
.
zeros
((
h
+
2
,
w
+
2
))
+
value
new_mat
[
1
:
h
+
1
,
1
:
w
+
1
]
=
mat
return
new_mat
traversible
=
skimage
.
morphology
.
binary_dilation
(
grid
[
x1
:
x2
,
y1
:
y2
],
self
.
selem
)
!=
True
traversible
[
self
.
collision_map
[
gx1
:
gx2
,
gy1
:
gy2
]
[
x1
:
x2
,
y1
:
y2
]
==
1
]
=
0
traversible
[
self
.
visited
[
gx1
:
gx2
,
gy1
:
gy2
][
x1
:
x2
,
y1
:
y2
]
==
1
]
=
1
traversible
[
int
(
start
[
0
]
-
x1
)
-
1
:
int
(
start
[
0
]
-
x1
)
+
2
,
int
(
start
[
1
]
-
y1
)
-
1
:
int
(
start
[
1
]
-
y1
)
+
2
]
=
1
traversible
=
add_boundary
(
traversible
)
goal
=
add_boundary
(
goal
,
value
=
0
)
#print("shortterm")
#print(goal)
planner
=
FMMPlanner
(
traversible
)
selem
=
skimage
.
morphology
.
disk
(
10
)
goal
=
skimage
.
morphology
.
binary_dilation
(
goal
,
selem
)
!=
True
goal
=
1
-
goal
*
1.
planner
.
set_multi_goal
(
goal
)
state
=
[
start
[
0
]
-
x1
+
1
,
start
[
1
]
-
y1
+
1
]
stg_x
,
stg_y
,
_
,
stop
=
planner
.
get_short_term_goal
(
state
)
stg_x
,
stg_y
=
stg_x
+
x1
-
1
,
stg_y
+
y1
-
1
return
(
stg_x
,
stg_y
),
stop
def
preprocess_obs
(
self
,
observations
,
use_seg
=
True
):
args
=
self
.
args
rgb
=
observations
[
'
rgb
'
]
depth
=
observations
[
'
depth
'
]
'''
if use_seg:
sem = observations[
'
semantic
'
]#GT
'''
goal
=
observations
[
'
objectgoal
'
]
metricsByCategoryvalue
=
{
0
:
"
chair
"
,
1
:
"
bed
"
,
2
:
"
plant
"
,
3
:
"
toilet
"
,
4
:
"
tv_monitor
"
,
5
:
"
sofa
"
}
self
.
goal_name
=
metricsByCategoryvalue
[
goal
[
0
]]
if
not
self
.
vis_image_check
:
goal
=
observations
[
'
objectgoal
'
]
metricsByCategoryvalue
=
{
0
:
"
chair
"
,
1
:
"
bed
"
,
2
:
"
plant
"
,
3
:
"
toilet
"
,
4
:
"
tv_monitor
"
,
5
:
"
sofa
"
}
self
.
goal_name
=
metricsByCategoryvalue
[
goal
[
0
]]
self
.
vis_image
=
vu
.
init_vis_image
(
self
.
goal_name
,
self
.
legend
)
self
.
vis_image_check
=
True
sem_seg_pred
=
self
.
_get_sem_pred
(
rgb
,
depth
,
use_seg
=
use_seg
)
depth
=
self
.
_preprocess_depth
(
depth
,
args
.
min_depth
,
args
.
max_depth
)
ds
=
args
.
env_frame_width
//
args
.
frame_width
# Downscaling factor
if
ds
!=
1
:
rgb
=
np
.
asarray
(
self
.
res
(
rgb
.
astype
(
np
.
uint8
)))
depth
=
depth
[
ds
//
2
::
ds
,
ds
//
2
::
ds
]
sem_seg_pred
=
sem_seg_pred
[
ds
//
2
::
ds
,
ds
//
2
::
ds
]
depth
=
np
.
expand_dims
(
depth
,
axis
=
2
)
state
=
np
.
concatenate
((
rgb
,
depth
,
sem_seg_pred
),
axis
=
2
).
transpose
(
2
,
0
,
1
)
return
state
[
np
.
newaxis
]
def
_preprocess_depth
(
self
,
depth
,
min_d
,
max_d
):
depth
=
depth
[:,
:,
0
]
*
1
for
i
in
range
(
depth
.
shape
[
1
]):
depth
[:,
i
][
depth
[:,
i
]
==
0.
]
=
depth
[:,
i
].
max
()
mask2
=
depth
>
0.99
depth
[
mask2
]
=
0.
mask1
=
depth
==
0
depth
[
mask1
]
=
100.0
depth
=
min_d
*
100.0
+
depth
*
max_d
*
100.0
return
depth
def
_get_sem_pred
(
self
,
rgb
,
depth
,
use_seg
=
True
):
if
use_seg
:
semantic_pred
,
self
.
rgb_vis
=
self
.
sem_pred
.
get_prediction
(
rgb
.
astype
(
np
.
uint8
))
semantic_pred
=
semantic_pred
.
astype
(
np
.
float32
)
else
:
semantic_pred
=
np
.
zeros
((
rgb
.
shape
[
0
],
rgb
.
shape
[
1
],
16
))
self
.
rgb_vis
=
rgb
[:,
:,
::
-
1
]
return
semantic_pred
def
_visualize
(
self
,
inputs
):
args
=
self
.
args
dump_dir
=
"
{}/dump/{}/
"
.
format
(
args
.
dump_location
,
args
.
exp_name
)
ep_dir
=
'
{}/episodes/thread_{}/eps_{}/
'
.
format
(
dump_dir
,
0
,
0
)
if
not
os
.
path
.
exists
(
ep_dir
):
os
.
makedirs
(
ep_dir
)
map_pred
=
inputs
[
'
map_pred
'
]
exp_pred
=
inputs
[
'
exp_pred
'
]
start_x
,
start_y
,
start_o
,
gx1
,
gx2
,
gy1
,
gy2
=
inputs
[
'
pose_pred
'
]
goal
=
inputs
[
'
goal
'
]
if
'
sem_map_pred
'
in
inputs
.
keys
():
sem_map
=
inputs
[
'
sem_map_pred
'
]
else
:
return
gx1
,
gx2
,
gy1
,
gy2
=
int
(
gx1
),
int
(
gx2
),
int
(
gy1
),
int
(
gy2
)
#sem_map += 5
sem_map
+=
4
no_cat_mask
=
sem_map
==
20
map_mask
=
np
.
rint
(
map_pred
)
==
1
exp_mask
=
np
.
rint
(
exp_pred
)
==
1
vis_mask
=
self
.
visited_vis
[
gx1
:
gx2
,
gy1
:
gy2
]
==
1
sem_map
[
no_cat_mask
]
=
0
m1
=
np
.
logical_and
(
no_cat_mask
,
exp_mask
)
sem_map
[
m1
]
=
2
m2
=
np
.
logical_and
(
no_cat_mask
,
map_mask
)
sem_map
[
m2
]
=
1
sem_map
[
vis_mask
]
=
3
selem
=
skimage
.
morphology
.
disk
(
4
)
if
len
(
goal
.
shape
)
==
2
:
goal_mat
=
1
-
skimage
.
morphology
.
binary_dilation
(
goal
,
selem
)
!=
True
else
:
goal_mat
=
False
goal_mask
=
goal_mat
==
1
sem_map
[
goal_mask
]
=
5
color_pal
=
[
int
(
x
*
255.
)
for
x
in
color_palette
]
sem_map_vis
=
Image
.
new
(
"
P
"
,
(
sem_map
.
shape
[
1
],
sem_map
.
shape
[
0
]))
sem_map_vis
.
putpalette
(
color_pal
)
sem_map_vis
.
putdata
(
sem_map
.
flatten
().
astype
(
np
.
uint8
))
sem_map_vis
=
sem_map_vis
.
convert
(
"
RGB
"
)
sem_map_vis
=
np
.
flipud
(
sem_map_vis
)
sem_map_vis
=
sem_map_vis
[:,
:,
[
2
,
1
,
0
]]
sem_map_vis
=
cv2
.
resize
(
sem_map_vis
,
(
480
,
480
),
interpolation
=
cv2
.
INTER_NEAREST
)
self
.
vis_image
[
50
:
530
,
15
:
655
]
=
self
.
rgb_vis
self
.
vis_image
[
50
:
530
,
670
:
1150
]
=
sem_map_vis
#self.vis_image[50:530, 670:1310] = self.sem_vis
pos
=
(
(
start_x
*
100.
/
args
.
map_resolution
-
gy1
)
*
480
/
map_pred
.
shape
[
0
],
(
map_pred
.
shape
[
1
]
-
start_y
*
100.
/
args
.
map_resolution
+
gx1
)
*
480
/
map_pred
.
shape
[
1
],
np
.
deg2rad
(
-
start_o
)
)
agent_arrow
=
vu
.
get_contour_points
(
pos
,
origin
=
(
670
,
50
))
color
=
(
int
(
color_palette
[
11
]
*
255
),
int
(
color_palette
[
10
]
*
255
),
int
(
color_palette
[
9
]
*
255
))
cv2
.
drawContours
(
self
.
vis_image
,
[
agent_arrow
],
0
,
color
,
-
1
)
font
=
cv2
.
FONT_HERSHEY_SIMPLEX
fontScale
=
1
color
=
(
20
,
20
,
20
)
# BGR
thickness
=
2
text
=
"
Observations (Goal: {})
"
.
format
(
self
.
goal_name
)
textsize
=
cv2
.
getTextSize
(
text
,
font
,
fontScale
,
thickness
)[
0
]
textX
=
(
640
-
textsize
[
0
])
//
2
+
15
textY
=
(
50
+
textsize
[
1
])
//
2
vis_image
=
cv2
.
putText
(
self
.
vis_image
,
text
,
(
textX
,
textY
),
font
,
fontScale
,
color
,
thickness
,
cv2
.
LINE_AA
)
cv2
.
imshow
(
"
image
"
,
self
.
vis_image
)
cv2
.
waitKey
(
int
(
1
))
color
=
(
255
,
255
,
255
)
# BGR
vis_image
=
cv2
.
putText
(
self
.
vis_image
,
text
,
(
textX
,
textY
),
font
,
fontScale
,
color
,
thickness
,
cv2
.
LINE_AA
)
#cv2.destroyAllWindows()
import
os
import
numpy
as
np
import
math
import
cv2
import
skimage
from
PIL
import
Image
from
torchvision
import
transforms
from
constants
import
gt_categories_mapping
from
agents.utils.semantic_prediction
import
SemanticPredMaskRCNN
import
envs.utils.pose
as
pu
import
agents.utils.visualization
as
vu
from
constants
import
color_palette
from
envs.utils.fmm_planner
import
FMMPlanner
import
torch
import
time
from
constants
import
ade_class
#主要是做PLAN,并收集
class
PlanClass
:
def
__init__
(
self
,
args
,
device
=
None
):
self
.
num_scenes
=
1
self
.
args
=
args
self
.
sim
=
None
self
.
col_width
=
None
self
.
curr_loc
=
None
self
.
count_forward_actions
=
None
self
.
last_action
=
None
self
.
last_loc
=
None
self
.
device
=
device
map_shape
=
(
args
.
map_size_cm
//
args
.
map_resolution
,
args
.
map_size_cm
//
args
.
map_resolution
)
self
.
collision_map
=
np
.
zeros
(
map_shape
)
self
.
legend
=
cv2
.
imread
(
'
docs/legend.png
'
)
self
.
vis_image
=
None
self
.
vis_image_check
=
True
self
.
cate_list
=
[]
self
.
goal_name
=
"
door
"
self
.
count
=
0
self
.
vis_image
=
vu
.
init_vis_image
(
self
.
goal_name
,
self
.
legend
)
self
.
visited
=
np
.
zeros
(
map_shape
)
self
.
visited_vis
=
np
.
zeros
(
map_shape
)
self
.
metricsByCategorykey
=
{}
self
.
res
=
transforms
.
Compose
(
[
transforms
.
ToPILImage
(),
transforms
.
Resize
((
args
.
frame_height
,
args
.
frame_width
),
interpolation
=
Image
.
NEAREST
)])
self
.
sem_pred
=
SemanticPredMaskRCNN
(
args
)
self
.
selem
=
skimage
.
morphology
.
disk
(
3
)
def
reset
(
self
):
args
=
self
.
args
# Episode initializations
map_shape
=
(
args
.
map_size_cm
//
args
.
map_resolution
,
args
.
map_size_cm
//
args
.
map_resolution
)
self
.
collision_map
=
np
.
zeros
(
map_shape
)
self
.
visited
=
np
.
zeros
(
map_shape
)
self
.
visited_vis
=
np
.
zeros
(
map_shape
)
self
.
col_width
=
1
self
.
count_forward_actions
=
0
self
.
curr_loc
=
[
0.
,
0.
,
0.
]
self
.
curr_loc
=
[
args
.
map_size_cm
/
100.0
/
2.0
,
args
.
map_size_cm
/
100.0
/
2.0
,
0.
]
self
.
last_action
=
None
def
plan
(
self
,
planner_inputs
):
args
=
self
.
args
self
.
last_loc
=
self
.
curr_loc
# Get Map prediction
map_pred
=
np
.
rint
(
planner_inputs
[
'
map_pred
'
])
goal
=
planner_inputs
[
'
goal
'
]
# Get pose prediction and global policy planning window
start_x
,
start_y
,
start_o
,
gx1
,
gx2
,
gy1
,
gy2
=
\
planner_inputs
[
'
pose_pred
'
]
gx1
,
gx2
,
gy1
,
gy2
=
int
(
gx1
),
int
(
gx2
),
int
(
gy1
),
int
(
gy2
)
planning_window
=
[
gx1
,
gx2
,
gy1
,
gy2
]
# Get curr loc
self
.
curr_loc
=
[
start_x
,
start_y
,
start_o
]
r
,
c
=
start_y
,
start_x
# print("curr_loc", self.curr_loc)
start
=
[
int
(
r
*
100.0
/
args
.
map_resolution
-
gx1
),
int
(
c
*
100.0
/
args
.
map_resolution
-
gy1
)]
# print(start)
start
=
pu
.
threshold_poses
(
start
,
map_pred
.
shape
)
# print(start)
self
.
visited
[
gx1
:
gx2
,
gy1
:
gy2
][
start
[
0
]
-
0
:
start
[
0
]
+
1
,
start
[
1
]
-
0
:
start
[
1
]
+
1
]
=
1
#if args.visualize or args.print_images:
# Get last loc
last_start_x
,
last_start_y
=
self
.
last_loc
[
0
],
self
.
last_loc
[
1
]
r
,
c
=
last_start_y
,
last_start_x
last_start
=
[
int
(
r
*
100.0
/
args
.
map_resolution
-
gx1
),
int
(
c
*
100.0
/
args
.
map_resolution
-
gy1
)]
last_start
=
pu
.
threshold_poses
(
last_start
,
map_pred
.
shape
)
self
.
visited_vis
[
gx1
:
gx2
,
gy1
:
gy2
]
=
\
vu
.
draw_line
(
last_start
,
start
,
self
.
visited_vis
[
gx1
:
gx2
,
gy1
:
gy2
])
# Collision check
if
self
.
last_action
==
1
:
x1
,
y1
,
t1
=
self
.
last_loc
x2
,
y2
,
_
=
self
.
curr_loc
buf
=
4
length
=
2
if
abs
(
x1
-
x2
)
<
0.05
and
abs
(
y1
-
y2
)
<
0.05
:
self
.
col_width
+=
2
if
self
.
col_width
==
7
:
length
=
4
buf
=
3
self
.
col_width
=
min
(
self
.
col_width
,
5
)
else
:
self
.
col_width
=
1
dist
=
pu
.
get_l2_distance
(
x1
,
x2
,
y1
,
y2
)
if
dist
<
args
.
collision_threshold
:
# Collision
width
=
self
.
col_width
for
i
in
range
(
length
):
for
j
in
range
(
width
):
wx
=
x1
+
0.05
*
\
((
i
+
buf
)
*
np
.
cos
(
np
.
deg2rad
(
t1
))
+
(
j
-
width
//
2
)
*
np
.
sin
(
np
.
deg2rad
(
t1
)))
wy
=
y1
+
0.05
*
\
((
i
+
buf
)
*
np
.
sin
(
np
.
deg2rad
(
t1
))
-
(
j
-
width
//
2
)
*
np
.
cos
(
np
.
deg2rad
(
t1
)))
r
,
c
=
wy
,
wx
r
,
c
=
int
(
r
*
100
/
args
.
map_resolution
),
\
int
(
c
*
100
/
args
.
map_resolution
)
[
r
,
c
]
=
pu
.
threshold_poses
([
r
,
c
],
self
.
collision_map
.
shape
)
self
.
collision_map
[
r
,
c
]
=
1
#计算短期目标
stg
,
stop
=
self
.
_get_stg
(
map_pred
,
start
,
np
.
copy
(
goal
),
planning_window
)
# Deterministic Local Policy
#找到了
if
stop
and
planner_inputs
[
'
found_goal
'
]
==
1
:
action
=
0
# Stop
else
:
#没找到
(
stg_x
,
stg_y
)
=
stg
angle_st_goal
=
math
.
degrees
(
math
.
atan2
(
stg_x
-
start
[
0
],
stg_y
-
start
[
1
]))
# print("stg", stg)
# print("start", start)
angle_agent
=
start_o
%
360.0
if
angle_agent
>
180
:
angle_agent
-=
360
# print("angle_agent=", angle_agent)
# print("angle_st_goal=", angle_st_goal)
relative_angle
=
(
angle_agent
-
angle_st_goal
)
%
360.0
# print("relative_angle=", relative_angle)
if
relative_angle
>
180
:
relative_angle
-=
360
if
relative_angle
>
self
.
args
.
turn_angle
/
2.
:
action
=
3
# Right
elif
relative_angle
<
-
self
.
args
.
turn_angle
/
2.
:
action
=
2
# Left
else
:
action
=
1
# Forward
# if self.args.visualize or self.args.print_images:
self
.
_visualize
(
planner_inputs
)
self
.
last_action
=
action
# if action==0:
#print("my"+str(action))
return
action
def
_get_stg
(
self
,
grid
,
start
,
goal
,
planning_window
):
"""
Get short-term goal
"""
[
gx1
,
gx2
,
gy1
,
gy2
]
=
planning_window
x1
,
y1
,
=
0
,
0
x2
,
y2
=
grid
.
shape
def
add_boundary
(
mat
,
value
=
1
):
if
(
len
(
mat
.
shape
))
==
3
:
mat
=
mat
[
0
]
h
,
w
=
mat
.
shape
new_mat
=
np
.
zeros
((
h
+
2
,
w
+
2
))
+
value
new_mat
[
1
:
h
+
1
,
1
:
w
+
1
]
=
mat
return
new_mat
traversible
=
skimage
.
morphology
.
binary_dilation
(
grid
[
x1
:
x2
,
y1
:
y2
],
self
.
selem
)
!=
True
traversible
[
self
.
collision_map
[
gx1
:
gx2
,
gy1
:
gy2
]
[
x1
:
x2
,
y1
:
y2
]
==
1
]
=
0
traversible
[
self
.
visited
[
gx1
:
gx2
,
gy1
:
gy2
][
x1
:
x2
,
y1
:
y2
]
==
1
]
=
1
traversible
[
int
(
start
[
0
]
-
x1
)
-
1
:
int
(
start
[
0
]
-
x1
)
+
2
,
int
(
start
[
1
]
-
y1
)
-
1
:
int
(
start
[
1
]
-
y1
)
+
2
]
=
1
traversible
=
add_boundary
(
traversible
)
goal
=
add_boundary
(
goal
,
value
=
0
)
#print("shortterm")
#print(goal)
planner
=
FMMPlanner
(
traversible
)
selem
=
skimage
.
morphology
.
disk
(
10
)
goal
=
skimage
.
morphology
.
binary_dilation
(
goal
,
selem
)
!=
True
goal
=
1
-
goal
*
1.
planner
.
set_multi_goal
(
goal
)
state
=
[
start
[
0
]
-
x1
+
1
,
start
[
1
]
-
y1
+
1
]
stg_x
,
stg_y
,
_
,
stop
=
planner
.
get_short_term_goal
(
state
)
stg_x
,
stg_y
=
stg_x
+
x1
-
1
,
stg_y
+
y1
-
1
return
(
stg_x
,
stg_y
),
stop
def
preprocess_obs
(
self
,
observations
,
use_seg
=
True
):
args
=
self
.
args
rgb
=
observations
[
'
rgb
'
]
depth
=
observations
[
'
depth
'
]
'''
if use_seg:
sem = observations[
'
semantic
'
]#GT
'''
goal
=
observations
[
'
objectgoal
'
]
metricsByCategoryvalue
=
{
0
:
"
chair
"
,
1
:
"
bed
"
,
2
:
"
plant
"
,
3
:
"
toilet
"
,
4
:
"
tv_monitor
"
,
5
:
"
sofa
"
}
self
.
goal_name
=
metricsByCategoryvalue
[
goal
[
0
]]
if
not
self
.
vis_image_check
:
goal
=
observations
[
'
objectgoal
'
]
metricsByCategoryvalue
=
{
0
:
"
chair
"
,
1
:
"
bed
"
,
2
:
"
plant
"
,
3
:
"
toilet
"
,
4
:
"
tv_monitor
"
,
5
:
"
sofa
"
}
self
.
goal_name
=
metricsByCategoryvalue
[
goal
[
0
]]
self
.
vis_image
=
vu
.
init_vis_image
(
self
.
goal_name
,
self
.
legend
)
self
.
vis_image_check
=
True
sem_seg_pred
=
self
.
_get_sem_pred
(
rgb
,
depth
,
use_seg
=
use_seg
)
depth
=
self
.
_preprocess_depth
(
depth
,
args
.
min_depth
,
args
.
max_depth
)
ds
=
args
.
env_frame_width
//
args
.
frame_width
# Downscaling factor
if
ds
!=
1
:
rgb
=
np
.
asarray
(
self
.
res
(
rgb
.
astype
(
np
.
uint8
)))
depth
=
depth
[
ds
//
2
::
ds
,
ds
//
2
::
ds
]
sem_seg_pred
=
sem_seg_pred
[
ds
//
2
::
ds
,
ds
//
2
::
ds
]
depth
=
np
.
expand_dims
(
depth
,
axis
=
2
)
state
=
np
.
concatenate
((
rgb
,
depth
,
sem_seg_pred
),
axis
=
2
).
transpose
(
2
,
0
,
1
)
return
state
[
np
.
newaxis
]
def
_preprocess_depth
(
self
,
depth
,
min_d
,
max_d
):
depth
=
depth
[:,
:,
0
]
*
1
for
i
in
range
(
depth
.
shape
[
1
]):
depth
[:,
i
][
depth
[:,
i
]
==
0.
]
=
depth
[:,
i
].
max
()
mask2
=
depth
>
0.99
depth
[
mask2
]
=
0.
mask1
=
depth
==
0
depth
[
mask1
]
=
100.0
depth
=
min_d
*
100.0
+
depth
*
max_d
*
100.0
return
depth
def
_get_sem_pred
(
self
,
rgb
,
depth
,
use_seg
=
True
):
if
use_seg
:
semantic_pred
,
self
.
rgb_vis
=
self
.
sem_pred
.
get_prediction
(
rgb
.
astype
(
np
.
uint8
))
semantic_pred
=
semantic_pred
.
astype
(
np
.
float32
)
else
:
semantic_pred
=
np
.
zeros
((
rgb
.
shape
[
0
],
rgb
.
shape
[
1
],
16
))
self
.
rgb_vis
=
rgb
[:,
:,
::
-
1
]
return
semantic_pred
def
_visualize
(
self
,
inputs
):
args
=
self
.
args
dump_dir
=
"
{}/dump/{}/
"
.
format
(
args
.
dump_location
,
args
.
exp_name
)
ep_dir
=
'
{}/episodes/thread_{}/eps_{}/
'
.
format
(
dump_dir
,
0
,
0
)
if
not
os
.
path
.
exists
(
ep_dir
):
os
.
makedirs
(
ep_dir
)
map_pred
=
inputs
[
'
map_pred
'
]
exp_pred
=
inputs
[
'
exp_pred
'
]
start_x
,
start_y
,
start_o
,
gx1
,
gx2
,
gy1
,
gy2
=
inputs
[
'
pose_pred
'
]
goal
=
inputs
[
'
goal
'
]
if
'
sem_map_pred
'
in
inputs
.
keys
():
sem_map
=
inputs
[
'
sem_map_pred
'
]
else
:
return
gx1
,
gx2
,
gy1
,
gy2
=
int
(
gx1
),
int
(
gx2
),
int
(
gy1
),
int
(
gy2
)
#sem_map += 5
sem_map
+=
5
no_cat_mask
=
sem_map
==
20
map_mask
=
np
.
rint
(
map_pred
)
==
1
exp_mask
=
np
.
rint
(
exp_pred
)
==
1
vis_mask
=
self
.
visited_vis
[
gx1
:
gx2
,
gy1
:
gy2
]
==
1
sem_map
[
no_cat_mask
]
=
0
m1
=
np
.
logical_and
(
no_cat_mask
,
exp_mask
)
sem_map
[
m1
]
=
2
m2
=
np
.
logical_and
(
no_cat_mask
,
map_mask
)
sem_map
[
m2
]
=
1
sem_map
[
vis_mask
]
=
3
selem
=
skimage
.
morphology
.
disk
(
4
)
if
len
(
goal
.
shape
)
==
2
:
goal_mat
=
1
-
skimage
.
morphology
.
binary_dilation
(
goal
,
selem
)
!=
True
else
:
goal_mat
=
False
goal_mask
=
goal_mat
==
1
sem_map
[
goal_mask
]
=
5
color_pal
=
[
int
(
x
*
255.
)
for
x
in
color_palette
]
sem_map_vis
=
Image
.
new
(
"
P
"
,
(
sem_map
.
shape
[
1
],
sem_map
.
shape
[
0
]))
sem_map_vis
.
putpalette
(
color_pal
)
sem_map_vis
.
putdata
(
sem_map
.
flatten
().
astype
(
np
.
uint8
))
sem_map_vis
=
sem_map_vis
.
convert
(
"
RGB
"
)
sem_map_vis
=
np
.
flipud
(
sem_map_vis
)
sem_map_vis
=
sem_map_vis
[:,
:,
[
2
,
1
,
0
]]
sem_map_vis
=
cv2
.
resize
(
sem_map_vis
,
(
480
,
480
),
interpolation
=
cv2
.
INTER_NEAREST
)
self
.
vis_image
[
50
:
530
,
15
:
655
]
=
self
.
rgb_vis
self
.
vis_image
[
50
:
530
,
670
:
1150
]
=
sem_map_vis
#self.vis_image[50:530, 670:1310] = self.sem_vis
pos
=
(
(
start_x
*
100.
/
args
.
map_resolution
-
gy1
)
*
480
/
map_pred
.
shape
[
0
],
(
map_pred
.
shape
[
1
]
-
start_y
*
100.
/
args
.
map_resolution
+
gx1
)
*
480
/
map_pred
.
shape
[
1
],
np
.
deg2rad
(
-
start_o
)
)
agent_arrow
=
vu
.
get_contour_points
(
pos
,
origin
=
(
670
,
50
))
color
=
(
int
(
color_palette
[
11
]
*
255
),
int
(
color_palette
[
10
]
*
255
),
int
(
color_palette
[
9
]
*
255
))
cv2
.
drawContours
(
self
.
vis_image
,
[
agent_arrow
],
0
,
color
,
-
1
)
font
=
cv2
.
FONT_HERSHEY_SIMPLEX
fontScale
=
1
color
=
(
20
,
20
,
20
)
# BGR
thickness
=
2
text
=
"
Observations (Goal: {})
"
.
format
(
self
.
goal_name
)
textsize
=
cv2
.
getTextSize
(
text
,
font
,
fontScale
,
thickness
)[
0
]
textX
=
(
640
-
textsize
[
0
])
//
2
+
15
textY
=
(
50
+
textsize
[
1
])
//
2
vis_image
=
cv2
.
putText
(
self
.
vis_image
,
text
,
(
textX
,
textY
),
font
,
fontScale
,
color
,
thickness
,
cv2
.
LINE_AA
)
cv2
.
imshow
(
"
image
"
,
self
.
vis_image
)
cv2
.
waitKey
(
int
(
1
))
color
=
(
255
,
255
,
255
)
# BGR
vis_image
=
cv2
.
putText
(
self
.
vis_image
,
text
,
(
textX
,
textY
),
font
,
fontScale
,
color
,
thickness
,
cv2
.
LINE_AA
)
#cv2.destroyAllWindows()
Loading