Skip to content

Data Flow & Caching

This section illustrates how the Convertor architecture is applied to common data operations and details the strategy for managing the Ferry cache.

Data Flow Implementation Examples (GraphQL)

Section titled “Data Flow Implementation Examples (GraphQL)”

The pipeline for fetching a single item (e.g., NotificationModel):

  1. Request Parameter Convertor: Transforms custom input parameters (e.g., SingleRequestParams) into the necessary GraphQL request object (e.g., GQueryNotificationReq).
  2. Datasource: The GraphQLRequestExecutor is executed, chained with GraphQLStreamConvertor, and then mapped to extract the single item from the response data.
/// Datasource layer for single-notification GraphQL requests.
///
/// Exposes Riverpod providers that assemble the Convertor pipeline:
/// request-parameter conversion -> GraphQL execution -> response extraction.
mixin NotificationsDatasource {
  /// Builds the GraphQL request object from [SingleRequestParams].
  static final itemQueryConvertorProvider = Provider.autoDispose((ref) {
    return Convertor<
      GQueryNotificationReq,
      SingleRequestParams<GnotificationFilters>
    >((params) {
      return GQueryNotificationReq((b) {
        // requestId is keyed on the params so distinct filters get
        // distinct Ferry cache entries.
        b.requestId = 'notification_item_${params.toString()}';
        b.vars.G_filter = params.filter?.toBuilder();
        // Single-item query: cap the result set at one row.
        b.vars.G_take = 1;
      });
    });
  });

  /// Executes the query and extracts the first matching notification item
  /// (null when the result set is empty).
  static final itemProvider = Provider.autoDispose((ref) {
    final executor = GraphQLRequestExecutor<
      GQueryNotificationData,
      SingleRequestParams<GnotificationFilters>,
      GQueryNotificationVars
    >(
      gqlClient: GqlClientBuilder.getInstance(),
      convertor: ref.watch(itemQueryConvertorProvider),
    );
    return executor
        .then(GraphQLStreamConvertor())
        .map((data) => data.notification!.notificationItems!.firstOrNull);
  });
}
  1. Repository: The repository consumes the Datasource and uses the .thenMap(modelConvertor) extension to apply a modelConvertor (e.g., Convertor&lt;NotificationModel, GQueryNotificationData_notification_notificationItems&gt;) to transform the raw GraphQL data into the final domain model.
/// Repository for single-notification reads.
///
/// Consumes the Datasource stream and converts raw GraphQL payloads into
/// domain models via [modelConvertor].
class NewNotificationsRepository {
  static final provider = Provider.autoDispose((ref) {
    return NewNotificationsRepository(
      itemDatasource: ref.watch(NotificationsDatasource.itemProvider),
      modelConvertor: ref.watch(NotificationsDomain.modelConvertor),
      // FIX: filterConvertor was referenced in queryItem() but never
      // declared or injected (would not compile). Wire it in the same way
      // the list repository does.
      filterConvertor: ref.watch(NotificationsDomain.filterConvertor),
    );
  });

  const NewNotificationsRepository({
    required this.itemDatasource,
    required this.modelConvertor,
    required this.filterConvertor,
  });

  /// Stream-based datasource producing the (possibly null) raw item.
  final StreamConvertor<
    GQueryNotificationData_notification_notificationItems?,
    SingleRequestParams<GnotificationFilters>
  >
  itemDatasource;

  /// Converts a raw GraphQL item into the domain model.
  final Convertor<
    NotificationModel,
    GQueryNotificationData_notification_notificationItems
  >
  modelConvertor;

  /// Converts domain filter criteria into GraphQL filter vars.
  /// Declared here because queryItem() calls filterConvertor.mightExecute.
  final Convertor<GnotificationFilters, NotificationFilterCriteria>
  filterConvertor;

  /// Streams a single [NotificationModel] matching [filter].
  ///
  /// Emits an error when the backend returns no matching item.
  Stream<NotificationModel> queryItem({NotificationFilterCriteria? filter}) {
    final params = SingleRequestParams<GnotificationFilters>(
      filter: filterConvertor.mightExecute(filter),
    );
    return itemDatasource
        .map((item) {
          // A null item means the id/filter matched nothing server-side.
          if (item == null) throw Exception('Notification item not found');
          return item;
        })
        .thenMap(modelConvertor)
        .transform<NotificationModel>(
          StreamTransformer.fromHandlers(
            handleError: (e, s, sink) {
              // Forward errors downstream instead of breaking the pipeline.
              sink.addError(e, s);
            },
          ),
        )
        .decorate((convertor, input) {
          // Convert synchronous conversion failures into stream errors so
          // callers see a single error channel.
          try {
            return convertor.execute(input);
          } catch (e, st) {
            return Stream.error(e, st);
          }
        })
        .execute(params);
  }
}
/// Domain-level convertors for notifications.
mixin NotificationsDomain {
  /// Maps a raw GraphQL notification item to [NotificationModel].
  static final modelConvertor = Provider.autoDispose((ref) {
    return Convertor<
      NotificationModel,
      GQueryNotificationData_notification_notificationItems
    >((raw) {
      final images = raw.images;
      return NotificationModel(
        // id is required by the domain model; absence is a server-data bug.
        id: raw.id!,
        date: raw.date?.toDto,
        title: raw.title,
        // Missing image list collapses to an empty attachments list.
        attachments: images == null
            ? []
            : images
                  .map((i) => AttachmentModel(id: i?.id, mediaUrl: i?.mediaUrl))
                  .toList(),
      );
    });
  });
}
  1. Controller: Executes the repository’s method to stream the result into a StreamGroup.
/// Controller streaming a single notification (family arg = notification id).
///
/// Merges repository streams through a [StreamGroup] so retryFetch() can
/// push a fresh stream without rebuilding the provider.
class NotificationNotifier
    extends AutoDisposeFamilyStreamNotifier<NotificationModel, String> {
  static final provider = StreamNotifierProvider.autoDispose
      .family<NotificationNotifier, NotificationModel, String>(
        NotificationNotifier.new,
      );

  // Group that merges repository streams; new fetches are added to it
  // without tearing down the subscription held by build().
  late StreamGroup<NotificationModel> _sourceSubject;

  // Filter pinned to the current conference plus the family id argument.
  late final NotificationFilterCriteria _filterCriteria = NotificationFilterCriteria(
    systemParentId: AppConfig.instance.conferenceId,
    id: arg,
  );

  @override
  Stream<NotificationModel> build(String id) async* {
    _sourceSubject = StreamGroup();
    // Close the group when the provider is disposed to avoid leaks.
    ref.onDispose(_sourceSubject.close);
    _fetchNotification();
    yield* _sourceSubject.stream;
  }

  /// Re-issues the fetch; results are merged into the existing stream.
  Future<void> retryFetch() async {
    _fetchNotification();
  }

  void _fetchNotification() {
    // NOTE(review): ref.watch outside build() is discouraged by Riverpod;
    // ref.read is the usual choice in event handlers — confirm intent.
    final repository = ref.watch(NewNotificationsRepository.provider);
    _sourceSubject.add(repository.queryItem(filter: _filterCriteria));
  }

  /// Full refresh: show loading, then rebuild the provider from scratch.
  void refetch() {
    _emitLoading();
    ref.invalidateSelf();
  }

  void _emitLoading() {
    state = const AsyncLoading();
  }
}

The flow is similar to the single item request, but utilizes list-specific extensions:

  1. Datasource: The GraphQLRequestExecutor is executed, chained with GraphQLStreamConvertor, and then mapped to extract the list of non-null items from the response data.
/// Datasource layer for notification-list GraphQL requests.
mixin NotificationsDatasource {
  /// Builds a paged/filtered/sorted list request from [ListRequestParams].
  static final queryConvertorProvider = Provider.autoDispose((ref) {
    return Convertor<
      GQueryNotificationReq,
      ListRequestParams<GnotificationFilters, GnotificationOrder>
    >((params) {
      return GQueryNotificationReq((b) {
        // Key the request id on the params so distinct pages/filters get
        // distinct Ferry cache entries.
        b.requestId = 'notification_list_${params.toString()}';
        b.vars.G_filter = params.filter?.toBuilder();
        b.vars.G_sort = params.sort?.toBuilder();
        b.vars.G_skip = params.skip;
        b.vars.G_take = params.take;
      });
    });
  });

  /// Executes the list query and extracts the non-null items as a list.
  static final listProvider = Provider.autoDispose((ref) {
    final executor = GraphQLRequestExecutor<
      GQueryNotificationData,
      ListRequestParams<GnotificationFilters, GnotificationOrder>,
      GQueryNotificationVars
    >(
      gqlClient: GqlClientBuilder.getInstance(),
      convertor: ref.watch(queryConvertorProvider),
    );
    return executor
        .then(GraphQLStreamConvertor())
        .map(
          (data) => data.notification!.notificationItems!.nonNulls.toList(),
        );
  });
}
  1. Repository: The repository consumes the list-fetching Datasource. It then uses .thenEach(modelConvertor) to efficiently apply the model conversion to every item in the streamed list, resulting in a Stream<List<NotificationModel>>.
/// Repository for paginated notification lists.
///
/// Converts domain-level paging/filter/sort criteria into GraphQL params,
/// executes the list datasource, and maps every raw item to a domain model.
class NewNotificationsRepository {
  static final provider = Provider.autoDispose((ref) {
    return NewNotificationsRepository(
      listDatasource: ref.watch(NotificationsDatasource.listProvider),
      modelConvertor: ref.watch(NotificationsDomain.modelConvertor),
      filterConvertor: ref.watch(NotificationsDomain.filterConvertor),
      sortConvertor: ref.watch(NotificationsDomain.sortConvertor),
    );
  });

  const NewNotificationsRepository({
    required this.listDatasource,
    required this.modelConvertor,
    required this.filterConvertor,
    required this.sortConvertor,
  });

  /// Stream-based datasource producing raw GraphQL list payloads.
  final StreamConvertor<
    List<GQueryNotificationData_notification_notificationItems>,
    ListRequestParams<GnotificationFilters, GnotificationOrder>
  >
  listDatasource;

  /// Converts one raw item into the domain model.
  final Convertor<
    NotificationModel,
    GQueryNotificationData_notification_notificationItems
  >
  modelConvertor;

  /// Converts domain filter criteria into GraphQL filter vars.
  final Convertor<GnotificationFilters, NotificationFilterCriteria>
  filterConvertor;

  /// Converts domain sort criteria into GraphQL order vars.
  final Convertor<GnotificationOrder, NotificationSortCriteria> sortConvertor;

  /// Streams pages of [NotificationModel] with optional paging, filtering,
  /// and sorting applied.
  Stream<List<NotificationModel>> queryList({
    int? skip,
    int? take,
    NotificationFilterCriteria? filter,
    NotificationSortCriteria? sort,
  }) {
    final requestParams =
        ListRequestParams<GnotificationFilters, GnotificationOrder>(
          skip: skip,
          take: take,
          filter: filterConvertor.mightExecute(filter),
          sort: sortConvertor.mightExecute(sort),
        );
    // thenEach applies the model conversion to every element of each
    // streamed list, yielding Stream<List<NotificationModel>>.
    final pipeline = listDatasource.thenEach(modelConvertor);
    return pipeline.execute(requestParams);
  }
}
/// Domain-level convertors shared by the notification repositories.
mixin NotificationsDomain {
  /// Maps a raw GraphQL item to [NotificationModel].
  static final modelConvertor = Provider.autoDispose((ref) {
    return Convertor<
      NotificationModel,
      GQueryNotificationData_notification_notificationItems
    >((raw) {
      final images = raw.images;
      return NotificationModel(
        // id is required by the domain model; absence is a server-data bug.
        id: raw.id!,
        date: raw.date?.toDto,
        title: raw.title,
        // Missing image list collapses to an empty attachments list.
        attachments: images == null
            ? []
            : images
                  .map((i) => AttachmentModel(id: i?.id, mediaUrl: i?.mediaUrl))
                  .toList(),
      );
    });
  });

  /// Maps domain [NotificationFilterCriteria] to GraphQL filter vars.
  static final filterConvertor = Provider.autoDispose((ref) {
    return Convertor<GnotificationFilters, NotificationFilterCriteria>(
      (criteria) => GnotificationFilters(
        (b) => b
          ..id = criteria.id
          ..systemParentId = criteria.systemParentId
          ..feature = criteria.feature
          // Title is matched as a substring ("contains"), not exact match.
          ..title = GStringFiltersBuilder().construct(contains: criteria.title),
      ),
    );
  });

  /// Maps domain [NotificationSortCriteria] to GraphQL order vars.
  static final sortConvertor = Provider.autoDispose((ref) {
    return Convertor<GnotificationOrder, NotificationSortCriteria>(
      (criteria) => GnotificationOrder(
        (b) => b
          ..id = criteria.id?.value
          ..date = criteria.date?.value,
      ),
    );
  });
}
  1. Controller: Manages pagination state (skip/take) and uses the repository’s queryList method to fetch a chunk of data, which is merged into a StreamGroup.
/// Controller for the paginated notifications list.
///
/// Merges per-page repository streams into a single [StreamGroup]; each
/// emission carries its page index so pages can arrive out of order and
/// still land in the right slot of the paginated list.
class NotificationsListNotifier
    extends AutoDisposeStreamNotifier<PaginationState<NotificationModel>> {
  // Page size for every fetch.
  static final int _countAtATime = 15;

  static final provider =
      StreamNotifierProvider.autoDispose<
        NotificationsListNotifier,
        PaginationState<NotificationModel>
      >(NotificationsListNotifier.new);

  // Merges (pageIndex, items) tuples from all in-flight page fetches.
  late StreamGroup<(int, List<NotificationModel>)> _streamGroup;

  late PaginationState<NotificationModel> _paginationState = PaginationState(
    skip: 0,
    take: _countAtATime,
  );

  // Index of the page that will be requested next by loadMore().
  int _nextPage = 0;

  NotificationFilterCriteria _filterCriteria = NotificationFilterCriteria(
    systemParentId: AppConfig.instance.conferenceId,
  );

  NotificationSortCriteria _sortCriteria = const NotificationSortCriteria(
    date: SortOptions.descending,
  );

  @override
  Stream<PaginationState<NotificationModel>> build() async* {
    // Reset pagination on every (re)build; sort()/filter() invalidate self.
    _paginationState = PaginationState(
      skip: 0,
      take: _countAtATime,
    );
    _nextPage = 0;
    _streamGroup = StreamGroup();
    // Close the group on dispose so merged streams are released.
    ref.onDispose(_streamGroup.close);
    _fetchNotifications();
    PaginatedList<NotificationModel> items = PaginatedList(pages: {});
    yield* _streamGroup.stream.transform(
      StreamTransformer.fromHandlers(
        handleData: (data, sink) {
          // data.$1 = page index, data.$2 = page items; clone so states
          // already emitted keep their own snapshot of the list.
          items = items.clone()..updatePage(data.$1, data.$2);
          var paginationState = _paginationState;
          // NOTE(review): totalCount mirrors currentCount here; the true
          // server-side total is not visible in this response — confirm.
          final paginatedData = PaginatedData(
            items: items,
            currentCount: items.length,
            totalCount: items.length,
          );
          // Only the newest page clears the load-more flags; a late result
          // for an older page must not reset state set by a newer request.
          paginationState = paginationState.copyWith(
            data: paginatedData,
            loadMoreError: data.$1 == _nextPage
                ? null
                : paginationState.loadMoreError,
            isLoadingMore: data.$1 == _nextPage
                ? false
                : paginationState.isLoadingMore,
          );
          _paginationState = paginationState;
          sink.add(paginationState);
        },
        handleError: (error, st, sink) {
          // With no data yet, surface the error as the provider's state;
          // otherwise keep existing data and record a load-more error.
          if (items.isEmpty) {
            throw error;
          }
          var paginationState = _paginationState;
          paginationState = paginationState.copyWith(
            isLoadingMore: false,
            loadMoreError: error,
          );
          _paginationState = paginationState;
          sink.add(paginationState);
        },
      ),
    );
  }

  /// Requests the next page, guarded so it only fires when the currently
  /// loaded last page is the one pagination expects.
  Future<void> loadMore() async {
    if (_nextPage != _paginationState.data?.items?.lastPageIndex) return;
    final index = _nextPage++;
    _paginationState = _paginationState.copyWith(
      skip: index * _countAtATime,
      take: _countAtATime,
      isLoadingMore: true,
    );
    // Publish the loading-more state immediately, before the fetch lands.
    state = AsyncData(_paginationState);
    _fetchNotifications();
  }

  /// Re-fetches the current page after a failure.
  Future<void> retryFetch() async {
    _fetchNotifications();
  }

  void _fetchNotifications() {
    // NOTE(review): ref.watch outside build() is discouraged by Riverpod;
    // ref.read is the usual choice in event handlers — confirm intent.
    final repository = ref.watch(NewNotificationsRepository.provider);
    // Capture the page index now so the tuple tags results correctly even
    // if _nextPage advances before the stream emits.
    final index = _nextPage;
    _streamGroup.add(
      repository
          .queryList(
            skip: index * _countAtATime,
            take: _countAtATime,
            filter: _filterCriteria,
            sort: _sortCriteria,
          )
          .map((list) => (index, list)),
    );
  }

  /// Applies a new sort order and restarts pagination from page zero.
  void sort({SortOptions? date}) {
    _sortCriteria =
        _sortCriteria.copyWith(date: date);
    _emitLoading();
    ref.invalidateSelf();
  }

  /// Applies new filter values and restarts pagination from page zero.
  void filter({bool? feature, String? title}) {
    _filterCriteria = _filterCriteria.copyWith(feature: feature, title: title);
    _emitLoading();
    ref.invalidateSelf();
  }

  /// Convenience wrapper that filters by title only.
  void search([String? title]) {
    filter(title: title);
  }

  void _emitLoading() {
    state = const AsyncLoading();
  }
}

Subscriptions and Mutations utilize the StreamConvertor pattern to handle continuous updates or stream responses.

  • Subscription: The Datasource is executed, and the resulting stream is mapped to the necessary type (e.g., mapped to include ChatContext) and then converted to the domain model using .thenMap(modelConvertor).
  • Mutation: The Datasource is executed with a tuple input (Params, bool) to optionally enable optimistic responses. The resulting stream data is then converted to the domain model using .thenMap(mutationModelConvertor).
// Convertor for chat upsert mutations. The tuple's bool flag ($2) toggles
// an optimistic response so the UI can update before the server confirms.
static final upsertConvertorProvider = Provider.autoDispose((ref) {
  return Convertor<
    GMutateChatSaveReq,
    (UpsertRequestParams<GMutateChatSaveVars>, bool)
  >((params) {
    return GMutateChatSaveReq((builder) {
      builder
        ..requestId = 'chat_upsert_${params.$1.toString()}'
        ..vars = params.$1.vars.toBuilder();
      if (params.$2) {
        // To enable optimisticResponse: a placeholder id marks the record
        // as optimistic until the real mutation response replaces it.
        builder
          ..optimisticResponse.chatSave.id = '__optimistic__'
          ..optimisticResponse.chatSave.isBlockedByUser1 =
              params.$1.vars.chatArg?.isBlockedByUser1
          ..optimisticResponse.chatSave.isBlockedByUser2 =
              params.$1.vars.chatArg?.isBlockedByUser2;
      }
    });
  });
});

The architecture relies on Ferry’s built-in caching mechanism for GraphQL.

  1. Identifiers: Data items must have an identifier (id or _id) and __typename for Ferry to create separate cache nodes (e.g., '$typename:$id').
  2. Data Types: Model serialization must result in JSON compatible types (Collections, Primitives, Strings) for successful caching. Non-compatible types will cause a “Linking error.”

Since Ferry does not have an easy public method for targeted eviction, the architecture provides a manual mechanism:

  1. Identify the operationDefinition and resolve the rootTypeName (Query/Mutation/Subscription).
  2. Find the specific GraphQL field node (the operation name, e.g., attendee).
  3. Call cacheProxy.evict() with the root identity, field name, and request variables.
  • To clear all requests of the same type: Pass an empty map (const <String, dynamic>{}) for the args parameter in cacheProxy.evict().
// Manual targeted eviction for Ferry's cache (fragment; mappedRequest,
// cache, cacheProxy, specs and request come from the enclosing scope).
final operation = mappedRequest.operation;
final operationDefinition = getOperationDefinition(
  operation.document,
  operation.operationName,
);
// Resolve the root type name (Query/Mutation/Subscription) from policies.
final rootTypeName = resolveRootTypename(
  operationDefinition,
  cache.typePolicies,
);
// _ToJson wraps the map in an object whose toJson() returns the same map,
// because identify() requires a toJson-capable argument.
var identity = cacheProxy.identify(_ToJson({'__typename': rootTypeName}));
if (identity != null) {
  // First real field of the operation (skipping __typename) names the
  // cached field to evict, e.g. `attendee`.
  final node = operationDefinition.selectionSet.selections
      .whereType<FieldNode>()
      .where((node) {
        return node.name.value != '__typename';
      })
      .firstOrNull;
  if (node != null) {
    // Empty args clears every request of this type; otherwise only the
    // entry whose variables match this request is evicted.
    cacheProxy.evict(
      identity,
      fieldName: node.name.value,
      args: specs.clearAllRequests ? const <String, dynamic>{} : request.varsToJson(),
    );
  }
}

To simplify and automate complex cache operations, a Custom Cache Typed Link (UpdateCacheTypedLink) is used.

/// Declarative description of how a response should update the Ferry cache.
///
/// Consumed by UpdateCacheTypedLink; matched to a request via
/// [cacheHandlerKey]. Exactly one strategy applies per instance: clear one
/// cached request, clear all cached requests of a type, or merge new data
/// into an existing cached query.
class CacheHandlerSpecs<
  ResponseData,
  RequestVars,
  MappedRequestData,
  MappedRequestVars
> {
  /// Optional key used to select this spec for a given request.
  final String? cacheHandlerKey;

  /// Maps the incoming request to the cached request that should be updated.
  final Convertor<
    OperationRequest<MappedRequestData, MappedRequestVars>?,
    OperationRequest<ResponseData, RequestVars>
  >
  mapToCachedRequest;

  /// Maps the raw response into the cached request's data type (merge only).
  final Convertor<MappedRequestData?, ResponseData>? mapResponse;

  /// Merges previously cached data with freshly mapped data (merge only).
  final Convertor<
    MappedRequestData?,
    (MappedRequestData oldData, MappedRequestData newData)
  >?
  mergeCachedData;

  /// Whether to evict the single cached request matching the request vars.
  final bool clearRequest;

  /// Whether to evict every cached request of the mapped operation type.
  final bool clearAllRequests;

  /// Evicts only the cached entry whose variables match the request.
  // Idiom fix: dropped redundant `this.` in initializer lists
  // (unnecessary_this lint); behavior is unchanged.
  const CacheHandlerSpecs.clear({
    required this.mapToCachedRequest,
    this.cacheHandlerKey,
  }) : clearRequest = true,
       clearAllRequests = false,
       mapResponse = null,
       mergeCachedData = null;

  /// Evicts every cached entry for the mapped operation.
  const CacheHandlerSpecs.clearAll({
    required this.mapToCachedRequest,
    this.cacheHandlerKey,
  }) : clearRequest = false,
       clearAllRequests = true,
       mapResponse = null,
       mergeCachedData = null;

  /// Merges the mapped response into the existing cached query data.
  const CacheHandlerSpecs.merge({
    required this.mapToCachedRequest,
    required Convertor<MappedRequestData?, ResponseData> this.mapResponse,
    required Convertor<
      MappedRequestData?,
      (MappedRequestData oldData, MappedRequestData newData)
    >
    this.mergeCachedData,
    this.cacheHandlerKey,
  }) : clearRequest = false,
       clearAllRequests = false;
}
/// Minimal adapter exposing a fixed map through a `toJson()` method,
/// as required by CacheProxy.identify.
class _ToJson {
  const _ToJson(this.json);

  /// The map returned verbatim from [toJson].
  final Map<String, dynamic> json;

  Map<String, dynamic> toJson() => json;
}

It intercepts responses and uses CacheHandlerSpecs attached to the request to define how the cache should be managed:

  • clearRequest / clearAllRequests: Flags used to trigger the internal eviction logic (as described above).
  • mapResponse / mergeCachedData: Used to merge new data (e.g., from a mutation response) into an existing cached query, ensuring that list and detail views update automatically.
/// Ferry TypedLink that intercepts every response and applies the
/// [CacheHandlerSpecs] registered for the request's runtime type.
class UpdateCacheTypedLink extends TypedLink {
  final Cache cache;

  // Multimap: one request type may carry several cache-update specs.
  final SetMultimap<Type, CacheHandlerSpecs> cacheHandlersSpecs;

  const UpdateCacheTypedLink({
    required this.cache,
    required this.cacheHandlersSpecs,
  });

  @override
  Stream<OperationResponse<TData, TVars>> request<TData, TVars>(
    OperationRequest<TData, TVars> req, [
    forward,
  ]) => forward!(req).doOnData(_updateCache);

  /// Runs every matching spec against a non-null response payload.
  void _updateCache<TData, TVars>(OperationResponse<TData, TVars> response) {
    if (response.data == null) return;
    final request = response.operationRequest;
    final key = request.updateCacheHandlerKey;
    final handlers = cacheHandlersSpecs[request.runtimeType];
    // Specs are selected by key; a null key selects specs without a key.
    Iterable<CacheHandlerSpecs> specs = handlers.where(
      (handler) => handler.cacheHandlerKey == key,
    );
    // A key with no registered spec is a programming error — fail loudly.
    if (key != null && specs.isEmpty) {
      throw Exception('No specs defined for key $key');
    }
    // Optimistic responses need the request attached to scope the proxy's
    // writes; other sources (e.g. cache reads) are skipped entirely.
    final proxy = switch (response.dataSource) {
      DataSource.Optimistic => CacheProxy(cache, request),
      DataSource.Link => CacheProxy(cache),
      _ => null,
    };
    if (proxy == null) return;
    for (final spec in specs) {
      _executeUpdate(spec, request, response.data!, proxy);
    }
  }

  /// Applies a single spec: either evicts cached request(s) or merges the
  /// mapped response into the cached query's data.
  void _executeUpdate<TData, TVars, MData, MVars>(
    CacheHandlerSpecs<TData, TVars, MData, MVars> specs,
    OperationRequest<TData, TVars> request,
    TData response,
    CacheProxy cacheProxy,
  ) {
    final mappedRequest = specs.mapToCachedRequest.execute(request);
    if (mappedRequest == null) return;
    if (specs.clearRequest || specs.clearAllRequests) {
      // Manual eviction path: Ferry exposes no targeted-eviction API.
      final operation = mappedRequest.operation;
      final operationDefinition = getOperationDefinition(
        operation.document,
        operation.operationName,
      );
      // Resolve Query/Mutation/Subscription root type from type policies.
      final rootTypeName = resolveRootTypename(
        operationDefinition,
        cache.typePolicies,
      );
      // _ToJson wraps the map so identify() can call toJson() on it.
      var identity = cacheProxy.identify(_ToJson({'__typename': rootTypeName}));
      if (identity != null) {
        // First real field (skipping __typename) names the cached field.
        final node = operationDefinition.selectionSet.selections
            .whereType<FieldNode>()
            .where((node) {
              return node.name.value != '__typename';
            })
            .firstOrNull;
        if (node != null) {
          // NOTE(review): args uses the ORIGINAL request's vars, not the
          // mapped request's — confirm the cached entry is keyed this way.
          cacheProxy.evict(
            identity,
            fieldName: node.name.value,
            args: specs.clearAllRequests ? const <String, dynamic>{} : request.varsToJson(),
          );
        }
      }
      return;
    }
    // Merge path: read existing cached data, map the new response into the
    // cached shape, merge old+new, then write the result back. Any null
    // along the way aborts silently (nothing cached / spec incomplete).
    final cachedRequestData = cacheProxy.readQuery(mappedRequest);
    if (cachedRequestData == null) return;
    final mappedResponse = specs.mapResponse?.execute(response);
    if (mappedResponse == null) return;
    final mappedCachedRequestData = specs.mergeCachedData?.execute((
      cachedRequestData,
      mappedResponse,
    ));
    if (mappedCachedRequestData == null) return;
    cacheProxy.writeQuery(mappedRequest, mappedCachedRequestData);
  }
}