Open source TOTP (#1677)

This commit is contained in:
Shuchang Zheng
2025-01-30 14:06:22 +08:00
committed by GitHub
parent 04e2a2b063
commit d522c579c6
4 changed files with 93 additions and 0 deletions

View File

@@ -19,6 +19,7 @@ from skyvern.forge.sdk.core.skyvern_context import SkyvernContext
from skyvern.forge.sdk.db.exceptions import NotFoundError
from skyvern.forge.sdk.routes.agent_protocol import base_router, v2_router
from skyvern.forge.sdk.routes.streaming import websocket_router
from skyvern.forge.sdk.routes.totp import totp_router
LOG = structlog.get_logger()
@@ -49,6 +50,7 @@ def get_agent_app() -> FastAPI:
app.include_router(base_router, prefix="/api/v1")
app.include_router(v2_router, prefix="/api/v2")
app.include_router(websocket_router, prefix="/api/v1/stream")
app.include_router(totp_router, prefix="/api/v1/totp")
app.add_middleware(
RawContextMiddleware,

View File

@@ -0,0 +1,19 @@
You receive either an email or a text message containing 2FA/MFA code or activation key. Your job is to parse the content, identify the code and return the code. There should be only one code in the content. The code must be from the content
The most common form of code will be a series of digits, although sometimes it may contain letters.
"Here is your code: 123456" or "Your code is: 123456" or "123456 is your code", "Here is your activation key: abcdefg", "Here is your key: 123asd" are some examples of the possible content.
MAKE SURE YOU OUTPUT VALID JSON. No text before or after JSON, no trailing commas, no comments (//), no unnecessary quotes, etc.
Reply in the following JSON format:
{
"reasoning": str, // How you figure out what the code is or why the code is missing. Be precise here to explain the data source and the context that makes you believe where the correct code is
"code_found": bool, // true if the code is found. false if the code is not found
"code": str, // the 2FA/MFA verification code. If you cannot identifiy any code, do not come up with a code and return null
}
Received Content containing 2FA/MFA code:
```
{{ content }}
```

View File

@@ -1924,6 +1924,33 @@ class AgentDB:
totp_code = (await session.scalars(query)).all()
return [TOTPCode.model_validate(totp_code) for totp_code in totp_code]
async def create_totp_code(
self,
organization_id: str,
totp_identifier: str,
content: str,
code: str,
task_id: str | None = None,
workflow_id: str | None = None,
source: str | None = None,
expired_at: datetime | None = None,
) -> TOTPCode:
async with self.Session() as session:
new_totp_code = TOTPCodeModel(
organization_id=organization_id,
totp_identifier=totp_identifier,
content=content,
code=code,
task_id=task_id,
workflow_id=workflow_id,
source=source,
expired_at=expired_at,
)
session.add(new_totp_code)
await session.commit()
await session.refresh(new_totp_code)
return TOTPCode.model_validate(new_totp_code)
async def create_action(self, action: Action) -> Action:
async with self.Session() as session:
new_action = ActionModel(

View File

@@ -0,0 +1,45 @@
import structlog
from fastapi import APIRouter, Depends, HTTPException
from skyvern.forge import app
from skyvern.forge.prompts import prompt_engine
from skyvern.forge.sdk.schemas.organizations import Organization
from skyvern.forge.sdk.schemas.totp_codes import TOTPCode, TOTPCodeCreate
from skyvern.forge.sdk.services import org_auth_service
LOG = structlog.get_logger()
totp_router = APIRouter()
@totp_router.post("")
@totp_router.post("/", include_in_schema=False)
async def save_totp_code(
data: TOTPCodeCreate, curr_org: Organization = Depends(org_auth_service.get_current_org)
) -> TOTPCode:
LOG.info(
"Saving TOTP code",
data=data,
organization_id=curr_org.organization_id,
totp_identifier=data.totp_identifier,
task_id=data.task_id,
workflow_id=data.workflow_id,
)
code = await parse_totp_code(data.content)
if not code:
raise HTTPException(status_code=400, detail="Failed to parse totp code")
return await app.DATABASE.create_totp_code(
organization_id=curr_org.organization_id,
totp_identifier=data.totp_identifier,
content=data.content,
code=code,
task_id=data.task_id,
workflow_id=data.workflow_id,
source=data.source,
expired_at=data.expired_at,
)
async def parse_totp_code(content: str) -> str | None:
prompt = prompt_engine.load_prompt("parse-verification-code", content=content)
code_resp = await app.SECONDARY_LLM_API_HANDLER(prompt=prompt)
return code_resp.get("code", None)