可以根据条件重试一次吗?

Possible to retry once based on a condition?

我有一个有效的方法,基本上是这样的:

public IObservable<List<Stuff>> GetGoodStuff()
{
    return Observable.FromAsync(GetAccessTokenAsync)
        .SelectMany(accessToken =>
        {
            return httpClient.SendAsync(request);
        })
        .SelectMany(response => 
        { 
            response.EnsureSuccessStatusCode(); 
            return response.Content.ReadAsStringAsync(); 
        })
        .Select(json => 
        {
            return JsonConvert.DeserializeObject<List<Stuff>>(json);
        });
}

"GetAccessTokenAsync"returns一个缓存的访问令牌为api,或者,第一次会去取一个令牌。其余部分在 httpclient 和 Rx 方面非常标准。

事情是这样的:我想捕获 401 错误,更新访问令牌,然后重试整个过程。但只有一次 - 之后它可以将异常抛出给调用者。

在那个中间块我可以这样做:

            if (response.StatusCode == HttpStatusCode.Unauthorized)
            {
                InvalidateAccessToken();
                // what now???
            }

然后呢?看不到递归调用是如何工作的。以某种方式包装整个东西?还没看到...

编辑 1 - 2015 年 11 月 7 日

这个日期的两个答案看起来都不错。更具声明性的方法似乎变化不大,并且能够隐藏大部分 "plumbing",但我无法让它在所有情况下都能正常工作。

因此,根据@Timothy Shields 的建议,我想到了这个,它读起来很好,并且很好地隐藏了管道(哦,是的,它有效:-)

/// <summary>
/// Makes an httpclient request using the access token. If Unauthorized is received the access
/// token will be reacquired and the request will be retried once.
/// </summary>
/// <returns>The json result from a successful request.</returns>
async Task<string> MakeRequestWithAccessToken(string requestUri, CancellationToken cancellationToken)
{
    const int RetryCount = 1;

    HttpResponseMessage response = null;
    for (int i = 0; i <= RetryCount; i++)
    {
        var accessToken = await GetAccessTokenAsync();

        var request = new HttpRequestMessage(HttpMethod.Get, requestUri);
        request.Headers.Add("Authorization", "Bearer " + accessToken);

        var client = new RemoteService(ApiUrl).NewClient();

        response = await client.SendAsync(request, cancellationToken);
        if (i < RetryCount && response.StatusCode == HttpStatusCode.Unauthorized)
        {
            InvalidateAccessToken();
            continue;
        }

        response.EnsureSuccessStatusCode();
    }

    return await response.Content.ReadAsStringAsync();
}

public IObservable<List<Stuff>> GetGoodStuff(int maxCount)
{
    return Observable.FromAsync(async cancellationToken =>
    {
        var requestUri = string.Format("mypath.json?count={0}", maxCount);
        var json = await MakeRequestWithAccessToken(requestUri, cancellationToken);
        return JsonConvert.DeserializeObject<List<Stuff>>(json);
    });
}

我假设 InvalidateAccessToken 也是异步的,就像 GetAccessTokenAsync 一样。

该解决方案将触发失效并通过抛出允许重试触发的异常继续。如果请求第二次失败,无效序列将只重播将冒泡到订阅者的异常。

public IObservable<List<Stuff>> GetGoodStuff()
{
    var invalidate = Observable.FromAsync(InvalidateAccessTokenAsync)
                .Select(x => Observable.Throw<string>(new Exception()))
                .Switch()
                .Replay()
                .RefCount();

    return Observable.FromAsync(GetAccessTokenAsync)
        .SelectMany(accessToken =>
        {
            return httpClient.SendAsync(request);
        })
        .SelectMany(response => 
        { 
            if (response.StatusCode == HttpStatusCode.Unauthorized)
            {
                return invalidate;
            }

            response.EnsureSuccessStatusCode(); 
            return response.Content.ReadAsStringAsync().ToObservable(); 
        })
        .Select(json => 
        {
            return JsonConvert.DeserializeObject<List<Stuff>>(json);
        })
        .Retry(1);
}

编辑:回答@supertopi 问题

无效序列 returns 和 IOberservable<IOberservable<string>> 中的 Select。我们只对内部序列感兴趣,所以我使用 Switch 运算符移至内部序列。

Replay 运算符 returns 和 IConnectableObservable<string> 将在订阅时重播来自其源的值。 IConnectableObservable<T> 本质上允许我们共享数据值,并且在重放这些值时,Observable.FromAsync(InvalidateAccessTokenAsync) 只会被调用一次。任何迟到的订阅者将只能看到重播的值,这将是一个例外。请查看 introintorx 上的 definition 以获得详尽的解释。

如果没有 RefCount 运算符,我将需要手动调用 IConnectableObservable 上的连接。我使用 RefCount 为我处理连接并将序列转换回 IObservable<string>。再次可以找到更多信息 here.

你应该利用 async-await 来做到这一点:

public IObservable<List<Stuff>> GetGoodStuff()
{
    return Observable.FromAsync(async cancellationToken =>
    {
        const int RetryCount = 1;
        for (int i = 0; i <= RetryCount; i++)
        {
            var accessToken = await GetAccessTokenAsync();
            var request = MakeRequest(accessToken);
            var response = await httpClient.SendAsync(request, cancellationToken);
            if (i < RetryCount && response.StatusCode == HttpStatusCode.Unauthorized)
            {
                InvalidateAccessToken();
                continue;
            }
            response.EnsureSuccessStatusCode();
            var json = await response.Content.ReadAsStringAsync(cancellationToken);
            return JsonConvert.DeserializeObject<List<Stuff>>(json);
        }
    });
}

此技术允许您编写标准的命令式代码,公开为漂亮 IObservable<T>

请注意,我只能猜测您的 "retry" 的外观。不清楚你调用InvalidateAccessToken()之后想要做什么,所以我猜测并发明了MakeRequest方法。您应该很容易将其改编为完全符合您要求的代码。