Retrieving Data for Reporting from Google Workspace: Python SDK, API, and CLI Comparison

READER BEWARE: THE FOLLOWING WRITTEN ENTIRELY BY AI WITHOUT HUMAN EDITING.

Introduction

Google Workspace (formerly G Suite) provides powerful APIs and tools for extracting configuration and analytics data for reporting purposes. Whether you’re managing user accounts, monitoring service usage, analyzing email traffic, or auditing security settings, Google Workspace offers multiple approaches to retrieve this data programmatically.

This comprehensive guide explores three primary methods for retrieving Google Workspace data:

  1. Google Workspace Python SDK - Official Python client libraries
  2. Google Workspace Admin API - Direct REST API calls
  3. gcloud CLI - Command-line interface tools

We’ll compare authentication mechanisms, use cases, pros and cons, and provide practical examples for each approach.

Why Retrieve Google Workspace Data Programmatically?

Common Use Cases

  • User Management Reports: Track user creation, deletion, and status changes
  • License Compliance: Monitor license allocation and usage across the organization
  • Security Auditing: Extract login attempts, 2FA status, and security events
  • Usage Analytics: Analyze Gmail, Drive, Calendar, and Meet usage patterns
  • Configuration Auditing: Document organizational settings for compliance
  • Cost Optimization: Identify unused licenses and optimize subscription costs
  • Automated Reporting: Generate regular status reports for management and stakeholders

Authentication Overview

All three methods require authentication with Google Workspace. The authentication mechanism varies slightly but generally involves:

  1. Service Account (recommended for automation) - Credentials without user interaction
  2. OAuth 2.0 (for user-delegated access) - User grants permission to application
  3. API Keys (limited use) - For some public data access only

Setting Up Google Cloud Project

Before using any method, you need a Google Cloud Project:

  1. Go to Google Cloud Console
  2. Create a new project or select existing one
  3. Enable the Admin SDK API, which covers both the Directory API and the Reports API
  4. Create credentials (Service Account or OAuth 2.0 Client ID)

Method 1: Google Workspace Python SDK

The Python SDK (the google-api-python-client library) wraps the Google Workspace APIs in Python method calls and takes care of request building, credential refresh, and pagination helpers.

Installation

# Install the Google API client plus the auth libraries (includes the OAuth 2.0 flow helpers)
pip install google-api-python-client google-auth-httplib2 google-auth-oauthlib

# Minimal install if you only use service accounts
pip install google-api-python-client google-auth

Authentication with Python SDK

Service Account Authentication

Setup:

  1. Create a Service Account in Google Cloud Console
  2. Download JSON key file
  3. Enable Domain-Wide Delegation for the Service Account
  4. In Google Workspace Admin Console, authorize the service account with required scopes

from google.oauth2 import service_account
from googleapiclient.discovery import build

# Service Account credentials
SCOPES = [
    'https://www.googleapis.com/auth/admin.directory.user.readonly',
    'https://www.googleapis.com/auth/admin.reports.audit.readonly',
    'https://www.googleapis.com/auth/admin.reports.usage.readonly'
]

SERVICE_ACCOUNT_FILE = 'path/to/service-account-key.json'
DELEGATED_USER_EMAIL = 'admin@example.com'  # Admin user to impersonate

def get_admin_service():
    """Create Admin SDK service with service account"""
    credentials = service_account.Credentials.from_service_account_file(
        SERVICE_ACCOUNT_FILE,
        scopes=SCOPES
    )
    
    # Delegate to admin user
    delegated_credentials = credentials.with_subject(DELEGATED_USER_EMAIL)
    
    service = build('admin', 'directory_v1', credentials=delegated_credentials)
    return service

def get_reports_service():
    """Create Reports API service with service account"""
    credentials = service_account.Credentials.from_service_account_file(
        SERVICE_ACCOUNT_FILE,
        scopes=SCOPES
    )
    
    delegated_credentials = credentials.with_subject(DELEGATED_USER_EMAIL)
    service = build('admin', 'reports_v1', credentials=delegated_credentials)
    return service

OAuth 2.0 Authentication

Setup:

  1. Create OAuth 2.0 Client ID in Google Cloud Console
  2. Download credentials JSON file
  3. Run authentication flow once to get refresh token

from google_auth_oauthlib.flow import InstalledAppFlow
from google.auth.transport.requests import Request
import pickle
import os

SCOPES = ['https://www.googleapis.com/auth/admin.directory.user.readonly']

def get_oauth_credentials():
    """Authenticate using OAuth 2.0 flow"""
    creds = None
    
    # Token file stores access and refresh tokens
    if os.path.exists('token.pickle'):
        with open('token.pickle', 'rb') as token:
            creds = pickle.load(token)
    
    # If no valid credentials, run OAuth flow
    if not creds or not creds.valid:
        if creds and creds.expired and creds.refresh_token:
            creds.refresh(Request())
        else:
            flow = InstalledAppFlow.from_client_secrets_file(
                'credentials.json', SCOPES)
            creds = flow.run_local_server(port=0)
        
        # Save credentials for future use
        with open('token.pickle', 'wb') as token:
            pickle.dump(creds, token)
    
    return creds

def get_admin_service_oauth():
    """Create Admin SDK service with OAuth"""
    creds = get_oauth_credentials()
    service = build('admin', 'directory_v1', credentials=creds)
    return service

Python SDK Examples

Example 1: List All Users in Organization

def list_all_users(service):
    """Retrieve all users from Google Workspace"""
    users = []
    page_token = None
    
    try:
        while True:
            results = service.users().list(
                customer='my_customer',
                maxResults=500,
                orderBy='email',
                pageToken=page_token
            ).execute()
            
            users.extend(results.get('users', []))
            
            page_token = results.get('nextPageToken')
            if not page_token:
                break
        
        return users
    
    except Exception as e:
        print(f"Error fetching users: {e}")
        return []

# Usage
service = get_admin_service()
users = list_all_users(service)

print(f"Total users: {len(users)}")
for user in users[:5]:  # Print first 5
    print(f"  {user['primaryEmail']}: {user['name']['fullName']}")

Example 2: Generate User Status Report

def generate_user_status_report(service, output_file='user_report.csv'):
    """Generate comprehensive user status report"""
    import csv
    from datetime import datetime
    
    users = list_all_users(service)
    
    with open(output_file, 'w', newline='') as csvfile:
        fieldnames = [
            'Email', 'Full Name', 'Status', 'Admin', 
            'Suspended', '2FA Enrolled', 'Created Date', 
            'Last Login', 'Org Unit'
        ]
        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
        writer.writeheader()
        
        for user in users:
            writer.writerow({
                'Email': user.get('primaryEmail', ''),
                'Full Name': user.get('name', {}).get('fullName', ''),
                'Status': 'Active' if not user.get('suspended', False) else 'Suspended',
                'Admin': 'Yes' if user.get('isAdmin', False) else 'No',
                'Suspended': 'Yes' if user.get('suspended', False) else 'No',
                '2FA Enrolled': 'Yes' if user.get('isEnrolledIn2Sv', False) else 'No',
                'Created Date': user.get('creationTime', '')[:10],
                'Last Login': user.get('lastLoginTime', 'Never')[:10] if user.get('lastLoginTime') else 'Never',
                'Org Unit': user.get('orgUnitPath', '/')
            })
    
    print(f"Report generated: {output_file}")

# Usage
service = get_admin_service()
generate_user_status_report(service)
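
The 'Last Login' column deserves a little care. As an assumption worth verifying against your own tenant, the Directory API tends to report a Unix-epoch timestamp (1970-01-01T00:00:00.000Z) for accounts that have never signed in, rather than omitting the field. The sketch below is one way to normalize both cases; `format_last_login` and `NEVER_LOGGED_IN_PREFIX` are hypothetical names introduced here, not part of any Google library.

```python
# Assumed sentinel: accounts that never signed in report an epoch timestamp
NEVER_LOGGED_IN_PREFIX = '1970-01-01'

def format_last_login(user):
    """Return YYYY-MM-DD for the last login, or 'Never' when none is recorded."""
    last_login = user.get('lastLoginTime') or ''
    if not last_login or last_login.startswith(NEVER_LOGGED_IN_PREFIX):
        return 'Never'
    return last_login[:10]

# Hypothetical records shaped like Directory API user resources
print(format_last_login({'lastLoginTime': '2025-03-04T09:15:00.000Z'}))
print(format_last_login({'lastLoginTime': '1970-01-01T00:00:00.000Z'}))
print(format_last_login({}))
```

A helper like this could replace the inline expression used for the 'Last Login' field in the report.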

Example 3: Audit Login Activity

def get_login_audit_report(service, days=7):
    """Get login audit logs for the past N days"""
    from datetime import datetime, timedelta, timezone
    
    # Calculate start time in UTC (the API expects an RFC 3339 timestamp)
    start_time = (datetime.now(timezone.utc) - timedelta(days=days)).strftime('%Y-%m-%dT%H:%M:%SZ')
    
    activities = []
    page_token = None
    
    try:
        while True:
            results = service.activities().list(
                userKey='all',
                applicationName='login',
                maxResults=1000,
                startTime=start_time,
                pageToken=page_token
            ).execute()
            
            activities.extend(results.get('items', []))
            
            page_token = results.get('nextPageToken')
            if not page_token:
                break
        
        return activities
    
    except Exception as e:
        print(f"Error fetching login activities: {e}")
        return []

def analyze_login_activities(activities):
    """Analyze login activity data"""
    successful_logins = 0
    failed_logins = 0
    suspicious_logins = []
    
    for activity in activities:
        # Guard against items with an empty or missing events list
        events = activity.get('events') or [{}]
        event_name = events[0].get('name', '')
        
        if event_name == 'login_success':
            successful_logins += 1
        elif event_name == 'login_failure':
            failed_logins += 1
        elif event_name in ['suspicious_login', 'account_disabled_suspicious_activity']:
            suspicious_logins.append(activity)
    
    print("Login Analysis:")
    print(f"  Successful logins: {successful_logins}")
    print(f"  Failed logins: {failed_logins}")
    print(f"  Suspicious activities: {len(suspicious_logins)}")
    
    if suspicious_logins:
        print("\nSuspicious Login Details:")
        for activity in suspicious_logins[:10]:  # Show first 10
            actor = activity.get('actor', {}).get('email', 'Unknown')
            time = activity.get('id', {}).get('time', 'Unknown')
            print(f"  {time}: {actor}")

# Usage
reports_service = get_reports_service()
activities = get_login_audit_report(reports_service)
analyze_login_activities(activities)
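
An activity item can carry more than one entry in its `events` list, so looking only at the first event may undercount. The following is a minimal sketch that tallies every event name; `count_login_events` is a hypothetical helper and the sample records are made up for illustration.

```python
from collections import Counter

def count_login_events(activities):
    """Tally every event name across all activity items."""
    counts = Counter()
    for activity in activities:
        for event in activity.get('events', []):
            counts[event.get('name', 'unknown')] += 1
    return counts

# Hypothetical items shaped like Reports API activity resources
sample_activities = [
    {'events': [{'name': 'login_success'}]},
    {'events': [{'name': 'login_failure'}, {'name': 'login_challenge'}]},
    {'events': []},
    {},
]

for name, count in count_login_events(sample_activities).most_common():
    print(f"  {name}: {count}")
```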

Example 4: Extract Usage Statistics

def get_user_usage_statistics(service, user_email, date='2025-11-18'):
    """Get usage statistics for a specific user"""
    try:
        result = service.userUsageReport().get(
            userKey=user_email,
            date=date
        ).execute()
        
        usage_reports = result.get('usageReports', [])
        
        if usage_reports:
            report = usage_reports[0]
            parameters = report.get('parameters', [])
            
            print(f"Usage statistics for {user_email} on {date}:")
            for param in parameters:
                name = param.get('name', '')
                value = param.get('intValue', param.get('boolValue', param.get('datetimeValue', 'N/A')))
                print(f"  {name}: {value}")
        else:
            print(f"No usage data available for {user_email} on {date}")
    
    except Exception as e:
        print(f"Error fetching usage statistics: {e}")

# Usage
reports_service = get_reports_service()
get_user_usage_statistics(reports_service, 'user@example.com')
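
For reporting, it is often easier to work with usage parameters as a flat dictionary than to print them one by one. The sketch below assumes each parameter carries its value under one of `intValue`, `boolValue`, `stringValue`, or `datetimeValue`, and that integer values may arrive as JSON strings. `flatten_usage_parameters` is a hypothetical helper, and the sample report is illustrative only.

```python
VALUE_KEYS = ('intValue', 'boolValue', 'stringValue', 'datetimeValue')

def flatten_usage_parameters(report):
    """Turn a usage report's parameter list into a {name: value} dict."""
    flat = {}
    for param in report.get('parameters', []):
        name = param.get('name')
        if not name:
            continue
        for key in VALUE_KEYS:
            if key in param:
                value = param[key]
                if key == 'intValue':
                    value = int(value)  # integers may be serialized as strings
                flat[name] = value
                break
    return flat

# Hypothetical report shaped like one entry of 'usageReports'
sample_report = {
    'parameters': [
        {'name': 'gmail:num_emails_sent', 'intValue': '42'},
        {'name': 'accounts:is_2sv_enrolled', 'boolValue': True},
        {'name': 'accounts:last_login_time', 'datetimeValue': '2025-03-04T09:15:00.000Z'},
    ]
}

print(flatten_usage_parameters(sample_report))
```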

Pros and Cons of Python SDK

Pros:

  • ✅ Pythonic interface built dynamically from Google's API discovery documents
  • ✅ Built-in pagination helpers (list_next)
  • ✅ Opt-in retry logic for transient errors (execute(num_retries=...))
  • ✅ Extensive documentation and examples
  • ✅ Active maintenance and updates
  • ✅ Handles OAuth token refresh automatically
  • ✅ IDE autocomplete and type checking possible with separately installed type stubs

Cons:

  • ❌ Requires installing additional dependencies
  • ❌ Learning curve for SDK-specific patterns
  • ❌ Some advanced API features may lag behind REST API
  • ❌ Larger memory footprint for simple tasks
  • ❌ Versioning complexity across different Google services

Method 2: Google Workspace Admin API (Direct REST)

Calling the REST API directly provides maximum flexibility and control. It is useful when the SDK isn’t available or when you need features the client library does not expose yet.

Authentication with REST API

Service Account Authentication

import requests
from google.oauth2 import service_account
from google.auth.transport.requests import Request

SERVICE_ACCOUNT_FILE = 'path/to/service-account-key.json'
SCOPES = ['https://www.googleapis.com/auth/admin.directory.user.readonly']
DELEGATED_USER = 'admin@example.com'

def get_access_token():
    """Get access token for REST API calls"""
    credentials = service_account.Credentials.from_service_account_file(
        SERVICE_ACCOUNT_FILE,
        scopes=SCOPES
    )
    
    delegated_credentials = credentials.with_subject(DELEGATED_USER)
    
    # Refresh to get access token
    auth_request = Request()
    delegated_credentials.refresh(auth_request)
    
    return delegated_credentials.token

# Get token
access_token = get_access_token()

# Use in API calls
headers = {
    'Authorization': f'Bearer {access_token}',
    'Content-Type': 'application/json'
}

OAuth 2.0 Authentication

import requests
from requests_oauthlib import OAuth2Session

# OAuth configuration
CLIENT_ID = 'your-client-id.apps.googleusercontent.com'
CLIENT_SECRET = 'your-client-secret'
REDIRECT_URI = 'http://localhost:8080'
AUTHORIZATION_BASE_URL = 'https://accounts.google.com/o/oauth2/v2/auth'
TOKEN_URL = 'https://oauth2.googleapis.com/token'
SCOPES = ['https://www.googleapis.com/auth/admin.directory.user.readonly']

def get_oauth_token():
    """Get OAuth access token"""
    oauth = OAuth2Session(CLIENT_ID, redirect_uri=REDIRECT_URI, scope=SCOPES)
    
    # Get authorization URL
    authorization_url, state = oauth.authorization_url(
        AUTHORIZATION_BASE_URL,
        access_type='offline',
        prompt='consent'
    )
    
    print(f"Please visit this URL to authorize: {authorization_url}")
    authorization_response = input("Enter the full callback URL: ")
    
    # Fetch token
    token = oauth.fetch_token(
        TOKEN_URL,
        authorization_response=authorization_response,
        client_secret=CLIENT_SECRET
    )
    
    return token['access_token']

REST API Examples

Example 1: List Users via REST API

import requests

def list_users_rest(access_token):
    """List all users using REST API"""
    base_url = 'https://admin.googleapis.com/admin/directory/v1/users'
    headers = {'Authorization': f'Bearer {access_token}'}
    
    all_users = []
    params = {
        'customer': 'my_customer',
        'maxResults': 500,
        'orderBy': 'email'
    }
    
    while True:
        response = requests.get(base_url, headers=headers, params=params)
        
        if response.status_code != 200:
            print(f"Error: {response.status_code} - {response.text}")
            break
        
        data = response.json()
        all_users.extend(data.get('users', []))
        
        # Check for next page
        next_page_token = data.get('nextPageToken')
        if not next_page_token:
            break
        
        params['pageToken'] = next_page_token
    
    return all_users

# Usage
access_token = get_access_token()
users = list_users_rest(access_token)
print(f"Total users: {len(users)}")
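
The page-token loop above recurs in every REST example, so it can be factored out. Here is a minimal sketch of a generator that follows `nextPageToken` until it runs out; `iterate_pages` and `fetch_page` are hypothetical names, and the in-memory pages stand in for real API responses so the control flow is easy to follow.

```python
def iterate_pages(fetch_page, items_key):
    """Yield items from every page.

    fetch_page(page_token) must return one parsed JSON response (a dict);
    it receives None for the first page.
    """
    page_token = None
    while True:
        page = fetch_page(page_token)
        yield from page.get(items_key, [])
        page_token = page.get('nextPageToken')
        if not page_token:
            break

# Hypothetical in-memory pages standing in for API responses
FAKE_PAGES = {
    None: {'users': [{'primaryEmail': 'a@example.com'}], 'nextPageToken': 'p2'},
    'p2': {'users': [{'primaryEmail': 'b@example.com'}]},
}

emails = [u['primaryEmail'] for u in iterate_pages(FAKE_PAGES.get, 'users')]
print(emails)
```

In real use, `fetch_page` would wrap a `requests.get` call that adds `pageToken` to the query parameters when a token is given.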

Example 2: Get Organizational Units

def get_org_units_rest(access_token):
    """Retrieve organizational unit structure"""
    url = 'https://admin.googleapis.com/admin/directory/v1/customer/my_customer/orgunits'
    headers = {'Authorization': f'Bearer {access_token}'}
    
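    # type=all returns the full hierarchy; by default only direct children are listed
    response = requests.get(url, headers=headers, params={'type': 'all'})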
    
    if response.status_code == 200:
        org_units = response.json().get('organizationUnits', [])
        
        print("Organizational Units:")
        for ou in org_units:
            print(f"  {ou['orgUnitPath']}: {ou['name']}")
            print(f"    Description: {ou.get('description', 'N/A')}")
            print(f"    Parent: {ou.get('parentOrgUnitPath', 'Root')}")
        
        return org_units
    else:
        print(f"Error: {response.status_code} - {response.text}")
        return []

# Usage
access_token = get_access_token()
org_units = get_org_units_rest(access_token)
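
Organizational unit data is usually reported alongside headcounts. Assuming you already hold a list of user records (for example from a user listing call), one way to count users per unit without further API requests is sketched below. `count_users_by_org_unit` is a hypothetical helper and the sample records are made up.

```python
from collections import Counter

def count_users_by_org_unit(users):
    """Count user records per orgUnitPath; records without one fall under '/'."""
    return Counter(user.get('orgUnitPath', '/') for user in users)

# Hypothetical records shaped like Directory API user resources
sample_users = [
    {'primaryEmail': 'a@example.com', 'orgUnitPath': '/Sales'},
    {'primaryEmail': 'b@example.com', 'orgUnitPath': '/Sales'},
    {'primaryEmail': 'c@example.com', 'orgUnitPath': '/Engineering'},
    {'primaryEmail': 'd@example.com'},
]

for path, count in sorted(count_users_by_org_unit(sample_users).items()):
    print(f"  {path}: {count} users")
```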

Example 3: Monitor Admin Activity

def get_admin_activities_rest(access_token, days=7):
    """Get admin activity logs"""
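    from datetime import datetime, timedelta, timezone
    
    # Build the start time in UTC; a naive local time with 'Z' appended would be mislabeled
    start_time = (datetime.now(timezone.utc) - timedelta(days=days)).strftime('%Y-%m-%dT%H:%M:%SZ')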
    
    url = 'https://admin.googleapis.com/admin/reports/v1/activity/users/all/applications/admin'
    headers = {'Authorization': f'Bearer {access_token}'}
    params = {
        'startTime': start_time,
        'maxResults': 1000
    }
    
    all_activities = []
    
    while True:
        response = requests.get(url, headers=headers, params=params)
        
        if response.status_code != 200:
            print(f"Error: {response.status_code} - {response.text}")
            break
        
        data = response.json()
        all_activities.extend(data.get('items', []))
        
        next_page_token = data.get('nextPageToken')
        if not next_page_token:
            break
        
        params['pageToken'] = next_page_token
    
    return all_activities

def analyze_admin_activities(activities):
    """Analyze admin activity logs"""
    activity_counts = {}
    
    for activity in activities:
        events = activity.get('events', [])
        for event in events:
            event_name = event.get('name', 'Unknown')
            activity_counts[event_name] = activity_counts.get(event_name, 0) + 1
    
    print("Admin Activity Summary:")
    for event_name, count in sorted(activity_counts.items(), key=lambda x: x[1], reverse=True):
        print(f"  {event_name}: {count}")

# Usage
access_token = get_access_token()
activities = get_admin_activities_rest(access_token)
analyze_admin_activities(activities)
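
The `startTime` parameter expects an RFC 3339 timestamp, and appending 'Z' is only correct when the time really is UTC. The following sketch builds such a timestamp from a timezone-aware datetime; `rfc3339_days_ago` is a hypothetical helper, and the optional `now` argument exists only to make the result reproducible.

```python
from datetime import datetime, timedelta, timezone

def rfc3339_days_ago(days, now=None):
    """Return the UTC timestamp for `days` days before `now` in RFC 3339 form."""
    now = now or datetime.now(timezone.utc)
    start = now.astimezone(timezone.utc) - timedelta(days=days)
    return start.strftime('%Y-%m-%dT%H:%M:%SZ')

fixed_now = datetime(2025, 1, 8, 12, 30, 0, tzinfo=timezone.utc)
print(rfc3339_days_ago(7, now=fixed_now))  # 2025-01-01T12:30:00Z
```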

Example 4: Create User Report in JSON Format

def create_detailed_user_report_rest(access_token, output_file='users_report.json'):
    """Create detailed user report in JSON format"""
    import json
    from datetime import datetime
    
    users = list_users_rest(access_token)
    
    report_data = {
        'generated_at': datetime.now().isoformat(),
        'total_users': len(users),
        'users': []
    }
    
    for user in users:
        user_data = {
            'email': user.get('primaryEmail'),
            'name': user.get('name', {}).get('fullName'),
            'suspended': user.get('suspended', False),
            'is_admin': user.get('isAdmin', False),
            'creation_time': user.get('creationTime'),
            'last_login': user.get('lastLoginTime'),
            'org_unit': user.get('orgUnitPath'),
            'two_factor_enabled': user.get('isEnrolledIn2Sv', False),
            'aliases': user.get('aliases', [])
        }
        report_data['users'].append(user_data)
    
    with open(output_file, 'w') as f:
        json.dump(report_data, f, indent=2)
    
    print(f"Detailed report saved to {output_file}")

# Usage
access_token = get_access_token()
create_detailed_user_report_rest(access_token)

Pros and Cons of REST API

Pros:

  • ✅ Maximum flexibility and control
  • ✅ Access to latest API features immediately
  • ✅ Minimal dependencies (just requests library)
  • ✅ Easy to debug with network inspection tools
  • ✅ Language-agnostic approach (easily portable)
  • ✅ Smaller memory footprint
  • ✅ Works in any environment with HTTP support

Cons:

  • ❌ Manual pagination handling
  • ❌ Manual retry logic required
  • ❌ More boilerplate code
  • ❌ No type safety without additional work
  • ❌ Manual token refresh management
  • ❌ More error-prone for complex operations
  • ❌ Less IDE support for API discovery
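
Since retry logic is left to the caller with direct REST calls, a small wrapper can cover it. The sketch below shows exponential backoff with jitter under a few assumptions: `call_with_backoff` and `TransientError` are hypothetical names, the caller decides which exceptions are retryable, and the `sleep` argument is injectable so the demonstration records delays instead of waiting.

```python
import random
import time

def call_with_backoff(func, is_retryable, max_attempts=5, base_delay=1.0,
                      max_delay=60.0, sleep=time.sleep):
    """Call func(), retrying retryable failures with exponential backoff."""
    for attempt in range(max_attempts):
        try:
            return func()
        except Exception as exc:
            # Give up on the last attempt or on errors that should not be retried
            if attempt == max_attempts - 1 or not is_retryable(exc):
                raise
            delay = min(max_delay, base_delay * (2 ** attempt))
            # Jitter spreads out retries from concurrent clients
            sleep(delay + random.uniform(0, delay / 2))

class TransientError(Exception):
    """Hypothetical stand-in for a 429 or 5xx response."""

attempts = {'count': 0}

def flaky_call():
    attempts['count'] += 1
    if attempts['count'] < 3:
        raise TransientError('try again')
    return 'ok'

recorded_delays = []
result = call_with_backoff(
    flaky_call,
    is_retryable=lambda exc: isinstance(exc, TransientError),
    sleep=recorded_delays.append,  # record delays instead of sleeping
)
print(result, len(recorded_delays))
```

With `requests`, `is_retryable` would typically inspect the status code of a raised HTTP error and retry only on 429 and 5xx responses.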

Method 3: gcloud CLI

Google Cloud CLI (gcloud) and related command-line tools provide a shell-friendly way to access Google Workspace data, ideal for scripting and one-off queries.

Installation

# Install gcloud CLI (macOS)
curl https://sdk.cloud.google.com | bash
exec -l $SHELL

# Or via Homebrew
brew install --cask google-cloud-sdk

# Linux
curl https://sdk.cloud.google.com | bash

# Windows (PowerShell)
(New-Object Net.WebClient).DownloadFile("https://dl.google.com/dl/cloudsdk/channels/rapid/GoogleCloudSDKInstaller.exe", "$env:Temp\GoogleCloudSDKInstaller.exe")
& $env:Temp\GoogleCloudSDKInstaller.exe

Authentication with gcloud CLI

User Authentication

# Initialize gcloud
gcloud init

# Authenticate as user
gcloud auth login

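# Set application-default credentials
# The default scopes do not cover the Admin SDK, so request the ones you need explicitly
gcloud auth application-default login \
    --scopes="https://www.googleapis.com/auth/cloud-platform,https://www.googleapis.com/auth/admin.directory.user.readonly,https://www.googleapis.com/auth/admin.reports.audit.readonly"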

# Verify authentication
gcloud auth list

Service Account Authentication

# Authenticate with service account key
gcloud auth activate-service-account \
    service-account@project-id.iam.gserviceaccount.com \
    --key-file=/path/to/key.json

# Set as application default
export GOOGLE_APPLICATION_CREDENTIALS="/path/to/key.json"

CLI Examples

Example 1: Query Users with gcloud

#!/bin/bash
# list-workspace-users.sh
# List all Google Workspace users

# Set project
gcloud config set project YOUR_PROJECT_ID

# Get OAuth token
ACCESS_TOKEN=$(gcloud auth application-default print-access-token)

# List users via Admin SDK API
curl -s "https://admin.googleapis.com/admin/directory/v1/users?customer=my_customer&maxResults=500" \
  -H "Authorization: Bearer ${ACCESS_TOKEN}" \
  -H "Content-Type: application/json" | jq '.users[] | {email: .primaryEmail, name: .name.fullName, suspended: .suspended}'

Example 2: Generate User Report Script

#!/bin/bash
# generate-user-report.sh
# Generate comprehensive user report

set -euo pipefail

PROJECT_ID="your-project-id"
OUTPUT_FILE="workspace_users_$(date +%Y%m%d).csv"

# Set project
gcloud config set project "$PROJECT_ID" --quiet

# Get access token
ACCESS_TOKEN=$(gcloud auth application-default print-access-token)

echo "Fetching users from Google Workspace..."

# Fetch users
USERS_JSON=$(curl -s "https://admin.googleapis.com/admin/directory/v1/users?customer=my_customer&maxResults=500" \
  -H "Authorization: Bearer ${ACCESS_TOKEN}" \
  -H "Content-Type: application/json")

# Create CSV header
echo "Email,Full Name,Status,Admin,2FA Enabled,Created,Last Login,Org Unit" > "$OUTPUT_FILE"

# Parse and format data
echo "$USERS_JSON" | jq -r '.users[] | 
  [
    .primaryEmail,
    .name.fullName,
    (if .suspended then "Suspended" else "Active" end),
    (if .isAdmin then "Yes" else "No" end),
    (if .isEnrolledIn2Sv then "Yes" else "No" end),
    .creationTime[:10],
    (.lastLoginTime[:10] // "Never"),
    .orgUnitPath
  ] | @csv' >> "$OUTPUT_FILE"

echo "Report generated: $OUTPUT_FILE"
echo "Total users: $(tail -n +2 "$OUTPUT_FILE" | wc -l)"

Example 3: Monitor Login Activity

#!/bin/bash
# monitor-logins.sh
# Monitor recent login activity

set -euo pipefail

PROJECT_ID="your-project-id"
DAYS_BACK=7

# Calculate start time (GNU date syntax; on macOS/BSD use: date -u -v-"${DAYS_BACK}"d '+%Y-%m-%dT%H:%M:%S.000Z')
START_TIME=$(date -u -d "$DAYS_BACK days ago" '+%Y-%m-%dT%H:%M:%S.000Z')

# Get access token
ACCESS_TOKEN=$(gcloud auth application-default print-access-token)

echo "Fetching login activity from the last $DAYS_BACK days..."

# Fetch login activities
ACTIVITIES=$(curl -s "https://admin.googleapis.com/admin/reports/v1/activity/users/all/applications/login?startTime=$START_TIME&maxResults=1000" \
  -H "Authorization: Bearer ${ACCESS_TOKEN}" \
  -H "Content-Type: application/json")

# Count login events
# The ? keeps jq from failing when the response has no items
SUCCESSFUL=$(echo "$ACTIVITIES" | jq '[.items[]?.events[] | select(.name == "login_success")] | length')
FAILED=$(echo "$ACTIVITIES" | jq '[.items[]?.events[] | select(.name == "login_failure")] | length')
SUSPICIOUS=$(echo "$ACTIVITIES" | jq '[.items[]?.events[] | select(.name == "suspicious_login")] | length')

echo "Login Activity Summary (last $DAYS_BACK days):"
echo "  Successful logins: $SUCCESSFUL"
echo "  Failed logins: $FAILED"
echo "  Suspicious activities: $SUSPICIOUS"

# Show recent failed logins
echo ""
echo "Recent Failed Logins:"
echo "$ACTIVITIES" | jq -r '.items[]? | 
  select(any(.events[]; .name == "login_failure")) | 
  .actor.email + " - " + .id.time' | head -10

Example 4: Audit Admin Changes

#!/bin/bash
# audit-admin-changes.sh
# Audit administrative changes

set -euo pipefail

PROJECT_ID="your-project-id"
DAYS_BACK=30
OUTPUT_FILE="admin_audit_$(date +%Y%m%d).txt"

# Calculate start time (GNU date syntax; on macOS/BSD use: date -u -v-"${DAYS_BACK}"d '+%Y-%m-%dT%H:%M:%S.000Z')
START_TIME=$(date -u -d "$DAYS_BACK days ago" '+%Y-%m-%dT%H:%M:%S.000Z')

# Get access token
ACCESS_TOKEN=$(gcloud auth application-default print-access-token)

echo "Auditing admin changes from the last $DAYS_BACK days..." | tee "$OUTPUT_FILE"
echo "Generated: $(date)" | tee -a "$OUTPUT_FILE"
echo "======================================" | tee -a "$OUTPUT_FILE"

# Fetch admin activities
ACTIVITIES=$(curl -s "https://admin.googleapis.com/admin/reports/v1/activity/users/all/applications/admin?startTime=$START_TIME&maxResults=1000" \
  -H "Authorization: Bearer ${ACCESS_TOKEN}" \
  -H "Content-Type: application/json")

# User creation events
echo "" | tee -a "$OUTPUT_FILE"
echo "User Creation Events:" | tee -a "$OUTPUT_FILE"
echo "$ACTIVITIES" | jq -r '.items[]? | 
  select(any(.events[]; .name == "CREATE_USER")) | 
  .id.time[:10] + " - " + .actor.email + " created user"' | tee -a "$OUTPUT_FILE"

# User deletion events
echo "" | tee -a "$OUTPUT_FILE"
echo "User Deletion Events:" | tee -a "$OUTPUT_FILE"
echo "$ACTIVITIES" | jq -r '.items[]? | 
  select(any(.events[]; .name == "DELETE_USER")) | 
  .id.time[:10] + " - " + .actor.email + " deleted user"' | tee -a "$OUTPUT_FILE"

# Admin role changes
echo "" | tee -a "$OUTPUT_FILE"
echo "Admin Role Changes:" | tee -a "$OUTPUT_FILE"
echo "$ACTIVITIES" | jq -r '.items[]? | 
  . as $item | .events[] | select(.name | contains("ADMIN")) | 
  $item.id.time[:10] + " - " + $item.actor.email + " - " + .name' | tee -a "$OUTPUT_FILE"

echo "" | tee -a "$OUTPUT_FILE"
echo "Audit report saved to: $OUTPUT_FILE"

Example 5: Organizational Unit Report

#!/bin/bash
# org-units-report.sh
# Generate organizational units report

set -euo pipefail

PROJECT_ID="your-project-id"

# Get access token
ACCESS_TOKEN=$(gcloud auth application-default print-access-token)

echo "Fetching organizational units..."

# Fetch org units (type=all returns the full hierarchy; the default lists only direct children)
ORG_UNITS=$(curl -s "https://admin.googleapis.com/admin/directory/v1/customer/my_customer/orgunits?type=all" \
  -H "Authorization: Bearer ${ACCESS_TOKEN}" \
  -H "Content-Type: application/json")

echo "Organizational Unit Structure:"
echo "=============================="

# Display in tree format
echo "$ORG_UNITS" | jq -r '.organizationUnits[] | 
  "Path: " + .orgUnitPath + 
  "\n  Name: " + .name + 
  "\n  Description: " + (.description // "N/A") + 
  "\n  Parent: " + (.parentOrgUnitPath // "/") + 
  "\n"'

# Count users per org unit
echo ""
echo "User Counts by Org Unit:"
echo "========================"

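# Read line by line so org unit paths containing spaces stay intact
while IFS= read -r ORG_PATH; do
  # URL-encode the whole query expression (printf avoids encoding a trailing newline)
  ENCODED_QUERY=$(printf "orgUnitPath='%s'" "$ORG_PATH" | jq -sRr @uri)
  
  # Counts only the first page (up to 500 users); follow nextPageToken for larger units
  USER_COUNT=$(curl -s "https://admin.googleapis.com/admin/directory/v1/users?customer=my_customer&query=$ENCODED_QUERY&maxResults=500" \
    -H "Authorization: Bearer ${ACCESS_TOKEN}" | jq -r '.users // [] | length')
  
  echo "  $ORG_PATH: $USER_COUNT users"
done < <(echo "$ORG_UNITS" | jq -r '.organizationUnits[]?.orgUnitPath')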

Example 6: License Usage Report

#!/bin/bash
# license-usage-report.sh
# Report on license usage across the organization

set -euo pipefail

PROJECT_ID="your-project-id"
OUTPUT_FILE="license_usage_$(date +%Y%m%d).csv"

# Get access token
ACCESS_TOKEN=$(gcloud auth application-default print-access-token)

echo "Fetching license information..."

# Common Google Workspace SKUs
declare -a SKUS=(
  "Google-Apps-For-Business"
  "Google-Apps-Unlimited"
  "Google-Apps-For-Postini"
  "Google-Vault"
  "Google-Vault-Former-Employee"
)

echo "SKU,Total Licenses,Assigned,Available" > "$OUTPUT_FILE"

# Note: the Reseller API is available only to Google Workspace reseller accounts;
# adapt the endpoint and identifiers to your own setup.
for SKU in "${SKUS[@]}"; do
  LICENSE_DATA=$(curl -s "https://reseller.googleapis.com/apps/reseller/v1/customers/my_customer/subscriptions/$SKU" \
    -H "Authorization: Bearer ${ACCESS_TOKEN}" 2>/dev/null || echo "{}")
  
  if [ "$(echo "$LICENSE_DATA" | jq -r .kind 2>/dev/null)" != "null" ]; then
    # Seat counts live under .seats; default to 0 so the arithmetic never sees a non-numeric value
    TOTAL=$(echo "$LICENSE_DATA" | jq -r '.seats.numberOfSeats // .seats.maximumNumberOfSeats // 0')
    ASSIGNED=$(echo "$LICENSE_DATA" | jq -r '.seats.licensedNumberOfSeats // 0')
    AVAILABLE=$((TOTAL - ASSIGNED))
    
    echo "$SKU,$TOTAL,$ASSIGNED,$AVAILABLE" >> "$OUTPUT_FILE"
    echo "  $SKU: $ASSIGNED / $TOTAL licenses used"
  fi
done

echo ""
echo "License report saved to: $OUTPUT_FILE"

Pros and Cons of gcloud CLI

Pros:

  • ✅ Great for ad-hoc queries and exploration
  • ✅ Easy to integrate into shell scripts
  • ✅ No programming language required
  • ✅ Perfect for CI/CD pipelines
  • ✅ Combines well with standard Unix tools (jq, awk, sed)
  • ✅ Quick authentication setup
  • ✅ Easy to schedule with cron

Cons:

  • ❌ Limited direct Google Workspace commands (mostly need API calls)
  • ❌ Less type safety and validation
  • ❌ Harder to handle complex data transformations
  • ❌ Error handling can be challenging in bash
  • ❌ Requires jq or similar tools for JSON processing
  • ❌ String parsing can be fragile
  • ❌ Less portable across different shells
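
When jq filters and string parsing become fragile, one option is to keep `curl` for the request and hand the JSON to a short Python step for the transformation. The sketch below converts a Directory API style user listing into CSV with proper quoting; `users_json_to_csv` is a hypothetical helper and the sample payload is made up.

```python
import csv
import io
import json

def users_json_to_csv(json_text):
    """Convert a JSON user listing ({'users': [...]}) into CSV text."""
    data = json.loads(json_text)
    buffer = io.StringIO()
    writer = csv.writer(buffer, lineterminator='\n')
    writer.writerow(['Email', 'Full Name', 'Status', 'Org Unit'])
    for user in data.get('users', []):
        writer.writerow([
            user.get('primaryEmail', ''),
            user.get('name', {}).get('fullName', ''),
            'Suspended' if user.get('suspended') else 'Active',
            user.get('orgUnitPath', '/'),
        ])
    return buffer.getvalue()

# Hypothetical payload; note the comma inside the name, which csv quotes correctly
sample = json.dumps({'users': [
    {'primaryEmail': 'a@example.com', 'name': {'fullName': 'Doe, Alex'},
     'suspended': False, 'orgUnitPath': '/Sales'},
]})
print(users_json_to_csv(sample), end='')
```

In a shell pipeline, the same function could read the curl output from standard input and write the CSV to standard output.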

Comparison Summary

Feature | Python SDK | REST API | gcloud CLI
Ease of Use ⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐
Flexibility ⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐
Performance ⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐
Error Handling ⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐
Type Safety ⭐⭐⭐⭐⭐⭐⭐
Documentation ⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐
Setup Complexity ⭐⭐⭐⭐⭐⭐⭐⭐⭐
Scripting ⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐
Dependencies ⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐
Latest Features ⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐

When to Use Each Approach

Use Python SDK when:

  • Building production applications
  • Need a maintained client that handles request building and token refresh
  • Want built-in pagination helpers and opt-in retries
  • Developing complex data processing workflows
  • Team has Python expertise
  • Need long-term maintainability

Use REST API when:

  • SDK doesn’t support required features
  • Working in non-Python environment
  • Need minimal dependencies
  • Require maximum performance
  • Building microservices or serverless functions
  • Need precise control over requests

Use gcloud CLI when:

  • Running one-off queries or reports
  • Building shell scripts
  • Integrating with CI/CD pipelines
  • Need quick prototyping
  • Working in bash/shell environment
  • Combining with other command-line tools

Authentication Comparison

Service Account (Best for Automation)

Setup Complexity: Medium
Use Case: Unattended scripts, scheduled jobs, automation
Security: High (credential file must be protected)

Pros:

  • No user interaction required
  • Can impersonate users with domain-wide delegation
  • Ideal for server-to-server communication
  • Long-lived credentials

Cons:

  • Requires admin setup in Google Workspace Console
  • More complex initial configuration
  • Credential file security is critical

OAuth 2.0 (Best for User Applications)

Setup Complexity: Medium-High
Use Case: Desktop applications, user-facing tools
Security: High (user grants specific permissions)

Pros:

  • User controls access
  • Granular permission scopes
  • Can revoke access easily
  • Better audit trail (shows user identity)

Cons:

  • Requires user interaction
  • Token refresh management needed
  • Not suitable for automation
  • Consent screen configuration required

API Keys (Limited Use)

Setup Complexity: Low
Use Case: Public data only
Security: Low (easily exposed)

Pros:

  • Simple to implement
  • No OAuth flow required

Cons:

  • Very limited functionality
  • Not suitable for Google Workspace Admin data
  • Security concerns if exposed

Best Practices

1. Security

# Store credentials securely
import os
from pathlib import Path

# Use environment variables
SERVICE_ACCOUNT_KEY = os.environ.get('GOOGLE_SERVICE_ACCOUNT_KEY')

# Or use secret management
from google.cloud import secretmanager
client = secretmanager.SecretManagerServiceClient()
secret_name = "projects/PROJECT_ID/secrets/SERVICE_ACCOUNT_KEY/versions/latest"
response = client.access_secret_version(request={"name": secret_name})
credentials_json = response.payload.data.decode('UTF-8')

# Never commit credentials to version control
# Add to .gitignore:
# *.json
# credentials.json
# service-account-key.json
# token.pickle
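
If the service account key is delivered as JSON text in an environment variable, it helps to validate it before building credentials. This is a minimal sketch under that assumption: `load_service_account_info` and `REQUIRED_FIELDS` are hypothetical names, and the field list is a reasonable guess at what a key file contains, not an official schema.

```python
import json
import os

# Assumed minimum set of fields in a service account key
REQUIRED_FIELDS = ('type', 'client_email', 'private_key', 'token_uri')

def load_service_account_info(env_var='GOOGLE_SERVICE_ACCOUNT_KEY', environ=os.environ):
    """Parse and sanity-check service account JSON held in an environment variable."""
    raw = environ.get(env_var)
    if not raw:
        raise RuntimeError(f"{env_var} is not set")
    info = json.loads(raw)
    missing = [field for field in REQUIRED_FIELDS if field not in info]
    if missing:
        raise ValueError(f"Service account JSON is missing: {', '.join(missing)}")
    return info

# Demonstration with a fake environment and placeholder values
fake_environ = {'GOOGLE_SERVICE_ACCOUNT_KEY': json.dumps({
    'type': 'service_account',
    'client_email': 'reporter@example-project.iam.gserviceaccount.com',
    'private_key': 'PLACEHOLDER',
    'token_uri': 'https://oauth2.googleapis.com/token',
})}
info = load_service_account_info(environ=fake_environ)
print(info['client_email'])
```

The returned dictionary can then be passed to `service_account.Credentials.from_service_account_info(info, scopes=SCOPES)` so the key never has to be written to disk.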

2. Rate Limiting and Quotas

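import time
from google.api_core import retry

# Implement exponential backoff.
# Note: by default Retry only retries a fixed set of transient exceptions
# (mostly from google.api_core); pass predicate=... if the wrapped call
# raises a different error type, such as googleapiclient.errors.HttpError.
@retry.Retry(
    initial=1.0,
    maximum=60.0,
    multiplier=2.0,
    deadline=300.0
)
def api_call_with_retry():
    # Your API call here
    pass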

# Manual rate limiting
def batch_process_with_delay(items, process_item, delay=1.0):
    """Process items, pausing after every 10 to avoid rate limits"""
    for i, item in enumerate(items):
        if i > 0 and i % 10 == 0:
            print(f"Processed {i} items, pausing...")
            time.sleep(delay)
        
        # process_item is the caller-supplied function that handles one item
        process_item(item)
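The backoff idea above can also be written without any Google-specific dependency, which makes the retry schedule easy to inspect and test. The sketch below uses only the standard library and adds random jitter so parallel workers do not retry in lockstep. The names `backoff_delays` and `call_with_backoff` are hypothetical, and the caller supplies the `is_retryable` predicate.

```python
import random
import time


def backoff_delays(initial=1.0, multiplier=2.0, maximum=60.0, retries=5):
    """Yield the capped exponential delay to wait before each retry."""
    delay = initial
    for _ in range(retries):
        yield min(delay, maximum)
        delay *= multiplier


def call_with_backoff(func, is_retryable, retries=5, sleep=time.sleep):
    """Call func(), retrying while is_retryable(exc) returns True.

    Makes at most retries + 1 calls. `sleep` is injectable so the loop
    can be exercised without real waiting.
    """
    for delay in backoff_delays(retries=retries):
        try:
            return func()
        except Exception as exc:
            if not is_retryable(exc):
                raise
            # Full jitter: wait a random amount up to the current cap
            sleep(random.uniform(0, delay))
    # Final attempt; any exception propagates to the caller
    return func()
```

With the default arguments the caps grow as 1, 2, 4, 8, and 16 seconds, so a persistently failing call gives up after roughly half a minute at most.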

3. Error Handling

import logging
import time

from googleapiclient.errors import HttpError

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def robust_api_call(service, user_email, max_retries=3):
    """Fetch a user, logging expected failures and retrying on rate limits"""
    for attempt in range(max_retries + 1):
        try:
            return service.users().get(userKey=user_email).execute()
        
        except HttpError as e:
            if e.resp.status == 404:
                logger.warning(f"User not found: {user_email}")
            elif e.resp.status == 403:
                logger.error(f"Permission denied for user: {user_email}")
            elif e.resp.status == 429 and attempt < max_retries:
                # Bounded retry with a growing pause instead of unbounded recursion
                wait = 60 * (attempt + 1)
                logger.warning(f"Rate limit hit, backing off for {wait}s...")
                time.sleep(wait)
                continue
            else:
                logger.error(f"Unexpected error: {e}")
            return None
        
        except Exception as e:
            logger.error(f"Unexpected exception: {e}")
            return None
    
    return None

4. Pagination

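def get_all_paginated_results(collection, request, items_key='users'):
    """Generic pagination handler

    collection: the resource that created the request, e.g. service.users()
    items_key: response field holding the results ('users', 'items', ...)
    """
    all_items = []
    
    while request is not None:
        try:
            response = request.execute()
            all_items.extend(response.get(items_key, []))
            
            # Get next page request (None once the last page is reached)
            request = collection.list_next(request, response)
        
        except Exception as e:
            # Results collected so far are returned; the log records the gap
            logger.error(f"Error during pagination: {e}")
            break
    
    return all_items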
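The same page-token loop can be expressed as a generator, which lets callers stream results instead of holding every page in memory. This is a sketch with a hypothetical name, `iter_paginated`. It assumes each response is a dict that carries `nextPageToken` when more pages exist, as in the Directory and Reports responses used elsewhere in this guide.

```python
def iter_paginated(fetch_page, items_key="items"):
    """Yield items one at a time across all pages.

    fetch_page(page_token) is a caller-supplied function that returns one
    response dict; it receives None for the first page.
    """
    page_token = None
    while True:
        response = fetch_page(page_token)
        yield from response.get(items_key, [])
        page_token = response.get("nextPageToken")
        if not page_token:
            break
```

For the Directory API, `fetch_page` could be `lambda token: service.users().list(customer='my_customer', maxResults=500, pageToken=token).execute()` with `items_key='users'`.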

5. Logging and Auditing

import functools
import logging
import time
from datetime import datetime

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler(f'workspace_report_{datetime.now():%Y%m%d}.log'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)

def audited_operation(operation_name):
    """Decorator to audit operations"""
    def decorator(func):
        @functools.wraps(func)  # Preserve the wrapped function's name and docstring
        def wrapper(*args, **kwargs):
            logger.info(f"Starting {operation_name}")
            start_time = time.time()
            
            try:
                result = func(*args, **kwargs)
                duration = time.time() - start_time
                logger.info(f"Completed {operation_name} in {duration:.2f}s")
                return result
            
            except Exception as e:
                logger.error(f"Failed {operation_name}: {e}")
                raise
        
        return wrapper
    return decorator

@audited_operation("user_report_generation")
def generate_user_report(service):
    # Implementation
    pass

Complete Production Example

Here’s a production-ready script combining best practices:

#!/usr/bin/env python3
"""
Google Workspace Reporting Tool
Comprehensive reporting for Google Workspace users, logins, and configuration.
"""

import os
import sys
import logging
import argparse
import csv
from datetime import datetime, timedelta
from typing import List, Dict, Optional

from google.oauth2 import service_account
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
import time

# Configuration
SCOPES = [
    'https://www.googleapis.com/auth/admin.directory.user.readonly',
    'https://www.googleapis.com/auth/admin.reports.audit.readonly',
    'https://www.googleapis.com/auth/admin.reports.usage.readonly'
]

# Logging setup
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler(f'workspace_report_{datetime.now():%Y%m%d_%H%M%S}.log'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)


class GoogleWorkspaceReporter:
    """Google Workspace reporting and analytics"""
    
    def __init__(self, service_account_file: str, delegated_user: str):
        """Initialize with service account credentials"""
        self.service_account_file = service_account_file
        self.delegated_user = delegated_user
        self.admin_service = None
        self.reports_service = None
        
        self._authenticate()
    
    def _authenticate(self):
        """Authenticate and build service objects"""
        try:
            credentials = service_account.Credentials.from_service_account_file(
                self.service_account_file,
                scopes=SCOPES
            )
            
            delegated_credentials = credentials.with_subject(self.delegated_user)
            
            self.admin_service = build('admin', 'directory_v1', credentials=delegated_credentials)
            self.reports_service = build('admin', 'reports_v1', credentials=delegated_credentials)
            
            logger.info("Authentication successful")
        
        except Exception as e:
            logger.error(f"Authentication failed: {e}")
            sys.exit(1)
    
    def get_all_users(self) -> List[Dict]:
        """Retrieve all users with pagination"""
        logger.info("Fetching all users...")
        users = []
        page_token = None
        
        try:
            while True:
                response = self.admin_service.users().list(
                    customer='my_customer',
                    maxResults=500,
                    orderBy='email',
                    pageToken=page_token
                ).execute()
                
                users.extend(response.get('users', []))
                page_token = response.get('nextPageToken')
                
                if not page_token:
                    break
                
                time.sleep(0.5)  # Rate limiting
            
            logger.info(f"Retrieved {len(users)} users")
            return users
        
        except HttpError as e:
            logger.error(f"Error fetching users: {e}")
            return []
    
    def generate_user_report(self, output_file: str = 'user_report.csv'):
        """Generate comprehensive user report"""
        logger.info(f"Generating user report: {output_file}")
        
        users = self.get_all_users()
        
        with open(output_file, 'w', newline='', encoding='utf-8') as csvfile:
            fieldnames = [
                'Email', 'Full Name', 'Status', 'Admin', 'Suspended',
                '2FA Enrolled', 'Created Date', 'Last Login', 'Org Unit',
                'Aliases'
            ]
            writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
            writer.writeheader()
            
            for user in users:
                writer.writerow({
                    'Email': user.get('primaryEmail', ''),
                    'Full Name': user.get('name', {}).get('fullName', ''),
                    'Status': 'Active' if not user.get('suspended') else 'Suspended',
                    'Admin': 'Yes' if user.get('isAdmin') else 'No',
                    'Suspended': 'Yes' if user.get('suspended') else 'No',
                    '2FA Enrolled': 'Yes' if user.get('isEnrolledIn2Sv') else 'No',
                    'Created Date': user.get('creationTime', '')[:10],
                    # The Directory API reports the Unix epoch for users who have never logged in
                    'Last Login': 'Never' if (user.get('lastLoginTime') or '1970')[:4] == '1970' else user['lastLoginTime'][:10],
                    'Org Unit': user.get('orgUnitPath', '/'),
                    'Aliases': ', '.join(user.get('aliases', []))
                })
        
        logger.info(f"User report generated: {output_file}")
    
    def get_login_activities(self, days: int = 7) -> List[Dict]:
        """Get login activities for the past N days"""
        logger.info(f"Fetching login activities for the past {days} days...")
        
        # RFC 3339 timestamp in UTC; datetime.now() would label local time as UTC
        start_time = time.strftime(
            '%Y-%m-%dT%H:%M:%SZ',
            time.gmtime(time.time() - days * 86400)
        )
        activities = []
        page_token = None
        
        try:
            while True:
                response = self.reports_service.activities().list(
                    userKey='all',
                    applicationName='login',
                    maxResults=1000,
                    startTime=start_time,
                    pageToken=page_token
                ).execute()
                
                activities.extend(response.get('items', []))
                page_token = response.get('nextPageToken')
                
                if not page_token:
                    break
                
                time.sleep(0.5)
            
            logger.info(f"Retrieved {len(activities)} login activities")
            return activities
        
        except HttpError as e:
            logger.error(f"Error fetching login activities: {e}")
            return []
    
    def generate_login_report(self, output_file: str = 'login_report.txt', days: int = 7):
        """Generate login activity report"""
        logger.info(f"Generating login report: {output_file}")
        
        activities = self.get_login_activities(days)
        
        # Analyze activities
        successful = failed = suspicious = 0
        failed_logins = []
        
        for activity in activities:
            for event in activity.get('events', []):
                event_name = event.get('name', '')
                
                if event_name == 'login_success':
                    successful += 1
                elif event_name == 'login_failure':
                    failed += 1
                    failed_logins.append(activity)
                elif 'suspicious' in event_name.lower():
                    suspicious += 1
        
        # Write report
        with open(output_file, 'w') as f:
            f.write(f"Login Activity Report\n")
            f.write(f"Period: Last {days} days\n")
            f.write(f"Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
            f.write(f"{'=' * 60}\n\n")
            
            f.write(f"Summary:\n")
            f.write(f"  Successful logins: {successful}\n")
            f.write(f"  Failed logins: {failed}\n")
            f.write(f"  Suspicious activities: {suspicious}\n\n")
            
            if failed_logins:
                f.write(f"Recent Failed Login Attempts:\n")
                for activity in failed_logins[:20]:
                    actor = activity.get('actor', {}).get('email', 'Unknown')
                    # Named event_time to avoid shadowing the imported time module
                    event_time = activity.get('id', {}).get('time', 'Unknown')
                    f.write(f"  {event_time}: {actor}\n")
        
        logger.info(f"Login report generated: {output_file}")


def main():
    """Main entry point"""
    parser = argparse.ArgumentParser(
        description='Google Workspace Reporting Tool'
    )
    parser.add_argument(
        '--service-account',
        required=True,
        help='Path to service account JSON key file'
    )
    parser.add_argument(
        '--delegated-user',
        required=True,
        help='Admin user email to impersonate'
    )
    parser.add_argument(
        '--report-type',
        choices=['users', 'logins', 'all'],
        default='all',
        help='Type of report to generate'
    )
    parser.add_argument(
        '--output-dir',
        default='./reports',
        help='Output directory for reports'
    )
    parser.add_argument(
        '--days',
        type=int,
        default=7,
        help='Number of days for activity reports'
    )
    
    args = parser.parse_args()
    
    # Create output directory
    os.makedirs(args.output_dir, exist_ok=True)
    
    # Initialize reporter
    reporter = GoogleWorkspaceReporter(
        args.service_account,
        args.delegated_user
    )
    
    # Generate reports
    timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    
    if args.report_type in ['users', 'all']:
        user_report = os.path.join(args.output_dir, f'users_{timestamp}.csv')
        reporter.generate_user_report(user_report)
    
    if args.report_type in ['logins', 'all']:
        login_report = os.path.join(args.output_dir, f'logins_{timestamp}.txt')
        reporter.generate_login_report(login_report, args.days)
    
    logger.info("All reports generated successfully")


if __name__ == '__main__':
    main()

Usage:

python workspace_reporter.py \
    --service-account /path/to/service-account-key.json \
    --delegated-user admin@example.com \
    --report-type all \
    --output-dir ./reports \
    --days 30
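Once the user report exists, a short follow-up script can turn it into the security and license figures that reports usually need. The sketch below assumes the column names written by `generate_user_report` above; the function names and the summary keys are illustrative.

```python
import csv


def summarize_user_report(rows):
    """Count key attributes in rows read from the user report CSV.

    Returns the summary counts and the emails of admins without 2FA.
    """
    summary = {"total": 0, "suspended": 0, "without_2fa": 0, "never_logged_in": 0}
    admins_without_2fa = []
    for row in rows:
        summary["total"] += 1
        if row["Suspended"] == "Yes":
            summary["suspended"] += 1
        if row["2FA Enrolled"] == "No":
            summary["without_2fa"] += 1
            if row["Admin"] == "Yes":
                admins_without_2fa.append(row["Email"])
        # Accept an epoch date too, in case the report wrote it for never-used accounts
        if row["Last Login"] in ("Never", "1970-01-01"):
            summary["never_logged_in"] += 1
    return summary, admins_without_2fa


def summarize_file(path):
    """Read a report file from disk and summarize it."""
    with open(path, newline="", encoding="utf-8") as csvfile:
        return summarize_user_report(csv.DictReader(csvfile))
```

Accounts counted under `never_logged_in` are natural candidates for the unused-license review mentioned under the use cases.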

Conclusion

Google Workspace provides powerful APIs and tools for extracting configuration and analytics data. Each method—Python SDK, REST API, and gcloud CLI—has its strengths:

  • Python SDK excels in production applications with its built-in credential handling, pagination helpers, and structured error types
  • REST API provides maximum flexibility and access to latest features
  • gcloud CLI shines in scripting and quick one-off queries

For most production use cases, the Python SDK is recommended for its balance of ease-of-use and capabilities. Use REST API when you need features not yet available in the SDK or when working outside Python. Choose gcloud CLI for shell scripts, CI/CD integration, and quick exploratory queries.

Key Takeaways

  1. Authentication: Service Accounts with domain-wide delegation are best for automation
  2. Security: Always protect credentials and use secret management systems
  3. Rate Limiting: Implement backoff strategies and respect API quotas
  4. Error Handling: Build robust error handling for production reliability
  5. Pagination: Always handle pagination for complete data retrieval
  6. Logging: Maintain comprehensive logs for auditing and troubleshooting

Resources