Convertor Pattern
The Convertor pattern has replaced the legacy Strategy/DatasourceModule pattern. See ADR-005 for the migration rationale.
The Convertor pattern provides composable, chainable data pipelines for all datasource operations in the MOFA architecture. It is implemented as a shared Dart package that can be reused across projects.
Overview
The Convertor pattern solves datasource concerns by:
- Composable Pipelines: Chain transformations fluently with .then(), .map(), .thenMap(), .thenEach()
- No Registration: Unlike the old DatasourceModule, Convertors are composed directly in @riverpod providers
- Declarative Cache: CacheHandlerSpecs + UpdateCacheTypedLink replace imperative cache handler registration
- Type-Safe: Covariant generic types throughout the chain prevent type errors at compile time
- Testable: Each Convertor is a pure function testable in isolation
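To make the chaining idea concrete, here is a minimal, dependency-free sketch. It is not the package's implementation: FnConvertor, ThenSketch, and MapSketch are hypothetical names, and the semantics of .then() (feed one output into the next stage) and .map() (transform each stream element) are assumptions based on how the providers on this page use them.

```dart
// Stand-in for the package's base type, as declared under "Core Types".
abstract class Convertor<To, From> {
  To execute(covariant From from);
}

// Hypothetical helper: adapts a plain function to a Convertor.
class FnConvertor<To, From> implements Convertor<To, From> {
  FnConvertor(this._fn);
  final To Function(From) _fn;

  @override
  To execute(From from) => _fn(from);
}

// Assumed semantics of `.then()`: pass this output to the next Convertor.
extension ThenSketch<Mid, From> on Convertor<Mid, From> {
  Convertor<To, From> then<To>(Convertor<To, Mid> next) =>
      FnConvertor<To, From>((from) => next.execute(execute(from)));
}

// Assumed semantics of `.map()` on a stream-producing Convertor:
// transform every element the stream emits.
extension MapSketch<T, From> on Convertor<Stream<T>, From> {
  Convertor<Stream<R>, From> map<R>(R Function(T) f) =>
      FnConvertor<Stream<R>, From>((from) => execute(from).map(f));
}

Future<void> main() async {
  final parse = FnConvertor<int, String>((s) => int.parse(s));
  final countUp = FnConvertor<Stream<int>, int>(
    (n) => Stream.fromIterable(List.generate(n, (i) => i + 1)),
  );

  // String -> int -> Stream<int> -> Stream<String>
  final pipeline = parse.then(countUp).map((i) => 'item $i');
  print(await pipeline.execute('3').toList()); // [item 1, item 2, item 3]
}
```

Each stage only knows its own input and output types, so a stage that does not fit its neighbour is rejected by the analyzer when the chain is written.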
Core Types
Convertor
The fundamental unit. It accepts a single input and produces a single output:
    abstract class Convertor<To, From> {
      To execute(covariant From from);
    }
StreamConvertor
A Convertor that produces a Stream:
    typedef StreamConvertor<To, From> = Convertor<Stream<To>, From>;
AsyncConvertor
A Convertor that produces a Future:
    typedef AsyncConvertor<To, From> = Convertor<Future<To>, From>;
Composition Components
A sequence of Convertors where the output of each becomes the input of the next:
    // Implicit chaining with .then()
    final pipeline = requestConvertor
        .then(executor)
        .then(streamConvertor)
        .map(extractData);
Interceptor
Wraps a Convertor for side effects without modifying data flow:
    final logged = datasource.intercept(
      onInput: (params) => print('Fetching: $params'),
      onOutput: (data) => print('Received ${data.length} items'),
    );
Decorator
Wraps a Convertor to modify input/output or bypass execution:
    final withFallback = datasource.decorate((convertor, input) {
      try {
        return convertor.execute(input);
      } catch (e) {
        // Surface a synchronous failure as a stream error instead of throwing.
        return Stream.error(e);
      }
    });
GraphQL Components
GraphQLRequestExecutor
Executes a Ferry operation request, producing a response stream:
    class GraphQLRequestExecutor<Data, Params, Vars>
        implements StreamConvertor<OperationResponse<Data, Vars>, Params> {
      final GqlClient _gqlClient;
      final Convertor<OperationRequest<Data, Vars>, Params> _convertor;
      // Constructor not shown in this excerpt.

      @override
      Stream<OperationResponse<Data, Vars>> execute(Params from) {
        // Build the Ferry request from the params, then hand it to the client.
        final request = _convertor.execute(from);
        return _gqlClient.request(request);
      }
    }
GraphQLStreamConvertor
Unwraps Ferry's OperationResponse stream into a clean data stream:
    class GraphQLStreamConvertor<Data, Vars>
        implements StreamConvertor<Data, Stream<OperationResponse<Data, Vars>>> {
      @override
      Stream<Data> execute(Stream<OperationResponse<Data, Vars>> from) {
        return from.transform(StreamTransformer.fromHandlers(
          handleData: (data, sink) {
            if (data.hasErrors || data.data == null) {
              // Prefer the first GraphQL error, then the link exception,
              // then a generic exception when neither is available.
              sink.addError(data.graphqlErrors?.firstOrNull ??
                  data.linkException ??
                  Exception());
            } else {
              sink.add(data.data!);
            }
          },
        ));
      }
    }
CacheHandlerSpecs
Declarative cache operation configuration:
    // Clear a specific cached request
    CacheHandlerSpecs.clear(
      mapToCachedRequest: Convertor((request) => /* map to cached query */),
    );

    // Clear all cached requests of the same type
    CacheHandlerSpecs.clearAll(
      mapToCachedRequest: Convertor((request) => /* map to cached query */),
    );

    // Merge mutation data into cached query
    CacheHandlerSpecs.merge(
      mapToCachedRequest: Convertor((request) => /* map to cached query */),
      mapResponse: Convertor((data) => /* extract merge data */),
      mergeCachedData: Convertor((pair) {
        // Dart cannot destructure a record in a parameter list, so unpack it here.
        final (old, new_) = pair;
        return /* merge logic */;
      }),
    );
UpdateCacheTypedLink
A Ferry TypedLink that applies CacheHandlerSpecs automatically:
    class UpdateCacheTypedLink extends TypedLink {
      final Cache cache;
      final SetMultimap<Type, CacheHandlerSpecs> cacheHandlersSpecs;

      @override
      Stream<OperationResponse<TData, TVars>> request<TData, TVars>(
        OperationRequest<TData, TVars> req, [
        forward,
      ]) =>
          // Forward the request down the link chain and update the cache
          // for every response that comes back.
          forward!(req).doOnData(_updateCache);
    }
Standard Pipeline Pattern
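The providers in this section share one shape: an executor turns parameters into a response stream, an unwrapping step turns responses into data or errors, and a final map extracts the field of interest. The following is a dependency-free sketch of that shape. FakeResponse, FakeExecutor, Unwrap, and featureList are hypothetical stand-ins for the Ferry and Riverpod pieces, and the stages are composed by hand rather than with the package's extension methods.

```dart
import 'dart:async';

// Stand-in for the package's base type, as declared under "Core Types".
abstract class Convertor<To, From> {
  To execute(covariant From from);
}

// Hypothetical stand-in for a GraphQL response envelope.
class FakeResponse<Data> {
  const FakeResponse({this.data, this.error});
  final Data? data;
  final Object? error;
}

// Stage 1: turn params (here just a limit) into a response stream.
class FakeExecutor
    implements Convertor<Stream<FakeResponse<List<String?>>>, int> {
  @override
  Stream<FakeResponse<List<String?>>> execute(int limit) => Stream.value(
        FakeResponse(data: <String?>['a', null, 'b', 'c'].take(limit).toList()),
      );
}

// Stage 2: unwrap the envelope, forwarding failures as stream errors.
class Unwrap<Data>
    implements Convertor<Stream<Data>, Stream<FakeResponse<Data>>> {
  @override
  Stream<Data> execute(Stream<FakeResponse<Data>> from) => from.transform(
        StreamTransformer<FakeResponse<Data>, Data>.fromHandlers(
          handleData: (response, sink) {
            final data = response.data;
            if (response.error != null || data == null) {
              sink.addError(response.error ?? Exception('empty response'));
            } else {
              sink.add(data);
            }
          },
        ),
      );
}

// Stage 3: extract the field of interest and drop null entries.
Stream<List<String>> featureList(int limit) => Unwrap<List<String?>>()
    .execute(FakeExecutor().execute(limit))
    .map((items) => items.whereType<String>().toList());

Future<void> main() async {
  print(await featureList(3).toList()); // [[a, b]]
}
```

Only the last stage knows the schema of the payload, which is why the list, single-item, mutation, and subscription providers differ mainly in their final map.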
List Query
    @riverpod
    StreamConvertor<List<GData_items>, ListRequestParams<GFilters, GOrder>>
        featureListDatasource(Ref ref) {
      return GraphQLRequestExecutor<GData, ListRequestParams<GFilters, GOrder>, GVars>(
        gqlClient: ref.watch(gqlClientProvider),
        convertor: ref.watch(featureListQueryConvertorProvider),
      )
          .then(GraphQLStreamConvertor())
          .map((data) => data.feature!.items!.nonNulls.toList());
    }
Single Item Query
    @riverpod
    StreamConvertor<GData_item?, SingleRequestParams<GFilters>>
        featureItemDatasource(Ref ref) {
      return GraphQLRequestExecutor<GData, SingleRequestParams<GFilters>, GVars>(
        gqlClient: ref.watch(gqlClientProvider),
        convertor: ref.watch(featureItemQueryConvertorProvider),
      )
          .then(GraphQLStreamConvertor())
          .map((data) => data.feature!.items!.firstOrNull);
    }
Mutation
    @riverpod
    StreamConvertor<GData_mutate, (UpsertRequestParams<GVars>, bool)>
        featureUpsertDatasource(Ref ref) {
      return GraphQLRequestExecutor<GData, (UpsertRequestParams<GVars>, bool), GVars>(
        gqlClient: ref.watch(gqlClientProvider),
        convertor: ref.watch(featureUpsertConvertorProvider),
      )
          .then(GraphQLStreamConvertor())
          .map((data) => data.featureSave);
    }
Subscription
    @riverpod
    StreamConvertor<GData_subscription, SubscriptionRequestParams<SubscriptionContext>>
        featureSubscriptionDatasource(Ref ref) {
      return GraphQLRequestExecutor<GData, SubscriptionRequestParams<SubscriptionContext>, GVars>(
        gqlClient: ref.watch(gqlClientProvider),
        convertor: ref.watch(featureSubscriptionConvertorProvider),
      )
          .then(GraphQLStreamConvertor())
          .map((data) => data.featureSubscription);
    }
REST Components
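The REST components apply the same idea with Futures instead of Streams: one Convertor performs the request and a second converts the payload. The sketch below uses hypothetical stand-ins (FakeRestParams, FakeJsonExecutor, UserConvertor, DataExecutor) instead of Dio. Unlike RestDataExecutor, it returns the converted data directly rather than a Response wrapper, purely to keep the example short.

```dart
// Stand-ins for the package's base types, as declared under "Core Types".
abstract class Convertor<To, From> {
  To execute(covariant From from);
}

typedef AsyncConvertor<To, From> = Convertor<Future<To>, From>;

// Hypothetical request description; the real RestParams is not shown here.
class FakeRestParams {
  const FakeRestParams(this.path);
  final String path;
}

// Pretends to perform an HTTP call and returns decoded JSON.
class FakeJsonExecutor
    implements AsyncConvertor<Map<String, Object?>, FakeRestParams> {
  @override
  Future<Map<String, Object?>> execute(FakeRestParams from) async =>
      {'path': from.path, 'id': 7};
}

class User {
  const User(this.id);
  final int id;
}

// Pure, synchronous conversion from JSON to a domain model.
class UserConvertor implements Convertor<User, Map<String, Object?>> {
  @override
  User execute(Map<String, Object?> from) => User(from['id']! as int);
}

// Chains the request with the payload conversion.
class DataExecutor<Data> implements AsyncConvertor<Data, FakeRestParams> {
  DataExecutor(this._executor, this._convertor);
  final AsyncConvertor<Map<String, Object?>, FakeRestParams> _executor;
  final Convertor<Data, Map<String, Object?>> _convertor;

  @override
  Future<Data> execute(FakeRestParams from) async =>
      _convertor.execute(await _executor.execute(from));
}

Future<void> main() async {
  final users = DataExecutor<User>(FakeJsonExecutor(), UserConvertor());
  final user = await users.execute(const FakeRestParams('/users/7'));
  print(user.id); // 7
}
```

Keeping the payload conversion in its own synchronous Convertor means it can be unit tested without any HTTP client.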
RestJsonExecutor
Executes HTTP requests via Dio:
    class RestJsonExecutor<Result>
        implements AsyncConvertor<Response<Result>, RestParams> {
      final Dio dio;

      @override
      Future<Response<Result>> execute(RestParams argument) async {
        return await dio.request<Result>(argument.path, /* ... */);
      }
    }
RestDataExecutor
Chains execution with response data conversion:
    class RestDataExecutor<Data>
        implements AsyncConvertor<Response<Data>, RestParams> {
      final RestJsonExecutor _executor;
      final Convertor<Data, dynamic> _convertor;

      @override
      Future<Response<Data>> execute(RestParams from) async {
        final response = await _executor.execute(from);
        return Response(data: _convertor.execute(response.data!), /* ... */);
      }
    }
Extension API Reference
See the Convertor API Reference for the complete extension method listing.
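As an illustration of how such extension methods can attach behaviour to any Convertor, here is a minimal sketch of an intercept-style extension. It mirrors only the call shape used in the Interceptor example (named onInput and onOutput callbacks). The wrapper class, the extension name, and the choice to pass the returned value to onOutput are assumptions, so the package's real signatures may differ.

```dart
// Stand-in for the package's base type, as declared under "Core Types".
abstract class Convertor<To, From> {
  To execute(covariant From from);
}

// Hypothetical wrapper: runs the callbacks around the inner Convertor
// without changing its input or output.
class _Intercepted<To, From> implements Convertor<To, From> {
  _Intercepted(this._inner, this._onInput, this._onOutput);
  final Convertor<To, From> _inner;
  final void Function(From)? _onInput;
  final void Function(To)? _onOutput;

  @override
  To execute(From from) {
    _onInput?.call(from);
    final result = _inner.execute(from);
    _onOutput?.call(result);
    return result;
  }
}

// Assumed shape of an intercept extension; not the package's actual code.
extension InterceptSketch<To, From> on Convertor<To, From> {
  Convertor<To, From> intercept({
    void Function(From)? onInput,
    void Function(To)? onOutput,
  }) =>
      _Intercepted<To, From>(this, onInput, onOutput);
}

// Tiny Convertor used only to exercise the extension.
class Doubler implements Convertor<int, int> {
  @override
  int execute(int from) => from * 2;
}

void main() {
  final log = <String>[];
  final doubled = Doubler().intercept(
    onInput: (n) => log.add('in $n'),
    onOutput: (n) => log.add('out $n'),
  );

  print(doubled.execute(21)); // 42
  print(log); // [in 21, out 42]
}
```

Because the wrapper is itself a Convertor with the same type arguments, it can be dropped into a chain wherever the original was used.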
Testing Convertors
Each Convertor is a pure function, making testing straightforward:
    void main() {
      group('NotificationModelConvertor', () {
        test('converts GQL data to domain model', () {
          final convertor = notificationModelConvertor();
          final gqlData = GQueryNotificationData_notification_notificationItems(/* ... */);

          final result = convertor.execute(gqlData);

          expect(result, isA<NotificationModel>());
          expect(result.id, equals(gqlData.id));
          expect(result.title, equals(gqlData.title));
        });
      });

      group('NotificationListDatasource', () {
        test('chains executor -> stream convertor -> data extraction', () {
          // Test the full pipeline with mock GqlClient
          final mockClient = MockGqlClient();
          // ... setup and assertions
        });
      });
    }
Migration from DatasourceModule
If migrating from the legacy Strategy/DatasourceModule pattern:
- Replace RequestStrategy classes with Convertor functions in @riverpod providers
- Replace DatasourceModule.create() with direct GraphQLRequestExecutor composition
- Replace CacheHandlerStrategy with CacheHandlerSpecs in UpdateCacheTypedLink config
- Remove strategy key enums (not needed with direct composition)
- Remove RequestContext (Convertors compose directly)
See ADR-005 for the complete migration guide.