Skip to content
Draft
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 64 additions & 14 deletions src/prx/rinex_nav/evaluate.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,21 @@
import georinex
from prx import util
from prx import constants
from prx.util import timeit, repair_with_gfzrnx
from prx.util import timeit, repair_with_gfzrnx, compute_gps_utc_leap_seconds

log = logging.getLogger(__name__)


def detect_leap_second_insertion(df: pd.DataFrame):
Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adds an additional check that asserts even when the nav file does not have leap seconds.

t1 = df["ephemeris_reference_time_isagpst"].min() - pd.Timedelta(days=1)
t2 = df["ephemeris_reference_time_isagpst"].max() - pd.Timedelta(days=2)
ls_1 = compute_gps_utc_leap_seconds(yyyy=t1.year, doy=t1.dayofyear)
ls_2 = compute_gps_utc_leap_seconds(yyyy=t2.year, doy=t2.dayofyear)
assert (
ls_1 == ls_2
), "Leap second change within ephemeris set, this case is not tested, aborting."


@timeit
def parse_rinex_nav_file(rinex_file: Path):
@util.disk_cache.cache(ignore=["rinex_file_path"])
Expand All @@ -24,6 +34,7 @@ def cached_load(rinex_file_path: Path, file_hash: str):
rinex_file_path
)
df = convert_nav_dataset_to_dataframe(ds)
detect_leap_second_insertion(df)
df["source"] = str(rinex_file_path.resolve())
df["ephemeris_hash"] = pd.util.hash_pandas_object(df, index=False).astype(str)
return df
Expand All @@ -45,9 +56,9 @@ def time_scale_integer_second_offset_wrt_gpst(time_scale, utc_gpst_leap_seconds=
if time_scale == "BDT":
return pd.Timedelta(seconds=-14)
if time_scale == "GLONASST":
assert utc_gpst_leap_seconds is not None, (
"Need GPST-UTC leap seconds to compute GLONASST integer second offset w.r.t. GPST"
)
assert (
utc_gpst_leap_seconds is not None
), "Need GPST-UTC leap seconds to compute GLONASST integer second offset w.r.t. GPST"
return pd.Timedelta(seconds=-utc_gpst_leap_seconds)
assert False, f"Unexpected time scale: {time_scale}"

Expand Down Expand Up @@ -441,28 +452,67 @@ def compute_ephemeris_and_clock_offset_reference_times(group):
"IRNSST": "GPSWeek",
}
group_time_scale = group["time_scale"].iloc[0]
group["clock_offset_reference_time_system_time"] = group["time"]
Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moved this line for readability, no functional change.

if group_time_scale not in ["GLONASST", "SBAST"]:
full_seconds = (
full_seconds_toe = (
group[week_field[group_time_scale]] * constants.cSecondsPerWeek
+ group["Toe"]
)
group["ephemeris_reference_time_system_time"] = (
constants.system_time_scale_rinex_utc_epoch[group_time_scale]
+ pd.to_timedelta(full_seconds, unit="seconds")
+ pd.to_timedelta(full_seconds_toe, unit="seconds")
)
else:
# For SBAS and GLONASS there are no separate ephemeris reference time fields
group["ephemeris_reference_time_system_time"] = group["time"]
# The first derivative of the clock offset is in a different field for SBAS and GLONASS
group["SVclockDrift"] = group["SVrelFreqBias"]
# And the second derivative is zero, i.e. the constellation ground segment uses a fist-order clock model
# And the second derivative is zero, i.e. the constellation ground segment uses a first-order clock model
group["SVclockDriftRate"] = 0
# Make sure we have all navigation message transmission times in the same column
group["TransTime"] = group["MessageFrameTime"]
# Compute time of transmission of the ephemeris
if group_time_scale != "GLONASST":
full_seconds_tot = (
group[week_field[group_time_scale]] * constants.cSecondsPerWeek
+ group["TransTime"]
)
group["ephemeris_transmission_time_system_time"] = (
constants.system_time_scale_rinex_utc_epoch[group_time_scale]
+ pd.to_timedelta(full_seconds_tot, unit="seconds")
)
else:
# For GLONASS we need to figure out the start of the UTC week the transmission time refers to.
# Note that GLONASS UTC weeks start on Sundays 00h00, not Mondays like ISO UTC weeks.
arbitrary_utc_week_start = pd.Timestamp("1980-01-06 00:00:00")
toe_s = (
group["ephemeris_reference_time_system_time"] - arbitrary_utc_week_start
) / pd.Timedelta(seconds=1)
toe = group["ephemeris_reference_time_system_time"]
toe_week_number = np.floor(toe_s / constants.cSecondsPerWeek)
tot = arbitrary_utc_week_start + pd.to_timedelta(
(toe_week_number * constants.cSecondsPerWeek + group["TransTime"]),
unit="seconds",
)
# Close to week boundaries we can be off by one week, but we know that the time of transmission is
# close to the ephemeris reference time:
is_late = (tot - toe) > pd.Timedelta(days=1)
is_early = (tot - toe) < -pd.Timedelta(days=1)
tot[is_late] -= pd.Timedelta(weeks=1)
tot[is_early] += pd.Timedelta(weeks=1)
assert (tot - toe).abs().max() < pd.Timedelta(days=1)
group["ephemeris_transmission_time_system_time"] = tot
# Convert everything to integer-second-aligned GPST
group["ephemeris_transmission_time_isagpst"] = to_isagpst(
group["ephemeris_transmission_time_system_time"],
group_time_scale,
int(nav_ds.attrs["utc_gpst_leap_seconds"]),
)
group["ephemeris_reference_time_isagpst"] = to_isagpst(
group["ephemeris_reference_time_system_time"],
group_time_scale,
int(nav_ds.attrs["utc_gpst_leap_seconds"]),
)
group["clock_offset_reference_time_system_time"] = group["time"]
group["clock_reference_time_isagpst"] = to_isagpst(
group["clock_offset_reference_time_system_time"],
group_time_scale,
Expand Down Expand Up @@ -521,9 +571,9 @@ def compute_gal_inav_fnav_indicators(df):
)
# We expect only the following navigation message types for Galileo:
indicators = set(df[is_gal].fnav_or_inav_indicator.unique())
assert len(indicators.intersection({1, 2, 4, 5})) == len(indicators), (
f"Unexpected Galileo navigation message type: {indicators}"
)
assert len(indicators.intersection({1, 2, 4, 5})) == len(
indicators
), f"Unexpected Galileo navigation message type: {indicators}"
df.loc[is_gal & (df.fnav_or_inav_indicator == 1), "fnav_or_inav"] = "inav"
df.loc[is_gal & (df.fnav_or_inav_indicator == 2), "fnav_or_inav"] = "fnav"
df.loc[is_gal & (df.fnav_or_inav_indicator == 4), "fnav_or_inav"] = "inav"
Expand All @@ -545,9 +595,9 @@ def to_isagpst(time, timescale, gpst_utc_leapseconds):
)
)

assert False, (
f"Unexpected types: time is {type(time)}, timescale is {type(timescale)}"
)
assert (
False
), f"Unexpected types: time is {type(time)}, timescale is {type(timescale)}"


@timeit
Expand Down
Loading