Skip to content

Data Flow & Caching

This section illustrates how the Convertor pattern is applied to common data operations and details the strategy for managing the Ferry cache.

Data Flow Implementation Examples (GraphQL)

Section titled “Data Flow Implementation Examples (GraphQL)”

The pipeline for fetching a single item (e.g., NotificationModel):

  1. Request Parameter Convertor: Transforms custom input parameters into the GraphQL request object.
  2. Datasource: The GraphQLRequestExecutor is chained with GraphQLStreamConvertor, then mapped to extract the item.
@riverpod
Convertor<GQueryNotificationReq, SingleRequestParams<GnotificationFilters>>
notificationItemQueryConvertor(Ref ref) {
return Convertor((params) {
return GQueryNotificationReq((builder) {
builder
..requestId = 'notification_item_${params.toString()}'
..vars.G_filter = params.filter?.toBuilder()
..vars.G_take = 1;
});
});
}
@riverpod
StreamConvertor<GQueryNotificationData_notification_notificationItems?,
SingleRequestParams<GnotificationFilters>>
notificationItemDatasource(Ref ref) {
return GraphQLRequestExecutor<GQueryNotificationData,
SingleRequestParams<GnotificationFilters>, GQueryNotificationVars>(
gqlClient: ref.watch(gqlClientProvider),
convertor: ref.watch(notificationItemQueryConvertorProvider),
)
.then(GraphQLStreamConvertor())
.map((data) => data.notification!.notificationItems!.firstOrNull);
}
  1. Repository: Consumes the datasource and applies .thenMap(modelConvertor) to transform raw GQL data into the domain model.
class NotificationsRepository {
@riverpod
static NotificationsRepository notificationsRepository(Ref ref) {
return NotificationsRepository(
itemDatasource: ref.watch(notificationItemDatasourceProvider),
modelConvertor: ref.watch(notificationModelConvertorProvider),
);
}
const NotificationsRepository({
required this.itemDatasource,
required this.modelConvertor,
});
final StreamConvertor<
GQueryNotificationData_notification_notificationItems?,
SingleRequestParams<GnotificationFilters>> itemDatasource;
final Convertor<NotificationModel,
GQueryNotificationData_notification_notificationItems> modelConvertor;
Stream<NotificationModel> queryItem({NotificationFilterCriteria? filter}) {
final params = SingleRequestParams<GnotificationFilters>(
filter: filterConvertor.mightExecute(filter),
);
return itemDatasource
.map((item) {
if (item == null) throw NotificationNotFoundException();
return item;
})
.thenMap(modelConvertor)
.decorate((convertor, input) {
try {
return convertor.execute(input);
} catch (e, st) {
return Stream.error(e, st);
}
})
.execute(params);
}
}
  1. Model Convertor: Transforms raw GQL data to domain model with data cleaning.
@riverpod
Convertor<NotificationModel,
GQueryNotificationData_notification_notificationItems>
notificationModelConvertor(Ref ref) {
return Convertor((raw) {
return NotificationModel(
id: raw.id ?? '',
date: raw.date ?? DateTime.now(),
title: raw.title ?? 'Untitled',
attachments: raw.images
?.map((i) => AttachmentModel(id: i?.id ?? '', mediaUrl: i?.mediaUrl ?? ''))
.toList() ?? [],
);
});
}
  1. Notifier: For simple CRUD, accesses the repository directly.
@riverpod
class NotificationNotifier extends _$NotificationNotifier {
@override
Stream<NotificationModel> build(String id) {
final repository = ref.watch(notificationsRepositoryProvider);
return repository.queryItem(
filter: NotificationFilterCriteria(id: id),
);
}
}

The flow is similar to the single item request, but uses list-specific extensions and Ferry’s cache-based pagination:

  1. Datasource: Maps to a list of items.
@riverpod
Convertor<GQueryNotificationReq,
ListRequestParams<GnotificationFilters, GnotificationOrder>>
notificationListQueryConvertor(Ref ref) {
return Convertor((params) {
return GQueryNotificationReq((builder) {
builder
..requestId = 'notification_list_${params.toString()}'
..vars.G_filter = params.filter?.toBuilder()
..vars.G_sort = params.sort?.toBuilder()
..vars.G_skip = params.skip
..vars.G_take = params.take;
});
});
}
@riverpod
StreamConvertor<List<GQueryNotificationData_notification_notificationItems>,
ListRequestParams<GnotificationFilters, GnotificationOrder>>
notificationListDatasource(Ref ref) {
return GraphQLRequestExecutor<GQueryNotificationData,
ListRequestParams<GnotificationFilters, GnotificationOrder>,
GQueryNotificationVars>(
gqlClient: ref.watch(gqlClientProvider),
convertor: ref.watch(notificationListQueryConvertorProvider),
)
.then(GraphQLStreamConvertor())
.map((data) => data.notification!.notificationItems!.nonNulls.toList());
}
  1. Repository: Uses .thenEach(modelConvertor) to convert each item in the list.
Stream<List<NotificationModel>> queryList({
int? skip,
int? take,
NotificationFilterCriteria? filter,
NotificationSortCriteria? sort,
}) {
final params = ListRequestParams<GnotificationFilters, GnotificationOrder>(
skip: skip,
take: take,
filter: filterConvertor.mightExecute(filter),
sort: sortConvertor.mightExecute(sort),
);
return listDatasource.thenEach(modelConvertor).execute(params);
}
  1. Notifier with Ferry Cache-Based Pagination: Instead of using StreamGroup, use a single query and increase take to load more. Ferry’s cache serves already-fetched entities instantly.
@riverpod
class NotificationsListNotifier extends _$NotificationsListNotifier {
static const int _pageSize = 15;
int _currentTake = _pageSize;
@override
Stream<List<NotificationModel>> build() {
final repository = ref.watch(notificationsRepositoryProvider);
return repository.queryList(
skip: 0,
take: _currentTake,
filter: NotificationFilterCriteria(
systemParentId: AppConfig.instance.conferenceId,
),
sort: const NotificationSortCriteria(date: SortOptions.descending),
);
}
void loadMore() {
_currentTake += _pageSize;
ref.invalidateSelf();
}
void search(String? title) {
_currentTake = _pageSize; // Reset on filter change
ref.invalidateSelf();
}
}

Mutations use a tuple (Params, bool) to optionally enable optimistic responses:

@riverpod
Convertor<GMutateChatSaveReq, (UpsertRequestParams<GMutateChatSaveVars>, bool)>
chatUpsertConvertor(Ref ref) {
return Convertor((params) {
return GMutateChatSaveReq((builder) {
builder
..requestId = 'chat_upsert_${params.$1.toString()}'
..vars = params.$1.vars.toBuilder();
if (params.$2) {
builder
..optimisticResponse.chatSave.id = '__optimistic__'
..optimisticResponse.chatSave.isBlockedByUser1 =
params.$1.vars.chatArg?.isBlockedByUser1
..optimisticResponse.chatSave.isBlockedByUser2 =
params.$1.vars.chatArg?.isBlockedByUser2;
}
});
});
}

Subscriptions use SubscriptionRequestParams with optional context for filtering:

@riverpod
Convertor<GSubscribeToChatMessagesReq,
SubscriptionRequestParams<ChatMessageSubscriptionContext>>
chatMessageSubscriptionConvertor(Ref ref) {
return Convertor((params) {
return GSubscribeToChatMessagesReq((builder) {
builder
..requestId = 'chat_message_subscription'
..updateCacheHandlerKey = 'messageSubscriptionCacheHandler'
..updateCacheHandlerContext = params.context?.toJson();
});
});
}

The repository converts subscription data to domain models:

Stream<ChatMessageModel> subscribeToMessages({
required ChatMessageSubscriptionContext context,
}) {
final params = SubscriptionRequestParams(context: context);
return subscriptionDatasource.thenMap(modelConvertor).execute(params);
}

The architecture relies on Ferry’s built-in caching mechanism for GraphQL.

  1. Identifiers: Data items must have an identifier (id or _id) and __typename for Ferry to create separate cache nodes (e.g., '$typename:$id').
  2. Data Types: Model serialization must result in JSON compatible types (Collections, Primitives, Strings) for successful caching. Non-compatible types will cause a “Linking error.”

Since Ferry does not have an easy public method for targeted eviction, the architecture provides a manual mechanism via CacheHandlerSpecs:

// Clear a specific request from cache
CacheHandlerSpecs.clear(
mapToCachedRequest: Convertor((request) {
return GQueryNotificationReq((b) => b
..requestId = 'notification_list'
..vars = request.vars.toBuilder());
}),
);
// Clear ALL requests of the same type from cache
CacheHandlerSpecs.clearAll(
mapToCachedRequest: Convertor((request) {
return GQueryNotificationReq((b) => b..requestId = 'notification_list');
}),
);

The UpdateCacheTypedLink intercepts responses and applies CacheHandlerSpecs automatically:

class UpdateCacheTypedLink extends TypedLink {
final Cache cache;
final SetMultimap<Type, CacheHandlerSpecs> cacheHandlersSpecs;
@override
Stream<OperationResponse<TData, TVars>> request<TData, TVars>(
OperationRequest<TData, TVars> req, [forward]) =>
forward!(req).doOnData(_updateCache);
}

When a mutation creates or updates an item, merge it into the cached list:

CacheHandlerSpecs.merge(
mapToCachedRequest: Convertor((mutationRequest) {
// Map to the list query that should be updated
return GQueryNotificationReq((b) => b
..requestId = 'notification_list'
..vars.G_filter = /* same filter as list query */);
}),
mapResponse: Convertor((mutationData) {
// Extract the saved item from mutation response
return mutationData.notificationSave;
}),
mergeCachedData: Convertor(((oldData, newData)) {
// Merge the new item into the cached list
final items = oldData.notification!.notificationItems!.toList();
final index = items.indexWhere((i) => i?.id == newData.id);
if (index >= 0) {
items[index] = newData; // Update existing
} else {
items.insert(0, newData); // Add new
}
return oldData.rebuild((b) => b
..notification.notificationItems.replace(items));
}),
);

Instead of using StreamGroup for pagination, leverage Ferry’s cache:

  1. Single query approach: Issue one query with skip: 0, take: currentTotal
  2. Load more: Increase take and re-issue the query
  3. Ferry serves cached entities instantly for the already-fetched portion
  4. Only the new items are fetched from the network

This is simpler than StreamGroup and avoids the complexity of merging multiple streams.