-
Notifications
You must be signed in to change notification settings - Fork 154
/
add_missing_data.py
93 lines (74 loc) · 4.37 KB
/
add_missing_data.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
import csv
import numpy as np
from scipy.interpolate import interp1d
def interpolate_bounding_boxes(data):
# Extract necessary data columns from input data
frame_numbers = np.array([int(row['frame_nmr']) for row in data])
car_ids = np.array([int(float(row['car_id'])) for row in data])
car_bboxes = np.array([list(map(float, row['car_bbox'][1:-1].split())) for row in data])
license_plate_bboxes = np.array([list(map(float, row['license_plate_bbox'][1:-1].split())) for row in data])
interpolated_data = []
unique_car_ids = np.unique(car_ids)
for car_id in unique_car_ids:
frame_numbers_ = [p['frame_nmr'] for p in data if int(float(p['car_id'])) == int(float(car_id))]
print(frame_numbers_, car_id)
# Filter data for a specific car ID
car_mask = car_ids == car_id
car_frame_numbers = frame_numbers[car_mask]
car_bboxes_interpolated = []
license_plate_bboxes_interpolated = []
first_frame_number = car_frame_numbers[0]
last_frame_number = car_frame_numbers[-1]
for i in range(len(car_bboxes[car_mask])):
frame_number = car_frame_numbers[i]
car_bbox = car_bboxes[car_mask][i]
license_plate_bbox = license_plate_bboxes[car_mask][i]
if i > 0:
prev_frame_number = car_frame_numbers[i-1]
prev_car_bbox = car_bboxes_interpolated[-1]
prev_license_plate_bbox = license_plate_bboxes_interpolated[-1]
if frame_number - prev_frame_number > 1:
# Interpolate missing frames' bounding boxes
frames_gap = frame_number - prev_frame_number
x = np.array([prev_frame_number, frame_number])
x_new = np.linspace(prev_frame_number, frame_number, num=frames_gap, endpoint=False)
interp_func = interp1d(x, np.vstack((prev_car_bbox, car_bbox)), axis=0, kind='linear')
interpolated_car_bboxes = interp_func(x_new)
interp_func = interp1d(x, np.vstack((prev_license_plate_bbox, license_plate_bbox)), axis=0, kind='linear')
interpolated_license_plate_bboxes = interp_func(x_new)
car_bboxes_interpolated.extend(interpolated_car_bboxes[1:])
license_plate_bboxes_interpolated.extend(interpolated_license_plate_bboxes[1:])
car_bboxes_interpolated.append(car_bbox)
license_plate_bboxes_interpolated.append(license_plate_bbox)
for i in range(len(car_bboxes_interpolated)):
frame_number = first_frame_number + i
row = {}
row['frame_nmr'] = str(frame_number)
row['car_id'] = str(car_id)
row['car_bbox'] = ' '.join(map(str, car_bboxes_interpolated[i]))
row['license_plate_bbox'] = ' '.join(map(str, license_plate_bboxes_interpolated[i]))
if str(frame_number) not in frame_numbers_:
# Imputed row, set the following fields to '0'
row['license_plate_bbox_score'] = '0'
row['license_number'] = '0'
row['license_number_score'] = '0'
else:
# Original row, retrieve values from the input data if available
original_row = [p for p in data if int(p['frame_nmr']) == frame_number and int(float(p['car_id'])) == int(float(car_id))][0]
row['license_plate_bbox_score'] = original_row['license_plate_bbox_score'] if 'license_plate_bbox_score' in original_row else '0'
row['license_number'] = original_row['license_number'] if 'license_number' in original_row else '0'
row['license_number_score'] = original_row['license_number_score'] if 'license_number_score' in original_row else '0'
interpolated_data.append(row)
return interpolated_data
# Load the CSV file
with open('test.csv', 'r') as file:
reader = csv.DictReader(file)
data = list(reader)
# Interpolate missing data
interpolated_data = interpolate_bounding_boxes(data)
# Write updated data to a new CSV file
header = ['frame_nmr', 'car_id', 'car_bbox', 'license_plate_bbox', 'license_plate_bbox_score', 'license_number', 'license_number_score']
with open('test_interpolated.csv', 'w', newline='') as file:
writer = csv.DictWriter(file, fieldnames=header)
writer.writeheader()
writer.writerows(interpolated_data)