Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions morango/api/serializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,5 +178,6 @@ class Meta:
"profile",
"rmcb_list",
"_self_ref_fk",
"_self_ref_order",
)
read_only_fields = fields
1 change: 1 addition & 0 deletions morango/constants/capabilities.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@
ALLOW_CERTIFICATE_PUSHING = "ALLOW_CERTIFICATE_PUSHING"
ASYNC_OPERATIONS = "ASYNC_OPERATIONS"
FSIC_V2_FORMAT = "FSIC_V2_FORMAT"
SELF_REF_ORDER = "SELF_REF_ORDER"
24 changes: 24 additions & 0 deletions morango/migrations/0003_auto_20260422_1053.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# Generated by Django 3.2.25 on 2026-04-22 10:53

import django.core.validators
from django.db import migrations, models


def _self_ref_order_field():
    """Build a fresh nullable, non-negative integer field for ``_self_ref_order``."""
    return models.IntegerField(
        blank=True,
        null=True,
        validators=[django.core.validators.MinValueValidator(0)],
    )


class Migration(migrations.Migration):
    """Add the ``_self_ref_order`` column to the ``buffer`` and ``store`` models."""

    dependencies = [
        ('morango', '0002_store_idx_morango_deserialize'),
    ]

    # One AddField per model; each operation receives its own field instance.
    operations = [
        migrations.AddField(
            model_name=target_model,
            name='_self_ref_order',
            field=_self_ref_order_field(),
        )
        for target_model in ('buffer', 'store')
    ]
5 changes: 5 additions & 0 deletions morango/models/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from functools import reduce

from django.core import exceptions
from django.core.validators import MinValueValidator
from django.db import connection, models, router, transaction
from django.db.models import F, Func, Max, Q, TextField, Value, signals
from django.db.models.deletion import Collector
Expand Down Expand Up @@ -391,6 +392,9 @@ class AbstractStore(models.Model):
conflicting_serialized_data = models.TextField(blank=True)

_self_ref_fk = models.CharField(max_length=32, blank=True)
_self_ref_order = models.IntegerField(
blank=True, null=True, validators=[MinValueValidator(0)]
)

class Meta:
abstract = True
Expand Down Expand Up @@ -777,6 +781,7 @@ class SyncableModel(UUIDModelMixin):

_morango_internal_fields_not_to_serialize = ("_morango_dirty_bit",)
morango_model_dependencies = ()
morango_ordering = ()
morango_fields_not_to_serialize = ()
morango_profile = None

Expand Down
23 changes: 21 additions & 2 deletions morango/registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from collections import OrderedDict
from typing import Generator

from django.db.models import QuerySet
from django.db.models import F, QuerySet
from django.db.models.fields.related import ForeignKey

from morango.constants import transfer_stages
Expand Down Expand Up @@ -85,7 +85,26 @@ def get_model_querysets(self, profile) -> Generator[QuerySet, None, None]:
(particularly, an order) that is aware of FK dependencies.
"""
for model in self.get_models(profile):
yield model.syncing_objects.all()
queryset = model.syncing_objects.all()
ordering = getattr(model, "morango_ordering", ())
if ordering:
queryset = queryset.order_by(*self._get_nulls_last_ordering(ordering))
yield queryset

@staticmethod
def _get_nulls_last_ordering(ordering):
    """
    Normalize ``order_by`` arguments so that NULL values sort last.

    String entries are converted into ``F()`` ordering expressions with
    ``nulls_last=True``; a leading ``-`` selects descending order. The
    random-ordering marker ``"?"`` is not a field reference, so it is passed
    through untouched, as is any entry that is not a string.

    :param ordering: iterable of field-name strings and/or ordering expressions
    :return: list of arguments to hand to ``QuerySet.order_by``
    """
    normalized = []
    for order_expr in ordering:
        # Non-string entries are already expressions; "?" must not be wrapped
        # in F() because it does not name a field.
        if not isinstance(order_expr, str) or order_expr == "?":
            normalized.append(order_expr)
            continue
        descending = order_expr.startswith("-")
        field = F(order_expr[1:] if descending else order_expr)
        if descending:
            normalized.append(field.desc(nulls_last=True))
        else:
            normalized.append(field.asc(nulls_last=True))
    return normalized

def _insert_model_in_dependency_order(self, model, profile):
# When we add models to be synced, we need to make sure
Expand Down
14 changes: 7 additions & 7 deletions morango/sync/backends/postgres.py
Original file line number Diff line number Diff line change
Expand Up @@ -204,10 +204,10 @@ def _dequeuing_merge_conflict_rmcb(self, cursor, transfersession_id):
def _dequeuing_merge_conflict_buffer(self, cursor, current_id, transfersession_id):
# transfer buffer serialized into conflicting store
merge_conflict_store = """UPDATE {store} as store SET (serialized, deleted, last_saved_instance, last_saved_counter, hard_deleted, model_name,
profile, partition, source_id, conflicting_serialized_data, dirty_bit, _self_ref_fk, deserialization_error, last_transfer_session_id)
profile, partition, source_id, conflicting_serialized_data, dirty_bit, _self_ref_fk, _self_ref_order, deserialization_error, last_transfer_session_id)
= (CASE buffer.hard_deleted WHEN TRUE THEN '' ELSE store.serialized END, store.deleted OR buffer.deleted, '{current_instance_id}',
{current_instance_counter}, store.hard_deleted, store.model_name, store.profile, store.partition, store.source_id,
CASE buffer.hard_deleted WHEN TRUE THEN '' ELSE buffer.serialized || '\n' || store.conflicting_serialized_data END, TRUE, store._self_ref_fk,
CASE buffer.hard_deleted WHEN TRUE THEN '' ELSE buffer.serialized || '\n' || store.conflicting_serialized_data END, TRUE, store._self_ref_fk, store._self_ref_order,
'', '{transfer_session_id}')
/*Scope to a single record.*/
FROM {buffer} AS buffer
Expand Down Expand Up @@ -278,26 +278,26 @@ def _dequeuing_insert_remaining_buffer(self, cursor, transfersession_id):
WITH new_values as
(
SELECT buffer.model_uuid, buffer.serialized, buffer.deleted, buffer.last_saved_instance, buffer.last_saved_counter, buffer.hard_deleted,
buffer.model_name, buffer.profile, buffer.partition, buffer.source_id, buffer.conflicting_serialized_data, buffer._self_ref_fk
buffer.model_name, buffer.profile, buffer.partition, buffer.source_id, buffer.conflicting_serialized_data, buffer._self_ref_fk, buffer._self_ref_order
FROM {buffer} as buffer
WHERE buffer.transfer_session_id = '{transfer_session_id}'
),
updated as
(
UPDATE {store} store SET (serialized, deleted, last_saved_instance, last_saved_counter, hard_deleted, model_name, profile,
partition, source_id, conflicting_serialized_data, dirty_bit, _self_ref_fk, deserialization_error, last_transfer_session_id)
partition, source_id, conflicting_serialized_data, dirty_bit, _self_ref_fk, _self_ref_order, deserialization_error, last_transfer_session_id)
= (nv.serialized, nv.deleted, nv.last_saved_instance, nv.last_saved_counter, nv.hard_deleted,
nv.model_name, nv.profile, nv.partition, nv.source_id, nv.conflicting_serialized_data, TRUE,
nv._self_ref_fk, '', '{transfer_session_id}')
nv._self_ref_fk, nv._self_ref_order, '', '{transfer_session_id}')
FROM new_values nv
WHERE nv.model_uuid = store.id
returning store.*
)
INSERT INTO {store}(id, serialized, deleted, last_saved_instance, last_saved_counter, hard_deleted, model_name, profile,
partition, source_id, conflicting_serialized_data, dirty_bit, _self_ref_fk, deserialization_error, last_transfer_session_id)
partition, source_id, conflicting_serialized_data, dirty_bit, _self_ref_fk, _self_ref_order, deserialization_error, last_transfer_session_id)
SELECT ut.model_uuid, ut.serialized, ut.deleted, ut.last_saved_instance, ut.last_saved_counter, ut.hard_deleted,
ut.model_name, ut.profile, ut.partition, ut.source_id, ut.conflicting_serialized_data, TRUE,
ut._self_ref_fk, '', '{transfer_session_id}'
ut._self_ref_fk, ut._self_ref_order, '', '{transfer_session_id}'
FROM new_values ut
WHERE ut.model_uuid not in (SELECT id FROM updated)
""".format(
Expand Down
8 changes: 4 additions & 4 deletions morango/sync/backends/sqlite.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,10 +137,10 @@ def _dequeuing_merge_conflict_rmcb(self, cursor, transfersession_id):
def _dequeuing_merge_conflict_buffer(self, cursor, current_id, transfersession_id):
# transfer buffer serialized into conflicting store
merge_conflict_store = """REPLACE INTO {store} (id, serialized, deleted, last_saved_instance, last_saved_counter, hard_deleted, model_name, profile, partition,
source_id, conflicting_serialized_data, dirty_bit, _self_ref_fk, deserialization_error, last_transfer_session_id)
source_id, conflicting_serialized_data, dirty_bit, _self_ref_fk, _self_ref_order, deserialization_error, last_transfer_session_id)
SELECT store.id, CASE buffer.hard_deleted WHEN 1 THEN '' ELSE store.serialized END, store.deleted OR buffer.deleted, '{current_instance_id}',
{current_instance_counter}, store.hard_deleted OR buffer.hard_deleted, store.model_name, store.profile, store.partition, store.source_id,
CASE buffer.hard_deleted WHEN 1 THEN '' ELSE buffer.serialized || '\n' || store.conflicting_serialized_data END, 1, store._self_ref_fk,
CASE buffer.hard_deleted WHEN 1 THEN '' ELSE buffer.serialized || '\n' || store.conflicting_serialized_data END, 1, store._self_ref_fk, store._self_ref_order,
'', '{transfer_session_id}'
FROM {buffer} AS buffer, {store} AS store
/*Scope to a single record.*/
Expand Down Expand Up @@ -191,10 +191,10 @@ def _dequeuing_update_rmcs_last_saved_by(
def _dequeuing_insert_remaining_buffer(self, cursor, transfersession_id):
# insert remaining records into store
insert_remaining_buffer = """REPLACE INTO {store} (id, serialized, deleted, last_saved_instance, last_saved_counter, hard_deleted, model_name, profile, partition,
source_id, conflicting_serialized_data, dirty_bit, _self_ref_fk, deserialization_error, last_transfer_session_id)
source_id, conflicting_serialized_data, dirty_bit, _self_ref_fk, _self_ref_order, deserialization_error, last_transfer_session_id)
SELECT buffer.model_uuid, buffer.serialized, buffer.deleted, buffer.last_saved_instance, buffer.last_saved_counter, buffer.hard_deleted,
buffer.model_name, buffer.profile, buffer.partition, buffer.source_id, buffer.conflicting_serialized_data, 1,
buffer._self_ref_fk, '', '{transfer_session_id}'
buffer._self_ref_fk, buffer._self_ref_order, '', '{transfer_session_id}'
FROM {buffer} AS buffer
WHERE buffer.transfer_session_id = '{transfer_session_id}'
""".format(
Expand Down
51 changes: 46 additions & 5 deletions morango/sync/operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@
from django.core import exceptions
from django.db import connection
from django.db.models import CharField
from django.db.models import Exists
from django.db.models import OuterRef
from django.db.models import Q
from django.db.models import Subquery
from django.db.models import signals
from django.db.utils import OperationalError
from django.utils import timezone
Expand All @@ -19,6 +22,7 @@
from morango.constants import transfer_statuses
from morango.constants.capabilities import ASYNC_OPERATIONS
from morango.constants.capabilities import FSIC_V2_FORMAT
from morango.constants.capabilities import SELF_REF_ORDER
from morango.errors import MorangoInvalidFSICPartition
from morango.errors import MorangoLimitExceeded
from morango.errors import MorangoResumeSyncError
Expand Down Expand Up @@ -483,7 +487,7 @@ def _queue_into_buffer_v1(transfersession):
"""SELECT
id, serialized, deleted, last_saved_instance, last_saved_counter, hard_deleted, model_name, profile,
partition, source_id, conflicting_serialized_data,
CAST ('{transfer_session_id}' AS {transfer_session_id_type}), _self_ref_fk
CAST ('{transfer_session_id}' AS {transfer_session_id_type}), _self_ref_fk, _self_ref_order
FROM {store} WHERE {condition}
""".format(
transfer_session_id=transfersession.id,
Expand Down Expand Up @@ -514,7 +518,7 @@ def _queue_into_buffer_v1(transfersession):
"""INSERT INTO {outgoing_buffer}
(model_uuid, serialized, deleted, last_saved_instance, last_saved_counter,
hard_deleted, model_name, profile, partition, source_id, conflicting_serialized_data,
transfer_session_id, _self_ref_fk)
transfer_session_id, _self_ref_fk, _self_ref_order)
{select}
""".format(
outgoing_buffer=Buffer._meta.db_table,
Expand Down Expand Up @@ -632,7 +636,7 @@ def _queue_into_buffer_v2(transfersession, chunk_size=200):
"""SELECT
id, serialized, deleted, last_saved_instance, last_saved_counter, hard_deleted, model_name, profile,
partition, source_id, conflicting_serialized_data,
CAST ('{transfer_session_id}' AS {transfer_session_id_type}), _self_ref_fk
CAST ('{transfer_session_id}' AS {transfer_session_id_type}), _self_ref_fk, _self_ref_order
FROM {store} WHERE {condition}
""".format(
transfer_session_id=transfersession.id,
Expand Down Expand Up @@ -661,7 +665,7 @@ def _queue_into_buffer_v2(transfersession, chunk_size=200):
"""INSERT INTO {outgoing_buffer}
(model_uuid, serialized, deleted, last_saved_instance, last_saved_counter,
hard_deleted, model_name, profile, partition, source_id, conflicting_serialized_data,
transfer_session_id, _self_ref_fk)
transfer_session_id, _self_ref_fk, _self_ref_order)
{select}
""".format(
outgoing_buffer=Buffer._meta.db_table,
Expand All @@ -679,7 +683,41 @@ def _queue_into_buffer_v2(transfersession, chunk_size=200):
)


def _dequeue_into_store(transfer_session, fsic, v2_format=False):
def _update_legacy_self_ref_order_for_model(queryset):
    """
    Recompute ``_self_ref_order`` for the store records in ``queryset``.

    Records with an empty ``_self_ref_fk`` get order 0. Every other record is
    first reset to ``None`` and then assigned its parent's order plus one,
    using repeated bulk updates until a pass changes no rows. Records whose
    parent never obtains an order are left as ``None``.

    :param queryset: queryset of store records to update
    """
    roots = queryset.filter(_self_ref_fk="")
    children = queryset.exclude(_self_ref_fk="")

    # Roots sit at depth 0; rows already holding 0 are left alone.
    roots.exclude(_self_ref_order=0).update(_self_ref_order=0)
    # Drop any existing order on rows that have a parent so it is recomputed.
    children.exclude(_self_ref_order=None).update(_self_ref_order=None)

    # Parent row of the record being updated, only if its order is known.
    ordered_parent = Store.objects.filter(
        id=OuterRef("_self_ref_fk"),
        _self_ref_order__isnull=False,
    )
    ordered_parent_value = ordered_parent.values("_self_ref_order")[:1]
    unresolved = children.filter(_self_ref_order=None)

    # Each pass fills in rows whose parent already has an order; stop once a
    # pass updates nothing.
    while True:
        changed = unresolved.filter(Exists(ordered_parent)).update(
            _self_ref_order=Subquery(ordered_parent_value) + 1
        )
        if not changed:
            break


def _update_legacy_self_ref_order(transfer_session):
    """
    Rebuild ``_self_ref_order`` on the store records of ``transfer_session``.

    Only store records whose ``last_transfer_session_id`` matches the session
    and whose profile matches the sync session's profile are touched. For
    models with a self-referential FK the order is recomputed; for all other
    models any existing order is cleared.

    :param transfer_session: transfer session whose store records are updated
    """
    sync_profile = transfer_session.sync_session.profile
    session_records = Store.objects.filter(
        last_transfer_session_id=transfer_session.id,
        profile=sync_profile,
    )

    for model_class in syncable_models.get_models(sync_profile):
        model_records = session_records.filter(
            model_name=model_class.morango_model_name
        )
        if not self_referential_fk(model_class):
            # Models without a self-reference carry no order value.
            model_records.exclude(_self_ref_order=None).update(_self_ref_order=None)
            continue
        _update_legacy_self_ref_order_for_model(model_records)


def _dequeue_into_store(transfer_session, fsic, v2_format=False, self_ref_order=True):
"""
Takes data from the buffers and merges into the store and record max counters.

Expand All @@ -703,6 +741,8 @@ def _dequeue_into_store(transfer_session, fsic, v2_format=False):
DBBackend._dequeuing_delete_mc_buffer(cursor, transfer_session.id)
DBBackend._dequeuing_insert_remaining_buffer(cursor, transfer_session.id)
DBBackend._dequeuing_insert_remaining_rmcb(cursor, transfer_session.id)
if not self_ref_order:
_update_legacy_self_ref_order(transfer_session)
DBBackend._dequeuing_delete_remaining_rmcb(cursor, transfer_session.id)
DBBackend._dequeuing_delete_remaining_buffer(cursor, transfer_session.id)

Expand Down Expand Up @@ -1041,6 +1081,7 @@ def handle(self, context):
context.transfer_session,
fsic,
v2_format=FSIC_V2_FORMAT in context.capabilities,
self_ref_order=SELF_REF_ORDER in context.capabilities,
)

return transfer_statuses.COMPLETED
Expand Down
29 changes: 27 additions & 2 deletions morango/sync/stream/serialize.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,13 @@ def _handle_store_update(self, task: SerializeTask):
# clear last_transfer_session_id
task.store.last_transfer_session_id = None

self_ref_fk = task.self_referential_fk()
if self_ref_fk:
new_fk_value = getattr(task.obj, self_ref_fk) or ""
if new_fk_value != task.store._self_ref_fk:
task.store._self_ref_fk = new_fk_value
task.store._self_ref_order = self._compute_self_ref_order(new_fk_value)

def _handle_store_create(self, task: SerializeTask):
kwargs = {
"id": task.obj.id,
Expand All @@ -200,11 +207,29 @@ def _handle_store_create(self, task: SerializeTask):

self_ref_fk = task.self_referential_fk()
if self_ref_fk:
self_ref_fk_value = getattr(task.obj, self_ref_fk)
kwargs["_self_ref_fk"] = self_ref_fk_value or ""
self_ref_fk_value = getattr(task.obj, self_ref_fk) or ""
kwargs["_self_ref_fk"] = self_ref_fk_value
kwargs["_self_ref_order"] = self._compute_self_ref_order(self_ref_fk_value)

task.set_store(Store(**kwargs))

@staticmethod
def _compute_self_ref_order(self_ref_fk_value):
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, from our discussion on Slack and from seeing this, I think some of the behaviors have been taken very literally, which is my fault for not making that clearer.

if _self_ref_fk is None and the model is self-referential, the order field should be 0
otherwise, it should query for the parent's order (the Store._self_ref_order for id=_self_ref_fk)

We might need to query the database for this, but I think it's worth thinking through how and when this should occur. This satisfies the criteria of the behavior, but the engineering choices are up to you. The behaviors are not meant to be taken so literally as to require that this functionality be placed exactly here.

I say this for a couple reasons:

  1. Look at how we manage database calls: StoreLookup does bulk queries for the store records, while StoreUpdate actually does none. There's room for improvement in managing these queries.
  2. The circumstances may not always work with the pipeline because it buffers records. On this note, I'm thinking of the test case test_handle_store_create__self_ref_parent_not_in_store and whether that is an acceptable outcome.

More on (2): say we have our buffer batch size set to 2. In the first batch, we have a parent and a child. Since the batch size is two, the parent won't have been saved yet when the child is processed, so this function returns None for the child. Then let's say batch #2 contains two children of that first child (i.e. grandchildren). The function will find the first child's store record, but that record has a None order, so the grandchildren get None as well. Thus we end up with a parent-child relationship that could get out of order because of None.

"""
Compute ``_self_ref_order`` for a self-referential store record.

Returns ``0`` when the record has no parent (root), otherwise queries
the parent ``Store`` row and returns the next order value.
"""
if not self_ref_fk_value:
return 0
parent_order = (
Store.objects.filter(id=self_ref_fk_value)
.values_list("_self_ref_order", flat=True)
.first()
)
return parent_order + 1 if parent_order is not None else None


class ModelPartitionBuffer(Buffer[List[SerializeTask]]):
"""Buffers tasks into chunks that have the same model class."""
Expand Down
3 changes: 3 additions & 0 deletions morango/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
ASYNC_OPERATIONS,
FSIC_V2_FORMAT,
GZIP_BUFFER_POST,
SELF_REF_ORDER,
)


Expand Down Expand Up @@ -61,6 +62,8 @@ def get_capabilities():
if not SETTINGS.MORANGO_DISALLOW_ASYNC_OPERATIONS:
capabilities.add(ASYNC_OPERATIONS)

capabilities.add(SELF_REF_ORDER)

return capabilities


Expand Down
Loading
Loading