@@ -957,15 +957,15 @@ async def _resolve_primary_window(self) -> SessionWindow | None:
957957 return await self ._select_window (primary .windows , source = "index" )
958958
959959 @staticmethod
960- def _index_unavailable (primary : ScheduleFetchResult ) -> tuple [ bool , str ] :
960+ def _fallback_context (primary : ScheduleFetchResult ) -> str :
961961 if primary .last_error :
962- return True , f "index error: { primary . last_error } "
962+ return "index- error"
963963 status = primary .index_http_status
964964 if status is not None and status != 200 :
965- return True , f"index unavailable: HTTP { status } "
965+ return f"index- unavailable-http- { status } "
966966 if not primary .windows :
967- return True , "index unavailable: no valid session windows "
968- return False , "index healthy "
967+ return "index-empty "
968+ return "index-stale "
969969
970970 async def _resolve_window (self ) -> tuple [SessionWindow | None , str ]:
971971 primary = await self ._index_source .async_fetch_windows (
@@ -987,8 +987,8 @@ async def _resolve_window(self) -> tuple[SessionWindow | None, str]:
987987 return primary_window , "index"
988988
989989 status = primary .index_http_status
990- index_unavailable , fallback_context = self ._index_unavailable (primary )
991- if not index_unavailable :
990+ fallback_context = self ._fallback_context (primary )
991+ if self . _fallback_source is None :
992992 self ._set_schedule_state (
993993 source = "none" ,
994994 fallback_active = False ,
@@ -997,14 +997,13 @@ async def _resolve_window(self) -> tuple[SessionWindow | None, str]:
997997 )
998998 return None , "none"
999999
1000- if self ._fallback_source is None :
1001- self . _set_schedule_state (
1002- source = "none" ,
1003- fallback_active = False ,
1004- index_http_status = status ,
1005- error = primary . last_error ,
1000+ if self ._should_log (
1001+ f"fallback_probe_ { fallback_context } " , interval_seconds = 300
1002+ ):
1003+ _LOGGER . info (
1004+ "Index has no selectable session window; trying event-tracker fallback (%s)" ,
1005+ fallback_context ,
10061006 )
1007- return None , "none"
10081007
10091008 fallback_result = await self ._fallback_source .async_fetch_windows (
10101009 pre_window = self ._pre_window ,
@@ -1204,6 +1203,8 @@ async def _session_finished(self, window: SessionWindow) -> bool:
12041203 data = await self ._fetch_json (url )
12051204 except Exception : # noqa: BLE001
12061205 return False
1206+ if not isinstance (data , dict ):
1207+ return False
12071208 status = (data or {}).get ("Status" )
12081209 started = (data or {}).get ("Started" )
12091210 if status in SESSION_END_STATES :
@@ -1219,4 +1220,39 @@ async def _fetch_json(self, url: str) -> Any:
12191220 return None
12201221 resp .raise_for_status ()
12211222 text = await resp .text ()
1222- return json .loads (text .lstrip ("\ufeff " ))
1223+ payload = (text or "" ).lstrip ("\ufeff " ).strip ()
1224+ if not payload :
1225+ return None
1226+ try :
1227+ return json .loads (payload )
1228+ except json .JSONDecodeError :
1229+ # Some jsonStream endpoints may return concatenated values.
1230+ decoder = json .JSONDecoder ()
1231+ cursor = 0
1232+ last_structured : Any = None
1233+ while cursor < len (payload ):
1234+ while cursor < len (payload ) and payload [cursor ].isspace ():
1235+ cursor += 1
1236+ if cursor >= len (payload ):
1237+ break
1238+ try :
1239+ parsed , end = decoder .raw_decode (payload , cursor )
1240+ except json .JSONDecodeError :
1241+ next_candidates = [
1242+ pos
1243+ for pos in (
1244+ payload .find ("{" , cursor + 1 ),
1245+ payload .find ("[" , cursor + 1 ),
1246+ )
1247+ if pos != - 1
1248+ ]
1249+ if not next_candidates :
1250+ break
1251+ cursor = min (next_candidates )
1252+ continue
1253+ if isinstance (parsed , (dict , list )):
1254+ last_structured = parsed
1255+ cursor = end
1256+ if last_structured is not None :
1257+ return last_structured
1258+ raise
0 commit comments