Retrieving Data for Reporting from Google Workspace: Python SDK, API, and CLI Comparison
READER BEWARE: THE FOLLOWING WRITTEN ENTIRELY BY AI WITHOUT HUMAN EDITING.
Introduction
Google Workspace (formerly G Suite) provides powerful APIs and tools for extracting configuration and analytics data for reporting purposes. Whether you’re managing user accounts, monitoring service usage, analyzing email traffic, or auditing security settings, Google Workspace offers multiple approaches to retrieve this data programmatically.
This comprehensive guide explores three primary methods for retrieving Google Workspace data:
- Google Workspace Python SDK - Official Python client libraries
- Google Workspace Admin API - Direct REST API calls
- gcloud CLI - Command-line interface tools
We’ll compare authentication mechanisms, use cases, pros and cons, and provide practical examples for each approach.
Why Retrieve Google Workspace Data Programmatically?
Common Use Cases
- User Management Reports: Track user creation, deletion, and status changes
- License Compliance: Monitor license allocation and usage across the organization
- Security Auditing: Extract login attempts, 2FA status, and security events
- Usage Analytics: Analyze Gmail, Drive, Calendar, and Meet usage patterns
- Configuration Auditing: Document organizational settings for compliance
- Cost Optimization: Identify unused licenses and optimize subscription costs
- Automated Reporting: Generate regular status reports for management and stakeholders
Authentication Overview
All three methods require authentication with Google Workspace. The authentication mechanism varies slightly but generally involves:
- Service Account (recommended for automation) - Credentials without user interaction
- OAuth 2.0 (for user-delegated access) - User grants permission to application
- API Keys (limited use) - For some public data access only
Setting Up Google Cloud Project
Before using any method, you need a Google Cloud Project:
- Go to Google Cloud Console
- Create a new project or select existing one
- Enable the Admin SDK API, which covers both the Directory API and the Reports API used in this guide
- Create credentials (Service Account or OAuth 2.0 Client ID)
Method 1: Google Workspace Python SDK
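The Python SDK wraps the Google Workspace APIs in Python objects built from Google's API discovery documents. It refreshes credentials for you, offers pagination helpers, and raises structured HttpError exceptions when a request fails.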
Installation
# Install the Google API client library and its auth helpers
# (google-auth-oauthlib is only needed for the OAuth 2.0 flow in Option B)
pip install google-api-python-client google-auth google-auth-httplib2 google-auth-oauthlib
Authentication with Python SDK
Option A: Service Account (Recommended for Automation)
Setup:
- Create a Service Account in Google Cloud Console
- Download JSON key file
- Enable Domain-Wide Delegation for the Service Account
- In Google Workspace Admin Console, authorize the service account with required scopes
from google.oauth2 import service_account
from googleapiclient.discovery import build
# Service Account credentials
SCOPES = [
'https://www.googleapis.com/auth/admin.directory.user.readonly',
'https://www.googleapis.com/auth/admin.reports.audit.readonly',
'https://www.googleapis.com/auth/admin.reports.usage.readonly'
]
SERVICE_ACCOUNT_FILE = 'path/to/service-account-key.json'
DELEGATED_USER_EMAIL = 'admin@example.com' # Admin user to impersonate
def get_admin_service():
"""Create Admin SDK service with service account"""
credentials = service_account.Credentials.from_service_account_file(
SERVICE_ACCOUNT_FILE,
scopes=SCOPES
)
# Delegate to admin user
delegated_credentials = credentials.with_subject(DELEGATED_USER_EMAIL)
service = build('admin', 'directory_v1', credentials=delegated_credentials)
return service
def get_reports_service():
"""Create Reports API service with service account"""
credentials = service_account.Credentials.from_service_account_file(
SERVICE_ACCOUNT_FILE,
scopes=SCOPES
)
delegated_credentials = credentials.with_subject(DELEGATED_USER_EMAIL)
service = build('admin', 'reports_v1', credentials=delegated_credentials)
return service
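Before wiring a key file into the functions above, it can help to sanity-check it and to pull out the values an administrator needs for the domain-wide delegation step. The following is a minimal sketch using only the standard library. The helper names (check_service_account_key, delegation_entry) and the sample key contents are hypothetical, and the field list reflects the usual layout of a downloaded key file rather than an official schema.

```python
import json

# Fields normally present in a downloaded service account key file.
REQUIRED_FIELDS = ("type", "client_email", "client_id", "private_key", "token_uri")


def check_service_account_key(key_info: dict) -> list:
    """Return a list of problems found in a parsed key file (empty if none)."""
    problems = [f"missing field: {name}" for name in REQUIRED_FIELDS if not key_info.get(name)]
    if key_info.get("type") not in (None, "service_account"):
        problems.append(f"unexpected type: {key_info['type']!r}")
    return problems


def delegation_entry(key_info: dict, scopes: list) -> dict:
    """Values to enter in the Admin Console: the client ID and a comma-separated scope list."""
    return {"client_id": key_info["client_id"], "scopes": ",".join(scopes)}


# Hypothetical key contents; a real file would be loaded with json.load().
sample_key = json.loads("""{
  "type": "service_account",
  "client_email": "reporter@my-project.iam.gserviceaccount.com",
  "client_id": "123456789012345678901",
  "private_key": "-----BEGIN PRIVATE KEY-----\\n...\\n-----END PRIVATE KEY-----\\n",
  "token_uri": "https://oauth2.googleapis.com/token"
}""")

print(check_service_account_key(sample_key))
print(delegation_entry(sample_key, [
    "https://www.googleapis.com/auth/admin.directory.user.readonly",
    "https://www.googleapis.com/auth/admin.reports.audit.readonly",
]))
```

Running a check like this before the first API call tends to surface a wrong file (for example an OAuth client secret instead of a service account key) earlier than an authentication error would.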
Option B: OAuth 2.0 (User Consent)
Setup:
- Create OAuth 2.0 Client ID in Google Cloud Console
- Download credentials JSON file
- Run authentication flow once to get refresh token
from google_auth_oauthlib.flow import InstalledAppFlow
from google.auth.transport.requests import Request
import pickle
import os
SCOPES = ['https://www.googleapis.com/auth/admin.directory.user.readonly']
def get_oauth_credentials():
"""Authenticate using OAuth 2.0 flow"""
creds = None
# Token file stores access and refresh tokens
if os.path.exists('token.pickle'):
with open('token.pickle', 'rb') as token:
creds = pickle.load(token)
# If no valid credentials, run OAuth flow
if not creds or not creds.valid:
if creds and creds.expired and creds.refresh_token:
creds.refresh(Request())
else:
flow = InstalledAppFlow.from_client_secrets_file(
'credentials.json', SCOPES)
creds = flow.run_local_server(port=0)
# Save credentials for future use
with open('token.pickle', 'wb') as token:
pickle.dump(creds, token)
return creds
def get_admin_service_oauth():
"""Create Admin SDK service with OAuth"""
creds = get_oauth_credentials()
service = build('admin', 'directory_v1', credentials=creds)
return service
Python SDK Examples
Example 1: List All Users in Organization
def list_all_users(service):
"""Retrieve all users from Google Workspace"""
users = []
page_token = None
try:
while True:
results = service.users().list(
customer='my_customer',
maxResults=500,
orderBy='email',
pageToken=page_token
).execute()
users.extend(results.get('users', []))
page_token = results.get('nextPageToken')
if not page_token:
break
return users
except Exception as e:
print(f"Error fetching users: {e}")
return []
# Usage
service = get_admin_service()
users = list_all_users(service)
print(f"Total users: {len(users)}")
for user in users[:5]: # Print first 5
print(f" {user['primaryEmail']}: {user['name']['fullName']}")
Example 2: Generate User Status Report
def generate_user_status_report(service, output_file='user_report.csv'):
"""Generate comprehensive user status report"""
import csv
from datetime import datetime
users = list_all_users(service)
with open(output_file, 'w', newline='') as csvfile:
fieldnames = [
'Email', 'Full Name', 'Status', 'Admin',
'Suspended', '2FA Enrolled', 'Created Date',
'Last Login', 'Org Unit'
]
writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
writer.writeheader()
for user in users:
writer.writerow({
'Email': user.get('primaryEmail', ''),
'Full Name': user.get('name', {}).get('fullName', ''),
'Status': 'Active' if not user.get('suspended', False) else 'Suspended',
'Admin': 'Yes' if user.get('isAdmin', False) else 'No',
'Suspended': 'Yes' if user.get('suspended', False) else 'No',
'2FA Enrolled': 'Yes' if user.get('isEnrolledIn2Sv', False) else 'No',
'Created Date': user.get('creationTime', '')[:10],
'Last Login': user.get('lastLoginTime', 'Never')[:10] if user.get('lastLoginTime') else 'Never',
'Org Unit': user.get('orgUnitPath', '/')
})
print(f"Report generated: {output_file}")
# Usage
service = get_admin_service()
generate_user_status_report(service)
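The same user records can feed the cost-optimization use case mentioned earlier. The sketch below flags accounts with no recent login. It works on plain dictionaries shaped like the Directory API user resource, so it runs without credentials. The function names, the 90-day threshold, and the sample users are assumptions for illustration, as is the treatment of a missing or epoch (1970) lastLoginTime as "never logged in".

```python
from datetime import datetime, timedelta, timezone


def parse_directory_time(value: str) -> datetime:
    """Parse a timestamp such as '2025-01-15T10:00:00.000Z' into an aware datetime."""
    return datetime.strptime(value, "%Y-%m-%dT%H:%M:%S.%fZ").replace(tzinfo=timezone.utc)


def find_inactive_users(users, now, max_idle_days=90):
    """Return emails of non-suspended users whose last login is older than the cutoff."""
    cutoff = now - timedelta(days=max_idle_days)
    inactive = []
    for user in users:
        if user.get("suspended", False):
            continue  # suspended accounts are reported separately
        last_login = user.get("lastLoginTime")
        # Assumption: a missing value or the Unix epoch means the user never logged in.
        if not last_login or parse_directory_time(last_login) < cutoff:
            inactive.append(user.get("primaryEmail", ""))
    return inactive


# Hypothetical sample data in the shape returned by users().list()
sample_users = [
    {"primaryEmail": "active@example.com", "lastLoginTime": "2025-11-10T08:30:00.000Z"},
    {"primaryEmail": "idle@example.com", "lastLoginTime": "2025-03-01T12:00:00.000Z"},
    {"primaryEmail": "never@example.com", "lastLoginTime": "1970-01-01T00:00:00.000Z"},
    {"primaryEmail": "gone@example.com", "lastLoginTime": "2024-01-01T00:00:00.000Z", "suspended": True},
]
reference = datetime(2025, 11, 18, tzinfo=timezone.utc)
print(find_inactive_users(sample_users, reference))
```

Passing the reference time in as an argument, rather than calling datetime.now() inside the function, keeps the report reproducible and easy to test.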
Example 3: Audit Login Activity
def get_login_audit_report(service, days=7):
    """Get login audit logs for the past N days"""
    from datetime import datetime, timedelta, timezone
    # Calculate the start time in UTC; a trailing 'Z' is only correct for UTC
    start_time = (datetime.now(timezone.utc) - timedelta(days=days)).strftime('%Y-%m-%dT%H:%M:%SZ')
    activities = []
    page_token = None
    try:
        while True:
            results = service.activities().list(
                userKey='all',
                applicationName='login',
                maxResults=1000,
                startTime=start_time,
                pageToken=page_token
            ).execute()
            activities.extend(results.get('items', []))
            page_token = results.get('nextPageToken')
            if not page_token:
                break
        return activities
    except Exception as e:
        print(f"Error fetching login activities: {e}")
        return []
def analyze_login_activities(activities, days=7):
    """Analyze login activity data"""
    successful_logins = 0
    failed_logins = 0
    suspicious_logins = []
    for activity in activities:
        # An activity can carry several events, so inspect each one
        for event in activity.get('events', []):
            event_name = event.get('name', '')
            if event_name == 'login_success':
                successful_logins += 1
            elif event_name == 'login_failure':
                failed_logins += 1
            elif event_name in ['suspicious_login', 'account_disabled_suspicious_activity']:
                suspicious_logins.append(activity)
    print(f"Login Analysis (last {days} days):")
    print(f" Successful logins: {successful_logins}")
    print(f" Failed logins: {failed_logins}")
    print(f" Suspicious activities: {len(suspicious_logins)}")
    if suspicious_logins:
        print("\nSuspicious Login Details:")
        for activity in suspicious_logins[:10]:  # Show first 10
            actor = activity.get('actor', {}).get('email', 'Unknown')
            time = activity.get('id', {}).get('time', 'Unknown')
            print(f" {time}: {actor}")
# Usage
reports_service = get_reports_service()
activities = get_login_audit_report(reports_service)
analyze_login_activities(activities)
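A common follow-up to the totals above is to see which accounts the failures concentrate on. The sketch below counts a given event name per actor across every event in each activity and flags actors at or above a threshold. The function names, the threshold of three, and the sample activities are hypothetical; the dictionaries only mimic the actor and events fields used in the example above.

```python
from collections import Counter


def count_events_by_actor(activities, event_name):
    """Count occurrences of one event name per actor, across all events in each activity."""
    counts = Counter()
    for activity in activities:
        actor = activity.get("actor", {}).get("email", "unknown")
        for event in activity.get("events", []):
            if event.get("name") == event_name:
                counts[actor] += 1
    return counts


def flag_repeated_failures(activities, threshold=3):
    """Return actors with at least `threshold` login_failure events, sorted by email."""
    counts = count_events_by_actor(activities, "login_failure")
    return sorted(actor for actor, n in counts.items() if n >= threshold)


# Hypothetical sample data in the shape returned by activities().list()
sample_activities = [
    {"actor": {"email": "a@example.com"}, "events": [{"name": "login_failure"}, {"name": "login_failure"}]},
    {"actor": {"email": "a@example.com"}, "events": [{"name": "login_failure"}]},
    {"actor": {"email": "b@example.com"}, "events": [{"name": "login_failure"}]},
    {"actor": {"email": "b@example.com"}, "events": [{"name": "login_success"}]},
]
print(flag_repeated_failures(sample_activities))
```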
Example 4: Extract Usage Statistics
def get_user_usage_statistics(service, user_email, date='2025-11-18'):
"""Get usage statistics for a specific user"""
try:
result = service.userUsageReport().get(
userKey=user_email,
date=date
).execute()
usage_reports = result.get('usageReports', [])
if usage_reports:
report = usage_reports[0]
parameters = report.get('parameters', [])
print(f"Usage statistics for {user_email} on {date}:")
for param in parameters:
name = param.get('name', '')
value = param.get('intValue', param.get('boolValue', param.get('datetimeValue', 'N/A')))
print(f" {name}: {value}")
else:
print(f"No usage data available for {user_email} on {date}")
except Exception as e:
print(f"Error fetching usage statistics: {e}")
# Usage
reports_service = get_reports_service()
get_user_usage_statistics(reports_service, 'user@example.com')
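For reporting, the list of parameters is usually easier to work with as a flat dictionary. The sketch below converts one usage report into name-to-value pairs. The function name and the sample report are hypothetical, and the parameter names are illustrative. It also assumes that 64-bit integers may arrive as JSON strings, which is why intValue is passed through int().

```python
# Value fields checked in order; a parameter is expected to carry one of them.
VALUE_KEYS = ("intValue", "boolValue", "stringValue", "datetimeValue")


def flatten_parameters(report: dict) -> dict:
    """Turn a usage report's parameter list into a {name: value} dictionary."""
    flat = {}
    for param in report.get("parameters", []):
        for key in VALUE_KEYS:
            if key in param:
                value = param[key]
                if key == "intValue":
                    value = int(value)  # accepts both "12" and 12
                flat[param.get("name", "")] = value
                break
    return flat


# Hypothetical report in the shape of one entry of 'usageReports'
sample_report = {
    "parameters": [
        {"name": "gmail:num_emails_sent", "intValue": "12"},
        {"name": "accounts:is_2sv_enrolled", "boolValue": True},
        {"name": "accounts:last_login_time", "datetimeValue": "2025-11-17T09:00:00.000Z"},
    ]
}
print(flatten_parameters(sample_report))
```

Checking for the presence of each key, rather than chaining get() calls with defaults, keeps a legitimate value of 0 or False from being mistaken for a missing one.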
Pros and Cons of Python SDK
Pros:
- ✅ Pythonic, discoverable interface generated from Google's API discovery documents
- ✅ Pagination helpers such as users().list_next()
- ✅ Opt-in retries for transient errors via execute(num_retries=...)
- ✅ Extensive documentation and examples
- ✅ Active maintenance and updates
- ✅ Handles OAuth token refresh automatically
- ✅ Structured HttpError exceptions that expose the HTTP status code
Cons:
- ❌ Requires installing additional dependencies
- ❌ Learning curve for SDK-specific patterns
- ❌ Some advanced API features may lag behind REST API
- ❌ Larger memory footprint for simple tasks
- ❌ Versioning complexity across different Google services
Method 2: Google Workspace Admin API (Direct REST)
Calling the REST API directly gives you maximum flexibility and control. It is useful when no SDK is available for your language or when you need a newly released feature that the SDK does not expose yet.
Authentication with REST API
Service Account Authentication
import requests
from google.oauth2 import service_account
from google.auth.transport.requests import Request
SERVICE_ACCOUNT_FILE = 'path/to/service-account-key.json'
SCOPES = ['https://www.googleapis.com/auth/admin.directory.user.readonly']
DELEGATED_USER = 'admin@example.com'
def get_access_token():
"""Get access token for REST API calls"""
credentials = service_account.Credentials.from_service_account_file(
SERVICE_ACCOUNT_FILE,
scopes=SCOPES
)
delegated_credentials = credentials.with_subject(DELEGATED_USER)
# Refresh to get access token
auth_request = Request()
delegated_credentials.refresh(auth_request)
return delegated_credentials.token
# Get token
access_token = get_access_token()
# Use in API calls
headers = {
'Authorization': f'Bearer {access_token}',
'Content-Type': 'application/json'
}
OAuth 2.0 Authentication
import requests
from requests_oauthlib import OAuth2Session
# OAuth configuration
CLIENT_ID = 'your-client-id.apps.googleusercontent.com'
CLIENT_SECRET = 'your-client-secret'
REDIRECT_URI = 'http://localhost:8080'
AUTHORIZATION_BASE_URL = 'https://accounts.google.com/o/oauth2/v2/auth'
TOKEN_URL = 'https://oauth2.googleapis.com/token'
SCOPES = ['https://www.googleapis.com/auth/admin.directory.user.readonly']
def get_oauth_token():
"""Get OAuth access token"""
oauth = OAuth2Session(CLIENT_ID, redirect_uri=REDIRECT_URI, scope=SCOPES)
# Get authorization URL
authorization_url, state = oauth.authorization_url(
AUTHORIZATION_BASE_URL,
access_type='offline',
prompt='consent'
)
print(f"Please visit this URL to authorize: {authorization_url}")
authorization_response = input("Enter the full callback URL: ")
# Fetch token
token = oauth.fetch_token(
TOKEN_URL,
authorization_response=authorization_response,
client_secret=CLIENT_SECRET
)
return token['access_token']
REST API Examples
Example 1: List Users via REST API
import requests
def list_users_rest(access_token, domain='example.com'):
"""List all users using REST API"""
base_url = 'https://admin.googleapis.com/admin/directory/v1/users'
headers = {'Authorization': f'Bearer {access_token}'}
all_users = []
params = {
'customer': 'my_customer',
'maxResults': 500,
'orderBy': 'email'
}
while True:
response = requests.get(base_url, headers=headers, params=params)
if response.status_code != 200:
print(f"Error: {response.status_code} - {response.text}")
break
data = response.json()
all_users.extend(data.get('users', []))
# Check for next page
next_page_token = data.get('nextPageToken')
if not next_page_token:
break
params['pageToken'] = next_page_token
return all_users
# Usage
access_token = get_access_token()
users = list_users_rest(access_token)
print(f"Total users: {len(users)}")
Example 2: Get Organizational Units
def get_org_units_rest(access_token):
"""Retrieve organizational unit structure"""
url = 'https://admin.googleapis.com/admin/directory/v1/customer/my_customer/orgunits'
headers = {'Authorization': f'Bearer {access_token}'}
response = requests.get(url, headers=headers)
if response.status_code == 200:
org_units = response.json().get('organizationUnits', [])
print("Organizational Units:")
for ou in org_units:
print(f" {ou['orgUnitPath']}: {ou['name']}")
print(f" Description: {ou.get('description', 'N/A')}")
print(f" Parent: {ou.get('parentOrgUnitPath', 'Root')}")
return org_units
else:
print(f"Error: {response.status_code} - {response.text}")
return []
# Usage
access_token = get_access_token()
org_units = get_org_units_rest(access_token)
Example 3: Monitor Admin Activity
def get_admin_activities_rest(access_token, days=7):
"""Get admin activity logs"""
from datetime import datetime, timedelta, timezone
# Use UTC so that the trailing 'Z' in the timestamp is accurate
start_time = (datetime.now(timezone.utc) - timedelta(days=days)).strftime('%Y-%m-%dT%H:%M:%SZ')
url = 'https://admin.googleapis.com/admin/reports/v1/activity/users/all/applications/admin'
headers = {'Authorization': f'Bearer {access_token}'}
params = {
'startTime': start_time,
'maxResults': 1000
}
all_activities = []
while True:
response = requests.get(url, headers=headers, params=params)
if response.status_code != 200:
print(f"Error: {response.status_code} - {response.text}")
break
data = response.json()
all_activities.extend(data.get('items', []))
next_page_token = data.get('nextPageToken')
if not next_page_token:
break
params['pageToken'] = next_page_token
return all_activities
def analyze_admin_activities(activities):
"""Analyze admin activity logs"""
activity_counts = {}
for activity in activities:
events = activity.get('events', [])
for event in events:
event_name = event.get('name', 'Unknown')
activity_counts[event_name] = activity_counts.get(event_name, 0) + 1
print("Admin Activity Summary:")
for event_name, count in sorted(activity_counts.items(), key=lambda x: x[1], reverse=True):
print(f" {event_name}: {count}")
# Usage
access_token = get_access_token()
activities = get_admin_activities_rest(access_token)
analyze_admin_activities(activities)
Example 4: Create User Report in JSON Format
def create_detailed_user_report_rest(access_token, output_file='users_report.json'):
"""Create detailed user report in JSON format"""
import json
from datetime import datetime  # needed for the 'generated_at' timestamp below
users = list_users_rest(access_token)
report_data = {
'generated_at': datetime.now().isoformat(),
'total_users': len(users),
'users': []
}
for user in users:
user_data = {
'email': user.get('primaryEmail'),
'name': user.get('name', {}).get('fullName'),
'suspended': user.get('suspended', False),
'is_admin': user.get('isAdmin', False),
'creation_time': user.get('creationTime'),
'last_login': user.get('lastLoginTime'),
'org_unit': user.get('orgUnitPath'),
'two_factor_enabled': user.get('isEnrolledIn2Sv', False),
'aliases': user.get('aliases', [])
}
report_data['users'].append(user_data)
with open(output_file, 'w') as f:
json.dump(report_data, f, indent=2)
print(f"Detailed report saved to {output_file}")
# Usage
access_token = get_access_token()
create_detailed_user_report_rest(access_token)
Pros and Cons of REST API
Pros:
- ✅ Maximum flexibility and control
- ✅ Access to latest API features immediately
- ✅ Minimal dependencies (just requests library)
- ✅ Easy to debug with network inspection tools
- ✅ Language-agnostic approach (easily portable)
- ✅ Smaller memory footprint
- ✅ Works in any environment with HTTP support
Cons:
- ❌ Manual pagination handling
- ❌ Manual retry logic required
- ❌ More boilerplate code
- ❌ No type safety without additional work
- ❌ Manual token refresh management
- ❌ More error-prone for complex operations
- ❌ Less IDE support for API discovery
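To make "manual token refresh management" concrete, here is a minimal sketch of a cache that reuses an access token until shortly before it expires. The TokenCache class, its parameters, and the fake fetch function are hypothetical. In practice the fetch callable would wrap something like get_access_token() from the REST examples and report the token's lifetime in seconds.

```python
import time


class TokenCache:
    """Cache an access token and fetch a new one shortly before it expires."""

    def __init__(self, fetch_token, refresh_margin=60.0, clock=time.monotonic):
        self._fetch_token = fetch_token      # callable returning (token, lifetime_seconds)
        self._refresh_margin = refresh_margin
        self._clock = clock                  # injectable so tests need not wait
        self._token = None
        self._expires_at = 0.0

    def get(self):
        """Return a cached token, refreshing it when it is missing or close to expiry."""
        now = self._clock()
        if self._token is None or now >= self._expires_at - self._refresh_margin:
            token, lifetime = self._fetch_token()
            self._token = token
            self._expires_at = now + lifetime
        return self._token

    def auth_headers(self):
        """Build the Authorization header used by the REST examples."""
        return {"Authorization": f"Bearer {self.get()}"}


# Demonstration with a fake clock and a fake token source
fake_now = [0.0]
issued = []


def fake_fetch():
    issued.append(1)
    return f"token-{len(issued)}", 3600.0


cache = TokenCache(fake_fetch, clock=lambda: fake_now[0])
print(cache.get())        # first call fetches a token
fake_now[0] = 3000.0
print(cache.get())        # still inside the token lifetime, so the cached token is reused
```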
Method 3: gcloud CLI and Related Tools
Google Cloud CLI (gcloud) and related command-line tools provide a shell-friendly way to access Google Workspace data, ideal for scripting and one-off queries.
Installation
# Install gcloud CLI (macOS)
curl https://sdk.cloud.google.com | bash
exec -l $SHELL
# Or via Homebrew
brew install --cask google-cloud-sdk
# Linux
curl https://sdk.cloud.google.com | bash
# Windows (PowerShell)
(New-Object Net.WebClient).DownloadFile("https://dl.google.com/dl/cloudsdk/channels/rapid/GoogleCloudSDKInstaller.exe", "$env:Temp\GoogleCloudSDKInstaller.exe")
& $env:Temp\GoogleCloudSDKInstaller.exe
Authentication with gcloud CLI
User Authentication
# Initialize gcloud
gcloud init
# Authenticate as user
gcloud auth login
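# Set application-default credentials
# Note: the default scopes do not cover the Admin SDK, so request them explicitly.
# Depending on your Workspace app access settings, you may also need to pass
# --client-id-file with your own OAuth client.
gcloud auth application-default login \
--scopes=https://www.googleapis.com/auth/cloud-platform,https://www.googleapis.com/auth/admin.directory.user.readonly,https://www.googleapis.com/auth/admin.reports.audit.readonly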
# Verify authentication
gcloud auth list
Service Account Authentication
# Authenticate with service account key
gcloud auth activate-service-account \
service-account@project-id.iam.gserviceaccount.com \
--key-file=/path/to/key.json
# Set as application default
export GOOGLE_APPLICATION_CREDENTIALS="/path/to/key.json"
CLI Examples
Example 1: Query Users with gcloud
#!/bin/bash
# list-workspace-users.sh
# List all Google Workspace users
# Set project
gcloud config set project YOUR_PROJECT_ID
# Get OAuth token
ACCESS_TOKEN=$(gcloud auth application-default print-access-token)
# List users via Admin SDK API
curl -s "https://admin.googleapis.com/admin/directory/v1/users?customer=my_customer&maxResults=500" \
-H "Authorization: Bearer ${ACCESS_TOKEN}" \
-H "Content-Type: application/json" | jq '.users[] | {email: .primaryEmail, name: .name.fullName, suspended: .suspended}'
Example 2: Generate User Report Script
#!/bin/bash
# generate-user-report.sh
# Generate comprehensive user report
set -euo pipefail
PROJECT_ID="your-project-id"
OUTPUT_FILE="workspace_users_$(date +%Y%m%d).csv"
# Set project
gcloud config set project "$PROJECT_ID" --quiet
# Get access token
ACCESS_TOKEN=$(gcloud auth application-default print-access-token)
echo "Fetching users from Google Workspace..."
# Fetch users
USERS_JSON=$(curl -s "https://admin.googleapis.com/admin/directory/v1/users?customer=my_customer&maxResults=500" \
-H "Authorization: Bearer ${ACCESS_TOKEN}" \
-H "Content-Type: application/json")
# Create CSV header
echo "Email,Full Name,Status,Admin,2FA Enabled,Created,Last Login,Org Unit" > "$OUTPUT_FILE"
# Parse and format data
echo "$USERS_JSON" | jq -r '.users[] |
[
.primaryEmail,
.name.fullName,
(if .suspended then "Suspended" else "Active" end),
(if .isAdmin then "Yes" else "No" end),
(if .isEnrolledIn2Sv then "Yes" else "No" end),
.creationTime[:10],
(.lastLoginTime[:10] // "Never"),
.orgUnitPath
] | @csv' >> "$OUTPUT_FILE"
echo "Report generated: $OUTPUT_FILE"
echo "Total users: $(tail -n +2 "$OUTPUT_FILE" | wc -l)"
Example 3: Monitor Login Activity
#!/bin/bash
# monitor-logins.sh
# Monitor recent login activity
set -euo pipefail
PROJECT_ID="your-project-id"
DAYS_BACK=7
# Calculate start time (GNU date syntax; on macOS/BSD use:
#   date -u -v-"${DAYS_BACK}"d '+%Y-%m-%dT%H:%M:%S.000Z')
START_TIME=$(date -u -d "$DAYS_BACK days ago" '+%Y-%m-%dT%H:%M:%S.000Z')
# Get access token
ACCESS_TOKEN=$(gcloud auth application-default print-access-token)
echo "Fetching login activity from the last $DAYS_BACK days..."
# Fetch login activities
ACTIVITIES=$(curl -s "https://admin.googleapis.com/admin/reports/v1/activity/users/all/applications/login?startTime=$START_TIME&maxResults=1000" \
-H "Authorization: Bearer ${ACCESS_TOKEN}" \
-H "Content-Type: application/json")
# Count login events
SUCCESSFUL=$(echo "$ACTIVITIES" | jq '[.items[].events[] | select(.name == "login_success")] | length')
FAILED=$(echo "$ACTIVITIES" | jq '[.items[].events[] | select(.name == "login_failure")] | length')
SUSPICIOUS=$(echo "$ACTIVITIES" | jq '[.items[].events[] | select(.name == "suspicious_login")] | length')
echo "Login Activity Summary (last $DAYS_BACK days):"
echo " Successful logins: $SUCCESSFUL"
echo " Failed logins: $FAILED"
echo " Suspicious activities: $SUSPICIOUS"
# Show recent failed logins
echo ""
echo "Recent Failed Logins:"
echo "$ACTIVITIES" | jq -r '.items[] |
select(.events[].name == "login_failure") |
.actor.email + " - " + .id.time' | head -10
Example 4: Audit Admin Changes
#!/bin/bash
# audit-admin-changes.sh
# Audit administrative changes
set -euo pipefail
PROJECT_ID="your-project-id"
DAYS_BACK=30
OUTPUT_FILE="admin_audit_$(date +%Y%m%d).txt"
# Calculate start time (GNU date syntax; on macOS/BSD use:
#   date -u -v-"${DAYS_BACK}"d '+%Y-%m-%dT%H:%M:%S.000Z')
START_TIME=$(date -u -d "$DAYS_BACK days ago" '+%Y-%m-%dT%H:%M:%S.000Z')
# Get access token
ACCESS_TOKEN=$(gcloud auth application-default print-access-token)
echo "Auditing admin changes from the last $DAYS_BACK days..." | tee "$OUTPUT_FILE"
echo "Generated: $(date)" | tee -a "$OUTPUT_FILE"
echo "======================================" | tee -a "$OUTPUT_FILE"
# Fetch admin activities
ACTIVITIES=$(curl -s "https://admin.googleapis.com/admin/reports/v1/activity/users/all/applications/admin?startTime=$START_TIME&maxResults=1000" \
-H "Authorization: Bearer ${ACCESS_TOKEN}" \
-H "Content-Type: application/json")
# User creation events
echo "" | tee -a "$OUTPUT_FILE"
echo "User Creation Events:" | tee -a "$OUTPUT_FILE"
echo "$ACTIVITIES" | jq -r '.items[] |
select(.events[].name == "CREATE_USER") |
.id.time[:10] + " - " + .actor.email + " created user" ' | tee -a "$OUTPUT_FILE"
# User deletion events
echo "" | tee -a "$OUTPUT_FILE"
echo "User Deletion Events:" | tee -a "$OUTPUT_FILE"
echo "$ACTIVITIES" | jq -r '.items[] |
select(.events[].name == "DELETE_USER") |
.id.time[:10] + " - " + .actor.email + " deleted user"' | tee -a "$OUTPUT_FILE"
# Admin role changes
echo "" | tee -a "$OUTPUT_FILE"
echo "Admin Role Changes:" | tee -a "$OUTPUT_FILE"
echo "$ACTIVITIES" | jq -r '.items[] |
select(.events[].name | contains("ADMIN")) |
.id.time[:10] + " - " + .actor.email + " - " + .events[].name' | tee -a "$OUTPUT_FILE"
echo "" | tee -a "$OUTPUT_FILE"
echo "Audit report saved to: $OUTPUT_FILE"
Example 5: Organizational Unit Report
#!/bin/bash
# org-units-report.sh
# Generate organizational units report
set -euo pipefail
PROJECT_ID="your-project-id"
# Get access token
ACCESS_TOKEN=$(gcloud auth application-default print-access-token)
echo "Fetching organizational units..."
# Fetch org units
ORG_UNITS=$(curl -s "https://admin.googleapis.com/admin/directory/v1/customer/my_customer/orgunits" \
-H "Authorization: Bearer ${ACCESS_TOKEN}" \
-H "Content-Type: application/json")
echo "Organizational Unit Structure:"
echo "=============================="
# Display in tree format
echo "$ORG_UNITS" | jq -r '.organizationUnits[] |
"Path: " + .orgUnitPath +
"\n Name: " + .name +
"\n Description: " + (.description // "N/A") +
"\n Parent: " + (.parentOrgUnitPath // "/") +
"\n"'
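# Count users per org unit
echo ""
echo "User Counts by Org Unit:"
echo "========================"
# Read paths line by line so org unit names containing spaces stay intact
echo "$ORG_UNITS" | jq -r '.organizationUnits[]?.orgUnitPath' | while IFS= read -r ORG_PATH; do
    USER_COUNT=0
    PAGE_TOKEN=""
    while :; do
        # -G with --data-urlencode lets curl encode the quoted query value
        PAGE=$(curl -s -G "https://admin.googleapis.com/admin/directory/v1/users" \
            --data-urlencode "customer=my_customer" \
            --data-urlencode "query=orgUnitPath='$ORG_PATH'" \
            --data-urlencode "maxResults=500" \
            ${PAGE_TOKEN:+--data-urlencode "pageToken=$PAGE_TOKEN"} \
            -H "Authorization: Bearer ${ACCESS_TOKEN}")
        USER_COUNT=$((USER_COUNT + $(echo "$PAGE" | jq '(.users // []) | length')))
        PAGE_TOKEN=$(echo "$PAGE" | jq -r '.nextPageToken // empty')
        if [ -z "$PAGE_TOKEN" ]; then break; fi
    done
    echo "  $ORG_PATH: $USER_COUNT users"
done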
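The org unit lookup above passes a search expression in the query parameter, and that expression contains characters (quotes, slashes, spaces, an equals sign) that must be percent-encoded. As a cross-check on the shell version, the sketch below builds the same URL with Python's standard library. The function name and the sample org unit path are hypothetical.

```python
from urllib.parse import urlencode

USERS_URL = "https://admin.googleapis.com/admin/directory/v1/users"


def build_org_unit_query_url(org_unit_path, max_results=500, page_token=None):
    """Build a users.list URL that filters by org unit path."""
    params = {
        "customer": "my_customer",
        # The search expression wraps the path in single quotes
        "query": f"orgUnitPath='{org_unit_path}'",
        "maxResults": max_results,
    }
    if page_token:
        params["pageToken"] = page_token
    # urlencode percent-encodes each value, including '/', '=', and quotes
    return f"{USERS_URL}?{urlencode(params)}"


print(build_org_unit_query_url("/Sales Team/EMEA"))
```

Letting a library encode the whole expression avoids the two easy mistakes in hand-built URLs: leaving the path unquoted and accidentally encoding a trailing newline.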
Example 6: License Usage Report
#!/bin/bash
# license-usage-report.sh
# Report on license usage across the organization
set -euo pipefail
PROJECT_ID="your-project-id"
OUTPUT_FILE="license_usage_$(date +%Y%m%d).csv"
# Get access token
ACCESS_TOKEN=$(gcloud auth application-default print-access-token)
echo "Fetching license information..."
# Common Google Workspace SKUs
declare -a SKUS=(
"Google-Apps-For-Business"
"Google-Apps-Unlimited"
"Google-Apps-For-Postini"
"Google-Vault"
"Google-Vault-Former-Employee"
)
echo "SKU,Total Licenses,Assigned,Available" > "$OUTPUT_FILE"
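# Caveat: the Reseller API is available only to Google Workspace resellers and
# addresses subscriptions by customer ID and subscription ID, so the SKU-based
# path below is illustrative and will likely need adapting.
for SKU in "${SKUS[@]}"; do
    LICENSE_DATA=$(curl -s "https://reseller.googleapis.com/apps/reseller/v1/customers/my_customer/subscriptions/$SKU" \
        -H "Authorization: Bearer ${ACCESS_TOKEN}" 2>/dev/null || echo "{}")
    if [ "$(echo "$LICENSE_DATA" | jq -r .kind 2>/dev/null)" != "null" ]; then
        # Seat counts live under .seats in a subscription resource
        TOTAL=$(echo "$LICENSE_DATA" | jq -r '.seats.numberOfSeats // .seats.maximumNumberOfSeats // "N/A"')
        ASSIGNED=$(echo "$LICENSE_DATA" | jq -r '.seats.licensedNumberOfSeats // 0')
        # Only do arithmetic when the total is numeric
        if [[ "$TOTAL" =~ ^[0-9]+$ ]]; then AVAILABLE=$((TOTAL - ASSIGNED)); else AVAILABLE="N/A"; fi
        echo "$SKU,$TOTAL,$ASSIGNED,$AVAILABLE" >> "$OUTPUT_FILE"
        echo "  $SKU: $ASSIGNED / $TOTAL licenses used"
    fi
done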
echo ""
echo "License report saved to: $OUTPUT_FILE"
Pros and Cons of gcloud CLI
Pros:
- ✅ Great for ad-hoc queries and exploration
- ✅ Easy to integrate into shell scripts
- ✅ No programming language required
- ✅ Perfect for CI/CD pipelines
- ✅ Combines well with standard Unix tools (jq, awk, sed)
- ✅ Quick authentication setup
- ✅ Easy to schedule with cron
Cons:
- ❌ Limited direct Google Workspace commands (mostly need API calls)
- ❌ Less type safety and validation
- ❌ Harder to handle complex data transformations
- ❌ Error handling can be challenging in bash
- ❌ Requires jq or similar tools for JSON processing
- ❌ String parsing can be fragile
- ❌ Less portable across different shells
Comparison Summary
| Feature | Python SDK | REST API | gcloud CLI |
|---|---|---|---|
| Ease of Use | ⭐⭐⭐⭐⭐ | ⭐⭐⭐ | ⭐⭐⭐⭐ |
| Flexibility | ⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ | ⭐⭐⭐ |
| Performance | ⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ | ⭐⭐⭐ |
| Error Handling | ⭐⭐⭐⭐⭐ | ⭐⭐⭐ | ⭐⭐ |
| Type Safety | ⭐⭐⭐⭐⭐ | ⭐⭐ | ⭐ |
| Documentation | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐ | ⭐⭐⭐ |
| Setup Complexity | ⭐⭐⭐ | ⭐⭐ | ⭐⭐⭐⭐ |
| Scripting | ⭐⭐⭐⭐ | ⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ |
| Dependencies | ⭐⭐⭐ | ⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ |
| Latest Features | ⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐ |
When to Use Each Approach
Use Python SDK when:
- Building production applications
- Want discoverable method names that mirror the API's resources
- Want built-in pagination helpers and opt-in retries
- Developing complex data processing workflows
- Team has Python expertise
- Need long-term maintainability
Use REST API when:
- SDK doesn’t support required features
- Working in non-Python environment
- Need minimal dependencies
- Require maximum performance
- Building microservices or serverless functions
- Need precise control over requests
Use gcloud CLI when:
- Running one-off queries or reports
- Building shell scripts
- Integrating with CI/CD pipelines
- Need quick prototyping
- Working in bash/shell environment
- Combining with other command-line tools
Authentication Comparison
Service Account (Best for Automation)
- Setup Complexity: Medium
- Use Case: Unattended scripts, scheduled jobs, automation
- Security: High (credential file must be protected)
Pros:
- No user interaction required
- Can impersonate users with domain-wide delegation
- Ideal for server-to-server communication
- Long-lived credentials
Cons:
- Requires admin setup in Google Workspace Console
- More complex initial configuration
- Credential file security is critical
OAuth 2.0 (Best for User Applications)
- Setup Complexity: Medium-High
- Use Case: Desktop applications, user-facing tools
- Security: High (user grants specific permissions)
Pros:
- User controls access
- Granular permission scopes
- Can revoke access easily
- Better audit trail (shows user identity)
Cons:
- Requires user interaction
- Token refresh management needed
- Not suitable for automation
- Consent screen configuration required
API Keys (Limited Use)
- Setup Complexity: Low
- Use Case: Public data only
- Security: Low (easily exposed)
Pros:
- Simple to implement
- No OAuth flow required
Cons:
- Very limited functionality
- Not suitable for Google Workspace Admin data
- Security concerns if exposed
Best Practices
1. Security
# Store credentials securely
import os
from pathlib import Path
# Use environment variables
SERVICE_ACCOUNT_KEY = os.environ.get('GOOGLE_SERVICE_ACCOUNT_KEY')
# Or use secret management
from google.cloud import secretmanager
client = secretmanager.SecretManagerServiceClient()
secret_name = "projects/PROJECT_ID/secrets/SERVICE_ACCOUNT_KEY/versions/latest"
response = client.access_secret_version(request={"name": secret_name})
credentials_json = response.payload.data.decode('UTF-8')
# Never commit credentials to version control
# Add to .gitignore:
# *.json
# credentials.json
# service-account-key.json
# token.pickle
2. Rate Limiting and Quotas
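import time
from google.api_core import retry
from googleapiclient.errors import HttpError

def is_retryable(exc):
    """Retry only rate-limit and transient server errors"""
    return isinstance(exc, HttpError) and exc.resp.status in (429, 500, 503)

# Implement exponential backoff (requires the google-api-core package).
# The default predicate does not match googleapiclient's HttpError, so pass one explicitly.
@retry.Retry(
    predicate=is_retryable,
    initial=1.0,
    maximum=60.0,
    multiplier=2.0,
    deadline=300.0
)
def api_call_with_retry():
    # Your API call here
    pass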
# Manual rate limiting
def batch_process_with_delay(items, delay=1.0):
"""Process items with delay to avoid rate limits"""
for i, item in enumerate(items):
if i > 0 and i % 10 == 0:
print(f"Processed {i} items, pausing...")
time.sleep(delay)
process_item(item)
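If you would rather not depend on a retry library, the backoff idea above can be sketched in a few lines of standard-library Python. Everything named here is hypothetical: TransientError stands in for whatever your code treats as retryable (for example an HTTP 429 or 503), and call_with_backoff is an illustrative helper, not part of any Google package. The delay uses "full jitter", meaning a random fraction of the capped exponential delay.

```python
import random
import time


class TransientError(Exception):
    """Hypothetical marker for retryable failures (e.g. HTTP 429 or 503)."""


def call_with_backoff(func, max_attempts=5, base_delay=1.0, max_delay=60.0,
                      sleep=time.sleep, rng=random.random):
    """Call func(), retrying on TransientError with capped exponential backoff."""
    for attempt in range(1, max_attempts + 1):
        try:
            return func()
        except TransientError:
            if attempt == max_attempts:
                raise  # give up after the final attempt
            # Full jitter: wait a random fraction of the capped exponential delay
            delay = min(max_delay, base_delay * 2 ** (attempt - 1)) * rng()
            sleep(delay)


# Demonstration: a call that fails twice, then succeeds (sleep is stubbed out)
state = {"calls": 0}


def flaky_call():
    state["calls"] += 1
    if state["calls"] < 3:
        raise TransientError("rate limited")
    return "ok"


print(call_with_backoff(flaky_call, sleep=lambda seconds: None))
```

The sleep and rng parameters exist so the behavior can be tested without waiting. Production code would leave them at their defaults.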
3. Error Handling
import logging
import time
from googleapiclient.errors import HttpError

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def robust_api_call(service, user_email, max_retries=3):
    """API call with comprehensive error handling and a bounded number of retries"""
    for attempt in range(max_retries + 1):
        try:
            return service.users().get(userKey=user_email).execute()
        except HttpError as e:
            if e.resp.status == 404:
                logger.warning(f"User not found: {user_email}")
            elif e.resp.status == 403:
                logger.error(f"Permission denied for user: {user_email}")
            elif e.resp.status == 429 and attempt < max_retries:
                # Back off exponentially instead of recursing without a limit
                delay = 2 ** attempt
                logger.warning(f"Rate limit hit, retrying in {delay}s...")
                time.sleep(delay)
                continue
            else:
                logger.error(f"Unexpected error: {e}")
            return None
        except Exception as e:
            logger.error(f"Unexpected exception: {e}")
            return None
    return None
4. Pagination
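def get_all_paginated_results(collection, request, items_key='users'):
    """Generic pagination handler.

    collection is the resource that created the request, for example
    service.users(), and items_key is the response field holding the results
    ('users' for Directory users, 'items' for Reports activities).
    """
    all_items = []
    while request is not None:
        try:
            response = request.execute()
            all_items.extend(response.get(items_key, []))
            # list_next() returns None when there are no more pages
            request = collection.list_next(request, response)
        except Exception as e:
            logger.error(f"Error during pagination: {e}")
            break
    return all_items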
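The token-based loop used throughout this guide can also be written as a generator, which lets callers process items as pages arrive instead of holding everything in memory. This is a minimal sketch that does not depend on the Google client. The iter_pages name is hypothetical, and fetch_page stands for any callable that takes a page token (None for the first page) and returns a response dictionary containing an optional nextPageToken.

```python
def iter_pages(fetch_page, items_key):
    """Yield items from every page until no nextPageToken is returned."""
    page_token = None
    while True:
        response = fetch_page(page_token)
        yield from response.get(items_key, [])
        page_token = response.get("nextPageToken")
        if not page_token:
            break


# Demonstration with two fake pages keyed by page token
FAKE_PAGES = {
    None: {"users": [{"primaryEmail": "a@example.com"}], "nextPageToken": "p2"},
    "p2": {"users": [{"primaryEmail": "b@example.com"}]},
}


def fake_fetch(page_token):
    return FAKE_PAGES[page_token]


emails = [user["primaryEmail"] for user in iter_pages(fake_fetch, "users")]
print(emails)
```

With the SDK, fetch_page could be a small lambda around users().list(..., pageToken=token).execute(). With the REST examples, it could wrap requests.get().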
5. Logging and Auditing
import logging
from datetime import datetime
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler(f'workspace_report_{datetime.now():%Y%m%d}.log'),
logging.StreamHandler()
]
)
import functools
import time

logger = logging.getLogger(__name__)

def audited_operation(operation_name):
    """Decorator to audit operations"""
    def decorator(func):
        @functools.wraps(func)  # keep the wrapped function's name and docstring
        def wrapper(*args, **kwargs):
            logger.info(f"Starting {operation_name}")
            start_time = time.time()
            try:
                result = func(*args, **kwargs)
                duration = time.time() - start_time
                logger.info(f"Completed {operation_name} in {duration:.2f}s")
                return result
            except Exception as e:
                logger.error(f"Failed {operation_name}: {e}")
                raise
        return wrapper
    return decorator
@audited_operation("user_report_generation")
def generate_user_report(service):
# Implementation
pass
Complete Production Example
Here’s a fuller script that combines the practices above:
#!/usr/bin/env python3
"""
Google Workspace Reporting Tool
Comprehensive reporting for Google Workspace users, logins, and configuration.
"""
import os
import sys
import time
import logging
import argparse
import csv
from datetime import datetime, timedelta
from typing import List, Dict, Optional

from google.oauth2 import service_account
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError

# Configuration
SCOPES = [
    'https://www.googleapis.com/auth/admin.directory.user.readonly',
    'https://www.googleapis.com/auth/admin.reports.audit.readonly',
    'https://www.googleapis.com/auth/admin.reports.usage.readonly'
]

# Logging setup
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler(f'workspace_report_{datetime.now():%Y%m%d_%H%M%S}.log'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)

class GoogleWorkspaceReporter:
    """Google Workspace reporting and analytics"""

    def __init__(self, service_account_file: str, delegated_user: str):
        """Initialize with service account credentials"""
        self.service_account_file = service_account_file
        self.delegated_user = delegated_user
        self.admin_service = None
        self.reports_service = None
        self._authenticate()

    def _authenticate(self):
        """Authenticate and build service objects"""
        try:
            credentials = service_account.Credentials.from_service_account_file(
                self.service_account_file,
                scopes=SCOPES
            )
            delegated_credentials = credentials.with_subject(self.delegated_user)
            self.admin_service = build('admin', 'directory_v1', credentials=delegated_credentials)
            self.reports_service = build('admin', 'reports_v1', credentials=delegated_credentials)
            logger.info("Authentication successful")
        except Exception as e:
            logger.error(f"Authentication failed: {e}")
            sys.exit(1)

    def get_all_users(self) -> List[Dict]:
        """Retrieve all users with pagination"""
        logger.info("Fetching all users...")
        users = []
        page_token = None
        try:
            while True:
                response = self.admin_service.users().list(
                    customer='my_customer',
                    maxResults=500,
                    orderBy='email',
                    pageToken=page_token
                ).execute()
                users.extend(response.get('users', []))
                page_token = response.get('nextPageToken')
                if not page_token:
                    break
                time.sleep(0.5)  # Rate limiting
            logger.info(f"Retrieved {len(users)} users")
            return users
        except HttpError as e:
            logger.error(f"Error fetching users: {e}")
            return []

    def generate_user_report(self, output_file: str = 'user_report.csv'):
        """Generate comprehensive user report"""
        logger.info(f"Generating user report: {output_file}")
        users = self.get_all_users()
        with open(output_file, 'w', newline='', encoding='utf-8') as csvfile:
            fieldnames = [
                'Email', 'Full Name', 'Status', 'Admin', 'Suspended',
                '2FA Enrolled', 'Created Date', 'Last Login', 'Org Unit',
                'Aliases'
            ]
            writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
            writer.writeheader()
            for user in users:
                # Keep only the date part (YYYY-MM-DD). Accounts that have never
                # signed in typically report the Unix epoch, so treat it as "Never".
                last_login = (user.get('lastLoginTime') or '')[:10]
                if not last_login or last_login == '1970-01-01':
                    last_login = 'Never'
                writer.writerow({
                    'Email': user.get('primaryEmail', ''),
                    'Full Name': user.get('name', {}).get('fullName', ''),
                    'Status': 'Active' if not user.get('suspended') else 'Suspended',
                    'Admin': 'Yes' if user.get('isAdmin') else 'No',
                    'Suspended': 'Yes' if user.get('suspended') else 'No',
                    '2FA Enrolled': 'Yes' if user.get('isEnrolledIn2Sv') else 'No',
                    'Created Date': user.get('creationTime', '')[:10],
                    'Last Login': last_login,
                    'Org Unit': user.get('orgUnitPath', '/'),
                    'Aliases': ', '.join(user.get('aliases', []))
                })
        logger.info(f"User report generated: {output_file}")

    def get_login_activities(self, days: int = 7) -> List[Dict]:
        """Get login activities for the past N days"""
        logger.info(f"Fetching login activities for the past {days} days...")
        # startTime must be an RFC 3339 timestamp; compute it in UTC so that
        # the trailing 'Z' is accurate regardless of the local time zone
        start_time = time.strftime(
            '%Y-%m-%dT%H:%M:%SZ', time.gmtime(time.time() - days * 86400)
        )
        activities = []
        page_token = None
        try:
            while True:
                response = self.reports_service.activities().list(
                    userKey='all',
                    applicationName='login',
                    maxResults=1000,
                    startTime=start_time,
                    pageToken=page_token
                ).execute()
                activities.extend(response.get('items', []))
                page_token = response.get('nextPageToken')
                if not page_token:
                    break
                time.sleep(0.5)  # Rate limiting
            logger.info(f"Retrieved {len(activities)} login activities")
            return activities
        except HttpError as e:
            logger.error(f"Error fetching login activities: {e}")
            return []

    def generate_login_report(self, output_file: str = 'login_report.txt', days: int = 7):
        """Generate login activity report"""
        logger.info(f"Generating login report: {output_file}")
        activities = self.get_login_activities(days)
        # Analyze activities
        successful = failed = suspicious = 0
        failed_logins = []
        for activity in activities:
            for event in activity.get('events', []):
                event_name = event.get('name', '')
                if event_name == 'login_success':
                    successful += 1
                elif event_name == 'login_failure':
                    failed += 1
                    failed_logins.append(activity)
                elif 'suspicious' in event_name.lower():
                    suspicious += 1
        # Write report
        with open(output_file, 'w', encoding='utf-8') as f:
            f.write("Login Activity Report\n")
            f.write(f"Period: Last {days} days\n")
            f.write(f"Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
            f.write('=' * 60 + "\n\n")
            f.write("Summary:\n")
            f.write(f" Successful logins: {successful}\n")
            f.write(f" Failed logins: {failed}\n")
            f.write(f" Suspicious activities: {suspicious}\n\n")
            if failed_logins:
                f.write("Recent Failed Login Attempts:\n")
                for activity in failed_logins[:20]:
                    actor = activity.get('actor', {}).get('email', 'Unknown')
                    # Named event_time so the imported time module is not shadowed
                    event_time = activity.get('id', {}).get('time', 'Unknown')
                    f.write(f" {event_time}: {actor}\n")
        logger.info(f"Login report generated: {output_file}")


def main():
    """Main entry point"""
    parser = argparse.ArgumentParser(
        description='Google Workspace Reporting Tool'
    )
    parser.add_argument(
        '--service-account',
        required=True,
        help='Path to service account JSON key file'
    )
    parser.add_argument(
        '--delegated-user',
        required=True,
        help='Admin user email to impersonate'
    )
    parser.add_argument(
        '--report-type',
        choices=['users', 'logins', 'all'],
        default='all',
        help='Type of report to generate'
    )
    parser.add_argument(
        '--output-dir',
        default='./reports',
        help='Output directory for reports'
    )
    parser.add_argument(
        '--days',
        type=int,
        default=7,
        help='Number of days for activity reports'
    )
    args = parser.parse_args()

    # Create output directory
    os.makedirs(args.output_dir, exist_ok=True)

    # Initialize reporter
    reporter = GoogleWorkspaceReporter(
        args.service_account,
        args.delegated_user
    )

    # Generate reports
    timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    if args.report_type in ['users', 'all']:
        user_report = os.path.join(args.output_dir, f'users_{timestamp}.csv')
        reporter.generate_user_report(user_report)
    if args.report_type in ['logins', 'all']:
        login_report = os.path.join(args.output_dir, f'logins_{timestamp}.txt')
        reporter.generate_login_report(login_report, args.days)
    logger.info("All reports generated successfully")


if __name__ == '__main__':
    main()
Usage:
python workspace_reporter.py \
--service-account /path/to/service-account-key.json \
--delegated-user admin@example.com \
--report-type all \
--output-dir ./reports \
--days 30
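Once the user report exists, it can be post-processed with the standard library alone. The sketch below lists active accounts without 2-Step Verification, assuming the column names written by generate_user_report ('Email', 'Suspended', '2FA Enrolled'). `users_without_2fa` is a hypothetical helper, and the sample rows are made up for illustration.

```python
import csv
import io


def users_without_2fa(csv_text):
    """Return emails of non-suspended users whose '2FA Enrolled' column is 'No'."""
    reader = csv.DictReader(io.StringIO(csv_text))
    return [
        row['Email']
        for row in reader
        if row['2FA Enrolled'] == 'No' and row['Suspended'] == 'No'
    ]


# Made-up sample using the same column names as the user report
sample = (
    "Email,Suspended,2FA Enrolled\n"
    "alice@example.com,No,Yes\n"
    "bob@example.com,No,No\n"
    "carol@example.com,Yes,No\n"
)
print(users_without_2fa(sample))  # ['bob@example.com']
```

For a report on disk, read the file with `encoding='utf-8'` and pass its contents to the helper.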
Conclusion
Google Workspace provides powerful APIs and tools for extracting configuration and analytics data. Each of the three methods (Python SDK, REST API, and gcloud CLI) has its strengths:
- Python SDK suits production applications, with generated request builders, pagination helpers such as list_next, and structured HttpError exceptions
- REST API provides maximum flexibility and access to the latest features from any language
- gcloud CLI shines in scripting and quick one-off queries
For most production use cases, the Python SDK is recommended for its balance of ease of use and capabilities. Use the REST API when you need features not yet available in the SDK or when working outside Python. Choose the gcloud CLI for shell scripts, CI/CD integration, and quick exploratory queries.
Key Takeaways
- Authentication: Service Accounts with domain-wide delegation are best for automation
- Security: Always protect credentials and use secret management systems
- Rate Limiting: Implement backoff strategies and respect API quotas
- Error Handling: Build robust error handling for production reliability
- Pagination: Always handle pagination for complete data retrieval
- Logging: Maintain comprehensive logs for auditing and troubleshooting