ShardingJDBC分库分表
1.Maven 引用
<dependency>
    <groupId>org.apache.shardingsphere</groupId>
    <artifactId>sharding-jdbc-spring-boot-starter</artifactId>
    <version>4.1.1</version>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
</dependency>
数据库
*****_ch
*****_hk
*****_us
*****_olap
表格
kline
kline_D_0
kline_D_1
.......
kline_D_15
kline
kline_M_0
kline_M_1
.......
kline_M_15
kline_m1
kline_m1_250121
.......
kline_m1_2501221
kline_M5_0
.......
kline_M5_15
kline_M30_0
.......
kline_M30_15
kline_M60_0
.......
kline_M60_15
kline_W_0
.......
kline_W_15
kline_Y_0
.......
kline_Y_15
trade_record_240101
trade_record_250213_0
........
trade_record_250221_249
-- Creates the physical k-line shard tables kline_<period>_<n> as structural
-- copies of the template table `kline`, for every period in the list below
-- and n = 0..15. Existing tables are left untouched (IF NOT EXISTS).
CREATE DEFINER = `admin`@`%` PROCEDURE `CreateKlineTables`()
BEGIN
    DECLARE i INT DEFAULT 0;
    DECLARE j INT DEFAULT 0;
    DECLARE period_count INT DEFAULT 0;
    DECLARE table_name VARCHAR(64);
    DECLARE period_list TEXT;
    DECLARE period VARCHAR(10);

    SET period_list = 'M5,M30,M60,D,W,M,Y';
    -- Number of comma-separated items = number of commas + 1; computed once
    -- instead of on every loop test.
    SET period_count = LENGTH(period_list) - LENGTH(REPLACE(period_list, ',', '')) + 1;

    WHILE j < period_count DO
        -- Pick the (j + 1)-th item of the comma-separated list.
        SET period = SUBSTRING_INDEX(SUBSTRING_INDEX(period_list, ',', j + 1), ',', -1);
        SET i = 0;
        WHILE i < 16 DO
            SET table_name = CONCAT('kline_', period, '_', i);
            -- DDL cannot take an identifier as a bind parameter, so the
            -- statement is built as text; identifiers are back-quoted.
            SET @sql = CONCAT('CREATE TABLE IF NOT EXISTS `', table_name, '` LIKE `kline`');
            PREPARE stmt FROM @sql;
            EXECUTE stmt;
            DEALLOCATE PREPARE stmt;
            SET i = i + 1;
        END WHILE;
        SET j = j + 1;
    END WHILE;
END
-- Creates the 250 physical shard tables trade_record_<date_part>_<n>
-- (n = 0..249) as structural copies of the template table
-- `trade_record_240101`. Existing tables are left untouched.
--
-- date_part: date fragment embedded in the table name (the table listing in
--            this document uses yyMMdd, e.g. 250213).
CREATE DEFINER = `admin`@`%` PROCEDURE `CreateTradeRecordTables`(IN date_part VARCHAR(10))
BEGIN
    DECLARE i INT DEFAULT 0;
    DECLARE table_name VARCHAR(64);

    -- A NULL argument would make CONCAT return NULL and fail inside PREPARE
    -- with an unhelpful message; an empty one would create trade_record__<n>.
    IF date_part IS NULL OR date_part = '' THEN
        SIGNAL SQLSTATE '45000' SET MESSAGE_TEXT = 'date_part must not be NULL or empty';
    END IF;

    WHILE i < 250 DO
        SET table_name = CONCAT('trade_record_', date_part, '_', i);
        -- date_part is caller-supplied text spliced into DDL: back-quote the
        -- identifier and double any embedded back-quote so it cannot break
        -- out of the identifier.
        SET @sql = CONCAT('CREATE TABLE IF NOT EXISTS `', REPLACE(table_name, '`', '``'),
                          '` LIKE `trade_record_240101`');
        PREPARE stmt FROM @sql;
        EXECUTE stmt;
        DEALLOCATE PREPARE stmt;
        SET i = i + 1;
    END WHILE;
END
-- Drops the 250 physical shard tables trade_record_<date_part>_<n>
-- (n = 0..249). Tables that do not exist are skipped (IF EXISTS).
--
-- date_part: date fragment embedded in the table name (the table listing in
--            this document uses yyMMdd, e.g. 250213).
CREATE DEFINER = `admin`@`%` PROCEDURE `DropTradeRecordTables`(IN date_part VARCHAR(10))
BEGIN
    DECLARE i INT DEFAULT 0;
    DECLARE table_name VARCHAR(64);

    -- Refuse to build a DROP statement from a missing argument.
    IF date_part IS NULL OR date_part = '' THEN
        SIGNAL SQLSTATE '45000' SET MESSAGE_TEXT = 'date_part must not be NULL or empty';
    END IF;

    WHILE i < 250 DO
        SET table_name = CONCAT('trade_record_', date_part, '_', i);
        -- date_part is caller-supplied text spliced into DDL: back-quote the
        -- identifier and double any embedded back-quote so the statement can
        -- only ever name a single table.
        SET @sql = CONCAT('DROP TABLE IF EXISTS `', REPLACE(table_name, '`', '``'), '`');
        PREPARE stmt FROM @sql;
        EXECUTE stmt;
        DEALLOCATE PREPARE stmt;
        SET i = i + 1;
    END WHILE;
END
2.application.yaml配置
# NOTE(review): the nesting below was reconstructed; the original listing had
# lost all indentation. Key names, order and values are unchanged except for
# whitespace repairs and the inline-expression fix noted on the kline rule.
spring:
  # NOTE(review): Spring Boot normally binds `port` and `tomcat.*` under a
  # top-level `server:` key. They are kept under the key the original listing
  # shows them after -- confirm against the real file.
  port: 8888
  tomcat:
    uri-encoding: UTF-8
    max-http-post-size: 20MB
    max-http-header-size: 20MB
  http:
    encoding:
      force: true
      charset: UTF-8
      enabled: true
  aop:
    auto: true
  main:
    allow-bean-definition-overriding: true
  jpa:
    database-platform: org.hibernate.dialect.MySQL5InnoDBDialect
    show-sql: false
    hibernate:
      ddl-auto: none
  # NOTE(review): `dsx` is a project-specific prefix (OLAP data source); its
  # nesting level is assumed -- confirm against the class that binds it.
  dsx:
    olap:
      type: com.zaxxer.hikari.HikariDataSource
      driverClassName: com.mysql.cj.jdbc.Driver
      jdbcUrl:
      username:
      password:
      hikari:
        maximum-pool-size: 20
        minimum-idle: 20
  shardingsphere:
    datasource:
      # `center` is the default (unsharded) data source; ds0..ds2 are the shards.
      names: center, ds0, ds1, ds2
      center:
        type: com.zaxxer.hikari.HikariDataSource
        driverClassName: com.mysql.cj.jdbc.Driver
        jdbcUrl:
        username:
        password:
        hikari:
          maximum-pool-size: 20
          minimum-idle: 20
      ds0:
        type: com.zaxxer.hikari.HikariDataSource
        driverClassName: com.mysql.cj.jdbc.Driver
        jdbcUrl:
        username:
        password:
        hikari:
          maximum-pool-size: 20
          minimum-idle: 20
      ds1:
        type: com.zaxxer.hikari.HikariDataSource
        driverClassName: com.mysql.cj.jdbc.Driver
        jdbcUrl:
        username:
        password:
        hikari:
          maximum-pool-size: 20
          minimum-idle: 20
      ds2:
        type: com.zaxxer.hikari.HikariDataSource
        driverClassName: com.mysql.cj.jdbc.Driver
        jdbcUrl:
        username:
        password:
        hikari:
          maximum-pool-size: 20
          minimum-idle: 20
    props:
      sql:
        show: false
    sharding:
      default-data-source-name: center
      tables:
        trade_record:
          actual-data-nodes: ds$->{0..2}.trade_record_$->{0..10}
          database-strategy:
            standard:
              sharding-column: market_code
              precise-algorithm-class-name: com.zzc.sharding.DbShardingByMarketTypeAlgorithm
          table-strategy:
            complex:
              sharding-columns: trade_date, symbol_id
              algorithm-class-name: com.zzc.sharding.TableShardingByDateAndSymbolAlgorithm
        kline_m1:
          actual-data-nodes: ds$->{0..2}.kline_m1
          database-strategy:
            standard:
              sharding-column: market_code
              precise-algorithm-class-name: com.zzc.sharding.DbShardingByMarketTypeAlgorithm
          table-strategy:
            complex:
              sharding-columns: trade_date
              algorithm-class-name: com.zzc.sharding.TableShardingByDateAlg
        kline:
          # `$->{...}` is used instead of `${...}` so Spring does not try to
          # resolve the inline expression as a property placeholder.
          actual-data-nodes: ds$->{0..2}.kline_$->{['M5','M30','M60','D','W','M','Y']}_$->{0..15}
          database-strategy:
            standard:
              sharding-column: market_code
              precise-algorithm-class-name: com.zzc.sharding.DbShardingByMarketTypeAlgorithm
          table-strategy:
            complex:
              sharding-columns: kline_type, symbol_id
              algorithm-class-name: com.zzc.sharding.TableShardingByKlineTypeAndSymbolIdAlg
创建路由规则 DbShardingByMarketTypeAlgorithm
package com. zzc. sharding ;
import java. util. Collection ;
@Slf4j
public class DbShardingByMarketTypeAlgorithm implements PreciseShardingAlgorithm < String > {
private DatabaseShardingConfig config;
@Override
public String doSharding ( Collection < String > collection, PreciseShardingValue < String > preciseShardingValue) {
String marketType = preciseShardingValue. getValue ( ) ;
if ( config == null ) {
config = SpringContextUtil . getBean ( DatabaseShardingConfig . class ) ;
}
String dbName = config. getDbName ( marketType) ;
if ( ! collection. contains ( dbName) ) {
log. error ( "Database sharding error. column-value : [{}], DatabaseShardingConfig dbName : [{}], shardingsphere configs : [{}]" , marketType, dbName, collection) ;
throw new IllegalArgumentException ( "Database sharding error." ) ;
}
return dbName;
}
}
TableShardingByDateAndSymbolAlgorithm
package com. zzc. sharding ;
/**
 * Table-level routing rule for tables sharded by trade date and symbol id.
 *
 * <p>The physical table name is {@code <logicTable>_<date>_<shard>} where
 * {@code <date>} is the {@code trade_date} value with its first two characters
 * and all dashes removed (so {@code 2025-02-13} becomes {@code 250213}) and
 * {@code <shard>} is {@code symbol_id} modulo the configured shard count.
 */
@Slf4j
public class TableShardingByDateAndSymbolAlgorithm implements ComplexKeysShardingAlgorithm {
    private static final String FIELD_NAME_DATE = "trade_date";
    private static final String FIELD_NAME_SYMBOL = "symbol_id";

    /** Looked up from the Spring context on first use and then reused. */
    private DatabaseShardingConfig config;

    /**
     * Computes the single physical table for the given sharding values.
     *
     * @param collection               table names available to this rule (not consulted)
     * @param complexKeysShardingValue sharding values of the routed statement
     * @return a one-element collection holding the physical table name
     * @throws IllegalArgumentException if a sharding column has no precise value
     *                                  or {@code symbol_id} is not numeric
     * @throws IllegalStateException    if the logic table has no usable sharding config
     */
    @Override
    public Collection<String> doSharding(Collection collection, ComplexKeysShardingValue complexKeysShardingValue) {
        if (config == null) {
            config = SpringContextUtil.getBean(DatabaseShardingConfig.class);
        }
        String logicTable = complexKeysShardingValue.getLogicTableName();
        String date = String.valueOf(firstValue(complexKeysShardingValue, FIELD_NAME_DATE));
        long symbolId = toLong(firstValue(complexKeysShardingValue, FIELD_NAME_SYMBOL), FIELD_NAME_SYMBOL);

        DatabaseShardingConfig.TableShardingConfig shardingConfig = config.getTableShardingConfig(logicTable);
        if (shardingConfig == null || shardingConfig.getTableShardingNum() <= 0) {
            throw new IllegalStateException("No usable table sharding config for logic table [" + logicTable + "]");
        }
        // floorMod keeps the suffix inside 0..n-1 even for a negative id.
        long shard = Math.floorMod(symbolId, (long) shardingConfig.getTableShardingNum());
        return Collections.singletonList(logicTable + "_" + date.substring(2).replace("-", "") + "_" + shard);
    }

    /**
     * Returns the first precise value bound to {@code column}; fails with a
     * descriptive message instead of an NPE/ClassCastException when the
     * statement carries no such value.
     */
    private static Object firstValue(ComplexKeysShardingValue shardingValue, String column) {
        Object values = shardingValue.getColumnNameAndShardingValuesMap().get(column);
        if (!(values instanceof Collection) || ((Collection<?>) values).isEmpty()) {
            throw new IllegalArgumentException("Table sharding error. No precise value for sharding column [" + column
                    + "] of logic table [" + shardingValue.getLogicTableName() + "]");
        }
        return ((Collection<?>) values).iterator().next();
    }

    /** Converts a sharding value to {@code long}, accepting any {@link Number} or a numeric string. */
    private static long toLong(Object value, String column) {
        if (value instanceof Number) {
            return ((Number) value).longValue();
        }
        try {
            return Long.parseLong(String.valueOf(value).trim());
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException(
                    "Table sharding error. Non-numeric value [" + value + "] for sharding column [" + column + "]", e);
        }
    }
}
package com. zzc. sharding ;
/**
 * Table-level routing rule for tables sharded by trade date only.
 *
 * <p>The physical table name is {@code <logicTable>_<date>} where
 * {@code <date>} is the {@code trade_date} value with its first two characters
 * and all dashes removed (so {@code 2025-01-21} becomes {@code 250121}).
 */
@Slf4j
public class TableShardingByDateAlg implements ComplexKeysShardingAlgorithm {
    private static final String FIELD_NAME_DATE = "trade_date";

    /**
     * Computes the single physical table for the given sharding values.
     *
     * @param collection               table names available to this rule (not consulted)
     * @param complexKeysShardingValue sharding values of the routed statement
     * @return a one-element collection holding the physical table name
     * @throws IllegalArgumentException if the statement carries no precise
     *                                  {@code trade_date} value
     */
    @Override
    public Collection<String> doSharding(Collection collection, ComplexKeysShardingValue complexKeysShardingValue) {
        String logicTable = complexKeysShardingValue.getLogicTableName();
        Object values = complexKeysShardingValue.getColumnNameAndShardingValuesMap().get(FIELD_NAME_DATE);
        // Fail with context instead of an NPE / ClassCastException when the
        // statement has no equality/IN condition on the sharding column.
        if (!(values instanceof Collection) || ((Collection<?>) values).isEmpty()) {
            throw new IllegalArgumentException("Table sharding error. No precise value for sharding column ["
                    + FIELD_NAME_DATE + "] of logic table [" + logicTable + "]");
        }
        String date = String.valueOf(((Collection<?>) values).iterator().next());
        return Collections.singletonList(logicTable + "_" + date.substring(2).replace("-", ""));
    }
}
TableShardingByKlineTypeAndSymbolIdAlg
package com. zzc. sharding ;
/**
 * Table-level routing rule for k-line tables sharded by period type and symbol id.
 *
 * <p>The physical table name is {@code <logicTable>_<klineType>_<shard>} where
 * {@code <shard>} is {@code symbol_id} modulo the configured shard count.
 */
@Slf4j
public class TableShardingByKlineTypeAndSymbolIdAlg implements ComplexKeysShardingAlgorithm {
    private static final String FIELD_NAME_KLINE_TYPE = "kline_type";
    private static final String FIELD_NAME_SYMBOL = "symbol_id";

    /** Looked up from the Spring context on first use and then reused. */
    private DatabaseShardingConfig config;

    /**
     * Computes the single physical table for the given sharding values.
     *
     * @param collection               table names available to this rule (not consulted)
     * @param complexKeysShardingValue sharding values of the routed statement
     * @return a one-element collection holding the physical table name
     * @throws IllegalArgumentException if a sharding column has no precise value
     *                                  or {@code symbol_id} is not numeric
     * @throws IllegalStateException    if the logic table has no usable sharding config
     */
    @Override
    public Collection<String> doSharding(Collection collection, ComplexKeysShardingValue complexKeysShardingValue) {
        if (config == null) {
            config = SpringContextUtil.getBean(DatabaseShardingConfig.class);
        }
        String logicTable = complexKeysShardingValue.getLogicTableName();
        String klineType = String.valueOf(firstValue(complexKeysShardingValue, FIELD_NAME_KLINE_TYPE));
        long symbolId = toLong(firstValue(complexKeysShardingValue, FIELD_NAME_SYMBOL), FIELD_NAME_SYMBOL);
        DatabaseShardingConfig.TableShardingConfig shardingConfig = config.getTableShardingConfig(logicTable);

        // Runs for every routed statement, so this is diagnostic output only.
        log.debug("symbolId:{}", symbolId);
        log.debug("klineType:{}", klineType);
        log.debug("shardingConfig:{}", shardingConfig);

        if (shardingConfig == null || shardingConfig.getTableShardingNum() <= 0) {
            throw new IllegalStateException("No usable table sharding config for logic table [" + logicTable + "]");
        }
        // floorMod keeps the suffix inside 0..n-1 even for a negative id.
        long shard = Math.floorMod(symbolId, (long) shardingConfig.getTableShardingNum());
        return Collections.singletonList(logicTable + "_" + klineType + "_" + shard);
    }

    /**
     * Returns the first precise value bound to {@code column}; fails with a
     * descriptive message instead of an NPE/ClassCastException when the
     * statement carries no such value.
     */
    private static Object firstValue(ComplexKeysShardingValue shardingValue, String column) {
        Object values = shardingValue.getColumnNameAndShardingValuesMap().get(column);
        if (!(values instanceof Collection) || ((Collection<?>) values).isEmpty()) {
            throw new IllegalArgumentException("Table sharding error. No precise value for sharding column [" + column
                    + "] of logic table [" + shardingValue.getLogicTableName() + "]");
        }
        return ((Collection<?>) values).iterator().next();
    }

    /** Converts a sharding value to {@code long}, accepting any {@link Number} or a numeric string. */
    private static long toLong(Object value, String column) {
        if (value instanceof Number) {
            return ((Number) value).longValue();
        }
        try {
            return Long.parseLong(String.valueOf(value).trim());
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException(
                    "Table sharding error. Non-numeric value [" + value + "] for sharding column [" + column + "]", e);
        }
    }
}
package com. zzc. service. schedule ;
@Slf4j
@Component
@RequiredArgsConstructor
public class QuotationDataManagementJob {
private final static int LOCK_WAIT_SECONDS = 10 ;
private final static int LOCK_LEASE_SECONDS = 30 * 60 ;
private final static String SHARDING_TABLE_CREATE_SQL = "CREATE TABLE IF NOT EXISTS %s LIKE %s;" ;
private final static String SHARDING_TABLE_CLEAR_SQL = "DROP TABLE IF EXISTS %s;" ;
private final static String DS_SHARDING = "shardingDataSource" ;
private final static String DS_OLAP = "olapDataSource" ;
private final DatabaseShardingConfig dbShardingConfig;
private final RedissonClient redissonClient;
private final DataSource shardingDataSource;
private final DataSource olapDataSource;
@Scheduled ( cron = "0 30 12 ? * FRI" )
public void createShardingTableJob ( ) {
RLock lock = redissonClient. getLock ( LOCK_CREATE_SHARDING_TABLE ) ;
RedisLockUtils . lockExecute ( lock, LOCK_WAIT_SECONDS , LOCK_LEASE_SECONDS , TimeUnit . SECONDS , ( ) -> {
dbShardingConfig. getTables ( ) . forEach ( ( tableName, config) -> {
if ( config. getRunCreateJob ( ) )
createShardingTable ( tableName, config) ;
} ) ;
return null ;
} ) ;
log. info ( "createShardingTable job done" ) ;
}
@Scheduled ( cron = "0 0 10 * * ?" )
public void clearShardingTableJob ( ) {
RLock lock = redissonClient. getLock ( LOCK_CLEAR_SHARDING_TABLE ) ;
RedisLockUtils . lockExecute ( lock, LOCK_WAIT_SECONDS , LOCK_LEASE_SECONDS , TimeUnit . SECONDS , ( ) -> {
dbShardingConfig. getTables ( ) . forEach ( ( tableName, config) -> {
clearShardingTable ( tableName, config) ;
} ) ;
return null ;
} ) ;
log. info ( "clearShardingTable job done" ) ;
}
private void createShardingTable ( String tableName, DatabaseShardingConfig. TableShardingConfig config) {
if ( DS_OLAP . equals ( config. getDs ( ) ) ) {
try {
Connection connection = olapDataSource. getConnection ( ) ;
List < String > nextWeekWorkDays = getNextWeekWorkDays ( ) ;
nextWeekWorkDays. forEach ( day -> {
createShardingTable ( "olap" , connection, tableName, day, config) ;
} ) ;
} catch ( Throwable t) {
log. error ( "createShardingTable error. db : [olap] tableName : [{}]" , tableName, t) ;
}
} else {
( ( ShardingDataSource ) shardingDataSource) . getDataSourceMap ( ) . forEach ( ( dbName, myDataSource) -> {
if ( dbName. equals ( dbShardingConfig. getCenterDs ( ) ) ) {
return ;
}
try {
Connection connection = myDataSource. getConnection ( ) ;
List < String > nextWeekWorkDays = getNextWeekWorkDays ( ) ;
nextWeekWorkDays. forEach ( day -> {
createShardingTable ( dbName, connection, tableName, day, config) ;
} ) ;
} catch ( Throwable t) {
log. error ( "createShardingTable error. db : [{}] tableName : [{}]" , dbName, tableName, t) ;
}
} ) ;
}
}
private void createShardingTable ( String dbName, Connection connection, String tableName, String day, DatabaseShardingConfig. TableShardingConfig config) {
DatabaseShardingConfig. TableShardingConfig tableShardingConfig = dbShardingConfig. getTableShardingConfig ( tableName) ;
if ( config. getTableShardingNum ( ) > 1 ) {
for ( int i = 0 ; i < tableShardingConfig. getTableShardingNum ( ) ; i++ ) {
String realTableName = tableName + "_" + day. substring ( 2 ) + "_" + i;
try {
String sql = String . format ( SHARDING_TABLE_CREATE_SQL , realTableName, tableShardingConfig. getTemplateTable ( ) ) ;
connection. createStatement ( ) . execute ( sql) ;
log. info ( "createShardingTable success. db : [{}] tableName : [{}], realTableName : [{}], sql : [{}]" , dbName, tableName, realTableName, sql) ;
} catch ( Throwable t) {
log. error ( "createShardingTable error. db : [{}] tableName : [{}], realTableName : [{}]" , dbName, tableName, realTableName, t) ;
}
}
} else {
String realTableName = tableName + "_" + day. substring ( 2 ) ;
try {
String sql = String . format ( SHARDING_TABLE_CREATE_SQL , realTableName, tableShardingConfig. getTemplateTable ( ) ) ;
connection. createStatement ( ) . execute ( sql) ;
log. info ( "createShardingTable success. db : [{}] tableName : [{}], realTableName : [{}], sql : [{}]" , dbName, tableName, realTableName, sql) ;
} catch ( Throwable t) {
log. error ( "createShardingTable error. db : [{}] tableName : [{}], realTableName : [{}]" , dbName, tableName, realTableName, t) ;
}
}
}
private List < String > getNextWeekWorkDays ( ) {
LocalDate today = LocalDate . now ( ) ;
LocalDate nextMonday = today. with ( TemporalAdjusters . next ( DayOfWeek . MONDAY ) ) ;
List < String > workDays = new ArrayList < > ( ) ;
DateTimeFormatter formatter = DateTimeFormatter . ofPattern ( DateUtils . YYYYMMDD ) ;
for ( int i = 0 ; i < 5 ; i++ ) {
LocalDate date = nextMonday. plusDays ( i) ;
workDays. add ( date. format ( formatter) ) ;
}
return workDays;
}
private void clearShardingTable ( String tableName, DatabaseShardingConfig. TableShardingConfig config) {
if ( DS_OLAP . equals ( config. getDs ( ) ) ) {
try {
Connection connection = olapDataSource. getConnection ( ) ;
List < String > nextWeekWorkDays = getToBeClearDays ( tableName) ;
nextWeekWorkDays. forEach ( day -> {
clearShardingTable ( "olap" , connection, tableName, day, config) ;
} ) ;
} catch ( Throwable t) {
log. error ( "clearShardingTable error. db : [olap] tableName : [{}]" , tableName, t) ;
}
} else {
( ( ShardingDataSource ) shardingDataSource) . getDataSourceMap ( ) . forEach ( ( dbName, myDataSource) -> {
if ( dbName. equals ( dbShardingConfig. getCenterDs ( ) ) ) {
return ;
}
try {
Connection connection = myDataSource. getConnection ( ) ;
List < String > nextWeekWorkDays = getToBeClearDays ( tableName) ;
nextWeekWorkDays. forEach ( day -> {
clearShardingTable ( dbName, connection, tableName, day, config) ;
} ) ;
} catch ( Throwable t) {
log. error ( "clearShardingTable error. db : [{}] tableName : [{}]" , dbName, tableName, t) ;
}
} ) ;
}
}
private void clearShardingTable ( String dbName, Connection connection, String tableName, String day, DatabaseShardingConfig. TableShardingConfig config) {
if ( config. getTableShardingNum ( ) > 1 ) {
for ( int i = 0 ; i < config. getTableShardingNum ( ) ; i++ ) {
String realTableName = tableName + "_" + day. substring ( 2 ) + "_" + i;
try {
String sql = String . format ( SHARDING_TABLE_CLEAR_SQL , realTableName) ;
connection. createStatement ( ) . execute ( sql) ;
log. info ( "clearShardingTable success. db : [{}] tableName : [{}], realTableName : [{}], sql : [{}]" , dbName, tableName, realTableName, sql) ;
} catch ( Throwable t) {
log. error ( "clearShardingTable error. db : [{}] tableName : [{}], realTableName : [{}]" , dbName, tableName, realTableName, t) ;
}
}
} else {
String realTableName = tableName + "_" + day. substring ( 2 ) ;
try {
String sql = String . format ( SHARDING_TABLE_CLEAR_SQL , realTableName) ;
connection. createStatement ( ) . execute ( sql) ;
log. info ( "clearShardingTable success. db : [{}] tableName : [{}], realTableName : [{}], sql : [{}]" , dbName, tableName, realTableName, sql) ;
} catch ( Throwable t) {
log. error ( "clearShardingTable error. db : [{}] tableName : [{}], realTableName : [{}]" , dbName, tableName, realTableName, t) ;
}
}
}
private List < String > getToBeClearDays ( String tableName) {
List < String > days = new ArrayList < > ( ) ;
DatabaseShardingConfig. TableShardingConfig tableShardingConfig = dbShardingConfig. getTableShardingConfig ( tableName) ;
LocalDate today = LocalDate . now ( ) ;
LocalDate startDay = today. minusDays ( tableShardingConfig. getClearOffset ( ) ) ;
LocalDate endDay = today. minusDays ( tableShardingConfig. getKeepDays ( ) ) ;
DateTimeFormatter formatter = DateTimeFormatter . ofPattern ( DateUtils . YYYYMMDD ) ;
for ( LocalDate date = startDay; date. isBefore ( endDay) ; date = date. plusDays ( 1 ) ) {
days. add ( date. format ( formatter) ) ;
}
return days;
}
}
package com. zzc. service. config ;
/**
 * Sharding settings bound from {@code guda-refinitiv-api-db-sharding.yaml}
 * under the prefix {@code refinitiv.api-service.db-sharding}.
 *
 * <p>{@code marketConfigs} maps a database name to a comma-separated list of
 * market codes; {@link #init()} inverts it into a market-to-database lookup.
 */
@Data
@Slf4j
@RefreshScope
@Configuration
@ConfigurationProperties(prefix = "refinitiv.api-service.db-sharding")
@PropertySource(value = "classpath:guda-refinitiv-api-db-sharding.yaml", factory = YamlPropertySourceFactory.class)
public class DatabaseShardingConfig {
    /** Name of the data source that the table maintenance job skips. */
    private String centerDs;
    /** Per-logic-table sharding settings, keyed by logic table name. */
    private Map<String, TableShardingConfig> tables;
    /** Database name -> comma-separated market codes. */
    private Map<String, String> marketConfigs;
    /** Derived lookup: market code -> database name. Built in {@link #init()}. */
    @Setter(AccessLevel.PRIVATE)
    private Map<String, String> dbMap;

    /**
     * Builds the market-to-database lookup from {@code marketConfigs}.
     *
     * @throws IllegalStateException if {@code marketConfigs} is missing or empty
     */
    @PostConstruct
    public void init() {
        if (marketConfigs == null || marketConfigs.isEmpty()) {
            throw new IllegalStateException("DatabaseShardingConfig error. configs is empty");
        }
        Map<String, String> tmp = new HashMap<>();
        marketConfigs.forEach((dbName, markets) -> {
            if (markets == null) {
                return;
            }
            for (String market : markets.split(",")) {
                String code = market.trim();
                // A stray or doubled comma must not register "" as a market.
                if (code.isEmpty()) {
                    continue;
                }
                String previous = tmp.put(code, dbName);
                if (previous != null && !previous.equals(dbName)) {
                    log.warn("DatabaseShardingConfig market [{}] is configured for both [{}] and [{}]; using [{}]",
                            code, previous, dbName, dbName);
                }
            }
        });
        dbMap = tmp;
        log.info("DatabaseShardingConfig init success. config: [{}]", this);
    }

    /**
     * @param market market code
     * @return the database name configured for the market, or {@code null} if
     *         the market is unknown or the lookup has not been built yet
     */
    public String getDbName(String market) {
        return dbMap == null ? null : dbMap.get(market);
    }

    /**
     * @param tableName logic table name
     * @return the sharding settings of the table, or {@code null} if none are configured
     */
    public TableShardingConfig getTableShardingConfig(String tableName) {
        return tables == null ? null : tables.get(tableName);
    }

    /** Sharding and retention settings of one logic table. */
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class TableShardingConfig {
        /** Table whose structure new shard tables are copied from. */
        private String templateTable;
        /** Number of shard tables per date; values up to 1 mean no numeric suffix. */
        private int tableShardingNum;
        /** Day offset (from today) where the clearing window ends, exclusive. */
        private int keepDays;
        /** Day offset (from today) where the clearing window starts, inclusive. */
        private int clearOffset;
        /** Name of the data source bean the table lives on. */
        private String ds;
        /** Whether the scheduled create job handles this table. */
        private Boolean runCreateJob = true;
    }
}
# Bound to DatabaseShardingConfig (prefix refinitiv.api-service.db-sharding).
refinitiv.api-service:
  db-sharding:
    # Data source skipped by the table create/clear job.
    centerDs: 'center'
    # Per-logic-table settings:
    #   templateTable    - table that new shard tables are created LIKE
    #   tableShardingNum - shard tables per date (1 = no numeric suffix)
    #   keepDays / clearOffset - the clear job drops tables dated from
    #                      (today - clearOffset) up to, but excluding,
    #                      (today - keepDays)
    #   ds               - data source bean name the table lives on
    #   runCreateJob     - false excludes the table from the weekly create job
    tables:
      trade_record:
        templateTable: 'trade_record_240101'
        tableShardingNum: 250
        keepDays: 7
        clearOffset: 15
        ds: 'shardingDataSource'
      olap_quotation_snapshot:
        templateTable: 'olap_quotation_snapshot_240101'
        tableShardingNum: 1
        keepDays: 30
        clearOffset: 40
        ds: 'olapDataSource'
      kline_m1:
        templateTable: 'kline_m1'
        tableShardingNum: 1
        keepDays: 30
        clearOffset: 40
        ds: 'shardingDataSource'
      kline:
        templateTable: 'kline'
        tableShardingNum: 16
        keepDays: 30
        clearOffset: 40
        ds: 'shardingDataSource'
        runCreateJob: false
    # Database name -> comma-separated market codes routed to it.
    marketConfigs:
      ds1: 'HK, HK_WRNT, HK_BONDA, HK_TRUST'
      ds0: 'US, US_PINK, US_OPTION, SH, SZ, SZ_INDEX, SZ_FUND, SZ_GEM, US_ETF'