Maintain local ⭤ remote in sync with automatic AT Protocol parity for Laravel (alpha & unstable)

Importing Records#

Parity includes a comprehensive import system that enables you to sync historical AT Protocol data to your Eloquent models. This complements the real-time sync provided by ParitySignal.

The Cold Start Problem#

When you start consuming the AT Protocol firehose with ParitySignal, you only receive events from that point forward. Any records created before you started listening are not captured.

Importing solves this "cold start" problem by fetching existing records from user repositories via the com.atproto.repo.listRecords API.

Quick Start#

1. Run the Migration#

Publish and run the migration to create the import state tracking table:

php artisan vendor:publish --tag=parity-migrations
php artisan migrate

2. Import a User#

# Import all registered collections for a user
php artisan parity:import did:plc:z72i7hdynmk6r22z27h6tvur

# Import a specific collection
php artisan parity:import did:plc:z72i7hdynmk6r22z27h6tvur --collection=app.bsky.feed.post

# Show progress
php artisan parity:import did:plc:z72i7hdynmk6r22z27h6tvur --progress

3. Check Status#

# Show all import status
php artisan parity:import-status

# Show status for a specific user
php artisan parity:import-status did:plc:z72i7hdynmk6r22z27h6tvur

# Show only incomplete imports
php artisan parity:import-status --pending

Programmatic Usage#

ImportService#

The ImportService is the main orchestration class:

use SocialDept\AtpParity\Import\ImportService;

$service = app(ImportService::class);

// Import all registered collections for a user
$result = $service->importUser('did:plc:z72i7hdynmk6r22z27h6tvur');

echo "Synced {$result->recordsSynced} records";

// Import a specific collection
$result = $service->importUserCollection(
    'did:plc:z72i7hdynmk6r22z27h6tvur',
    'app.bsky.feed.post'
);

// With progress callback
$result = $service->importUser('did:plc:z72i7hdynmk6r22z27h6tvur', null, function ($progress) {
    echo "Synced {$progress->recordsSynced} records from {$progress->collection}\n";
});

ImportResult#

The ImportResult value object provides information about the import operation:

$result = $service->importUser($did);

$result->recordsSynced;   // Number of records successfully synced
$result->recordsSkipped;  // Number of records skipped
$result->recordsFailed;   // Number of records that failed to sync
$result->completed;       // Whether the import completed fully
$result->cursor;          // Cursor for resuming (if incomplete)
$result->error;           // Error message (if failed)

$result->isSuccess();     // True if completed without errors
$result->isPartial();     // True if some records were synced before failure
$result->isFailed();      // True if an error occurred

Checking Status#

// Check if a collection has been imported
if ($service->isImported($did, 'app.bsky.feed.post')) {
    echo "Already imported!";
}

// Get detailed status
$state = $service->getStatus($did, 'app.bsky.feed.post');

if ($state) {
    echo "Status: {$state->status}";
    echo "Records synced: {$state->records_synced}";
}

// Get all statuses for a user
$states = $service->getStatusForUser($did);

Resuming Interrupted Imports#

If an import is interrupted (network error, timeout, etc.), you can resume it:

// Resume a specific import
$state = $service->getStatus($did, $collection);
if ($state && $state->canResume()) {
    $result = $service->resume($state);
}

// Resume all interrupted imports
$results = $service->resumeAll();

Resetting Import State#

To re-import a user or collection:

// Reset a specific collection
$service->reset($did, 'app.bsky.feed.post');

// Reset all collections for a user
$service->resetUser($did);

Queue Integration#

For large-scale importing, use the queue system:

Command Line#

# Queue an import job instead of running synchronously
php artisan parity:import did:plc:z72i7hdynmk6r22z27h6tvur --queue

# Queue imports for a list of DIDs
php artisan parity:import --file=dids.txt --queue

Programmatic#

use SocialDept\AtpParity\Jobs\ImportUserJob;

// Dispatch a single user import
ImportUserJob::dispatch('did:plc:z72i7hdynmk6r22z27h6tvur');

// Dispatch for a specific collection
ImportUserJob::dispatch('did:plc:z72i7hdynmk6r22z27h6tvur', 'app.bsky.feed.post');

Events#

Parity dispatches events during importing that you can listen to:

ImportStarted#

Fired when an import operation begins:

use SocialDept\AtpParity\Events\ImportStarted;

Event::listen(ImportStarted::class, function (ImportStarted $event) {
    Log::info("Starting import", [
        'did' => $event->did,
        'collection' => $event->collection,
    ]);
});

ImportProgress#

Fired after each page of records is processed:

use SocialDept\AtpParity\Events\ImportProgress;

Event::listen(ImportProgress::class, function (ImportProgress $event) {
    Log::info("Import progress", [
        'did' => $event->did,
        'collection' => $event->collection,
        'records_synced' => $event->recordsSynced,
    ]);
});

ImportCompleted#

Fired when an import operation completes successfully:

use SocialDept\AtpParity\Events\ImportCompleted;

Event::listen(ImportCompleted::class, function (ImportCompleted $event) {
    $result = $event->result;

    Log::info("Import completed", [
        'did' => $result->did,
        'collection' => $result->collection,
        'records_synced' => $result->recordsSynced,
    ]);
});

ImportFailed#

Fired when an import operation fails:

use SocialDept\AtpParity\Events\ImportFailed;

Event::listen(ImportFailed::class, function (ImportFailed $event) {
    Log::error("Import failed", [
        'did' => $event->did,
        'collection' => $event->collection,
        'error' => $event->error,
    ]);
});

Configuration#

Configure importing in config/parity.php:

'import' => [
    // Records per page when listing from PDS (max 100)
    'page_size' => 100,

    // Delay between pages in milliseconds (rate limiting)
    'page_delay' => 100,

    // Queue name for import jobs
    'queue' => 'parity-import',

    // Database table for storing import state
    'state_table' => 'parity_import_states',
],

Batch Importing from File#

Create a file with DIDs (one per line):

did:plc:z72i7hdynmk6r22z27h6tvur
did:plc:ewvi7nxzyoun6zhxrhs64oiz
did:plc:ragtjsm2j2vknwkz3zp4oxrd

Then run:

# Synchronous (one at a time)
php artisan parity:import --file=dids.txt --progress

# Queued (parallel via workers)
php artisan parity:import --file=dids.txt --queue

Coordinating with ParitySignal#

For a complete sync solution, combine importing with real-time firehose sync:

  1. Start the firehose consumer - Begin receiving live events
  2. Import historical data - Fetch existing records
  3. Continue firehose sync - New events are handled automatically

This ensures no gaps in your data. Records that arrive via firehose while importing will be properly deduplicated by the mapper's upsert() method (which uses the AT Protocol URI as the unique key).

// Example: Import a user then subscribe to their updates
$service->importUser($did);

// The firehose consumer (ParitySignal) handles updates automatically
// as long as it's running with signal:consume

Best Practices#

Rate Limiting#

The page_delay config option helps prevent overwhelming PDS servers. For bulk importing, consider:

  • Using queued jobs to spread load over time
  • Increasing the delay between pages
  • Running during off-peak hours

Error Handling#

Imports can fail due to:

  • Network errors
  • PDS rate limiting
  • Invalid records

The system automatically tracks progress via cursor, allowing you to resume failed imports:

# Check for failed imports
php artisan parity:import-status --failed

# Resume all failed/interrupted imports
php artisan parity:import --resume

Monitoring#

Use the events to build monitoring:

// Track import metrics
Event::listen(ImportCompleted::class, function (ImportCompleted $event) {
    Metrics::increment('parity.import.completed');
    Metrics::gauge('parity.import.records', $event->result->recordsSynced);
});

Event::listen(ImportFailed::class, function (ImportFailed $event) {
    Metrics::increment('parity.import.failed');
    Alert::send("Import failed for {$event->did}: {$event->error}");
});

Database Schema#

The import state table stores progress:

Column Type Description
id bigint Primary key
did string The DID being imported
collection string The collection NSID
status string pending, in_progress, completed, failed
cursor string Pagination cursor for resuming
records_synced int Count of successfully synced records
records_skipped int Count of skipped records
records_failed int Count of failed records
started_at timestamp When import started
completed_at timestamp When import completed
error text Error message if failed
created_at timestamp
updated_at timestamp

The combination of did and collection is unique.