PandasAI: Complete Guide to Setup, Operations, Kubernetes Deployment, and Access Control
READER BEWARE: THE FOLLOWING WAS WRITTEN ENTIRELY BY AI WITHOUT HUMAN EDITING.
Introduction
PandasAI is an open-source Python library that enables natural language interaction with your data. It bridges the gap between non-technical users and data analysis by allowing questions to be asked in plain English (or any natural language), which are then translated into executable Python code to query and analyze data.
Whether you’re a data analyst looking to speed up exploratory analysis, or an organization building self-service analytics tools, PandasAI offers a compelling approach to democratizing data access. This comprehensive guide covers everything from initial setup to production deployment on Kubernetes, including cost considerations and access control strategies.
What is PandasAI?
PandasAI is built on top of the popular pandas library and integrates with various Large Language Models (LLMs) to provide:
- Natural Language Querying: Ask questions about your data in plain English
- Automatic Chart Generation: Generate visualizations by describing what you want to see
- Multi-DataFrame Analysis: Work with multiple datasets and ask questions that span across them
- Extensible Architecture: Support for various LLMs, data connectors, and custom skills
- Security Features: Docker-based sandboxed execution for production environments
Key Features
| Feature | Description |
|---|---|
| Natural Language Queries | Translate English questions into pandas operations |
| Chart Generation | Create matplotlib/plotly visualizations via natural language |
| Multi-Source Support | CSV, Excel, SQL databases, Snowflake, BigQuery, etc. |
| LLM Flexibility | OpenAI, Azure OpenAI, Anthropic, Google, and 100+ more via LiteLLM |
| Sandboxed Execution | Docker-based isolated execution for security |
| Conversation Memory | Maintain context across multiple questions |
| Custom Skills | Extend functionality with custom Python functions |
Setup and Installation
Prerequisites
- Python version 3.8 to 3.11 (Python 3.12+ not yet supported)
- pip or poetry package manager
- An LLM API key (OpenAI, Azure OpenAI, Anthropic, etc.)
- Docker (optional, for sandboxed execution)
Basic Installation
Install the core PandasAI library and an LLM extension:
# Install core package
pip install pandasai
# Install LiteLLM extension (recommended - supports 100+ LLM providers)
pip install pandasai-litellm
# Or install specific provider extensions
pip install pandasai-openai
Using poetry:
poetry add pandasai
poetry add pandasai-litellm
Environment Configuration
Set up your environment variables for API keys:
# For OpenAI
export OPENAI_API_KEY="sk-your-openai-api-key"
# For Azure OpenAI
export AZURE_OPENAI_API_KEY="your-azure-key"
export AZURE_OPENAI_ENDPOINT="https://your-endpoint.openai.azure.com/"
# For Anthropic
export ANTHROPIC_API_KEY="your-anthropic-key"
Quick Start Example
import pandasai as pai
from pandasai_litellm.litellm import LiteLLM
# Initialize LLM with your model of choice
llm = LiteLLM(model="gpt-4.1-mini", api_key="YOUR_OPENAI_API_KEY")
# Configure PandasAI to use this LLM
pai.config.set({
"llm": llm
})
# Load data
df = pai.read_csv("data/sales.csv")
# Ask questions in natural language
response = df.chat("What is the average revenue by region?")
print(response)
# Generate visualizations
df.chat("Plot a bar chart showing sales by product category")
Multiple DataFrame Analysis
PandasAI can analyze relationships across multiple datasets:
import pandasai as pai
from pandasai_litellm.litellm import LiteLLM
llm = LiteLLM(model="gpt-4.1-mini", api_key="YOUR_OPENAI_API_KEY")
pai.config.set({"llm": llm})
# Create multiple DataFrames
employees_df = pai.DataFrame({
'EmployeeID': [1, 2, 3, 4, 5],
'Name': ['John', 'Emma', 'Liam', 'Olivia', 'William'],
'Department': ['HR', 'Sales', 'IT', 'Marketing', 'Finance']
})
salaries_df = pai.DataFrame({
'EmployeeID': [1, 2, 3, 4, 5],
'Salary': [5000, 6000, 4500, 7000, 5500]
})
# Query across DataFrames
result = pai.chat("Who gets paid the most?", employees_df, salaries_df)
print(result) # Output: Olivia gets paid the most.
User Experience
How It Works
1. User Input: You ask a question in natural language
2. LLM Processing: The question is sent to the configured LLM along with data schema information
3. Code Generation: The LLM generates Python/pandas code to answer the question
4. Execution: The code is executed against your data
5. Response: Results are returned, which can be text, numbers, DataFrames, or charts
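Conceptually, the loop looks like the minimal sketch below. This is illustrative only: llm.generate is a hypothetical stand-in for the real LLM call, and the actual library adds prompt templates, retries, response parsing, and optional sandboxed execution.

# Minimal sketch of the query loop described above (not PandasAI internals)
def answer(question, df, llm):
    # Steps 1-2: build a prompt containing the question plus the data schema
    schema = ", ".join(f"{col} ({dtype})" for col, dtype in df.dtypes.items())
    prompt = (
        f"Given a pandas DataFrame `df` with columns: {schema}. "
        f"Write Python code that answers: {question!r} "
        "and stores the answer in a variable named `result`."
    )
    # Step 3: the LLM generates pandas code (hypothetical call)
    code = llm.generate(prompt)
    # Step 4: execute the generated code against the data
    # (PandasAI can run this step inside a Docker sandbox)
    namespace = {"df": df}
    exec(code, namespace)
    # Step 5: return the result (text, number, DataFrame, or chart)
    return namespace.get("result")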
Common Query Patterns
# Aggregations
df.chat("What is the total sales by region?")
# Filtering
df.chat("Show me all customers from California with purchases over $1000")
# Sorting and ranking
df.chat("List the top 10 products by revenue")
# Time-based analysis
df.chat("What is the month-over-month growth rate?")
# Statistical analysis
df.chat("What is the correlation between advertising spend and sales?")
# Visualizations
df.chat("Create a pie chart showing market share by company")
Conversation Context
PandasAI maintains conversation context, allowing follow-up questions:
df.chat("What are the top 5 selling products?")
# Output: Lists top 5 products
df.chat("Show me the sales trend for the first one")
# Automatically references the first product from the previous answer
Output Types
PandasAI can return various output types:
# Text response
response = df.chat("What is the average order value?")
# Returns: "The average order value is $156.42"
# DataFrame response
response = df.chat("Show me the top 10 customers by spending")
# Returns: pandas DataFrame
# Chart response
response = df.chat("Plot monthly revenue trends")
# Returns: Chart object
response.show() # Display the chart
Operational Management
Configuration Options
PandasAI provides several configuration options for operational control:
import pandasai as pai
# Note: the exact set of supported keys varies by PandasAI version
pai.config.set({
"llm": llm,
"temperature": 0, # Set to 0 for deterministic outputs
"seed": 26, # For reproducible results
"verbose": True, # Enable detailed logging
"save_logs": True, # Save conversation logs
"enable_cache": True, # Cache responses for repeated queries
"max_retries": 3, # Number of retries on failure
"custom_prompts": {}, # Custom prompt templates
})
Logging and Monitoring
Enable comprehensive logging for debugging and monitoring:
import logging
import pandasai as pai
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('pandasai.log'),
logging.StreamHandler()
]
)
# Enable verbose mode in PandasAI
pai.config.set({"verbose": True})
Caching Strategies
PandasAI supports caching to reduce API calls and improve response times:
# Enable built-in caching
pai.config.set({"enable_cache": True})
# For production, consider implementing custom caching
import hashlib
import redis
class RedisCache:
def __init__(self, host='localhost', port=6379):
self.client = redis.Redis(host=host, port=port)
self.ttl = 3600 # 1 hour cache
def get(self, query, df_hash):
key = hashlib.md5(f"{query}:{df_hash}".encode()).hexdigest()
return self.client.get(key)
def set(self, query, df_hash, response):
key = hashlib.md5(f"{query}:{df_hash}".encode()).hexdigest()
self.client.setex(key, self.ttl, response)
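A hedged usage sketch that wires this cache around df.chat (the DataFrame fingerprint below is one simple choice; hashing the schema plus a data version stamp would also work):

import pandas as pd  # assumes the data is backed by a pandas DataFrame

cache = RedisCache()

def cached_chat(df, query):
    # Fingerprint the data so a changed DataFrame invalidates old answers
    df_hash = str(pd.util.hash_pandas_object(df).sum())
    hit = cache.get(query, df_hash)
    if hit is not None:
        return hit.decode()  # Redis returns bytes
    response = df.chat(query)
    cache.set(query, df_hash, str(response))
    return str(response)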
Error Handling
Implement robust error handling for production:
import pandasai as pai
from pandasai.exceptions import (  # exception names vary by PandasAI version
PandasAIError,
LLMNotFoundError,
InvalidConfigError
)
def safe_chat(df, query):
"""Safely execute a PandasAI query with error handling."""
try:
response = df.chat(query)
return {"success": True, "response": response}
except LLMNotFoundError as e:
return {"success": False, "error": "LLM not configured", "details": str(e)}
except InvalidConfigError as e:
return {"success": False, "error": "Configuration error", "details": str(e)}
except PandasAIError as e:
return {"success": False, "error": "Query execution failed", "details": str(e)}
except Exception as e:
return {"success": False, "error": "Unexpected error", "details": str(e)}
# Usage
result = safe_chat(df, "What is the total revenue?")
if result["success"]:
print(result["response"])
else:
print(f"Error: {result['error']} - {result['details']}")
Health Checks
Implement health checks for your PandasAI service:
from flask import Flask, jsonify
import pandasai as pai
app = Flask(__name__)
@app.route('/health')
def health_check():
"""Basic health check endpoint."""
return jsonify({"status": "healthy"})
@app.route('/health/llm')
def llm_health_check():
"""Check LLM connectivity."""
try:
# Test a simple query
test_df = pai.DataFrame({"x": [1, 2, 3]})
result = test_df.chat("What is the sum of x?")
return jsonify({"status": "healthy", "llm": "connected"})
except Exception as e:
return jsonify({"status": "unhealthy", "error": str(e)}), 503
if __name__ == '__main__':
app.run(host='0.0.0.0', port=8080)
Cost of Usage
Understanding the cost structure is crucial for production deployments. The primary cost driver is LLM API usage.
LLM API Costs
Note: prices below are approximate as of writing and change frequently; check each provider's pricing page for current rates.
| Model | Provider | Input (per 1K tokens) | Output (per 1K tokens) | Notes |
|---|---|---|---|---|
| GPT-4 Turbo | OpenAI | $0.01 | $0.03 | Best accuracy, higher cost |
| GPT-4o | OpenAI | $0.005 | $0.015 | Good balance |
| GPT-4o-mini | OpenAI | $0.00015 | $0.0006 | Budget option |
| GPT-3.5 Turbo | OpenAI | $0.0005 | $0.0015 | Legacy, cost-effective |
| Claude 3.5 Sonnet | Anthropic | $0.003 | $0.015 | Strong reasoning |
| Claude 3 Haiku | Anthropic | $0.00025 | $0.00125 | Budget option |
| Gemini Pro | Google | $0.00025 | $0.0005 | Competitive pricing |
Estimated Monthly Costs by Use Case
Small Team Analytics (50 queries/day):
- Using GPT-4o-mini: ~$5-15/month
- Using GPT-4o: ~$50-100/month
Medium Business Intelligence (500 queries/day):
- Using GPT-4o-mini: ~$30-75/month
- Using GPT-4o: ~$250-500/month
Enterprise Analytics (5,000+ queries/day):
- Using GPT-4o-mini: ~$200-500/month
- Using GPT-4o: ~$1,500-3,000/month
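These figures are simple arithmetic: queries per day × tokens per query × price per token. A back-of-the-envelope estimator follows (the token counts are assumptions; real usage grows with schema size and conversation history, which is why the ranges above skew higher than a single bare query would suggest):

def monthly_llm_cost(queries_per_day, input_tokens, output_tokens,
                     input_price_per_1k, output_price_per_1k, days=30):
    """Estimate monthly LLM spend for a given query volume."""
    per_query = (input_tokens / 1000) * input_price_per_1k \
        + (output_tokens / 1000) * output_price_per_1k
    return queries_per_day * days * per_query

# Example: 500 queries/day on GPT-4o-mini, assuming ~8K input tokens
# (schema + conversation history) and ~500 output tokens per query
print(monthly_llm_cost(500, 8000, 500, 0.00015, 0.0006))  # ~22.5 (USD/month)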
Cost Optimization Strategies
- Model Selection: Use cheaper models (GPT-4o-mini, Claude Haiku) for simple queries
def select_model_by_complexity(query):
"""Select appropriate model based on query complexity."""
simple_keywords = ['count', 'sum', 'average', 'total', 'list']
if any(kw in query.lower() for kw in simple_keywords):
return "gpt-4o-mini" # Use cheaper model for simple queries
else:
return "gpt-4o" # Use more capable model for complex queries
- Implement Caching: Cache responses for repeated or similar queries
- Batch Processing: Combine related questions when possible (see the batching sketch after this list)
- Rate Limiting: Implement per-user rate limits
from flask_limiter import Limiter
from flask_limiter.util import get_remote_address
# Assumes the Flask `app` from the health-check example above
limiter = Limiter(
    get_remote_address,  # key_func is the first positional argument in flask-limiter >= 2.x
    app=app,
    default_limits=["100 per day", "10 per minute"]
)
@app.route('/query')
@limiter.limit("10 per minute")
def query_endpoint():
# Handle query
pass
- Token Optimization: Keep prompts concise and use schema descriptions efficiently
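As a concrete illustration of the batching point above, one combined question pays for a single round-trip of schema and context tokens where three separate questions would pay three times:

# Three separate calls each resend the schema and conversation context:
# df.chat("What is total revenue?")
# df.chat("What is total revenue by region?")
# df.chat("Which region grew fastest?")

# One combined call answers all three in a single LLM round-trip:
response = df.chat(
    "Report total revenue overall, total revenue by region, "
    "and which region grew fastest quarter-over-quarter."
)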
Kubernetes Deployment Options
PandasAI can be deployed to Kubernetes in several configurations, from simple single-pod deployments to complex multi-tenant architectures.
Option 1: Simple Deployment
A basic Kubernetes deployment for small-scale usage:
# deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: pandasai-service
labels:
app: pandasai
spec:
replicas: 2
selector:
matchLabels:
app: pandasai
template:
metadata:
labels:
app: pandasai
spec:
containers:
- name: pandasai
image: your-registry/pandasai-service:latest
ports:
- containerPort: 8080
env:
- name: OPENAI_API_KEY
valueFrom:
secretKeyRef:
name: pandasai-secrets
key: openai-api-key
resources:
requests:
memory: "512Mi"
cpu: "250m"
limits:
memory: "2Gi"
cpu: "1000m"
livenessProbe:
httpGet:
path: /health
port: 8080
initialDelaySeconds: 10
periodSeconds: 30
readinessProbe:
httpGet:
path: /health
port: 8080
initialDelaySeconds: 5
periodSeconds: 10
---
apiVersion: v1
kind: Service
metadata:
name: pandasai-service
spec:
selector:
app: pandasai
ports:
- port: 80
targetPort: 8080
type: ClusterIP
---
apiVersion: v1
kind: Secret
metadata:
name: pandasai-secrets
type: Opaque
data:
openai-api-key: <base64-encoded-api-key>
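Rather than base64-encoding the API key by hand, you can let kubectl do it:
kubectl create secret generic pandasai-secrets --from-literal=openai-api-key=$OPENAI_API_KEY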
Option 2: Deployment with Docker Sandbox
For production environments requiring secure code execution:
# deployment-with-sandbox.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: pandasai-secure
spec:
replicas: 3
selector:
matchLabels:
app: pandasai-secure
template:
metadata:
labels:
app: pandasai-secure
spec:
containers:
- name: pandasai
image: your-registry/pandasai-service:latest
ports:
- containerPort: 8080
env:
- name: OPENAI_API_KEY
valueFrom:
secretKeyRef:
name: pandasai-secrets
key: openai-api-key
- name: SANDBOX_ENABLED
value: "true"
        - name: DOCKER_HOST
          value: "tcp://localhost:2375"
resources:
requests:
memory: "1Gi"
cpu: "500m"
limits:
memory: "4Gi"
cpu: "2000m"
      # Docker-in-Docker sidecar that runs the sandboxed code
      - name: docker-dind
        image: docker:24.0.7-dind
        securityContext:
          privileged: true
        env:
        - name: DOCKER_TLS_CERTDIR
          value: ""   # disable TLS so dockerd listens on tcp://localhost:2375
        volumeMounts:
        - name: docker-storage
          mountPath: /var/lib/docker
      volumes:
      - name: docker-storage
        emptyDir: {}
Option 3: Helm Chart Deployment
Create a reusable Helm chart for PandasAI:
# Chart.yaml
apiVersion: v2
name: pandasai
description: A Helm chart for PandasAI deployment
version: 1.0.0
appVersion: "3.0.0"
# values.yaml
replicaCount: 2
image:
repository: your-registry/pandasai-service
tag: latest
pullPolicy: IfNotPresent
service:
type: ClusterIP
port: 80
resources:
requests:
memory: "512Mi"
cpu: "250m"
limits:
memory: "2Gi"
cpu: "1000m"
autoscaling:
enabled: true
minReplicas: 2
maxReplicas: 10
targetCPUUtilizationPercentage: 70
llm:
provider: openai
model: gpt-4o-mini
sandbox:
enabled: false
ingress:
enabled: true
className: nginx
hosts:
- host: pandasai.example.com
paths:
- path: /
pathType: Prefix
secrets:
openaiApiKey: ""
anthropicApiKey: ""
# templates/deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: {{ include "pandasai.fullname" . }}
labels:
{{- include "pandasai.labels" . | nindent 4 }}
spec:
{{- if not .Values.autoscaling.enabled }}
replicas: {{ .Values.replicaCount }}
{{- end }}
selector:
matchLabels:
{{- include "pandasai.selectorLabels" . | nindent 6 }}
template:
metadata:
labels:
{{- include "pandasai.selectorLabels" . | nindent 8 }}
spec:
containers:
- name: {{ .Chart.Name }}
image: "{{ .Values.image.repository }}:{{ .Values.image.tag }}"
imagePullPolicy: {{ .Values.image.pullPolicy }}
ports:
- name: http
containerPort: 8080
protocol: TCP
env:
- name: LLM_PROVIDER
value: {{ .Values.llm.provider }}
- name: LLM_MODEL
value: {{ .Values.llm.model }}
- name: OPENAI_API_KEY
valueFrom:
secretKeyRef:
name: {{ include "pandasai.fullname" . }}-secrets
key: openai-api-key
- name: SANDBOX_ENABLED
value: "{{ .Values.sandbox.enabled }}"
resources:
{{- toYaml .Values.resources | nindent 12 }}
livenessProbe:
httpGet:
path: /health
port: http
initialDelaySeconds: 10
periodSeconds: 30
readinessProbe:
httpGet:
path: /health
port: http
initialDelaySeconds: 5
periodSeconds: 10
Deploy with Helm:
# Install the chart with default model (gpt-4o-mini)
helm install pandasai ./pandasai-chart \
--set secrets.openaiApiKey=$OPENAI_API_KEY \
--set replicaCount=3
# Or override the model for more capable responses
helm install pandasai ./pandasai-chart \
--set secrets.openaiApiKey=$OPENAI_API_KEY \
--set llm.model=gpt-4o \
--set replicaCount=3
# Upgrade deployment
helm upgrade pandasai ./pandasai-chart \
--set autoscaling.maxReplicas=15
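Once deployed, clients reach the service through the ingress host from values.yaml. A hedged client sketch (pandasai.example.com is the placeholder host above, and the X-API-Key scheme matches the authentication example later in this guide):

import requests

resp = requests.post(
    "https://pandasai.example.com/query",
    headers={"X-API-Key": "key-12345"},
    json={"dataset": "sales", "query": "What is total revenue by region?"},
    timeout=60,
)
resp.raise_for_status()
print(resp.json())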
Option 4: Serverless with Knative
For event-driven, scale-to-zero deployments:
# knative-service.yaml
apiVersion: serving.knative.dev/v1
kind: Service
metadata:
name: pandasai-serverless
spec:
template:
metadata:
annotations:
autoscaling.knative.dev/minScale: "0"
autoscaling.knative.dev/maxScale: "10"
autoscaling.knative.dev/target: "50"
spec:
containers:
- image: your-registry/pandasai-service:latest
ports:
- containerPort: 8080
env:
- name: OPENAI_API_KEY
valueFrom:
secretKeyRef:
name: pandasai-secrets
key: openai-api-key
resources:
requests:
memory: "512Mi"
cpu: "250m"
limits:
memory: "2Gi"
cpu: "1000m"
Horizontal Pod Autoscaling
Configure autoscaling based on CPU or custom metrics:
# hpa.yaml
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
name: pandasai-hpa
spec:
scaleTargetRef:
apiVersion: apps/v1
kind: Deployment
name: pandasai-service
minReplicas: 2
maxReplicas: 20
metrics:
- type: Resource
resource:
name: cpu
target:
type: Utilization
averageUtilization: 70
- type: Resource
resource:
name: memory
target:
type: Utilization
averageUtilization: 80
behavior:
scaleDown:
stabilizationWindowSeconds: 300
policies:
- type: Percent
value: 50
periodSeconds: 60
scaleUp:
stabilizationWindowSeconds: 60
policies:
- type: Percent
value: 100
periodSeconds: 60
Access Control: Authentication, Authorization, and Data Access
Implementing proper access control is critical for production PandasAI deployments, especially when handling sensitive data.
Authentication
1. API Key Authentication
Simple authentication using API keys:
from flask import Flask, request, jsonify
from functools import wraps
app = Flask(__name__)
# Store API keys securely (use a database in production)
VALID_API_KEYS = {
"key-12345": {"user": "analyst1", "tier": "standard"},
"key-67890": {"user": "admin", "tier": "enterprise"},
}
def require_api_key(f):
@wraps(f)
def decorated_function(*args, **kwargs):
api_key = request.headers.get('X-API-Key')
if not api_key or api_key not in VALID_API_KEYS:
return jsonify({"error": "Invalid or missing API key"}), 401
request.user_context = VALID_API_KEYS[api_key]
return f(*args, **kwargs)
return decorated_function
@app.route('/query', methods=['POST'])
@require_api_key
def query_endpoint():
user = request.user_context['user']
query = request.json.get('query')
# Process query
return jsonify({"result": "..."})
2. OAuth 2.0 / OIDC Integration
For enterprise environments, integrate with identity providers:
from flask import Flask, request, jsonify
from flask_oidc import OpenIDConnect
from functools import wraps
app = Flask(__name__)
app.config.update({
    'SECRET_KEY': 'replace-with-a-strong-random-secret',
'OIDC_CLIENT_SECRETS': 'client_secrets.json',
'OIDC_ID_TOKEN_COOKIE_SECURE': True,
'OIDC_SCOPES': ['openid', 'email', 'profile'],
})
oidc = OpenIDConnect(app)
@app.route('/query', methods=['POST'])
@oidc.require_login
def query_endpoint():
user_info = oidc.user_getinfo(['email', 'sub'])
# Process query with user context
return jsonify({"result": "..."})
// client_secrets.json
{
"web": {
"client_id": "your-client-id",
"client_secret": "your-client-secret",
"auth_uri": "https://your-idp.com/oauth2/authorize",
"token_uri": "https://your-idp.com/oauth2/token",
"userinfo_uri": "https://your-idp.com/oauth2/userinfo",
"issuer": "https://your-idp.com"
}
}
3. JWT Token Authentication
Stateless authentication using JWT tokens:
from flask import Flask, request, jsonify
from functools import wraps
import jwt
from datetime import datetime, timedelta
app = Flask(__name__)
JWT_SECRET = "your-jwt-secret-key"
JWT_ALGORITHM = "HS256"
def create_token(user_id, roles):
"""Create a JWT token."""
payload = {
"sub": user_id,
"roles": roles,
"iat": datetime.utcnow(),
"exp": datetime.utcnow() + timedelta(hours=8)
}
return jwt.encode(payload, JWT_SECRET, algorithm=JWT_ALGORITHM)
def require_jwt(f):
@wraps(f)
def decorated_function(*args, **kwargs):
auth_header = request.headers.get('Authorization')
if not auth_header or not auth_header.startswith('Bearer '):
return jsonify({"error": "Missing or invalid token"}), 401
token = auth_header.split(' ')[1]
try:
payload = jwt.decode(token, JWT_SECRET, algorithms=[JWT_ALGORITHM])
request.user_context = payload
except jwt.ExpiredSignatureError:
return jsonify({"error": "Token expired"}), 401
except jwt.InvalidTokenError:
return jsonify({"error": "Invalid token"}), 401
return f(*args, **kwargs)
return decorated_function
@app.route('/query', methods=['POST'])
@require_jwt
def query_endpoint():
user = request.user_context['sub']
roles = request.user_context['roles']
# Process query
return jsonify({"result": "..."})
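For completeness, a client obtains a token (in practice from a login endpoint that calls create_token after verifying credentials) and presents it on each request. A sketch, assuming the service runs locally on port 8080:

import requests

token = create_token("analyst1", ["analyst"])
resp = requests.post(
    "http://localhost:8080/query",
    headers={"Authorization": f"Bearer {token}"},
    json={"query": "What is the total revenue?"},
)
print(resp.status_code, resp.json())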
Authorization
Role-Based Access Control (RBAC)
Implement RBAC to control what users can access:
from enum import Enum
from functools import wraps
from flask import request, jsonify
class Role(Enum):
VIEWER = "viewer"
ANALYST = "analyst"
ADMIN = "admin"
class Permission(Enum):
READ_DATA = "read_data"
QUERY_DATA = "query_data"
UPLOAD_DATA = "upload_data"
MANAGE_USERS = "manage_users"
VIEW_ALL_DATA = "view_all_data"
# Role-permission mapping
ROLE_PERMISSIONS = {
Role.VIEWER: [Permission.READ_DATA],
Role.ANALYST: [Permission.READ_DATA, Permission.QUERY_DATA, Permission.UPLOAD_DATA],
Role.ADMIN: [Permission.READ_DATA, Permission.QUERY_DATA, Permission.UPLOAD_DATA,
Permission.MANAGE_USERS, Permission.VIEW_ALL_DATA],
}
def require_permission(permission):
"""Decorator to require specific permission."""
def decorator(f):
@wraps(f)
def decorated_function(*args, **kwargs):
user_roles = request.user_context.get('roles', [])
user_permissions = set()
            for role_name in user_roles:
                try:
                    role = Role(role_name)
                except ValueError:
                    continue  # skip unknown role names instead of raising
                user_permissions.update(ROLE_PERMISSIONS.get(role, []))
if permission not in user_permissions:
return jsonify({"error": "Insufficient permissions"}), 403
return f(*args, **kwargs)
return decorated_function
return decorator
@app.route('/query', methods=['POST'])
@require_jwt
@require_permission(Permission.QUERY_DATA)
def query_endpoint():
# User has permission to query data
return jsonify({"result": "..."})
Attribute-Based Access Control (ABAC)
For more granular control based on attributes:
class DataAccessPolicy:
"""Define data access policies based on user attributes."""
def __init__(self):
self.policies = []
def add_policy(self, condition, allowed_datasets):
"""Add an access policy."""
self.policies.append({
"condition": condition,
"datasets": allowed_datasets
})
def get_allowed_datasets(self, user_context):
"""Get datasets a user can access based on their attributes."""
allowed = set()
for policy in self.policies:
if policy["condition"](user_context):
allowed.update(policy["datasets"])
return allowed
# Define policies
access_policy = DataAccessPolicy()
# Policy: Sales team can access sales data
access_policy.add_policy(
condition=lambda ctx: ctx.get('department') == 'sales',
allowed_datasets=['sales', 'customers', 'products']
)
# Policy: Finance team can access financial data
access_policy.add_policy(
condition=lambda ctx: ctx.get('department') == 'finance',
allowed_datasets=['revenue', 'expenses', 'budgets']
)
# Policy: Executives can access all data
access_policy.add_policy(
condition=lambda ctx: 'executive' in ctx.get('roles', []),
allowed_datasets=['sales', 'customers', 'products', 'revenue',
'expenses', 'budgets', 'hr', 'strategic']
)
@app.route('/query', methods=['POST'])
@require_jwt
def query_endpoint():
requested_dataset = request.json.get('dataset')
allowed_datasets = access_policy.get_allowed_datasets(request.user_context)
if requested_dataset not in allowed_datasets:
return jsonify({"error": f"Access denied to dataset: {requested_dataset}"}), 403
# Process query
return jsonify({"result": "..."})
Data Access Control
Row-Level Security
Implement row-level security to filter data based on user context:
import pandas as pd
import pandasai as pai
class SecureDataFrame:
"""Wrapper that applies row-level security to DataFrames."""
def __init__(self, df, security_column, user_context):
self.original_df = df
self.security_column = security_column
self.user_context = user_context
def get_filtered_df(self):
"""Return DataFrame filtered based on user's access rights."""
user_department = self.user_context.get('department')
user_roles = self.user_context.get('roles', [])
# Admins see all data
if 'admin' in user_roles:
return pai.DataFrame(self.original_df)
# Others see only their department's data
filtered = self.original_df[
self.original_df[self.security_column] == user_department
]
return pai.DataFrame(filtered)
# Usage example
sales_data = pd.read_csv('sales_all_regions.csv')
# User from 'west' region
user_context = {'department': 'west', 'roles': ['analyst']}
secure_df = SecureDataFrame(sales_data, 'region', user_context)
# Only see west region data
df = secure_df.get_filtered_df()
result = df.chat("What are total sales?") # Only returns west region totals
Column-Level Security
Hide sensitive columns based on user permissions:
class ColumnSecurityPolicy:
"""Define column-level access policies."""
def __init__(self):
self.sensitive_columns = {
'salary': ['hr', 'executive'],
'ssn': ['hr'],
'cost': ['finance', 'executive'],
'margin': ['finance', 'executive', 'sales_manager'],
}
def filter_columns(self, df, user_roles):
"""Remove columns user doesn't have access to."""
columns_to_remove = []
for column, allowed_roles in self.sensitive_columns.items():
if column in df.columns:
if not any(role in allowed_roles for role in user_roles):
columns_to_remove.append(column)
return df.drop(columns=columns_to_remove, errors='ignore')
# Usage
column_policy = ColumnSecurityPolicy()
@app.route('/query', methods=['POST'])
@require_jwt
def query_endpoint():
df = load_dataset(request.json.get('dataset'))
user_roles = request.user_context.get('roles', [])
# Filter columns based on user roles
secure_df = column_policy.filter_columns(df, user_roles)
# Process query with filtered data
result = pai.DataFrame(secure_df).chat(request.json.get('query'))
return jsonify({"result": str(result)})
Audit Logging
Implement comprehensive audit logging:
import logging
import json
from datetime import datetime
class AuditLogger:
"""Log all data access and queries for compliance."""
def __init__(self, log_file='audit.log'):
self.logger = logging.getLogger('audit')
self.logger.setLevel(logging.INFO)
handler = logging.FileHandler(log_file)
handler.setFormatter(logging.Formatter(
'%(asctime)s - %(message)s'
))
self.logger.addHandler(handler)
def log_query(self, user_id, query, dataset, result_type, success):
"""Log a query execution."""
entry = {
"timestamp": datetime.utcnow().isoformat(),
"event_type": "query",
"user_id": user_id,
"query": query,
"dataset": dataset,
"result_type": result_type,
"success": success
}
self.logger.info(json.dumps(entry))
def log_data_access(self, user_id, dataset, rows_accessed, columns_accessed):
"""Log data access details."""
entry = {
"timestamp": datetime.utcnow().isoformat(),
"event_type": "data_access",
"user_id": user_id,
"dataset": dataset,
"rows_accessed": rows_accessed,
"columns_accessed": columns_accessed
}
self.logger.info(json.dumps(entry))
audit_logger = AuditLogger()
@app.route('/query', methods=['POST'])
@require_jwt
def query_endpoint():
user_id = request.user_context['sub']
query = request.json.get('query')
dataset = request.json.get('dataset')
try:
result = process_query(query, dataset)
audit_logger.log_query(user_id, query, dataset, type(result).__name__, True)
return jsonify({"result": str(result)})
except Exception as e:
audit_logger.log_query(user_id, query, dataset, "error", False)
raise
Enterprise Features
PandasAI offers enterprise-grade features for organizations requiring additional capabilities:
Enterprise Connectors (Requires Enterprise License)
| Connector | Extension Package | Description |
|---|---|---|
| Snowflake | pandasai-snowflake | Connect to Snowflake data warehouse |
| Databricks | pandasai-databricks | Connect to Databricks lakehouse |
| BigQuery | pandasai-bigquery | Connect to Google BigQuery |
| Oracle | pandasai-oracle | Connect to Oracle databases |
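Each connector ships as its own extension package installed alongside the core library, e.g. pip install pandasai-snowflake; consult the PandasAI enterprise documentation for connector-specific configuration.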
Skills (Enterprise Feature)
Extend PandasAI with custom skills:
from pandasai.skills import Skill  # skill API shown here is illustrative and varies by version

# Define a custom skill
class SalesForecasting(Skill):
    name = "sales_forecasting"
    description = "Forecast future sales based on historical data"

    def execute(self, df, periods=30):
        # Simple linear-trend forecast as a placeholder implementation
        import numpy as np
        from sklearn.linear_model import LinearRegression

        X = np.arange(len(df)).reshape(-1, 1)
        y = df["sales"].values
        model = LinearRegression().fit(X, y)
        future = np.arange(len(df), len(df) + periods).reshape(-1, 1)
        return model.predict(future)

# Register the skill (registration mechanism also varies by version)
pai.config.set({"skills": [SalesForecasting()]})
# Use in queries
df.chat("Forecast sales for the next 30 days")
Vector Stores for Training (Enterprise Feature)
Train the agent with domain-specific knowledge:
# Configure a vector store for improved accuracy
# (configuration shown is illustrative; in some versions the vector
# store is passed directly to the Agent constructor instead)
pai.config.set({
    "vector_store": {
        "type": "chromadb",
        "path": "./vector_store"
    }
})

# Add training examples: pair each natural-language query with the
# pandas code that answers it (parameter names may vary by version)
agent = pai.Agent(df)
agent.train(
    queries=[
        "What are total sales?",
        "Show me revenue by product category",
    ],
    codes=[
        "df['sales'].sum()",
        "df.groupby('category')['revenue'].sum()",
    ]
)
Best Practices Summary
Security Best Practices
- Always use sandbox in production - Enable Docker sandbox for code execution
- Implement authentication - Use OAuth 2.0/OIDC for enterprise environments
- Apply least privilege - Grant minimum necessary permissions
- Enable audit logging - Log all queries and data access
- Encrypt sensitive data - Use encryption at rest and in transit
- Regular security reviews - Periodically audit access policies
Operational Best Practices
- Monitor LLM costs - Track API usage and implement budgets
- Implement caching - Reduce API calls with response caching
- Use appropriate models - Match model capability to query complexity
- Set up alerts - Monitor for errors, high latency, and unusual usage
- Plan for scaling - Use Kubernetes autoscaling for variable loads
Development Best Practices
- Test with deterministic settings - Use temperature=0 and seed for reproducible results
- Handle errors gracefully - Implement comprehensive error handling
- Document custom skills - Maintain documentation for extensions
- Version control configurations - Track config changes in git
Conclusion
PandasAI provides a powerful way to democratize data analysis through natural language interfaces. With proper setup, operational management, and security controls, it can be deployed safely in enterprise environments.
Key takeaways:
- Easy to get started - Simple pip install and configuration
- Flexible LLM support - Works with 100+ LLM providers via LiteLLM
- Production-ready - Docker sandbox, Kubernetes deployment options, and enterprise features
- Cost manageable - Model selection and caching strategies help control costs
- Security-conscious - Multiple options for authentication, authorization, and data access control