Queries

Queries are used to retrieve data from a database.

A query is a request for information from a database table or combination of tables. A query can be used to retrieve data from a single table or multiple tables. A query can also be used to insert, update, or delete data from a table.

Session

To execute a query you must first create a rx.session. You can use the session to query the database using SQLModel or SQLAlchemy syntax.

The rx.session statement will automatically close the session when the code block is finished. If session.commit() is not called, the changes will be rolled back and not persisted to the database. The code can also explicitly rollback without closing the session via session.rollback().

The following example shows how to create a session and query the database. First we create a table called User.

class User(rx.Model, table=True):
    username: str
    email: str

Select

Then we create a session and query the User table.

class QueryUser(rx.State):
    name: str
    users: list[User]

    @rx.event
    def get_users(self):
        with rx.session() as session:
            self.users = session.exec(
                User.select().where(
                    User.username.contains(self.name)
                )
            ).all()

The get_users method will query the database for all users that contain the value of the state var name.

Insert

Similarly, the session.add() method to add a new record to the database or persist an existing object.

class AddUser(rx.State):
    username: str
    email: str

    @rx.event
    def add_user(self):
        with rx.session() as session:
            session.add(
                User(
                    username=self.username, email=self.email
                )
            )
            session.commit()

Update

To update the user, first query the database for the object, make the desired modifications, .add the object to the session and finally call .commit().

class ChangeEmail(rx.State):
    username: str
    email: str

    @rx.event
    def modify_user(self):
        with rx.session() as session:
            user = session.exec(
                User.select().where(
                    (User.username == self.username)
                )
            ).first()
            user.email = self.email
            session.add(user)
            session.commit()

Delete

To delete a user, first query the database for the object, then call .delete() on the session and finally call .commit().

class RemoveUser(rx.State):
    username: str

    @rx.event
    def delete_user(self):
        with rx.session() as session:
            user = session.exec(
                User.select().where(
                    User.username == self.username
                )
            ).first()
            session.delete(user)
            session.commit()

ORM Object Lifecycle

The objects returned by queries are bound to the session that created them, and cannot generally be used outside that session. After adding or updating an object, not all fields are automatically updated, so accessing certain attributes may trigger additional queries to refresh the object.

To avoid this, the session.refresh() method can be used to update the object explicitly and ensure all fields are up to date before exiting the session.

class AddUserForm(rx.State):
    user: User | None = None

    @rx.event
    def add_user(self, form_data: dict[str, Any]):
        with rx.session() as session:
            self.user = User(**form_data)
            session.add(self.user)
            session.commit()
            session.refresh(self.user)

Now the self.user object will have a correct reference to the autogenerated primary key, id, even though this was not provided when the object was created from the form data.

If self.user needs to be modified or used in another query in a new session, it must be added to the session. Adding an object to a session does not necessarily create the object, but rather associates it with a session where it may either be created or updated accordingly.

class AddUserForm(rx.State):
    ...

    @rx.event
    def update_user(self, form_data: dict[str, Any]):
        if self.user is None:
            return
        with rx.session() as session:
            self.user.set(**form_data)
            session.add(self.user)
            session.commit()
            session.refresh(self.user)

If an ORM object will be referenced and accessed outside of a session, you should call .refresh() on it to avoid stale object exceptions.

Using SQL Directly

Avoiding SQL is one of the main benefits of using an ORM, but sometimes it is necessary for particularly complex queries, or when using database-specific features.

SQLModel exposes the session.execute() method that can be used to execute raw SQL strings. If parameter binding is needed, the query may be wrapped in sqlalchemy.text, which allows colon-prefix names to be used as placeholders.

Never use string formatting to construct SQL queries, as this may lead to SQL injection vulnerabilities in the app.

import sqlalchemy

import reflex as rx


class State(rx.State):
    @rx.event
    def insert_user_raw(self, username, email):
        with rx.session() as session:
            session.execute(
                sqlalchemy.text(
                    "INSERT INTO user (username, email) "
                    "VALUES (:username, :email)"
                ),
                {"username": username, "email": email},
            )
            session.commit()

    @rx.var
    def raw_user_tuples(self) -> list[list]:
        with rx.session() as session:
            return [
                list(row)
                for row in session.execute(
                    "SELECT * FROM user"
                ).all()
            ]

Async Database Operations

Reflex provides an async version of the session function called rx.asession for asynchronous database operations. This is useful when you need to perform database operations in an async context, such as within async event handlers.

The rx.asession function returns an async SQLAlchemy session that must be used with an async context manager. Most operations against the asession must be awaited.

import sqlalchemy.ext.asyncio
import sqlalchemy

import reflex as rx


class AsyncUserState(rx.State):
    users: list[User] = []

    @rx.event(background=True)
    async def get_users_async(self):
        async with rx.asession() as asession:
            result = await asession.execute(User.select())
            async with self:
                self.users = await result.all()

Async Select

The following example shows how to query the database asynchronously:

class AsyncQueryUser(rx.State):
    name: str
    users: list[User] = []

    @rx.event(background=True)
    async def get_users(self):
        async with rx.asession() as asession:
            stmt = User.select().where(
                User.username.contains(self.name)
            )
            result = await asession.execute(stmt)
            async with self:
                self.users = await result.all()

Async Insert

To add a new record to the database asynchronously:

class AsyncAddUser(rx.State):
    username: str
    email: str

    @rx.event(background=True)
    async def add_user(self):
        async with rx.asession() as asession:
            asession.add(
                User(
                    username=self.username, email=self.email
                )
            )
            await asession.commit()

Async Update

To update a user asynchronously:

class AsyncChangeEmail(rx.State):
    username: str
    email: str

    @rx.event(background=True)
    async def modify_user(self):
        async with rx.asession() as asession:
            stmt = User.select().where(
                User.username == self.username
            )
            result = await asession.execute(stmt)
            user = await result.first()
            if user:
                user.email = self.email
                asession.add(user)
                await asession.commit()

Async Delete

To delete a user asynchronously:

class AsyncRemoveUser(rx.State):
    username: str

    @rx.event(background=True)
    async def delete_user(self):
        async with rx.asession() as asession:
            stmt = User.select().where(
                User.username == self.username
            )
            result = await asession.execute(stmt)
            user = await result.first()
            if user:
                await asession.delete(user)
                await asession.commit()

Async Refresh

Similar to the regular session, you can refresh an object to ensure all fields are up to date:

class AsyncAddUserForm(rx.State):
    user: User | None = None

    @rx.event(background=True)
    async def add_user(self, form_data: dict[str, str]):
        async with rx.asession() as asession:
            async with self:
                self.user = User(**form_data)
            asession.add(self.user)
            await asession.commit()
            await asession.refresh(self.user)

Async SQL Execution

You can also execute raw SQL asynchronously:

class AsyncRawSQL(rx.State):
    users: list[list] = []

    @rx.event(background=True)
    async def insert_user_raw(self, username, email):
        async with rx.asession() as asession:
            await asession.execute(
                sqlalchemy.text(
                    "INSERT INTO user (username, email) "
                    "VALUES (:username, :email)"
                ),
                dict(username=username, email=email),
            )
            await asession.commit()

    @rx.event(background=True)
    async def get_raw_users(self):
        async with rx.asession() as asession:
            result = await asession.execute(
                "SELECT * FROM user"
            )
            async with self:
                self.users = [
                    list(row)
                    for row in (await result.all())
                ]

Built with Reflex