Skip to content

Commit

Permalink
code cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
rleidner committed Mar 2, 2025
1 parent 00f2e0d commit 632c6a2
Showing 1 changed file with 92 additions and 124 deletions.
216 changes: 92 additions & 124 deletions packages/modules/vehicles/bmwbc/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
DATA_PATH = Path(__file__).resolve().parents[4] / "data" / "modules" / "bmwbc"
ts_fmt = '%Y-%m-%dT%H:%M:%S'
dt_fmt = '%Y-%m-%d %H:%M:%S'
api = None

log = getLogger(__name__)

Expand Down Expand Up @@ -147,125 +146,105 @@ async def _fetch_soc(self,

storeFile = str(DATA_PATH) + '/soc_bmwbc_vh_' + str(vnum) + '.json'

# check for secondary vehicle, i.e. use file from ramdisk
if captcha_token == "SECONDARY":
try:
# initialize return values in case we get into trouble
soc = 0
range = 0.0
soc_tsX = 0.0
if self._account is not None:
vehicle = self._account.get_vehicle(vin)

# get json of vehicle data
resp = dumps(vehicle, cls=MyBMWJSONEncoder, indent=4)

# vehicle data - json to dict
respd = loads(resp)
state = respd['data']['state']

# get soc, range, lastUpdated from vehicle data
soc = int(state['electricChargingState']['chargingLevelPercent'])
range = float(state['electricChargingState']['range'])
lastUpdatedAt = state['lastUpdatedAt']
# convert lastUpdatedAt to soc_timestamp
soc_tsdtZ = datetime.strptime(lastUpdatedAt, ts_fmt + "Z")
soc_tsX = datetime.timestamp(soc_tsdtZ)
else:
log.warning("existing _account is None")
raise RequestFailed("SoC Request: CD Account not initialized")

log.info("SECONDARY: SOC/Range: " + str(soc) + '%/' + str(range) + 'KM@' + lastUpdatedAt)
return soc, range, soc_tsX
if captcha_token != "SECONDARY":
self._mode = "PRIMARY "
# set logging in httpx to WARNING to prevent unwanted log messages from bimmeer connected
getLogger("httpx").setLevel(WARNING)

try:
# set logging in httpx to WARNING to prevent unwanted log messages from bimmeer connected
getLogger("httpx").setLevel(WARNING)

if self._store is None:
self._store = load_store()
if self._store is None:
self._store = load_store()

# check for captcha_token in store
if 'captcha_token' not in self._store:
# captcha token not in self._store - add current captcha_token
log.debug("initialize captcha token in store")
self._store['captcha_token'] = captcha_token
write_store(self._store)
else:
# last used captcha token in store, compare with captcha_token in configuration
if self._store['captcha_token'] != captcha_token:
# invalidate current refresh and access token to force new login
log.debug("new captcha token configured - invalidate stored token set")
self._store['expires_at'] = None
self._store['access_token'] = None
self._store['refresh_token'] = None
self._store['session_id'] = None
self._store['gcid'] = None
# check for captcha_token in store
if 'captcha_token' not in self._store:
# captcha token not in self._store - add current captcha_token
log.debug("initialize captcha token in store")
self._store['captcha_token'] = captcha_token
write_store(self._store)
else:
log.debug("captcha token unchanged")

if self._store['expires_at'] is not None and \
self._store['refresh_token'] is not None and \
self._store['access_token'] is not None:
# authenticate via refresh and access token
# user_id, password are provided in case these are required
log.info("authenticate via current token set")
expires_at = datetime.fromisoformat(self._store['expires_at'])
if self._auth is None:
self._auth = MyBMWAuthentication(user_id, password, Regions.REST_OF_WORLD,
refresh_token=self._store['refresh_token'],
access_token=self._store['access_token'],
expires_at=expires_at)
else:
# no token, authenticate via user_id, password and captcha_token
log.info("authenticate via userid, password, captcha token")
if self._auth is None:
log.debug("# Create _auth instance")
self._auth = MyBMWAuthentication(user_id,
password,
Regions.REST_OF_WORLD,
hcaptcha_token=captcha_token)
# last used captcha token in store, compare with captcha_token in configuration
if self._store['captcha_token'] != captcha_token:
# invalidate current refresh and access token to force new login
log.debug("new captcha token configured - invalidate stored token set")
self._store['expires_at'] = None
self._store['access_token'] = None
self._store['refresh_token'] = None
self._store['session_id'] = None
self._store['gcid'] = None
else:
log.debug("captcha token unchanged")

if self._store['expires_at'] is not None and \
self._store['refresh_token'] is not None and \
self._store['access_token'] is not None:
# authenticate via refresh and access token
# user_id, password are provided in case these are required
log.info("authenticate via current token set")
expires_at = datetime.fromisoformat(self._store['expires_at'])
if self._auth is None:
self._auth = MyBMWAuthentication(user_id, password, Regions.REST_OF_WORLD,
refresh_token=self._store['refresh_token'],
access_token=self._store['access_token'],
expires_at=expires_at)
else:
log.debug("# Reuse _auth instance")

# set session_id and gcid in _auth to store values
if self._store['session_id'] is not None:
self._auth.session_id = self._store['session_id']
if self._store['gcid'] is not None:
self._auth.gcid = self._store['gcid']

# instantiate client configuration object is not existent yet
if self._clconf is None:
log.debug("# Create _clconf instance")
self._clconf = MyBMWClientConfiguration(self._auth)
else:
log.debug("# Reuse _clconf instance")

# instantiate account object of not existent yet
if self._account is None:
log.debug("# Create _account instance")
# user, password and region already set in BMWAuthentication/ClientConfiguration!
self._account = MyBMWAccount(None, None, None, config=self._clconf, hcaptcha_token=captcha_token)
self._account.set_refresh_token(refresh_token=self._store['refresh_token'],
gcid=self._store['gcid'],
access_token=self._store['access_token'],
session_id=self._store['session_id'])
# no token, authenticate via user_id, password and captcha_token
log.info("authenticate via userid, password, captcha token")
if self._auth is None:
log.debug("# Create _auth instance")
self._auth = MyBMWAuthentication(user_id,
password,
Regions.REST_OF_WORLD,
hcaptcha_token=captcha_token)
else:
log.debug("# Reuse _auth instance")

# set session_id and gcid in _auth to store values
if self._store['session_id'] is not None:
self._auth.session_id = self._store['session_id']
if self._store['gcid'] is not None:
self._auth.gcid = self._store['gcid']

# instantiate client configuration object is not existent yet
if self._clconf is None:
log.debug("# Create _clconf instance")
self._clconf = MyBMWClientConfiguration(self._auth)
else:
log.debug("# Reuse _clconf instance")

# instantiate account object of not existent yet
if self._account is None:
log.debug("# Create _account instance")
# user, password and region already set in BMWAuthentication/ClientConfiguration!
self._account = MyBMWAccount(None, None, None, config=self._clconf, hcaptcha_token=captcha_token)
self._account.set_refresh_token(refresh_token=self._store['refresh_token'],
gcid=self._store['gcid'],
access_token=self._store['access_token'],
session_id=self._store['session_id'])
else:
log.debug("# Reuse _account instance")

# experimental: login when expires_at is reached to force token refresh
if self._store['expires_at'] is not None:
expires_at = datetime.fromisoformat(self._store['expires_at'])
nowdt = datetime.now(expires_at.tzinfo)

if nowdt > expires_at:
log.debug("# Proactive login to force refresh token before get_vehicles")
log.debug("# before proactive login:" + str(self._auth.expires_at) +
"/" + self._auth.refresh_token)
await self._auth.login()
log.debug("# after proactive login:" + str(self._auth.expires_at) +
"/" + self._auth.refresh_token)

# get vehicle list - needs to be called async
await self._account.get_vehicles()
else:
log.debug("# Reuse _account instance")

# experimental: login when expires_at is reached to force token refresh
if self._store['expires_at'] is not None:
expires_at = datetime.fromisoformat(self._store['expires_at'])
nowdt = datetime.now(expires_at.tzinfo)

if nowdt > expires_at:
log.debug("# Proactive login to force refresh token before get_vehicles")
log.debug("# before proactive login:" + str(self._auth.expires_at) +
"/" + self._auth.refresh_token)
await self._auth.login()
log.debug("# after proactive login:" + str(self._auth.expires_at) +
"/" + self._auth.refresh_token)

# get vehicle list - needs to be called async
await self._account.get_vehicles()
self._mode = "SECONDARY"

# get vehicle data for vin
vehicle = self._account.get_vehicle(vin)
Expand All @@ -288,7 +267,7 @@ async def _fetch_soc(self,
# save the vehicle data for further analysis if required
dump_json(respd, '/soc_bmwbc_reply_vehicle_' + str(vnum))

log.info("PRIMARY SOC/Range: " + str(soc) + '%/' + str(range) + 'KM@' + lastUpdatedAt)
log.info(self._mode + " SOC/Range: " + str(soc) + '%/' + str(range) + 'KM@' + lastUpdatedAt)

# store token and expires_at if changed
expires_at = datetime.isoformat(self._auth.expires_at)
Expand Down Expand Up @@ -321,23 +300,12 @@ async def _fetch_soc(self,

# main entry - _fetch needs to be run async
def fetch_soc(user_id: str, password: str, vin: str, captcha_token: str, vnum: int) -> CarState:
global api
_lock = Lock()

# prepare and call async method
loop = new_event_loop()
set_event_loop(loop)

# instantiate only 1 instance of Api
if api is None:
with _lock:
if api is None:
api = Api()
log.debug("# instantiate Api as api")
else:
log.debug("# reuse instance Api as api (L)")
else:
log.debug("# reuse instance Api as api")
api = Api()

# get soc, range, soc_timestamp from server
soc, range, soc_tsX = loop.run_until_complete(api._fetch_soc(user_id, password, vin, captcha_token, vnum))
Expand Down

0 comments on commit 632c6a2

Please sign in to comment.