/**
    This module implements a convenience wrapper API for influx.

    Authors: Atila Neves (Kaleidic Associates Advisory Limited)

    Generated documentation:
        http://influxdb.code.kaleidic.io/influxdb.html

*/

module influxdb.api;

version(Test_InfluxD)
    import unit_threaded;
else
    // Stand-in declared when the test framework is not imported.
    // NOTE(review): presumably mirrors unit_threaded's `Values` UDA so that
    // annotated code still compiles outside of test builds - confirm.
    struct Values { this(string[]...) { } }

static import influxdb.vibe;
import std.typecons: Flag, No;
import std.datetime: Date, DateTime, SysTime, UTC;

/++
    Converts a time string returned by InfluxDB into a `SysTime`.

    The string is first parsed as-is with `SysTime.fromISOExtString`. If that
    fails, up to four digits are progressively removed from the end of the
    fractional-seconds part and parsing is retried, trading precision for
    not throwing.

    Params:
        time = Influx-db time string
    Returns:
        SysTime
    Throws:
        `DateTimeException` (the one raised by the first parse attempt) if
        the string has no fractional part or no reduced form could be parsed.
+/
SysTime influxSysTime(string time) @safe
{
    import std.datetime: SysTime, DateTimeException;

    try {
        return SysTime.fromISOExtString(time);
    } catch(DateTimeException ex) {
        // see https://issues.dlang.org/show_bug.cgi?id=16053
        import std.stdio: stderr;
        import std.algorithm: countUntil;

        debug {
            (() @trusted => stderr)()
                .writeln("Could not convert ", time, " due to a Phobos bug, reducing precision");
        }

        // find where the fractional part starts
        auto dotIndex = time.countUntil(".");
        if(dotIndex < 0)
            dotIndex = time.countUntil(",");
        if(dotIndex < 0)
            throw ex;

        // Number of digits in the fraction. The "not found" result must be
        // inspected *before* it is turned into an absolute index, otherwise
        // the -1 is masked by the added offset.
        const ptrdiff_t fractionStart = dotIndex + 1;
        const ptrdiff_t digitCount =
            time[fractionStart .. $].countUntil!(a => a < '0' || a > '9');

        // No trailing non-digit (e.g. no time zone suffix): the fraction
        // runs to the end of the string.
        const ptrdiff_t firstNonDigitIndex = digitCount < 0
            ? cast(ptrdiff_t) time.length
            : fractionStart + digitCount;

        const ptrdiff_t lastDigitIndex = firstNonDigitIndex - 1;

        foreach(i; 0 .. 4) {
            // try to cut out a number from the fraction
            // inaccurate but better than throwing an exception
            const ptrdiff_t cut = lastDigitIndex - i;

            // never cut further back than the separator itself: that would
            // start eating into the seconds field
            if(cut < dotIndex) break;

            const timeStr =
                time[0 .. cut] ~
                time[firstNonDigitIndex .. $];
            try
                return SysTime.fromISOExtString(timeStr);
            catch(DateTimeException _) {}
        }

        throw ex;
    }
}

/// `DatabaseImpl` wired to the manage/query/write functions of `influxdb.vibe`.
alias Database = DatabaseImpl!(influxdb.vibe.manage, influxdb.vibe.query, influxdb.vibe.write);

/**
    Holds information about the database name and URL, forwards
    it to the implementation functions for managing, querying and
    writing to the DB.

    Params:
        manageFunc = callable invoked as `manageFunc(url, cmd)`
        queryFunc = callable invoked as `queryFunc(url, db, query)`; its result
                    is deserialized into a `Response`
        writeFunc = callable invoked as `writeFunc(url, db, lines)`
*/
struct DatabaseImpl(alias manageFunc, alias queryFunc, alias writeFunc) {

    import influxdb.api;

    version(Have_mir_algorithm)
    {
        import mir.series: Series;
        import mir.ndslice.slice: DeepElementType, Slice, SliceKind;
    }

    string url; // e.g. http://localhost:8086
    string db;  // e.g. mydb

    @disable this();

    /**
        Stores the URL and database name and issues a `CREATE DATABASE`
        management command for `db`.
    */
    this(string url, string db) {
        this.url = url;
        this.db = db;

        manage("CREATE DATABASE " ~ db);
    }

    /**
        Sends management commands to the DB (CREATE, DROP).
        The parameter must be the full command (e.g. "DROP DATABASE mydb")
    */
    void manage(in string cmd) const {
        manageFunc(url, cmd);
    }

    /**
        Queries the DB. The query must be a full InfluxDB query
        (e.g. "SELECT * FROM foo")

        Returns: the result of `queryFunc` deserialized into a `Response`
    */
    Response query(in string query) @trusted const { // deserialize is @system
        import asdf: deserialize;
        return queryFunc(url, db, query).deserialize!Response;
    }

    /**
        Insert data into the DB.

        The measurements are formatted one per line and handed to `writeFunc`
        in a single call. Nothing is written when `measurements` is empty.
    */
    void insert(in Measurement[] measurements) const {
        import std.format: format;

        if(measurements.length == 0) return;

        static if (__VERSION__ >= 2074)
            writeFunc(url, db, () @trusted { return format!"%(%s\n%)"(measurements); }());
        else
            writeFunc(url, db, format("%(%s\n%)", measurements));
    }

    /**
        Insert data into the DB.

        Variadic convenience overload forwarding to the array overload.
    */
    void insert(in Measurement[] measurements...) const {
        insert(measurements);
    }

    /**
        Insert Mir times-series with single column into the DB.
        Supported time types are `SysTime`, `DateTime`, `Date`, and `long`.

        See also the example in the `mir-integration` folder.

        Params:
            measurementName = measurement name
            columnName = column name
            series1 = 1D time-series
            commonTags = list of tags to add to each measurement (optional)
    */
    version(Have_mir_algorithm)
    void insert(TimeIterator, SliceKind kind, Iterator)(
        string measurementName,
        string columnName,
        Series!(TimeIterator, Iterator, 1, kind) series1,
        string[string] commonTags = null,
    ) const
    {
        import mir.series: series;
        import mir.ndslice.topology: repeat, unpack;
        import mir.ndslice.dynamic: transposed;

        // forward to the multi-column overload with a single column name
        return this.insert(
            measurementName,
            [columnName],
            series1.time.series(series1.data.repeat(1).unpack.transposed),
            commonTags,
        );
    }

    /**
        Insert Mir times-series with multiple columns into the DB.
        Supported time types are `SysTime`, `DateTime`, `Date`, and `long`.

        See also the example in the `mir-integration` folder.

        Params:
            measurementName = measurement name
            columnNames = array of column names
            series = 2D time-series
            commonTags = list of tags to add to each measurement (optional)
    */
    version(Have_mir_algorithm)
    void insert(TimeIterator, SliceKind kind, Iterator)(
        string measurementName,
        in string[] columnNames,
        Series!(TimeIterator, Iterator, 2, kind) series,
        string[string] commonTags = null,
    ) const
    {
        alias Time = typeof(series.front.time);
        alias Data = DeepElementType!(typeof(series.front.data));

        import std.traits: isSomeString;
        import std.exception: assumeUnique, enforce;
        import std.array: appender;
        import std.format: FormatSpec, formatValue;

        enforce(measurementName.length);
        enforce(columnNames.length == series.length!1, "columnNames.length should be equal to series.length!1");

        if (series.length == 0)
        {
            return;
        }

        FormatSpec!char fmt;
        auto app = appender!(const(char)[]);

        // name output
        app.put(measurementName);
        // tags output
        if (commonTags.length)
        {
            app.put(",");
            aaFormat(&app.put!(const(char)[]), commonTags);
        }
        app.put(" ");
        // name + tags: identical prefix of every emitted line
        auto head = app.data;
        app = appender!(const(char)[]);
        foreach (i; 0 .. series.length)
        {
            auto observation = series[i];
            if (i)
            {
                app.put("\n");
            }

            app.put(head);

            // values output
            foreach(j, key; columnNames)
            {
                auto value = observation.data[j];
                if (j)
                    app.put(",");
                app.formatValue(key, fmt);
                app.put("=");
                static if(isSomeString!Data)
                {
                    app.put(`"`);
                    app.formatValue(value, fmt);
                    app.put(`"`);
                }
                else
                {
                    app.formatValue(value, fmt);
                }
            }

            // time output
            static if (is(Time : long))
            {
                long timestamp = observation.time;
            }
            else
            static if (is(Time : SysTime))
            {
                long timestamp = observation.time.toUnixTime!long * 1_000_000_000 + observation.time.fracSecs.total!"nsecs";
            }
            else
            static if (is(Time : DateTime) || is(Time : Date))
            {
                long timestamp = SysTime(observation.time, UTC()).toUnixTime!long * 1_000_000_000;
            }
            else
            {
                static assert(0, "Unsupported timestamp type: " ~ Time.stringof);
            }
            // a zero timestamp is treated as "no timestamp" and omitted
            if (timestamp != 0)
            {
                app.put(" ");
                app.formatValue(timestamp, fmt);
            }
        }
        writeFunc(url, db, app.data.assumeUnique);
    }

    /**
        Delete this DB
    */
    void drop() const {
        manage("DROP DATABASE " ~ db);
    }
}

///
@("Database")
@safe unittest { // not pure because of asdf.deserialize

    string[string][] manages;
    string[string][] queries;
    string[string][] writes;

    alias TestDatabase = DatabaseImpl!(
        (url, cmd) => manages ~= ["url": url, "cmd": cmd], // manage
        (url, db, query) { // query
            queries ~= ["url": url, "db": db, "query": query];
            return
            `{
                 "results": [{
                     "series": [{
                             "columns": ["time", "othervalue", "tag1", "tag2", "value"],
                             "name": "lename",
                             "values": [
                                     ["2015-06-11T20:46:02Z", 4, "toto", "titi", 2],
                                     ["2017-03-14T23:15:01.06282785Z", 3, "letag", "othertag", 1]
                             ]
                     }],
                     "statement_id": 33
                 }]
             }`;
        },
        (url, db, line) => writes ~= ["url": url, "db": db, "line": line]
    );

    manages.shouldBeEmpty;
    const database = TestDatabase("http://db.com", "testdb");
    manages.shouldEqual([["url": "http://db.com", "cmd": "CREATE DATABASE testdb"]]);

    writes.shouldBeEmpty;
    database.insert(Measurement("cpu", ["tag1": "foo"], ["temperature": "42"]));
    writes.shouldEqual([["url": "http://db.com", "db": "testdb",
                        "line": "cpu,tag1=foo temperature=42"]]);

    queries.shouldBeEmpty;
    const response = database.query("SELECT * from foo");
    queries.shouldEqual([["url": "http://db.com", "db": "testdb", "query": "SELECT * from foo"]]);

    response.results.length.shouldEqual(1);
    response.results[0].statement_id.shouldEqual(33);
    response.results[0].series.length.shouldEqual(1);
    const series = response.results[0].series[0];
    series.shouldEqual(
        MeasurementSeries(
            "lename", //name
            ["time", "othervalue", "tag1", "tag2", "value"], //columns
            //values
            [
                ["2015-06-11T20:46:02Z", "4", "toto", "titi", "2"],
                ["2017-03-14T23:15:01.06282785Z", "3", "letag", "othertag", "1"],
            ]
        )
    );
}

///
@("insert")
@safe unittest {

    string[] lines;

    alias TestDatabase = DatabaseImpl!(
        (url, cmd) { }, // manage
        (url, db, query) => `{}`, // query
        (url, db, line) => lines ~= line // write
    );

    const database = TestDatabase("http://db.com", "testdb");
    database.insert(
        Measurement("cpu", ["index": "1"], ["temperature": "42"]),
        Measurement("cpu", ["index": "2"], ["temperature": "42"]),
        Measurement("cpu", ["index": "2"], ["temperature": "42"]),
    );

    () @trusted {
        lines.shouldEqual(
            [
                "cpu,index=1 temperature=42\ncpu,index=2 temperature=42\ncpu,index=2 temperature=42",
            ]
        );
    }();
}


@("insert with no measurements does nothing")
@safe unittest {

    string[] lines;

    alias TestDatabase = DatabaseImpl!(
        (url, cmd) { }, // manage
        (url, db, query) => `{}`, // query
        (url, db, line) => lines ~= line // write
    );

    const database = TestDatabase("http://db.com", "testdb");
    Measurement[] measurements;
    database.insert(measurements);
    lines.shouldBeEmpty;
}


/**
   An InfluxDB measurement.

   Formats itself (see `toString`) as
   `name[,tag=value...] field=value[,field=value...] [timestamp]`.
*/
struct Measurement {

    import std.datetime: SysTime;
    import std.traits : Unqual;

    string name;                 /// measurement name
    string[string] tags;         /// tag key -> tag value
    InfluxValue[string] fields;  /// field key -> field value
    long timestamp;              /// UNIX time in nanoseconds; 0 means "not set"

    /**
        Constructs a measurement without tags.

        Params:
            name = measurement name
            fields = field values; plain strings have their type guessed
            time = time of the measurement; the default (UNIX epoch) results
                   in no timestamp being emitted
    */
    this(T)
        (string name,
         T[string] fields,
         SysTime time = SysTime.fromUnixTime(0))
    @safe
    if (is(Unqual!T == string) || is(Unqual!T == InfluxValue)) {
        string[string] tags;
        this(name, tags, fields, time);
    }

    /**
        Constructs a measurement with tags.

        Params:
            name = measurement name
            tags = tags attached to the measurement
            fields = field values; plain strings have their type guessed
            time = time of the measurement; the default (UNIX epoch) results
                   in no timestamp being emitted
    */
    this(T)(string name,
         string[string] tags,
         T[string] fields,
         SysTime time = SysTime.fromUnixTime(0))
    @safe // impure due to SysTime.fracSecs
    if (is(Unqual!T == string) || is(Unqual!T == InfluxValue)) {

        import std.datetime: nsecs;

        this.name = name;
        this.tags = tags;

        static if (is(Unqual!T == string)) {
            import std.typecons : Nullable;
            InfluxValue[string] ifields;
            () @trusted {
                foreach(element; fields.byKeyValue) {
                    // a null type asks InfluxValue to guess the type from the text
                    ifields[element.key] = InfluxValue(element.value, Nullable!(InfluxValue.Type).init);
                }
            }();
            this.fields = ifields;
        }
        else this.fields = fields;
        // InfluxDB uses UNIX time in _nanoseconds_
        // stdTime is in hnsecs
        //this.timestamp = time.stdTime / 100;
        this.timestamp = (time.toUnixTime!long * 1_000_000_000 + time.fracSecs.total!"nsecs");
    }

    /**
        Writes the measurement to `dg`: escaped name, optional tags, fields
        (string values quoted) and, when non-zero, the timestamp.
    */
    void toString(Dg)(scope Dg dg) const {

        import std.format: FormatSpec, formatValue;
        import std.typecons: Yes;

        FormatSpec!char fmt;
        dg.escape(`, `).formatValue(name, fmt);
        if (tags.length) {
            dg.formatValue(',', fmt);
            dg.aaFormat(tags);
        }
        dg.formatValue(' ', fmt);
        dg.aaFormat(fields, Yes.quoteStrings);
        if(timestamp != 0) {
            dg.formatValue(' ', fmt);
            dg.formatValue(timestamp, fmt);
        }
    }

    static if (__VERSION__ < 2072) {
        string toString() @safe const {
            import std.array : appender;
            auto res = appender!string;
            toString(res);
            return res.data;
        }
    }
    else {
        deprecated("Use std.conv.to!string instead.")
        string toString()() @safe const {
            import std.conv: to;
            return this.to!string;
        }
    }
}

/*
    Writes the associative array `aa` to `dg` as comma separated `key=value`
    pairs. Keys are escaped for `,`, `=` and space. When `quoteStrings` is set,
    values for which `valueIsString` holds are wrapped in double quotes with
    `"` and `\` escaped; all other values are escaped for `,` and space.
*/
private void aaFormat(Dg, T : K[V], K, V)
    (scope Dg dg, scope T aa, in Flag!"quoteStrings" quoteStrings = No.quoteStrings)
{
    import std.format: FormatSpec, formatValue;
    size_t i;
    FormatSpec!char fmt;
    foreach(key, value; aa)
    {
        if (i++)
            dg.formatValue(',', fmt);
        dg.escape(`,= `).formatValue(key, fmt);
        dg.formatValue('=', fmt);
        if(quoteStrings && valueIsString(value)) {
            dg.formatValue('"', fmt);
            dg.escape(`"\`).formatValue(value, fmt);
            dg.formatValue('"', fmt);
        } else
            dg.escape(`, `).formatValue(value, fmt);
    }
}

/*
    Returns an output range wrapping `dg`: every element put into it is
    forwarded to `dg`, preceded by a backslash when it is one of `chars`.
*/
private auto escape(Dg)(scope Dg dg, in char[] chars...)
{

    struct Escaper(Dg) {
        void put(T)(T val) {
            import std.algorithm : canFind;
            import std.format: FormatSpec, formatValue;

            FormatSpec!char fmt;
            foreach (c; val) {
                if (chars.canFind(c))
                    dg.formatValue('\\', fmt);
                dg.formatValue(c, fmt);
            }
        }

        static if (__VERSION__ < 2072) {
            void putChar(char c) {
                import std.algorithm : canFind;
                import std.format : formattedWrite;

                if (chars.canFind(c))
                    dg.formattedWrite("%s", '\\');
                dg.formattedWrite("%s", c);
            }
        }
    }

    return Escaper!Dg();
}

/*
    Tells whether `value` has to be emitted as a (quoted) string: always for
    `string`, and for `InfluxValue` when its type is `string_`.
*/
private auto valueIsString(T)(in T value) {
    static if (is(T == string)) return true;
    else static if (is(T == InfluxValue)) return value.type == InfluxValue.Type.string_;
    // built from T.stringof so the message itself always compiles
    else static assert(0, "Unexpected value type " ~ T.stringof);
}

/*
    Guesses the InfluxDB type of a textual field value:
    one of the boolean literals -> bool_, optional '-' followed by digits and
    a trailing 'i' -> int_, something `valueIsFloat` accepts -> float_,
    anything else -> string_.
*/
private auto guessValueType(string value) @safe pure nothrow @nogc {

    import std.algorithm: all, canFind;
    import std.string : representation;

    static immutable boolValues = ["t", "T", "true", "True", "TRUE", "f", "F", "false", "False", "FALSE"];

    // test for bool values
    if(boolValues.canFind(value)) return InfluxValue.Type.bool_;

    // test for int values
    // length > 1: there must be at least one character in front of the 'i',
    // which also keeps the `digits[0]` access below in bounds
    if(value.length > 1 && value[$ - 1] == 'i') {
        auto digits = value[0 .. $ - 1];
        if (digits[0] == '-' && digits.length > 1) digits = digits[1 .. $];
        if (digits.representation.all!(a => a >= '0' && a <= '9')) return InfluxValue.Type.int_;
    }

    // test for float values
    if (valueIsFloat(value)) return InfluxValue.Type.float_;

    return InfluxValue.Type.string_;
}

/*
    Tells whether `value` looks like a floating point literal: an optional
    sign, digits with at most one '.', and an optional exponent ('e'/'E',
    optional sign, digits). At least one digit is required in the mantissa
    and, when an exponent marker is present, in the exponent.
*/
private bool valueIsFloat(in string value) @safe pure nothrow @nogc {

    if (!value.length) return false;

    int dotidx = -1;
    int eidx = -1;
    bool mantissaHasDigit;
    bool exponentHasDigit;
    foreach(i, c; value) {
        if (c == '+' || c == '-') {
            // a sign is only allowed first or right after the exponent marker
            if (i != 0 && (eidx < 0 || i-1 != eidx)) return false;
        }
        else if (c == '.') {
            // single dot, and only in the mantissa
            if (dotidx >= 0 || eidx > 0) return false;
            dotidx = cast(int)i;
        }
        else if (c == 'e' || c == 'E') {
            if (i == 0 || eidx > 0 || i+1 == value.length) return false;
            eidx = cast(int)i;
        }
        else if (c < '0' || c > '9') return false;
        else if (eidx < 0) mantissaHasDigit = true;
        else exponentHasDigit = true;
    }

    // reject sign/dot only inputs ("-", ".") and empty exponents ("1e+")
    return mantissaHasDigit && (eidx < 0 || exponentHasDigit);
}

///
@("valueIsFloat")
@safe unittest {
    // valid
    "123".valueIsFloat.shouldBeTrue;
    "-123".valueIsFloat.shouldBeTrue;
    "1e1".valueIsFloat.shouldBeTrue;
    "+1.e1".valueIsFloat.shouldBeTrue;
    ".1e1".valueIsFloat.shouldBeTrue;
    "1e+1".valueIsFloat.shouldBeTrue;
    "1.E-1".valueIsFloat.shouldBeTrue;
    "1.2".valueIsFloat.shouldBeTrue;
    "-1.3e+10".valueIsFloat.shouldBeTrue;
    "+1.".valueIsFloat.shouldBeTrue;

    // invalid
    "1a".valueIsFloat.shouldBeFalse;
    "1e.1".valueIsFloat.shouldBeFalse;
    "e1".valueIsFloat.shouldBeFalse;
    "".valueIsFloat.shouldBeFalse;
    "1eE1".valueIsFloat.shouldBeFalse;
    "1..0".valueIsFloat.shouldBeFalse;
    "1.e.1".valueIsFloat.shouldBeFalse;
    "1e12.3".valueIsFloat.shouldBeFalse;
    "1e1+1".valueIsFloat.shouldBeFalse;
    "ee".valueIsFloat.shouldBeFalse;
    "1ee1".valueIsFloat.shouldBeFalse;
    "++1".valueIsFloat.shouldBeFalse;
    "1+1".valueIsFloat.shouldBeFalse;
    "1.1.1".valueIsFloat.shouldBeFalse;
    "1+".valueIsFloat.shouldBeFalse;
    "1ě+1".valueIsFloat.shouldBeFalse;

    // invalid: no digits at all, or no digits in the exponent
    "-".valueIsFloat.shouldBeFalse;
    "+".valueIsFloat.shouldBeFalse;
    ".".valueIsFloat.shouldBeFalse;
    "-.".valueIsFloat.shouldBeFalse;
    "1e+".valueIsFloat.shouldBeFalse;
    "1e-".valueIsFloat.shouldBeFalse;
}

///
@("Measurement.to!string no timestamp")
@safe unittest {
    import std.conv: to;
    {
        auto m = Measurement("cpu",
                             ["tag1": "toto", "tag2": "foo"],
                             ["load": "42", "temperature": "53"]);
        m.to!string.shouldEqualLine("cpu,tag1=toto,tag2=foo load=42,temperature=53");
    }
    {
        auto m = Measurement("thingie",
                             ["foo": "bar"],
                             ["value": "7"]);
        m.to!string.shouldEqualLine("thingie,foo=bar value=7");
    }
}

///
@("Measurement.to!string no timestamp no tags")
@safe unittest {
    import std.conv: to;
    auto m = Measurement("cpu",
                         ["load": "42", "temperature": "53"]);
    m.to!string.shouldEqualLine("cpu load=42,temperature=53");
}

///
@("Measurement.to!string with timestamp")
@safe unittest {
    import std.conv: to;
    import std.datetime: SysTime;

    auto m = Measurement("cpu",
                         ["tag1": "toto", "tag2": "foo"],
                         ["load": "42", "temperature": "53"],
                         SysTime.fromUnixTime(7));
    m.to!string.shouldEqualLine("cpu,tag1=toto,tag2=foo load=42,temperature=53 7000000000");
}

///
@("Measurement.to!string with timestamp no tags")
@safe unittest {
    import std.conv: to;
    import std.datetime: SysTime;

    auto m = Measurement("cpu",
                         ["load": "42", "temperature": "53"],
                         SysTime.fromUnixTime(7));
    m.to!string.shouldEqualLine("cpu load=42,temperature=53 7000000000");
}

@("Measurement fraction of a second")
@safe unittest {
    import std.conv: to;
    import std.datetime: DateTime, SysTime, Duration, usecs, nsecs, UTC;
    auto m = Measurement("cpu",
                         ["load": "42", "temperature": "53"],
                         SysTime(DateTime(2017, 2, 1), 300.usecs + 700.nsecs, UTC()));
    m.to!string.shouldEqualLine("cpu load=42,temperature=53 1485907200000300700");
}

@("Measurement.to!string with string")
@safe unittest {
    import std.conv: to;
    import std.datetime: SysTime;

    auto m = Measurement("cpu",
                         ["foo": "bar"],
                         SysTime.fromUnixTime(7));
    m.to!string.shouldEqualLine(`cpu foo="bar" 7000000000`);
}

static foreach(value; ["t", "T", "true", "True", "TRUE", "f", "F", "false", "False", "FALSE"]) {
    @("Measurement.to!string with bool." ~ value)
    @safe unittest {
        import std.conv: to;
        import std.datetime: SysTime;

        auto m = Measurement("cpu",
                             ["foo": value],
                             SysTime.fromUnixTime(7));
        m.to!string.shouldEqualLine(`cpu foo=` ~ value ~ ` 7000000000`);
    }
}

@("Measurement.to!string with int")
@safe unittest {
    import std.conv: to;
    import std.datetime: SysTime;

    auto m = Measurement("cpu",
                         ["foo": "16i", "bar": "-1i", "str": "-i"],
                         SysTime.fromUnixTime(7));
    m.to!string.shouldEqualLine(`cpu foo=16i,bar=-1i,str="-i" 7000000000`);
}

@("Measurement.to!string with strings that only look numeric")
@safe unittest {
    import std.conv: to;
    import std.datetime: SysTime;

    // a lone "i" has no digits in front of the suffix
    Measurement("cpu", ["foo": "i"], SysTime.fromUnixTime(7))
        .to!string.shouldEqualLine(`cpu foo="i" 7000000000`);

    // a lone sign is not a number
    Measurement("cpu", ["foo": "-"], SysTime.fromUnixTime(7))
        .to!string.shouldEqualLine(`cpu foo="-" 7000000000`);
}

@("Measurement.to!string with special characters")
@safe unittest {
    import std.conv: to;

    auto m = Measurement(`cpu "load", test`,
                         ["tag 1": `to"to`, "tag,2": "foo"],
                         ["foo,= ": "a,b", "b,a=r": `a " b`]);
    m.to!string.shouldEqualLine(`cpu\ "load"\,\ test,tag\ 1=to"to,tag\,2=foo foo\,\=\ ="a,b",b\,a\=r="a \" b"`);
}


/**
   A sum type of values that can be stored in influxdb.

   A value is either held in its native form (bool, integer, floating point)
   or as raw text together with the type it should be written as.
*/
struct InfluxValue {

    import std.typecons : Nullable;

    /// The kind of value held
    enum Type { bool_, int_, float_, string_ }

    /// Storage for values constructed from native types
    union Payload {
        bool b;
        long i;
        double f;
    }

    private {
        Payload _value;
        string _rawString = null; // non-null only when constructed from text
        Type _type;
    }

    /// Constructs a boolean value
    this(bool v) @safe pure nothrow {
        _value.b = v;
        _type = InfluxValue.Type.bool_;
    }

    /// Constructs an integer value
    this(T)(T v) @safe pure nothrow
    if (is(T == int) || is(T == long)) {
        _value.i = v;
        _type = InfluxValue.Type.int_;
    }

    /// Constructs a floating point value
    this(T)(T v) @safe pure nothrow
    if (is(T == float) || is(T == double)) {
        _value.f = v;
        _type = InfluxValue.Type.float_;
    }

    /**
        Constructs a value from its textual form.

        Params:
            v = the text; must not be `null`
            type = the type to write the text as. Defaults to `string_`;
                   pass a null `Nullable` to have the type guessed from `v`.
    */
    this(string v, Nullable!Type type = Nullable!Type(Type.string_)) @safe pure nothrow
    in {
        assert(v !is null);
    } do {
        _rawString = v;
        if (type.isNull) _type = guessValueType(v);
        else _type = type.get;
    }

    /// The type of the value held
    auto type() const @safe pure nothrow {
        return _type;
    }

    /**
        Writes the value to `dg`.

        Raw text is written as given, with an `i` suffix appended for
        integers that do not already end in one. Native integers are written
        with an `i` suffix; native bools and floats use default formatting.
        Quoting of strings is left to the caller.
    */
    void toString(Dg)(scope Dg dg) const {

        import std.format: FormatSpec, formattedWrite, formatValue;

        FormatSpec!char fmt;
        if (_rawString !is null) {
            // the length check keeps `$-1` in bounds for an empty raw string
            if (_type == Type.int_ && _rawString.length && _rawString[$-1] != 'i')
                dg.formattedWrite("%si", _rawString);
            else dg.formatValue(_rawString, fmt);
        }
        else {
            final switch (_type) with (Type) {
                case bool_: dg.formatValue(_value.b, fmt); break;
                case int_: dg.formattedWrite("%si", _value.i); break;
                case float_: dg.formatValue(_value.f, fmt); break;
                case string_: assert(0); // strings always carry _rawString
            }
        }
    }
}

@("Measurement.to!string InfluxValue int")
@safe unittest {
    import std.conv: to;
    import std.datetime: SysTime;

    const m = Measurement("cpu",
                          ["foo": InfluxValue(16)],
                          SysTime.fromUnixTime(7));
    () @trusted { return m.to!string; }().shouldEqualLine(`cpu foo=16i 7000000000`);
}

@("Measurement.to!string InfluxValue long")
@safe unittest {
    import std.conv: to;
    import std.datetime: SysTime;

    const m = Measurement("cpu",
                          ["foo": InfluxValue(16L)],
                          SysTime.fromUnixTime(7));
    () @trusted { return m.to!string; }().shouldEqualLine(`cpu foo=16i 7000000000`);
}

@("Measurement.to!string InfluxValue float")
@safe unittest {
    import std.conv: to;
    import std.datetime: SysTime;

    Measurement("cpu",
                ["foo": InfluxValue(16.0f)],
                SysTime.fromUnixTime(7))
        .to!string.shouldEqualLine(`cpu foo=16 7000000000`);

    Measurement("cpu",
                ["foo": InfluxValue(16.1)],
                SysTime.fromUnixTime(7))
        .to!string.shouldEqualLine(`cpu foo=16.1 7000000000`);
}

@("Measurement.to!string InfluxValue boolean")
@safe unittest {
    import std.conv: to;
    import std.datetime: SysTime;

    Measurement("cpu",
                ["foo": InfluxValue(true)],
                SysTime.fromUnixTime(7))
        .to!string.shouldEqualLine(`cpu foo=true 7000000000`);
}

@("Measurement.to!string InfluxValue string")
@safe unittest {
    import std.conv: to;
    import std.datetime: SysTime;

    Measurement("cpu",
                ["foo": InfluxValue("bar")],
                SysTime.fromUnixTime(7))
        .to!string.shouldEqualLine(`cpu foo="bar" 7000000000`);
}

@("Measurement.to!string InfluxValue empty string")
@safe unittest {
    import std.conv: to;
    import std.datetime: SysTime;

    Measurement("cpu",
                ["foo": InfluxValue("")],
                SysTime.fromUnixTime(7))
        .to!string.shouldEqualLine(`cpu foo="" 7000000000`);
}

@("Measurement.to!string InfluxValue string escaping")
@safe unittest {
    import std.conv: to;
    import std.datetime: SysTime;

    Measurement("cpu",
                ["foo": InfluxValue(`{"msg":"\"test\""}`)],
                SysTime.fromUnixTime(7))
        .to!string.shouldEqualLine(`cpu foo="{\"msg\":\"\\\"test\\\"\"}" 7000000000`);
}

@("Measurement.to!string InfluxValue string with specified bool value")
@safe unittest {
    import std.conv: to;
    import std.datetime: SysTime;
    import std.typecons : Nullable;

    Measurement("cpu",
                ["foo": InfluxValue("true", Nullable!(InfluxValue.Type)(InfluxValue.Type.bool_))],
                SysTime.fromUnixTime(7))
        .to!string.shouldEqualLine(`cpu foo=true 7000000000`);
}

@("Measurement.to!string InfluxValue string with specified int value")
@safe unittest {
    import std.conv: to;
    import std.datetime: SysTime;
    import std.typecons : Nullable;

    Measurement("cpu",
                ["foo": InfluxValue("42", Nullable!(InfluxValue.Type)(InfluxValue.Type.int_))],
                SysTime.fromUnixTime(7))
        .to!string.shouldEqualLine(`cpu foo=42i 7000000000`);
}

@("Measurement.to!string InfluxValue string with specified postfixed int value")
@safe unittest {
    import std.conv: to;
    import std.datetime: SysTime;
    import std.typecons : Nullable;

    Measurement("cpu",
                ["foo": InfluxValue("42i", Nullable!(InfluxValue.Type)(InfluxValue.Type.int_))],
                SysTime.fromUnixTime(7))
        .to!string.shouldEqualLine(`cpu foo=42i 7000000000`);
}

@("Measurement.to!string InfluxValue string with specified float value")
@safe unittest {
    import std.conv: to;
    import std.datetime: SysTime;
    import std.typecons : Nullable;

    Measurement("cpu",
                ["foo": InfluxValue("1.2", Nullable!(InfluxValue.Type)(InfluxValue.Type.float_))],
                SysTime.fromUnixTime(7))
        .to!string.shouldEqualLine(`cpu foo=1.2 7000000000`);
}

@("Measurement.to!string InfluxValue string with float value")
@safe unittest {
    import std.conv: to;
    import std.datetime: SysTime;

    Measurement("cpu",
                ["foo": InfluxValue("5E57758")],
                SysTime.fromUnixTime(7))
        .to!string.shouldEqualLine(`cpu foo="5E57758" 7000000000`);
}

@("Measurement.to!string InfluxValue string with guessed bool value")
@safe unittest {
    import std.conv: to;
    import std.datetime: SysTime;
    import std.typecons : Nullable;

    Measurement("cpu",
                ["foo": InfluxValue("true", Nullable!(InfluxValue.Type).init)],
                SysTime.fromUnixTime(7))
        .to!string.shouldEqualLine(`cpu foo=true 7000000000`);
}

@("Measurement.to!string InfluxValue string with guessed int value")
@safe unittest {
    import std.conv: to;
    import std.datetime: SysTime;
    import std.typecons : Nullable;

    Measurement("cpu",
                ["foo": InfluxValue("42i", Nullable!(InfluxValue.Type).init)],
                SysTime.fromUnixTime(7))
        .to!string.shouldEqualLine(`cpu foo=42i 7000000000`);
}

@("Measurement.to!string InfluxValue string with guessed float value")
@safe unittest {
    import std.conv: to;
    import std.datetime: SysTime;
    import std.typecons : Nullable;

    Measurement("cpu",
                ["foo": InfluxValue("1.2", Nullable!(InfluxValue.Type).init)],
                SysTime.fromUnixTime(7))
        .to!string.shouldEqualLine(`cpu foo=1.2 7000000000`);
}

@("Measurement.to!string InfluxValue guessed string value")
@safe unittest {
    import std.conv: to;
    import std.datetime: SysTime;

    Measurement("cpu",
                ["foo": InfluxValue("bar")],
                SysTime.fromUnixTime(7))
        .to!string.shouldEqualLine(`cpu foo="bar" 7000000000`);
}

/**
   A query response
*/
struct Response {
    Result[] results;
}

/**
   A result of a query
*/
struct Result {
    MeasurementSeries[] series;
    int statement_id;
}

/**
   Data for one measurement
*/
struct MeasurementSeries {

    import asdf: serializationIgnoreIn, Asdf;

    string name;      /// measurement name
    string[] columns; /// column names
    /// one array of column values per row; filled by `finalizeDeserialization`
    @serializationIgnoreIn string[][] values;

    /// Range over the rows of a series, also indexable by row number or column name
    static struct Rows {
        import std.range: Transversal, TransverseOptions;

        const string[] columns;
        const(string[])[] rows;

        /// A single row: column values addressable by column name
        static struct Row {

            import std.datetime: SysTime;

            const string[] columnNames;
            const string[] columnValues;

            /++
                Params:
                    key = column name
                Returns:
                    The value of the column in this row.
                Throws:
                    Exception, if key was not found.
            +/
            string opIndex(in string key) @safe pure const {
                import std.algorithm: countUntil;
                const idx = columnNames.countUntil(key);
                // report a missing column the same way Rows.opIndex does
                // instead of indexing with -1
                if (idx < 0)
                    throw new Exception("Unknown key " ~ key);
                return columnValues[idx];
            }

            /// Returns: the value of column `key`, or `defaultValue` if there is no such column
            string get(string key, string defaultValue) @safe pure const {
                import std.algorithm: countUntil;
                auto i = columnNames.countUntil(key);
                return (i==-1) ? defaultValue : columnValues[i];
            }

            /// Returns: the "time" column converted with `influxSysTime`
            SysTime time() @safe const {
                return influxSysTime(this["time"]);
            }

            /// Writes the row as `Row(name: value, ...)`
            void toString(Dg)(scope Dg dg) const {
                dg("Row(");
                foreach(i, value; columnValues) {
                    if (i)
                        dg(", ");
                    dg(columnNames[i]);
                    dg(": ");
                    dg(value);
                }
                dg(")");
            }

            deprecated("Use std.conv.to!string instead.")
            string toString()() {
                import std.conv: to;
                return this.to!string;
            }
        }

        /// Returns: the i-th row
        Row opIndex(in size_t i) @safe pure const nothrow {
            return Row(columns, rows[i]);
        }

        /++
        Params:
            key = column name
        Returns:
            The column as a range of strings (a `Transversal` over the rows).
        Throws:
            Exception, if key was not found.
        +/
        Transversal!(const(string[])[], TransverseOptions.assumeNotJagged)
        opIndex(in string key) @safe pure const {
            import std.algorithm: countUntil;
            // -1 ("not found") wraps to size_t.max and is caught below
            size_t idx = columns.countUntil(key);
            if (idx >= columns.length)
                throw new Exception("Unknown key " ~ key);
            return typeof(return)(rows, idx);
        }

        /// Number of rows left in the range
        size_t length() @safe pure const nothrow { return rows.length; }

        /// Range primitive: drops the first row
        void popFront() @safe pure nothrow {
            rows = rows[1 .. $];
        }

        /// Range primitive: the first row
        Row front() @safe pure nothrow {
            return this[0];
        }

        /// Range primitive: true when no rows are left
        bool empty() @safe pure nothrow const {
            return rows.length == 0;
        }
    }

    /// Returns: a `Rows` view over `columns` and `values`
    inout(Rows) rows() @safe pure nothrow inout {
        return inout(Rows)(columns, values);
    }

    /**
        Fills `values` from the "values" element of `data`, one string per
        cell, after `columns` has been set.
    */
    void finalizeDeserialization(Asdf data) {
        import std.algorithm: map, count;
        import std.array: uninitializedArray;
        auto rows = data["values"].byElement.map!"a.byElement";
        // count is fast for Asdf
        values = uninitializedArray!(string[][])(rows.count, columns.length);
        foreach(value; values)
        {
            auto row = rows.front;
            assert(row.count == columns.length);
            foreach (ref e; value)
            {
                // does not allocate data here because of `const`,
                // reuses Asdf data
                e = cast(string) cast(const(char)[]) row.front;
                row.popFront;
            }
            rows.popFront;
        }
    }
}

///
@("MeasurementSeries")
@safe unittest {

    import std.datetime: SysTime, DateTime, UTC;
    import std.array: array;

    auto series = MeasurementSeries("coolness",
                                    ["time", "foo", "bar"],
                                    [
                                        ["2015-06-11T20:46:02Z", "red", "blue"],
                                        ["2013-02-09T12:34:56Z", "green", "yellow"],
                                    ]);

    series.rows[0]["foo"].shouldEqual("red");
    series.rows[0]["time"].shouldEqual("2015-06-11T20:46:02Z");
    series.rows[0].time.shouldEqual(SysTime(DateTime(2015, 06, 11, 20, 46, 2), UTC()));

    series.rows[1]["bar"].shouldEqual("yellow");
    series.rows[1]["time"].shouldEqual("2013-02-09T12:34:56Z");
    series.rows[1].time.shouldEqual(SysTime(DateTime(2013, 2, 9, 12, 34, 56), UTC()));

    series.rows["time"][0].shouldEqual("2015-06-11T20:46:02Z");
    series.rows["bar"][1].shouldEqual("yellow");

    series.rows.array.shouldEqual(
        [
            MeasurementSeries.Rows.Row(["time", "foo", "bar"],
                                       ["2015-06-11T20:46:02Z", "red", "blue"],
                                       ),
            MeasurementSeries.Rows.Row(["time", "foo", "bar"],
                                       ["2013-02-09T12:34:56Z", "green", "yellow"],
                                       ),
        ]
    );
}

///
@("MeasurementSeries.get")
@safe pure unittest {
    auto series = MeasurementSeries("coolness",
                                    ["time", "foo", "bar"],
                                    [["2015-06-11T20:46:02Z", "red", "blue"]]);
    series.rows[0].get("foo", "oops").shouldEqual("red");
    series.rows[0].get("quux", "oops").shouldEqual("oops");
}

@("MeasurementSeries.Row unknown key")
@safe unittest {
    auto series = MeasurementSeries("coolness",
                                    ["time", "foo", "bar"],
                                    [["2015-06-11T20:46:02Z", "red", "blue"]]);
    bool thrown;
    try {
        const value = series.rows[0]["quux"];
    } catch(Exception _) {
        thrown = true;
    }
    thrown.shouldBeTrue;
}

///
@("MeasurementSeries.Row.to!string")
@safe pure unittest {
    import std.conv: to;
    auto series = MeasurementSeries("coolness",
                                    ["time", "foo", "bar"],
                                    [["2015-06-11T20:46:02Z", "red", "blue"]]);
    series.rows[0].to!string.shouldEqual("Row(time: 2015-06-11T20:46:02Z, foo: red, bar: blue)");
}

///
@("MeasurementSeries long fraction in ISOExtString")
@safe unittest {

    import std.datetime: SysTime, DateTime, UTC, usecs;
    import std.array: array;

    auto series = MeasurementSeries("coolness",
                                    ["time", "foo", "bar"],
                                    [
                                        ["2017-05-10T14:47:38.82524801Z", "red", "blue"],
                                    ]);

    series.rows[0].time.shouldEqual(SysTime(DateTime(2017, 05, 10, 14, 47, 38), 825248.usecs, UTC()));
}


/**
   Converts a DateTime to a string suitable for use in queries
   e.g. SELECT * FROM foo WHERE time >=

   The `DateTime` is interpreted as UTC.
*/
string toInfluxDateTime(in DateTime time) @safe {
    import std.datetime: UTC;
    return toInfluxDateTime(SysTime(time, UTC()));
}

///
@("toInfluxDateTime with DateTime")
unittest {
    DateTime(2017, 2, 1).toInfluxDateTime.shouldEqual("'2017-02-01T00:00:00Z'");
}

/**
   Converts a SysTime to a string suitable for use in queries
   e.g. SELECT * FROM foo WHERE time >=

   Returns: the ISO extended string of `time` wrapped in single quotes
*/
string toInfluxDateTime(in SysTime time) @safe {
    return "'" ~ time.toISOExtString ~ "'";
}

///
@("toInfluxDateTime with SysTime")
unittest {
    import std.datetime: UTC;
    SysTime(DateTime(2017, 2, 1), UTC()).toInfluxDateTime.shouldEqual("'2017-02-01T00:00:00Z'");
}

version(Test_InfluxD) {
    /**
       Asserts that two lines are equivalent under InfluxDB's line protocol.
       Since the tags and fields aren't ordered, a straight comparison
       might yield false errors, so both lines are reassembled with their
       tags and fields sorted before being compared.
       The timestamp part, when present, is compared verbatim.

       Params:
           actual = the line produced by the code under test
           expected = the line it should be equivalent to
           file = file reported on failure
           line = line number reported on failure
    */
    void shouldEqualLine(in string actual,
                         in string expected,
                         in string file = __FILE__,
                         in size_t line = __LINE__) @safe pure {

        // reassemble the protocol line with sorted tags and fields
        string sortLine(in string line) {

            import std.algorithm: sort, splitter;
            import std.array : array;
            import std.conv: text;
            import std.range: chain;
            import std.string: join, split;

            // split on unescaped spaces that are not inside a quoted value;
            // `idx` tracks the position of the character being examined
            bool isval;
            size_t idx;
            auto parts = line.splitter!((a) {
                if (a == ' ' && !isval && (idx == 0 || line[idx-1] != '\\')) {
                    idx++;
                    return true;
                }
                if (a == '"' && idx > 0 && line[idx-1] != '\\' && line[idx-1] == '=')
                    isval = true;
                else if (a == '"' && idx > 0 && isval && line[idx-1] != '\\')
                    isval = false;
                idx++;
                return false;
            }).array;

            assert(parts.length == 3 || parts.length == 2,
                   text("Illegal number of parts( ", parts.length, ") in ", line));

            auto nameTags = parts[0].split(",");
            const name = nameTags[0];
            auto tags = nameTags[1..$];

            auto fields = parts[1].split(",");

            auto newNameTags = chain([name], sort(tags)).join(",");
            auto newFields = sort(fields).join(",");
            auto newParts = [newNameTags, newFields];
            if(parts.length > 2) newParts ~= parts[2];

            return newParts.join(" ");
        }

        sortLine(actual).shouldEqual(sortLine(expected), file, line);
    }
}