Rx(IObservable)を使った読み取り処理
この記事は、連載C#でCSVファイル操作ライブラリ の4回目の記事です。
今回は、Reactive Extensions(Rx)を使って、データを1行ずつ読み取る処理を実装してみます。
Reactive Extensions とは何ぞや?という方は以下のサイト、記事がとても参考になります。
Rxの準備
NuGetでRxのパッケージをインストールします。
PM> Install-Package Rx-Main
IObservable<T>
を生成する
1行ずつ非同期で読み取ったデータを逐次処理したい場合は、System.Reactive.Linq.Observable
クラスのCreate
静的メソッドを使用します。
このメソッドには多数のオーバーロードがありますが、今回は非同期とキャンセルに対応した以下のメソッドを使用します。(.Net 4.5以降)
public static IObservable<TResult> Create<TResult>( Func<IObserver<TResult>, CancellationToken, Task> subscribeAsync);
subscribeAsync
は値を通知するためのIObserver
と処理のキャンセルを検知するためのSystem.Threading.CancellationToken
を引数に取り、Task
を返すデリケートです。- ここに、1レコードずつ読み取る処理と、その結果をObserverで発行する処理を記述します。
public class XsvReader : IDisposable { public IObservable<IList<string>> ReadXsvAsObservable(ICollection<string> delimiters) { return Observable.Create<IList<string>>(async (observer, ct) => { try { while (!EndOfData) { if (ct.IsCancellationRequested) { break; } var row = await ReadXsvLineAsync(delimiters).ConfigureAwait(false); observer.OnNext(row); } observer.OnCompleted(); } catch (Exception e) { observer.OnError(e); } }); } }
- リーダが末尾に達するまで
ReadXsvLineAsync
で1行(レコード)ずつ非同期に読み取ります。 - 読み取った値を
observer.OnNext
で発行します。 - 読み取りの完了は
observer.OnCompleted
、読み取り中に発生した例外はobserver.OnError
で通知します。 - 処理のキャンセルが要求されると、
ct.IsCancellationRequested
がtrue になるので、その時点でループを抜けて処理を中止します。
使ってみる
WPFで、読み込んだCSVファイルの内容をDataGridに表示してみます。 読み込むファイルは https://gist.github.com/pierre3/7d82699d82f814f018bb のModelShip.txt を使用します。
準備
View(.xaml)側は以下のように設定します。
- Windowにデータ表示用のDataGridと「Read」ボタンおよび「Cancel」ボタンを配置します。
- DataContext に CsvDataクラスを指定し、CsvData.Row をDataGridのItemSourceにバインドします。
バインド用のクラスを用意します。
- 1レコード分のデータの格納にModelShipクラスを用意します。
DataGrid
のAutoGenerateColumns
プロパティをTrue
に設定しておくと、自動的にModelShipクラスのプロパティ名がカラム名になり、プロパティの値がフィールドとして表示されます。 - DataGridにバインドする
CsvData
のRow
プロパティは、ObservableCollection<ModelShip>
とします。 - CsvDataには、Readボタンおよび、Cancelボタンに対応した、Read()メソッドおよびCancel()メソッドを用意します。
サンプルコード
読み込みメソッド
では、読み込み処理部分(CsvData.Read)を見てみましょう。
- まずは、StreamReaderからXsvReaderを生成します。ここで通常であれば、usingステートメントを使用するのですが、ここは非同期実行なので
using
で囲ってしまうと、ReadXsvAsObservable
を実行すると即座にusingのスコープを抜けてしまい、実際にReaderを使って読み取る段階では既にDispose済みのためにエラーとなってしまいます。 - ですので、ここではusingを使わず、
Subscribe()
に渡す、OnCompleted とOnError デリゲート内でreader.Dispose()
を呼ぶようにしています。 - OnNextでは、流れてくる行データからModelShipオブジェクトを生成して
CsvData.Row
プロパティに順次追加してゆきます。 - Subscribe()の戻り値は
IDisposable
を実装するオブジェクトになります。これのDispose()を実行することで処理のキャンセルができます。
実行結果
Readボタンをクリックすると、ファイルを読み込んだ結果がこんな感じで表示されます。
使用したサンプルファイルでは一瞬で読み込みが完了してしまうので確認できませんが、試しにReadXsvAsObservable
の読み取りループ内にawait Task.Delay(100);
などと待機する処理を入れると、DataGridに1行ずつデータが追加されていく様子が確認できます。
また、当然ながら、読み取ったレコードのフィルタリング等もLINQを使って簡単に処理できます。
以下は、
- 先頭に
;
がついているレコードはコメント行としてスキップした後 - データをModelShipオブジェクトに変換して
- 価格が2,500円以上の戦艦のみを表示する サンプルです。
disposable = reader.ReadXsvAsObservable(new[] { "," }) .Where(row => !row.First().StartsWith(";")) .Select(row => ModelShip.FromCsvRecord(header, row)) .Where(row => row.Price >= 2500) .Where(row => row.ClassOfShip == "戦艦") .ObserveOn(System.Threading.SynchronizationContext.Current) .Subscribe( row => { System.Diagnostics.Debug.WriteLine("OnNext"); this.Rows.Add(row); }, error => { System.Diagnostics.Debug.WriteLine("OnError:" + error); reader.Dispose(); }, () => { System.Diagnostics.Debug.WriteLine("OnCompleted"); reader.Dispose(); });
サンプルデータの#109 金剛と,#110 比叡をコメント行として実行してみます。
... 104,戦艦,扶桑 ふそう,1785,"1,700",A ;109,戦艦,金剛 こんごう,2625,"2,500",H ;110,戦艦,比叡 ひえい,2625,"2,500",H 111,戦艦,榛名 はるな,2625,"2,500",H ...
実行結果です。ちゃんとフィルターした結果が表示されています。