Tutorials

Bringing Authentication to an E-Commerce Chatbot

January 17, 2025

OAuth2.0

Implementing Authentication with Zitadel for an E-Commerce Chatbot

Before reading this post, make sure to check out the first post of the series, which covers function tool calls.

In this second post of our series on building an e-commerce chatbot, we'll dive into implementing authentication using Zitadel and OAuth 2.0. Authentication is a critical component in ensuring that user data is accessed securely and only by authorized parties. In this use case, we leverage Zitadel's OAuth 2.0 implementation with PKCE (Proof Key for Code Exchange) to authenticate users.

Setup

For this blog post, we'll use the following setup:

version: '3.8'

services:
  psql:
    image: postgres:15
    environment:
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=postgres
      - POSTGRES_DB=postgres
    ports:
      - "5432:5432"
    volumes:
      - postgres_data:/var/lib/postgresql/data

  zitadel:
    image: ghcr.io/zitadel/zitadel:v2.67.1
    restart: always
    command: start-from-init --masterkeyFromEnv --tlsMode disabled
    environment:
      - ZITADEL_DATABASE_POSTGRES_HOST=psql
      - ZITADEL_DATABASE_POSTGRES_PORT=5432
      - ZITADEL_DATABASE_POSTGRES_DATABASE=${ZITADEL_DB_NAME:-zitadel}
      - ZITADEL_DATABASE_POSTGRES_ADMIN_USERNAME=${ZITADEL_DB_ADMIN_USER:-postgres}
      - ZITADEL_DATABASE_POSTGRES_ADMIN_PASSWORD=${ZITADEL_DB_ADMIN_PASSWORD:-postgres}
      - ZITADEL_DATABASE_POSTGRES_ADMIN_SSL_MODE=disable
      - ZITADEL_DATABASE_POSTGRES_USER_USERNAME=${ZITADEL_DB_USER:-zitadel_user}
      - ZITADEL_DATABASE_POSTGRES_USER_PASSWORD=${ZITADEL_DB_PASSWORD:-zitadel_password}
      - ZITADEL_DATABASE_POSTGRES_USER_SSL_MODE=disable
      - ZITADEL_EXTERNALSECURE=false
      - ZITADEL_MASTERKEY=${ZITADEL_MASTERKEY:-MasterkeyNeedsToHave32Characters}
      - ZITADEL_FIRSTINSTANCE_ORG_NAME=ZITADEL
    ports:
      - "8080:8080"
    depends_on:
      - psql

volumes:
  postgres_data:

Before we start, make sure your Zitadel instance is running at http://localhost:8080. Then perform the following steps in Zitadel:

  1. Create a new organization

  2. Create a new project

  3. Create a new application of type WEB (PKCE)

  4. Enter the following redirect URI http://localhost:8081

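  5. Note down the client ID and store it in a .env file as ZITADEL_CLIENT_ID. The code below also reads ZITADEL_DOMAIN from the environment; for this local setup, that is localhost:8080.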
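
As a minimal sketch, the settings from the .env file could be read and checked in one place before the flow starts. The names Settings and load_settings are hypothetical and not part of the original source; the fallback to localhost:8080 is an assumption based on the compose file above.

```python
from dataclasses import dataclass
from typing import Mapping


@dataclass(frozen=True)
class Settings:
    """Values the authentication code in this post relies on."""

    client_id: str
    zitadel_domain: str


def load_settings(env: Mapping[str, str]) -> Settings:
    """Read the settings from an environment-like mapping (hypothetical helper)."""
    client_id = env.get("ZITADEL_CLIENT_ID", "").strip()
    if not client_id:
        raise ValueError("ZITADEL_CLIENT_ID is not set")
    # Assumption: default to the local instance started by the compose file.
    domain = env.get("ZITADEL_DOMAIN", "localhost:8080").strip()
    return Settings(client_id=client_id, zitadel_domain=domain)
```

In the CLI, this would be called as load_settings(os.environ) after dotenv.load_dotenv() has run.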

Authentication Overview

The authentication process in our chatbot example involves several steps:

  1. Generating PKCE Pair: The process starts by generating a code verifier and a code challenge to secure the communication between the client and the authentication server.

  2. Initiating the Authorization Flow: The user is redirected to Zitadel's authentication page, where they can log in.

  3. Receiving the Authorization Code: Once the user logs in, Zitadel redirects them back to the chatbot with an authorization code.

  4. Exchanging the Authorization Code for Tokens: The chatbot exchanges the code for an ID token and an access token.

  5. Validating Tokens: The ID token and access token are validated to ensure their authenticity and integrity.

  6. Using the Tokens: The tokens are used to identify the user and enforce row-level security in the PostgreSQL database.

Implementation

Below is a breakdown of how authentication is implemented in the provided source code:

Step 1: Generating the PKCE Pair

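import base64
import hashlib
import secrets


def generate_pkce_pair():
    # 32 random bytes yield a 43-character URL-safe verifier
    code_verifier = secrets.token_urlsafe(32)
    # S256 challenge: base64url(SHA-256(verifier)) without "=" padding
    code_challenge = (
        base64.urlsafe_b64encode(hashlib.sha256(code_verifier.encode()).digest())
        .decode()
        .rstrip("=")
    )
    return code_verifier, code_challenge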

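This function generates a random code_verifier and derives the code_challenge from it by hashing with SHA-256. The challenge is sent with the authorization request and the verifier with the token request, so an authorization code that is intercepted on its way back to the client cannot be redeemed without the matching verifier.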

Step 2: Initiating the Authorization Flow

For the sake of this blog post, we'll implement the OAuth 2.0 authentication flow in a Python command-line application. It works as follows:

  1. The user calls the CLI

  2. The user is redirected to Zitadel's login page in their browser

  3. The user logs in

  4. The user is redirected back to the CLI, which has started an HTTP server in the background

  5. The CLI exchanges the authorization code for tokens

  6. The CLI uses the tokens to authenticate the user

import webbrowser
from http.server import HTTPServer
from urllib.parse import urlencode


def authenticate(client_id: str, zitadel_domain: str, redirect_uri: str):
    # Generate PKCE values
    code_verifier, code_challenge = generate_pkce_pair()

    # Generate authorization URL
    auth_params = {
        "client_id": client_id,
        "redirect_uri": redirect_uri,
        "response_type": "code",
        "scope": "openid profile email",
        "code_challenge": code_challenge,
        "code_challenge_method": "S256",
    }

    auth_url = f"http://{zitadel_domain}/oauth/v2/authorize?{urlencode(auth_params)}"

    # Get port from the redirect uri (expects the form http://host:port, without a path)
    port = int(redirect_uri.split(":")[-1])

    # Start local server to receive the callback
    server = HTTPServer(("", port), AuthenticationHandler)
    server.auth_code = None
    server.should_stop = False

    # Open browser for authentication
    webbrowser.open(auth_url)

    # Wait for the callback; the handler sets should_stop after the first request
    while not server.should_stop:
        server.handle_request()
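
Splitting the redirect URI on ":" works for http://localhost:8081 but breaks as soon as the URI carries a path such as /callback. A slightly more forgiving variant, sketched here with a hypothetical helper name port_from_redirect_uri, could rely on urllib.parse instead:

```python
from urllib.parse import urlparse


def port_from_redirect_uri(redirect_uri: str) -> int:
    """Extract the TCP port of a redirect URI, falling back to the scheme default."""
    parsed = urlparse(redirect_uri)
    if parsed.port is not None:
        return parsed.port
    # No explicit port: use the default port of the scheme
    if parsed.scheme == "https":
        return 443
    if parsed.scheme == "http":
        return 80
    raise ValueError(f"Cannot determine port for redirect URI: {redirect_uri!r}")
```

With the redirect URI used in this post, port_from_redirect_uri("http://localhost:8081") returns 8081.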

Step 3: Receiving the Authorization Code

A lightweight HTTP server listens for the redirect from Zitadel and captures the authorization code:

from http.server import BaseHTTPRequestHandler
from urllib.parse import parse_qs, urlparse


class AuthenticationHandler(BaseHTTPRequestHandler):
    def do_GET(self):
        # Parse the query string of the redirect, e.g. /?code=...
        query_components = parse_qs(urlparse(self.path).query)

        if "code" in query_components:
            self.server.auth_code = query_components["code"][0]
            self.send_response(200)
            self.end_headers()
            self.wfile.write(b"Authentication successful! You can close this window.")
        else:
            self.send_response(400)
            self.end_headers()
            self.wfile.write(b"Authentication failed! Please try again.")

        # Stop the wait loop in authenticate() after the first request
        self.server.should_stop = True
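
The parsing step of the handler can also be pulled out into a plain function, which makes it easier to test and to extend. The sketch below uses a hypothetical helper extract_auth_code and additionally handles the standard OAuth 2.0 error parameter and an optional state value. Note that the authenticate function in this post does not send a state parameter, so the state check is an assumption about how the flow could be hardened, not part of the original code.

```python
import hmac
from typing import Optional
from urllib.parse import parse_qs, urlparse


def extract_auth_code(path: str, expected_state: Optional[str] = None) -> str:
    """Return the authorization code from a callback path such as '/?code=...'."""
    params = parse_qs(urlparse(path).query)

    # The authorization server reports failures via the 'error' parameter
    if "error" in params:
        description = params.get("error_description", [""])[0]
        raise ValueError(f"Authorization failed: {params['error'][0]} {description}".strip())

    # Optional hardening (assumption): compare the returned state with the one sent
    if expected_state is not None:
        state = params.get("state", [""])[0]
        if not hmac.compare_digest(state.encode(), expected_state.encode()):
            raise ValueError("State mismatch in callback")

    if "code" not in params:
        raise ValueError("No authorization code in callback")
    return params["code"][0]
```

Inside do_GET, the handler would call extract_auth_code(self.path) and answer with status 400 if it raises.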

Step 4: Exchanging the Authorization Code for Tokens

Once the code is received, it is exchanged for tokens via a POST request. The rest of the authenticate function is as follows:

    # ... continuation of authenticate(), directly after the wait loop

    # Release the local port now that the callback has been handled
    server.server_close()

    if not server.auth_code:
        raise Exception("No authorization code received")

    # Exchange authorization code for tokens
    token_url = f"http://{zitadel_domain}/oauth/v2/token"
    token_data = {
        "grant_type": "authorization_code",
        "client_id": client_id,
        "code_verifier": code_verifier,
        "code": server.auth_code,
        "redirect_uri": redirect_uri,
    }

    # Requires `import requests` at the top of the module
    response = requests.post(token_url, data=token_data, timeout=10)

    if response.status_code != 200:
        raise Exception(f"Token exchange failed: {response.text}")

    tokens = response.json()
    return tokens["id_token"], tokens["access_token"]

On success, the response contains the ID token and the access token.

Step 5: Validating Tokens

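Token validation ensures that the tokens were issued by Zitadel and have not been tampered with. The ID token's signature is checked against the keys published at Zitadel's JWKS endpoint, and the at_hash claim ties the access token to that ID token: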

# Perform authentication
id_token, access_token = authenticate(client_id, zitadel_domain, redirect_uri)

oidc_config = requests.get(
    f"http://{zitadel_domain}/.well-known/openid-configuration"
).json()
jwks_client = jwt.PyJWKClient(oidc_config["jwks_uri"])
signing_algos = oidc_config["id_token_signing_alg_values_supported"]

# Verify the ID token's signature and audience
signing_key = jwks_client.get_signing_key_from_jwt(id_token)
data = jwt.decode_complete(
    id_token,
    key=signing_key,
    audience=client_id,
    algorithms=signing_algos,
)
payload, header = data["payload"], data["header"]

# Get the pyjwt algorithm object
alg_obj = jwt.get_algorithm_by_name(header["alg"])

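# Compute at_hash from the access token, then compare it with the claim
digest = alg_obj.compute_hash_digest(access_token.encode())
at_hash = (
    base64.urlsafe_b64encode(digest[: (len(digest) // 2)]).decode().rstrip("=")
)
# Raise explicitly: assert statements are skipped when Python runs with -O
if at_hash != payload["at_hash"]:
    raise ValueError("at_hash mismatch: access token does not match the ID token")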
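
For readers who want to see what the at_hash computation does without PyJWT's algorithm objects, here is a standard-library sketch. It assumes the hash function can be derived from the numeric suffix of the signing algorithm (for example RS256 uses SHA-256), which covers the RS, PS, ES and HS families but not EdDSA. The names compute_at_hash and at_hash_matches are hypothetical.

```python
import base64
import hashlib
import hmac

# Hash function per algorithm suffix, e.g. RS256 -> SHA-256 (assumption: no EdDSA)
_HASH_BY_SUFFIX = {
    "256": hashlib.sha256,
    "384": hashlib.sha384,
    "512": hashlib.sha512,
}


def compute_at_hash(access_token: str, alg: str) -> str:
    """base64url of the left half of the access token's hash, without padding."""
    try:
        hash_fn = _HASH_BY_SUFFIX[alg[-3:]]
    except KeyError:
        raise ValueError(f"Unsupported signing algorithm: {alg}") from None
    digest = hash_fn(access_token.encode("ascii")).digest()
    left_half = digest[: len(digest) // 2]
    return base64.urlsafe_b64encode(left_half).decode("ascii").rstrip("=")


def at_hash_matches(access_token: str, alg: str, claimed_at_hash: str) -> bool:
    """Compare the computed at_hash with the claim from the ID token."""
    computed = compute_at_hash(access_token, alg).encode("ascii")
    return hmac.compare_digest(computed, claimed_at_hash.encode("utf-8"))
```

Because the ID token is signed, a matching at_hash shows that the access token is the one that was issued together with that ID token.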

Main Function

Below is an example of what the main function tying everything together might look like:

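# Assumed module-level imports: base64, os, dotenv (python-dotenv), jwt (PyJWT), requests.
# `logger` is assumed to be a structlog-style logger, since it is called with keyword arguments.
def main():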
    dotenv.load_dotenv()

    client_id = os.environ["ZITADEL_CLIENT_ID"]
    zitadel_domain = os.environ["ZITADEL_DOMAIN"]

    port = 8081
    redirect_uri = f"http://localhost:{port}"

    # Perform authentication
    try:
        id_token, access_token = authenticate(client_id, zitadel_domain, redirect_uri)

        oidc_config = requests.get(
            f"http://{zitadel_domain}/.well-known/openid-configuration"
        ).json()
        jwks_client = jwt.PyJWKClient(oidc_config["jwks_uri"])
        signing_algos = oidc_config["id_token_signing_alg_values_supported"]

        # Verify the ID token's signature and audience
        signing_key = jwks_client.get_signing_key_from_jwt(id_token)
        data = jwt.decode_complete(
            id_token,
            key=signing_key,
            audience=client_id,
            algorithms=signing_algos,
        )
        payload, header = data["payload"], data["header"]

        # Get the pyjwt algorithm object
        alg_obj = jwt.get_algorithm_by_name(header["alg"])

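        # Compute at_hash from the access token, then compare it with the claim
        digest = alg_obj.compute_hash_digest(access_token.encode())
        at_hash = (
            base64.urlsafe_b64encode(digest[: (len(digest) // 2)]).decode().rstrip("=")
        )
        # Raise explicitly: assert statements are skipped when Python runs with -O
        if at_hash != payload["at_hash"]:
            raise ValueError("at_hash mismatch: access token does not match the ID token")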

        # Get the user_id (=subject) from the payload
        user_id = payload["sub"]

    except Exception as e:
        logger.error("Authentication failed", error=str(e))
        return

Wrapping Up

That's it for the authentication part. In the next post, we'll see how to combine function calling with Azure OpenAI and robust user authentication by using PostgreSQL's row-level security. By passing the user's authentication context to the database, we can ensure that users can only access their own data when interacting with the chatbot.

Stay tuned!

Disclaimer

This blog post is a simplified example and does not cover all aspects that have to be considered when deploying an LLM-powered application to production. In a real-world application, you would need to consider additional security measures.
