Installation

pip install fastapi-jwt

Usage

This library made in fastapi style, so it can be used as standard security features

from fastapi import FastAPI, Security
from fastapi_jwt import JwtAuthorizationCredentials, JwtAccessBearer


app = FastAPI()
access_security = JwtAccessBearer(secret_key="secret_key", auto_error=True)


@app.post("/auth")
def auth():
    subject = {"username": "username", "role": "user"}
    return {"access_token": access_security.create_access_token(subject=subject)}


@app.get("/users/me")
def read_current_user(
    credentials: JwtAuthorizationCredentials = Security(access_security),
):
    return {"username": credentials["username"], "role": credentials["role"]}

examples

security_jwt_set_cookie.py

from fastapi import FastAPI, Response
from fastapi.testclient import TestClient

from fastapi_jwt import JwtAccessCookie, JwtRefreshCookie

app = FastAPI()

access_security = JwtAccessCookie(secret_key="secret_key")
refresh_security = JwtRefreshCookie(secret_key="secret_key")


@app.post("/auth")
def auth(response: Response):
    subject = {"username": "username", "role": "user"}

    access_token = access_security.create_access_token(subject=subject)
    refresh_token = access_security.create_refresh_token(subject=subject)

    access_security.set_access_cookie(response, access_token)
    refresh_security.set_refresh_cookie(response, refresh_token)

    return {"access_token": access_token, "refresh_token": refresh_token}


@app.delete("/auth")
def logout(response: Response):
    access_security.unset_access_cookie(response)
    refresh_security.unset_refresh_cookie(response)

    return {"msg": "Successful logout"}


client = TestClient(app)

openapi_schema = {
    "openapi": "3.0.2",
    "info": {"title": "FastAPI", "version": "0.1.0"},
    "paths": {
        "/auth": {
            "post": {
                "responses": {
                    "200": {
                        "description": "Successful Response",
                        "content": {"application/json": {"schema": {}}},
                    }
                },
                "summary": "Auth",
                "operationId": "auth_auth_post",
            },
            "delete": {
                "responses": {
                    "200": {
                        "description": "Successful Response",
                        "content": {"application/json": {"schema": {}}},
                    }
                },
                "summary": "Logout",
                "operationId": "logout_auth_delete",
            },
        }
    },
}


def test_openapi_schema():
    response = client.get("/openapi.json")
    assert response.status_code == 200, response.text
    assert response.json() == openapi_schema


def test_security_jwt_auth():
    response = client.post("/auth")
    assert response.status_code == 200, response.text

    assert "access_token_cookie" in response.cookies
    assert response.cookies["access_token_cookie"] == response.json()["access_token"]
    assert "refresh_token_cookie" in response.cookies
    assert response.cookies["refresh_token_cookie"] == response.json()["refresh_token"]


def test_security_jwt_logout():
    response = client.delete("/auth")
    assert response.status_code == 200, response.text

    assert "access_token_cookie" in response.headers["set-cookie"]
    assert 'access_token_cookie=""; Max-Age=-1;' in response.headers["set-cookie"]
    assert "refresh_token_cookie" in response.headers["set-cookie"]
    assert (
        'refresh_token_cookie=""; HttpOnly; Max-Age=-1'
        in response.headers["set-cookie"]
    )
    # assert "access_token_cookie" not in response.cookies
    # assert response.cookies["access_token_cookie"].max_age == -1
    # assert "refresh_token_cookie" not in response.cookies
    # assert response.cookies["refresh_token_cookie"].max_age == -1

security_jwt_multiple_places.py

from fastapi import FastAPI, Security
from fastapi.testclient import TestClient

from fastapi_jwt import JwtAccessBearerCookie, JwtAuthorizationCredentials, JwtRefreshBearerCookie

app = FastAPI()

access_security = JwtAccessBearerCookie(secret_key="secret_key")
refresh_security = JwtRefreshBearerCookie(secret_key="secret_key")


@app.post("/auth")
def auth():
    subject = {"username": "username", "role": "user"}

    access_token = access_security.create_access_token(subject=subject)
    refresh_token = access_security.create_refresh_token(subject=subject)

    return {"access_token": access_token, "refresh_token": refresh_token}


@app.post("/refresh")
def refresh(credentials: JwtAuthorizationCredentials = Security(refresh_security)):
    access_token = refresh_security.create_access_token(subject=credentials.subject)
    refresh_token = refresh_security.create_refresh_token(subject=credentials.subject)

    return {"access_token": access_token, "refresh_token": refresh_token}


@app.get("/users/me")
def read_current_user(
    credentials: JwtAuthorizationCredentials = Security(access_security),
):
    return {"username": credentials["username"], "role": credentials["role"]}


client = TestClient(app)

openapi_schema = {
    "openapi": "3.0.2",
    "info": {"title": "FastAPI", "version": "0.1.0"},
    "paths": {
        "/auth": {
            "post": {
                "responses": {
                    "200": {
                        "description": "Successful Response",
                        "content": {"application/json": {"schema": {}}},
                    }
                },
                "summary": "Auth",
                "operationId": "auth_auth_post",
            }
        },
        "/refresh": {
            "post": {
                "responses": {
                    "200": {
                        "description": "Successful Response",
                        "content": {"application/json": {"schema": {}}},
                    }
                },
                "summary": "Refresh",
                "operationId": "refresh_refresh_post",
                "security": [{"JwtRefreshBearer": []}, {"JwtRefreshCookie": []}],
            }
        },
        "/users/me": {
            "get": {
                "responses": {
                    "200": {
                        "description": "Successful Response",
                        "content": {"application/json": {"schema": {}}},
                    }
                },
                "summary": "Read Current User",
                "operationId": "read_current_user_users_me_get",
                "security": [{"JwtAccessBearer": []}, {"JwtAccessCookie": []}],
            }
        },
    },
    "components": {
        "securitySchemes": {
            "JwtAccessBearer": {"type": "http", "scheme": "bearer"},
            "JwtAccessCookie": {
                "type": "apiKey",
                "name": "access_token_cookie",
                "in": "cookie",
            },
            "JwtRefreshBearer": {"type": "http", "scheme": "bearer"},
            "JwtRefreshCookie": {
                "type": "apiKey",
                "name": "refresh_token_cookie",
                "in": "cookie",
            },
        }
    },
}


def test_openapi_schema():
    response = client.get("/openapi.json")
    assert response.status_code == 200, response.text
    assert response.json() == openapi_schema


def test_security_jwt_access_both_correct():
    access_token = client.post("/auth").json()["access_token"]

    response = client.get(
        "/users/me",
        cookies={"access_token_cookie": access_token},
        headers={"Authorization": f"Bearer {access_token}"},
    )
    assert response.status_code == 200, response.text
    assert response.json() == {"username": "username", "role": "user"}


def test_security_jwt_access_only_cookie():
    access_token = client.post("/auth").json()["access_token"]

    response = client.get("/users/me", cookies={"access_token_cookie": access_token})
    assert response.status_code == 200, response.text
    assert response.json() == {"username": "username", "role": "user"}


def test_security_jwt_access_only_bearer():
    access_token = client.post("/auth").json()["access_token"]

    response = client.get(
        "/users/me", headers={"Authorization": f"Bearer {access_token}"}
    )
    assert response.status_code == 200, response.text
    assert response.json() == {"username": "username", "role": "user"}


def test_security_jwt_access_bearer_wrong_cookie_correct():
    access_token = client.post("/auth").json()["access_token"]

    response = client.get(
        "/users/me",
        headers={"Authorization": "Bearer wrong_access_token"},
        cookies={"access_token_cookie": access_token},
    )
    assert response.status_code == 401, response.text
    assert response.json()["detail"].startswith("Wrong token:")


def test_security_jwt_access_bearer_correct_cookie_wrong():
    access_token = client.post("/auth").json()["access_token"]

    response = client.get(
        "/users/me",
        headers={"Authorization": f"Bearer {access_token}"},
        cookies={"access_token_cookie": "wrong_access_token_cookie"},
    )
    assert response.status_code == 200, response.text
    assert response.json() == {"username": "username", "role": "user"}


def test_security_jwt_access_both_no_credentials():
    response = client.get("/users/me")
    assert response.status_code == 401, response.text
    assert response.json() == {"detail": "Credentials are not provided"}


def test_security_jwt_refresh_only_cookie():
    refresh_token = client.post("/auth").json()["refresh_token"]

    response = client.post("/refresh", cookies={"refresh_token_cookie": refresh_token})
    assert response.status_code == 200, response.text


def test_security_jwt_refresh_bearer_correct_cookie_wrong():
    refresh_token = client.post("/auth").json()["refresh_token"]

    response = client.post(
        "/refresh",
        headers={"Authorization": f"Bearer {refresh_token}"},
        cookies={"refresh_token_cookie": "wrong_refresh_token_cookie"},
    )
    assert response.status_code == 200, response.text


def test_security_jwt_refresh_bearer_wrong_cookie_correct():
    refresh_token = client.post("/auth").json()["refresh_token"]

    response = client.post(
        "/refresh",
        headers={"Authorization": "Bearer wrong_refresh_token_cookie"},
        cookies={"refresh_token_cookie": refresh_token},
    )
    assert response.status_code == 401, response.text
    assert response.json()["detail"].startswith("Wrong token:")


def test_security_jwt_refresh_cookie_wrong_using_access_token():
    tokens = client.post("/auth").json()
    access_token, refresh_token = tokens["access_token"], tokens["refresh_token"]
    assert access_token != refresh_token

    response = client.post("/refresh", cookies={"refresh_token_cookie": access_token})
    assert response.status_code == 401, response.text
    assert response.json()["detail"].startswith("Wrong token: 'type' is not 'refresh'")


def test_security_jwt_refresh_both_no_credentials():
    response = client.post("/refresh")
    assert response.status_code == 401, response.text
    assert response.json() == {"detail": "Credentials are not provided"}

security_jwt_general_optional.py

import datetime
from uuid import uuid4
from typing import Optional, Set

from fastapi import FastAPI, Security
from fastapi.testclient import TestClient
from pytest_mock import MockerFixture

from fastapi_jwt import JwtAccessBearer, JwtAuthorizationCredentials, JwtRefreshBearer

app = FastAPI()

access_security = JwtAccessBearer(secret_key="secret_key", auto_error=False)
refresh_security = JwtRefreshBearer.from_other(access_security)


unique_identifiers_database: Set[str] = set()


@app.post("/auth")
def auth():
    subject = {"username": "username", "role": "user"}
    unique_identifier = str(uuid4())
    unique_identifiers_database.add(unique_identifier)

    access_token = access_security.create_access_token(
        subject=subject, unique_identifier=unique_identifier
    )
    refresh_token = access_security.create_refresh_token(subject=subject)

    return {"access_token": access_token, "refresh_token": refresh_token}


@app.post("/refresh")
def refresh(
    credentials: Optional[JwtAuthorizationCredentials] = Security(refresh_security),
):
    if credentials is None:
        return {"msg": "Create an account first"}

    unique_identifier = str(uuid4())
    unique_identifiers_database.add(unique_identifier)

    access_token = refresh_security.create_access_token(
        subject=credentials.subject, unique_identifier=unique_identifier,
    )
    refresh_token = refresh_security.create_refresh_token(subject=credentials.subject)

    return {"access_token": access_token, "refresh_token": refresh_token}


@app.get("/users/me")
def read_current_user(
    credentials: Optional[JwtAuthorizationCredentials] = Security(access_security),
):
    if credentials is None:
        return {"msg": "Create an account first"}
    return {"username": credentials["username"], "role": credentials["role"]}


@app.get("/auth/meta")
def get_token_meta(
        credentials: JwtAuthorizationCredentials = Security(access_security),
):
    if credentials is None:
        return {"msg": "Create an account first"}
    return {"jti": credentials.jti}


class _FakeDateTimeShort(datetime.datetime):  # pragma: no cover
    @staticmethod
    def now(**kwargs):
        return datetime.datetime.now() + datetime.timedelta(minutes=3)

    @staticmethod
    def utcnow(**kwargs):
        return datetime.datetime.utcnow() + datetime.timedelta(minutes=3)


class _FakeDateTimeLong(datetime.datetime):  # pragma: no cover
    @staticmethod
    def now(**kwargs):
        return datetime.datetime.now() + datetime.timedelta(days=42)

    @staticmethod
    def utcnow(**kwargs):
        return datetime.datetime.utcnow() + datetime.timedelta(days=42)


client = TestClient(app)

openapi_schema = {
    "openapi": "3.0.2",
    "info": {"title": "FastAPI", "version": "0.1.0"},
    "paths": {
        "/auth": {
            "post": {
                "responses": {
                    "200": {
                        "description": "Successful Response",
                        "content": {"application/json": {"schema": {}}},
                    }
                },
                "summary": "Auth",
                "operationId": "auth_auth_post",
            }
        },
        "/refresh": {
            "post": {
                "responses": {
                    "200": {
                        "description": "Successful Response",
                        "content": {"application/json": {"schema": {}}},
                    }
                },
                "summary": "Refresh",
                "operationId": "refresh_refresh_post",
                "security": [{"JwtRefreshBearer": []}],
            }
        },
        "/users/me": {
            "get": {
                "responses": {
                    "200": {
                        "description": "Successful Response",
                        "content": {"application/json": {"schema": {}}},
                    }
                },
                "summary": "Read Current User",
                "operationId": "read_current_user_users_me_get",
                "security": [{"JwtAccessBearer": []}],
            }
        },
        "/auth/meta": {
            "get": {
                "responses": {
                    "200": {
                        "description": "Successful Response",
                        "content": {"application/json": {"schema": {}}},
                    }
                },
                "summary": "Get Token Meta",
                "operationId": "get_token_meta_auth_meta_get",
                "security": [{"JwtAccessBearer": []}],
            }
        },
    },
    "components": {
        "securitySchemes": {
            "JwtAccessBearer": {"type": "http", "scheme": "bearer"},
            "JwtRefreshBearer": {"type": "http", "scheme": "bearer"},
        }
    },
}


def test_openapi_schema():
    response = client.get("/openapi.json")
    assert response.status_code == 200, response.text
    assert response.json() == openapi_schema


def test_security_jwt_access_token():
    access_token = client.post("/auth").json()["access_token"]

    response = client.get(
        "/users/me", headers={"Authorization": f"Bearer {access_token}"}
    )
    assert response.status_code == 200, response.text
    assert response.json() == {"username": "username", "role": "user"}


def test_security_jwt_access_token_wrong():
    response = client.get(
        "/users/me", headers={"Authorization": "Bearer wrong_access_token"}
    )
    assert response.status_code == 200, response.text
    assert response.json() == {"msg": "Create an account first"}

    response = client.get(
        "/users/me", headers={"Authorization": "Bearer wrong.access.token"}
    )
    assert response.status_code == 200, response.text
    assert response.json() == {"msg": "Create an account first"}


def test_security_jwt_access_token_changed():
    access_token = client.post("/auth").json()["access_token"]

    access_token = access_token.split(".")[0] + ".wrong." + access_token.split(".")[-1]

    response = client.get(
        "/users/me", headers={"Authorization": f"Bearer {access_token}"}
    )
    assert response.status_code == 200, response.text
    assert response.json() == {"msg": "Create an account first"}


def test_security_jwt_access_token_expiration(mocker: MockerFixture):
    access_token = client.post("/auth").json()["access_token"]

    mocker.patch("jose.jwt.datetime", _FakeDateTimeShort)  # 3 min left

    response = client.get(
        "/users/me", headers={"Authorization": f"Bearer {access_token}"}
    )
    assert response.status_code == 200, response.text
    assert response.json() == {"username": "username", "role": "user"}

    mocker.patch("jose.jwt.datetime", _FakeDateTimeLong)  # 42 days left

    response = client.get(
        "/users/me", headers={"Authorization": f"Bearer {access_token}"}
    )
    assert response.status_code == 200, response.text
    assert response.json() == {"msg": "Create an account first"}


def test_security_jwt_refresh_token():
    refresh_token = client.post("/auth").json()["refresh_token"]

    response = client.post(
        "/refresh", headers={"Authorization": f"Bearer {refresh_token}"}
    )
    assert response.status_code == 200, response.text
    assert "msg" not in response.json()


def test_security_jwt_refresh_token_wrong():
    response = client.post(
        "/refresh", headers={"Authorization": "Bearer wrong_refresh_token"}
    )
    assert response.status_code == 200, response.text
    assert response.json() == {"msg": "Create an account first"}

    response = client.post(
        "/refresh", headers={"Authorization": "Bearer wrong.refresh.token"}
    )
    assert response.status_code == 200, response.text
    assert response.json() == {"msg": "Create an account first"}


def test_security_jwt_refresh_token_using_access_token():
    tokens = client.post("/auth").json()
    access_token, refresh_token = tokens["access_token"], tokens["refresh_token"]
    assert access_token != refresh_token

    response = client.post(
        "/refresh", headers={"Authorization": f"Bearer {access_token}"}
    )
    assert response.status_code == 200, response.text
    assert response.json() == {"msg": "Create an account first"}


def test_security_jwt_refresh_token_changed():
    refresh_token = client.post("/auth").json()["refresh_token"]

    refresh_token = (
        refresh_token.split(".")[0] + ".wrong." + refresh_token.split(".")[-1]
    )

    response = client.post(
        "/refresh", headers={"Authorization": f"Bearer {refresh_token}"}
    )
    assert response.status_code == 200, response.text
    assert response.json() == {"msg": "Create an account first"}


def test_security_jwt_refresh_token_expired(mocker: MockerFixture):
    refresh_token = client.post("/auth").json()["refresh_token"]

    mocker.patch("jose.jwt.datetime", _FakeDateTimeLong)  # 42 days left

    response = client.post(
        "/refresh", headers={"Authorization": f"Bearer {refresh_token}"}
    )
    assert response.status_code == 200, response.text
    assert response.json() == {"msg": "Create an account first"}


def test_security_jwt_custom_jti():
    access_token = client.post("/auth").json()["access_token"]

    response = client.get(
        "/auth/meta", headers={"Authorization": f"Bearer {access_token}"}
    )
    assert response.status_code == 200, response.text
    assert response.json()["jti"] in unique_identifiers_database

security_jwt_bearer.py

from fastapi import FastAPI, Security
from fastapi.testclient import TestClient

from fastapi_jwt import JwtAccessBearer, JwtAuthorizationCredentials, JwtRefreshBearer

app = FastAPI()

access_security = JwtAccessBearer(secret_key="secret_key")
refresh_security = JwtRefreshBearer(secret_key="secret_key")


@app.post("/auth")
def auth():
    subject = {"username": "username", "role": "user"}

    access_token = access_security.create_access_token(subject=subject)
    refresh_token = access_security.create_refresh_token(subject=subject)

    return {"access_token": access_token, "refresh_token": refresh_token}


@app.post("/refresh")
def refresh(credentials: JwtAuthorizationCredentials = Security(refresh_security)):
    access_token = refresh_security.create_access_token(subject=credentials.subject)
    refresh_token = refresh_security.create_refresh_token(subject=credentials.subject)

    return {"access_token": access_token, "refresh_token": refresh_token}


@app.get("/users/me")
def read_current_user(
    credentials: JwtAuthorizationCredentials = Security(access_security),
):
    return {"username": credentials["username"], "role": credentials["role"]}


client = TestClient(app)

openapi_schema = {
    "openapi": "3.0.2",
    "info": {"title": "FastAPI", "version": "0.1.0"},
    "paths": {
        "/auth": {
            "post": {
                "responses": {
                    "200": {
                        "description": "Successful Response",
                        "content": {"application/json": {"schema": {}}},
                    }
                },
                "summary": "Auth",
                "operationId": "auth_auth_post",
            }
        },
        "/refresh": {
            "post": {
                "responses": {
                    "200": {
                        "description": "Successful Response",
                        "content": {"application/json": {"schema": {}}},
                    }
                },
                "summary": "Refresh",
                "operationId": "refresh_refresh_post",
                "security": [{"JwtRefreshBearer": []}],
            }
        },
        "/users/me": {
            "get": {
                "responses": {
                    "200": {
                        "description": "Successful Response",
                        "content": {"application/json": {"schema": {}}},
                    }
                },
                "summary": "Read Current User",
                "operationId": "read_current_user_users_me_get",
                "security": [{"JwtAccessBearer": []}],
            }
        },
    },
    "components": {
        "securitySchemes": {
            "JwtAccessBearer": {"type": "http", "scheme": "bearer"},
            "JwtRefreshBearer": {"type": "http", "scheme": "bearer"},
        }
    },
}


def test_openapi_schema():
    response = client.get("/openapi.json")
    assert response.status_code == 200, response.text
    assert response.json() == openapi_schema


def test_security_jwt_auth():
    response = client.post("/auth")
    assert response.status_code == 200, response.text


def test_security_jwt_access_bearer():
    access_token = client.post("/auth").json()["access_token"]

    response = client.get(
        "/users/me", headers={"Authorization": f"Bearer {access_token}"}
    )
    assert response.status_code == 200, response.text
    assert response.json() == {"username": "username", "role": "user"}


def test_security_jwt_access_bearer_wrong():
    response = client.get(
        "/users/me", headers={"Authorization": "Bearer wrong_access_token"}
    )
    assert response.status_code == 401, response.text


def test_security_jwt_access_bearer_no_credentials():
    response = client.get("/users/me")
    assert response.status_code == 401, response.text
    assert response.json() == {"detail": "Credentials are not provided"}


def test_security_jwt_access_bearer_incorrect_scheme_credentials():
    response = client.get("/users/me", headers={"Authorization": "Basic notreally"})
    assert response.status_code == 401, response.text
    assert response.json() == {"detail": "Credentials are not provided"}
    # assert response.json() == {"detail": "Invalid authentication credentials"}


def test_security_jwt_refresh_bearer():
    refresh_token = client.post("/auth").json()["refresh_token"]

    response = client.post(
        "/refresh", headers={"Authorization": f"Bearer {refresh_token}"}
    )
    assert response.status_code == 200, response.text


def test_security_jwt_refresh_bearer_wrong():
    response = client.post(
        "/refresh", headers={"Authorization": "Bearer wrong_refresh_token"}
    )
    assert response.status_code == 401, response.text


def test_security_jwt_refresh_bearer_no_credentials():
    response = client.post("/refresh")
    assert response.status_code == 401, response.text
    assert response.json() == {"detail": "Credentials are not provided"}


def test_security_jwt_refresh_bearer_incorrect_scheme_credentials():
    response = client.post("/refresh", headers={"Authorization": "Basic notreally"})
    assert response.status_code == 401, response.text
    assert response.json() == {"detail": "Credentials are not provided"}
    # assert response.json() == {"detail": "Invalid authentication credentials"}

keywords: pythonFastAPI