feat: upodate uow
This commit is contained in:
@@ -17,25 +17,40 @@ class DeleteAvatarCommand:
|
||||
|
||||
@transactional
|
||||
async def _load_user(self, user_id: str) -> UserEntity:
|
||||
return await self._unit_of_work.user_repository.get_user_by_id(user_id)
|
||||
user = await self._unit_of_work.user_repository.get_user_by_id(user_id)
|
||||
self._logger.debug(f'DeleteAvatar _load_user user_id={user_id} has_avatar_link={bool(user.avatar_link)}')
|
||||
return user
|
||||
|
||||
async def __call__(self, user_id: str) -> UserEntity:
|
||||
prior = await self._load_user(user_id)
|
||||
link = prior.avatar_link
|
||||
self._logger.info(f'DeleteAvatar start user_id={user_id} had_link={bool(link)}')
|
||||
if link:
|
||||
key = self._s3.object_key_from_public_url(link)
|
||||
self._logger.debug(f'DeleteAvatar parsed_object_key user_id={user_id} has_key={bool(key)}')
|
||||
if not key:
|
||||
self._logger.warning(
|
||||
f'DeleteAvatar could not parse avatar URL for S3 user_id={user_id} link_len={len(link)}'
|
||||
)
|
||||
if key:
|
||||
self._logger.info(f'DeleteAvatar S3 delete start user_id={user_id} key={key}')
|
||||
try:
|
||||
await self._s3.delete_object(key=key)
|
||||
self._logger.info(f'DeleteAvatar S3 delete done user_id={user_id} key={key}')
|
||||
except ClientError as exc:
|
||||
code = exc.response.get('Error', {}).get('Code', '')
|
||||
if code not in ('NoSuchKey', '404'):
|
||||
self._logger.warning(f'S3 delete avatar failed user_id={user_id} code={code}: {exc}')
|
||||
self._logger.warning(f'DeleteAvatar S3 delete failed user_id={user_id} code={code}: {exc}')
|
||||
else:
|
||||
self._logger.debug(f'DeleteAvatar S3 object already absent user_id={user_id} code={code}')
|
||||
user = await self._clear_avatar_link(user_id)
|
||||
self._logger.debug(f'DeleteAvatar DB cleared user_id={user_id} entity_has_link={bool(user.avatar_link)}')
|
||||
await self._cache.set_user(user_id, user)
|
||||
self._logger.debug(f'DeleteAvatar cache updated user_id={user_id}')
|
||||
self._logger.info(f'Avatar removed user_id={user_id}')
|
||||
return user
|
||||
|
||||
@transactional
|
||||
async def _clear_avatar_link(self, user_id: str) -> UserEntity:
|
||||
self._logger.debug(f'DeleteAvatar DB transaction set_avatar_link user_id={user_id} link=None')
|
||||
return await self._unit_of_work.user_repository.set_avatar_link(user_id, None)
|
||||
|
||||
@@ -25,11 +25,16 @@ class SetAvatarCommand:
|
||||
|
||||
@transactional
|
||||
async def _load_user(self, user_id: str) -> UserEntity:
|
||||
return await self._unit_of_work.user_repository.get_user_by_id(user_id)
|
||||
user = await self._unit_of_work.user_repository.get_user_by_id(user_id)
|
||||
self._logger.debug(f'Avatar _load_user user_id={user_id} has_avatar_link={bool(user.avatar_link)}')
|
||||
return user
|
||||
|
||||
async def __call__(self, user_id: str, image_bytes: bytes) -> tuple[UserEntity, int]:
|
||||
prior = await self._load_user(user_id)
|
||||
old_link = prior.avatar_link
|
||||
self._logger.info(
|
||||
f'SetAvatar start user_id={user_id} input_bytes={len(image_bytes)} had_previous_link={bool(old_link)}'
|
||||
)
|
||||
try:
|
||||
webp_bytes = image_bytes_to_webp(image_bytes)
|
||||
except UnidentifiedImageError as exc:
|
||||
@@ -38,6 +43,8 @@ class SetAvatarCommand:
|
||||
self._logger.exception(str(exc))
|
||||
raise BadRequestException(message='Could not process image') from exc
|
||||
|
||||
self._logger.debug(f'SetAvatar webp_ready bytes={len(webp_bytes)}')
|
||||
|
||||
pid = user_id.replace('/', '').replace('.', '_')
|
||||
name_id = str(ULID())
|
||||
ts = int(datetime.now(timezone.utc).timestamp() * 1000)
|
||||
@@ -45,28 +52,49 @@ class SetAvatarCommand:
|
||||
fname = f'{name_id}_{pid}_{ts}.webp'
|
||||
object_key = f'{prefix}/{fname}' if prefix else fname
|
||||
|
||||
self._logger.info(f'SetAvatar S3 upload start user_id={user_id} key={object_key} webp_bytes={len(webp_bytes)}')
|
||||
|
||||
try:
|
||||
url = await self._s3.upload_bytes(key=object_key, body=webp_bytes, content_type='image/webp')
|
||||
except ClientError as exc:
|
||||
self._logger.exception(str(exc))
|
||||
raise ServiceUnavailableException(message='S3 upload failed') from exc
|
||||
|
||||
self._logger.info(f'SetAvatar S3 upload done user_id={user_id} key={object_key} public_url_len={len(url)}')
|
||||
|
||||
user = await self._save_avatar_link(user_id, url)
|
||||
self._logger.info(
|
||||
f'SetAvatar DB updated user_id={user_id} key={object_key} '
|
||||
f'entity_avatar_link_len={len(user.avatar_link or "")}'
|
||||
)
|
||||
await self._cache.set_user(user_id, user)
|
||||
self._logger.debug(f'SetAvatar cache updated user_id={user_id}')
|
||||
|
||||
if old_link:
|
||||
old_key = self._s3.object_key_from_public_url(old_link)
|
||||
if old_key and old_key != object_key:
|
||||
if not old_key:
|
||||
self._logger.warning(
|
||||
f'SetAvatar could not parse old avatar URL for S3 delete user_id={user_id} '
|
||||
f'old_link_len={len(old_link)}'
|
||||
)
|
||||
elif old_key == object_key:
|
||||
self._logger.debug(f'SetAvatar skip delete same object key user_id={user_id} key={object_key}')
|
||||
else:
|
||||
self._logger.info(f'SetAvatar S3 delete old object user_id={user_id} old_key={old_key}')
|
||||
try:
|
||||
await self._s3.delete_object(key=old_key)
|
||||
self._logger.info(f'SetAvatar S3 old object removed user_id={user_id} old_key={old_key}')
|
||||
except ClientError as exc:
|
||||
code = exc.response.get('Error', {}).get('Code', '')
|
||||
if code not in ('NoSuchKey', '404'):
|
||||
self._logger.warning(f'S3 delete old avatar failed user_id={user_id} code={code}: {exc}')
|
||||
else:
|
||||
self._logger.debug(f'SetAvatar old object already gone user_id={user_id} code={code}')
|
||||
|
||||
self._logger.info(f'Avatar set for user_id={user_id} key={object_key}')
|
||||
return user, len(webp_bytes)
|
||||
|
||||
@transactional
|
||||
async def _save_avatar_link(self, user_id: str, avatar_link: str) -> UserEntity:
|
||||
self._logger.debug(f'SetAvatar DB transaction set_avatar_link user_id={user_id} link_len={len(avatar_link)}')
|
||||
return await self._unit_of_work.user_repository.set_avatar_link(user_id, avatar_link)
|
||||
|
||||
@@ -15,6 +15,7 @@ class UnitOfWork(IUnitOfWork):
|
||||
self._logger: ILogger = logger
|
||||
|
||||
async def __aenter__(self):
|
||||
self._logger.debug('UnitOfWork enter')
|
||||
self._user_repository = None
|
||||
self._session_repository = None
|
||||
self._session = self.session_factory()
|
||||
@@ -22,14 +23,15 @@ class UnitOfWork(IUnitOfWork):
|
||||
|
||||
async def __aexit__(self, exc_type, exc_val, exc_tb):
|
||||
if exc_type:
|
||||
self._logger.error(str(exc_val))
|
||||
self._logger.error(f'UnitOfWork rollback_on_error exc_type={exc_type.__name__} exc_val={exc_val!r}')
|
||||
await self._session.rollback()
|
||||
self._logger.error(f'Rollback: str{exc_val})')
|
||||
self._logger.debug(f'UnitOfWork session rollback done exc_type={exc_type.__name__}')
|
||||
else:
|
||||
await self._session.flush()
|
||||
await self._session.commit()
|
||||
self._logger.debug('Commit')
|
||||
self._logger.debug('UnitOfWork commit')
|
||||
await self._session.close()
|
||||
self._logger.debug('UnitOfWork exit session closed')
|
||||
|
||||
@property
|
||||
def user_repository(self) -> IUserRepository:
|
||||
|
||||
Reference in New Issue
Block a user