Skip to content

feat: add minimal implementation of changesStream for registry mirroring #1025

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions api/migrations/20250402143550_create_changes.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
CREATE TYPE change_type AS ENUM (
'PACKAGE_VERSION_ADDED',
'PACKAGE_TAG_ADDED'
);

CREATE TABLE changes (
seq BIGSERIAL PRIMARY KEY,
change_type change_type NOT NULL,
package_id VARCHAR(255) NOT NULL,
data TEXT NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

CREATE INDEX changes_package_id_idx ON changes (package_id);
CREATE INDEX changes_created_at_idx ON changes (created_at);
36 changes: 36 additions & 0 deletions api/src/api/changes.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// Copyright 2024 the JSR authors. All rights reserved. MIT license.
use hyper::{Body, Request};
use routerify::prelude::*;
use serde::Serialize;

use crate::{
db::{Change, Database},
util::{pagination, ApiResult},
};


#[derive(Serialize)]
pub struct ApiChange {
pub seq: i64,
pub r#type: String,
pub id: String,
pub changes: serde_json::Value,
}

impl From<Change> for ApiChange {
fn from(change: Change) -> Self {
Self {
seq: change.seq,
r#type: change.change_type.to_string(),
id: change.package_id,
changes: serde_json::from_str(&change.data).unwrap(),
}
}
}

pub async fn list_changes(req: Request<Body>) -> ApiResult<Vec<ApiChange>> {
let db = req.data::<Database>().unwrap();
let (start, limit) = pagination(&req);
let changes = db.list_changes(start, limit).await?;
Ok(changes.into_iter().map(ApiChange::from).collect())
}
4 changes: 4 additions & 0 deletions api/src/api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,15 @@ mod scope;
mod self_user;
mod types;
mod users;
mod changes;


use hyper::Body;
use hyper::Response;
use package::global_list_handler;
use package::global_metrics_handler;
use package::global_stats_handler;
use changes::list_changes;
use routerify::Middleware;
use routerify::Router;

Expand All @@ -40,6 +43,7 @@ pub fn api_router() -> Router<Body, ApiError> {
util::json(global_metrics_handler),
),
)
.get("/_changes", util::json(list_changes))
.middleware(Middleware::pre(util::auth_middleware))
.scope("/admin", admin_router())
.scope("/scopes", scope_router())
Expand Down
56 changes: 56 additions & 0 deletions api/src/db/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,62 @@ impl Database {
.await
}

#[instrument(name = "Database::list_changes", skip(self), err)]
pub async fn list_changes(
&self,
since: i64,
limit: i64,
) -> Result<Vec<Change>> {
sqlx::query_as!(
Change,
r#"
SELECT
seq,
change_type as "change_type: ChangeType",
package_id,
data,
created_at
FROM changes
WHERE seq > $1
ORDER BY seq ASC
LIMIT $2
"#,
since,
limit
)
.fetch_all(&self.pool)
.await
}


#[instrument(name = "Database::create_change", skip(self), err)]
pub async fn create_change(
&self,
change_type: ChangeType,
package_id: String,
data: serde_json::Value,
) -> Result<Change> {
sqlx::query_as!(
Change,
r#"
INSERT INTO changes (change_type, package_id, data)
VALUES ($1, $2, $3)
RETURNING
seq,
change_type as "change_type: ChangeType",
package_id,
data,
created_at
"#,
change_type as _,
package_id,
data.to_string()
)
.fetch_one(&self.pool)
.await
}


#[instrument(name = "Database::get_user_public", skip(self), err)]
pub async fn get_user_public(&self, id: Uuid) -> Result<Option<UserPublic>> {
sqlx::query_as!(
Expand Down
34 changes: 34 additions & 0 deletions api/src/db/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -824,3 +824,37 @@ impl sqlx::postgres::PgHasArrayType for DownloadKind {
sqlx::postgres::PgTypeInfo::with_name("_download_kind")
}
}

#[derive(Debug, Clone, Serialize, Deserialize, sqlx::Type)]
#[sqlx(type_name = "change_type", rename_all = "SCREAMING_SNAKE_CASE")]
#[serde(rename_all = "snake_case")]
pub enum ChangeType {

PackageVersionAdded,
PackageTagAdded,
}

impl std::fmt::Display for ChangeType {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::PackageVersionAdded => write!(f, "PACKAGE_VERSION_ADDED"),
Self::PackageTagAdded => write!(f, "PACKAGE_TAG_ADDED"),
}
}
}

#[derive(Debug, Clone)]
pub struct Change {
pub seq: i64,
pub change_type: ChangeType,
pub package_id: String,
pub data: String,
pub created_at: DateTime<Utc>,
}

#[derive(Debug)]
pub struct NewChange<'s> {
pub change_type: ChangeType,
pub package_id: &'s str,
pub data: &'s str,
}
20 changes: 20 additions & 0 deletions api/src/publish.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::collections::HashSet;
use crate::api::ApiError;
use crate::buckets::Buckets;
use crate::buckets::UploadTaskBody;
use crate::db::ChangeType;
use crate::db::Database;
use crate::db::DependencyKind;
use crate::db::ExportsMap;
Expand Down Expand Up @@ -234,6 +235,25 @@ async fn process_publishing_task(
);
}

tokio::spawn({
let db = db.clone();
let scope = publishing_task.package_scope.clone();
let name = publishing_task.package_name.clone();
let version = publishing_task.package_version.clone();

async move {
if let Err(e) = db.create_change(
ChangeType::PackageVersionAdded,
format!("@{}/{}", scope, name),
serde_json::json!({
"version": version.to_string(),
}),
).await {
error!("Failed to create change record: {}", e);
}
}
});

Ok(())
}

Expand Down
16 changes: 10 additions & 6 deletions api/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -283,13 +283,17 @@ pub fn pagination(req: &Request<Body>) -> (i64, i64) {
.and_then(|page| page.parse::<i64>().ok())
.unwrap_or(100)
.clamp(1, 100);
let page = req
.query("page")
.and_then(|page| page.parse::<i64>().ok())
.unwrap_or(1)
.max(1);

let start = (page * limit) - limit;
let start = if let Some(since) = req.query("since").and_then(|s| s.parse::<i64>().ok()) {
since
} else {
let page = req
.query("page")
.and_then(|page| page.parse::<i64>().ok())
.unwrap_or(1)
.max(1);
(page * limit) - limit
};

(start, limit)
}
Expand Down
Loading