ServerDatabase

ServerDatabase abstract class API reference

ServerDatabase

Abstract interface for server-side database operations. Implement this class to connect Talon to your backend (Supabase, Firebase, custom API, etc.).

Overview

ServerDatabase handles:

  • Uploading local changes to the server
  • Downloading changes from other clients
  • Real-time subscriptions for live updates

Abstract Methods to Implement

getMessagesFromServer

/// Fetches messages from the server that haven't been synced locally.
Future<List<Message>> getMessagesFromServer({
  // Only messages newer than this are fetched.
  required int? lastSyncedServerTimestamp,
  // Messages from this client are excluded (they're already local).
  required String clientId,
  // Only this user's messages are fetched.
  required String userId,
})

Fetch messages from the server that haven't been synced locally.

| Parameter | Description |
| --- | --- |
| lastSyncedServerTimestamp | Only fetch messages newer than this (the examples below treat null as 0) |
| clientId | Exclude messages from this client (they're local) |
| userId | Only fetch this user's messages |
/// Loads this user's messages from the Supabase `messages` table.
///
/// Rows written by [clientId] are excluded, only rows whose
/// `server_timestamp` is greater than [lastSyncedServerTimestamp] are
/// returned, and the result is ordered by ascending `server_timestamp`.
@override
Future<List<Message>> getMessagesFromServer({
  required int? lastSyncedServerTimestamp,
  required String clientId,
  required String userId,
}) async {
  // A null cursor is treated as 0.
  final since = lastSyncedServerTimestamp ?? 0;

  final rows = await supabase
      .from('messages')
      .select()
      .eq('user_id', userId)
      .neq('client_id', clientId)
      .gt('server_timestamp', since)
      .order('server_timestamp', ascending: true);

  return [
    for (final row in rows as List) Message.fromMap(row),
  ];
}

sendMessageToServer

/// Sends a single message to the server. Completes with true if successful.
Future<bool> sendMessageToServer({required Message message})

Send a single message to the server. Returns true if successful.

/// Inserts [message] into the Supabase `messages` table.
///
/// Completes with `true` once the insert has finished and with `false` if
/// anything in the process throws.
@override
Future<bool> sendMessageToServer({required Message message}) async {
  try {
    final row = message.toMap();
    // These two keys are dropped before upload; presumably they are
    // local-only bookkeeping flags (confirm against Message.toMap).
    row.remove('hasBeenApplied');
    row.remove('hasBeenSynced');

    await supabase.from('messages').insert(row);
  } catch (_) {
    return false;
  }
  return true;
}

subscribeToServerMessages

/// Subscribes to real-time server updates.
///
/// Returns a StreamSubscription that Talon can cancel.
StreamSubscription subscribeToServerMessages({
  required String clientId,
  required String userId,
  required int? lastSyncedServerTimestamp,
  // Invoked with each list of newly received messages.
  required void Function(List<Message>) onMessagesReceived,
})

Subscribe to real-time server updates. Return a StreamSubscription that Talon can cancel.

/// Listens to the Supabase `messages` stream for [userId].
///
/// For every emission, rows written by [clientId] and rows whose
/// `server_timestamp` is not greater than [lastSyncedServerTimestamp] are
/// discarded; if anything remains it is handed to [onMessagesReceived].
/// The returned subscription is the one created by `listen`.
@override
StreamSubscription subscribeToServerMessages({
  required String clientId,
  required String userId,
  required int? lastSyncedServerTimestamp,
  required void Function(List<Message>) onMessagesReceived,
}) {
  // A null cursor is treated as 0, as is a row without a timestamp.
  final since = lastSyncedServerTimestamp ?? 0;

  bool isRelevant(Map<String, dynamic> row) {
    if (row['client_id'] == clientId) return false;
    return (row['server_timestamp'] ?? 0) > since;
  }

  void handleRows(List<Map<String, dynamic>> rows) {
    final incoming = [
      for (final row in rows)
        if (isRelevant(row)) Message.fromMap(row),
    ];
    if (incoming.isEmpty) return;
    onMessagesReceived(incoming);
  }

  return supabase
      .from('messages')
      .stream(primaryKey: ['id'])
      .eq('user_id', userId)
      .listen(handleRows);
}

Optional Methods

sendMessagesToServer

/// Sends multiple messages in a batch.
///
/// The default implementation calls sendMessageToServer for each message
/// sequentially; override for better performance.
Future<List<String>> sendMessagesToServer({
  required List<Message> messages,
})

Send multiple messages in a batch. Override for better performance.

Default implementation: Calls sendMessageToServer for each message sequentially.

Optimized implementation:

/// Uploads [messages] with a single batch insert into `messages`.
///
/// Returns the ids of all [messages] when the batch insert succeeds. If it
/// throws, the work is delegated to the inherited implementation instead.
/// An empty input returns an empty list without touching the server.
@override
Future<List<String>> sendMessagesToServer({
  required List<Message> messages,
}) async {
  if (messages.isEmpty) return [];

  try {
    // Same two keys are stripped as in the single-message upload.
    final rows = [
      for (final message in messages)
        message.toMap()
          ..remove('hasBeenApplied')
          ..remove('hasBeenSynced'),
    ];

    await supabase.from('messages').insert(rows);
    return [for (final message in messages) message.id];
  } catch (_) {
    // Batch failed - fall back to individual sends
    return super.sendMessagesToServer(messages: messages);
  }
}

Complete Implementation Examples

Supabase

import 'dart:async';
import 'package:supabase_flutter/supabase_flutter.dart';
import 'package:talon/talon.dart';

/// [ServerDatabase] implementation that stores messages in a Supabase
/// `messages` table.
class SupabaseServerDatabase extends ServerDatabase {
  SupabaseServerDatabase(this._client);

  final SupabaseClient _client;

  /// Loads [userId]'s rows that were not written by [clientId] and whose
  /// `server_timestamp` is greater than [lastSyncedServerTimestamp],
  /// ordered by ascending `server_timestamp`.
  @override
  Future<List<Message>> getMessagesFromServer({
    required int? lastSyncedServerTimestamp,
    required String clientId,
    required String userId,
  }) async {
    // A null cursor is treated as 0.
    final since = lastSyncedServerTimestamp ?? 0;

    final rows = await _client
        .from('messages')
        .select()
        .eq('user_id', userId)
        .neq('client_id', clientId)
        .gt('server_timestamp', since)
        .order('server_timestamp', ascending: true);

    return [
      for (final row in rows as List) Message.fromMap(row),
    ];
  }

  /// Inserts [message]; completes with `true` on success and `false` if
  /// anything throws.
  @override
  Future<bool> sendMessageToServer({required Message message}) async {
    try {
      final row = message.toMap();
      // These two keys are dropped before upload; presumably they are
      // local-only bookkeeping flags (confirm against Message.toMap).
      row.remove('hasBeenApplied');
      row.remove('hasBeenSynced');
      await _client.from('messages').insert(row);
    } catch (_) {
      return false;
    }
    return true;
  }

  /// Inserts all [messages] in one request and returns their ids.
  ///
  /// If the batch insert throws, the inherited implementation is used
  /// instead. An empty input returns an empty list immediately.
  @override
  Future<List<String>> sendMessagesToServer({
    required List<Message> messages,
  }) async {
    if (messages.isEmpty) return [];

    try {
      final rows = [
        for (final message in messages)
          message.toMap()
            ..remove('hasBeenApplied')
            ..remove('hasBeenSynced'),
      ];

      await _client.from('messages').insert(rows);
      return [for (final message in messages) message.id];
    } catch (_) {
      return super.sendMessagesToServer(messages: messages);
    }
  }

  /// Listens to the `messages` stream for [userId] and forwards the rows
  /// that pass the client and timestamp filters to [onMessagesReceived].
  @override
  StreamSubscription subscribeToServerMessages({
    required String clientId,
    required String userId,
    required int? lastSyncedServerTimestamp,
    required void Function(List<Message>) onMessagesReceived,
  }) {
    // A null cursor is treated as 0, as is a row without a timestamp.
    final since = lastSyncedServerTimestamp ?? 0;

    bool isRelevant(Map<String, dynamic> row) {
      if (row['client_id'] == clientId) return false;
      return (row['server_timestamp'] ?? 0) > since;
    }

    void handleRows(List<Map<String, dynamic>> rows) {
      final incoming = [
        for (final row in rows)
          if (isRelevant(row)) Message.fromMap(row),
      ];
      if (incoming.isEmpty) return;
      onMessagesReceived(incoming);
    }

    return _client
        .from('messages')
        .stream(primaryKey: ['id'])
        .eq('user_id', userId)
        .listen(handleRows);
  }
}

Firebase Firestore

import 'dart:async';
import 'package:cloud_firestore/cloud_firestore.dart';
import 'package:talon/talon.dart';

/// [ServerDatabase] implementation that stores messages in the Firestore
/// `messages` collection.
///
/// `server_timestamp` is written with `FieldValue.serverTimestamp()`, so it
/// is stored as a Firestore [Timestamp]. Talon's cursor is an `int`, so this
/// class converts between the two using microseconds since the epoch.
class FirebaseServerDatabase extends ServerDatabase {
  final FirebaseFirestore _firestore;

  FirebaseServerDatabase(this._firestore);

  /// Builds the query for [userId]'s messages newer than the given cursor.
  ///
  /// The cursor is converted to a [Timestamp] because Firestore range
  /// filters only match fields of the same type as the filter value; an
  /// `int` bound would never match the stored timestamps.
  Query<Map<String, dynamic>> _messagesAfter({
    required String userId,
    required int? lastSyncedServerTimestamp,
  }) {
    final cursor =
        Timestamp.fromMicrosecondsSinceEpoch(lastSyncedServerTimestamp ?? 0);
    return _firestore
        .collection('messages')
        .where('user_id', isEqualTo: userId)
        .where('server_timestamp', isGreaterThan: cursor)
        .orderBy('server_timestamp');
  }

  /// Builds a [Message] from a document, replacing a [Timestamp]
  /// `server_timestamp` with its microseconds-since-epoch `int`.
  ///
  /// Microseconds are used so the value round-trips through [_messagesAfter]
  /// without losing precision. Any other value is passed through untouched.
  static Message _messageFromDoc(Map<String, dynamic> data) {
    final serverTimestamp = data['server_timestamp'];
    if (serverTimestamp is! Timestamp) return Message.fromMap(data);
    return Message.fromMap({
      ...data,
      'server_timestamp': serverTimestamp.microsecondsSinceEpoch,
    });
  }

  /// Fetches [userId]'s messages newer than [lastSyncedServerTimestamp],
  /// ordered by `server_timestamp`, skipping those written by [clientId].
  @override
  Future<List<Message>> getMessagesFromServer({
    required int? lastSyncedServerTimestamp,
    required String clientId,
    required String userId,
  }) async {
    final snapshot = await _messagesAfter(
      userId: userId,
      lastSyncedServerTimestamp: lastSyncedServerTimestamp,
    ).get();

    // The client filter is applied here rather than in the query.
    return snapshot.docs
        .map((doc) => doc.data())
        .where((data) => data['client_id'] != clientId)
        .map(_messageFromDoc)
        .toList();
  }

  /// Writes [message] to `messages/{message.id}` with a server-assigned
  /// `server_timestamp`.
  ///
  /// Completes with `true` on success and `false` if anything throws.
  @override
  Future<bool> sendMessageToServer({required Message message}) async {
    try {
      final data = message.toMap()
        ..remove('hasBeenApplied')
        ..remove('hasBeenSynced')
        ..['server_timestamp'] = FieldValue.serverTimestamp();

      await _firestore.collection('messages').doc(message.id).set(data);
      return true;
    } catch (_) {
      return false;
    }
  }

  /// Listens for documents added to [userId]'s messages after
  /// [lastSyncedServerTimestamp] and forwards those not written by
  /// [clientId] to [onMessagesReceived].
  @override
  StreamSubscription subscribeToServerMessages({
    required String clientId,
    required String userId,
    required int? lastSyncedServerTimestamp,
    required void Function(List<Message>) onMessagesReceived,
  }) {
    return _messagesAfter(
      userId: userId,
      lastSyncedServerTimestamp: lastSyncedServerTimestamp,
    ).snapshots().listen((snapshot) {
      final messages = <Message>[];
      for (final change in snapshot.docChanges) {
        // Only newly added documents are forwarded.
        if (change.type != DocumentChangeType.added) continue;
        final data = change.doc.data();
        if (data == null || data['client_id'] == clientId) continue;
        messages.add(_messageFromDoc(data));
      }

      if (messages.isNotEmpty) {
        onMessagesReceived(messages);
      }
    });
  }
}

Custom REST API

import 'dart:async';
import 'dart:convert';
import 'package:http/http.dart' as http;
import 'package:talon/talon.dart';

/// [ServerDatabase] implementation that talks to a REST endpoint at
/// `$baseUrl/messages` and emulates real-time updates by polling.
class RestServerDatabase extends ServerDatabase {
  final String baseUrl;
  final String authToken;

  /// How often [subscribeToServerMessages] polls the server.
  final Duration pollInterval;

  Timer? _pollingTimer;
  StreamController<List<Message>>? _controller;

  RestServerDatabase({
    required this.baseUrl,
    required this.authToken,
    this.pollInterval = const Duration(seconds: 5),
  });

  /// Requests messages for [userId] newer than [lastSyncedServerTimestamp],
  /// asking the server to exclude those from [clientId].
  ///
  /// Returns an empty list for any status other than 200. Network and
  /// decoding errors propagate to the caller.
  @override
  Future<List<Message>> getMessagesFromServer({
    required int? lastSyncedServerTimestamp,
    required String clientId,
    required String userId,
  }) async {
    final response = await http.get(
      Uri.parse('$baseUrl/messages').replace(queryParameters: {
        'user_id': userId,
        'after': (lastSyncedServerTimestamp ?? 0).toString(),
        'exclude_client': clientId,
      }),
      headers: {'Authorization': 'Bearer $authToken'},
    );

    if (response.statusCode != 200) return [];

    final List<dynamic> data = json.decode(response.body);
    return data.map((row) => Message.fromMap(row)).toList();
  }

  /// Posts [message] as JSON to `$baseUrl/messages`.
  ///
  /// Completes with `true` for any 2xx response, and with `false` for any
  /// other status or if the request throws an [Exception].
  @override
  Future<bool> sendMessageToServer({required Message message}) async {
    try {
      final data = message.toMap()
        ..remove('hasBeenApplied')
        ..remove('hasBeenSynced');

      final response = await http.post(
        Uri.parse('$baseUrl/messages'),
        headers: {
          'Authorization': 'Bearer $authToken',
          'Content-Type': 'application/json',
        },
        body: json.encode(data),
      );

      return response.statusCode >= 200 && response.statusCode < 300;
    } on Exception catch (_) {
      // Report the failure through the return value instead of throwing.
      return false;
    }
  }

  /// Polls [getMessagesFromServer] every [pollInterval] and forwards
  /// non-empty results to [onMessagesReceived].
  ///
  /// Polling stops when the returned subscription is cancelled or when
  /// [dispose] is called. Only one subscription is active at a time:
  /// subscribing again stops the previous poller.
  @override
  StreamSubscription subscribeToServerMessages({
    required String clientId,
    required String userId,
    required int? lastSyncedServerTimestamp,
    required void Function(List<Message>) onMessagesReceived,
  }) {
    // Stop any poller left over from an earlier subscription so it cannot
    // keep running or feed this subscription's controller.
    dispose();

    final controller = StreamController<List<Message>>();
    var lastTs = lastSyncedServerTimestamp ?? 0;
    var isPolling = false;

    final timer = Timer.periodic(pollInterval, (_) async {
      // Skip the tick if the previous request is still in flight, so slow
      // responses cannot overlap and deliver the same messages twice.
      if (isPolling || controller.isClosed) return;
      isPolling = true;
      try {
        final messages = await getMessagesFromServer(
          lastSyncedServerTimestamp: lastTs,
          clientId: clientId,
          userId: userId,
        );

        // dispose() may have closed the controller while we were waiting.
        if (messages.isEmpty || controller.isClosed) return;

        // Folding from the current cursor keeps it from moving backwards.
        lastTs = messages
            .map((m) => m.serverTimestamp ?? 0)
            .fold<int>(lastTs, (a, b) => a > b ? a : b);
        controller.add(messages);
      } on Exception catch (_) {
        // Best effort: a failed poll is retried on the next tick rather
        // than surfacing as an unhandled asynchronous error.
      } finally {
        isPolling = false;
      }
    });

    // Cancelling the returned subscription also stops the timer.
    controller.onCancel = timer.cancel;

    _controller = controller;
    _pollingTimer = timer;
    return controller.stream.listen(onMessagesReceived);
  }

  /// Stops polling and closes the current stream, if any.
  void dispose() {
    _pollingTimer?.cancel();
    _pollingTimer = null;
    _controller?.close();
    _controller = null;
  }
}

Important Notes

  1. Exclude Own Messages: Always filter out messages from the current clientId - they're already local.

  2. Server Timestamp: The server should assign server_timestamp. Use auto-incrementing IDs or database timestamps.

  3. Error Handling: Return false from sendMessageToServer on failure. Talon will retry on next sync.

  4. Batching: Override sendMessagesToServer for significant performance improvements with batch inserts.

  5. Real-time: The subscription should filter messages client-side if your backend doesn't support server-side filtering.