Alternative ATProto PDS implementation

reorganize: split the binary into a library crate (src/lib.rs) with a thin src/main.rs entry point, point schema imports at the vendored crate::schema instead of rsky_pds::schema, add libpq to the Nix dev shells, and drop the Azure Bicep deployment files

+3 -2
.nix/flake.nix
··· 26 26 git 27 27 nixd 28 28 direnv 29 + libpq 29 30 ]; 30 31 overlays = [ (import rust-overlay) ]; 31 32 pkgs = import nixpkgs { ··· 41 42 nativeBuildInputs = with pkgs; [ rust pkg-config ]; 42 43 in 43 44 with pkgs; 44 - { 45 + { 45 46 devShells.default = mkShell { 46 47 inherit buildInputs nativeBuildInputs; 47 48 LD_LIBRARY_PATH = nixpkgs.legacyPackages.x86_64-linux.lib.makeLibraryPath buildInputs; ··· 49 50 DATABASE_URL = "sqlite://data/sqlite.db"; 50 51 }; 51 52 }); 52 - } 53 + }
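The dev shell sets `DATABASE_URL = "sqlite://data/sqlite.db"`, which diesel's CLI tooling reads directly; at runtime the same URL style is handed to `establish_pool` (see the `db::establish_pool` call in the new src/lib.rs below). A minimal sketch of such a pool constructor, assuming deadpool-diesel as the src/lib.rs imports suggest — the scheme handling is illustrative, not the project's actual db.rs:

```rust
use deadpool_diesel::{
    Runtime,
    sqlite::{Manager, Pool},
};

/// Build a SQLite connection pool from a `sqlite://…` URL.
/// Sketch only: the real `db::establish_pool` is not shown in this diff.
fn establish_pool(url: &str) -> anyhow::Result<Pool> {
    // deadpool-diesel takes a plain path for SQLite, so strip the scheme
    // if one is present (an assumption for illustration).
    let path = url.strip_prefix("sqlite://").unwrap_or(url);
    let manager = Manager::new(path, Runtime::Tokio1);
    Ok(Pool::builder(manager).max_size(8).build()?)
}
```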
-182
deployment.bicep
··· 1 - param webAppName string 2 - param location string = resourceGroup().location // Location for all resources 3 - 4 - param sku string = 'B1' // The SKU of App Service Plan 5 - param dockerContainerName string = '${webAppName}:latest' 6 - param repositoryUrl string = 'https://github.com/DrChat/bluepds' 7 - param branch string = 'main' 8 - param customDomain string 9 - 10 - @description('Redeploy hostnames without SSL binding. Just specify `true` if this is the first time you\'re deploying the app.') 11 - param redeployHostnamesHack bool = false 12 - 13 - var acrName = toLower('${webAppName}${uniqueString(resourceGroup().id)}') 14 - var aspName = toLower('${webAppName}-asp') 15 - var webName = toLower('${webAppName}${uniqueString(resourceGroup().id)}') 16 - var sanName = toLower('${webAppName}${uniqueString(resourceGroup().id)}') 17 - 18 - // resource appInsights 'Microsoft.OperationalInsights/workspaces@2023-09-01' = { 19 - // name: '${webAppName}-ai' 20 - // location: location 21 - // properties: { 22 - // publicNetworkAccessForIngestion: 'Enabled' 23 - // workspaceCapping: { 24 - // dailyQuotaGb: 1 25 - // } 26 - // sku: { 27 - // name: 'Standalone' 28 - // } 29 - // } 30 - // } 31 - 32 - // resource appServicePlanDiagnostics 'Microsoft.Insights/diagnosticSettings@2021-05-01-preview' = { 33 - // name: appServicePlan.name 34 - // scope: appServicePlan 35 - // properties: { 36 - // workspaceId: appInsights.id 37 - // metrics: [ 38 - // { 39 - // category: 'AllMetrics' 40 - // enabled: true 41 - // } 42 - // ] 43 - // } 44 - // } 45 - 46 - resource appServicePlan 'Microsoft.Web/serverfarms@2020-06-01' = { 47 - name: aspName 48 - location: location 49 - properties: { 50 - reserved: true 51 - } 52 - sku: { 53 - name: sku 54 - } 55 - kind: 'linux' 56 - } 57 - 58 - resource acrResource 'Microsoft.ContainerRegistry/registries@2023-01-01-preview' = { 59 - name: acrName 60 - location: location 61 - sku: { 62 - name: 'Basic' 63 - } 64 - properties: { 65 - adminUserEnabled: false 66 - } 67 - } 68 - 69 - resource appStorage 'Microsoft.Storage/storageAccounts@2023-05-01' = { 70 - name: sanName 71 - location: location 72 - kind: 'StorageV2' 73 - sku: { 74 - name: 'Standard_LRS' 75 - } 76 - } 77 - 78 - resource fileShare 'Microsoft.Storage/storageAccounts/fileServices/shares@2023-05-01' = { 79 - name: '${appStorage.name}/default/data' 80 - properties: {} 81 - } 82 - 83 - resource appService 'Microsoft.Web/sites@2020-06-01' = { 84 - name: webName 85 - location: location 86 - identity: { 87 - type: 'SystemAssigned' 88 - } 89 - properties: { 90 - httpsOnly: true 91 - serverFarmId: appServicePlan.id 92 - siteConfig: { 93 - // Sigh. This took _far_ too long to figure out. 94 - // We must authenticate to ACR, as no credentials are set up by default 95 - // (the Az CLI will implicitly set them up in the background) 96 - acrUseManagedIdentityCreds: true 97 - appSettings: [ 98 - { 99 - name: 'BLUEPDS_HOST_NAME' 100 - value: empty(customDomain) ? 
'${webName}.azurewebsites.net' : customDomain 101 - } 102 - { 103 - name: 'BLUEPDS_TEST' 104 - value: 'false' 105 - } 106 - { 107 - name: 'WEBSITES_PORT' 108 - value: '8000' 109 - } 110 - ] 111 - linuxFxVersion: 'DOCKER|${acrName}.azurecr.io/${dockerContainerName}' 112 - } 113 - } 114 - } 115 - 116 - resource hostNameBinding 'Microsoft.Web/sites/hostNameBindings@2024-04-01' = if (redeployHostnamesHack) { 117 - name: customDomain 118 - parent: appService 119 - properties: { 120 - siteName: appService.name 121 - hostNameType: 'Verified' 122 - sslState: 'Disabled' 123 - } 124 - } 125 - 126 - // This stupidity is required because Azure requires a circular dependency in order to define a custom hostname with SSL. 127 - // https://stackoverflow.com/questions/73077972/how-to-deploy-app-service-with-managed-ssl-certificate-using-arm 128 - module certificateBindings './deploymentBindingHack.bicep' = { 129 - name: '${deployment().name}-ssl' 130 - params: { 131 - appServicePlanResourceId: appServicePlan.id 132 - customHostnames: [customDomain] 133 - location: location 134 - webAppName: appService.name 135 - } 136 - dependsOn: [hostNameBinding] 137 - } 138 - 139 - resource appServiceStorageConfig 'Microsoft.Web/sites/config@2024-04-01' = { 140 - name: 'azurestorageaccounts' 141 - parent: appService 142 - properties: { 143 - data: { 144 - type: 'AzureFiles' 145 - shareName: 'data' 146 - mountPath: '/app/data' 147 - accountName: appStorage.name 148 - // WTF? Where's the ability to mount storage via managed identity? 149 - accessKey: appStorage.listKeys().keys[0].value 150 - } 151 - } 152 - } 153 - 154 - @description('This is the built-in AcrPull role. See https://docs.microsoft.com/azure/role-based-access-control/built-in-roles#acrpull') 155 - resource acrPullRoleDefinition 'Microsoft.Authorization/roleDefinitions@2018-01-01-preview' existing = { 156 - scope: subscription() 157 - name: '7f951dda-4ed3-4680-a7ca-43fe172d538d' 158 - } 159 - 160 - resource appServiceAcrPull 'Microsoft.Authorization/roleAssignments@2020-04-01-preview' = { 161 - name: guid(resourceGroup().id, acrResource.id, appService.id, 'AssignAcrPullToAS') 162 - scope: acrResource 163 - properties: { 164 - description: 'Assign AcrPull role to AS' 165 - principalId: appService.identity.principalId 166 - principalType: 'ServicePrincipal' 167 - roleDefinitionId: acrPullRoleDefinition.id 168 - } 169 - } 170 - 171 - resource srcControls 'Microsoft.Web/sites/sourcecontrols@2021-01-01' = { 172 - name: 'web' 173 - parent: appService 174 - properties: { 175 - repoUrl: repositoryUrl 176 - branch: branch 177 - isManualIntegration: true 178 - } 179 - } 180 - 181 - output acr string = acrResource.name 182 - output domain string = appService.properties.hostNames[0]
-30
deploymentBindingHack.bicep
··· 1 - // https://stackoverflow.com/questions/73077972/how-to-deploy-app-service-with-managed-ssl-certificate-using-arm 2 - // 3 - // TLDR: Azure requires a circular dependency in order to define an app service with a custom domain with SSL enabled. 4 - // Terrific user experience. Really makes me love using Azure in my free time. 5 - param webAppName string 6 - param location string 7 - param appServicePlanResourceId string 8 - param customHostnames array 9 - 10 - // Managed certificates can only be created once the hostname is added to the web app. 11 - resource certificates 'Microsoft.Web/certificates@2022-03-01' = [for (fqdn, i) in customHostnames: { 12 - name: '${fqdn}-${webAppName}' 13 - location: location 14 - properties: { 15 - serverFarmId: appServicePlanResourceId 16 - canonicalName: fqdn 17 - } 18 - }] 19 - 20 - // sslState and thumbprint can only be set once the managed certificate is created 21 - @batchSize(1) 22 - resource customHostname 'Microsoft.web/sites/hostnameBindings@2019-08-01' = [for (fqdn, i) in customHostnames: { 23 - name: '${webAppName}/${fqdn}' 24 - properties: { 25 - siteName: webAppName 26 - hostNameType: 'Verified' 27 - sslState: 'SniEnabled' 28 - thumbprint: certificates[i].properties.thumbprint 29 - } 30 - }]
+3 -2
flake.nix
··· 22 22 "rust-analyzer" 23 23 ]; 24 24 })); 25 - 25 + 26 26 inherit (pkgs) lib; 27 27 unfilteredRoot = ./.; # The original, unfiltered source 28 28 src = lib.fileset.toSource { ··· 109 109 git 110 110 nixd 111 111 direnv 112 + libpq 112 113 ]; 113 114 }; 114 115 }) ··· 165 166 }; 166 167 }; 167 168 }); 168 - } 169 + }
+6 -6
src/account_manager/helpers/account.rs
··· 2 2 //! blacksky-algorithms/rsky is licensed under the Apache License 2.0 3 3 //! 4 4 //! Modified for SQLite backend 5 + use crate::schema::pds::account::dsl as AccountSchema; 6 + use crate::schema::pds::actor::dsl as ActorSchema; 5 7 use anyhow::Result; 6 8 use chrono::DateTime; 7 9 use chrono::offset::Utc as UtcOffset; ··· 15 17 AccountStatus, ActorAccount, ActorJoinAccount, AvailabilityFlags, FormattedAccountStatus, 16 18 GetAccountAdminStatusOutput, format_account_status, 17 19 }; 18 - use rsky_pds::schema::pds::account::dsl as AccountSchema; 19 - use rsky_pds::schema::pds::actor::dsl as ActorSchema; 20 20 use std::ops::Add; 21 21 use std::time::SystemTime; 22 22 use thiserror::Error; ··· 253 253 deadpool_diesel::sqlite::Object, 254 254 >, 255 255 ) -> Result<()> { 256 - use rsky_pds::schema::pds::email_token::dsl as EmailTokenSchema; 257 - use rsky_pds::schema::pds::refresh_token::dsl as RefreshTokenSchema; 258 - use rsky_pds::schema::pds::repo_root::dsl as RepoRootSchema; 256 + use crate::schema::pds::email_token::dsl as EmailTokenSchema; 257 + use crate::schema::pds::refresh_token::dsl as RefreshTokenSchema; 258 + use crate::schema::pds::repo_root::dsl as RepoRootSchema; 259 259 260 260 let did = did.to_owned(); 261 261 _ = db ··· 410 410 deadpool_diesel::sqlite::Object, 411 411 >, 412 412 ) -> Result<()> { 413 - use rsky_pds::schema::pds::actor; 413 + use crate::schema::pds::actor; 414 414 415 415 let actor2 = diesel::alias!(actor as actor2); 416 416
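This file establishes the pattern repeated across the rest of the Rust diffs: every `use rsky_pds::schema::pds::…` becomes `use crate::schema::pds::…`, pointing at the schema module now vendored into the crate (the new `mod schema;` in src/lib.rs) so the table definitions can track the SQLite backend rather than upstream rsky-pds. The `dsl` modules these imports alias are generated by diesel's `table!` macro; a minimal sketch with hypothetical columns:

```rust
// Sketch of a vendored src/schema.rs. The column set is illustrative,
// not the project's actual definition.
pub mod pds {
    diesel::table! {
        account (did) {
            did -> Text,
            email -> Text,
            password -> Nullable<Text>, // hypothetical column
        }
    }
}

// `table!` generates `account::dsl`, which is what
// `use crate::schema::pds::account::dsl as AccountSchema` refers to, e.g.:
//
//   AccountSchema::account
//       .filter(AccountSchema::did.eq(&did))
//       .first::<models::ActorAccount>(conn)
```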
+7 -7
src/account_manager/helpers/auth.rs
··· 22 22 deadpool_diesel::sqlite::Object, 23 23 >, 24 24 ) -> Result<()> { 25 - use rsky_pds::schema::pds::refresh_token::dsl as RefreshTokenSchema; 25 + use crate::schema::pds::refresh_token::dsl as RefreshTokenSchema; 26 26 27 27 let exp = from_micros_to_utc((payload.exp.as_millis() / 1000) as i64); 28 28 ··· 53 53 deadpool_diesel::sqlite::Object, 54 54 >, 55 55 ) -> Result<bool> { 56 - use rsky_pds::schema::pds::refresh_token::dsl as RefreshTokenSchema; 56 + use crate::schema::pds::refresh_token::dsl as RefreshTokenSchema; 57 57 db.get() 58 58 .await? 59 59 .interact(move |conn| { ··· 74 74 deadpool_diesel::sqlite::Object, 75 75 >, 76 76 ) -> Result<bool> { 77 - use rsky_pds::schema::pds::refresh_token::dsl as RefreshTokenSchema; 77 + use crate::schema::pds::refresh_token::dsl as RefreshTokenSchema; 78 78 let did = did.to_owned(); 79 79 db.get() 80 80 .await? ··· 97 97 deadpool_diesel::sqlite::Object, 98 98 >, 99 99 ) -> Result<bool> { 100 - use rsky_pds::schema::pds::refresh_token::dsl as RefreshTokenSchema; 100 + use crate::schema::pds::refresh_token::dsl as RefreshTokenSchema; 101 101 102 102 let did = did.to_owned(); 103 103 let app_pass_name = app_pass_name.to_owned(); ··· 122 122 deadpool_diesel::sqlite::Object, 123 123 >, 124 124 ) -> Result<Option<models::RefreshToken>> { 125 - use rsky_pds::schema::pds::refresh_token::dsl as RefreshTokenSchema; 125 + use crate::schema::pds::refresh_token::dsl as RefreshTokenSchema; 126 126 let id = id.to_owned(); 127 127 db.get() 128 128 .await? ··· 144 144 deadpool_diesel::sqlite::Object, 145 145 >, 146 146 ) -> Result<()> { 147 - use rsky_pds::schema::pds::refresh_token::dsl as RefreshTokenSchema; 147 + use crate::schema::pds::refresh_token::dsl as RefreshTokenSchema; 148 148 let did = did.to_owned(); 149 149 150 150 db.get() ··· 175 175 expires_at, 176 176 next_id, 177 177 } = opts; 178 - use rsky_pds::schema::pds::refresh_token::dsl as RefreshTokenSchema; 178 + use crate::schema::pds::refresh_token::dsl as RefreshTokenSchema; 179 179 180 180 drop( 181 181 update(RefreshTokenSchema::refresh_token)
+5 -5
src/account_manager/helpers/email_token.rs
··· 17 17 deadpool_diesel::sqlite::Object, 18 18 >, 19 19 ) -> Result<String> { 20 - use rsky_pds::schema::pds::email_token::dsl as EmailTokenSchema; 20 + use crate::schema::pds::email_token::dsl as EmailTokenSchema; 21 21 let token = get_random_token().to_uppercase(); 22 22 let now = rsky_common::now(); 23 23 ··· 56 56 >, 57 57 ) -> Result<()> { 58 58 let expiration_len = expiration_len.unwrap_or(MINUTE * 15); 59 - use rsky_pds::schema::pds::email_token::dsl as EmailTokenSchema; 59 + use crate::schema::pds::email_token::dsl as EmailTokenSchema; 60 60 61 61 let did = did.to_owned(); 62 62 let token = token.to_owned(); ··· 96 96 >, 97 97 ) -> Result<String> { 98 98 let expiration_len = expiration_len.unwrap_or(MINUTE * 15); 99 - use rsky_pds::schema::pds::email_token::dsl as EmailTokenSchema; 99 + use crate::schema::pds::email_token::dsl as EmailTokenSchema; 100 100 101 101 let token = token.to_owned(); 102 102 let res = db ··· 210 210 deadpool_diesel::sqlite::Object, 211 211 >, 212 212 ) -> Result<()> { 213 - use rsky_pds::schema::pds::email_token::dsl as EmailTokenSchema; 213 + use crate::schema::pds::email_token::dsl as EmailTokenSchema; 214 214 let did = did.to_owned(); 215 215 _ = db 216 216 .get() ··· 233 233 deadpool_diesel::sqlite::Object, 234 234 >, 235 235 ) -> Result<()> { 236 - use rsky_pds::schema::pds::email_token::dsl as EmailTokenSchema; 236 + use crate::schema::pds::email_token::dsl as EmailTokenSchema; 237 237 238 238 let did = did.to_owned(); 239 239 _ = db
+12 -12
src/account_manager/helpers/invite.rs
··· 23 23 deadpool_diesel::sqlite::Object, 24 24 >, 25 25 ) -> Result<()> { 26 - use rsky_pds::schema::pds::actor::dsl as ActorSchema; 27 - use rsky_pds::schema::pds::invite_code::dsl as InviteCodeSchema; 28 - use rsky_pds::schema::pds::invite_code_use::dsl as InviteCodeUseSchema; 26 + use crate::schema::pds::actor::dsl as ActorSchema; 27 + use crate::schema::pds::invite_code::dsl as InviteCodeSchema; 28 + use crate::schema::pds::invite_code_use::dsl as InviteCodeUseSchema; 29 29 30 30 db.get().await?.interact(move |conn| { 31 31 let invite: Option<models::InviteCode> = InviteCodeSchema::invite_code ··· 72 72 >, 73 73 ) -> Result<()> { 74 74 if let Some(invite_code) = invite_code { 75 - use rsky_pds::schema::pds::invite_code_use::dsl as InviteCodeUseSchema; 75 + use crate::schema::pds::invite_code_use::dsl as InviteCodeUseSchema; 76 76 77 77 _ = db 78 78 .get() ··· 100 100 deadpool_diesel::sqlite::Object, 101 101 >, 102 102 ) -> Result<()> { 103 - use rsky_pds::schema::pds::invite_code::dsl as InviteCodeSchema; 103 + use crate::schema::pds::invite_code::dsl as InviteCodeSchema; 104 104 let created_at = rsky_common::now(); 105 105 106 106 _ = db ··· 144 144 deadpool_diesel::sqlite::Object, 145 145 >, 146 146 ) -> Result<Vec<CodeDetail>> { 147 - use rsky_pds::schema::pds::invite_code::dsl as InviteCodeSchema; 147 + use crate::schema::pds::invite_code::dsl as InviteCodeSchema; 148 148 149 149 let for_account = for_account.to_owned(); 150 150 let rows = db ··· 201 201 deadpool_diesel::sqlite::Object, 202 202 >, 203 203 ) -> Result<Vec<CodeDetail>> { 204 - use rsky_pds::schema::pds::invite_code::dsl as InviteCodeSchema; 204 + use crate::schema::pds::invite_code::dsl as InviteCodeSchema; 205 205 206 206 let did = did.to_owned(); 207 207 let res: Vec<models::InviteCode> = db ··· 239 239 deadpool_diesel::sqlite::Object, 240 240 >, 241 241 ) -> Result<BTreeMap<String, Vec<CodeUse>>> { 242 - use rsky_pds::schema::pds::invite_code_use::dsl as InviteCodeUseSchema; 242 + use crate::schema::pds::invite_code_use::dsl as InviteCodeUseSchema; 243 243 244 244 let mut uses: BTreeMap<String, Vec<CodeUse>> = BTreeMap::new(); 245 245 if !codes.is_empty() { ··· 282 282 if dids.is_empty() { 283 283 return Ok(BTreeMap::new()); 284 284 } 285 - use rsky_pds::schema::pds::invite_code::dsl as InviteCodeSchema; 286 - use rsky_pds::schema::pds::invite_code_use::dsl as InviteCodeUseSchema; 285 + use crate::schema::pds::invite_code::dsl as InviteCodeSchema; 286 + use crate::schema::pds::invite_code_use::dsl as InviteCodeUseSchema; 287 287 288 288 let dids = dids.clone(); 289 289 let res: Vec<models::InviteCode> = db ··· 339 339 deadpool_diesel::sqlite::Object, 340 340 >, 341 341 ) -> Result<()> { 342 - use rsky_pds::schema::pds::account::dsl as AccountSchema; 342 + use crate::schema::pds::account::dsl as AccountSchema; 343 343 344 344 let disabled: i16 = if disabled { 1 } else { 0 }; 345 345 let did = did.to_owned(); ··· 364 364 deadpool_diesel::sqlite::Object, 365 365 >, 366 366 ) -> Result<()> { 367 - use rsky_pds::schema::pds::invite_code::dsl as InviteCodeSchema; 367 + use crate::schema::pds::invite_code::dsl as InviteCodeSchema; 368 368 369 369 let DisableInviteCodesOpts { codes, accounts } = opts; 370 370 if !codes.is_empty() {
+6 -6
src/account_manager/helpers/password.rs
··· 21 21 deadpool_diesel::sqlite::Object, 22 22 >, 23 23 ) -> Result<bool> { 24 - use rsky_pds::schema::pds::account::dsl as AccountSchema; 24 + use crate::schema::pds::account::dsl as AccountSchema; 25 25 26 26 let did = did.to_owned(); 27 27 let found = db ··· 51 51 deadpool_diesel::sqlite::Object, 52 52 >, 53 53 ) -> Result<Option<String>> { 54 - use rsky_pds::schema::pds::app_password::dsl as AppPasswordSchema; 54 + use crate::schema::pds::app_password::dsl as AppPasswordSchema; 55 55 56 56 let did = did.to_owned(); 57 57 let password = password.to_owned(); ··· 91 91 let password = chunks.join("-"); 92 92 let password_encrypted = hash_app_password(&did, &password).await?; 93 93 94 - use rsky_pds::schema::pds::app_password::dsl as AppPasswordSchema; 94 + use crate::schema::pds::app_password::dsl as AppPasswordSchema; 95 95 96 96 let created_at = now(); 97 97 ··· 129 129 deadpool_diesel::sqlite::Object, 130 130 >, 131 131 ) -> Result<Vec<(String, String)>> { 132 - use rsky_pds::schema::pds::app_password::dsl as AppPasswordSchema; 132 + use crate::schema::pds::app_password::dsl as AppPasswordSchema; 133 133 134 134 let did = did.to_owned(); 135 135 db.get() ··· 151 151 deadpool_diesel::sqlite::Object, 152 152 >, 153 153 ) -> Result<()> { 154 - use rsky_pds::schema::pds::account::dsl as AccountSchema; 154 + use crate::schema::pds::account::dsl as AccountSchema; 155 155 156 156 db.get() 157 157 .await? ··· 174 174 deadpool_diesel::sqlite::Object, 175 175 >, 176 176 ) -> Result<()> { 177 - use rsky_pds::schema::pds::app_password::dsl as AppPasswordSchema; 177 + use crate::schema::pds::app_password::dsl as AppPasswordSchema; 178 178 179 179 let did = did.to_owned(); 180 180 let name = name.to_owned();
+1 -1
src/account_manager/helpers/repo.rs
··· 16 16 >, 17 17 ) -> Result<()> { 18 18 // @TODO balance risk of a race in the case of a long retry 19 - use rsky_pds::schema::pds::repo_root::dsl as RepoRootSchema; 19 + use crate::schema::pds::repo_root::dsl as RepoRootSchema; 20 20 21 21 let now = rsky_common::now(); 22 22
+15 -15
src/actor_store/blob.rs
··· 67 67 68 68 /// Get metadata for a blob by CID 69 69 pub async fn get_blob_metadata(&self, cid: Cid) -> Result<GetBlobMetadataOutput> { 70 - use rsky_pds::schema::pds::blob::dsl as BlobSchema; 70 + use crate::schema::pds::blob::dsl as BlobSchema; 71 71 72 72 let did = self.did.clone(); 73 73 let found = self ··· 112 112 113 113 /// Get all records that reference a specific blob 114 114 pub async fn get_records_for_blob(&self, cid: Cid) -> Result<Vec<String>> { 115 - use rsky_pds::schema::pds::record_blob::dsl as RecordBlobSchema; 115 + use crate::schema::pds::record_blob::dsl as RecordBlobSchema; 116 116 117 117 let did = self.did.clone(); 118 118 let res = self ··· 169 169 170 170 /// Track a blob that hasn't been associated with any records yet 171 171 pub async fn track_untethered_blob(&self, metadata: BlobMetadata) -> Result<BlobRef> { 172 - use rsky_pds::schema::pds::blob::dsl as BlobSchema; 172 + use crate::schema::pds::blob::dsl as BlobSchema; 173 173 174 174 let did = self.did.clone(); 175 175 self.db.get().await?.interact(move |conn| { ··· 254 254 255 255 /// Delete blobs that are no longer referenced by any records 256 256 pub async fn delete_dereferenced_blobs(&self, writes: Vec<PreparedWrite>) -> Result<()> { 257 - use rsky_pds::schema::pds::blob::dsl as BlobSchema; 258 - use rsky_pds::schema::pds::record_blob::dsl as RecordBlobSchema; 257 + use crate::schema::pds::blob::dsl as BlobSchema; 258 + use crate::schema::pds::record_blob::dsl as RecordBlobSchema; 259 259 260 260 // Extract URIs 261 261 let uris: Vec<String> = writes ··· 386 386 387 387 /// Verify a blob and make it permanent 388 388 pub async fn verify_blob_and_make_permanent(&self, blob: PreparedBlobRef) -> Result<()> { 389 - use rsky_pds::schema::pds::blob::dsl as BlobSchema; 389 + use crate::schema::pds::blob::dsl as BlobSchema; 390 390 391 391 let found = self 392 392 .db ··· 433 433 434 434 /// Associate a blob with a record 435 435 pub async fn associate_blob(&self, blob: PreparedBlobRef, record_uri: String) -> Result<()> { 436 - use rsky_pds::schema::pds::record_blob::dsl as RecordBlobSchema; 436 + use crate::schema::pds::record_blob::dsl as RecordBlobSchema; 437 437 438 438 let cid = blob.cid.to_string(); 439 439 let did = self.did.clone(); ··· 460 460 461 461 /// Count all blobs for this actor 462 462 pub async fn blob_count(&self) -> Result<i64> { 463 - use rsky_pds::schema::pds::blob::dsl as BlobSchema; 463 + use crate::schema::pds::blob::dsl as BlobSchema; 464 464 465 465 let did = self.did.clone(); 466 466 self.db ··· 479 479 480 480 /// Count blobs associated with records 481 481 pub async fn record_blob_count(&self) -> Result<i64> { 482 - use rsky_pds::schema::pds::record_blob::dsl as RecordBlobSchema; 482 + use crate::schema::pds::record_blob::dsl as RecordBlobSchema; 483 483 484 484 let did = self.did.clone(); 485 485 self.db ··· 501 501 &self, 502 502 opts: ListMissingBlobsOpts, 503 503 ) -> Result<Vec<ListMissingBlobsRefRecordBlob>> { 504 - use rsky_pds::schema::pds::blob::dsl as BlobSchema; 505 - use rsky_pds::schema::pds::record_blob::dsl as RecordBlobSchema; 504 + use crate::schema::pds::blob::dsl as BlobSchema; 505 + use crate::schema::pds::record_blob::dsl as RecordBlobSchema; 506 506 507 507 let did = self.did.clone(); 508 508 self.db ··· 563 563 564 564 /// List all blobs with optional filtering 565 565 pub async fn list_blobs(&self, opts: ListBlobsOpts) -> Result<Vec<String>> { 566 - use rsky_pds::schema::pds::record::dsl as RecordSchema; 567 - use rsky_pds::schema::pds::record_blob::dsl 
as RecordBlobSchema; 566 + use crate::schema::pds::record::dsl as RecordSchema; 567 + use crate::schema::pds::record_blob::dsl as RecordBlobSchema; 568 568 569 569 let ListBlobsOpts { 570 570 since, ··· 617 617 618 618 /// Get the takedown status of a blob 619 619 pub async fn get_blob_takedown_status(&self, cid: Cid) -> Result<Option<StatusAttr>> { 620 - use rsky_pds::schema::pds::blob::dsl as BlobSchema; 620 + use crate::schema::pds::blob::dsl as BlobSchema; 621 621 622 622 self.db 623 623 .get() ··· 653 653 654 654 /// Update the takedown status of a blob 655 655 pub async fn update_blob_takedown_status(&self, blob: Cid, takedown: StatusAttr) -> Result<()> { 656 - use rsky_pds::schema::pds::blob::dsl as BlobSchema; 656 + use crate::schema::pds::blob::dsl as BlobSchema; 657 657 658 658 let takedown_ref: Option<String> = match takedown.applied { 659 659 true => takedown.r#ref.map_or_else(|| Some(now()), Some),
+2 -2
src/actor_store/mod.rs
··· 460 460 461 461 pub async fn destroy(&mut self) -> Result<()> { 462 462 let did: String = self.did.clone(); 463 - use rsky_pds::schema::pds::blob::dsl as BlobSchema; 463 + use crate::schema::pds::blob::dsl as BlobSchema; 464 464 465 465 let blob_rows: Vec<String> = self 466 466 .storage ··· 499 499 return Ok(vec![]); 500 500 } 501 501 let did: String = self.did.clone(); 502 - use rsky_pds::schema::pds::record::dsl as RecordSchema; 502 + use crate::schema::pds::record::dsl as RecordSchema; 503 503 504 504 let cid_strs: Vec<String> = cids.into_iter().map(|c| c.to_string()).collect(); 505 505 let touched_uri_strs: Vec<String> = touched_uris.iter().map(|t| t.to_string()).collect();
+2 -2
src/actor_store/preference.rs
··· 36 36 namespace: Option<String>, 37 37 scope: AuthScope, 38 38 ) -> Result<Vec<RefPreferences>> { 39 - use rsky_pds::schema::pds::account_pref::dsl as AccountPrefSchema; 39 + use crate::schema::pds::account_pref::dsl as AccountPrefSchema; 40 40 41 41 let did = self.did.clone(); 42 42 self.db ··· 99 99 bail!("Do not have authorization to set preferences."); 100 100 } 101 101 // get all current prefs for user and prep new pref rows 102 - use rsky_pds::schema::pds::account_pref::dsl as AccountPrefSchema; 102 + use crate::schema::pds::account_pref::dsl as AccountPrefSchema; 103 103 let all_prefs = AccountPrefSchema::account_pref 104 104 .filter(AccountPrefSchema::did.eq(&did)) 105 105 .select(AccountPref::as_select())
+17 -17
src/actor_store/record.rs
··· 43 43 44 44 /// Count the total number of records. 45 45 pub(crate) async fn record_count(&mut self) -> Result<i64> { 46 - use rsky_pds::schema::pds::record::dsl::*; 46 + use crate::schema::pds::record::dsl::*; 47 47 48 48 let other_did = self.did.clone(); 49 49 self.db ··· 59 59 60 60 /// List all collections in the repository. 61 61 pub(crate) async fn list_collections(&self) -> Result<Vec<String>> { 62 - use rsky_pds::schema::pds::record::dsl::*; 62 + use crate::schema::pds::record::dsl::*; 63 63 64 64 let other_did = self.did.clone(); 65 65 self.db ··· 90 90 rkey_end: Option<String>, 91 91 include_soft_deleted: Option<bool>, 92 92 ) -> Result<Vec<RecordsForCollection>> { 93 - use rsky_pds::schema::pds::record::dsl as RecordSchema; 94 - use rsky_pds::schema::pds::repo_block::dsl as RepoBlockSchema; 93 + use crate::schema::pds::record::dsl as RecordSchema; 94 + use crate::schema::pds::repo_block::dsl as RepoBlockSchema; 95 95 96 96 let include_soft_deleted: bool = include_soft_deleted.unwrap_or(false); 97 97 let mut builder = RecordSchema::record ··· 149 149 cid: Option<String>, 150 150 include_soft_deleted: Option<bool>, 151 151 ) -> Result<Option<GetRecord>> { 152 - use rsky_pds::schema::pds::record::dsl as RecordSchema; 153 - use rsky_pds::schema::pds::repo_block::dsl as RepoBlockSchema; 152 + use crate::schema::pds::record::dsl as RecordSchema; 153 + use crate::schema::pds::repo_block::dsl as RepoBlockSchema; 154 154 155 155 let include_soft_deleted: bool = include_soft_deleted.unwrap_or(false); 156 156 let mut builder = RecordSchema::record ··· 191 191 cid: Option<String>, 192 192 include_soft_deleted: Option<bool>, 193 193 ) -> Result<bool> { 194 - use rsky_pds::schema::pds::record::dsl as RecordSchema; 194 + use crate::schema::pds::record::dsl as RecordSchema; 195 195 196 196 let include_soft_deleted: bool = include_soft_deleted.unwrap_or(false); 197 197 let mut builder = RecordSchema::record ··· 219 219 &self, 220 220 uri: String, 221 221 ) -> Result<Option<StatusAttr>> { 222 - use rsky_pds::schema::pds::record::dsl as RecordSchema; 222 + use crate::schema::pds::record::dsl as RecordSchema; 223 223 224 224 let res = self 225 225 .db ··· 257 257 258 258 /// Get the current CID for a record URI. 
259 259 pub(crate) async fn get_current_record_cid(&self, uri: String) -> Result<Option<Cid>> { 260 - use rsky_pds::schema::pds::record::dsl as RecordSchema; 260 + use crate::schema::pds::record::dsl as RecordSchema; 261 261 262 262 let res = self 263 263 .db ··· 286 286 path: String, 287 287 link_to: String, 288 288 ) -> Result<Vec<Record>> { 289 - use rsky_pds::schema::pds::backlink::dsl as BacklinkSchema; 290 - use rsky_pds::schema::pds::record::dsl as RecordSchema; 289 + use crate::schema::pds::backlink::dsl as BacklinkSchema; 290 + use crate::schema::pds::record::dsl as RecordSchema; 291 291 292 292 let res = self 293 293 .db ··· 385 385 bail!("Expected indexed URI to contain a record key") 386 386 } 387 387 388 - use rsky_pds::schema::pds::record::dsl as RecordSchema; 388 + use crate::schema::pds::record::dsl as RecordSchema; 389 389 390 390 // Track current version of record 391 391 let (record, uri) = self ··· 426 426 #[tracing::instrument(skip_all)] 427 427 pub(crate) async fn delete_record(&self, uri: &AtUri) -> Result<()> { 428 428 tracing::debug!("@LOG DEBUG RecordReader::delete_record, deleting indexed record {uri}"); 429 - use rsky_pds::schema::pds::backlink::dsl as BacklinkSchema; 430 - use rsky_pds::schema::pds::record::dsl as RecordSchema; 429 + use crate::schema::pds::backlink::dsl as BacklinkSchema; 430 + use crate::schema::pds::record::dsl as RecordSchema; 431 431 let uri = uri.to_string(); 432 432 self.db 433 433 .get() ··· 450 450 451 451 /// Remove backlinks for a URI. 452 452 pub(crate) async fn remove_backlinks_by_uri(&self, uri: &AtUri) -> Result<()> { 453 - use rsky_pds::schema::pds::backlink::dsl as BacklinkSchema; 453 + use crate::schema::pds::backlink::dsl as BacklinkSchema; 454 454 let uri = uri.to_string(); 455 455 self.db 456 456 .get() ··· 470 470 if backlinks.is_empty() { 471 471 Ok(()) 472 472 } else { 473 - use rsky_pds::schema::pds::backlink::dsl as BacklinkSchema; 473 + use crate::schema::pds::backlink::dsl as BacklinkSchema; 474 474 self.db 475 475 .get() 476 476 .await? ··· 491 491 uri: &AtUri, 492 492 takedown: StatusAttr, 493 493 ) -> Result<()> { 494 - use rsky_pds::schema::pds::record::dsl as RecordSchema; 494 + use crate::schema::pds::record::dsl as RecordSchema; 495 495 496 496 let takedown_ref: Option<String> = match takedown.applied { 497 497 true => takedown
+10 -10
src/actor_store/sql_repo.rs
··· 53 53 let cid = *cid; 54 54 55 55 Box::pin(async move { 56 - use rsky_pds::schema::pds::repo_block::dsl as RepoBlockSchema; 56 + use crate::schema::pds::repo_block::dsl as RepoBlockSchema; 57 57 let cached = { 58 58 let cache_guard = self.cache.read().await; 59 59 cache_guard.get(cid).cloned() ··· 104 104 let did: String = self.did.clone(); 105 105 106 106 Box::pin(async move { 107 - use rsky_pds::schema::pds::repo_block::dsl as RepoBlockSchema; 107 + use crate::schema::pds::repo_block::dsl as RepoBlockSchema; 108 108 let cached = { 109 109 let mut cache_guard = self.cache.write().await; 110 110 cache_guard.get_many(cids)? ··· 202 202 let did: String = self.did.clone(); 203 203 let bytes_cloned = bytes.clone(); 204 204 Box::pin(async move { 205 - use rsky_pds::schema::pds::repo_block::dsl as RepoBlockSchema; 205 + use crate::schema::pds::repo_block::dsl as RepoBlockSchema; 206 206 207 207 _ = self 208 208 .db ··· 235 235 let did: String = self.did.clone(); 236 236 237 237 Box::pin(async move { 238 - use rsky_pds::schema::pds::repo_block::dsl as RepoBlockSchema; 238 + use crate::schema::pds::repo_block::dsl as RepoBlockSchema; 239 239 240 240 let blocks: Vec<RepoBlock> = to_put 241 241 .map ··· 277 277 let now: String = self.now.clone(); 278 278 279 279 Box::pin(async move { 280 - use rsky_pds::schema::pds::repo_root::dsl as RepoRootSchema; 280 + use crate::schema::pds::repo_root::dsl as RepoRootSchema; 281 281 282 282 let is_create = is_create.unwrap_or(false); 283 283 if is_create { ··· 381 381 let did: String = self.did.clone(); 382 382 let since = since.clone(); 383 383 let cursor = cursor.clone(); 384 - use rsky_pds::schema::pds::repo_block::dsl as RepoBlockSchema; 384 + use crate::schema::pds::repo_block::dsl as RepoBlockSchema; 385 385 386 386 Ok(self 387 387 .db ··· 418 418 419 419 pub async fn count_blocks(&self) -> Result<i64> { 420 420 let did: String = self.did.clone(); 421 - use rsky_pds::schema::pds::repo_block::dsl as RepoBlockSchema; 421 + use crate::schema::pds::repo_block::dsl as RepoBlockSchema; 422 422 423 423 let res = self 424 424 .db ··· 439 439 /// Proactively cache all blocks from a particular commit (to prevent multiple roundtrips) 440 440 pub async fn cache_rev(&mut self, rev: String) -> Result<()> { 441 441 let did: String = self.did.clone(); 442 - use rsky_pds::schema::pds::repo_block::dsl as RepoBlockSchema; 442 + use crate::schema::pds::repo_block::dsl as RepoBlockSchema; 443 443 444 444 let result: Vec<(String, Vec<u8>)> = self 445 445 .db ··· 465 465 return Ok(()); 466 466 } 467 467 let did: String = self.did.clone(); 468 - use rsky_pds::schema::pds::repo_block::dsl as RepoBlockSchema; 468 + use crate::schema::pds::repo_block::dsl as RepoBlockSchema; 469 469 470 470 let cid_strings: Vec<String> = cids.into_iter().map(|c| c.to_string()).collect(); 471 471 _ = self ··· 483 483 484 484 pub async fn get_root_detailed(&self) -> Result<CidAndRev> { 485 485 let did: String = self.did.clone(); 486 - use rsky_pds::schema::pds::repo_root::dsl as RepoRootSchema; 486 + use crate::schema::pds::repo_root::dsl as RepoRootSchema; 487 487 488 488 let res = self 489 489 .db
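The `get_bytes`/`get_blocks` paths above check an async-`RwLock`ed block cache before touching SQLite: a shared read guard for the lookup, an exclusive write guard only when populating. A self-contained sketch of that read-through pattern with a simplified cache type (the real cache in sql_repo.rs differs):

```rust
use std::{collections::HashMap, sync::Arc};
use tokio::sync::RwLock;

/// Simplified stand-in for the block cache used by sql_repo.rs.
struct BlockCache {
    inner: Arc<RwLock<HashMap<String, Vec<u8>>>>,
}

impl BlockCache {
    /// Return the cached bytes for `cid`, or run `load` (e.g. a DB query)
    /// and cache the result.
    async fn get_or_load<F, Fut>(&self, cid: &str, load: F) -> anyhow::Result<Vec<u8>>
    where
        F: FnOnce() -> Fut,
        Fut: std::future::Future<Output = anyhow::Result<Vec<u8>>>,
    {
        // Fast path: check under a shared read guard.
        if let Some(bytes) = self.inner.read().await.get(cid).cloned() {
            return Ok(bytes);
        }
        // Slow path: fetch, then take the write guard to populate the cache.
        let bytes = load().await?;
        self.inner.write().await.insert(cid.to_owned(), bytes.clone());
        Ok(bytes)
    }
}
```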
+2 -2
src/auth.rs
··· 130 130 131 131 // Extract subject (DID) 132 132 if let Some(did) = claims.get("sub").and_then(serde_json::Value::as_str) { 133 - use rsky_pds::schema::pds::account::dsl as AccountSchema; 133 + use crate::schema::pds::account::dsl as AccountSchema; 134 134 let did_clone = did.to_owned(); 135 135 136 136 let _did = state ··· 395 395 396 396 // Extract subject (DID) from access token 397 397 if let Some(did) = claims.get("sub").and_then(|v| v.as_str()) { 398 - use rsky_pds::schema::pds::account::dsl as AccountSchema; 398 + use crate::schema::pds::account::dsl as AccountSchema; 399 399 400 400 let did_clone = did.to_owned(); 401 401
+460
src/lib.rs
··· 1 + //! PDS implementation. 2 + mod account_manager; 3 + mod actor_endpoints; 4 + mod actor_store; 5 + mod auth; 6 + mod config; 7 + mod db; 8 + mod did; 9 + mod endpoints; 10 + pub mod error; 11 + mod firehose; 12 + mod metrics; 13 + mod mmap; 14 + mod oauth; 15 + mod plc; 16 + mod schema; 17 + mod service_proxy; 18 + #[cfg(test)] 19 + mod tests; 20 + 21 + use anyhow::{Context as _, anyhow}; 22 + use atrium_api::types::string::Did; 23 + use atrium_crypto::keypair::{Export as _, Secp256k1Keypair}; 24 + use auth::AuthenticatedUser; 25 + use axum::{ 26 + Router, 27 + body::Body, 28 + extract::{FromRef, Request, State}, 29 + http::{self, HeaderMap, Response, StatusCode, Uri}, 30 + response::IntoResponse, 31 + routing::get, 32 + }; 33 + use azure_core::credentials::TokenCredential; 34 + use clap::Parser; 35 + use clap_verbosity_flag::{InfoLevel, Verbosity, log::LevelFilter}; 36 + use config::AppConfig; 37 + use db::establish_pool; 38 + use deadpool_diesel::sqlite::Pool; 39 + use diesel::prelude::*; 40 + use diesel_migrations::{EmbeddedMigrations, embed_migrations}; 41 + pub use error::Error; 42 + use figment::{Figment, providers::Format as _}; 43 + use firehose::FirehoseProducer; 44 + use http_cache_reqwest::{CacheMode, HttpCacheOptions, MokaManager}; 45 + use rand::Rng as _; 46 + use serde::{Deserialize, Serialize}; 47 + use service_proxy::service_proxy; 48 + use std::{ 49 + net::{IpAddr, Ipv4Addr, SocketAddr}, 50 + path::PathBuf, 51 + str::FromStr as _, 52 + sync::Arc, 53 + }; 54 + use tokio::net::TcpListener; 55 + use tower_http::{cors::CorsLayer, trace::TraceLayer}; 56 + use tracing::{info, warn}; 57 + use uuid::Uuid; 58 + 59 + /// The application user agent. Concatenates the package name and version. e.g. `bluepds/0.0.0`. 60 + pub const APP_USER_AGENT: &str = concat!(env!("CARGO_PKG_NAME"), "/", env!("CARGO_PKG_VERSION"),); 61 + 62 + /// Embedded migrations 63 + pub const MIGRATIONS: EmbeddedMigrations = embed_migrations!("./migrations"); 64 + 65 + /// The application-wide result type. 66 + pub type Result<T> = std::result::Result<T, Error>; 67 + /// The reqwest client type with middleware. 68 + pub type Client = reqwest_middleware::ClientWithMiddleware; 69 + /// The Azure credential type. 70 + pub type Cred = Arc<dyn TokenCredential>; 71 + 72 + #[expect( 73 + clippy::arbitrary_source_item_ordering, 74 + reason = "serialized data might be structured" 75 + )] 76 + #[derive(Serialize, Deserialize, Debug, Clone)] 77 + /// The key data structure. 78 + struct KeyData { 79 + /// Primary signing key for all repo operations. 80 + skey: Vec<u8>, 81 + /// Primary signing (rotation) key for all PLC operations. 82 + rkey: Vec<u8>, 83 + } 84 + 85 + // FIXME: We should use P256Keypair instead. SecP256K1 is primarily used for cryptocurrencies, 86 + // and the implementations of this algorithm are much more limited as compared to P256. 87 + // 88 + // Reference: https://soatok.blog/2022/05/19/guidance-for-choosing-an-elliptic-curve-signature-algorithm-in-2022/ 89 + #[derive(Clone)] 90 + /// The signing key for PLC/DID operations. 91 + pub struct SigningKey(Arc<Secp256k1Keypair>); 92 + #[derive(Clone)] 93 + /// The rotation key for PLC operations. 94 + pub struct RotationKey(Arc<Secp256k1Keypair>); 95 + 96 + impl std::ops::Deref for SigningKey { 97 + type Target = Secp256k1Keypair; 98 + 99 + fn deref(&self) -> &Self::Target { 100 + &self.0 101 + } 102 + } 103 + 104 + impl SigningKey { 105 + /// Import from a private key. 
106 + pub fn import(key: &[u8]) -> Result<Self> { 107 + let key = Secp256k1Keypair::import(key).context("failed to import signing key")?; 108 + Ok(Self(Arc::new(key))) 109 + } 110 + } 111 + 112 + impl std::ops::Deref for RotationKey { 113 + type Target = Secp256k1Keypair; 114 + 115 + fn deref(&self) -> &Self::Target { 116 + &self.0 117 + } 118 + } 119 + 120 + #[derive(Parser, Debug, Clone)] 121 + /// Command line arguments. 122 + pub struct Args { 123 + /// Path to the configuration file 124 + #[arg(short, long, default_value = "default.toml")] 125 + pub config: PathBuf, 126 + /// The verbosity level. 127 + #[command(flatten)] 128 + pub verbosity: Verbosity<InfoLevel>, 129 + } 130 + 131 + pub struct ActorPools { 132 + pub repo: Pool, 133 + pub blob: Pool, 134 + } 135 + 136 + impl Clone for ActorPools { 137 + fn clone(&self) -> Self { 138 + Self { 139 + repo: self.repo.clone(), 140 + blob: self.blob.clone(), 141 + } 142 + } 143 + } 144 + 145 + #[expect(clippy::arbitrary_source_item_ordering, reason = "arbitrary")] 146 + #[derive(Clone, FromRef)] 147 + pub struct AppState { 148 + /// The application configuration. 149 + pub config: AppConfig, 150 + /// The Azure credential. 151 + pub cred: Cred, 152 + /// The main database connection pool. Used for common PDS data, like invite codes. 153 + pub db: Pool, 154 + /// Actor-specific database connection pools. Hashed by DID. 155 + pub db_actors: std::collections::HashMap<String, ActorPools>, 156 + 157 + /// The HTTP client with middleware. 158 + pub client: Client, 159 + /// The simple HTTP client. 160 + pub simple_client: reqwest::Client, 161 + /// The firehose producer. 162 + pub firehose: FirehoseProducer, 163 + 164 + /// The signing key. 165 + pub signing_key: SigningKey, 166 + /// The rotation key. 167 + pub rotation_key: RotationKey, 168 + } 169 + 170 + /// The index (/) route. 171 + async fn index() -> impl IntoResponse { 172 + r" 173 + __ __ 174 + /\ \__ /\ \__ 175 + __ \ \ ,_\ _____ _ __ ___\ \ ,_\ ___ 176 + /'__'\ \ \ \/ /\ '__'\/\''__\/ __'\ \ \/ / __'\ 177 + /\ \L\.\_\ \ \_\ \ \L\ \ \ \//\ \L\ \ \ \_/\ \L\ \ 178 + \ \__/.\_\\ \__\\ \ ,__/\ \_\\ \____/\ \__\ \____/ 179 + \/__/\/_/ \/__/ \ \ \/ \/_/ \/___/ \/__/\/___/ 180 + \ \_\ 181 + \/_/ 182 + 183 + 184 + This is an AT Protocol Personal Data Server (aka, an atproto PDS) 185 + 186 + Most API routes are under /xrpc/ 187 + 188 + Code: https://github.com/DrChat/bluepds 189 + Protocol: https://atproto.com 190 + " 191 + } 192 + 193 + /// The main application entry point. 194 + #[expect( 195 + clippy::cognitive_complexity, 196 + clippy::too_many_lines, 197 + unused_qualifications, 198 + reason = "main function has high complexity" 199 + )] 200 + pub async fn run() -> anyhow::Result<()> { 201 + let args = Args::parse(); 202 + 203 + // Set up trace logging to console and account for the user-provided verbosity flag. 204 + if args.verbosity.log_level_filter() != LevelFilter::Off { 205 + let lvl = match args.verbosity.log_level_filter() { 206 + LevelFilter::Error => tracing::Level::ERROR, 207 + LevelFilter::Warn => tracing::Level::WARN, 208 + LevelFilter::Info | LevelFilter::Off => tracing::Level::INFO, 209 + LevelFilter::Debug => tracing::Level::DEBUG, 210 + LevelFilter::Trace => tracing::Level::TRACE, 211 + }; 212 + tracing_subscriber::fmt().with_max_level(lvl).init(); 213 + } 214 + 215 + if !args.config.exists() { 216 + // Throw up a warning if the config file does not exist. 
217 + // 218 + // This is not fatal because users can specify all configuration settings via 219 + // the environment, but the most likely scenario here is that a user accidentally 220 + // omitted the config file for some reason (e.g. forgot to mount it into Docker). 221 + warn!( 222 + "configuration file {} does not exist", 223 + args.config.display() 224 + ); 225 + } 226 + 227 + // Read and parse the user-provided configuration. 228 + let config: AppConfig = Figment::new() 229 + .admerge(figment::providers::Toml::file(args.config)) 230 + .admerge(figment::providers::Env::prefixed("BLUEPDS_")) 231 + .extract() 232 + .context("failed to load configuration")?; 233 + 234 + if config.test { 235 + warn!("BluePDS starting up in TEST mode."); 236 + warn!("This means the application will not federate with the rest of the network."); 237 + warn!( 238 + "If you want to turn this off, either set `test` to false in the config or define `BLUEPDS_TEST = false`" 239 + ); 240 + } 241 + 242 + // Initialize metrics reporting. 243 + metrics::setup(config.metrics.as_ref()).context("failed to set up metrics exporter")?; 244 + 245 + // Create a reqwest client that will be used for all outbound requests. 246 + let simple_client = reqwest::Client::builder() 247 + .user_agent(APP_USER_AGENT) 248 + .build() 249 + .context("failed to build requester client")?; 250 + let client = reqwest_middleware::ClientBuilder::new(simple_client.clone()) 251 + .with(http_cache_reqwest::Cache(http_cache_reqwest::HttpCache { 252 + mode: CacheMode::Default, 253 + manager: MokaManager::default(), 254 + options: HttpCacheOptions::default(), 255 + })) 256 + .build(); 257 + 258 + tokio::fs::create_dir_all(&config.key.parent().context("should have parent")?) 259 + .await 260 + .context("failed to create key directory")?; 261 + 262 + // Check if crypto keys exist. If not, create new ones. 263 + let (skey, rkey) = if let Ok(f) = std::fs::File::open(&config.key) { 264 + let keys: KeyData = serde_ipld_dagcbor::from_reader(std::io::BufReader::new(f)) 265 + .context("failed to deserialize crypto keys")?; 266 + 267 + let skey = Secp256k1Keypair::import(&keys.skey).context("failed to import signing key")?; 268 + let rkey = Secp256k1Keypair::import(&keys.rkey).context("failed to import rotation key")?; 269 + 270 + (SigningKey(Arc::new(skey)), RotationKey(Arc::new(rkey))) 271 + } else { 272 + info!("signing keys not found, generating new ones"); 273 + 274 + let skey = Secp256k1Keypair::create(&mut rand::thread_rng()); 275 + let rkey = Secp256k1Keypair::create(&mut rand::thread_rng()); 276 + 277 + let keys = KeyData { 278 + skey: skey.export(), 279 + rkey: rkey.export(), 280 + }; 281 + 282 + let mut f = std::fs::File::create(&config.key).context("failed to create key file")?; 283 + serde_ipld_dagcbor::to_writer(&mut f, &keys).context("failed to serialize crypto keys")?; 284 + 285 + (SigningKey(Arc::new(skey)), RotationKey(Arc::new(rkey))) 286 + }; 287 + 288 + tokio::fs::create_dir_all(&config.repo.path).await?; 289 + tokio::fs::create_dir_all(&config.plc.path).await?; 290 + tokio::fs::create_dir_all(&config.blob.path).await?; 291 + 292 + let cred = azure_identity::DefaultAzureCredential::new() 293 + .context("failed to create Azure credential")?; 294 + 295 + // Create a database connection manager and pool for the main database. 296 + let pool = 297 + establish_pool(&config.db).context("failed to establish database connection pool")?; 298 + // Create a dictionary of database connection pools for each actor. 
299 + let mut actor_pools = std::collections::HashMap::new(); 300 + // let mut actor_blob_pools = std::collections::HashMap::new(); 301 + // We'll determine actors by looking in the data/repo dir for .db files. 302 + let mut actor_dbs = tokio::fs::read_dir(&config.repo.path) 303 + .await 304 + .context("failed to read repo directory")?; 305 + while let Some(entry) = actor_dbs 306 + .next_entry() 307 + .await 308 + .context("failed to read repo dir")? 309 + { 310 + let path = entry.path(); 311 + if path.extension().and_then(|s| s.to_str()) == Some("db") { 312 + let did_path = path 313 + .file_stem() 314 + .and_then(|s| s.to_str()) 315 + .context("failed to get actor DID")?; 316 + let did = Did::from_str(&format!("did:plc:{}", did_path)) 317 + .expect("should be able to parse actor DID"); 318 + 319 + // Create a new database connection manager and pool for the actor. 320 + // The path for the SQLite connection needs to look like "sqlite://data/repo/<actor>.db" 321 + let path_repo = format!("sqlite://{}", did_path); 322 + let actor_repo_pool = 323 + establish_pool(&path_repo).context("failed to create database connection pool")?; 324 + // Create a new database connection manager and pool for the actor blobs. 325 + // The path for the SQLite connection needs to look like "sqlite://data/blob/<actor>.db" 326 + let path_blob = path_repo.replace("repo", "blob"); 327 + let actor_blob_pool = 328 + establish_pool(&path_blob).context("failed to create database connection pool")?; 329 + drop(actor_pools.insert( 330 + did.to_string(), 331 + ActorPools { 332 + repo: actor_repo_pool, 333 + blob: actor_blob_pool, 334 + }, 335 + )); 336 + } 337 + } 338 + // Apply pending migrations 339 + // let conn = pool.get().await?; 340 + // conn.run_pending_migrations(MIGRATIONS) 341 + // .expect("should be able to run migrations"); 342 + 343 + let (_fh, fhp) = firehose::spawn(client.clone(), config.clone()); 344 + 345 + let addr = config 346 + .listen_address 347 + .unwrap_or(SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 8000)); 348 + 349 + let app = Router::new() 350 + .route("/", get(index)) 351 + .merge(oauth::routes()) 352 + .nest( 353 + "/xrpc", 354 + endpoints::routes() 355 + .merge(actor_endpoints::routes()) 356 + .fallback(service_proxy), 357 + ) 358 + // .layer(RateLimitLayer::new(30, Duration::from_secs(30))) 359 + .layer(CorsLayer::permissive()) 360 + .layer(TraceLayer::new_for_http()) 361 + .with_state(AppState { 362 + cred, 363 + config: config.clone(), 364 + db: pool.clone(), 365 + db_actors: actor_pools.clone(), 366 + client: client.clone(), 367 + simple_client, 368 + firehose: fhp, 369 + signing_key: skey, 370 + rotation_key: rkey, 371 + }); 372 + 373 + info!("listening on {addr}"); 374 + info!("connect to: http://127.0.0.1:{}", addr.port()); 375 + 376 + // Determine whether or not this was the first startup (i.e. no accounts exist and no invite codes were created). 377 + // If so, create an invite code and share it via the console. 
378 + let conn = pool.get().await.context("failed to get db connection")?; 379 + 380 + #[derive(QueryableByName)] 381 + struct TotalCount { 382 + #[diesel(sql_type = diesel::sql_types::Integer)] 383 + total_count: i32, 384 + } 385 + 386 + let result = conn.interact(move |conn| { 387 + diesel::sql_query( 388 + "SELECT (SELECT COUNT(*) FROM accounts) + (SELECT COUNT(*) FROM invites) AS total_count", 389 + ) 390 + .get_result::<TotalCount>(conn) 391 + }) 392 + .await 393 + .expect("should be able to query database")?; 394 + 395 + let c = result.total_count; 396 + 397 + #[expect(clippy::print_stdout)] 398 + if c == 0 { 399 + let uuid = Uuid::new_v4().to_string(); 400 + 401 + let uuid_clone = uuid.clone(); 402 + _ = conn 403 + .interact(move |conn| { 404 + diesel::sql_query( 405 + "INSERT INTO invites (id, did, count, created_at) VALUES (?, NULL, 1, datetime('now'))", 406 + ) 407 + .bind::<diesel::sql_types::Text, _>(uuid_clone) 408 + .execute(conn) 409 + .context("failed to create new invite code") 410 + .expect("should be able to create invite code") 411 + }) 412 + .await 413 + .expect("should be able to create invite code"); 414 + 415 + // N.B: This is a sensitive message, so we're bypassing `tracing` here and 416 + // logging it directly to console. 417 + println!("====================================="); 418 + println!(" FIRST STARTUP "); 419 + println!("====================================="); 420 + println!("Use this code to create an account:"); 421 + println!("{uuid}"); 422 + println!("====================================="); 423 + } 424 + 425 + let listener = TcpListener::bind(&addr) 426 + .await 427 + .context("failed to bind address")?; 428 + 429 + // Serve the app, and request crawling from upstream relays. 430 + let serve = tokio::spawn(async move { 431 + axum::serve(listener, app.into_make_service()) 432 + .await 433 + .context("failed to serve app") 434 + }); 435 + 436 + // Now that the app is live, request a crawl from upstream relays. 437 + firehose::reconnect_relays(&client, &config).await; 438 + 439 + serve 440 + .await 441 + .map_err(Into::into) 442 + .and_then(|r| r) 443 + .context("failed to serve app") 444 + } 445 + 446 + /// Creates an app router with the provided AppState. 447 + pub fn create_app(state: AppState) -> Router { 448 + Router::new() 449 + .route("/", get(index)) 450 + .merge(oauth::routes()) 451 + .nest( 452 + "/xrpc", 453 + endpoints::routes() 454 + .merge(actor_endpoints::routes()) 455 + .fallback(service_proxy), 456 + ) 457 + .layer(CorsLayer::permissive()) 458 + .layer(TraceLayer::new_for_http()) 459 + .with_state(state) 460 + }
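Besides `run()`, the new library surface exposes `create_app`, which builds the router without binding a socket — the usual payoff being integration tests driven through `tower::ServiceExt::oneshot`. A hypothetical test under that assumption (`test_state()` is an invented helper; nothing in this diff provides one):

```rust
// Hypothetical integration test enabled by the lib/bin split.
// Assumes the crate is named `bluepds` and a `test_state()` helper exists.
#[tokio::test]
async fn index_returns_banner() -> anyhow::Result<()> {
    use axum::{body::Body, http::Request};
    use tower::ServiceExt as _; // for `oneshot`

    let app = bluepds::create_app(test_state());

    let res = app
        .oneshot(Request::builder().uri("/").body(Body::empty())?)
        .await?;
    assert!(res.status().is_success());
    Ok(())
}
```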
+5 -444
src/main.rs
··· 1 - //! PDS implementation. 2 - mod account_manager; 3 - mod actor_endpoints; 4 - mod actor_store; 5 - mod auth; 6 - mod config; 7 - mod db; 8 - mod did; 9 - mod endpoints; 10 - mod error; 11 - mod firehose; 12 - mod metrics; 13 - mod mmap; 14 - mod oauth; 15 - mod plc; 16 - mod schema; 17 - mod service_proxy; 18 - #[cfg(test)] 19 - mod tests; 1 + //! BluePDS binary entry point. 20 2 21 - use anyhow::{Context as _, anyhow}; 22 - use atrium_api::types::string::Did; 23 - use atrium_crypto::keypair::{Export as _, Secp256k1Keypair}; 24 - use auth::AuthenticatedUser; 25 - use axum::{ 26 - Router, 27 - body::Body, 28 - extract::{FromRef, Request, State}, 29 - http::{self, HeaderMap, Response, StatusCode, Uri}, 30 - response::IntoResponse, 31 - routing::get, 32 - }; 33 - use azure_core::credentials::TokenCredential; 3 + use anyhow::Context as _; 34 4 use clap::Parser; 35 - use clap_verbosity_flag::{InfoLevel, Verbosity, log::LevelFilter}; 36 - use config::AppConfig; 37 - use db::establish_pool; 38 - use deadpool_diesel::sqlite::Pool; 39 - use diesel::prelude::*; 40 - use diesel_migrations::{EmbeddedMigrations, embed_migrations}; 41 - #[expect(clippy::pub_use, clippy::useless_attribute)] 42 - pub use error::Error; 43 - use figment::{Figment, providers::Format as _}; 44 - use firehose::FirehoseProducer; 45 - use http_cache_reqwest::{CacheMode, HttpCacheOptions, MokaManager}; 46 - use rand::Rng as _; 47 - use serde::{Deserialize, Serialize}; 48 - use service_proxy::service_proxy; 49 - use std::{ 50 - net::{IpAddr, Ipv4Addr, SocketAddr}, 51 - path::PathBuf, 52 - str::FromStr as _, 53 - sync::Arc, 54 - }; 55 - use tokio::net::TcpListener; 56 - use tower_http::{cors::CorsLayer, trace::TraceLayer}; 57 - use tracing::{info, warn}; 58 - use uuid::Uuid; 59 - 60 - /// The application user agent. Concatenates the package name and version. e.g. `bluepds/0.0.0`. 61 - pub const APP_USER_AGENT: &str = concat!(env!("CARGO_PKG_NAME"), "/", env!("CARGO_PKG_VERSION"),); 62 - 63 - /// Embedded migrations 64 - pub const MIGRATIONS: EmbeddedMigrations = embed_migrations!("./migrations"); 65 - 66 - /// The application-wide result type. 67 - pub type Result<T> = std::result::Result<T, Error>; 68 - /// The reqwest client type with middleware. 69 - pub type Client = reqwest_middleware::ClientWithMiddleware; 70 - /// The Azure credential type. 71 - pub type Cred = Arc<dyn TokenCredential>; 72 - 73 - #[expect( 74 - clippy::arbitrary_source_item_ordering, 75 - reason = "serialized data might be structured" 76 - )] 77 - #[derive(Serialize, Deserialize, Debug, Clone)] 78 - /// The key data structure. 79 - struct KeyData { 80 - /// Primary signing key for all repo operations. 81 - skey: Vec<u8>, 82 - /// Primary signing (rotation) key for all PLC operations. 83 - rkey: Vec<u8>, 84 - } 85 - 86 - // FIXME: We should use P256Keypair instead. SecP256K1 is primarily used for cryptocurrencies, 87 - // and the implementations of this algorithm are much more limited as compared to P256. 88 - // 89 - // Reference: https://soatok.blog/2022/05/19/guidance-for-choosing-an-elliptic-curve-signature-algorithm-in-2022/ 90 - #[derive(Clone)] 91 - /// The signing key for PLC/DID operations. 92 - pub struct SigningKey(Arc<Secp256k1Keypair>); 93 - #[derive(Clone)] 94 - /// The rotation key for PLC operations. 
95 - pub struct RotationKey(Arc<Secp256k1Keypair>); 96 - 97 - impl std::ops::Deref for SigningKey { 98 - type Target = Secp256k1Keypair; 99 - 100 - fn deref(&self) -> &Self::Target { 101 - &self.0 102 - } 103 - } 104 - 105 - impl SigningKey { 106 - /// Import from a private key. 107 - pub fn import(key: &[u8]) -> Result<Self> { 108 - let key = Secp256k1Keypair::import(key).context("failed to import signing key")?; 109 - Ok(Self(Arc::new(key))) 110 - } 111 - } 112 - 113 - impl std::ops::Deref for RotationKey { 114 - type Target = Secp256k1Keypair; 115 - 116 - fn deref(&self) -> &Self::Target { 117 - &self.0 118 - } 119 - } 120 - 121 - #[derive(Parser, Debug, Clone)] 122 - /// Command line arguments. 123 - struct Args { 124 - /// Path to the configuration file 125 - #[arg(short, long, default_value = "default.toml")] 126 - config: PathBuf, 127 - /// The verbosity level. 128 - #[command(flatten)] 129 - verbosity: Verbosity<InfoLevel>, 130 - } 131 - 132 - struct ActorPools { 133 - repo: Pool, 134 - blob: Pool, 135 - } 136 - impl Clone for ActorPools { 137 - fn clone(&self) -> Self { 138 - Self { 139 - repo: self.repo.clone(), 140 - blob: self.blob.clone(), 141 - } 142 - } 143 - } 144 - 145 - #[expect(clippy::arbitrary_source_item_ordering, reason = "arbitrary")] 146 - #[derive(Clone, FromRef)] 147 - struct AppState { 148 - /// The application configuration. 149 - config: AppConfig, 150 - /// The Azure credential. 151 - cred: Cred, 152 - /// The main database connection pool. Used for common PDS data, like invite codes. 153 - db: Pool, 154 - /// Actor-specific database connection pools. Hashed by DID. 155 - db_actors: std::collections::HashMap<String, ActorPools>, 156 - 157 - /// The HTTP client with middleware. 158 - client: Client, 159 - /// The simple HTTP client. 160 - simple_client: reqwest::Client, 161 - /// The firehose producer. 162 - firehose: FirehoseProducer, 163 - 164 - /// The signing key. 165 - signing_key: SigningKey, 166 - /// The rotation key. 167 - rotation_key: RotationKey, 168 - } 169 - 170 - /// The index (/) route. 171 - async fn index() -> impl IntoResponse { 172 - r" 173 - __ __ 174 - /\ \__ /\ \__ 175 - __ \ \ ,_\ _____ _ __ ___\ \ ,_\ ___ 176 - /'__'\ \ \ \/ /\ '__'\/\''__\/ __'\ \ \/ / __'\ 177 - /\ \L\.\_\ \ \_\ \ \L\ \ \ \//\ \L\ \ \ \_/\ \L\ \ 178 - \ \__/.\_\\ \__\\ \ ,__/\ \_\\ \____/\ \__\ \____/ 179 - \/__/\/_/ \/__/ \ \ \/ \/_/ \/___/ \/__/\/___/ 180 - \ \_\ 181 - \/_/ 182 - 183 - 184 - This is an AT Protocol Personal Data Server (aka, an atproto PDS) 185 - 186 - Most API routes are under /xrpc/ 187 - 188 - Code: https://github.com/DrChat/bluepds 189 - Protocol: https://atproto.com 190 - " 191 - } 192 - 193 - /// The main application entry point. 194 - #[expect( 195 - clippy::cognitive_complexity, 196 - clippy::too_many_lines, 197 - unused_qualifications, 198 - reason = "main function has high complexity" 199 - )] 200 - async fn run() -> anyhow::Result<()> { 201 - let args = Args::parse(); 202 - 203 - // Set up trace logging to console and account for the user-provided verbosity flag. 
204 - if args.verbosity.log_level_filter() != LevelFilter::Off { 205 - let lvl = match args.verbosity.log_level_filter() { 206 - LevelFilter::Error => tracing::Level::ERROR, 207 - LevelFilter::Warn => tracing::Level::WARN, 208 - LevelFilter::Info | LevelFilter::Off => tracing::Level::INFO, 209 - LevelFilter::Debug => tracing::Level::DEBUG, 210 - LevelFilter::Trace => tracing::Level::TRACE, 211 - }; 212 - tracing_subscriber::fmt().with_max_level(lvl).init(); 213 - } 214 - 215 - if !args.config.exists() { 216 - // Throw up a warning if the config file does not exist. 217 - // 218 - // This is not fatal because users can specify all configuration settings via 219 - // the environment, but the most likely scenario here is that a user accidentally 220 - // omitted the config file for some reason (e.g. forgot to mount it into Docker). 221 - warn!( 222 - "configuration file {} does not exist", 223 - args.config.display() 224 - ); 225 - } 226 - 227 - // Read and parse the user-provided configuration. 228 - let config: AppConfig = Figment::new() 229 - .admerge(figment::providers::Toml::file(args.config)) 230 - .admerge(figment::providers::Env::prefixed("BLUEPDS_")) 231 - .extract() 232 - .context("failed to load configuration")?; 233 - 234 - if config.test { 235 - warn!("BluePDS starting up in TEST mode."); 236 - warn!("This means the application will not federate with the rest of the network."); 237 - warn!( 238 - "If you want to turn this off, either set `test` to false in the config or define `BLUEPDS_TEST = false`" 239 - ); 240 - } 241 - 242 - // Initialize metrics reporting. 243 - metrics::setup(config.metrics.as_ref()).context("failed to set up metrics exporter")?; 244 - 245 - // Create a reqwest client that will be used for all outbound requests. 246 - let simple_client = reqwest::Client::builder() 247 - .user_agent(APP_USER_AGENT) 248 - .build() 249 - .context("failed to build requester client")?; 250 - let client = reqwest_middleware::ClientBuilder::new(simple_client.clone()) 251 - .with(http_cache_reqwest::Cache(http_cache_reqwest::HttpCache { 252 - mode: CacheMode::Default, 253 - manager: MokaManager::default(), 254 - options: HttpCacheOptions::default(), 255 - })) 256 - .build(); 257 - 258 - tokio::fs::create_dir_all(&config.key.parent().context("should have parent")?) 259 - .await 260 - .context("failed to create key directory")?; 261 - 262 - // Check if crypto keys exist. If not, create new ones. 
263 - let (skey, rkey) = if let Ok(f) = std::fs::File::open(&config.key) { 264 - let keys: KeyData = serde_ipld_dagcbor::from_reader(std::io::BufReader::new(f)) 265 - .context("failed to deserialize crypto keys")?; 266 - 267 - let skey = Secp256k1Keypair::import(&keys.skey).context("failed to import signing key")?; 268 - let rkey = Secp256k1Keypair::import(&keys.rkey).context("failed to import rotation key")?; 269 - 270 - (SigningKey(Arc::new(skey)), RotationKey(Arc::new(rkey))) 271 - } else { 272 - info!("signing keys not found, generating new ones"); 273 - 274 - let skey = Secp256k1Keypair::create(&mut rand::thread_rng()); 275 - let rkey = Secp256k1Keypair::create(&mut rand::thread_rng()); 276 - 277 - let keys = KeyData { 278 - skey: skey.export(), 279 - rkey: rkey.export(), 280 - }; 281 - 282 - let mut f = std::fs::File::create(&config.key).context("failed to create key file")?; 283 - serde_ipld_dagcbor::to_writer(&mut f, &keys).context("failed to serialize crypto keys")?; 284 - 285 - (SigningKey(Arc::new(skey)), RotationKey(Arc::new(rkey))) 286 - }; 287 - 288 - tokio::fs::create_dir_all(&config.repo.path).await?; 289 - tokio::fs::create_dir_all(&config.plc.path).await?; 290 - tokio::fs::create_dir_all(&config.blob.path).await?; 291 - 292 - let cred = azure_identity::DefaultAzureCredential::new() 293 - .context("failed to create Azure credential")?; 294 - 295 - // Create a database connection manager and pool for the main database. 296 - let pool = 297 - establish_pool(&config.db).context("failed to establish database connection pool")?; 298 - // Create a dictionary of database connection pools for each actor. 299 - let mut actor_pools = std::collections::HashMap::new(); 300 - // let mut actor_blob_pools = std::collections::HashMap::new(); 301 - // We'll determine actors by looking in the data/repo dir for .db files. 302 - let mut actor_dbs = tokio::fs::read_dir(&config.repo.path) 303 - .await 304 - .context("failed to read repo directory")?; 305 - while let Some(entry) = actor_dbs 306 - .next_entry() 307 - .await 308 - .context("failed to read repo dir")? 309 - { 310 - let path = entry.path(); 311 - if path.extension().and_then(|s| s.to_str()) == Some("db") { 312 - let did = path 313 - .file_stem() 314 - .and_then(|s| s.to_str()) 315 - .context("failed to get actor DID")?; 316 - let did = Did::from_str(did).expect("should be able to parse actor DID"); 317 - 318 - // Create a new database connection manager and pool for the actor. 319 - // The path for the SQLite connection needs to look like "sqlite://data/repo/<actor>.db" 320 - let path_repo = format!("sqlite://{}", path.display()); 321 - let actor_repo_pool = 322 - establish_pool(&path_repo).context("failed to create database connection pool")?; 323 - // Create a new database connection manager and pool for the actor blobs. 
324 - // The path for the SQLite connection needs to look like "sqlite://data/blob/<actor>.db" 325 - let path_blob = path_repo.replace("repo", "blob"); 326 - let actor_blob_pool = 327 - establish_pool(&path_blob).context("failed to create database connection pool")?; 328 - drop(actor_pools.insert( 329 - did.to_string(), 330 - ActorPools { 331 - repo: actor_repo_pool, 332 - blob: actor_blob_pool, 333 - }, 334 - )); 335 - } 336 - } 337 - // Apply pending migrations 338 - // let conn = pool.get().await?; 339 - // conn.run_pending_migrations(MIGRATIONS) 340 - // .expect("should be able to run migrations"); 341 - 342 - let (_fh, fhp) = firehose::spawn(client.clone(), config.clone()); 343 - 344 - let addr = config 345 - .listen_address 346 - .unwrap_or(SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 8000)); 347 - 348 - let app = Router::new() 349 - .route("/", get(index)) 350 - .merge(oauth::routes()) 351 - .nest( 352 - "/xrpc", 353 - endpoints::routes() 354 - .merge(actor_endpoints::routes()) 355 - .fallback(service_proxy), 356 - ) 357 - // .layer(RateLimitLayer::new(30, Duration::from_secs(30))) 358 - .layer(CorsLayer::permissive()) 359 - .layer(TraceLayer::new_for_http()) 360 - .with_state(AppState { 361 - cred, 362 - config: config.clone(), 363 - db: pool.clone(), 364 - db_actors: actor_pools.clone(), 365 - client: client.clone(), 366 - simple_client, 367 - firehose: fhp, 368 - signing_key: skey, 369 - rotation_key: rkey, 370 - }); 371 - 372 - info!("listening on {addr}"); 373 - info!("connect to: http://127.0.0.1:{}", addr.port()); 374 - 375 - // Determine whether or not this was the first startup (i.e. no accounts exist and no invite codes were created). 376 - // If so, create an invite code and share it via the console. 377 - let conn = pool.get().await.context("failed to get db connection")?; 378 - 379 - #[derive(QueryableByName)] 380 - struct TotalCount { 381 - #[diesel(sql_type = diesel::sql_types::Integer)] 382 - total_count: i32, 383 - } 384 - 385 - let result = conn.interact(move |conn| { 386 - diesel::sql_query( 387 - "SELECT (SELECT COUNT(*) FROM accounts) + (SELECT COUNT(*) FROM invites) AS total_count", 388 - ) 389 - .get_result::<TotalCount>(conn) 390 - }) 391 - .await 392 - .expect("should be able to query database")?; 393 - 394 - let c = result.total_count; 395 - 396 - #[expect(clippy::print_stdout)] 397 - if c == 0 { 398 - let uuid = Uuid::new_v4().to_string(); 399 - 400 - let uuid_clone = uuid.clone(); 401 - _ = conn 402 - .interact(move |conn| { 403 - diesel::sql_query( 404 - "INSERT INTO invites (id, did, count, created_at) VALUES (?, NULL, 1, datetime('now'))", 405 - ) 406 - .bind::<diesel::sql_types::Text, _>(uuid_clone) 407 - .execute(conn) 408 - .context("failed to create new invite code") 409 - .expect("should be able to create invite code") 410 - }) 411 - .await 412 - .expect("should be able to create invite code"); 413 - 414 - // N.B: This is a sensitive message, so we're bypassing `tracing` here and 415 - // logging it directly to console. 416 - println!("====================================="); 417 - println!(" FIRST STARTUP "); 418 - println!("====================================="); 419 - println!("Use this code to create an account:"); 420 - println!("{uuid}"); 421 - println!("====================================="); 422 - } 423 - 424 - let listener = TcpListener::bind(&addr) 425 - .await 426 - .context("failed to bind address")?; 427 - 428 - // Serve the app, and request crawling from upstream relays. 
 429 -     let serve = tokio::spawn(async move {
 430 -         axum::serve(listener, app.into_make_service())
 431 -             .await
 432 -             .context("failed to serve app")
 433 -     });
 434 -
 435 -     // Now that the app is live, request a crawl from upstream relays.
 436 -     firehose::reconnect_relays(&client, &config).await;
 437 -
 438 -     serve
 439 -         .await
 440 -         .map_err(Into::into)
 441 -         .and_then(|r| r)
 442 -         .context("failed to serve app")
 443 - }
 444 5
 445 6 #[tokio::main(flavor = "multi_thread")]
 446 7 async fn main() -> anyhow::Result<()> {
 447 -     // Dispatch out to a separate function without a derive macro to help rust-analyzer along.
 448 -     run().await
 449 - }
 8 +     // Delegate to the library's run(), which parses CLI arguments, loads configuration, and serves the PDS.
 9 +     bluepds::run().await.context("failed to run application")
 10 + }
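The new main above is only a thin shim, so this change builds only if the crate also compiles as a library exposing an async run. A minimal sketch of the library-side half of the split, assuming the former run body moved into src/lib.rs unchanged (the file path and the body are assumptions; only bluepds::run itself is implied by the diff):

// src/lib.rs (sketch)
use anyhow::Result;

/// Library entry point. Per the shim in main.rs, argument parsing,
/// configuration loading, and serving all happen in here now.
pub async fn run() -> Result<()> {
    // ...formerly the body of `run` in main.rs...
    Ok(())
}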
+2 -2
src/oauth.rs
··· 577 577 .expect("Failed to query PAR request"); 578 578 579 579 // Authenticate the user 580 - use rsky_pds::schema::pds::account::dsl as AccountSchema; 581 - use rsky_pds::schema::pds::actor::dsl as ActorSchema; 580 + use crate::schema::pds::account::dsl as AccountSchema; 581 + use crate::schema::pds::actor::dsl as ActorSchema; 582 582 let username_clone = username.to_owned(); 583 583 let account = db 584 584 .get()
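The two-line swap above re-homes the Diesel DSL from the rsky_pds dependency to the crate's own src/schema.rs (shown next). Query code keeps the same shape either way; a rough sketch under assumed names (the function, connection type, and handle-to-DID example are illustrative, not code from this commit):

use diesel::prelude::*;
use diesel::sqlite::SqliteConnection;

use crate::schema::pds::actor::dsl as ActorSchema;

// Hypothetical: resolve a handle to its DID through the vendored schema.
fn did_for_handle(conn: &mut SqliteConnection, handle: &str) -> QueryResult<Option<String>> {
    ActorSchema::actor
        .filter(ActorSchema::handle.eq(handle))
        .select(ActorSchema::did)
        .first(conn)
        .optional()
}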
+193 -9
src/schema.rs
···
 4 4 // Legacy tables
 5 5
 6 6 diesel::table! {
 7 -     pds.oauth_par_requests (request_uri) {
 7 +     oauth_par_requests (request_uri) {
 8 8         request_uri -> Varchar,
 9 9         client_id -> Varchar,
 10 10         response_type -> Varchar,
···
 21 21     }
 22 22 }
 23 23 diesel::table! {
 24 -     pds.oauth_authorization_codes (code) {
 24 +     oauth_authorization_codes (code) {
 25 25         code -> Varchar,
 26 26         client_id -> Varchar,
 27 27         subject -> Varchar,
···
 35 35     }
 36 36 }
 37 37 diesel::table! {
 38 -     pds.oauth_refresh_tokens (token) {
 38 +     oauth_refresh_tokens (token) {
 39 39         token -> Varchar,
 40 40         client_id -> Varchar,
 41 41         subject -> Varchar,
···
 47 47     }
 48 48 }
 49 49 diesel::table! {
 50 -     pds.oauth_used_jtis (jti) {
 50 +     oauth_used_jtis (jti) {
 51 51         jti -> Varchar,
 52 52         issuer -> Varchar,
 53 53         created_at -> Int8,
···
 58 58 // Upcoming tables
 59 59
 60 60 diesel::table! {
 61 -     pds.authorization_request (id) {
 61 +     account (did) {
 62 +         did -> Varchar,
 63 +         email -> Varchar,
 64 +         recoveryKey -> Nullable<Varchar>,
 65 +         password -> Varchar,
 66 +         createdAt -> Varchar,
 67 +         invitesDisabled -> Int2,
 68 +         emailConfirmedAt -> Nullable<Varchar>,
 69 +     }
 70 + }
 71 +
 72 + diesel::table! {
 73 +     account_pref (id) {
 74 +         id -> Int4,
 75 +         did -> Varchar,
 76 +         name -> Varchar,
 77 +         valueJson -> Nullable<Text>,
 78 +     }
 79 + }
 80 +
 81 + diesel::table! {
 82 +     actor (did) {
 83 +         did -> Varchar,
 84 +         handle -> Nullable<Varchar>,
 85 +         createdAt -> Varchar,
 86 +         takedownRef -> Nullable<Varchar>,
 87 +         deactivatedAt -> Nullable<Varchar>,
 88 +         deleteAfter -> Nullable<Varchar>,
 89 +     }
 90 + }
 91 +
 92 + diesel::table! {
 93 +     app_password (did, name) {
 94 +         did -> Varchar,
 95 +         name -> Varchar,
 96 +         password -> Varchar,
 97 +         createdAt -> Varchar,
 98 +     }
 99 + }
 100 +
 101 + diesel::table! {
 102 +     authorization_request (id) {
 62 103         id -> Varchar,
 63 104         did -> Nullable<Varchar>,
 64 105         deviceId -> Nullable<Varchar>,
···
 71 112 }
 72 113
 73 114 diesel::table! {
 74 -     pds.device (id) {
 115 +     backlink (uri, path) {
 116 +         uri -> Varchar,
 117 +         path -> Varchar,
 118 +         linkTo -> Varchar,
 119 +     }
 120 + }
 121 +
 122 + diesel::table! {
 123 +     blob (cid, did) {
 124 +         cid -> Varchar,
 125 +         did -> Varchar,
 126 +         mimeType -> Varchar,
 127 +         size -> Int4,
 128 +         tempKey -> Nullable<Varchar>,
 129 +         width -> Nullable<Int4>,
 130 +         height -> Nullable<Int4>,
 131 +         createdAt -> Varchar,
 132 +         takedownRef -> Nullable<Varchar>,
 133 +     }
 134 + }
 135 +
 136 + diesel::table! {
 137 +     device (id) {
 75 138         id -> Varchar,
 76 139         sessionId -> Nullable<Varchar>,
 77 140         userAgent -> Nullable<Varchar>,
···
 81 144     }
 82 145
 83 146 diesel::table! {
 84 -     pds.device_account (deviceId, did) {
 147 +     device_account (deviceId, did) {
 85 148         did -> Varchar,
 86 149         deviceId -> Varchar,
 87 150         authenticatedAt -> Timestamptz,
···
 91 154     }
 92 155
 93 156 diesel::table! {
 94 -     pds.token (id) {
 157 +     did_doc (did) {
 158 +         did -> Varchar,
 159 +         doc -> Text,
 160 +         updatedAt -> Int8,
 161 +     }
 162 + }
 163 +
 164 + diesel::table! {
 165 +     email_token (purpose, did) {
 166 +         purpose -> Varchar,
 167 +         did -> Varchar,
 168 +         token -> Varchar,
 169 +         requestedAt -> Varchar,
 170 +     }
 171 + }
 172 +
 173 + diesel::table! {
 174 +     invite_code (code) {
 175 +         code -> Varchar,
 176 +         availableUses -> Int4,
 177 +         disabled -> Int2,
 178 +         forAccount -> Varchar,
 179 +         createdBy -> Varchar,
 180 +         createdAt -> Varchar,
 181 +     }
 182 + }
 183 +
 184 + diesel::table! {
 185 +     invite_code_use (code, usedBy) {
 186 +         code -> Varchar,
 187 +         usedBy -> Varchar,
 188 +         usedAt -> Varchar,
 189 +     }
 190 + }
 191 +
 192 + diesel::table! {
 193 +     record (uri) {
 194 +         uri -> Varchar,
 195 +         cid -> Varchar,
 196 +         did -> Varchar,
 197 +         collection -> Varchar,
 198 +         rkey -> Varchar,
 199 +         repoRev -> Nullable<Varchar>,
 200 +         indexedAt -> Varchar,
 201 +         takedownRef -> Nullable<Varchar>,
 202 +     }
 203 + }
 204 +
 205 + diesel::table! {
 206 +     record_blob (blobCid, recordUri) {
 207 +         blobCid -> Varchar,
 208 +         recordUri -> Varchar,
 209 +         did -> Varchar,
 210 +     }
 211 + }
 212 +
 213 + diesel::table! {
 214 +     refresh_token (id) {
 215 +         id -> Varchar,
 216 +         did -> Varchar,
 217 +         expiresAt -> Varchar,
 218 +         nextId -> Nullable<Varchar>,
 219 +         appPasswordName -> Nullable<Varchar>,
 220 +     }
 221 + }
 222 +
 223 + diesel::table! {
 224 +     repo_block (cid, did) {
 225 +         cid -> Varchar,
 226 +         did -> Varchar,
 227 +         repoRev -> Varchar,
 228 +         size -> Int4,
 229 +         content -> Bytea,
 230 +     }
 231 + }
 232 +
 233 + diesel::table! {
 234 +     repo_root (did) {
 235 +         did -> Varchar,
 236 +         cid -> Varchar,
 237 +         rev -> Varchar,
 238 +         indexedAt -> Varchar,
 239 +     }
 240 + }
 241 +
 242 + diesel::table! {
 243 +     repo_seq (seq) {
 244 +         seq -> Int8,
 245 +         did -> Varchar,
 246 +         eventType -> Varchar,
 247 +         event -> Bytea,
 248 +         invalidated -> Int2,
 249 +         sequencedAt -> Varchar,
 250 +     }
 251 + }
 252 +
 253 + diesel::table! {
 254 +     token (id) {
 95 255         id -> Varchar,
 96 256         did -> Varchar,
 97 257         tokenId -> Varchar,
···
 109 269     }
 110 270
 111 271 diesel::table! {
 112 -     pds.used_refresh_token (refreshToken) {
 272 +     used_refresh_token (refreshToken) {
 113 273         refreshToken -> Varchar,
 114 274         tokenId -> Varchar,
 115 275     }
 116 276 }
 277 +
 278 + diesel::allow_tables_to_appear_in_same_query!(
 279 +     account,
 280 +     account_pref,
 281 +     actor,
 282 +     app_password,
 283 +     authorization_request,
 284 +     backlink,
 285 +     blob,
 286 +     device,
 287 +     device_account,
 288 +     did_doc,
 289 +     email_token,
 290 +     invite_code,
 291 +     invite_code_use,
 292 +     record,
 293 +     record_blob,
 294 +     refresh_token,
 295 +     repo_block,
 296 +     repo_root,
 297 +     repo_seq,
 298 +     token,
 299 +     used_refresh_token,
 300 + );
 117 301 }
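One consequence of the new allow_tables_to_appear_in_same_query! block is that cross-table queries over these tables can now use explicit joins. A hedged sketch of the kind of query this enables (the function and its use case are illustrative assumptions, not code from this commit):

use diesel::prelude::*;
use diesel::sqlite::SqliteConnection;

use crate::schema::pds::{record, record_blob};

// Hypothetical: list the blob CIDs referenced by one actor's records.
// The explicit `.on(...)` join compiles only because both tables appear
// in the `allow_tables_to_appear_in_same_query!` declaration above.
fn blob_cids_for_did(conn: &mut SqliteConnection, did: &str) -> QueryResult<Vec<String>> {
    record::table
        .inner_join(record_blob::table.on(record_blob::recordUri.eq(record::uri)))
        .filter(record::did.eq(did))
        .select(record_blob::blobCid)
        .load(conn)
}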