Module blaseball_mike.events
Wrapper around the SSE events API.
Expand source code
"""
Wrapper around the SSE events API.
"""
import asyncio
from concurrent import futures
import ujson
from aiohttp_sse_client import client as sse_client
from aiohttp.client_exceptions import ClientPayloadError, ClientConnectorError, ServerDisconnectedError
async def stream_events(url='https://www.blaseball.com/events/streamData', retry_base=0.01, retry_max=300):
"""
Async generator for the events API.
`retry_base` will be the minimum time to delay if there's a connection error
`retry_max` is the maximum time to delay if there's a connection error
"""
retry_delay = retry_base
while True:
try:
async with sse_client.EventSource(url, read_bufsize=2 ** 18) as src:
async for event in src:
retry_delay = retry_base # reset backoff
if not event.data:
continue
payload = ujson.loads(event.data)['value']
yield payload
except (ConnectionError,
TimeoutError,
ClientPayloadError,
futures.TimeoutError,
asyncio.exceptions.TimeoutError,
ClientConnectorError,
ServerDisconnectedError):
await asyncio.sleep(retry_delay)
retry_delay = min(retry_delay * 2, retry_max)
Functions
async def stream_events(url='https://www.blaseball.com/events/streamData', retry_base=0.01, retry_max=300)
-
Async generator for the events API.
retry_base
will be the minimum time to delay if there's a connection errorretry_max
is the maximum time to delay if there's a connection errorExpand source code
async def stream_events(url='https://www.blaseball.com/events/streamData', retry_base=0.01, retry_max=300): """ Async generator for the events API. `retry_base` will be the minimum time to delay if there's a connection error `retry_max` is the maximum time to delay if there's a connection error """ retry_delay = retry_base while True: try: async with sse_client.EventSource(url, read_bufsize=2 ** 18) as src: async for event in src: retry_delay = retry_base # reset backoff if not event.data: continue payload = ujson.loads(event.data)['value'] yield payload except (ConnectionError, TimeoutError, ClientPayloadError, futures.TimeoutError, asyncio.exceptions.TimeoutError, ClientConnectorError, ServerDisconnectedError): await asyncio.sleep(retry_delay) retry_delay = min(retry_delay * 2, retry_max)