AWS Amplify - 同步数据存储

AWS Amplify - sync DataStore

我正在尝试构建一个原型应用程序来评估 AWS-Amplify (DataStore) 在我们的下一个应用程序中的使用情况。我在尝试同步 2 个客户端时遇到问题。我按照此处教程的说明设置了我的 AWS-DataStore:https://docs.amplify.aws/lib/datastore/getting-started/q/platform/ios 通过使用 Cocoapods。关于 Cocoapods,一切都按预期工作。我尝试了一些测试,也可以在上面的 link 下看到“操纵数据”。我还做了“将数据同步到云”部分。我可以在 AWS 控制台的 dynamoDB 中看到我的数据。当我添加一个条目时,我可以在控制台中看到它一切都按预期工作,但我在两种情况下遇到问题:

假设我有 2 个客户端 A 和 B(都是 运行 iOS 13 - 它是真实设备还是模拟器都无关紧要)我想要这些客户端保持同步。为此,我添加了一个 PostStore(我正在使用来自 AWS-link 以上 atm 的示例模式),它看起来像这样:

import Foundation
import Combine
import SwiftUI
import Amplify

class PostStore: ObservableObject {
    
    @Published private(set) var posts: [Post] = []
    
    var postSubscription: AnyCancellable?
    
    init() {
        self.getAllPostsFromDataStore()
    }
    
    deinit {
        self.unsubscribeFromDataStore()
    }
    
    func getAllPostsFromDataStore() {
        
        Amplify.DataStore.query(Post.self) { (result) in
            switch result {
            case .success(let posts):
                DispatchQueue.main.async {
                    print("Got \(posts.count) Posts from DataStore initially")
                    self.posts = posts
                }
            case .failure(let error):
                print("Error getting Posts from DataStore: \(error.localizedDescription)")
            }
        }
    }
    
    func addRandomPostToDataStore() {
        
        let post = Post.getRandomPost()
        self.addPostToArray(post)
        
        print("Fire: \(post.id)")
        
        Amplify.DataStore.save(post) {
            
            switch [=10=] {
            case .success(let post):
                print("Added post with id: \(post.id)")
            case .failure(let error):
                print("Error adding post with title: \(post.title) Error: \(error.localizedDescription)")
            }
        }
    }
    
    func deletePostFromDataStore(for indexSet: IndexSet) {
        
        let postsToDelete = indexSet.map { self.posts[[=10=]] }
        
        self.posts.remove(atOffsets: indexSet)
        
        for post in postsToDelete {
            
            Amplify.DataStore.delete(post) { (result) in
                switch result {
                case .success():
                    print("Deleted Post from DataStore")
                case .failure(let error):
                    print("Error deleting Post From DataStore: \(error)")
                }
            }
        }
    }
        
    func subscribeToDataStore() {
        
        postSubscription = Amplify.DataStore.publisher(for: Post.self)
            .sink(receiveCompletion: { (completion) in

                print("Completion!")

                if case .failure(let error) = completion {
                    print("Subscription received Error: \(error.localizedDescription)")
                }

            }, receiveValue: { (changes) in
                
                
                //print("Subscription received mutation: \(changes)")
                print("\n\n\n")
                print("\(try! changes.toJSON())")
                print("\n\n\n")
//                print("Changes!")
                let newPost = try! changes.decodeModel(as: Post.self)

                DispatchQueue.main.async {

                    switch changes.mutationType {
                    case "create":
//                        print("Create Subscription")
                        self.addPostToArray(newPost)
                        break
                    case "update":
                        print("Update Subscription")
                        self.updatePostInArray(newPost)
                        break
                    case "delete":
                        print("Delete Subscription")
                        self.deletePostFromArray(newPost)
                        break
                    default:
                        print("AnotherType?")
                        print(changes.mutationType)
                        break
                    }
                }
                print("\n")
            })
    }
    
    func unsubscribeFromDataStore() {
        postSubscription?.cancel()
    }
    
    private func addPostToArray(_ post: Post) {
        
        if !self.posts.contains(post) {
            self.posts.append(post)
        }
    }
    
    private func deletePostFromArray(_ post: Post) {
        
        if let index = self.posts.firstIndex(of: post) {
            self.posts.remove(at: index)
        }
    }
    
    private func updatePostInArray(_ post: Post) {
        print("update?")
    }
}

subscribeToDataStore 方法是通过 SwiftUI-View(在 onAppear 中)触发的,是这里的关键。如果我正在向客户端 A 添加一些帖子(缓慢地),那么客户端 B 会收到所有更改并将其添加到商店(我使用 SwiftUI View 将其可视化,但在本例中这无关紧要)。没有问题,一切都按预期工作。

但是

  1. 当我使用客户端 A 非常快速地添加新条目(帖子)时,B 未获取一些新条目。
  2. 当我在客户端 B 离线(在飞行模式下)时添加超过 1 个条目并且我关闭了飞行模式时,每个订阅只会获取最新的条目。

在这两种情况下,当我触发 getAllPostsFromDataStore 时,每个条目都会被提取,我们可以再次进行。

所以我的问题是:为什么订阅没有提取每一个条目?

执行场景 1 时有一个有趣的事实:每个订阅接收条目时的常见日志消息如下所示:

WebsocketDidReceiveMessage - {MyObject}

然后我在接收器中记录对象 'receiveValue' 再次完成:

{MyObject}

但是当快速(在短时间内)添加一些条目时,来自 Amplify 的日志消息有时看起来像这样:(这就是订阅获取中缺少第一个“对象”的地方):

WebsocketDidReceiveMessage - {MyObject}
WebsocketDidReceiveMessage - {MyObject2}

然后我在接收器中记录对象 'receiveValue' 再次完成,只有最后一个对象被记录:

{MyObject2}

似乎 {MyObject} 在快速添加多个条目时被取消或发生了什么事。如上所述,当 B 处于飞行模式或没有连接时,仅获取最后添加的条目。 (但我也只收到 WebsocketDidReceiveMessage 的最后一条日志消息)。

我是不是遗漏了什么,或者这是 Amplify-DataStore SDK 中的错误?

请原谅我的英语不好。我不是母语人士,有时很难确切地说出问题所在。如果您对此问题有任何疑问或需要更多信息,请发表评论,我将编辑我的问题,提供您需要的一切。

此致

对于同样遇到此问题的任何人:

我提交了一个问题 here and the problem seems to be fixed in version 1.3.1 with PR #756

玩得开心!