Development Guide
Prerequisites
- Python 3.11 or higher
- BrightData account and API token
- Google Cloud Platform project with BigQuery enabled
- Access to required BigQuery tables and GCS bucket
Local Development
1. Install Dependencies
pip install -r requirements.txt
# or using uv
uv sync --dev
uv pip install -e .
2. Set Environment Variables
export GCP_PROJECT_ID="cred-1556636033881"
export BRIGHTDATA="your-brightdata-token"
export WEBHOOK_URL="http://localhost:8081" # For API service
export ENVIRONMENT="dev"
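Before starting a service, it can help to confirm the variables are actually set. The snippet below is a hypothetical helper (not part of the repo); it only assumes the variable names used in the exports above:
# check_env.py - hypothetical helper: confirm the required variables are set
import os
import sys

REQUIRED = ["GCP_PROJECT_ID", "BRIGHTDATA", "ENVIRONMENT"]
OPTIONAL = ["WEBHOOK_URL"]  # only needed by the API service

missing = [name for name in REQUIRED if not os.environ.get(name)]
if missing:
    sys.exit(f"Missing environment variables: {', '.join(missing)}")
for name in OPTIONAL:
    if not os.environ.get(name):
        print(f"Warning: {name} is not set (required for the API service)")
print("Environment looks OK")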
3. Run Services Locally
Run Coordinator Job
cd /path/to/repo
python -m src.priority_pipeline.jobs.coordinator
Run API Service
uvicorn src.priority_pipeline.api_main:app --host 0.0.0.0 --port 8080
Run Webhook Service
uvicorn src.priority_pipeline.webhook_main:app --host 0.0.0.0 --port 8081
4. Test Endpoints
Health Check
# API Service
curl http://localhost:8080/health
# Webhook Service
curl http://localhost:8081/health
Test Batch Trigger (API Service)
curl -X POST http://localhost:8080/batch-trigger/ \
-H "Content-Type: application/json" \
-d '{
"message": {
"data": "eyJwcm9maWxlcyI6IFt7ImxpbmtlZGluX3VzZXJuYW1lIjogInRlc3QtdXNlciIsICJwZXJzb25faWQiOiAidGVzdC1pZCJ9XX0=",
"attributes": {"batch_number": "1", "total_batches": "1"}
}
}'
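The data field above is base64-encoded JSON of the form {"profiles": [{"linkedin_username": ..., "person_id": ...}]}. To test with your own profiles, you can build the field yourself; a minimal Python sketch (payload shape taken from decoding the example above):
# build_test_payload.py - build the base64 "data" field for a custom batch-trigger test
import base64
import json

profiles = [{"linkedin_username": "test-user", "person_id": "test-id"}]
data = base64.b64encode(json.dumps({"profiles": profiles}).encode("utf-8")).decode("ascii")

# Push-style message body expected by /batch-trigger/
body = {
    "message": {
        "data": data,
        "attributes": {"batch_number": "1", "total_batches": "1"},
    }
}
print(json.dumps(body, indent=2))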
Development Workflow
Feature Development Process
1. Create Feature Branch
git checkout main
git pull origin main
git checkout -b feature/your-feature-name
2. Develop and Test Locally
- Make your changes
- Test locally using the services above
- Run any unit tests
3. Open Pull Request
git push origin feature/your-feature-name
- Open a PR against the main branch
- This triggers the test workflow
- No automatic deployment on PR
4. Merge and Deploy
- Merge the PR into main
- This triggers automatic deployment to production
- Or use manual workflow dispatch for staging
Testing Tips
Query Priority Profiles
Check how many profiles will be scraped:
SELECT COUNT(DISTINCT pi.identifierValue) as profile_count
FROM `credentity.PersonFields` pf
JOIN `credmodel_google.PersonIdentifier` pi
ON pf.personId = pi.personId
WHERE pf.isPriority = TRUE
AND pi.identifierType = 'LINKEDIN'
AND pi.identifierValue IS NOT NULL
AND NOT EXISTS (
SELECT 1
FROM `linkedin.LinkedinApiCall` lac
WHERE lac.requestResource = pi.identifierValue
AND lac.requestType = 'BRIGHTDATA_API_PERSON'
AND lac.requestDate > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 25 DAY)
);
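To run the same check from Python instead of the BigQuery console, a minimal sketch using the google-cloud-bigquery client (the shortened query omits the NOT EXISTS filter; paste the full query from above to also exclude recently scraped profiles):
# count_priority_profiles.py - run the eligibility count with the BigQuery client
from google.cloud import bigquery

client = bigquery.Client(project="cred-1556636033881")  # or your GCP_PROJECT_ID

query = """
SELECT COUNT(DISTINCT pi.identifierValue) AS profile_count
FROM `credentity.PersonFields` pf
JOIN `credmodel_google.PersonIdentifier` pi ON pf.personId = pi.personId
WHERE pf.isPriority = TRUE
  AND pi.identifierType = 'LINKEDIN'
  AND pi.identifierValue IS NOT NULL
"""

row = next(iter(client.query(query).result()))
print(f"Priority profiles: {row.profile_count}")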
Check Recent Scrapes
SELECT
requestResource,
requestDate,
requestStatus
FROM `linkedin.LinkedinApiCall`
WHERE requestType = 'BRIGHTDATA_API_PERSON'
AND requestDate >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 7 DAY)
ORDER BY requestDate DESC
LIMIT 10;
Verify GCS Backup
gsutil ls gs://brightdata-monthly-priority-people/ | tail -10
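The same check from Python, as a minimal sketch using the google-cloud-storage client (bucket name as above):
# list_recent_backups.py - print the 10 most recently created objects in the backup bucket
from google.cloud import storage

client = storage.Client(project="cred-1556636033881")
blobs = list(client.list_blobs("brightdata-monthly-priority-people"))
for blob in sorted(blobs, key=lambda b: b.time_created, reverse=True)[:10]:
    print(blob.time_created.isoformat(), blob.name)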
Common Issues
1. Coordinator Job Not Finding Profiles
Symptoms: Coordinator completes but publishes 0 batches
Solutions:
- Check isPriority=TRUE records exist in PersonFields
- Verify LinkedIn usernames exist in PersonIdentifier
- Check refresh_window_days setting (25 days default)
- Review SQL query in config.py
2. BrightData Not Delivering Data
Symptoms: Profiles triggered but no data in BigQuery
Solutions:
- Verify WEBHOOK_URL is set correctly in API service
- Check webhook service is deployed and accessible
- Review BrightData dashboard for snapshot status
- Check webhook service logs for errors
3. Webhook Service Failures
Symptoms: 500 errors in webhook service
Solutions:
- Verify BigQuery table exists: linkedin.LinkedinApiCall
- Check GCS bucket exists: brightdata-monthly-priority-people
- Verify service account has write permissions
- Review webhook service logs
4. Pub/Sub Messages Not Processed
Symptoms: API service not receiving batches
Solutions:
- Check the Pub/Sub subscription exists and is active
- Verify the API service is deployed and running
- Check the Pub/Sub dead letter queue
- Review API service logs
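To inspect stuck messages, you can pull a few from the dead letter subscription without acknowledging them. A minimal sketch using google-cloud-pubsub; the subscription ID below is a placeholder, use the one configured for your dead letter topic:
# peek_dead_letter.py - pull (without acking) a few messages from the dead letter subscription
from google.cloud import pubsub_v1

project_id = "cred-1556636033881"
subscription_id = "your-dead-letter-subscription"  # placeholder, not the real name

subscriber = pubsub_v1.SubscriberClient()
sub_path = subscriber.subscription_path(project_id, subscription_id)
response = subscriber.pull(request={"subscription": sub_path, "max_messages": 5})

for received in response.received_messages:
    print(received.message.attributes, received.message.data[:200])
# Messages are not acked, so they remain in the subscription.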
Configuration
Pipeline Settings
Edit src/priority_pipeline/config.py:
# Batch configuration
batch_size = 20 # Profiles per batch
# Refresh window
refresh_window_days = 25 # Days to skip recently scraped profiles
# Rate limiting
batches_per_small_group = 100 # Batches before 5s delay
batches_per_large_group = 1000 # Batches before 5min delay
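The two group settings describe a tiered pacing scheme: a short pause after every small group of batches and a longer pause after every large group. An illustrative sketch of that logic (the coordinator's actual implementation may differ; publish is a stand-in for the real Pub/Sub publish call):
# Illustrative pacing loop for publishing batches
import time

batches_per_small_group = 100
batches_per_large_group = 1000

def publish_with_pacing(batches, publish):
    for i, batch in enumerate(batches, start=1):
        publish(batch)
        if i % batches_per_large_group == 0:
            time.sleep(300)  # 5 min pause after every large group
        elif i % batches_per_small_group == 0:
            time.sleep(5)    # 5 s pause after every small group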
SQL Query Customization
Modify the priority profile query in config.py:
PRIORITY_PROFILES_QUERY = """
SELECT DISTINCT
pi.identifierValue AS linkedin_username,
pf.personId
FROM `credentity.PersonFields` pf
JOIN `credmodel_google.PersonIdentifier` pi
ON pf.personId = pi.personId
WHERE pf.isPriority = TRUE
-- Add your custom filters here
"""
Best Practices
- Test with small batches first - Use batch_size = 5 for initial testing
- Monitor BigQuery costs - Priority queries can be expensive
- Check GCS backup regularly - Ensure backups are being created
- Review BrightData usage - Monitor API quota and costs
- Validate webhook delivery - Check that all triggered profiles get data
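One way to validate delivery is to break recent BrightData person calls down by status. A minimal sketch using the table and columns shown in the queries above (the set of requestStatus values depends on how the webhook records them):
# delivery_check.py - summarise the last 7 days of BrightData person calls by status
from google.cloud import bigquery

client = bigquery.Client(project="cred-1556636033881")

query = """
SELECT requestStatus, COUNT(*) AS calls
FROM `linkedin.LinkedinApiCall`
WHERE requestType = 'BRIGHTDATA_API_PERSON'
  AND requestDate >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 7 DAY)
GROUP BY requestStatus
ORDER BY calls DESC
"""

for row in client.query(query).result():
    print(row.requestStatus, row.calls)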