/**
    This module implements a convenience wrapper API for influx.

    Authors: Atila Neves (Kaleidic Associates Advisory Limited)

    Generated documentation:
        http://influxdb.code.kaleidic.io/influxdb.html

*/

module influxdb.api;

version(Test_InfluxD)
    import unit_threaded;
else
    struct Values { this(string[]...) { } }

static import influxdb.vibe;
import std.typecons: Flag, No;
import std.datetime: Date, DateTime, SysTime, UTC;

/++
    Converts a time string returned by InfluxDB into a `SysTime`.

    The string is first parsed as-is. If that fails and the string has a
    fractional part, up to four trailing fraction digits are dropped (one more
    on every attempt) and parsing is retried, trading precision for not
    throwing.

    Params:
        time = Influx-db time string
    Returns:
        SysTime
    Throws:
        DateTimeException (the original one) if no attempt succeeds.
+/
SysTime influxSysTime(string time) @safe
{
    import std.datetime: SysTime, DateTimeException;

    try {
        return SysTime.fromISOExtString(time);
    } catch(DateTimeException ex) {
        // see https://issues.dlang.org/show_bug.cgi?id=16053
        import std.stdio: stderr;
        import std.algorithm: countUntil;

        debug {
            (() @trusted => stderr)()
                .writeln("Could not convert ", time, " due to a Phobos bug, reducing precision");
        }

        // find where the fractional part starts
        auto dotIndex = time.countUntil(".");
        if(dotIndex < 0)
            dotIndex = time.countUntil(",");
        if(dotIndex < 0)
            throw ex;

        const ptrdiff_t fractionStart = dotIndex + 1;
        const digitCount = time[fractionStart .. $].countUntil!(a => a < '0' || a > '9');
        // countUntil yields -1 when the fraction runs to the end of the string
        // (no zone designator). This has to be tested before the offset is
        // added, otherwise the "not found" case goes unnoticed.
        const ptrdiff_t firstNonDigitIndex =
            digitCount < 0 ? cast(ptrdiff_t) time.length : fractionStart + digitCount;

        const ptrdiff_t lastDigitIndex = firstNonDigitIndex - 1;

        foreach(i; 0 .. 4) {
            const cut = lastDigitIndex - i;
            // never slice with a negative bound (very short, malformed input)
            if(cut < 0) break;

            // try to cut out a number from the fraction
            // inaccurate but better than throwing an exception
            const timeStr =
                time[0 .. cut] ~
                time[firstNonDigitIndex .. $];
            try
                return SysTime.fromISOExtString(timeStr);
            catch(DateTimeException _) {}
        }

        throw ex;
    }
}

///
alias Database = DatabaseImpl!(influxdb.vibe.manage, influxdb.vibe.query, influxdb.vibe.write);

/**
    Holds information about the database name and URL, forwards
    it to the implementation functions for managing, querying and
    writing to the DB
*/
struct DatabaseImpl(alias manageFunc, alias queryFunc, alias writeFunc) {

    import influxdb.api;

    version(Have_mir_algorithm)
    {
        import mir.series: Series;
        import mir.ndslice.slice: DeepElementType, Slice, SliceKind;
    }

    string url; // e.g. http://localhost:8086
    string db;  // e.g. mydb

    @disable this();

    /**
        Stores the URL and database name and issues a
        "CREATE DATABASE" management command for `db`.
    */
    this(string url, string db) {
        this.url = url;
        this.db = db;

        manage("CREATE DATABASE " ~ db);
    }

    /**
        Sends management commands to the DB (CREATE, DROP).
        The parameter must be the full command (e.g. "DROP DATABASE mydb")
     */
    void manage(in string cmd) const {
        manageFunc(url, cmd);
    }

    /**
        Queries the DB. The query must be a full InfluxDB query
        (e.g. "SELECT * FROM foo")
    */
    Response query(in string query) @trusted const { // deserialize is @system
        import asdf: deserialize;
        return queryFunc(url, db, query).deserialize!Response;
    }

    /**
        Insert data into the DB. Does nothing when `measurements` is empty;
        otherwise all measurements are sent in a single write, one per line.
    */
    void insert(in Measurement[] measurements) const {
        import std.format: format;

        if(measurements.length == 0) return;

        static if (__VERSION__ >= 2074)
            writeFunc(url, db, () @trusted { return format!"%(%s\n%)"(measurements); }());
        else
            writeFunc(url, db, format("%(%s\n%)", measurements));
    }

    /**
        Insert data into the DB.
    */
    void insert(in Measurement[] measurements...) const {
        insert(measurements);
    }

    /**
        Insert Mir time-series with single column into the DB.
        Supported time types are `SysTime`, `DateTime`, `Date`, and `long`.

        See also the example in the `mir-integration` folder.

        Params:
            measurementName = measurement name
            columnName = column name
            series1 = 1D time-series
            commonTags = list of tags to add to each measurement (optional)
    */
    version(Have_mir_algorithm)
    void insert(TimeIterator, SliceKind kind, Iterator)(
        string measurementName,
        string columnName,
        Series!(TimeIterator, Iterator, 1, kind) series1,
        string[string] commonTags = null,
    ) const
    {
        import mir.series: series;
        import mir.ndslice.topology: repeat, unpack;
        import mir.ndslice.dynamic: transposed;

        return this.insert(
            measurementName,
            [columnName],
            series1.time.series(series1.data.repeat(1).unpack.transposed),
            commonTags,
        );
    }

    /**
        Insert Mir time-series with multiple columns into the DB.
        Supported time types are `SysTime`, `DateTime`, `Date`, and `long`.

        See also the example in the `mir-integration` folder.

        Params:
            measurementName = measurement name
            columnNames = array of column names
            series = 2D time-series
            commonTags = list of tags to add to each measurement (optional)
    */
    version(Have_mir_algorithm)
    void insert(TimeIterator, SliceKind kind, Iterator)(
        string measurementName,
        in string[] columnNames,
        Series!(TimeIterator, Iterator, 2, kind) series,
        string[string] commonTags = null,
    ) const
    {
        alias Time = typeof(series.front.time);
        alias Data = DeepElementType!(typeof(series.front.data));

        import std.traits: isSomeString;
        import std.exception: assumeUnique, enforce;
        import std.array: appender;
        import std.format: FormatSpec, formatValue;

        enforce(measurementName.length);
        enforce(columnNames.length == series.length!1, "columnNames.length should be equal to series.length!1");

        if (series.length == 0)
        {
            return;
        }

        FormatSpec!char fmt;
        auto app = appender!(const(char)[]);

        // name output
        app.put(measurementName);
        // tags output
        if (commonTags.length)
        {
            app.put(",");
            aaFormat(&app.put!(const(char)[]), commonTags);
        }
        app.put(" ");
        // name + tags
        auto head = app.data;
        app = appender!(const(char)[]);
        foreach (i; 0 .. series.length)
        {
            auto observation = series[i];
            if (i)
            {
                app.put("\n");
            }

            app.put(head);

            // values output
            foreach(j, key; columnNames)
            {
                auto value = observation.data[j];
                if (j)
                    app.put(",");
                app.formatValue(key, fmt);
                app.put("=");
                static if(isSomeString!Data)
                {
                    app.put(`"`);
                    app.formatValue(value, fmt);
                    app.put(`"`);
                }
                else
                {
                    app.formatValue(value, fmt);
                }
            }

            // time output
            static if (is(Time : long))
            {
                long timestamp = observation.time;
            }
            else
            static if (is(Time : SysTime))
            {
                long timestamp = observation.time.toUnixTime!long * 1_000_000_000 + observation.time.fracSecs.total!"nsecs";
            }
            else
            static if (is(Time : DateTime) || is(Time : Date))
            {
                long timestamp = SysTime(observation.time, UTC()).toUnixTime!long * 1_000_000_000;
            }
            else
            {
                static assert(0, "Unsupported timestamp type: " ~ Time.stringof);
            }
            // a zero timestamp is treated as "no timestamp" and left out
            if (timestamp != 0)
            {
                app.put(" ");
                app.formatValue(timestamp, fmt);
            }
        }
        writeFunc(url, db, app.data.assumeUnique);
    }

    /**
        Delete this DB
    */
    void drop() const {
        manage("DROP DATABASE " ~ db);
    }
}

///
@("Database")
@safe unittest { // not pure because of asdf.deserialize

    string[string][] manages;
    string[string][] queries;
    string[string][] writes;

    alias TestDatabase = DatabaseImpl!(
        (url, cmd) => manages ~= ["url": url, "cmd": cmd], // manage
        (url, db, query) { // query
            queries ~= ["url": url, "db": db, "query": query];
            return
            `{
                "results": [{
                    "series": [{
                        "columns": ["time", "othervalue", "tag1", "tag2", "value"],
                        "name": "lename",
                        "values": [
                            ["2015-06-11T20:46:02Z", 4, "toto", "titi", 2],
                            ["2017-03-14T23:15:01.06282785Z", 3, "letag", "othertag", 1]
                        ]
                    }],
                    "statement_id": 33
                }]
            }`;
        },
        (url, db, line) => writes ~= ["url": url, "db": db, "line": line]
    );

    manages.shouldBeEmpty;
    const database = TestDatabase("http://db.com", "testdb");
    manages.shouldEqual([["url": "http://db.com", "cmd": "CREATE DATABASE testdb"]]);

    writes.shouldBeEmpty;
    database.insert(Measurement("cpu", ["tag1": "foo"], ["temperature": "42"]));
    writes.shouldEqual([["url": "http://db.com", "db": "testdb",
                        "line": "cpu,tag1=foo temperature=42"]]);

    queries.shouldBeEmpty;
    const response = database.query("SELECT * from foo");
    queries.shouldEqual([["url": "http://db.com", "db": "testdb", "query": "SELECT * from foo"]]);

    response.results.length.shouldEqual(1);
    response.results[0].statement_id.shouldEqual(33);
    response.results[0].series.length.shouldEqual(1);
    const series = response.results[0].series[0];
    series.shouldEqual(
        MeasurementSeries(
            "lename", //name
            ["time", "othervalue", "tag1", "tag2", "value"], //columns
            //values
            [
                ["2015-06-11T20:46:02Z", "4", "toto", "titi", "2"],
                ["2017-03-14T23:15:01.06282785Z", "3", "letag", "othertag", "1"],
            ]
        )
    );
}

///
@("insert")
@safe unittest {

    string[] lines;

    alias TestDatabase = DatabaseImpl!(
        (url, cmd) { }, // manage
        (url, db, query) => `{}`, // query
        (url, db, line) => lines ~= line // write
    );

    const database = TestDatabase("http://db.com", "testdb");
    database.insert(
        Measurement("cpu", ["index": "1"], ["temperature": "42"]),
        Measurement("cpu", ["index": "2"], ["temperature": "42"]),
        Measurement("cpu", ["index": "2"], ["temperature": "42"]),
    );

    () @trusted {
        lines.shouldEqual(
            [
                "cpu,index=1 temperature=42\ncpu,index=2 temperature=42\ncpu,index=2 temperature=42",
            ]
        );
    }();
}


@("insert with no measurements does nothing")
@safe unittest {

    string[] lines;

    alias TestDatabase = DatabaseImpl!(
        (url, cmd) { }, // manage
        (url, db, query) => `{}`, // query
        (url, db, line) => lines ~= line // write
    );

    const database = TestDatabase("http://db.com", "testdb");
    Measurement[] measurements;
    database.insert(measurements);
    lines.shouldBeEmpty;
}


/**
    An InfluxDB measurement
*/
struct Measurement {

    import std.datetime: SysTime;
    import std.traits : Unqual;

    string name;
    string[string] tags;
    InfluxValue[string] fields;
    long timestamp;

    /// Constructs a measurement without tags.
    this(T)
        (string name,
         T[string] fields,
         SysTime time = SysTime.fromUnixTime(0))
    @safe
    if (is(Unqual!T == string) || is(Unqual!T == InfluxValue)) {
        string[string] tags;
        this(name, tags, fields, time);
    }

    /**
        Constructs a measurement with tags. String field values have their
        InfluxDB type guessed from their content; `InfluxValue` fields are
        stored as given.
    */
    this(T)(string name,
         string[string] tags,
         T[string] fields,
         SysTime time = SysTime.fromUnixTime(0))
    @safe // impure due to SysTime.fracSecs
    if (is(Unqual!T == string) || is(Unqual!T == InfluxValue)) {

        import std.datetime: nsecs;

        this.name = name;
        this.tags = tags;

        static if (is(Unqual!T == string)) {
            import std.typecons : Nullable;
            InfluxValue[string] ifields;
            () @trusted {
                foreach(element; fields.byKeyValue) {
                    // a null type makes InfluxValue guess the type from the string
                    ifields[element.key] = InfluxValue(element.value, Nullable!(InfluxValue.Type).init);
                }
            }();
            this.fields = ifields;
        }
        else this.fields = fields;
        // InfluxDB uses UNIX time in _nanoseconds_
        // stdTime is in hnsecs
        //this.timestamp = time.stdTime / 100;
        this.timestamp = (time.toUnixTime!long * 1_000_000_000 + time.fracSecs.total!"nsecs");
    }

    /**
        Writes the measurement to `dg` as
        `name[,tags] fields[ timestamp]`, escaping names, tags and fields.
        The timestamp is left out when it is zero.
    */
    void toString(Dg)(scope Dg dg) const {

        import std.format: FormatSpec, formatValue;
        import std.typecons: Yes;

        FormatSpec!char fmt;
        dg.escape(`, `).formatValue(name, fmt);
        if (tags.length) {
            dg.formatValue(',', fmt);
            dg.aaFormat(tags);
        }
        dg.formatValue(' ', fmt);
        dg.aaFormat(fields, Yes.quoteStrings);
        if(timestamp != 0) {
            dg.formatValue(' ', fmt);
            dg.formatValue(timestamp, fmt);
        }
    }

    static if (__VERSION__ < 2072) {
        string toString() @safe const {
            import std.array : appender;
            auto res = appender!string;
            toString(res);
            return res.data;
        }
    }
    else {
        deprecated("Use std.conv.to!string instead.")
        string toString()() @safe const {
            import std.conv: to;
            return this.to!string;
        }
    }
}

// Writes the associative array `aa` to `dg` as comma-separated `key=value`
// pairs. Keys are escaped for `,`, `=` and space. With `quoteStrings`, values
// that are strings are wrapped in double quotes (escaping `"` and `\`);
// all other values are escaped for `,` and space.
private void aaFormat(Dg, T : K[V], K, V)
    (scope Dg dg, scope T aa, in Flag!"quoteStrings" quoteStrings = No.quoteStrings)
{
    import std.format: FormatSpec, formatValue;
    size_t i;
    FormatSpec!char fmt;
    foreach(key, value; aa)
    {
        if (i++)
            dg.formatValue(',', fmt);
        dg.escape(`,= `).formatValue(key, fmt);
        dg.formatValue('=', fmt);
        if(quoteStrings && valueIsString(value)) {
            dg.formatValue('"', fmt);
            dg.escape(`"\`).formatValue(value, fmt);
            dg.formatValue('"', fmt);
        } else
            dg.escape(`, `).formatValue(value, fmt);
    }
}

// Returns an output sink that forwards everything to `dg`, putting a
// backslash in front of every character contained in `chars`.
private auto escape(Dg)(scope Dg dg, in char[] chars...)
{

    struct Escaper(Dg) {
        void put(T)(T val) {
            import std.algorithm : canFind;
            import std.format: FormatSpec, formatValue;

            FormatSpec!char fmt;
            foreach (c; val) {
                if (chars.canFind(c))
                    dg.formatValue('\\', fmt);
                dg.formatValue(c, fmt);
            }
        }

        static if (__VERSION__ < 2072) {
            void putChar(char c) {
                import std.algorithm : canFind;
                import std.format : formattedWrite;

                if (chars.canFind(c))
                    dg.formattedWrite("%s", '\\');
                dg.formattedWrite("%s", c);
            }
        }
    }

    return Escaper!Dg();
}

// Tells whether `value` has to be written as a quoted string:
// always for `string`, and for `InfluxValue` when its type is `string_`.
private auto valueIsString(T)(in T value) {
    static if (is(T == string)) return true;
    else static if (is(T == InfluxValue)) return value.type == InfluxValue.Type.string_;
    else static assert(0, "Unexpected value type " ~ T.stringof);
}

// Guesses the InfluxDB type of a raw field string: one of the listed boolean
// literals, an integer (optional '-', at least one digit, trailing 'i'),
// a float (see valueIsFloat), or otherwise a string.
private auto guessValueType(string value) @safe pure nothrow @nogc {

    import std.algorithm: all, canFind;
    import std.string : representation;

    static immutable boolValues = ["t", "T", "true", "True", "TRUE", "f", "F", "false", "False", "FALSE"];

    // test for bool values
    if(boolValues.canFind(value)) return InfluxValue.Type.bool_;

    // test for int values
    // length > 1: there must be something in front of the 'i' suffix,
    // a lone "i" is a string (and must not be indexed below)
    if(value.length > 1 && value[$ - 1] == 'i') {
        auto tmp = value[0..$-1];
        if (tmp.length > 1 && tmp[0] == '-') tmp = tmp[1..$];
        if (tmp.representation.all!(a => a >= '0' && a <= '9')) return InfluxValue.Type.int_;
    }

    // test for float values
    if (valueIsFloat(value)) return InfluxValue.Type.float_;

    return InfluxValue.Type.string_;
}

// Checks whether `value` looks like a decimal floating point literal:
// optional sign, digits with at most one '.', optional exponent ('e'/'E')
// with optional sign. At least one digit is required in the mantissa and,
// when an exponent is present, in the exponent.
private bool valueIsFloat(in string value) @safe pure nothrow @nogc {

    if (!value.length) return false;

    int dotidx = -1;
    int eidx = -1;
    bool mantissaDigit;
    bool exponentDigit;
    foreach(i, c; value) {
        if (c == '+' || c == '-') {
            // a sign is only allowed first or right after the exponent marker
            if (i != 0 && (eidx < 0 || i-1 != eidx)) return false;
        }
        else if (c == '.') {
            if (dotidx >= 0 || eidx > 0) return false;
            dotidx = cast(int)i;
        }
        else if (c == 'e' || c == 'E') {
            if (i == 0 || eidx > 0 || i+1 == value.length) return false;
            eidx = cast(int)i;
        }
        else if (c < '0' || c > '9') return false;
        else if (eidx < 0) mantissaDigit = true;
        else exponentDigit = true;
    }
    // reject sign/dot-only input such as "-" or "." and dangling exponents such as "1e+"
    return mantissaDigit && (eidx < 0 || exponentDigit);
}

///
@("valueIsFloat")
@safe unittest {
    // valid
    "123".valueIsFloat.shouldBeTrue;
    "-123".valueIsFloat.shouldBeTrue;
    "1e1".valueIsFloat.shouldBeTrue;
    "+1.e1".valueIsFloat.shouldBeTrue;
    ".1e1".valueIsFloat.shouldBeTrue;
    "1e+1".valueIsFloat.shouldBeTrue;
    "1.E-1".valueIsFloat.shouldBeTrue;
    "1.2".valueIsFloat.shouldBeTrue;
    "-1.3e+10".valueIsFloat.shouldBeTrue;
    "+1.".valueIsFloat.shouldBeTrue;

    // invalid
    "1a".valueIsFloat.shouldBeFalse;
    "1e.1".valueIsFloat.shouldBeFalse;
    "e1".valueIsFloat.shouldBeFalse;
    "".valueIsFloat.shouldBeFalse;
    "1eE1".valueIsFloat.shouldBeFalse;
    "1..0".valueIsFloat.shouldBeFalse;
    "1.e.1".valueIsFloat.shouldBeFalse;
    "1e12.3".valueIsFloat.shouldBeFalse;
    "1e1+1".valueIsFloat.shouldBeFalse;
    "ee".valueIsFloat.shouldBeFalse;
    "1ee1".valueIsFloat.shouldBeFalse;
    "++1".valueIsFloat.shouldBeFalse;
    "1+1".valueIsFloat.shouldBeFalse;
    "1.1.1".valueIsFloat.shouldBeFalse;
    "1+".valueIsFloat.shouldBeFalse;
    "1ě+1".valueIsFloat.shouldBeFalse;
}

///
@("Measurement.to!string no timestamp")
@safe unittest {
    import std.conv: to;
    {
        auto m = Measurement("cpu",
                             ["tag1": "toto", "tag2": "foo"],
                             ["load": "42", "temperature": "53"]);
        m.to!string.shouldEqualLine("cpu,tag1=toto,tag2=foo load=42,temperature=53");
    }
    {
        auto m = Measurement("thingie",
                             ["foo": "bar"],
                             ["value": "7"]);
        m.to!string.shouldEqualLine("thingie,foo=bar value=7");
    }
}

///
@("Measurement.to!string no timestamp no tags")
@safe unittest {
    import std.conv: to;
    auto m = Measurement("cpu",
                         ["load": "42", "temperature": "53"]);
    m.to!string.shouldEqualLine("cpu load=42,temperature=53");
}

///
@("Measurement.to!string with timestamp")
@safe unittest {
    import std.conv: to;
    import std.datetime: SysTime;

    auto m = Measurement("cpu",
                         ["tag1": "toto", "tag2": "foo"],
                         ["load": "42", "temperature": "53"],
                         SysTime.fromUnixTime(7));
    m.to!string.shouldEqualLine("cpu,tag1=toto,tag2=foo load=42,temperature=53 7000000000");
}

///
@("Measurement.to!string with timestamp no tags")
@safe unittest {
    import std.conv: to;
    import std.datetime: SysTime;

    auto m = Measurement("cpu",
                         ["load": "42", "temperature": "53"],
                         SysTime.fromUnixTime(7));
    m.to!string.shouldEqualLine("cpu load=42,temperature=53 7000000000");
}

@("Measurement fraction of a second")
@safe unittest {
    import std.conv: to;
    import std.datetime: DateTime, SysTime, Duration, usecs, nsecs, UTC;
    auto m = Measurement("cpu",
                         ["load": "42", "temperature": "53"],
                         SysTime(DateTime(2017, 2, 1), 300.usecs + 700.nsecs, UTC()));
    m.to!string.shouldEqualLine("cpu load=42,temperature=53 1485907200000300700");
}

@("Measurement.to!string with string")
@safe unittest {
    import std.conv: to;
    import std.datetime: SysTime;

    auto m = Measurement("cpu",
                         ["foo": "bar"],
                         SysTime.fromUnixTime(7));
    m.to!string.shouldEqualLine(`cpu foo="bar" 7000000000`);
}

static foreach(value; ["t", "T", "true", "True", "TRUE", "f", "F", "false", "False", "FALSE"]) {
    @("Measurement.to!string with bool." ~ value)
    @safe unittest {
        import std.conv: to;
        import std.datetime: SysTime;

        auto m = Measurement("cpu",
                            ["foo": value],
                            SysTime.fromUnixTime(7));
        m.to!string.shouldEqualLine(`cpu foo=` ~ value ~ ` 7000000000`);
    }
}

@("Measurement.to!string with int")
@safe unittest {
    import std.conv: to;
    import std.datetime: SysTime;

    auto m = Measurement("cpu",
                         ["foo": "16i", "bar": "-1i", "str": "-i"],
                         SysTime.fromUnixTime(7));
    m.to!string.shouldEqualLine(`cpu foo=16i,bar=-1i,str="-i" 7000000000`);
}

@("Measurement.to!string with special characters")
@safe unittest {
    import std.conv: to;

    auto m = Measurement(`cpu "load", test`,
                         ["tag 1": `to"to`, "tag,2": "foo"],
                         ["foo,= ": "a,b", "b,a=r": `a " b`]);
    m.to!string.shouldEqualLine(`cpu\ "load"\,\ test,tag\ 1=to"to,tag\,2=foo foo\,\=\ ="a,b",b\,a\=r="a \" b"`);
}


/**
    A sum type of values that can be stored in influxdb
*/
struct InfluxValue {

    import std.typecons : Nullable;

    enum Type { bool_, int_, float_, string_ }

    union Payload {
        bool b;
        long i;
        double f;
    }

    private {
        Payload _value;
        string _rawString = null;
        Type _type;
    }

    this(bool v) @safe pure nothrow {
        _value.b = v;
        _type = InfluxValue.Type.bool_;
    }

    this(T)(T v) @safe pure nothrow
    if (is(T == int) || is(T == long)) {
        _value.i = v;
        _type = InfluxValue.Type.int_;
    }

    this(T)(T v) @safe pure nothrow
    if (is(T == float) || is(T == double)) {
        _value.f = v;
        _type = InfluxValue.Type.float_;
    }

    /**
        Stores a raw string value. When `type` is null the type is guessed
        from the content of `v`, otherwise the given type is used.
    */
    this(string v, Nullable!Type type = Nullable!Type(Type.string_)) @safe pure nothrow
    in (v !is null) {
        _rawString = v;
        if (type.isNull) _type = guessValueType(v);
        else _type = type.get;
    }

    auto type() const @safe pure nothrow {
        return _type;
    }

    /**
        Writes the value to `dg`. Raw strings are written as given, with an
        `i` suffix appended for integers that lack one; stored integers are
        written with an `i` suffix.
    */
    void toString(Dg)(scope Dg dg) const {

        import std.format: FormatSpec, formattedWrite, formatValue;

        FormatSpec!char fmt;
        if (_rawString !is null) {
            // length check first: an empty raw string must not be indexed
            if (_type == Type.int_ && (_rawString.length == 0 || _rawString[$-1] != 'i'))
                dg.formattedWrite("%si", _rawString);
            else dg.formatValue(_rawString, fmt);
        }
        else {
            final switch (_type) with (Type) {
                case bool_: dg.formatValue(_value.b, fmt); break;
                case int_: dg.formattedWrite("%si", _value.i); break;
                case float_: dg.formatValue(_value.f, fmt); break;
                case string_: assert(0);
            }
        }
    }
}

@("Measurement.to!string InfluxValue int")
@safe unittest {
    import std.conv: to;
    import std.datetime: SysTime;

    const m = Measurement("cpu",
                          ["foo": InfluxValue(16)],
                          SysTime.fromUnixTime(7));
    () @trusted { return m.to!string; }().shouldEqualLine(`cpu foo=16i 7000000000`);
}

@("Measurement.to!string InfluxValue long")
@safe unittest {
    import std.conv: to;
    import std.datetime: SysTime;

    const m = Measurement("cpu",
                          ["foo": InfluxValue(16L)],
                          SysTime.fromUnixTime(7));
    () @trusted { return m.to!string; }().shouldEqualLine(`cpu foo=16i 7000000000`);
}

@("Measurement.to!string InfluxValue float")
@safe unittest {
    import std.conv: to;
    import std.datetime: SysTime;

    Measurement("cpu",
                ["foo": InfluxValue(16.0f)],
                SysTime.fromUnixTime(7))
        .to!string.shouldEqualLine(`cpu foo=16 7000000000`);

    Measurement("cpu",
                ["foo": InfluxValue(16.1)],
                SysTime.fromUnixTime(7))
        .to!string.shouldEqualLine(`cpu foo=16.1 7000000000`);
}

@("Measurement.to!string InfluxValue boolean")
@safe unittest {
    import std.conv: to;
    import std.datetime: SysTime;

    Measurement("cpu",
                ["foo": InfluxValue(true)],
                SysTime.fromUnixTime(7))
        .to!string.shouldEqualLine(`cpu foo=true 7000000000`);
}

@("Measurement.to!string InfluxValue string")
@safe unittest {
    import std.conv: to;
    import std.datetime: SysTime;

    Measurement("cpu",
                ["foo": InfluxValue("bar")],
                SysTime.fromUnixTime(7))
        .to!string.shouldEqualLine(`cpu foo="bar" 7000000000`);
}

@("Measurement.to!string InfluxValue empty string")
@safe unittest {
    import std.conv: to;
    import std.datetime: SysTime;

    Measurement("cpu",
                ["foo": InfluxValue("")],
                SysTime.fromUnixTime(7))
        .to!string.shouldEqualLine(`cpu foo="" 7000000000`);
}

@("Measurement.to!string InfluxValue string escaping")
@safe unittest {
    import std.conv: to;
    import std.datetime: SysTime;

    Measurement("cpu",
                ["foo": InfluxValue(`{"msg":"\"test\""}`)],
                SysTime.fromUnixTime(7))
        .to!string.shouldEqualLine(`cpu foo="{\"msg\":\"\\\"test\\\"\"}" 7000000000`);
}

@("Measurement.to!string InfluxValue string with specified bool value")
@safe unittest {
    import std.conv: to;
    import std.datetime: SysTime;
    import std.typecons : Nullable;

    Measurement("cpu",
                ["foo": InfluxValue("true", Nullable!(InfluxValue.Type)(InfluxValue.Type.bool_))],
                SysTime.fromUnixTime(7))
        .to!string.shouldEqualLine(`cpu foo=true 7000000000`);
}

@("Measurement.to!string InfluxValue string with specified int value")
@safe unittest {
    import std.conv: to;
    import std.datetime: SysTime;
    import std.typecons : Nullable;

    Measurement("cpu",
                ["foo": InfluxValue("42", Nullable!(InfluxValue.Type)(InfluxValue.Type.int_))],
                SysTime.fromUnixTime(7))
        .to!string.shouldEqualLine(`cpu foo=42i 7000000000`);
}

@("Measurement.to!string InfluxValue string with specified postfixed int value")
@safe unittest {
    import std.conv: to;
    import std.datetime: SysTime;
    import std.typecons : Nullable;

    Measurement("cpu",
                ["foo": InfluxValue("42i", Nullable!(InfluxValue.Type)(InfluxValue.Type.int_))],
                SysTime.fromUnixTime(7))
        .to!string.shouldEqualLine(`cpu foo=42i 7000000000`);
}

@("Measurement.to!string InfluxValue string with specified float value")
@safe unittest {
    import std.conv: to;
    import std.datetime: SysTime;
    import std.typecons : Nullable;

    Measurement("cpu",
                ["foo": InfluxValue("1.2", Nullable!(InfluxValue.Type)(InfluxValue.Type.float_))],
                SysTime.fromUnixTime(7))
        .to!string.shouldEqualLine(`cpu foo=1.2 7000000000`);
}

@("Measurement.to!string InfluxValue string with float value")
@safe unittest {
    import std.conv: to;
    import std.datetime: SysTime;

    Measurement("cpu",
                ["foo": InfluxValue("5E57758")],
                SysTime.fromUnixTime(7))
        .to!string.shouldEqualLine(`cpu foo="5E57758" 7000000000`);
}

@("Measurement.to!string InfluxValue string with guessed bool value")
@safe unittest {
    import std.conv: to;
    import std.datetime: SysTime;
    import std.typecons : Nullable;

    Measurement("cpu",
                ["foo": InfluxValue("true", Nullable!(InfluxValue.Type).init)],
                SysTime.fromUnixTime(7))
        .to!string.shouldEqualLine(`cpu foo=true 7000000000`);
}

@("Measurement.to!string InfluxValue string with guessed int value")
@safe unittest {
    import std.conv: to;
    import std.datetime: SysTime;
    import std.typecons : Nullable;

    Measurement("cpu",
                ["foo": InfluxValue("42i", Nullable!(InfluxValue.Type).init)],
                SysTime.fromUnixTime(7))
        .to!string.shouldEqualLine(`cpu foo=42i 7000000000`);
}

@("Measurement.to!string InfluxValue string with guessed float value")
@safe unittest {
    import std.conv: to;
    import std.datetime: SysTime;
    import std.typecons : Nullable;

    Measurement("cpu",
                ["foo": InfluxValue("1.2", Nullable!(InfluxValue.Type).init)],
                SysTime.fromUnixTime(7))
        .to!string.shouldEqualLine(`cpu foo=1.2 7000000000`);
}

@("Measurement.to!string InfluxValue guessed string value")
@safe unittest {
    import std.conv: to;
    import std.datetime: SysTime;

    Measurement("cpu",
                ["foo": InfluxValue("bar")],
                SysTime.fromUnixTime(7))
        .to!string.shouldEqualLine(`cpu foo="bar" 7000000000`);
}

/**
    A query response
*/
struct Response {
    Result[] results;
}

/**
    A result of a query
*/
struct Result {
    MeasurementSeries[] series;
    int statement_id;
}

/**
    Data for one measurement
*/
struct MeasurementSeries {

    static if (__traits(compiles, { import asdf.serialization: serdeIgnoreIn; }))
        import asdf.serialization: serdeIgnoreIn;
    else
        import asdf.serialization: serdeIgnoreIn = serializationIgnoreIn;

    import asdf: Asdf;

    string name;
    string[] columns;
    @serdeIgnoreIn string[][] values;

    static struct Rows {
        import std.range: Transversal, TransverseOptions;

        const string[] columns;
        const(string[])[] rows;

        static struct Row {

            import std.datetime: SysTime;

            const string[] columnNames;
            const string[] columnValues;

            /++
                Params:
                    key = column name
                Returns:
                    The value of the column in this row.
                Throws:
                    Exception, if key was not found.
            +/
            string opIndex(in string key) @safe pure const {
                import std.algorithm: countUntil;
                const i = columnNames.countUntil(key);
                // countUntil yields -1 for an unknown key, which must not be used as an index
                if (i < 0)
                    throw new Exception("Unknown key " ~ key);
                return columnValues[i];
            }

            /// Like `opIndex`, but returns `defaultValue` when `key` is not a column name.
            string get(string key, string defaultValue) @safe pure const {
                import std.algorithm: countUntil;
                auto i = columnNames.countUntil(key);
                return (i==-1) ? defaultValue : columnValues[i];
            }

            /// The "time" column of this row converted with `influxSysTime`.
            SysTime time() @safe const {
                return influxSysTime(this["time"]);
            }

            void toString(Dg)(scope Dg dg) const {
                dg("Row(");
                foreach(i, value; columnValues) {
                    if (i)
                        dg(", ");
                    dg(columnNames[i]);
                    dg(": ");
                    dg(value);
                }
                dg(")");
            }

            deprecated("Use std.conv.to!string instead.")
            string toString()() {
                import std.conv: to;
                return this.to!string;
            }
        }

        Row opIndex(in size_t i) @safe pure const nothrow {
            return Row(columns, rows[i]);
        }

        /++
            Params:
                key = column name
            Returns:
                Column as a range of strings.
            Throws:
                Exception, if key was not found.
        +/
        Transversal!(const(string[])[], TransverseOptions.assumeNotJagged)
        opIndex(in string key) @safe pure const {
            import std.algorithm: countUntil;
            size_t idx = columns.countUntil(key);
            if (idx >= columns.length)
                throw new Exception("Unknown key " ~ key);
            return typeof(return)(rows, idx);
        }

        size_t length() @safe pure const nothrow { return rows.length; }

        void popFront() @safe pure nothrow {
            rows = rows[1 .. $];
        }

        Row front() @safe pure nothrow {
            return this[0];
        }

        bool empty() @safe pure nothrow const {
            return rows.length == 0;
        }
    }

    inout(Rows) rows() @safe pure nothrow inout {
        return inout(Rows)(columns, values);
    }

    /**
        Fills `values` from the "values" element of `data`,
        one row per element and one string per column.
    */
    void finalizeDeserialization(Asdf data) {
        import std.algorithm: map, count;
        import std.array: uninitializedArray;
        auto rows = data["values"].byElement.map!"a.byElement";
        // count is fast for Asdf
        values = uninitializedArray!(string[][])(rows.count, columns.length);
        foreach(value; values)
        {
            auto row = rows.front;
            assert(row.count == columns.length);
            foreach (ref e; value)
            {
                // does not allocate data here because of `const`,
                // reuses Asdf data
                e = cast(string) cast(const(char)[]) row.front;
                row.popFront;
            }
            rows.popFront;
        }
    }
}

///
@("MeasurementSeries")
@safe unittest {

    import std.datetime: SysTime, DateTime, UTC;
    import std.array: array;

    auto series = MeasurementSeries("coolness",
                                    ["time", "foo", "bar"],
                                    [
                                        ["2015-06-11T20:46:02Z", "red", "blue"],
                                        ["2013-02-09T12:34:56Z", "green", "yellow"],
                                    ]);

    series.rows[0]["foo"].shouldEqual("red");
    series.rows[0]["time"].shouldEqual("2015-06-11T20:46:02Z");
    series.rows[0].time.shouldEqual(SysTime(DateTime(2015, 06, 11, 20, 46, 2), UTC()));

    series.rows[1]["bar"].shouldEqual("yellow");
    series.rows[1]["time"].shouldEqual("2013-02-09T12:34:56Z");
    series.rows[1].time.shouldEqual(SysTime(DateTime(2013, 2, 9, 12, 34, 56), UTC()));

    series.rows["time"][0].shouldEqual("2015-06-11T20:46:02Z");
    series.rows["bar"][1].shouldEqual("yellow");

    series.rows.array.shouldEqual(
        [
            MeasurementSeries.Rows.Row(["time", "foo", "bar"],
                                       ["2015-06-11T20:46:02Z", "red", "blue"],
                                       ),
            MeasurementSeries.Rows.Row(["time", "foo", "bar"],
                                       ["2013-02-09T12:34:56Z", "green", "yellow"],
                                       ),
        ]
    );
}

///
@("MeasurementSeries.get")
@safe pure unittest {
    auto series = MeasurementSeries("coolness",
                                    ["time", "foo", "bar"],
                                    [["2015-06-11T20:46:02Z", "red", "blue"]]);
    series.rows[0].get("foo", "oops").shouldEqual("red");
    series.rows[0].get("quux", "oops").shouldEqual("oops");
}

///
@("MeasurementSeries.Row.to!string")
@safe pure unittest {
    import std.conv: to;
    auto series = MeasurementSeries("coolness",
                                    ["time", "foo", "bar"],
                                    [["2015-06-11T20:46:02Z", "red", "blue"]]);
    series.rows[0].to!string.shouldEqual("Row(time: 2015-06-11T20:46:02Z, foo: red, bar: blue)");
}

///
@("MeasurementSeries long fraction in ISOExtString")
@safe unittest {

    import std.datetime: SysTime, DateTime, UTC, usecs;
    import std.array: array;

    auto series = MeasurementSeries("coolness",
                                    ["time", "foo", "bar"],
                                    [
                                        ["2017-05-10T14:47:38.82524801Z", "red", "blue"],
                                    ]);

    series.rows[0].time.shouldEqual(SysTime(DateTime(2017, 05, 10, 14, 47, 38), 825248.usecs, UTC()));
}


/**
    Converts a DateTime to a string suitable for use in queries
    e.g. SELECT * FROM foo WHERE time >=
*/
string toInfluxDateTime(in DateTime time) @safe {
    import std.datetime: UTC;
    return toInfluxDateTime(SysTime(time, UTC()));
}

///
@("toInfluxDateTime with DateTime")
unittest {
    DateTime(2017, 2, 1).toInfluxDateTime.shouldEqual("'2017-02-01T00:00:00Z'");
}

/**
    Converts a SysTime to a string suitable for use in queries
    e.g. SELECT * FROM foo WHERE time >=
*/

string toInfluxDateTime(in SysTime time) @safe {
    return "'" ~ time.toISOExtString ~ "'";
}

///
@("toInfluxDateTime with SysTime")
unittest {
    import std.datetime: UTC;
    SysTime(DateTime(2017, 2, 1), UTC()).toInfluxDateTime.shouldEqual("'2017-02-01T00:00:00Z'");
}

version(Test_InfluxD) {
    /**
    Example:
    The two lines must be equivalent under InfluxDB's line protocol
    Since the tags and fields aren't ordered, a straight comparison
    might yield false errors.
    The timestamp is also taken care of by comparing it to the current timestamp
    and making sure not too much time has passed since then
    */
    void shouldEqualLine(in string actual,
                        in string expected,
                        in string file = __FILE__,
                        in size_t line = __LINE__) @safe pure {

        // reassemble the protocol line with sorted tags and fields
        string sortLine(in string line) {

            import std.algorithm: sort, splitter;
            import std.array : array;
            import std.conv: text;
            import std.range: chain;
            import std.string: join, split;

            bool isval;
            size_t idx;
            auto parts = line.splitter!((a) {
                if (a == ' ' && !isval && (idx == 0 || line[idx-1] != '\\')) {
                    idx++;
                    return true;
                }
                if (a == '"' && idx > 0 && line[idx-1] != '\\' && line[idx-1] == '=')
                    isval = true;
                else if (a == '"' && idx > 0 && isval && line[idx-1] != '\\')
                    isval = false;
                idx++;
                return false;
            }).array;

            assert(parts.length == 3 || parts.length == 2,
                text("Illegal number of parts( ", parts.length, ") in ", line));

            auto nameTags = parts[0].split(",");
            const name = nameTags[0];
            auto tags = nameTags[1..$];

            auto fields = parts[1].split(",");

            auto newNameTags = chain([name], sort(tags)).join(",");
            auto newFields = sort(fields).join(",");
            auto newParts = [newNameTags, newFields];
            if(parts.length > 2) newParts ~= parts[2];

            return newParts.join(" ");
        }

        sortLine(actual).shouldEqual(sortLine(expected), file, line);
    }
}