Developing AI Agents with Microsoft Promptflow: A Complete Guide with Slack Bot Use Cases
READER BEWARE: THE FOLLOWING WAS WRITTEN ENTIRELY BY AI WITHOUT HUMAN EDITING.
Introduction
Building AI agents has become increasingly accessible with the emergence of orchestration frameworks that abstract away the complexity of managing prompts, model interactions, and workflow logic. Microsoft’s Promptflow is one such framework that provides a powerful yet straightforward approach to developing, testing, and deploying AI-powered applications.
In this comprehensive guide, we’ll explore how to use Promptflow to build production-ready AI agents, covering everything from initial setup to live deployment. We’ll walk through two practical use cases—a Slack bot for monitoring channel discussions and a security review bot for open source repositories—and compare Promptflow with alternative frameworks like LangChain and Semantic Kernel.
What is Promptflow?
Promptflow is an open-source development framework from Microsoft designed to streamline the development lifecycle of AI applications powered by Large Language Models (LLMs). It provides tools for:
- Flow Development: Visual and code-based tools for creating LLM workflows
- Prompt Engineering: Systematic approach to crafting and refining prompts
- Testing and Evaluation: Built-in evaluation capabilities for quality assurance
- Deployment: Integration with Azure AI and other deployment targets
- Tracing and Debugging: Comprehensive logging and debugging tools
Key Concepts
Flows: The core building block in Promptflow. A flow is a directed acyclic graph (DAG) that connects various nodes including LLM calls, Python functions, and tools.
Nodes: Individual processing units within a flow. Types include:
- LLM nodes: Calls to language models (OpenAI, Azure OpenAI, etc.)
- Python nodes: Custom Python code execution
- Prompt nodes: Prompt template processing
Connections: Secure storage for API keys and endpoints for LLM services.
Runs: Execution instances of flows that capture inputs, outputs, and metrics.
Setup and Installation
Prerequisites
Before getting started, ensure you have:
- Python 3.9 or later
- pip package manager
- An LLM API key (OpenAI, Azure OpenAI, or compatible service)
- Basic familiarity with Python and command-line tools
Installation
Install Promptflow using pip:
# Install the core Promptflow package
pip install promptflow
# Install additional tools for development
pip install promptflow-tools
# For Azure AI integration
pip install promptflow-azure
# Alternatively, install the core package with Azure extras in one step
pip install "promptflow[azure]"
Verify Installation
# Check the installation
pf --version
# Expected output:
# promptflow: 1.x.x
# promptflow-core: 1.x.x
# promptflow-devkit: 1.x.x
# promptflow-tracing: 1.x.x
Configure LLM Connection
Set up your LLM connection (using Azure OpenAI as an example):
# Create a connection configuration file
pf connection create --file azure_openai_connection.yaml
Create azure_openai_connection.yaml:
$schema: https://azuremlschemas.azureedge.net/promptflow/latest/AzureOpenAIConnection.schema.json
name: azure_openai_connection
type: azure_open_ai
api_key: "${env:AZURE_OPENAI_API_KEY}"
api_base: "https://your-resource.openai.azure.com/"
api_type: azure
api_version: "2024-02-15-preview"
For OpenAI:
$schema: https://azuremlschemas.azureedge.net/promptflow/latest/OpenAIConnection.schema.json
name: openai_connection
type: open_ai
api_key: "${env:OPENAI_API_KEY}"
organization: "${env:OPENAI_ORG_ID}" # Optional
Project Structure
A typical Promptflow project structure:
my-promptflow-project/
├── flows/
│ ├── slack-monitor/
│ │ ├── flow.dag.yaml
│ │ ├── analyze_message.py
│ │ ├── classify_relevance.jinja2
│ │ └── requirements.txt
│ └── security-review/
│ ├── flow.dag.yaml
│ ├── run_security_scan.py
│ ├── analyze_results.jinja2
│ └── requirements.txt
├── connections/
│ └── azure_openai_connection.yaml
├── evaluations/
│ └── accuracy_evaluation.py
├── tests/
│ └── test_flows.py
└── pyproject.toml
Potential Costs
Understanding the cost structure is crucial for production deployments. The figures below are representative list prices at the time of writing and change frequently, so verify against your provider's current pricing. Here's a breakdown:
LLM API Costs
| Model | Input (per 1K tokens) | Output (per 1K tokens) | Notes |
|---|---|---|---|
| GPT-4 Turbo | $0.01 | $0.03 | Best for complex reasoning |
| GPT-4o | $0.005 | $0.015 | Optimized for cost/performance |
| GPT-4o-mini | $0.00015 | $0.0006 | Budget option for simpler tasks |
| GPT-3.5 Turbo | $0.0005 | $0.0015 | Legacy, still cost-effective |
Azure AI Hosting Costs (Optional)
| Service | Cost | Description |
|---|---|---|
| Azure AI Studio | Varies | Managed deployment for flows |
| Azure Container Apps | ~$0.000012/vCPU-second | Serverless container hosting |
| Azure Functions | First 1M executions free | Event-driven execution |
| Azure Kubernetes | Varies by node size | Full container orchestration |
Estimated Monthly Costs by Use Case
Slack Channel Monitor Bot (Use Case 1):
- Assuming 10,000 messages/day monitored
- Average 100 tokens per message analysis
- Using GPT-4o-mini for classification
- Estimated: $15-50/month for LLM costs
Security Review Bot (Use Case 2):
- Assuming 50 repositories reviewed/week
- Average 5,000 tokens per security analysis
- Using GPT-4 Turbo for detailed analysis
- Estimated: $50-200/month for LLM costs
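These estimates come from straightforward token arithmetic. The sketch below reproduces the Slack-monitor calculation; the output-token count per message is an assumption, and the prices are taken from the table above:

```python
# Rough monthly LLM cost estimate for the Slack monitor scenario.
# output_tokens_per_msg is an assumed value (short classification replies).

def monthly_cost(msgs_per_day: int, tokens_per_msg: int,
                 input_price_per_1k: float, output_price_per_1k: float,
                 output_tokens_per_msg: int = 20, days: int = 30) -> float:
    """Estimate monthly spend: (input + output tokens) x per-1K-token price."""
    input_tokens = msgs_per_day * tokens_per_msg * days
    output_tokens = msgs_per_day * output_tokens_per_msg * days
    return (input_tokens / 1000 * input_price_per_1k
            + output_tokens / 1000 * output_price_per_1k)

# 10,000 messages/day, ~100 input tokens each, GPT-4o-mini pricing
estimate = monthly_cost(10_000, 100, 0.00015, 0.0006)
print(f"${estimate:.2f}/month")  # → $8.10/month
```

This lands at the low end; longer prompts, system messages, and the summarization step push real spend toward the $15-50 range quoted above.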
Cost Optimization Strategies
- Use Appropriate Models: Use cheaper models (GPT-4o-mini) for classification tasks, reserve expensive models (GPT-4 Turbo) for complex analysis
- Implement Caching: Cache responses for repeated queries to reduce API calls
- Batch Processing: Combine multiple requests when possible
- Token Optimization: Keep prompts concise and use system messages efficiently
- Rate Limiting: Implement rate limits to prevent runaway costs
# Example: Cost-aware model selection
def select_model(task_complexity: str) -> str:
"""Select appropriate model based on task complexity."""
model_mapping = {
"simple": "gpt-4o-mini",
"moderate": "gpt-4o",
"complex": "gpt-4-turbo"
}
return model_mapping.get(task_complexity, "gpt-4o-mini")
Iterative Development Workflow
Promptflow encourages an iterative development approach. Here’s a structured workflow:
Phase 1: Prototype Development
Step 1: Define Your Flow
Create flow.dag.yaml:
$schema: https://azuremlschemas.azureedge.net/promptflow/latest/Flow.schema.json
inputs:
message:
type: string
description: "The input message to analyze"
context:
type: string
description: "Additional context for analysis"
outputs:
analysis:
type: string
reference: ${analyze_message.output}
classification:
type: string
reference: ${classify_relevance.output}
nodes:
- name: classify_relevance
type: llm
source:
type: code
path: classify_relevance.jinja2
inputs:
deployment_name: gpt-4o-mini
message: ${inputs.message}
connection: azure_openai_connection
api: chat
- name: analyze_message
type: python
source:
type: code
path: analyze_message.py
inputs:
message: ${inputs.message}
classification: ${classify_relevance.output}
Step 2: Create Prompt Templates
classify_relevance.jinja2:
system:
You are a message classification assistant. Classify the following message based on its relevance to specific stakeholders.
user:
Message: {{ message }}
Classify this message into one of the following categories:
- HIGH_PRIORITY: Urgent matters requiring immediate attention
- RELEVANT: Topics of interest to stakeholders
- INFORMATIONAL: General updates or announcements
- NOT_RELEVANT: Messages outside stakeholder interests
Respond with only the category name and a brief justification.
Step 3: Test Locally
# Run your flow locally
pf flow test --flow ./flows/slack-monitor --inputs message="Urgent: Production server down" context="DevOps team"
# Interactive testing with UI
pf flow serve --source ./flows/slack-monitor --port 8080
Phase 2: Evaluation and Refinement
Step 1: Create Evaluation Dataset
evaluation_data.jsonl:
{"message": "Server is down in production", "expected_classification": "HIGH_PRIORITY"}
{"message": "Team lunch tomorrow at noon", "expected_classification": "NOT_RELEVANT"}
{"message": "New security patch available", "expected_classification": "RELEVANT"}
{"message": "Q4 roadmap discussion scheduled", "expected_classification": "RELEVANT"}
Step 2: Define Evaluation Metrics
evaluations/accuracy_evaluation.py:
from promptflow import tool
@tool
def evaluate_classification(
prediction: str,
ground_truth: str
) -> dict:
"""Evaluate classification accuracy."""
prediction_category = prediction.split(':')[0].strip().upper()
is_correct = prediction_category == ground_truth.upper()
return {
"is_correct": is_correct,
"prediction": prediction_category,
"expected": ground_truth
}
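Once a batch run completes, the per-row outputs of this tool can be reduced to a single accuracy figure. A minimal sketch (the row shape matches `evaluate_classification`'s return value):

```python
# Aggregate per-row evaluation results into overall accuracy.
def aggregate_accuracy(rows: list) -> float:
    """rows: outputs of evaluate_classification, each with an "is_correct" flag."""
    if not rows:
        return 0.0
    return sum(1 for r in rows if r["is_correct"]) / len(rows)

rows = [
    {"is_correct": True, "prediction": "HIGH_PRIORITY", "expected": "HIGH_PRIORITY"},
    {"is_correct": False, "prediction": "RELEVANT", "expected": "NOT_RELEVANT"},
]
print(aggregate_accuracy(rows))  # → 0.5
```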
Step 3: Run Batch Evaluation
# Run evaluation
pf run create \
--flow ./flows/slack-monitor \
--data ./evaluation_data.jsonl \
--column-mapping message='${data.message}' \
--stream
# View results
pf run show-details --name <run-name>
pf run visualize --name <run-name>
Phase 3: Optimization
Prompt Tuning: Iterate on prompts based on evaluation results
# Experiment tracking
from promptflow import PFClient
client = PFClient()
# Create variants for A/B testing
variants = [
{"prompt_version": "v1", "system_prompt": "..."},
{"prompt_version": "v2", "system_prompt": "..."},
]
for variant in variants:
    run = client.run(
        flow="./flows/slack-monitor",
        data="./evaluation_data.jsonl",
        # Variant references use the "${node_name.variant_name}" form
        variant="${classify_relevance." + variant["prompt_version"] + "}",
        display_name=f"Evaluation - {variant['prompt_version']}"
    )
Performance Optimization:
# Add caching to reduce API calls
nodes:
- name: classify_relevance
type: llm
source:
type: code
path: classify_relevance.jinja2
inputs:
deployment_name: gpt-4o-mini
message: ${inputs.message}
connection: azure_openai_connection
api: chat
cache: true # Enable caching for repeated inputs
Use Case 1: Slack Channel Monitor Bot
This bot monitors Slack channels for discussions relevant to specific stakeholders and notifies them in real-time.
Architecture Overview
┌─────────────────┐ ┌──────────────────┐ ┌─────────────────┐
│ Slack Events │───▶│ Promptflow Bot │───▶│ Notifications │
│ (Messages) │ │ (Classification)│ │ (DMs/Alerts) │
└─────────────────┘ └──────────────────┘ └─────────────────┘
│ │ │
▼ ▼ ▼
Webhook Listener LLM Analysis Slack API (DM)
Implementation
Flow Definition (flow.dag.yaml):
$schema: https://azuremlschemas.azureedge.net/promptflow/latest/Flow.schema.json
inputs:
message_text:
type: string
description: "The Slack message content"
channel_name:
type: string
description: "The source channel"
sender:
type: string
description: "Message author"
stakeholder_interests:
type: object
description: "Mapping of stakeholders to their interests"
outputs:
should_notify:
type: bool
reference: ${determine_notification.output.should_notify}
stakeholders:
type: list
reference: ${determine_notification.output.stakeholders}
summary:
type: string
reference: ${summarize_discussion.output}
nodes:
- name: analyze_topic
type: llm
source:
type: code
path: prompts/analyze_topic.jinja2
inputs:
deployment_name: gpt-4o-mini
message: ${inputs.message_text}
channel: ${inputs.channel_name}
connection: azure_openai_connection
api: chat
- name: determine_notification
type: python
source:
type: code
path: tools/determine_notification.py
inputs:
topic_analysis: ${analyze_topic.output}
stakeholder_interests: ${inputs.stakeholder_interests}
channel: ${inputs.channel_name}
- name: summarize_discussion
type: llm
source:
type: code
path: prompts/summarize.jinja2
inputs:
deployment_name: gpt-4o-mini
message: ${inputs.message_text}
topic: ${analyze_topic.output}
connection: azure_openai_connection
api: chat
activate:
when: ${determine_notification.output.should_notify}
is: true
Topic Analysis Prompt (prompts/analyze_topic.jinja2):
system:
You are an expert at analyzing workplace communications. Extract the main topics, technologies mentioned, and the nature of the discussion from Slack messages.
user:
Channel: {{ channel }}
Message: {{ message }}
Analyze this message and provide:
1. Main topic (one phrase)
2. Technologies/products mentioned (comma-separated list)
3. Discussion type: question, announcement, incident, casual, decision, or review
4. Urgency level: high, medium, or low
5. Key entities mentioned (people, teams, systems)
Format your response as JSON:
{
"topic": "",
"technologies": [],
"discussion_type": "",
"urgency": "",
"entities": []
}
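Models sometimes wrap JSON output in Markdown code fences, which would break the plain `json.loads` call in the notification tool below. A defensive parser helps; this is a sketch, not part of the original flow:

```python
import json
import re

def parse_llm_json(raw: str):
    """Parse JSON from an LLM response, tolerating ```json fences; None on failure."""
    # Strip Markdown code fences if the model added them
    match = re.search(r"```(?:json)?\s*(.*?)\s*```", raw, re.DOTALL)
    candidate = match.group(1) if match else raw
    try:
        return json.loads(candidate)
    except json.JSONDecodeError:
        return None

print(parse_llm_json('```json\n{"topic": "deploys", "urgency": "high"}\n```'))
# → {'topic': 'deploys', 'urgency': 'high'}
```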
Notification Logic (tools/determine_notification.py):
import json
from promptflow import tool
from typing import Any
@tool
def determine_notification(
topic_analysis: str,
stakeholder_interests: dict,
channel: str
) -> dict:
"""
Determine which stakeholders should be notified based on
their interests and the analyzed message topic.
"""
try:
analysis = json.loads(topic_analysis)
except json.JSONDecodeError:
return {"should_notify": False, "stakeholders": [], "reason": "Invalid analysis"}
stakeholders_to_notify = []
for stakeholder, interests in stakeholder_interests.items():
interest_keywords = interests.get("keywords", [])
interest_channels = interests.get("channels", [])
interest_discussion_types = interests.get("discussion_types", [])
urgency_threshold = interests.get("urgency_threshold", "high")
# Check channel match
channel_match = channel in interest_channels or "*" in interest_channels
# Check topic/technology match
topic_match = any(
keyword.lower() in analysis.get("topic", "").lower() or
keyword.lower() in [t.lower() for t in analysis.get("technologies", [])]
for keyword in interest_keywords
)
# Check discussion type match
discussion_match = (
analysis.get("discussion_type", "") in interest_discussion_types or
"*" in interest_discussion_types
)
# Check urgency
urgency_levels = {"low": 1, "medium": 2, "high": 3}
message_urgency = urgency_levels.get(analysis.get("urgency", "low"), 1)
threshold_urgency = urgency_levels.get(urgency_threshold, 3)
urgency_match = message_urgency >= threshold_urgency
# Determine if stakeholder should be notified
if channel_match and (topic_match or discussion_match) and urgency_match:
stakeholders_to_notify.append({
"id": stakeholder,
"match_reasons": {
"channel": channel_match,
"topic": topic_match,
"discussion_type": discussion_match,
"urgency": urgency_match
}
})
return {
"should_notify": len(stakeholders_to_notify) > 0,
"stakeholders": stakeholders_to_notify,
"analysis": analysis
}
Slack Integration (slack_integration.py):
import os
import json
from slack_sdk import WebClient
from slack_sdk.socket_mode import SocketModeClient
from slack_sdk.socket_mode.request import SocketModeRequest
from slack_sdk.socket_mode.response import SocketModeResponse
from promptflow import load_flow
class SlackMonitorBot:
def __init__(self):
self.slack_client = WebClient(token=os.environ["SLACK_BOT_TOKEN"])
self.socket_client = SocketModeClient(
app_token=os.environ["SLACK_APP_TOKEN"],
web_client=self.slack_client
)
self.flow = load_flow("./flows/slack-monitor")
self.stakeholder_interests = self._load_stakeholder_config()
def _load_stakeholder_config(self) -> dict:
"""Load stakeholder interest configuration."""
with open("config/stakeholders.json") as f:
return json.load(f)
def handle_message(self, client, req: SocketModeRequest):
"""Handle incoming Slack messages."""
        if req.type == "events_api":
            # Acknowledge every Events API request immediately, or Slack will retry it
            response = SocketModeResponse(envelope_id=req.envelope_id)
            client.send_socket_mode_response(response)
            event = req.payload.get("event", {})
            if event.get("type") == "message" and not event.get("bot_id"):
# Get channel info
channel_info = self.slack_client.conversations_info(
channel=event["channel"]
)
channel_name = channel_info["channel"]["name"]
# Get sender info
user_info = self.slack_client.users_info(user=event["user"])
sender_name = user_info["user"]["real_name"]
# Run the Promptflow flow
result = self.flow(
message_text=event["text"],
channel_name=channel_name,
sender=sender_name,
stakeholder_interests=self.stakeholder_interests
)
# Send notifications if needed
if result["should_notify"]:
self._send_notifications(
stakeholders=result["stakeholders"],
summary=result["summary"],
channel=channel_name,
original_channel_id=event["channel"],
message_ts=event["ts"]
)
def _send_notifications(
self,
stakeholders: list,
summary: str,
channel: str,
original_channel_id: str,
message_ts: str
):
"""Send DM notifications to relevant stakeholders."""
# Construct permalink
permalink = self.slack_client.chat_getPermalink(
channel=original_channel_id,
message_ts=message_ts
)["permalink"]
for stakeholder in stakeholders:
# Open DM channel with stakeholder
dm = self.slack_client.conversations_open(
users=[stakeholder["id"]]
)
# Send notification
self.slack_client.chat_postMessage(
channel=dm["channel"]["id"],
blocks=[
{
"type": "header",
"text": {
"type": "plain_text",
"text": f"📢 Relevant Discussion in #{channel}"
}
},
{
"type": "section",
"text": {
"type": "mrkdwn",
"text": f"*Summary:*\n{summary}"
}
},
{
"type": "section",
"text": {
"type": "mrkdwn",
"text": f"*Why you're seeing this:*\n" +
", ".join(k for k, v in stakeholder["match_reasons"].items() if v)
}
},
{
"type": "actions",
"elements": [
{
"type": "button",
"text": {
"type": "plain_text",
"text": "View Discussion"
},
"url": permalink
},
{
"type": "button",
"text": {
"type": "plain_text",
"text": "Mute This Topic"
},
"action_id": "mute_topic"
}
]
}
]
)
def start(self):
"""Start the bot."""
self.socket_client.socket_mode_request_listeners.append(
self.handle_message
)
self.socket_client.connect()
print("Bot is running...")
if __name__ == "__main__":
bot = SlackMonitorBot()
bot.start()
Stakeholder Configuration (config/stakeholders.json):
{
"U12345678": {
"name": "Sarah (Security Lead)",
"keywords": ["security", "vulnerability", "CVE", "breach", "authentication", "authorization"],
"channels": ["security-alerts", "engineering", "incidents"],
"discussion_types": ["incident", "announcement", "question"],
"urgency_threshold": "medium"
},
"U87654321": {
"name": "Mike (Platform Lead)",
"keywords": ["kubernetes", "docker", "AWS", "infrastructure", "deployment", "scaling"],
"channels": ["platform", "engineering", "incidents", "devops"],
"discussion_types": ["incident", "decision", "question"],
"urgency_threshold": "medium"
},
"U11111111": {
"name": "Lisa (Product Manager)",
"keywords": ["roadmap", "feature", "customer", "feedback", "release"],
"channels": ["product", "general", "engineering"],
"discussion_types": ["decision", "announcement"],
"urgency_threshold": "low"
}
}
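A malformed entry in this file would fail silently at match time, so validating it at startup is cheap insurance. A sketch (the key names mirror the config above):

```python
# Validate stakeholder config entries before the bot starts.
REQUIRED_KEYS = {"name", "keywords", "channels", "discussion_types", "urgency_threshold"}
VALID_THRESHOLDS = {"low", "medium", "high"}

def validate_stakeholders(config: dict) -> list:
    """Return a list of human-readable problems; an empty list means the config is usable."""
    problems = []
    for user_id, entry in config.items():
        missing = REQUIRED_KEYS - entry.keys()
        if missing:
            problems.append(f"{user_id}: missing keys {sorted(missing)}")
        if entry.get("urgency_threshold") not in VALID_THRESHOLDS:
            problems.append(f"{user_id}: invalid urgency_threshold")
    return problems
```

Calling this in `_load_stakeholder_config` and logging any problems would surface typos before the first message arrives.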
Use Case 2: Security Review Bot for Open Source Repositories
This bot performs automated security reviews on open source repositories using Python security scanning tools and provides analysis through Slack.
Architecture Overview
┌─────────────────┐ ┌──────────────────┐ ┌─────────────────┐
│ Slack Command │───▶│ Promptflow Bot │───▶│ Results Report │
│ /security-scan │ │ + Python Tools │ │ (Slack Thread) │
└─────────────────┘ └──────────────────┘ └─────────────────┘
│ │ │
▼ ▼ ▼
Repo URL Input Security Scans Stakeholder DMs
(Bandit, Safety, (for high risk)
Semgrep, etc.)
Implementation
Flow Definition (flow.dag.yaml):
$schema: https://azuremlschemas.azureedge.net/promptflow/latest/Flow.schema.json
inputs:
repo_url:
type: string
description: "GitHub repository URL to scan"
scan_types:
type: list
description: "Types of security scans to run"
default: ["bandit", "safety", "semgrep"]
notify_stakeholders:
type: list
description: "Stakeholder IDs to notify on high-risk findings"
outputs:
summary:
type: string
reference: ${generate_report.output.summary}
risk_level:
type: string
reference: ${generate_report.output.risk_level}
findings:
type: list
reference: ${generate_report.output.findings}
recommendations:
type: string
reference: ${generate_recommendations.output}
nodes:
- name: clone_repository
type: python
source:
type: code
path: tools/clone_repo.py
inputs:
repo_url: ${inputs.repo_url}
# Note: `activate` supports only equality tests (when/is), so per-scan gating
# goes through a helper node that turns the scan_types list into booleans.
- name: select_scans
  type: python
  source:
    type: code
    path: tools/select_scans.py  # returns {"bandit": bool, "safety": bool, "semgrep": bool}
  inputs:
    scan_types: ${inputs.scan_types}
- name: run_bandit_scan
  type: python
  source:
    type: code
    path: tools/run_bandit.py
  inputs:
    repo_path: ${clone_repository.output.repo_path}
  activate:
    when: ${select_scans.output.bandit}
    is: true
- name: run_safety_scan
  type: python
  source:
    type: code
    path: tools/run_safety.py
  inputs:
    repo_path: ${clone_repository.output.repo_path}
  activate:
    when: ${select_scans.output.safety}
    is: true
- name: run_semgrep_scan
  type: python
  source:
    type: code
    path: tools/run_semgrep.py
  inputs:
    repo_path: ${clone_repository.output.repo_path}
  activate:
    when: ${select_scans.output.semgrep}
    is: true
- name: aggregate_results
type: python
source:
type: code
path: tools/aggregate_results.py
inputs:
bandit_results: ${run_bandit_scan.output}
safety_results: ${run_safety_scan.output}
semgrep_results: ${run_semgrep_scan.output}
- name: generate_report
type: llm
source:
type: code
path: prompts/generate_report.jinja2
inputs:
deployment_name: gpt-4-turbo
aggregated_results: ${aggregate_results.output}
repo_url: ${inputs.repo_url}
connection: azure_openai_connection
api: chat
- name: generate_recommendations
type: llm
source:
type: code
path: prompts/recommendations.jinja2
inputs:
deployment_name: gpt-4-turbo
findings: ${generate_report.output.findings}
risk_level: ${generate_report.output.risk_level}
connection: azure_openai_connection
api: chat
- name: cleanup
type: python
source:
type: code
path: tools/cleanup.py
inputs:
repo_path: ${clone_repository.output.repo_path}
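The flow's final node references tools/cleanup.py, which isn't shown elsewhere; a minimal version might look like this (a sketch following the same conventions as the other tools):

```python
import os
import shutil

# Decorate with promptflow's @tool when saving as tools/cleanup.py
def cleanup(repo_path: str) -> dict:
    """Remove the temporary clone created by clone_repository."""
    if repo_path and os.path.isdir(repo_path):
        shutil.rmtree(repo_path, ignore_errors=True)
        return {"cleaned": True}
    return {"cleaned": False}
```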
Clone Repository Tool (tools/clone_repo.py):
import os
import tempfile
import subprocess
from promptflow import tool
@tool
def clone_repository(repo_url: str) -> dict:
"""Clone a GitHub repository for security scanning."""
# Validate URL format
if not repo_url.startswith(("https://github.com/", "git@github.com:")):
raise ValueError("Only GitHub repositories are supported")
# Create temp directory
temp_dir = tempfile.mkdtemp(prefix="security_scan_")
try:
# Clone the repository
result = subprocess.run(
["git", "clone", "--depth", "1", repo_url, temp_dir],
capture_output=True,
text=True,
timeout=120
)
if result.returncode != 0:
raise RuntimeError(f"Failed to clone repository: {result.stderr}")
# Extract repo name
repo_name = repo_url.rstrip("/").split("/")[-1].replace(".git", "")
return {
"repo_path": temp_dir,
"repo_name": repo_name,
"success": True
}
    except Exception:
        # Clean up the partial clone on failure; shutil is portable, unlike rm -rf
        import shutil
        shutil.rmtree(temp_dir, ignore_errors=True)
        raise
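Promptflow's `activate` config only supports equality tests (`when`/`is`), not list membership, so gating each scanner on `scan_types` calls for a small helper node. A sketch (the file name tools/select_scans.py is an assumption introduced here):

```python
# Compute a boolean per scanner so downstream nodes can use
# `activate: when: ${select_scans.output.bandit} / is: true`.
# Decorate with promptflow's @tool when saving as tools/select_scans.py.
def select_scans(scan_types: list) -> dict:
    requested = {s.lower() for s in (scan_types or [])}
    return {name: name in requested
            for name in ("bandit", "safety", "semgrep")}

print(select_scans(["bandit", "semgrep"]))
# → {'bandit': True, 'safety': False, 'semgrep': True}
```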
Bandit Scanner (tools/run_bandit.py):
import subprocess
import json
from promptflow import tool
@tool
def run_bandit_scan(repo_path: str) -> dict:
"""
Run Bandit security scanner on Python code.
Bandit is a tool designed to find common security issues in Python code.
"""
try:
result = subprocess.run(
[
"bandit",
"-r", repo_path,
"-f", "json",
"-ll", # Only report medium and higher severity
"--exclude", "*/test*,*/.git/*,*/venv/*"
],
capture_output=True,
text=True,
timeout=300
)
# Bandit returns exit code 1 if issues are found
if result.stdout:
findings = json.loads(result.stdout)
else:
findings = {"results": [], "metrics": {}}
# Transform to common format
issues = []
for finding in findings.get("results", []):
issues.append({
"tool": "bandit",
"severity": finding.get("issue_severity", "UNKNOWN"),
"confidence": finding.get("issue_confidence", "UNKNOWN"),
"file": finding.get("filename", "").replace(repo_path, ""),
"line": finding.get("line_number", 0),
"issue_text": finding.get("issue_text", ""),
"test_id": finding.get("test_id", ""),
"cwe": finding.get("issue_cwe", {}).get("id", None),
"code_snippet": finding.get("code", "")
})
return {
"tool": "bandit",
"success": True,
"issues": issues,
"summary": {
"high": len([i for i in issues if i["severity"] == "HIGH"]),
"medium": len([i for i in issues if i["severity"] == "MEDIUM"]),
"low": len([i for i in issues if i["severity"] == "LOW"])
}
}
except subprocess.TimeoutExpired:
return {
"tool": "bandit",
"success": False,
"error": "Scan timed out after 5 minutes",
"issues": []
}
except Exception as e:
return {
"tool": "bandit",
"success": False,
"error": str(e),
"issues": []
}
Safety Scanner (tools/run_safety.py):
import subprocess
import json
import os
from promptflow import tool
@tool
def run_safety_scan(repo_path: str) -> dict:
"""
Run Safety to check for known vulnerabilities in Python dependencies.
"""
issues = []
requirements_files = []
# Find all requirements files
for root, dirs, files in os.walk(repo_path):
# Skip hidden and virtual environment directories
dirs[:] = [d for d in dirs if not d.startswith('.') and d not in ['venv', 'env', 'node_modules']]
        for file in files:
            # Note: safety's -r flag expects pip requirements format; lock files
            # (Pipfile.lock, poetry.lock) would need conversion before scanning
            if file.startswith('requirements') and file.endswith('.txt'):
                requirements_files.append(os.path.join(root, file))
for req_file in requirements_files:
try:
result = subprocess.run(
[
"safety", "check",
"-r", req_file,
"--json"
],
capture_output=True,
text=True,
timeout=120
)
if result.stdout:
try:
safety_output = json.loads(result.stdout)
vulnerabilities = safety_output if isinstance(safety_output, list) else safety_output.get("vulnerabilities", [])
for vuln in vulnerabilities:
issues.append({
"tool": "safety",
"severity": "HIGH" if "critical" in str(vuln).lower() else "MEDIUM",
"package": vuln[0] if isinstance(vuln, list) else vuln.get("package_name", "unknown"),
"affected_version": vuln[2] if isinstance(vuln, list) else vuln.get("installed_version", "unknown"),
"vulnerability_id": vuln[4] if isinstance(vuln, list) else vuln.get("vulnerability_id", "unknown"),
"description": vuln[3] if isinstance(vuln, list) else vuln.get("advisory", ""),
"file": req_file.replace(repo_path, "")
})
except json.JSONDecodeError:
pass
except Exception as e:
continue
return {
"tool": "safety",
"success": True,
"issues": issues,
"files_scanned": len(requirements_files),
"summary": {
"high": len([i for i in issues if i["severity"] == "HIGH"]),
"medium": len([i for i in issues if i["severity"] == "MEDIUM"]),
"low": len([i for i in issues if i["severity"] == "LOW"])
}
}
Semgrep Scanner (tools/run_semgrep.py):
import subprocess
import json
from promptflow import tool
@tool
def run_semgrep_scan(repo_path: str) -> dict:
"""
Run Semgrep with security-focused rulesets.
"""
try:
result = subprocess.run(
[
"semgrep", "scan",
"--config", "p/security-audit",
"--config", "p/secrets",
"--json",
"--timeout", "300",
repo_path
],
capture_output=True,
text=True,
timeout=360
)
issues = []
if result.stdout:
try:
semgrep_output = json.loads(result.stdout)
for finding in semgrep_output.get("results", []):
# Map Semgrep severity to our format
severity_map = {
"ERROR": "HIGH",
"WARNING": "MEDIUM",
"INFO": "LOW"
}
issues.append({
"tool": "semgrep",
"severity": severity_map.get(finding.get("extra", {}).get("severity", "INFO"), "LOW"),
"rule_id": finding.get("check_id", ""),
"file": finding.get("path", "").replace(repo_path, ""),
"line_start": finding.get("start", {}).get("line", 0),
"line_end": finding.get("end", {}).get("line", 0),
"message": finding.get("extra", {}).get("message", ""),
"category": finding.get("extra", {}).get("metadata", {}).get("category", "security"),
"cwe": finding.get("extra", {}).get("metadata", {}).get("cwe", []),
"owasp": finding.get("extra", {}).get("metadata", {}).get("owasp", [])
})
except json.JSONDecodeError:
pass
return {
"tool": "semgrep",
"success": True,
"issues": issues,
"summary": {
"high": len([i for i in issues if i["severity"] == "HIGH"]),
"medium": len([i for i in issues if i["severity"] == "MEDIUM"]),
"low": len([i for i in issues if i["severity"] == "LOW"])
}
}
except subprocess.TimeoutExpired:
return {
"tool": "semgrep",
"success": False,
"error": "Scan timed out",
"issues": []
}
except Exception as e:
return {
"tool": "semgrep",
"success": False,
"error": str(e),
"issues": []
}
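The aggregate_results node referenced in the flow isn't shown either; a minimal implementation just merges the per-tool outputs. A sketch (scanners that were deactivated arrive as None):

```python
# Merge scanner outputs into one issue list plus combined severity counts.
# Decorate with promptflow's @tool when saving as tools/aggregate_results.py.
def aggregate_results(bandit_results=None, safety_results=None,
                      semgrep_results=None) -> dict:
    issues, errors = [], []
    for result in (bandit_results, safety_results, semgrep_results):
        if not result:  # scanner was deactivated or produced nothing
            continue
        if result.get("success"):
            issues.extend(result.get("issues", []))
        else:
            errors.append({"tool": result.get("tool"), "error": result.get("error")})
    summary = {
        level: sum(1 for i in issues if i.get("severity") == level.upper())
        for level in ("high", "medium", "low")
    }
    return {"issues": issues, "summary": summary, "scan_errors": errors}
```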
Report Generation Prompt (prompts/generate_report.jinja2):
system:
You are a security analyst expert. Analyze the security scan results and provide a comprehensive, actionable security report. Focus on severity, exploitability, and practical remediation steps.
user:
Repository: {{ repo_url }}
Security Scan Results:
{{ aggregated_results | tojson(indent=2) }}
Generate a security report with the following structure:
1. Executive Summary (2-3 sentences)
2. Overall Risk Level (CRITICAL, HIGH, MEDIUM, LOW)
3. Key Findings (top 5 most critical issues)
4. Detailed Findings by Category
5. Statistics
Format your response as JSON:
{
"summary": "Executive summary text",
"risk_level": "CRITICAL|HIGH|MEDIUM|LOW",
"key_findings": [
{
"title": "Finding title",
"severity": "HIGH|MEDIUM|LOW",
"description": "Brief description",
"file": "affected file",
"remediation": "How to fix"
}
],
"findings_by_category": {
"code_vulnerabilities": [...],
"dependency_vulnerabilities": [...],
"secrets_exposure": [...],
"other": [...]
},
"statistics": {
"total_issues": 0,
"critical": 0,
"high": 0,
"medium": 0,
"low": 0,
"files_affected": 0
}
}
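One caveat: an LLM node returns a plain string, so flow references like `${generate_report.output.risk_level}` need a small parsing node between the LLM call and the flow outputs. A hedged sketch of such a node (parse_report.py is a name invented here):

```python
import json

# Turn the model's JSON report string into a structured dict with safe defaults.
# Decorate with promptflow's @tool when saving as tools/parse_report.py.
def parse_report(raw_report: str) -> dict:
    defaults = {"summary": "", "risk_level": "LOW", "key_findings": [], "statistics": {}}
    try:
        parsed = json.loads(raw_report)
    except json.JSONDecodeError:
        return {**defaults, "summary": "Report could not be parsed"}
    return {**defaults, **parsed}
```

Downstream references would then read `${parse_report.output.risk_level}` and so on.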
Slack Integration for Security Bot (security_bot_slack.py):
import os
import json
from slack_bolt import App
from slack_bolt.adapter.socket_mode import SocketModeHandler
from promptflow import load_flow
app = App(token=os.environ["SLACK_BOT_TOKEN"])
flow = load_flow("./flows/security-review")
# Stakeholders to notify for high-risk findings
SECURITY_STAKEHOLDERS = ["U12345678", "U87654321"]
@app.command("/security-scan")
def handle_security_scan(ack, say, command):
"""Handle the /security-scan slash command."""
ack()
repo_url = command["text"].strip()
user_id = command["user_id"]
channel_id = command["channel_id"]
# Validate input
if not repo_url:
say("Please provide a repository URL. Usage: `/security-scan https://github.com/owner/repo`")
return
# Send initial response
initial_message = say(
blocks=[
{
"type": "section",
"text": {
"type": "mrkdwn",
"text": f"🔍 *Starting security scan*\nRepository: `{repo_url}`\n\nThis may take a few minutes..."
}
}
]
)
try:
# Run the security scan flow
result = flow(
repo_url=repo_url,
scan_types=["bandit", "safety", "semgrep"],
notify_stakeholders=SECURITY_STAKEHOLDERS
)
# Format and send results
send_scan_results(
channel_id=channel_id,
thread_ts=initial_message["ts"],
result=result,
repo_url=repo_url
)
# Notify stakeholders if high risk
if result["risk_level"] in ["CRITICAL", "HIGH"]:
notify_security_stakeholders(
result=result,
repo_url=repo_url,
requested_by=user_id,
channel_id=channel_id
)
except Exception as e:
app.client.chat_postMessage(
channel=channel_id,
thread_ts=initial_message["ts"],
text=f"❌ Security scan failed: {str(e)}"
)
def send_scan_results(channel_id: str, thread_ts: str, result: dict, repo_url: str):
"""Send formatted scan results to Slack."""
# Risk level emoji mapping
risk_emoji = {
"CRITICAL": "🔴",
"HIGH": "🟠",
"MEDIUM": "🟡",
"LOW": "🟢"
}
# Build blocks
blocks = [
{
"type": "header",
"text": {
"type": "plain_text",
"text": f"Security Scan Complete {risk_emoji.get(result['risk_level'], '⚪')}"
}
},
{
"type": "section",
"fields": [
{
"type": "mrkdwn",
"text": f"*Repository:*\n<{repo_url}|{repo_url.split('/')[-1]}>"
},
{
"type": "mrkdwn",
"text": f"*Risk Level:*\n{result['risk_level']}"
}
]
},
{
"type": "section",
"text": {
"type": "mrkdwn",
"text": f"*Summary:*\n{result['summary']}"
}
},
{"type": "divider"}
]
    # Derive severity counts from the findings list (the flow's list-typed output)
    findings = result.get("findings") or []
    stats = {
        "total": len(findings),
        "high": sum(1 for f in findings if f.get("severity") == "HIGH"),
        "medium": sum(1 for f in findings if f.get("severity") == "MEDIUM"),
        "low": sum(1 for f in findings if f.get("severity") == "LOW")
    }
    blocks.append({
        "type": "section",
        "text": {
            "type": "mrkdwn",
            "text": f"*Statistics:*\n• Total Issues: {stats['total']}\n• High: {stats['high']}\n• Medium: {stats['medium']}\n• Low: {stats['low']}"
        }
    })
# Add key findings
if result.get("findings"):
blocks.append({"type": "divider"})
blocks.append({
"type": "section",
"text": {
"type": "mrkdwn",
"text": "*Key Findings:*"
}
})
for finding in result["findings"][:5]: # Top 5 findings
severity_emoji = {"HIGH": "🔴", "MEDIUM": "🟡", "LOW": "🟢"}.get(finding.get("severity", ""), "⚪")
blocks.append({
"type": "section",
"text": {
"type": "mrkdwn",
"text": f"{severity_emoji} *{finding.get('title', 'Unknown')}*\n{finding.get('description', '')}\n_File: {finding.get('file', 'N/A')}_"
}
})
# Add recommendations
if result.get("recommendations"):
blocks.append({"type": "divider"})
blocks.append({
"type": "section",
"text": {
"type": "mrkdwn",
"text": f"*Recommendations:*\n{result['recommendations']}"
}
})
app.client.chat_postMessage(
channel=channel_id,
thread_ts=thread_ts,
blocks=blocks
)
def notify_security_stakeholders(result: dict, repo_url: str, requested_by: str, channel_id: str):
    """Send DM notifications to security stakeholders for high-risk findings."""
    for stakeholder_id in SECURITY_STAKEHOLDERS:
        dm = app.client.conversations_open(users=[stakeholder_id])
        app.client.chat_postMessage(
            channel=dm["channel"]["id"],
            blocks=[
                {
                    "type": "header",
                    "text": {
                        "type": "plain_text",
                        "text": "🚨 High-Risk Security Scan Alert"
                    }
                },
                {
                    "type": "section",
                    "text": {
                        "type": "mrkdwn",
                        "text": f"A security scan of `{repo_url}` has returned *{result['risk_level']}* risk findings.\n\n*Summary:*\n{result['summary']}"
                    }
                },
                {
                    "type": "section",
                    "fields": [
                        {
                            "type": "mrkdwn",
                            "text": f"*Requested by:*\n<@{requested_by}>"
                        },
                        {
                            "type": "mrkdwn",
                            "text": f"*Channel:*\n<#{channel_id}>"
                        }
                    ]
                },
                {
                    "type": "actions",
                    "elements": [
                        {
                            "type": "button",
                            "text": {
                                "type": "plain_text",
                                "text": "View Full Report"
                            },
                            "action_id": "view_report",
                            "value": json.dumps({"channel": channel_id, "repo": repo_url})
                        }
                    ]
                }
            ]
        )
if __name__ == "__main__":
    handler = SocketModeHandler(app, os.environ["SLACK_APP_TOKEN"])
    handler.start()
Releasing a Live Version
Deployment Options
Option 1: Azure Container Apps (Recommended)
# azure-container-app.yaml
name: promptflow-slack-bot
resourceGroup: my-resource-group
location: eastus
properties:
  configuration:
    ingress:
      external: false
      targetPort: 8080
    secrets:
      - name: slack-bot-token
        value: ${SLACK_BOT_TOKEN}
      - name: openai-api-key
        value: ${OPENAI_API_KEY}
  template:
    containers:
      - name: slack-bot
        image: myregistry.azurecr.io/slack-bot:latest
        resources:
          cpu: 0.5
          memory: 1Gi
        env:
          - name: SLACK_BOT_TOKEN
            secretRef: slack-bot-token
          - name: OPENAI_API_KEY
            secretRef: openai-api-key
    scale:
      minReplicas: 1
      maxReplicas: 5
Deploy:
# Build and push container
docker build -t myregistry.azurecr.io/slack-bot:latest .
docker push myregistry.azurecr.io/slack-bot:latest
# Deploy to Azure Container Apps
az containerapp create \
  --name promptflow-slack-bot \
  --resource-group my-resource-group \
  --image myregistry.azurecr.io/slack-bot:latest \
  --environment my-container-env \
  --secrets slack-bot-token=$SLACK_BOT_TOKEN openai-api-key=$OPENAI_API_KEY \
  --env-vars SLACK_BOT_TOKEN=secretref:slack-bot-token OPENAI_API_KEY=secretref:openai-api-key
Option 2: Azure AI Studio Deployment
# Deploy flow to Azure AI
pf flow build --source ./flows/slack-monitor --output ./build --format docker
# Create managed endpoint
az ml online-endpoint create --file endpoint.yaml
# Deploy the flow
az ml online-deployment create --file deployment.yaml
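The commands above reference an `endpoint.yaml` and `deployment.yaml` that are not shown. The sketches below illustrate the general shape under the Azure ML CLI v2 schema; the endpoint name, image, routes, and instance type are assumptions for this example, so verify field names against the current schema before use.

```yaml
# endpoint.yaml (sketch)
$schema: https://azuremlschemas.azureedge.net/latest/managedOnlineEndpoint.schema.json
name: slack-monitor-endpoint
auth_mode: key
```

```yaml
# deployment.yaml (sketch; image and instance type are placeholders)
$schema: https://azuremlschemas.azureedge.net/latest/managedOnlineDeployment.schema.json
name: blue
endpoint_name: slack-monitor-endpoint
environment:
  image: myregistry.azurecr.io/slack-monitor:latest
  inference_config:
    liveness_route:
      path: /health
      port: 8080
    readiness_route:
      path: /health
      port: 8080
    scoring_route:
      path: /score
      port: 8080
instance_type: Standard_DS3_v2
instance_count: 1
```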
Option 3: Self-Hosted with Docker Compose
# docker-compose.yaml
version: '3.8'
services:
  slack-monitor-bot:
    build:
      context: ./flows/slack-monitor
      dockerfile: Dockerfile
    environment:
      - SLACK_BOT_TOKEN=${SLACK_BOT_TOKEN}
      - SLACK_APP_TOKEN=${SLACK_APP_TOKEN}
      - AZURE_OPENAI_API_KEY=${AZURE_OPENAI_API_KEY}
      - AZURE_OPENAI_ENDPOINT=${AZURE_OPENAI_ENDPOINT}
    restart: unless-stopped
    volumes:
      - ./config:/app/config:ro
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8080/health"]
      interval: 30s
      timeout: 10s
      retries: 3

  security-review-bot:
    build:
      context: ./flows/security-review
      dockerfile: Dockerfile
    environment:
      - SLACK_BOT_TOKEN=${SLACK_BOT_TOKEN}
      - SLACK_APP_TOKEN=${SLACK_APP_TOKEN}
      - AZURE_OPENAI_API_KEY=${AZURE_OPENAI_API_KEY}
      - AZURE_OPENAI_ENDPOINT=${AZURE_OPENAI_ENDPOINT}
    restart: unless-stopped
    volumes:
      - ./config:/app/config:ro
      - /tmp/security-scans:/tmp/security-scans
Production Dockerfile
# Dockerfile
FROM python:3.11-slim

# Install system dependencies: git for cloning repositories,
# curl for the container health check
RUN apt-get update && apt-get install -y \
        git \
        curl \
    && rm -rf /var/lib/apt/lists/*

WORKDIR /app

# Install Python dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Install security scanning tools
RUN pip install --no-cache-dir bandit safety semgrep

# Copy application code
COPY . .

# Create non-root user
RUN useradd -m appuser && chown -R appuser:appuser /app
USER appuser

# Expose the port used by the /health endpoint
EXPOSE 8080
CMD ["python", "main.py"]
Monitoring and Observability
# monitoring.py
import logging

from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor

# Configure tracing
trace.set_tracer_provider(TracerProvider())
tracer = trace.get_tracer(__name__)

otlp_exporter = OTLPSpanExporter(endpoint="http://otel-collector:4317")
span_processor = BatchSpanProcessor(otlp_exporter)
trace.get_tracer_provider().add_span_processor(span_processor)

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

def log_flow_execution(flow_name: str, inputs: dict, outputs: dict, duration: float):
    """Log flow execution for monitoring."""
    with tracer.start_as_current_span(f"flow.{flow_name}") as span:
        span.set_attribute("flow.name", flow_name)
        span.set_attribute("flow.duration_ms", duration * 1000)
        span.set_attribute("flow.input_size", len(str(inputs)))
        span.set_attribute("flow.output_size", len(str(outputs)))
        logger.info(
            f"Flow executed: {flow_name}",
            extra={
                "flow_name": flow_name,
                "duration": duration,
                "success": True
            }
        )
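The `duration` argument still has to be measured somewhere. A small stdlib-only decorator (a sketch, independent of OpenTelemetry) can wrap any flow entry point and forward the timing to a sink with the same signature as `log_flow_execution`:

```python
import time
from functools import wraps


def timed_flow(flow_name: str, sink):
    """Wrap a flow entry point, timing each call and reporting it to `sink`.

    `sink` takes (flow_name, inputs, outputs, duration), matching
    log_flow_execution above.
    """
    def decorator(func):
        @wraps(func)
        def wrapper(**inputs):
            start = time.perf_counter()
            outputs = func(**inputs)
            duration = time.perf_counter() - start
            sink(flow_name, inputs, outputs, duration)
            return outputs
        return wrapper
    return decorator
```

Applied as `@timed_flow("slack-monitor", log_flow_execution)` on the flow's entry function, every invocation is traced and logged without touching the flow body.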
Comparison with Alternative Frameworks
LangChain
LangChain is a popular framework for building LLM applications.
LangChain vs. Promptflow at a Glance
| Feature | LangChain | Promptflow |
|---|---|---|
| Community | Larger ecosystem, more tutorials | Microsoft-backed, Azure integration |
| Chains | Flexible chain composition | DAG-based flow structure |
| Agents | Sophisticated agent framework | Simpler agent patterns |
| Memory | Multiple memory types built-in | Requires custom implementation |
| Integrations | 100+ LLM/tool integrations | Focused on Azure ecosystem |
When to Choose LangChain
- Need extensive third-party integrations
- Building complex agent systems with sophisticated reasoning
- Prefer Python-first development
- Require advanced memory/state management
LangChain Equivalent Implementation
from langchain_openai import ChatOpenAI  # requires the langchain-openai package
from langchain.agents import initialize_agent, AgentType
from langchain.tools import tool
from langchain.prompts import ChatPromptTemplate

# Define tools
@tool
def analyze_message(message: str) -> str:
    """Analyze a Slack message for relevance."""
    llm = ChatOpenAI(model="gpt-4o-mini")
    prompt = ChatPromptTemplate.from_messages([
        ("system", "Analyze the message for topic, urgency, and relevance."),
        ("user", "{message}")
    ])
    chain = prompt | llm
    return chain.invoke({"message": message}).content

@tool
def notify_stakeholder(stakeholder_id: str, summary: str) -> str:
    """Send notification to a stakeholder."""
    # Implementation here
    return f"Notified {stakeholder_id}"

# Create agent
llm = ChatOpenAI(model="gpt-4o")
agent = initialize_agent(
    tools=[analyze_message, notify_stakeholder],
    llm=llm,
    agent=AgentType.OPENAI_FUNCTIONS,
    verbose=True
)

result = agent.run("Monitor this channel message and notify relevant parties")
Semantic Kernel
Semantic Kernel is Microsoft’s SDK for integrating LLMs into applications.
Semantic Kernel vs. Promptflow at a Glance
| Feature | Semantic Kernel | Promptflow |
|---|---|---|
| Language Support | C#, Python, Java | Python |
| Enterprise Focus | Strong enterprise patterns | Azure AI focus |
| Plugin System | Extensible plugin architecture | Node-based tools |
| Memory | Semantic memory built-in | External integration |
| Orchestration | Planner-based orchestration | DAG-based flows |
When to Choose Semantic Kernel
- Working in a .NET/C# environment
- Need enterprise-grade patterns
- Building applications with semantic memory
- Prefer SDK-style development
Semantic Kernel Equivalent Implementation
# Note: uses the pre-1.0 semantic-kernel Python API; names changed in 1.x
import asyncio
import os

import semantic_kernel as sk
from semantic_kernel.connectors.ai.open_ai import OpenAIChatCompletion

# Initialize kernel
kernel = sk.Kernel()
api_key = os.environ["OPENAI_API_KEY"]
kernel.add_chat_service("gpt4", OpenAIChatCompletion("gpt-4o", api_key))

# Define semantic function
analyze_message = kernel.create_semantic_function(
    """
    Analyze the following Slack message and classify its relevance:

    Message: {{$message}}

    Provide:
    1. Topic
    2. Urgency (high/medium/low)
    3. Relevant stakeholders
    """,
    function_name="analyze_message",
    plugin_name="SlackMonitor"
)

# Define native function
@sk.kernel_function(name="notify_stakeholder")
def notify_stakeholder(stakeholder_id: str, summary: str) -> str:
    """Send notification to stakeholder."""
    # Implementation
    return f"Notified {stakeholder_id}"

kernel.register_native_function(notify_stakeholder)

# Create plan and execute (the awaits must run inside an event loop)
from semantic_kernel.planning import SequentialPlanner

async def main():
    planner = SequentialPlanner(kernel)
    plan = await planner.create_plan_async("Analyze the message and notify stakeholders")
    result = await plan.invoke_async()
    print(result)

asyncio.run(main())
Comparison Summary
| Aspect | Promptflow | LangChain | Semantic Kernel |
|---|---|---|---|
| Learning Curve | Moderate | Steep | Moderate |
| Visual Development | Yes (flow diagrams) | No | No |
| Testing/Evaluation | Built-in | Separate tools | Basic |
| Azure Integration | Native | Through LangChain | Native |
| Community Size | Growing | Large | Medium |
| Enterprise Ready | Yes | Yes | Yes |
| Best For | Azure-centric orgs | Complex agents | .NET shops |
Choosing the Right Framework
Choose Promptflow when:
- Your organization uses Azure AI services
- You prefer visual flow development
- You need built-in evaluation and testing
- You want straightforward deployment to Azure
Choose LangChain when:
- You need maximum flexibility and integrations
- Building complex multi-agent systems
- You have diverse LLM provider requirements
- Community support and examples are important
Choose Semantic Kernel when:
- Working primarily in C# or Java
- Need semantic memory capabilities
- Following Microsoft’s enterprise patterns
- Building applications alongside existing .NET infrastructure
Conclusion
Microsoft Promptflow provides a comprehensive framework for developing, testing, and deploying AI agents. Its strengths lie in its visual development experience, built-in evaluation capabilities, and seamless integration with Azure AI services.
Through our two use cases—a Slack channel monitor and a security review bot—we’ve demonstrated how Promptflow can be used to build practical, production-ready AI agents that provide real value to organizations. The framework’s DAG-based approach to flow design makes complex workflows manageable and testable.
When compared to alternatives like LangChain and Semantic Kernel, Promptflow occupies a middle ground: more structure than LangChain’s free-form flexibility, and more accessibility than Semantic Kernel’s enterprise focus. The choice ultimately depends on your organization’s technology stack, team expertise, and specific requirements.
Key Takeaways
- Start with clear flows: Promptflow’s DAG structure encourages well-defined, testable workflows
- Leverage built-in evaluation: Use the evaluation framework from day one to ensure quality
- Consider costs early: Model selection and caching strategies significantly impact operational costs
- Test iteratively: The local testing capabilities allow rapid iteration before deployment
- Plan for production: Promptflow’s Azure integration makes production deployment straightforward
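On the cost point above: even a small in-process cache can eliminate repeat LLM calls for identical inputs. A minimal sketch using the stdlib, where `call_llm` is a hypothetical stand-in for your real model client:

```python
from functools import lru_cache

# Call counter so the caching effect is observable
CALLS = {"count": 0}


def call_llm(prompt: str, model: str) -> str:
    # Hypothetical stand-in for a real model client; replace with your SDK call
    CALLS["count"] += 1
    return f"[{model}] response to: {prompt}"


@lru_cache(maxsize=1024)
def cached_completion(prompt: str, model: str = "gpt-4o-mini") -> str:
    """Serve identical (prompt, model) pairs from memory instead of re-billing."""
    return call_llm(prompt, model)
```

For production traffic you would typically swap the in-process `lru_cache` for a shared store such as Redis, keyed on a hash of the prompt and model, but the principle is the same.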