관리 메뉴

Wiredwisdom

Fusion Final Code analysis 본문

Autonomous Driving/Turtlebot3

Fusion Final Code analysis

Duke_Wisdom 2024. 7. 14. 18:25

 

#!/usr/bin/env python3
import rospy
from sensor_msgs.msg import LaserScan, Image
from cv_bridge import CvBridge, CvBridgeError
import numpy as np
import cv2
import math
from message_filters import ApproximateTimeSynchronizer, Subscriber

class CameraLidarFusion:
    def __init__(self):
        self.bridge = CvBridge()
        
        # 동기화를 위한 Subscriber 설정
        self.image_sub = Subscriber("/usb_cam/image_raw", Image)
        self.scan_sub = Subscriber("/scan", LaserScan)
        self.fusion_pub = rospy.Publisher("/fusion_image", Image, queue_size=1)
        
        # 동기화 설정
        self.ts = ApproximateTimeSynchronizer([self.image_sub, self.scan_sub], 10, 0.1)
        self.ts.registerCallback(self.sync_callback)
        
        # Camera parametersrosc
        self.fx = 844
        self.fy = 844
        self.cx = 320
        self.cy = 250
        
        # 미리 계산된 색상 맵
        self.color_map = self.create_color_map()

    def sync_callback(self, image_msg, scan_msg):
        try:
            image_data = self.bridge.imgmsg_to_cv2(image_msg, "bgr8")
            self.fuse_data(image_data, scan_msg)
        except CvBridgeError as e:
            rospy.logerr(f"CvBridge Error: {e}")

    def fuse_data(self, image_data, scan_msg):
        height, width = image_data.shape[:2]
        fusion_image = image_data.copy()
        
        angles = np.arange(len(scan_msg.ranges)) * scan_msg.angle_increment + scan_msg.angle_min
        degrees = np.degrees(angles)
           
        # 각도 범위 조정
        valid_indices = np.logical_or(
            (degrees >= 0) & (degrees <= 45),
            (degrees >= 315) & (degrees < 360)
        )
            
        valid_degrees = degrees[valid_indices]
        valid_ranges = np.array(scan_msg.ranges)[valid_indices]
        
        if len(valid_ranges) == 0:
            rospy.logwarn("No range")
            return  # 유효한 범위가 없으면 함수 종료            
            
        x = valid_ranges * np.cos(angles[valid_indices])
        y = valid_ranges * np.sin(angles[valid_indices])
        
        # 이미지 좌표로 변환
        u = np.zeros_like(valid_degrees)
        
        u[valid_degrees <= 45] = ((-valid_degrees[valid_degrees <= 45]+45) / 45 * (width/2)).astype(int)
        u[valid_degrees >= 315] = ((-valid_degrees[valid_degrees >= 315] + 360) / 45 * (width / 2)+(width/2)).astype(int)
                        
        v = np.full_like(u, height // 2, dtype=int)
                         
        for ui, vi, range_val, deg in zip(u, v, valid_ranges, valid_degrees):
            if 0 <= ui < width:
                color_index = min(int(range_val * 63), 255)
                color = tuple(map(int, self.color_map[color_index]))
                cv2.circle(fusion_image, (int(ui), int(vi)), 2, color, -1)              
                for u_offset in range(-5, 6):
                    for v_offset in range(-25, 26):
                        x = int(ui + u_offset)
                        y = int(vi + v_offset)
                        if (0 <= x < width) and (0 <= y < height):
                            fusion_image[y, x] = tuple(map(int,color))
                             
            rospy.loginfo(f"LiDAR point (x, y): ({x:.2f}, {y:.2f}), projected (u, v): ({ui}, {vi})")
            rospy.loginfo(f"Angle: {deg:.2f} degrees, Distance: {range_val:.2f} meters")
                            
            cv2.rectangle(fusion_image, (0, height // 2 - 25), (width, height // 2 + 25), (0, 255, 0), 1)

        try:
            fusion_msg = self.bridge.cv2_to_imgmsg(fusion_image, "bgr8")
            self.fusion_pub.publish(fusion_msg)
        except CvBridgeError as e:
            rospy.logerr(f"CvBridge Error: {e}")

    def create_color_map(self):
        color_map = np.zeros((256, 3), dtype=np.uint8)
        for i in range(256):
            normalized_distance = i / 255.0
            hue = 20  # Orange hue
            saturation = 255
            value = int(255 * (1 - normalized_distance))
            color = cv2.cvtColor(np.uint8([[[hue, saturation, value]]]), cv2.COLOR_HSV2BGR)[0][0]
            color_map[i] = color
        return color_map

if __name__ == '__main__':
    rospy.init_node('camera_lidar_fusion', anonymous=True)
    try:
        fusion_node = CameraLidarFusion()
        rospy.spin()
    except rospy.ROSInterruptException:
        pass

 

 

 

class CameraLidarFusion:
    def __init__(self):
        self.bridge = CvBridge()
        openCV의 객체를 bridge로 표기한다.
        
        # 동기화를 위한 Subscriber 설정
        self.image_sub = Subscriber("/usb_cam/image_raw", Image)
        #Topic 중 하나인 sub_cam/image_raw 에서 오는 raw 이미지 파일을 수집하는 
        #토픽 구독자(Topic Subcriber), 메세지 리스터(Massage Listener, 센서 데이터 수신기(Sensor data Receiver 등으로 생각하자.
        self.scan_sub = Subscriber("/scan", LaserScan)
        #Topinc 중 하나인 /scan 에서 데이터를 읽어 오는 구독자. 
        self.fusion_pub = rospy.Publisher("/fusion_image", Image, queue_size=1)
        #/fusion_imagae를 저장하며 큐는 1인 데이터 생산자를 fusion_pub 으로 표기한다.
        
        # 동기화 설정
        self.ts = ApproximateTimeSynchronizer([self.image_sub, self.scan_sub], 10, 0.1)
        #ROS 에서는 각 데이터 타입 헤더에 시간에 대한 정보가 포함되어 있다. 
        #이를 매개로 각 데이터들의 시간들에 대한 싱크를 맞출 수 있으며
        #해당 ApproximateTimeSynchronizer의 경우도 ROS에서 제공을 해준다. 
        
        self.ts.registerCallback(self.sync_callback)
        #싱크된 데이터 도착시 시스템에 통보한다. 
        #여기서 싱크의 조건은 100ms의 차이까지다.
       
        
        
        # Camera parametersrosc
        self.fx = 844
        self.fy = 844
        self.cx = 320
        self.cy = 250
        
        # 미리 계산된 색상 맵
        self.color_map = self.create_color_map()

 

ROS 의 표준 메세지 헤더 (다중 센서 시스템에서 중요!)

Header header
  uint32 seq
  time stamp
  string frame_id

 

 

각 메시지가 도착하면 그 타임스탬프를 확인

서로 다른 토픽의 메시지들 중 타임스탬프가 지정된 시간 범위(여기서는 0.1초) 내에 있는 것들을 찾기

조건을 만족하는 메시지 세트를 찾으면, 이들을 동기화된 것으로 간주하고 등록된 콜백 함수를 호출


 

 

 def sync_callback(self, image_msg, scan_msg):
        try:
            image_data = self.bridge.imgmsg_to_cv2(image_msg, "bgr8")
            self.fuse_data(image_data, scan_msg)
        except CvBridgeError as e:
            rospy.logerr(f"CvBridge Error: {e}")

 

따라서 상기의 

 

self.ts.registerCallback(self.sync_callback)

 

함수에서 ts에 해당되는 100ms단위로 정렬된 데이터들의 쌍이 수집이 되면 

여기서 정의 된 sync_callback 함수로 이전하여 해당 이미지를 opencv2 데이터(bgr8-RGB 32bit)로 변환하고

fuse_data로 조립한다

 

def fuse_data(self, image_data, scan_msg):
        height, width = image_data.shape[:2]
        fusion_image = image_data.copy()
        # fuse_data 를 정의한다. 먼저 해당 이미지 데이터의 row 픽셀과 column 픽셀을 추출하여
        # height, width 를 정의해준다. 
        # 혹시 모르니 원본 데이터를 보존해둔다. 하지만 이 이미지는 raw data가 아닌 bgr8 으로 변환된 포멧이다.
        
        
        angles = np.arange(len(scan_msg.ranges)) * scan_msg.angle_increment + scan_msg.angle_min
        # 360 라이더는 한바퀴를 돌 때 자동적으로 ranges 배열을 생성하도록 하는데
        # 이 배열을 기반으로 해상도를 알 수 있다.
		# 만약 해상도가 60이라면 그 뒤의 데이터와 데이터 사이의 변화 각도를 토대로
        # 각 배열에 곱해주면 60개의 해상도에 대한 각 센싱의 총 이동 누적 각도를 알 수 있다.
        # 여기서는 1도의 increment를 가지고 있으며, 총 360개의 배열을 생성하므로
        # np.arange는 360, angle_increment는 1도로 계산이 된다. 
        # 각 데이터는 라이더에서 송신 받는 데이터를 기반을 하는 것으로 보여진다.
		# min 데이터의 경우 시작 데이터라 표기되지만 실제로는 Lidar에서 고정된 값으로 정해져 있다. 
        
        degrees = np.degrees(angles)
        # 해당 라디안 단위의 값을 360도 단위법으로 바꾼다.
        
        
           
        # 각도 범위 조정
        valid_indices = np.logical_or(
            (degrees >= 0) & (degrees <= 45),
            (degrees >= 315) & (degrees < 360)
        )
        # (0도 이상 and 45도 이하) or (315도 이상 and 360도 이하) 인 경우에 valid_indices 로 데이터 저장.
          
          
        valid_degrees = degrees[valid_indices]
        valid_ranges = np.array(scan_msg.ranges)[valid_indices]
           
        # 위에 만든 degrees데이터들에 valid_indices 마스크 적용하여, valid_degrees 추출. 
        # 그리고 해당 각도에 대한 거리를 구하기 위하여 valid_indices 에 해당되는 배열들에 대한 거리 값을 추출한다.
        # increment가 1도이기 때문에 360도 단위의 valid_indices로 라디안 표기의 데이터 추출이 가능한 작업이다.
        
        if len(valid_ranges) == 0:
            rospy.logwarn("No range")
            return  # 유효한 범위가 없으면 함수 종료  
            
        # 유효한 범위가 없으면 No range 를 터미널로 표기    
            
        x = valid_ranges * np.cos(angles[valid_indices])
        y = valid_ranges * np.sin(angles[valid_indices])
        
        # 이미지 좌표로 변환
        u = np.zeros_like(valid_degrees)
        # valid_degrees 와 동일한 배열을 새로 생성한 후 그 안의 값을 0으로 정리합니다.
        # 여기에는 이미지의 x축 좌표가 위치하게 됩니다.
        # 라이더의 각도에 따른 이미지 투영이 이루어지기 위해서 레졸루션이 90도인 90개의 배열을 필요로 합니다.
        
        u[valid_degrees <= 45] = ((-valid_degrees[valid_degrees <= 45]+45) / 45 * (width/2)).astype(int)
        # 0~45도의 경우 0이 정면이고 45도는 왼쪽 측면입니다. 이를 이미지 상에서는 x 축 상에서 
        # 0~ width/2 까지를 산정하므로 (45-(45도 이하의 각도들)/45)*(width/2) 를 하여 투영 해상도를 45단위로 분리합니다.    
        
        u[valid_degrees >= 315] = ((-valid_degrees[valid_degrees >= 315] + 360) / 45 * (width / 2)+(width/2)).astype(int)
        # 이 경우 315~360도의 오른쪽 라이더 해상도를 이미지의 오른쪽으로 투영시키려 하므로
        # 360도는 정면임을 감안하여 각도가 높아질수록 정면을 향하도록 수식을 만들었습니다. 때문에 역시 입력값은 -로 출발합니다.
        


        v = np.full_like(u, height // 2, dtype=int)
        # y 값의 경우 해당 이미지가 삽입되는 이미지의 중간점으로 고정시켜 줍니다. 
        # 즉, height이 480인 경우 240이 됩니다.
        
        for ui, vi, range_val, deg in zip(u, v, valid_ranges, valid_degrees):
        # ui,vi 는 이미지 상의 x,y 좌표
        # range_val 은 라이다로부터의 측정 거리
        # deg 는 각도 
        
            if 0 <= ui < width:
		# ui가 이미지의 x 축 위에 올라갈 수 있는지를 판단한다
                color_index = min(int(range_val * 63), 255)
        		# 거리에 63을 곱한 값에서 255까지의 색상을 사용한다.
                color = tuple(map(int, self.color_map[color_index]))
                # color_index에서 색상 맵을 가지고 오고 정수로 변환한다. 
                cv2.circle(fusion_image, (int(ui), int(vi)), 2, color, -1)  
                # 중심점은 ui,vi 이고 반지름은 2픽셀, 색상은 위에서 정의된 color, 그리고 -1은 원을 해당 색으로 채운다는 의미.
                for u_offset in range(-5, 6):
                # 해당 라이다 포인트 주변에 다음과 같은 사각형을 만든다. 
                # 폭이 중심 기준으로 -5~6, 상하로는 -25~26이다. 
                # 즉 폭이 10, 높이는 50이 되는 사각형이 완성된다.
                    for v_offset in range(-25, 26):                  
                        x = int(ui + u_offset)
                        y = int(vi + v_offset)
                        # 이 옵셋을 해당 ui,vi 를 중심으로 적층해나아간다.
                        if (0 <= x < width) and (0 <= y < height):
                        # 해당 좌표가 이미지 안에 들어가는지 확인하고
                            fusion_image[y, x] = tuple(map(int,color))
                        #  fusion_image 함수를 통해 이미지의 x,y 위에 해당 튜플의 색상을 덧 입힌다.
                             
                             
                                        
            rospy.loginfo(f"LiDAR point (x, y): ({x:.2f}, {y:.2f}), projected (u, v): ({ui}, {vi})")
            rospy.loginfo(f"Angle: {deg:.2f} degrees, Distance: {range_val:.2f} meters")
                            
            cv2.rectangle(fusion_image, (0, height // 2 - 25), (width, height // 2 + 25), (0, 255, 0), 1)
			# 해당 Depth이 효현될 모든 범위에 사각형의 표식을 준다. 
            
        try:
            fusion_msg = self.bridge.cv2_to_imgmsg(fusion_image, "bgr8")
            # bgr8로 인코딩 된 OpenCV 이미지를 ROS 이미지로 변환한다.
            self.fusion_pub.publish(fusion_msg)
			# 해당 이미지를 fusion_pub 로 퍼블리싱 한다. 
            # 즉 ros에서 다른 노드에서도 읽을 수 있도록 데이터를 방류, 공유 하는 것이다.
        except CvBridgeError as e:
            rospy.logerr(f"CvBridge Error: {e}")

 

 

 

해당 마스킹에 대한 코드는 점으로 표현하기보다는 면으로 표현하는 아래의 방식이 더 직관적이고 간편하다.
해당 코드를 적용하여 테스트 예정.

for ui, vi, range_val, deg in zip(u, v, valid_ranges, valid_degrees):
    if 0 <= ui < width:
        color_index = min(int(range_val * 63), 255)
        color = tuple(map(int, self.color_map[color_index]))
        
        # 최적화된 사각형 채우기
        x_min, x_max = max(0, int(ui) - 5), min(width, int(ui) + 6)
        y_min, y_max = max(0, int(vi) - 25), min(height, int(vi) + 26)
        
        fusion_image[y_min:y_max, x_min:x_max] = color

# cv2.circle 호출은 그대로 유지
cv2.circle(fusion_image, (int(ui), int(vi)), 2, color, -1)

 

 

 

 

 


 

최종 코드

#!/usr/bin/env python3
import rospy
from sensor_msgs.msg import LaserScan, Image
from cv_bridge import CvBridge, CvBridgeError
import numpy as np
import cv2
import math
from message_filters import ApproximateTimeSynchronizer, Subscriber

class CameraLidarFusion:
    def __init__(self):
        self.bridge = CvBridge()
        
        # 동기화를 위한 Subscriber 설정
        self.image_sub = Subscriber("/usb_cam/image_raw", Image)
        self.scan_sub = Subscriber("/scan", LaserScan)
        self.fusion_pub = rospy.Publisher("/fusion_image", Image, queue_size=1)
        
        # 동기화 설정
        self.ts = ApproximateTimeSynchronizer([self.image_sub, self.scan_sub], 10, 0.1)
        self.ts.registerCallback(self.sync_callback)
        
        # Camera parametersrosc
        self.fx = 844
        self.fy = 844
        self.cx = 320
        self.cy = 250
        
        # 미리 계산된 색상 맵
        self.color_map = self.create_color_map()

    def sync_callback(self, image_msg, scan_msg):
        try:
            image_data = self.bridge.imgmsg_to_cv2(image_msg, "bgr8")
            self.fuse_data(image_data, scan_msg)
        except CvBridgeError as e:
            rospy.logerr(f"CvBridge Error: {e}")

    def fuse_data(self, image_data, scan_msg):
        height, width = image_data.shape[:2]
        fusion_image = image_data.copy()
        
        angles = np.arange(len(scan_msg.ranges)) * scan_msg.angle_increment + scan_msg.angle_min
        degrees = np.degrees(angles)
    
             
               
        # 각도 범위 조정
        valid_indices = np.logical_or(
            (degrees >= 0) & (degrees <= 15),
            (degrees >= 345) & (degrees < 360)
        )
            
    
        valid_degrees = degrees[valid_indices]
        valid_ranges = np.array(scan_msg.ranges)[valid_indices]
        
        if len(valid_ranges) == 0:
            rospy.logwarn("No range")
            return  # 유효한 범위가 없으면 함수 종료            
            
        x = valid_ranges * np.cos(angles[valid_indices])
        y = valid_ranges * np.sin(angles[valid_indices])
        
        # 이미지 좌표로 변환
        u = np.zeros_like(valid_degrees)
        
        u[valid_degrees <= 15] = ((-valid_degrees[valid_degrees <= 15]+15) / 15 * (width/2)).astype(int)
        u[valid_degrees >= 345] = ((-valid_degrees[valid_degrees >= 345] + 360) / 15 * (width / 2)+(width/2)).astype(int)

       
        
        v = np.full_like(u, height // 2, dtype=int)
       
       
             
        for ui, vi, range_val, deg in zip(u, v, valid_ranges, valid_degrees):
            if 0 <= ui < width:
                color_index = min(int(range_val * 63), 255)
                color = tuple(map(int, self.color_map[color_index]))
        
                # 최적화된 사각형 채우기
                x_min, x_max = max(0, int(ui) - 10), min(width, int(ui) + 12)
                y_min, y_max = max(0, int(vi) - 25), min(height, int(vi) + 26)
        
                fusion_image[y_min:y_max, x_min:x_max] = color

        # cv2.circle 호출은 그대로 유지
        cv2.circle(fusion_image, (int(ui), int(vi)), 2, color, -1)     
          
                      
            #rospy.loginfo(f"LiDAR point (x, y): ({x:.2f}, {y:.2f}), projected (u, v): ({ui}, {vi})")
            #rospy.loginfo(f"Angle: {deg:.2f} degrees, Distance: {range_val:.2f} meters")
                            
            #cv2.rectangle(fusion_image, (0, height // 2 - 25), (width, height // 2 + 25), (0, 255, 0), 1)

        try:
            fusion_msg = self.bridge.cv2_to_imgmsg(fusion_image, "bgr8")
            self.fusion_pub.publish(fusion_msg)
        except CvBridgeError as e:
            rospy.logerr(f"CvBridge Error: {e}")

    def create_color_map(self):
        color_map = np.zeros((256, 3), dtype=np.uint8)
        for i in range(256):
            normalized_distance = i / 255.0
            hue = 20  # Orange hue
            saturation = 255
            value = int(255 * (1 - normalized_distance))
            color = cv2.cvtColor(np.uint8([[[hue, saturation, value]]]), cv2.COLOR_HSV2BGR)[0][0]
            color_map[i] = color
        return color_map

if __name__ == '__main__':
    rospy.init_node('camera_lidar_fusion', anonymous=True)
    try:
        fusion_node = CameraLidarFusion()
        rospy.spin()
    except rospy.ROSInterruptException:
        pass

 

'Autonomous Driving > Turtlebot3' 카테고리의 다른 글

Puppy  (0) 2024.08.28
Sensor-Fusion 1차 총 정리  (0) 2024.08.01
FUSION [LiDAR+Camera]  (0) 2024.07.11
FUSION Code review  (0) 2024.07.08
Fusion  (0) 2024.07.08