Track Interpolation

With an equal time window

import numpy as np
import aisdb
from datetime import timedelta, datetime

y1, x1 = -66.84683, -61.10595523571155
y2, x2 = -66.83036, -61.11595523571155
y3, x3 = -66.82036, -61.12595523571155
t1 = dt_2_epoch( datetime(2021, 1, 1, 1) )
t2 = dt_2_epoch( datetime(2021, 1, 1, 2) )
t3 = dt_2_epoch(datetime(2021, 1, 1, 3))

# creating a sample track
tracks_short = [
    dict(
        lon=np.array([x1, x2, x3]),
        lat=np.array([y1, y2, y3]),
        time=np.array([t1, t2, t3]),
        dynamic=set(['lon', 'lat', 'time']),
        static = set()
    )
]

tracks__ = aisdb.interp.interp_time(tracks_short, timedelta(minutes=10))

for tr in tracks__:
    print(tr)

With an equal space

import numpy as np
from datetime import timedelta, datetime
import aisdb


# creating a sample track
y1, x1 = -66.84683, -61.10595523571155
y2, x2 = -66.83036, -61.11595523571155
y3, x3 = -66.82036, -61.12595523571155
t1 = dt_2_epoch( datetime(2021, 1, 1, 1) )
t2 = dt_2_epoch( datetime(2021, 1, 1, 2) )
t3 = dt_2_epoch(datetime(2021, 1, 1, 3))
tracks_short = [
    dict(
        lon=np.array([x1, x2, x3]),
        lat=np.array([y1, y2, y3]),
        time=np.array([t1, t2, t3]),
        dynamic=set(['lon', 'lat', 'time']),
        static = set()
    )
]

# track points interpolation
tracks__ = aisdb.interp.interp_spacing(spacing=1000, tracks=tracks__)
for tr in tracks__:
    print(tr)

Geometric Track Interpolation

import numpy as np
import aisdb
from datetime import timedelta, datetime

y1, x1 = -66.84683, -61.10595523571155
y2, x2 = -66.83036, -61.11595523571155
y3, x3 = -66.82036, -61.12595523571155
t1 = dt_2_epoch( datetime(2021, 1, 1, 1) )
t2 = dt_2_epoch( datetime(2021, 1, 1, 2) )
t3 = dt_2_epoch(datetime(2021, 1, 1, 3))

# creating a sample track
tracks_short = [
    dict(
        lon=np.array([x1, x2, x3]),
        lat=np.array([y1, y2, y3]),
        time=np.array([t1, t2, t3]),
        dynamic=set(['lon', 'lat', 'time']),
        static = set()
    )
]

tracks__ = aisdb.interp.geo_interp_time(tracks_short, timedelta(minutes=10))

for tr in tracks__:
    print(tr)

Custom Track Interpolation (barycentric interpolation)

from datetime import datetime, timedelta
from aisdb.interp import interp_time
import warnings
from scipy.interpolate import barycentric_interpolate
import numpy as np
from pyproj import Geod
from pyproj import Transformer

def barycentric_interp_time(tracks, step=timedelta(minutes=1), original_crs = 4269):
        new_crs = 3857
    geod = Geod(ellps="WGS84")
    for track in tracks:
        if track['time'].size <= 1:
            # yield track
            warnings.warn('cannot interpolate track of length 1, skipping...')
            continue
        intervals = np.arange(
            start=track['time'][0],
            stop=track['time'][-1] + int(step.total_seconds()),
            step=int(step.total_seconds()),
        ).astype(int)

        assert len(intervals) >= 1

        itr = dict(
            **{k: track[k]
               for k in track['static']},
            time=intervals,
            static=track['static'],
            dynamic=track['dynamic']
        )
        if 'lat' in track['dynamic']:
            itr['lon'] = barycentric_interpolate(x=intervals.astype(int),
                      xi=track['time'].astype(int),
                      yi=track['lon'].astype(float))
            itr['lat'] = barycentric_interpolate(x=intervals.astype(int),
                          xi=track['time'].astype(int),
                          yi=track['lat'].astype(float))
            if 'cog' in track['dynamic']:
                courses, _, _ = geod.inv(itr['lon'][:-1], itr['lat'][:-1], itr['lon'][1:], itr['lat'][1:])
                itr['cog'] = np.append(courses, track['cog'][-1])
        for key in track['dynamic']:
            if key not in itr:
                itr[key] = np.interp(x=intervals.astype(int),
                     xp=track['time'].astype(int),
                     fp=track[key].astype(float))

        yield itr

    return

Let's do an example

track_ = {'dim_star': None, 'dim_port': None, 'dim_bow': None, 'mmsi': 205790000, 'ship_type': None, 'dim_stern': None,
     'ship_type_txt': None, 'imo': None, 'vessel_name': None, 'lon': np.array([-68.1179915, -60.5879110, -50.14684933], dtype=np.float32),
     'lat': np.array([49.29140307, 47.5648122,51.5698713], dtype=np.float32), 'time': np.array([1625148653, 1625160623 ,1625176600], dtype=np.uint32),
     'sog': np.array([14.4, 14.4, 14.5], dtype=np.float32), 'cog': np.array([68, 69,71], dtype=np.uint32),
     'static': {'dim_star', 'dim_port', 'dim_bow', 'mmsi', 'ship_type', 'dim_stern', 'ship_type_txt', 'imo',
                'vessel_name'}, 'dynamic': {'sog', 'cog', 'lat', 'time', 'lon'}}



track_short = [track_]

lin_tracks = interp_time(track_short,step=timedelta(hours=0.5))
lin_tracks = [x for x in lin_tracks]
geo_Tracks_ = geo_interp_time(track_short, step=timedelta(hours=0.5))
geo_Tracks_ = [x for x in geo_Tracks_]

bary_Tracks = barycentric_interp_time(track_short, step=timedelta(hours=0.5))
bary_Tracks = [x for x in bary_Tracks]

geo_lin_tracks = interp_time(geo_interp_time(track_short, step=timedelta(minutes=1)),step=timedelta(hours=0.5))
bary_geo_tracks = barycentric_interp_time(geo_interp_time(track_short, step=timedelta(hours=0.5)), step=timedelta(hours=0.2))

Last updated