# Architecture

## Overview
The Priority Pipeline is a monthly LinkedIn profile scraping system that uses BrightData to scrape priority profiles (executives, directors, and other key decision-makers). It is built on a webhook-based architecture of Cloud Run services and jobs.
## Pipeline Flow

```text
1. Coordinator Job (Cloud Run Job - monthly schedule)
   ↓
2. BigQuery query
   - Fetch priority profiles from PersonFields (isPriority = TRUE)
   - Join with PersonIdentifier for LinkedIn usernames
   - Exclude profiles scraped in the last 25 days
   ↓
3. Create batches (20 profiles each)
   ↓
4. Publish to Pub/Sub (with rate limiting)
   ↓
5. API Service receives Pub/Sub push
   ↓
6. Trigger BrightData scraping with webhook URL
   ↓
7. BrightData scrapes profiles (asynchronously)
   ↓
8. BrightData POSTs data to Webhook Service
   ↓
9. Webhook Service:
   - Inserts to LinkedinApiCall table
   - Backs up to GCS bucket
```
## Components

### Cloud Run Services

| Service | Purpose | Trigger |
|---|---|---|
| `priority-pipeline-api` | Receives Pub/Sub batches, triggers BrightData | Pub/Sub push |
| `priority-pipeline-webhook` | Receives scraped data from BrightData | HTTP webhook |
### Cloud Run Jobs

| Job | Purpose | Schedule |
|---|---|---|
| `priority-pipeline-coordinator` | Query profiles, create batches, publish to Pub/Sub | Monthly (5th at 00:00 UTC) |
### Data Storage

**BigQuery Tables:**
- `linkedin.LinkedinApiCall` - Stores scraped profile data
- `credentity.PersonFields` - Source of priority profiles
- `credmodel_google.PersonIdentifier` - LinkedIn usernames

**GCS Bucket:**
- `brightdata-monthly-priority-people` - Backup storage

**Pub/Sub Topic:**
- `linkedin-scraping-batches` - Batch distribution
## Data Flow Details

### 1. Coordinator Job (Monthly)

**Schedule:** 5th of each month at 00:00 UTC

**Process:**
1. Queries BigQuery for priority profiles
2. Filters out profiles scraped in the last 25 days
3. Creates batches of 20 profiles each
4. Publishes batches to Pub/Sub with rate limiting (see the sketch after the query below):
   - 5-second delay after every 100 batches
   - 5-minute delay after every 1,000 batches
**SQL Query** (from `config.py`):

```sql
SELECT DISTINCT
  pi.identifierValue AS linkedin_username,
  pf.personId
FROM `credentity.PersonFields` pf
JOIN `credmodel_google.PersonIdentifier` pi
  ON pf.personId = pi.personId
WHERE pf.isPriority = TRUE
  AND pi.identifierType = 'LINKEDIN'
  AND pi.identifierValue IS NOT NULL
  AND NOT EXISTS (
    SELECT 1
    FROM `linkedin.LinkedinApiCall` lac
    WHERE lac.requestResource = pi.identifierValue
      AND lac.requestType = 'BRIGHTDATA_API_PERSON'
      AND lac.requestDate > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 25 DAY)
  )
```
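For concreteness, here is a minimal sketch of the coordinator's query-batch-publish loop, assuming the query above and the standard `google-cloud-bigquery` and `google-cloud-pubsub` clients. `PROJECT_ID` and the exact structure of `coordinator.py` are assumptions; the batch size, topic name, and delays come from this document.

```python
# Hedged sketch of jobs/coordinator.py, not the actual implementation.
# PROJECT_ID is an assumption; batch size, topic, and delays are from this doc.
import json
import time

from google.cloud import bigquery, pubsub_v1

PROJECT_ID = "my-project"  # assumption: the real value lives in config.py
TOPIC_NAME = "linkedin-scraping-batches"
BATCH_SIZE = 20

def run_coordinator(priority_profiles_query: str) -> None:
    bq = bigquery.Client(project=PROJECT_ID)
    publisher = pubsub_v1.PublisherClient()
    topic_path = publisher.topic_path(PROJECT_ID, TOPIC_NAME)

    # Fetch priority profiles not scraped in the last 25 days (query above).
    rows = [dict(row) for row in bq.query(priority_profiles_query).result()]

    # Create batches of 20 profiles each.
    batches = [rows[i:i + BATCH_SIZE] for i in range(0, len(rows), BATCH_SIZE)]

    for n, batch in enumerate(batches, start=1):
        publisher.publish(topic_path, json.dumps(batch).encode("utf-8"))
        # Rate limiting: 5 s pause every 100 batches, 5 min every 1,000.
        if n % 1000 == 0:
            time.sleep(300)
        elif n % 100 == 0:
            time.sleep(5)
```

In a real job the publish futures would also be awaited (`future.result()`) so a failed publish is not silently dropped.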
### 2. API Service

**Trigger:** Pub/Sub push subscription

**Process:**
1. Receives batch from Pub/Sub
2. Decodes the base64-encoded message
3. For each profile in the batch:
   - Calls the BrightData API
   - Passes the webhook URL for result delivery
4. Returns success/failure status

**Configuration:**
- Concurrency: 1 (processes one batch at a time)
- Max instances: 1000
- Timeout: 3600 seconds
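The `routers/` layout suggests a FastAPI app. Below is a minimal sketch of the push endpoint, assuming the standard Pub/Sub push envelope; `trigger_scrape` is a hypothetical stand-in for the BrightData client in `person_scraper.py`:

```python
# Hedged sketch of the POST /batch-trigger/ handler; FastAPI and the
# trigger_scrape stub are assumptions, not the actual implementation.
import base64
import json

from fastapi import FastAPI, HTTPException, Request

app = FastAPI()

def trigger_scrape(linkedin_username: str) -> None:
    """Hypothetical stand-in for the BrightData client (see section 3)."""

@app.post("/batch-trigger/")
async def batch_trigger(request: Request):
    envelope = await request.json()
    message = envelope.get("message", {})
    if "data" not in message:
        # A 4xx response tells Pub/Sub the message is malformed.
        raise HTTPException(status_code=400, detail="missing message data")

    # Pub/Sub push wraps the payload in a base64-encoded "data" field.
    batch = json.loads(base64.b64decode(message["data"]))
    for profile in batch:
        trigger_scrape(profile["linkedin_username"])

    # A 2xx response acks the message; anything else triggers redelivery.
    return {"status": "ok", "profiles": len(batch)}
```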
### 3. BrightData Scraping

**Service:** BrightData API

**Process:**
1. Receives scraping request from the API service
2. Scrapes the LinkedIn profile
3. POSTs the result to the webhook URL

**Typical Duration:** 5-30 minutes per profile
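The actual client lives in `brightdata/person_scraper.py`. As a rough illustration, a trigger request with webhook delivery might look like the following; the endpoint, `dataset_id`, and query parameters are assumptions based on BrightData's Web Scraper API and should be checked against the real client:

```python
# Hedged sketch of a BrightData trigger call; the endpoint and parameter
# names are assumptions, not confirmed by this document.
import requests

BRIGHTDATA_TOKEN = "..."  # assumption: injected via env var / Secret Manager
DATASET_ID = "gd_..."     # assumption: ID of the LinkedIn people dataset
WEBHOOK_URL = "https://priority-pipeline-webhook-example.run.app/webhook"  # assumption

def trigger_scrape(linkedin_username: str) -> str:
    resp = requests.post(
        "https://api.brightdata.com/datasets/v3/trigger",
        headers={"Authorization": f"Bearer {BRIGHTDATA_TOKEN}"},
        params={
            "dataset_id": DATASET_ID,
            "endpoint": WEBHOOK_URL,  # where BrightData POSTs the result
            "format": "json",
        },
        json=[{"url": f"https://www.linkedin.com/in/{linkedin_username}/"}],
        timeout=30,
    )
    resp.raise_for_status()
    # BrightData replies with a snapshot ID; the scraped profile arrives
    # at the webhook asynchronously (typically 5-30 minutes later).
    return resp.json().get("snapshot_id", "")
```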
### 4. Webhook Service

**Trigger:** HTTP POST from BrightData

**Process:**
1. Receives scraped profile data
2. Validates the data format
3. Writes to the BigQuery `LinkedinApiCall` table
4. Backs up to the GCS bucket
5. Returns success status

**Configuration:**
- Concurrency: 1
- Max instances: 5000
- Timeout: 3600 seconds
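A sketch of the ingest path in `webhook/handler.py`, again assuming FastAPI. The table and bucket names come from this document; the payload shape and any columns beyond those used in the coordinator query are assumptions. Note that `requestResource` must hold the LinkedIn username, since the coordinator's `NOT EXISTS` filter compares it against `pi.identifierValue`.

```python
# Hedged sketch of the webhook ingest path. Table/bucket names are from
# this document; the payload shape and "response" column are assumptions.
import json
import uuid
from datetime import datetime, timezone

from fastapi import FastAPI, Request
from google.cloud import bigquery, storage

app = FastAPI()
bq = bigquery.Client()
gcs = storage.Client()

@app.post("/webhook")
async def receive_scrape(request: Request):
    payload = await request.json()

    # 1. Insert the raw result into linkedin.LinkedinApiCall. The username
    #    goes in requestResource so the 25-day dedup query can match it.
    errors = bq.insert_rows_json("linkedin.LinkedinApiCall", [{
        "requestResource": payload.get("linkedin_username", ""),  # assumption
        "requestType": "BRIGHTDATA_API_PERSON",
        "requestDate": datetime.now(timezone.utc).isoformat(),
        "response": json.dumps(payload),  # assumption: raw JSON column
    }])
    if errors:
        return {"status": "error", "errors": errors}

    # 2. Back up the raw payload to the GCS bucket.
    blob = gcs.bucket("brightdata-monthly-priority-people").blob(
        f"{datetime.now(timezone.utc):%Y/%m/%d}/{uuid.uuid4().hex}.json"
    )
    blob.upload_from_string(json.dumps(payload), content_type="application/json")

    return {"status": "ok"}
```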
## Configuration Settings

From `config.py`:

| Setting | Value | Description |
|---|---|---|
| `batch_size` | 20 | Profiles per batch |
| `refresh_window_days` | 25 | Skip profiles scraped within this window |
| `batches_per_small_group` | 100 | Batches before the 5 s delay |
| `batches_per_large_group` | 1000 | Batches before the 5 min delay |
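These settings might be grouped in `config.py` roughly as follows. This is a sketch: the four values are from the table above, while the dataclass layout and the two delay-duration fields are assumptions.

```python
# Hedged sketch of the pipeline settings; structure is an assumption,
# the four documented values match the table above.
from dataclasses import dataclass

@dataclass(frozen=True)
class PipelineConfig:
    batch_size: int = 20                 # profiles per batch
    refresh_window_days: int = 25        # skip profiles scraped in this window
    batches_per_small_group: int = 100   # batches before the 5 s delay
    batches_per_large_group: int = 1000  # batches before the 5 min delay
    small_delay_seconds: int = 5         # assumption: delays kept as settings
    large_delay_seconds: int = 300

CONFIG = PipelineConfig()
```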
## Error Handling

### Coordinator Job Failures
- The job runs with `max-retries=0` (no automatic retries)
- Manual re-execution is required after a failure
- Logs are available in Cloud Logging

### API Service Failures
- Pub/Sub retries delivery automatically
- Failed batches are redelivered
- Messages go to a dead letter queue after max retries

### Webhook Failures
- BrightData retries webhook delivery
- Data is preserved in the BrightData dashboard
- Manual recovery from BrightData is possible
## File Structure

```text
src/priority_pipeline/
    api_main.py              # API service entry point
    webhook_main.py          # Webhook service entry point
    config.py                # Configuration and SQL queries
    api/
        routers/
            batch_trigger.py # POST /batch-trigger/ endpoint
    bigquery/
        linkedin_api_call.py # LinkedinApiCall table operations
        queries.py           # SQL queries
    brightdata/
        person_scraper.py    # BrightData API client
    gcs/
        backup.py            # GCS backup operations
    jobs/
        coordinator.py       # Coordinator job (monthly)
    webhook/
        handler.py           # Webhook data processing
```