无法从弹性客户端搜索响应中获取 _source 字典键值

Unable to fetch _source dictionary key-val from elastic client search response

我正在尝试访问 hits _source 字典以加载到数据库中。 点击 returns null,我做错了什么?

备注: searchResponse 返回 JSON 数据,debuginformation 证实了这一点。

但是,Hit 和 _Source class 以及基础数据变量不可访问,变量 hits returns 为空。

调试模式下局部变量的以下代码显示了数据。 如果需要,我可以包含更多数据或局部变量的图像或调试信息 window 如果这有助于解决问题的范围。

提前致谢。

尝试使用 searchResponse.Documents 和 foreach 语句访问 _source 键值对以访问命中中的元素。但是无法访问 _source 键值对。

/*Declared classes in visual studio console application for c#:
.NET framework 4.5*/

class Program
{

    public class Doc
    {
        public int took { get; set; }
        public bool timed_out { get; set; }
        public _Shards _shards { get; set; }
        public Hits hits { get; set; }
    }

    public class _Shards
    {
        public int total { get; set; }
        public int successful { get; set; }
        public int skipped { get; set; }
        public int failed { get; set; }
    }

    public class Hits
    {
        public int total { get; set; }
        public float max_score { get; set; }
        public Hit[] hits { get; set; }
    }

    public class Hit
    {
        public string _index { get; set; }
        public string _type { get; set; }
        public string _id { get; set; }
        public float _score { get; set; }
        public _Source _source { get; set; }
    }

    public class _Source
    {
        public int duration { get; set; }
        public string group_id { get; set; }
        public DateTime var_time { get; set; }
        public string var_name { get; set; }
    }

    static void Main(string[] args)
    {
        var uri = new Uri("http://domain_name.val.url:9203/");
        var pool = new SingleNodeConnectionPool(uri);
        var connectionSettings = new ConnectionSettings(pool)
                                .DisableDirectStreaming();
        var resolver = new IndexNameResolver(connectionSettings);
        var client = new ElasticClient(connectionSettings);

        if (!client.IndexExists("test_index").Exists)
        {
            client.CreateIndex("test_index");
        }

        var searchResponse = client.Search<Doc>(s => s
        .Index("test_index")
        .AllTypes()
        .Size(1)
        .Query(q => q
        .MatchAll())
        .TypedKeys(null)
        .SearchType(Elasticsearch.Net.SearchType.DfsQueryThenFetch)
        .Scroll("30s")
      );
    MessageBox.Show("searchResponse.DebugInformation=" + searchResponse.DebugInformation);
    }
}





Elastic Search 示例 URL 数据:

{
  "took" : 12,
  "timed_out" : false,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : 2700881,
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "test_index",
        "_type" : "doc",
        "_id" : "R22224!!5333e7e4-9ee3-45f4-9dc3-2a8b8d8cdcf8",
        "_score" : 1.0,
        "_source" : {
          "duration" : 14986283,
          "group_id" : "com",
          "var_time" : "2018-04-24T17:05:13.082+02:00",
          "var_name" : "2",
        }
      }
    ]
  }
}



更新: 办公室内部有人建议使用以下代码解决方案,然后遍历键值对。

        var searchResponse = client.Search<Doc>(s => s
            .Index("test_index")
            .AllTypes()
            .Size(10)
            .Query(q => q
            .MatchAll())
            .TypedKeys(null)
            .SearchType(Elasticsearch.Net.SearchType.DfsQueryThenFetch)
            .Scroll("30s")
            .RequestConfiguration(r=>r
            .DisableDirectStreaming()
            )
            );
        var raw = Encoding.UTF8.GetString(searchResponse.ApiCall.ResponseBodyInBytes);  
        JavaScriptSerializer jss = new JavaScriptSerializer();
        jss.MaxJsonLength = Int32.MaxValue;
        var pairs = jss.Deserialize<Dictionary<string, dynamic>>(raw); 

看来你误会了客户的API;您不需要声明 _ShardsHitHits_Source 等。客户端负责反序列化 Elasticsearch API 的这些部分你.

您需要定义的唯一部分是一个 POCO,它将映射到响应中每个 "_source" 字段中的 JSON 对象,即

{
  "duration" : 14986283,
  "group_id" : "com",
  "var_time" : "2018-04-24T17:05:13.082+02:00",
  "var_name" : "2",
}

它看起来像 _Source POCO(尽管我倾向于给它起一个更有意义的名字!)。我们暂时称它为 MyDocument

MyDocument定义为

public class MyDocument
{
    [PropertyName("duration")]
    public int Duration { get; set; }

    [PropertyName("group_id")]
    public string GroupId { get; set; }

    [PropertyName("var_time")]
    public DateTime Time { get; set; }

    [PropertyName("var_name")]
    public string Name { get; set; }
}

一个简单的搜索是

var pool = new SingleNodeConnectionPool(new Uri("http://localhost:9200"));

var settings = new ConnectionSettings(pool)
    .DefaultMappingFor<MyDocument>(m => m
        .IndexName("test_index")
        .TypeName("doc")
    );

var client = new ElasticClient(settings);

var searchResponse = client.Search<MyDocument>();

// A collection of the top 10 matching documents
var documents = searchResponse.Documents;

只要文档的泛型类型为MyDocumentDefaultMappingFor<MyDocument>(...) 将使用索引名称"test_index" 和类型名称“doc”,并且它们未明确定义在请求中。

以上搜索生成了以下对 Elasticsearch 的查询

POST http://localhost:9200/test_index/doc/_search
{}

现在,您似乎想使用 Scroll API 到 return all 匹配文档。要使用 Scroll API 执行此操作,您将编写一个循环以在文档 returned

期间继续发出滚动请求
var searchResponse = client.Search<MyDocument>(s => s
    .Size(1000)
    .Scroll("30s")
);

while (searchResponse.Documents.Any())
{
    foreach (var document in searchResponse.Documents)
    {
        // do something with this set of 1000 documents
    }

    // make an additional request
    searchResponse = client.Scroll<MyDocument>("30s", searchResponse.ScrollId);
}

// clear scroll id at the end
var clearScrollResponse = client.ClearScroll(c => c.ScrollId(searchResponse.ScrollId));

有一个 ScrollAll 可观察的帮助程序,您可以使用它来简化编写和 parallelizes the operation using sliced_scroll。与上面相同的操作,但使用 ScrollAll

// set to number of shards in targeted indices
var numberOfSlices = 4;

var scrollAllObservable = client.ScrollAll<MyDocument>("30s", numberOfSlices);

Exception exception = null;
var manualResetEvent = new ManualResetEvent(false);

var scrollAllObserver = new ScrollAllObserver<MyDocument>(
    onNext: s => 
    {
        var documents = s.SearchResponse.Documents;

        foreach (var document in documents)
        {
            // do something with this set of documents
        }
    },
    onError: e =>
    {
        exception = e;
        manualResetEvent.Set();
    },
    onCompleted: () => manualResetEvent.Set()
);

scrollAllObservable.Subscribe(scrollAllObserver);

manualResetEvent.WaitOne();

if (exception != null)
    throw exception;

如果不需要对观察者的全部控制,可以使用简化版。有了这个,您确实需要为整个操作指定最大 运行 时间

var numberOfSlices = 4;

var scrollAllObservable = client.ScrollAll<MyDocument>("30s", numberOfSlices)
    .Wait(TimeSpan.FromHours(2), onNext: s =>
        {
           var documents = s.SearchResponse.Documents;

           foreach (var document in documents)
           {
                // do something with this set of documents
            }
        });