- [nexus] Development

GitOrigin-RevId: d17172248b2b9b056335f96d7fcfa155c281f10b
This commit is contained in:
the-superpirate 2022-09-13 18:50:31 +03:00
parent 971cdf6245
commit 030743860e
4 changed files with 7 additions and 8 deletions

View File

@ -92,7 +92,6 @@ class AioPostgresPoolHolder(AioThing):
row_factory=tuple_row,
cursor_name: Optional[str] = None,
itersize: Optional[int] = None,
statement_timeout: Optional[int] = None,
):
if not self.pool:
raise RuntimeError('AioPostgresPoolHolder has not been started')
@ -100,9 +99,7 @@ class AioPostgresPoolHolder(AioThing):
async with conn.cursor(name=cursor_name, row_factory=row_factory) as cur:
if itersize is not None:
cur.itersize = itersize
await cur.execute(stmt + ';' if statement_timeout else '', values)
if statement_timeout:
await cur.execute(f'SET statement_timeout = {statement_timeout};')
await cur.execute(stmt, values)
async for row in cur:
yield row

View File

@ -72,10 +72,10 @@ class PostgresJob(BaseJob):
async def iterator(self) -> AsyncIterable[Any]:
session_id = generate_request_id()
await self.init_tables(session_id)
# await self.init_tables(session_id)
if self.batch_size:
loaded = True
current = 0
current = 168_000_000
while loaded:
loaded = False
sql = self.sql.format(left=current, right=current + self.batch_size)
@ -85,7 +85,6 @@ class PostgresJob(BaseJob):
# Mandatory for server side cursor
cursor_name='nexus_ingest_cursor',
itersize=50_000,
statement_timeout=3600 * 2,
):
loaded = True
yield row

View File

@ -55,6 +55,7 @@ h11==0.13.0
idna==3.3
iniconfig==1.1.1
isort==5.10.1
izihawa-configurator==1.0.0
izihawa-nlptools==1.1.9
izihawa-types==0.1.3
izihawa-utils==1.0.7

View File

@ -31,8 +31,10 @@ fire
flake8
grpcio
isort
izihawa-utils
izihawa-configurator
izihawa-nlptools
izihawa-types
izihawa-utils
jinja2
lemminflect
lightgbm