diff --git a/morango/api/serializers.py b/morango/api/serializers.py index 658ed19..8b9a8dc 100644 --- a/morango/api/serializers.py +++ b/morango/api/serializers.py @@ -178,5 +178,6 @@ class Meta: "profile", "rmcb_list", "_self_ref_fk", + "_self_ref_order", ) read_only_fields = fields diff --git a/morango/constants/capabilities.py b/morango/constants/capabilities.py index 840626b..8e6afb1 100644 --- a/morango/constants/capabilities.py +++ b/morango/constants/capabilities.py @@ -2,3 +2,4 @@ ALLOW_CERTIFICATE_PUSHING = "ALLOW_CERTIFICATE_PUSHING" ASYNC_OPERATIONS = "ASYNC_OPERATIONS" FSIC_V2_FORMAT = "FSIC_V2_FORMAT" +SELF_REF_ORDER = "SELF_REF_ORDER" diff --git a/morango/migrations/0003_auto_20260422_1053.py b/morango/migrations/0003_auto_20260422_1053.py new file mode 100644 index 0000000..048bf29 --- /dev/null +++ b/morango/migrations/0003_auto_20260422_1053.py @@ -0,0 +1,24 @@ +# Generated by Django 3.2.25 on 2026-04-22 10:53 + +import django.core.validators +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('morango', '0002_store_idx_morango_deserialize'), + ] + + operations = [ + migrations.AddField( + model_name='buffer', + name='_self_ref_order', + field=models.IntegerField(blank=True, null=True, validators=[django.core.validators.MinValueValidator(0)]), + ), + migrations.AddField( + model_name='store', + name='_self_ref_order', + field=models.IntegerField(blank=True, null=True, validators=[django.core.validators.MinValueValidator(0)]), + ), + ] diff --git a/morango/models/core.py b/morango/models/core.py index 06abf3f..441ba39 100644 --- a/morango/models/core.py +++ b/morango/models/core.py @@ -6,6 +6,7 @@ from functools import reduce from django.core import exceptions +from django.core.validators import MinValueValidator from django.db import connection, models, router, transaction from django.db.models import F, Func, Max, Q, TextField, Value, signals from django.db.models.deletion import Collector @@ -391,6 +392,9 @@ class AbstractStore(models.Model): conflicting_serialized_data = models.TextField(blank=True) _self_ref_fk = models.CharField(max_length=32, blank=True) + _self_ref_order = models.IntegerField( + blank=True, null=True, validators=[MinValueValidator(0)] + ) class Meta: abstract = True @@ -777,6 +781,7 @@ class SyncableModel(UUIDModelMixin): _morango_internal_fields_not_to_serialize = ("_morango_dirty_bit",) morango_model_dependencies = () + morango_ordering = () morango_fields_not_to_serialize = () morango_profile = None diff --git a/morango/registry.py b/morango/registry.py index 25a1f92..ea99de8 100644 --- a/morango/registry.py +++ b/morango/registry.py @@ -8,7 +8,7 @@ from collections import OrderedDict from typing import Generator -from django.db.models import QuerySet +from django.db.models import F, QuerySet from django.db.models.fields.related import ForeignKey from morango.constants import transfer_stages @@ -85,7 +85,26 @@ def get_model_querysets(self, profile) -> Generator[QuerySet, None, None]: (particularly, an order) that is aware of FK dependencies. """ for model in self.get_models(profile): - yield model.syncing_objects.all() + queryset = model.syncing_objects.all() + ordering = getattr(model, "morango_ordering", ()) + if ordering: + queryset = queryset.order_by(*self._get_nulls_last_ordering(ordering)) + yield queryset + + @staticmethod + def _get_nulls_last_ordering(ordering): + normalized = [] + for order_expr in ordering: + if isinstance(order_expr, str): + descending = order_expr.startswith("-") + field_name = order_expr[1:] if descending else order_expr + if descending: + normalized.append(F(field_name).desc(nulls_last=True)) + else: + normalized.append(F(field_name).asc(nulls_last=True)) + else: + normalized.append(order_expr) + return normalized def _insert_model_in_dependency_order(self, model, profile): # When we add models to be synced, we need to make sure diff --git a/morango/sync/backends/postgres.py b/morango/sync/backends/postgres.py index e466ded..efefcca 100644 --- a/morango/sync/backends/postgres.py +++ b/morango/sync/backends/postgres.py @@ -204,10 +204,10 @@ def _dequeuing_merge_conflict_rmcb(self, cursor, transfersession_id): def _dequeuing_merge_conflict_buffer(self, cursor, current_id, transfersession_id): # transfer buffer serialized into conflicting store merge_conflict_store = """UPDATE {store} as store SET (serialized, deleted, last_saved_instance, last_saved_counter, hard_deleted, model_name, - profile, partition, source_id, conflicting_serialized_data, dirty_bit, _self_ref_fk, deserialization_error, last_transfer_session_id) + profile, partition, source_id, conflicting_serialized_data, dirty_bit, _self_ref_fk, _self_ref_order, deserialization_error, last_transfer_session_id) = (CASE buffer.hard_deleted WHEN TRUE THEN '' ELSE store.serialized END, store.deleted OR buffer.deleted, '{current_instance_id}', {current_instance_counter}, store.hard_deleted, store.model_name, store.profile, store.partition, store.source_id, - CASE buffer.hard_deleted WHEN TRUE THEN '' ELSE buffer.serialized || '\n' || store.conflicting_serialized_data END, TRUE, store._self_ref_fk, + CASE buffer.hard_deleted WHEN TRUE THEN '' ELSE buffer.serialized || '\n' || store.conflicting_serialized_data END, TRUE, store._self_ref_fk, store._self_ref_order, '', '{transfer_session_id}') /*Scope to a single record.*/ FROM {buffer} AS buffer @@ -278,26 +278,26 @@ def _dequeuing_insert_remaining_buffer(self, cursor, transfersession_id): WITH new_values as ( SELECT buffer.model_uuid, buffer.serialized, buffer.deleted, buffer.last_saved_instance, buffer.last_saved_counter, buffer.hard_deleted, - buffer.model_name, buffer.profile, buffer.partition, buffer.source_id, buffer.conflicting_serialized_data, buffer._self_ref_fk + buffer.model_name, buffer.profile, buffer.partition, buffer.source_id, buffer.conflicting_serialized_data, buffer._self_ref_fk, buffer._self_ref_order FROM {buffer} as buffer WHERE buffer.transfer_session_id = '{transfer_session_id}' ), updated as ( UPDATE {store} store SET (serialized, deleted, last_saved_instance, last_saved_counter, hard_deleted, model_name, profile, - partition, source_id, conflicting_serialized_data, dirty_bit, _self_ref_fk, deserialization_error, last_transfer_session_id) + partition, source_id, conflicting_serialized_data, dirty_bit, _self_ref_fk, _self_ref_order, deserialization_error, last_transfer_session_id) = (nv.serialized, nv.deleted, nv.last_saved_instance, nv.last_saved_counter, nv.hard_deleted, nv.model_name, nv.profile, nv.partition, nv.source_id, nv.conflicting_serialized_data, TRUE, - nv._self_ref_fk, '', '{transfer_session_id}') + nv._self_ref_fk, nv._self_ref_order, '', '{transfer_session_id}') FROM new_values nv WHERE nv.model_uuid = store.id returning store.* ) INSERT INTO {store}(id, serialized, deleted, last_saved_instance, last_saved_counter, hard_deleted, model_name, profile, - partition, source_id, conflicting_serialized_data, dirty_bit, _self_ref_fk, deserialization_error, last_transfer_session_id) + partition, source_id, conflicting_serialized_data, dirty_bit, _self_ref_fk, _self_ref_order, deserialization_error, last_transfer_session_id) SELECT ut.model_uuid, ut.serialized, ut.deleted, ut.last_saved_instance, ut.last_saved_counter, ut.hard_deleted, ut.model_name, ut.profile, ut.partition, ut.source_id, ut.conflicting_serialized_data, TRUE, - ut._self_ref_fk, '', '{transfer_session_id}' + ut._self_ref_fk, ut._self_ref_order, '', '{transfer_session_id}' FROM new_values ut WHERE ut.model_uuid not in (SELECT id FROM updated) """.format( diff --git a/morango/sync/backends/sqlite.py b/morango/sync/backends/sqlite.py index c0c60fa..91bf8d2 100644 --- a/morango/sync/backends/sqlite.py +++ b/morango/sync/backends/sqlite.py @@ -137,10 +137,10 @@ def _dequeuing_merge_conflict_rmcb(self, cursor, transfersession_id): def _dequeuing_merge_conflict_buffer(self, cursor, current_id, transfersession_id): # transfer buffer serialized into conflicting store merge_conflict_store = """REPLACE INTO {store} (id, serialized, deleted, last_saved_instance, last_saved_counter, hard_deleted, model_name, profile, partition, - source_id, conflicting_serialized_data, dirty_bit, _self_ref_fk, deserialization_error, last_transfer_session_id) + source_id, conflicting_serialized_data, dirty_bit, _self_ref_fk, _self_ref_order, deserialization_error, last_transfer_session_id) SELECT store.id, CASE buffer.hard_deleted WHEN 1 THEN '' ELSE store.serialized END, store.deleted OR buffer.deleted, '{current_instance_id}', {current_instance_counter}, store.hard_deleted OR buffer.hard_deleted, store.model_name, store.profile, store.partition, store.source_id, - CASE buffer.hard_deleted WHEN 1 THEN '' ELSE buffer.serialized || '\n' || store.conflicting_serialized_data END, 1, store._self_ref_fk, + CASE buffer.hard_deleted WHEN 1 THEN '' ELSE buffer.serialized || '\n' || store.conflicting_serialized_data END, 1, store._self_ref_fk, store._self_ref_order, '', '{transfer_session_id}' FROM {buffer} AS buffer, {store} AS store /*Scope to a single record.*/ @@ -191,10 +191,10 @@ def _dequeuing_update_rmcs_last_saved_by( def _dequeuing_insert_remaining_buffer(self, cursor, transfersession_id): # insert remaining records into store insert_remaining_buffer = """REPLACE INTO {store} (id, serialized, deleted, last_saved_instance, last_saved_counter, hard_deleted, model_name, profile, partition, - source_id, conflicting_serialized_data, dirty_bit, _self_ref_fk, deserialization_error, last_transfer_session_id) + source_id, conflicting_serialized_data, dirty_bit, _self_ref_fk, _self_ref_order, deserialization_error, last_transfer_session_id) SELECT buffer.model_uuid, buffer.serialized, buffer.deleted, buffer.last_saved_instance, buffer.last_saved_counter, buffer.hard_deleted, buffer.model_name, buffer.profile, buffer.partition, buffer.source_id, buffer.conflicting_serialized_data, 1, - buffer._self_ref_fk, '', '{transfer_session_id}' + buffer._self_ref_fk, buffer._self_ref_order, '', '{transfer_session_id}' FROM {buffer} AS buffer WHERE buffer.transfer_session_id = '{transfer_session_id}' """.format( diff --git a/morango/sync/operations.py b/morango/sync/operations.py index 1924fc7..02898d6 100644 --- a/morango/sync/operations.py +++ b/morango/sync/operations.py @@ -8,7 +8,10 @@ from django.core import exceptions from django.db import connection from django.db.models import CharField +from django.db.models import Exists +from django.db.models import OuterRef from django.db.models import Q +from django.db.models import Subquery from django.db.models import signals from django.db.utils import OperationalError from django.utils import timezone @@ -19,6 +22,7 @@ from morango.constants import transfer_statuses from morango.constants.capabilities import ASYNC_OPERATIONS from morango.constants.capabilities import FSIC_V2_FORMAT +from morango.constants.capabilities import SELF_REF_ORDER from morango.errors import MorangoInvalidFSICPartition from morango.errors import MorangoLimitExceeded from morango.errors import MorangoResumeSyncError @@ -483,7 +487,7 @@ def _queue_into_buffer_v1(transfersession): """SELECT id, serialized, deleted, last_saved_instance, last_saved_counter, hard_deleted, model_name, profile, partition, source_id, conflicting_serialized_data, - CAST ('{transfer_session_id}' AS {transfer_session_id_type}), _self_ref_fk + CAST ('{transfer_session_id}' AS {transfer_session_id_type}), _self_ref_fk, _self_ref_order FROM {store} WHERE {condition} """.format( transfer_session_id=transfersession.id, @@ -514,7 +518,7 @@ def _queue_into_buffer_v1(transfersession): """INSERT INTO {outgoing_buffer} (model_uuid, serialized, deleted, last_saved_instance, last_saved_counter, hard_deleted, model_name, profile, partition, source_id, conflicting_serialized_data, - transfer_session_id, _self_ref_fk) + transfer_session_id, _self_ref_fk, _self_ref_order) {select} """.format( outgoing_buffer=Buffer._meta.db_table, @@ -632,7 +636,7 @@ def _queue_into_buffer_v2(transfersession, chunk_size=200): """SELECT id, serialized, deleted, last_saved_instance, last_saved_counter, hard_deleted, model_name, profile, partition, source_id, conflicting_serialized_data, - CAST ('{transfer_session_id}' AS {transfer_session_id_type}), _self_ref_fk + CAST ('{transfer_session_id}' AS {transfer_session_id_type}), _self_ref_fk, _self_ref_order FROM {store} WHERE {condition} """.format( transfer_session_id=transfersession.id, @@ -661,7 +665,7 @@ def _queue_into_buffer_v2(transfersession, chunk_size=200): """INSERT INTO {outgoing_buffer} (model_uuid, serialized, deleted, last_saved_instance, last_saved_counter, hard_deleted, model_name, profile, partition, source_id, conflicting_serialized_data, - transfer_session_id, _self_ref_fk) + transfer_session_id, _self_ref_fk, _self_ref_order) {select} """.format( outgoing_buffer=Buffer._meta.db_table, @@ -679,7 +683,41 @@ def _queue_into_buffer_v2(transfersession, chunk_size=200): ) -def _dequeue_into_store(transfer_session, fsic, v2_format=False): +def _update_legacy_self_ref_order_for_model(queryset): + # root nodes set the _self_ref_order to 0 + queryset.filter(_self_ref_fk="").exclude(_self_ref_order=0).update(_self_ref_order=0) + # reset the _self_ref_order to None for all records that have a parent + queryset.exclude(_self_ref_fk="").exclude(_self_ref_order=None).update( + _self_ref_order=None + ) + + parent = Store.objects.filter( + id=OuterRef("_self_ref_fk"), + _self_ref_order__isnull=False, + ) + parent_order = parent.values("_self_ref_order")[:1] + pending = queryset.exclude(_self_ref_fk="").filter(_self_ref_order=None) + + while pending.filter(Exists(parent)).update(_self_ref_order=Subquery(parent_order) + 1): + pass + + +def _update_legacy_self_ref_order(transfer_session): + profile = transfer_session.sync_session.profile + transferred_store_records = Store.objects.filter( + last_transfer_session_id=transfer_session.id, + profile=profile, + ) + + for Model in syncable_models.get_models(profile): + queryset = transferred_store_records.filter(model_name=Model.morango_model_name) + if self_referential_fk(Model): + _update_legacy_self_ref_order_for_model(queryset) + else: + queryset.exclude(_self_ref_order=None).update(_self_ref_order=None) + + +def _dequeue_into_store(transfer_session, fsic, v2_format=False, self_ref_order=True): """ Takes data from the buffers and merges into the store and record max counters. @@ -703,6 +741,8 @@ def _dequeue_into_store(transfer_session, fsic, v2_format=False): DBBackend._dequeuing_delete_mc_buffer(cursor, transfer_session.id) DBBackend._dequeuing_insert_remaining_buffer(cursor, transfer_session.id) DBBackend._dequeuing_insert_remaining_rmcb(cursor, transfer_session.id) + if not self_ref_order: + _update_legacy_self_ref_order(transfer_session) DBBackend._dequeuing_delete_remaining_rmcb(cursor, transfer_session.id) DBBackend._dequeuing_delete_remaining_buffer(cursor, transfer_session.id) @@ -1041,6 +1081,7 @@ def handle(self, context): context.transfer_session, fsic, v2_format=FSIC_V2_FORMAT in context.capabilities, + self_ref_order=SELF_REF_ORDER in context.capabilities, ) return transfer_statuses.COMPLETED diff --git a/morango/sync/stream/serialize.py b/morango/sync/stream/serialize.py index 935a831..cae02a0 100644 --- a/morango/sync/stream/serialize.py +++ b/morango/sync/stream/serialize.py @@ -186,6 +186,13 @@ def _handle_store_update(self, task: SerializeTask): # clear last_transfer_session_id task.store.last_transfer_session_id = None + self_ref_fk = task.self_referential_fk() + if self_ref_fk: + new_fk_value = getattr(task.obj, self_ref_fk) or "" + if new_fk_value != task.store._self_ref_fk: + task.store._self_ref_fk = new_fk_value + task.store._self_ref_order = self._compute_self_ref_order(new_fk_value) + def _handle_store_create(self, task: SerializeTask): kwargs = { "id": task.obj.id, @@ -200,11 +207,29 @@ def _handle_store_create(self, task: SerializeTask): self_ref_fk = task.self_referential_fk() if self_ref_fk: - self_ref_fk_value = getattr(task.obj, self_ref_fk) - kwargs["_self_ref_fk"] = self_ref_fk_value or "" + self_ref_fk_value = getattr(task.obj, self_ref_fk) or "" + kwargs["_self_ref_fk"] = self_ref_fk_value + kwargs["_self_ref_order"] = self._compute_self_ref_order(self_ref_fk_value) task.set_store(Store(**kwargs)) + @staticmethod + def _compute_self_ref_order(self_ref_fk_value): + """ + Compute ``_self_ref_order`` for a self-referential store record. + + Returns ``0`` when the record has no parent (root), otherwise queries + the parent ``Store`` row and returns the next order value. + """ + if not self_ref_fk_value: + return 0 + parent_order = ( + Store.objects.filter(id=self_ref_fk_value) + .values_list("_self_ref_order", flat=True) + .first() + ) + return parent_order + 1 if parent_order is not None else None + class ModelPartitionBuffer(Buffer[List[SerializeTask]]): """Buffers tasks into chunks that have the same model class.""" diff --git a/morango/utils.py b/morango/utils.py index 0143f08..9539c12 100644 --- a/morango/utils.py +++ b/morango/utils.py @@ -9,6 +9,7 @@ ASYNC_OPERATIONS, FSIC_V2_FORMAT, GZIP_BUFFER_POST, + SELF_REF_ORDER, ) @@ -61,6 +62,8 @@ def get_capabilities(): if not SETTINGS.MORANGO_DISALLOW_ASYNC_OPERATIONS: capabilities.add(ASYNC_OPERATIONS) + capabilities.add(SELF_REF_ORDER) + return capabilities diff --git a/tests/testapp/tests/integration/test_syncing_models.py b/tests/testapp/tests/integration/test_syncing_models.py index 7086fb6..6f8b4e3 100644 --- a/tests/testapp/tests/integration/test_syncing_models.py +++ b/tests/testapp/tests/integration/test_syncing_models.py @@ -1,11 +1,12 @@ import json from django.test import TestCase -from facility_profile.models import MyUser, TestModel +from facility_profile.models import Facility, MyUser, TestModel from morango.models.core import Store from morango.models.manager import SyncableModelManager from morango.models.query import SyncableModelQuerySet +from morango.registry import syncable_models from morango.sync.controller import MorangoProfileController @@ -140,3 +141,40 @@ def test_hidden_models_deletion_during_deserialization(self): # The store record should still exist but marked as deleted self.assertTrue(Store.objects.filter(id=hidden_obj.id, deleted=True).exists()) + + def test_get_model_querysets_applies_morango_ordering(self): + old_ordering = Facility.morango_ordering + Facility.morango_ordering = ("-name",) + try: + Facility.objects.create(name="a-facility") + Facility.objects.create(name="z-facility") + queryset = next( + qs + for qs in syncable_models.get_model_querysets(Facility.morango_profile) + if qs.model is Facility + ) + self.assertEqual( + list(queryset.values_list("name", flat=True)), + ["z-facility", "a-facility"], + ) + finally: + Facility.morango_ordering = old_ordering + + def test_get_model_querysets_uses_nulls_last_for_string_ordering(self): + old_ordering = Facility.morango_ordering + Facility.morango_ordering = ("parent_id", "name") + try: + root_b = Facility.objects.create(name="root-b", parent=None) + root_a = Facility.objects.create(name="root-a", parent=None) + child = Facility.objects.create(name="child", parent=root_a) + queryset = next( + qs + for qs in syncable_models.get_model_querysets(Facility.morango_profile) + if qs.model is Facility + ) + self.assertEqual( + list(queryset.values_list("id", flat=True)), + [child.id, root_a.id, root_b.id], + ) + finally: + Facility.morango_ordering = old_ordering diff --git a/tests/testapp/tests/models/test_core.py b/tests/testapp/tests/models/test_core.py index b025ad6..735c2cc 100644 --- a/tests/testapp/tests/models/test_core.py +++ b/tests/testapp/tests/models/test_core.py @@ -2,13 +2,14 @@ import factory import mock +from django.core.exceptions import ValidationError from django.test import TestCase, override_settings from django.utils import timezone from facility_profile.models import Facility, MyUser from morango.constants import transfer_stages, transfer_statuses from morango.models.certificates import Filter -from morango.models.core import DatabaseMaxCounter, Store, SyncSession, TransferSession +from morango.models.core import Buffer, DatabaseMaxCounter, Store, SyncSession, TransferSession from morango.sync.controller import MorangoProfileController from ..helpers import RecordMaxCounterFactory, StoreFactory @@ -370,3 +371,63 @@ def test_deferred_clean_fields(self, mock_clean_fields): sync_filter = Filter("test") f.deferred_clean_fields(exclude=["test1"], sync_filter=sync_filter) mock_clean_fields.assert_called_once_with(exclude=["test1"], sync_filter=sync_filter) + + +class AbstractStoreSelfRefOrderValidationTestCase(TestCase): + def _store(self, self_ref_order): + return Store( + id=uuid.uuid4().hex, + profile="facilitydata", + serialized="{}", + deleted=False, + hard_deleted=False, + last_saved_instance=uuid.uuid4().hex, + last_saved_counter=1, + partition="partition", + source_id="source", + model_name="model", + _self_ref_order=self_ref_order, + ) + + def _buffer(self, self_ref_order): + sync_session = SyncSession.objects.create( + id=uuid.uuid4().hex, + profile="facilitydata", + last_activity_timestamp=timezone.now(), + ) + transfer_session = TransferSession.objects.create( + id=uuid.uuid4().hex, + sync_session=sync_session, + push=True, + last_activity_timestamp=timezone.now(), + ) + return Buffer( + transfer_session=transfer_session, + model_uuid=uuid.uuid4().hex, + profile="facilitydata", + serialized="{}", + deleted=False, + hard_deleted=False, + last_saved_instance=uuid.uuid4().hex, + last_saved_counter=1, + partition="partition", + source_id="source", + model_name="model", + _self_ref_order=self_ref_order, + ) + + def test_store_self_ref_order_allows_null(self): + self._store(None).full_clean() + + def test_store_self_ref_order_rejects_negative(self): + store = self._store(-1) + with self.assertRaises(ValidationError): + store.full_clean() + + def test_buffer_self_ref_order_allows_null(self): + self._buffer(None).full_clean() + + def test_buffer_self_ref_order_rejects_negative(self): + buffer = self._buffer(-1) + with self.assertRaises(ValidationError): + buffer.full_clean() diff --git a/tests/testapp/tests/sync/stream/test_serialize.py b/tests/testapp/tests/sync/stream/test_serialize.py index 140714a..878bc17 100644 --- a/tests/testapp/tests/sync/stream/test_serialize.py +++ b/tests/testapp/tests/sync/stream/test_serialize.py @@ -1,8 +1,9 @@ import json +import uuid import mock from django.db.models import Q -from django.test import SimpleTestCase +from django.test import SimpleTestCase, TestCase from morango.models.certificates import Filter from morango.models.core import InstanceIDModel, RecordMaxCounter, Store, SyncableModel @@ -143,6 +144,18 @@ def test_transform(self, mock_bulk, mock_qs): class StoreUpdateTestCase(SimpleTestCase): + def _build_sync_obj(self, **overrides): + obj = mock.Mock() + obj.id = uuid.uuid4().hex + obj.serialize.return_value = {} + obj.morango_model_name = "mymodel" + obj.morango_profile = "profile" + obj._morango_partition = "partition" + obj._morango_source_id = "src" + for key, value in overrides.items(): + setattr(obj, key, value) + return obj + @mock.patch("morango.sync.stream.serialize.StoreUpdate._handle_store_create") def test_transform__creates(self, mock_handle_store_create): current_id = mock.Mock(id="inst_1", counter=10) @@ -176,7 +189,8 @@ def test_transform__updates(self, mock_handle_store_update): mock_handle_store_update.assert_called_once_with(task) self.assertEqual(task.counter.counter, 10) - def test_handle_store_update(self): + @mock.patch("morango.sync.stream.serialize.self_referential_fk", return_value=None) + def test_handle_store_update(self, _mock_srf): current_id = mock.Mock(id="inst_1", counter=10) update = StoreUpdate(current_id) @@ -193,6 +207,126 @@ def test_handle_store_update(self): self.assertEqual(ser_data["old"], 1) self.assertEqual(ser_data["new"], 2) + @mock.patch("morango.sync.stream.serialize.self_referential_fk", return_value=None) + def test_handle_store_create__non_self_ref(self, _mock_srf): + current_id = mock.Mock(id="inst_1", counter=1) + update = StoreUpdate(current_id) + + obj = self._build_sync_obj() + + task = SerializeTask(mock.Mock(), obj) + update._handle_store_create(task) + + self.assertIsNone(task.store._self_ref_order) + + @mock.patch("morango.sync.stream.serialize.self_referential_fk", return_value="parent_id") + def test_handle_store_create__self_ref_no_parent(self, _mock_srf): + current_id = mock.Mock(id="inst_1", counter=1) + update = StoreUpdate(current_id) + + obj = self._build_sync_obj(parent_id=None) # no parent + + task = SerializeTask(mock.Mock(), obj) + update._handle_store_create(task) + + self.assertEqual(task.store._self_ref_fk, "") + self.assertEqual(task.store._self_ref_order, 0) + + @mock.patch("morango.sync.stream.serialize.self_referential_fk", return_value="parent_id") + def test_handle_store_update__self_ref_fk_unchanged(self, _mock_srf): + current_id = mock.Mock(id="inst_1", counter=2) + update = StoreUpdate(current_id) + + parent_id = uuid.uuid4().hex + store = Store( + serialized=json.dumps({}), + dirty_bit=False, + _self_ref_fk=parent_id, + _self_ref_order=5, + ) + obj = self._build_sync_obj(parent_id=parent_id) # same FK — no change + + task = SerializeTask(mock.Mock(), obj) + task.set_store(store) + update._handle_store_update(task) + + self.assertEqual(task.store._self_ref_fk, parent_id) + self.assertEqual(task.store._self_ref_order, 5) + + +def _make_store(**kwargs): + defaults = dict( + id=uuid.uuid4().hex, + profile="facilitydata", + serialized="{}", + last_saved_instance=uuid.uuid4().hex, + last_saved_counter=1, + partition="partition", + source_id=uuid.uuid4().hex, + model_name="facility", + ) + defaults.update(kwargs) + return Store.objects.create(**defaults) + + +class StoreUpdateSelfRefOrderDbTestCase(TestCase): + def setUp(self): + self.current_id = mock.Mock(id="inst_1", counter=1) + self.update = StoreUpdate(self.current_id) + + def _task(self, self_ref_fk_field, fk_value): + obj = mock.Mock() + obj.id = uuid.uuid4().hex + obj.serialize.return_value = {} + obj.morango_model_name = "facility" + obj.morango_profile = "facilitydata" + obj._morango_partition = "partition" + obj._morango_source_id = "src" + setattr(obj, self_ref_fk_field, fk_value) + return SerializeTask(mock.Mock(), obj) + + @mock.patch("morango.sync.stream.serialize.self_referential_fk", return_value="parent_id") + def test_handle_store_create__self_ref_with_parent(self, _mock_srf): + parent_store = _make_store(_self_ref_order=3) + + task = self._task("parent_id", parent_store.id) + self.update._handle_store_create(task) + + self.assertEqual(task.store._self_ref_fk, parent_store.id) + self.assertEqual(task.store._self_ref_order, 4) + + @mock.patch("morango.sync.stream.serialize.self_referential_fk", return_value="parent_id") + def test_handle_store_create__self_ref_parent_not_in_store(self, _mock_srf): + missing_parent_id = uuid.uuid4().hex + + task = self._task("parent_id", missing_parent_id) + self.update._handle_store_create(task) + + self.assertEqual(task.store._self_ref_fk, missing_parent_id) + self.assertIsNone(task.store._self_ref_order) + + @mock.patch("morango.sync.stream.serialize.self_referential_fk", return_value="parent_id") + def test_handle_store_update__self_ref_fk_changed(self, _mock_srf): + old_parent_store = _make_store(_self_ref_order=0) + new_parent_store = _make_store(_self_ref_order=7) + + store = Store( + serialized=json.dumps({}), + dirty_bit=False, + _self_ref_fk=old_parent_store.id, + _self_ref_order=0, + ) + obj = mock.Mock() + obj.serialize.return_value = {} + obj.parent_id = new_parent_store.id # FK changed + + task = SerializeTask(mock.Mock(), obj) + task.set_store(store) + self.update._handle_store_update(task) + + self.assertEqual(task.store._self_ref_fk, new_parent_store.id) + self.assertEqual(task.store._self_ref_order, 8) + class ModelPartitionBufferTestCase(SimpleTestCase): def test_buffer_splits_on_model_change(self): diff --git a/tests/testapp/tests/sync/test_operations.py b/tests/testapp/tests/sync/test_operations.py index 4c86c98..b8b7444 100644 --- a/tests/testapp/tests/sync/test_operations.py +++ b/tests/testapp/tests/sync/test_operations.py @@ -8,7 +8,7 @@ from facility_profile.models import ConditionalLog, Facility, MyUser, SummaryLog from morango.constants import transfer_statuses -from morango.constants.capabilities import FSIC_V2_FORMAT +from morango.constants.capabilities import FSIC_V2_FORMAT, SELF_REF_ORDER from morango.errors import MorangoLimitExceeded from morango.models.certificates import Filter from morango.models.core import ( @@ -37,6 +37,7 @@ _deserialize_from_store, _queue_into_buffer_v1, _queue_into_buffer_v2, + _update_legacy_self_ref_order, ) from morango.sync.syncsession import TransferClient @@ -88,6 +89,14 @@ def test_all_fsics(self): assertRecordsBuffered(self.data["group1_c2"]) assertRecordsBuffered(self.data["group2_c1"]) + def test_self_ref_order_propagates_to_buffer(self): + Store.objects.update(_self_ref_order=42) + fsics = {self.data["group1_id"].id: 1, self.data["group2_id"].id: 1} + self.transfer_session.client_fsic = json.dumps(fsics) + _queue_into_buffer_v1(self.transfer_session) + self.assertTrue(Buffer.objects.exists()) + self.assertFalse(Buffer.objects.exclude(_self_ref_order=42).exists()) + def test_very_many_fsics(self): """ Regression test against 'Expression tree is too large (maximum depth 1000)' error with many fsics @@ -278,6 +287,18 @@ def setUp(self): capabilities=[FSIC_V2_FORMAT], ) + def test_self_ref_order_propagates_to_buffer(self): + Store.objects.update(_self_ref_order=42) + fsics = { + "super": {}, + "sub": {"": {self.data["group1_id"].id: 1, self.data["group2_id"].id: 1}}, + } + self.transfer_session.client_fsic = json.dumps(fsics) + self.transfer_session.server_fsic = json.dumps({"super": {}, "sub": {}}) + _queue_into_buffer_v2(self.transfer_session) + self.assertTrue(Buffer.objects.exists()) + self.assertFalse(Buffer.objects.exclude(_self_ref_order=42).exists()) + # @pytest.mark.skip("Takes 30+ seconds, manual run only") def test_very_many_instances_in_fsic(self): """ @@ -634,7 +655,7 @@ def setUp(self): conn.server_info = dict(capabilities=[]) self.data["mc"] = MorangoProfileController("facilitydata") session = SyncSession.objects.create( - id=uuid.uuid4().hex, profile="", last_activity_timestamp=timezone.now() + id=uuid.uuid4().hex, profile="facilitydata", last_activity_timestamp=timezone.now() ) self.transfer_session = TransferSession.objects.create( id=uuid.uuid4().hex, @@ -664,6 +685,21 @@ def assert_store_records_not_tagged_with_last_session(self, store_ids): except Store.DoesNotExist: pass + def _make_transferred_store(self, **kwargs): + defaults = { + "id": uuid.uuid4().hex, + "serialized": "{}", + "last_saved_instance": self.current_id.id, + "last_saved_counter": 1, + "model_name": "facility", + "profile": "facilitydata", + "partition": uuid.uuid4().hex, + "source_id": uuid.uuid4().hex, + "last_transfer_session_id": self.transfer_session.id, + } + defaults.update(kwargs) + return Store.objects.create(**defaults) + def test_dequeuing_sets_last_session(self): store_ids = [self.data[key] for key in ["model2", "model3", "model4", "model5", "model7"]] self.assert_store_records_not_tagged_with_last_session(store_ids) @@ -788,6 +824,14 @@ def test_dequeuing_merge_conflict_hard_delete(self): self.assertEqual(store.serialized, "") self.assertEqual(store.conflicting_serialized_data, "") + def test_dequeuing_merge_conflict_buffer__self_ref_order_preserved(self): + Store.objects.filter(id=self.data["model2"]).update(_self_ref_order=11) + Buffer.objects.filter(model_uuid=self.data["model2"]).update(_self_ref_order=99) + with connection.cursor() as cursor: + current_id = InstanceIDModel.get_current_instance_and_increment_counter() + DBBackend._dequeuing_merge_conflict_buffer(cursor, current_id, self.transfer_session.id) + self.assertEqual(Store.objects.get(id=self.data["model2"])._self_ref_order, 11) + def test_dequeuing_update_rmcs_last_saved_by(self): self.assertFalse(RecordMaxCounter.objects.filter(instance_id=self.current_id.id).exists()) with connection.cursor() as cursor: @@ -842,6 +886,13 @@ def test_dequeuing_insert_remaining_buffer(self): self.assertEqual(Store.objects.get(id=self.data["model3"]).serialized, "buffer") self.assertTrue(Store.objects.filter(id=self.data["model4"]).exists()) + def test_dequeuing_insert_remaining_buffer__self_ref_order_propagates(self): + Buffer.objects.filter(model_uuid=self.data["model4"]).update(_self_ref_order=7) + self.assertFalse(Store.objects.filter(id=self.data["model4"]).exists()) + with connection.cursor() as cursor: + DBBackend._dequeuing_insert_remaining_buffer(cursor, self.transfer_session.id) + self.assertEqual(Store.objects.get(id=self.data["model4"])._self_ref_order, 7) + def test_dequeuing_insert_remaining_rmcb(self): for i in self.data["model4_rmcb_ids"]: self.assertFalse( @@ -956,6 +1007,55 @@ def test_dequeue_into_store(self): ).exists() ) + def test_dequeue_into_store__self_ref_order_fallback_for_missing_capability(self): + Buffer.objects.filter(model_uuid=self.data["model3"]).update( + _self_ref_fk="", _self_ref_order=None + ) + Buffer.objects.filter(model_uuid=self.data["model4"]).update( + _self_ref_fk=self.data["model3"], _self_ref_order=None + ) + + _dequeue_into_store( + self.transfer_session, + self.transfer_session.client_fsic, + v2_format=False, + self_ref_order=False, + ) + + self.assertEqual(Store.objects.get(id=self.data["model3"])._self_ref_order, 0) + self.assertEqual(Store.objects.get(id=self.data["model4"])._self_ref_order, 1) + + def test_update_legacy_self_ref_order_nulls_non_self_ref_models(self): + store = self._make_transferred_store( + model_name=SummaryLog.morango_model_name, + _self_ref_order=3, + ) + + _update_legacy_self_ref_order(self.transfer_session) + + store.refresh_from_db() + self.assertIsNone(store._self_ref_order) + + def test_update_legacy_self_ref_order_handles_deeper_self_ref_chains(self): + root = self._make_transferred_store(_self_ref_fk="", _self_ref_order=None) + child = self._make_transferred_store( + _self_ref_fk=root.id, + _self_ref_order=None, + ) + grandchild = self._make_transferred_store( + _self_ref_fk=child.id, + _self_ref_order=None, + ) + + _update_legacy_self_ref_order(self.transfer_session) + + root.refresh_from_db() + child.refresh_from_db() + grandchild.refresh_from_db() + self.assertEqual(root._self_ref_order, 0) + self.assertEqual(child._self_ref_order, 1) + self.assertEqual(grandchild._self_ref_order, 2) + def test_local_dequeue_operation(self): self.transfer_session.records_transferred = 1 self.context.filter = [self.transfer_session.filter] @@ -965,6 +1065,19 @@ def test_local_dequeue_operation(self): Buffer.objects.filter(transfer_session_id=self.transfer_session.id).exists() ) + @mock.patch("morango.sync.operations._dequeue_into_store") + def test_local_dequeue_operation__passes_self_ref_order_capability(self, mock_dequeue): + self.transfer_session.records_transferred = 1 + self.context.capabilities = {SELF_REF_ORDER} + operation = ReceiverDequeueOperation() + self.assertEqual(transfer_statuses.COMPLETED, operation.handle(self.context)) + mock_dequeue.assert_called_once_with( + self.transfer_session, + self.transfer_session.client_fsic, + v2_format=False, + self_ref_order=True, + ) + @mock.patch("morango.sync.operations._dequeue_into_store") def test_local_dequeue_operation__noop(self, mock_dequeue): self.context.is_server = False diff --git a/tests/testapp/tests/test_api.py b/tests/testapp/tests/test_api.py index b2c8a30..65adb3a 100644 --- a/tests/testapp/tests/test_api.py +++ b/tests/testapp/tests/test_api.py @@ -986,6 +986,12 @@ def test_buffer_serializer_makes_no_transfer_session_query(self): for q in ctx.captured_queries: self.assertFalse("morango_transfersession" in q["sql"]) + def test_buffer_serializer_includes_self_ref_order(self): + transfer_session_id = self.create_records_for_pulling() + Buffer.objects.filter(transfer_session_id=transfer_session_id).update(_self_ref_order=4) + buffer = Buffer.objects.filter(transfer_session_id=transfer_session_id).first() + self.assertEqual(BufferSerializer(instance=buffer).data["_self_ref_order"], 4) + def test_pull_valid_buffer_list(self): transfer_session_id = self.create_records_for_pulling() diff --git a/tests/testapp/tests/test_utils.py b/tests/testapp/tests/test_utils.py index 782c5b6..54ed96f 100644 --- a/tests/testapp/tests/test_utils.py +++ b/tests/testapp/tests/test_utils.py @@ -12,6 +12,7 @@ ALLOW_CERTIFICATE_PUSHING, ASYNC_OPERATIONS, FSIC_V2_FORMAT, + SELF_REF_ORDER, ) from morango.utils import ( CAPABILITIES_CLIENT_HEADER, @@ -73,6 +74,9 @@ def test_get_capabilities__fsic_v2_format(self): with self.settings(MORANGO_DISABLE_FSIC_V2_FORMAT=True): self.assertNotIn(FSIC_V2_FORMAT, get_capabilities()) + def test_get_capabilities__self_ref_order(self): + self.assertIn(SELF_REF_ORDER, get_capabilities()) + @mock.patch("morango.utils.CAPABILITIES", ("TEST", "SERIALIZE")) def test_serialize(self): req = Request()