【小项目】SQL server数据实时同步到mysql

1. 利用Navicat 15 for MySQL从SQL Server同步到mysql

操作步骤
导出数据

2. 利用xorm-reverse工具生成数据库表结构

按照xorm-reverse安装步骤,生成models文件夹,也便知道了数据库有哪些表与字段

3. 构建用于存储每个表的最新ID数据库

进入mysql,创建记录表record。
其中table_name是要记录的表的名称,last_id是从SQL Server取到的数据的最后一个id。

$ mysql -h 10.0.0.0 -P 3306 -u wzz -p 

-- Bookkeeping table: one row per synchronized table, holding the last id
-- that has been fetched from SQL Server for that table.
CREATE TABLE IF NOT EXISTS `record`(
      -- Surrogate primary key.
      `id` INT UNSIGNED AUTO_INCREMENT,
      -- Name of the table whose progress is tracked.
      `table_name` VARCHAR(100) NOT NULL,
      -- Last id fetched from SQL Server for that table.
      `last_id` bigint(20) NOT NULL,
      `created_at` timestamp NULL DEFAULT CURRENT_TIMESTAMP,
      -- Only a default is declared (no ON UPDATE clause), so the writer must
      -- set this column explicitly when the row changes.
      `updated_at` timestamp NULL DEFAULT CURRENT_TIMESTAMP,
       PRIMARY KEY ( `id` )
)ENGINE=InnoDB DEFAULT CHARSET=utf8;

-- Seed the bookkeeping row for `users`. last_id = 0 makes the first poll
-- fetch every row whose userid is greater than 0.
INSERT INTO `record` (`table_name`, `last_id`, `created_at`, `updated_at`) VALUES ('users', 0, '2021-05-25 17:29:34', '2021-05-25 17:29:34');

4. 监测数据库变化(golang代码实现)

项目代码源码

4.1. 目录结构

sqlserver
├── config  //配置文件
│   ├── config.go
│   ├── config_test.go
│   └── dev_custom.yml
├── dao // mysql数据库操作
│   ├── mysql.go // 初始化mysql
│   ├── record.go // 记录表操作
│   ├── record_test.go
│   ├── users.go 
│   └── users_test.go
├── go.mod
├── go.sum
├── logic // 业务逻辑
│   ├── logic.go
│   └── logic_test.go
├── main.go // 入口函数
├── models  // 数据库字段模型
│   ├── record.go
│   ├── reverse // xorm数据库自动生成表结构工具
│   │   ├── README.txt
│   │   └── generate.sh // 可执行文件
│   └── users.go
├── sqldao // sql server数据库操作
│   ├── sql.go // 初始化sql
│   ├── users.go // users表操作
│   └── users_test.go
└── sqlserver.exe

4.2.各个文件内容

4.2.1 dev_custom.yml

# MySQL connection string; decoded into config.Custom.Mysql.DNS.
# NOTE: the key is spelled "dns" although the value is a DSN; the Go struct
# tags use the same spelling, so the two must be changed together.
mySql:
  dns: "user:passwd@tcp(127.0.0.1:3306)/wzz_db?charset=utf8"

# SQL Server connection string; decoded into config.Custom.SqlServer.DNS.
sqlServer:
  dns: "sqlserver://sa:123456@localhost:1433?database=wzz"

4.2.2 config.go

package config

import (
	"fmt"
	"github.com/mitchellh/mapstructure"
	"github.com/spf13/viper"
	"runtime"
	"strings"
)

// Mysql holds the MySQL settings found under the "mySql" key of the
// configuration file.
type Mysql struct {
	// DNS is the connection string read from the "dns" key.
	DNS string `yaml:"dns"`
}

// SqlServer holds the SQL Server settings found under the "sqlServer" key
// of the configuration file.
type SqlServer struct {
	// DNS is the connection string read from the "dns" key.
	DNS string `yaml:"dns"`
}

// CustomT is the root of the decoded configuration file; it groups the
// settings of both databases.
type CustomT struct {
	Mysql     Mysql     `yaml:"mySql"`
	SqlServer SqlServer `yaml:"sqlServer"`
}

// Custom is the package-level configuration value that InitConfig fills in.
var Custom CustomT

// ReadConfig loads the configuration file called configName (without
// extension) of type configType from the directory configPath.
//
// It returns the viper instance holding the file's contents, or nil when the
// file cannot be read. The underlying error is printed so that a failed load
// is not silent; callers must check for nil before using the result.
func ReadConfig(configName string, configPath string, configType string) *viper.Viper {
	v := viper.New()
	v.SetConfigName(configName)
	v.AddConfigPath(configPath)
	v.SetConfigType(configType)
	if err := v.ReadInConfig(); err != nil {
		fmt.Println("ReadConfig fail. err = ", err)
		return nil
	}
	return v
}

// CurrentFileDir reports the directory, including the trailing separator,
// of the source file that called it.
//
// The path comes from runtime.Caller(1). When the caller cannot be
// determined the literal string "失败" is returned instead of a path.
func CurrentFileDir() string {
	_, callerFile, _, found := runtime.Caller(1)
	if !found {
		return "失败"
	}

	// Prefer the forward slash; a backslash is only searched for when the
	// path contains no forward slash at all.
	sep := strings.LastIndex(callerFile, "/")
	if sep == -1 {
		sep = strings.LastIndex(callerFile, "\\")
	}

	// With no separator sep stays -1 and the result is the empty string.
	return callerFile[:sep+1]
}

// InitConfig loads dev_custom.yml from the directory of the source file that
// contains this function and decodes it into the package-level Custom value.
//
// It panics when the file cannot be read or cannot be decoded.
func InitConfig() {
	path := CurrentFileDir()
	v := ReadConfig("dev_custom", path, "yml")
	if v == nil {
		// ReadConfig signals failure with nil; stop with a clear message
		// rather than a nil-pointer dereference on the next line.
		panic(fmt.Sprintf("InitConfig: cannot read dev_custom.yml in %q", path))
	}
	md := mapstructure.Metadata{}
	err := v.Unmarshal(&Custom, func(config *mapstructure.DecoderConfig) {
		// The config structs carry `yaml` tags, so decode by that tag name.
		config.TagName = "yaml"
		config.Metadata = &md
	})
	if err != nil {
		panic(err)
	}
	fmt.Println("InitConfig Success!")
}

4.2.3 config_test.go

package config

import (
	"testing"
)

// TestUsers loads the configuration and logs the resolved directory and both
// connection strings for manual inspection; it asserts nothing.
func TestUsers(t *testing.T) {
	dir := CurrentFileDir()
	t.Log(dir)

	InitConfig()

	mysqlDNS, sqlserverDNS := Custom.Mysql.DNS, Custom.SqlServer.DNS
	t.Log("mysqlDNS=", mysqlDNS)
	t.Log("sqlserverDNS=", sqlserverDNS)
}

4.2.4 dao/mysql.go

package dao

import (
	"fmt"
	_ "github.com/go-sql-driver/mysql"
	"sqlserver/config"
	"xorm.io/xorm"
)

// Orm is the shared xorm engine for MySQL. It is nil until InitMysql has
// run successfully.
var Orm *xorm.Engine

// Package-level DAO instances; both types are stateless.
var (
	Users  = &UsersDao{}
	Record = &RecordDao{}
)

// InitMysql creates the xorm engine from config.Custom.Mysql.DNS, verifies
// that the server is reachable and stores the engine in Orm.
//
// It panics on failure: every later database call dereferences Orm, so
// continuing with a nil engine would only postpone the crash.
func InitMysql() {
	orm, err := xorm.NewEngine("mysql", config.Custom.Mysql.DNS)
	if err != nil {
		panic(fmt.Errorf("init mysql: %w", err))
	}
	// Ping so that a wrong address or bad credentials surface at start-up.
	if err := orm.Ping(); err != nil {
		panic(fmt.Errorf("ping mysql: %w", err))
	}
	Orm = orm
	fmt.Println("InitMysql success!")
}

4.2.5 dao/record.go

package dao

import (
	"context"
	"sqlserver/models"
	"time"
	"xorm.io/xorm"
)

// RecordDao groups the operations on the record bookkeeping table.
type RecordDao struct{}

// AddTable inserts a bookkeeping row for tableName whose last id is lastId.
// Both timestamps are set to the current time. ctx is currently unused.
func (dao *RecordDao) AddTable(ctx context.Context, session *xorm.Session, tableName string, lastId int64) error {
	row := &models.Record{
		TableName: tableName,
		LastId:    lastId,
		CreatedAt: time.Now(),
		UpdatedAt: time.Now(),
	}
	if _, err := session.InsertOne(row); err != nil {
		return err
	}
	return nil
}

// QueryLastIdByTableName fetches the bookkeeping row for tableName.
//
// It returns nil both when the lookup fails and when no row is found; the
// two cases are not distinguishable by the caller. ctx is currently unused.
func (dao *RecordDao) QueryLastIdByTableName(ctx context.Context, session *xorm.Session, tableName string) *models.Record {
	// Only TableName is populated on the bean handed to Get.
	row := &models.Record{TableName: tableName}
	found, err := session.Get(row)
	if err == nil && found {
		return row
	}
	return nil
}

// UpdateLastId writes record.LastId and a fresh updated_at to the row whose
// table_name equals record.TableName.
//
// record.UpdatedAt is overwritten with the current time before the update:
// the column is part of the update list, so leaving the value the caller
// read earlier would rewrite the old timestamp. ctx is currently unused.
func (dao *RecordDao) UpdateLastId(ctx context.Context, session *xorm.Session, record *models.Record) error {
	record.UpdatedAt = time.Now()
	_, err := session.
		Where("table_name = ?", record.TableName).
		Cols("last_id", "updated_at").
		Update(record)
	return err
}

4.2.6 dao/record_test.go

package dao

import (
	"context"
	_ "github.com/go-sql-driver/mysql"
	"sqlserver/models"
	"testing"
	"time"
	"xorm.io/xorm"
)

// mysqlDNS is the hard-coded connection string used by the tests in this
// package; they need a reachable MySQL server at this address.
var mysqlDNS = "root:123456@tcp(10.0.0.0:3306)/wzz_db?charset=utf8"

// TestAddRecord inserts a bookkeeping row for the "job" table.
func TestAddRecord(t *testing.T) {
	ctx := context.Background()
	orm, err := xorm.NewEngine("mysql", mysqlDNS)
	if err != nil {
		t.Fatal("open mysql fail. err = ", err)
	}
	// Release the connection pool and the session when the test ends.
	defer orm.Close()
	session := orm.Context(ctx)
	defer session.Close()

	var dao RecordDao
	err = dao.AddTable(ctx, session, "job", 0)
	if err != nil {
		t.Fatal("AddTable fail. err = ", err)
	}
}

// TestQueryLastIdByTableName reads the bookkeeping row of the "users" table
// and logs its last id.
func TestQueryLastIdByTableName(t *testing.T) {
	ctx := context.Background()
	orm, err := xorm.NewEngine("mysql", mysqlDNS)
	if err != nil {
		t.Fatal("open mysql fail. err = ", err)
	}
	// Release the connection pool and the session when the test ends.
	defer orm.Close()
	session := orm.Context(ctx)
	defer session.Close()

	var dao RecordDao
	record := dao.QueryLastIdByTableName(ctx, session, "users")
	if record == nil {
		// The DAO reports no error value, so there is nothing else to print.
		t.Fatal("QueryLastIdByTableName fail. no record for table users")
	}
	t.Log("lastId=", record.LastId)
}

// TestUpdateLastId sets the last id of the "users" row to 5 and reads the
// row back.
func TestUpdateLastId(t *testing.T) {
	ctx := context.Background()
	orm, err := xorm.NewEngine("mysql", mysqlDNS)
	if err != nil {
		t.Fatal("open mysql fail. err = ", err)
	}
	// Release the connection pool and the session when the test ends.
	defer orm.Close()
	session := orm.Context(ctx)
	defer session.Close()

	var dao RecordDao
	// Keyed fields keep the literal valid if the model gains or reorders fields.
	record := &models.Record{
		Id:        1,
		TableName: "users",
		LastId:    5,
		CreatedAt: time.Now(),
		UpdatedAt: time.Now(),
	}
	err = dao.UpdateLastId(ctx, session, record)
	if err != nil {
		t.Fatal("UpdateLastId fail. err = ", err)
	}
	record = dao.QueryLastIdByTableName(ctx, session, "users")
	if record == nil {
		// The DAO reports no error value, so there is nothing else to print.
		t.Fatal("QueryLastIdByTableName fail. no record for table users")
	}
	t.Log("lastId=", record.LastId)
}

4.2.7 dao/users.go

package dao

import (
	"context"
	"errors"
	"sqlserver/models"
	"xorm.io/xorm"
)

// UsersDao groups the MySQL operations on the users table.
type UsersDao struct{}

// BatchAddUser inserts all users in a single Insert call on session.
//
// It returns an error when users is nil or empty, and otherwise whatever
// error the insert reports. ctx is currently unused.
func (dao *UsersDao) BatchAddUser(ctx context.Context, session *xorm.Session, users []*models.Users) error {
	if len(users) == 0 {
		return errors.New("users slice is empty")
	}
	_, err := session.Insert(users)
	return err
}

// QueryByUserID fetches the user whose Userid equals userID.
//
// It returns nil both when the lookup fails and when no row is found.
// ctx is currently unused.
func (dao *UsersDao) QueryByUserID(ctx context.Context, session *xorm.Session, userID int64) *models.Users {
	// Only Userid is populated on the bean handed to Get.
	row := &models.Users{Userid: userID}
	found, err := session.Get(row)
	if err == nil && found {
		return row
	}
	return nil
}

4.2.8 dao/users_test.go

package dao

import (
	"context"
	_ "github.com/go-sql-driver/mysql"
	"sqlserver/models"
	"testing"
	"xorm.io/xorm"
)

// TestBatchAddUsers inserts two users and reads one of them back.
func TestBatchAddUsers(t *testing.T) {
	// Keyed fields keep the literals valid if the model gains or reorders fields.
	users := []*models.Users{
		{Userid: 6, Name: "wzz", Age: 10},
		{Userid: 7, Name: "gh", Age: 20},
	}

	ctx := context.Background()
	orm, err := xorm.NewEngine("mysql", mysqlDNS)
	if err != nil {
		t.Fatal("open mysql fail. err = ", err)
	}
	// Release the connection pool and the session when the test ends.
	defer orm.Close()
	session := orm.Context(ctx)
	defer session.Close()

	var dao UsersDao
	err = dao.BatchAddUser(ctx, session, users)
	if err != nil {
		t.Fatal("BatchAddUser fail. err = ", err)
	}
	user := dao.QueryByUserID(ctx, session, 7)
	if user == nil {
		t.Fatal("QueryByUserID fail. user 7 not found")
	}
	t.Log("user=", user)
}

4.2.9 logic/logic.go

package logic

import (
	"context"
	"fmt"
	cron "github.com/robfig/cron/v3"
	"sqlserver/dao"
	"sqlserver/sqldao"
)

const (
	// cronSpecCheck is the schedule of the Supervisor job. The spec has six
	// fields because Do enables the seconds field; "*/10" there means the
	// job fires every 10 seconds.
	cronSpecCheck = "*/10 * * * * *" // every 10 seconds
)

// Do creates a cron scheduler with the seconds field enabled, registers
// Supervisor on the cronSpecCheck schedule, starts the scheduler and returns
// it so that the caller can stop it.
//
// It panics when cronSpecCheck cannot be parsed, because that is a
// programming error in a constant.
func Do() *cron.Cron {
	// Named c so that the local does not shadow the cron package.
	c := cron.New(cron.WithSeconds())
	if _, err := c.AddFunc(cronSpecCheck, Supervisor); err != nil {
		panic(fmt.Errorf("register supervisor job with spec %q: %w", cronSpecCheck, err))
	}
	c.Start()
	return c
}

// Supervisor performs one synchronization pass for the users table.
//
// It reads the last synchronized id from the record table, fetches the
// SQL Server rows with a larger userid, and then, inside one MySQL
// transaction, inserts those rows and advances the stored last id. If either
// write fails the transaction is rolled back, so the stored id never moves
// past rows that were not copied.
func Supervisor() {
	ctx := context.Background()
	session := dao.Orm.Context(ctx)
	defer session.Close()

	record := dao.Record.QueryLastIdByTableName(ctx, session, "users")
	if record == nil {
		fmt.Println("查询失败!")
		return
	}
	fmt.Println("lastid=", record.LastId)
	users := sqldao.SqlUser.QueryByUserIDFromSqlserver(ctx, sqldao.Sql, record.LastId)
	if len(users) == 0 {
		fmt.Println("无更新...")
		return
	}
	fmt.Println("users=", users)
	for _, u := range users {
		fmt.Println(*u)
		// Track the largest id instead of trusting the order of the result.
		if u.Userid > record.LastId {
			record.LastId = u.Userid
		}
	}

	if err := session.Begin(); err != nil {
		fmt.Println("开启事务失败!", err)
		return
	}
	rollback := func() {
		if err := session.Rollback(); err != nil {
			fmt.Println("回滚失败!", err)
		}
	}
	// Insert first; the bookmark only moves once the rows are stored.
	if err := dao.Users.BatchAddUser(ctx, session, users); err != nil {
		fmt.Println("添加失败!", err)
		rollback()
		return
	}
	if err := dao.Record.UpdateLastId(ctx, session, record); err != nil {
		fmt.Println("更新lastid失败!", err)
		rollback()
		return
	}
	if err := session.Commit(); err != nil {
		fmt.Println("提交事务失败!", err)
	}
}

4.2.10 logic/logic_test.go

package logic

import (
	"github.com/robfig/cron/v3"
	"sqlserver/config"
	"sqlserver/dao"
	"sqlserver/sqldao"
	"testing"
	"time"
)

// TestLogic initializes the configuration and both databases, starts the
// scheduler and keeps it alive long enough for the job to fire once.
func TestLogic(t *testing.T) {
	config.InitConfig()
	dao.InitMysql()
	sqldao.InitSqlserver()
	c := Do()
	defer c.Stop()
	// The job is scheduled every 10 seconds; returning immediately would end
	// the test before it ever ran.
	time.Sleep(11 * time.Second)
}

// TestCron runs a once-per-minute job for five minutes and logs each run.
func TestCron(t *testing.T) {
	c := cron.New()
	i := 1
	_, err := c.AddFunc("*/1 * * * *", func() {
		t.Log("每分钟执行一次", i)
		i++
	})
	if err != nil {
		t.Fatal("AddFunc fail. err = ", err)
	}
	c.Start()
	// Stop the scheduler and wait for a running job to finish, so that no
	// job can call t.Log after the test has returned.
	defer func() { <-c.Stop().Done() }()
	time.Sleep(time.Minute * 5)
}

4.2.11 models/record.go

package models

import (
	"time"
)

// Record is the model of one row in the record bookkeeping table.
type Record struct {
	// Id is the auto-increment primary key.
	Id int `xorm:"not null pk autoincr INT(10)"`
	// TableName is the name of the tracked table.
	TableName string `xorm:"not null VARCHAR(100)"`
	// LastId is the last id synchronized for that table.
	LastId int64 `xorm:"not null BIGINT(20)"`
	CreatedAt time.Time `xorm:"default CURRENT_TIMESTAMP TIMESTAMP"`
	UpdatedAt time.Time `xorm:"default CURRENT_TIMESTAMP TIMESTAMP"`
}

4.2.12 models/users.go

package models

// Users is the model of one row in the users table. The same struct is used
// for the MySQL inserts and as the scan target of the SQL Server queries.
type Users struct {
	Userid int64  `xorm:"INT(11)"`
	Name   string `xorm:"CHAR(10)"`
	Age    int    `xorm:"INT(11)"`
}

4.2.13 models/reverse/custom.yml

# Configuration for the xorm reverse tool, run as `reverse -f custom.yml`.
kind: reverse
name: testdb
# Database whose tables are read.
source:
  database: mysql
  conn_str: 'root:123456@tcp(10.0.0.0:3306)/wzz_db?parseTime=true'
# Generated output: Go code, one file per table.
targets:
- type: codes
  multiple_files: true
  language: golang
  # Presumably resolved relative to the reverse directory, i.e. the models
  # directory in the project layout — verify against the tool's behavior.
  output_dir: ../

4.2.14 models/reverse/generate.sh

reverse -f custom.yml

4.2.15 sqldao/sql.go

package sqldao

import (
	"fmt"
	"gorm.io/driver/sqlserver"
	"gorm.io/gorm"
	"sqlserver/config"
)

// Sql is the shared gorm handle for SQL Server, set by InitSqlserver.
var Sql *gorm.DB

// SqlUser is the package-level DAO for the SQL Server users table.
var SqlUser = &SqlUsersDao{}

func InitSqlserver() {
	gSession, err := gorm.Open(sqlserver.Open(config.Custom.SqlServer.DNS), &gorm.Config{})
	if err != nil {
		panic(err)
	}
	Sql = gSession
	fmt.Println("InitSqlserver success!")
}

4.2.16 sqldao/users.go

package sqldao

import (
	"context"
	"gorm.io/gorm"
	"sqlserver/models"
)

// SqlUsersDao groups the read operations on the SQL Server dbo.users table.
type SqlUsersDao struct{}

// QueryByUserIDFromSqlserver returns the rows of dbo.users whose userid is
// greater than lastId, ordered by ascending userid so that the last element
// carries the highest id.
//
// The query runs with ctx. When it fails, nil is returned, so a failure
// looks like "no new rows" to the caller rather than a partial result.
func (dao *SqlUsersDao) QueryByUserIDFromSqlserver(ctx context.Context, gSession *gorm.DB, lastId int64) []*models.Users {
	users := make([]*models.Users, 0)
	result := gSession.WithContext(ctx).
		Table("dbo.users").
		Where("userid > ?", lastId).
		Order("userid").
		Find(&users)
	if result.Error != nil {
		return nil
	}
	return users
}

// QueryLastFromSqlserver returns the row that gorm's Last selects from
// dbo.users. The query error is not inspected, so a zero-valued Users is
// returned when nothing could be loaded. ctx is currently unused.
func (dao *SqlUsersDao) QueryLastFromSqlserver(ctx context.Context, gSession *gorm.DB) models.Users {
	last := models.Users{}
	usersTable := gSession.Table("dbo.users")
	usersTable.Last(&last)
	return last
}

4.2.17 sqldao/users_test.go

package sqldao

import (
	"context"
	"fmt"
	"gorm.io/driver/sqlserver"
	"gorm.io/gorm"
	"testing"
)

// SQLDSN is the hard-coded SQL Server connection string used by the tests
// in this package; they need a reachable server at this address.
var SQLDSN = "sqlserver://sa:123456@localhost:1433?database=wzz"

// TestUsers prints the id of the last row in dbo.users and every row whose
// userid is greater than 2.
func TestUsers(t *testing.T) {
	gSession, err := gorm.Open(sqlserver.Open(SQLDSN), &gorm.Config{})
	if err != nil {
		// Fail only this test instead of aborting the whole test binary.
		t.Fatal("open sqlserver fail. err = ", err)
	}
	ctx := context.Background()
	var dao SqlUsersDao
	usr := dao.QueryLastFromSqlserver(ctx, gSession)
	fmt.Println("last id=", usr.Userid)
	users := dao.QueryByUserIDFromSqlserver(ctx, gSession, 2)
	for _, u := range users {
		fmt.Println(u)
	}
}

4.2.18 go.mod

module sqlserver

go 1.15

require (
	github.com/alexbrainman/odbc v0.0.0-20200426075526-f0492dfa1575
	github.com/denisenkom/go-mssqldb v0.10.0
	github.com/go-ole/go-ole v1.2.5 // indirect
	github.com/go-sql-driver/mysql v1.5.0
	github.com/jinzhu/gorm v1.9.16
	github.com/mattn/go-adodb v0.0.1
	github.com/mitchellh/mapstructure v1.4.1
	github.com/robfig/cron/v3 v3.0.1
	github.com/spf13/viper v1.7.1
	golang.org/x/net v0.0.0-20210525063256-abc453219eb5 // indirect
	gorm.io/driver/sqlserver v1.0.7
	gorm.io/gorm v1.21.10
	xorm.io/xorm v1.1.0
)

4.2.19 main.go

package main

import (
	"sqlserver/config"
	"sqlserver/dao"
	"sqlserver/logic"
	"sqlserver/sqldao"
)

// init prepares everything main needs, in dependency order: the
// configuration first, because both database initializers read their
// connection strings from it.
func init() {
	config.InitConfig()
	dao.InitMysql()
	sqldao.InitSqlserver()
}

// main starts the synchronization scheduler and then blocks forever.
func main() {
	scheduler := logic.Do()
	defer scheduler.Stop()

	// Park the main goroutine for good; the scheduler started by logic.Do
	// keeps running in the background.
	select {}
}

参考资料

[1] SQL server数据实时同步到mysql(一)
[2] SQL server数据实时同步到mysql(二)


版权声明:本文为qq_44701736原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。