Build with Firebase,
write in Python.
Phirebase is a lightweight, batteries-included Python wrapper for the Firebase REST APIs — covering Authentication, Firestore, Realtime Database, and Cloud Storage with a clean, chainable interface.
pip install phirebase
What's Included
Four Firebase services, one consistent API surface. Zero boilerplate.
Authentication
Email/password auth, custom JWT tokens, OAuth, token refresh, email verification and password reset flows.
Firestore
Full CRUD with chained queries, filters, ordering, limits, subcollections, batch writes, and special field values.
Realtime Database
Get, set, push, update, delete with ordering, filtering, and live Server-Sent Events streaming.
Cloud Storage
Upload from file path or object, download, generate signed/public URLs, delete, and list blobs.
Installation
Via pip (recommended)
pip install phirebase
From source
git clone https://github.com/firoziya/phirebase.git
cd phirebase
pip install -e .
Dependencies
| Package | Purpose | Min Version |
|---|---|---|
| requests | HTTP client for all REST calls | 2.20.0 |
| google-auth | Google OAuth2 credentials | 1.6.0 |
| google-cloud-storage | Cloud Storage admin operations | 1.30.0 |
| python-jwt | Custom token generation | 3.3.0 |
| pycryptodome | RSA key operations | 3.9.0 |
| six | Python 2/3 compatibility layer | 1.12.0 |
Configuration
Basic Setup
Initialize with your Firebase project config from the Firebase Console under Project Settings → General → Your apps.
from phirebase import initialize_app
config = {
"apiKey": "AIzaSy...", # Web API key
"authDomain": "your-project.firebaseapp.com",
"databaseURL": "https://your-project.firebaseio.com",
"storageBucket": "your-project.appspot.com",
"projectId": "your-project-id"
}
firebase = initialize_app(config)
# Access any service
auth = firebase.auth()
db = firebase.firestore()
rtdb = firebase.database()
storage = firebase.storage()
With Service Account (Admin SDK)
Service accounts unlock admin-level capabilities: creating custom tokens, managing Storage, and bypassing security rules. Download the key JSON from Project Settings → Service accounts.
# Option A — path to JSON key file
config = {
"apiKey": "AIzaSy...",
"databaseURL": "https://your-project.firebaseio.com",
"storageBucket": "your-project.appspot.com",
"projectId": "your-project-id",
"serviceAccount": "path/to/serviceAccountKey.json"
}
# Option B — dict loaded at runtime (e.g. from env)
import json, os
config["serviceAccount"] = json.loads(os.environ["FIREBASE_SA_JSON"])
firebase = initialize_app(config)
Security note: Never commit service account keys to version control. Use environment variables or a secrets manager in production.
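One way to follow the note above is to assemble the config from environment variables at startup and fail fast when something is missing. The sketch below is an illustration rather than part of Phirebase: the helper `load_config_from_env` and the variable names other than FIREBASE_SA_JSON are hypothetical, and only the config keys themselves come from this page.

```python
import json
import os

# Hypothetical mapping from environment variable names to config keys.
ENV_TO_CONFIG = {
    "FIREBASE_API_KEY": "apiKey",
    "FIREBASE_AUTH_DOMAIN": "authDomain",
    "FIREBASE_DATABASE_URL": "databaseURL",
    "FIREBASE_STORAGE_BUCKET": "storageBucket",
    "FIREBASE_PROJECT_ID": "projectId",
}
REQUIRED_KEYS = ("apiKey",)  # the only key this page marks as required


def load_config_from_env(env=None):
    """Build a config dict from environment variables, failing fast on gaps."""
    env = os.environ if env is None else env
    config = {key: env[var] for var, key in ENV_TO_CONFIG.items() if env.get(var)}

    # The service account JSON is parsed into a dict (Option B above).
    sa_json = env.get("FIREBASE_SA_JSON")
    if sa_json:
        config["serviceAccount"] = json.loads(sa_json)

    missing = [key for key in REQUIRED_KEYS if key not in config]
    if missing:
        raise ValueError(f"Missing required config keys: {', '.join(missing)}")
    return config
```

The returned dict has the same shape as the config shown above, so it can be passed straight to initialize_app.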
Authentication
The Auth service wraps Firebase's Identity Toolkit REST API. Get a reference with firebase.auth(). Most methods return a dict containing idToken, refreshToken, localId, etc.
Create a New User
auth = firebase.auth()
user = auth.create_user_with_email_and_password(
"morty@c137.com",
"securePassword123"
)
print(user["idToken"]) # JWT for authenticated requests
print(user["localId"]) # Firebase user UID
print(user["refreshToken"]) # For token refresh later
Sign In
Sets auth.current_user and returns the auth response dict.
auth = firebase.auth()
try:
    user = auth.sign_in_with_email_and_password(
        "morty@c137.com",
        "securePassword123"
    )
    id_token = user["idToken"]
    refresh_token = user["refreshToken"]
    uid = user["localId"]
    print(f"Signed in as {uid}")
except Exception as e:
    print(f"Auth failed: {e}")
else:
    # Retrieve account info using the token (only after a successful sign-in)
    info = auth.get_account_info(id_token)
    print(info["users"][0]["email"])
Refresh an ID Token
Returns a dict with the keys userId, idToken, and refreshToken.
# ID tokens expire after 1 hour, so refresh them
refreshed = auth.refresh(refresh_token)
new_id_token = refreshed["idToken"]
new_refresh_token = refreshed["refreshToken"]
user_id = refreshed["userId"]
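Because ID tokens last about an hour, long-running scripts usually need to refresh on a schedule. The following is a minimal sketch of one way to do that, assuming `refresh_fn` behaves like auth.refresh above (takes a refresh token, returns a dict with idToken and refreshToken). The `TokenKeeper` class and its 55-minute threshold are hypothetical choices, not part of Phirebase.

```python
import time


class TokenKeeper:
    """Caches an ID token and refreshes it shortly before it expires."""

    def __init__(self, refresh_fn, refresh_token, max_age=55 * 60, clock=time.monotonic):
        self._refresh_fn = refresh_fn        # e.g. auth.refresh
        self._refresh_token = refresh_token
        self._max_age = max_age              # seconds; kept below the 1-hour lifetime
        self._clock = clock                  # injectable so the class is easy to test
        self._id_token = None
        self._fetched_at = None

    def id_token(self):
        """Return a usable ID token, calling refresh_fn only when needed."""
        now = self._clock()
        stale = self._fetched_at is None or now - self._fetched_at >= self._max_age
        if stale:
            refreshed = self._refresh_fn(self._refresh_token)
            self._id_token = refreshed["idToken"]
            # Keep the newest refresh token in case the server rotated it.
            self._refresh_token = refreshed["refreshToken"]
            self._fetched_at = now
        return self._id_token
```

With something like `keeper = TokenKeeper(auth.refresh, refresh_token)`, each authenticated call can pass `token=keeper.id_token()` instead of a token captured at sign-in.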
Custom Token Authentication
Custom tokens require a Service Account. They let you mint tokens with arbitrary claims for server-side auth or third-party identity federation.
# Step 1: Mint the custom token (server side)
custom_token = auth.create_custom_token(
uid="user-12345",
additional_claims={
"premium": True,
"role": "editor"
}
)
# Step 2: Sign in using the custom token (client side)
user = auth.sign_in_with_custom_token(custom_token.decode("utf-8"))
id_token = user["idToken"]
Password Management
# Send a password reset email
auth.send_password_reset_email("morty@c137.com")
# After the user clicks the link, apply the new password
result = auth.verify_password_reset_code(
    "oobCode-from-email-link",  # reset code from the email link
    "newSecurePassword!"        # new password
)
print(result["email"]) # confirmed email
# Send email verification
auth.send_email_verification(id_token)
Firestore
Get a Firestore reference with firebase.firestore(). The API is designed around chained method calls: db.collection().document().get(). Under the hood, Phirebase uses the Firestore REST API for broad compatibility — no gRPC required.
CRUD Operations
Add a Document (auto-ID)
db = firebase.firestore()
# Simple document
result = db.collection("users").add({
"name": "Morty Smith",
"age": 14,
"email": "morty@c137.com",
"active": True
})
print(f"Created doc: {result['id']}")
# With nested data and arrays
result = db.collection("users").add({
"name": "Rick Sanchez",
"age": 70,
"address": {
"street": "123 Portal Lane",
"dimension": "C-137"
},
"tags": ["genius", "scientist", "grandfather"],
"created_at": db.server_timestamp()
})
print(result["id"], result["create_time"])
Set a Document (specific ID)
# Overwrites the entire document
db.collection("users").document("summer-smith").set({
"name": "Summer Smith",
"age": 17
})
# merge=True — only update fields you specify (others are preserved)
db.collection("users").document("summer-smith").set(
{"age": 18},
merge=True
)
Get Documents
# Get a single document by ID
user = db.collection("users").document("summer-smith").get()
if user:
    print(user["name"], user["age"])  # Summer Smith 18
else:
    print("Document does not exist")
# Shorthand — get by ID directly
user = db.collection("users").get_by_id("summer-smith")
# Get ALL documents in a collection
all_users = db.collection("users").get_all()
for u in all_users:
    print(u["id"], u.get("name"))
Update a Document
# Only the specified fields are updated; others are preserved
db.collection("users").document("summer-smith").update({
"age": 19,
"active": False
})
Delete a Document
db.collection("users").document("summer-smith").delete()
Subcollections
# Reference: users/{uid}/posts
posts = db.collection("users").document("rick-c137").subcollection("posts")
# Add to subcollection
posts.add({
"title": "Portal Gun Physics",
"content": "Wubba lubba dub dub...",
"created_at": db.server_timestamp()
})
# List all posts
all_posts = posts.get()
for post in all_posts:
    print(post["title"])
Queries & Filters
All query methods return self, so they chain naturally. Call .get() at the end to execute.
# --- Basic where clause ---
adults = db.collection("users") \
.where("age", ">=", 18) \
.get()
# --- Multiple filters (AND) ---
results = db.collection("users") \
.where("age", ">=", 18) \
.where("active", "==", True) \
.get()
# --- Order + Limit ---
top_users = db.collection("users") \
.order_by("age", "DESCENDING") \
.limit(10) \
.get()
# --- Paginate with offset ---
page_2 = db.collection("users") \
.order_by("name") \
.limit(10) \
.offset(10) \
.get()
# --- Array contains ---
scientists = db.collection("users") \
.where("tags", "array_contains", "scientist") \
.get()
# --- In / Not-in ---
selected = db.collection("users") \
.where("status", "in", ["active", "pending"]) \
.get()
# --- array_contains_any ---
tagged = db.collection("posts") \
.where("tags", "array_contains_any", ["python", "firebase"]) \
.get()
# --- Collection group: query across all 'posts' subcollections ---
all_posts = db.collection_group("posts") \
.where("published", "==", True) \
.get()
# --- Combined: price filter + category + sort ---
cheap_electronics = db.collection("products") \
.where("price", "<=", 100) \
.where("category", "==", "electronics") \
.order_by("price", "DESCENDING") \
.limit(5) \
.get()
for product in cheap_electronics:
    print(f"{product['name']} — ${product['price']}")
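The offset example above fetches a single page. To walk an entire result set, the same limit/offset calls can be wrapped in a generator. This is a sketch under two assumptions: that .get() returns a list, and that each page should start from a fresh query object (since the query methods return self, reusing one reference might carry state between pages). `iter_pages` and `make_query` are hypothetical names.

```python
def iter_pages(make_query, page_size=10):
    """Yield successive pages from a limit/offset query until one comes back short."""
    offset = 0
    while True:
        # make_query() must return a fresh chainable query each time.
        page = make_query().limit(page_size).offset(offset).get()
        if not page:
            return
        yield page
        if len(page) < page_size:
            return  # a short page means there is nothing left to fetch
        offset += page_size
```

A call might look like `for page in iter_pages(lambda: db.collection("users").order_by("name")): ...`. Offset paging gets slower as the offset grows, so it suits small collections best.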
Supported Operators
The queries above use the operators ==, >=, <=, in, array_contains, and array_contains_any.
Special Field Values
These sentinel values are resolved server-side by Firestore.
# Server-side timestamp (current time at write)
db.collection("posts").add({
"title": "Hello Phirebase",
"created_at": db.server_timestamp()
})
# Increment a numeric field atomically
db.collection("users").document("rick-c137").update({
"login_count": db.increment(1),
"balance": db.increment(-50.0)
})
# Add unique elements to an array
db.collection("users").document("rick-c137").update({
"tags": db.array_union("genius", "wanted")
})
# Remove specific elements from an array
db.collection("users").document("rick-c137").update({
"tags": db.array_remove("wanted")
})
# Delete a field entirely
db.collection("users").document("rick-c137").update({
"temporary_field": db.delete_field_value()
})
Batch Writes
Batch writes are atomic — either all operations succeed, or none do. Maximum 500 operations per batch (Firebase limit).
batch = db.batch()
# Queue multiple operations
batch.set("users", "user-001", {"name": "Morty", "age": 14})
batch.set("users", "user-002", {"name": "Summer", "age": 17})
batch.update("users", "user-003", {"age": 71})
batch.delete("users", "user-inactive-99")
batch.set("posts", "post-001", {"title": "First Post"})
# Commit atomically — all or nothing
result = batch.commit()
print(result) # {"writeResults": [...]}
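When there are more than 500 writes, they have to be split across several batches. The sketch below shows one way to do that, assuming `db.batch()`, `batch.set(collection, doc_id, data)`, and `batch.commit()` behave as in the example above. The helpers `chunked` and `commit_in_batches` are hypothetical.

```python
def chunked(items, size=500):
    """Split a list into consecutive slices of at most `size` items."""
    if size < 1:
        raise ValueError("size must be at least 1")
    return [items[i:i + size] for i in range(0, len(items), size)]


def commit_in_batches(db, collection, docs, size=500):
    """Write {doc_id: data} pairs using one batch per chunk; returns the batch count."""
    chunks = chunked(list(docs.items()), size)
    for chunk in chunks:
        batch = db.batch()
        for doc_id, data in chunk:
            batch.set(collection, doc_id, data)
        batch.commit()  # atomic within this chunk only
    return len(chunks)
```

Atomicity holds per batch, not across the whole set: if the third commit fails, the first two have already been applied.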
Authenticated Firestore Requests
# Pass idToken to respect Firestore security rules
user = auth.sign_in_with_email_and_password("u@example.com", "pw")
token = user["idToken"]
result = db.collection("private_data").get(token=token)
db.collection("posts").add({"body": "..."}, token=token)
Realtime Database
Get a reference with firebase.database(). Paths are built with .child() calls. The Realtime Database stores JSON and syncs in milliseconds. Streaming is supported via Server-Sent Events.
CRUD Operations
rtdb = firebase.database()
# ── SET — write/overwrite data at a path ──
rtdb.child("users").child("user-001").set({
"name": "Morty Smith",
"age": 14,
"score": 42.5
})
# ── PUSH — append with auto-generated key ──
result = rtdb.child("messages").push({
    "text": "Wubba lubba dub dub!",
    "sender": "rick",
    "timestamp": {".sv": "timestamp"}  # RTDB placeholder, replaced with the server's time
})
auto_key = result["name"] # e.g. "-Mx2sKL9pQrT8YhN"
# ── GET — read data ──
user = rtdb.child("users").child("user-001").get()
data = user.val() # OrderedDict
print(data["name"])
# ── UPDATE — patch specific fields ──
rtdb.child("users").child("user-001").update({
"age": 15,
"score": 99
})
# ── REMOVE — delete node ──
rtdb.child("users").child("user-inactive").remove()
# ── GENERATE KEY — without writing ──
key = rtdb.generate_key()
print(key) # "-Mx2sKL9pQrT8YhN3vWb"
Reading Responses
all_users = rtdb.child("users").get()
# .val() — full OrderedDict of data
print(all_users.val())
# .key() — the path segment queried on
print(all_users.key()) # "users"
# .each() — iterate child Pyre objects
for item in all_users.each():
    print(item.key(), item.val())
Querying
# Order by auto-generated key, take first 10
results = rtdb.child("messages") \
.order_by_key() \
.limit_to_first(10) \
.get()
# Order by value (for flat numeric data)
top_scores = rtdb.child("scores") \
.order_by_value() \
.limit_to_last(5) \
.get()
# Order by a child field, with range
adults = rtdb.child("users") \
.order_by_child("age") \
.start_at(18) \
.end_at(65) \
.get()
# Equal to filter
morty = rtdb.child("users") \
.order_by_child("name") \
.equal_to("Morty Smith") \
.get()
# Shallow query — returns only the keys, not values
keys = rtdb.child("users").shallow().get()
for key in keys.val():
    print(key)
# Combine multiple constraints
page = rtdb.child("posts") \
.order_by_child("published_at") \
.limit_to_last(20) \
.get()
for post in page.each():
    print(post.key(), post.val()["title"])
Real-time Streaming
Phirebase uses Server-Sent Events (SSE) to stream changes in a background thread. Your handler is called for every database event.
import time
def on_message(message):
    event = message.get("event")  # "put" | "patch" | "cancel"
    path = message.get("path")
    data = message.get("data")
    if event == "put":
        print(f"[PUT] {path} → {data}")
    elif event == "patch":
        print(f"[PATCH] {path} → {data}")
# Start listening — runs in a daemon thread
stream = rtdb.child("notifications").stream(
on_message,
stream_id="notif-listener" # optional, appears in msg dict
)
# Write something to trigger the handler
rtdb.child("notifications").push({
"message": "Hello from Phirebase!",
"read": False
})
time.sleep(10)
# Stop the stream cleanly
stream.close()
The initial put event delivers the current snapshot of the path. Subsequent events are incremental changes. The handler is called from a separate thread — use thread-safe data structures if sharing state.
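A common way to meet the thread-safety advice above is to let the handler do nothing but hand each message to a queue.Queue, and process the messages on the main thread. The sketch below uses only the standard library. The names `events`, `enqueue_event`, and `drain` are hypothetical, and the message is assumed to be the dict shown in the handler above.

```python
import queue

events = queue.Queue()  # thread-safe hand-off between the stream thread and your code


def enqueue_event(message):
    """Stream handler: do no work here, just pass the message along."""
    events.put(message)


def drain(q, timeout=0.1):
    """Collect queued messages until the queue stays empty for `timeout` seconds."""
    drained = []
    while True:
        try:
            drained.append(q.get(timeout=timeout))
        except queue.Empty:
            return drained
```

Passing `enqueue_event` as the handler to .stream() and calling `drain(events)` from the main loop keeps all state changes on one thread.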
Authenticated RTDB Requests
user = auth.sign_in_with_email_and_password("u@example.com", "pw")
token = user["idToken"]
# Pass token to respect security rules
data = rtdb.child("protected_path").get(token=token)
rtdb.child("protected_path").set({"key": "val"}, token=token)
Cloud Storage
Get a reference with firebase.storage(). Paths are built with .child(). When a Service Account is configured, operations use the google-cloud-storage client. Otherwise, they fall back to Firebase Storage REST.
Upload Files
storage = firebase.storage()
# Upload from local file path
storage.child("images/avatar.jpg").put("./local/avatar.jpg")
# Upload from a file object (binary mode)
with open("report.pdf", "rb") as f:
    storage.child("documents/report.pdf").put(f)
# Upload from in-memory bytes
import io
data = io.BytesIO(b"Hello, Firebase!")
storage.child("text/hello.txt").put(data)
# Upload with user auth token (respects Storage rules)
storage.child("private/secret.pdf").put(
"secret.pdf",
token=user_id_token
)
Download Files
# Download to a local path
storage.child("images/avatar.jpg").download("./downloads/avatar.jpg")
# Download a private file with auth token
storage.child("private/secret.pdf").download(
"secret.pdf",
token=user_id_token
)
Get Download URL
# Public URL (file must have public read rules)
url = storage.child("images/avatar.jpg").get_url()
print(url)
# https://firebasestorage.googleapis.com/v0/b/project.appspot.com/o/images%2Favatar.jpg?alt=media
# Authenticated URL
url = storage.child("private/file.txt").get_url(token=user_id_token)
print(url)
# ...?alt=media&token=eyJhbGci...
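The URL shape in the comments above can be reproduced with the standard library, which is handy for checking why a path such as images/avatar.jpg appears as images%2Favatar.jpg. This sketch only mirrors the format shown here. It is not necessarily how get_url is implemented, and `media_url` is a hypothetical helper.

```python
from urllib.parse import quote


def media_url(bucket, path, token=None):
    """Build a Firebase Storage media URL for an object path."""
    # safe="" makes quote() encode "/" as well, so the whole object path
    # becomes a single URL segment.
    encoded = quote(path, safe="")
    url = f"https://firebasestorage.googleapis.com/v0/b/{bucket}/o/{encoded}?alt=media"
    if token:
        url += f"&token={token}"
    return url
```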
Delete & List Files
delete() and list_files() require a Service Account to be configured.
# Delete a file (requires service account)
storage.delete("images/old-avatar.jpg")
# List all files in the bucket (requires service account)
files = storage.list_files()
for blob in files:
    print(blob.name, blob.size, blob.time_created)
API Reference
initialize_app(config)
| Config Key | Type | Required | Description |
|---|---|---|---|
| apiKey | str | required | Firebase Web API key |
| authDomain | str | optional | Auth domain (project.firebaseapp.com) |
| databaseURL | str | optional | RTDB URL for Realtime Database |
| storageBucket | str | optional | GCS bucket (project.appspot.com) |
| projectId | str | optional | Firebase project ID (auto-detected) |
| serviceAccount | str or dict | admin | Path to JSON key or key dict |
Auth Methods
| Method | Returns | Description |
|---|---|---|
| create_user_with_email_and_password(email, password) | dict | Register new user |
| sign_in_with_email_and_password(email, password) | dict | Sign in, sets current_user |
| create_custom_token(uid, additional_claims) | bytes | Mint JWT (admin only) |
| sign_in_with_custom_token(token) | dict | Sign in with custom JWT |
| refresh(refresh_token) | dict | Refresh expired ID token |
| get_account_info(id_token) | dict | Fetch user profile data |
| send_email_verification(id_token) | dict | Trigger verification email |
| send_password_reset_email(email) | dict | Send reset link |
| verify_password_reset_code(code, new_password) | dict | Apply new password |
Firestore Methods
| Method | Returns | Description |
|---|---|---|
| collection(name) | Firestore | Reference a collection |
| document(id) | Firestore | Reference a document |
| subcollection(name) | Firestore | Reference a subcollection |
| collection_group(name) | Firestore | Cross-collection query |
| add(data, token?) | dict | Add doc with auto-ID |
| set(data, merge?, token?) | dict | Set or overwrite doc |
| get(token?) | dict or list | Get doc(s) or run query |
| get_all(token?) | list | Get all collection docs |
| get_by_id(id, token?) | dict | Get specific doc by ID |
| update(data, token?) | dict | Update specific fields |
| delete(token?) | dict | Delete the document |
| where(field, op, value) | Firestore | Add filter condition |
| order_by(field, direction?) | Firestore | Set sort order |
| limit(count) | Firestore | Limit result count |
| offset(count) | Firestore | Skip results |
| batch() | BatchHelper | Create batch writer |
| server_timestamp() | dict | Sentinel: server time |
| array_union(*values) | dict | Sentinel: add to array |
| array_remove(*values) | dict | Sentinel: remove from array |
| increment(value) | dict | Sentinel: atomic increment |
| delete_field_value() | dict | Sentinel: delete field |
Database Methods
| Method | Returns | Description |
|---|---|---|
| child(*args) | Database | Append to path |
| set(data, token?) | dict | Write at current path |
| push(data, token?) | dict | Append with auto-key |
| update(data, token?) | dict | Patch specific fields |
| get(token?) | PyreResponse | Read data |
| remove(token?) | None | Delete node |
| generate_key() | str | Generate unique key |
| stream(handler, token?, stream_id?) | Stream | Start SSE listener |
| order_by_key() | Database | Sort by auto-key |
| order_by_value() | Database | Sort by value |
| order_by_child(field) | Database | Sort by child field |
| start_at(value) | Database | Range start |
| end_at(value) | Database | Range end |
| equal_to(value) | Database | Exact match filter |
| limit_to_first(n) | Database | First n results |
| limit_to_last(n) | Database | Last n results |
| shallow() | Database | Keys-only query |
Storage Methods
| Method | Returns | Description |
|---|---|---|
| child(*args) | Storage | Append to path |
| put(file, token?) | dict | Upload file |
| download(filename, token?) | None | Download to disk |
| get_url(token?) | str | Get media URL |
| delete(name) | None | Delete blob (admin) |
| list_files() | Iterator | List all blobs (admin) |
Error Handling
Methods raise requests.exceptions.HTTPError on non-2xx responses and ValueError for invalid arguments. The HTTPError carries the full response body, making it easy to inspect the Firebase error code.
from requests.exceptions import HTTPError
import json
try:
    user = auth.sign_in_with_email_and_password("u@example.com", "bad-pw")
except HTTPError as e:
    # e.args[1] is the raw JSON response body
    body = json.loads(e.args[1])
    code = body["error"]["message"]  # "INVALID_PASSWORD"
    print(f"Firebase error: {code}")
except ValueError as e:
    # Raised for invalid arguments (missing collection, bad path, etc.)
    print(f"Usage error: {e}")
except Exception as e:
    print(f"Unexpected: {e}")
Common Firebase Error Codes
| Error Code | Meaning |
|---|---|
| EMAIL_NOT_FOUND | No user with that email address |
| INVALID_PASSWORD | Wrong password |
| USER_DISABLED | The account has been disabled |
| EMAIL_EXISTS | Email already registered |
| PERMISSION_DENIED | Security rules denied the operation |
| TOKEN_EXPIRED | ID token has expired — refresh it |
| INVALID_REFRESH_TOKEN | The refresh token is malformed or revoked |
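The codes in the table can be turned into user-facing messages with a small lookup. The sketch below assumes, as in the example above, that the exception carries the raw JSON body in args[1]. The helper names and the message wording are hypothetical. Some Firebase messages carry a suffix after the code (for example "WEAK_PASSWORD : ..."), so the helper keeps only the part before " : ".

```python
import json

# Messages are illustrative; the codes come from the table above.
FRIENDLY_MESSAGES = {
    "EMAIL_NOT_FOUND": "No account exists for that email address.",
    "INVALID_PASSWORD": "The password is incorrect.",
    "EMAIL_EXISTS": "That email address is already registered.",
    "TOKEN_EXPIRED": "Your session expired. Please sign in again.",
}


def firebase_error_code(exc):
    """Return the Firebase error code carried by an exception, or None."""
    try:
        body = json.loads(exc.args[1])
        # Keep only the code when the message looks like "CODE : details".
        return body["error"]["message"].split(" : ")[0].strip()
    except (IndexError, KeyError, TypeError, ValueError, AttributeError):
        return None


def friendly_message(exc, default="Something went wrong. Please try again."):
    """Map an exception to a user-facing message, falling back to a default."""
    return FRIENDLY_MESSAGES.get(firebase_error_code(exc), default)
```

Inside an `except HTTPError as e:` block, `friendly_message(e)` gives text that is safe to show to end users, while `firebase_error_code(e)` remains available for logging.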
Complete Application Example
A full end-to-end script using every major service together.
from phirebase import initialize_app
from requests.exceptions import HTTPError
import time, json
# ── 1. INITIALIZE ─────────────────────────────────────────
config = {
"apiKey": "AIzaSy...",
"authDomain": "myapp.firebaseapp.com",
"databaseURL": "https://myapp.firebaseio.com",
"storageBucket": "myapp.appspot.com",
"projectId": "myapp",
"serviceAccount": "serviceAccountKey.json"
}
firebase = initialize_app(config)
# ── 2. AUTHENTICATION ─────────────────────────────────────
auth = firebase.auth()
try:
    user = auth.sign_in_with_email_and_password(
        "rick@c137.com", "Wubba1ubba!"
    )
    id_token = user["idToken"]
    refresh_token = user["refreshToken"]
    uid = user["localId"]
    print(f"✓ Signed in: {uid}")
except HTTPError:
    # Sign-in failed; treat this as a new user and register them
    user = auth.create_user_with_email_and_password(
        "rick@c137.com", "Wubba1ubba!"
    )
    auth.send_email_verification(user["idToken"])
    id_token = user["idToken"]
    refresh_token = user["refreshToken"]
    uid = user["localId"]
    print(f"✓ Registered: {uid}")
# ── 3. FIRESTORE ──────────────────────────────────────────
db = firebase.firestore()
# Create a user profile doc
db.collection("users").document(uid).set({
"name": "Rick Sanchez",
"email": "rick@c137.com",
"dimension": "C-137",
"iq": 9999,
"tags": ["genius", "scientist"],
"created_at": db.server_timestamp()
})
# Add a post
post = db.collection("posts").add({
"title": "Portal Gun Theory",
"content": "Morty, you have to get those seeds...",
"author": uid,
"likes": 0,
"published": True,
"tags": ["science", "portal"],
"created_at": db.server_timestamp()
})
post_id = post["id"]
print(f"✓ Post created: {post_id}")
# Increment likes atomically
db.collection("posts").document(post_id).update({
"likes": db.increment(1),
"tags": db.array_union("trending")
})
# Query published posts
results = db.collection("posts") \
.where("published", "==", True) \
.order_by("likes", "DESCENDING") \
.limit(3) \
.get()
print("\nTop posts:")
for p in results:
    print(f" [{p['likes']}♥] {p['title']}")
# Batch: update profile + add activity log
batch = db.batch()
batch.update("users", uid, {"last_active": "today"})
batch.set("activity", f"act-{uid[:8]}", {
"user_id": uid, "action": "posted"
})
batch.commit()
# ── 4. REALTIME DATABASE ──────────────────────────────────
rtdb = firebase.database()
def on_notification(msg):
    # Skip events with no payload
    if msg.get("data"):
        print(f" [RTDB] {msg['event']}: {msg['data']}")
stream = rtdb.child("notifications").child(uid).stream(on_notification)
rtdb.child("notifications").child(uid).push({
"text": "Your post went live!",
"post_id": post_id,
"read": False
})
time.sleep(2)
stream.close()
# ── 5. STORAGE ────────────────────────────────────────────
storage = firebase.storage()
with open("portal-gun-blueprint.jpg", "rb") as f:
    storage.child(f"users/{uid}/avatar.jpg").put(f)
url = storage.child(f"users/{uid}/avatar.jpg").get_url()
print(f"\n✓ Avatar URL: {url}")
# Update Firestore doc with avatar URL
db.collection("users").document(uid).update({"avatar_url": url})
print("\n✓ All done! Wubba lubba dub dub!")
For production deployments, wrap the entire initialization block in a try/except, validate config keys at startup, and use environment variables for all secrets.