Notice
Recent Posts
Recent Comments
Link
일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | |||||
3 | 4 | 5 | 6 | 7 | 8 | 9 |
10 | 11 | 12 | 13 | 14 | 15 | 16 |
17 | 18 | 19 | 20 | 21 | 22 | 23 |
24 | 25 | 26 | 27 | 28 | 29 | 30 |
31 |
Tags
- 3dmapping
- realtimerendering
- rostopics
- rosnoetic
- turtlebot3
- vectorfields
- pointcloud
- sensorfusion
- differentiablerendering
- NERF
- imageprocessing
- covariancematrix
- adaptivedensitycontrol
- usbcamera
- vectorcalculus
- Slam
- turtlebot
- Ros
- 3dgaussiansplatting
- opencv
- tilebasedrasterizer
- alphablending
- raspberrypi
- LIDAR
- roslaunch
- catkinworkspace
- gaussiansplatting
- rospackages
- electromagnetism
- ComputerVision
Archives
- Today
- Total
Wiredwisdom
Fusion Final Code analysis 본문
#!/usr/bin/env python3
import rospy
from sensor_msgs.msg import LaserScan, Image
from cv_bridge import CvBridge, CvBridgeError
import numpy as np
import cv2
import math
from message_filters import ApproximateTimeSynchronizer, Subscriber
class CameraLidarFusion:
def __init__(self):
self.bridge = CvBridge()
# 동기화를 위한 Subscriber 설정
self.image_sub = Subscriber("/usb_cam/image_raw", Image)
self.scan_sub = Subscriber("/scan", LaserScan)
self.fusion_pub = rospy.Publisher("/fusion_image", Image, queue_size=1)
# 동기화 설정
self.ts = ApproximateTimeSynchronizer([self.image_sub, self.scan_sub], 10, 0.1)
self.ts.registerCallback(self.sync_callback)
# Camera parametersrosc
self.fx = 844
self.fy = 844
self.cx = 320
self.cy = 250
# 미리 계산된 색상 맵
self.color_map = self.create_color_map()
def sync_callback(self, image_msg, scan_msg):
try:
image_data = self.bridge.imgmsg_to_cv2(image_msg, "bgr8")
self.fuse_data(image_data, scan_msg)
except CvBridgeError as e:
rospy.logerr(f"CvBridge Error: {e}")
def fuse_data(self, image_data, scan_msg):
height, width = image_data.shape[:2]
fusion_image = image_data.copy()
angles = np.arange(len(scan_msg.ranges)) * scan_msg.angle_increment + scan_msg.angle_min
degrees = np.degrees(angles)
# 각도 범위 조정
valid_indices = np.logical_or(
(degrees >= 0) & (degrees <= 45),
(degrees >= 315) & (degrees < 360)
)
valid_degrees = degrees[valid_indices]
valid_ranges = np.array(scan_msg.ranges)[valid_indices]
if len(valid_ranges) == 0:
rospy.logwarn("No range")
return # 유효한 범위가 없으면 함수 종료
x = valid_ranges * np.cos(angles[valid_indices])
y = valid_ranges * np.sin(angles[valid_indices])
# 이미지 좌표로 변환
u = np.zeros_like(valid_degrees)
u[valid_degrees <= 45] = ((-valid_degrees[valid_degrees <= 45]+45) / 45 * (width/2)).astype(int)
u[valid_degrees >= 315] = ((-valid_degrees[valid_degrees >= 315] + 360) / 45 * (width / 2)+(width/2)).astype(int)
v = np.full_like(u, height // 2, dtype=int)
for ui, vi, range_val, deg in zip(u, v, valid_ranges, valid_degrees):
if 0 <= ui < width:
color_index = min(int(range_val * 63), 255)
color = tuple(map(int, self.color_map[color_index]))
cv2.circle(fusion_image, (int(ui), int(vi)), 2, color, -1)
for u_offset in range(-5, 6):
for v_offset in range(-25, 26):
x = int(ui + u_offset)
y = int(vi + v_offset)
if (0 <= x < width) and (0 <= y < height):
fusion_image[y, x] = tuple(map(int,color))
rospy.loginfo(f"LiDAR point (x, y): ({x:.2f}, {y:.2f}), projected (u, v): ({ui}, {vi})")
rospy.loginfo(f"Angle: {deg:.2f} degrees, Distance: {range_val:.2f} meters")
cv2.rectangle(fusion_image, (0, height // 2 - 25), (width, height // 2 + 25), (0, 255, 0), 1)
try:
fusion_msg = self.bridge.cv2_to_imgmsg(fusion_image, "bgr8")
self.fusion_pub.publish(fusion_msg)
except CvBridgeError as e:
rospy.logerr(f"CvBridge Error: {e}")
def create_color_map(self):
color_map = np.zeros((256, 3), dtype=np.uint8)
for i in range(256):
normalized_distance = i / 255.0
hue = 20 # Orange hue
saturation = 255
value = int(255 * (1 - normalized_distance))
color = cv2.cvtColor(np.uint8([[[hue, saturation, value]]]), cv2.COLOR_HSV2BGR)[0][0]
color_map[i] = color
return color_map
if __name__ == '__main__':
rospy.init_node('camera_lidar_fusion', anonymous=True)
try:
fusion_node = CameraLidarFusion()
rospy.spin()
except rospy.ROSInterruptException:
pass
class CameraLidarFusion:
def __init__(self):
self.bridge = CvBridge()
openCV의 객체를 bridge로 표기한다.
# 동기화를 위한 Subscriber 설정
self.image_sub = Subscriber("/usb_cam/image_raw", Image)
#Topic 중 하나인 sub_cam/image_raw 에서 오는 raw 이미지 파일을 수집하는
#토픽 구독자(Topic Subcriber), 메세지 리스터(Massage Listener, 센서 데이터 수신기(Sensor data Receiver 등으로 생각하자.
self.scan_sub = Subscriber("/scan", LaserScan)
#Topinc 중 하나인 /scan 에서 데이터를 읽어 오는 구독자.
self.fusion_pub = rospy.Publisher("/fusion_image", Image, queue_size=1)
#/fusion_imagae를 저장하며 큐는 1인 데이터 생산자를 fusion_pub 으로 표기한다.
# 동기화 설정
self.ts = ApproximateTimeSynchronizer([self.image_sub, self.scan_sub], 10, 0.1)
#ROS 에서는 각 데이터 타입 헤더에 시간에 대한 정보가 포함되어 있다.
#이를 매개로 각 데이터들의 시간들에 대한 싱크를 맞출 수 있으며
#해당 ApproximateTimeSynchronizer의 경우도 ROS에서 제공을 해준다.
self.ts.registerCallback(self.sync_callback)
#싱크된 데이터 도착시 시스템에 통보한다.
#여기서 싱크의 조건은 100ms의 차이까지다.
# Camera parametersrosc
self.fx = 844
self.fy = 844
self.cx = 320
self.cy = 250
# 미리 계산된 색상 맵
self.color_map = self.create_color_map()
ROS 의 표준 메세지 헤더 (다중 센서 시스템에서 중요!)
Header header
uint32 seq
time stamp
string frame_id
각 메시지가 도착하면 그 타임스탬프를 확인
서로 다른 토픽의 메시지들 중 타임스탬프가 지정된 시간 범위(여기서는 0.1초) 내에 있는 것들을 찾기
조건을 만족하는 메시지 세트를 찾으면, 이들을 동기화된 것으로 간주하고 등록된 콜백 함수를 호출
def sync_callback(self, image_msg, scan_msg):
try:
image_data = self.bridge.imgmsg_to_cv2(image_msg, "bgr8")
self.fuse_data(image_data, scan_msg)
except CvBridgeError as e:
rospy.logerr(f"CvBridge Error: {e}")
따라서 상기의
self.ts.registerCallback(self.sync_callback)
함수에서 ts에 해당되는 100ms단위로 정렬된 데이터들의 쌍이 수집이 되면
여기서 정의 된 sync_callback 함수로 이전하여 해당 이미지를 opencv2 데이터(bgr8-RGB 32bit)로 변환하고
fuse_data로 조립한다.
def fuse_data(self, image_data, scan_msg):
height, width = image_data.shape[:2]
fusion_image = image_data.copy()
# fuse_data 를 정의한다. 먼저 해당 이미지 데이터의 row 픽셀과 column 픽셀을 추출하여
# height, width 를 정의해준다.
# 혹시 모르니 원본 데이터를 보존해둔다. 하지만 이 이미지는 raw data가 아닌 bgr8 으로 변환된 포멧이다.
angles = np.arange(len(scan_msg.ranges)) * scan_msg.angle_increment + scan_msg.angle_min
# 360 라이더는 한바퀴를 돌 때 자동적으로 ranges 배열을 생성하도록 하는데
# 이 배열을 기반으로 해상도를 알 수 있다.
# 만약 해상도가 60이라면 그 뒤의 데이터와 데이터 사이의 변화 각도를 토대로
# 각 배열에 곱해주면 60개의 해상도에 대한 각 센싱의 총 이동 누적 각도를 알 수 있다.
# 여기서는 1도의 increment를 가지고 있으며, 총 360개의 배열을 생성하므로
# np.arange는 360, angle_increment는 1도로 계산이 된다.
# 각 데이터는 라이더에서 송신 받는 데이터를 기반을 하는 것으로 보여진다.
# min 데이터의 경우 시작 데이터라 표기되지만 실제로는 Lidar에서 고정된 값으로 정해져 있다.
degrees = np.degrees(angles)
# 해당 라디안 단위의 값을 360도 단위법으로 바꾼다.
# 각도 범위 조정
valid_indices = np.logical_or(
(degrees >= 0) & (degrees <= 45),
(degrees >= 315) & (degrees < 360)
)
# (0도 이상 and 45도 이하) or (315도 이상 and 360도 이하) 인 경우에 valid_indices 로 데이터 저장.
valid_degrees = degrees[valid_indices]
valid_ranges = np.array(scan_msg.ranges)[valid_indices]
# 위에 만든 degrees데이터들에 valid_indices 마스크 적용하여, valid_degrees 추출.
# 그리고 해당 각도에 대한 거리를 구하기 위하여 valid_indices 에 해당되는 배열들에 대한 거리 값을 추출한다.
# increment가 1도이기 때문에 360도 단위의 valid_indices로 라디안 표기의 데이터 추출이 가능한 작업이다.
if len(valid_ranges) == 0:
rospy.logwarn("No range")
return # 유효한 범위가 없으면 함수 종료
# 유효한 범위가 없으면 No range 를 터미널로 표기
x = valid_ranges * np.cos(angles[valid_indices])
y = valid_ranges * np.sin(angles[valid_indices])
# 이미지 좌표로 변환
u = np.zeros_like(valid_degrees)
# valid_degrees 와 동일한 배열을 새로 생성한 후 그 안의 값을 0으로 정리합니다.
# 여기에는 이미지의 x축 좌표가 위치하게 됩니다.
# 라이더의 각도에 따른 이미지 투영이 이루어지기 위해서 레졸루션이 90도인 90개의 배열을 필요로 합니다.
u[valid_degrees <= 45] = ((-valid_degrees[valid_degrees <= 45]+45) / 45 * (width/2)).astype(int)
# 0~45도의 경우 0이 정면이고 45도는 왼쪽 측면입니다. 이를 이미지 상에서는 x 축 상에서
# 0~ width/2 까지를 산정하므로 (45-(45도 이하의 각도들)/45)*(width/2) 를 하여 투영 해상도를 45단위로 분리합니다.
u[valid_degrees >= 315] = ((-valid_degrees[valid_degrees >= 315] + 360) / 45 * (width / 2)+(width/2)).astype(int)
# 이 경우 315~360도의 오른쪽 라이더 해상도를 이미지의 오른쪽으로 투영시키려 하므로
# 360도는 정면임을 감안하여 각도가 높아질수록 정면을 향하도록 수식을 만들었습니다. 때문에 역시 입력값은 -로 출발합니다.
v = np.full_like(u, height // 2, dtype=int)
# y 값의 경우 해당 이미지가 삽입되는 이미지의 중간점으로 고정시켜 줍니다.
# 즉, height이 480인 경우 240이 됩니다.
for ui, vi, range_val, deg in zip(u, v, valid_ranges, valid_degrees):
# ui,vi 는 이미지 상의 x,y 좌표
# range_val 은 라이다로부터의 측정 거리
# deg 는 각도
if 0 <= ui < width:
# ui가 이미지의 x 축 위에 올라갈 수 있는지를 판단한다
color_index = min(int(range_val * 63), 255)
# 거리에 63을 곱한 값에서 255까지의 색상을 사용한다.
color = tuple(map(int, self.color_map[color_index]))
# color_index에서 색상 맵을 가지고 오고 정수로 변환한다.
cv2.circle(fusion_image, (int(ui), int(vi)), 2, color, -1)
# 중심점은 ui,vi 이고 반지름은 2픽셀, 색상은 위에서 정의된 color, 그리고 -1은 원을 해당 색으로 채운다는 의미.
for u_offset in range(-5, 6):
# 해당 라이다 포인트 주변에 다음과 같은 사각형을 만든다.
# 폭이 중심 기준으로 -5~6, 상하로는 -25~26이다.
# 즉 폭이 10, 높이는 50이 되는 사각형이 완성된다.
for v_offset in range(-25, 26):
x = int(ui + u_offset)
y = int(vi + v_offset)
# 이 옵셋을 해당 ui,vi 를 중심으로 적층해나아간다.
if (0 <= x < width) and (0 <= y < height):
# 해당 좌표가 이미지 안에 들어가는지 확인하고
fusion_image[y, x] = tuple(map(int,color))
# fusion_image 함수를 통해 이미지의 x,y 위에 해당 튜플의 색상을 덧 입힌다.
rospy.loginfo(f"LiDAR point (x, y): ({x:.2f}, {y:.2f}), projected (u, v): ({ui}, {vi})")
rospy.loginfo(f"Angle: {deg:.2f} degrees, Distance: {range_val:.2f} meters")
cv2.rectangle(fusion_image, (0, height // 2 - 25), (width, height // 2 + 25), (0, 255, 0), 1)
# 해당 Depth이 효현될 모든 범위에 사각형의 표식을 준다.
try:
fusion_msg = self.bridge.cv2_to_imgmsg(fusion_image, "bgr8")
# bgr8로 인코딩 된 OpenCV 이미지를 ROS 이미지로 변환한다.
self.fusion_pub.publish(fusion_msg)
# 해당 이미지를 fusion_pub 로 퍼블리싱 한다.
# 즉 ros에서 다른 노드에서도 읽을 수 있도록 데이터를 방류, 공유 하는 것이다.
except CvBridgeError as e:
rospy.logerr(f"CvBridge Error: {e}")
해당 마스킹에 대한 코드는 점으로 표현하기보다는 면으로 표현하는 아래의 방식이 더 직관적이고 간편하다.
해당 코드를 적용하여 테스트 예정.
for ui, vi, range_val, deg in zip(u, v, valid_ranges, valid_degrees):
if 0 <= ui < width:
color_index = min(int(range_val * 63), 255)
color = tuple(map(int, self.color_map[color_index]))
# 최적화된 사각형 채우기
x_min, x_max = max(0, int(ui) - 5), min(width, int(ui) + 6)
y_min, y_max = max(0, int(vi) - 25), min(height, int(vi) + 26)
fusion_image[y_min:y_max, x_min:x_max] = color
# cv2.circle 호출은 그대로 유지
cv2.circle(fusion_image, (int(ui), int(vi)), 2, color, -1)
최종 코드
#!/usr/bin/env python3
import rospy
from sensor_msgs.msg import LaserScan, Image
from cv_bridge import CvBridge, CvBridgeError
import numpy as np
import cv2
import math
from message_filters import ApproximateTimeSynchronizer, Subscriber
class CameraLidarFusion:
def __init__(self):
self.bridge = CvBridge()
# 동기화를 위한 Subscriber 설정
self.image_sub = Subscriber("/usb_cam/image_raw", Image)
self.scan_sub = Subscriber("/scan", LaserScan)
self.fusion_pub = rospy.Publisher("/fusion_image", Image, queue_size=1)
# 동기화 설정
self.ts = ApproximateTimeSynchronizer([self.image_sub, self.scan_sub], 10, 0.1)
self.ts.registerCallback(self.sync_callback)
# Camera parametersrosc
self.fx = 844
self.fy = 844
self.cx = 320
self.cy = 250
# 미리 계산된 색상 맵
self.color_map = self.create_color_map()
def sync_callback(self, image_msg, scan_msg):
try:
image_data = self.bridge.imgmsg_to_cv2(image_msg, "bgr8")
self.fuse_data(image_data, scan_msg)
except CvBridgeError as e:
rospy.logerr(f"CvBridge Error: {e}")
def fuse_data(self, image_data, scan_msg):
height, width = image_data.shape[:2]
fusion_image = image_data.copy()
angles = np.arange(len(scan_msg.ranges)) * scan_msg.angle_increment + scan_msg.angle_min
degrees = np.degrees(angles)
# 각도 범위 조정
valid_indices = np.logical_or(
(degrees >= 0) & (degrees <= 15),
(degrees >= 345) & (degrees < 360)
)
valid_degrees = degrees[valid_indices]
valid_ranges = np.array(scan_msg.ranges)[valid_indices]
if len(valid_ranges) == 0:
rospy.logwarn("No range")
return # 유효한 범위가 없으면 함수 종료
x = valid_ranges * np.cos(angles[valid_indices])
y = valid_ranges * np.sin(angles[valid_indices])
# 이미지 좌표로 변환
u = np.zeros_like(valid_degrees)
u[valid_degrees <= 15] = ((-valid_degrees[valid_degrees <= 15]+15) / 15 * (width/2)).astype(int)
u[valid_degrees >= 345] = ((-valid_degrees[valid_degrees >= 345] + 360) / 15 * (width / 2)+(width/2)).astype(int)
v = np.full_like(u, height // 2, dtype=int)
for ui, vi, range_val, deg in zip(u, v, valid_ranges, valid_degrees):
if 0 <= ui < width:
color_index = min(int(range_val * 63), 255)
color = tuple(map(int, self.color_map[color_index]))
# 최적화된 사각형 채우기
x_min, x_max = max(0, int(ui) - 10), min(width, int(ui) + 12)
y_min, y_max = max(0, int(vi) - 25), min(height, int(vi) + 26)
fusion_image[y_min:y_max, x_min:x_max] = color
# cv2.circle 호출은 그대로 유지
cv2.circle(fusion_image, (int(ui), int(vi)), 2, color, -1)
#rospy.loginfo(f"LiDAR point (x, y): ({x:.2f}, {y:.2f}), projected (u, v): ({ui}, {vi})")
#rospy.loginfo(f"Angle: {deg:.2f} degrees, Distance: {range_val:.2f} meters")
#cv2.rectangle(fusion_image, (0, height // 2 - 25), (width, height // 2 + 25), (0, 255, 0), 1)
try:
fusion_msg = self.bridge.cv2_to_imgmsg(fusion_image, "bgr8")
self.fusion_pub.publish(fusion_msg)
except CvBridgeError as e:
rospy.logerr(f"CvBridge Error: {e}")
def create_color_map(self):
color_map = np.zeros((256, 3), dtype=np.uint8)
for i in range(256):
normalized_distance = i / 255.0
hue = 20 # Orange hue
saturation = 255
value = int(255 * (1 - normalized_distance))
color = cv2.cvtColor(np.uint8([[[hue, saturation, value]]]), cv2.COLOR_HSV2BGR)[0][0]
color_map[i] = color
return color_map
if __name__ == '__main__':
rospy.init_node('camera_lidar_fusion', anonymous=True)
try:
fusion_node = CameraLidarFusion()
rospy.spin()
except rospy.ROSInterruptException:
pass
'Autonomous Driving > Turtlebot3' 카테고리의 다른 글
Puppy (0) | 2024.08.28 |
---|---|
Sensor-Fusion 1차 총 정리 (0) | 2024.08.01 |
FUSION [LiDAR+Camera] (0) | 2024.07.11 |
FUSION Code review (0) | 2024.07.08 |
Fusion (0) | 2024.07.08 |