feat: add vault
This commit is contained in:
392
src/main.py
Normal file
392
src/main.py
Normal file
@@ -0,0 +1,392 @@
|
||||
import asyncio
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import ssl
|
||||
import gzip
|
||||
import math
|
||||
from datetime import datetime, timezone
|
||||
from urllib.request import Request, urlopen
|
||||
from dotenv import load_dotenv
|
||||
import redis.asyncio as redis
|
||||
from patchright.async_api import async_playwright
|
||||
|
||||
|
||||
load_dotenv()
|
||||
|
||||
|
||||
INTERVAL_SEC = int(os.getenv('INTERVAL_SEC', '20'))
|
||||
DIGITS_AFTER_DECIMAL = int(os.getenv('DIGITS_AFTER_DECIMAL', '5'))
|
||||
USDT_URL = os.getenv('USDT_URL', 'https://tradex.by/trading/ru/usdt/rub')
|
||||
USDC_URL = os.getenv('USDC_URL', 'https://tradex.by/trading/ru/usdc/rub')
|
||||
PRICE_SELECTOR = os.getenv('PRICE_SELECTOR', 'div.ng-star-inserted:has(span) > span')
|
||||
PRICE_INDEX = int(os.getenv('PRICE_INDEX', '2'))
|
||||
USDT_ADD_RUB = float(os.getenv('USDT_ADD_RUB', '2.5'))
|
||||
USDC_ADD_RUB = float(os.getenv('USDC_ADD_RUB', '2.0'))
|
||||
FREEZEX_TICKER_URL = os.getenv('FREEZEX_TICKER_URL', 'https://cryptottlivewebapi.free2ex.net:8443/api/v2/public/ticker')
|
||||
FREEZEX_SYMBOL = os.getenv('FREEZEX_SYMBOL', 'USDTRUB')
|
||||
FREEZEX_ADD_RUB = float(os.getenv('FREEZEX_ADD_RUB', '2.0'))
|
||||
REDIS_KEY_LAST = os.getenv('REDIS_KEY_LAST', 'tradex:rub:last')
|
||||
LOG_LEVEL = os.getenv('LOG_LEVEL', 'INFO').upper()
|
||||
USE_CHROME_CHANNEL = os.getenv('USE_CHROME_CHANNEL', '0') == '1'
|
||||
HTTP_USER_AGENT = os.getenv('HTTP_USER_AGENT', 'curl/8.0.0')
|
||||
HTTP_TIMEOUT_SEC = float(os.getenv('HTTP_TIMEOUT_SEC', '30'))
|
||||
|
||||
VAULT_ADDR = os.getenv('VAULT_ADDR', '').rstrip('/')
|
||||
VAULT_MOUNT_POINT = os.getenv('VAULT_MOUNT_POINT', '')
|
||||
VAULT_ROLE_ID = os.getenv('VAULT_ROLE_ID', '')
|
||||
VAULT_SECRET_ID = os.getenv('VAULT_SECRET_ID', '')
|
||||
VAULT_SECRET_PATH = os.getenv('VAULT_SECRET_PATH', 'keydb')
|
||||
|
||||
KEYDB_KEY_RATE = os.getenv('KEYDB_KEY_RATE', 'tradex:rub:rate')
|
||||
|
||||
logger = logging.getLogger('tradexparser')
|
||||
|
||||
|
||||
def setup_logging() -> None:
|
||||
logging.basicConfig(
|
||||
level=LOG_LEVEL,
|
||||
format='%(asctime)s | %(levelname)s | %(name)s | %(message)s',
|
||||
)
|
||||
logging.getLogger('websockets').setLevel(logging.WARNING)
|
||||
logging.getLogger('asyncio').setLevel(logging.WARNING)
|
||||
|
||||
|
||||
def ceil_to_2(value: float) -> float:
|
||||
return math.ceil(value * 100.0) / 100.0
|
||||
|
||||
|
||||
def to_float_str(text: str | None) -> str | None:
|
||||
if not text:
|
||||
return None
|
||||
t = text.split('\n')[0].strip().replace(',', '.')
|
||||
try:
|
||||
return f'{float(t):.{DIGITS_AFTER_DECIMAL}f}'
|
||||
except ValueError:
|
||||
return None
|
||||
|
||||
|
||||
def fmt_rate(value: float) -> str:
|
||||
return f'{value:.{DIGITS_AFTER_DECIMAL}f}'
|
||||
|
||||
|
||||
def fmt_ceil_2(rate_str: str | None) -> str | None:
|
||||
if rate_str is None:
|
||||
return None
|
||||
try:
|
||||
v = float(rate_str)
|
||||
except ValueError:
|
||||
return None
|
||||
return f'{ceil_to_2(v):.2f}'
|
||||
|
||||
|
||||
def add_to_rate(rate_str: str | None, add_rub: float) -> str | None:
|
||||
if rate_str is None:
|
||||
return None
|
||||
try:
|
||||
return fmt_rate(float(rate_str) + add_rub)
|
||||
except ValueError:
|
||||
return rate_str
|
||||
|
||||
|
||||
def avg_rates(a: str | None, b: str | None) -> str | None:
|
||||
if a is None and b is None:
|
||||
return None
|
||||
if a is None:
|
||||
return b
|
||||
if b is None:
|
||||
return a
|
||||
try:
|
||||
return fmt_rate((float(a) + float(b)) / 2.0)
|
||||
except ValueError:
|
||||
return a
|
||||
|
||||
|
||||
def _http_get_json(url: str) -> object:
|
||||
req = Request(
|
||||
url,
|
||||
headers={
|
||||
'accept': 'application/json',
|
||||
'user-agent': HTTP_USER_AGENT,
|
||||
'accept-encoding': 'gzip',
|
||||
},
|
||||
)
|
||||
ctx = ssl.create_default_context()
|
||||
with urlopen(req, timeout=HTTP_TIMEOUT_SEC, context=ctx) as resp:
|
||||
raw = resp.read()
|
||||
if str(resp.headers.get('Content-Encoding', '')).lower() == 'gzip':
|
||||
raw = gzip.decompress(raw)
|
||||
return json.loads(raw.decode('utf-8'))
|
||||
|
||||
|
||||
def _pick_mid_price(row: dict) -> float | None:
|
||||
def f(key: str) -> float | None:
|
||||
v = row.get(key)
|
||||
try:
|
||||
fv = float(v)
|
||||
except (TypeError, ValueError):
|
||||
return None
|
||||
return fv if fv != 0.0 else None
|
||||
|
||||
bid = f('BestBid')
|
||||
ask = f('BestAsk')
|
||||
if bid is not None and ask is not None:
|
||||
return (bid + ask) / 2.0
|
||||
|
||||
lb = f('LastBuyPrice')
|
||||
ls = f('LastSellPrice')
|
||||
if lb is not None and ls is not None:
|
||||
return (lb + ls) / 2.0
|
||||
|
||||
lp = f('LastPrice')
|
||||
if lp is not None:
|
||||
return lp
|
||||
|
||||
return ask if ask is not None else bid
|
||||
|
||||
|
||||
def _http_post_json(url: str, payload: dict) -> object:
|
||||
body = json.dumps(payload).encode('utf-8')
|
||||
req = Request(
|
||||
url,
|
||||
data=body,
|
||||
headers={
|
||||
'content-type': 'application/json',
|
||||
'accept': 'application/json',
|
||||
'user-agent': HTTP_USER_AGENT,
|
||||
},
|
||||
method='POST',
|
||||
)
|
||||
ctx = ssl.create_default_context()
|
||||
with urlopen(req, timeout=HTTP_TIMEOUT_SEC, context=ctx) as resp:
|
||||
raw = resp.read()
|
||||
if str(resp.headers.get('Content-Encoding', '')).lower() == 'gzip':
|
||||
raw = gzip.decompress(raw)
|
||||
return json.loads(raw.decode('utf-8'))
|
||||
|
||||
|
||||
def _vault_get_json(url: str, token: str) -> object:
|
||||
req = Request(
|
||||
url,
|
||||
headers={
|
||||
'accept': 'application/json',
|
||||
'user-agent': HTTP_USER_AGENT,
|
||||
'x-vault-token': token,
|
||||
},
|
||||
)
|
||||
ctx = ssl.create_default_context()
|
||||
with urlopen(req, timeout=HTTP_TIMEOUT_SEC, context=ctx) as resp:
|
||||
raw = resp.read()
|
||||
return json.loads(raw.decode('utf-8'))
|
||||
|
||||
|
||||
async def load_keydb_url_from_vault() -> str:
|
||||
if not VAULT_ADDR or not VAULT_MOUNT_POINT or not VAULT_ROLE_ID or not VAULT_SECRET_ID:
|
||||
raise RuntimeError('Vault env is not fully configured (VAULT_ADDR, VAULT_MOUNT_POINT, VAULT_ROLE_ID, VAULT_SECRET_ID)')
|
||||
|
||||
login_url = f'{VAULT_ADDR}/v1/auth/approle/login'
|
||||
login_payload = {'role_id': VAULT_ROLE_ID, 'secret_id': VAULT_SECRET_ID}
|
||||
login_resp = await asyncio.to_thread(_http_post_json, login_url, login_payload)
|
||||
if not isinstance(login_resp, dict):
|
||||
raise RuntimeError('Vault login failed: unexpected response')
|
||||
auth = login_resp.get('auth')
|
||||
if not isinstance(auth, dict):
|
||||
raise RuntimeError('Vault login failed: missing auth field')
|
||||
token = auth.get('client_token')
|
||||
if not isinstance(token, str) or not token:
|
||||
raise RuntimeError('Vault login failed: missing client_token')
|
||||
|
||||
secret_url = f'{VAULT_ADDR}/v1/{VAULT_MOUNT_POINT}/data/{VAULT_SECRET_PATH}'
|
||||
secret_resp = await asyncio.to_thread(_vault_get_json, secret_url, token)
|
||||
if not isinstance(secret_resp, dict):
|
||||
raise RuntimeError('Vault read failed: unexpected response')
|
||||
data = secret_resp.get('data')
|
||||
if not isinstance(data, dict):
|
||||
raise RuntimeError('Vault read failed: missing data field')
|
||||
data2 = data.get('data')
|
||||
if not isinstance(data2, dict):
|
||||
raise RuntimeError('Vault read failed: missing data.data field')
|
||||
|
||||
host = str(data2.get('host', '')).strip()
|
||||
port = str(data2.get('port', '')).strip()
|
||||
password = str(data2.get('password', '')).strip()
|
||||
database = str(data2.get('database', '')).strip()
|
||||
|
||||
if not host or not port or not password or database == '':
|
||||
raise RuntimeError('Vault secret is missing required keydb fields: host,port,password,database')
|
||||
|
||||
return f'redis://:{password}@{host}:{port}/{database}'
|
||||
|
||||
|
||||
async def read_freezex_usdt_rub() -> str | None:
|
||||
try:
|
||||
data = await asyncio.to_thread(_http_get_json, FREEZEX_TICKER_URL)
|
||||
if not isinstance(data, list):
|
||||
logger.warning('Freezex ticker unexpected payload type=%s', type(data).__name__)
|
||||
return None
|
||||
symbol_u = FREEZEX_SYMBOL.upper()
|
||||
for row in data:
|
||||
if not isinstance(row, dict):
|
||||
continue
|
||||
if str(row.get('Symbol', '')).upper() != symbol_u:
|
||||
continue
|
||||
price = _pick_mid_price(row)
|
||||
return fmt_rate(price) if price is not None else None
|
||||
logger.warning('Freezex ticker: symbol not found: %s', FREEZEX_SYMBOL)
|
||||
return None
|
||||
except Exception as e:
|
||||
logger.warning('Freezex ticker fetch failed: %s', e)
|
||||
return None
|
||||
|
||||
|
||||
async def read_price(page) -> str | None:
|
||||
try:
|
||||
locator = page.locator(PRICE_SELECTOR).nth(PRICE_INDEX)
|
||||
raw = await locator.inner_text(timeout=5000)
|
||||
return to_float_str(raw)
|
||||
except Exception as e:
|
||||
logger.warning('Failed to read price: %s', e)
|
||||
return None
|
||||
|
||||
|
||||
async def write_to_redis(
|
||||
r: redis.Redis,
|
||||
ts_iso: str,
|
||||
usdt: str | None,
|
||||
usdc: str | None,
|
||||
extra: dict[str, str] | None = None,
|
||||
) -> None:
|
||||
data = {'ts': ts_iso}
|
||||
if usdt is not None:
|
||||
data['usdt_rub'] = usdt
|
||||
if usdc is not None:
|
||||
data['usdc_rub'] = usdc
|
||||
if extra:
|
||||
data.update(extra)
|
||||
await r.hset(REDIS_KEY_LAST, mapping=data)
|
||||
|
||||
|
||||
async def write_rate_value(r: redis.Redis, ts_iso: str, rate_str: str | None) -> None:
|
||||
if rate_str is None:
|
||||
return
|
||||
value2 = fmt_ceil_2(rate_str)
|
||||
if value2 is None:
|
||||
return
|
||||
await r.hset(KEYDB_KEY_RATE, mapping={'ts': ts_iso, 'value': value2})
|
||||
|
||||
|
||||
async def run_loop() -> None:
|
||||
setup_logging()
|
||||
logger.info(
|
||||
'Starting worker interval=%ss key=%s selector=%s index=%s freezex=%s symbol=%s vault=%s mount=%s secret=%s',
|
||||
INTERVAL_SEC, REDIS_KEY_LAST, PRICE_SELECTOR, PRICE_INDEX, FREEZEX_TICKER_URL, FREEZEX_SYMBOL, VAULT_ADDR, VAULT_MOUNT_POINT, VAULT_SECRET_PATH
|
||||
)
|
||||
|
||||
try:
|
||||
keydb_url = await load_keydb_url_from_vault()
|
||||
logger.info('KeyDB url loaded from vault')
|
||||
except Exception:
|
||||
logger.exception('Failed to load KeyDB config from vault')
|
||||
return
|
||||
|
||||
r = redis.from_url(keydb_url, decode_responses=True)
|
||||
try:
|
||||
await r.ping()
|
||||
logger.info('KeyDB connected')
|
||||
except Exception:
|
||||
logger.exception('KeyDB connection failed')
|
||||
return
|
||||
|
||||
async with async_playwright() as p:
|
||||
launch_kwargs = {'headless': True}
|
||||
if USE_CHROME_CHANNEL:
|
||||
launch_kwargs['channel'] = 'chrome'
|
||||
|
||||
browser = await p.chromium.launch(**launch_kwargs)
|
||||
context = await browser.new_context()
|
||||
|
||||
page_usdt = await context.new_page()
|
||||
page_usdc = await context.new_page()
|
||||
|
||||
logger.info('Opening pages...')
|
||||
await page_usdt.goto(USDT_URL, wait_until='domcontentloaded')
|
||||
await page_usdc.goto(USDC_URL, wait_until='domcontentloaded')
|
||||
await asyncio.sleep(2)
|
||||
logger.info('Pages opened')
|
||||
|
||||
try:
|
||||
while True:
|
||||
ts_iso = datetime.now(timezone.utc).isoformat()
|
||||
|
||||
usdt_tradex_raw = await read_price(page_usdt)
|
||||
usdt_tradex = add_to_rate(usdt_tradex_raw, USDT_ADD_RUB)
|
||||
|
||||
usdt_freezex_raw = await read_freezex_usdt_rub()
|
||||
usdt_freezex = add_to_rate(usdt_freezex_raw, FREEZEX_ADD_RUB)
|
||||
|
||||
usdt_avg = avg_rates(usdt_tradex, usdt_freezex)
|
||||
|
||||
usdc_tradex_raw = await read_price(page_usdc)
|
||||
usdc_tradex = add_to_rate(usdc_tradex_raw, USDC_ADD_RUB)
|
||||
|
||||
status = (
|
||||
'ok'
|
||||
if usdt_avg and usdc_tradex
|
||||
else 'partial'
|
||||
if usdt_avg or usdc_tradex
|
||||
else 'failed'
|
||||
)
|
||||
logger.info(
|
||||
'tick status=%s USDT(avg)=%s USDT(tradex raw/adj)=%s/%s USDT(freezex raw/adj)=%s/%s USDC(raw/adj)=%s/%s',
|
||||
status,
|
||||
usdt_avg,
|
||||
usdt_tradex_raw,
|
||||
usdt_tradex,
|
||||
usdt_freezex_raw,
|
||||
usdt_freezex,
|
||||
usdc_tradex_raw,
|
||||
usdc_tradex,
|
||||
)
|
||||
|
||||
try:
|
||||
extra: dict[str, str] = {}
|
||||
if usdt_tradex_raw is not None:
|
||||
extra['usdt_rub_tradex_raw'] = usdt_tradex_raw
|
||||
if usdt_tradex is not None:
|
||||
extra['usdt_rub_tradex'] = usdt_tradex
|
||||
if usdt_freezex_raw is not None:
|
||||
extra['usdt_rub_freezex_raw'] = usdt_freezex_raw
|
||||
if usdt_freezex is not None:
|
||||
extra['usdt_rub_freezex'] = usdt_freezex
|
||||
if usdt_avg is not None:
|
||||
extra['usdt_rub_avg'] = usdt_avg
|
||||
|
||||
if usdc_tradex_raw is not None:
|
||||
extra['usdc_rub_tradex_raw'] = usdc_tradex_raw
|
||||
if usdc_tradex is not None:
|
||||
extra['usdc_rub_tradex'] = usdc_tradex
|
||||
|
||||
usdt_avg_ceil2 = fmt_ceil_2(usdt_avg)
|
||||
usdc_tradex_ceil2 = fmt_ceil_2(usdc_tradex)
|
||||
await write_to_redis(r, ts_iso, usdt_avg_ceil2, usdc_tradex_ceil2, extra=extra)
|
||||
await write_rate_value(r, ts_iso, usdt_avg)
|
||||
logger.info('Written to keydb')
|
||||
except Exception:
|
||||
logger.exception('KeyDB write failed')
|
||||
|
||||
await asyncio.sleep(INTERVAL_SEC)
|
||||
|
||||
except KeyboardInterrupt:
|
||||
logger.info('Stopped by user')
|
||||
finally:
|
||||
await browser.close()
|
||||
await r.close()
|
||||
logger.info('Shutdown complete')
|
||||
|
||||
|
||||
def main() -> None:
|
||||
asyncio.run(run_loop())
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
Reference in New Issue
Block a user