
Convertor Pattern

Convertor Pattern Replaces DatasourceModule

The Convertor pattern has replaced the legacy Strategy/DatasourceModule pattern. See ADR-005 for the migration rationale.

The Convertor pattern provides composable, chainable data pipelines for all datasource operations in MOFA architecture. It is implemented as a shared Dart package reusable across projects.

The Convertor pattern solves datasource concerns by:

  • Composable Pipelines: Chain transformations fluently with .then(), .map(), .thenMap(), .thenEach()
  • No Registration: Unlike the old DatasourceModule, Convertors are composed directly in @riverpod providers
  • Declarative Cache: CacheHandlerSpecs + UpdateCacheTypedLink replace imperative cache handler registration
  • Type-Safe: Covariant generic types throughout the chain prevent type errors at compile time
  • Testable: Each Convertor is a pure function testable in isolation

Convertor is the fundamental unit: it accepts a single input and produces a single output:

abstract class Convertor<To, From> {
  To execute(covariant From from);
}
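As a concrete illustration, here is a minimal Convertor that maps a raw JSON map to a domain model. The abstract class is restated so the snippet runs standalone; `UserModel`, `UserModelConvertor`, and the field names are hypothetical, not part of the package API:

```dart
abstract class Convertor<To, From> {
  To execute(covariant From from);
}

// Hypothetical domain model used for illustration only.
class UserModel {
  const UserModel({required this.id, required this.name});
  final String id;
  final String name;
}

// A pure, stateless Convertor: one input, one output, no side effects.
class UserModelConvertor implements Convertor<UserModel, Map<String, dynamic>> {
  @override
  UserModel execute(Map<String, dynamic> from) {
    return UserModel(
      id: from['id'] as String,
      name: from['name'] as String,
    );
  }
}

void main() {
  final user = UserModelConvertor().execute({'id': '42', 'name': 'Ada'});
  print(user.name); // Ada
}
```

Because the class holds no state, the same instance can be reused across pipelines and tested with plain input/output assertions.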

StreamConvertor is a Convertor that produces a Stream:

typedef StreamConvertor<To, From> = Convertor<Stream<To>, From>;

AsyncConvertor is a Convertor that produces a Future:

typedef AsyncConvertor<To, From> = Convertor<Future<To>, From>;

A pipeline is a sequence of Convertors in which the output of each becomes the input of the next:

// Fluent chaining with .then()
final pipeline = requestConvertor
    .then(executor)
    .then(streamConvertor)
    .map(extractData);
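To make the typing behind this chain concrete, here is a self-contained sketch of how `.then()` and `.map()` composition can work: the output type of each stage becomes the input type of the next, so mismatches fail at compile time. The `_FunctionConvertor` helper and the extension below are illustrative, not the package's actual implementation:

```dart
abstract class Convertor<To, From> {
  To execute(covariant From from);
}

// Wraps a plain function as a Convertor (illustrative helper).
class _FunctionConvertor<To, From> implements Convertor<To, From> {
  _FunctionConvertor(this._fn);
  final To Function(From) _fn;
  @override
  To execute(From from) => _fn(from);
}

extension ConvertorChaining<To, From> on Convertor<To, From> {
  /// Feed this convertor's output into [next], yielding one fused convertor.
  Convertor<Next, From> then<Next>(Convertor<Next, To> next) =>
      _FunctionConvertor((From from) => next.execute(execute(from)));

  /// Shorthand: chain a plain function instead of a full Convertor.
  Convertor<Next, From> map<Next>(Next Function(To) fn) =>
      _FunctionConvertor((From from) => fn(execute(from)));
}

void main() {
  final parse = _FunctionConvertor<int, String>(int.parse);
  final pipeline = parse.map((n) => n * 2).map((n) => 'result: $n');
  print(pipeline.execute('21')); // result: 42
}
```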

The intercept extension wraps a Convertor for side effects without modifying the data flow:

final logged = datasource.intercept(
  onInput: (params) => print('Fetching: $params'),
  onOutput: (data) => print('Received ${data.length} items'),
);

The decorate extension wraps a Convertor to modify its input/output or bypass execution entirely:

final withFallback = datasource.decorate((convertor, input) {
  try {
    return convertor.execute(input);
  } catch (e) {
    return Stream.error(e);
  }
});

GraphQLRequestExecutor executes a Ferry operation request, producing a response stream:

class GraphQLRequestExecutor<Data, Params, Vars>
    implements StreamConvertor<OperationResponse<Data, Vars>, Params> {
  GraphQLRequestExecutor({
    required GqlClient gqlClient,
    required Convertor<OperationRequest<Data, Vars>, Params> convertor,
  })  : _gqlClient = gqlClient,
        _convertor = convertor;

  final GqlClient _gqlClient;
  final Convertor<OperationRequest<Data, Vars>, Params> _convertor;

  @override
  Stream<OperationResponse<Data, Vars>> execute(Params from) {
    final request = _convertor.execute(from);
    return _gqlClient.request(request);
  }
}

GraphQLStreamConvertor unwraps Ferry's OperationResponse stream into a clean data stream:

class GraphQLStreamConvertor<Data, Vars>
    implements StreamConvertor<Data, Stream<OperationResponse<Data, Vars>>> {
  @override
  Stream<Data> execute(Stream<OperationResponse<Data, Vars>> from) {
    return from.transform(StreamTransformer.fromHandlers(
      handleData: (data, sink) {
        if (data.hasErrors || data.data == null) {
          sink.addError(data.graphqlErrors?.firstOrNull ??
              data.linkException ??
              Exception());
        } else {
          sink.add(data.data!);
        }
      },
    ));
  }
}

CacheHandlerSpecs provide declarative cache operation configuration:

// Clear a specific cached request
CacheHandlerSpecs.clear(
  mapToCachedRequest: Convertor((request) => /* map to cached query */),
);

// Clear all cached requests of the same type
CacheHandlerSpecs.clearAll(
  mapToCachedRequest: Convertor((request) => /* map to cached query */),
);

// Merge mutation data into cached query
CacheHandlerSpecs.merge(
  mapToCachedRequest: Convertor((request) => /* map to cached query */),
  mapResponse: Convertor((data) => /* extract merge data */),
  mergeCachedData: Convertor(((old, new_)) => /* merge logic */),
);

UpdateCacheTypedLink is a Ferry TypedLink that applies CacheHandlerSpecs automatically:

class UpdateCacheTypedLink extends TypedLink {
  final Cache cache;
  final SetMultimap<Type, CacheHandlerSpecs> cacheHandlersSpecs;

  @override
  Stream<OperationResponse<TData, TVars>> request<TData, TVars>(
    OperationRequest<TData, TVars> req, [
    NextTypedLink<TData, TVars>? forward,
  ]) =>
      forward!(req).doOnData(_updateCache);
}
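The idea behind this link can be sketched in plain Dart: handlers are keyed by request type and run as a side effect on every response that flows through, leaving the stream's data untouched. The real link keys a SetMultimap<Type, CacheHandlerSpecs> on Ferry's OperationRequest types; everything below (`CacheUpdater`, `SaveRequest`) is illustrative:

```dart
class SaveRequest {}

class CacheUpdater {
  final Map<Type, List<void Function(Object response)>> _specs = {};

  void register(Type requestType, void Function(Object) handler) =>
      _specs.putIfAbsent(requestType, () => []).add(handler);

  // Analogous to forward!(req).doOnData(_updateCache).
  Stream<Object> request(Object req, Stream<Object> forward) =>
      forward.map((response) {
        for (final handler in _specs[req.runtimeType] ?? const []) {
          handler(response); // cache side effect only
        }
        return response; // data passes through unchanged
      });
}

Future<void> main() async {
  final updater = CacheUpdater()
    ..register(SaveRequest, (r) => print('cache updated with: $r'));
  final responses =
      updater.request(SaveRequest(), Stream.fromIterable(['saved']));
  await for (final r in responses) {
    print('emitted: $r');
  }
}
```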
A feature composes these pieces directly in @riverpod providers. Typical list, single-item, mutation, and subscription datasources look like:

@riverpod
StreamConvertor<List<GData_items>, ListRequestParams<GFilters, GOrder>>
    featureListDatasource(Ref ref) {
  return GraphQLRequestExecutor<GData, ListRequestParams<GFilters, GOrder>, GVars>(
    gqlClient: ref.watch(gqlClientProvider),
    convertor: ref.watch(featureListQueryConvertorProvider),
  )
      .then(GraphQLStreamConvertor())
      .map((data) => data.feature!.items!.nonNulls.toList());
}

@riverpod
StreamConvertor<GData_item?, SingleRequestParams<GFilters>>
    featureItemDatasource(Ref ref) {
  return GraphQLRequestExecutor<GData, SingleRequestParams<GFilters>, GVars>(
    gqlClient: ref.watch(gqlClientProvider),
    convertor: ref.watch(featureItemQueryConvertorProvider),
  )
      .then(GraphQLStreamConvertor())
      .map((data) => data.feature!.items!.firstOrNull);
}

@riverpod
StreamConvertor<GData_mutate, (UpsertRequestParams<GVars>, bool)>
    featureUpsertDatasource(Ref ref) {
  return GraphQLRequestExecutor<GData, (UpsertRequestParams<GVars>, bool), GVars>(
    gqlClient: ref.watch(gqlClientProvider),
    convertor: ref.watch(featureUpsertConvertorProvider),
  )
      .then(GraphQLStreamConvertor())
      .map((data) => data.featureSave);
}

@riverpod
StreamConvertor<GData_subscription, SubscriptionRequestParams<SubscriptionContext>>
    featureSubscriptionDatasource(Ref ref) {
  return GraphQLRequestExecutor<GData, SubscriptionRequestParams<SubscriptionContext>, GVars>(
    gqlClient: ref.watch(gqlClientProvider),
    convertor: ref.watch(featureSubscriptionConvertorProvider),
  )
      .then(GraphQLStreamConvertor())
      .map((data) => data.featureSubscription);
}

RestJsonExecutor executes HTTP requests via Dio:

class RestJsonExecutor<Result>
    implements AsyncConvertor<Response<Result>, RestParams> {
  final Dio dio;

  @override
  Future<Response<Result>> execute(RestParams argument) async {
    return dio.request<Result>(argument.path, /* ... */);
  }
}

RestDataExecutor chains HTTP execution with response data conversion:

class RestDataExecutor<Data>
    implements AsyncConvertor<Response<Data>, RestParams> {
  final RestJsonExecutor _executor;
  final Convertor<Data, dynamic> _convertor;

  @override
  Future<Response<Data>> execute(RestParams from) async {
    final response = await _executor.execute(from);
    return Response(data: _convertor.execute(response.data!), /* ... */);
  }
}
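The same fetch-then-convert pipeline can be shown as a runnable sketch without Dio: an AsyncConvertor stands in for RestJsonExecutor and returns a raw JSON body, and a plain Convertor parses it into a map. `FakeJsonExecutor` and the JSON payload are illustrative assumptions:

```dart
import 'dart:convert';

abstract class Convertor<To, From> {
  To execute(covariant From from);
}

typedef AsyncConvertor<To, From> = Convertor<Future<To>, From>;

// Stands in for RestJsonExecutor: "fetches" a raw JSON body for a path.
class FakeJsonExecutor implements AsyncConvertor<String, String> {
  @override
  Future<String> execute(String path) async =>
      '{"id": "1", "title": "hello"}';
}

// Parses the raw body into structured data (the RestDataExecutor step).
class ItemConvertor implements Convertor<Map<String, dynamic>, String> {
  @override
  Map<String, dynamic> execute(String body) =>
      jsonDecode(body) as Map<String, dynamic>;
}

Future<void> main() async {
  final body = await FakeJsonExecutor().execute('/items/1');
  final item = ItemConvertor().execute(body);
  print(item['title']); // hello
}
```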

See the Convertor API Reference for the complete extension method listing.

Each Convertor is a pure function, making testing straightforward:

void main() {
  group('NotificationModelConvertor', () {
    test('converts GQL data to domain model', () {
      final convertor = notificationModelConvertor();
      final gqlData =
          GQueryNotificationData_notification_notificationItems(/* ... */);
      final result = convertor.execute(gqlData);
      expect(result, isA<NotificationModel>());
      expect(result.id, equals(gqlData.id));
      expect(result.title, equals(gqlData.title));
    });
  });

  group('NotificationListDatasource', () {
    test('chains executor -> stream convertor -> data extraction', () {
      // Test the full pipeline with a mock GqlClient
      final mockClient = MockGqlClient();
      // ... setup and assertions
    });
  });
}

If migrating from the legacy Strategy/DatasourceModule pattern:

  1. Replace RequestStrategy classes with Convertor functions in @riverpod providers
  2. Replace DatasourceModule.create() with direct GraphQLRequestExecutor composition
  3. Replace CacheHandlerStrategy with CacheHandlerSpecs in UpdateCacheTypedLink config
  4. Remove strategy key enums (not needed with direct composition)
  5. Remove RequestContext (Convertors compose directly)
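
The steps above can be summarized as a before/after sketch. The legacy shapes below (DatasourceModule.create, RequestStrategy, the strategy key enum) are reconstructed from the steps and may not match the old code exactly:

```dart
// BEFORE (legacy, reconstructed): strategies registered behind a key enum.
//
// final module = DatasourceModule.create(
//   strategies: {
//     DatasourceKey.featureList: FeatureListRequestStrategy(),
//   },
// );
// final stream = module.request(DatasourceKey.featureList, context);

// AFTER: direct composition in a @riverpod provider, no registration,
// no strategy keys, no RequestContext.
//
// @riverpod
// StreamConvertor<List<GData_items>, ListRequestParams<GFilters, GOrder>>
// featureListDatasource(Ref ref) =>
//     GraphQLRequestExecutor(
//       gqlClient: ref.watch(gqlClientProvider),
//       convertor: ref.watch(featureListQueryConvertorProvider),
//     )
//         .then(GraphQLStreamConvertor())
//         .map((data) => data.feature!.items!.nonNulls.toList());
```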

See ADR-005 for the complete migration guide.