Job Search Engine
2.16 User-Triggered Scraping Customization (3) - Managing Trigger Records
Copyright notice: please credit the source when reposting: http://www.codingsoho.com/用户触发数据爬虫定制 (3) - 触发记录的管理
This section covers the backend update triggered via AJAX.
The database design is as follows.
job_entry holds the scraped job postings. Each trigger creates one trigger_scrap_record entry, which has a many-to-many relationship with job_entry.
trigger_scrap_record
- user
- href
- keywords
- last_updated_nums
- created
- updated
Database
Scraper side
trigger_scrap_record and job_entry are linked by an m2m relationship (through the table job_entry_m2m_trigger_scrap_record). Django wraps all of this up, so it is straightforward to implement there. On the scraper side, however, I had been writing raw SQL, which cost far too much time, so in the end I switched to the SQLAlchemy library.
The data definitions are as follows:
from sqlalchemy import Table, Column, String, Text, DateTime, Integer, Boolean, ForeignKey, create_engine
from sqlalchemy.orm import sessionmaker, relationship
from sqlalchemy.ext.declarative import declarative_base
import datetime

Base = declarative_base()  # generate orm base class

# classic style
job_entry_m2m_trigger_scrap_record = Table(
    'job_entry_m2m_trigger_scrap_record',
    Base.metadata,
    Column('id', Integer, primary_key=True),
    Column('job_entry_id', Integer, ForeignKey('job_entry.id')),
    Column('trigger_scrap_record_id', Integer, ForeignKey('trigger_scrap_record.id')),
)

# ORM style
class JobEntry(Base):
    __tablename__ = 'job_entry'
    id = Column(Integer, nullable=False, primary_key=True, autoincrement=True)
    title = Column(String(200))
    salary = Column(String(45))
    region = Column(String(45))
    degree = Column(String(45))
    experience = Column(String(45))
    company = Column(String(145))
    industry = Column(String(145))
    description = Column(Text(300))
    href = Column(String(100))
    # created = Column(DateTime, default=datetime.datetime.utcnow)
    # updated = Column(DateTime, default=datetime.datetime.utcnow)
    created = Column(DateTime, default=datetime.datetime.now)
    updated = Column(DateTime, default=datetime.datetime.now)
    trigger_scrap_records = relationship(
        'TriggerScrapRecord',
        secondary=job_entry_m2m_trigger_scrap_record,
        # backref='job_entries',
        backref=('job_entries', {'lazy': 'dynamic'}),
        # cascade="all, delete-orphan",  # delete-orphan cascade is not supported on a many-to-many or many-to-one relationship when single_parent is not set.
        lazy='dynamic',
        passive_deletes=True)

    def __repr__(self):
        return self.title

class TriggerScrapRecord(Base):
    __tablename__ = 'trigger_scrap_record'
    id = Column(Integer, nullable=False, primary_key=True, autoincrement=True)
    keyword = Column(String(200))
    username = Column(String(45))
    email = Column(String(45))
    href = Column(Text(500))
    active = Column(Boolean, default=False)
    last_triggered = Column(DateTime, default=datetime.datetime.now)
    last_complete = Column(DateTime, default=datetime.datetime.now)

    def __repr__(self):
        return self.keyword
Note: the backref in the relationship sets lazy to 'dynamic', i.e. backref=('job_entries', {'lazy':'dynamic'}); otherwise the following error is raised:
AttributeError: 'InstrumentedList' object has no attribute 'filter'
- https://stackoverflow.com/questions/11578070/sqlalchemy-instrumentedlist-object-has-no-attribute-filter
- https://blog.csdn.net/weixin_40161254/article/details/82689372
- https://stackoverflow.com/questions/7075828/make-sqlalchemy-use-date-in-filter-using-postgresql
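For illustration, here is a minimal usage sketch (assuming the models above; the engine and the username/keyword values are placeholders) of what the lazy='dynamic' backref buys: job_entries becomes a query object instead of an InstrumentedList, so filter() works on it.

import datetime
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

engine = create_engine('sqlite://')   # any engine; in-memory SQLite just for the sketch
Base.metadata.create_all(engine)
session = sessionmaker(bind=engine)()

record = session.query(TriggerScrapRecord).filter_by(
    username='demo', keyword='python').first()   # hypothetical values
if record is not None:
    # Because the backref sets lazy='dynamic', job_entries is a Query rather than
    # an InstrumentedList, so it can be filtered further before hitting the database:
    recent = record.job_entries.filter(
        JobEntry.created >= datetime.datetime.now() - datetime.timedelta(days=7)).all()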
App side
The corresponding Django definitions are as follows:
class TriggerScrapRecord(models.Model):
    keyword = models.CharField(_("keyword"), max_length=200, null=False, blank=False)
    username = models.CharField(_("username"), max_length=45, null=True, blank=False)
    email = models.CharField(_("mail"), max_length=45, null=True, blank=False)
    href = models.TextField(_("href"), max_length=500, null=True, blank=True)
    active = models.BooleanField(_("active"), default=False)
    last_triggered = models.DateTimeField(_("last triggered"))
    last_complete = models.DateTimeField(_("last completed"))

    class Meta:
        db_table = 'trigger_scrap_record'

    def __unicode__(self):
        return self.keyword
Add the m2m relationship to JobEntry. At first I wanted to keep things simple and just point the field at the intermediate table name:
trigger_scrap_records = models.ManyToManyField(
    'TriggerScrapRecord',
    db_table='job_entry_m2m_trigger_scrap_record',
    related_name='job_entries',
)
But it raised the following error:
(1054, "Unknown column 'job_entry_m2m_trigger_scrap_record.triggerscraprecord_id' in 'where clause'")
The reason is that Django derives the column names of the join table automatically from the model names, and that naming cannot be configured this way. In theory I could rename the table's columns to match, but that felt too rigid, so in the end I added an explicit through model:
class JobEntryM2MTriggerScrapRecord(models.Model):
    job_entry = models.ForeignKey('JobEntry')
    trigger_scrap_record = models.ForeignKey('TriggerScrapRecord')

    class Meta:
        db_table = 'job_entry_m2m_trigger_scrap_record'
and changed the relation field as follows:
trigger_scrap_records = models.ManyToManyField(
    'TriggerScrapRecord',
    through=JobEntryM2MTriggerScrapRecord,
    related_name='job_entries',
)
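With the explicit through model in place, the relation can be used from both sides as usual; a small sketch, assuming the models above (the keyword/username values are hypothetical):

# Hypothetical lookup values, just to show both directions of the relation.
record = TriggerScrapRecord.objects.get(keyword='python', username='demo')
entries = record.job_entries.all()            # reverse access via related_name
entry = JobEntry.objects.first()
records = entry.trigger_scrap_records.all()   # forward access from JobEntry
rows = JobEntryM2MTriggerScrapRecord.objects.filter(trigger_scrap_record=record)  # rows of the through table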
Admin configuration
class TriggerScrapRecordAdmin(admin.ModelAdmin):
    list_display = [
        'keyword',
        'username',
        'active',
        'last_triggered',
        'last_complete',
    ]
    search_fields = [
        'keyword',
        'username',
    ]
    list_filter = [
        'username',
        'active',
        'last_triggered',
        'last_complete',
    ]

    class Meta:
        model = TriggerScrapRecord

admin.site.register(TriggerScrapRecord, TriggerScrapRecordAdmin)
Creating the tables
The tables are created from the scraper application.
class Singleton(type):
    def __init__(cls, name, bases, dic):
        super(Singleton, cls).__init__(name, bases, dic)
        cls.instance = None

    def __call__(cls, *args, **kwargs):
        if cls.instance is None:
            cls.instance = super(Singleton, cls).__call__(*args, **kwargs)
        return cls.instance

class SQLAlchemyHelper(object):
    __metaclass__ = Singleton

    def __init__(self, host='127.0.0.1', user='root', password='123', db='bestjob', encoding="utf-8", echo=True, max_overflow=5):
        '''
        pymysql:
            mysql+pymysql://<username>:<password>@<host>/<dbname>[?<options>]
        MySQL-Python:
            mysql+mysqldb://<user>:<password>@<host>[:<port>]/<dbname>
        '''
        self.engine = create_engine('mysql+mysqldb://{}:{}@{}:{}/{}'.format(user, password, host, 3306, db),
                                    connect_args={'charset': 'utf8'}, encoding="utf-8", echo=True)
        Session = sessionmaker(bind=self.engine)
        self.session = Session()
        self.session.execute('show databases')
        Base.metadata.create_all(self.engine)  # create table
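A short sketch of how the helper is meant to be used: thanks to the Singleton metaclass every call returns the same instance, so the engine and session are created only once (note that __metaclass__ works under Python 2, which is what this code targets; connection values below are just the defaults from __init__).

# Both calls return the same object, so the MySQL engine is created only once.
helper_a = SQLAlchemyHelper(host='127.0.0.1', user='root', password='123', db='bestjob')
helper_b = SQLAlchemyHelper()
assert helper_a is helper_b

# The shared session can then be used for ad-hoc queries:
count = helper_a.session.query(TriggerScrapRecord).count()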
Tip: importing from a parent directory in Python
Sometimes we need to import a Python file that lives in another directory. With the structure below, for example, we want _train.py to import _lstm.py from the networks directory and _config.py from the parent directory.
_config.py
networks
    _lstm.py
    _cnn.py
pipelines
    _train.py
Only two steps are needed:
(1) Create an empty __init__.py file in the networks directory:
_config.py
networks
    __init__.py
    _lstm.py
    _cnn.py
pipelines
    _train.py
(2) Add the parent path via the sys module:
import sys
sys.path.append("..")
from networks._lstm import *
from _config import *
Tip: URL encoding
- https://stackoverflow.com/questions/4116940/encode-url-in-django-template
- urlencode and urldecode in Python
- https://docs.djangoproject.com/en/2.1/ref/models/fields
Sync command
After a sync command is sent, the scraper starts crawling and returns an acknowledgement. Commands must not be sent back to back; the previous crawl has to finish before a new one can start.
A small change is needed here:
jse.views.py
def build_liepiin_search_link(keywords_dict):
    base_url = "https://www.liepin.com/zhaopin/"
    fields = [
        "isAnalysis",
        "dqs",
        "pubTime",
        "salary",
        "subIndustry",
        "industryType",
        "compscale",
        "key",
        "init",
        "searchType",
        "headckid",
        # "flushckid",  # MUST not be set, otherwise the first page's content is always returned
        "compkind",
        "fromSearchBtn",
        "sortFlag",
        "ckid",
        "jobKind",
        "industries",
        "clean_condition",
        "siTag",
        "d_sfrom",
        "d_ckId",
        # "degradeFlag",
        "d_pageSize",
        "d_curPage",
        "d_headId",
    ]
    fields_selected = [
        "key",
        "industryType",
        "industries",
        "jobKind",
        "salary",
        "compscale",
    ]
    query_list = [field + "=" + keywords_dict.get(field, "") for field in fields_selected]
    url_query = '&'.join(query_list)
    query_list = [field + "=" + keywords_dict.get(field, "") for field in fields]
    url = '&'.join(query_list)
    url = base_url + "?" + url
    # url += ("curPage" + "0")
    return url, url_query
Several changes:
- curPage is removed here; the scraper appends it when paging.
- A list comprehension combined with join now builds the URL, which tidies the code and avoids a trailing & at the end of the URL.
- The fields list is tuned: a few fields are added and flushckid is removed. With flushckid present, the first page's content is returned no matter how you page. This field will probably need long-term maintenance, or paging could later be driven through Selenium instead.
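For clarity, a usage sketch of build_liepiin_search_link with hypothetical search parameters; fields missing from the dict simply become empty field= pairs:

keywords_dict = {
    "key": "python",               # hypothetical values
    "industryType": "industry_01",
    "jobKind": "1",
}
url, url_query = build_liepiin_search_link(keywords_dict)
# url       -> full liepin search URL containing every entry of `fields`
# url_query -> compact form built from `fields_selected` only, e.g.
#              "key=python&industryType=industry_01&industries=&jobKind=1&salary=&compscale="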
The front-end changes are minimal; only the variables passed in the request change:
data: {
    href: $('#href_id').val(),
    keywords: $('#keywords_id').val(),
    action: "register_job_scrab",
    username: "{{request.user.username}}",
    email: $('input[name="mail"]').val()
},
These added variables are used by the scraper backend:
- username and keyword identify the unique record
- email is used to send updates to the given mailbox during periodic crawls
Most of the work happens in the scraper backend.
Query command
The front end needs no change; the scraper backend handles this. Based on the record of the query command, it returns the time of the last completed sync.
Displaying triggered records
jse.views.py
class JobScrapRegisterView(LoginRequiredMixin, FormView):
    form_class = JobScrapRegisterForm
    template_name = 'jse/job_scrap_register_form.html'

    def get_context_data(self, *args, **kwargs):
        context = super(JobScrapRegisterView, self).get_context_data(*args, **kwargs)
        context['related'] = TriggerScrapRecord.objects.filter(username=self.request.user.username)
        return context
job_scrap_register_form.html
<h2>{% trans "Related Records:" %}</h2>
<ul>
{% for obj in related %}
<!-- <li><a href="{% url 'job_entry_list' %}?{% filter urlencode %}keyword={{obj.keyword}} username={{obj.username}}{% endfilter %}">{{obj.username}} : {{obj.last_complete}}</a></li> -->
<li><a href="{% url 'job_entry_list' %}?keyword={{obj.keyword|my_quote}}&username={{obj.username}}">{{obj.username}} : {{obj.last_complete}}</a> | <a href="{{obj.href}}">{% trans 'href' %}</a></li>
{% endfor %}
</ul>
Note: the keyword itself contains text like key=?, which would garble the URL if put into the address bar directly, so it has to be encoded.
I didn't dig into whether a ready-made filter would do; I wrote my own:
# defined in a templatetags module (register is the module's template.Library instance)
from django import template

register = template.Library()

# for a dict
@register.filter(name='my_urlencode')
def my_urlencode(data):
    from urllib import urlencode
    return urlencode(data)

# for a string
@register.filter(name='my_quote')
def my_quote(data):
    from urllib import quote
    return quote(data)
Below is the backend handler that displays the results for the given query.
views.py
class JobEntryListView(PageMixin, ListView):
    model = JobEntry
    filter_class = JobEntryFilter

    def get_context_data(self, *args, **kwargs):
        object_list = self.filter_class(self.request.GET, self.get_queryset()).qs
        if self.request.GET.get('keyword', None) and self.request.GET.get('username', None):
            keyword = self.request.GET['keyword']
            username = self.request.GET['username']
            from urllib import unquote, quote
            objs = TriggerScrapRecord.objects.filter(username=self.request.user.username)
            for _ in objs:
                if keyword.lower() == _.keyword.lower():
                    object_list = _.job_entries.all()
                    break
        kwargs.update({'object_list': object_list})  # without this, Paginator would operate on the full object_list
        context = super(JobEntryListView, self).get_context_data(*args, **kwargs)
        ...
After filter_class has done its work, the queryset is filtered once more; of course this could also be moved into filter_class. I had also planned to decode the keyword GET parameter with unquote, but keyword = self.request.GET['keyword'] already returns the decoded value, so it can be compared with the database directly.
Original:
key=%E8%8A%AF%E7%89%87&industryType=industry_01&industries=030&jobKind=1&salary=50$100&compscale=020
Encoded:
key%3D%25E8%258A%25AF%25E7%2589%2587%26industryType%3Dindustry_01%26industries%3D030%26jobKind%3D1%26salary%3D50%24100%26compscale%3D020
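For reference, a small sketch showing that the "Encoded" string above is just urllib quote applied to the original query (Python 2 urllib, the same call the my_quote filter makes):

from urllib import quote, unquote

raw = "key=%E8%8A%AF%E7%89%87&industryType=industry_01&industries=030&jobKind=1&salary=50$100&compscale=020"
encoded = quote(raw)
print(encoded)                  # key%3D%25E8%258A%25AF%25E7%2589%2587%26industryType%3D...
assert unquote(encoded) == raw  # Django hands back the decoded value in request.GET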
Add the trigger page to the navigation bar
While at it, a link to the admin is added to the navigation bar as well.
<ul class="nav navbar-nav navbar-right">
    <li><a href="{% url 'job_scrap_register' %}">Scrap</a></li>
    <li><a href="{% url 'admin:index' %}">Admin</a></li>
Scraper backend overview
Database
This was introduced earlier.
AJAX communication
I originally wanted to implement this part with Flask, but after struggling with CORS for a long time I gave up and stayed with Django, which I had already set up before.
from comm.views import ajax

urlpatterns = [
    url(r'^$', ajax, name="ajax"),
]
When it receives the AJAX command, it first builds the TriggerScrapRecord data from the information provided, then decides which command it is:
- For a query, it checks whether a record with the given keyword already exists in the table and returns the last update time; otherwise it returns "no records".
- For a sync, it checks whether a sync for this request is already running:
  - If so, it replies "please wait a moment".
  - If not, it replies "command confirmed" and starts a new thread to crawl the data.
def ajax(request):
    response_data = {}
    response_data['result'] = 'failed'
    response_data['message'] = 'invalid input'
    if request.method.lower() == "get":
        action = request.GET.get("action", None)
        p_data = {}  # database
        p_data['keyword'] = request.GET.get("keywords", None)
        p_data['username'] = request.GET.get("username", None)
        p_data['href'] = request.GET.get("href", None)
        p_data['email'] = request.GET.get("email", None)
        p_data['active'] = 0
        p_data['last_triggered'] = datetime.datetime.now()
        p_data['last_complete'] = datetime.datetime.now()
        print(p_data)
        if "query_scrab_status" == action:
            helper = SQLAlchemyHelper(host=sec_settings.DB_HOST, user=sec_settings.DB_USER, password=sec_settings.DB_PWD, db=sec_settings.DB_NAME)
            filter_dic = {'keyword': p_data.get('keyword', None), 'username': p_data.get('username', None)}
            objs = helper.session.query(TriggerScrapRecord).filter_by(**filter_dic)
            response_data['result'] = 'success'
            response_data['message'] = 'last sync @ {}'.format(objs.first().last_complete) if objs.count() else "No Records"
        elif "register_job_scrab" == action:
            bs = JobBs(username=p_data['username'], keyword=p_data['keyword'], host=sec_settings.DB_HOST, user=sec_settings.DB_USER, password=sec_settings.DB_PWD, db=sec_settings.DB_NAME)
            filter_dic = {'keyword': p_data.get('keyword', None), 'username': p_data.get('username', None)}
            objs = bs.helper.session.query(TriggerScrapRecord).filter_by(**filter_dic)
            if objs.count() and objs.first().active:
                response_data['result'] = 'fail'
                response_data['message'] = 'scrap ongoing, wait a minute'
            else:
                response_data['result'] = 'success'
                response_data['message'] = 'registration confirmed'
                t2 = threading.Thread(target=func_sync, args=(p_data.copy(),))
                t2.start()
    return HttpResponse(json.dumps(response_data), content_type="application/json")
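For a quick manual test of this endpoint, something like the following can be used; the host, port and mount path of the comm URLconf are hypothetical here, as are the keyword and username values:

import requests

# Assumes the comm app's URLconf is mounted at the site root on a local dev server.
resp = requests.get('http://127.0.0.1:8000/', params={
    'action': 'query_scrab_status',
    'keywords': 'python',     # hypothetical values
    'username': 'demo',
})
print(resp.json())            # e.g. {"result": "success", "message": "No Records"}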
The thread works as follows:
- Create the crawler wrapper class JobBs. Besides the database connection parameters, it takes username and keyword, which together form the unique key of TriggerScrapRecord.
- Check whether the record already exists:
  - If it exists, check whether a crawl is already in progress:
    - If so, return and do nothing.
    - Otherwise, set active=1.
  - If it does not exist, insert a new record with active=1 and save it to the database.
- Run the crawl.
- Update active=0 and last_complete to the current time, then save to the database.
def func_sync(*args):
    p_data = args[0]
    filter_dic = {'keyword': p_data.get('keyword', None), 'username': p_data.get('username', None)}
    bs = JobBs(username=p_data['username'], keyword=p_data['keyword'], host=sec_settings.DB_HOST, user=sec_settings.DB_USER, password=sec_settings.DB_PWD, db=sec_settings.DB_NAME)
    objs = bs.helper.session.query(TriggerScrapRecord).filter_by(**filter_dic)
    # New record or existing one?
    if objs.count():
        # Crawl ongoing?
        obj = bs.helper.session.query(TriggerScrapRecord).filter_by(**filter_dic).first()
        if obj.active:
            return
        # New query
        else:
            obj.active = 1
    else:
        # Insert new record
        p_data['active'] = 1
        obj = TriggerScrapRecord(**p_data)
    bs.helper.session.add(obj)
    bs.helper.session.commit()
    # do crawl ...................
    url_init = p_data['href']
    obj = bs.helper.session.query(TriggerScrapRecord).filter_by(**filter_dic).first()
    try:
        bs.crawler_data(url_init)
    except:
        obj.active = 0
    # update db
    obj.active = 0
    obj.last_complete = datetime.datetime.now()
    bs.helper.session.add(obj)
    bs.helper.session.commit()
I also tried completing this part with raw MySQLdb commands.
..........