Data Flow & Caching
This section illustrates how the Convertor pattern is applied to common data operations and details the strategy for managing the Ferry cache.
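To make the examples that follow easier to read, here is a minimal sketch of what a Convertor could look like. The method names `execute`, `mightExecute`, and `then` are borrowed from the snippets in this section, but the bodies (and the `Convertor<To, From>` type-parameter order inferred from those snippets) are assumptions for illustration, not the project's actual implementation.

```dart
/// Hypothetical stand-in for the project's Convertor: wraps a function
/// from [From] to [To] so that conversions can be composed.
class Convertor<To, From> {
  const Convertor(this._convert);

  final To Function(From input) _convert;

  /// Runs the conversion.
  To execute(From input) => _convert(input);

  /// Runs the conversion only when [input] is non-null (assumed semantics).
  To? mightExecute(From? input) => input == null ? null : _convert(input);

  /// Chains this convertor with [next], feeding this output into it.
  Convertor<Next, From> then<Next>(Convertor<Next, To> next) =>
      Convertor<Next, From>((input) => next.execute(execute(input)));
}

void main() {
  // Illustrative convertors only; the ids and names are made up.
  final toRequestId = Convertor<String, int>((id) => 'notification_item_$id');
  final toLength = Convertor<int, String>((s) => s.length);

  print(toRequestId.execute(7));
  print(toRequestId.then(toLength).execute(7));
  print(toRequestId.mightExecute(null));
}
```

Each stage stays a small, individually testable function, and the pipeline is just the composition of those stages.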
Data Flow Implementation Examples (GraphQL)
1. Single Item Request
The pipeline for fetching a single item (e.g., NotificationModel):
- Request Parameter Convertor: Transforms custom input parameters into the GraphQL request object.
- Datasource: The GraphQLRequestExecutor is chained with GraphQLStreamConvertor, then mapped to extract the item.

    @riverpod
    Convertor<GQueryNotificationReq, SingleRequestParams<GnotificationFilters>>
        notificationItemQueryConvertor(Ref ref) {
      return Convertor((params) {
        return GQueryNotificationReq((builder) {
          builder
            ..requestId = 'notification_item_${params.toString()}'
            ..vars.G_filter = params.filter?.toBuilder()
            ..vars.G_take = 1;
        });
      });
    }

    @riverpod
    StreamConvertor<GQueryNotificationData_notification_notificationItems?,
            SingleRequestParams<GnotificationFilters>>
        notificationItemDatasource(Ref ref) {
      return GraphQLRequestExecutor<
          GQueryNotificationData,
          SingleRequestParams<GnotificationFilters>,
          GQueryNotificationVars>(
        gqlClient: ref.watch(gqlClientProvider),
        convertor: ref.watch(notificationItemQueryConvertorProvider),
      )
          .then(GraphQLStreamConvertor())
          .map((data) => data.notification!.notificationItems!.firstOrNull);
    }

- Repository: Consumes the datasource and applies .thenMap(modelConvertor) to transform raw GQL data into the domain model.

    class NotificationsRepository {
      @riverpod
      static NotificationsRepository notificationsRepository(Ref ref) {
        return NotificationsRepository(
          itemDatasource: ref.watch(notificationItemDatasourceProvider),
          modelConvertor: ref.watch(notificationModelConvertorProvider),
        );
      }

      const NotificationsRepository({
        required this.itemDatasource,
        required this.modelConvertor,
      });

      final StreamConvertor<
          GQueryNotificationData_notification_notificationItems?,
          SingleRequestParams<GnotificationFilters>> itemDatasource;

      final Convertor<NotificationModel,
          GQueryNotificationData_notification_notificationItems> modelConvertor;

      Stream<NotificationModel> queryItem({NotificationFilterCriteria? filter}) {
        // filterConvertor is used here, but its declaration is not shown in this excerpt.
        final params = SingleRequestParams<GnotificationFilters>(
          filter: filterConvertor.mightExecute(filter),
        );

        return itemDatasource
            .map((item) {
              // A missing item is reported as a domain-level exception.
              if (item == null) throw NotificationNotFoundException();
              return item;
            })
            .thenMap(modelConvertor)
            .decorate((convertor, input) {
              // Turn a synchronous throw into a stream error.
              try {
                return convertor.execute(input);
              } catch (e, st) {
                return Stream.error(e, st);
              }
            })
            .execute(params);
      }
    }

- Model Convertor: Transforms raw GQL data into the domain model and cleans the data (null fields fall back to defaults).

    @riverpod
    Convertor<NotificationModel,
            GQueryNotificationData_notification_notificationItems>
        notificationModelConvertor(Ref ref) {
      return Convertor((raw) {
        return NotificationModel(
          id: raw.id ?? '',
          date: raw.date ?? DateTime.now(),
          title: raw.title ?? 'Untitled',
          attachments: raw.images
                  ?.map((i) => AttachmentModel(
                        id: i?.id ?? '',
                        mediaUrl: i?.mediaUrl ?? '',
                      ))
                  .toList() ??
              [],
        );
      });
    }

- Notifier: For simple CRUD, accesses the repository directly.

    @riverpod
    class NotificationNotifier extends _$NotificationNotifier {
      @override
      Stream<NotificationModel> build(String id) {
        final repository = ref.watch(notificationsRepositoryProvider);
        return repository.queryItem(
          filter: NotificationFilterCriteria(id: id),
        );
      }
    }

2. Paginated List Request
The flow is similar to the single item request, but uses list-specific extensions and Ferry’s cache-based pagination:
- Datasource: Maps to a list of items.

    @riverpod
    Convertor<GQueryNotificationReq,
            ListRequestParams<GnotificationFilters, GnotificationOrder>>
        notificationListQueryConvertor(Ref ref) {
      return Convertor((params) {
        return GQueryNotificationReq((builder) {
          builder
            ..requestId = 'notification_list_${params.toString()}'
            ..vars.G_filter = params.filter?.toBuilder()
            ..vars.G_sort = params.sort?.toBuilder()
            ..vars.G_skip = params.skip
            ..vars.G_take = params.take;
        });
      });
    }

    @riverpod
    StreamConvertor<List<GQueryNotificationData_notification_notificationItems>,
            ListRequestParams<GnotificationFilters, GnotificationOrder>>
        notificationListDatasource(Ref ref) {
      return GraphQLRequestExecutor<
          GQueryNotificationData,
          ListRequestParams<GnotificationFilters, GnotificationOrder>,
          GQueryNotificationVars>(
        gqlClient: ref.watch(gqlClientProvider),
        convertor: ref.watch(notificationListQueryConvertorProvider),
      )
          .then(GraphQLStreamConvertor())
          .map((data) => data.notification!.notificationItems!.nonNulls.toList());
    }

- Repository: Uses .thenEach(modelConvertor) to convert each item in the list.

    Stream<List<NotificationModel>> queryList({
      int? skip,
      int? take,
      NotificationFilterCriteria? filter,
      NotificationSortCriteria? sort,
    }) {
      final params = ListRequestParams<GnotificationFilters, GnotificationOrder>(
        skip: skip,
        take: take,
        filter: filterConvertor.mightExecute(filter),
        sort: sortConvertor.mightExecute(sort),
      );

      return listDatasource.thenEach(modelConvertor).execute(params);
    }

- Notifier with Ferry Cache-Based Pagination: Instead of using StreamGroup, use a single query and increase take to load more. Ferry’s cache serves already-fetched entities instantly.

    @riverpod
    class NotificationsListNotifier extends _$NotificationsListNotifier {
      static const int _pageSize = 15;
      int _currentTake = _pageSize;

      @override
      Stream<List<NotificationModel>> build() {
        final repository = ref.watch(notificationsRepositoryProvider);
        return repository.queryList(
          skip: 0,
          take: _currentTake,
          filter: NotificationFilterCriteria(
            systemParentId: AppConfig.instance.conferenceId,
          ),
          sort: const NotificationSortCriteria(date: SortOptions.descending),
        );
      }

      void loadMore() {
        // Grow the window by one page and re-run build().
        _currentTake += _pageSize;
        ref.invalidateSelf();
      }

      void search(String? title) {
        // Note: this excerpt does not show how title is passed into the filter.
        _currentTake = _pageSize; // Reset on filter change
        ref.invalidateSelf();
      }
    }

3. Mutation with Optimistic Updates
Mutations take a record (Params, bool); the bool flag optionally enables an optimistic response:

    @riverpod
    Convertor<GMutateChatSaveReq,
            (UpsertRequestParams<GMutateChatSaveVars>, bool)>
        chatUpsertConvertor(Ref ref) {
      return Convertor((params) {
        return GMutateChatSaveReq((builder) {
          builder
            ..requestId = 'chat_upsert_${params.$1.toString()}'
            ..vars = params.$1.vars.toBuilder();
          if (params.$2) {
            builder
              ..optimisticResponse.chatSave.id = '__optimistic__'
              ..optimisticResponse.chatSave.isBlockedByUser1 =
                  params.$1.vars.chatArg?.isBlockedByUser1
              ..optimisticResponse.chatSave.isBlockedByUser2 =
                  params.$1.vars.chatArg?.isBlockedByUser2;
          }
        });
      });
    }

4. Subscription
Subscriptions use SubscriptionRequestParams with an optional context for filtering:

    @riverpod
    Convertor<GSubscribeToChatMessagesReq,
            SubscriptionRequestParams<ChatMessageSubscriptionContext>>
        chatMessageSubscriptionConvertor(Ref ref) {
      return Convertor((params) {
        return GSubscribeToChatMessagesReq((builder) {
          builder
            ..requestId = 'chat_message_subscription'
            ..updateCacheHandlerKey = 'messageSubscriptionCacheHandler'
            ..updateCacheHandlerContext = params.context?.toJson();
        });
      });
    }

The repository converts subscription data to domain models:

    Stream<ChatMessageModel> subscribeToMessages({
      required ChatMessageSubscriptionContext context,
    }) {
      final params = SubscriptionRequestParams(context: context);
      return subscriptionDatasource.thenMap(modelConvertor).execute(params);
    }

Caching Strategy with Ferry
The architecture relies on Ferry’s built-in caching mechanism for GraphQL.
Key Caching Requirements
- Identifiers: Data items must have an identifier (id or _id) and __typename for Ferry to create separate cache nodes (e.g., '$typename:$id').
- Data Types: Model serialization must result in JSON-compatible types (collections, primitives, strings) for successful caching. Incompatible types will cause a “Linking error.”
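The two requirements above can be illustrated with a small, self-contained sketch. The helpers `cacheKeyFor` and `isJsonCompatible` are hypothetical names introduced here; they mirror the rules as stated in this section and are not Ferry's actual implementation.

```dart
/// Builds a '$typename:$id' key, or returns null when the item lacks
/// `__typename` or an identifier (`id` or `_id`) and so cannot get
/// its own cache node. Hypothetical helper for illustration.
String? cacheKeyFor(Map<String, Object?> data) {
  final typename = data['__typename'];
  final id = data['id'] ?? data['_id'];
  if (typename == null || id == null) return null;
  return '$typename:$id';
}

/// Returns true when [value] consists only of JSON-compatible types:
/// null, numbers, booleans, strings, lists, and string-keyed maps.
bool isJsonCompatible(Object? value) {
  if (value == null || value is num || value is bool || value is String) {
    return true;
  }
  if (value is List) return value.every(isJsonCompatible);
  if (value is Map) {
    return value.entries
        .every((e) => e.key is String && isJsonCompatible(e.value));
  }
  return false;
}

void main() {
  final item = {'__typename': 'Notification', 'id': '42', 'title': 'Hello'};
  print(cacheKeyFor(item));
  print(isJsonCompatible(item));

  // A raw DateTime is not JSON-compatible; serialize it to a string first.
  print(isJsonCompatible({'date': DateTime(2024)}));
}
```

A check like this could be run over serialized models in a unit test to catch an incompatible field before it reaches the cache.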
How to Evict from Cache
Since Ferry does not have an easy public method for targeted eviction, the architecture provides a manual mechanism via CacheHandlerSpecs:

    // Clear a specific request from cache
    CacheHandlerSpecs.clear(
      mapToCachedRequest: Convertor((request) {
        return GQueryNotificationReq((b) => b
          ..requestId = 'notification_list'
          ..vars = request.vars.toBuilder());
      }),
    );

    // Clear ALL requests of the same type from cache
    CacheHandlerSpecs.clearAll(
      mapToCachedRequest: Convertor((request) {
        return GQueryNotificationReq((b) => b..requestId = 'notification_list');
      }),
    );

Custom Cache Typed Link
The UpdateCacheTypedLink intercepts responses and applies CacheHandlerSpecs automatically:

    class UpdateCacheTypedLink extends TypedLink {
      final Cache cache;
      final SetMultimap<Type, CacheHandlerSpecs> cacheHandlersSpecs;

      // The constructor and _updateCache are not shown in this excerpt.
      @override
      Stream<OperationResponse<TData, TVars>> request<TData, TVars>(
              OperationRequest<TData, TVars> req,
              [forward]) =>
          // Forward the request down the link chain and run _updateCache on each response.
          forward!(req).doOnData(_updateCache);
    }

Merging Mutation Data into Cached Queries
When a mutation creates or updates an item, merge it into the cached list:

    CacheHandlerSpecs.merge(
      mapToCachedRequest: Convertor((mutationRequest) {
        // Map to the list query that should be updated
        return GQueryNotificationReq((b) => b
          ..requestId = 'notification_list'
          ..vars.G_filter = /* same filter as list query */);
      }),
      mapResponse: Convertor((mutationData) {
        // Extract the saved item from mutation response
        return mutationData.notificationSave;
      }),
      mergeCachedData: Convertor((pair) {
        // Dart cannot destructure a record in a parameter list, so unpack it here.
        final (oldData, newData) = pair;
        // Merge the new item into the cached list
        final items = oldData.notification!.notificationItems!.toList();
        final index = items.indexWhere((i) => i?.id == newData.id);
        if (index >= 0) {
          items[index] = newData; // Update existing
        } else {
          items.insert(0, newData); // Add new
        }
        return oldData.rebuild(
            (b) => b..notification.notificationItems.replace(items));
      }),
    );

Ferry Cache-Based Pagination
Instead of using StreamGroup for pagination, leverage Ferry’s cache:
- Single query approach: Issue one query with skip: 0, take: currentTotal
- Load more: Increase take and re-issue the query
- Ferry serves cached entities instantly for the already-fetched portion
- Only the new items are fetched from the network
This is simpler than StreamGroup and avoids the complexity of merging multiple streams.
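The window arithmetic behind this approach can be sketched in plain Dart, independent of Ferry and Riverpod. `TakeWindow` is a hypothetical helper introduced only for illustration; it models how skip stays at 0 while take grows by one page per "load more", and it makes no claim about what Ferry fetches from the network.

```dart
/// Hypothetical helper that tracks the growing `take` window.
class TakeWindow {
  TakeWindow({this.pageSize = 15}) : _take = pageSize;

  final int pageSize;
  int _take;

  /// The single query always starts from the first item.
  int get skip => 0;

  /// Total number of items requested so far.
  int get take => _take;

  /// Grow the window by one page; the caller then re-issues the query.
  void loadMore() => _take += pageSize;

  /// Shrink back to a single page, e.g. when the filter changes.
  void reset() => _take = pageSize;
}

void main() {
  final window = TakeWindow();
  window.loadMore();
  window.loadMore();
  print('skip: ${window.skip}, take: ${window.take}');

  window.reset();
  print('skip: ${window.skip}, take: ${window.take}');
}
```

Because there is only ever one active query, the UI consumes a single stream and never has to reconcile overlapping pages.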