These Things Happen

fixing bugs and chewing bubblegum

Building and Showing

On and off I’ve been working on a site that displays candlestick charts using data from the Guild Wars 2 trading post, but I’ve never publicly linked to it.

Part of the reason is that it’s not a finished product and I’m not a web designer. But why should that stop me, eh? It’s already been an interesting project involving:

  • a service retrieving the data from the GW2 API
  • optimising SQL to calculate the candlesticks quickly
  • an ETL service that takes old data, moves it to a better long-term storage format, and then populates a cache
  • a website which must merge cached and uncached data before returning the candlesticks to the caller.

It didn’t start out with two services, a website, and a cache though. It started with a program, written in Go, that would call an endpoint once an hour and then save the results in a sqlite database.

At that time I didn’t know what I wanted to do with the data, I just knew I wanted to do something.

Eventually, that something became me answering the question ‘what questions do I want the answers to?’ As it turns out, what I want is to answer yet another question: ‘should I buy, sell, or make this item, and should I do it now?’ Now this question I could answer! But my tools at the time would become problematic.

I chose to display historical data using candlestick charts, as I like the view of how prices are moving in a given time period. Calculating open, close, min and max using sqlite proved to be an interesting problem. It was possible though, with some sub-queries, and with some tradeoffs. For example I could only get results for one item at a time, and the larger the dataset, the slower the calculation got. But, most importantly, it gave me a place to start. And it worked!

Once the requirements started shaking out, infrastructure changes became frequent. I was inside the AWS free tier, but as the sqlite file grew, I started to get worried about EBS storage and keeping things free. So my architecture had to change and I moved to using RDS and PostgreSQL.

Then the fact that the data, once inserted into PostgreSQL, was effectively dead started grating on me, and it was also filling up my free tier allotment of space in RDS.

So I brought in an ETL process to take the dead data out of PostgreSQL, store it more efficiently, and use it to populate a cache.

And then the cache grew too fast and I started seeing evictions. But a new problem means a new solution. I needed a better way (or even a way) of compressing the data going into the cache. I’ve worked with Protobuf before, so after a bit of a search around to see if Avro would be an immediately better fit, I decided to go with compression via Protobuf. I also reworked the structure of the stored data, because once it was compressed, the keys were still a major source of bloat. And that worked beautifully. I went from being able to store a few months of data to a few years.

And that’s where things stand right now. The UI isn’t much to talk about, but it’s clean (sparse, some may say) and it’s pretty zippy, which is a long way from the minutes it used to take to load only a single month of data.

http://www.gw2roar.com

I’m going to continue to talk about the choices I made, and continue to make, with this project, and why I had to make them: for example, the significant work around keeping both cache and database sizes sane, automating the ETL process, automating deployments, and anything that comes next.

Graceful Shutdown of Java Apps Under Docker

tl;dr application-interrupts

I ran across a problem recently when working on a Java application that needed to be allowed to finish processing its current batch before shutting down on receipt of a shutdown signal.

This app has a main loop that receives messages off an AWS SQS queue, processes those messages, takes action if required (making a put request to a third party), and then deletes the messages off the queue.

Each action must only ever send a unique record to the API. The third party doesn’t expose a unique identifier for a record (though it does provide an endpoint to get a list of all existing records), so the application itself must handle the idea of uniqueness.

So, in the case of a double whammy of my app being shut down after taking action, but before deleting the message from the queue, and the third party being slow to update the list of previous requests, I could end up sending duplicate records when the new app starts up.

No problem, we’ve got SIGTERM

When docker stop is called on a container, SIGTERM is sent to PID 1, and in Java a shutdown hook is used to catch the SIGTERM and clean up any resources before finally stopping.

In this case though, it’s not resources that need cleaning up. This is a continuously running application that needs to finish any work currently in process before returning out of the main loop and stopping.

As it happens, Java has a good way of letting a Thread know that the application would like it to shut down before it starts its next iteration of work.

Interrupts

Java’s Thread class has a static interrupted method which returns true if the current Thread has been interrupted since the last time Thread.interrupted() was called, clearing the interrupt status as it does so. (more on Interrupts)

Why are interrupts useful

In the main loop, a condition of while(!Thread.interrupted()) will allow the while block to run to completion, but also prevent the next execution if an interrupt occurred during the previous run, which is exactly what needs to happen to allow messages to complete processing before the app shuts down.
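The RunnableLoop used in the examples below isn’t shown in this post, so here’s a rough sketch of the shape it’s assumed to have: a Runnable whose run method checks the interrupt flag between batches of work.

public class RunnableLoop implements Runnable {
    private final String name;
    private final long timeout;

    public RunnableLoop(String name, long timeout) {
        this.name = name;
        this.timeout = timeout;
    }

    @Override
    public void run() {
        // the interrupt flag is only checked between iterations,
        // so whatever batch is in flight always runs to completion
        while (!Thread.interrupted()) {
            try {
                System.out.println(name + ": processing batch");
                Thread.sleep(timeout); // stand-in for receiving, processing, and deleting a batch
            } catch (InterruptedException e) {
                // blocking calls clear the flag when they throw,
                // so re-set it to let the while condition see it
                Thread.currentThread().interrupt();
            }
        }
        System.out.println(name + ": exiting main loop");
    }
}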

How to interrupt a thread

The short answer is ‘by invoking Thread.interrupt in the shutdown hook’.

private static void threadInterruptedOnShutdown(long timeout) {
    // Setting interrupt doesn't cause the application to wait for the thread to exit.
    // The statements inside the interrupt block in the loop may or may not be executed.
    String name = "threadInterruptedOnShutdown wait " + timeout;
    Thread t = new Thread(new RunnableLoop(name, timeout));
    t.start();
    Runtime.getRuntime().addShutdownHook(new Thread(() -> {
        System.out.println(name + " thread: setting interrupt");
        t.interrupt();
        System.out.println(name + " thread: shutting down");
    }));
}

Just calling interrupt on a Thread doesn’t give the control needed to ensure the current work is completed. It doesn’t necessarily wait on the Thread to exit before letting the application shut down. There’s no reason you couldn’t write the code to handle this of course, but the ExecutorService already provides that for us.

When submitting a Runnable (or Callable) to the ExecutorService, a Future is returned as a handle on the task. Working together, the Future and the ExecutorService give control over interrupting Threads, waiting for them to exit, and, if anything fails to exit in time, providing an opportunity to do any damage control.

private static void futureFromExecutorService(long timeout) {
    // the executor service submit method allows us to get a handle on the thread
    // via a future and set the interrupt in the shutdown hook
    String name = "futureFromExecutorService wait " + timeout;
    ExecutorService service = Executors.newSingleThreadExecutor();

    Future<?> app = service.submit(new RunnableLoop(name, timeout));

    Runtime.getRuntime().addShutdownHook(new Thread(() -> {
        System.out.println(name + " thread: setting interrupt");
        app.cancel(true);
        service.shutdown();

        try {
            // give the thread time to shutdown. This needs to be comfortably less than the
            // time the docker stop command will wait for a container to terminate on its own
            // before forcibly killing it.
            if (!service.awaitTermination(7, TimeUnit.SECONDS)) {
                System.out.println(name + " thread: did not shutdown in time, forcing service shutdown");
                service.shutdownNow();
            } else {
                System.out.println(name + " thread: shutdown cleanly");
            }
        } catch (InterruptedException e) {
            System.out.println(name + " thread: shutdown timer interrupted, forcing service shutdown");
            service.shutdownNow();
        }
    }));
}

To interrupt the Thread when SIGTERM is received, inside the shutdown hook call .cancel(true) on the Future. The boolean parameter allows the cancel method to interrupt a running Thread. Without that parameter, the Thread will not be interrupted.

Once the Future is cancelled, the service can begin shutting down. The ExecutorService has two types of shutdown. .shutdown() will stop any more work from being submitted to the service while allowing existing work to execute. .shutdownNow() on the other hand actively attempts to stop all running tasks.

After calling .shutdown(), use the ExecutorService’s .awaitTermination method to both give the Threads time to finish any current work and also to handle those that do not return in time. Set the timeout argument to be less than that of the docker stop command so that there will be time to do damage control and attempt a final .shutdownNow() before docker kills the container for being non-responsive.

All together, this allows a continuously running application to respond to shutdown requests in a timely manner and also complete the work that is currently in process.

So You Need to Edit a Parquet File

You’ve uncovered a problem in your beautiful parquet files, some piece of data either snuck in, or was calculated incorrectly, or there was just a bug. You know exactly how to correct the data, but how do you update the files?

tl;dr: spark-edit-examples

It’s all immutable

The problem we have when we need to edit the data is that our data structures are immutable.

You can add partitions to Parquet files, but you can’t edit the data in place. Spark DataFrames are immutable.

But ultimately we can mutate the data; we just need to accept that we won’t be doing it in place. We will need to recreate the Parquet files using a combination of schemas and UDFs to correct the bad data.

Schemas

Reading in data using a schema gives you a lot of power over the resultant structure of the DataFrame (not to mention it makes reading in json files a lot faster and allows you to union compatible Parquet files).
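The schema objects referenced in the examples below (RawDataSchema, ColumnDropSchema) aren’t shown in this post. As a rough sketch of what they’re assumed to look like, based on the fields used in the cases that follow, they’re plain StructType definitions, with ColumnDropSchema simply leaving out the column to be dropped:

import org.apache.spark.sql.types._

// assumed shape of the schemas referenced in the transforms below
object RawDataSchema {
  val schema: StructType = StructType(
    StructField("myField", StringType, nullable = true) ::
      StructField("myMap", MapType(StringType, StringType, valueContainsNull = true), nullable = true) ::
      StructField("myStruct", StructType(
        StructField("myField", BooleanType, nullable = true) ::
          StructField("editMe", StringType, nullable = true) ::
          Nil
      ), nullable = true) ::
      Nil
  )
}

object ColumnDropSchema {
  // the same fields, minus the column being dropped (here, myField)
  val schema: StructType = StructType(RawDataSchema.schema.filterNot(_.name == "myField"))
}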

Case 1: I need to drop an entire column

To drop an entire column, read the data in with a schema that doesn’t contain that column. When you write the DataFrame back out, the column will no longer exist.

ColumnTransform.scala

object ColumnTransform {

  def transform(spark: SparkSession, sourcePath: String, destPath: String): Unit = {

    // read in the data with a new schema
    val allGoodData = spark.read.schema(ColumnDropSchema.schema).parquet(sourcePath)

    // write out the final edited data
    allGoodData.write.parquet(destPath)
  }
}

Case 2: I need to drop full rows of data

To drop full rows, read in the data and select the data you want to save into a new DataFrame using a where clause. When you write the new DataFrame it will only have the rows that match the where clause.

WhereTransform.scala

object WhereTransform {

  def transform(spark: SparkSession, sourcePath: String, destPath: String): Unit = {
    val originalData = spark.read.schema(RawDataSchema.schema).parquet(sourcePath)

    // select only the good data rows
    val allGoodData = originalData.where("myField is null")

    // write out the final edited data
    allGoodData.write.parquet(destPath)
  }
}

User Defined Functions (UDFs)

UDFs in Spark are used to apply functions to a row of data. The result of the UDF becomes the field value.

Note that when using UDFs you must alias the resultant column, otherwise it will end up renamed to something like UDF(fieldName).
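For example, assuming a registered clean UDF over a String column called myField (as in the cases below), a minimal sketch:

// without an alias the resulting column gets a generated name, e.g. UDF(myField)
val renamed = originalData.select(clean($"myField"))

// aliasing keeps the original column name, which matters when unioning
// with rows that weren't transformed
val aliased = originalData.select(clean($"myField").as("myField"))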

Case 3: I need to edit the value of a simple type (String, Boolean, …)

To edit a simple type, you first need to create a function that takes and returns the same type.

This function is then registered for use as a UDF, and it can then be applied to a field in a select clause.

SimpleTransform.scala

object SimpleTransform {

  def transform(spark: SparkSession, sourcePath: String, destPath: String): Unit = {
    val originalData = spark.read.schema(RawDataSchema.schema).parquet(sourcePath)

    // take in a String, return a String
    // cleanFunc takes the String field value and returns the empty string in its place
    // you can interrogate the value and return any String here
    def cleanFunc: (String => String) = { _ => "" }

    // register the func as a udf
    val clean = udf(cleanFunc)

    // required for the $ column syntax
    import spark.sqlContext.implicits._

    // if you have data that doesn't need editing, you can separate it out
    // The data will need to be in a form that can be unioned with the edited data
    // That can be done by selecting out the fields in the same way in both the good and transformed data sets.
    val alreadyGoodData = originalData.where("myField is null").select(
      Seq[Column](
        $"myField",
        $"myMap",
        $"myStruct"
      ):_*
    )

    // apply the udf to the fields that need editing
    // selecting out all the data that will be present in the final parquet file
    val transformedData = originalData.where("myField is not null").select(
      Seq[Column](
        clean($"myField").as("myField"),
        $"myMap",
        $"myStruct"
      ):_*
    )

    // union the two DataFrames
    val allGoodData = alreadyGoodData.union(transformedData)

    // write out the final edited data
    allGoodData.write.parquet(destPath)
  }
}

Case 4: I need to edit the value of a MapType

MapTypes follow the same pattern as simple types. You write a function that takes a Map of the correct key and value types and returns a Map of the same types.

In the following example, an entire entry in the Map[String,String] is removed from the final data by filtering on the keyset.

MapTypeTransform.scala

object MapTypeTransform {

  def transform(spark: SparkSession, sourcePath: String, destPath: String): Unit = {
    val originalData = spark.read.schema(RawDataSchema.schema).parquet(sourcePath)

    // cleanFunc will simply take the MapType and return an edited Map
    // in this example it removes one member of the map before returning
    def cleanFunc: (Map[String, String] => Map[String, String]) = { m => m.filterKeys(k => k != "editMe") }

    // register the func as a udf
    val clean = udf(cleanFunc)

    // required for the $ column syntax
    import spark.sqlContext.implicits._

    // if you have data that doesn't need editing, you can separate it out
    // The data will need to be in a form that can be unioned with the edited data
    // I do that here by selecting out all the fields.
    val alreadyGoodData = originalData.where("myMap.editMe is null").select(
      Seq[Column](
        $"myField",
        $"myMap",
        $"myStruct"
      ):_*
    )

    // apply the udf to the fields that need editing
    // selecting out all the data that will be present in the final parquet file
    val transformedData = originalData.where("myMap.editMe is not null").select(
      Seq[Column](
        $"myField",
        clean($"myMap").as("myMap"),
        $"myStruct"
      ):_*
    )

    // union the two DataFrames
    val allGoodData = alreadyGoodData.union(transformedData)

    // write out the final edited data
    allGoodData.write.parquet(destPath)
  }
}

Case 5: I need to change the value of a member of a StructType

Working with StructTypes requires an addition to the UDF registration statement. By supplying the schema of the StructType, you are able to manipulate it using a function that takes and returns a Row.

As Rows are immutable, a new Row must be created that has the same field order, type, and number as the schema. But, since the schema of the data is known, it’s relatively easy to reconstruct a new Row with the correct fields.

StructTypeTransform.scala

object StructTypeTransform {

  def transform(spark: SparkSession, sourcePath: String, destPath: String): Unit = {
    val originalData = spark.read.schema(RawDataSchema.schema).parquet(sourcePath)

    // cleanFunc will take the struct as a Row and return a new Row with edited fields
    // note that the ordering and count of the fields must remain the same
    def cleanFunc: (Row => Row) = { r => RowFactory.create(r.getAs[Boolean](0), "") }

    // register the func as a udf
    // give the UDF a schema or the Row type won't be supported
    val clean = udf(cleanFunc,
      StructType(
        StructField("myField", BooleanType, true) ::
          StructField("editMe", StringType, true) ::
          Nil
      )
    )

    // required for the $ column syntax
    import spark.sqlContext.implicits._

    // if you have data that doesn't need editing, you can separate it out
    // The data will need to be in a form that can be unioned with the edited data
    // I do that here by selecting out all the fields.
    val alreadyGoodData = originalData.where("myStruct.editMe is null").select(
      Seq[Column](
        $"myField",
        $"myStruct",
        $"myMap"
      ):_*
    )

    // apply the udf to the fields that need editing
    // selecting out all the data that will be present in the final parquet file
    val transformedData = originalData.where("myStruct.editMe is not null").select(
      Seq[Column](
        $"myField",
        clean($"myStruct").as("myStruct"),
        $"myMap"
      ):_*
    )

    // union the two DataFrames
    val allGoodData = alreadyGoodData.union(transformedData)

    // write out the final edited data
    allGoodData.write.parquet(destPath)
  }
}

Finally

Always test your transforms before you delete the original data!

Exploring Spark SQL DataTypes

I’ve been exploring how different DataTypes in Spark SQL are imported from line delimited json to try to understand which DataTypes can be used for a semi-structured data set I’m converting to parquet files. The data won’t all be processed at once and the schema will need to grow, so it’s imperative that the parquet files have schemas that are compatible.

The only one I really can’t get working yet is the CalendarIntervalType.

Looking at the Spark source files literals.scala and CalendarInterval.java, I would assume that CalendarInterval.fromString is called with the value. However, I’m just getting nulls back when passing in a value like ‘interval 2 days’, which, when passed directly to CalendarInterval.fromString, returns a non-null CalendarInterval.

Source code for the tests is at: https://github.com/dyanarose/dlr-spark
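For each DataType, the ‘Inferred Schema’ and ‘Set Schema’ results below come from reading the same line-delimited json file twice: once letting Spark infer the schema, and once supplying it explicitly. A minimal sketch of that pattern, using DecimalType as the example (the names here are illustrative, not the actual test code from the repo):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._

object DecimalTypeCheck {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("datatype-check").master("local[*]").getOrCreate()

    // one json object per line, as in the inputs shown below
    val path = "decimal.json"

    // let Spark infer the schema from the json values
    val inferred = spark.read.json(path)
    inferred.printSchema()
    inferred.show()

    // read the same file again with an explicit schema; values that can't be
    // converted to the declared type come back as null
    val schema = StructType(StructField("decimal", DecimalType(6, 3), nullable = true) :: Nil)
    val set = spark.read.schema(schema).json(path)
    set.printSchema()
    set.show()

    spark.stop()
  }
}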

Results:

-------------- DecimalType --------------
------- DecimalType Input
{'decimal': 1.2345}
{'decimal': 1}
{'decimal': 234.231}
{'decimal': Infinity}
{'decimal': -Infinity}
{'decimal': NaN}
{'decimal': '1'}
{'decimal': '1.2345'}
{'decimal': null}

------- DecimalType Inferred Schema
root
 |-- decimal: string (nullable = true)

+-----------+
|    decimal|
+-----------+
|     1.2345|
|          1|
|    234.231|
| "Infinity"|
|"-Infinity"|
|      "NaN"|
|          1|
|     1.2345|
|       null|
+-----------+


------- DecimalType Set Schema
root
 |-- decimal: decimal(6,3) (nullable = true)

+-------+
|decimal|
+-------+
|  1.235|
|  1.000|
|234.231|
|   null|
|   null|
|   null|
|   null|
|   null|
|   null|
+-------+



-------------- BooleanType --------------
------- BooleanType Input
{'boolean': true}
{'boolean': false}
{'boolean': 'false'}
{'boolean': 'true'}
{'boolean': null}
{'boolean': 1}
{'boolean': 0}
{'boolean': '1'}
{'boolean': '0'}
{'boolean': 'a'}

------- BooleanType Inferred Schema
root
 |-- boolean: string (nullable = true)

+-------+
|boolean|
+-------+
|   true|
|  false|
|  false|
|   true|
|   null|
|      1|
|      0|
|      1|
|      0|
|      a|
+-------+


------- BooleanType Set Schema
root
 |-- boolean: boolean (nullable = true)

+-------+
|boolean|
+-------+
|   true|
|  false|
|   null|
|   null|
|   null|
|   null|
|   null|
|   null|
|   null|
|   null|
+-------+



-------------- ByteType --------------
------- ByteType Input
{'byte': 'a'}
{'byte': 'b'}
{'byte': 1}
{'byte': 0}
{'byte': 5}
{'byte': null}

------- ByteType Inferred Schema
root
 |-- byte: string (nullable = true)

+----+
|byte|
+----+
|   a|
|   b|
|   1|
|   0|
|   5|
|null|
+----+


------- ByteType Set Schema
root
 |-- byte: byte (nullable = true)

+----+
|byte|
+----+
|null|
|null|
|   1|
|   0|
|   5|
|null|
+----+



-------------- CalendarIntervalType --------------
------- CalendarIntervalType Input
{'calendarInterval': 'interval 2 days'}
{'calendarInterval': 'interval 1 week'}
{'calendarInterval': 'interval 5 years'}
{'calendarInterval': 'interval 6 months'}
{'calendarInterval': 10}
{'calendarInterval': 'interval a'}
{'calendarInterval': null}

------- CalendarIntervalType Inferred Schema
root
 |-- calendarInterval: string (nullable = true)

+-----------------+
| calendarInterval|
+-----------------+
|  interval 2 days|
|  interval 1 week|
| interval 5 years|
|interval 6 months|
|               10|
|       interval a|
|             null|
+-----------------+


------- CalendarIntervalType Set Schema
root
 |-- calendarInterval: calendarinterval (nullable = true)

+----------------+
|calendarInterval|
+----------------+
|            null|
|            null|
|            null|
|            null|
|            null|
|            null|
|            null|
+----------------+



-------------- DateType --------------
------- DateType Input
{'date': '2016-04-24'}
{'date': '0001-01-01'}
{'date': '9999-12-31'}
{'date': '2016-04-24 12:10:01'}
{'date': 1461496201000}
{'date': null}

------- DateType Inferred Schema
root
 |-- date: string (nullable = true)

+-------------------+
|               date|
+-------------------+
|         2016-04-24|
|         0001-01-01|
|         9999-12-31|
|2016-04-24 12:10:01|
|      1461496201000|
|               null|
+-------------------+


------- DateType Set Schema
root
 |-- date: date (nullable = true)

+----------+
|      date|
+----------+
|2016-04-24|
|0001-01-01|
|9999-12-31|
|2016-04-24|
|      null|
|      null|
+----------+



-------------- DoubleType --------------
------- DoubleType Input
{'double': 1.23456}
{'double': 1}
{'double': 1.7976931348623157E308}
{'double': -1.7976931348623157E308}
{'double': Infinity}
{'double': -Infinity}
{'double': NaN}
{'double': '1'}
{'double': '1.23456'}
{'double': null}

------- DoubleType Inferred Schema
root
 |-- double: string (nullable = true)

+--------------------+
|              double|
+--------------------+
|             1.23456|
|                   1|
|1.797693134862315...|
|-1.79769313486231...|
|          "Infinity"|
|         "-Infinity"|
|               "NaN"|
|                   1|
|             1.23456|
|                null|
+--------------------+


------- DoubleType Set Schema
root
 |-- double: double (nullable = true)

+--------------------+
|              double|
+--------------------+
|             1.23456|
|                 1.0|
|1.797693134862315...|
|-1.79769313486231...|
|            Infinity|
|           -Infinity|
|                 NaN|
|                null|
|                null|
|                null|
+--------------------+



-------------- FloatType --------------
------- FloatType Input
{'float': 1.23456}
{'float': 1}
{'float': 3.4028235E38}
{'float': -3.4028235E38}
{'float': Infinity}
{'float': -Infinity}
{'float': NaN}
{'float': '1'}
{'float': '1.23456'}
{'float': null}

------- FloatType Inferred Schema
root
 |-- float: string (nullable = true)

+-------------+
|        float|
+-------------+
|      1.23456|
|            1|
| 3.4028235E38|
|-3.4028235E38|
|   "Infinity"|
|  "-Infinity"|
|        "NaN"|
|            1|
|      1.23456|
|         null|
+-------------+


------- FloatType Set Schema
root
 |-- float: float (nullable = true)

+-------------+
|        float|
+-------------+
|      1.23456|
|          1.0|
| 3.4028235E38|
|-3.4028235E38|
|     Infinity|
|    -Infinity|
|          NaN|
|         null|
|         null|
|         null|
+-------------+



-------------- IntegerType --------------
------- IntegerType Input
{'integer': 1}
{'integer': 2147483647}
{'integer': -2147483648}
{'integer': 2147483648}
{'integer': '1'}
{'integer': 1.23456}
{'integer': '1.23456'}
{'integer': null}

------- IntegerType Inferred Schema
root
 |-- integer: string (nullable = true)

+-----------+
|    integer|
+-----------+
|          1|
| 2147483647|
|-2147483648|
| 2147483648|
|          1|
|    1.23456|
|    1.23456|
|       null|
+-----------+


------- IntegerType Set Schema
root
 |-- integer: integer (nullable = true)

+-----------+
|    integer|
+-----------+
|          1|
| 2147483647|
|-2147483648|
|       null|
|       null|
|       null|
|       null|
|       null|
+-----------+



-------------- LongType --------------
------- LongType Input
{'long': 1}
{'long': 9223372036854775807}
{'long': -9223372036854775808}
{'long': '1'}
{'long': 1.23456}
{'long': '1.23456'}
{'long': null}

------- LongType Inferred Schema
root
 |-- long: string (nullable = true)

+--------------------+
|                long|
+--------------------+
|                   1|
| 9223372036854775807|
|-9223372036854775808|
|                   1|
|             1.23456|
|             1.23456|
|                null|
+--------------------+


------- LongType Set Schema
root
 |-- long: long (nullable = true)

+--------------------+
|                long|
+--------------------+
|                   1|
| 9223372036854775807|
|-9223372036854775808|
|                null|
|                null|
|                null|
|                null|
+--------------------+



-------------- MapType --------------
------- MapType Input
{'map': {'a_key': 'a value', 'b_key': 'b value'}}
{'map': {'key': 1, 'key1': null}}
{'map': null}

------- MapType Inferred Schema
root
 |-- map: struct (nullable = true)
 |    |-- a_key: string (nullable = true)
 |    |-- b_key: string (nullable = true)
 |    |-- key: long (nullable = true)
 |    |-- key1: string (nullable = true)

+--------------------+
|                 map|
+--------------------+
|[a value,b value,...|
|  [null,null,1,null]|
|                null|
+--------------------+


------- MapType Set Schema
root
 |-- map: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)

+--------------------+
|                 map|
+--------------------+
|Map(a_key -> a va...|
|Map(key -> 1, key...|
|                null|
+--------------------+



-------------- NullType --------------
------- NullType Input
{'null': null}
{'null': true}
{'null': false}
{'null': 1}
{'null': 0}
{'null': '1'}
{'null': '0'}
{'null': 'a'}

------- NullType Inferred Schema
root
 |-- null: string (nullable = true)

+-----+
| null|
+-----+
| null|
| true|
|false|
|    1|
|    0|
|    1|
|    0|
|    a|
+-----+


------- NullType Set Schema
root
 |-- null: null (nullable = true)

+----+
|null|
+----+
|null|
|null|
|null|
|null|
|null|
|null|
|null|
|null|
+----+



-------------- ShortType --------------
------- ShortType Input
{'short': 0}
{'short': 1}
{'short': 32767}
{'short': -32768}
{'short': 32768}
{'short': 1.23456}
{'short': '0'}
{'short': '1'}
{'short': '1.23456'}
{'short': null}

------- ShortType Inferred Schema
root
 |-- short: string (nullable = true)

+-------+
|  short|
+-------+
|      0|
|      1|
|  32767|
| -32768|
|  32768|
|1.23456|
|      0|
|      1|
|1.23456|
|   null|
+-------+


------- ShortType Set Schema
root
 |-- short: short (nullable = true)

+------+
| short|
+------+
|     0|
|     1|
| 32767|
|-32768|
|  null|
|  null|
|  null|
|  null|
|  null|
|  null|
+------+



-------------- TimestampType --------------
------- TimestampType Input
{'timestamp': '2016-04-24'}
{'timestamp': '2016-04-24 12:10:01'}
{'timestamp': 1461496201000}
{'timestamp': '0001-01-01'}
{'timestamp': '9999-12-31'}
{'timestamp': null}

------- TimestampType Inferred Schema
root
 |-- timestamp: string (nullable = true)

+-------------------+
|          timestamp|
+-------------------+
|         2016-04-24|
|2016-04-24 12:10:01|
|      1461496201000|
|         0001-01-01|
|         9999-12-31|
|               null|
+-------------------+


------- TimestampType Set Schema
root
 |-- timestamp: timestamp (nullable = true)

+--------------------+
|           timestamp|
+--------------------+
|2016-04-24 00:00:...|
|2016-04-24 12:10:...|
|2016-04-24 12:10:...|
|0001-01-01 00:00:...|
|9999-12-31 00:00:...|
|                null|
+--------------------+



Preventing Duplication When Creating Relationships in Neo4j

Creating relationships between known nodes using Cypher in Neo4j is simple.

MATCH (p:Person), (e:Episode)
CREATE (p) - [:INTERVIEWED_IN] -> (e)

But what if you don’t know whether one of the nodes exists? And further, what if you don’t know whether the relationship itself already exists?

  • If the node doesn’t exist, I want it to be created.
  • If the relationship doesn’t exist, I want it to be created.
  • If both node and relationship exist, then nothing should be changed.

The simple scenario is of a set of Episode nodes and a set of Person nodes.

CREATE (e:Episode {title:"foo", subtitle:"bar"})
return e
CREATE (p:Person {name:"Lynn Rose"})
return p

The Episode nodes are known to exist. The Person nodes may or may not exist.

My first attempts at getting this right in my working database were tragic failures. For example, the following, while working as intended in a new 2.0.1 database, fails and causes duplicate Person nodes in my working database. (Adding a unique constraint on person.name causes the statement to throw an exception rather than create a duplicate.)

MATCH (e:Episode {title: "foo"})
CREATE UNIQUE (e) <- [:INTERVIEWED_IN] - (p:Person {name:"Lynn Rose"})

As I said, the duplication doesn’t happen in a clean 2.0.1 database, so the problem must be with my working database.

  1. The database was originally created on Neo4j 1.9.3 and is now running under Neo4j 2.0.1
  2. The nodes created before the release of 2.x use the old style indexes.

But those facts aside, I still need to stop the duplication! So back to the Cypher documentation.

The Cypher documents for CREATE UNIQUE specify the following in a call out box:

MERGE might be what you want to use instead of CREATE UNIQUE

It’s MERGE that gives the ability to control what happens when a node is, or isn’t, matched. It does this through the syntax of ON MATCH and ON CREATE.

Using MERGE and ON CREATE, I can get a handle on an existing Person node to use in the relationship creation, thus preventing duplication of Person nodes.

MATCH (e:Episode {title: "foo"})
MERGE (p:Person {name: "Lynn Rose"})
ON CREATE SET p.firstname = "Lynn", p.surname = "Rose"
MERGE (e) <- [:INTERVIEWED_IN] - (p)

But, we’ve still got an issue here. This doesn’t necessarily solve the problem of duplicating relationships.

In that call out box in the CREATE UNIQUE documentation, it goes on to say:

Note however, that MERGE doesn’t give as strong guarantees for relationships being unique.

So I take from this that I should use MERGE to prevent node duplication, but CREATE UNIQUE should be used to prevent relationship duplication.

MATCH (e:Episode {title: "foo"})
MERGE (p:Person {name: "Lynn Rose"})
ON CREATE SET p.firstname = "Lynn", p.surname = "Rose"
CREATE UNIQUE (e) <- [:INTERVIEWED_IN] - (p)

And here we are.

  • If the node doesn’t exist, it is created using MERGE ON CREATE.
  • If the relationship doesn’t exist, it is created using CREATE UNIQUE.
  • If both node and relationship exist, then nothing is changed.