Track Interpolation
With an equal time window
import numpy as np
import aisdb
from datetime import timedelta, datetime
y1, x1 = -66.84683, -61.10595523571155
y2, x2 = -66.83036, -61.11595523571155
y3, x3 = -66.82036, -61.12595523571155
t1 = dt_2_epoch( datetime(2021, 1, 1, 1) )
t2 = dt_2_epoch( datetime(2021, 1, 1, 2) )
t3 = dt_2_epoch(datetime(2021, 1, 1, 3))
# creating a sample track
tracks_short = [
dict(
lon=np.array([x1, x2, x3]),
lat=np.array([y1, y2, y3]),
time=np.array([t1, t2, t3]),
dynamic=set(['lon', 'lat', 'time']),
static = set()
)
]
tracks__ = aisdb.interp.interp_time(tracks_short, timedelta(minutes=10))
for tr in tracks__:
print(tr)
With an equal space
import numpy as np
from datetime import timedelta, datetime
import aisdb
# creating a sample track
y1, x1 = -66.84683, -61.10595523571155
y2, x2 = -66.83036, -61.11595523571155
y3, x3 = -66.82036, -61.12595523571155
t1 = dt_2_epoch( datetime(2021, 1, 1, 1) )
t2 = dt_2_epoch( datetime(2021, 1, 1, 2) )
t3 = dt_2_epoch(datetime(2021, 1, 1, 3))
tracks_short = [
dict(
lon=np.array([x1, x2, x3]),
lat=np.array([y1, y2, y3]),
time=np.array([t1, t2, t3]),
dynamic=set(['lon', 'lat', 'time']),
static = set()
)
]
# track points interpolation
tracks__ = aisdb.interp.interp_spacing(spacing=1000, tracks=tracks__)
for tr in tracks__:
print(tr)
Geometric Track Interpolation
import numpy as np
import aisdb
from datetime import timedelta, datetime
y1, x1 = -66.84683, -61.10595523571155
y2, x2 = -66.83036, -61.11595523571155
y3, x3 = -66.82036, -61.12595523571155
t1 = dt_2_epoch( datetime(2021, 1, 1, 1) )
t2 = dt_2_epoch( datetime(2021, 1, 1, 2) )
t3 = dt_2_epoch(datetime(2021, 1, 1, 3))
# creating a sample track
tracks_short = [
dict(
lon=np.array([x1, x2, x3]),
lat=np.array([y1, y2, y3]),
time=np.array([t1, t2, t3]),
dynamic=set(['lon', 'lat', 'time']),
static = set()
)
]
tracks__ = aisdb.interp.geo_interp_time(tracks_short, timedelta(minutes=10))
for tr in tracks__:
print(tr)
Custom Track Interpolation (barycentric interpolation)
from datetime import datetime, timedelta
from aisdb.interp import interp_time
import warnings
from scipy.interpolate import barycentric_interpolate
import numpy as np
from pyproj import Geod
from pyproj import Transformer
def barycentric_interp_time(tracks, step=timedelta(minutes=1), original_crs = 4269):
new_crs = 3857
geod = Geod(ellps="WGS84")
for track in tracks:
if track['time'].size <= 1:
# yield track
warnings.warn('cannot interpolate track of length 1, skipping...')
continue
intervals = np.arange(
start=track['time'][0],
stop=track['time'][-1] + int(step.total_seconds()),
step=int(step.total_seconds()),
).astype(int)
assert len(intervals) >= 1
itr = dict(
**{k: track[k]
for k in track['static']},
time=intervals,
static=track['static'],
dynamic=track['dynamic']
)
if 'lat' in track['dynamic']:
itr['lon'] = barycentric_interpolate(x=intervals.astype(int),
xi=track['time'].astype(int),
yi=track['lon'].astype(float))
itr['lat'] = barycentric_interpolate(x=intervals.astype(int),
xi=track['time'].astype(int),
yi=track['lat'].astype(float))
if 'cog' in track['dynamic']:
courses, _, _ = geod.inv(itr['lon'][:-1], itr['lat'][:-1], itr['lon'][1:], itr['lat'][1:])
itr['cog'] = np.append(courses, track['cog'][-1])
for key in track['dynamic']:
if key not in itr:
itr[key] = np.interp(x=intervals.astype(int),
xp=track['time'].astype(int),
fp=track[key].astype(float))
yield itr
return
Let's do an example
track_ = {'dim_star': None, 'dim_port': None, 'dim_bow': None, 'mmsi': 205790000, 'ship_type': None, 'dim_stern': None,
'ship_type_txt': None, 'imo': None, 'vessel_name': None, 'lon': np.array([-68.1179915, -60.5879110, -50.14684933], dtype=np.float32),
'lat': np.array([49.29140307, 47.5648122,51.5698713], dtype=np.float32), 'time': np.array([1625148653, 1625160623 ,1625176600], dtype=np.uint32),
'sog': np.array([14.4, 14.4, 14.5], dtype=np.float32), 'cog': np.array([68, 69,71], dtype=np.uint32),
'static': {'dim_star', 'dim_port', 'dim_bow', 'mmsi', 'ship_type', 'dim_stern', 'ship_type_txt', 'imo',
'vessel_name'}, 'dynamic': {'sog', 'cog', 'lat', 'time', 'lon'}}
track_short = [track_]
lin_tracks = interp_time(track_short,step=timedelta(hours=0.5))
lin_tracks = [x for x in lin_tracks]
geo_Tracks_ = geo_interp_time(track_short, step=timedelta(hours=0.5))
geo_Tracks_ = [x for x in geo_Tracks_]
bary_Tracks = barycentric_interp_time(track_short, step=timedelta(hours=0.5))
bary_Tracks = [x for x in bary_Tracks]
geo_lin_tracks = interp_time(geo_interp_time(track_short, step=timedelta(minutes=1)),step=timedelta(hours=0.5))
bary_geo_tracks = barycentric_interp_time(geo_interp_time(track_short, step=timedelta(hours=0.5)), step=timedelta(hours=0.2))
Last updated