From 47cf6f332db128dd00326fd460d16d2b321d5d0c Mon Sep 17 00:00:00 2001
From: mxlxm <minzilu@me.com>
Date: Thu, 30 May 2019 01:04:31 +0800
Subject: [PATCH 1/5] update deps

---
 go.mod        |  8 +++++++-
 go.sum        | 25 +++++++++++++++++++++----
 river/sync.go |  2 +-
 3 files changed, 29 insertions(+), 6 deletions(-)

diff --git a/go.mod b/go.mod
index 5bd6bcc9..a0cedbd4 100644
--- a/go.mod
+++ b/go.mod
@@ -5,5 +5,11 @@ require (
 	github.com/juju/errors v0.0.0-20190207033735-e65537c515d7
 	github.com/siddontang/go v0.0.0-20180604090527-bdc77568d726
 	github.com/siddontang/go-log v0.0.0-20180807004314-8d05993dda07
-	github.com/siddontang/go-mysql v0.0.0-20190303113352-670f74e8daf5
+	github.com/siddontang/go-mysql v0.0.0-20190524062908-de6c3a84bcbe
 )
+
+replace golang.org/x/sys v0.0.0-20190529130038-5219a1e1c5f8 => github.com/golang/sys v0.0.0-20190529130038-5219a1e1c5f8
+
+replace golang.org/x/text v0.3.2 => github.com/golang/text v0.3.2
+
+replace golang.org/x/tools v0.0.0-20190529010454-aa71c3f32488 => github.com/golang/tools v0.0.0-20190529010454-aa71c3f32488
diff --git a/go.sum b/go.sum
index 97c0717c..8062e782 100644
--- a/go.sum
+++ b/go.sum
@@ -1,9 +1,21 @@
 github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
 github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
+github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/golang/protobuf v1.3.1 h1:YF8+flBXS5eO826T4nzqPrxfhQThhXl0YzfuUPu4SBg=
+github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
+github.com/golang/text v0.3.2 h1:vDAeTQXl8YUdGoj2vMsMnzHi1xMJJ9S7iwnTBFL/pkA=
+github.com/golang/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
 github.com/juju/errors v0.0.0-20190207033735-e65537c515d7 h1:dMIPRDg6gi7CUp0Kj2+HxqJ5kTr1iAdzsXYIrLCNSmU=
 github.com/juju/errors v0.0.0-20190207033735-e65537c515d7/go.mod h1:W54LbzXuIE0boCoNJfwqpmkKJ1O4TCTZMetAt6jGk7Q=
+github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
+github.com/pingcap/check v0.0.0-20190102082844-67f458068fc8/go.mod h1:B1+S9LNcuMyLH/4HMTViQOJevkGiik3wW2AN9zb2fNQ=
 github.com/pingcap/errors v0.11.0 h1:DCJQB8jrHbQ1VVlMFIrbj2ApScNNotVmkSNplu2yUt4=
 github.com/pingcap/errors v0.11.0/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8=
+github.com/pingcap/parser v0.0.0-20190506092653-e336082eb825 h1:U9Kdnknj4n2v76Mg7wazevZ5N9U1OIaMwSNRVLEcLX0=
+github.com/pingcap/parser v0.0.0-20190506092653-e336082eb825/go.mod h1:1FNvfp9+J0wvc4kl8eGNh7Rqrxveg15jJoWo/a0uHwA=
+github.com/pingcap/tipb v0.0.0-20190428032612-535e1abaa330 h1:rRMLMjIMFulCX9sGKZ1hoov/iROMsKyC8Snc02nSukw=
+github.com/pingcap/tipb v0.0.0-20190428032612-535e1abaa330/go.mod h1:RtkHW8WbcNxj8lsbzjaILci01CtYnYbIkQhjyZWrWVI=
+github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
 github.com/satori/go.uuid v1.2.0 h1:0uYX9dsZ2yD7q2RtLRtPSdGDWzjeM3TbMJP9utgA0ww=
 github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0=
 github.com/shopspring/decimal v0.0.0-20180709203117-cd690d0c9e24 h1:pntxY8Ary0t43dCZ5dqY4YTJCObLY1kIXl0uzMv+7DE=
@@ -12,7 +24,12 @@ github.com/siddontang/go v0.0.0-20180604090527-bdc77568d726 h1:xT+JlYxNGqyT+XcU8
 github.com/siddontang/go v0.0.0-20180604090527-bdc77568d726/go.mod h1:3yhqj7WBBfRhbBlzyOC3gUxftwsU0u8gqevxwIHQpMw=
 github.com/siddontang/go-log v0.0.0-20180807004314-8d05993dda07 h1:oI+RNwuC9jF2g2lP0u0cVEEZrc/AYBCuFdvwrLWM/6Q=
 github.com/siddontang/go-log v0.0.0-20180807004314-8d05993dda07/go.mod h1:yFdBgwXP24JziuRl2NMUahT7nGLNOKi1SIiFxMttVD4=
-github.com/siddontang/go-mysql v0.0.0-20190123011128-88e9cd7f6643 h1:yzg8+Cip1iDhy6GGS1zKflqOybgRc4xp82eYwQrP+DU=
-github.com/siddontang/go-mysql v0.0.0-20190123011128-88e9cd7f6643/go.mod h1:/b8ZcWjAShCcHp2dWpjb1vTlNyiG03UeHEQr2jteOpI=
-github.com/siddontang/go-mysql v0.0.0-20190303113352-670f74e8daf5 h1:5Nr7spTeY+ziXzqk/9p+GLnvH4rIjp9BX+aRaYDbR44=
-github.com/siddontang/go-mysql v0.0.0-20190303113352-670f74e8daf5/go.mod h1:/b8ZcWjAShCcHp2dWpjb1vTlNyiG03UeHEQr2jteOpI=
+github.com/siddontang/go-mysql v0.0.0-20190524062908-de6c3a84bcbe h1:HMx2v6kZjEt9CGzdpIubWsEEi3j8LCoBPMv2M98NxqU=
+github.com/siddontang/go-mysql v0.0.0-20190524062908-de6c3a84bcbe/go.mod h1:Bl4lryU44qtIXEXNbP0k0pD646Nkw/qHn21wfZVGJx4=
+github.com/sirupsen/logrus v1.4.1 h1:GL2rEmy6nsikmW0r8opw9JIRScdMF5hA8cOYLH7In1k=
+github.com/sirupsen/logrus v1.4.1/go.mod h1:ni0Sbl8bgC9z8RoU9G6nDWqqs/fq4eDPysMBDgk/93Q=
+github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
+github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
+golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33 h1:I6FyU15t786LL7oL/hn43zqTuEGr4PN7F4XJ1p4E3Y8=
+golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
+golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
diff --git a/river/sync.go b/river/sync.go
index 3b3854ee..0477e757 100644
--- a/river/sync.go
+++ b/river/sync.go
@@ -103,7 +103,7 @@ func (h *eventHandler) OnGTID(gtid mysql.GTIDSet) error {
 	return nil
 }
 
-func (h *eventHandler) OnPosSynced(pos mysql.Position, force bool) error {
+func (h *eventHandler) OnPosSynced(pos mysql.Position, set mysql.GTIDSet, force bool) error {
 	return nil
 }
 

From 0b2b00aa6251c97a08b37c95458bfd3333ed4472 Mon Sep 17 00:00:00 2001
From: mxlxm <minzilu@me.com>
Date: Thu, 30 May 2019 01:10:37 +0800
Subject: [PATCH 2/5] enable millisecond timestamp to date

---
 README.md     | 2 +-
 river/sync.go | 6 +++++-
 2 files changed, 6 insertions(+), 2 deletions(-)

diff --git a/README.md b/README.md
index 4992d48b..72d530b0 100644
--- a/README.md
+++ b/README.md
@@ -105,7 +105,7 @@ type = "t"
     // This will map column title to elastic search title and use array type
     title=",list"
 
-    // If the created_time field type is "int", and you want to convert it to "date" type in es, you can do it as below
+    // If the created_time field type is "int/bigint" which holds `timestamp`/`millisecond timestamp`, and you want to convert it to "date" type in es, you can do it as below
     created_time=",date"
 ```
 
diff --git a/river/sync.go b/river/sync.go
index 0477e757..daaf0160 100644
--- a/river/sync.go
+++ b/river/sync.go
@@ -4,6 +4,7 @@ import (
 	"bytes"
 	"encoding/json"
 	"fmt"
+	"math"
 	"reflect"
 	"strings"
 	"time"
@@ -512,7 +513,10 @@ func (r *River) getFieldValue(col *schema.TableColumn, fieldType string, value i
 			v := reflect.ValueOf(value)
 			switch v.Kind() {
 			case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64:
-				fieldValue = r.makeReqColumnData(col, time.Unix(v.Int(), 0).Format(mysql.TimeFormat))
+				// es accepts millisecond timestamp for date, so ignore it
+				if v.Int() < math.MaxInt32 {
+					fieldValue = r.makeReqColumnData(col, time.Unix(v.Int(), 0).Format(mysql.TimeFormat))
+				}
 			}
 		}
 	}

From 7cbcb0e13186d062d69be0c68bfb25dfbd38a9fa Mon Sep 17 00:00:00 2001
From: mxlxm <minzilu@me.com>
Date: Thu, 30 May 2019 05:13:28 +0800
Subject: [PATCH 3/5] conver millisecond timestamp to date

---
 river/sync.go | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/river/sync.go b/river/sync.go
index daaf0160..e29eab63 100644
--- a/river/sync.go
+++ b/river/sync.go
@@ -514,7 +514,9 @@ func (r *River) getFieldValue(col *schema.TableColumn, fieldType string, value i
 			switch v.Kind() {
 			case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64:
 				// es accepts millisecond timestamp for date, so ignore it
-				if v.Int() < math.MaxInt32 {
+				if v.Int() > math.MaxInt32 {
+					fieldValue = r.makeReqColumnData(col, time.Unix(0, v.Int()).Format(mysql.TimeFormat))
+				} else {
 					fieldValue = r.makeReqColumnData(col, time.Unix(v.Int(), 0).Format(mysql.TimeFormat))
 				}
 			}

From d6b9813fd5e2ac3f3beb2b63ec99423d2c253105 Mon Sep 17 00:00:00 2001
From: mxlxm <minzilu@me.com>
Date: Thu, 30 May 2019 05:15:14 +0800
Subject: [PATCH 4/5] remove useless

---
 river/sync.go | 1 -
 1 file changed, 1 deletion(-)

diff --git a/river/sync.go b/river/sync.go
index e29eab63..e44ac0ce 100644
--- a/river/sync.go
+++ b/river/sync.go
@@ -513,7 +513,6 @@ func (r *River) getFieldValue(col *schema.TableColumn, fieldType string, value i
 			v := reflect.ValueOf(value)
 			switch v.Kind() {
 			case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64:
-				// es accepts millisecond timestamp for date, so ignore it
 				if v.Int() > math.MaxInt32 {
 					fieldValue = r.makeReqColumnData(col, time.Unix(0, v.Int()).Format(mysql.TimeFormat))
 				} else {

From 89aa8c6baa725be4c003d6a27d47e77044b94fb9 Mon Sep 17 00:00:00 2001
From: luxiaomin <luxiaomin@oneship.cc>
Date: Thu, 30 May 2019 12:03:53 +0800
Subject: [PATCH 5/5] unit fix

---
 river/sync.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/river/sync.go b/river/sync.go
index e44ac0ce..0ec0d18f 100644
--- a/river/sync.go
+++ b/river/sync.go
@@ -514,7 +514,7 @@ func (r *River) getFieldValue(col *schema.TableColumn, fieldType string, value i
 			switch v.Kind() {
 			case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64:
 				if v.Int() > math.MaxInt32 {
-					fieldValue = r.makeReqColumnData(col, time.Unix(0, v.Int()).Format(mysql.TimeFormat))
+					fieldValue = r.makeReqColumnData(col, time.Unix(0, int64(time.Millisecond)*v.Int()).Format(mysql.TimeFormat))
 				} else {
 					fieldValue = r.makeReqColumnData(col, time.Unix(v.Int(), 0).Format(mysql.TimeFormat))
 				}