GCP Specialist Examples
Externalized from the agent definition per the few-shot-examples rule (#1587).
GCP Specialist — Worked Examples
Externalized from the agent definition per the few-shot-examples rule (#1587).
Process Sample Blocks
These are the full sample command/IaC/code blocks for the GCP Specialist's "Your Process" steps. The agent definition retains a compact capability summary for each step; the complete samples live here.
1. Project and IAM Structure
GCP resources live inside projects; projects inside folders; folders inside an organization:
# Resolve the numeric organization ID once and reuse it below.
# `name` is returned as "organizations/<id>"; basename() strips the prefix,
# and `head -1` picks the first organization when the account can see several.
ORG_ID=$(gcloud organizations list --format='value(name.basename())' | head -1)

# List the folders directly under the organization
gcloud resource-manager folders list \
  --organization="${ORG_ID}"

# Create a folder for environment isolation
gcloud resource-manager folders create \
  --display-name="Production" \
  --organization="${ORG_ID}"

# Apply an organization policy to deny public IPs on all VMs.
# set-policy takes a path to a policy file, so the policy is written to disk first.
# The indentation under `spec:` is significant: `rules` must be nested inside it.
cat > vm-external-ip-policy.yaml <<EOF
name: organizations/${ORG_ID}/policies/compute.vmExternalIpAccess
spec:
  rules:
  - denyAll: true
EOF
gcloud org-policies set-policy vm-external-ip-policy.yaml

# Grant a service account permission to invoke a single Cloud Run service (least privilege).
# The IAM condition scopes roles/run.invoker to `my-service` only.
# NOTE: `my-sa` is a placeholder service-account name — substitute your own.
gcloud projects add-iam-policy-binding my-project \
  --member="serviceAccount:my-sa@my-project.iam.gserviceaccount.com" \
  --role="roles/run.invoker" \
  --condition='expression=resource.name.startsWith("projects/my-project/locations/us-central1/services/my-service"),title=service-scope'
2. Terraform GCP Infrastructure
# main.tf — GKE Autopilot with Workload Identity and private networking
terraform {
required_version = ">= 1.7"
required_providers {
google = {
source = "hashicorp/google"
version = "~> 5.0"
}
}
backend "gcs" {
bucket = "my-terraform-state-bucket"
prefix = "gke/production"
}
}
# Input variables for the GKE stack.
variable "project_id" {
  type = string
}

# HCL has no ';' argument separator: a block with more than one argument
# must be written across multiple lines.
variable "region" {
  type    = string
  default = "us-central1"
}

variable "env" {
  type = string
}
locals {
cluster_name = "gke-${var.env}-${var.region}"
network_name = "vpc-${var.env}"
}
# VPC with Private Google Access for private cluster nodes
resource "google_compute_network" "vpc" {
name = local.network_name
project = var.project_id
auto_create_subnetworks = false
}
# Subnet for the GKE nodes, with named secondary ranges that the cluster's
# ip_allocation_policy references for pod and service IPs.
resource "google_compute_subnetwork" "gke" {
  name                     = "subnet-gke-${var.env}"
  project                  = var.project_id
  region                   = var.region
  network                  = google_compute_network.vpc.id
  ip_cidr_range            = "10.0.0.0/20"
  private_ip_google_access = true # Lets nodes without external IPs reach Google APIs (e.g. Artifact Registry, Cloud Storage); general internet egress additionally requires Cloud NAT
  secondary_ip_range {
    range_name    = "pods"
    ip_cidr_range = "10.4.0.0/14"
  }
  secondary_ip_range {
    range_name    = "services"
    ip_cidr_range = "10.8.0.0/20"
  }
}
# GKE Autopilot: Google manages node pools; you manage workloads
resource "google_container_cluster" "primary" {
  name             = local.cluster_name
  project          = var.project_id
  location         = var.region
  enable_autopilot = true # Removes node pool management; enforces security baselines
  network          = google_compute_network.vpc.id
  subnetwork       = google_compute_subnetwork.gke.id

  # VPC-native networking: pod and service IPs come from the subnet's
  # named secondary ranges.
  ip_allocation_policy {
    cluster_secondary_range_name  = "pods"
    services_secondary_range_name = "services"
  }

  private_cluster_config {
    enable_private_nodes    = true
    enable_private_endpoint = false # true makes the control plane reachable only from internal IPs (VPN/Interconnect/bastion)
    master_ipv4_cidr_block  = "172.16.0.0/28"
  }

  workload_identity_config {
    workload_pool = "${var.project_id}.svc.id.goog"
  }

  release_channel {
    channel = "REGULAR" # RAPID for new features; STABLE for regulated workloads
  }

  # GKE rejects maintenance policies that provide less than 48 hours of
  # availability (in windows of >= 4 contiguous hours) per rolling 32 days.
  # A single 4-hour weekly window is too short; two 8-hour weekend windows
  # (Sat + Sun, 00:00-08:00 UTC) satisfy the requirement.
  maintenance_policy {
    recurring_window {
      start_time = "2024-01-06T00:00:00Z"
      end_time   = "2024-01-06T08:00:00Z"
      recurrence = "FREQ=WEEKLY;BYDAY=SA,SU"
    }
  }

  # Guards against accidental `terraform destroy`; set to false before an
  # intentional teardown.
  deletion_protection = true
}
3. Cloud Run Service Configuration
# Cloud Run with traffic splitting and Secret Manager integration
resource "google_cloud_run_v2_service" "app" {
name = "my-api-${var.env}"
project = var.project_id
location = var.region
ingress = "INGRESS_TRAFFIC_INTERNAL_LOAD_BALANCER" # No direct public access
template {
service_account = google_service_account.app.email
scaling {
min_instance_count = var.env == "prod" ? 2 : 0 # Keep-warm in prod
max_instance_count = 100
}
containers {
image = "us-central1-docker.pkg.dev/${var.project_id}/my-repo/my-api:latest"
resources {
limits = {
cpu = "2"
memory = "1Gi"
}
cpu_idle = true # CPU throttled between requests; set false for background processing
}
env {
name = "PROJECT_ID"
value = var.project_id
}
# Reference Secret Manager secrets without storing values in IaC
env {
name = "DATABASE_URL"
value_source {
secret_key_ref {
secret = google_secret_manager_secret.db_url.secret_id
version = "latest"
}
}
}
startup_probe {
http_get { path = "/healthz" }
initial_delay_seconds = 5
timeout_seconds = 1
period_seconds = 3
failure_threshold = 10
}
liveness_probe {
http_get { path = "/healthz" }
period_seconds = 30
failure_threshold = 3
}
}
}
traffic {
type = "TRAFFIC_TARGET_ALLOCATION_TYPE_LATEST"
percent = 100
}
}
# Cloud Load Balancer with Cloud Armor (WAF)
resource "google_compute_backend_service" "app" {
name = "bs-app-${var.env}"
project = var.project_id
backend {
group = google_compute_region_network_endpoint_group.app.id
}
security_policy = google_compute_security_policy.waf.id
}
resource "google_compute_security_policy" "waf" {
name = "waf-${var.env}"
project = var.project_id
rule {
action = "deny(403)"
priority = 1000
match {
expr {
expression = "evaluatePreconfiguredExpr('sqli-stable')"
}
}
description = "Block SQL injection"
}
rule {
action = "throttle"
priority = 2000
match {
versioned_expr = "SRC_IPS_V1"
config {
src_ip_ranges = ["*"]
}
}
rate_limit_options {
rate_limit_threshold {
count = 1000
interval_sec = 60
}
conform_action = "allow"
exceed_action = "deny(429)"
enforce_on_key = "IP"
}
description = "Rate limit: 1000 req/min per IP"
}
rule {
action = "allow"
priority = 2147483647
match {
versioned_expr = "SRC_IPS_V1"
config { src_ip_ranges = ["*"] }
}
description = "Default allow"
}
}
4. BigQuery Schema and Optimization
Partition and cluster every large table — queries that filter on partition columns skip entire file groups:
# Create partitioned, clustered table optimized for time-series event queries.
# --require_partition_filter=true makes BigQuery reject queries that omit a
# predicate on created_at, preventing accidental full-table scans.
# (Keep comments off the continuation lines: anything after a trailing
# backslash ends the command at that line.)
bq mk \
  --table \
  --schema 'event_id:STRING,user_id:STRING,event_type:STRING,properties:JSON,created_at:TIMESTAMP' \
  --time_partitioning_field created_at \
  --time_partitioning_type DAY \
  --clustering_fields user_id,event_type \
  --require_partition_filter=true \
  --description "User events — partitioned by day, clustered by user and type" \
  my-project:my_dataset.user_events
# Check table partition metadata and row distribution
bq query --use_legacy_sql=false "
SELECT
partition_id,
total_rows,
total_logical_bytes / POW(1024, 3) AS gb,
last_modified_time
FROM \`my-project.my_dataset.INFORMATION_SCHEMA.PARTITIONS\`
WHERE table_name = 'user_events'
ORDER BY partition_id DESC
LIMIT 30
"
# Identify expensive queries via INFORMATION_SCHEMA
bq query --use_legacy_sql=false "
SELECT
job_id,
query,
total_bytes_processed / POW(1024, 4) AS tb_processed,
total_slot_ms / 1000 AS slot_seconds,
creation_time
FROM \`region-us.INFORMATION_SCHEMA.JOBS_BY_PROJECT\`
WHERE creation_time > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 24 HOUR)
AND statement_type = 'SELECT'
AND total_bytes_processed > 100 * POW(1024, 3) -- Only queries processing >100GB
ORDER BY total_bytes_processed DESC
LIMIT 20
"
-- BigQuery ML: train a classification model in-database (no data export).
-- user_id is deliberately NOT selected: every column other than the label is
-- treated as a feature, and a unique identifier would be learned as a
-- high-cardinality categorical that memorizes rows instead of generalizing.
CREATE OR REPLACE MODEL `my_dataset.churn_classifier`
OPTIONS (
  model_type = 'LOGISTIC_REG',
  input_label_cols = ['churned'],
  auto_class_weights = TRUE,
  enable_global_explain = TRUE, -- Shapley feature importance
  max_iterations = 20,
  data_split_method = 'AUTO_SPLIT'
) AS
SELECT
  days_since_last_login,
  total_purchases_30d,
  avg_session_duration_sec,
  support_tickets_90d,
  account_age_days,
  churned -- Label: 1 if churned within 30 days, 0 if retained
FROM `my_dataset.user_features`
WHERE feature_date = DATE_SUB(CURRENT_DATE(), INTERVAL 1 DAY);
-- Evaluate model performance
SELECT *
FROM ML.EVALUATE(MODEL `my_dataset.churn_classifier`);
-- Batch predict on current users.
-- predicted_churned_probs is an array of (label, prob) structs, one per class.
-- Pick the positive class by its label rather than by array position, so the
-- result does not depend on the order in which classes are returned.
SELECT
  pred.user_id,
  pred.predicted_churned,
  class_prob.prob AS churn_probability
FROM ML.PREDICT(
  MODEL `my_dataset.churn_classifier`,
  (
    SELECT * FROM `my_dataset.user_features`
    WHERE feature_date = CURRENT_DATE()
  )
) AS pred,
UNNEST(pred.predicted_churned_probs) AS class_prob
WHERE class_prob.label = 1   -- assumes `churned` is an INT64 0/1 label; use TRUE if it is BOOL
  AND class_prob.prob > 0.7  -- High churn risk
ORDER BY churn_probability DESC;
5. Pub/Sub Event-Driven Architecture
# Pub/Sub topic with schema validation and dead-letter handling
resource "google_pubsub_schema" "order_event" {
name = "order-event-schema"
project = var.project_id
type = "AVRO"
definition = jsonencode({
type = "record"
name = "OrderEvent"
fields = [
{ name = "order_id", type = "string" },
{ name = "user_id", type = "string" },
{ name = "event_type", type = "string" },
{ name = "amount_cents", type = "int" },
{ name = "occurred_at", type = "string" }
]
})
}
resource "google_pubsub_topic" "orders" {
name = "orders-${var.env}"
project = var.project_id
schema_settings {
schema = google_pubsub_schema.order_event.id
encoding = "JSON"
}
message_retention_duration = "604800s" # 7 days — replay capability for outages
}
resource "google_pubsub_topic" "orders_dead_letter" {
name = "orders-dead-letter-${var.env}"
project = var.project_id
}
resource "google_pubsub_subscription" "order_processor" {
name = "order-processor-${var.env}"
project = var.project_id
topic = google_pubsub_topic.orders.id
ack_deadline_seconds = 60 # Processing SLA; extend with modifyAckDeadline for long jobs
message_retention_duration = "604800s"
retry_policy {
minimum_backoff = "10s"
maximum_backoff = "600s" # Exponential backoff up to 10 minutes
}
dead_letter_policy {
dead_letter_topic = google_pubsub_topic.orders_dead_letter.id
max_delivery_attempts = 5 # After 5 failures, route to dead-letter for inspection
}
expiration_policy { ttl = "" } # Never expire — retain for replay
push_config {
push_endpoint = google_cloud_run_v2_service.order_processor.uri
oidc_token {
service_account_email = google_service_account.pubsub_invoker.email
}
}
}
# Cloud Function gen2: process Pub/Sub messages with structured logging
import functions_framework
import base64
import json
import logging
from google.cloud import bigquery

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
bq_client = bigquery.Client()  # Reused across invocations (warm instance)


@functions_framework.cloud_event
def process_order_event(cloud_event):
    """Process one order event from Pub/Sub and write it to BigQuery.

    Args:
        cloud_event: CloudEvent whose ``data["message"]["data"]`` holds the
            base64-encoded JSON order payload.

    Raises:
        RuntimeError: if the BigQuery streaming insert reports row errors.
            Raising makes the function fail, so Pub/Sub redelivers the message
            (and, once the subscription's max_delivery_attempts is exhausted,
            forwards it to the dead-letter topic).

    Malformed messages are logged and dropped: returning normally
    acknowledges the message, so it is neither retried nor dead-lettered.
    """
    # Only decoding/validation is guarded. The insert stays outside the try so
    # that an unrelated KeyError/ValueError from the write path is never
    # mistaken for a malformed message and silently dropped.
    try:
        message_data = base64.b64decode(cloud_event.data["message"]["data"])
        event = json.loads(message_data)
        row = {
            "order_id": event["order_id"],
            "user_id": event["user_id"],
            "event_type": event["event_type"],
            "amount_cents": event["amount_cents"],
            "occurred_at": event["occurred_at"],
            "processed_at": "AUTO",
        }
    except (KeyError, TypeError, ValueError) as e:
        # ValueError covers json.JSONDecodeError, binascii.Error (bad base64)
        # and UnicodeDecodeError; TypeError covers a payload that is valid
        # JSON but not an object. Retrying cannot fix any of these, so do NOT
        # raise — return so the message is acknowledged and dropped.
        logger.error("Malformed message, dropping", extra={"error": str(e)})
        return

    errors = bq_client.insert_rows_json("my-project.orders.events", [row])
    if errors:
        logger.error("BigQuery insert error", extra={"errors": errors, "order_id": event["order_id"]})
        raise RuntimeError(f"BigQuery insert failed: {errors}")
    logger.info("Order event processed", extra={"order_id": event["order_id"], "type": event["event_type"]})
6. Dataflow Pipeline for Stream Processing
# Apache Beam pipeline running on Dataflow
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, GoogleCloudOptions
from apache_beam.io.gcp.pubsub import ReadFromPubSub
from apache_beam.io.gcp.bigquery import WriteToBigQuery, BigQueryDisposition

# Columns of the destination table. Rows are projected onto exactly these
# fields before the write, because streaming inserts reject rows that carry
# fields missing from the table schema.
OUTPUT_FIELDS = ("order_id", "user_id", "region", "occurred_at")


class EnrichEvent(beam.DoFn):
    """Parse a JSON-encoded event and attach a ``region`` field."""

    def process(self, element):
        # Imported here so the name is available on Dataflow workers.
        import json

        event = json.loads(element)
        # Enrich: add geolocation, resolve user segment, etc.
        event["region"] = self._lookup_region(event.get("ip_address"))
        yield event

    def _lookup_region(self, ip: str) -> str:
        # In production: call MaxMind or IP2Location
        return "us-east"


def run():
    """Build and run the streaming pipeline: Pub/Sub -> enrich -> BigQuery."""
    options = PipelineOptions(
        runner="DataflowRunner",
        project="my-project",
        region="us-central1",
        temp_location="gs://my-dataflow-temp/tmp",
        staging_location="gs://my-dataflow-temp/staging",
        streaming=True,
        save_main_session=True,
        max_num_workers=10,
        worker_machine_type="n2-standard-4",
        use_public_ips=False,  # Private IPs; requires Cloud NAT
    )
    with beam.Pipeline(options=options) as p:
        (
            p
            # Dataflow pulls from Pub/Sub, so it needs its own PULL
            # subscription. Do not reuse a push subscription (such as the
            # order-processor one that delivers to Cloud Run) or share a
            # subscription with another consumer.
            | "ReadPubSub" >> ReadFromPubSub(
                subscription="projects/my-project/subscriptions/order-enricher-prod"
            )
            | "EnrichEvent" >> beam.ParDo(EnrichEvent())
            | "SelectColumns" >> beam.Map(
                lambda event: {field: event.get(field) for field in OUTPUT_FIELDS}
            )
            | "WriteBigQuery" >> WriteToBigQuery(
                table="my-project:orders.enriched_events",
                schema={
                    "fields": [
                        {"name": "order_id", "type": "STRING", "mode": "REQUIRED"},
                        {"name": "user_id", "type": "STRING", "mode": "REQUIRED"},
                        {"name": "region", "type": "STRING", "mode": "NULLABLE"},
                        {"name": "occurred_at", "type": "TIMESTAMP", "mode": "REQUIRED"},
                    ]
                },
                write_disposition=BigQueryDisposition.WRITE_APPEND,
                create_disposition=BigQueryDisposition.CREATE_IF_NEEDED,
                method="STREAMING_INSERTS",
            )
        )


if __name__ == "__main__":
    run()
7. Vertex AI Model Serving
# Deploy a trained model to Vertex AI Endpoints
MODEL_ID=$(gcloud ai models upload \
--region=us-central1 \
--display-name="churn-classifier-v2" \
--container-image-uri="us-docker.pkg.dev/vertex-ai/prediction/sklearn-cpu.1-3:latest" \
--artifact-uri="gs://my-models/churn-classifier/v2/" \
--format="value(model)")
ENDPOINT_ID=$(gcloud ai endpoints create \
--region=us-central1 \
--display-name="churn-endpoint" \
--format="value(name)" | rev | cut -d'/' -f1 | rev)
gcloud ai endpoints deploy-model "$ENDPOINT_ID" \
--region=us-central1 \
--model="$MODEL_ID" \
--display-name="churn-v2" \
--machine-type="n1-standard-4" \
--min-replica-count=1 \
--max-replica-count=10 \
--traffic-split="0=100"
# Online prediction
gcloud ai endpoints predict "$ENDPOINT_ID" \
--region=us-central1 \
--json-request='{"instances": [{"days_since_last_login": 45, "total_purchases_30d": 0, "support_tickets_90d": 3}]}'
Few-Shot Examples
Example 1: BigQuery Partition Pruning Investigation
Input:
"Our BigQuery queries are slow and we're getting billed for scanning 500TB per query. The table has 3TB of actual data."
Output:
500TB scanned on a 3TB table means partition pruning is not working — queries are scanning every partition. Diagnose:
-- Check whether recent queries used partition filters
SELECT
job_id,
query,
total_bytes_processed / POW(1024, 4) AS tb_scanned,
REGEXP_CONTAINS(query, r'WHERE.*partition_col') AS has_partition_filter,
creation_time
FROM `region-us.INFORMATION_SCHEMA.JOBS_BY_PROJECT`
WHERE creation_time > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 HOUR)
AND total_bytes_processed > 50 * POW(1024, 3)
ORDER BY total_bytes_processed DESC
Common root causes:
1. Partition filter applied to a derived column — BigQuery cannot prune:
-- WRONG: function on partition column defeats pruning
WHERE DATE(created_at) = '2024-01-15'
-- RIGHT: filter directly on the partition column
WHERE created_at >= '2024-01-15'
AND created_at < '2024-01-16'
2. Partition filter is on a subquery — pruning not propagated:
-- WRONG: partition filter in subquery, outer query scans all
SELECT * FROM (SELECT * FROM my_table) WHERE created_at = '2024-01-15'
-- RIGHT: filter in the outermost FROM clause
SELECT * FROM my_table WHERE created_at = '2024-01-15'
3. Enforce partition filters at the table level:
bq update \
--require_partition_filter \
my-project:my_dataset.my_table
After enforcing partition filters, any query without a partition predicate will fail with an explicit error instead of silently scanning all partitions.
Example 2: GKE Workload Identity Setup
Input:
"How do I let my GKE pod access a Cloud Storage bucket without a service account key file?"
Output:
Workload Identity binds a Kubernetes service account to a GCP service account using the cluster's OIDC token. The pod gets GCP credentials automatically.
# Step 1: Enable Workload Identity on the cluster (if not already enabled)
gcloud container clusters update my-cluster \
  --region=us-central1 \
  --workload-pool=my-project.svc.id.goog

# Step 2: Create a GCP service account for your app
gcloud iam service-accounts create my-app-sa \
  --display-name="My App Service Account"

# Step 3: Grant the GCS role to the service account
# (the member is the account created in step 2: <name>@<project>.iam.gserviceaccount.com)
gcloud projects add-iam-policy-binding my-project \
  --member="serviceAccount:my-app-sa@my-project.iam.gserviceaccount.com" \
  --role="roles/storage.objectViewer"

# Step 4: Allow the Kubernetes service account to impersonate the GCP service account
# Member format: serviceAccount:<workload-pool>[<k8s-namespace>/<k8s-service-account>]
gcloud iam service-accounts add-iam-policy-binding my-app-sa@my-project.iam.gserviceaccount.com \
  --role="roles/iam.workloadIdentityUser" \
  --member="serviceAccount:my-project.svc.id.goog[my-namespace/my-ksa]"
# Step 5: Annotate the Kubernetes service account
apiVersion: v1
kind: ServiceAccount
metadata:
  name: my-ksa
  namespace: my-namespace
  annotations:
    # Links this KSA to the GCP service account it impersonates
    iam.gke.io/gcp-service-account: my-app-sa@my-project.iam.gserviceaccount.com
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: my-app
  namespace: my-namespace # Must match the KSA's namespace; serviceAccountName is resolved within it
spec:
  # apps/v1 requires a selector that matches the pod template's labels
  selector:
    matchLabels:
      app: my-app
  template:
    metadata:
      labels:
        app: my-app
    spec:
      serviceAccountName: my-ksa # Use the annotated KSA
      containers:
        - name: app
          image: my-app:latest
          env:
            - name: BUCKET_NAME
              value: my-data-bucket
          # No secret mounts needed — Application Default Credentials works automatically
# Application code — no credentials; Google client libraries use ADC automatically
import os

from google.cloud import storage

client = storage.Client()  # Uses Workload Identity token automatically
# BUCKET_NAME is injected through the Deployment's `env` section
bucket = client.bucket(os.environ["BUCKET_NAME"])
blob = bucket.blob("my/file.json")
data = blob.download_as_bytes()
Example 3: Cloud Run Cold Start Elimination
Input:
"Our Cloud Run service has 2-3 second cold starts that affect user experience. It's a Node.js service handling API requests."
Output:
Cold starts occur when a new instance initializes. Three levers: minimum instances, startup probe tuning, and reducing initialization work.
# Set minimum instances to keep instances warm
# (abbreviated: only the arguments relevant to cold starts are shown)
resource "google_cloud_run_v2_service" "api" {
  template {
    scaling {
      min_instance_count = 2 # Always-warm; roughly ~$8/mo per idle instance at the smallest CPU/memory limits (Cloud Run bills by vCPU and memory, not by machine type)
      max_instance_count = 50
    }
    containers {
      startup_probe {
        http_get { path = "/healthz" }
        initial_delay_seconds = 0 # Start probing immediately
        period_seconds = 1 # Probe every second
        failure_threshold = 30 # Allow 30 seconds total for startup
        timeout_seconds = 1
      }
    }
  }
}
// Optimize Node.js initialization: hoist expensive work out of the request handler.
// The two snippets below are alternatives, not one file.
// WRONG: expensive initialization inside the handler
exports.handler = async (req, res) => {
  const db = new Database(process.env.DATABASE_URL); // New connection on every request, not only on cold start
  await db.connect();
  // ...
};
// RIGHT: initialize at module level — runs once per instance, not per request
import { Database } from './db.js';
// Module-level initialization — executed once when the instance starts
const db = new Database(process.env.DATABASE_URL);
await db.connect(); // Top-level await in ESM modules
export async function handler(req, res) {
  // db is already connected — requests pay no connection-setup cost
  const result = await db.query('SELECT 1');
  res.json({ status: 'ok' });
}
With `min_instance_count = 2`, users never hit a cold start unless traffic spikes beyond 2 concurrent instances. At that point, new instances warm up in parallel — users on existing instances are unaffected.
Expected outcome: 99%+ of requests are served by already-warm instances, so P99 latency drops from 2-3s to <200ms. Cost: ~$16/month for 2 minimum instances on the smallest CPU/memory configuration.