Data Flow & Caching
Data Flow & Caching 🛠️
Section titled “Data Flow & Caching 🛠️”This section illustrates how the Convertor architecture is applied to common data operations and details the strategy for managing the Ferry cache.
Data Flow Implementation Examples (GraphQL)
Section titled “Data Flow Implementation Examples (GraphQL)”1. Single Item Request
Section titled “1. Single Item Request”The pipeline for fetching a single item (e.g., NotificationModel):
- Request Parameter Convertor: Transforms custom input parameters (e.g.,
SingleRequestParams) into the necessary GraphQL request object (e.g.,GQueryNotificationReq). - Datasource: The
GraphQLRequestExecutoris executed, chained withGraphQLStreamConvertor, and then mapped to extract the single item from the response data.
mixin NotificationsDatasource { static final itemQueryConvertorProvider = Provider.autoDispose((ref) { return Convertor< GQueryNotificationReq, SingleRequestParams<GnotificationFilters> >((params) { return GQueryNotificationReq((builder) { builder ..requestId = 'notification_item_${params.toString()}' ..vars.G_filter = params.filter?.toBuilder() ..vars.G_take = 1; }); }); });
static final itemProvider = Provider.autoDispose((ref) { return GraphQLRequestExecutor< GQueryNotificationData, SingleRequestParams<GnotificationFilters>, GQueryNotificationVars >( gqlClient: GqlClientBuilder.getInstance(), convertor: ref.watch(itemQueryConvertorProvider), ) .then(GraphQLStreamConvertor()) .map((data) { return data.notification!.notificationItems!.firstOrNull; }); });}- Repository: The repository consumes the Datasource and uses the
.thenMap(modelConvertor)extension to apply amodelConvertor(e.g.,Convertor<NotificationModel, GQueryNotificationReq>) to transform the raw GraphQL data into the final domain model.
class NewNotificationsRepository { static final provider = Provider.autoDispose((ref) { return NewNotificationsRepository( itemDatasource: ref.watch(NotificationsDatasource.itemProvider), modelConvertor: ref.watch(NotificationsDomain.modelConvertor), ); });
const NewNotificationsRepository({ required this.itemDatasource, required this.modelConvertor, });
final StreamConvertor< GQueryNotificationData_notification_notificationItems?, SingleRequestParams<GnotificationFilters> > itemDatasource;
final Convertor< NotificationModel, GQueryNotificationData_notification_notificationItems > modelConvertor;
Stream<NotificationModel> queryItem({NotificationFilterCriteria? filter}) { final params = SingleRequestParams<GnotificationFilters>( filter: filterConvertor.mightExecute(filter), );
return itemDatasource.map((item) { if (item == null) throw Exception('Notification item not found'); return item; }).thenMap(modelConvertor) .transform<NotificationModel>(StreamTransformer.fromHandlers( handleError: (e, s, sink) { // Handle error sink.addError(e, s); })).decorate((convertor, input) { try { return convertor.execute(input); } catch (e, st) { return Stream.error(e, st); } }).execute(params); }}
mixin NotificationsDomain { static final modelConvertor = Provider.autoDispose((ref) { return Convertor< NotificationModel, GQueryNotificationData_notification_notificationItems >((notification) { return NotificationModel( id: notification.id!, date: notification.date?.toDto, title: notification.title, attachments: notification.images ?.map((i) => AttachmentModel(id: i?.id, mediaUrl: i?.mediaUrl)) .toList() ?? [], ); }); });}- Controller: Executes the repository’s method to stream the result into a
StreamGroup.
class NotificationNotifier extends AutoDisposeFamilyStreamNotifier<NotificationModel, String> { static final provider = StreamNotifierProvider.autoDispose .family<NotificationNotifier, NotificationModel, String>( NotificationNotifier.new, );
late StreamGroup<NotificationModel> _sourceSubject;
late final NotificationFilterCriteria _filterCriteria = NotificationFilterCriteria( systemParentId: AppConfig.instance.conferenceId, id: arg, );
@override Stream<NotificationModel> build(String id) async* { _sourceSubject = StreamGroup(); ref.onDispose(_sourceSubject.close); _fetchNotification(); yield* _sourceSubject.stream; }
Future<void> retryFetch() async { _fetchNotification(); }
void _fetchNotification() { final repository = ref.watch(NewNotificationsRepository.provider); _sourceSubject.add(repository.queryItem(filter: _filterCriteria)); }
void refetch() { _emitLoading(); ref.invalidateSelf(); }
void _emitLoading() { state = const AsyncLoading(); }}2. Paginated List Request
Section titled “2. Paginated List Request”The flow is similar to the single item request, but utilizes list-specific extensions:
- Datasource: The
GraphQLRequestExecutoris executed, chained withGraphQLStreamConvertor, and then mapped to extract the single item from the response data.
mixin NotificationsDatasource { static final queryConvertorProvider = Provider.autoDispose((ref) { return Convertor< GQueryNotificationReq, ListRequestParams<GnotificationFilters, GnotificationOrder> >((params) { return GQueryNotificationReq((builder) { builder ..requestId = 'notification_list_${params.toString()}' ..vars.G_filter = params.filter?.toBuilder() ..vars.G_sort = params.sort?.toBuilder() ..vars.G_skip = params.skip ..vars.G_take = params.take; }); }); });
static final listProvider = Provider.autoDispose((ref) { return GraphQLRequestExecutor< GQueryNotificationData, ListRequestParams<GnotificationFilters, GnotificationOrder>, GQueryNotificationVars >( gqlClient: GqlClientBuilder.getInstance(), convertor: ref.watch(queryConvertorProvider), ) .then(GraphQLStreamConvertor()) .map((data) { return data.notification!.notificationItems!.nonNulls.toList(); }); });}- Repository: The repository consumes the list-fetching Datasource. It then uses
.thenEach(modelConvertor)to efficiently apply the model conversion to every item in the streamed list, resulting in aStream<List<NotificationModel>>.
class NewNotificationsRepository { static final provider = Provider.autoDispose((ref) { return NewNotificationsRepository( listDatasource: ref.watch(NotificationsDatasource.listProvider), modelConvertor: ref.watch(NotificationsDomain.modelConvertor), filterConvertor: ref.watch(NotificationsDomain.filterConvertor), sortConvertor: ref.watch(NotificationsDomain.sortConvertor), ); });
const NewNotificationsRepository({ required this.listDatasource, required this.modelConvertor, required this.filterConvertor, required this.sortConvertor, });
final StreamConvertor< List<GQueryNotificationData_notification_notificationItems>, ListRequestParams<GnotificationFilters, GnotificationOrder> > listDatasource;
final Convertor< NotificationModel, GQueryNotificationData_notification_notificationItems > modelConvertor;
final Convertor<GnotificationFilters, NotificationFilterCriteria> filterConvertor;
final Convertor<GnotificationOrder, NotificationSortCriteria> sortConvertor;
Stream<List<NotificationModel>> queryList({ int? skip, int? take, NotificationFilterCriteria? filter, NotificationSortCriteria? sort, }) { final params = ListRequestParams<GnotificationFilters, GnotificationOrder>( skip: skip, take: take, filter: filterConvertor.mightExecute(filter), sort: sortConvertor.mightExecute(sort), );
return listDatasource.thenEach(modelConvertor).execute(params); }}
mixin NotificationsDomain { static final modelConvertor = Provider.autoDispose((ref) { return Convertor< NotificationModel, GQueryNotificationData_notification_notificationItems >((notification) { return NotificationModel( id: notification.id!, date: notification.date?.toDto, title: notification.title, attachments: notification.images ?.map((i) => AttachmentModel(id: i?.id, mediaUrl: i?.mediaUrl)) .toList() ?? [], ); }); });
static final filterConvertor = Provider.autoDispose((ref) { return Convertor<GnotificationFilters, NotificationFilterCriteria>(( filter, ) { return GnotificationFilters( (b) => b ..id = filter.id ..systemParentId = filter.systemParentId ..feature = filter.feature ..title = GStringFiltersBuilder().construct(contains: filter.title), ); }); });
static final sortConvertor = Provider.autoDispose((ref) { return Convertor<GnotificationOrder, NotificationSortCriteria>((sort) { return GnotificationOrder( (b) => b ..id = sort.id?.value ..date = sort.date?.value, ); }); });}- Controller: Manages pagination state (
skip/take) and uses the repository’squeryListmethod to fetch a chunk of data, which is merged into aStreamGroup.
class NotificationsListNotifier extends AutoDisposeStreamNotifier<PaginationState<NotificationModel>> { static final int _countAtATime = 15;
static final provider = StreamNotifierProvider.autoDispose< NotificationsListNotifier, PaginationState<NotificationModel> >(NotificationsListNotifier.new);
late StreamGroup<(int, List<NotificationModel>)> _streamGroup; late PaginationState<NotificationModel> _paginationState = PaginationState( skip: 0, take: _countAtATime, ); int _nextPage = 0;
NotificationFilterCriteria _filterCriteria = NotificationFilterCriteria( systemParentId: AppConfig.instance.conferenceId, ); NotificationSortCriteria _sortCriteria = const NotificationSortCriteria( date: SortOptions.descending, );
@override Stream<PaginationState<NotificationModel>> build() async* { _paginationState = PaginationState( skip: 0, take: _countAtATime, ); _nextPage = 0; _streamGroup = StreamGroup(); ref.onDispose(_streamGroup.close); _fetchNotifications(); PaginatedList<NotificationModel> items = PaginatedList(pages: {}); yield* _streamGroup.stream.transform( StreamTransformer.fromHandlers( handleData: (data, sink) { items = items.clone()..updatePage(data.$1, data.$2); var paginationState = _paginationState; final paginatedData = PaginatedData( items: items, currentCount: items.length, totalCount: items.length, ); paginationState = paginationState.copyWith( data: paginatedData, loadMoreError: data.$1 == _nextPage ? null : paginationState.loadMoreError, isLoadingMore: data.$1 == _nextPage ? false : paginationState.isLoadingMore, ); _paginationState = paginationState; sink.add(paginationState); }, handleError: (error, st, sink) { if (items.isEmpty) { throw error; } var paginationState = _paginationState; paginationState = paginationState.copyWith( isLoadingMore: false, loadMoreError: error, ); _paginationState = paginationState; sink.add(paginationState); }, ), ); }
Future<void> loadMore() async { if (_nextPage != _paginationState.data?.items?.lastPageIndex) return; final index = _nextPage++; _paginationState = _paginationState.copyWith( skip: index * _countAtATime, take: _countAtATime, isLoadingMore: true, ); state = AsyncData(_paginationState); _fetchNotifications(); }
Future<void> retryFetch() async { _fetchNotifications(); }
void _fetchNotifications() { final repository = ref.watch(NewNotificationsRepository.provider); final index = _nextPage; _streamGroup.add( repository .queryList( skip: index * _countAtATime, take: _countAtATime, filter: _filterCriteria, sort: _sortCriteria, ) .map((list) => (index, list)), ); }
void sort({SortOptions? date}) { _sortCriteria = _sortCriteria.copyWith(date: date); _emitLoading(); ref.invalidateSelf(); }
void filter({bool? feature, String? title}) { _filterCriteria = _filterCriteria.copyWith(feature: feature, title: title); _emitLoading(); ref.invalidateSelf(); }
void search([String? title]) { filter(title: title); }
void _emitLoading() { state = const AsyncLoading(); }}3. Subscription and Mutation
Section titled “3. Subscription and Mutation”Subscriptions and Mutations utilize the StreamConvertor pattern to handle continuous updates or stream responses.
- Subscription: The Datasource is executed, and the resulting stream is mapped to the necessary type (e.g., mapped to include
ChatContext) and then converted to the domain model using.thenMap(modelConvertor). - Mutation: The Datasource is executed with a tuple input
(Params, bool)to optionally enable optimistic responses. The resulting stream data is then converted to the domain model using.thenMap(mutationModelConvertor).
static final upsertConvertorProvider = Provider.autoDispose((ref) { return Convertor< GMutateChatSaveReq, (UpsertRequestParams<GMutateChatSaveVars>, bool) >((params) { return GMutateChatSaveReq((builder) { builder ..requestId = 'chat_upsert_${params.$1.toString()}' ..vars = params.$1.vars.toBuilder(); if (params.$2) { // To enable optimisticResponse builder ..optimisticResponse.chatSave.id = '__optimistic__' ..optimisticResponse.chatSave.isBlockedByUser1 = params.$1.vars.chatArg?.isBlockedByUser1 ..optimisticResponse.chatSave.isBlockedByUser2 = params.$1.vars.chatArg?.isBlockedByUser2; } }); }); });Caching Strategy with Ferry
Section titled “Caching Strategy with Ferry”The architecture relies on Ferry’s built-in caching mechanism for GraphQL.
Key Caching Requirements
Section titled “Key Caching Requirements”- Identifiers: Data items must have an identifier (
idor_id) and__typenamefor Ferry to create separate cache nodes (e.g.,'$typename:$id'). - Data Types: Model serialization must result in JSON compatible types (Collections, Primitives, Strings) for successful caching. Non-compatible types will cause a “Linking error.”
How to Evict from Cache
Section titled “How to Evict from Cache”Since Ferry does not have an easy public method for targeted eviction, the architecture provides a manual mechanism:
- Identify the
operationDefinitionand resolve therootTypeName(Query/Mutation/Subscription). - Find the specific GraphQL field node (the operation name, e.g.,
attendee). - Call
cacheProxy.evict()with the root identity, field name, and request variables.
- To clear all requests of the same type: Pass an empty map (
const <String, dynamic>{}) for theargsparameter incacheProxy.evict().
final operation = mappedRequest.operation;final operationDefinition = getOperationDefinition( operation.document, operation.operationName,);final rootTypeName = resolveRootTypename( operationDefinition, cache.typePolicies,);var identity = cacheProxy.identify(_ToJson({'__typename': rootTypeName})); // a class that offers ‘toJson’ function that returns the same map.if (identity != null) { final node = operationDefinition.selectionSet.selections .whereType<FieldNode>() .where((node) { return node.name.value != '__typename'; }) .firstOrNull; if (node != null) { cacheProxy.evict( identity, fieldName: node.name.value, args: specs.clearAllRequests ? const <String, dynamic>{} : request.varsToJson(), ); }}Custom Cache Typed Link
Section titled “Custom Cache Typed Link”To simplify and automate complex cache operations, a Custom Cache Typed Link (UpdateCacheTypedLink) is used.
class CacheHandlerSpecs< ResponseData, RequestVars, MappedRequestData, MappedRequestVars> { final String? cacheHandlerKey;
final Convertor< OperationRequest<MappedRequestData, MappedRequestVars>?, OperationRequest<ResponseData, RequestVars> > mapToCachedRequest; final Convertor<MappedRequestData?, ResponseData>? mapResponse; final Convertor< MappedRequestData?, (MappedRequestData oldData, MappedRequestData newData) >? mergeCachedData;
final bool clearRequest; final bool clearAllRequests;
const CacheHandlerSpecs.clear({ required this.mapToCachedRequest,
this.cacheHandlerKey, }) : this.clearRequest = true, this.clearAllRequests = false, this.mapResponse = null, this.mergeCachedData = null;
const CacheHandlerSpecs.clearAll({ required this.mapToCachedRequest, this.cacheHandlerKey, }) : this.clearRequest = false, this.clearAllRequests = true, this.mapResponse = null, this.mergeCachedData = null;
const CacheHandlerSpecs.merge({ required this.mapToCachedRequest, required Convertor<MappedRequestData?, ResponseData> this.mapResponse, required Convertor< MappedRequestData?, (MappedRequestData oldData, MappedRequestData newData) > this.mergeCachedData, this.cacheHandlerKey, }) : this.clearRequest = false, this.clearAllRequests = false;}
class _ToJson { final Map<String, dynamic> json;
const _ToJson(this.json);
Map<String, dynamic> toJson() => json;}It intercepts responses and uses CacheHandlerSpecs attached to the request to define how the cache should be managed:
clearRequest/clearAllRequests: Flags used to trigger the internal eviction logic (as described above).mapResponse/mergeCachedData: Used to merge new data (e.g., from a mutation response) into an existing cached query, ensuring that list and detail views update automatically.
class UpdateCacheTypedLink extends TypedLink { final Cache cache; final SetMultimap<Type, CacheHandlerSpecs> cacheHandlersSpecs;
const UpdateCacheTypedLink({ required this.cache, required this.cacheHandlersSpecs, });
@override Stream<OperationResponse<TData, TVars>> request<TData, TVars>( OperationRequest<TData, TVars> req, [ forward, ]) => forward!(req).doOnData(_updateCache);
void _updateCache<TData, TVars>(OperationResponse<TData, TVars> response) { if (response.data == null) return;
final request = response.operationRequest; final key = request.updateCacheHandlerKey; final handlers = cacheHandlersSpecs[request.runtimeType];
Iterable<CacheHandlerSpecs> specs = handlers.where( (handler) => handler.cacheHandlerKey == key, ); if (key != null && specs.isEmpty) { throw Exception('No specs defined for key $key'); }
final proxy = switch (response.dataSource) { DataSource.Optimistic => CacheProxy(cache, request), DataSource.Link => CacheProxy(cache), _ => null, }; if (proxy == null) return; for (final spec in specs) { _executeUpdate(spec, request, response.data!, proxy); } }
void _executeUpdate<TData, TVars, MData, MVars>( CacheHandlerSpecs<TData, TVars, MData, MVars> specs, OperationRequest<TData, TVars> request, TData response, CacheProxy cacheProxy, ) { final mappedRequest = specs.mapToCachedRequest.execute(request); if (mappedRequest == null) return;
if (specs.clearRequest || specs.clearAllRequests) { final operation = mappedRequest.operation; final operationDefinition = getOperationDefinition( operation.document, operation.operationName, ); final rootTypeName = resolveRootTypename( operationDefinition, cache.typePolicies, ); var identity = cacheProxy.identify(_ToJson({'__typename': rootTypeName})); if (identity != null) { final node = operationDefinition.selectionSet.selections .whereType<FieldNode>() .where((node) { return node.name.value != '__typename'; }) .firstOrNull; if (node != null) { cacheProxy.evict( identity, fieldName: node.name.value, args: specs.clearAllRequests ? const <String, dynamic>{} : request.varsToJson(), ); } } return; }
final cachedRequestData = cacheProxy.readQuery(mappedRequest); if (cachedRequestData == null) return;
final mappedResponse = specs.mapResponse?.execute(response); if (mappedResponse == null) return;
final mappedCachedRequestData = specs.mergeCachedData?.execute(( cachedRequestData, mappedResponse, )); if (mappedCachedRequestData == null) return;
cacheProxy.writeQuery(mappedRequest, mappedCachedRequestData); }}