Golang 高并发问题的解决
Golang在高并发问题上,由于协程的使用,相对于其他编程语言,已经有了很大的优势,即相同的配置上,Golang可以以更低的代价处理更多的线程,同样的线程数,占用更低的资源!及时这样,只是解决了一部分问题而已,因为在每个协程里,处理逻辑还是会有问题。
高并发时,还是要考虑服务器所能承受的最大压力,数据库读取时的io问题,连接数问题,带宽问题等等 研究了一下并发解决方案,在此记录一下
参考文章:Handling 1 Million Requests per Minute with Go
地址:http://marcio.io/2015/07/handling-1-million-requests-per-minute-with-golang/ 代码如下:
//任务的请求
type MtaskRequest struct {
Ceshi int
// [redacted]
}
//job队列+work池
var (
MaxWorker = os.Getenv("MAX_WORKERS")
MaxQueue = os.Getenv("MAX_QUEUE")
)
// Job represents the job to be run
type Job struct {
MtaskRequest MtaskRequest
}
// A buffered channel that we can send work requests on.
// var JobQueue chan Job ---这样申明会卡主,没有初始化
var JobQueue = make(chan Job)
// Worker represents the worker that executes the job
type Worker struct {
WorkerPool chan chan Job
JobChannel chan Job
quit chan bool
}
func NewWorker(workerPool chan chan Job) Worker {
return Worker{
WorkerPool: workerPool,
JobChannel: make(chan Job),
quit: make(chan bool)}
}
// Stop signals the worker to stop listening for work requests.
func (w Worker) Stop() {
go func() {
w.quit <- true
}()
}
type Dispatcher struct {
// A pool of workers channels that are registered with the dispatcher
WorkerPool chan chan Job
maxWorkers int
}
func NewDispatcher(maxWorkers int) *Dispatcher {
pool := make(chan chan Job, maxWorkers)
return &Dispatcher{WorkerPool: pool, maxWorkers: maxWorkers}
}
// Start method starts the run loop for the worker, listening for a quit channel in
// case we need to stop it
func (w Worker) Start() {
go func() {
for {
// register the current worker into the worker queue.
w.WorkerPool <- w.JobChannel
select {
case <-w.JobChannel:
time.Sleep(5 * time.Second)
// we have received a work request.
fmt.Println("调起worker")
case <-w.quit:
// we have received a signal to stop
return
//不能写default
}
}
}()
}
func (d *Dispatcher) Run() {
//启动一定数量的worker
fmt.Println("启动一定数量的worker")
for i := 0; i < d.maxWorkers; i++ {
worker := NewWorker(d.WorkerPool)
worker.Start()
}
go d.dispatch()
}
//分派任务
func (d *Dispatcher) dispatch() {
for {
select {
case job := <-JobQueue: //接收一个job请求
fmt.Println("JobQueue 收到请求")
go func(job Job) {
// try to obtain a worker job channel that is available.
// this will block until a worker is idle
jobChannel := <-d.WorkerPool
// dispatch the job to the worker job channel
jobChannel <- job
}(job)
}
}
}
//接收到红包数据
func (this *TaskRedbao) UserGetRedbao(red_id, uid, shop_id, rand_arr, Amoney string) error {
fmt.Println("收到 接收到红包数据 http请求")
mtaskRequest := MtaskRequest{67}
work := Job{MtaskRequest: mtaskRequest}
JobQueue <- work
return nil
}